李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
大数据
正文
16.Flink实现UDF函数
Leefs
2022-01-16 PM
1508℃
0条
### 前言 实现UDF的目的是为了更加细粒度的控制流。 ### 一、函数类(Function Classes) Flink暴露了所有UDF函数的接口(实现方式为接口或者抽象类)。例如MapFunction, FilterFunction, ProcessFunction等等。 **下面例子实现了FilterFunction接口:** ```java //自定义函数类,筛选出成绩大于等于60的学生 class MyFilter extends FilterFunction[Student]{ override def filter(t: Student): Boolean = { t.score >= 60 } } ``` **还可以将函数实现成匿名类:** ```java //匿名类实现 val filterData = studentData.filter( new FilterFunction[Student] { override def filter(t: Student): Boolean = { t.score >= 60 } } ) ``` **将过滤条件当作参数传递进去:** ```java //将过滤条件作为参数传递 class KeywordFilter(keyword:Double) extends FilterFunction[Student]{ override def filter(t: Student): Boolean = { t.score >= keyword } } ``` ### 二、**使用匿名函数(Lambda Functions)** ```java //匿名函数+Lambda表达式 val filterData = studentData.filter(_.score >= 60) ``` *注意:Lambda 函数是真的简洁,不过没办法传参* ### 三、富函数(Rich Functions) “富函数”是DataStream API提供的一个函数类的接口,所有Flink函数类都有其Rich版本。它与常规函数的不同在于,可以获取运行环境的上下文,并拥有一 些生命周期方法,所以可以实现更复杂的功能。 1. RichMapFunction 2. RichFlatMapFunction 3. RichFilterFunction 4. ...... Rich Function有一个生命周期的概念。 典型的生命周期方法有: + **open()**:方法是 rich function 的初始化方法,当一个算子例如 map 或者 filter 被调用之前 open()会被调用。 + **close()**:方法是生命周期中的最后一个调用的方法,做一些清理工作。 + **getRuntimeContext()**:方法提供了函数的 RuntimeContext 的一些信息,例如函数执行的并行度,任务的名字,以及 state 状态。 ```java class MyRichFilter extends RichFilterFunction[Student]{ //可以做一些初始化工作,例如:进行建立HDFS或者数据库等连接操作 override def open(parameters: Configuration): Unit = { println("my open function") } //过滤操作 override def filter(t: Student): Boolean = { t.score >= 60 } //可以做一些清理工作,例如断开和HDFS或者数据库等的连接 override def close(): Unit = { println("my close function") } } ``` + **ctrl+O:**查看重写方法 ![16.Flink实现UDF函数01.jpg](https://lilinchao.com/usr/uploads/2022/01/2510685511.jpg) ### 四、完整代码 **示例数据** ```basic "10001,Thyee,60" "10002,Jeyoo,100" "10001,Leefs,90" "10001,Thsue,58" "10001,Hyee,56" ``` **需求** > 获取及格学生的信息 **代码实现** ```java import org.apache.flink.api.common.functions._ import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.scala._ /** * Created by lilinchao * Date 2022/1/16 * Description 获取及格学生的信息 */ object FilterFunctionTest { def main(args: Array[String]): Unit = { // 创建执行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) //从集合中读取数据 val dataList = List("10001,Thyee,60","10002,Jeyoo,100","10001,Leefs,90","10001,Thsue,58","10001,Hyee,56") val studentStream: DataStream[String] = env.fromCollection(dataList) //将数据转换成样例类 val studentData: DataStream[Student] = studentStream.map(data => { val array: Array[String] = data.split(",") Student(array(0).toInt, array(1), array(2).toDouble) }) //调用自定义函数类 // val filterData = studentData.filter(new MyFilter) //匿名类实现 /*val filterData = studentData.filter( new FilterFunction[Student] { override def filter(t: Student): Boolean = { t.score >= 60 } } )*/ //调用函数,传入过滤条件 // val filterData = studentData.filter(new KeywordFilter(60)) //匿名函数+Lambda表达式 // val filterData = studentData.filter(_.score >= 60) //调用MyRichFilter类 val filterData = studentData.filter(new MyRichFilter) filterData.print() env.execute("filter function test") } } //定义样例类 case class Student(id:Int,name:String,score:Double) //自定义函数类,筛选出成绩大于等于60的学生 class MyFilter extends FilterFunction[Student]{ override def filter(t: Student): Boolean = { t.score >= 60 } } //将过滤条件作为参数传递 class KeywordFilter(keyword:Double) extends FilterFunction[Student]{ override def filter(t: Student): Boolean = { t.score >= keyword } } class MyRichFilter extends RichFilterFunction[Student]{ //可以做一些初始化工作,例如:进行建立HDFS或者数据库等连接操作 override def open(parameters: Configuration): Unit = { println("my open function") } //过滤操作 override def filter(t: Student): Boolean = { t.score >= 60 } //可以做一些清理工作,例如断开和HDFS或者数据库等的连接 override def close(): Unit = { println("my close function") } } ```
标签:
Flink
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://lilinchao.com/archives/1851.html
上一篇
15【转载】Flink数据类型和序列化
下一篇
17.Flink流处理API之Sink
评论已关闭
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
NLP
4
标签云
MyBatis
Hive
MyBatis-Plus
DataWarehouse
RSA加解密
DataX
队列
Java
FastDFS
Redis
Flume
Quartz
gorm
国产数据库改造
ClickHouse
Spring
BurpSuite
Scala
数据结构
递归
NIO
Ubuntu
二叉树
Typora
Hbase
Kibana
FileBeat
HDFS
LeetCode刷题
正则表达式
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞
评论已关闭