In-Depth Analysis of Flink Checkpoint Design (with Source Code)
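- 1. Core Design Philosophy of Checkpoint
  - 1.1 Why Do We Need Checkpoints?
  - 1.2 Core Design Goals
- 2. Checkpoint Triggering Flow (JobManager Side)
  - 2.1 CheckpointCoordinator Core Components
  - 2.2 The Complete Checkpoint Triggering Flow
- 3. The CheckpointBarrier Mechanism (Core!)
  - 3.1 CheckpointBarrier Data Structure
  - 3.2 CheckpointOptions Configuration
  - 3.3 Barrier Propagation in the Data Stream
- 4. Checkpoint Execution on the Task Side (Core Implementation)
  - 4.1 The SubtaskCheckpointCoordinator
  - 4.2 Operator State Snapshots
  - 4.3 Asynchronous Checkpoint Completion
- 5. Aligned vs Unaligned Checkpoint (Key Comparison)
  - 5.1 Aligned Checkpoint (Traditional Approach)
  - 5.2 Unaligned Checkpoint (New Approach)
  - 5.3 Channel State Persistence (Key to Unaligned)
- 6. PendingCheckpoint Completion Flow
  - 6.1 Receiving Task Acknowledgements
  - 6.2 Converting to CompletedCheckpoint
- 7. Checkpoint Recovery Flow
  - 7.1 Restoring from a Checkpoint
  - 7.2 Tasks Loading State
- 8. Key Configuration and Optimization
  - 8.1 Core Configuration Parameters
  - 8.2 Performance Tuning Tips
- 9. Summary: The Essence of Checkpoint Design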
1. Core Design Philosophy of Checkpoint
1.1 Why Do We Need Checkpoints?
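Problem scenario:
A streaming job is running, and every operator holds its own state:
Source (State A) → Map (State B) → KeyBy (State C) → Window (State D) → Sink (State E)
If the job fails at an arbitrary moment, how can it be restored to a consistent state, that is, one in which all of these states correspond to the same point in the input stream?
→ Checkpoint! It is based on a variant of the Chandy-Lamport distributed snapshot algorithm.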
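The barrier idea behind this algorithm can be illustrated with a toy model in plain Java. It is not Flink code, and all class names are hypothetical. The model assumes a single thread and a linear chain, where each element passes through every operator before the next element enters. That is far simpler than Flink's parallel, multi-channel runtime. The barrier travels in-band with the records, and each operator records its state at the moment the barrier reaches it. As a result, every snapshot reflects exactly the records that came before the barrier.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Toy model (not Flink code): barrier-based snapshots in a linear pipeline.
 * Assumes a single thread in which each element passes through every operator
 * before the next element enters the pipeline.
 */
class BarrierSnapshotDemo {
    interface Element {}
    record DataRecord(String value) implements Element {}
    record Barrier(long checkpointId) implements Element {}

    /** A stateful operator whose state is the number of records it has seen. */
    static final class CountingOperator {
        final String name;
        long count;

        CountingOperator(String name) {
            this.name = name;
        }
    }

    /** Runs the stream through the chain; returns checkpointId -> (operator name -> state). */
    static Map<Long, Map<String, Long>> run(List<Element> stream, List<CountingOperator> ops) {
        Map<Long, Map<String, Long>> snapshots = new LinkedHashMap<>();
        for (Element e : stream) {
            for (CountingOperator op : ops) {          // the element visits operators in order
                if (e instanceof DataRecord) {
                    op.count++;                         // normal processing updates the state
                } else if (e instanceof Barrier b) {
                    // The barrier reached this operator: record its current state,
                    // then (implicitly) pass the barrier on to the next operator.
                    snapshots.computeIfAbsent(b.checkpointId(), id -> new LinkedHashMap<>())
                             .put(op.name, op.count);
                }
            }
        }
        return snapshots;
    }

    public static void main(String[] args) {
        List<CountingOperator> ops =
                List.of(new CountingOperator("map"), new CountingOperator("window"));
        List<Element> stream =
                List.of(new DataRecord("a"), new DataRecord("b"), new Barrier(1), new DataRecord("c"));
        System.out.println(run(stream, ops)); // {1={map=2, window=2}}
    }
}
```

Record "c" arrives after barrier 1, so no operator's snapshot for checkpoint 1 includes it. The job is never stopped globally; each operator snapshots independently as the barrier passes.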
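1.2 Core Design Goals
// Three core goals
1. Exactly-once state semantics: after a failure and recovery, each record's effect on the state is reflected exactly once. Records after the last checkpoint may still be processed again. End-to-end exactly-once additionally requires a replayable source and a transactional or idempotent sink.
2. High performance: minimize the impact on throughput
3. Distributed consistency: the global state snapshot is consistent across all parallel tasks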
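"Each record's effect on the state is counted once" is easiest to see in a toy replay. The sketch below is plain Java, not Flink code. `ExactlyOnceReplayDemo`, `Snapshot` and the checkpoint/crash positions are hypothetical. It assumes a checkpoint captures the source offset and the operator state together, so recovery can roll both back to the same point. Setting `restoreState` to false rewinds only the source, which mimics at-least-once behavior.

```java
import java.util.List;

/**
 * Toy model (not Flink code) of exactly-once *state* after recovery.
 * A checkpoint stores the source offset and the operator state together;
 * on failure both are rolled back and the tail of the input is replayed.
 */
class ExactlyOnceReplayDemo {
    /** What a checkpoint captures: where the source was, and the operator state. */
    record Snapshot(int sourceOffset, long sum) {}

    /**
     * Sums the input. Takes one checkpoint after checkpointAt records and simulates
     * a single crash after failAt records. If restoreState is false, only the source
     * is rewound, which mimics at-least-once behavior.
     */
    static long runWithFailure(List<Integer> input, int checkpointAt, int failAt,
                               boolean restoreState) {
        long sum = 0;                                   // operator state
        int offset = 0;                                 // source position
        Snapshot last = new Snapshot(0, 0);             // implicit "empty" checkpoint
        boolean crashed = false;
        while (offset < input.size()) {
            sum += input.get(offset);
            offset++;
            if (offset == checkpointAt) {
                last = new Snapshot(offset, sum);       // offset and state captured together
            }
            if (!crashed && offset == failAt) {
                crashed = true;
                offset = last.sourceOffset();           // rewind the source
                if (restoreState) {
                    sum = last.sum();                   // roll the state back as well
                }
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        List<Integer> input = List.of(1, 2, 3, 4, 5);
        System.out.println(runWithFailure(input, 2, 4, true));  // 15, same as a failure-free run
        System.out.println(runWithFailure(input, 2, 4, false)); // 22: values 3 and 4 counted twice
    }
}
```

With the state restored, the replayed values 3 and 4 are processed twice but counted once. Rewinding only the source counts them twice.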
2. Checkpoint Triggering Flow (JobManager Side)
2.1 CheckpointCoordinator Core Components
// flink-runtime/.../CheckpointCoordinator.java
public class CheckpointCoordinator {
    /** Checkpoint ID generator */
    private final CheckpointIDCounter checkpointIdCounter;
    /** Checkpoints still waiting for acknowledgements */
    @GuardedBy("lock")
    private final Map<Long, PendingCheckpoint> pendingCheckpoints;
    /** Store for completed checkpoints */
    private final CompletedCheckpointStore completedCheckpointStore;
    /** View of the checkpoint storage (HDFS, S3, etc.) */
    private final CheckpointStorageCoordinatorView checkpointStorageView;
    /** Checkpoint trigger interval */
    private final long baseInterval; // the configured interval
    /** Checkpoint timeout */
    private final long checkpointTimeout;
    /** Minimum pause between checkpoints */
    private final long minPauseBetweenCheckpoints;
    /** Whether unaligned checkpoints are enabled */
    private final boolean unalignedCheckpointsEnabled;
}
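Three of these timing fields work together. A new periodic checkpoint should not start before `baseInterval` has elapsed since the previous trigger. It should also wait until `minPauseBetweenCheckpoints` has passed since the previous checkpoint finished. A pending checkpoint older than `checkpointTimeout` is aborted. The helper below is a hypothetical simplification of those rules. It is not Flink's scheduling code and ignores, for example, concurrent checkpoints.

```java
/**
 * Hypothetical helper (not Flink's actual scheduling code): the earliest time
 * the next periodic checkpoint may start, given a base interval and a minimum
 * pause measured from the END of the previous checkpoint.
 */
class CheckpointScheduleSketch {
    static long nextTriggerTime(long lastTriggerMs, long lastCompletionMs,
                                long baseIntervalMs, long minPauseMs) {
        long byInterval = lastTriggerMs + baseIntervalMs;  // regular periodic schedule
        long byMinPause = lastCompletionMs + minPauseMs;   // guaranteed gap after completion
        return Math.max(byInterval, byMinPause);           // both constraints must hold
    }

    /** A pending checkpoint is expired once it has been running for timeoutMs. */
    static boolean isExpired(long triggerMs, long nowMs, long timeoutMs) {
        return nowMs - triggerMs >= timeoutMs;
    }

    public static void main(String[] args) {
        // interval 10 s, min pause 5 s, previous checkpoint triggered at t = 0
        System.out.println(nextTriggerTime(0, 3_000, 10_000, 5_000)); // 10000: interval dominates
        System.out.println(nextTriggerTime(0, 8_000, 10_000, 5_000)); // 13000: min pause dominates
        System.out.println(isExpired(0, 60_000, 60_000));             // true
    }
}
```

In application code these values are usually set with `env.enableCheckpointing(interval)`, `CheckpointConfig#setMinPauseBetweenCheckpoints` and `CheckpointConfig#setCheckpointTimeout`.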
2.2 The Complete Checkpoint Triggering Flow
// Step 1: trigger a checkpoint (invoked periodically)
// Simplified excerpt: fields and locals such as timestamp, checkpointOptions, job
// and the promises are set up in code omitted here.
public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(
        CheckpointProperties props,
        @Nullable String externalSavepointLocation,
        CheckpointType checkpointType) {
    synchronized (lock) {
        // 1.1 Generate the checkpoint ID
        long checkpointID = checkpointIdCounter.getAndIncrement();
        // 1.2 Compute which tasks need to be triggered
        CheckpointPlan checkpointPlan = checkpointPlanCalculator.calculateCheckpointPlan();
        // 1.3 Initialize the storage location
        CheckpointStorageLocation checkpointLocation =
                checkpointStorageView.initializeLocationForCheckpoint(checkpointID);
        // 1.4 Create the PendingCheckpoint
        PendingCheckpoint checkpoint = new PendingCheckpoint(
                job, checkpointID, timestamp, checkpointPlan,
                operatorCoordinatorsToConfirm, masterStateIdentifiers, props,
                onCompletionPromise, pendingCheckpointStats, masterTriggerCompletionPromise);
        pendingCheckpoints.put(checkpointID, checkpoint);
        // 1.5 Schedule cleanup in case the checkpoint times out
        ScheduledFuture<?> cancellerHandle = timer.schedule(
                () -> {
                    try {
                        synchronized (lock) {
                            if (!checkpoint.isDiscarded()) {
                                checkpoint.abort(
                                        CheckpointFailureReason.CHECKPOINT_EXPIRED,
                                        null,
                                        checkpointsCleaner);
                            }
                        }
                    } catch (Throwable throwable) {
                        LOG.warn("Error while aborting checkpoint", throwable);
                    }
                },
                checkpointTimeout,
                TimeUnit.MILLISECONDS);
        checkpoint.setCancellerHandle(cancellerHandle);
        // Step 2: send the trigger message to all source tasks
        for (Execution execution : checkpointPlan.getTasksToTrigger()) {
            execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
        }
        return checkpoint.getCompletionFuture();
    }
}
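Once triggered, a pending checkpoint ends one of two ways. Either every task acknowledges, or the timeout canceller fires first. The sketch below models only that race with plain JDK futures instead of Flink's classes. `PendingCheckpointSketch`, `acknowledge` and the task names are hypothetical. `CompletableFuture.orTimeout` (Java 9+) stands in for the `timer.schedule(...)` canceller.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

/**
 * Simplified stand-in for PendingCheckpoint (hypothetical, not Flink's class):
 * it completes once every expected task has acknowledged, or fails with a
 * TimeoutException if the acknowledgements do not all arrive in time.
 */
class PendingCheckpointSketch {
    private final long checkpointId;
    private final Set<String> notYetAcknowledged;
    private final CompletableFuture<Long> completion = new CompletableFuture<>();

    PendingCheckpointSketch(long checkpointId, Set<String> tasksToAcknowledge, long timeoutMs) {
        this.checkpointId = checkpointId;
        this.notYetAcknowledged = new HashSet<>(tasksToAcknowledge);
        // Plays the role of the canceller scheduled in triggerCheckpoint:
        // if the future is still incomplete after timeoutMs, it fails ("expired").
        completion.orTimeout(timeoutMs, TimeUnit.MILLISECONDS);
    }

    /** Called when a task reports that its part of the snapshot is done. */
    synchronized void acknowledge(String taskId) {
        if (completion.isDone()) {
            return;                                  // already completed or expired
        }
        if (notYetAcknowledged.remove(taskId) && notYetAcknowledged.isEmpty()) {
            completion.complete(checkpointId);       // every task acked: checkpoint complete
        }
    }

    CompletableFuture<Long> getCompletionFuture() {
        return completion;
    }

    public static void main(String[] args) throws InterruptedException {
        PendingCheckpointSketch cp =
                new PendingCheckpointSketch(100, Set.of("source-1", "sink-1"), 1_000);
        cp.acknowledge("source-1");
        cp.acknowledge("sink-1");
        System.out.println(cp.getCompletionFuture().join()); // 100

        PendingCheckpointSketch slow = new PendingCheckpointSketch(101, Set.of("source-1"), 50);
        try {
            slow.getCompletionFuture().get();                // nobody acknowledges
        } catch (ExecutionException e) {
            // expired: TimeoutException
            System.out.println("expired: " + e.getCause().getClass().getSimpleName());
        }
    }
}
```

The trigger RPC goes only to the source tasks, but acknowledgements are expected from every task in the plan. That is why the sketch waits for a sink as well.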
Execution flow diagram:
CheckpointCoordinator (JobManager)
│
├─ 1. Generate the checkpoint ID (e.g., ID=100)
├─ 2. Compute the trigger plan
├─ 3. Create the PendingCheckpoint
└─ 4. Send the trigger RPC to the source tasks
        │
        ↓
┌─────────────────────────────────┐
│ Source Task 1    Source Task 2  │
│ receives         receives       │
│ Trigger          Trigger        │
└─────────────────────────────────┘
        ↓
Inject a CheckpointBarrier into the data stream
3. The CheckpointBarrier Mechanism (Core!)
3.1 CheckpointBarrier Data Structure
// flink-runtime/.../CheckpointBarrier.java
public class CheckpointBarrier extends RuntimeEvent {
    /** Checkpoint ID (strictly monotonically increasing) */
    private final long id;
    /** Timestamp at which the checkpoint was triggered */
    private final long timestamp;
    /** Checkpoint options (aligned/unaligned) */
    private final CheckpointOptions checkpointOptions;

    public CheckpointBarrier(long id, long timestamp, CheckpointOptions options) {
        this.id = id;
        this.timestamp = timestamp;
        this.checkpointOptions = checkNotNull(options);
    }
}
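Because `id` is strictly increasing, a task can tell a new checkpoint from a stale one just by comparing IDs. The sketch below assumes a single-input task and uses a hypothetical `BarrierIdSketch` class. It is not Flink's barrier handler, which also has to deal with multiple input channels, alignment and cancellation. The sketch remembers the highest ID it has seen and ignores any barrier that is not newer.

```java
/**
 * Toy model (not Flink's barrier handler): a single-input task that uses the
 * strictly increasing checkpoint ID to decide whether a barrier is new.
 */
class BarrierIdSketch {
    /** Minimal stand-in for CheckpointBarrier: only the ID and trigger timestamp. */
    record Barrier(long id, long timestamp) {}

    private long latestCheckpointId = -1;   // no checkpoint seen yet

    /** Returns true if the barrier starts a new checkpoint, false if it is stale. */
    boolean onBarrier(Barrier barrier) {
        if (barrier.id() <= latestCheckpointId) {
            return false;                   // not newer than what we have seen: ignore it
        }
        latestCheckpointId = barrier.id();  // a real task would snapshot and forward here
        return true;
    }

    public static void main(String[] args) {
        BarrierIdSketch task = new BarrierIdSketch();
        System.out.println(task.onBarrier(new Barrier(100, 1_000L))); // true
        System.out.println(task.onBarrier(new Barrier(99, 900L)));    // false: stale
        System.out.println(task.onBarrier(new Barrier(101, 2_000L))); // true
    }
}
```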
3.2 CheckpointOptions Configuration
// flink-runtime/.../CheckpointOptions.java
public class CheckpointOptions implements Serializable {
    /** Alignment type */
    public enum AlignmentType {
        AT_LEAST_ONCE,  // at-least-once
        ALIGNED,        // aligned checkpoint (traditional)
        UNALIGNED,      // unaligned checkpoint
        FORCED_ALIGNED  // forced alignment
    }

    private final SnapshotType checkpointType;
    private final CheckpointStorageLocationReference targetLocation;
    private final AlignmentType alignmentType;
    private final long alignedCheckpointTimeout; // alignment timeout

    // Factory method
    public static CheckpointOptions forConfig(
            SnapshotType checkpointType,
            CheckpointStorageLocationReference locationReference,
            boolean isExactlyOnceMode,
            boolean isUnalignedEnabled,
            long alignedCheckpointTimeout) {
        if (!isExactlyOnceMode) {
            return notExactlyOnce(checkpointType, locationReference);
        } else if (checkpointType.isSavepoint()) {
            return alignedNoTimeout(checkpointType, locationReference);
        }