当前位置: 首页 > news >正文

Flink checkpoint

对齐检查点 (Aligned Checkpoint)

Flink 的分布式快照机制受到 Chandy-Lamport 算法的启发。 其核心元素是数据流中的屏障(Barrier)。

  1. Barrier 注入 :JobManager 中的 Checkpoint Coordinator 指示 Source 任务开始 Checkpoint。Source 任务在数据流中注入 Barrier。这些 Barrier 携带 Checkpoint ID,将数据流分割成属于本次快照的记录和属于下次快照的记录。

  2. Barrier 对齐 :当一个算子(Operator)有多个输入流时,它必须等待所有输入流的 Barrier 都到达后,才会进行自己的状态快照,并向下游广播 Barrier。 在对齐过程中,已经接收到 Barrier 的输入通道会被阻塞,算子会继续处理来自尚未接收到 Barrier 的通道的数据。

  3. 状态快照 :一旦所有 Barrier 到达,算子就进行本地状态快照,并异步上传到持久化存储。

  4. Checkpoint 完成 :当所有 Sink 任务接收到所有输入流的 Barrier 并完成自己的快照后,会通知 JobManager,该 Checkpoint 完成。

对齐检查点的局限性 (尤其在反压情况下)

在应用产生反压时,对齐检查点会面临以下问题:

  • Barrier 流动缓慢 :由于反压,Buffer 中缓存了大量数据,导致 Barrier 在数据流中流动缓慢。

  • 处理阻塞 :对于已经接收到 Barrier 的 Channel,由于需要等待其他 Channel 的 Barrier 进行对齐,其上游数据处理会被阻塞。

  • Checkpoint 完成时间长 :Barrier 可能需要很长时间才能到达 Sink,导致 Checkpoint 完成时间过长。一个耗时过长的 Checkpoint 在完成时可能已经“过时”了。

  • 恶性循环 :长时间的 Checkpoint 可能导致任务超时、崩溃,然后从一个较旧的 Checkpoint 恢复,这可能加剧反压,形成恶性循环,使得任务几乎没有进展。

非对齐检查点 (Unaligned Checkpoint) 的工作原理

为了解决上述问题,Flink 引入了非对齐检查点(FLIP-76)。其核心思想是取消中间算子的 Barrier 对齐过程。

  1. Barrier 注入 :与对齐检查点类似,Barrier 仍然在 Source 端注入。

  2. 无需对齐,立即转发 :中间算子在接收到任何一个输入流的 Barrier 后,不再等待其他输入流的 Barrier。它会:

    1. 短暂阻塞 任务。

    2. 标记 Buffer :记录当前 Buffer 中的数据(这些数据属于当前 Checkpoint)。

    3. 转发 Barrier :立即将 Barrier 向下游算子转发。

    4. 创建状态快照 :进行本地状态快照,这个快照包含了算子自身的状态以及在其输入 Buffer 中、尚未被处理但在 Barrier 之前到达的数据(即所谓的“in-flight data”)。

  3. 快速到达 Sink :由于 Barrier 不再需要等待对齐,它们可以非常快速地传递到 Sink。

  4. Sink 端对齐(可选) :在某些描述中,提到非对齐检查点只在 Sink 端进行对齐,而中间算子则不进行对齐。

怎么保证 Barrier 不会超过之前的数据,和不会被之后的数据超过?

这是通过 Flink 数据传输和处理的有序性来保证的,主要依赖以下几点:

  • 发送端的有序性:

    • 当一个上游算子处理完一批数据后,如果接下来需要发送 Checkpoint Barrier N,它会确保先将这批数据发送出去,然后再发送 Barrier N。之后产生的数据,则会在 Barrier N 之后发送。
    • 这意味着在逻辑上,数据记录和 Barrier 在发送端是被串行化处理和发送的。
  • Flink 网络栈的有序性保证 (Channel 级别):

    • Flink 的 TaskManager 内部(对于 chained operators)的数据传输通道(InputChannel 和 ResultSubpartition)被设计为先进先出 (FIFO) 的。
    • 无论是普通的数据记录还是特殊的 Checkpoint Barrier,一旦被写入一个输出通道,它们就会按照写入的顺序被下游的输入通道读取。
    • 对于跨 TaskManager 的网络传输,Flink 通常依赖 TCP/IP。在单个 TCP 连接内,TCP 协议本身保证了数据包的有序传输。 Flink 会为有数据交互的 Task Subtask 之间建立逻辑连接,这些连接底层可能复用物理 TCP 连接,但 Flink 的网络层会确保逻辑上的数据顺序。
  • 算子处理的有序性:

    • 算子从其输入通道读取数据时,也是按顺序读取的。它不会跳过前面的数据去处理后面的数据(在一个通道内)。

总结一下保证顺序的关键:

  1. 上游按序发送:数据记录和 Barrier 在源头就是按逻辑顺序发送的。
  2. 通道保证 FIFO:Flink 的数据传输通道(无论是内存中的还是跨网络的)保证了消息的先进先出。
  3. 下游按序接收和处理:下游算子按顺序从通道中读取数据。

当然,如果一个算子有多个输入通道(例如来自不同的上游算子,或者同一个上游算子的不同并行实例),那么不同通道上的 Barrier N 到达时间确实可能因为网络延迟、处理速度等原因而不同。这正是 Barrier Alignment (屏障对齐) 机制要解决的问题:算子会等待所有输入通道的 Barrier N 都到达后,才进行自己的状态快照,以确保快照的一致性。

PipelinedSubpartition

PipelinedSubpartition 使用 PrioritizedDeque 使得 Flink 能够在保证普通数据流的 FIFO 特性的基础上,有效地处理需要优先响应的特殊事件。这比单纯的 ArrayDeque 提供了更灵活的控制流管理。

PrioritizedDeque 是一个很有意思的数据结构,它结合了队列和优先级处理的能力:

  1. 基本行为: 它仍然是一个双端队列 (Deque),可以从头部和尾部添加或移除元素。
  2. 优先级处理:
    • 当一个具有优先级的 BufferConsumer(例如,一个非对齐检查点的 CheckpointBarrier,或者其他一些控制事件)被添加到 PipelinedSubpartition 时,它通常会被放入 PrioritizedDeque 的队首(通过类似 addPriorityElement 的方法,具体实现在 PrioritizedDeque 内部)。
    • 普通的、非优先级的 BufferConsumer(即大部分数据 Buffer)则会被添加到 PrioritizedDeque 的队尾(通过 buffers.add(...),这通常是 Deque 的标准 addLast 行为)。
  3. 数据消费:
    • 当数据被消费时(例如通过 pollBuffer() 方法),元素总是从 PrioritizedDeque 的队首被取出。

这对 FIFO 意味着什么?

  • 对于普通数据流: 如果没有优先事件插入,普通的数据 Buffer 遵循严格的 FIFO 顺序。它们被加入队尾,然后按顺序从队首被消费。
  • 当优先事件发生时: 如果一个优先事件(比如一个需要立即处理的 Barrier)到达,它会被插入到队首。这意味着它会在任何已在队列中但尚未被消费的普通数据 Buffer 之前被处理。这对于确保像非对齐 Checkpoint Barrier 这样的控制信令能够及时响应是至关重要的。一旦所有优先元素被处理完毕,队列会继续从队首消费那些之前按 FIFO 顺序排列的普通数据 Buffer。

Flink 在恢复时怎么知道快照包含的“物理部分”是什么

主要是通过以下信息和机制:

  1. 算子标识 (Operator ID / UID)

    • 在 Flink 作业中,每个有状态的算子(Operator)都有一个唯一的标识符。这个标识符可以是 Flink 自动生成的,也可以是用户通过 uid(String) 方法指定的。
    • 当进行 Checkpoint 时,每个算子产生的状态都会与这个唯一的算子标识符关联起来并存储。
  2. 子任务索引 (Subtask Index)

    • Flink 作业中的算子通常会以一定的并行度执行。每个并行实例被称为一个子任务(Subtask),并且它们都有一个从 0 到 parallelism-1 的索引。
    • Checkpoint 快照会分别记录下属于每一个算子的每一个子任务的状态。你当前正在查看的 AcknowledgeCheckpoint.java 文件中的 TaskStateSnapshot subtaskState 字段,就代表了一个特定子任务的状态快照。
  3. 键组 (Key Groups) - 针对 Keyed State

    • 对于 Keyed State(例如,在 keyBy 之后使用的 ValueStateMapState 等),数据是根据 key 的哈希值被划分到逻辑的“键组”(Key Groups)中的。
    • 一个作业的最大并行度(maxParallelism)决定了总共有多少个键组。
    • 在 Checkpoint 时,每个键组的状态都会被保存下来。
    • 在恢复时,每个(可能新的)子任务会被确定性地分配一组它需要负责的键组。这个分配算法保证了无论并行度如何变化(在 maxParallelism 范围内),每个 key 始终属于同一个键组,并且每个键组始终会被分配给一个确定的子任务。因此,子任务可以准确地知道应该从快照中加载哪些键组的状态。
  4. 快照元数据 (Checkpoint Metadata)

    • 当一个 Checkpoint 成功完成后,JobManager 会将关于这个 Checkpoint 的所有元数据信息持久化。这些元数据通常包含:
      • Checkpoint ID。
      • 每个算子(通过其唯一标识符识别)的状态信息。
      • 对于每个算子,其每个子任务(通过其索引识别)的状态句柄(State Handle),这些句柄指向实际存储状态数据的位置(例如 HDFS 上的文件路径)。
      • 对于 Keyed State,会记录每个键组范围的状态句柄。

恢复过程如何利用这些信息:

当 Flink 作业从一个 Checkpoint 恢复时:

  1. JobManager 首先从持久化存储中读取选定的 Checkpoint 的元数据。
  2. JobManager 根据当前的作业拓扑和每个算子的并行度,为每个新启动的 Task(即算子的子任务实例)分配任务。
  3. 对于每一个需要恢复状态的 Task:
    • JobManager 会在其元数据中查找与该 Task 对应的算子标识符和子任务索引(或者它负责的键组范围)。
    • 通过这些信息,JobManager 可以定位到该 Task 在 Checkpoint 中对应的状态句柄。
    • Task 接收到这些状态句柄后,就会从持久化存储中读取并加载属于自己的那部分状态数据。

所以,所谓的“物理部分”其实就是指特定算子的特定并行实例(子任务)所拥有的那部分状态数据

Flink 通过在 Checkpoint 时精确记录这种“逻辑算子/子任务”到“实际状态数据存储位置”的映射关系,并在恢复时利用这种映射关系,来确保每个新的 Task 实例都能正确地加载其先前保存的状态。

什么是非对齐检查点 (Unaligned Checkpoint)?

在标准的对齐检查点(Aligned Checkpoint)模式下,当一个算子接收到来自上游某个输入通道的检查点屏障 (Checkpoint Barrier) 时,它会暂停处理该通道的数据,直到接收到所有输入通道的屏障。这个过程称为“对齐”。对齐的目的是确保所有算子在同一时刻对数据流进行快照,从而保证精确一次 (Exactly-Once) 的处理语义。然而,在高背压的情况下,对齐过程可能会非常耗时,因为某些通道的屏障可能需要等待很长时间才能到达,这会导致检查点时长增加,甚至超时。

非对齐检查点通过允许检查点屏障“越过”通道中正在传输的数据来解决这个问题。当屏障到达算子时,算子会立即开始进行快照,并将通道中尚未处理的数据(即所谓的“飞行中”数据)也作为检查点状态的一部分保存下来。

非对齐检查点是如何工作的?

核心思想是:

  1. 屏障超越数据 (Barrier Overtaking Data):检查点屏障不再需要等待所有数据处理完毕。当屏障到达一个算子时,该算子会立即开始其快照过程。
  2. 飞行中数据作为状态 (In-flight Data as State):在屏障之后、算子处理之前的数据(即飞行中数据)会被捕获并存储为检查点状态的一部分。这意味着,当从检查点恢复时,这些飞行中数据也会被恢复并重新处理,就好像它们从未被屏障越过一样。
  3. 源端插入屏障:尽管非对齐检查点在概念上更接近 Chandy-Lamport 算法,但 Flink 仍然在数据源端插入屏障,以避免检查点协调器过载。

在代码层面,CheckpointOptions.java 文件定义了不同的对齐类型,包括 UNALIGNED 和 FORCED_ALIGNEDCheckpointConfig.java 中有启用非对齐检查点的方法。

当启用非对齐检查点时,如果设置了 alignedCheckpointTimeout,检查点会先尝试对齐。如果在超时时间内未能完成对齐,则会自动切换到非对齐模式。如果 alignedCheckpointTimeout 设置为0,则检查点会直接以非对齐方式启动。

为什么可以非对齐?

非对齐检查点之所以能够实现,关键在于它改变了对“一致性快照”的实现方式

  • 传统对齐检查点:通过确保所有算子在逻辑上的同一时间点(即所有输入屏障都到达时)进行快照,来保证数据的一致性。这意味着在屏障到达之前的数据都已被处理并反映在算子状态中,屏障之后的数据则不包含在当前快照中。
  • 非对齐检查点:它放宽了“同一逻辑时间点”的严格要求。通过将飞行中的数据(即那些在屏障已经通过但尚未被下游算子处理的数据)也包含在检查点状态中,它依然能够保证在恢复时,所有数据不多不少恰好被处理一次。当从非对齐检查点恢复时,这些被保存的飞行中数据会被重新注入到数据流中,就好像它们从未被屏障“跳过”一样。

优势:

  • 减少背压影响:在高背压场景下,数据处理缓慢,导致屏障对齐时间过长。非对齐检查点由于不需要等待数据处理,可以显著缩短检查点时间。
  • 提高检查点频率:由于检查点时间缩短,可以更频繁地进行检查点,从而减少故障恢复时需要重放的数据量。

限制和注意事项:

  • 精确一次语义:非对齐检查点仅在精确一次 (Exactly-Once) 语义下可用。
  • Savepoint:Savepoint 通常不能是非对齐的,因为它们通常用于版本升级或作业迁移,需要一个完全对齐的状态。
  • 状态大小:由于飞行中的数据也被包含在状态中,非对齐检查点的状态大小可能会比对齐检查点更大。
  • Sink 的特殊处理:如 Flink 2.0 的发布说明中提到 (docs/content/release-notes/flink-2.0.md),对于 Sink 拓扑中的操作(如 committer),非对齐检查点会被禁用,以确保 committable 能够在 notifyCheckpointComplete 时位于相应的算子,从而正确提交所有副作用。

总而言之,非对齐检查点通过将飞行中数据纳入状态管理,实现了在不牺牲精确一次语义的前提下,大幅优化高背压场景下检查点性能的目标。

相关文章:

  • 港股TRS交易系统开发:跨境资本的精密调度引擎
  • 海康工业相机文档大小写错误
  • 独家首发!低照度环境下YOLOv8的增强方案——从理论到TensorRT部署
  • 中科院提出多方协作注意力控制方法MCA-Ctrl,无需调优的即可使用文本和复杂的视觉条件实现高质量的图像定制。
  • Java开发过程中,trycatch异常处理的避坑梳理
  • 巧克力---贪心+堆模拟/优先队列
  • 图像识别预处理(配合pytesseract使用)
  • 一个典型的Qt界面拆分(解耦)方案
  • figma MCP + cursor如何将设计稿生成前端页面
  • 链游技术破壁:NFT资产确权与Play-to-Earn经济模型实战
  • 【数据结构】B树
  • TP6 实现一个字段对数组中的多个值进行LIKE模糊查询(OR逻辑)
  • 【nano与Vim】常用命令
  • K8S认证|CKS题库+答案| 3. 默认网络策略
  • 无需域名,直接加密IP的SSL方案
  • 【Survival Analysis】【机器学习】【3】 SHAP可解釋 AI
  • DDD架构实战 领域层 事件驱动
  • MCP Server 之旅第 5 站:服务鉴权体系解密
  • vanna+deepseek+chainlit 实现自然语言转SQL的精度调优
  • 构建 MCP 服务器:第 4 部分 — 创建工具
  • ps做素材下载网站有哪些/免费外网加速器
  • 设计网站导航大全/搜索引擎优化营销
  • 做网站使用什么软件的/最新地址
  • 优惠券网站是怎么做的/刚刚发生 北京严重发生
  • 做网站的论文摘要/网络营销品牌推广
  • 昆山设计网站的公司/mac蜜桃923色号