李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
大数据
正文
23.Flink之Watermark使用详解
Leefs
2022-01-25 PM
1999℃
0条
[TOC] ### 前言 **版本:** + Flink 1.10.1 **示例数据** ```basic sensor_1,1547718199,35.8 sensor_6,1547718201,15.4 sensor_7,1547718202,6.7 sensor_10,1547718205,38.1 sensor_1,1547718206,32 sensor_1,1547718208,36.2 sensor_1,1547718210,29.7 sensor_1,1547718213,30.9 sensor_1,1547718215,32.9 sensor_1,1547718218,31.8 sensor_1,1547718225,37.2 ``` ### 一、引入Watermark(使用已有的类) **方式一:**对于一个没有乱序,时间为升序的流设置引入Watermark ```java .assignAscendingTimestamps(_.timestamp * 1000L) //升序数据提取时间戳 ``` **方式二:当流中存在时间乱序问题,引入watermark,并设置延迟时间** ```java .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(3)) { override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L }) ``` **说明** 1、`BoundedOutOfOrdernessTimestampExtractor`中的泛型为流中数据的类型; 2、传入的参数为 watermark 的最大延迟时间(即允许数据迟到的时间); 3、重写的extractTimestamp方法返回的是设置数据中EventTime的字段,单位为毫秒,需要将时间转换成Long(最近时间为13位的长整形)才能返回; 4、当我们能大约估计到流中的最大乱序时,建议使用此方式,比较方便。 Event Time的使用一定要指定数据源中的时间戳。否则程序无法知道事件的事件时间是什么(数据源里的数据没有时间戳的话,就只能使用 Processing Time 了)。 如果要使用 EventTime,那么需要引入 EventTime 的时间属性,引入方式如下所示: ```java val env = StreamExecutionEnvironment.getExecutionEnvironment // 必须指定 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) ``` **完整代码** ```java import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.windowing.time.Time /** * @author lilinchao * @date 2022/1/25 * @description Watermark引入 **/ object WaterMarkDemo { def main(args: Array[String]): Unit = { //构建运行环境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) //从调用时刻开始给env创建的每一个stream追加时间特性 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //设置生成watermark的时间间隔,系统默认为200毫秒,一般使用系统默认即可 env.getConfig.setAutoWatermarkInterval(5000) val inputStream: DataStream[String] = env.socketTextStream("192.168.159.139",8888) //样例类转换 val dataStream: DataStream[SensorReading] = inputStream.map(data => { val array = data.split(",") SensorReading(array(0), array(1).toLong, array(2).toDouble) }) //引入Watermark(使用已有的类) val waterMarkData = dataStream // 给一个没有乱序,时间为升序的流设置一个EventTime // .assignAscendingTimestamps(_.timestamp * 1000L) //升序数据提取时间戳 /** * 当流中存在时间乱序问题,引入watermark,并设置延迟时间 * 1、BoundedOutOfOrdernessTimestampExtractor中的泛型为流中数据的类型 * 2、传入的参数为 watermark 的最大延迟时间(即允许数据迟到的时间) * 3、重写的extractTimestamp方法返回的是设置数据中EventTime的字段,单位为毫秒,需要将时间转换成Long(最近时间为13位的长整形)才能返回 * 4、当我们能大约估计到流中的最大乱序时,建议使用此中方式,比较方便 */ .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(3)) { override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L }) val resultData: DataStream[(String, Double, Long)] = waterMarkData.map(data => (data.id, data.temperature, data.timestamp)) .keyBy(_._1) .timeWindow(Time.seconds(10)) //滚动时间窗口 参数:滚动时长 .reduce((curRes, newData) => (curRes._1, curRes._2.min(newData._2), newData._3)) resultData.print() env.execute("WaterMarkDemo") } } // 定义样例类,传感器 id,时间戳,温度 case class SensorReading(id: String, timestamp: Long, temperature: Double) ``` ### 二、自定义WaterMark生成机制 我们看到上面的例子中创建了一个看起来有点复杂的类,这个类实现的其实就是分配时间戳的接口。 Flink暴露了TimestampAssigner接口供我们实现,使我们可以自定义如何从事件数据中抽取时间戳。 **自定义WaterMark两种方式**: + AssignerWithPeriodicWatermarks + AssignerWithPunctuatedWatermarks 以上两个接口都继承自TimestampAssigner。 #### 2.1 定期水位线(Assigner with periodic watermarks) **根据从事件数据中去获取时间戳设置水位线存在的问题:** 没有达到水位线时不管现实中的时间推进了多久都不会触发关窗。 ##### 概念 **定期水位线(Periodic Watermark)**:按照固定时间间隔生成新的水位线。 **优势** 不管是否有新的消息抵达,水位线提升的时间间隔是由用户设置的,在两次水位线提升时隔内会有一部分消息流入,用户可以根据这部分数据来计算出新的水位线。 最简单的水位线算法就是取目前为止最大的事件时间,然而这种方式比较暴力,对乱序事件的容忍程度比较低,容易出现大量迟到事件。 **代码示例** 应用定期水位线需要**实现AssignerWithPeriodicWatermarks API** ```java import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.watermark.Watermark import org.apache.flink.streaming.api.windowing.time.Time /** * @author lilinchao * @date 2022/1/25 * @description 定期水位线Demo **/ object PeriodicWaterMarkDemo { def main(args: Array[String]): Unit = { //构建运行环境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) //从调用时刻开始给env创建的每一个stream追加时间特性 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //设置生成watermark的时间间隔,系统默认为200毫秒,一般使用系统默认即可 env.getConfig.setAutoWatermarkInterval(5000) val inputStream: DataStream[String] = env.socketTextStream("192.168.159.139",8888) //样例类转换 val dataStream: DataStream[SensorReading] = inputStream.map(data => { val array = data.split(",") SensorReading(array(0), array(1).toLong, array(2).toDouble) }) //使用 TimestampAssigner 引入 Watermark /** * Assigner with periodic watermarks(周期性引入watermark) * 1、系统会周期性的将watermark插入到流中,默认周期是200毫秒,可以使用ExecutionConfig.setAutoWatermarkInterval()方法进行设置,单位为毫秒 * 2、产生watermark的逻辑:每隔5秒钟,Flink会调用AssignerWithPeriodicWatermarks的getCurrentWatermark()方法,如果大于流中最大watermark就插入,小于就不插入 * 3、如下,可以自定义一个周期性的时间戳抽取(需要实现 AssignerWithPeriodicWatermarks 接口) */ val waterMarkData = dataStream.assignTimestampsAndWatermarks(new MyPeriodicAssigner(10)) val resultData: DataStream[(String, Double, Long)] = waterMarkData.map(data => (data.id, data.temperature, data.timestamp)) .keyBy(_._1) .timeWindow(Time.seconds(10)) //滚动时间窗口 参数:滚动时长 .reduce((curRes, newData) => (curRes._1, curRes._2.min(newData._2), newData._3)) resultData.print() env.execute("PeriodicWaterMarkDemo") } } /** * 自定义一个周期生成watermark的类 * @param bound watermark的延时时间(毫秒) */ class MyPeriodicAssigner(bound:Long) extends AssignerWithPeriodicWatermarks[SensorReading]{ // 当前为止的最大时间戳(毫秒) var maxTs: Long = Long.MinValue /** * 获取当前的watermark(默认200毫秒获取一次,可以通过 env.getConfig.setAutoWatermarkInterval(5000) 来设置) * @return 当前watermark,当前最大时间戳 - 延时时间 */ override def getCurrentWatermark: Watermark = { new Watermark(maxTs - bound) } /** * 指定eventTime对应的字段(流中每条数据都会调用一次此方法) * @param element 流中的每条数据 * @param previousElementTimestamp 无 * @return 当前流的eventTime(单位:毫秒) */ override def extractTimestamp(element: SensorReading, previousElementTimestamp: Long): Long = { // 每条数据都获取其中的时间戳,跟最大时间戳取大,并重新赋值给最大时间戳 maxTs = maxTs.max(element.timestamp * 1000) element.timestamp * 1000 } } ``` 其中**extractTimestamp用于从消息中提取事件时间**,而**getCurrentWatermark**用于生成新的水位线,新的水位线只有大于当前水位线才是有效的。每个窗口都会有该类的一个实例,因此可以利用实例的成员变量保存状态,比如上例中的当前最大时间戳。 **注:**周期性的(一定时间间隔或者达到一定的记录条数)产生一个Watermark。在实际的生产中Periodic的方式必须结合时间和积累条数两个维度继续周期性产生Watermark,否则在极端情况下会有很大的延时 #### 2.2 标点水位线(Assigner with punctuated watermarks) ##### 概念 标点水位线(Punctuated Watermark)通过数据流中某些特殊标记事件来触发新水位线的生成。这种方式下窗口的触发与时间无关,而是决定于何时收到标记事件。 ##### 代码示例 应用标点水位线需要实现`AssignerWithPunctuatedWatermarks API` ```java import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.watermark.Watermark import org.apache.flink.streaming.api.windowing.time.Time /** * @author lilinchao * @date 2022/1/25 * @description 标点水位线Demo **/ object PunctuatedWaterMarkDemo { def main(args: Array[String]): Unit = { //构建运行环境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) //从调用时刻开始给env创建的每一个stream追加时间特性 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //设置生成watermark的时间间隔,系统默认为200毫秒,一般使用系统默认即可 env.getConfig.setAutoWatermarkInterval(5000) val inputStream: DataStream[String] = env.socketTextStream("192.168.159.139",8888) //样例类转换 val dataStream: DataStream[SensorReading] = inputStream.map(data => { val array = data.split(",") SensorReading(array(0), array(1).toLong, array(2).toDouble) }) //使用 TimestampAssigner 引入 Watermark /** * Assigner with punctuated watermarks(标点水位线) * 1、通过数据流中某些特殊标记事件来触发新水位线的生成。 * 2、种方式下窗口的触发与时间无关,而是决定于何时收到标记事件 */ val waterMarkData = dataStream.assignTimestampsAndWatermarks(new MyPunctuatedAssigner(10)) val resultData: DataStream[(String, Double, Long)] = waterMarkData.map(data => (data.id, data.temperature, data.timestamp)) .keyBy(_._1) .timeWindow(Time.seconds(10)) //滚动时间窗口 参数:滚动时长 .reduce((curRes, newData) => (curRes._1, curRes._2.min(newData._2), newData._3)) resultData.print() env.execute("PeriodicWaterMarkDemo") } } /** * 自定义一个周期生成watermark的类 * @param bound watermark的延时时间(毫秒) */ class MyPunctuatedAssigner(bound:Long) extends AssignerWithPunctuatedWatermarks[SensorReading]{ //checkAndGetNextWatermark用于检查事件是否标点事件,若是则生成新的水位线 override def checkAndGetNextWatermark(t: SensorReading, extractedTS: Long): Watermark = { if(t.id == "sensor_1"){ new Watermark(extractedTS - bound) }else{ null } } //extractTimestamp用于从消息中提取事件时间 override def extractTimestamp(element: SensorReading, previousTS: Long): Long = { element.timestamp * 1000 } } ``` **说明** + **extractTimestamp**:用于从消息中提取事件时间; + **checkAndGetNextWatermark**:用于检查事件是否标点事件,若是则生成新的水位线。 不同于定期水位线定时调用getCurrentWatermark,标点水位线是每接受一个事件就需要调用checkAndGetNextWatermark,若返回值非 null 且新水位线大于当前水位线,则触发窗口计算。 **注:**数据流中每一个递增的EventTime都会产生一个Watermark。在实际的生产中Punctuated方式在TPS很高的场景下会产生大量的Watermark在一定程度上对下游算子造成压力,所以只有在实时性要求非常高的场景才会选择Punctuated的方式进行Watermark的生成。 #### 2.3 迟到事件 ##### 概念 虽说水位线表明着早于它的事件不应该再出现,但是如上文所讲,接收到水位线以前的的消息是不可避免的,这就是所谓的迟到事件。 实际上迟到事件是乱序事件的特例,和一般乱序事件不同的是它们的乱序程度超出了水位线的预计,导致窗口在它们到达之前已经关闭。 ##### 处理迟到数据的方法 迟到事件出现时窗口已经关闭并产出了计算结果,因此处理的方法有3种: - 重新激活已经关闭的窗口并重新计算以修正结果。 - 将迟到事件收集起来另外处理。 - 将迟到事件视为错误消息并丢弃。 Flink 默认的处理方式是第3种直接丢弃,其他两种方式分别使用**Side Output**和**Allowed Lateness**。 - Side Output机制可以将迟到事件单独放入一个数据流分支,这会作为 window 计算结果的副产品,以便用户获取并对其进行特殊处理。 - Allowed Lateness机制允许用户设置一个允许的最大迟到时长。Flink 会再窗口关闭后一直保存窗口的状态直至超过允许迟到时长,这期间的迟到事件不会被丢弃,而是默认会触发窗口重新计算。因为保存窗口状态需要额外内存,并且如果窗口计算使用了 ProcessWindowFunction API 还可能使得每个迟到事件触发一次窗口的全量计算,代价比较大,所以允许迟到时长不宜设得太长,迟到事件也不宜过多,否则应该考虑降低水位线提高的速度或者调整算法。 ### 三、Watermark的设定 + 在 Flink 中,watermark 由应用程序开发人员生成,这通常需要对相应的领域有一定的了解; + 如果watermark设置的延迟太久,收到结果的速度可能就会很慢,解决办法是在水位线到达之前输出一个近似结果; + 而如果watermark到达得太早,则可能收到错误结果,不过Flink处理迟到数据的机制可以解决这个问题。
标签:
Flink
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://lilinchao.com/archives/1872.html
上一篇
22.Flink之Watermark基本概念
下一篇
24.EvnetTime在window中的使用
评论已关闭
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
NLP
4
标签云
Java编程思想
Golang基础
Python
VUE
RSA加解密
Redis
Java阻塞队列
Hive
Spark Streaming
nginx
微服务
序列化和反序列化
GET和POST
Flink
Spark RDD
DataX
Spring
Beego
栈
Zookeeper
ClickHouse
字符串
Azkaban
容器深入研究
JavaScript
NIO
Livy
Thymeleaf
数据结构和算法
SpringCloud
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞
评论已关闭