一文读懂 Flink Exactly-Once 保证机制深度解析
一文读懂 Flink Exactly-Once 保证机制深度解析
- 1. 传统方案:Aligned Checkpoint(对齐 Checkpoint)
-
- 1.1 为什么需要 Barrier 对齐?
- 1.2 Aligned Checkpoint 工作流程
- 1.3 核心实现
- 1.4 Aligned Checkpoint 的问题
- 2. 新方案:Unaligned Checkpoint(非对齐 Checkpoint)
-
- 2.1 核心思想
- 2.2 Unaligned Checkpoint 工作流程
- 2.3 为什么还能保证 Exactly-Once?
- 2.4 核心实现
- 2.5 Channel State(通道状态)持久化
- 3. 两种模式对比
-
- 3.1 特性对比表
- 3.2 性能对比图
- 4. 混合模式:Aligned Timeout
-
- 4.1 配置
- 4.2 工作流程
- 4.3 实现原理
- 5. 何时使用哪种模式?
-
- 5.1 使用 Aligned Checkpoint
- 5.2 使用 Unaligned Checkpoint
- 5.3 使用混合模式(推荐)
- 6.配置建议
-
- 6.1 启用非对齐 Checkpoint
- 6.2 配置文件方式
- 7.核心源码位置
- 8. 总结
-
- 8.1 核心要点
- 8.2 演进历程
1. 传统方案:Aligned Checkpoint(对齐 Checkpoint)
1.1 为什么需要 Barrier 对齐?
在传统的 Aligned Checkpoint 中,Barrier 对齐是必须的,原因如下:
问题场景
考虑一个两输入流的算子:Input Stream 1: [A1] [A2] [A3] [Barrier-n] [A4] [A5]↓
Input Stream 2: [B1] [Barrier-n] [B2] [B3] [B4]↓Operator State
如果不对齐会发生什么?
时间线:
T1: 从 Stream2 收到 Barrier-n → 如果立即快照
T2: 继续处理 Stream2 的 [B2] [B3]
T3: 从 Stream1 收到 Barrier-n问题:B2 和 B3 已经影响了状态,但它们属于 Checkpoint n+1如果在 T2 和 T3 之间故障,恢复后:- 从 Checkpoint n 恢复状态(不包含 B2、B3 的影响)- 重放 B2、B3(再次影响状态)→ 重复处理!破坏 Exactly-Once!
1.2 Aligned Checkpoint 工作流程
┌─────────────────────────────────────────────────────────────┐
│ Step 1: 接收第一个 Barrier │
├─────────────────────────────────────────────────────────────┤
Input1: [A1][A2][Barrier-n] ← 收到 Barrier
Input2: [B1][B2][B3][B4] ← 还未收到Action: 阻塞 Input1,缓存后续数据 [A3][A4]继续处理 Input2 的数据
└─────────────────────────────────────────────────────────────┘┌─────────────────────────────────────────────────────────────┐
│ Step 2: 等待对齐(Barrier Alignment) │
├─────────────────────────────────────────────────────────────┤
Input1: [缓存: A3, A4] ← 阻塞
Input2: [B2][B3][Barrier-n] ← 收到 BarrierAction: 现在两个输入都收到 Barrier-n
└─────────────────────────────────────────────────────────────┘┌─────────────────────────────────────────────────────────────┐
│ Step 3: 执行快照 │
├─────────────────────────────────────────────────────────────┤
State: 状态包含所有 Barrier-n 之前的数据= f([A1,A2] 和 [B1,B2,B3])Action:
1. 异步保存状态
2. 向下游发送 Barrier-n
3. 解除阻塞,继续处理缓存的数据
└─────────────────────────────────────────────────────────────┘
1.3 核心实现
// org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierAlignerpublic class CheckpointBarrierAligner {// 收到 Barrier 时的处理public void processBarrier(CheckpointBarrier barrier, InputChannelInfo channel) {if (numBarriersReceived == 0) {// 第一个 BarrierstartOfAlignmentTimestamp = System.nanoTime();}// 阻塞该通道blockChannel(channel);numBarriersReceived++;// 检查是否所有输入都收到 Barrierif (numBarriersReceived == totalNumberOfInputChannels) {// ✅ 对齐完成!执行 CheckpointtriggerCheckpoint(barrier);// 解除所有通道的阻塞releaseBlocksAndResetBarriers();}}// 阻塞期间缓存数据private void blockChannel(InputChannelInfo channel) {blockedChannels[channel.getGateIdx(