李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
大数据
正文
01.Table API和Flink SQL介绍
Leefs
2022-02-17 PM
948℃
0条
[TOC] ### 前言 前面我们介绍过,Flink的API是分层的,而Table API与SQL就位于最顶层。也就是说Table API和SQL是Flink中封装程度最高的API。 ![01.Table API和Flink SQL介绍01.jpg](https://lilinchao.com/usr/uploads/2022/02/638776267.jpg) Flink 自从 0.9 版本开始支持 Table & SQL 功能一直处于完善开发中,且在不断进行迭代。 ### 一、概述 **Flink 本身是批流统一的处理框架,所以 Table API 和 SQL,就是批流统一的上层处理 API。** 目前功能尚未完善,处于活跃的开发阶段。 **Table API**:是一套内嵌在 Java 和 Scala 语言中的查询 API,它允许我们以非常直观的方式, 组合来自一些关系运算符的查询(比如 select、filter 和 join)。 **Flink SQL**:就是直接可以在代码中写 SQL,来实现一些查询(Query)操作。Flink 的 SQL 支持,基于实现了 SQL 标准的 Apache Calcite(Apache 开源SQL解析工具)。 无论输入是批输入还是流式输入,在这两套 API 中,指定的查询都具有相同的语义,得到相同的结果。 **注意** Table API和SQL对有些算子还没有支持。所以在使用Flink原生Table API和SQL时最好要明确其是否支持业务场景。若现有的Table API和SQL无法满足业务需求,那么我们就需要使用DataStream API或DataSet API来实现了。 ### 二、需要引入的依赖 Table API 和 SQL 需要引入的依赖有两个:**planner 和 bridge**。 ```xml
org.apache.flink
flink-table-planner_2.12
1.10.1
org.apache.flink
flink-table-api-scala-bridge_2.12
1.10.1
``` **注意** + 本次使用的Scala版本为2.12,Flink版本为1.10.1 **说明** + **flink-table-planner**:planner 计划器,是 table API 最主要的部分,提供了运行时环境和生成程序执行计划的planner; + **flink-table-api-scala-bridge**:bridge 桥接器,主要负责 table API 和 DataStream/DataSet API 的连接支持,按照语言分 java 和 scala。 这里的两个依赖,是 IDE 环境下运行需要添加的;如果是生产环境,lib 目录下默认已经有了planner,就只需要有 bridge 就可以了。 当然,如果想使用用户自定义函数,或是跟 kafka 做连接,需要有一个 SQL client,这个包含在 flink-table-common 里。 ### 三、两种 planner(old & blink)的区别 1. **批流统一**:Blink 将批处理作业,视为流式处理的特殊情况。所以,blink 不支持表和 DataSet 之间的转换,批处理作业将不转换为 DataSet 应用程序,而是跟流处理一样,转换为 DataStream 程序来处理。 2. 因为批流统一 , `Blink planner` 也不支持 `BatchTableSource` , 而使用有界的 `StreamTableSource` 代替。 3. `Blink planner` 只支持全新的目录,不支持已弃用的 `ExternalCatalog`。 4. 旧 planner 和 `Blink planner` 的 `FilterableTableSource` 实现不兼容。旧的 planner 会把 `PlannerExpressions` 下推到 `filterableTableSource` 中,而 `blink planner` 则会把 Expressions 下推。 5. 基于字符串的键值配置选项仅适用于 `Blink planner`。 6. `PlannerConfig` 在两个 planner 中的实现不同。 7. `Blink planner` 会将多个 sink 优化在一个 DAG 中(仅在 TableEnvironment 上受支持,而在 StreamTableEnvironment 上不受支持)。而旧 planner 的优化总是将每一个 sink 放在一个新的 DAG 中,其中所有 DAG 彼此独立。 8. 旧的 planner 不支持目录统计,而 `Blink planner` 支持。 ### 四、使用示例 #### 4.1 准备数据 + **sensor.txt** ```basic sensor_1,1547718199,35.8 sensor_6,1547718201,15.4 sensor_7,1547718202,6.7 sensor_10,1547718205,38.1 sensor_1,1547718206,32 sensor_1,1547718208,36.2 sensor_1,1547718210,29.7 ``` #### 4.2 需求 > 筛选出id为sensor_1的温度值并打印到控制台 #### 4.3 引入依赖 直接引入上方二中的两个依赖 #### 4.4 代码实现 ```java import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.api.{EnvironmentSettings, Table} import org.apache.flink.table.api.scala._ /** * @author lilinchao * @date 2022/2/17 * @description 筛选出id为sensor_1的温度值并打印到控制台 **/ object TableApiDemo { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) val inputStream: DataStream[String] = env.readTextFile("./datas/sensor.txt") val dataStream: DataStream[SensorReading] = inputStream.map(data => { val dataArray: Array[String] = data.split(",") SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble) }) //基于env创建tableEnv val settings: EnvironmentSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build() val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env,settings) //从一条流创建一张表 val dataTable: Table = tableEnv.fromDataStream(dataStream) //从表里选取特定的数据 val selectedTable: Table = dataTable.select('id, 'temperature) .filter("id = 'sensor_1'") //进行控制台打印 val selectedStream: DataStream[(String, Double)] = selectedTable.toAppendStream[(String,Double)] selectedStream.print() env.execute("TableApiDemo") } } // 定义样例类,传感器 id,时间戳,温度 case class SensorReading(id: String, timestamp: Long, temperature: Double) ```
标签:
Flink
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://lilinchao.com/archives/1902.html
上一篇
35.Flink和Kafka实现端到端exactly-once语义详解
下一篇
02.Table API和Flink SQL程序的结构
取消回复
评论啦~
提交评论
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
标签云
Tomcat
JavaSE
BurpSuite
Shiro
队列
并发线程
ajax
前端
Typora
字符串
Azkaban
JavaWeb
Linux
二叉树
CentOS
微服务
Ubuntu
NIO
GET和POST
Flink
栈
Elasticsearch
Livy
Redis
Python
稀疏数组
Flume
Docker
递归
Jenkins
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞