李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
大数据
正文
11.Flink之SQL中窗口的定义
Leefs
2022-03-04 PM
2246℃
0条
### 前言 我们已经了解了在 Table API 里 window 的调用方式,同样,我们也可以在 SQL 中直接加入窗口的定义和使用。 ### 一、Group Windows Group Windows 在 SQL 查询的 Group BY 子句中定义。与使用常规 GROUP BY 子句的查询一样,使用 GROUP BY 子句的查询会计算每个组的单个结果行。 **SQL 支持以下 Group 窗口函数:** + **TUMBLE(time_attr, interval)** 定义一个滚动窗口,第一个参数是时间字段,第二个参数是窗口长度。 + **HOP(time_attr, interval, interval)** 定义一个滑动窗口,第一个参数是时间字段,第二个参数是窗口滑动步长,第三个是窗口长度。 + **SESSION(time_attr, interval)** 定义一个会话窗口,第一个参数是时间字段,第二个参数是窗口间隔(Gap)。 另外还有一些辅助函数,可以用来选择 Group Window 的开始和结束时间戳,以及时间属性。 这里只写 `TUMBLE_*`,滑动和会话窗口是类似的(`HOP_*`,`SESSION_*`)。 + TUMBLE_START(time_attr, interval) + TUMBLE_END(time_attr, interval) + TUMBLE_ROWTIME(time_attr, interval) + TUMBLE_PROCTIME(time_attr, interval) ### 二、Over Windows 由于 Over 本来就是 SQL 内置支持的语法,所以这在 SQL 中属于基本的聚合操作。所有聚合必须在同一窗口上定义,也就是说,必须是相同的分区、排序和范围。目前仅支持在当前行范围之前的窗口(无边界和有边界)。 注意,ORDER BY 必须在单一的时间属性上指定。 ```sql SELECT COUNT(amount) OVER ( PARTITION BY user ORDER BY proctime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) FROM Orders // 也可以做多个聚合 SELECT COUNT(amount) OVER w, SUM(amount) OVER w FROM Orders WINDOW w AS ( PARTITION BY user ORDER BY proctime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) ``` ### 三、代码练习(以分组滚动窗口为例) 我们可以综合学习过的内容,用一段完整的代码实现一个具体的需求。例如,可以开一 个滚动窗口,统计 10 秒内出现的每个 sensor 的个数。 代码如下: ```java import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.table.api.{EnvironmentSettings, Table, Tumble} import org.apache.flink.table.api.scala._ import org.apache.flink.types.Row /** * @author lilinchao * @date 2022/3/3 * @description 分组滚动窗口 **/ object FlinkSQLGroupDemo { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val settings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode() .build() //创建表env val tableEnv = StreamTableEnvironment.create(env,settings) //读取数据 val inputStream = env.readTextFile("datas/sensor.txt") // 先转换成样例类类型(简单转换操作) val dataStream = inputStream.map(data => { val arr = data.split(",") SensorReading(arr(0), arr(1).toLong, arr(2).toDouble) }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) { override def extractTimestamp(t: SensorReading): Long = t.timestamp * 1000L }) val dataTable: Table = tableEnv.fromDataStream(dataStream,'id,'temperature,'timestamp.rowtime) val resultTable = dataTable.window(Tumble over 10.seconds on 'timestamp as 'tw) .groupBy('id, 'tw) .select('id, 'id.count) val sqlDataTable: Table = dataTable.select('id,'temperature,'timestamp as 'ts) val resultSqlTable: Table = tableEnv.sqlQuery("select id,count(id) from "+sqlDataTable+" group by id,tumble(ts,interval '10' second)") //把Table转化成数据流 val value = resultTable.toAppendStream[Row] val resultDstream = resultSqlTable.toRetractStream[(String,Long)] resultDstream.filter(_._1).print() value.print("result") env.execute("FlinkSQLGroupDemo") } } ``` *附文章来源:* *《尚硅谷大数据之Flink》*
标签:
Flink
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://lilinchao.com/archives/1932.html
上一篇
10.Table API和Flink SQL之窗口操作
下一篇
12.Table API和Flink SQL之函数
评论已关闭
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
NLP
8
标签云
Spark Core
MySQL
Shiro
Sentinel
DataWarehouse
Filter
Flume
Zookeeper
栈
CentOS
Jenkins
并发编程
Spark Streaming
工具
Redis
RSA加解密
Spark
数学
ajax
ClickHouse
SpringCloud
设计模式
链表
Git
Spark SQL
SQL练习题
Azkaban
Docker
字符串
FileBeat
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞
评论已关闭