李林超博客
24. Using EventTime in Windows
Leefs
2022-01-25 PM
### Preface

**Version**:

+ Flink 1.10.1

**Sample data**

Each line is `id,eventTimestamp`, with the timestamp in milliseconds.

```text
0001,1550116440000
0001,1550116441000
0001,1550116442000
0001,1550116443000
0001,1550116444000
0001,1550116445000
0001,1550116446000
0001,1550116450000
0001,1550116451000
0001,1550116452000
0001,1550116453000
0001,1550116456000
0001,1550116460000
0001,1550116461000
0001,1550116462000
0001,1550116464000
```

### 1. Tumbling Windows (TumblingEventTimeWindows)

```scala
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

import scala.collection.mutable

/**
 * @author lilinchao
 * @date 2022/1/25
 * @description Tumbling window demo
 **/
object TumblingEventTimeDemo {
  def main(args: Array[String]): Unit = {
    // Build the execution environment and switch to event time
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    val inputStream: DataStream[String] = env.socketTextStream("192.168.159.139", 8888)

    // Parse each "id,timestamp" line into an (id, timestamp, 1) tuple
    val dataStream: DataStream[(String, Long, Int)] = inputStream.map(data => {
      val array = data.split(",")
      (array(0), array(1).toLong, 1)
    })

    // Use the second field as the event timestamp and tolerate 3 seconds of out-of-orderness
    val waterMarkData: DataStream[(String, Long, Int)] = dataStream.assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[(String, Long, Int)](Time.seconds(3)) {
        override def extractTimestamp(element: (String, Long, Int)): Long = {
          element._2
        }
      })

    // Key by the first tuple field (the id)
    val keyedStreamData = waterMarkData.keyBy(0)
    keyedStreamData.print("keyByData:")

    // 5-second tumbling event-time windows
    val windowStreamData: WindowedStream[(String, Long, Int), Tuple, TimeWindow] =
      keyedStreamData.window(TumblingEventTimeWindows.of(Time.seconds(5)))

    // Collect the timestamps that fall into each window.
    // Note: fold is deprecated in Flink 1.10; aggregate is the recommended replacement.
    val resultData: DataStream[mutable.HashSet[Long]] =
      windowStreamData.fold(new mutable.HashSet[Long]()) {
        case (set, (_, ts, _)) => set += ts
      }

    resultData.print("window:")

    env.execute("TumblingEventTimeDemo")
  }
}
```

The results are computed from event-time windows. They do not depend on system (processing) time, including how quickly or slowly the input arrives.

### 2. Sliding Windows (SlidingEventTimeWindows)

```scala
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

import scala.collection.mutable

/**
 * @author lilinchao
 * @date 2022/1/25
 * @description Sliding window demo
 **/
object SlidingEventTimeDemo {
  def main(args: Array[String]): Unit = {
    // Build the execution environment and switch to event time
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    val inputStream: DataStream[String] = env.socketTextStream("192.168.159.139", 8888)

    // Parse each "id,timestamp" line into an (id, timestamp, 1) tuple
    val dataStream: DataStream[(String, Long, Int)] = inputStream.map(data => {
      val array = data.split(",")
      (array(0), array(1).toLong, 1)
    })

    // Use the second field as the event timestamp and tolerate 3 seconds of out-of-orderness
    val waterMarkData: DataStream[(String, Long, Int)] = dataStream.assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[(String, Long, Int)](Time.seconds(3)) {
        override def extractTimestamp(element: (String, Long, Int)): Long = {
          element._2
        }
      })

    // Key by the first tuple field (the id)
    val keyedStreamData: KeyedStream[(String, Long, Int), Tuple] = waterMarkData.keyBy(0)
    keyedStreamData.print("keyByData:")

    // Sliding event-time windows: 5-second size, 500-millisecond slide
    val windowStreamData: WindowedStream[(String, Long, Int), Tuple, TimeWindow] =
      keyedStreamData.window(SlidingEventTimeWindows.of(Time.seconds(5), Time.milliseconds(500)))

    // Collect the timestamps that fall into each window.
    // Note: fold is deprecated in Flink 1.10; aggregate is the recommended replacement.
    val resultData: DataStream[mutable.HashSet[Long]] =
      windowStreamData.fold(new mutable.HashSet[Long]()) {
        case (set, (_, ts, _)) => set += ts
      }

    resultData.print("window:")

    env.execute("SlidingEventTimeDemo")
  }
}
```

### 3. Session Windows (EventTimeSessionWindows)

A session window closes when the EventTime difference between two adjacent records exceeds the specified gap. When a watermark is used, the window does not fire as soon as that condition is met: firing is delayed until the watermark (which lags the largest timestamp seen by the configured delay) reaches the end of the session.

```scala
package com.lilinchao.flink.watermark

import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

/**
 * @author lilinchao
 * @date 2022/1/25
 * @description Session window demo
 **/
object EventTimeSessionDemo {
  def main(args: Array[String]): Unit = {
    // Build the execution environment and switch to event time
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    val inputStream: DataStream[String] = env.socketTextStream("192.168.159.139", 8888)

    // Parse each "id,timestamp" line into an (id, timestamp, 1) tuple
    val dataStream: DataStream[(String, Long, Int)] = inputStream.map(data => {
      val array = data.split(",")
      (array(0), array(1).toLong, 1)
    })

    // Use the second field as the event timestamp and tolerate 3 seconds of out-of-orderness
    val waterMarkData: DataStream[(String, Long, Int)] = dataStream.assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[(String, Long, Int)](Time.seconds(3)) {
        override def extractTimestamp(element: (String, Long, Int)): Long = {
          element._2
        }
      })

    // Key by the first tuple field (the id)
    val keyedStreamData = waterMarkData.keyBy(0)
    keyedStreamData.print("keyByData:")

    // Session windows that close after a 500-millisecond gap in event time
    val windowStreamData: WindowedStream[(String, Long, Int), Tuple, TimeWindow] =
      keyedStreamData.window(EventTimeSessionWindows.withGap(Time.milliseconds(500)))

    // Sum the third field to count the records in each session
    val resultData: DataStream[Int] = windowStreamData.reduce((t1, t2) => {
      (t1._1, 0L, t1._3 + t2._3)
    }).map(_._3)

    resultData.print("window:")

    env.execute("EventTimeSessionDemo")
  }
}
```

*Reference source:* *《尚硅谷大数据之Flink》*
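To make the firing behavior of the tumbling-window demo easier to follow, here is a minimal plain-Scala sketch (no Flink dependency) that replays the sample timestamps through 5-second windows with a 3-second watermark delay. The object `TumblingWatermarkSketch` and its methods are hypothetical names introduced only for illustration. The sketch assumes the watermark advances immediately after every record, whereas Flink's `BoundedOutOfOrdernessTimestampExtractor` emits watermarks periodically, so real firing moments can lag slightly. It also assumes a window offset of 0, non-negative timestamps, and no allowed lateness.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of event-time tumbling windows; not Flink code.
object TumblingWatermarkSketch {
  val WindowSizeMs = 5000L        // mirrors TumblingEventTimeWindows.of(Time.seconds(5))
  val MaxOutOfOrdernessMs = 3000L // mirrors the Time.seconds(3) watermark delay

  // Start of the tumbling window that contains ts (offset 0, ts >= 0)
  def windowStart(ts: Long): Long = ts - (ts % WindowSizeMs)

  // Replays the timestamps in arrival order and returns the fired windows,
  // in firing order, as (windowStart, timestamps collected in that window).
  def simulate(timestamps: Seq[Long]): Seq[(Long, Set[Long])] = {
    val open = mutable.Map.empty[Long, Set[Long]]
    val fired = mutable.ArrayBuffer.empty[(Long, Set[Long])]
    var watermark = Long.MinValue

    for (ts <- timestamps) {
      val start = windowStart(ts)
      // A record is late (and dropped here) if its window already lies behind the watermark
      if (start + WindowSizeMs - 1 > watermark) {
        open(start) = open.getOrElse(start, Set.empty[Long]) + ts
      }
      // Assumption: the watermark is updated right after each record
      watermark = math.max(watermark, ts - MaxOutOfOrdernessMs)
      // A window [start, start + size) fires once the watermark reaches start + size - 1
      val ready = open.keys.toList.sorted.filter(s => s + WindowSizeMs - 1 <= watermark)
      for (s <- ready) {
        fired += ((s, open(s)))
        open.remove(s)
      }
    }
    fired.toList
  }

  def main(args: Array[String]): Unit = {
    val base = 1550116440000L
    // Second offsets of the sample data relative to the first record
    val offsets = Seq(0, 1, 2, 3, 4, 5, 6, 10, 11, 12, 13, 16, 20, 21, 22, 24)
    val sample = offsets.map(s => base + s * 1000L)
    simulate(sample).foreach { case (start, elems) =>
      println(s"window [$start, ${start + WindowSizeMs}) -> ${elems.toList.sorted.mkString(", ")}")
    }
  }
}
```

Under these assumptions, the first window `[1550116440000, 1550116445000)` only fires when the record `1550116450000` arrives, because that is the first record that pushes the watermark (largest timestamp minus 3 seconds) past the window's last millisecond. The last window, starting at `1550116460000`, stays open because no later record advances the watermark far enough.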
Tags: Flink
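Unless otherwise noted, all articles on this blog are the author's original work.
If you repost this article, please credit the source:
https://lilinchao.com/archives/1873.html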