李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
大数据
正文
05.将DataStream转换成表
Leefs
2022-02-23 PM
826℃
0条
[TOC] ### 一、概述 Flink 允许我们把 Table 和 DataStream 做转换:我们可以基于一个 DataStream,先流式地读取数据源,然后 map 成样例类,再把它转成 Table。Table 的列字段(column fields),就是样例类里的字段,这样就不用再麻烦地定义 schema 了。 ### 二、代码表达 代码中实现非常简单,直接用 `tableEnv.fromDataStream()`就可以了。 ```scala val dataStream: DataStream[SensorReading] = ... val sensorTable: Table = tableEnv.fromDataStream(dataStream) ``` 默认转换后的 Table schema 和 DataStream 中的字段定义一一对应,也可以单独指定出来。 ```scala val dataStream: DataStream[SensorReading] = ... val sensorTable = tableEnv.fromDataStream(dataStream,'id, 'timestamp, 'temperature) ``` 这就允许我们更换字段的顺序、重命名,或者只选取某些字段出来,相当于做了一次 map 操作(或者 Table API的select 操作)。 **完整代码** ```scala import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.api.Table import org.apache.flink.table.api.scala._ /** * @author lilinchao * @date 2022/2/23 * @description 1.0 **/ object DataStreamToTable { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env) val inputStream: DataStream[String] = env.readTextFile("./datas/sensor.txt") val datastream: DataStream[SensorReading] = inputStream.map(data => { val dataArray: Array[String] = data.split(",") SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble) }) val sensorTable: Table = tableEnv.fromDataStream(datastream) val sensorTable2: Table = tableEnv.fromDataStream(datastream, 'id, 'temperature as 'te) sensorTable.toAppendStream[(String,Long,Double)].print("sensorTable") sensorTable2.toAppendStream[(String,Double)].print("ST2") env.execute("DataStreamToTable") } } ``` ### 三、数据类型与Table schema的对应 DataStream 中的数据类型,与表的 Schema 之间的对应关系,是按照样例类中的字段名来对应的(name-based mapping),所以还可以用 as 做重命名。 另外一种对应方式是,直接按照字段的位置来对应(position-based mapping),对应的过程中,就可以直接指定新的字段名了。 **基于名称的对应:** ```scala val sensorTable = tableEnv.fromDataStream(dataStream, 'timestamp as 'ts, 'id as 'myId, 'temperature) ``` **基于位置的对应:** ```scala val sensorTable = tableEnv.fromDataStream(dataStream, 'myId, 'ts) ``` Flink 的 DataStream 和 DataSet API 支持多种类型。 组合类型,比如元组(内置 Scala 和 Java 元组)、POJO、Scala case 类和 Flink 的 Row 类 型等,允许具有多个字段的嵌套数据结构,这些字段可以在 Table 的表达式中访问。其他类型,则被视为原子类型。 元组类型和原子类型,一般用位置对应会好一些;如果非要用名称对应,也是可以的: 元组类型,默认的名称是 “_1”, “_2”;而原子类型,默认名称是 ”f0”。 *附文章来源:* *《尚硅谷大数据之flink教程》*
标签:
Flink
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://lilinchao.com/archives/1907.html
上一篇
04.Table API和Flink SQL表的查询
下一篇
06.Table API输出表
取消回复
评论啦~
提交评论
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
43
标签云
JavaWeb
Elastisearch
MySQL
SpringBoot
Elasticsearch
GET和POST
Golang
Hbase
Netty
SpringCloud
二叉树
Thymeleaf
Spark Core
RSA加解密
Java工具类
Jenkins
队列
Yarn
线程池
DataWarehouse
JVM
Beego
Git
Python
ClickHouse
JavaScript
Eclipse
Kafka
并发线程
DataX
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞