李林超博客
10. Window Operations in the Table API and Flink SQL
Leefs
2022-03-03 PM
[TOC]

### Preface

Time semantics only pay off in combination with window operations; their main use is, of course, opening windows and computing over time ranges. This section shows how to use time attributes for windowing in the Table API and SQL.

The Table API and SQL provide two main kinds of windows: **Group Windows and Over Windows**.

**Test data**

+ **sensor.txt**

```basic
sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1
sensor_1,1547718206,32
sensor_1,1547718208,36.2
sensor_1,1547718210,29.7
sensor_1,1547718213,30.9
```

### 1. Group Windows

Group windows aggregate rows into finite groups based on time or row-count intervals and evaluate an aggregate function once for each group.

In the Table API, a group window is defined with the `.window(w: GroupWindow)` clause and must be given an alias with `as`. To group the table by window, the window alias has to be referenced in the `groupBy` clause like a regular grouping attribute:

```scala
val table = input
  .window([w: GroupWindow] as 'w) // define the window, aliased as w
  .groupBy('w, 'a)                // group by attribute a and window w
  .select('a, 'b.sum)             // aggregate: sum of attribute b
```

Window properties can also be added to the result table as extra fields:

```scala
val table = input
  .window([w: GroupWindow] as 'w)
  .groupBy('w, 'a)
  .select('a, 'w.start, 'w.end, 'w.rowtime, 'b.count)
```

The Table API provides a set of predefined Window classes with specific semantics, which are translated into window operations of the underlying DataStream or DataSet API.

The supported window definitions are the familiar three: **tumbling, sliding, and session windows**.

#### 1.1 Tumbling windows

Tumbling windows are defined with the `Tumble` class and three methods:

+ **over**: the window length
+ **on**: the time attribute to group on (time interval) or sort on (row count)
+ **as**: the alias, which must appear in the subsequent `groupBy`

**Snippets:**

```scala
// Tumbling event-time window (event-time attribute "rowtime")
.window(Tumble over 10.minutes on 'rowtime as 'w)

// Tumbling processing-time window (processing-time attribute "proctime")
.window(Tumble over 10.minutes on 'proctime as 'w)

// Tumbling row-count window (a count window: 10 rows per group, ordered by processing time)
.window(Tumble over 10.rows on 'proctime as 'w)
```

**Example**

> Requirement: count the occurrences of each `id` in a `10-second tumbling window`.

**Code**

```scala
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{EnvironmentSettings, Table, Tumble}
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

/**
 * @author lilinchao
 * @date 2022/3/2
 * @description tumbling window example
 **/
object FlinkSQLTumBlingTie {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val settings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()
    val tableEnv = StreamTableEnvironment.create(env, settings)

    // read the input
    val inputStream = env.readTextFile("datas/sensor.txt")

    // map to the case class and assign timestamps and watermarks
    val dataStream = inputStream.map(data => {
      val arr = data.split(",")
      SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
    }).assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
        override def extractTimestamp(t: SensorReading): Long = t.timestamp * 1000L
      })

    val sensorTable: Table = tableEnv.fromDataStream(dataStream, 'id, 'temperature, 'timestamp.rowtime as 'ts)

    // register the table
    tableEnv.createTemporaryView("sensor", sensorTable)

    // Table API version
    val resultTable = sensorTable
      .window(Tumble over 10.seconds on 'ts as 'tw) // 10-second tumbling window
      .groupBy('id, 'tw)
      .select('id, 'id.count, 'tw.end)

    // SQL version
    val sqlTable = tableEnv.sqlQuery(
      """
        |select
        |  id,
        |  count(id),
        |  tumble_end(ts, interval '10' second)
        |from sensor
        |group by
        |  id,
        |  tumble(ts, interval '10' second)
      """.stripMargin
    )

    resultTable.toAppendStream[Row].print("table")
    sqlTable.toRetractStream[Row].print("sqlTable")
    env.execute("FlinkSQLTumBlingTie")
  }

  case class SensorReading(id: String, timestamp: Long, temperature: Double)
}
```

#### 1.2 Sliding windows

Sliding windows are defined with the `Slide` class and four methods:

+ **over**: the window length
+ **every**: the slide step
+ **on**: the time attribute to group on (time interval) or sort on (row count)
+ **as**: the alias, which must appear in the subsequent `groupBy`

**Snippets:**

```scala
// Sliding event-time window
.window(Slide over 10.minutes every 5.minutes on 'rowtime as 'w)

// Sliding processing-time window
.window(Slide over 10.minutes every 5.minutes on 'proctime as 'w)

// Sliding row-count window
.window(Slide over 10.rows every 5.rows on 'proctime as 'w)
```

**Example**

> Requirement: count the occurrences of each `id` with a `window size of 10 seconds and a slide of 5 seconds`.

**Code**

```scala
import com.lilinchao.flink.sqldemo.TumbleWindowExample.SensorReading
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.{EnvironmentSettings, Slide}
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

/**
 * @author lilinchao
 * @date 2022/3/2
 * @description sliding window example
 **/
object FlinkSQLSlideTime {
  def main(args: Array[String]): Unit = {
    // set up the environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1) // parallelism 1 makes testing easier
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // event time

    val settings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()
    // create the table environment
    val tableEnv = StreamTableEnvironment.create(env, settings)

    // read the input
    val inputStream = env.readTextFile("datas/sensor.txt")

    // map to the case class and assign timestamps and watermarks
    val dataStream = inputStream.map(data => {
      val arr = data.split(",")
      SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
    }).assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
        override def extractTimestamp(t: SensorReading): Long = t.timestamp * 1000L
      })

    val sensorTable = tableEnv.fromDataStream(dataStream, 'id, 'temperature, 'timestamp.rowtime as 'ts)

    // register the table
    tableEnv.createTemporaryView("sensor", sensorTable)

    // Table API version
    val tableApi = sensorTable
      .window(Slide over 10.seconds every 5.seconds on 'ts as 'w)
      .groupBy('w, 'id)
      .select('id, 'id.count, 'w.end)

    // SQL version; note that HOP takes (time attribute, slide, size) in that order
    val tableSQL = tableEnv.sqlQuery(
      """
        |select
        |  id,
        |  count(id),
        |  HOP_END(ts, INTERVAL '5' SECOND, INTERVAL '10' SECOND) as w
        |from sensor
        |group by
        |  HOP(ts, INTERVAL '5' SECOND, INTERVAL '10' SECOND),
        |  id
      """.stripMargin)

    tableApi.toAppendStream[Row].print("tableApi====")
    tableSQL.toAppendStream[Row].print("result")
    env.execute("FlinkSQLSlideTime")
  }
}
```

#### 1.3 Session windows

Session windows are defined with the `Session` class and three methods:

+ **withGap**: the session gap
+ **on**: the time attribute to group on (time interval) or sort on (row count)
+ **as**: the alias, which must appear in the subsequent `groupBy`

**Snippets:**

```scala
// Session event-time window
.window(Session withGap 10.minutes on 'rowtime as 'w)

// Session processing-time window
.window(Session withGap 10.minutes on 'proctime as 'w)
```

**Example**

> Requirement: count the occurrences of each `id` with a `session gap of 10 seconds`.

**Code**

```scala
import com.lilinchao.flink.sqldemo.TumbleWindowExample.SensorReading
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.{EnvironmentSettings, Session, Table}
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

/**
 * @author lilinchao
 * @date 2022/3/2
 * @description session window example
 **/
object FlinkSqlSessionTime {
  def main(args: Array[String]): Unit = {
    // set up the environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1) // parallelism 1 makes testing easier
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // event time

    val settings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()
    // create the table environment
    val tableEnv = StreamTableEnvironment.create(env, settings)

    // read the input
    val inputStream = env.readTextFile("datas/sensor.txt")

    // map to the case class and assign timestamps and watermarks
    val dataStream = inputStream.map(data => {
      val arr = data.split(",")
      SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
    }).assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
        override def extractTimestamp(t: SensorReading): Long = t.timestamp * 1000L
      })

    val sensorTable = tableEnv.fromDataStream(dataStream, 'id, 'temperature, 'timestamp.rowtime as 'ts)

    // register the table
    tableEnv.createTemporaryView("sensor", sensorTable)

    // Table API version
    val tableApi: Table = sensorTable
      .window(Session withGap 10.seconds on 'ts as 'w)
      .groupBy('id, 'w)
      .select('id, 'id.count, 'w.end)

    // SQL version
    val tableSQL = tableEnv.sqlQuery(
      """
        |SELECT
        |  id,
        |  count(id),
        |  SESSION_END(ts, INTERVAL '10' SECOND) as w
        |FROM sensor
        |GROUP BY
        |  id,
        |  SESSION(ts, INTERVAL '10' SECOND)
      """.stripMargin)

    tableApi.toAppendStream[Row].print("table api")
    tableSQL.toAppendStream[Row].print("sql")
    env.execute("FlinkSqlSessionTime")
  }
}
```

### 2. Over Windows

Over window aggregations are known from standard SQL (the `OVER` clause) and are defined in the `SELECT` clause of a query. For every input row, an over window computes an aggregate over a range of adjacent rows.

In the Table API, over windows are defined with the `.window(w: OverWindow*)` clause and referenced in the `select()` method through their alias:

```scala
val table = input
  .window([w: OverWindow] as 'w)
  .select('a, 'b.sum over 'w, 'c.min over 'w)
```

The Table API provides the `Over` class to configure the properties of an over window. Over windows can be defined on event time or processing time, over ranges specified either as a time interval or as a row count.

An unbounded over window is specified with a constant: `UNBOUNDED_RANGE` for a time interval or `UNBOUNDED_ROW` for a row-count interval. A bounded over window is specified with the size of the interval.

**Snippets:**

**(1) Unbounded over windows**

```scala
// Unbounded event-time over window (time attribute "rowtime")
.window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_RANGE as 'w)

// Unbounded processing-time over window (time attribute "proctime")
.window(Over partitionBy 'a orderBy 'proctime preceding UNBOUNDED_RANGE as 'w)

// Unbounded event-time row-count over window (time attribute "rowtime")
.window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_ROW as 'w)

// Unbounded processing-time row-count over window (time attribute "proctime")
.window(Over partitionBy 'a orderBy 'proctime preceding UNBOUNDED_ROW as 'w)
```

**(2) Bounded over windows**

```scala
// Bounded event-time over window (time attribute "rowtime", previous 1 minute)
.window(Over partitionBy 'a orderBy 'rowtime preceding 1.minutes as 'w)

// Bounded processing-time over window (time attribute "proctime", previous 1 minute)
.window(Over partitionBy 'a orderBy 'proctime preceding 1.minutes as 'w)

// Bounded event-time row-count over window (time attribute "rowtime", previous 10 rows)
.window(Over partitionBy 'a orderBy 'rowtime preceding 10.rows as 'w)

// Bounded processing-time row-count over window (time attribute "proctime", previous 10 rows)
.window(Over partitionBy 'a orderBy 'proctime preceding 10.rows as 'w)
```

**Example**

> Requirement: putting everything together, implement a concrete task end to end: **for each sensor, compute for every row the average temperature over that row and the previous two rows.**

**Code**

```scala
import com.lilinchao.flink.sqldemo.TumbleWindowExample.SensorReading
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.{EnvironmentSettings, Over}
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

/**
 * @author lilinchao
 * @date 2022/3/3
 * @description over window example
 **/
object FlinkSqlTumBlingOverTime {
  def main(args: Array[String]): Unit = {
    // set up the environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1) // parallelism 1 makes testing easier
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // event time

    val settings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()
    // create the table environment
    val tableEnv = StreamTableEnvironment.create(env, settings)

    // read the input
    val inputStream = env.readTextFile("datas/sensor.txt")

    // map to the case class and assign timestamps and watermarks
    val dataStream = inputStream.map(data => {
      val arr = data.split(",")
      SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
    }).assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
        override def extractTimestamp(t: SensorReading): Long = t.timestamp * 1000L
      })

    // convert the stream into a table
    val sensorTable = tableEnv.fromDataStream(dataStream, 'id, 'temperature, 'timestamp.rowtime as 'ts)

    // register the table
    tableEnv.createTemporaryView("sensor", sensorTable)

    // Table API version
    val tableRes = sensorTable
      .window(Over partitionBy 'id orderBy 'ts preceding 2.rows as 'ow)
      .select('id, 'ts, 'id.count over 'ow, 'temperature.avg over 'ow)

    // SQL version
    val tableSQL = tableEnv.sqlQuery(
      """
        |select
        |  id,
        |  ts,
        |  count(id) over ow,
        |  avg(temperature) over ow
        |from sensor
        |window ow as (
        |  partition by id
        |  order by ts
        |  rows between 2 preceding and current row
        |)
      """.stripMargin)

    tableRes.toAppendStream[Row].print("tableRes")
    tableSQL.toAppendStream[Row].print("tableSQL")
    env.execute("FlinkSqlTumBlingOverTime")
  }
}
```

*Reference:*

*https://cloud.tencent.com/developer/article/1785228*
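To make the tumbling-window semantics concrete, the grouping from the 10-second example can be verified with plain Scala, without running Flink. This is a standalone sketch, not Flink code; the arithmetic below is the standard TUMBLE behavior, where the window containing timestamp `ts` starts at `ts - ts % size`:

```scala
// Standalone sketch (plain Scala, no Flink): tumbling-window assignment.
// A tumbling window of size `size` that contains timestamp `ts` starts at
// ts - (ts % size); its end (the value tumble_end reports) is `size` later.
def tumbleStart(ts: Long, size: Long): Long = ts - (ts % size)

// Event timestamps (in seconds) of the sensor_1 rows in sensor.txt.
val sensor1 = Seq(1547718199L, 1547718206L, 1547718208L, 1547718210L, 1547718213L)

// Group by 10-second tumbling window and count, mirroring the query above.
val counts = sensor1
  .groupBy(ts => tumbleStart(ts, 10))
  .map { case (start, rows) => (start, start + 10, rows.size) }
  .toSeq
  .sortBy(_._1)

counts.foreach { case (s, e, c) => println(s"window [$s, $e): count=$c") }
```

For the sensor_1 rows this yields one reading in the window `[1547718190, 1547718200)` and two each in the following two windows, which is exactly the grouping the tumbling-window query reports per `tumble_end`.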
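The session-window grouping can be checked the same way. This is a simplified standalone model, not Flink's implementation: sorted timestamps stay in one session while consecutive gaps are below the `withGap` threshold, whereas Flink performs this merging incrementally as rows arrive:

```scala
// Standalone sketch (plain Scala, no Flink): session-window grouping.
// Sorted timestamps belong to the same session while the gap between
// consecutive elements is smaller than `gap`.
def sessions(timestamps: Seq[Long], gap: Long): Seq[Seq[Long]] =
  timestamps.sorted.foldLeft(Vector.empty[Vector[Long]]) { (acc, ts) =>
    acc.lastOption match {
      case Some(cur) if ts - cur.last < gap => acc.init :+ (cur :+ ts) // extend session
      case _                                => acc :+ Vector(ts)      // start a new one
    }
  }

// sensor_1 timestamps from sensor.txt: the gaps are 7, 2, 2, and 3 seconds,
// so with a 10-second gap all five readings land in a single session.
val s = sessions(Seq(1547718199L, 1547718206L, 1547718208L, 1547718210L, 1547718213L), 10)
```

With a smaller gap, e.g. 5 seconds, the first reading would split off into its own session, since the following reading arrives 7 seconds later.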
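Finally, the bounded over window (`rows between 2 preceding and current row`) can be sketched with plain collections, assuming the rows of a single sensor are already ordered by event time:

```scala
// Standalone sketch (plain Scala, no Flink) of
// "ROWS BETWEEN 2 PRECEDING AND CURRENT ROW":
// for each row, average the current value and up to two preceding ones.
def rollingAvg(values: Seq[Double]): Seq[Double] =
  values.indices.map { i =>
    val window = values.slice(math.max(0, i - 2), i + 1)
    window.sum / window.size
  }

// Temperatures of the sensor_1 rows from sensor.txt, in event-time order.
val avgs = rollingAvg(Seq(35.8, 32.0, 36.2, 29.7, 30.9))
```

The first row averages only itself, the second the first two rows, and from the third row on each average covers a full three-row window, mirroring what `avg(temperature) over ow` emits per input row.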
Tags:
Flink
Unless otherwise stated, all posts on this blog are the author's original work.
When reposting, please credit the source:
https://lilinchao.com/archives/1931.html