Li Linchao's Blog (李林超博客)
28. Flink ProcessFunction Examples
Leefs
2022-01-30 AM
### Preface

**Versions:**

+ Flink 1.10.1
+ Scala 2.12

**Sample data** (sensor id, timestamp in seconds, temperature):

```text
sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1
sensor_1,1547718206,32
sensor_1,1547718208,36.2
sensor_1,1547718210,29.7
sensor_1,1547718213,30.9
sensor_1,1547718215,32.9
sensor_1,1547718218,33.6
sensor_1,1547718225,35.8
```

### 1. Timer Example

#### 1.1 Requirement

> Raise an alert if a sensor's temperature keeps rising for 15 consecutive seconds.

#### 1.2 Example Code

```scala
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

/**
 * @author lilinchao
 * @date 2022/1/30
 * @description Raise an alert if the temperature keeps rising for 15 consecutive seconds
 **/
object ProcessFunctionTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Read raw lines from a socket
    val inputStream = env.socketTextStream("192.168.159.139", 8888)

    // Parse each line into a SensorReading
    val dataStream: DataStream[SensorReading] = inputStream.map(data => {
      val array = data.split(",")
      SensorReading(array(0), array(1).toLong, array(2).toDouble)
    })

    val warningStream = dataStream
      .keyBy(_.id)
      .process(new TempIncreWarning(15000L))

    warningStream.print()
    env.execute("ProcessFunctionTest")
  }
}

// Case class for a sensor reading: sensor id, timestamp, temperature
case class SensorReading(id: String, timestamp: Long, temperature: Double)

// Custom KeyedProcessFunction[key type, input type, output type]
class TempIncreWarning(interval: Long) extends KeyedProcessFunction[String, SensorReading, String] {

  // State: the previous temperature, used for comparison
  lazy val lastTempState: ValueState[Double] =
    getRuntimeContext.getState(new ValueStateDescriptor[Double]("last-temp", classOf[Double]))

  // State: timestamp of the registered timer, kept so the timer can be deleted
  lazy val timerTsState: ValueState[Long] =
    getRuntimeContext.getState(new ValueStateDescriptor[Long]("timer-ts", classOf[Long]))

  // Called once for every element
  override def processElement(value: SensorReading,
                              ctx: KeyedProcessFunction[String, SensorReading, String]#Context,
                              out: Collector[String]): Unit = {
    // Read the previous temperature and the pending timer's timestamp
    // (state that was never set reads as 0.0 / 0L)
    val lastTemp = lastTempState.value()
    val timerTs = timerTsState.value()

    // Remember the current temperature
    lastTempState.update(value.temperature)

    // timerTs == 0 means no timer is currently registered for this key
    if (value.temperature > lastTemp && timerTs == 0) {
      // Temperature rose and there is no timer: register one `interval` ms from now
      val ts = ctx.timerService().currentProcessingTime() + interval
      ctx.timerService().registerProcessingTimeTimer(ts)
      timerTsState.update(ts)
    } else if (value.temperature < lastTemp) {
      // Temperature dropped: delete the timer and clear its state
      ctx.timerService().deleteProcessingTimeTimer(timerTs)
      timerTsState.clear()
    }
  }

  // Fires when a timer goes off, i.e. no drop was seen during the whole interval
  override def onTimer(timestamp: Long,
                       ctx: KeyedProcessFunction[String, SensorReading, String]#OnTimerContext,
                       out: Collector[String]): Unit = {
    out.collect("Sensor " + ctx.getCurrentKey + ": temperature rose continuously for " + interval / 1000 + " seconds")
    timerTsState.clear()
  }
}
```

### 2. Side Output Example

#### 2.1 Requirement

> Readings above 30 degrees go to the main stream; all other readings go to a side output stream, splitting the stream in two.

#### 2.2 Example Code

```scala
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

/**
 * @author lilinchao
 * @date 2022/1/30
 * @description Readings above 30 go to the main stream, the rest to a side output stream
 **/
object SideOutputTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Read raw lines from a socket
    val inputStream = env.socketTextStream("192.168.159.139", 8888)

    // Parse each line into a SensorReading
    val dataStream: DataStream[SensorReading] = inputStream.map(data => {
      val array = data.split(",")
      SensorReading(array(0), array(1).toLong, array(2).toDouble)
    })

    val highTempStream: DataStream[SensorReading] = dataStream.process(new SplitTempProcessor(30.0))

    highTempStream.print("high")
    // Fetch the side output by the same tag id and type used inside the ProcessFunction
    highTempStream.getSideOutput(new OutputTag[(String, Long, Double)]("low")).print("low")

    env.execute("SideOutputTest")
  }
}

// Case class for a sensor reading: sensor id, timestamp, temperature
// (define it only once if both examples live in the same package)
case class SensorReading(id: String, timestamp: Long, temperature: Double)

/**
 * Custom ProcessFunction that splits the stream using a side output.
 *
 * @param threshold temperature threshold; readings above it go to the main stream
 *
 * ProcessFunction takes two type parameters, the input type and the output type;
 * both can be user-defined.
 */
class SplitTempProcessor(threshold: Double) extends ProcessFunction[SensorReading, SensorReading] {
  override def processElement(value: SensorReading,
                              ctx: ProcessFunction[SensorReading, SensorReading]#Context,
                              out: Collector[SensorReading]): Unit = {
    if (value.temperature > threshold) {
      // Above the threshold (30 here): emit to the main stream
      out.collect(value)
    } else {
      // Not above the threshold: emit a tuple to the "low" side output
      ctx.output(new OutputTag[(String, Long, Double)]("low"), (value.id, value.timestamp, value.temperature))
    }
  }
}
```
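To see which alerts the timer logic in section 1 actually produces, its state transitions can be replayed outside Flink. The following is a minimal plain-Scala sketch, not part of the original job: `TempIncreWarningSim`, `Reading` and `KeyState` are hypothetical names, a mutable map stands in for Flink's keyed state, and each reading's own timestamp (converted to milliseconds) stands in for the processing-time clock. As an assumption, timers that are due are fired before the element that reaches their time.

```scala
import scala.collection.mutable

// Hypothetical plain-Scala replay of TempIncreWarning's state machine (no Flink involved)
object TempIncreWarningSim {

  // Mirrors SensorReading: sensor id, timestamp (seconds), temperature
  final case class Reading(id: String, timestamp: Long, temperature: Double)

  // Mirrors the "last-temp" and "timer-ts" ValueStates; 0.0 / 0L mean "unset"
  final case class KeyState(lastTemp: Double = 0.0, timerTs: Long = 0L)

  /** Returns (timerTimestampMs, message) for every timer that fires. */
  def simulate(readings: Seq[Reading], intervalMs: Long): List[(Long, String)] = {
    val states = mutable.Map.empty[String, KeyState]
    val alerts = mutable.ListBuffer.empty[(Long, String)]

    for (r <- readings) {
      // Assumption: the reading's timestamp stands in for processing time
      val now = r.timestamp * 1000L

      // Like onTimer: fire every timer that is due, earliest first, then clear it
      val due = states.toList
        .filter { case (_, s) => s.timerTs != 0L && s.timerTs <= now }
        .sortBy { case (_, s) => s.timerTs }
      for ((id, pending) <- due) {
        alerts += ((pending.timerTs,
          s"Sensor $id: temperature rose continuously for ${intervalMs / 1000} seconds"))
        states(id) = pending.copy(timerTs = 0L)
      }

      // Like processElement: compare with this key's previous temperature
      val cur = states.getOrElse(r.id, KeyState())
      val next =
        if (r.temperature > cur.lastTemp && cur.timerTs == 0L)
          KeyState(r.temperature, now + intervalMs) // rising, no timer: register one
        else if (r.temperature < cur.lastTemp)
          KeyState(r.temperature, 0L)               // falling: delete the timer
        else
          cur.copy(lastTemp = r.temperature)        // otherwise keep any pending timer
      states(r.id) = next
    }
    alerts.toList
  }

  def main(args: Array[String]): Unit = {
    val sample = Seq(
      Reading("sensor_1", 1547718199L, 35.8),
      Reading("sensor_6", 1547718201L, 15.4),
      Reading("sensor_7", 1547718202L, 6.7),
      Reading("sensor_10", 1547718205L, 38.1),
      Reading("sensor_1", 1547718206L, 32.0),
      Reading("sensor_1", 1547718208L, 36.2),
      Reading("sensor_1", 1547718210L, 29.7),
      Reading("sensor_1", 1547718213L, 30.9),
      Reading("sensor_1", 1547718215L, 32.9),
      Reading("sensor_1", 1547718218L, 33.6),
      Reading("sensor_1", 1547718225L, 35.8)
    )
    simulate(sample, 15000L).foreach { case (ts, msg) => println(s"$ts $msg") }
  }
}
```

Replaying the sample data with a 15-second interval yields three alerts, for sensor_6 (1547718216000), sensor_7 (1547718217000) and sensor_10 (1547718220000). Each of these sensors sent only one reading: because `last-temp` reads as 0.0 before its first update, the first reading of every key counts as a rise and registers a timer. The last timer for sensor_1 (due at 1547718228000) is still pending when the input ends. In the real job the timers follow wall-clock processing time, so actual timing depends on when lines arrive on the socket; the sketch only reproduces the state transitions.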
Tags: Flink
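Unless otherwise stated, all articles on this blog are original works by the author.
If you repost this article, please credit the source:
https://lilinchao.com/archives/1885.html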