Flink SourceOperator和WaterMark
SourceOperator
SourceOperator
本质上是一个标准的 Flink StreamOperator
,但它被特殊设计用来托管新的 Source
API。它的主要职责包括:
- 生命周期管理:负责初始化、启动、快照、恢复和关闭其内部包含的
SourceReader
。 - 数据拉取与发射:通过
emitNext
方法,调用SourceReader
的pollNext
来获取数据,并将数据记录(StreamRecord
)和水印(Watermark
)发射到下游。 - 事件处理:作为
OperatorEventHandler
,它接收并处理来自 JobManager 端OperatorCoordinator
的事件,例如分配新的数据分片(Split)。 - 状态管理:负责将
SourceReader
的状态(主要是当前分配到的 Splits)进行快照,并在故障恢复时还原。 - 异步 I/O 支持:实现了
PushingAsyncDataInput
接口,能够与 Flink 的 Mailbox 调度模型无缝集成,实现非阻塞的数据输入。
核心接口与继承关系
SourceOperator
的设计体现在它实现的接口和继承的类上:
extends AbstractStreamOperator<OUT>
: 这是所有 Flink 操作算子的基类,提供了算子生命周期、配置、度量(Metrics)等基础功能。implements OperatorEventHandler
: 表明该算子可以处理来自OperatorCoordinator
的OperatorEvent
。这是实现 Source 与 Coordinator 之间双向通信的关键。implements PushingAsyncDataInput<OUT>
: 这是实现异步数据处理的核心。它允许SourceOperator
在没有可用数据时,提供一个CompletableFuture
(getAvailableFuture()
),让StreamTask
可以非阻塞地等待。当数据可用时,这个 Future 会被完成,StreamTask
就会再次调用emitNext
。implements TimestampsAndWatermarks.WatermarkUpdateListener
: 用于监听由TimestampsAndWatermarks
组件计算出的有效 Watermark,并参与 Watermark 对齐等逻辑。
关键组件和字段
SourceOperator
内部维护了几个非常重要的字段来完成其工作:
readerFactory
: 一个函数式接口,用于延迟创建SourceReader
。延迟创建是必要的,因为像 Metric Group 这样的运行时上下文信息在算子构造时还不可用,需要等到setup
或open
阶段。sourceReader
:SourceReader<OUT, SplitT>
实例。这是实际从外部系统(如 Kafka, Pulsar)读取数据的逻辑单元。SourceOperator
将大部分读取工作都委托给了它。operatorEventGateway
: 与OperatorCoordinator
通信的网关。所有发送给协调器的事件(如请求 Split)都通过它发送。splitSerializer
: 用于序列化和反序列化 Split。这在状态快照和恢复时至关重要,因为 Split 信息需要被持久化到状态后端。readerState
:ListState<SplitT>
类型,用于存储当前分配给这个SourceOperator
实例的所有 Split。这是SourceOperator
的核心状态,保证了故障恢复后能从正确的位置继续读取。eventTimeLogic
:TimestampsAndWatermarks<OUT>
的实例。它根据用户定义的WatermarkStrategy
负责从数据中提取时间戳,并生成和发射 Watermark。operatingMode
: 一个内部枚举,用于控制SourceOperator
在不同阶段的行为模式,如READING
(正常读取)、WAITING_FOR_ALIGNMENT
(等待水印对齐)、SOURCE_DRAINED
(优雅关闭中) 等。这使得状态转换逻辑更清晰。availabilityHelper
和finished
:CompletableFuture
对象,分别用于异步 I/O 的可用性通知和整个算子完成的信令。
核心方法与工作流程
a. 初始化与启动 (initReader
, open
)
initReader()
: 在open()
方法执行前被调用。它会创建一个RichSourceReaderContext
,这个上下文包含了 Metric、配置、子任务索引等信息,并将其传递给readerFactory
来创建SourceReader
实例。open()
:- 初始化
eventTimeLogic
,用于处理时间戳和 Watermark。 - 从
readerState
中恢复之前快照的 Splits。如果状态不为空(即故障恢复),则调用sourceReader.addSplits(splits)
将这些 Splits 交给 Reader 去处理。 - 通过
operatorEventGateway
向OperatorCoordinator
发送ReaderRegistrationEvent
事件,注册自身。 - 调用
sourceReader.start()
,启动 Reader 的内部逻辑。 - 启动周期性的 Watermark 发射。
- 初始化
b. 数据处理 (emitNext
)
这是 StreamTask
的主循环调用的方法,是数据流动的引擎:
- 它首先会检查
operatingMode
。在最常见的READING
模式下,它会进入一个循环,调用sourceReader.pollNext(currentMainOutput)
。 pollNext
方法会从外部系统拉取一条或多条数据,并通过ReaderOutput
(即currentMainOutput
)向下游发射。pollNext
返回一个InputStatus
,SourceOperator
将其转换为DataInputStatus
返回给StreamTask
。MORE_AVAILABLE
: 表示还有数据,StreamTask
会继续调用emitNext
。NOTHING_AVAILABLE
: 表示暂时没有数据,StreamTask
会通过getAvailableFuture()
等待数据可用的通知。END_OF_INPUT
: 表示数据源已经读取完毕(对于有界流),SourceOperator
会切换到DATA_FINISHED
模式。
c. 状态与容错 (snapshotState
, notifyCheckpointComplete
)
snapshotState(...)
: 当 Flink 触发 Checkpoint 时,此方法被调用。- 它会调用
sourceReader.snapshotState(checkpointId)
,SourceReader
会返回它当前持有的、还未处理完的 Splits 列表。 SourceOperator
使用splitSerializer
将这些 Splits 序列化成字节数组,并更新到readerState
中。- Flink 运行时负责将这个
readerState
持久化到配置的状态后端(如 RocksDB)。
- 它会调用
notifyCheckpointComplete(...)
: 当一个 Checkpoint 成功完成后,此方法被调用,并会级联通知到sourceReader
。这允许SourceReader
提交一些外部系统的事务(例如,更新 Kafka 的消费位点)。
d. 与协调器的交互 (handleOperatorEvent
)
这是 SourceOperator
响应 OperatorCoordinator
指令的入口点。
// ... existing code ...@SuppressWarnings("unchecked")public void handleOperatorEvent(OperatorEvent event) {if (event instanceof WatermarkAlignmentEvent) {updateMaxDesiredWatermark((WatermarkAlignmentEvent) event);checkWatermarkAlignment();checkSplitWatermarkAlignment();} else if (event instanceof AddSplitEvent) {handleAddSplitsEvent(((AddSplitEvent<SplitT>) event));} else if (event instanceof SourceEventWrapper) {sourceReader.handleSourceEvents(((SourceEventWrapper) event).getSourceEvent());} else if (event instanceof NoMoreSplitsEvent) {sourceReader.notifyNoMoreSplits();} else if (event instanceof IsProcessingBacklogEvent) {
// ... existing code ...output.emitRecordAttributes(new RecordAttributesBuilder(Collections.emptyList()).setBacklog(((IsProcessingBacklogEvent) event).isProcessingBacklog()).build());} else {throw new IllegalStateException("Received unexpected operator event " + event);}}private void handleAddSplitsEvent(AddSplitEvent<SplitT> event) {try {
// ... existing code ...
AddSplitEvent
: 这是最常见的事件。Coordinator 发现或分配了新的数据分片(Splits),通过此事件发送给SourceOperator
。SourceOperator
收到后,调用sourceReader.addSplits()
将新任务交给 Reader。SourceEventWrapper
: 这是一个通用事件包装器,用于在 Coordinator 和 SourceReader 之间传递自定义的SourceEvent
。NoMoreSplitsEvent
: Coordinator 通知该算子,不会再有新的 Split 分配过来了。这对于有界数据源的关闭判断很有用。WatermarkAlignmentEvent
: 在启用了 Watermark 对齐的场景下,Coordinator 会周期性地计算一个全局的“最大允许 Watermark”,并通过此事件下发。SourceOperator
会用它来暂停那些读取过快的 Split,以防止数据倾斜导致整体 Watermark 无法前进。
Watermark 对齐
这是一个高级特性,SourceOperator
提供了内置支持。
- 当一个 Split 的本地 Watermark (
splitCurrentWatermarks
) 超过了从 Coordinator 收到的全局最大允许 Watermark (currentMaxDesiredWatermark
) 时,意味着这个 Split 读取得太快了。 - 在
updateCurrentSplitWatermark
方法中会进行判断,如果超过了,就会调用pauseOrResumeSplits
来暂停这个 Split 的数据拉取。 - 当全局 Watermark 追上来后,之前被暂停的 Split 会被自动恢复。
- 这个机制确保了最慢的 Split 不会落后最快的 Split 太多,从而保证了全局 Watermark 的正确推进。
总结
SourceOperator
是一个精心设计的组件,它成功地将数据源的通用逻辑(生命周期、状态、事件处理、与运行时的集成)与具体的数据读取逻辑(在 SourceReader
中实现)解耦。通过实现 PushingAsyncDataInput
和 OperatorEventHandler
等关键接口,它优雅地融入了 Flink 的异步调度和分布式协调机制中,为构建高性能、可容错、可扩展的新数据源提供了坚实的基础。
Source、Split、Kafka Offset 的关系
Source (数据源)
-
概念:代表整个数据源。对于 Kafka 而言,通常指一个或多个 Kafka Topic。
-
Flink 实现:通过
KafkaSource
对象配置(包含连接 Kafka、反序列化数据等所有参数)。 -
架构定位:Flink 新数据源架构(FLIP-27)中的通用抽象,不直接绑定 Kafka,通过泛型
SourceReader
和SourceSplit
支持任意数据源。
Split (分片)
-
定义:Source 的可独立处理子集/工作单元。一个并行
SourceOperator
任务通常处理一个或多个 Split。 -
Kafka 映射:一个 Split 对应一个 Kafka 分区(Partition)。
-
实现类示例:
KafkaPartitionSplit
包含:-
Topic 名称
-
Partition 编号
-
起始读取 Offset
-
作用:定义任务的具体工作范围。
-
Kafka Offset (位移)
-
本质:Kafka 分区内每条消息的唯一标识符(类似"行号"或"指针")。
-
与 Flink 关系:
-
非 Flink 原生概念(属于 Kafka),但被 Flink Kafka Source 用于实现:
-
精确一次(Exactly-Once)语义
-
故障恢复能力
-
-
运行时作用:
SourceReader
从 Split 指定的起始 Offset 开始消费,并持续追踪最新消费位置。
-
三者协作关系总结
-
分层结构:
Flink 的
Source
(如KafkaSource
)→ 切分为多个Split
(通常 1 Partition : 1 Split)→ 每个Split
定义起始 Offset。 -
执行流程:
SourceOperator
中的SourceReader
负责实际读取数据,并管理所分配 Split 的消费进度(即 Offset)。
状态保存机制
// flink-runtime\src\main\java\org\apache\flink\streaming\api\operators\SourceOperator.java
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {long checkpointId = context.getCheckpointId();LOG.debug("Taking a snapshot for checkpoint {}", checkpointId);readerState.update(sourceReader.snapshotState(checkpointId)); // 状态委托给 SourceReader
}
-
关键设计:
SourceOperator
不直接管理状态细节,通过sourceReader.snapshotState()
将快照责任委托给具体实现。
状态保存内容
-
快照主体:
sourceReader.snapshotState()
返回当前所有未完成的 Split 列表。-
Kafka 场景:即
KafkaSourceReader
持有的全部KafkaPartitionSplit
。
-
-
Offset 处理:
KafkaSplit 快照生成时,其起始 Offset 会被更新为下一个待读取的 Offset。
最终状态构成
-
存储内容:
readerState
保存当前算子实例分配的所有 Split(Kafka 中为KafkaPartitionSplit
列表)。 -
关键信息:每个
KafkaPartitionSplit
包含:-
对应的 Topic-Partition
-
下一次读取的精确 Offset
-
当任务从 Checkpoint 恢复时:
-
SourceOperator
从状态读取 Split 列表 → 重新分配给新SourceReader
。 -
SourceReader
从各 Split 记录的精确 Offset 位置继续消费。→ 实现效果:数据零丢失且零重复。
KafkaPartitionSplit
KafkaPartitionSplit 是 Flink Kafka Connector (现在在独立git库)中一个核心的内部类,它实现了 Flink 的 SourceSplit 接口。
简单来说,一个 KafkaPartitionSplit 对象就代表了 Flink Source 要读取的一个 Kafka 分区的一部分数据。它定义了读取任务的最小工作单元。
KafkaPartitionSplit 本质上是一个数据结构,它清晰地描述了一个 Flink Kafka Source 的子任务:
“你要去读取哪个 Topic 的哪个 Partition,从哪里开始,到哪里结束”。
Flink 的作业管理器(JobManager)会创建这些 KafkaPartitionSplit,然后将它们分发给任务管理器(TaskManager)上的 Source Reader 去执行实际的数据读取工作。
KafkaPartitionSplit 主要包含以下几个核心内容:
核心属性
KafkaPartitionSplit 有三个关键的私有成员变量,它们共同定义了一个读取任务:
-
private final TopicPartition tp;
含义: 代表要读取的 Kafka 分区。TopicPartition 是 Kafka 客户端库中的一个类,它封装了主题(Topic)名称和分区(Partition)号。这是该 Split 最基本的身份标识。
-
private final long startingOffset;
含义: 定义了从这个分区的哪个位置开始读取。它可以是一个具体的偏移量(>= 0),也可以是几个特殊的标记值。
-
private final long stoppingOffset;
含义: 定义了读到这个分区的哪个位置就停止读取。这使得 Flink 可以处理有限流(Bounded Stream)的场景。
特殊的偏移量标记(Constants)
为了方便配置,代码中定义了一些特殊的 long 型常量 来表示起始和停止位置,而不是强制用户必须指定精确的数字偏移量。
起始偏移量标记 (startingOffset):
-
EARLIEST_OFFSET (-2L)
: 从分区的最早可用偏移量开始消费。 -
LATEST_OFFSET (-1L)
: 从分区的最新偏移量开始消费(即只消费新产生的数据)。 -
COMMITTED_OFFSET (-3L)
: 从消费者组在 Kafka 中最后提交的偏移量开始消费。如果找不到,则根据auto.offset.reset
配置来决定。
LATEST_OFFSET
常量标记为废弃,建议使用OffsetsInitializer.latest()
方法来设置Kafka源从最新偏移量开始消费:KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers("localhost:9092").setGroupId("my-group").setTopics("my-topic").setDeserializer(new MyDeserializer()).setStartingOffsets(OffsetsInitializer.latest()).build();
这种方式更加明确,并且符合Flink Kafka连接器的当前最佳实践。
停止偏移量标记 (stoppingOffset):
-
NO_STOPPING_OFFSET (Long.MIN_VALUE)
: 表示这是一个无限流任务,永远不会自动停止,会一直消费下去。这是默认值。 -
LATEST_OFFSET (-1L)
: 消费到任务启动时该分区的最新位置就停止。 -
COMMITTED_OFFSET (-3L)
: 消费到任务启动时该分区已提交的最新位置就停止。
主要方法
构造函数:
-
提供了两个构造函数:
-
一个可以指定 起始和停止位置;
-
另一个是简化的版本,只指定 起始位置,停止位置默认为 NO_STOPPING_OFFSET(无限流)。
-
-
在构造时会调用
verifyInitialOffset
方法,对传入的 startingOffset
和 stoppingOffset
值进行校验,确保它们是合法的(要么是 >= 0 的具体偏移量,要么是预定义的特殊标记值)。
Getter 方法:
-
getTopicPartition()
: 获取 TopicPartition 对象。 -
getStartingOffset()
: 获取起始偏移量。 -
getStoppingOffset()
: 以 Optional<Long>
的形式返回停止偏移量。如果停止偏移量是
NO_STOPPING_OFFSET
,则返回 Optional.empty()
,否则返回包含具体值的 Optional
。这种设计可以更优雅地处理“无停止点”的情况。
splitId() 方法:
-
实现了 SourceSplit 接口的要求,为这个 Split 生成一个唯一的字符串 ID。
它的实现是直接返回 TopicPartition 的字符串表示(例如
"my-topic-0"
),因为一个分区在同一时间只会被一个 reader 读取。
Watermark 的完整流转过程
在分析流转过程之前,我们首先要理解 Watermark 的本质。在 Flink 的事件时间(Event Time)处理模式中,数据流中的事件往往是乱序到达的。为了让系统知道事件时间进行到了什么程度,从而可以安全地关闭窗口并进行计算,Flink 引入了 Watermark 的概念。
- 定义:
Watermark(t)
是一个声明,表示在当前数据流中,时间戳小于或等于t
的事件应该都已经到达了。任何时间戳t' <= t
的事件再到达,就会被认为是“迟到数据”。 - 作用:当一个算子(如窗口算子)接收到
Watermark(t)
时,它就可以推进自己的内部时钟到t
。这会触发所有结束时间小于t
的窗口进行计算和输出。
正如 Flink 官方文档所描述的,Watermark 是数据流的一部分,它像一个特殊的记录在流中向下游传递。
Watermark 的诞生:在数据源 (SourceOperator
) 中生成
Watermark 的生命周期始于数据源。在现代的 Flink Source
架构中,这个职责主要由 SourceOperator
及其内部组件承担。
a. 基于 WatermarkStrategy
的初始化
用户通过 DataStream.assignTimestampsAndWatermarks(WatermarkStrategy)
为数据流指定一个 Watermark 生成策略。这个策略被传递给 SourceOperator
。
在 SourceOperator.java
的 open()
方法中,我们可以看到 eventTimeLogic
这个关键组件的初始化:
// ... existing code ...@Overridepublic void open() throws Exception {
// ... existing code ...initReader();// in the future when we this one is migrated to the "eager initialization" operator// (StreamOperatorV2), then we should evaluate this during operator construction.if (emitProgressiveWatermarks) {eventTimeLogic =TimestampsAndWatermarks.createProgressiveEventTimeLogic(watermarkStrategy,sourceMetricGroup,getProcessingTimeService(),getExecutionConfig().getAutoWatermarkInterval(),mainInputActivityClock,getProcessingTimeService().getClock(),taskIOMetricGroup);} else {
// ... existing code ...}// ... existing code ...// Start the reader after registration, sending messages in start is allowed.sourceReader.start();eventTimeLogic.startPeriodicWatermarkEmits();}
// ... existing code ...
eventTimeLogic
(TimestampsAndWatermarks
的实例) 封装了所有与时间戳提取和 Watermark 生成相关的逻辑。- 它接收用户定义的
watermarkStrategy
。 eventTimeLogic.startPeriodicWatermarkEmits()
启动了一个周期性任务,该任务会定时调用WatermarkGenerator
的onPeriodicEmit()
方法来生成并发出 Watermark。
b. 从数据到 Watermark
SourceOperator
通过sourceReader.pollNext()
从外部系统(如 Kafka)拉取数据。- 拉取到的数据会经过
eventTimeLogic
处理。eventTimeLogic
内部的TimestampAssigner
会从每条记录中提取事件时间戳。 WatermarkGenerator
会观察这些时间戳(通过onEvent()
方法),并根据其内部逻辑(例如 "bounded out-of-orderness" 策略)更新其当前的 Watermark。- 当周期性发射任务触发时,
WatermarkGenerator
会通过onPeriodicEmit()
发出它认为安全的最新 Watermark。
周期任务到Task主线程
eventTimeLogic.startPeriodicWatermarkEmits()【考虑
ProgressiveTimestampsAndWatermarks实现类】使用了
ProcessingTimeService
@Overridepublic void startPeriodicWatermarkEmits() {checkState(periodicEmitHandle == null, "periodic emitter already started");if (periodicWatermarkInterval == 0) {// a value of zero means not activatedreturn;}periodicEmitHandle =timeService.scheduleWithFixedDelay(this::emitImmediateWatermark,periodicWatermarkInterval,periodicWatermarkInterval);}
这里直接交给线程池处理,但是通过高阶函数进行封装,追踪水位线回调 会回到MailBox主线程,具体过程见:
揭秘Flink Timer机制:是否多线程触发?
Watermark 的发射:从 SourceOperator
到下游
当 eventTimeLogic
决定要发射一个新的 Watermark 时,它会调用注册的监听器。
- 在
SourceOperator
中,updateCurrentEffectiveWatermark(long watermark)
方法就是这个监听器。它会更新算子内部的latestWatermark
字段。 - 最终,Watermark 通过
output.emitWatermark()
被发射到 Flink 的数据流网络中,成为一个WatermarkEvent
。
此外,新的 SourceReader
API 允许更精细的控制,Reader 本身可以直接发射针对某个 Split 的 Watermark。这在 initReader()
方法中创建的 RichSourceReaderContext
里有所体现:
// ... existing code ...@Overridepublic void emitWatermark(org.apache.flink.api.common.watermark.Watermark watermark) {checkState(watermarkIsAlignedMap.containsKey(watermark.getIdentifier()));output.emitWatermark(new WatermarkEvent(watermark,watermarkIsAlignedMap.get(watermark.getIdentifier())));}
// ... existing code ...
这里,一个原生的 Watermark
对象被包装成 WatermarkEvent
,这个事件额外携带了是否对齐的信息,这对于后续的 Watermark 对齐机制至关重要。
并行流中的 Watermark:对齐与“木桶效应”
当数据流在并行环境中处理时,Watermark 的行为变得更加复杂。
a. “取其小者”原则 (The "Minimum" Rule)
一个算子可能有多个并行的输入流(例如,经过 keyBy
或 union
之后)。为了保证正确性,该算子的当前事件时间被定义为其所有输入流中最小的那个 Watermark。这意味着,只有当所有输入通道的 Watermark 都越过某个时间点 t
时,这个算子才能安全地将自己的内部时钟推进到 t
。
这个原则就像木桶效应:整个系统的事件时间进度被最慢的那个并行实例所限制。
b. Watermark 对齐机制 (Watermark Alignment)
为了解决上述木桶效应导致的高延迟问题,SourceOperator
实现了一套精巧的 Watermark 对齐机制。
问题场景:假设一个 Kafka Source 有10个分区,其中9个分区消费很快,Watermark 已经到达 12:01:00
,但第10个分区因为数据稀疏或处理缓慢,Watermark 还停留在 12:00:00
。根据“取其小者”原则,下游所有算子的事件时钟都会被卡在 12:00:00
,无法触发 12:00:00
到 12:01:00
之间的窗口。
解决方案:
-
上报与计算:每个
SourceOperator
会周期性地将自己当前的 Watermark (latestWatermark
) 通过ReportedWatermarkEvent
上报给 JobManager 端的SourceCoordinator
。 -
下发指令:Coordinator 收集所有
SourceOperator
的 Watermark 后,会计算出一个全局的“最大允许 Watermark” (currentMaxDesiredWatermark
),并通过WatermarkAlignmentEvent
事件下发给所有的SourceOperator
。 -
暂停与恢复:
SourceOperator
在收到这个事件后,会更新自己的currentMaxDesiredWatermark
。然后,在updateCurrentSplitWatermark
方法中,它会检查每个 Split(分区)的局部 Watermark:// ... existing code ... @Override public void updateCurrentSplitWatermark(String splitId, long watermark) {splitCurrentWatermarks.put(splitId, watermark);if (watermark > currentMaxDesiredWatermark && !currentlyPausedSplits.contains(splitId)) {pauseOrResumeSplits(Collections.singletonList(splitId), Collections.emptyList());currentlyPausedSplits.add(splitId);} } // ... existing code ...
如果某个 Split 的 Watermark 超过了 Coordinator 下发的最大允许值,
SourceOperator
就会调用pauseOrResumeSplits
暂停从这个过快的 Split 读取数据。 -
动态调整:当全局 Watermark 追上来后,Coordinator 会下发新的、更大的
maxDesiredWatermark
。SourceOperator
在checkSplitWatermarkAlignment
方法中会检查之前被暂停的 Split,如果它们的 Watermark 现在已经低于新的允许值,就会恢复对它们的读取。
这个机制通过动态地“拖慢”快的分区,等待慢的分区,从而使得整个数据源的 Watermark 能够更平稳、更同步地向前推进,极大地缓解了数据倾斜带来的延迟问题。
Watermark 的消费:触发计算
当一个 Watermark 最终流转到下游的一个算子(比如 WindowOperator
)时:
- 该算子接收到 Watermark,并根据“取其小者”原则更新自己的内部事件时钟。
- 时钟的推进会触发检查。
WindowOperator
会检查所有待处理的窗口,如果一个窗口的结束时间戳小于或等于新到达的 Watermark,这个窗口就被认为是完整的。 - 算子会触发该窗口的计算(调用用户定义的
WindowFunction
或ProcessWindowFunction
),并将结果发送到更下游。 - 处理完毕后,该算子会将这个 Watermark 原封不动地转发给它的下游算子,继续这个信令的传递。
总结
Watermark 的流转是一个贯穿 Flink 作业始终的、精妙的信令传递过程:
- 生成:在
SourceOperator
中,基于用户策略从数据中产生。 - 对齐:在并行 Source 内部,通过与 Coordinator 的交互,实现各分区的 Watermark 对齐,避免进度被个别慢分区卡住。
- 传播:作为特殊事件在数据流中向下游传播,遵循并行流中的“取其小者”原则。
- 消费:被下游算子(特别是窗口算子)消费,用于推进内部时钟和触发时间相关的计算。
SourceOperator
在这个过程中扮演了至关重要的角色,它不仅是 Watermark 的诞生地,更是通过内置的对齐机制,保证了在复杂并行场景下 Watermark 能够健康、平稳地在整个系统中流转。