李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
大数据
正文
06.Table API输出表
Leefs
2022-02-24 PM
1069℃
0条
[TOC] ### 一、概述 表的输出,是通过将数据写入 **TableSink** 来实现的。**TableSink** 是一个通用接口,可以支持不同的文件格式、存储数据库和消息队列。 具体实现,输出表最直接的方法,就是通过 `Table.insertInto()` 方法将一个 Table 写入注册过的 TableSink中。 ### 二、输出到文件 ```java import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.api.{DataTypes, Table} import org.apache.flink.table.api.scala._ import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema} import org.apache.flink.types.Row /** * @author lilinchao * @date 2022/2/24 * @description 从文件中读数据,输出到文件 **/ object FileOutputDemo { def main(args: Array[String]): Unit = { //1.创建环境 val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env) //2. 连接外部系统,读取数据,注册表 val filePath = "datas/sensor.txt" tableEnv.connect(new FileSystem().path(filePath)) .withFormat(new Csv()) .withSchema(new Schema() .field("id",DataTypes.STRING()) .field("timestamp",DataTypes.BIGINT()) .field("temp",DataTypes.DOUBLE()) ) .createTemporaryTable("inputTable") //3.转换操作 val sensorTable: Table = tableEnv.from("inputTable") //3.1简单转换操作 val resultTable: Table = sensorTable .select('id, 'temp) .filter('id !== "sensor_1") //3.2 聚合转换 val aggTable = sensorTable .groupBy('id) //基于id分组 .select('id, 'id.count as 'count) //4.输出到文件(核心代码) val outputPath = "datas/sensor_out_agg.txt" tableEnv.connect(new FileSystem().path(outputPath)) .withFormat(new Csv()) .withSchema(new Schema() .field("id",DataTypes.STRING()) // .field("temperature",DataTypes.DOUBLE()) .field("cnt",DataTypes.BIGINT()) ) .createTemporaryTable("outputTable") // resultTable.insertInto("outputTable") // resultTable.toAppendStream[(String, Double)].print("result") aggTable.toRetractStream[Row].print("agg") env.execute() } } ``` ### 三、更新模式(Update Mode) + 在流处理过程中,表的处理并不像传统定义的那样简单。 + 对于流式查询(Streaming Queries),需要声明如何在(动态)表和外部连接器之间执行转换。与外部系统交换的消息类型,由**更新模式(update mode)**指定。 **Flink Table API中的更新模式有以下三种:** + **追加模式(Append Mode)** 在追加模式下,表(动态表)和外部连接器只交换插入(Insert)消息。 + **撤回模式(Retract Mode)** 在撤回模式下,表和外部连接器交换的是:**添加(Add)和撤回(Retract)消息**。 + 插入(Insert)会被编码为添加消息; + 删除(Delete)则编码为撤回消息; + 更新(Update)则会编码为,已更新行(上一行)的撤回消息,和更新行(新行) 的添加消息。 在此模式下,不能定义 key,这一点跟 upsert 模式完全不同。 + **Upsert(更新插入)模式** 在 Upsert 模式下,动态表和外部连接器交换 Upsert 和 Delete 消息。 这个模式需要一个唯一的 key,通过这个 key 可以传递更新消息。为了正确应用消息, 外部连接器需要知道这个唯一 key 的属性。 + 插入(Insert)和更新(Update)都被编码为 Upsert 消息; + 删除(Delete)编码为 Delete 信息 这种模式和 Retract 模式的主要区别在于,Update 操作是用单个消息编码的,所以效率 会更高。 ### 四、输出到Kafka 除了输出到文件,也可以输出到Kafka。我们可以结合前面Kafka作为输入数据,构建数据管道,kafka进,kafka出。 ```java import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.api.DataTypes import org.apache.flink.table.api.scala._ import org.apache.flink.table.descriptors.{Csv, Kafka, Schema} /** * @author lilinchao * @date 2022/2/24 * @description 从Kafka接收数据,输出到Kafka **/ object KafkaPipelineDemo { def main(args: Array[String]): Unit = { // 1. 创建环境 val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) val tableEnv = StreamTableEnvironment.create(env) //2.从Kafka读取数据 tableEnv.connect(new Kafka() .version("0.11") .topic("sensor") .property("zookeeper.connect","localhost:2181") .property("bootstrap.servers", "localhost:9092") ) .withFormat(new Csv()) .withSchema(new Schema() .field("id",DataTypes.STRING()) .field("timestamp",DataTypes.BIGINT()) .field("temperature",DataTypes.DOUBLE()) ) .createTemporaryTable("kafkaInputTable") //3.查询转换 //3.1 简单转换 val sensorTable = tableEnv.from("kafkaInputTable") val resultTable = sensorTable .select('id,'temperature) .filter('id === "sensor_1") //3.2 聚合转换 val aggTable = sensorTable .groupBy('id) //基于id分组 .select('id,'id.count as 'count) //4.输出到kafka tableEnv.connect(new Kafka() .version("0.11") .topic("sinktest") .property("zookeeper.connect","localhost:2181") .property("bootstrap.servers","localhost:9092") ) .withFormat(new Csv()) .withSchema(new Schema() .field("id",DataTypes.STRING()) .field("temp",DataTypes.DOUBLE()) ) .createTemporaryTable("kafkaOutputTable") resultTable.insertInto("kafkaOutputTable") resultTable.toAppendStream[(String, Double)].print("result") env.execute("KafkaPipelineDemo") } } ``` ### 五、输出到ElasticSearch ElasticSearch 的 connector 可以在 upsert(update+insert,更新插入)模式下操作,这样就可以使用 Query 定义的键(key)与外部系统交换 **UPSERT/DELETE** 消息。 另外,对于“仅追加”(append-only)的查询,connector 还可以在 append 模式下操作,这样就可以与外部系统只交换 insert 消息。 es 目前支持的数据格式,只有 Json,而 flink 本身并没有对应的支持。 **引入依赖** ```xml
org.apache.flink
flink-json
1.10.1
org.apache.flink
flink-connector-elasticsearch7_2.12
1.10.1
``` **代码** ```java import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.api.DataTypes import org.apache.flink.table.api.scala._ import org.apache.flink.table.descriptors._ /** * @author lilinchao * @date 2022/2/24 * @description 从文件中读入数据,输出到ES **/ object EsOutputDemo { def main(args: Array[String]): Unit = { //1.创建环境 val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env) //2. 连接外部系统,读取数据,注册表 val filePath = "datas/sensor.txt" tableEnv.connect(new FileSystem().path(filePath)) .withFormat(new Csv()) .withSchema(new Schema() .field("id",DataTypes.STRING()) .field("timestamp",DataTypes.BIGINT()) .field("temp",DataTypes.DOUBLE()) ) .createTemporaryTable("inputTable") //3.转换操作 val sensorTable = tableEnv.from("inputTable") //3.1简单转换操作 val resultTable = sensorTable .select('id, 'temp) .filter('id !== "sensor_1") //3.2 聚合转换 val aggTable = sensorTable .groupBy('id) //基于id分组 .select('id, 'id.count as 'count) //4.输出到ES tableEnv.connect(new Elasticsearch() .version("7") .host("localhost", 9200, "http") .index("sensor") .documentType("temperature") ) .inUpsertMode() .withFormat(new Json()) .withSchema(new Schema() .field("id", DataTypes.STRING()) .field("count", DataTypes.BIGINT()) ) .createTemporaryTable("esOutputTable") aggTable.insertInto("esOutputTable") // resultTable.insertInto("esOutputTable") // resultTable.toAppendStream[(String, Double)].print("result") env.execute("EsOutputDemo") } } ``` ### 六、输出到MySQL Flink 专门为 Table API 的 jdbc 连接提供了 flink-jdbc 连接器,我们需要先引入依赖: ```xml
org.apache.flink
flink-jdbc_2.12
1.10.1
``` jdbc 连接的代码实现比较特殊,因为没有对应的 java/scala 类实现 ConnectorDescriptor, 所以不能直接 tableEnv.connect()。 不过 Flink SQL留下了执行 DDL的接口:**tableEnv.sqlUpdate()**。 对于 jdbc 的创建表操作,天生就适合直接写 DDL 来实现,所以我们的代码可以这样写: ```java import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.api.DataTypes import org.apache.flink.table.api.scala._ import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema} /** * @author lilinchao * @date 2022/2/24 * @description 从文件读取,输出到MySQL **/ object MysqlOutputDemo { def main(args: Array[String]): Unit = { //1.创建环境 val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env) //2. 连接外部系统,读取数据,注册表 val filePath = "datas/sensor.txt" tableEnv.connect(new FileSystem().path(filePath)) .withFormat(new Csv()) .withSchema(new Schema() .field("id",DataTypes.STRING()) .field("timestamp",DataTypes.BIGINT()) .field("temp",DataTypes.DOUBLE()) ) .createTemporaryTable("inputTable") //3.转换操作 val sensorTable = tableEnv.from("inputTable") //3.1 聚合转换 val aggTable = sensorTable .groupBy('id) //基于id分组 .select('id, 'id.count as 'count) // 4. 输出到 Mysql val sinkDDL: String = """ |create table jdbcOutputTable ( | id varchar(20) not null, | cnt bigint not null |) with ( | 'connector.type' = 'jdbc', | 'connector.url' = 'jdbc:mysql://localhost:3306/sensor', | 'connector.table' = 'sensor_count', | 'connector.driver' = 'com.mysql.jdbc.Driver', | 'connector.username' = 'root', | 'connector.password' = '123456' |) """.stripMargin tableEnv.sqlUpdate(sinkDDL) aggTable.insertInto("jdbcOutputTable") env.execute("MysqlOutputDemo") } } ``` *附参考文章来源:* *《尚硅谷大数据之Flink》*
标签:
Flink
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://lilinchao.com/archives/1908.html
上一篇
05.将DataStream转换成表
下一篇
07.将表转换成DataStream
取消回复
评论啦~
提交评论
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
标签云
Azkaban
排序
SpringCloudAlibaba
DataX
FileBeat
Jquery
Python
CentOS
散列
JavaWEB项目搭建
Spark Streaming
Golang
微服务
机器学习
稀疏数组
数据结构和算法
DataWarehouse
Java编程思想
二叉树
线程池
Golang基础
Elastisearch
NIO
Git
随笔
JVM
SpringBoot
gorm
Redis
Yarn
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞