Li Linchao's Blog
10. Window Operations in the Table API and Flink SQL
Leefs
2022-03-03 PM
[TOC]

### Preface

Time semantics only pay off when combined with window operations; the main use of a time attribute is, of course, to open windows and compute over time ranges. This article looks at how to use time fields for windowing in the Table API and SQL.

The Table API and SQL provide two main kinds of windows: **Group Windows and Over Windows**.

**Data preparation**

+ **sensor.txt**

```basic
sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1
sensor_1,1547718206,32
sensor_1,1547718208,36.2
sensor_1,1547718210,29.7
sensor_1,1547718213,30.9
```

### 1. Group Windows

Group Windows aggregate rows into finite groups based on time or row-count intervals, and evaluate an aggregate function once per group.

In the Table API, Group Windows are defined with the `.window(w: GroupWindow)` clause and must be given an alias with `as`. To group the table by window, the alias must be referenced in the `groupBy` clause just like a regular grouping field.

```scala
val table = input
  .window([w: GroupWindow] as 'w) // define the window with alias w
  .groupBy('w, 'a)                // group by field a and window w
  .select('a, 'b.sum)             // aggregate: sum of field b
```

Window properties can also be added to the result table as extra fields:

```scala
val table = input
  .window([w: GroupWindow] as 'w)
  .groupBy('w, 'a)
  .select('a, 'w.start, 'w.end, 'w.rowtime, 'b.count)
```

The Table API provides a set of predefined Window classes with specific semantics, which are translated into underlying DataStream or DataSet window operations. As in the DataStream API, three window types are supported: **tumbling, sliding, and session windows**.

#### 1.1 Tumbling windows

Tumbling windows are defined with the `Tumble` class and three methods:

+ **over**: the window length
+ **on**: the time field used to group (by time interval) or order (by row count)
+ **as**: the alias, which must appear in the following `groupBy`

**Snippets:**

```scala
// Tumbling event-time window (event-time field rowtime)
.window(Tumble over 10.minutes on 'rowtime as 'w)
// Tumbling processing-time window (processing-time field proctime)
.window(Tumble over 10.minutes on 'proctime as 'w)
// Tumbling row-count window (a count window ordered by processing time, 10 rows per group)
.window(Tumble over 10.rows on 'proctime as 'w)
```

**Example**

> Requirement: count occurrences of each `id` in a `10-second tumbling window`.

**Code**

```scala
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{EnvironmentSettings, Table, Tumble}
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

/**
 * @author lilinchao
 * @date 2022/3/2
 * @description tumbling window
 **/
object FlinkSQLTumBlingTie {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val settings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()
    val tableEnv = StreamTableEnvironment.create(env, settings)

    // read the data
    val inputStream = env.readTextFile("datas/sensor.txt")
    // map to the case class and assign timestamps/watermarks
    val dataStream = inputStream.map(data => {
      val arr = data.split(",")
      SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
    }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
      override def extractTimestamp(t: SensorReading): Long = t.timestamp * 1000L
    })

    val sensorTable: Table = tableEnv.fromDataStream(dataStream, 'id, 'temperature, 'timestamp.rowtime as 'ts)
    // register the table
    tableEnv.createTemporaryView("sensor", sensorTable)

    // Table API version
    val resultTable = sensorTable
      .window(Tumble over 10.seconds on 'ts as 'tw) // 10-second tumbling event-time window
      .groupBy('id, 'tw)
      .select('id, 'id.count, 'tw.end)

    // SQL version
    val sqlTable = tableEnv.sqlQuery(
      """
        |select
        |  id,
        |  count(id),
        |  tumble_end(ts, interval '10' second)
        |from sensor
        |group by
        |  id,
        |  tumble(ts, interval '10' second)
      """.stripMargin
    )

    resultTable.toAppendStream[Row].print("table")
    sqlTable.toRetractStream[Row].print("sqlTable")
    env.execute("FlinkSQLTumBlingTie")
  }

  case class SensorReading(id: String, timestamp: Long, temperature: Double)
}
```

#### 1.2 Sliding windows

Sliding windows are defined with the `Slide` class and four methods:

+ **over**: the window length
+ **every**: the slide step
+ **on**: the time field used to group (by time interval) or order (by row count)
+ **as**: the alias, which must appear in the following `groupBy`

**Snippets:**

```scala
// Sliding event-time window
.window(Slide over 10.minutes every 5.minutes on 'rowtime as 'w)
// Sliding processing-time window
.window(Slide over 10.minutes every 5.minutes on 'proctime as 'w)
// Sliding row-count window
.window(Slide over 10.rows every 5.rows on 'proctime as 'w)
```

**Example**

> Requirement: count occurrences of each `id` with a `window size of 10 seconds sliding every 5 seconds`.

**Code**

```scala
import com.lilinchao.flink.sqldemo.TumbleWindowExample.SensorReading
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.{EnvironmentSettings, Slide}
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

/**
 * @author lilinchao
 * @date 2022/3/2
 * @description sliding window
 **/
object FlinkSQLSlideTime {
  def main(args: Array[String]): Unit = {
    // build the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1) // parallelism 1 to keep the test output readable
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // event time

    val settings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()
    // create the table environment
    val tableEnv = StreamTableEnvironment.create(env, settings)

    // read the data
    val inputStream = env.readTextFile("datas/sensor.txt")
    // map to the case class and assign timestamps/watermarks
    val dataStream = inputStream.map(data => {
      val arr = data.split(",")
      SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
    }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
      override def extractTimestamp(t: SensorReading): Long = t.timestamp * 1000L
    })

    val sensorTable = tableEnv.fromDataStream(dataStream, 'id, 'temperature, 'timestamp.rowtime as 'ts)
    // register the table
    tableEnv.createTemporaryView("sensor", sensorTable)

    // Table API version
    val tableApi = sensorTable.window(Slide over 10.seconds every 5.seconds on 'ts as 'w)
      .groupBy('w, 'id)
      .select('id, 'id.count, 'w.end)

    // SQL version
    val tableSQL = tableEnv.sqlQuery(
      """
        |select
        |  id,
        |  count(id),
        |  HOP_END(ts, INTERVAL '10' SECOND, INTERVAL '5' SECOND) as w
        |from sensor
        |group by
        |  HOP(ts, INTERVAL '10' SECOND, INTERVAL '5' SECOND),
        |  id
      """.stripMargin)
    tableApi.toAppendStream[Row].print("tableApi====")
    tableSQL.toAppendStream[Row].print("result")
    env.execute("FlinkSQLSlideTime")
  }
}
```

#### 1.3 Session windows

Session windows are defined with the `Session` class and three methods:

+ **withGap**: the session gap
+ **on**: the time field used to group (by time interval) or order (by row count)
+ **as**: the alias, which must appear in the following `groupBy`

**Snippets:**

```scala
// Session event-time window
.window(Session withGap 10.minutes on 'rowtime as 'w)
// Session processing-time window
.window(Session withGap 10.minutes on 'proctime as 'w)
```

**Example**

> Requirement: count occurrences of each `id` with a `session gap of 10 seconds`.

**Code**

```scala
import com.lilinchao.flink.sqldemo.TumbleWindowExample.SensorReading
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.{EnvironmentSettings, Session, Table}
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

/**
 * @author lilinchao
 * @date 2022/3/2
 * @description session window
 **/
object FlinkSqlSessionTime {
  def main(args: Array[String]): Unit = {
    // build the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1) // parallelism 1 to keep the test output readable
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // event time

    val settings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()
    // create the table environment
    val tableEnv = StreamTableEnvironment.create(env, settings)

    // read the data
    val inputStream = env.readTextFile("datas/sensor.txt")
    // map to the case class and assign timestamps/watermarks
    val dataStream = inputStream.map(data => {
      val arr = data.split(",")
      SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
    }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
      override def extractTimestamp(t: SensorReading): Long = t.timestamp * 1000L
    })

    val sensorTable = tableEnv.fromDataStream(dataStream, 'id, 'temperature, 'timestamp.rowtime as 'ts)
    // register the table
    tableEnv.createTemporaryView("sensor", sensorTable)

    // Table API version
    val tableApi: Table = sensorTable
      .window(Session withGap 10.seconds on 'ts as 'w)
      .groupBy('id, 'w)
      .select('id, 'id.count, 'w.end)

    // SQL version
    val tableSQL = tableEnv.sqlQuery(
      """
        |SELECT
        |  id,
        |  count(id),
        |  SESSION_END(ts, INTERVAL '10' SECOND) as w
        |FROM sensor
        |GROUP BY
        |  id,
        |  SESSION(ts, INTERVAL '10' SECOND)
      """.stripMargin)

    tableApi.toAppendStream[Row].print("table api")
    tableSQL.toAppendStream[Row].print("sql")
    env.execute("FlinkSqlSessionTime")
  }
}
```

### 2. Over Windows

Over window aggregations are a standard SQL feature (the OVER clause) and are defined in the SELECT clause of a query. For every input row, an over window aggregation computes an aggregate over a range of neighboring rows.

Over windows are defined with the `.window(w: OverWindow*)` clause and referenced by alias in the `select()` method.

**Snippet:**

```scala
val table = input
  .window([w: OverWindow] as 'w)
  .select('a, 'b.sum over 'w, 'c.min over 'w)
```

The Table API provides the `Over` class to configure the window's properties. Over windows can be defined on event time or processing time, over a range specified either as a time interval or as a row count.

An unbounded over window is specified with a constant: `UNBOUNDED_RANGE` for a time interval, or `UNBOUNDED_ROW` for a row-count interval. A bounded over window is specified with the size of the interval.

**In practice:**

**(1) Unbounded over windows**

```scala
// Unbounded event-time over window (time field "rowtime")
.window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_RANGE as 'w)
// Unbounded processing-time over window (time field "proctime")
.window(Over partitionBy 'a orderBy 'proctime preceding UNBOUNDED_RANGE as 'w)
// Unbounded event-time row-count over window (time field "rowtime")
.window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_ROW as 'w)
// Unbounded processing-time row-count over window (time field "proctime")
.window(Over partitionBy 'a orderBy 'proctime preceding UNBOUNDED_ROW as 'w)
```

**(2) Bounded over windows**

```scala
// Bounded event-time over window (time field "rowtime", previous 1 minute)
.window(Over partitionBy 'a orderBy 'rowtime preceding 1.minutes as 'w)
// Bounded processing-time over window (time field "proctime", previous 1 minute)
.window(Over partitionBy 'a orderBy 'proctime preceding 1.minutes as 'w)
// Bounded event-time row-count over window (time field "rowtime", previous 10 rows)
.window(Over partitionBy 'a orderBy 'rowtime preceding 10.rows as 'w)
// Bounded processing-time row-count over window (time field "proctime", previous 10 rows)
.window(Over partitionBy 'a orderBy 'proctime preceding 10.rows as 'w)
```

**Example**

> Requirement: putting the material above together, let's implement a concrete task with a complete program. For example: **for every row of every sensor, compute the average temperature of that row and the previous two rows.**

**Code**

```scala
import com.lilinchao.flink.sqldemo.TumbleWindowExample.SensorReading
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.{EnvironmentSettings, Over}
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

/**
 * @author lilinchao
 * @date 2022/3/3
 * @description over window
 **/
object FlinkSqlTumBlingOverTime {
  def main(args: Array[String]): Unit = {
    // build the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1) // parallelism 1 to keep the test output readable
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // event time

    val settings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()
    // create the table environment
    val tableEnv = StreamTableEnvironment.create(env, settings)

    // read the data
    val inputStream = env.readTextFile("datas/sensor.txt")
    // map to the case class and assign timestamps/watermarks
    val dataStream = inputStream.map(data => {
      val arr = data.split(",")
      SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
    }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
      override def extractTimestamp(t: SensorReading): Long = t.timestamp * 1000L
    })

    // register the stream as a temporary table
    val sensorTable = tableEnv.fromDataStream(dataStream, 'id, 'temperature, 'timestamp.rowtime as 'ts)
    tableEnv.createTemporaryView("sensor", sensorTable)

    // Table API version
    val tableRes = sensorTable.window(Over partitionBy 'id orderBy 'ts preceding 2.rows as 'ow)
      .select('id, 'ts, 'id.count over 'ow, 'temperature.avg over 'ow)

    // SQL version
    val tableSQL = tableEnv.sqlQuery(
      """
        |select
        |  id,
        |  ts,
        |  count(id) over ow,
        |  avg(temperature) over ow
        |from sensor
        |window ow as (
        |  partition by id
        |  order by ts
        |  rows between 2 preceding and current row
        |)
      """.stripMargin)

    tableRes.toAppendStream[Row].print("tableRes")
    tableSQL.toAppendStream[Row].print("tableSQL")
    env.execute("FlinkSqlTumBlingOverTime")
  }
}
```

*Reference:*
*https://cloud.tencent.com/developer/article/1785228*
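The `Tumble` and `Slide` windows above assign each element to windows by simple epoch-aligned arithmetic on its timestamp. Here is a rough, Flink-free sketch of that assignment logic in plain Scala; the helper names `tumbleStart` and `hopWindows` are our own, not Flink API, and the epoch alignment matches the default behavior described above.

```scala
object WindowAssignmentSketch {
  // Start of the epoch-aligned tumbling window (size in ms) that contains ts.
  def tumbleStart(ts: Long, size: Long): Long =
    ts - Math.floorMod(ts, size)

  // All [start, end) hop windows of the given size/slide (ms) that contain ts.
  // A hop window's start is a multiple of the slide and lies in (ts - size, ts].
  def hopWindows(ts: Long, size: Long, slide: Long): List[(Long, Long)] =
    Iterator.iterate(ts - Math.floorMod(ts, slide))(_ - slide)
      .takeWhile(_ > ts - size)
      .map(start => (start, start + size))
      .toList
      .reverse
}
```

With a 10-unit window sliding every 5 units, each element falls into exactly two hop windows, which is why the sliding example emits a count per `id` twice as often as the tumbling one.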
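Unlike tumbling and sliding windows, session windows have no fixed length: two events belong to the same session when they are closer together than the gap. A minimal plain-Scala sketch of that gap-based grouping, assuming the event timestamps are already sorted (the `sessionize` helper is our own, not Flink API):

```scala
object SessionSketch {
  // Group sorted event timestamps (ms) into sessions: a new session starts
  // whenever the gap to the previous event reaches or exceeds `gap`
  // (mirroring `Session withGap`).
  def sessionize(sorted: Seq[Long], gap: Long): Seq[Seq[Long]] =
    sorted.foldLeft(Vector.empty[Vector[Long]]) { (acc, ts) =>
      acc.lastOption match {
        case Some(cur) if ts - cur.last < gap => acc.init :+ (cur :+ ts)
        case _                                => acc :+ Vector(ts)
      }
    }
}
```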
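The final over-window example averages each row with its two predecessors within the same sensor. The same per-row logic can be sketched with plain Scala collections; the `rollingAvg` helper is our own illustration, not Flink API:

```scala
object OverWindowSketch {
  case class SensorReading(id: String, timestamp: Long, temperature: Double)

  // For each row: average temperature over the current row and up to two
  // preceding rows with the same id, in timestamp order (mirroring
  // "rows between 2 preceding and current row" partitioned by id).
  def rollingAvg(rows: Seq[SensorReading]): Seq[(String, Long, Double)] =
    rows.groupBy(_.id).values.flatMap { partition =>
      val sorted = partition.sortBy(_.timestamp)
      sorted.indices.map { i =>
        val window = sorted.slice(math.max(0, i - 2), i + 1)
        (sorted(i).id, sorted(i).timestamp,
          window.map(_.temperature).sum / window.size)
      }
    }.toSeq
}
```

Note that, like the SQL version, the window at the start of each partition simply contains fewer rows, so the first two averages per sensor are over one and two values respectively.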
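The tumbling-window example prints its SQL result with `toRetractStream`, because a grouped aggregate updates previously emitted results. Here is a toy model of retract semantics (a deliberate simplification of Flink's `(Boolean, Row)` encoding, not its implementation): each new event retracts the old count for its key and adds the new one.

```scala
object RetractSketch {
  // Toy retract stream for a running count(id): true = add, false = retract.
  def retractStream(ids: Seq[String]): Seq[(Boolean, (String, Long))] = {
    val counts = scala.collection.mutable.Map.empty[String, Long].withDefaultValue(0L)
    ids.flatMap { id =>
      val old = counts(id)
      counts(id) = old + 1
      if (old == 0L) Seq((true, (id, 1L)))                     // first result: add only
      else Seq((false, (id, old)), (true, (id, old + 1)))      // retract old, add new
    }
  }
}
```

A windowed aggregate on an append-only input, by contrast, emits each window's result exactly once after the watermark passes the window end, which is why `toAppendStream` suffices for the other examples.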
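All of the examples assign watermarks with `BoundedOutOfOrdernessTimestampExtractor(Time.seconds(1))`, whose watermark simply trails the maximum timestamp seen so far by the allowed out-of-orderness. A one-line plain-Scala sketch of that rule (our own helper, assuming one watermark per element):

```scala
object WatermarkSketch {
  // Watermark after each element: running max timestamp minus the bound (ms).
  def watermarks(timestamps: Seq[Long], maxOutOfOrderness: Long): Seq[Long] =
    timestamps.scanLeft(Long.MinValue)(math.max).tail.map(_ - maxOutOfOrderness)
}
```

This is why a 10-second event-time window in the examples only fires about one second after an element with a timestamp past the window end arrives.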
Tags: Flink
Unless otherwise stated, all articles on this blog are the author's original work.
When reposting, please credit the source:
https://lilinchao.com/archives/1931.html