李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
大数据
正文
06.Table API输出表
Leefs
2022-02-24 PM
1712℃
0条
[TOC] ### 一、概述 表的输出,是通过将数据写入 **TableSink** 来实现的。**TableSink** 是一个通用接口,可以支持不同的文件格式、存储数据库和消息队列。 具体实现,输出表最直接的方法,就是通过 `Table.insertInto()` 方法将一个 Table 写入注册过的 TableSink中。 ### 二、输出到文件 ```java import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.api.{DataTypes, Table} import org.apache.flink.table.api.scala._ import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema} import org.apache.flink.types.Row /** * @author lilinchao * @date 2022/2/24 * @description 从文件中读数据,输出到文件 **/ object FileOutputDemo { def main(args: Array[String]): Unit = { //1.创建环境 val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env) //2. 连接外部系统,读取数据,注册表 val filePath = "datas/sensor.txt" tableEnv.connect(new FileSystem().path(filePath)) .withFormat(new Csv()) .withSchema(new Schema() .field("id",DataTypes.STRING()) .field("timestamp",DataTypes.BIGINT()) .field("temp",DataTypes.DOUBLE()) ) .createTemporaryTable("inputTable") //3.转换操作 val sensorTable: Table = tableEnv.from("inputTable") //3.1简单转换操作 val resultTable: Table = sensorTable .select('id, 'temp) .filter('id !== "sensor_1") //3.2 聚合转换 val aggTable = sensorTable .groupBy('id) //基于id分组 .select('id, 'id.count as 'count) //4.输出到文件(核心代码) val outputPath = "datas/sensor_out_agg.txt" tableEnv.connect(new FileSystem().path(outputPath)) .withFormat(new Csv()) .withSchema(new Schema() .field("id",DataTypes.STRING()) // .field("temperature",DataTypes.DOUBLE()) .field("cnt",DataTypes.BIGINT()) ) .createTemporaryTable("outputTable") // resultTable.insertInto("outputTable") // resultTable.toAppendStream[(String, Double)].print("result") aggTable.toRetractStream[Row].print("agg") env.execute() } } ``` ### 三、更新模式(Update Mode) + 在流处理过程中,表的处理并不像传统定义的那样简单。 + 对于流式查询(Streaming Queries),需要声明如何在(动态)表和外部连接器之间执行转换。与外部系统交换的消息类型,由**更新模式(update mode)**指定。 **Flink Table API中的更新模式有以下三种:** + **追加模式(Append Mode)** 在追加模式下,表(动态表)和外部连接器只交换插入(Insert)消息。 + **撤回模式(Retract Mode)** 在撤回模式下,表和外部连接器交换的是:**添加(Add)和撤回(Retract)消息**。 + 插入(Insert)会被编码为添加消息; + 删除(Delete)则编码为撤回消息; + 更新(Update)则会编码为,已更新行(上一行)的撤回消息,和更新行(新行) 的添加消息。 在此模式下,不能定义 key,这一点跟 upsert 模式完全不同。 + **Upsert(更新插入)模式** 在 Upsert 模式下,动态表和外部连接器交换 Upsert 和 Delete 消息。 这个模式需要一个唯一的 key,通过这个 key 可以传递更新消息。为了正确应用消息, 外部连接器需要知道这个唯一 key 的属性。 + 插入(Insert)和更新(Update)都被编码为 Upsert 消息; + 删除(Delete)编码为 Delete 信息 这种模式和 Retract 模式的主要区别在于,Update 操作是用单个消息编码的,所以效率 会更高。 ### 四、输出到Kafka 除了输出到文件,也可以输出到Kafka。我们可以结合前面Kafka作为输入数据,构建数据管道,kafka进,kafka出。 ```java import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.api.DataTypes import org.apache.flink.table.api.scala._ import org.apache.flink.table.descriptors.{Csv, Kafka, Schema} /** * @author lilinchao * @date 2022/2/24 * @description 从Kafka接收数据,输出到Kafka **/ object KafkaPipelineDemo { def main(args: Array[String]): Unit = { // 1. 创建环境 val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) val tableEnv = StreamTableEnvironment.create(env) //2.从Kafka读取数据 tableEnv.connect(new Kafka() .version("0.11") .topic("sensor") .property("zookeeper.connect","localhost:2181") .property("bootstrap.servers", "localhost:9092") ) .withFormat(new Csv()) .withSchema(new Schema() .field("id",DataTypes.STRING()) .field("timestamp",DataTypes.BIGINT()) .field("temperature",DataTypes.DOUBLE()) ) .createTemporaryTable("kafkaInputTable") //3.查询转换 //3.1 简单转换 val sensorTable = tableEnv.from("kafkaInputTable") val resultTable = sensorTable .select('id,'temperature) .filter('id === "sensor_1") //3.2 聚合转换 val aggTable = sensorTable .groupBy('id) //基于id分组 .select('id,'id.count as 'count) //4.输出到kafka tableEnv.connect(new Kafka() .version("0.11") .topic("sinktest") .property("zookeeper.connect","localhost:2181") .property("bootstrap.servers","localhost:9092") ) .withFormat(new Csv()) .withSchema(new Schema() .field("id",DataTypes.STRING()) .field("temp",DataTypes.DOUBLE()) ) .createTemporaryTable("kafkaOutputTable") resultTable.insertInto("kafkaOutputTable") resultTable.toAppendStream[(String, Double)].print("result") env.execute("KafkaPipelineDemo") } } ``` ### 五、输出到ElasticSearch ElasticSearch 的 connector 可以在 upsert(update+insert,更新插入)模式下操作,这样就可以使用 Query 定义的键(key)与外部系统交换 **UPSERT/DELETE** 消息。 另外,对于“仅追加”(append-only)的查询,connector 还可以在 append 模式下操作,这样就可以与外部系统只交换 insert 消息。 es 目前支持的数据格式,只有 Json,而 flink 本身并没有对应的支持。 **引入依赖** ```xml
org.apache.flink
flink-json
1.10.1
org.apache.flink
flink-connector-elasticsearch7_2.12
1.10.1
``` **代码** ```java import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.api.DataTypes import org.apache.flink.table.api.scala._ import org.apache.flink.table.descriptors._ /** * @author lilinchao * @date 2022/2/24 * @description 从文件中读入数据,输出到ES **/ object EsOutputDemo { def main(args: Array[String]): Unit = { //1.创建环境 val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env) //2. 连接外部系统,读取数据,注册表 val filePath = "datas/sensor.txt" tableEnv.connect(new FileSystem().path(filePath)) .withFormat(new Csv()) .withSchema(new Schema() .field("id",DataTypes.STRING()) .field("timestamp",DataTypes.BIGINT()) .field("temp",DataTypes.DOUBLE()) ) .createTemporaryTable("inputTable") //3.转换操作 val sensorTable = tableEnv.from("inputTable") //3.1简单转换操作 val resultTable = sensorTable .select('id, 'temp) .filter('id !== "sensor_1") //3.2 聚合转换 val aggTable = sensorTable .groupBy('id) //基于id分组 .select('id, 'id.count as 'count) //4.输出到ES tableEnv.connect(new Elasticsearch() .version("7") .host("localhost", 9200, "http") .index("sensor") .documentType("temperature") ) .inUpsertMode() .withFormat(new Json()) .withSchema(new Schema() .field("id", DataTypes.STRING()) .field("count", DataTypes.BIGINT()) ) .createTemporaryTable("esOutputTable") aggTable.insertInto("esOutputTable") // resultTable.insertInto("esOutputTable") // resultTable.toAppendStream[(String, Double)].print("result") env.execute("EsOutputDemo") } } ``` ### 六、输出到MySQL Flink 专门为 Table API 的 jdbc 连接提供了 flink-jdbc 连接器,我们需要先引入依赖: ```xml
org.apache.flink
flink-jdbc_2.12
1.10.1
``` jdbc 连接的代码实现比较特殊,因为没有对应的 java/scala 类实现 ConnectorDescriptor, 所以不能直接 tableEnv.connect()。 不过 Flink SQL留下了执行 DDL的接口:**tableEnv.sqlUpdate()**。 对于 jdbc 的创建表操作,天生就适合直接写 DDL 来实现,所以我们的代码可以这样写: ```java import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.api.DataTypes import org.apache.flink.table.api.scala._ import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema} /** * @author lilinchao * @date 2022/2/24 * @description 从文件读取,输出到MySQL **/ object MysqlOutputDemo { def main(args: Array[String]): Unit = { //1.创建环境 val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env) //2. 连接外部系统,读取数据,注册表 val filePath = "datas/sensor.txt" tableEnv.connect(new FileSystem().path(filePath)) .withFormat(new Csv()) .withSchema(new Schema() .field("id",DataTypes.STRING()) .field("timestamp",DataTypes.BIGINT()) .field("temp",DataTypes.DOUBLE()) ) .createTemporaryTable("inputTable") //3.转换操作 val sensorTable = tableEnv.from("inputTable") //3.1 聚合转换 val aggTable = sensorTable .groupBy('id) //基于id分组 .select('id, 'id.count as 'count) // 4. 输出到 Mysql val sinkDDL: String = """ |create table jdbcOutputTable ( | id varchar(20) not null, | cnt bigint not null |) with ( | 'connector.type' = 'jdbc', | 'connector.url' = 'jdbc:mysql://localhost:3306/sensor', | 'connector.table' = 'sensor_count', | 'connector.driver' = 'com.mysql.jdbc.Driver', | 'connector.username' = 'root', | 'connector.password' = '123456' |) """.stripMargin tableEnv.sqlUpdate(sinkDDL) aggTable.insertInto("jdbcOutputTable") env.execute("MysqlOutputDemo") } } ``` *附参考文章来源:* *《尚硅谷大数据之Flink》*
标签:
Flink
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://lilinchao.com/archives/1908.html
上一篇
05.将DataStream转换成表
下一篇
07.将表转换成DataStream
评论已关闭
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
NLP
4
标签云
JavaSE
Shiro
Golang基础
nginx
正则表达式
Flink
Ubuntu
VUE
Java编程思想
Hadoop
Tomcat
数据结构和算法
Spring
Http
Livy
Kibana
国产数据库改造
Spark SQL
锁
Yarn
随笔
Docker
Quartz
Azkaban
Elasticsearch
LeetCode刷题
数学
栈
设计模式
MyBatis-Plus
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞
评论已关闭