李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
大数据
正文
11.Flink之SQL中窗口的定义
Leefs
2022-03-04 PM
1953℃
0条
### 前言 我们已经了解了在 Table API 里 window 的调用方式,同样,我们也可以在 SQL 中直接加入窗口的定义和使用。 ### 一、Group Windows Group Windows 在 SQL 查询的 Group BY 子句中定义。与使用常规 GROUP BY 子句的查询一样,使用 GROUP BY 子句的查询会计算每个组的单个结果行。 **SQL 支持以下 Group 窗口函数:** + **TUMBLE(time_attr, interval)** 定义一个滚动窗口,第一个参数是时间字段,第二个参数是窗口长度。 + **HOP(time_attr, interval, interval)** 定义一个滑动窗口,第一个参数是时间字段,第二个参数是窗口滑动步长,第三个是窗口长度。 + **SESSION(time_attr, interval)** 定义一个会话窗口,第一个参数是时间字段,第二个参数是窗口间隔(Gap)。 另外还有一些辅助函数,可以用来选择 Group Window 的开始和结束时间戳,以及时间属性。 这里只写 `TUMBLE_*`,滑动和会话窗口是类似的(`HOP_*`,`SESSION_*`)。 + TUMBLE_START(time_attr, interval) + TUMBLE_END(time_attr, interval) + TUMBLE_ROWTIME(time_attr, interval) + TUMBLE_PROCTIME(time_attr, interval) ### 二、Over Windows 由于 Over 本来就是 SQL 内置支持的语法,所以这在 SQL 中属于基本的聚合操作。所有聚合必须在同一窗口上定义,也就是说,必须是相同的分区、排序和范围。目前仅支持在当前行范围之前的窗口(无边界和有边界)。 注意,ORDER BY 必须在单一的时间属性上指定。 ```sql SELECT COUNT(amount) OVER ( PARTITION BY user ORDER BY proctime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) FROM Orders // 也可以做多个聚合 SELECT COUNT(amount) OVER w, SUM(amount) OVER w FROM Orders WINDOW w AS ( PARTITION BY user ORDER BY proctime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) ``` ### 三、代码练习(以分组滚动窗口为例) 我们可以综合学习过的内容,用一段完整的代码实现一个具体的需求。例如,可以开一 个滚动窗口,统计 10 秒内出现的每个 sensor 的个数。 代码如下: ```java import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.table.api.{EnvironmentSettings, Table, Tumble} import org.apache.flink.table.api.scala._ import org.apache.flink.types.Row /** * @author lilinchao * @date 2022/3/3 * @description 分组滚动窗口 **/ object FlinkSQLGroupDemo { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val settings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode() .build() //创建表env val tableEnv = StreamTableEnvironment.create(env,settings) //读取数据 val inputStream = env.readTextFile("datas/sensor.txt") // 先转换成样例类类型(简单转换操作) val dataStream = inputStream.map(data => { val arr = data.split(",") SensorReading(arr(0), arr(1).toLong, arr(2).toDouble) }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) { override def extractTimestamp(t: SensorReading): Long = t.timestamp * 1000L }) val dataTable: Table = tableEnv.fromDataStream(dataStream,'id,'temperature,'timestamp.rowtime) val resultTable = dataTable.window(Tumble over 10.seconds on 'timestamp as 'tw) .groupBy('id, 'tw) .select('id, 'id.count) val sqlDataTable: Table = dataTable.select('id,'temperature,'timestamp as 'ts) val resultSqlTable: Table = tableEnv.sqlQuery("select id,count(id) from "+sqlDataTable+" group by id,tumble(ts,interval '10' second)") //把Table转化成数据流 val value = resultTable.toAppendStream[Row] val resultDstream = resultSqlTable.toRetractStream[(String,Long)] resultDstream.filter(_._1).print() value.print("result") env.execute("FlinkSQLGroupDemo") } } ``` *附文章来源:* *《尚硅谷大数据之Flink》*
标签:
Flink
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://lilinchao.com/archives/1932.html
上一篇
10.Table API和Flink SQL之窗口操作
下一篇
12.Table API和Flink SQL之函数
评论已关闭
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
NLP
4
标签云
Java编程思想
Linux
nginx
GET和POST
Java工具类
SpringCloudAlibaba
MyBatisX
Jenkins
Nacos
SpringBoot
Zookeeper
Java
FastDFS
查找
前端
微服务
Typora
Thymeleaf
Jquery
国产数据库改造
数据结构和算法
Hbase
锁
Spark Streaming
Flink
Hadoop
RSA加解密
MyBatis
Golang基础
Yarn
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞
评论已关闭