李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
大数据
正文
19.Flink Window API使用详解
Leefs
2022-01-22 PM
1766℃
0条
[TOC] ### 前言 + **使用版本** + Flink 1.10.1 + JDK 1.8 + **数据准备** ```basic sensor_1,1547718199,35.8 sensor_6,1547718201,15.4 sensor_7,1547718202,6.7 sensor_10,1547718205,38.1 sensor_1,1547718206,32 sensor_1,1547718208,36.2 sensor_1,1547718210,29.7 sensor_1,1547718213,30.9 ``` + nc工具命令 ```shell nc -lk 8888 ``` ### 一、窗口适配器(window assigner) #### 1.1 概述 **窗口分配器:**window() 方法 + 我们可以用 `.window()` 来定义一个窗口,然后基于这个window去做一些聚合或者其它处理操作。 注意 `window ()` 方法必须在keyBy之后才能用。 + Flink提供了更加简单的 `.timeWindow` 和 `.countWindow`方法,用于定义时间窗口和计数窗口。 #### **1.2 WindowAssigner说明** + window() 方法接收的输入参数是一个 WindowAssigner。 + WindowAssigner 负责将每条输入的数据分发到正确的 window 中 **Flink 提供了通用的 WindowAssigner:** + 滚动窗口(tumbling window) + 滑动窗口(sliding window) + 会话窗口(session window) + 全局窗口(global window) #### 1.3 创建不同类型的窗口 + **滚动时间窗口(tumbling time window)** ```java .timeWindow(Time.seconds(15) ``` + **滑动时间窗口(sliding time window)** ```java .timeWindow(Time.seconds(15),Time.seconds(5)) ``` + **会话窗口(session window)** ```java .window(EventTimeSessionWindows.withGap(Time.seconds(20))) ``` + **滚动计数窗口(tumbling count window)** ```java .countWindow(5) ``` + **滑动计数窗口(sliding count window)** ```java .countWindow(10,2) ``` ### 二、时间窗口(Time Window) TimeWindow是将指定时间范围内的所有数据组成一个window,一次对一个window里面的所有数据进行计算。 #### 2.1 滚动窗口(Tumbling Windows) Flink 默认的时间窗口根据Processing Time进行窗口的划分,将Flink获取到的数据根据进入Flink的时间划分到不同的窗口中。 **示例** > 需求:获得温度15秒内的最小值 ```java import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.windowing.time.Time /** * Created by lilinchao * Date 2022/1/22 * Description 滚动时间窗口 */ object TumbTimeWindowTest { /** * 获得温度15秒内的最小值 * @param args */ def main(args: Array[String]): Unit = { //构建运行环境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) val inputStream: DataStream[String] = env.socketTextStream("192.168.61.129",8888) //样例类转换 val dataStream: DataStream[SensorReading] = inputStream.map(data => { val array = data.split(",") SensorReading(array(0), array(1).toLong, array(2).toDouble) }) val resultData: DataStream[(String, Double, Long)] = dataStream.map(data => (data.id, data.temperature, data.timestamp)) .keyBy(_._1) .timeWindow(Time.seconds(15)) //滚动时间窗口 参数:滚动时长 .reduce((curRes, newData) => (curRes._1, curRes._2.min(newData._2), newData._3)) resultData.print() env.execute("tumb time window test2") } } ``` 时间间隔可以通过Time.milliseconds(x),Time.seconds(x),Time.minutes(x)等其中的一个来指定。 #### 2.2 滑动窗口(SlidingEventTimeWindows) 滑动窗口和滚动窗口的函数名是完全一致的,只是在传参数时需要传入两个参 数,一个是 window_size,一个是 sliding_size。 **示例** > 需求:每隔5秒时间,统计一次最近15秒时间内,温度的最小值 ```java import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.windowing.time.Time /** * Created by lilinchao * Date 2022/1/22 * Description 滑动时间窗口 */ object SlidingTimeWindowTest { /** * 每隔5秒时间,统计一次最近15秒时间内,温度的最小值 */ def main(args: Array[String]): Unit = { //构建运行环境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) val inputStream: DataStream[String] = env.socketTextStream("192.168.61.129",8888) //样例类转换 val dataStream: DataStream[SensorReading] = inputStream.map(data => { val array = data.split(",") SensorReading(array(0), array(1).toLong, array(2).toDouble) }) val resultData: DataStream[(String, Double)] = dataStream.map(data => (data.id, data.temperature)) .keyBy(_._1) .timeWindow(Time.seconds(15), Time.seconds(5)) //滑动时间窗口 第一个参数:滑动时长,第二个参数:步长 .reduce((r1, r2) => (r1._1, r1._2.min(r2._2))) resultData.print() env.execute("sliding time window test") } } ``` 上方代码中的 sliding_size 设置为了 5s,也就是说,每 5s 就计算输出结果一次, 每一次计算的window 范围是15s内的所有元素。 ### 三、计数窗口(Count Window) CountWindow根据窗口中相同key元素的数量来触发执行,执行时只计算元素 数量达到窗口大小的key对应的结果。 注意:CountWindow的window_size指的是相同Key的元素的个数,不是输入的所有元素的总数。 #### 3.1 滚动窗口 默认的CountWindow是一个滚动窗口,只需要指定窗口大小即可,当元素数量达到窗口大小时,就会触发窗口的执行。 **示例** > 需求:每收到三个相同key的数据就计算一次,取得三条数据中温度的最大值 ```java import org.apache.flink.streaming.api.scala._ /** * Created by lilinchao * Date 2022/1/22 * Description 滚动计数窗口 */ object TumbCountWindowTest { /** * 每收到三个相同key的数据就计算一次,取得三条数据中温度的最大值 */ def main(args: Array[String]): Unit = { //构建运行环境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) val inputStream: DataStream[String] = env.socketTextStream("192.168.61.129",8888) //样例类转换 val dataStream: DataStream[SensorReading] = inputStream.map(data => { val array = data.split(",") SensorReading(array(0), array(1).toLong, array(2).toDouble) }) val resultData: DataStream[(String, Double)] = dataStream.map(data => (data.id, data.temperature)) .keyBy(_._1) .countWindow(3) //滚动计数窗口 .reduce((r1, r2) => (r1._1, r1._2.max(r2._2))) resultData.print() env.execute("tumb count window test") } } ``` #### 3.2 滑动窗口 滑动窗口和滚动窗口的函数名是完全一致的,只是在传参数时需要传入两个参数,一个是 window_size,一个是 sliding_size。 **示例** > 需求:每收到两个相同key的数据就计算一次,每一次计算的window范围是10个元素 ```java import org.apache.flink.streaming.api.scala._ /** * Created by lilinchao * Date 2022/1/22 * Description 滑动计数窗口 */ object SlidingCountWindowTest { /** * 每收到两个相同key的数据就计算一次,每一次计算的window范围是10个元素 */ def main(args: Array[String]): Unit = { //构建运行环境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) val inputStream: DataStream[String] = env.socketTextStream("192.168.61.129",8888) //样例类转换 val dataStream: DataStream[SensorReading] = inputStream.map(data => { val array = data.split(",") SensorReading(array(0), array(1).toLong, array(2).toDouble) }) val resultData: DataStream[(String, Double)] = dataStream.map(data => (data.id, data.temperature)) .keyBy(_._1) .countWindow(10,2) //每当某一个key的个数达到2的时候,触发计算,计算最近该key最近10个元素的内容 .sum(1) resultData.print() env.execute("sliding count window test") } } ``` 上方代码中的 sliding_size 设置为了 2,也就是说,每收到两个相同key的数据就计算一次,每一次计算的 window范围是10个元素。 ### 四、window function window function 定义了要对窗口中收集的数据做的计算操作,主要可以分为两 类: + **增量聚合函数(incremental aggregation functions)** 每条数据到来就进行计算,保持一个简单的状态。典型的增量聚合函数有 ReduceFunction, AggregateFunction。 + **全窗口函数(full window functions)** 先把窗口所有数据收集起来,等到计算的时候会遍历所有数据。 ProcessWindowFunction 就是一个全窗口函数。 ### 五、其它可选 API + `.trigger()` :触发器 + 定义 window 什么时候关闭,触发计算并输出结果; + `.evictor()` :移除器 + 定义移除某些数据的逻辑; **定义移除某些数据的逻辑** + `.allowedLateness()` :允许处理迟到的数据 + `.sideOutputLateData()`:将迟到的数据放入侧输出流 + `.getSideOutput()` :获取侧输出流 #### 函数调用表 **Keyed Windows** ```java stream.keyBy(...) <- //是Keyed类型数据集 .window(...) <- //指定窗口分配器类型 [.trigger(...)] <- //指定触发器类型(可选) [.evictor(...)] <- // 指 定 evictor 或者不指定(可选) [.allowedLateness()] <- //指定是否延迟处理数据(可选) [.sideOutputLateData(...)] <- //指定Output Lag(可选) .reduce/fold/apply() <- //指定窗口计算函数 [.getSideOutput(...)] <- //根据Tag输出数据(可选) ``` **Non-Keyed Windows** ```java stream.windowAll(...) <- required: "assigner" [.trigger(...)] <- optional: "trigger" (else default trigger) [.evictor(...)] <- optional: "evictor" (else no evictor) [.allowedLateness()] <- optional: "lateness" (else zero) [.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data) .reduce/fold/apply() <- required: "function" [.getSideOutput(...)] <- optional: "output tag" ``` 在上面的例子中,方括号[]内的命令是可选的,这表明Flink允许你根据最符合你的要求来定义自己的window逻辑。
标签:
Flink
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://lilinchao.com/archives/1862.html
上一篇
18.Flink window API介绍
下一篇
20. Flink增量聚合函数和全窗口函数示例
评论已关闭
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
NLP
4
标签云
MySQL
Filter
FastDFS
持有对象
算法
Java阻塞队列
Http
Hadoop
数学
Kibana
Linux
SpringCloudAlibaba
Typora
Spark RDD
并发编程
DataWarehouse
机器学习
Java编程思想
Quartz
ajax
并发线程
Elastisearch
nginx
Spring
Spark SQL
递归
Thymeleaf
Map
序列化和反序列化
数据结构和算法
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞
评论已关闭