李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
大数据
正文
09.Table API和Flink SQL之表的时间特性
Leefs
2022-02-28 PM
1566℃
0条
### 一、概述 基于时间的操作(比如 Table API 和 SQL 中窗口操作),需要定义相关的时间语义和时间数据来源的信息。所以,**Table可以提供一个逻辑上的时间字段,用于在表处理程序中,指示时间和访问相应的时间戳**。 时间属性,可以是每个表 schema 的一部分。一旦定义了时间属性,它就可以作为一个字段引用,并且可以在基于时间的操作中使用。 时间属性的行为类似于常规时间戳,可以访问,并且进行计算。 ### 二、处理时间(Processing Time) 处理时间语义下,允许表处理程序根据机器的本地时间生成结果。它是时间的最简单概念。它既不需要提取时间戳,也不需要生成 watermark。 **定义处理时间属性有三种方法:** + 在 DataStream 转化时直接指定; + 在定义 Table Schema 时指定; + 在创建表的 DDL 中指定。 **(1)DataStream 转化成 Table 时指定** 由 DataStream 转换成表时,可以在后面指定字段名来定义 Schema。在定义 Schema 期间,可以使用**.proctime**,定义处理时间字段。 注意,这个 proctime 属性只能通过附加逻辑字段,来扩展物理 schema。因此,只能在 schema 定义的末尾定义它。 ```java // 定义好 DataStream val inputStream: DataStream[String] = env.readTextFile("\\sensor.txt") val dataStream: DataStream[SensorReading] = inputStream .map(data => { val dataArray = data.split(",") SensorReading(dataArray(0), dataArray(1).toLong, dataArray(2).toDouble) }) // 将 DataStream 转换为 Table,并指定时间字段 val sensorTable = tableEnv.fromDataStream(dataStream, 'id, 'temperature, 'timestamp, 'pt.proctime) ``` **(2)定义 Table Schema 时指定** 这种方法其实也很简单,只要在定义 Schema 的时候,加上一个新的字段,并指定成 proctime 就可以了。 **代码如下:** ```java // 2. 连接外部系统,读取数据 tableEnv .connect(new FileSystem().path("datas/sensor.txt")) //定义表数据来源,外部连接 .withFormat(new Csv()) //定义从外部系统读取数据之后的格式化方法 .withSchema(new Schema() .field("id",DataTypes.STRING()) .field("timestamp",DataTypes.BIGINT()) .field("temperature",DataTypes.DOUBLE()) .field("pt",DataTypes.TIMESTAMP(3)).proctime() //指定 pt 字段为处理时间 ) //定义表结构 .createTemporaryTable("inputTable") ``` 注意:此种方式在输出时很可能会报错。 **(3)创建表的 DDL 中指定** 在创建表的 DDL 中,增加一个字段并指定成 proctime,也可以指定当前的时间字段。 **代码如下:** ```java val sinkDDL: String = """ |create table dataTable ( | id varchar(20) not null, | ts bigint, | temperature double, | pt AS PROCTIME() |) with ( | 'connector.type' = 'filesystem', | 'connector.path' = 'file:///D:\\..\\sensor.txt', | 'format.type' = 'csv' |) """.stripMargin tableEnv.sqlUpdate(sinkDDL) // 执行 DDL ``` 注意:运行这段 DDL,必须使用 `Blink Planner`。 ### 三、事件时间(Event Time) 事件时间语义,允许表处理程序根据每个记录中包含的时间生成结果。这样即使在有乱序事件或者延迟事件时,也可以获得正确的结果。 为了处理无序事件,并区分流中的准时和迟到事件;Flink 需要从事件数据中,提取时间戳,并用来推进事件时间的进展(watermark)。 **(1) DataStream 转化成 Table 时指定** 在 DataStream 转换成 Table,schema 的定义期间,使用**.rowtime** 可以定义事件时间属性。 注意,必须在转换的数据流中分配时间戳和watermark。 在将数据流转换为表时,有两种定义时间属性的方法。根据指定的.rowtime 字段名是否存在于数据流的架构中,timestamp 字段可以: + 作为新字段追加到 schema + 替换现有字段 在这两种情况下,定义的事件时间戳字段,都将保存 DataStream 中事件时间戳的值。 ```java val inputStream: DataStream[String] = env.readTextFile("datas/sensor.txt") val dataStream: DataStream[SensorReading] = inputStream.map(data => { val dataArray = data.split(",") SensorReading(dataArray(0), dataArray(1).toLong, dataArray(2).toDouble) }) .assignAscendingTimestamps(_.timestamp * 1000L) // 将 DataStream 转换为 Table,并指定时间字段 val sensorTable = tableEnv.fromDataStream(dataStream, 'id, 'timestamp.rowtime, 'temperature) // 或者,直接追加字段 val sensorTable2 = tableEnv.fromDataStream(dataStream, 'id, 'temperature, 'timestamp, 'rt.rowtime) ``` **(2) 定义 Table Schema 时指定** 这种方法只要在定义 Schema 的时候,将事件时间字段,并指定成 rowtime 就可以了。 **代码如下:** ```java tableEnv.connect( new FileSystem().path("sensor.txt")) .withFormat(new Csv()) .withSchema(new Schema() .field("id", DataTypes.STRING()) .field("timestamp", DataTypes.BIGINT()) .rowtime( new Rowtime() .timestampsFromField("timestamp") // 从字段中提取时间戳 .watermarksPeriodicBounded(1000) ) // watermark 延迟 1 秒 .field("temperature", DataTypes.DOUBLE()) )// 定义表结构 .createTemporaryTable("inputTable") // 创建临时表 ``` **(3)创建表的 DDL 中指定** 事件时间属性,是使用 `CREATE TABLE DDL` 中的 `WARDMARK` 语句定义的。watermark 语 句,定义现有事件时间字段上的 watermark 生成表达式,该表达式将事件时间字段标记为事 件时间属性。 **代码如下:** ```java val sinkDDL: String = """ |create table dataTable ( | id varchar(20) not null, | ts bigint, | temperature double, | rt AS TO_TIMESTAMP( FROM_UNIXTIME(ts) ), | watermark for rt as rt - interval '1' second |) with ( | 'connector.type' = 'filesystem', | 'connector.path' = 'file:///D:\\..\\sensor.txt', | 'format.type' = 'csv' |) """.stripMargin tableEnv.sqlUpdate(sinkDDL) // 执行 DDL ``` 这里 `FROM_UNIXTIME` 是系统内置的时间函数,用来将一个整数(秒数)转换成 “`YYYY-MM-DD hh:mm:ss`”格式(默认,也可以作为第二个 String 参数传入)的日期时间字符串(date time string);然后再用 TO_TIMESTAMP 将其转换成 Timestamp。 *附参考原文:* *《尚硅谷大数据之flink教程》*
标签:
Flink
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://lilinchao.com/archives/1921.html
上一篇
08.Table API和Flink SQL动态表和持续查询
下一篇
01.FastDFS简介
评论已关闭
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
NLP
6
标签云
MyBatisX
栈
Spark Core
Java工具类
二叉树
哈希表
随笔
LeetCode刷题
Quartz
JavaWeb
Spark Streaming
Shiro
Stream流
并发编程
gorm
队列
Http
设计模式
Hadoop
Thymeleaf
nginx
Map
GET和POST
Spark RDD
Elasticsearch
MyBatis-Plus
链表
CentOS
HDFS
Beego
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞
评论已关闭