李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
大数据
正文
09.Table API和Flink SQL之表的时间特性
Leefs
2022-02-28 PM
1495℃
0条
### 一、概述 基于时间的操作(比如 Table API 和 SQL 中窗口操作),需要定义相关的时间语义和时间数据来源的信息。所以,**Table可以提供一个逻辑上的时间字段,用于在表处理程序中,指示时间和访问相应的时间戳**。 时间属性,可以是每个表 schema 的一部分。一旦定义了时间属性,它就可以作为一个字段引用,并且可以在基于时间的操作中使用。 时间属性的行为类似于常规时间戳,可以访问,并且进行计算。 ### 二、处理时间(Processing Time) 处理时间语义下,允许表处理程序根据机器的本地时间生成结果。它是时间的最简单概念。它既不需要提取时间戳,也不需要生成 watermark。 **定义处理时间属性有三种方法:** + 在 DataStream 转化时直接指定; + 在定义 Table Schema 时指定; + 在创建表的 DDL 中指定。 **(1)DataStream 转化成 Table 时指定** 由 DataStream 转换成表时,可以在后面指定字段名来定义 Schema。在定义 Schema 期间,可以使用**.proctime**,定义处理时间字段。 注意,这个 proctime 属性只能通过附加逻辑字段,来扩展物理 schema。因此,只能在 schema 定义的末尾定义它。 ```java // 定义好 DataStream val inputStream: DataStream[String] = env.readTextFile("\\sensor.txt") val dataStream: DataStream[SensorReading] = inputStream .map(data => { val dataArray = data.split(",") SensorReading(dataArray(0), dataArray(1).toLong, dataArray(2).toDouble) }) // 将 DataStream 转换为 Table,并指定时间字段 val sensorTable = tableEnv.fromDataStream(dataStream, 'id, 'temperature, 'timestamp, 'pt.proctime) ``` **(2)定义 Table Schema 时指定** 这种方法其实也很简单,只要在定义 Schema 的时候,加上一个新的字段,并指定成 proctime 就可以了。 **代码如下:** ```java // 2. 连接外部系统,读取数据 tableEnv .connect(new FileSystem().path("datas/sensor.txt")) //定义表数据来源,外部连接 .withFormat(new Csv()) //定义从外部系统读取数据之后的格式化方法 .withSchema(new Schema() .field("id",DataTypes.STRING()) .field("timestamp",DataTypes.BIGINT()) .field("temperature",DataTypes.DOUBLE()) .field("pt",DataTypes.TIMESTAMP(3)).proctime() //指定 pt 字段为处理时间 ) //定义表结构 .createTemporaryTable("inputTable") ``` 注意:此种方式在输出时很可能会报错。 **(3)创建表的 DDL 中指定** 在创建表的 DDL 中,增加一个字段并指定成 proctime,也可以指定当前的时间字段。 **代码如下:** ```java val sinkDDL: String = """ |create table dataTable ( | id varchar(20) not null, | ts bigint, | temperature double, | pt AS PROCTIME() |) with ( | 'connector.type' = 'filesystem', | 'connector.path' = 'file:///D:\\..\\sensor.txt', | 'format.type' = 'csv' |) """.stripMargin tableEnv.sqlUpdate(sinkDDL) // 执行 DDL ``` 注意:运行这段 DDL,必须使用 `Blink Planner`。 ### 三、事件时间(Event Time) 事件时间语义,允许表处理程序根据每个记录中包含的时间生成结果。这样即使在有乱序事件或者延迟事件时,也可以获得正确的结果。 为了处理无序事件,并区分流中的准时和迟到事件;Flink 需要从事件数据中,提取时间戳,并用来推进事件时间的进展(watermark)。 **(1) DataStream 转化成 Table 时指定** 在 DataStream 转换成 Table,schema 的定义期间,使用**.rowtime** 可以定义事件时间属性。 注意,必须在转换的数据流中分配时间戳和watermark。 在将数据流转换为表时,有两种定义时间属性的方法。根据指定的.rowtime 字段名是否存在于数据流的架构中,timestamp 字段可以: + 作为新字段追加到 schema + 替换现有字段 在这两种情况下,定义的事件时间戳字段,都将保存 DataStream 中事件时间戳的值。 ```java val inputStream: DataStream[String] = env.readTextFile("datas/sensor.txt") val dataStream: DataStream[SensorReading] = inputStream.map(data => { val dataArray = data.split(",") SensorReading(dataArray(0), dataArray(1).toLong, dataArray(2).toDouble) }) .assignAscendingTimestamps(_.timestamp * 1000L) // 将 DataStream 转换为 Table,并指定时间字段 val sensorTable = tableEnv.fromDataStream(dataStream, 'id, 'timestamp.rowtime, 'temperature) // 或者,直接追加字段 val sensorTable2 = tableEnv.fromDataStream(dataStream, 'id, 'temperature, 'timestamp, 'rt.rowtime) ``` **(2) 定义 Table Schema 时指定** 这种方法只要在定义 Schema 的时候,将事件时间字段,并指定成 rowtime 就可以了。 **代码如下:** ```java tableEnv.connect( new FileSystem().path("sensor.txt")) .withFormat(new Csv()) .withSchema(new Schema() .field("id", DataTypes.STRING()) .field("timestamp", DataTypes.BIGINT()) .rowtime( new Rowtime() .timestampsFromField("timestamp") // 从字段中提取时间戳 .watermarksPeriodicBounded(1000) ) // watermark 延迟 1 秒 .field("temperature", DataTypes.DOUBLE()) )// 定义表结构 .createTemporaryTable("inputTable") // 创建临时表 ``` **(3)创建表的 DDL 中指定** 事件时间属性,是使用 `CREATE TABLE DDL` 中的 `WARDMARK` 语句定义的。watermark 语 句,定义现有事件时间字段上的 watermark 生成表达式,该表达式将事件时间字段标记为事 件时间属性。 **代码如下:** ```java val sinkDDL: String = """ |create table dataTable ( | id varchar(20) not null, | ts bigint, | temperature double, | rt AS TO_TIMESTAMP( FROM_UNIXTIME(ts) ), | watermark for rt as rt - interval '1' second |) with ( | 'connector.type' = 'filesystem', | 'connector.path' = 'file:///D:\\..\\sensor.txt', | 'format.type' = 'csv' |) """.stripMargin tableEnv.sqlUpdate(sinkDDL) // 执行 DDL ``` 这里 `FROM_UNIXTIME` 是系统内置的时间函数,用来将一个整数(秒数)转换成 “`YYYY-MM-DD hh:mm:ss`”格式(默认,也可以作为第二个 String 参数传入)的日期时间字符串(date time string);然后再用 TO_TIMESTAMP 将其转换成 Timestamp。 *附参考原文:* *《尚硅谷大数据之flink教程》*
标签:
Flink
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://lilinchao.com/archives/1921.html
上一篇
08.Table API和Flink SQL动态表和持续查询
下一篇
01.FastDFS简介
评论已关闭
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
NLP
4
标签云
Spark Core
递归
数学
Python
Sentinel
SQL练习题
链表
Nacos
序列化和反序列化
Jquery
线程池
Spark RDD
Stream流
Hadoop
CentOS
Hive
Java
ClickHouse
Typora
SpringBoot
gorm
BurpSuite
RSA加解密
Linux
MyBatisX
Beego
Yarn
字符串
国产数据库改造
Java编程思想
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞
评论已关闭