李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
大数据
正文
14.Flink流处理API之Transform转换算子
Leefs
2022-01-13 PM
1307℃
0条
[TOC] #### 1、Map + **作用** 将数据流中的数据进行转换, 形成新的数据流,消费一个元素并产出一个元素。 ![14.Flink流处理API之Transform转换算子01.png](https://lilinchao.com/usr/uploads/2022/01/1201784901.png) + **示例** > 需求:使用Map将数据转换成样例类 **代码** ```java import org.apache.flink.streaming.api.scala._ /** * Created by lilinchao * Date 2022/1/13 * Description 使用Map将数据转换成样例类 */ object StreamMapTest { def main(args: Array[String]): Unit = { //1.构建运行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment //2.使用fromElements val dataStream: DataStream[String] = env.fromElements("张三,10","李四,12","王五,14") //3.使用Map将数据封装成样例类 val userData: DataStream[User] = dataStream.map(m => { User(m.split(",")(0), m.split(",")(1)) }) //4.打印输出 userData.print() env.execute() } case class User(name:String,age:String) } ``` #### 2、flatMap + **作用** 消费一个元素并产生零个或多个元素 + **示例** > 需求:分别将以下数据,转换成 国家 、省份 、城市三个维度的数据。 ```scala 将"张三,中国,江西省,南昌市","李四,中国,河北省,石家庄市","Tom,America,NewYork,Manhattan" 转换为: 张三,中国 张三,中国江西省 张三,中国江西省南昌市 ``` **代码** ```java import org.apache.flink.streaming.api.scala._ /** * Created by lilinchao * Date 2022/1/13 * Description 分别将以下数据,转换成 国家 、省份 、城市 三个维度的数据。 */ object StreamFlatMapTest { /** * 1.构建批处理运行环境 * 2.构建本地集合数据源 * 3.使用 flatMap 将一条数据转换为三条数据 * a. 使用逗号分隔字段 * b. 分别构建国家、国家省份、国家省份城市三个元组 * 4.打印输出 */ def main(args: Array[String]): Unit = { //1.构建运行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment //2.使用fromCollection构建数据集 val dataStream = env.fromCollection(List("张三,中国,江西省,南昌市", "李四,中国,河北省,石家庄市", "Tom,America,NewYork,Manhattan")) val flatMapData: DataStream[(String, String)] = dataStream.flatMap(line => { val arr = line.split(",") List( (arr(0), arr(1)), (arr(0), arr(1) + arr(2)), (arr(0), arr(1) + arr(2) + arr(3)) ) }) //3.输出结果 flatMapData.print() env.execute("stream flatmap test") } } ``` #### 3、Filter + **作用** 根据指定的规则将满足条件(true)的数据保留,不满足条件(false)的数据丢弃 ![14.Flink流处理API之Transform转换算子02.png](https://lilinchao.com/usr/uploads/2022/01/501101129.png) + **示例** > 需求:使用filter过滤掉大于10的数字 **代码** ```java import org.apache.flink.streaming.api.scala._ /** * Created by lilinchao * Date 2022/1/13 * Description 需求:使用filter过滤掉大于10的数字 */ object StreamFilterTest { def main(args: Array[String]): Unit = { //1.构建运行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment //2.构建数据集 val dataStream = env.fromElements(1,2,3,4,10,12,11,14,5,6) //3.使用 filter 操作执行过滤 val filter = dataStream.filter(_ < 10) //4.结果输出 filter.print() env.execute("stream filter test") } } ``` #### 4、KeyBy + **作用** 把流中的数据分到不同的分区(并行度)中.具有`相同key的元素`会分到同一个分区中.一个分区中可以有多重不同的key。 在内部是使用的是key的`hash分区`来实现的。 ![14.Flink流处理API之Transform转换算子03.png](https://lilinchao.com/usr/uploads/2022/01/3032307013.png) + **说明** + 对数据分组主要是为了进行后续的聚合操作,即对同组数据进行聚合分析。 + keyBy会将一个DataStream转化为一个KeyedStream,聚合操作会将KeyedStream转化为DataStream。 + 如果聚合前每个元素数据类型是T,聚合后的数据类型仍为T。 ![14.Flink流处理API之Transform转换算子04.png](https://lilinchao.com/usr/uploads/2022/01/2673194641.png) + **使用** > dataStream.keyby(param) **param:**数据字段下标 默认从0开始,也可以输入id的字段,就会按照id分流 + **示例** > 需求:使用keyBy对集合数据进行转换,最后通过reduce进行聚合操作 **代码** ```java import org.apache.flink.api.java.tuple.Tuple import org.apache.flink.streaming.api.scala._ /** * Created by lilinchao * Date 2022/1/13 * Description 使用keyBy对集合数据进行转换,最后通过reduce进行聚合操作 */ object StreamKeyByTest { def main(args: Array[String]): Unit = { //1.构建运行环境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment //2.使用fromCollection val dataStream: DataStream[String] =env.fromCollection(Array("hello1 flink","hello1 world1","hello2 world1","hello3 world2")) //3.通过keyBy将DataStream转换成KeyedStream val keyByedData: KeyedStream[(String, String), Tuple] = dataStream.map(r => { val array: Array[String] = r.split(" ") (array(0), array(1)) }).keyBy(0) //4.通过reduce聚合操作,将KeyedStream转换成DataStream //一个分组数据流的聚合操作,合并当前的元素 //和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果 val reduceData: DataStream[(String, String)] = keyByedData.reduce((v1, v2) => (v1._1 + "---" + v2._1, v1._2 + "---" + v2._2)) reduceData.print("stream") env.execute("keyby test") } } ``` #### 5、滚动聚合算子(Rolling Aggregation) + **作用** + sum :求和 + max : 选择每条流的最大值 + min : 选择每条流的最小值 + minby : 针对 keyedStream中的某个字段数据进行选择最小值 + maxby : 针对 keyedStream中的某个字段数据进行选择最大值 + **示例** > 需求:对集合中的数据进行分区后,再进行滚动聚合操作 **代码** ```java import org.apache.flink.api.java.tuple.Tuple import org.apache.flink.streaming.api.scala._ /** * Created by lilinchao * Date 2022/1/13 * Description 1.0 */ object StreamRollingAggregation { def main(args: Array[String]): Unit = { //1.构建运行环境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment //2.使用fromCollection val arrayData: DataStream[String] = env.fromCollection(Array("spark,2","spark,1","scala,1","scala,2","flink,3")) //3.通过map对数据进行类型转换,再使用keyBy对数据重分区 val keyByedData: KeyedStream[(String, Int), Tuple] = arrayData.map(r => { val array = r.split(",") (array(0), array(1).toInt) }).keyBy(0) //4.对重分区好的数据进行滚动聚合操作 val sumedDS: DataStream[(String, Int)] = keyByedData.sum(1) val minedDS: DataStream[(String, Int)] = keyByedData.min(1) val maxedDS: DataStream[(String, Int)] = keyByedData.max(1) val minByedDS: DataStream[(String, Int)] = keyByedData.minBy(1) val maxByedDS: DataStream[(String, Int)] = keyByedData.maxBy(1) //5.输出结果 sumedDS.print("stream1") minedDS.print("stream2") maxedDS.print("stream3") minByedDS.print("stream4") maxByedDS.print("stream5") env.execute() } } ``` #### 6、Reduce + **作用** 一个分组数据流的聚合操作,合并当前的元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。 为什么还要把中间值也保存下来? 考虑流式数据的特点: 没有终点, 也就没有最终的概念了. 任何一个中间的聚合结果都是值! + **示例** > 需求:获得各学生成绩总和 **代码** ```java import org.apache.flink.streaming.api.scala._ /** * Created by lilinchao * Date 2022/1/13 * Description 获得各学生成绩总和 */ object StreamReduceTest { def main(args: Array[String]): Unit = { //1.构建运行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment //2.使用fromElements val dataStream: DataStream[String] = env.fromElements("张三,10","李四,12","王五,14","张三,12","李四,8") //3.根据name对成绩通过reduce进行累加操作 val studentData: DataStream[Student] = dataStream.map(line => { val data = line.split(",") Student(data(0), data(1).toInt) }).keyBy("name").reduce((x,y) => Student(x.name,x.score+y.score)) //4.输出结果 studentData.print() env.execute("stream reduce test") } case class Student(name:String,score:Int) } ``` #### 7、Split和Select(已废弃) + **作用** **split** DataStream → SplitStream:根据某些特征把一个DataStream拆分成两个或者多个DataStream。 ![14.Flink流处理API之Transform转换算子05.png](https://lilinchao.com/usr/uploads/2022/01/2558627232.png) **Select** SplitStream→DataStream:从一个SplitStream中获取一个或者多个DataStream。 ![14.Flink流处理API之Transform转换算子06.png](https://lilinchao.com/usr/uploads/2022/01/3033594272.png) + **示例** ```java import org.apache.flink.streaming.api.scala._ object StreamSplitAndSelect { def main(args: Array[String]): Unit = { val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment val arr: Array[(String, Int)] = Array(("scala", 1),("spark", 2), ("flink", 3), ("scala", 4)) val ds: DataStream[(String, Int)] = env.fromCollection(arr) val splitedSS: SplitStream[(String, Int)] = ds.split(r => { if (r._2 > 2) Seq("big") else Seq("small") }) val bigDS: DataStream[(String, Int)] = splitedSS.select("big") val smallDS: DataStream[(String, Int)] = splitedSS.select("small") val allDS: DataStream[(String, Int)] = splitedSS.select("big", "small") bigDS.print("bigDS") smallDS.print("smallDS") allDS.print("allDS") env.execute() } } ``` split在fink 1.13版本之后因为性能等原因,被SideOutput函数给替换,在后面会对SideOutput做详细介绍,本篇将不再过多叙述。 #### 8、Connect和CoMap + **作用** **connect** **DataStream,DataStream → ConnectedStreams:**连接两个保持他们类型的数据流,两个数据流被Connect之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。 ![14.Flink流处理API之Transform转换算子07.png](https://lilinchao.com/usr/uploads/2022/01/54086602.png) **CoMap,CoFlatMap** **ConnectedStreams → DataStream:**作用于ConnectedStreams上,功能与map和flatMap一样,对ConnectedStreams中的每一 个Stream分别进行map和flatMap处理。 ![14.Flink流处理API之Transform转换算子08.png](https://lilinchao.com/usr/uploads/2022/01/2200907536.png) + 注意 1. 两个流中存储的数据类型`可以不同`; 2. 只是机械的合并在一起, 内部仍然是分离的2个流; 3. 只能2个流进行connect, `不能`有第3个参与; 4. 把两个流连接在一起,输入类型可以不同,但是返回的类型必须是一致。 + 示例 > 需求:将两个数据集合并到一个流当中,对数值类型进行加减操作 ```java import org.apache.flink.streaming.api.scala._ /** * Created by lilinchao * Date 2022/1/13 * Description 1.0 */ object StreamConnectAndComap { def main(args: Array[String]): Unit = { //1.构建运行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment //2.构建两个数据集 val arrayData1: DataStream[(String, Int)] = env.fromCollection(Array(("flink",1),("spark",2),("scala",3),("java",4))) val arrayData2: DataStream[(Int, String)] = env.fromCollection(Array((1,"flink"),(2,"spark"),(3,"scala"))) //3.通过connect函数对两个数据集进行关联 val connectStream: ConnectedStreams[(String, Int), (Int, String)] = arrayData1.connect(arrayData2) //4.分别对两个数据集进行相应操作 val coMap: DataStream[(Any, Any)] = connectStream.map(r1 => (r1._1, r1._2 - 5), r2 => (r2._1 + 5, r2._2)) //5.输出结果 coMap.print() env.execute() } } ``` #### 9、Union + **作用** 对`两个或者两个以上`的DataStream进行union操作,产生一个包含所有DataStream元素的`新DataStream` ![14.Flink流处理API之Transform转换算子09.png](https://lilinchao.com/usr/uploads/2022/01/3170407162.png) + **示例** > 需求:合并两个数据集 ```java import org.apache.flink.streaming.api.scala._ /** * Created by lilinchao * Date 2022/1/13 * Description 1.0 */ object StreamUnion { def main(args: Array[String]): Unit = { //1.构建运行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment //2.构建两个数据集 val arrayData1: DataStream[(String, Int)] = env.fromCollection(Array(("flink",1),("spark",2))) val arrayData2: DataStream[(String, Int)] = env.fromCollection(Array(("scala",3),("java",4))) //3.合并两个数据集 val unionData: DataStream[(String, Int)] = arrayData1.union(arrayData2) //4.输出结果 unionData.print("stream") env.execute() } } ``` **Connect与Union 区别:** 1. Union之前两个流的类型必须是一样,Connect 可以不一样,在之后的 coMap 中再去调整成为一样的。 2. Connect只能操作两个流,Union 可以操作多个。
标签:
Flink
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://lilinchao.com/archives/1841.html
上一篇
13.Flink流处理API之Source
下一篇
15【转载】Flink数据类型和序列化
评论已关闭
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
NLP
4
标签云
Spark RDD
Spark Streaming
Elasticsearch
ClickHouse
随笔
HDFS
锁
Shiro
Redis
线程池
Kibana
SpringCloudAlibaba
Java
Java阻塞队列
链表
队列
VUE
Tomcat
设计模式
SQL练习题
正则表达式
稀疏数组
栈
二叉树
字符串
SpringCloud
Spark SQL
国产数据库改造
序列化和反序列化
Http
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞
评论已关闭