李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
大数据
正文
04.Table API和Flink SQL表的查询
Leefs
2022-02-22 PM
1372℃
0条
[TOC] ### 一、表的查询 利用外部系统的连接器connector,我们可以读写数据,并在环境的Catalog中注册表。接下来就可以对表做查询转换了。 Flink给我们提供了两种查询方式:Table API和 SQL。 ### 二、Table API的调用 Table API 是集成在 Scala 和 Java 语言内的查询 API。与 SQL 不同,Table API 的查询不会用字符串表示,而是在宿主语言中一步一步调用完成的。 Table API 基于代表一张“表”的 Table 类,并提供一整套操作处理的方法 API。这些方法会返回一个新的 Table 对象,这个对象就表示对输入表应用转换操作的结果。有些关系型转换操作,可以由多个方法调用组成,构成链式调用结构。例如 table.select(…).filter(…),其中 select(…)表示选择表中指定的字段,filter(…)表示筛选条件。 代码中的实现如下: ```java val sensorTable: Table = tableEnv.from("inputTable") val resultTable: Table = senorTable .select("id, temperature") .filter("id ='sensor_1'") ``` ### 三、SQL查询 Flink 的 SQL 集成,基于的是 ApacheCalcite,它实现了 SQL 标准。在 Flink 中,用常规字符串来定义 SQL 查询语句。SQL 查询的结果,是一个新的 Table。 代码实现如下: ```java val resultSqlTable: Table = tableEnv.sqlQuery("select id, temperature from inputTable where id ='sensor_1'") ``` 或者: ```java val resultSqlTable: Table = tableEnv.sqlQuery( """ |select id, temperature |from inputTable |where id = 'sensor_1' """.stripMargin) ``` 当然,也可以加上聚合操作,比如我们统计每个 sensor 温度数据出现的个数,做个 count 统计: ```java val aggResultTable = sensorTable .groupBy('id) .select('id, 'id.count as 'count) ``` SQL的实现: ```java val aggResultSqlTable = tableEnv.sqlQuery("select id, count(id) as cnt from inputTable group by id") ``` 这里 Table API 里指定的字段,前面加了一个单引号’,这是 Table API 中定义的 Expression类型的写法,可以很方便地表示一个表中的字段。 字段可以直接全部用双引号引起来,也可以用半边单引号+字段名的方式。以后的代码中,一般都用后一种形式。 ### 四、简单示例 #### 4.1 Table API示例 ```java import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.api.scala._ /** * @author lilinchao * @date 2022/2/22 * @description 1.0 **/ object TableApiDemo03 { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) val inputStream: DataStream[String] = env.readTextFile("./datas/sensor.txt") val datastream: DataStream[SensorReading] = inputStream.map(data => { val dataArray: Array[String] = data.split(",") SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble) }) //首先创建表执行环境 val tableEnv = StreamTableEnvironment.create(env) //基于流创建一张表 val dataTable = tableEnv.fromDataStream(datastream) //调用table API进行转换查询 val resultTable = dataTable .select("id,temperature") .filter("id =='sensor_1'")//筛选出id为sensor_1的温度值 //输出 resultTable.toAppendStream[(String,Double)].print("result") env.execute("TableApiDemo03") } } // 定义样例类,传感器 id,时间戳,温度 case class SensorReading(id: String, timestamp: Long, temperature: Double) ``` #### 4.2 SQL查询示例 ```java import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.api.scala._ /** * @author lilinchao * @date 2022/2/22 * @description 1.0 **/ object SQLDemo01 { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) val inputStream: DataStream[String] = env.readTextFile("./datas/sensor.txt") val datastream: DataStream[SensorReading] = inputStream.map(data => { val dataArray: Array[String] = data.split(",") SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble) }) //首先创建表执行环境 val tableEnv = StreamTableEnvironment.create(env) //基于流创建一张表 val dataTable = tableEnv.fromDataStream(datastream) //创建视图(虚拟的表) tableEnv.createTemporaryView("dataTable",dataTable) //查询转换 写到这里sqlQuery()里面敲三个双引号回车 val resultSqlTable = tableEnv.sqlQuery( """ |select id,temperature |from dataTable |where id = 'sensor_1' |""".stripMargin) //输出 resultSqlTable.toAppendStream[(String,Double)].print("result_sql") env.execute("SQLDemo01") } } ```
标签:
Flink
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://lilinchao.com/archives/1906.html
上一篇
03.Flink SQL之在Catalog中注册表
下一篇
05.将DataStream转换成表
评论已关闭
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
NLP
4
标签云
链表
队列
散列
Java工具类
查找
NIO
JavaScript
数学
Linux
Git
Zookeeper
JavaWEB项目搭建
序列化和反序列化
MyBatis
前端
Scala
Golang基础
人工智能
数据结构
算法
GET和POST
Spark Core
容器深入研究
随笔
递归
SpringCloudAlibaba
MyBatis-Plus
设计模式
Beego
线程池
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞
评论已关闭