Li Linchao's Blog (李林超博客)
24. Using EventTime in Windows
Leefs
2022-01-25 PM
### Preface

**Version**:

+ Flink 1.10.1

**Sample data**

```text
0001,1550116440000
0001,1550116441000
0001,1550116442000
0001,1550116443000
0001,1550116444000
0001,1550116445000
0001,1550116446000
0001,1550116450000
0001,1550116451000
0001,1550116452000
0001,1550116453000
0001,1550116456000
0001,1550116460000
0001,1550116461000
0001,1550116462000
0001,1550116464000
```

### 1. Tumbling Windows (TumblingEventTimeWindows)

```scala
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

import scala.collection.mutable

/**
 * @author lilinchao
 * @date 2022/1/25
 * @description Tumbling window demo
 **/
object TumblingEventTimeDemo {
  def main(args: Array[String]): Unit = {
    // Set up the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    val inputStream: DataStream[String] = env.socketTextStream("192.168.159.139", 8888)

    // Parse each "id,timestamp" line into an (id, timestamp, count) tuple
    val dataStream: DataStream[(String, Long, Int)] = inputStream.map(data => {
      val array = data.split(",")
      (array(0), array(1).toLong, 1)
    })

    // Use the second field as the event time and allow 3 seconds of out-of-orderness
    val waterMarkData: DataStream[(String, Long, Int)] = dataStream.assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[(String, Long, Int)](Time.seconds(3)) {
        override def extractTimestamp(element: (String, Long, Int)): Long = {
          element._2
        }
      })

    // Key by the first tuple field (the id)
    val keyedStreamData = waterMarkData.keyBy(0)
    keyedStreamData.print("keyByData:")

    // 5-second tumbling windows on event time
    val windowStreamData: WindowedStream[(String, Long, Int), Tuple, TimeWindow] =
      keyedStreamData.window(TumblingEventTimeWindows.of(Time.seconds(5)))

    // Collect the timestamps that fall into each window
    val resultData: DataStream[mutable.HashSet[Long]] =
      windowStreamData.fold(new mutable.HashSet[Long]()) {
        case (set, (key, ts, count)) => {
          set += ts
        }
      }
    resultData.print("window:")

    env.execute("TumblingEventTimeDemo")
  }
}
```

The results are computed per Event Time window and do not depend on system time, including how quickly or slowly the input arrives.

### 2. Sliding Windows (SlidingEventTimeWindows)

```scala
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

import scala.collection.mutable

/**
 * @author lilinchao
 * @date 2022/1/25
 * @description Sliding window demo
 **/
object SlidingEventTimeDemo {
  def main(args: Array[String]): Unit = {
    // Set up the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    val inputStream: DataStream[String] = env.socketTextStream("192.168.159.139", 8888)

    // Parse each "id,timestamp" line into an (id, timestamp, count) tuple
    val dataStream: DataStream[(String, Long, Int)] = inputStream.map(data => {
      val array = data.split(",")
      (array(0), array(1).toLong, 1)
    })

    // Use the second field as the event time and allow 3 seconds of out-of-orderness
    val waterMarkData: DataStream[(String, Long, Int)] = dataStream.assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[(String, Long, Int)](Time.seconds(3)) {
        override def extractTimestamp(element: (String, Long, Int)): Long = {
          element._2
        }
      })

    // Key by the first tuple field (the id)
    val keyedStreamData: KeyedStream[(String, Long, Int), Tuple] = waterMarkData.keyBy(0)
    keyedStreamData.print("keyByData:")

    // Sliding windows on event time: 5-second size, 500-millisecond slide
    val windowStreamData: WindowedStream[(String, Long, Int), Tuple, TimeWindow] =
      keyedStreamData.window(SlidingEventTimeWindows.of(Time.seconds(5), Time.milliseconds(500)))

    // Collect the timestamps that fall into each window
    val resultData: DataStream[mutable.HashSet[Long]] =
      windowStreamData.fold(new mutable.HashSet[Long]()) {
        case (set, (key, ts, count)) => {
          set += ts
        }
      }
    resultData.print("window:")

    env.execute("SlidingEventTimeDemo")
  }
}
```

### 3. Session Windows

A session window closes when the EventTime difference between two adjacent records exceeds the specified gap. If a watermark is added, the window does not fire as soon as the gap condition is met. It fires only once the watermark, which lags the largest timestamp seen so far by the configured delay, reaches the end of the session.

```scala
package com.lilinchao.flink.watermark

import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

/**
 * @author lilinchao
 * @date 2022/1/25
 * @description Session window demo
 **/
object EventTimeSessionDemo {
  def main(args: Array[String]): Unit = {
    // Set up the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    val inputStream: DataStream[String] = env.socketTextStream("192.168.159.139", 8888)

    // Parse each "id,timestamp" line into an (id, timestamp, count) tuple
    val dataStream: DataStream[(String, Long, Int)] = inputStream.map(data => {
      val array = data.split(",")
      (array(0), array(1).toLong, 1)
    })

    // Use the second field as the event time and allow 3 seconds of out-of-orderness
    val waterMarkData: DataStream[(String, Long, Int)] = dataStream.assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[(String, Long, Int)](Time.seconds(3)) {
        override def extractTimestamp(element: (String, Long, Int)): Long = {
          element._2
        }
      })

    // Key by the first tuple field (the id)
    val keyedStreamData = waterMarkData.keyBy(0)
    keyedStreamData.print("keyByData:")

    // Session windows on event time with a 500-millisecond gap
    val windowStreamData: WindowedStream[(String, Long, Int), Tuple, TimeWindow] =
      keyedStreamData.window(EventTimeSessionWindows.withGap(Time.milliseconds(500)))

    // Sum the per-record count of 1 to get the number of records in each session
    val resultData: DataStream[Int] = windowStreamData.reduce((t1, t2) => {
      (t1._1, 0L, t1._3 + t2._3)
    }).map(_._3)
    resultData.print("window:")

    env.execute("EventTimeSessionDemo")
  }
}
```

*Reference source:* *《尚硅谷大数据之Flink》*
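To make the Event Time behaviour of the tumbling-window demo in Section 1 more concrete, here is a minimal plain-Scala sketch with no Flink dependency. It assigns the sample timestamps to 5-second windows and works out which record first pushes the watermark far enough to fire each window. It rests on two assumptions about Flink 1.10 as I understand it: `BoundedOutOfOrdernessTimestampExtractor` emits a watermark equal to the largest timestamp seen minus the configured delay, and an event-time window `[start, end)` fires once the watermark reaches `end - 1`. The names `WindowFiringSketch`, `windowStart`, and `firstTrigger` are hypothetical helpers for illustration, not Flink APIs, and the sketch ignores the fact that Flink emits watermarks periodically rather than per record.

```scala
object WindowFiringSketch {
  // Settings taken from the demo: 5-second tumbling windows, 3-second out-of-orderness
  val windowSizeMs = 5000L
  val maxOutOfOrdernessMs = 3000L

  // Timestamps from the sample data (all records share the key 0001)
  val timestamps: Seq[Long] = Seq(
    1550116440000L, 1550116441000L, 1550116442000L, 1550116443000L,
    1550116444000L, 1550116445000L, 1550116446000L, 1550116450000L,
    1550116451000L, 1550116452000L, 1550116453000L, 1550116456000L,
    1550116460000L, 1550116461000L, 1550116462000L, 1550116464000L
  )

  // Start of the tumbling window containing ts (zero offset, non-negative ts)
  def windowStart(ts: Long, size: Long): Long = ts - (ts % size)

  // First record whose arrival makes (max timestamp so far - delay) reach windowEnd - 1.
  // Returns None if no record in the sample advances the watermark that far.
  def firstTrigger(windowEnd: Long): Option[Long] = {
    // Largest timestamp seen up to and including each record
    val runningMax = timestamps.scanLeft(Long.MinValue)((m, ts) => math.max(m, ts)).tail
    timestamps.zip(runningMax).collectFirst {
      case (ts, maxSoFar) if maxSoFar - maxOutOfOrdernessMs >= windowEnd - 1 => ts
    }
  }

  def main(args: Array[String]): Unit = {
    // Group the sample timestamps by the window they belong to, in window order
    val byWindow = timestamps.groupBy(windowStart(_, windowSizeMs)).toSeq.sortBy(_._1)
    for ((start, elems) <- byWindow) {
      val end = start + windowSizeMs
      val trigger = firstTrigger(end).map(_.toString).getOrElse("none in the sample")
      println(s"window [$start, $end) holds ${elems.size} records, fired by record $trigger")
    }
  }
}
```

Under these assumptions the sample data falls into five windows holding 5, 2, 4, 1, and 4 timestamps. The first four are fired by the records stamped 1550116450000, 1550116453000, 1550116460000, and 1550116464000 respectively, while the last window, `[1550116460000, 1550116465000)`, is not fired by these records alone. Because firing depends only on the timestamps carried by the records, typing the lines into the socket quickly or slowly gives the same grouping.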
Tags:
Flink
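Unless otherwise stated, all articles on this blog are the blogger's original work.
If you repost this article, please credit the source: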
https://lilinchao.com/archives/1873.html