当前位置: 首页 > news >正文

Flink SourceOperator和WaterMark

SourceOperator

SourceOperator 本质上是一个标准的 Flink StreamOperator,但它被特殊设计用来托管新的 Source API。它的主要职责包括:

  • 生命周期管理:负责初始化、启动、快照、恢复和关闭其内部包含的 SourceReader
  • 数据拉取与发射:通过 emitNext 方法,调用 SourceReader 的 pollNext 来获取数据,并将数据记录(StreamRecord)和水印(Watermark)发射到下游。
  • 事件处理:作为 OperatorEventHandler,它接收并处理来自 JobManager 端 OperatorCoordinator 的事件,例如分配新的数据分片(Split)。
  • 状态管理:负责将 SourceReader 的状态(主要是当前分配到的 Splits)进行快照,并在故障恢复时还原。
  • 异步 I/O 支持:实现了 PushingAsyncDataInput 接口,能够与 Flink 的 Mailbox 调度模型无缝集成,实现非阻塞的数据输入。

核心接口与继承关系

SourceOperator 的设计体现在它实现的接口和继承的类上:

  • extends AbstractStreamOperator<OUT>: 这是所有 Flink 操作算子的基类,提供了算子生命周期、配置、度量(Metrics)等基础功能。
  • implements OperatorEventHandler: 表明该算子可以处理来自 OperatorCoordinator 的 OperatorEvent。这是实现 Source 与 Coordinator 之间双向通信的关键。
  • implements PushingAsyncDataInput<OUT>: 这是实现异步数据处理的核心。它允许 SourceOperator 在没有可用数据时,提供一个 CompletableFuture (getAvailableFuture()),让 StreamTask 可以非阻塞地等待。当数据可用时,这个 Future 会被完成,StreamTask 就会再次调用 emitNext
  • implements TimestampsAndWatermarks.WatermarkUpdateListener: 用于监听由 TimestampsAndWatermarks 组件计算出的有效 Watermark,并参与 Watermark 对齐等逻辑。

关键组件和字段

SourceOperator 内部维护了几个非常重要的字段来完成其工作:

  • readerFactory: 一个函数式接口,用于延迟创建 SourceReader。延迟创建是必要的,因为像 Metric Group 这样的运行时上下文信息在算子构造时还不可用,需要等到 setup 或 open 阶段。
  • sourceReaderSourceReader<OUT, SplitT> 实例。这是实际从外部系统(如 Kafka, Pulsar)读取数据的逻辑单元。SourceOperator 将大部分读取工作都委托给了它。
  • operatorEventGateway: 与 OperatorCoordinator 通信的网关。所有发送给协调器的事件(如请求 Split)都通过它发送。
  • splitSerializer: 用于序列化和反序列化 Split。这在状态快照和恢复时至关重要,因为 Split 信息需要被持久化到状态后端。
  • readerStateListState<SplitT> 类型,用于存储当前分配给这个 SourceOperator 实例的所有 Split。这是 SourceOperator 的核心状态,保证了故障恢复后能从正确的位置继续读取。
  • eventTimeLogicTimestampsAndWatermarks<OUT> 的实例。它根据用户定义的 WatermarkStrategy 负责从数据中提取时间戳,并生成和发射 Watermark。
  • operatingMode: 一个内部枚举,用于控制 SourceOperator 在不同阶段的行为模式,如 READING (正常读取)、WAITING_FOR_ALIGNMENT (等待水印对齐)、SOURCE_DRAINED (优雅关闭中) 等。这使得状态转换逻辑更清晰。
  • availabilityHelper 和 finishedCompletableFuture 对象,分别用于异步 I/O 的可用性通知和整个算子完成的信令。

核心方法与工作流程

a. 初始化与启动 (initReaderopen)
  1. initReader(): 在 open() 方法执行前被调用。它会创建一个 RichSourceReaderContext,这个上下文包含了 Metric、配置、子任务索引等信息,并将其传递给 readerFactory 来创建 SourceReader 实例。
  2. open():
    • 初始化 eventTimeLogic,用于处理时间戳和 Watermark。
    • 从 readerState 中恢复之前快照的 Splits。如果状态不为空(即故障恢复),则调用 sourceReader.addSplits(splits) 将这些 Splits 交给 Reader 去处理。
    • 通过 operatorEventGateway 向 OperatorCoordinator 发送 ReaderRegistrationEvent 事件,注册自身。
    • 调用 sourceReader.start(),启动 Reader 的内部逻辑。
    • 启动周期性的 Watermark 发射。
b. 数据处理 (emitNext)

这是 StreamTask 的主循环调用的方法,是数据流动的引擎:

  • 它首先会检查 operatingMode。在最常见的 READING 模式下,它会进入一个循环,调用 sourceReader.pollNext(currentMainOutput)
  • pollNext 方法会从外部系统拉取一条或多条数据,并通过 ReaderOutput(即 currentMainOutput)向下游发射。
  • pollNext 返回一个 InputStatusSourceOperator 将其转换为 DataInputStatus 返回给 StreamTask
    • MORE_AVAILABLE: 表示还有数据,StreamTask 会继续调用 emitNext
    • NOTHING_AVAILABLE: 表示暂时没有数据,StreamTask 会通过 getAvailableFuture() 等待数据可用的通知。
    • END_OF_INPUT: 表示数据源已经读取完毕(对于有界流),SourceOperator 会切换到 DATA_FINISHED 模式。
c. 状态与容错 (snapshotStatenotifyCheckpointComplete)
  • snapshotState(...): 当 Flink 触发 Checkpoint 时,此方法被调用。
    • 它会调用 sourceReader.snapshotState(checkpointId)SourceReader 会返回它当前持有的、还未处理完的 Splits 列表。
    • SourceOperator 使用 splitSerializer 将这些 Splits 序列化成字节数组,并更新到 readerState 中。
    • Flink 运行时负责将这个 readerState 持久化到配置的状态后端(如 RocksDB)。
  • notifyCheckpointComplete(...): 当一个 Checkpoint 成功完成后,此方法被调用,并会级联通知到 sourceReader。这允许 SourceReader 提交一些外部系统的事务(例如,更新 Kafka 的消费位点)。
d. 与协调器的交互 (handleOperatorEvent)

这是 SourceOperator 响应 OperatorCoordinator 指令的入口点。

// ... existing code ...@SuppressWarnings("unchecked")public void handleOperatorEvent(OperatorEvent event) {if (event instanceof WatermarkAlignmentEvent) {updateMaxDesiredWatermark((WatermarkAlignmentEvent) event);checkWatermarkAlignment();checkSplitWatermarkAlignment();} else if (event instanceof AddSplitEvent) {handleAddSplitsEvent(((AddSplitEvent<SplitT>) event));} else if (event instanceof SourceEventWrapper) {sourceReader.handleSourceEvents(((SourceEventWrapper) event).getSourceEvent());} else if (event instanceof NoMoreSplitsEvent) {sourceReader.notifyNoMoreSplits();} else if (event instanceof IsProcessingBacklogEvent) {
// ... existing code ...output.emitRecordAttributes(new RecordAttributesBuilder(Collections.emptyList()).setBacklog(((IsProcessingBacklogEvent) event).isProcessingBacklog()).build());} else {throw new IllegalStateException("Received unexpected operator event " + event);}}private void handleAddSplitsEvent(AddSplitEvent<SplitT> event) {try {
// ... existing code ...
  • AddSplitEvent: 这是最常见的事件。Coordinator 发现或分配了新的数据分片(Splits),通过此事件发送给 SourceOperatorSourceOperator 收到后,调用 sourceReader.addSplits() 将新任务交给 Reader。
  • SourceEventWrapper: 这是一个通用事件包装器,用于在 Coordinator 和 SourceReader 之间传递自定义的 SourceEvent
  • NoMoreSplitsEvent: Coordinator 通知该算子,不会再有新的 Split 分配过来了。这对于有界数据源的关闭判断很有用。
  • WatermarkAlignmentEvent: 在启用了 Watermark 对齐的场景下,Coordinator 会周期性地计算一个全局的“最大允许 Watermark”,并通过此事件下发。SourceOperator 会用它来暂停那些读取过快的 Split,以防止数据倾斜导致整体 Watermark 无法前进。

Watermark 对齐

这是一个高级特性,SourceOperator 提供了内置支持。

  • 当一个 Split 的本地 Watermark (splitCurrentWatermarks) 超过了从 Coordinator 收到的全局最大允许 Watermark (currentMaxDesiredWatermark) 时,意味着这个 Split 读取得太快了。
  • 在 updateCurrentSplitWatermark 方法中会进行判断,如果超过了,就会调用 pauseOrResumeSplits 来暂停这个 Split 的数据拉取。
  • 当全局 Watermark 追上来后,之前被暂停的 Split 会被自动恢复。
  • 这个机制确保了最慢的 Split 不会落后最快的 Split 太多,从而保证了全局 Watermark 的正确推进。

总结

SourceOperator 是一个精心设计的组件,它成功地将数据源的通用逻辑(生命周期、状态、事件处理、与运行时的集成)与具体的数据读取逻辑(在 SourceReader 中实现)解耦。通过实现 PushingAsyncDataInput 和 OperatorEventHandler 等关键接口,它优雅地融入了 Flink 的异步调度和分布式协调机制中,为构建高性能、可容错、可扩展的新数据源提供了坚实的基础。


Source、Split、Kafka Offset 的关系

Source (数据源)

  • ​概念​​:代表整个数据源。对于 Kafka 而言,通常指一个或多个 Kafka Topic。

  • ​Flink 实现​​:通过 KafkaSource对象配置(包含连接 Kafka、反序列化数据等所有参数)。

  • ​架构定位​​:Flink 新数据源架构(FLIP-27)中的通用抽象,不直接绑定 Kafka,通过泛型 SourceReaderSourceSplit支持任意数据源。

Split (分片)

  • ​定义​​:Source 的可独立处理子集/工作单元。一个并行 SourceOperator任务通常处理一个或多个 Split。

  • ​Kafka 映射​​:一个 Split 对应一个 Kafka 分区(Partition)。

  • ​实现类示例​​:KafkaPartitionSplit包含:

    • Topic 名称

    • Partition 编号

    • 起始读取 Offset

    • 作用:定义任务的具体工作范围。

Kafka Offset (位移)

  • ​本质​​:Kafka 分区内每条消息的唯一标识符(类似"行号"或"指针")。

  • ​与 Flink 关系​​:

    • 非 Flink 原生概念(属于 Kafka),但被 Flink Kafka Source 用于实现:

      • 精确一次(Exactly-Once)语义

      • 故障恢复能力

    • ​运行时作用​​:SourceReader从 Split 指定的起始 Offset 开始消费,并持续追踪最新消费位置。

三者协作关系总结

  1. ​分层结构​​:

    Flink 的 Source(如 KafkaSource)→ 切分为多个 Split(通常 1 Partition : 1 Split)→ 每个 Split定义起始 Offset。

  2. ​执行流程​​:

    SourceOperator中的 SourceReader负责实际读取数据,并管理所分配 Split 的消费进度(即 Offset)。


状态保存机制

// flink-runtime\src\main\java\org\apache\flink\streaming\api\operators\SourceOperator.java
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {long checkpointId = context.getCheckpointId();LOG.debug("Taking a snapshot for checkpoint {}", checkpointId);readerState.update(sourceReader.snapshotState(checkpointId));  // 状态委托给 SourceReader
}
  • ​关键设计​​:SourceOperator不直接管理状态细节,通过 sourceReader.snapshotState()将快照责任委托给具体实现。

状态保存内容

  1. ​快照主体​​:

    sourceReader.snapshotState()返回当前所有​​未完成的 Split 列表​​。

    • Kafka 场景:即 KafkaSourceReader持有的全部 KafkaPartitionSplit

  2. ​Offset 处理​​:

    KafkaSplit 快照生成时,其起始 Offset 会被更新为​​下一个待读取的 Offset​​。

最终状态构成

  • ​存储内容​​:readerState保存当前算子实例分配的所有 Split(Kafka 中为 KafkaPartitionSplit列表)。

  • ​关键信息​​:每个 KafkaPartitionSplit包含:

    • 对应的 Topic-Partition

    • 下一次读取的精确 Offset

当任务从 Checkpoint 恢复时:

  1. SourceOperator从状态读取 Split 列表 → 重新分配给新 SourceReader

  2. SourceReader从各 Split 记录的​​精确 Offset​​ 位置继续消费。

    → ​​实现效果​​:数据零丢失且零重复。


KafkaPartitionSplit 

​KafkaPartitionSplit​​ 是 ​​Flink Kafka Connector​​ (现在在独立git库)中一个核心的内部类,它实现了 Flink 的 ​​SourceSplit​​ 接口。

简单来说,一个 ​​KafkaPartitionSplit​​ 对象就代表了 Flink Source 要读取的一个 Kafka 分区的一部分数据。它定义了读取任务的最小工作单元。

​KafkaPartitionSplit​​ 本质上是一个数据结构,它清晰地描述了一个 ​​Flink Kafka Source 的子任务​​:

“你要去读取哪个 Topic 的哪个 Partition,从哪里开始,到哪里结束”。

Flink 的作业管理器(​​JobManager​​)会创建这些 ​​KafkaPartitionSplit​​,然后将它们分发给任务管理器(​​TaskManager​​)上的 ​​Source Reader​​ 去执行实际的数据读取工作。


KafkaPartitionSplit​​ 主要包含以下几个核心内容:

核心属性

​KafkaPartitionSplit​​ 有三个关键的私有成员变量,它们共同定义了一个读取任务:

  • private final TopicPartition tp;

    ​含义​​: 代表要读取的 Kafka 分区。​​TopicPartition​​ 是 Kafka 客户端库中的一个类,它封装了主题(Topic)名称和分区(Partition)号。这是该 Split 最基本的身份标识。

  • private final long startingOffset;

    ​含义​​: 定义了从这个分区的哪个位置开始读取。它可以是一个具体的偏移量(>= 0),也可以是几个特殊的标记值。

  • private final long stoppingOffset;

    ​含义​​: 定义了读到这个分区的哪个位置就停止读取。这使得 Flink 可以处理有限流(Bounded Stream)的场景。


特殊的偏移量标记(Constants)

为了方便配置,代码中定义了一些特殊的 ​​long 型常量​​ 来表示起始和停止位置,而不是强制用户必须指定精确的数字偏移量。

起始偏移量标记 (startingOffset):
  • EARLIEST_OFFSET (-2L)​: 从分区的最早可用偏移量开始消费。

  • LATEST_OFFSET (-1L)​: 从分区的最新偏移量开始消费(即只消费新产生的数据)。

  • COMMITTED_OFFSET (-3L)​: 从消费者组在 Kafka 中最后提交的偏移量开始消费。如果找不到,则根据 auto.offset.reset配置来决定。

LATEST_OFFSET常量标记为废弃,建议使用OffsetsInitializer.latest()方法来设置Kafka源从最新偏移量开始消费:

 
KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers("localhost:9092").setGroupId("my-group").setTopics("my-topic").setDeserializer(new MyDeserializer()).setStartingOffsets(OffsetsInitializer.latest()).build();

这种方式更加明确,并且符合Flink Kafka连接器的当前最佳实践。

停止偏移量标记 (stoppingOffset):
  • NO_STOPPING_OFFSET (Long.MIN_VALUE)​: 表示这是一个无限流任务,永远不会自动停止,会一直消费下去。这是默认值。

  • LATEST_OFFSET (-1L)​: 消费到任务启动时该分区的最新位置就停止。

  • COMMITTED_OFFSET (-3L)​: 消费到任务启动时该分区已提交的最新位置就停止。


主要方法

构造函数:

  • 提供了两个构造函数:

    • 一个可以指定 ​​起始和停止位置​​;

    • 另一个是简化的版本,只指定 ​​起始位置​​,​​停止位置默认为 NO_STOPPING_OFFSET(无限流)​​。

  • 在构造时会调用 ​verifyInitialOffset​ 方法,对传入的 ​startingOffset​ 和 ​stoppingOffset​ 值进行校验,确保它们是合法的(要么是 >= 0 的具体偏移量,要么是预定义的特殊标记值)。

Getter 方法:

  • getTopicPartition()​: 获取 ​​TopicPartition​​ 对象。

  • getStartingOffset()​: 获取起始偏移量。

  • getStoppingOffset()​: 以 ​Optional<Long>​ 的形式返回停止偏移量。

    如果停止偏移量是 ​NO_STOPPING_OFFSET​,则返回 ​Optional.empty()​,否则返回包含具体值的 ​Optional​。

    这种设计可以更优雅地处理“无停止点”的情况。

splitId() 方法:

  • 实现了 ​​SourceSplit​​ 接口的要求,为这个 Split 生成一个唯一的字符串 ID。

    它的实现是直接返回 ​​TopicPartition​​ 的字符串表示(例如 "my-topic-0"),因为一个分区在同一时间只会被一个 reader 读取。


Watermark 的完整流转过程

在分析流转过程之前,我们首先要理解 Watermark 的本质。在 Flink 的事件时间(Event Time)处理模式中,数据流中的事件往往是乱序到达的。为了让系统知道事件时间进行到了什么程度,从而可以安全地关闭窗口并进行计算,Flink 引入了 Watermark 的概念。

  • 定义Watermark(t) 是一个声明,表示在当前数据流中,时间戳小于或等于 t 的事件应该都已经到达了。任何时间戳 t' <= t 的事件再到达,就会被认为是“迟到数据”。
  • 作用:当一个算子(如窗口算子)接收到 Watermark(t) 时,它就可以推进自己的内部时钟到 t。这会触发所有结束时间小于 t 的窗口进行计算和输出。

正如 Flink 官方文档所描述的,Watermark 是数据流的一部分,它像一个特殊的记录在流中向下游传递。

Watermark 的诞生:在数据源 (SourceOperator) 中生成

Watermark 的生命周期始于数据源。在现代的 Flink Source 架构中,这个职责主要由 SourceOperator 及其内部组件承担。

a. 基于 WatermarkStrategy 的初始化

用户通过 DataStream.assignTimestampsAndWatermarks(WatermarkStrategy) 为数据流指定一个 Watermark 生成策略。这个策略被传递给 SourceOperator

在 SourceOperator.java 的 open() 方法中,我们可以看到 eventTimeLogic 这个关键组件的初始化:

// ... existing code ...@Overridepublic void open() throws Exception {
// ... existing code ...initReader();// in the future when we this one is migrated to the "eager initialization" operator// (StreamOperatorV2), then we should evaluate this during operator construction.if (emitProgressiveWatermarks) {eventTimeLogic =TimestampsAndWatermarks.createProgressiveEventTimeLogic(watermarkStrategy,sourceMetricGroup,getProcessingTimeService(),getExecutionConfig().getAutoWatermarkInterval(),mainInputActivityClock,getProcessingTimeService().getClock(),taskIOMetricGroup);} else {
// ... existing code ...}// ... existing code ...// Start the reader after registration, sending messages in start is allowed.sourceReader.start();eventTimeLogic.startPeriodicWatermarkEmits();}
// ... existing code ...
  • eventTimeLogic (TimestampsAndWatermarks 的实例) 封装了所有与时间戳提取和 Watermark 生成相关的逻辑。
  • 它接收用户定义的 watermarkStrategy
  • eventTimeLogic.startPeriodicWatermarkEmits() 启动了一个周期性任务,该任务会定时调用 WatermarkGenerator 的 onPeriodicEmit() 方法来生成并发出 Watermark。
b. 从数据到 Watermark
  1. SourceOperator 通过 sourceReader.pollNext() 从外部系统(如 Kafka)拉取数据。
  2. 拉取到的数据会经过 eventTimeLogic 处理。eventTimeLogic 内部的 TimestampAssigner 会从每条记录中提取事件时间戳。
  3. WatermarkGenerator 会观察这些时间戳(通过 onEvent() 方法),并根据其内部逻辑(例如 "bounded out-of-orderness" 策略)更新其当前的 Watermark。
  4. 当周期性发射任务触发时,WatermarkGenerator 会通过 onPeriodicEmit() 发出它认为安全的最新 Watermark。

周期任务到Task主线程

eventTimeLogic.startPeriodicWatermarkEmits()【考虑ProgressiveTimestampsAndWatermarks实现类】使用了ProcessingTimeService

    @Overridepublic void startPeriodicWatermarkEmits() {checkState(periodicEmitHandle == null, "periodic emitter already started");if (periodicWatermarkInterval == 0) {// a value of zero means not activatedreturn;}periodicEmitHandle =timeService.scheduleWithFixedDelay(this::emitImmediateWatermark,periodicWatermarkInterval,periodicWatermarkInterval);}

这里直接交给线程池处理,但是通过高阶函数进行封装,追踪水位线回调 会回到MailBox主线程,具体过程见:

揭秘Flink Timer机制:是否多线程触发?

Watermark 的发射:从 SourceOperator 到下游

当 eventTimeLogic 决定要发射一个新的 Watermark 时,它会调用注册的监听器。

  • 在 SourceOperator 中,updateCurrentEffectiveWatermark(long watermark) 方法就是这个监听器。它会更新算子内部的 latestWatermark 字段。
  • 最终,Watermark 通过 output.emitWatermark() 被发射到 Flink 的数据流网络中,成为一个 WatermarkEvent

此外,新的 SourceReader API 允许更精细的控制,Reader 本身可以直接发射针对某个 Split 的 Watermark。这在 initReader() 方法中创建的 RichSourceReaderContext 里有所体现:

// ... existing code ...@Overridepublic void emitWatermark(org.apache.flink.api.common.watermark.Watermark watermark) {checkState(watermarkIsAlignedMap.containsKey(watermark.getIdentifier()));output.emitWatermark(new WatermarkEvent(watermark,watermarkIsAlignedMap.get(watermark.getIdentifier())));}
// ... existing code ...

这里,一个原生的 Watermark 对象被包装成 WatermarkEvent,这个事件额外携带了是否对齐的信息,这对于后续的 Watermark 对齐机制至关重要。

并行流中的 Watermark:对齐与“木桶效应”

当数据流在并行环境中处理时,Watermark 的行为变得更加复杂。

a. “取其小者”原则 (The "Minimum" Rule)

一个算子可能有多个并行的输入流(例如,经过 keyBy 或 union 之后)。为了保证正确性,该算子的当前事件时间被定义为其所有输入流中最小的那个 Watermark。这意味着,只有当所有输入通道的 Watermark 都越过某个时间点 t 时,这个算子才能安全地将自己的内部时钟推进到 t

这个原则就像木桶效应:整个系统的事件时间进度被最慢的那个并行实例所限制。

b. Watermark 对齐机制 (Watermark Alignment)

为了解决上述木桶效应导致的高延迟问题,SourceOperator 实现了一套精巧的 Watermark 对齐机制。

问题场景:假设一个 Kafka Source 有10个分区,其中9个分区消费很快,Watermark 已经到达 12:01:00,但第10个分区因为数据稀疏或处理缓慢,Watermark 还停留在 12:00:00。根据“取其小者”原则,下游所有算子的事件时钟都会被卡在 12:00:00,无法触发 12:00:00 到 12:01:00 之间的窗口。

解决方案

  1. 上报与计算:每个 SourceOperator 会周期性地将自己当前的 Watermark (latestWatermark) 通过 ReportedWatermarkEvent 上报给 JobManager 端的 SourceCoordinator

  2. 下发指令:Coordinator 收集所有 SourceOperator 的 Watermark 后,会计算出一个全局的“最大允许 Watermark” (currentMaxDesiredWatermark),并通过 WatermarkAlignmentEvent 事件下发给所有的 SourceOperator

  3. 暂停与恢复SourceOperator 在收到这个事件后,会更新自己的 currentMaxDesiredWatermark。然后,在 updateCurrentSplitWatermark 方法中,它会检查每个 Split(分区)的局部 Watermark:

    // ... existing code ...
    @Override
    public void updateCurrentSplitWatermark(String splitId, long watermark) {splitCurrentWatermarks.put(splitId, watermark);if (watermark > currentMaxDesiredWatermark && !currentlyPausedSplits.contains(splitId)) {pauseOrResumeSplits(Collections.singletonList(splitId), Collections.emptyList());currentlyPausedSplits.add(splitId);}
    }
    // ... existing code ...
    

    如果某个 Split 的 Watermark 超过了 Coordinator 下发的最大允许值,SourceOperator 就会调用 pauseOrResumeSplits 暂停从这个过快的 Split 读取数据。

  4. 动态调整:当全局 Watermark 追上来后,Coordinator 会下发新的、更大的 maxDesiredWatermarkSourceOperator 在 checkSplitWatermarkAlignment 方法中会检查之前被暂停的 Split,如果它们的 Watermark 现在已经低于新的允许值,就会恢复对它们的读取。

这个机制通过动态地“拖慢”快的分区,等待慢的分区,从而使得整个数据源的 Watermark 能够更平稳、更同步地向前推进,极大地缓解了数据倾斜带来的延迟问题。

Watermark 的消费:触发计算

当一个 Watermark 最终流转到下游的一个算子(比如 WindowOperator)时:

  1. 该算子接收到 Watermark,并根据“取其小者”原则更新自己的内部事件时钟。
  2. 时钟的推进会触发检查。WindowOperator 会检查所有待处理的窗口,如果一个窗口的结束时间戳小于或等于新到达的 Watermark,这个窗口就被认为是完整的。
  3. 算子会触发该窗口的计算(调用用户定义的 WindowFunction 或 ProcessWindowFunction),并将结果发送到更下游。
  4. 处理完毕后,该算子会将这个 Watermark 原封不动地转发给它的下游算子,继续这个信令的传递。

总结

Watermark 的流转是一个贯穿 Flink 作业始终的、精妙的信令传递过程:

  1. 生成:在 SourceOperator 中,基于用户策略从数据中产生。
  2. 对齐:在并行 Source 内部,通过与 Coordinator 的交互,实现各分区的 Watermark 对齐,避免进度被个别慢分区卡住。
  3. 传播:作为特殊事件在数据流中向下游传播,遵循并行流中的“取其小者”原则。
  4. 消费:被下游算子(特别是窗口算子)消费,用于推进内部时钟和触发时间相关的计算。

SourceOperator 在这个过程中扮演了至关重要的角色,它不仅是 Watermark 的诞生地,更是通过内置的对齐机制,保证了在复杂并行场景下 Watermark 能够健康、平稳地在整个系统中流转。

http://www.dtcms.com/a/403920.html

相关文章:

  • 容器化 Djiango 应用程序
  • 营销网站建设企划案例网站建设业务越做越累
  • Java EE、Java SE 和 Spring Boot
  • 两学一做专题网站wordpress 用户密码的加密算法
  • 手写数据结构-- avl树
  • MySQL-事务日志
  • SpringBoot旅游管理系统
  • 永州市城乡建设规划局网站湖南大型网站建设公司
  • 买东西网站有哪些汽车设计公司排名前十强
  • IT 疑难杂症诊疗室:破解常见故障的实战指南​
  • 集团网站建设详细策划广告设计与制作模板
  • OSError: [WinError 182] 操作系统无法运行 %1。 解决办法
  • 部门网站建设的工作领导小组局域网建设简单的影视网站
  • 嵌入式学习(45)-基于STM32F407Hal库的Modbus Slave从机程序
  • 【字符串算法集合】KMP EXKMP Manacher Trie 树 AC 自动机
  • 网站是哪家公司开发的中山网站建设文化价位
  • 织梦网站如何备份教程企业网站建设公司网络
  • 杭州的网站建设公司4s店网站建设方案
  • 如果在自己电脑上运行,没有问题。但是移植到工控机,有问题
  • 网站建设计划方案中国著名的个人网站
  • 漫谈<爬虫与反爬的斗争>之反爬技术全景综述
  • @WebFilter 过滤器的执行顺序
  • 唐山建站方案七台河新闻综合频道直播
  • webpack library
  • 网站如何做背景音乐苏州集团网站建设
  • 建设工程招聘信息网站微信pc版
  • windows系统怎么做ppt下载网站永康外贸网站建设
  • 人工设计图像特征
  • 网站抓取qqwordpress 菜单 导航
  • centos网卡设置问题