李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
大数据
正文
12.Table API和Flink SQL之函数
Leefs
2022-03-05 PM
1014℃
0条
[TOC] ### 前言 Flink Table 和 SQL 内置了很多 SQL 中支持的函数;如果有无法满足的需要,则可以实现用户自定义的函数(UDF)来解决。 ### 一、系统内置函数 Flink Table API 和 SQL 为用户提供了一组用于数据转换的内置函数。SQL 中支持的很多函数,Table API 和 SQL 都已经做了实现,其它还在快速开发扩展中。 以下是一些典型函数的举例,全部的内置函数,可以参考官网介绍。 | 类型 | TableApi | SQLAPI | | :------------: | :----------------------: | :---------------------------------: | | **比较函数** | ANY1 === ANY2 | value1 = value2 | | **比较函数** | NY1 > ANY2 | value1 > value2 | | **逻辑函数** | BOOLEAN1 `双竖杠` BOOLEAN2 | boolean1 OR boolean2 | | **逻辑函数** | BOOLEAN.isFalse | boolean IS FALSE | | **逻辑函数** | !BOOLEAN | NOT boolean | | **算术函数** | NUMERIC1 + NUMERIC2 | numeric1 + numeric2 | | **算术函数** | NUMERIC1.power(NUMERIC2) | POWER(numeric1, numeric2) | | **字符串函数** | STRING1 + STRING2 | string1 `双竖杠` string2 | | **字符串函数** | STRING.upperCase() | UPPER(string) | | **字符串函数** | STRING.charLength() | CHAR_LENGTH(string) | | **时间函数** | STRING.toDate | DATE string | | **时间函数** | STRING.toTimestamp | TIMESTAMP string | | **时间函数** | currentTime() | CURRENT_TIME | | **时间函数** | NUMERIC.days | INTERVAL string range | | **时间函数** | NUMERIC.minutes | | | **聚合函数** | FIELD.count | COUNT(`*`) | | **聚合函数** | FIELD.sum0 | SUM([ ALL `竖杠` DISTINCT ] expression) | | **聚合函数** | | RANK() | | **聚合函数** | | ROW_NUMBER() | ### 二、UDF函数 用户定义函数(User-defined Functions,UDF)是一个重要的特性,因为它们显著地扩展了查询(Query)的表达能力。一些系统内置函数无法解决的需求,我们可以用 UDF 来自 定义实现。 #### 2.1 **注册用户自定义函数 UDF** 在大多数情况下,用户定义的函数必须先注册,然后才能在查询中使用。不需要专门为Scala 的 Table API 注册函数。 函数通过调用 `registerFunction()`方法在 TableEnvironment 中注册。当用户定义的函数被注册时,它被插入到 TableEnvironment 的函数目录中,这样 Table API 或 SQL 解析器就可以识别并正确地解释它。 #### 2.2 **标量函数(Scalar Functions)** 用户定义的标量函数,可以将 0、1 或多个标量值,映射到新的标量值。 为了定义标量函数,必须在 `org.apache.flink.table.functions` 中扩展基类 Scalar Function,并实现(一个或多个)求值(evaluation,eval)方法。 标量函数的行为由求值方法决定,求值方法必须公开声明并命名为 eval(直接 def 声明,没有 override)。求值方法的参数类型和返回类型,确定了标量函数的参数和返回类型。 在下面的代码中,我们定义自己的 HashCode 函数,在 TableEnvironment 中注册它,并在查询中调用它。 + **数据准备** ```basic sensor_1,1547718199,35.8 sensor_6,1547718201,15.4 sensor_7,1547718202,6.7 sensor_10,1547718205,38.1 sensor_1,1547718206,32 sensor_1,1547718208,36.2 sensor_1,1547718210,29.7 sensor_1,1547718213,30.9 ``` + **代码实现** ```java import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.api.DataTypes import org.apache.flink.table.api.scala._ import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema} import org.apache.flink.table.functions.ScalarFunction import org.apache.flink.types.Row /** * @author lilinchao * @date 2022/3/5 * @description 1.0 **/ object FlinkSqlUdfHashCode { def main(args: Array[String]): Unit = { //1.构建运行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) // 设置并行度为1 //2.构建TableEnv val tableEnv = StreamTableEnvironment.create(env) //3.构建数据源 tableEnv.connect(new FileSystem().path("datas/sensor.txt")) .withFormat(new Csv()) .withSchema(new Schema() .field("id",DataTypes.STRING()) .field("timestamp",DataTypes.BIGINT()) .field("temperature",DataTypes.DOUBLE()) ).createTemporaryTable("sensor") //转为表 val tableSensor = tableEnv.from("sensor") //创建转换对象 val code = new HashCode() //使用table api进行测试 val tableRes = tableSensor.select('id,code('id)) tableEnv.registerFunction("code",code) //注册udf val tableSQL = tableEnv.sqlQuery( """ |select | id, | code(id) |from | sensor """.stripMargin) //输出 tableRes.toAppendStream[Row].print("tableapi") tableSQL.toAppendStream[Row].print("sql") env.execute("FlinkSqlUdfHashCode") } class HashCode() extends ScalarFunction{ def eval(s:String):String = { s.hashCode.toString } } } ``` #### 2.3 表函数(Table Functions) 与用户定义的标量函数类似,用户定义的表函数,可以将 0、1 或多个标量值作为输入参数; 与标量函数不同的是,它可以返回任意数量的行作为输出,而不是单个值。 为了定义一个表函数,必须扩展 `org.apache.flink.table.functions` 中的基类 TableFunction并实现(一个或多个)求值方法。 表函数的行为由其求值方法决定,求值方法必须是 public的,并命名为 eval。求值方法的参数类型,决定表函数的所有有效参数。 返回表的类型由 TableFunction 的泛型类型确定。求值方法使用 `protected collect(T)`方法发出输出行。 在 Table API 中,Table 函数需要与`.joinLateral` 或`.leftOuterJoinLateral` 一起使用。 `joinLateral` 算子,会将外部表中的每一行,与表函数(TableFunction,算子的参数是它的表达式)计算得到的所有行连接起来。 而 `leftOuterJoinLateral` 算子,则是左外连接,它同样会将外部表中的每一行与表函数计算生成的所有行连接起来;并且,对于表函数返回的是空表的外部行,也要保留下来。 在 SQL 中,则需要使用 `Lateral Table()`,或者带有 ON TRUE 条件的左连接。 下面的代码中,我们将定义一个表函数,在表环境中注册它,并在查询中调用它。 + **数据准备** ```basic hello|word, hello|spark, hello|Flink, hello|java, hello|python, hello|scala ``` + **代码实现** ```java import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.api.Table import org.apache.flink.table.api.scala._ import org.apache.flink.table.functions.TableFunction import org.apache.flink.types.Row /** * @author lilinchao * @date 2022/3/5 * @description 表函数 **/ object FlinkSqlUDFTableFunction { def main(args: Array[String]): Unit = { //1.构建运行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) // 设置并行度为1 //2.构建TableEnv val tableEnv = StreamTableEnvironment.create(env) //3.构建数据源 val inputData = env.readTextFile("datas/word_udf.txt") //解析数据 val wordData: DataStream[String] = inputData.flatMap(_.split(",")) //类型转换 val tableWord: Table = tableEnv.fromDataStream(wordData,'id) //调用TableFunction val split = new Split() // Table API方式一 val resTable1 = tableWord.joinLateral(split('id) as ('word,'length)) .select('id,'word,'length) //Table API 方式二 val resTable2 = tableWord.leftOuterJoinLateral(split('id) as ('word,'length)) .select('id,'word,'length) //将数据注册成表 tableEnv.createTemporaryView("sensor",tableWord) tableEnv.registerFunction("split",split) //SQL方式一 val tableSQL1 = tableEnv.sqlQuery( """ |select | id, | word, | length |from | sensor, | LATERAL TABLE(split(id)) AS newsensor(word, length) """.stripMargin) //SQL方式二 val tableSQL2 = tableEnv.sqlQuery( """ |select | id, | word, | length |from | sensor | LEFT JOIN LATERAL TABLE(split(id)) AS newsensor(word, length) ON TRUE """.stripMargin) //调用数据 resTable1.toAppendStream[Row].print("resTable1") resTable2.toAppendStream[Row].print("resTable2") tableSQL1.toAppendStream[Row].print("tableSQL1") tableSQL2.toAppendStream[Row].print("TableSQL2") env.execute("FlinkSqlUDFTableFunction") } class Split() extends TableFunction[(String,Int)]{ def eval(str:String):Unit = { str.split("\\|").foreach( word => collect((word,word.length)) ) } } } ``` #### 2.4 **聚合函数(Aggregate Functions)** 用户自定义聚合函数(User-Defined Aggregate Functions,UDAGGs)可以把一个表中的数据,聚合成一个标量值。用户定义的聚合函数,是通过继承 `AggregateFunction` 抽象类实现的。 ![12.Table API和Flink SQL之函数01.png](https://lilinchao.com/usr/uploads/2022/03/2321844792.png) 上图中显示了一个聚合的例子。 假设现在有一张表,包含了各种饮料的数据。该表由三列(id、name 和 price)、五行组成数据。现在我们需要找到表中所有饮料的最高价格,即执行 max()聚合,结果将是一个数值。 **AggregateFunction 的工作原理如下:** + 首先,它需要一个累加器,用来保存聚合中间结果的数据结构(状态)。可以通过调用 `AggregateFunction` 的 `createAccumulator()`方法创建空累加器。 + 随后,对每个输入行调用函数的 accumulate() 方法来更新累加器。 + 处理完所有行后,将调用函数的 getValue() 方法来计算并返回最终结果。 **AggregationFunction 要求必须实现的方法:** + createAccumulator() + accumulate() + getValue() 除了上述方法之外,还有一些可选择实现的方法。其中一些方法,可以让系统执行查询更有效率,而另一些方法,对于某些场景是必需的。例如,如果聚合函数应用在会话窗口 (session group window)的上下文中,则 merge()方法是必需的。 + retract() + merge() + resetAccumulator() 接下来我们写一个自定义 AggregateFunction,计算一下每个 sensor 的平均温度值。 + **数据准备** ```basic 1,Latte,6 2,Milk,3 3,Breve,5 4,Mocha,8 5,Tea,4 ``` + **代码实现** ```java import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.api.DataTypes import org.apache.flink.table.api.scala._ import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema} import org.apache.flink.table.functions.AggregateFunction import org.apache.flink.types.Row /** * @author lilinchao * @date 2022/3/5 * @description 1.0 **/ object FlinkSQUDFAggregateFunction { def main(args: Array[String]): Unit = { //1.构建运行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) // 设置并行度为1 //2.构建TableEnv val tableEnv = StreamTableEnvironment.create(env) //3.构建数据源 tableEnv.connect(new FileSystem().path("datas/tea.txt")) .withFormat(new Csv()) .withSchema(new Schema() .field("id",DataTypes.STRING()) .field("name",DataTypes.STRING()) .field("price",DataTypes.DOUBLE()) ).createTemporaryTable("datas") val AvgTemp = new AvgTemp() val table = tableEnv.from("datas") val resTableApi = table.groupBy('id) .aggregate(AvgTemp('price) as 'sumprice) .select('id, 'sumprice) tableEnv.registerFunction("avgTemp",AvgTemp) val tableSQL = tableEnv.sqlQuery( """ |select | id, | avgTemp(price) |from datas group by id """.stripMargin) resTableApi.toRetractStream[Row].print("resTableApi") tableSQL.toRetractStream[Row].print("tableSQL") env.execute("FlinkSQUDFAggregateFunction") } class AvgTempAcc{ var sum:Double = 0.0 var count:Int = 0 } class AvgTemp extends AggregateFunction[Double,AvgTempAcc]{ //处理完所有行后,将调用函数的 getValue()方法来计算并返回最终结果。 override def getValue(acc: AvgTempAcc): Double = { acc.sum / acc.count } //累加器保存结果状态 override def createAccumulator(): AvgTempAcc = new AvgTempAcc() //对每个输入行调用函数的 accumulate()方法来更新累加器 def accumulate(accumulator:AvgTempAcc,price:Double):Unit = { accumulator.sum += price accumulator.count += 1 } } } ``` #### 2.5 **表聚合函数(Table Aggregate Functions)** 户定义的表聚合函数(User-Defined Table Aggregate Functions,UDTAGGs),可以把一个表中数据,聚合为具有多行和多列的结果表。这跟 AggregateFunction 非常类似,只是之前聚合结果是一个标量值,现在变成了一张表。 ![12.Table API和Flink SQL之函数02.png](https://lilinchao.com/usr/uploads/2022/03/1999145406.png) 比如现在我们需要找到表中所有饮料的前 2 个最高价格,即执行 top2()表聚合。我们需要检查 5 行中的每一行,得到的结果将是一个具有排序后前 2 个值的表。 用户定义的表聚合函数,是通过继承 `TableAggregateFunction` 抽象类来实现的。 **TableAggregateFunction 的工作原理如下:** + 首先,它同样需要一个累加器(Accumulator),它是保存聚合中间结果的数据结构。 通过调用 `TableAggregateFunction` 的 `createAccumulator()`方法可以创建空累加器。 + 随后,对每个输入行调用函数的 `accumulate()`方法来更新累加器。 + 处理完所有行后,将调用函数的 emitValue()方法来计算并返回最终结果。 **AggregationFunction 要求必须实现的方法:** + createAccumulator() + accumulate() 除了上述方法之外,还有一些可选择实现的方法。 + retract() + merge() + resetAccumulator() + emitValue() + emitUpdateWithRetract() 接下来我们写一个自定义 TableAggregateFunction,用来提取每个 price 最高的两个平均值。 + **数据准备** ```basic 1,Latte,6 2,Milk,3 3,Breve,5 4,Mocha,8 5,Tea,4 ``` + **实现代码** ```java import org.apache.flink.util.Collector import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.api.DataTypes import org.apache.flink.table.api.scala._ import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema} import org.apache.flink.table.functions.TableAggregateFunction import org.apache.flink.types.Row /** * @author lilinchao * @date 2022/3/4 * @description 表聚合函数 **/ object FlinkSqlUDFTableAggregateFunction { def main(args: Array[String]): Unit = { //1.构建运行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) // 设置并行度为1 //2.构建TableEnv val tableEnv = StreamTableEnvironment.create(env) //3.构建数据源 tableEnv.connect(new FileSystem().path("datas/tea.txt")) .withFormat(new Csv()) .withSchema(new Schema() .field("id",DataTypes.STRING()) .field("name",DataTypes.STRING()) .field("price",DataTypes.DOUBLE()) ).createTemporaryTable("datas") val table = tableEnv.from("datas") val temp = new Top2Temp() val tableApi = table.groupBy('id) .flatAggregate(temp('price) as ('tmpprice,'rank)) .select('id,'tmpprice,'rank) tableEnv.registerFunction("temp",temp) tableApi.toRetractStream[Row].print() env.execute("FlinkSqlUDFTableAggregateFunction") } class Top2TempAcc{ var highestPrice:Double = Int.MinValue var secodeHighestPrice:Double = Int.MinValue } class Top2Temp extends TableAggregateFunction[(Double,Int),Top2TempAcc]{ override def createAccumulator(): Top2TempAcc = new Top2TempAcc def accumulate(acc:Top2TempAcc,temp:Double):Unit = { if(temp > acc.highestPrice){ acc.secodeHighestPrice = acc.highestPrice acc.highestPrice = temp }else if(temp > acc.secodeHighestPrice){ acc.highestPrice = temp } } def emitValue(acc:Top2TempAcc,out:Collector[(Double,Int)]):Unit = { out.collect(acc.highestPrice,1) out.collect(acc.secodeHighestPrice,2) } } } ``` *附参考文章:* *《尚硅谷大数据之Flink》* *https://www.51cto.com/article/638155.html*
标签:
Flink
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://lilinchao.com/archives/1935.html
上一篇
11.Flink之SQL中窗口的定义
下一篇
DataX介绍
取消回复
评论啦~
提交评论
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
标签云
Java
链表
SQL练习题
Flink
Map
Eclipse
Git
VUE
Redis
机器学习
Spark
哈希表
散列
Elasticsearch
JavaScript
MyBatis
Ubuntu
SpringCloudAlibaba
设计模式
持有对象
FileBeat
工具
Nacos
数据结构
Golang
序列化和反序列化
nginx
Spark Core
字符串
SpringBoot
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞