李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
大数据
正文
12.Table API和Flink SQL之函数
Leefs
2022-03-05 PM
1664℃
0条
[TOC] ### 前言 Flink Table 和 SQL 内置了很多 SQL 中支持的函数;如果有无法满足的需要,则可以实现用户自定义的函数(UDF)来解决。 ### 一、系统内置函数 Flink Table API 和 SQL 为用户提供了一组用于数据转换的内置函数。SQL 中支持的很多函数,Table API 和 SQL 都已经做了实现,其它还在快速开发扩展中。 以下是一些典型函数的举例,全部的内置函数,可以参考官网介绍。 | 类型 | TableApi | SQLAPI | | :------------: | :----------------------: | :---------------------------------: | | **比较函数** | ANY1 === ANY2 | value1 = value2 | | **比较函数** | NY1 > ANY2 | value1 > value2 | | **逻辑函数** | BOOLEAN1 `双竖杠` BOOLEAN2 | boolean1 OR boolean2 | | **逻辑函数** | BOOLEAN.isFalse | boolean IS FALSE | | **逻辑函数** | !BOOLEAN | NOT boolean | | **算术函数** | NUMERIC1 + NUMERIC2 | numeric1 + numeric2 | | **算术函数** | NUMERIC1.power(NUMERIC2) | POWER(numeric1, numeric2) | | **字符串函数** | STRING1 + STRING2 | string1 `双竖杠` string2 | | **字符串函数** | STRING.upperCase() | UPPER(string) | | **字符串函数** | STRING.charLength() | CHAR_LENGTH(string) | | **时间函数** | STRING.toDate | DATE string | | **时间函数** | STRING.toTimestamp | TIMESTAMP string | | **时间函数** | currentTime() | CURRENT_TIME | | **时间函数** | NUMERIC.days | INTERVAL string range | | **时间函数** | NUMERIC.minutes | | | **聚合函数** | FIELD.count | COUNT(`*`) | | **聚合函数** | FIELD.sum0 | SUM([ ALL `竖杠` DISTINCT ] expression) | | **聚合函数** | | RANK() | | **聚合函数** | | ROW_NUMBER() | ### 二、UDF函数 用户定义函数(User-defined Functions,UDF)是一个重要的特性,因为它们显著地扩展了查询(Query)的表达能力。一些系统内置函数无法解决的需求,我们可以用 UDF 来自 定义实现。 #### 2.1 **注册用户自定义函数 UDF** 在大多数情况下,用户定义的函数必须先注册,然后才能在查询中使用。不需要专门为Scala 的 Table API 注册函数。 函数通过调用 `registerFunction()`方法在 TableEnvironment 中注册。当用户定义的函数被注册时,它被插入到 TableEnvironment 的函数目录中,这样 Table API 或 SQL 解析器就可以识别并正确地解释它。 #### 2.2 **标量函数(Scalar Functions)** 用户定义的标量函数,可以将 0、1 或多个标量值,映射到新的标量值。 为了定义标量函数,必须在 `org.apache.flink.table.functions` 中扩展基类 Scalar Function,并实现(一个或多个)求值(evaluation,eval)方法。 标量函数的行为由求值方法决定,求值方法必须公开声明并命名为 eval(直接 def 声明,没有 override)。求值方法的参数类型和返回类型,确定了标量函数的参数和返回类型。 在下面的代码中,我们定义自己的 HashCode 函数,在 TableEnvironment 中注册它,并在查询中调用它。 + **数据准备** ```basic sensor_1,1547718199,35.8 sensor_6,1547718201,15.4 sensor_7,1547718202,6.7 sensor_10,1547718205,38.1 sensor_1,1547718206,32 sensor_1,1547718208,36.2 sensor_1,1547718210,29.7 sensor_1,1547718213,30.9 ``` + **代码实现** ```java import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.api.DataTypes import org.apache.flink.table.api.scala._ import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema} import org.apache.flink.table.functions.ScalarFunction import org.apache.flink.types.Row /** * @author lilinchao * @date 2022/3/5 * @description 1.0 **/ object FlinkSqlUdfHashCode { def main(args: Array[String]): Unit = { //1.构建运行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) // 设置并行度为1 //2.构建TableEnv val tableEnv = StreamTableEnvironment.create(env) //3.构建数据源 tableEnv.connect(new FileSystem().path("datas/sensor.txt")) .withFormat(new Csv()) .withSchema(new Schema() .field("id",DataTypes.STRING()) .field("timestamp",DataTypes.BIGINT()) .field("temperature",DataTypes.DOUBLE()) ).createTemporaryTable("sensor") //转为表 val tableSensor = tableEnv.from("sensor") //创建转换对象 val code = new HashCode() //使用table api进行测试 val tableRes = tableSensor.select('id,code('id)) tableEnv.registerFunction("code",code) //注册udf val tableSQL = tableEnv.sqlQuery( """ |select | id, | code(id) |from | sensor """.stripMargin) //输出 tableRes.toAppendStream[Row].print("tableapi") tableSQL.toAppendStream[Row].print("sql") env.execute("FlinkSqlUdfHashCode") } class HashCode() extends ScalarFunction{ def eval(s:String):String = { s.hashCode.toString } } } ``` #### 2.3 表函数(Table Functions) 与用户定义的标量函数类似,用户定义的表函数,可以将 0、1 或多个标量值作为输入参数; 与标量函数不同的是,它可以返回任意数量的行作为输出,而不是单个值。 为了定义一个表函数,必须扩展 `org.apache.flink.table.functions` 中的基类 TableFunction并实现(一个或多个)求值方法。 表函数的行为由其求值方法决定,求值方法必须是 public的,并命名为 eval。求值方法的参数类型,决定表函数的所有有效参数。 返回表的类型由 TableFunction 的泛型类型确定。求值方法使用 `protected collect(T)`方法发出输出行。 在 Table API 中,Table 函数需要与`.joinLateral` 或`.leftOuterJoinLateral` 一起使用。 `joinLateral` 算子,会将外部表中的每一行,与表函数(TableFunction,算子的参数是它的表达式)计算得到的所有行连接起来。 而 `leftOuterJoinLateral` 算子,则是左外连接,它同样会将外部表中的每一行与表函数计算生成的所有行连接起来;并且,对于表函数返回的是空表的外部行,也要保留下来。 在 SQL 中,则需要使用 `Lateral Table()`,或者带有 ON TRUE 条件的左连接。 下面的代码中,我们将定义一个表函数,在表环境中注册它,并在查询中调用它。 + **数据准备** ```basic hello|word, hello|spark, hello|Flink, hello|java, hello|python, hello|scala ``` + **代码实现** ```java import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.api.Table import org.apache.flink.table.api.scala._ import org.apache.flink.table.functions.TableFunction import org.apache.flink.types.Row /** * @author lilinchao * @date 2022/3/5 * @description 表函数 **/ object FlinkSqlUDFTableFunction { def main(args: Array[String]): Unit = { //1.构建运行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) // 设置并行度为1 //2.构建TableEnv val tableEnv = StreamTableEnvironment.create(env) //3.构建数据源 val inputData = env.readTextFile("datas/word_udf.txt") //解析数据 val wordData: DataStream[String] = inputData.flatMap(_.split(",")) //类型转换 val tableWord: Table = tableEnv.fromDataStream(wordData,'id) //调用TableFunction val split = new Split() // Table API方式一 val resTable1 = tableWord.joinLateral(split('id) as ('word,'length)) .select('id,'word,'length) //Table API 方式二 val resTable2 = tableWord.leftOuterJoinLateral(split('id) as ('word,'length)) .select('id,'word,'length) //将数据注册成表 tableEnv.createTemporaryView("sensor",tableWord) tableEnv.registerFunction("split",split) //SQL方式一 val tableSQL1 = tableEnv.sqlQuery( """ |select | id, | word, | length |from | sensor, | LATERAL TABLE(split(id)) AS newsensor(word, length) """.stripMargin) //SQL方式二 val tableSQL2 = tableEnv.sqlQuery( """ |select | id, | word, | length |from | sensor | LEFT JOIN LATERAL TABLE(split(id)) AS newsensor(word, length) ON TRUE """.stripMargin) //调用数据 resTable1.toAppendStream[Row].print("resTable1") resTable2.toAppendStream[Row].print("resTable2") tableSQL1.toAppendStream[Row].print("tableSQL1") tableSQL2.toAppendStream[Row].print("TableSQL2") env.execute("FlinkSqlUDFTableFunction") } class Split() extends TableFunction[(String,Int)]{ def eval(str:String):Unit = { str.split("\\|").foreach( word => collect((word,word.length)) ) } } } ``` #### 2.4 **聚合函数(Aggregate Functions)** 用户自定义聚合函数(User-Defined Aggregate Functions,UDAGGs)可以把一个表中的数据,聚合成一个标量值。用户定义的聚合函数,是通过继承 `AggregateFunction` 抽象类实现的。 ![12.Table API和Flink SQL之函数01.png](https://lilinchao.com/usr/uploads/2022/03/2321844792.png) 上图中显示了一个聚合的例子。 假设现在有一张表,包含了各种饮料的数据。该表由三列(id、name 和 price)、五行组成数据。现在我们需要找到表中所有饮料的最高价格,即执行 max()聚合,结果将是一个数值。 **AggregateFunction 的工作原理如下:** + 首先,它需要一个累加器,用来保存聚合中间结果的数据结构(状态)。可以通过调用 `AggregateFunction` 的 `createAccumulator()`方法创建空累加器。 + 随后,对每个输入行调用函数的 accumulate() 方法来更新累加器。 + 处理完所有行后,将调用函数的 getValue() 方法来计算并返回最终结果。 **AggregationFunction 要求必须实现的方法:** + createAccumulator() + accumulate() + getValue() 除了上述方法之外,还有一些可选择实现的方法。其中一些方法,可以让系统执行查询更有效率,而另一些方法,对于某些场景是必需的。例如,如果聚合函数应用在会话窗口 (session group window)的上下文中,则 merge()方法是必需的。 + retract() + merge() + resetAccumulator() 接下来我们写一个自定义 AggregateFunction,计算一下每个 sensor 的平均温度值。 + **数据准备** ```basic 1,Latte,6 2,Milk,3 3,Breve,5 4,Mocha,8 5,Tea,4 ``` + **代码实现** ```java import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.api.DataTypes import org.apache.flink.table.api.scala._ import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema} import org.apache.flink.table.functions.AggregateFunction import org.apache.flink.types.Row /** * @author lilinchao * @date 2022/3/5 * @description 1.0 **/ object FlinkSQUDFAggregateFunction { def main(args: Array[String]): Unit = { //1.构建运行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) // 设置并行度为1 //2.构建TableEnv val tableEnv = StreamTableEnvironment.create(env) //3.构建数据源 tableEnv.connect(new FileSystem().path("datas/tea.txt")) .withFormat(new Csv()) .withSchema(new Schema() .field("id",DataTypes.STRING()) .field("name",DataTypes.STRING()) .field("price",DataTypes.DOUBLE()) ).createTemporaryTable("datas") val AvgTemp = new AvgTemp() val table = tableEnv.from("datas") val resTableApi = table.groupBy('id) .aggregate(AvgTemp('price) as 'sumprice) .select('id, 'sumprice) tableEnv.registerFunction("avgTemp",AvgTemp) val tableSQL = tableEnv.sqlQuery( """ |select | id, | avgTemp(price) |from datas group by id """.stripMargin) resTableApi.toRetractStream[Row].print("resTableApi") tableSQL.toRetractStream[Row].print("tableSQL") env.execute("FlinkSQUDFAggregateFunction") } class AvgTempAcc{ var sum:Double = 0.0 var count:Int = 0 } class AvgTemp extends AggregateFunction[Double,AvgTempAcc]{ //处理完所有行后,将调用函数的 getValue()方法来计算并返回最终结果。 override def getValue(acc: AvgTempAcc): Double = { acc.sum / acc.count } //累加器保存结果状态 override def createAccumulator(): AvgTempAcc = new AvgTempAcc() //对每个输入行调用函数的 accumulate()方法来更新累加器 def accumulate(accumulator:AvgTempAcc,price:Double):Unit = { accumulator.sum += price accumulator.count += 1 } } } ``` #### 2.5 **表聚合函数(Table Aggregate Functions)** 户定义的表聚合函数(User-Defined Table Aggregate Functions,UDTAGGs),可以把一个表中数据,聚合为具有多行和多列的结果表。这跟 AggregateFunction 非常类似,只是之前聚合结果是一个标量值,现在变成了一张表。 ![12.Table API和Flink SQL之函数02.png](https://lilinchao.com/usr/uploads/2022/03/1999145406.png) 比如现在我们需要找到表中所有饮料的前 2 个最高价格,即执行 top2()表聚合。我们需要检查 5 行中的每一行,得到的结果将是一个具有排序后前 2 个值的表。 用户定义的表聚合函数,是通过继承 `TableAggregateFunction` 抽象类来实现的。 **TableAggregateFunction 的工作原理如下:** + 首先,它同样需要一个累加器(Accumulator),它是保存聚合中间结果的数据结构。 通过调用 `TableAggregateFunction` 的 `createAccumulator()`方法可以创建空累加器。 + 随后,对每个输入行调用函数的 `accumulate()`方法来更新累加器。 + 处理完所有行后,将调用函数的 emitValue()方法来计算并返回最终结果。 **AggregationFunction 要求必须实现的方法:** + createAccumulator() + accumulate() 除了上述方法之外,还有一些可选择实现的方法。 + retract() + merge() + resetAccumulator() + emitValue() + emitUpdateWithRetract() 接下来我们写一个自定义 TableAggregateFunction,用来提取每个 price 最高的两个平均值。 + **数据准备** ```basic 1,Latte,6 2,Milk,3 3,Breve,5 4,Mocha,8 5,Tea,4 ``` + **实现代码** ```java import org.apache.flink.util.Collector import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.api.DataTypes import org.apache.flink.table.api.scala._ import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema} import org.apache.flink.table.functions.TableAggregateFunction import org.apache.flink.types.Row /** * @author lilinchao * @date 2022/3/4 * @description 表聚合函数 **/ object FlinkSqlUDFTableAggregateFunction { def main(args: Array[String]): Unit = { //1.构建运行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) // 设置并行度为1 //2.构建TableEnv val tableEnv = StreamTableEnvironment.create(env) //3.构建数据源 tableEnv.connect(new FileSystem().path("datas/tea.txt")) .withFormat(new Csv()) .withSchema(new Schema() .field("id",DataTypes.STRING()) .field("name",DataTypes.STRING()) .field("price",DataTypes.DOUBLE()) ).createTemporaryTable("datas") val table = tableEnv.from("datas") val temp = new Top2Temp() val tableApi = table.groupBy('id) .flatAggregate(temp('price) as ('tmpprice,'rank)) .select('id,'tmpprice,'rank) tableEnv.registerFunction("temp",temp) tableApi.toRetractStream[Row].print() env.execute("FlinkSqlUDFTableAggregateFunction") } class Top2TempAcc{ var highestPrice:Double = Int.MinValue var secodeHighestPrice:Double = Int.MinValue } class Top2Temp extends TableAggregateFunction[(Double,Int),Top2TempAcc]{ override def createAccumulator(): Top2TempAcc = new Top2TempAcc def accumulate(acc:Top2TempAcc,temp:Double):Unit = { if(temp > acc.highestPrice){ acc.secodeHighestPrice = acc.highestPrice acc.highestPrice = temp }else if(temp > acc.secodeHighestPrice){ acc.highestPrice = temp } } def emitValue(acc:Top2TempAcc,out:Collector[(Double,Int)]):Unit = { out.collect(acc.highestPrice,1) out.collect(acc.secodeHighestPrice,2) } } } ``` *附参考文章:* *《尚硅谷大数据之Flink》* *https://www.51cto.com/article/638155.html*
标签:
Flink
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://lilinchao.com/archives/1935.html
上一篇
11.Flink之SQL中窗口的定义
下一篇
DataX介绍
评论已关闭
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
NLP
4
标签云
Redis
数据结构和算法
JVM
容器深入研究
JavaWeb
Jenkins
MyBatis-Plus
设计模式
BurpSuite
VUE
Docker
MySQL
ajax
Thymeleaf
持有对象
队列
SQL练习题
JavaSE
MyBatisX
Java
Git
散列
Kafka
Quartz
Spring
高并发
Shiro
栈
Map
Spark SQL
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞
评论已关闭