当前位置: 首页 > news >正文

Flink Exactly Once 和 幂等

Exactly once

简单来说,Flink 的 exactly-once 语义确保了即使在发生故障的情况下,每一条数据也只会被处理一次,对最终结果产生且仅产生一次影响。这对于金融、实时计费等对数据准确性要求极高的场景至关重要。

Flink 实现“端到端”的 exactly-once 主要依赖于两个关键机制:分布式快照(Checkpointing) 和 两阶段提交(Two-Phase Commit)。

1. 分布式快照 / 检查点 (Checkpointing)

这是 Flink 容错机制的核心。可以将其理解为 Flink 会定期为整个正在运行的应用(包括数据流和算子的状态)拍一张“快照”,并将其存储在可靠的持久化存储中(如 HDFS 或 S3)。

  • 工作原理:Flink 在数据流中注入一种名为 barrier 的特殊数据。当 barrier 流经所有算子时,每个算子都会将自己当前的状态保存到快照中。当所有算子都完成了快照,这个 checkpoint 才算完成。

  • 故障恢复:如果发生故障,Flink 会停止应用,然后从最近一次成功完成的 checkpoint 中恢复所有算子的状态,并从快照中记录的数据源位置(例如 Kafka 的 offset)开始重新处理数据。这保证了 Flink 内部处理的 exactly-once。

    2. 两阶段提交协议 (Two-Phase Commit)

    仅仅有 Checkpoint 只能保证 Flink 内部的状态一致,但要实现从数据源(Source)到数据汇(Sink)的端到端 exactly-once,还需要 Sink 组件的支持。

    如果数据在check point中已经处理了,然后发生故障,之后的恢复会再次处理,因此check point只保证至少一次。

    Flink 集成了一个两阶段提交协议来协调 checkpoint 和 Sink 的写入操作。

    这个过程分为两个阶段:

    1. 预提交 (Pre-commit)

      1. 当一个 checkpoint 开始时,Flink 的 Sink 连接器会开启一个外部事务(例如,一个 Kafka 事务)。

      2. 在当前 checkpoint 期间,所有需要输出的数据都会被写入这个事务中,但并不实际提交。

      3. 当算子完成自己的状态快照后,它会通知 Flink 的总控节点(JobManager)预提交已完成。

      4. 提交 (Commit)

        1. 当 JobManager 收到来自流处理任务中所有算子的“预提交完成”通知后,它就确认这个 checkpoint 已经全局完成。

        2. 随后,JobManager 会向所有算子发出“提交”指令。

        3. Sink 连接器收到指令后,才会去提交之前开启的那个外部事务,此时数据才真正在外部系统中可见。

        如果发生故障会怎么样?

        • 如果在预提交完成但提交之前发生故障,Flink 会从上一个 checkpoint 恢复。由于这次的事务没有被提交,它会被中止。恢复后,这些数据会被重新处理并写入一个新的事务中,不会造成数据重复。

        • 如果在提交之后发生故障,checkpoint 已经成功,恢复时会从这个 checkpoint 之后开始处理,同样不会造成数据重复。

          实现前提

          要实现端到端的 exactly-once,需要满足以下条件:

          • 数据源 (Source):必须是可重放的(Replayable),例如 Apache Kafka。Flink 需要能够回退到某个特定的数据点重新读取数据。

          • 数据汇 (Sink):必须支持事务(Transactional),或者 Flink 的 Sink 连接器能够模拟事务。例如,Kafka Producer、JDBC Sink 都提供了实现 exactly-once 的能力。

            总结来说,Flink 通过分布式快照来保证内部状态的一致性和故障恢复能力,再通过与外部系统集成的两阶段提交协议,将状态的保存和数据写入的提交操作绑定为原子操作,从而实现了强大的端到端 exactly-once 语义。

            基于处理时间(Processing Time)的开窗操作天然是非幂等的

            “幂等”指什么?

            在 Flink 和流处理的上下文中,一个操作的幂等性(Idempotence) 指的是:对于同一批输入数据,无论计算任务执行多少次,其产生的输出结果都是完全相同的。

            这在故障恢复场景下至关重要。当 Flink 作业从一个 checkpoint 恢复时,它会重放一部分数据。如果窗口计算是幂等的,那么重放数据后计算出的窗口结果,应该和第一次成功计算时完全一样,这样才能保证 exactly-once 的准确性。

            为什么基于处理时间的开窗是非幂等的?

            原因在于处理时间(Processing Time)的定义和不确定性。

            1. 定义:处理时间是指执行计算任务的机器的系统时钟时间(也常被称为“墙上时钟”)。它反映的是数据到达某个算子(Operator)的那个瞬间的物理时间。

              1. 不确定性:这个时间是完全不可预测和不可重现的。它会受到各种因素的影响,例如:

                1. 网络延迟(数据从上游传到当前算子的时间波动)。

                2. 背压(Backpressure)情况。

                3. Flink 任务的调度延迟。

                4. JVM 的垃圾回收(GC)停顿。

                5. 机器负载情况。

                分析与举例:

                假设我们有一个基于处理时间的5分钟滚动窗口,例如 TUMBLE(p_time, INTERVAL '5' MINUTE),其中一个窗口的范围是 10:00:0010:04:59

                • 第一次运行

                  • 一个数据元素 A10:04:58 到达了窗口算子,它被正确地划分到了 10:00 - 10:05 这个窗口中。

                  • 紧接着,作业在 10:05:00 之前发生了故障,需要从上一个 checkpoint 恢复。

                  • 故障恢复后的第二次运行(重放数据)

                    • 作业从 checkpoint 恢复,数据元素 A 被重新从数据源读取并处理。

                    • 由于恢复过程中的一些延迟(例如,资源重新申请、网络抖动等),这次数据元素 A10:05:01 才到达窗口算子。

                    • 根据处理时间的规则,10:05:01 已经不属于 10:00 - 10:05 窗口了,它将被划分到下一个窗口 10:05 - 10:10 中。

                    结论:

                    对于同一个输入数据 A,在两次不同的执行中,它被分配到了不同的窗口里。这导致 10:00 - 10:05 这个窗口的计算结果在两次运行中是不一致的。因此,这个基于处理时间的窗口操作是非幂等的。

                    解决方案:事件时间 (Event Time)

                    为了解决这个问题,Flink 引入了事件时间(Event Time)。事件时间是数据本身自带的时间戳,它标记了事件真实发生的时间。无论数据何时到达、被处理多少次,它的事件时间戳是固定不变的。因此,基于事件时间的窗口划分是确定的、可重现的,从而保证了计算的幂等性和结果的准确性。

                    既然外部世界只看到了 结果B ,那 结果A 和 结果B 不一样又有什么关系呢?

                    这正是在讨论 Exactly-Once 和幂等性时最容易让人困惑,也是最核心的一点。这里解释为什么“两次计算不同”会产生严重影响。

                    逻辑链是:

                    1. Flink 第一次计算(非幂等),得到 结果A

                    2. 在提交到 Sink 前,任务失败。

                    3. 结果A 从未被外部看到,被丢弃了。

                    4. Flink 恢复,第二次计算(非幂等),得到 结果B

                    5. 结果B 被成功提交到 Sink。

                    6. 疑问:既然外部世界只看到了 结果B,那 结果A结果B 不一样又有什么关系呢?

                      这个问题的答案是:影响在于破坏了数据的时序和状态的连续性,导致 Sink 中的数据在某段时间内是错误的。

                      让我们用一个具体的例子来说明这个影响。

                      场景:计算用户账户的实时余额

                      • 业务逻辑:每分钟计算一次用户的总余额,并写入 Paimon 表。

                      • Flink 设置:使用处理时间(Processing Time) 进行一分钟的滚动窗口聚合。

                        正常流程(无故障)

                        • 10:00:00 - 10:00:59

                          • 用户A 存入 100元。

                          • 10:01:00 时,窗口触发计算,结果是 余额: 100

                          • Checkpoint 1 成功,Paimon 表更新为 {用户A: 100}

                          • 10:01:00 - 10:01:59

                            • 用户A 又存入 50元,这条数据在 10:01:58 到达 Flink。

                            • 10:02:00 时,窗口触发计算,结果是 余额: 150

                            • Checkpoint 2 成功,Paimon 表更新为 {用户A: 150}

                            一切看起来都很好。

                            发生故障的流程

                            • 10:00:00 - 10:00:59

                              • 同上,Checkpoint 1 成功,Paimon 表状态是 {用户A: 100}

                              • 10:01:00 - 10:01:59 (第一次尝试)

                                • 用户A 存入 50元,这条数据在 10:01:58 到达 Flink。

                                • 10:01:59,Flink 任务突然失败! Checkpoint 2 未能提交。

                                • 任务恢复与重算 (第二次尝试)

                                  • 任务从 Checkpoint 1 恢复,Paimon 的状态依然是 {用户A: 100}

                                  • Flink 从 Kafka 重新读取那笔 50元 的存款数据。

                                  • 由于恢复过程中的延迟,这次这条 50元 的数据在 10:02:01 才到达 Flink。

                                  • 灾难性的后果

                                    • 根据处理时间的规则,10:02:01 已经不属于 10:01:00 - 10:01:59 这个窗口了!它被划分到了下一个窗口 10:02:00 - 10:02:59

                                    • 因此,当 10:02:00 这个时间点到来时,10:01:00 - 10:01:59 这个窗口实际上是空的,没有收到任何新数据。

                                    • Flink 基于空数据和上一个状态 {用户A: 100} 进行计算,得出的结果依然是 余额: 100

                                    • Checkpoint 2 成功,Paimon 表被更新为 {用户A: 100}

                                    影响是什么?

                                    10:02:0010:03:00 这一整分钟内,Paimon 表中记录的用户A的余额是 100元,但他的真实余额应该是 150元。那丢失的 50元 要等到 10:03:00 下一个窗口关闭时才会被计算进去。

                                    结论:

                                    即使外部没有感知到那个失败的中间结果,但 Flink 计算的非幂等性导致了最终成功提交到 Sink 的数据是暂时错误的。它将本应属于上一个窗口的数据“推迟”到了下一个窗口,造成了数据状态的时序错乱。

                                    对于需要实时准确性的系统(如实时大屏、实时风控、实时计费),这种“暂时性的错误”是致命的。而如果使用事件时间,无论数据何时到达,它的时间戳是固定的,总能被分到正确的窗口,从而保证每次重算的结果都一致,避免了这种时序问题。

                                    Paimon 可以解决幂等写入吗

                                    Paimon 从设计上就解决了幂等写入的问题。

                                    Paimon 是一种流式数据湖存储,它的核心能力之一就是提供原子性的、可重复的写入和提交。这意味着,即使 Flink 作业发生故障并从上一个 checkpoint 重试,向 Paimon 表重复写入相同的数据,Paimon 也能保证最终存储的数据是正确的,不会出现重复或丢失。

                                    它通过类似于 Git 的提交和快照(Snapshot)机制来实现这一点。每一次 Flink 的 checkpoint 成功后,Paimon 都会生成一个新的、包含本次写入数据变更的快照。读取数据时,总是读取最新且已成功提交的快照,从而天然地屏蔽了重试带来的重复数据问题。

                                    Paimon 是不是也用了 commit + 事件时间?

                                    这里需要将两个概念分开来看:Paimon 的 commit 机制和 Flink 的 事件时间 语义。

                                    Paimon 确实使用了 commit 机制。这个机制与 Flink 的两阶段提交(Two-Phase Commit)协议紧密集成。

                                    1. 预提交 (Pre-commit):当 Flink 开始一个 checkpoint 时,Paimon Sink 会将新数据写入新的数据文件和清单文件(Manifest File),但并不更新表的元数据来指向这个新版本。这相当于一个“暂存”操作。

                                    2. 提交 (Commit):当 Flink 的 checkpoint 成功完成后,JobManager 会通知 Paimon Sink。此时,Paimon Sink 才会执行一个原子操作,将表的元数据指针更新到刚刚预提交成功的新快照(Snapshot)上。这个更新操作是原子性的,一旦完成,新的数据就对所有查询可见了。

                                      这个过程保证了只有在 Flink 确认整个流处理任务的状态都已成功保存(checkpoint 成功)时,Paimon 中的数据才会真正“生效”。如果 checkpoint 失败,预提交的数据就会被废弃,不会影响表的正确状态。

                                      与“事件时间”的关系

                                      Paimon 本身不关心“事件时间”。

                                      • Paimon (存储层):它的职责是忠实地、原子地存储 Flink 计算完成后交给它的数据。它的 commit 操作是和 Flink 的 checkpoint 绑定的,而不是和数据内容中的“事件时间”戳绑定。

                                        • Flink (计算层):它负责处理时间语义。当您在 Flink 作业中使用事件时间进行开窗或其他计算时,Flink 保证了计算逻辑的正确性和幂等性(如我们上次讨论的)。计算完成后,Flink 将结果交给 Paimon Sink。

                                          两者的结合点在于:

                                          Flink 使用事件时间保证了计算结果的正确性,然后 Paimon 通过与 Flink checkpoint 绑定的 commit 机制,保证了将这个正确的计算结果写入外部存储的原子性和幂等性。

                                          总结

                                          如果 Flink 的计算逻辑本身是 非幂等 的(例如,使用了处理时间开窗),那么:

                                          • 第一次尝试计算 Checkpoint N :得到结果 Result_A 。

                                          • 发生故障后,重试计算 Checkpoint N :由于处理时间变化,可能得到一个完全不同的结果 Result_B 。

                                          在这种情况下,Flink 会将 Result_B 交给 Paimon 去提交。Paimon 会忠实地、幂等地将 Result_B 存下来。最终,存储里的结果就是 Result_B 。

                                          可以这样理解它们的协作关系:

                                          • Flink 的事件时间:解决了计算过程的幂等性问题,确保“算得对”。

                                          • Paimon 的 Commit 机制:解决了写入存储过程的幂等性问题,确保“存得对”。

                                            将两者结合,就实现了从数据计算到数据持久化端到端的 Exactly-Once 语义,即使在 Flink 中使用了非幂等的计算逻辑(如基于处理时间的窗口),Paimon 也能保证单次 checkpoint 内的数据写入是幂等的。但要实现端到端的业务逻辑正确,还是强烈推荐在 Flink 中使用事件时间。

                                            Paimon 写入数据时 SequenceNumber

                                            总的来说,是的,Paimon 在写入主键表时,会为每一条记录(KeyValue)分配一个唯一的、单调递增的 SequenceNumber。这个机制是实现其 Merge-Tree (LSM) 数据结构和保证数据更新顺序性的核心。

                                            SequenceNumber 的决定方式主要有两种:

                                            • Paimon 写入时总是会分配一个内部的、自增的 SequenceNumber
                                            • 这个序列号是保证数据按写入顺序更新的关键。
                                            • 用户可以通过 sequence.field 选项使用业务数据中的字段(如事件时间)来作为主要的合并依据,以处理乱序数据,此时内部 SequenceNumber 作为最终的决胜条件(tie-breaker)。

                                            内部自动生成的序列号

                                            这是 Paimon 的默认行为。

                                            • 初始化: 当一个 Writer 任务启动时,它会首先读取当前表中所有数据文件(DataFileMeta)中的最大 sequenceNumber。然后,它会将自己的序列号计数器初始化为 maxSequenceNumber + 1
                                            • 写入过程: 每当一条新的记录被写入时,Writer 就会将当前的序列号分配给这条记录,然后将序列号自增 1。

                                            这确保了在整个表范围内,新写入的记录总是有比旧记录更大的序列号。当对具有相同主键的数据进行合并(Compaction)时,Paimon 会保留 SequenceNumber 最大的那条记录,从而实现了“后写入的数据覆盖先写入的数据”的更新逻辑。

                                            在 MergeTreeWriter.java 中,我们可以清晰地看到这个过程:

                                            // ... existing code ...public MergeTreeWriter(// ...long maxSequenceNumber,// ...) {// ...// 初始化 newSequenceNumber 为当前表中已存在的最大序列号 + 1this.newSequenceNumber = maxSequenceNumber + 1;// ...}// 提供一个方法用于获取并递增序列号private long newSequenceNumber() {return newSequenceNumber++;}// ... existing code ...@Overridepublic void write(KeyValue kv) throws Exception {// 为每条写入的 KeyValue 记录分配一个新的序列号long sequenceNumber = newSequenceNumber();boolean success = writeBuffer.put(sequenceNumber, kv.valueKind(), kv.key(), kv.value());if (!success) {flushWriteBuffer(false, false);success = writeBuffer.put(sequenceNumber, kv.valueKind(), kv.key(), kv.value());if (!success) {throw new RuntimeException("Mem table is too small to hold a single element.");}}}
                                            // ... existing code ...
                                            

                                            用户指定的序列号字段 (sequence.field)

                                            除了内部序列号,Paimon 还允许用户在表属性中通过 'sequence.field' = 'your_field_name' 来指定数据中的某一列作为序列号。

                                            • 工作方式: 当设置了 sequence.field 后,在合并具有相同主键的记录时,Paimon 会优先比较这个指定字段的值。例如,默认情况下会保留该字段值最大的记录。这对于处理乱序数据非常有用,比如可以用事件时间戳作为 sequence.field 来保证最终结果的正确性。
                                            • 与内部序列号的关系: 即使配置了 sequence.field,Paimon 仍然会为每一条记录分配内部的 SequenceNumber。这个内部序列号在这里扮演了“第二裁判”的角色:如果两条记录的主键和 sequence.field 的值都完全相同,Paimon 就会用内部的 SequenceNumber 来决定保留哪一条,从而保证合并行为的确定性。

                                            下面的代码片段展示了 SortBufferWriteBuffer 在排序时会同时考虑用户定义的序列号字段和内部序列号:

                                            // ... existing code ...// key fieldsIntStream sortFields = IntStream.range(0, keyType.getFieldCount());// user define sequence fieldsif (userDefinedSeqComparator != null) {IntStream udsFields =IntStream.of(userDefinedSeqComparator.compareFields()).map(operand -> operand + keyType.getFieldCount() + 2);// 排序时,在主键之后,加入用户指定的 sequence.fieldsortFields = IntStream.concat(sortFields, udsFields);}// sequence field// 最后,加入内部生成的 sequence number 作为排序依据sortFields = IntStream.concat(sortFields, IntStream.of(keyType.getFieldCount()));int[] sortFieldArray = sortFields.toArray();
                                            // ... existing code ...
                                            

                                            http://www.dtcms.com/a/273254.html

                                            相关文章:

                                          • 【郑大二年级信安小学期】Day9:XSS跨站攻击XSS绕过CSRF漏洞SSRF漏洞
                                          • 服务器深夜告警?可能是攻击前兆!
                                          • Unity插件——ABC详解
                                          • AI驱动的低代码革命:解构与重塑开发范式
                                          • LeetCode 8. 字符串转换整数 (atoi)
                                          • 【保姆级喂饭教程】idea中安装Conventional Commit插件
                                          • FreeRTOS—任务创建和删除的API函数和方法
                                          • 书生实训营第二关:大模型对战
                                          • 列表初始化
                                          • C++ Lambda 表达式详解
                                          • 《棒垒球知道》奥运会的吉祥物是什么·棒球1号位
                                          • 【c++八股文】Day6:using和typedef
                                          • [yolo-world]YOLO-World数据集介绍及标注格式详解
                                          • SoC程序如何使用单例模式运行
                                          • 什么是 MIT License?核心要点解析
                                          • [数据结构与算法] 优先队列 | 最小堆 C++
                                          • 几种LLM推理加速技术的区别
                                          • 列表页与详情页的智能识别:多维度判定方法与工业级实现
                                          • 海光芯赋能:国产化高性能计算平台,重塑边缘与工业智能新算力
                                          • 使用虚拟机远程登陆ensp模拟器交换机
                                          • ROS1学习第二弹
                                          • 1 C++提高——模板
                                          • H5微应用四端调试工具—网页版:深入解析与使用指南
                                          • FS-TAS如何提升电催化反应的效率-测试GO
                                          • 人大金仓下载安装教程总结
                                          • 区块链基础知识:从比特币到区块链的全面解析
                                          • 复杂度简介
                                          • Android-jetpack之DataBinding实战应用
                                          • NMEA-0183 协议 GPS 介绍
                                          • Redis-集群Cluster