当前位置: 首页 > news >正文

Flink Checkpoint 设计理念深度解析(附源码)

Flink Checkpoint 设计理念深度解析-附源码

  • 1. Checkpoint 的核心设计理念
    • 1.1 为什么需要 Checkpoint?
    • 1.2 核心设计目标
  • 2. Checkpoint 触发流程(JobManager 侧)
    • 2.1 CheckpointCoordinator 核心组件
    • 2.2 Checkpoint 触发完整流程
  • 3. CheckpointBarrier 机制(核心!)
    • 3.1 CheckpointBarrier 数据结构
    • 3.2 CheckpointOptions 配置
    • 3.3 Barrier 在数据流中的传播
  • 4. Task 端 Checkpoint 执行(核心实现)
    • 4.1 SubtaskCheckpointCoordinator 协调器
    • 4.2 算子状态快照
    • 4.3 异步 Checkpoint 完成
  • 5. Aligned vs Unaligned Checkpoint(重点对比)
    • 5.1 Aligned Checkpoint(传统方式)
    • 5.2 Unaligned Checkpoint(新方式)
    • 5.3 Channel State 持久化(Unaligned 关键)
  • 6. PendingCheckpoint 完成流程
    • 6.1 接收 Task 确认
    • 6.2 转换为 CompletedCheckpoint
  • 7. Checkpoint 恢复流程
    • 7.1 从 Checkpoint 恢复
    • 7.2 Task 加载状态
  • 8. 关键配置与优化
    • 8.1 核心配置参数
    • 8.2 性能优化建议
  • 9. 总结:Checkpoint 设计精髓

1. Checkpoint 的核心设计理念

1.1 为什么需要 Checkpoint?

问题场景:

流式作业运行中:
Source → Map → KeyBy → Window → Sink↓       ↓       ↓       ↓       ↓
状态A    状态B    状态C    状态D    状态E如果作业在任意时刻失败,如何恢复到一致性状态?
→ Checkpoint!基于 Chandy-Lamport 分布式快照算法

1.2 核心设计目标

// 三大核心目标
1. Exactly-Once 语义:每条数据恰好被处理一次
2. 高性能:最小化对吞吐量的影响
3. 分布式一致性:全局状态快照一致

2. Checkpoint 触发流程(JobManager 侧)

2.1 CheckpointCoordinator 核心组件

// flink-runtime/.../CheckpointCoordinator.java
public class CheckpointCoordinator {/** Checkpoint ID 生成器 */private final CheckpointIDCounter checkpointIdCounter;/** 待确认的 Checkpoint */@GuardedBy("lock")private final Map<Long, PendingCheckpoint> pendingCheckpoints;/** 已完成的 Checkpoint 存储 */private final CompletedCheckpointStore completedCheckpointStore;/** Checkpoint 存储视图(HDFS/S3等) */private final CheckpointStorageCoordinatorView checkpointStorageView;/** Checkpoint 触发间隔 */private final long baseInterval;  // 默认配置的间隔/** Checkpoint 超时时间 */private final long checkpointTimeout;/** 最小间隔时间 */private final long minPauseBetweenCheckpoints;/** 是否启用 Unaligned Checkpoint */private final boolean unalignedCheckpointsEnabled;
}

2.2 Checkpoint 触发完整流程

// 步骤 1: 周期性触发 Checkpoint
public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(CheckpointProperties props,@Nullable String externalSavepointLocation,CheckpointType checkpointType) {synchronized (lock) {// 1.1 生成 Checkpoint IDlong checkpointID = checkpointIdCounter.getAndIncrement();// 1.2 计算需要触发的任务CheckpointPlan checkpointPlan = checkpointPlanCalculator.calculateCheckpointPlan();// 1.3 初始化存储位置CheckpointStorageLocation checkpointLocation = checkpointStorageView.initializeLocationForCheckpoint(checkpointID);// 1.4 创建 PendingCheckpointPendingCheckpoint checkpoint = new PendingCheckpoint(job,checkpointID,timestamp,checkpointPlan,operatorCoordinatorsToConfirm,masterStateIdentifiers,props,onCompletionPromise,pendingCheckpointStats,masterTriggerCompletionPromise);pendingCheckpoints.put(checkpointID, checkpoint);// 1.5 设置超时清理ScheduledFuture<?> cancellerHandle = timer.schedule(() -> {try {synchronized (lock) {if (!checkpoint.isDiscarded()) {checkpoint.abort(CheckpointFailureReason.CHECKPOINT_EXPIRED,null,checkpointsCleaner);}}} catch (Throwable throwable) {LOG.warn("Error while aborting checkpoint", throwable);}},checkpointTimeout,TimeUnit.MILLISECONDS);checkpoint.setCancellerHandle(cancellerHandle);// 步骤 2: 向所有 Source 任务发送 Trigger 消息for (Execution execution : checkpointPlan.getTasksToTrigger()) {execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);}return checkpoint.getCompletionFuture();}
}

执行流程图:

CheckpointCoordinator (JobManager)│├─ 1. 生成 Checkpoint ID (e.g., ID=100)├─ 2. 创建 PendingCheckpoint├─ 3. 计算触发计划└─ 4. 向 Source Tasks 发送 RPC│↓┌───────────────────────────────┐│  Source Task 1  Source Task 2 ││  收到 Trigger   收到 Trigger  │└───────────────────────────────┘↓插入 CheckpointBarrier 到数据流

3. CheckpointBarrier 机制(核心!)

3.1 CheckpointBarrier 数据结构

// flink-runtime/.../CheckpointBarrier.java
public class CheckpointBarrier extends RuntimeEvent {/** Checkpoint ID(严格单调递增) */private final long id;/** Checkpoint 触发时间戳 */private final long timestamp;/** Checkpoint 选项(Aligned/Unaligned) */private final CheckpointOptions checkpointOptions;public CheckpointBarrier(long id, long timestamp, CheckpointOptions options) {this.id = id;this.timestamp = timestamp;this.checkpointOptions = checkNotNull(options);}
}

3.2 CheckpointOptions 配置

// flink-runtime/.../CheckpointOptions.java
public class CheckpointOptions implements Serializable {/** 对齐类型 */public enum AlignmentType {AT_LEAST_ONCE,      // 至少一次ALIGNED,            // 对齐 Checkpoint(传统)UNALIGNED,          // 非对齐 CheckpointFORCED_ALIGNED      // 强制对齐}private final SnapshotType checkpointType;private final CheckpointStorageLocationReference targetLocation;private final AlignmentType alignmentType;private final long alignedCheckpointTimeout;  // 对齐超时// 工厂方法public static CheckpointOptions forConfig(SnapshotType checkpointType,CheckpointStorageLocationReference locationReference,boolean isExactlyOnceMode,boolean isUnalignedEnabled,long alignedCheckpointTimeout) {if (!isExactlyOnceMode) {return notExactlyOnce(checkpointType, locationReference);} else if (checkpointType.isSavepoint()) {return alignedNoTimeout(checkpointType, locationReference);} 
http://www.dtcms.com/a/481752.html

相关文章:

  • 从 TF-IDF 到 Word2Vec:让推荐系统更懂语义
  • 01-ELK安装ES,ES-head
  • OpenCV4-直方图与傅里叶变换-项目实战-信用卡数字识别
  • 医院排班挂号系统小程序
  • 河北建设厅网站打不开是什么原因国际新闻直播
  • C++设计模式_行为型模式_命令模式Command
  • Blender自动化展UV插件 UV Factory 4.3 v1 – Powerful Modular Uv Tools
  • 网络与通信安全课程复习汇总2——信息保密
  • 密码学安全:CIA三元组与三大核心技术
  • 建网站怎么做本地的营销网站建设
  • 短剧分销系统技术拆解:渠道推广码生成、订单归因与实时分账系统实现
  • ​RocketMQ 与 RabbitMQ 全面对比:架构、性能与适用场景解析
  • RabbitMQ 消息可靠投递
  • RabbitMQ全面详解:从核心概念到企业级应用
  • 北京市建设工程第四检测所网站小程序定制开发团队
  • 安徽网站优化flash如何做网页
  • AI文档处理:AI在处理扫描版PDF时准确率低,如何提升?
  • TDengine 数学函数 EXP 用户手册
  • C语言自定义变量类型结构体理论:从初见到精通​​​​​​​(下)
  • 医疗网络功能虚拟化与深度强化学习的动态流量调度优化研究(下)
  • SpringMVC练习:加法计算器与登录
  • 小模型的应用
  • 深度学习进阶(一)——从 LeNet 到 Transformer:卷积的荣光与注意力的崛起
  • QPSK信号载波同步技术---极性Costas 法载波同步
  • 盘多多网盘搜索苏州seo排名公司
  • 国外有趣的网站wordpress小视频主题
  • RTC、UDP、TCP和HTTP以及直播等区别
  • Java面试场景:从Spring Web到Kafka的音视频应用挑战
  • 基于EDBO-ELM(改进蜣螂算法优化极限学习机)数据回归预测
  • gaussdb数据库的集中式和分布式