李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
大数据
正文
05.将DataStream转换成表
Leefs
2022-02-23 PM
1334℃
0条
[TOC] ### 一、概述 Flink 允许我们把 Table 和 DataStream 做转换:我们可以基于一个 DataStream,先流式地读取数据源,然后 map 成样例类,再把它转成 Table。Table 的列字段(column fields),就是样例类里的字段,这样就不用再麻烦地定义 schema 了。 ### 二、代码表达 代码中实现非常简单,直接用 `tableEnv.fromDataStream()`就可以了。 ```scala val dataStream: DataStream[SensorReading] = ... val sensorTable: Table = tableEnv.fromDataStream(dataStream) ``` 默认转换后的 Table schema 和 DataStream 中的字段定义一一对应,也可以单独指定出来。 ```scala val dataStream: DataStream[SensorReading] = ... val sensorTable = tableEnv.fromDataStream(dataStream,'id, 'timestamp, 'temperature) ``` 这就允许我们更换字段的顺序、重命名,或者只选取某些字段出来,相当于做了一次 map 操作(或者 Table API的select 操作)。 **完整代码** ```scala import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.api.Table import org.apache.flink.table.api.scala._ /** * @author lilinchao * @date 2022/2/23 * @description 1.0 **/ object DataStreamToTable { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env) val inputStream: DataStream[String] = env.readTextFile("./datas/sensor.txt") val datastream: DataStream[SensorReading] = inputStream.map(data => { val dataArray: Array[String] = data.split(",") SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble) }) val sensorTable: Table = tableEnv.fromDataStream(datastream) val sensorTable2: Table = tableEnv.fromDataStream(datastream, 'id, 'temperature as 'te) sensorTable.toAppendStream[(String,Long,Double)].print("sensorTable") sensorTable2.toAppendStream[(String,Double)].print("ST2") env.execute("DataStreamToTable") } } ``` ### 三、数据类型与Table schema的对应 DataStream 中的数据类型,与表的 Schema 之间的对应关系,是按照样例类中的字段名来对应的(name-based mapping),所以还可以用 as 做重命名。 另外一种对应方式是,直接按照字段的位置来对应(position-based mapping),对应的过程中,就可以直接指定新的字段名了。 **基于名称的对应:** ```scala val sensorTable = tableEnv.fromDataStream(dataStream, 'timestamp as 'ts, 'id as 'myId, 'temperature) ``` **基于位置的对应:** ```scala val sensorTable = tableEnv.fromDataStream(dataStream, 'myId, 'ts) ``` Flink 的 DataStream 和 DataSet API 支持多种类型。 组合类型,比如元组(内置 Scala 和 Java 元组)、POJO、Scala case 类和 Flink 的 Row 类 型等,允许具有多个字段的嵌套数据结构,这些字段可以在 Table 的表达式中访问。其他类型,则被视为原子类型。 元组类型和原子类型,一般用位置对应会好一些;如果非要用名称对应,也是可以的: 元组类型,默认的名称是 “_1”, “_2”;而原子类型,默认名称是 ”f0”。 *附文章来源:* *《尚硅谷大数据之flink教程》*
标签:
Flink
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://lilinchao.com/archives/1907.html
上一篇
04.Table API和Flink SQL表的查询
下一篇
06.Table API输出表
评论已关闭
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
NLP
4
标签云
算法
Spark SQL
MyBatisX
Thymeleaf
线程池
Python
Spark RDD
Ubuntu
Spring
Nacos
ajax
稀疏数组
锁
CentOS
Linux
MyBatis-Plus
HDFS
Docker
Filter
Azkaban
链表
微服务
机器学习
Map
Typora
并发编程
pytorch
Flink
Netty
gorm
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞
评论已关闭