李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
大数据
正文
29.Flink状态后端(State Backends)
Leefs
2022-02-12 PM
1228℃
0条
[TOC] ### 一、概念 + 每传入一条数据,有状态的算子任务都会读取和更新状态; + 由于有效的状态访问对于处理数据的低延迟至关重要,因此每个并行任务都会在本地维护其状态,以确保快速的状态访问; + 状态的存储、访问以及维护,由一个可插入的组件决定,这个组件就叫做**状态后端**(state backend); + 状态后端主要负责两件事:**本地的状态管理**,以及**将检查点(checkpoint)状态写入远程存储。** **总结:State Backends 的作用就是用来维护State的。** ### 二、功能 #### 2.1 本地状态管理(Local State Management) **State Management的主要任务是确保状态的更新和访问。**类似于关系数据库中的数据,`State Backends` 的状态管理就是提供对State 的访问或更新操作,从这一点上看,`State Backends`与数据库很相似。 **Flink提供的`State Backends`主要有两种形式的状态管理:** - 直接将State以对象的形式存储到JVM的堆上面 - 将State对象序列化后存储到RocksDB中(RocksDB会写到本地的磁盘上) 以上两种方式,第一种存储到JVM堆中,因为是在内存中读写,延迟会很低,但State的大小受限于内存的大小;第二种方式存储到State Backends上(本地磁盘上),读写较内存会慢一些,但不受内存大小的限制,同时因为state存储在磁盘上,可以减少应用程序对内存的占用。根据使用经验,对延迟不是特别敏感的应用,选择第二种方式较好,尤其是State比较大的情况下。 #### 2.2 远程状态备份(Remote State Checkpointing) Flink程序是分布式运行的,而State都是存储到各个节点上的,一但TaskManager节点出现问题,就会导致State的丢失。`State Backend` 提供了`State Checkpointing`的功能,将TaskManager本地的State的备份到远程的存储介质上,可以是分布式的存储系统或者数据库。不同的State Backends备份的方式不同,会有效率高低的区别。 ### 三、分类 Flink一共提供了三种StateBackend,包括: + 基于内存的`MemoryStateBackend` + 基于文件系统的`FsStateBackend` + 基于RockDB存储介质的`RocksDBState-Backend` #### 3.1 MemoryStateBackend + MemoryStateBackend是将状态维护在 Java 堆上的一个内部状态后端。 + 键值状态和窗口算子使用哈希表来存储数据值和定时器。当应用程序checkpoint时,状态后端会在将状态发给JobManager之前对状态进行快照,JobManager会将状态存储在 Java 堆上。 + 默认情况下,MemoryStateBackend会配置成支持异步快照。异步快照可以避免阻塞数据流的处理,从而避免反压的发生。 **使用 MemoryStateBackend 时的注意点:** - 默认情况下,每一个状态最大为 5 MB。可以通过 MemoryStateBackend 的构造函数增加最大大小。 - 状态大小受到 Akka 帧大小的限制,所以无论在配置中怎么配置状态大小,都不能大于 Akka 的帧大小。 - 状态的总大小不能超过 JobManager 的内存。 **什么时候使用 MemoryStateBackend:** + 本地开发或调试时建议使用 MemoryStateBackend,因为这种场景的状态大小的是有限的。 + MemoryStateBackend 非常适合状态比较小的用例和流处理程序。例如一次仅一条记录的函数(Map, FlatMap,或 Filter)或者 Kafka consumer。 **使用示例** ```java val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment //配置使用MemoryStateBackend env.setStateBackend(new MemoryStateBackend) //设置最大状态值 //streamEnv.setStateBackend(new MemoryStateBackend(10*1024*1024)) ``` #### 3.2 FsStateBackend FsStateBackend配置需要文件系统的URL(类型,地址,路径)等来配置。 举个例子,比如可以是: - hdfs://namenode:50070/flink/checkpoints - s3://flink/checkpoints 当选择 FsStateBackend 时,正在处理的数据会保存在 TaskManager 的内存中。在 checkpoint 时,状态后端会将状态快照写入配置的文件系统目录和文件中,同时会在 JobManager 或者 Zookeeper(在高可用场景下)的内存中存储极少的元数据。 默认情况下,FsStateBackend 会配置提供异步快照,以避免在写状态 checkpoint 时阻塞数据流的处理。该特性可以通过在实例化 FsStateBackend 时将布尔标志设置为 false 来禁用,例如: ```java new FsStateBackend(path, false); ``` **注意:**当前的状态仍然会先存在 TaskManager 中,所以状态的大小同样不能超过 TaskManager 的内存。 **什么时候使用 FsStateBackend:** + FsStateBackend 非常适合处理大状态,长窗口,或大键值状态的有状态流处理作业。 + FsStateBackend 非常适合高可用方案。 **使用示例** ```java val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment //配置使用FsStateBackend env.setStateBackend(new FsStateBackend("存储路径")) ``` #### 3.3 RocksDBStateBackend RocksDBStateBackend 的配置同样需要文件系统的 URL(类型,地址,路径)等来配置,如下所示: - hdfs://namenode:50070/flink/checkpoints - s3://flink/checkpoints RocksDBStateBackend 将正在处理的数据使用 RocksDB 存储在本地磁盘上。 在 checkpoint 时,整个 RocksDB 数据库会被存储到配置的文件系统中,或者在超大状态作业时可以将增量差异数据存储到配置的文件系统中。 该状态后端同时也会在 JobManager 或者 Zookeeper(在高可用场景下)的内存中存储极少的元数据。 RocksDB 默认也是配置成异步快照。 **使用 RocksDBStateBackend 时的注意点:** + RocksDB 的每个 key 和 value 的最大大小为 2^31 字节。这是因为 RocksDB 的 JNI API 是基于 byte[] 的。 + 我们需要在此强调,对于使用合并操作的有状态流处理应用程序,例如 ListState,随着时间的推移可能会累积超过 2^31 字节大小,这将会导致后续的任何检索的失败。 **何时使用 RocksDBStateBackend:** - RocksDBStateBackend 非常适合处理大状态,长窗口,或大键值状态的有状态流处理作业。 - RocksDBStateBackend 非常适合高可用方案。 - RocksDBStateBackend 是目前唯一支持有状态流处理应用程序增量检查点的状态后端。 在使用 RocksDB 时,状态大小只受限于磁盘可用空间的大小。这也使得 RocksDBStateBackend 成为管理超大状态的比较好的选择。使用 RocksDB 的权衡点在于所有状态的访问和检索都需要序列化(或反序列化)才能跨越 JNI 边界。与上面提到的堆上后端相比,这可能会影响应用程序的吞吐量。 **使用示例** RocksDBStateBackend比较特殊,如果需要使用,需要添加依赖: ```xml
org.apache.flink
flink-statebackend-rocksdb_2.12
1.10.1
``` 根据自己的使用的scala和flink版本进行修改 **代码** ```java val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment //RocksDBStateBackend除了配置存储路径,还需要配置是否增量存储,否则就是全量存储 env.setStateBackend(new RocksDBStateBackend("存储路径",true)) ``` ### 四、总结 + MemoryStateBackend + 内存级的状态后端,会将键控状态作为内存中的对象进行管理,将它们存储在TaskManager 的 JVM 堆上,而将 checkpoint 存储在JobManager 的内存中 + 特点:快速、低延迟,但不稳定 + FsStateBackend + 将 checkpoint 存到远程的持久化文件系统(FileSystem)上,而对于本地状态,跟 MemoryStateBackend 一样,也会存在 TaskManager 的JVM堆上 + 同时拥有内存级的本地访问速度,和更好的容错保证 + RocksDBStateBackend + 将所有状态序列化后,存入本地的 RocksDB 中存储。 *附参考文章链接:* *https://copyfuture.com/blogs-details/20200102182023903qo0309nxybp71dy* *https://cdn.modb.pro/db/79784*
标签:
Flink
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://lilinchao.com/archives/1886.html
上一篇
28.Flink ProcessFunction应用示例
下一篇
30.Flink状态一致性
取消回复
评论啦~
提交评论
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
标签云
Hive
正则表达式
JavaWEB项目搭建
MyBatisX
NIO
Jquery
Eclipse
gorm
Java工具类
Hbase
前端
Spark SQL
设计模式
Beego
Hadoop
链表
BurpSuite
Spring
Jenkins
JavaSE
JVM
Redis
Azkaban
栈
Spark Streaming
Ubuntu
队列
递归
VUE
Livy
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞