李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
大数据
正文
26.Flink状态编程操作示例
Leefs
2022-01-27 PM
1302℃
0条
[TOC]

### 一、键控状态的使用

```scala
import java.util

import com.lilinchao.flink.window.SensorReading
import org.apache.flink.api.common.functions.{ReduceFunction, RichReduceFunction}
import org.apache.flink.api.common.state._
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._

/**
 * Keyed state demo: reads sensor records from a text file, keys them by sensor id
 * and runs a reduce function that exercises the keyed state APIs.
 *
 * @author lilinchao
 * @date 2022/1/27
 */
object KeyedStateDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val inputStream: DataStream[String] = env.readTextFile("datas/sensor.txt")

    // Each line is expected to look like "id,timestamp,temperature"
    val dataStream: DataStream[SensorReading] = inputStream.map(data => {
      val array: Array[String] = data.split(",")
      SensorReading(array(0), array(1).toLong, array(2).toDouble)
    })

    val resultData = dataStream
      .keyBy(_.id)
      .reduce(new MyKeyedState)

    resultData.print()

    env.execute("KeyedStateDemo")
  }
}

/**
 * Rich reduce function that shows how each kind of keyed state is declared,
 * read and written. The state operations are illustrative only; the function
 * always returns the newer record unchanged.
 */
class MyKeyedState extends RichReduceFunction[SensorReading] {

  // Style 1: declare the state handle as a member and assign it in open()
  var valueState: ValueState[Double] = _

  // Style 2: declare the state handle as a lazy val. The initializer is not run
  // when the object is constructed; it runs on first access. A lazy member must be a val.

  // Value state
  lazy val myValueState: ValueState[Double] =
    getRuntimeContext.getState(new ValueStateDescriptor[Double]("myValue", classOf[Double]))

  // List state
  lazy val myListState: ListState[String] =
    getRuntimeContext.getListState(new ListStateDescriptor[String]("myList", classOf[String]))

  // Map state
  lazy val myMapState: MapState[String, Double] =
    getRuntimeContext.getMapState(new MapStateDescriptor[String, Double]("myMap", classOf[String], classOf[Double]))

  // Reducing state
  lazy val myReducingState: ReducingState[SensorReading] =
    getRuntimeContext.getReducingState(new ReducingStateDescriptor[SensorReading]("myReduce", new MyReduceFunction, classOf[SensorReading]))

  override def open(parameters: Configuration): Unit = {
    // Obtain the value state inside the open() lifecycle method
    valueState = getRuntimeContext.getState(new ValueStateDescriptor[Double]("valuestate", classOf[Double]))
  }

  override def reduce(t: SensorReading, t1: SensorReading): SensorReading = {
    // ---- value state ----
    // Read the current value
    val myValue: Double = myValueState.value()
    // Overwrite the current value
    valueState.update(t.temperature)

    // ---- list state ----
    myListState.add("aaa")
    val list = new util.ArrayList[String]()
    list.add("bbb")
    list.add("ccc")
    myListState.addAll(list)
    // Replace the whole list content
    myListState.update(list)
    // Fetch the whole list as an iterable
    myListState.get()

    // ---- map state ----
    myMapState.contains("sensor_1")  // is the key present?
    myMapState.get("sensor_1")       // value stored for the key
    myMapState.put("sensor_1", 1.3)  // insert or update
    myMapState.remove("sensor_1")    // remove the entry

    // ---- reducing state ----
    myReducingState.get()
    myReducingState.add(t)           // aggregate with the previous state

    t1
  }
}

/** Keeps the first id, the second timestamp and the smaller of the two temperatures. */
class MyReduceFunction extends ReduceFunction[SensorReading] {
  override def reduce(value1: SensorReading, value2: SensorReading): SensorReading =
    SensorReading(value1.id, value2.timestamp, value1.temperature.min(value2.temperature))
}
```

**说明**

**声明一个键控状态的方式:**

+ **定义成员变量,在 `open` 方法中初始化**
  + 运行时上下文在 `open` 生命周期方法被调用时才可用,因此状态句柄需要在 `open` 中获取
+ **通过 lazy 惰性变量定义**
  + lazy 定义的惰性变量会延迟加载,即在对象创建时并不会执行初始化;
  + 惰性变量只能是不可变变量(`val`),且只有在第一次访问惰性变量时,才会实例化这个变量。

### 二、案例实操

#### 2.1 需求

> 对于温度传感器温度值跳变,超过10度,报警

#### 2.2 数据准备

```text
sensor_1,1547718199,35.8
sensor_1,1547718206,32.3
sensor_1,1547718208,36.2
sensor_1,1547718210,20.3
sensor_1,1547718213,22.5
sensor_1,1547718215,34.6
sensor_1,1547718218,30.9
```

#### 2.3 示例代码

+ **方式一:通过自定义RichFlatmapFunction实现**

```scala
class TempChangeAlert(threshold: Double) extends RichFlatMapFunction[SensorReading, (String, Double, Double)] {

  // Last temperature seen for the current key
  lazy val lastTempSate: ValueState[Double] =
    getRuntimeContext.getState(new ValueStateDescriptor[Double]("last-temp", classOf[Double]))

  // Whether a previous temperature has already been stored for the current key
  lazy val flagState: ValueState[Boolean] =
    getRuntimeContext.getState(new ValueStateDescriptor[Boolean]("flag", classOf[Boolean]))

  override def flatMap(value: SensorReading, out: Collector[(String, Double, Double)]): Unit = {
    // Unset state unboxes to 0.0 / false for the first record of a key
    val lastTemp = lastTempSate.value()
    val hasPrevious = flagState.value()

    // Compare only when a previous temperature exists; otherwise the first record
    // would be compared against 0.0 and could raise a false alert
    if (hasPrevious && (value.temperature - lastTemp).abs > threshold) {
      out.collect((value.id, lastTemp, value.temperature))
    }

    // Update state
    lastTempSate.update(value.temperature)
    flagState.update(true)
  }
}
```

某个 key 的第一条数据到来时,状态中还没有上一次的温度值,`value()` 取到的是 0;如果直接与 0 比较,初始温度不在`-10~10`度之间就会出现误判。因此这里通过 `flagState` 标记是否已经保存过温度值,第一条数据只更新状态、不参与比较。

+ **方式二:使用`FlatMap with keyed ValueState` 的快捷方式 `flatMapWithState`实现**

```scala
.flatMapWithState[(String, Double, Double), Double] {
  // First record of a key: nothing to compare with, just store the temperature
  case (data: SensorReading, None) =>
    (List.empty, Some(data.temperature))
  case (data: SensorReading, Some(lastTemp)) =>
    // Compare the new temperature with the previous one
    val diff = (data.temperature - lastTemp).abs
    if (diff > 10.0) (List((data.id, lastTemp, data.temperature)), Some(data.temperature))
    else (List.empty, Some(data.temperature))
}
```

#### 2.4 完整代码

```scala
import com.lilinchao.flink.window.SensorReading
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

/**
 * Requirement: raise an alert when a sensor's temperature jumps by more than 10 degrees.
 *
 * @author lilinchao
 * @date 2022/1/27
 */
object TempChangeAlertDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val inputStream: DataStream[String] = env.socketTextStream("192.168.159.139",8888)

    // Convert each "id,timestamp,temperature" line into the case class
    val dataStream: DataStream[SensorReading] = inputStream.map(data => {
      val array = data.split(",")
      SensorReading(array(0), array(1).toLong, array(2).toDouble)
    })

    val resultData: DataStream[(String, Double, Double)] = dataStream
      .keyBy(_.id)
      // .flatMap(new TempChangeAlert(10.0))
      .flatMapWithState[(String, Double, Double), Double] {
        // First record of a key: nothing to compare with, just store the temperature
        case (data: SensorReading, None) =>
          (List.empty, Some(data.temperature))
        case (data: SensorReading, Some(lastTemp)) =>
          // Compare the new temperature with the previous one
          val diff = (data.temperature - lastTemp).abs
          if (diff > 10.0) (List((data.id, lastTemp, data.temperature)), Some(data.temperature))
          else (List.empty, Some(data.temperature))
      }

    resultData.print()

    env.execute("TempChangeAlertDemo")
  }
}

/**
 * Custom RichFlatMapFunction that emits (id, previous temperature, current temperature)
 * when the temperature of a key changes by more than the threshold between two
 * consecutive records.
 *
 * @param threshold temperature jump that triggers an alert
 */
class TempChangeAlert(threshold: Double) extends RichFlatMapFunction[SensorReading, (String, Double, Double)] {

  // Last temperature seen for the current key
  lazy val lastTempSate: ValueState[Double] =
    getRuntimeContext.getState(new ValueStateDescriptor[Double]("last-temp", classOf[Double]))

  // Whether a previous temperature has already been stored for the current key
  lazy val flagState: ValueState[Boolean] =
    getRuntimeContext.getState(new ValueStateDescriptor[Boolean]("flag", classOf[Boolean]))

  override def flatMap(value: SensorReading, out: Collector[(String, Double, Double)]): Unit = {
    // Unset state unboxes to 0.0 / false for the first record of a key
    val lastTemp = lastTempSate.value()
    val hasPrevious = flagState.value()

    // Compare only when a previous temperature exists; otherwise the first record
    // would be compared against 0.0 and could raise a false alert
    if (hasPrevious && (value.temperature - lastTemp).abs > threshold) {
      out.collect((value.id, lastTemp, value.temperature))
    }

    // Update state
    lastTempSate.update(value.temperature)
    flagState.update(true)
  }
}
```
标签:
Flink
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://lilinchao.com/archives/1879.html
上一篇
25.Flink状态管理介绍
下一篇
27.【转载】Flink ProcessFunction API全解析及实战
评论已关闭
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
NLP
4
标签云
DataWarehouse
DataX
二叉树
设计模式
MyBatisX
Git
Shiro
排序
pytorch
Jquery
数学
ClickHouse
人工智能
Java编程思想
MyBatis-Plus
稀疏数组
Hive
HDFS
Azkaban
Kibana
Sentinel
工具
JavaWEB项目搭建
Python
微服务
GET和POST
查找
Spark SQL
Stream流
Flume
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞
评论已关闭