Li Linchao's Blog
20. Flink Examples: Incremental Aggregation Functions and Full Window Functions
Leefs
2022-01-22 PM
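[TOC]

### 1. Concepts

A window function defines the computation performed on the data collected in a window. Window functions fall into two main categories:

- **Incremental aggregation functions**: each record is processed as soon as it arrives, and only a small piece of state is kept. Typical incremental aggregation functions are `ReduceFunction` and `AggregateFunction`.
- **Full window functions**: all records of the window are collected first, and the function iterates over all of them when the window is evaluated. `ProcessWindowFunction` is a full window function.

### 2. Comparison

+ Incremental aggregation functions perform better and use less storage, because the window keeps only the intermediate aggregate result as state and does not need to buffer the raw records.
+ Full window functions are comparatively expensive and slower, because the operator has to buffer every record that belongs to the window and aggregates all the raw records only when the window fires.

### 3. Incremental Aggregation Function Examples

#### 3.1 ReduceFunction

`ReduceFunction` aggregates two input elements of the same type according to a specified computation and outputs one result element of that same type.

> Requirement: compute the minimum and maximum temperature incrementally with reduce

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

/**
 * Created by lilinchao
 * Date 2022/1/22
 * Description Compute min and max temperature incrementally with reduce
 */
object MinMaxTempByReduceAndProcess {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Data source: one comma-separated reading per line
    val inputStream: DataStream[String] = env.socketTextStream("192.168.61.129", 8888)

    // Convert each line to the SensorReading case class
    val dataStream: DataStream[SensorReading] = inputStream.map(data => {
      val array = data.split(",")
      SensorReading(array(0), array(1).toLong, array(2).toDouble)
    })

    val resultData: DataStream[MinMaxTemp] = dataStream
      .map(r => (r.id, r.temperature, r.temperature))
      .keyBy(_._1)
      // 15 s tumbling window. timeWindow() is deprecated since Flink 1.12; there,
      // .window(TumblingProcessingTimeWindows.of(Time.seconds(15))) selects processing time explicitly
      .timeWindow(Time.seconds(15))
      .reduce(
        // Incremental step: keep (id, min, max) as the running result
        (r1: (String, Double, Double), r2: (String, Double, Double)) => {
          (r1._1, r1._2.min(r2._2), r1._3.max(r2._3))
        },
        // Called once when the window fires, with the single reduced value
        new WindowResult
      )

    resultData.print()
    env.execute("MinMaxTempByReduceAndProcess")
  }

  class WindowResult extends ProcessWindowFunction[(String, Double, Double), MinMaxTemp, String, TimeWindow] {
    override def process(key: String, context: Context, elements: Iterable[(String, Double, Double)], out: Collector[MinMaxTemp]): Unit = {
      // The iterable holds exactly one element: the result of the reduce step
      val temp = elements.head
      out.collect(MinMaxTemp(temp._1, temp._2, temp._3, context.window.getEnd))
    }
  }
}

case class MinMaxTemp(id: String, min: Double, max: Double, endTs: Long)

// Assumed definition: id and temperature are used above; the name of the
// second (Long) field is illustrative
case class SensorReading(id: String, timestamp: Long, temperature: Double)
```

#### 3.2 AggregateFunction

Like `ReduceFunction`, `AggregateFunction` is an incremental function that computes on an intermediate state, but `AggregateFunction` is more general for window computations: its input, accumulator, and output types can all differ. `AggregateFunction` is more flexible than `ReduceFunction`, but it is also more complex to implement.

The `AggregateFunction` interface defines four methods that must be overridden:

+ **createAccumulator()**: creates a new, empty accumulator;
+ **add()**: defines how an incoming record is added to the accumulator;
+ **getResult()**: defines how the final result is computed from the accumulator;
+ **merge()**: defines how two accumulators are merged.

> Requirement: compute the average temperature with an incremental aggregation function

```scala
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

/**
 * Created by lilinchao
 * Date 2022/1/22
 * Description Compute the average temperature with an incremental aggregation function
 */
object AvgTempPerWindow {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Data source
    val inputStream: DataStream[String] = env.socketTextStream("192.168.61.129", 8888)

    // Convert each line to SensorReading
    // (assumes case class SensorReading(id: String, timestamp: Long, temperature: Double) in the same package)
    val dataStream: DataStream[SensorReading] = inputStream.map(data => {
      val array = data.split(",")
      SensorReading(array(0), array(1).toLong, array(2).toDouble)
    })

    val resultData: DataStream[(String, Double)] = dataStream
      .map(r => (r.id, r.temperature))
      .keyBy(_._1)
      .timeWindow(Time.seconds(15))
      .aggregate(new AvgTempFunction)

    resultData.print()
    env.execute("AvgTempPerWindow")
  }

  // Average temperature = sum of temperatures / number of readings
  // Type parameters: input (id, temp), accumulator (id, sum, count), output (id, avg)
  class AvgTempFunction extends AggregateFunction[(String, Double), (String, Double, Long), (String, Double)] {
    // Create an empty accumulator
    override def createAccumulator(): (String, Double, Long) = ("", 0.0, 0L)

    // Add one reading: update the sum and increment the count
    override def add(in: (String, Double), acc: (String, Double, Long)): (String, Double, Long) =
      (in._1, acc._2 + in._2, acc._3 + 1)

    // Return the result when the window fires
    override def getResult(acc: (String, Double, Long)): (String, Double) =
      (acc._1, acc._2 / acc._3)

    // Merge two accumulators
    override def merge(a: (String, Double, Long), b: (String, Double, Long)): (String, Double, Long) =
      (a._1, a._2 + b._2, a._3 + b._3)
  }
}
```

### 4. Full Window Function Examples

#### 4.1 ProcessWindowFunction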
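The buffering that distinguishes a full window function from the incremental functions of Section 3 can be illustrated without a Flink cluster. The plain-Scala sketch below is hypothetical: `WindowStyleSketch`, `incrementalAvg`, and `fullWindowAvg` are illustrative names, not Flink APIs, and a `Seq` stands in for the readings of one key in one window. The incremental version keeps only a `(sum, count)` accumulator, mirroring `createAccumulator`/`add`/`getResult`; the full-window version buffers every reading and iterates over the buffer only when the window "fires", the way `ProcessWindowFunction.process` receives all elements through its `Iterable`.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical plain-Scala sketch (no Flink API): contrasts the state kept by an
// incremental aggregate with the state kept by a full window function.
object WindowStyleSketch {

  // Incremental style (cf. AggregateFunction): the only state is one (sum, count)
  // accumulator, updated as each reading arrives; raw readings are not kept.
  final case class Acc(sum: Double, count: Long)

  def incrementalAvg(readings: Seq[Double]): Double = {
    var acc = Acc(0.0, 0L)                                    // createAccumulator()
    for (r <- readings) acc = Acc(acc.sum + r, acc.count + 1) // add(), once per record
    acc.sum / acc.count                                       // getResult() when the window fires
  }

  // Full-window style (cf. ProcessWindowFunction): every reading is buffered,
  // and the whole buffer is iterated only when the window fires.
  // Returns the average and the number of buffered elements.
  def fullWindowAvg(readings: Seq[Double]): (Double, Int) = {
    val buffer = ArrayBuffer.empty[Double]
    for (r <- readings) buffer += r // buffer each incoming record
    var sum = 0.0
    for (r <- buffer) sum += r      // process(): iterate over all buffered elements
    (sum / buffer.size, buffer.size)
  }

  def main(args: Array[String]): Unit = {
    val window = Seq(31.0, 33.0, 35.0)
    println(incrementalAvg(window)) // prints 33.0
    println(fullWindowAvg(window))  // prints (33.0,3)
  }
}
```

Both styles produce the same average; the difference is that the buffer grows with the number of readings in the window, while the accumulator stays a fixed size regardless of how many readings arrive.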
`ReduceFunction` and `AggregateFunction` are both window functions that compute incrementally on intermediate state, and they already cover most cases. In some cases, however, a more complex metric depends on all elements in the window, or the computation needs to access the window's state or metadata; that is when `ProcessWindowFunction` is needed.

`ProcessWindowFunction` supports computations over all the elements of a window more flexibly. For example, operating on the data of the entire window to obtain the TopN elements calls for `ProcessWindowFunction`.

> Requirement: compute the average temperature with a full window function

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

/**
 * Created by lilinchao
 * Date 2022/1/22
 * Description Compute the average temperature with a full window function
 */
object AvgTempPerWindowByProcessWindowFunction {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Data source
    val inputStream: DataStream[String] = env.socketTextStream("192.168.61.129", 8888)

    // Convert each line to SensorReading
    // (assumes case class SensorReading(id: String, timestamp: Long, temperature: Double) in the same package)
    val dataStream: DataStream[SensorReading] = inputStream.map(data => {
      val array = data.split(",")
      SensorReading(array(0), array(1).toLong, array(2).toDouble)
    })

    val resultData: DataStream[(String, Double)] = dataStream
      .map(r => (r.id, r.temperature))
      .keyBy(_._1)
      .timeWindow(Time.seconds(15))
      .process(new AvgTempFunc)

    resultData.print()
    env.execute("AvgTempPerWindowByProcessWindowFunction")
  }

  class AvgTempFunc extends ProcessWindowFunction[(String, Double), (String, Double), String, TimeWindow] {
    override def process(key: String, context: Context, elements: Iterable[(String, Double)], out: Collector[(String, Double)]): Unit = {
      // All readings of this key in the window have been buffered and are available here
      val size = elements.size
      var sum: Double = 0.0
      for (r <- elements) {
        sum += r._2
      }
      out.collect((key, sum / size))
    }
  }
}
```

> Requirement: compute the minimum and maximum temperature by combining an incremental `AggregateFunction` with a `ProcessWindowFunction`

The first argument of `aggregate` pre-aggregates each record as it arrives, so the window state stays small; the second argument is called once when the window fires and receives only the pre-aggregated result, together with the window metadata (here, the window end time).

```scala
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

/**
 * Created by lilinchao
 * Date 2022/1/22
 * Description Compute min and max temperature with incremental aggregation plus a full window function
 */
object MinMaxTempByAggregateAndProcess {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Data source
    val inputStream: DataStream[String] = env.socketTextStream("192.168.61.129", 8888)

    // Convert each line to SensorReading
    // (assumes case class SensorReading(id: String, timestamp: Long, temperature: Double) in the same package)
    val dataStream: DataStream[SensorReading] = inputStream.map(data => {
      val array = data.split(",")
      SensorReading(array(0), array(1).toLong, array(2).toDouble)
    })

    val resultData: DataStream[MinMaxTemp] = dataStream
      .keyBy(_.id)
      .timeWindow(Time.seconds(15))
      .aggregate(new Agg, new WindowResult) // First argument: incremental aggregation; second: full window function

    resultData.print()
    env.execute("MinMaxTempByAggregateAndProcess")
  }

  class WindowResult extends ProcessWindowFunction[(String, Double, Double), MinMaxTemp, String, TimeWindow] {
    override def process(key: String, context: Context, elements: Iterable[(String, Double, Double)], out: Collector[MinMaxTemp]): Unit = {
      // The iterable holds a single value: the aggregate result emitted by Agg
      val minMax = elements.head
      out.collect(MinMaxTemp(key, minMax._2, minMax._3, context.window.getEnd))
    }
  }

  class Agg extends AggregateFunction[SensorReading, (String, Double, Double), (String, Double, Double)] {
    // Accumulator (id, min, max), starting from the extremes so the first reading replaces both.
    // In Scala, Double.MinValue is the most negative Double (unlike Java's Double.MIN_VALUE).
    override def createAccumulator(): (String, Double, Double) =
      ("", Double.MaxValue, Double.MinValue)

    // Called once for every incoming record
    override def add(value: SensorReading, accumulator: (String, Double, Double)): (String, Double, Double) =
      (value.id, value.temperature.min(accumulator._2), value.temperature.max(accumulator._3))

    // Return the result when the window closes
    override def getResult(acc: (String, Double, Double)): (String, Double, Double) = acc

    // Merge two accumulators (used when windows are merged, e.g. session windows)
    override def merge(a: (String, Double, Double), b: (String, Double, Double)): (String, Double, Double) =
      (a._1, a._2.min(b._2), a._3.max(b._3))
  }
}

// Same definition as in the 3.1 example; if both programs share a package, keep only one copy
case class MinMaxTemp(id: String, min: Double, max: Double, endTs: Long)
```

*Reference links:*

*https://blog.csdn.net/andyonlines/article/details/107776284*

*https://www.cnblogs.com/alen-apple/p/13072999.html*
Tags: Flink
Unless otherwise stated, all articles on this blog are original works by the blogger.
If you repost, please credit the source:
https://lilinchao.com/archives/1863.html