Flink Exactly Once 和 幂等
Exactly once
简单来说,Flink 的 exactly-once 语义确保了即使在发生故障的情况下,每一条数据也只会被处理一次,对最终结果产生且仅产生一次影响。这对于金融、实时计费等对数据准确性要求极高的场景至关重要。
Flink 实现“端到端”的 exactly-once 主要依赖于两个关键机制:分布式快照(Checkpointing) 和 两阶段提交(Two-Phase Commit)。
1. 分布式快照 / 检查点 (Checkpointing)
这是 Flink 容错机制的核心。可以将其理解为 Flink 会定期为整个正在运行的应用(包括数据流和算子的状态)拍一张“快照”,并将其存储在可靠的持久化存储中(如 HDFS 或 S3)。
工作原理:Flink 在数据流中注入一种名为
barrier
的特殊数据。当barrier
流经所有算子时,每个算子都会将自己当前的状态保存到快照中。当所有算子都完成了快照,这个checkpoint
才算完成。故障恢复:如果发生故障,Flink 会停止应用,然后从最近一次成功完成的
checkpoint
中恢复所有算子的状态,并从快照中记录的数据源位置(例如 Kafka 的 offset)开始重新处理数据。这保证了 Flink 内部处理的 exactly-once。
2. 两阶段提交协议 (Two-Phase Commit)
仅仅有 Checkpoint 只能保证 Flink 内部的状态一致,但要实现从数据源(Source)到数据汇(Sink)的端到端 exactly-once,还需要 Sink 组件的支持。
如果数据在check point中已经处理了,然后发生故障,之后的恢复会再次处理,因此check point只保证至少一次。
Flink 集成了一个两阶段提交协议来协调 checkpoint
和 Sink 的写入操作。
这个过程分为两个阶段:
预提交 (Pre-commit):
当一个
checkpoint
开始时,Flink 的 Sink 连接器会开启一个外部事务(例如,一个 Kafka 事务)。在当前
checkpoint
期间,所有需要输出的数据都会被写入这个事务中,但并不实际提交。当算子完成自己的状态快照后,它会通知 Flink 的总控节点(JobManager)预提交已完成。
提交 (Commit):
当 JobManager 收到来自流处理任务中所有算子的“预提交完成”通知后,它就确认这个
checkpoint
已经全局完成。随后,JobManager 会向所有算子发出“提交”指令。
Sink 连接器收到指令后,才会去提交之前开启的那个外部事务,此时数据才真正在外部系统中可见。
如果发生故障会怎么样?
如果在预提交完成但提交之前发生故障,Flink 会从上一个
checkpoint
恢复。由于这次的事务没有被提交,它会被中止。恢复后,这些数据会被重新处理并写入一个新的事务中,不会造成数据重复。如果在提交之后发生故障,
checkpoint
已经成功,恢复时会从这个checkpoint
之后开始处理,同样不会造成数据重复。
实现前提
要实现端到端的 exactly-once,需要满足以下条件:
数据源 (Source):必须是可重放的(Replayable),例如 Apache Kafka。Flink 需要能够回退到某个特定的数据点重新读取数据。
数据汇 (Sink):必须支持事务(Transactional),或者 Flink 的 Sink 连接器能够模拟事务。例如,Kafka Producer、JDBC Sink 都提供了实现 exactly-once 的能力。
总结来说,Flink 通过分布式快照来保证内部状态的一致性和故障恢复能力,再通过与外部系统集成的两阶段提交协议,将状态的保存和数据写入的提交操作绑定为原子操作,从而实现了强大的端到端 exactly-once 语义。
基于处理时间(Processing Time)的开窗操作天然是非幂等的
“幂等”指什么?
在 Flink 和流处理的上下文中,一个操作的幂等性(Idempotence) 指的是:对于同一批输入数据,无论计算任务执行多少次,其产生的输出结果都是完全相同的。
这在故障恢复场景下至关重要。当 Flink 作业从一个 checkpoint
恢复时,它会重放一部分数据。如果窗口计算是幂等的,那么重放数据后计算出的窗口结果,应该和第一次成功计算时完全一样,这样才能保证 exactly-once 的准确性。
为什么基于处理时间的开窗是非幂等的?
原因在于处理时间(Processing Time)的定义和不确定性。
定义:处理时间是指执行计算任务的机器的系统时钟时间(也常被称为“墙上时钟”)。它反映的是数据到达某个算子(Operator)的那个瞬间的物理时间。
不确定性:这个时间是完全不可预测和不可重现的。它会受到各种因素的影响,例如:
网络延迟(数据从上游传到当前算子的时间波动)。
背压(Backpressure)情况。
Flink 任务的调度延迟。
JVM 的垃圾回收(GC)停顿。
机器负载情况。
分析与举例:
假设我们有一个基于处理时间的5分钟滚动窗口,例如 TUMBLE(p_time, INTERVAL '5' MINUTE)
,其中一个窗口的范围是 10:00:00
到 10:04:59
。
第一次运行:
一个数据元素
A
在10:04:58
到达了窗口算子,它被正确地划分到了10:00 - 10:05
这个窗口中。紧接着,作业在
10:05:00
之前发生了故障,需要从上一个checkpoint
恢复。
故障恢复后的第二次运行(重放数据):
作业从
checkpoint
恢复,数据元素A
被重新从数据源读取并处理。由于恢复过程中的一些延迟(例如,资源重新申请、网络抖动等),这次数据元素
A
在10:05:01
才到达窗口算子。根据处理时间的规则,
10:05:01
已经不属于10:00 - 10:05
窗口了,它将被划分到下一个窗口10:05 - 10:10
中。
结论:
对于同一个输入数据 A
,在两次不同的执行中,它被分配到了不同的窗口里。这导致 10:00 - 10:05
这个窗口的计算结果在两次运行中是不一致的。因此,这个基于处理时间的窗口操作是非幂等的。
解决方案:事件时间 (Event Time)
为了解决这个问题,Flink 引入了事件时间(Event Time)。事件时间是数据本身自带的时间戳,它标记了事件真实发生的时间。无论数据何时到达、被处理多少次,它的事件时间戳是固定不变的。因此,基于事件时间的窗口划分是确定的、可重现的,从而保证了计算的幂等性和结果的准确性。
既然外部世界只看到了 结果B ,那 结果A 和 结果B 不一样又有什么关系呢?
这正是在讨论 Exactly-Once 和幂等性时最容易让人困惑,也是最核心的一点。这里解释为什么“两次计算不同”会产生严重影响。
逻辑链是:
Flink 第一次计算(非幂等),得到
结果A
。在提交到 Sink 前,任务失败。
结果A
从未被外部看到,被丢弃了。Flink 恢复,第二次计算(非幂等),得到
结果B
。结果B
被成功提交到 Sink。疑问:既然外部世界只看到了
结果B
,那结果A
和结果B
不一样又有什么关系呢?
这个问题的答案是:影响在于破坏了数据的时序和状态的连续性,导致 Sink 中的数据在某段时间内是错误的。
让我们用一个具体的例子来说明这个影响。
场景:计算用户账户的实时余额
业务逻辑:每分钟计算一次用户的总余额,并写入 Paimon 表。
Flink 设置:使用处理时间(Processing Time) 进行一分钟的滚动窗口聚合。
正常流程(无故障)
10:00:00 - 10:00:59:
用户A 存入 100元。
10:01:00
时,窗口触发计算,结果是余额: 100
。Checkpoint 1 成功,Paimon 表更新为
{用户A: 100}
。
10:01:00 - 10:01:59:
用户A 又存入 50元,这条数据在
10:01:58
到达 Flink。10:02:00
时,窗口触发计算,结果是余额: 150
。Checkpoint 2 成功,Paimon 表更新为
{用户A: 150}
。
一切看起来都很好。
发生故障的流程
10:00:00 - 10:00:59:
同上,Checkpoint 1 成功,Paimon 表状态是
{用户A: 100}
。
10:01:00 - 10:01:59 (第一次尝试):
用户A 存入 50元,这条数据在
10:01:58
到达 Flink。在
10:01:59
,Flink 任务突然失败! Checkpoint 2 未能提交。
任务恢复与重算 (第二次尝试):
任务从 Checkpoint 1 恢复,Paimon 的状态依然是
{用户A: 100}
。Flink 从 Kafka 重新读取那笔 50元 的存款数据。
由于恢复过程中的延迟,这次这条 50元 的数据在
10:02:01
才到达 Flink。
灾难性的后果:
根据处理时间的规则,
10:02:01
已经不属于10:01:00 - 10:01:59
这个窗口了!它被划分到了下一个窗口10:02:00 - 10:02:59
。因此,当
10:02:00
这个时间点到来时,10:01:00 - 10:01:59
这个窗口实际上是空的,没有收到任何新数据。Flink 基于空数据和上一个状态
{用户A: 100}
进行计算,得出的结果依然是余额: 100
。Checkpoint 2 成功,Paimon 表被更新为
{用户A: 100}
。
影响是什么?
在 10:02:00
到 10:03:00
这一整分钟内,Paimon 表中记录的用户A的余额是 100元,但他的真实余额应该是 150元。那丢失的 50元 要等到 10:03:00
下一个窗口关闭时才会被计算进去。
结论:
即使外部没有感知到那个失败的中间结果,但 Flink 计算的非幂等性导致了最终成功提交到 Sink 的数据是暂时错误的。它将本应属于上一个窗口的数据“推迟”到了下一个窗口,造成了数据状态的时序错乱。
对于需要实时准确性的系统(如实时大屏、实时风控、实时计费),这种“暂时性的错误”是致命的。而如果使用事件时间,无论数据何时到达,它的时间戳是固定的,总能被分到正确的窗口,从而保证每次重算的结果都一致,避免了这种时序问题。
Paimon 可以解决幂等写入吗
Paimon 从设计上就解决了幂等写入的问题。
Paimon 是一种流式数据湖存储,它的核心能力之一就是提供原子性的、可重复的写入和提交。这意味着,即使 Flink 作业发生故障并从上一个 checkpoint 重试,向 Paimon 表重复写入相同的数据,Paimon 也能保证最终存储的数据是正确的,不会出现重复或丢失。
它通过类似于 Git 的提交和快照(Snapshot)机制来实现这一点。每一次 Flink 的 checkpoint 成功后,Paimon 都会生成一个新的、包含本次写入数据变更的快照。读取数据时,总是读取最新且已成功提交的快照,从而天然地屏蔽了重试带来的重复数据问题。
Paimon 是不是也用了 commit + 事件时间?
这里需要将两个概念分开来看:Paimon 的 commit
机制和 Flink 的 事件时间
语义。
Paimon 确实使用了 commit
机制。这个机制与 Flink 的两阶段提交(Two-Phase Commit)协议紧密集成。
预提交 (Pre-commit):当 Flink 开始一个 checkpoint 时,Paimon Sink 会将新数据写入新的数据文件和清单文件(Manifest File),但并不更新表的元数据来指向这个新版本。这相当于一个“暂存”操作。
提交 (Commit):当 Flink 的 checkpoint 成功完成后,JobManager 会通知 Paimon Sink。此时,Paimon Sink 才会执行一个原子操作,将表的元数据指针更新到刚刚预提交成功的新快照(Snapshot)上。这个更新操作是原子性的,一旦完成,新的数据就对所有查询可见了。
这个过程保证了只有在 Flink 确认整个流处理任务的状态都已成功保存(checkpoint 成功)时,Paimon 中的数据才会真正“生效”。如果 checkpoint 失败,预提交的数据就会被废弃,不会影响表的正确状态。
与“事件时间”的关系
Paimon 本身不关心“事件时间”。
Paimon (存储层):它的职责是忠实地、原子地存储 Flink 计算完成后交给它的数据。它的
commit
操作是和 Flink 的checkpoint
绑定的,而不是和数据内容中的“事件时间”戳绑定。Flink (计算层):它负责处理时间语义。当您在 Flink 作业中使用事件时间进行开窗或其他计算时,Flink 保证了计算逻辑的正确性和幂等性(如我们上次讨论的)。计算完成后,Flink 将结果交给 Paimon Sink。
两者的结合点在于:
Flink 使用事件时间保证了计算结果的正确性,然后 Paimon 通过与 Flink checkpoint 绑定的 commit
机制,保证了将这个正确的计算结果写入外部存储的原子性和幂等性。
总结
如果 Flink 的计算逻辑本身是 非幂等 的(例如,使用了处理时间开窗),那么:
第一次尝试计算 Checkpoint N :得到结果 Result_A 。
发生故障后,重试计算 Checkpoint N :由于处理时间变化,可能得到一个完全不同的结果 Result_B 。
在这种情况下,Flink 会将 Result_B 交给 Paimon 去提交。Paimon 会忠实地、幂等地将 Result_B 存下来。最终,存储里的结果就是 Result_B 。
可以这样理解它们的协作关系:
Flink 的事件时间:解决了计算过程的幂等性问题,确保“算得对”。
Paimon 的 Commit 机制:解决了写入存储过程的幂等性问题,确保“存得对”。
将两者结合,就实现了从数据计算到数据持久化端到端的 Exactly-Once 语义,即使在 Flink 中使用了非幂等的计算逻辑(如基于处理时间的窗口),Paimon 也能保证单次 checkpoint 内的数据写入是幂等的。但要实现端到端的业务逻辑正确,还是强烈推荐在 Flink 中使用事件时间。
Paimon 写入数据时 SequenceNumber
总的来说,是的,Paimon 在写入主键表时,会为每一条记录(KeyValue)分配一个唯一的、单调递增的 SequenceNumber
。这个机制是实现其 Merge-Tree (LSM) 数据结构和保证数据更新顺序性的核心。
SequenceNumber
的决定方式主要有两种:
- Paimon 写入时总是会分配一个内部的、自增的
SequenceNumber
。 - 这个序列号是保证数据按写入顺序更新的关键。
- 用户可以通过
sequence.field
选项使用业务数据中的字段(如事件时间)来作为主要的合并依据,以处理乱序数据,此时内部SequenceNumber
作为最终的决胜条件(tie-breaker)。
内部自动生成的序列号
这是 Paimon 的默认行为。
- 初始化: 当一个
Writer
任务启动时,它会首先读取当前表中所有数据文件(DataFileMeta)中的最大sequenceNumber
。然后,它会将自己的序列号计数器初始化为maxSequenceNumber + 1
。 - 写入过程: 每当一条新的记录被写入时,
Writer
就会将当前的序列号分配给这条记录,然后将序列号自增 1。
这确保了在整个表范围内,新写入的记录总是有比旧记录更大的序列号。当对具有相同主键的数据进行合并(Compaction)时,Paimon 会保留 SequenceNumber
最大的那条记录,从而实现了“后写入的数据覆盖先写入的数据”的更新逻辑。
在 MergeTreeWriter.java
中,我们可以清晰地看到这个过程:
// ... existing code ...public MergeTreeWriter(// ...long maxSequenceNumber,// ...) {// ...// 初始化 newSequenceNumber 为当前表中已存在的最大序列号 + 1this.newSequenceNumber = maxSequenceNumber + 1;// ...}// 提供一个方法用于获取并递增序列号private long newSequenceNumber() {return newSequenceNumber++;}// ... existing code ...@Overridepublic void write(KeyValue kv) throws Exception {// 为每条写入的 KeyValue 记录分配一个新的序列号long sequenceNumber = newSequenceNumber();boolean success = writeBuffer.put(sequenceNumber, kv.valueKind(), kv.key(), kv.value());if (!success) {flushWriteBuffer(false, false);success = writeBuffer.put(sequenceNumber, kv.valueKind(), kv.key(), kv.value());if (!success) {throw new RuntimeException("Mem table is too small to hold a single element.");}}}
// ... existing code ...
用户指定的序列号字段 (sequence.field
)
除了内部序列号,Paimon 还允许用户在表属性中通过 'sequence.field' = 'your_field_name'
来指定数据中的某一列作为序列号。
- 工作方式: 当设置了
sequence.field
后,在合并具有相同主键的记录时,Paimon 会优先比较这个指定字段的值。例如,默认情况下会保留该字段值最大的记录。这对于处理乱序数据非常有用,比如可以用事件时间戳作为sequence.field
来保证最终结果的正确性。 - 与内部序列号的关系: 即使配置了
sequence.field
,Paimon 仍然会为每一条记录分配内部的SequenceNumber
。这个内部序列号在这里扮演了“第二裁判”的角色:如果两条记录的主键和sequence.field
的值都完全相同,Paimon 就会用内部的SequenceNumber
来决定保留哪一条,从而保证合并行为的确定性。
下面的代码片段展示了 SortBufferWriteBuffer
在排序时会同时考虑用户定义的序列号字段和内部序列号:
// ... existing code ...// key fieldsIntStream sortFields = IntStream.range(0, keyType.getFieldCount());// user define sequence fieldsif (userDefinedSeqComparator != null) {IntStream udsFields =IntStream.of(userDefinedSeqComparator.compareFields()).map(operand -> operand + keyType.getFieldCount() + 2);// 排序时,在主键之后,加入用户指定的 sequence.fieldsortFields = IntStream.concat(sortFields, udsFields);}// sequence field// 最后,加入内部生成的 sequence number 作为排序依据sortFields = IntStream.concat(sortFields, IntStream.of(keyType.getFieldCount()));int[] sortFieldArray = sortFields.toArray();
// ... existing code ...