Flink 的 checkpoint 对 key state 是怎么样存储的?
核心结论
- Checkpoint 对 Key State 的存储:按 Key Group 划分并持久化到外部存储(如 HDFS);RocksDBStateBackend 会将本地 RocksDB 的状态快照(而非完整内容)上传到 Checkpoint 目录。
- 故障恢复逻辑:作业重启后,故障算子的 Task 会重新分配到其他 TaskManager,通过 Checkpoint 目录下载对应 Key Group 的状态快照,恢复到本地 StateBackend(Heap/RocksDB),最终基于恢复的状态继续处理数据。
一、Checkpoint 对 Key State 的存储机制(按 StateBackend 分类)
Flink 的 Key State 存储和 Checkpoint 持久化逻辑,完全由 StateBackend 决定,核心分为两类:HeapStateBackend
和 RocksDBStateBackend
(生产常用)。两者的本地存储和 Checkpoint 流程差异显著。
1. 核心前提:Key State 按「Key Group」划分
无论哪种 StateBackend,Key State 都会先按 Key Group拆分 —— 这是 Flink 实现状态分片和并行恢复的核心机制:
- Key Group:将所有 Key 哈希映射到固定数量的分组(数量 = 作业最大并行度
maxParallelism
),每个 Task 负责处理若干个 Key Group 的数据和状态。 - 作用:Checkpoint 时按 Key Group 持久化状态,恢复时按 Key Group 重新分配,实现并行恢复。
2. 两种 StateBackend 的 Checkpoint 存储流程
(1)HeapStateBackend(内存型,适用于小状态)
- 本地存储:Key State 直接存储在 TaskManager 的 JVM 堆内存中(如
HashMap
结构)。 - Checkpoint 持久化流程:
- Checkpoint 触发时,Flink 对本地堆内存中的 Key State(按 Key Group 划分)进行 序列化。
- 将序列化后的状态数据,通过网络上传到 外部持久化存储(如 HDFS、S3,由
state.checkpoints.dir
配置)。 - 最终在 Checkpoint 目录生成每个 Key Group 的序列化状态文件,完成持久化。
(2)RocksDBStateBackend(磁盘型,适用于大状态,生产主流)
- 本地存储:Key State 存储在本地磁盘的 RocksDB 实例中(数据以 SST 文件、MANIFEST 等格式持久化到本地磁盘)。
- Checkpoint 持久化流程(核心:上传快照,而非完整复制):
- Checkpoint 触发时,RocksDB 先对当前状态生成 增量快照(Incremental Checkpoint) 或 全量快照(Full Checkpoint):
- 全量:将所有 Key Group 的完整状态快照写入临时文件。
- 增量(默认推荐):仅上传自上次 Checkpoint 以来变化的 SST 文件,大幅减少数据传输量。
- Flink 将 RocksDB 生成的快照文件(按 Key Group 划分)上传到外部持久化存储(如 HDFS)。
- 在 Checkpoint 目录生成对应算子、对应 Key Group 的状态元数据和数据文件,完成持久化。
- Checkpoint 触发时,RocksDB 先对当前状态生成 增量快照(Incremental Checkpoint) 或 全量快照(Full Checkpoint):
关键结论:RocksDB 的本地内容不会完整复制到 HDFS,而是通过「快照 + 增量上传」的方式,将状态快照持久化到 Checkpoint 目录 —— 既保证数据安全,又避免大量冗余传输。
二、Checkpoint 目录结构(以 RocksDB 为例)
外部存储(如 HDFS)的 Checkpoint 目录按「作业 → Checkpoint ID → 算子 → Key Group」层级组织,清晰存储每个 Key State 的快照,示例结构如下:
hdfs:///flink-checkpoints/ # 配置的 state.checkpoints.dir
├─ job-xxx/ # 作业唯一标识
│ ├─ cp-123/ # Checkpoint ID(递增)
│ │ ├─ operator-456/ # 算子唯一标识(如 KeyedProcessFunction)
│ │ │ ├─ keygroup-0/ # Key Group 0 的状态
│ │ │ │ ├─ 0.sst # RocksDB 的 SST 数据文件
│ │ │ │ └─ MANIFEST # 状态元数据文件
│ │ │ ├─ keygroup-1/ # Key Group 1 的状态
│ │ │ └─ ...
│ │ ├─ operator-789/ # 其他算子的状态
│ │ └─ _metadata # Checkpoint 元数据文件(记录所有算子、Key Group 的状态位置)
_metadata
是核心:记录本次 Checkpoint 包含的所有算子、Key Group 的状态文件路径、版本等信息,恢复时作为 “索引” 使用。
三、Operator 所在机器故障的恢复流程
当 Operator 所在的 TaskManager(物理机器)故障时,Flink 基于 Checkpoint 快照 和 Key Group 重分配 实现状态恢复,核心流程如下
步骤序号 | 步骤名称 | 核心内容 |
---|---|---|
1 | 故障检测 | JobManager 通过心跳机制检测 TaskManager 失联,标记该 TaskManager 上所有 Task 为失败状态 |
2 | 作业重启 | 按 restart-strategy 配置触发作业重启,将失败 Operator 的 Task 重新分配至正常 TaskManager |
3 | Key Group 重分配 | 依据作业并行度,将故障 Task 负责的 Key Group 重分配给新 Task(单个接管或多 Task 分摊) |
4 | 从 Checkpoint 下载状态 | 新 Task 读取最新 Checkpoint 的_metadata 文件,获取自身 Key Group 状态文件路径并下载;HeapStateBackend 反序列化为堆内存结构,RocksDBStateBackend 恢复快照至本地实例 |
5 | 本地状态验证与一致性保障 | 验证下载状态完整性(如校验文件哈希),确保与 Checkpoint 时的状态一致 |
6 | 从 Checkpoint 位置恢复消费 | 状态恢复后,从 Checkpoint 记录的数据源消费位置(如 Kafka offset)重新消费数据 |
7 | 基于恢复状态处理数据 | 结合恢复的 Key State(如累计计数、会话状态)处理数据,保障输出一致性(如 Exactly-Once) |
1. 故障检测与作业重启
- 步骤 1:故障检测:JobManager 通过「心跳机制」检测到 TaskManager 失联(超时未发送心跳),标记该 TaskManager 上的所有 Task 为失败状态。
- 步骤 2:作业重启:JobManager 触发作业重启(根据
restart-strategy
配置,如固定延迟重启),重新为失败的 Operator 分配 Task 到其他正常的 TaskManager。
2. State 恢复(核心:Key Group 重分配 + 状态下载)
-
步骤 3:Key Group 重分配:JobManager 根据作业的并行度,将故障 Task 负责的 Key Group 重新分配给新的 Task(可能是单个 Task 接管,或分摊给多个 Task,取决于并行度是否调整)。
- 例:原 Task A 负责 Key Group 0~9,故障后可能由新 Task B 接管 0~9,或由 Task B 接管 0~4、Task C 接管 5~9。
-
步骤 4:从 Checkpoint 下载状态:新 Task 启动后,读取最新完成的 Checkpoint 目录中的
_metadata
文件,找到自身负责的 Key Group 对应的状态文件路径(如 HDFS 上的keygroup-0/
、keygroup-1/
等)。- 若使用 HeapStateBackend:下载序列化的状态文件,反序列化为堆内存中的 Key State 结构(如
HashMap
)。 - 若使用 RocksDBStateBackend:下载 RocksDB 的快照文件(SST、MANIFEST 等),恢复到本地磁盘的 RocksDB 实例中,重建 Key State。
- 若使用 HeapStateBackend:下载序列化的状态文件,反序列化为堆内存中的 Key State 结构(如
-
步骤 5:本地状态验证与一致性保障:新 Task 验证下载的状态完整性(如校验文件哈希),确保与 Checkpoint 时的状态一致,避免数据损坏。
3. 作业继续运行
- 步骤 6:从 Checkpoint 位置恢复数据消费:状态恢复完成后,Task 从 Checkpoint 记录的「数据源消费位置」(如 Kafka 的 offset)开始,重新消费未处理或可能重复的数据。
- 步骤 7:基于恢复的状态处理数据:Task 结合恢复的 Key State(如累计计数、会话状态等),继续处理数据,确保后续输出的一致性(如 Exactly-Once)。
四、关键总结
- Key State 存储核心:按 Key Group 划分,Checkpoint 时持久化到外部存储,存储方式由 StateBackend 决定。
- RocksDB 与 Checkpoint:不是完整复制本地内容,而是上传快照(增量 / 全量)到外部存储,兼顾效率与安全性。
- 故障恢复本质:Key Group 重分配 + 从 Checkpoint 下载对应状态 + 恢复数据源消费位置,最终实现 “状态一致、数据不丢不重”。
推荐阅读
深入探讨 Apache Flink 中的可扩展状态
Flink重启策略有啥用