李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
大数据
正文
32.Flink Checkpoint配置
Leefs
2022-02-13 PM
2789℃
0条
### 前言 Flink 中的每个方法或算子都能够是**有状态的**。状态化的方法在处理单个`元素/事件`的时候存储数据,让状态成为使各个类型的算子更加精细的重要部分。为了让状态容错,Flink 需要为状态添加 **checkpoint(检查点)**。Checkpoint 使得 Flink 能够恢复状态和在流中的位置,从而向应用提供和无故障执行时一样的语义。 ### 一、开启与配置 Checkpoint 默认情况下 checkpoint 是禁用的。通过调用 `StreamExecutionEnvironment` 的 `enableCheckpointing(n)` 来启用 checkpoint,里面的 *n* 是进行 checkpoint 的间隔,单位毫秒。 **Checkpoint 其他的属性包括:** - **精确一次(exactly-once)对比至少一次(at-least-once)**:你可以选择向 `enableCheckpointing(long interval, CheckpointingMode mode)` 方法中传入一个模式来选择使用两种保证等级中的哪一种。 对于大多数应用来说,精确一次是较好的选择。至少一次可能与某些延迟超低(始终只有几毫秒)的应用的关联较大。 - **checkpoint 超时**:如果 checkpoint 执行的时间超过了该配置的阈值,还在进行中的 checkpoint 操作就会被抛弃。 + **checkpoints 之间的最小时间**:该属性定义在 checkpoint 之间需要多久的时间,以确保流应用在 checkpoint 之间有足够的进展。如果值设置为了 *5000*, 无论 checkpoint 持续时间与间隔是多久,在前一个 checkpoint 完成时的至少五秒后会才开始下一个 checkpoint。 往往使用“checkpoints 之间的最小时间”来配置应用会比 checkpoint 间隔容易很多,因为“checkpoints 之间的最小时间”在 checkpoint 的执行时间超过平均值时不会受到影响(例如如果目标的存储系统忽然变得很慢)。 注意这个值也意味着并发 checkpoint 的数目是*一*。 + **checkpoint 可容忍连续失败次数**:该属性定义可容忍多少次连续的 checkpoint 失败。超过这个阈值之后会触发作业错误 fail over。 默认次数为“0”,这意味着不容忍 checkpoint 失败,作业将在第一次 checkpoint 失败时fail over。 + **并发 checkpoint 的数目**: 默认情况下,在上一个 checkpoint 未完成(失败或者成功)的情况下,系统不会触发另一个 checkpoint。这确保了拓扑不会在 checkpoint 上花费太多时间,从而影响正常的处理流程。 不过允许多个 checkpoint 并行进行是可行的,对于有确定的处理延迟(例如某方法所调用比较耗时的外部服务),但是仍然想进行频繁的 checkpoint 去最小化故障后重跑的 pipelines 来说,是有意义的。 该选项不能和 “checkpoints 间的最小时间"同时使用。 + **externalized checkpoints**: 你可以配置周期存储 checkpoint 到外部系统中。Externalized checkpoints 将他们的元数据写到持久化存储上并且在 job 失败的时候*不会*被自动删除。 这种方式下,如果你的 job 失败,你将会有一个现有的 checkpoint 去恢复。 ### 二、Checkpoint 参数详解 ```java StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 每 60s 开始一次 checkpoint env.enableCheckpointing(60000); // 高级选项: // checkpoint 语义设置为 EXACTLY_ONCE(精确一次),这是默认语义 env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // 两次 checkpoint 的间隔时间至少为 1 s,默认是 0,立即进行下一次 checkpoint env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000); // checkpoint 必须在 60s 内结束,否则被丢弃,默认是 10 分钟 env.getCheckpointConfig().setCheckpointTimeout(60000); // 同一时间只能允许有一个 checkpoint env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // 最多允许 checkpoint 失败 3 次 env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3); // 当 Flink 任务取消时,保留外部保存的 checkpoint 信息 env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // 当有较新的 Savepoint 时,作业也会从 Checkpoint 处恢复 env.getCheckpointConfig().setPreferCheckpointForRecovery(true); // 允许实验性的功能:非对齐的 checkpoint,以提升性能 env.getCheckpointConfig().enableUnalignedCheckpoints(); ``` **说明** + **env.enableCheckpointing(60000)**:1 分钟触发一次 checkpoint; + **setCheckpointTimeout**:checkpoint 超时时间,默认是 10 分钟超时,超过了超时时间就会被丢弃; + **setCheckpointingMode**:设置 checkpoint 语义,可以设置为 EXACTLY_ONCE,表示既不重复消费也不丢数据;AT_LEAST_ONCE,表示至少消费一次,可能会重复消费; + **setMinPauseBetweenCheckpoints**:两次 checkpoint 之间的间隔时间。 假如设置每分钟进行一次 checkpoint,两次 checkpoint 间隔时间为 30s。假设某一次 checkpoint 耗时 40s,那么理论上20s 后就要进行一次 checkpoint,但是设置了两次 checkpoint 之间的间隔时间为 30s,所以是 30s 之后才会进行 checkpoint。另外,如果配置了该参数,那么同时进行的 checkpoint 数量只能为 1; + **enableExternalizedCheckpoints**:Flink 任务取消后,外部 checkpoint 信息是否被清理。 + **DELETE_ON_CANCELLATION**:任务取消后,所有的 checkpoint 都将会被清理。只有在任务失败后,才会被保留; + **RETAIN_ON_CANCELLATION**:任务取消后,所有的 checkpoint 都将会被保留,需要手工清理。 + **setPreferCheckpointForRecovery**:恢复任务时,是否从最近一个比较新的 savepoint 处恢复,默认是 false; + **enableUnalignedCheckpoints**:是否开启试验性的非对齐的 checkpoint,可以在反压情况下极大减少 checkpoint 的次数; *附参考文章链接* *https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/datastream/fault-tolerance/checkpointing/*
标签:
Flink
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://lilinchao.com/archives/1892.html
上一篇
31.Flink容错机制介绍
下一篇
33.Flink重启策略
评论已关闭
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
NLP
4
标签云
Nacos
Spring
MySQL
数据结构和算法
字符串
Linux
HDFS
CentOS
Azkaban
Spark
持有对象
栈
并发编程
Git
Jquery
MyBatisX
JavaWeb
线程池
GET和POST
机器学习
MyBatis
二叉树
人工智能
pytorch
Eclipse
ClickHouse
微服务
Java
Stream流
RSA加解密
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞
评论已关闭