李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
大数据
正文
19.Flink Window API使用详解
Leefs
2022-01-22 PM
1252℃
0条
[TOC] ### 前言 + **使用版本** + Flink 1.10.1 + JDK 1.8 + **数据准备** ```basic sensor_1,1547718199,35.8 sensor_6,1547718201,15.4 sensor_7,1547718202,6.7 sensor_10,1547718205,38.1 sensor_1,1547718206,32 sensor_1,1547718208,36.2 sensor_1,1547718210,29.7 sensor_1,1547718213,30.9 ``` + nc工具命令 ```shell nc -lk 8888 ``` ### 一、窗口适配器(window assigner) #### 1.1 概述 **窗口分配器:**window() 方法 + 我们可以用 `.window()` 来定义一个窗口,然后基于这个window去做一些聚合或者其它处理操作。 注意 `window ()` 方法必须在keyBy之后才能用。 + Flink提供了更加简单的 `.timeWindow` 和 `.countWindow`方法,用于定义时间窗口和计数窗口。 #### **1.2 WindowAssigner说明** + window() 方法接收的输入参数是一个 WindowAssigner。 + WindowAssigner 负责将每条输入的数据分发到正确的 window 中 **Flink 提供了通用的 WindowAssigner:** + 滚动窗口(tumbling window) + 滑动窗口(sliding window) + 会话窗口(session window) + 全局窗口(global window) #### 1.3 创建不同类型的窗口 + **滚动时间窗口(tumbling time window)** ```java .timeWindow(Time.seconds(15) ``` + **滑动时间窗口(sliding time window)** ```java .timeWindow(Time.seconds(15),Time.seconds(5)) ``` + **会话窗口(session window)** ```java .window(EventTimeSessionWindows.withGap(Time.seconds(20))) ``` + **滚动计数窗口(tumbling count window)** ```java .countWindow(5) ``` + **滑动计数窗口(sliding count window)** ```java .countWindow(10,2) ``` ### 二、时间窗口(Time Window) TimeWindow是将指定时间范围内的所有数据组成一个window,一次对一个window里面的所有数据进行计算。 #### 2.1 滚动窗口(Tumbling Windows) Flink 默认的时间窗口根据Processing Time进行窗口的划分,将Flink获取到的数据根据进入Flink的时间划分到不同的窗口中。 **示例** > 需求:获得温度15秒内的最小值 ```java import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.windowing.time.Time /** * Created by lilinchao * Date 2022/1/22 * Description 滚动时间窗口 */ object TumbTimeWindowTest { /** * 获得温度15秒内的最小值 * @param args */ def main(args: Array[String]): Unit = { //构建运行环境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) val inputStream: DataStream[String] = env.socketTextStream("192.168.61.129",8888) //样例类转换 val dataStream: DataStream[SensorReading] = inputStream.map(data => { val array = data.split(",") SensorReading(array(0), array(1).toLong, array(2).toDouble) }) val resultData: DataStream[(String, Double, Long)] = dataStream.map(data => (data.id, data.temperature, data.timestamp)) .keyBy(_._1) .timeWindow(Time.seconds(15)) //滚动时间窗口 参数:滚动时长 .reduce((curRes, newData) => (curRes._1, curRes._2.min(newData._2), newData._3)) resultData.print() env.execute("tumb time window test2") } } ``` 时间间隔可以通过Time.milliseconds(x),Time.seconds(x),Time.minutes(x)等其中的一个来指定。 #### 2.2 滑动窗口(SlidingEventTimeWindows) 滑动窗口和滚动窗口的函数名是完全一致的,只是在传参数时需要传入两个参 数,一个是 window_size,一个是 sliding_size。 **示例** > 需求:每隔5秒时间,统计一次最近15秒时间内,温度的最小值 ```java import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.windowing.time.Time /** * Created by lilinchao * Date 2022/1/22 * Description 滑动时间窗口 */ object SlidingTimeWindowTest { /** * 每隔5秒时间,统计一次最近15秒时间内,温度的最小值 */ def main(args: Array[String]): Unit = { //构建运行环境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) val inputStream: DataStream[String] = env.socketTextStream("192.168.61.129",8888) //样例类转换 val dataStream: DataStream[SensorReading] = inputStream.map(data => { val array = data.split(",") SensorReading(array(0), array(1).toLong, array(2).toDouble) }) val resultData: DataStream[(String, Double)] = dataStream.map(data => (data.id, data.temperature)) .keyBy(_._1) .timeWindow(Time.seconds(15), Time.seconds(5)) //滑动时间窗口 第一个参数:滑动时长,第二个参数:步长 .reduce((r1, r2) => (r1._1, r1._2.min(r2._2))) resultData.print() env.execute("sliding time window test") } } ``` 上方代码中的 sliding_size 设置为了 5s,也就是说,每 5s 就计算输出结果一次, 每一次计算的window 范围是15s内的所有元素。 ### 三、计数窗口(Count Window) CountWindow根据窗口中相同key元素的数量来触发执行,执行时只计算元素 数量达到窗口大小的key对应的结果。 注意:CountWindow的window_size指的是相同Key的元素的个数,不是输入的所有元素的总数。 #### 3.1 滚动窗口 默认的CountWindow是一个滚动窗口,只需要指定窗口大小即可,当元素数量达到窗口大小时,就会触发窗口的执行。 **示例** > 需求:每收到三个相同key的数据就计算一次,取得三条数据中温度的最大值 ```java import org.apache.flink.streaming.api.scala._ /** * Created by lilinchao * Date 2022/1/22 * Description 滚动计数窗口 */ object TumbCountWindowTest { /** * 每收到三个相同key的数据就计算一次,取得三条数据中温度的最大值 */ def main(args: Array[String]): Unit = { //构建运行环境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) val inputStream: DataStream[String] = env.socketTextStream("192.168.61.129",8888) //样例类转换 val dataStream: DataStream[SensorReading] = inputStream.map(data => { val array = data.split(",") SensorReading(array(0), array(1).toLong, array(2).toDouble) }) val resultData: DataStream[(String, Double)] = dataStream.map(data => (data.id, data.temperature)) .keyBy(_._1) .countWindow(3) //滚动计数窗口 .reduce((r1, r2) => (r1._1, r1._2.max(r2._2))) resultData.print() env.execute("tumb count window test") } } ``` #### 3.2 滑动窗口 滑动窗口和滚动窗口的函数名是完全一致的,只是在传参数时需要传入两个参数,一个是 window_size,一个是 sliding_size。 **示例** > 需求:每收到两个相同key的数据就计算一次,每一次计算的window范围是10个元素 ```java import org.apache.flink.streaming.api.scala._ /** * Created by lilinchao * Date 2022/1/22 * Description 滑动计数窗口 */ object SlidingCountWindowTest { /** * 每收到两个相同key的数据就计算一次,每一次计算的window范围是10个元素 */ def main(args: Array[String]): Unit = { //构建运行环境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) val inputStream: DataStream[String] = env.socketTextStream("192.168.61.129",8888) //样例类转换 val dataStream: DataStream[SensorReading] = inputStream.map(data => { val array = data.split(",") SensorReading(array(0), array(1).toLong, array(2).toDouble) }) val resultData: DataStream[(String, Double)] = dataStream.map(data => (data.id, data.temperature)) .keyBy(_._1) .countWindow(10,2) //每当某一个key的个数达到2的时候,触发计算,计算最近该key最近10个元素的内容 .sum(1) resultData.print() env.execute("sliding count window test") } } ``` 上方代码中的 sliding_size 设置为了 2,也就是说,每收到两个相同key的数据就计算一次,每一次计算的 window范围是10个元素。 ### 四、window function window function 定义了要对窗口中收集的数据做的计算操作,主要可以分为两 类: + **增量聚合函数(incremental aggregation functions)** 每条数据到来就进行计算,保持一个简单的状态。典型的增量聚合函数有 ReduceFunction, AggregateFunction。 + **全窗口函数(full window functions)** 先把窗口所有数据收集起来,等到计算的时候会遍历所有数据。 ProcessWindowFunction 就是一个全窗口函数。 ### 五、其它可选 API + `.trigger()` :触发器 + 定义 window 什么时候关闭,触发计算并输出结果; + `.evictor()` :移除器 + 定义移除某些数据的逻辑; **定义移除某些数据的逻辑** + `.allowedLateness()` :允许处理迟到的数据 + `.sideOutputLateData()`:将迟到的数据放入侧输出流 + `.getSideOutput()` :获取侧输出流 #### 函数调用表 **Keyed Windows** ```java stream.keyBy(...) <- //是Keyed类型数据集 .window(...) <- //指定窗口分配器类型 [.trigger(...)] <- //指定触发器类型(可选) [.evictor(...)] <- // 指 定 evictor 或者不指定(可选) [.allowedLateness()] <- //指定是否延迟处理数据(可选) [.sideOutputLateData(...)] <- //指定Output Lag(可选) .reduce/fold/apply() <- //指定窗口计算函数 [.getSideOutput(...)] <- //根据Tag输出数据(可选) ``` **Non-Keyed Windows** ```java stream.windowAll(...) <- required: "assigner" [.trigger(...)] <- optional: "trigger" (else default trigger) [.evictor(...)] <- optional: "evictor" (else no evictor) [.allowedLateness()] <- optional: "lateness" (else zero) [.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data) .reduce/fold/apply() <- required: "function" [.getSideOutput(...)] <- optional: "output tag" ``` 在上面的例子中,方括号[]内的命令是可选的,这表明Flink允许你根据最符合你的要求来定义自己的window逻辑。
标签:
Flink
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://lilinchao.com/archives/1862.html
上一篇
18.Flink window API介绍
下一篇
20. Flink增量聚合函数和全窗口函数示例
取消回复
评论啦~
提交评论
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
标签云
Spark SQL
MyBatisX
Python
数据结构
随笔
Elasticsearch
HDFS
Java阻塞队列
二叉树
字符串
FileBeat
ajax
队列
并发线程
BurpSuite
Netty
Quartz
锁
排序
人工智能
Ubuntu
Thymeleaf
Livy
Hive
SpringCloudAlibaba
持有对象
稀疏数组
Filter
Kafka
数学
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞