李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
Java
正文
08.Table API和Flink SQL动态表和持续查询
Leefs
2022-02-26 PM
1952℃
0条
[TOC] ### 前言 SQL 和关系代数在设计时并未考虑流数据。因此,在关系代数(和 SQL)之间几乎没有概念上的差异。 本文将介绍 Flink 如何在**无界数据集**上实现与**数据库引擎在有界数据上的处理**具有相同的语义。 ### 一、DataStream 上的关系查询 下表比较了传统的关系代数和流处理与输入数据、执行和输出结果的关系。 | | 关系代数(表)/SQL | 流处理 | | ----------------------- | -------------------------- | -------------------------------------------- | | 处理的数据对象 | 字段元祖的有界集合 | 字段元祖的无限序列 | | 查询(Query)对数据的访问 | 可以访问到完整的数据输入 | 无法访问所有数据,必须持续“等待”流式输入 | | 查询终止条件 | 生成固定大小的结果集后终止 | 永不停止,根据持续收到的数据不断更新查询结果 | 尽管存在这些差异,但是使用关系查询和 SQL 处理流并不是不可能的。 #### 物化视图(Materialized Views) + 高级关系数据库系统提供了一个称为**物化视图(Materialized Views)**的特性。 + 物化视图被定义为一条 SQL 查询,就像常规的虚拟视图一样。 + 与虚拟视图相反,**物化视图缓存查询的结果,因此在访问视图时不需要对查询进行计算**。 + 缓存的一个常见难题是防止缓存为过期的结果提供服务。当其定义查询的基表被修改时,物化视图将过期。 **即时视图维护(Eager View Maintenance)**是一种一旦更新了物化视图的基表就立即更新视图的技术。 如果考虑以下问题,那么**即时视图维护**和**流上的SQL查询**之间的联系就会变得显而易见: + 数据库表是 `INSERT`、`UPDATE` 和 `DELETE` DML 语句的 *stream* 的结果,通常称为 *changelog stream* 。 + 物化视图被定义为一条 SQL 查询。为了更新视图,查询不断地处理视图的基本关系的changelog 流。 + 物化视图是流式 SQL 查询的结果。 有了上面的基础,下面可以介绍一下动态表的概念了。 ### 二、动态表(Dynamic Tables) #### 2.1 动态表概念 因为流处理面对的数据,是连续不断的,这和我们熟悉的关系型数据库中保存的“表”完全不同。所以,如果我们把流数据转换成 Table,然后执行类似于table 的select 操作,结果就不是一成不变的,而是随着新数据的到来,会不停更新。 我们可以随着新数据的到来,不停地在之前的基础上更新结果。这样得到的表,在FlinkTable API 概念里,就叫做“**动态表**”(Dynamic Tables)。 #### 2.2 动态表的特点 动态表是 Flink 对流数据的 `Table API` 和 `SQL` 支持的核心概念。 + 与表示批处理数据的静态表不同,动态表是随时间变化的。 + 动态表可以像静态的批处理表一样进行查询,查询一个动态表会产生**持续查询(Continuous Query)**。连续查询永远不会终止,并会生成另一个动态表。 + 查询(Query)会不断更新其**动态结果表**,以反映其动态输入表上的更改。 本质上,动态表上的连续查询非常类似于定义物化视图的查询。 需要注意的是,连续查询的结果在语义上总是等价于以批处理模式在输入表快照上执行的相同查询的结果。 ### 三、持续查询(Continuous Query) #### 3.1 流式持续查询的过程 下图显示了流、动态表和连续查询之间的关系: ![08.Table API和Flink SQL动态表和持续查询01.png](https://lilinchao.com/usr/uploads/2022/02/3833178427.png) 1. 将流转换为动态表。 2. 在动态表上计算一个连续查询,生成一个新的动态表。 3. 生成的动态表被转换回流。 **注意:** 动态表首先是一个逻辑概念。在查询执行期间不一定(完全)物化动态表。 #### 3.2 将流转换成表(Table) 为了处理带有关系查询的流,必须先将其转换为表。 从概念上讲,流的每个数据记录,都被解释为对结果表的插入(Insert)修改。因为流是持续不断的,而且之前的输出结果无法改变。本质上,我们其实是从一个只有插入操作的**changelog(更新日志)流**,来构建一个表。 为了更好地说明动态表和持续查询的概念,我们来举一个具体的例子。 比如,我们现在的输入数据,就是用户在网站上的访问行为,数据类型(Schema)如下: ```json [ user: VARCHAR, // 用户名 cTime: TIMESTAMP, // 访问某个 URL 的时间戳 url: VARCHAR // 用户访问的 URL ] ``` 下图显示了如何将访问 URL 事件流,或者叫点击事件流(左侧)转换为表(右侧)。 ![08.Table API和Flink SQL动态表和持续查询02.png](https://lilinchao.com/usr/uploads/2022/02/1209487823.png) 随着插入更多的访问事件流记录,生成的表将不断增长。 #### 3.3 持续查询(Continuous Query) + 持续查询,会在动态表上做计算处理,并作为结果生成新的动态表。与批处理查询不同,连续查询从不终止,并根据输入表上的更新更新其结果表。 + 在任何时间点,连续查询的结果在语义上,等同于在输入表的快照上,以批处理模式执行的同一查询的结果。 在下面的示例中,我们展示了对点击事件流中的一个持续查询。 这个 Query 很简单,是一个分组聚合做 count 统计的查询。它将用户字段上的clicks 表分组,并统计访问的 url 数。图中显示了随着时间的推移,当 clicks 表被其他行更新时如何计算查询。 ![08.Table API和Flink SQL动态表和持续查询03.png](https://lilinchao.com/usr/uploads/2022/02/2135373799.png) **过程分析** + 当查询开始,`clicks` 表(左侧)是空的。 + 当第一行数据被插入到 `clicks` 表时,查询开始计算结果表。第一行数据 `[Mary,./home]` 插入后,结果表(右侧,上部)由一行 `[Mary, 1]` 组成。 + 当第二行 `[Bob, ./cart]` 插入到 `clicks` 表时,查询会更新结果表并插入了一行新数据 `[Bob, 1]`。 + 第三行 `[Mary, ./prod?id=1]` 将产生已计算的结果行的更新,`[Mary, 1]` 更新成 `[Mary, 2]`。 + 最后,当第四行数据加入 `clicks` 表时,查询将第三行 `[Liz, 1]` 插入到结果表中。 #### 3.4 查询限制 许多(但不是全部)语义上有效的查询可以作为流上的连续查询进行评估。有些查询代价太高而无法计算,这可能是由于它们需要维护的状态大小,也可能是由于计算更新代价太高。 + **状态大小**:连续查询在无界流上计算,通常应该运行数周或数月。因此,连续查询处理的数据总量可能非常大。必须更新先前输出的结果的查询需要维护所有输出的行,以便能够更新它们。 例如,第一个查询示例需要存储每个用户的 URL 计数,以便能够增加该计数并在输入表接收新行时发送新结果。如果只跟踪注册用户,则要维护的计数数量可能不会太高。但是,如果未注册的用户分配了一个惟一的用户名,那么要维护的计数数量将随着时间增长,并可能最终导致查询失败。 ```sql SELECT user, COUNT(url) FROM clicks GROUP BY user; ``` + **计算更新**:有些查询需要重新计算和更新大量已输出的结果行,即使只添加或更新一条输入记录。显然,这样的查询不适合作为连续查询执行。 下面的查询就是一个例子,它根据最后一次单击的时间为每个用户计算一个 `RANK`。一旦 `click` 表接收到一个新行,用户的 `lastAction` 就会更新,并必须计算一个新的排名。然而,由于两行不能具有相同的排名,所以所有较低排名的行也需要更新。 ```sql SELECT user, RANK() OVER (ORDER BY lastAction) FROM ( SELECT user, MAX(cTime) AS lastAction FROM clicks GROUP BY user ); ``` #### 3.5 将动态表转换成流 与常规的数据库表一样,动态表可以通过**插入(Insert)、更新(Update)和删除(Delete)**更改,进行持续的修改。 将动态表转换为流或将其写入外部系统时,需要对这些更改进行编码。 Flink 的 `Table API` 和 SQL 支持三种方式对动态表的更改进行编码: ##### **(1)仅追加(Append-only)流** 仅通过插入(Insert)更改,来修改的动态表,可以直接转换为“仅追加”流。这个流中发出的数据,就是动态表中新增的每一行。 ##### **(2)撤回(Retract)流** Retract 流是包含两类消息的流,**添加(Add)消息和撤回(Retract)消息**。 动态表通过将**INSERT编码为 add 消息**、**DELETE 编码为 retract 消息**、**UPDATE 编码为被更改行(前一行)的 retract 消息和更新后行(新行)的 add 消息**,转换为retract 流。 下图显示了将动态表转换为 retract 流的过程。 ![08.Table API和Flink SQL动态表和持续查询04.png](https://lilinchao.com/usr/uploads/2022/02/699858371.png) ##### **(3)Upsert(更新插入)流** Upsert 流包含两种类型的消息:**Upsert 消息和 delete 消息**。**转换为upsert 流的动态表,需要有唯一的键(key)**。 通过将 INSERT 和 UPDATE 更改编码为 upsert 消息,将 DELETE 更改编码为DELETE 消息,就可以将具有唯一键(Unique Key)的动态表转换为流。 下图显示了将动态表转换为 upsert 流的过程。 ![08.Table API和Flink SQL动态表和持续查询05.png](https://lilinchao.com/usr/uploads/2022/02/3309073888.png) 这些概念我们之前都已提到过。需要注意的是,在代码里将动态表转换为`DataStream`时,仅支持 Append 和 Retract 流。而向外部系统输出动态表的 `TableSink` 接口,则可以有不同的实现,比如ES,就可以有 `Upsert` 模式。 *附参考文章链接:* *https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/concepts/dynamic_tables/*
标签: none
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://lilinchao.com/archives/1915.html
上一篇
07.将表转换成DataStream
下一篇
09.Table API和Flink SQL之表的时间特性
评论已关闭
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
NLP
4
标签云
数据结构和算法
Jenkins
Azkaban
Eclipse
Ubuntu
递归
字符串
查找
Golang基础
Kibana
数据结构
序列化和反序列化
Tomcat
Python
散列
栈
Yarn
Scala
锁
gorm
SQL练习题
MyBatis
Flume
Java阻塞队列
链表
nginx
Nacos
FileBeat
JavaWEB项目搭建
Filter
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞
评论已关闭