当前位置: 首页 > news >正文

Flink Task线程处理模型:Mailbox

Task的线程 和 MailboxProcessor 的绑定

executingThread 是 Task 类(StreamTask 的父类)在构造时创建的物理线程。MailboxProcessor 是 StreamTask 用来处理异步事件和驱动其主要处理逻辑(processInput)的核心组件。它们之间的绑定关系如下:

  • Task 作为 Runnable:

    • Task 类实现了 Runnable 接口,其 run() 方法是 executingThread 的入口点。
      // ...
      public class Task implements Runnable, TaskSlotPayload {// ...private final Thread executingThread; // 在构造函数中创建public Task(/*...*/) {// ...this.executingThread = new Thread(TASK_THREADS_GROUP, this, taskNameWithSubtask); // this 指向 Task 实例}@Overridepublic void run() { // 这是 executingThread 执行的入口try (MdcCloseable ignored = MdcUtils.withContext(MdcUtils.asContextData(jobId))) {doRun(); // 调用实际的工作方法} finally {terminationFuture.complete(executionState);}}private void doRun() {// ...// 对于 StreamTask,这里会调用到 StreamTask 的 invoke() 方法invokable.invoke(); // invokable 就是 StreamTask 实例// ...}// ...
      }
      

StreamTask创建了TaskMailboxImpl,传递给MailboxProcessor,因此是MailboxProcessor的执行线程。

 protected StreamTask(Environment environment,@Nullable TimerService timerService,Thread.UncaughtExceptionHandler uncaughtExceptionHandler,StreamTaskActionExecutor actionExecutor)throws Exception {this(environment,timerService,uncaughtExceptionHandler,actionExecutor,new TaskMailboxImpl(Thread.currentThread()));}this.mailboxProcessor =new MailboxProcessor(this::processInput, mailbox, actionExecutor, mailboxMetricsControl);

StreamTask.invoke() 和 MailboxProcessor:

  • 当 executingThread 启动并执行到 StreamTask.invoke() 时,StreamTask 会使用其内部的 MailboxProcessor 来驱动其核心事件循环。

    StreamTask.java

    // ...
    @Override
    public final void invoke() throws Exception {// ... (初始化,如 restoreInternal()) ...// let the task do its workgetEnvironment().getMetricGroup().getIOMetricGroup().markTaskStart();runMailboxLoop(); // <--- 关键调用// ... (清理工作,如 afterInvoke()) ...
    }public void runMailboxLoop() throws Exception {mailboxProcessor.runMailboxLoop(); // 将控制权交给 MailboxProcessor
    }
    // ...
    
  • mailboxProcessor.runMailboxLoop() 是一个阻塞调用(从 executingThread 的视角看)。这个方法会在 executingThread 上运行一个循环,不断地从邮箱 (Mailbox) 中取出邮件 (Mail) 并执行它们,或者在没有邮件时执行默认操作 (通常是 StreamTask.processInput(),用于处理输入数据和调用算子)。

Mailbox 的线程模型:

  • MailboxProcessor 被设计为在其“拥有者”线程(即 executingThread)上执行其核心循环和邮件处理。
  • TaskMailbox (被 MailboxProcessor 使用) 内部有检查,确保其关键方法(如 takeput 的某些变体,以及邮件的执行)是在预期的邮箱线程(即 executingThread)上调用的。

    MailboxProcessor.java

    public void runMailboxLoop() throws Exception {// ...final TaskMailbox localMailbox = mailbox;checkState(localMailbox.isMailboxThread(), // 确保当前线程是邮箱线程"Method must be executed by declared mailbox thread!");// ...while (isNextLoopPossible()) {processMail(localMailbox, false); // 处理邮件,在 executingThread 上执行if (isNextLoopPossible()) {mailboxDefaultAction.runDefaultAction(mailboxController); // 执行默认动作,在 executingThread 上执行}}
    }
    
  • MailboxDefaultAction 通常包装了 StreamTask.processInput(),所以数据处理和算子调用也是在 executingThread 上发生的。
  • 其他线程(例如网络线程接收到数据后,或者定时器线程触发定时器)想要与 StreamTask 交互时,它们不会直接调用 StreamTask 的方法,而是向其 Mailbox 中放入一个“邮件”(一个 Runnable 或 Callable)。MailboxProcessor 会在 executingThread 上从邮箱中取出这个邮件并执行它。

总结线程与 Mailbox 的绑定:

  1. Task 构造时创建 executingThread,并将 Task 自身作为 Runnable 传递给该线程。
  2. executingThread 启动后,执行 Task.run() -> Task.doRun() -> StreamTask.invoke()
  3. 在 StreamTask.invoke() 中,调用 mailboxProcessor.runMailboxLoop()
  4. mailboxProcessor.runMailboxLoop() 在 executingThread 上运行,它负责从邮箱中拉取任务并执行,或者执行默认的数据处理逻辑 (processInput)。
  5. 所有提交到该 StreamTask 邮箱的异步操作最终都会在 executingThread 上被 MailboxProcessor 串行化执行。

因此,executingThread 成为了 MailboxProcessor 的“工作线程”。MailboxProcessor 确保了 StreamTask 的核心逻辑(包括状态访问、算子调用等)都在这个单一的 executingThread 上顺序执行,从而简化了并发控制。

MailboxProcessor的功能

MailboxProcessor 是 Flink 中任务(Task)执行模型的核心组件,它实现了基于邮箱(Mailbox)的单线程执行模式。其主要能力包括:

管理邮箱 (TaskMailbox):

  • 持有一个 TaskMailbox 实例,用于存储需要串行执行的各种动作(Mail)。这些动作可以是来自外部的请求(如 Checkpoint 触发、Timer 回调)或内部控制命令。
// ... existing code ...
public class MailboxProcessor implements Closeable {// ... existing code .../*** The mailbox data-structure that manages request for special actions, like timers,* checkpoints, ...*/protected final TaskMailbox mailbox;
// ... existing code ...

执行默认动作 (MailboxDefaultAction):

  • 在邮箱为空时,会循环执行一个预定义的“默认动作”。在 StreamTask 的上下文中,这个默认动作通常是处理输入数据(processInput)。
this.mailboxProcessor =new MailboxProcessor(this::processInput, mailbox, actionExecutor, mailboxMetricsControl);// ... existing code .../*** Action that is repeatedly executed if no action request is in the mailbox. Typically record* processing.*/protected final MailboxDefaultAction mailboxDefaultAction;
// ... existing code ...

单线程执行循环 (runMailboxLoop):

  • 核心方法 runMailboxLoop() 驱动整个执行逻辑。它会不断检查邮箱中是否有新的 Mail,如果有则执行它们;如果没有,则执行默认动作。
  • 这种机制保证了默认动作(如数据处理)和邮箱中的其他动作(如 Checkpoint、Timer 事件)之间是单线程顺序执行的,避免了并发冲突。
// ... existing code ...public void runMailboxLoop() throws Exception {suspended = !mailboxLoopRunning;final TaskMailbox localMailbox = mailbox;checkState(localMailbox.isMailboxThread(),"Method must be executed by declared mailbox thread!");assert localMailbox.getState() == TaskMailbox.State.OPEN : "Mailbox must be opened!";final MailboxController mailboxController = new MailboxController(this);while (isNextLoopPossible()) {// The blocking `processMail` call will not return until default action is available.processMail(localMailbox, false);if (isNextLoopPossible()) {mailboxDefaultAction.runDefaultAction(mailboxController); // lock is acquired inside default action as needed}}}
// ... existing code ...

提供邮箱执行器 (MailboxExecutor):

  • 通过 getMainMailboxExecutor() 和 getMailboxExecutor(int priority) 方法,向外部提供 MailboxExecutor。这使得其他组件(如 TimerService、CheckpointCoordinator)可以将它们的动作提交到邮箱中,由 MailboxProcessor 在其单线程循环中统一调度执行。
// ... existing code ...public MailboxExecutor getMainMailboxExecutor() {return new MailboxExecutorImpl(mailbox, MIN_PRIORITY, actionExecutor);}/*** Returns an executor service facade to submit actions to the mailbox.** @param priority the priority of the {@link MailboxExecutor}.*/public MailboxExecutor getMailboxExecutor(int priority) {return new MailboxExecutorImpl(mailbox, priority, actionExecutor, this);}
// ... existing code ...

生命周期管理:

  • 实现了 Closeable 接口,并有 prepareClose() 和 close() 方法,对应 TaskMailbox 的 quiesce() 和 close()。这确保了在任务结束时,邮箱能被正确关闭,并处理(如取消)剩余的 Mail
// ... existing code .../** Lifecycle method to close the mailbox for action submission. */public void prepareClose() {mailbox.quiesce();}/*** Lifecycle method to close the mailbox for action submission/retrieval. This will cancel all* instances of {@link java.util.concurrent.RunnableFuture} that are still contained in the* mailbox.*/@Overridepublic void close() {List<Mail> droppedMails = mailbox.close();
// ... existing code ...}

挂起与恢复:

  • MailboxProcessor 的执行循环可以被挂起 (suspend()) 和恢复(通过再次调用 runMailboxLoop() 或相关控制逻辑)。默认动作也可以通过 MailboxController 暂时挂起。
// ... existing code .../** Suspend the running of the loop which was started by {@link #runMailboxLoop()}}. */public void suspend() {sendPoisonMail(() -> suspended = true);}
// ... existing code ...

以及 MailboxController 中的 suspendDefaultAction()

异常处理:

  • reportThrowable(Throwable throwable) 方法允许将其他线程中发生的异常报告给邮箱线程,并在邮箱线程中重新抛出,从而中断任务执行。
// ... existing code ...public void reportThrowable(Throwable throwable) {sendControlMail(() -> {if (throwable instanceof Exception) {throw (Exception) throwable;} else if (throwable instanceof Error) {throw (Error) throwable;} else {throw WrappingRuntimeException.wrapIfNecessary(throwable);}},"Report throwable %s",throwable);}
// ... existing code ...

度量指标控制 (MailboxMetricsController):

  • 包含一个 MailboxMetricsController 用于控制和访问邮箱相关的度量指标,如邮箱延迟、处理的邮件数量等。
// ... existing code ...private final MailboxMetricsController mailboxMetricsControl;// ... existing code ...@VisibleForTestingpublic MailboxMetricsController getMailboxMetricsControl() {return this.mailboxMetricsControl;}
// ... existing code ...

MailboxProcessor 与 StreamTask 的互动

MailboxProcessor 为 StreamTask 提供了一个强大的、基于邮箱的单线程执行引擎。StreamTask 委托 MailboxProcessor 来驱动其核心的数据处理循环,并通过 MailboxExecutor 将所有需要与任务主线程同步的异步操作(如 Timer、Checkpoint 事件)统一提交到邮箱中进行调度。这种设计确保了任务内部操作的串行化,简化了并发控制,并提高了系统的稳定性和可维护性。

StreamTask 是 Flink 流处理任务的基类,它使用 MailboxProcessor 来管理其核心执行逻辑。

  1. 创建和持有 MailboxProcessor:

    • StreamTask 在其构造函数中创建并持有一个 MailboxProcessor 实例。
    • MailboxDefaultAction 通常被设置为 StreamTask::processInput,这意味着当邮箱为空时,StreamTask 会执行其数据处理逻辑。
    • StreamTaskActionExecutor 也被传递给 MailboxProcessor
  2. 驱动执行循环:

    • StreamTask 的 invoke() 方法是任务的执行入口。在其核心逻辑中,它会调用 mailboxProcessor.runMailboxLoop() 来启动邮箱处理循环。这个循环会一直运行,直到任务完成或被取消。
    • 代码见 StreamTask.invoke():

      StreamTask.java

      // ... existing code ...
      public final void invoke() throws Exception {// ... initialization ...try {// ...// Run mailbox until all gates will be recovered.mailboxProcessor.runMailboxLoop(); // 启动邮箱循环// ...} finally {// ... cleanup ...// let mailbox execution reject all new letters from this pointmailboxProcessor.prepareClose();// ...mailboxProcessor.close();}
      }
      // ... existing code ...
      
  3. 提交异步动作:

    • StreamTask 及其相关的组件(如 TimerServiceSubtaskCheckpointCoordinator)需要执行一些异步操作,例如触发 Timer、执行 Checkpoint、响应外部事件等。这些操作需要确保在任务的主线程中执行,以避免并发问题。
    • StreamTask 通过从 mailboxProcessor 获取的 MailboxExecutor 来提交这些异步操作。这些操作会被封装成 Mail 放入邮箱,由 MailboxProcessor 在其循环中按顺序执行。
    • 例如,ProcessingTimeService 的实现会使用 MailboxExecutor 来调度 Timer 的触发:

      StreamOperatorFactoryUtil.java

      // ... existing code ...public static <OUT, OP extends StreamOperator<OUT>> OP createOperator(// ...MailboxExecutor mailboxExecutor = // Obtained via containingTask.getMailboxExecutorFactory()containingTask.getMailboxExecutorFactory().createExecutor(configuration.getChainIndex());// ...final ProcessingTimeService processingTimeService;if (operatorFactory instanceof ProcessingTimeServiceAware) {processingTimeService =((ProcessingTimeServiceAware) operatorFactory).createProcessingTimeService(mailboxExecutor);} else {processingTimeService = processingTimeServiceFactory.get();}
      // ... existing code ...
      
      ProcessingTimeServiceImpl 内部会使用这个 mailboxExecutor 来 execute 或 schedule 定时任务。
  4. 控制流程与状态:

    • StreamTask 的 processInput 方法(作为 MailboxDefaultAction)可以通过 MailboxDefaultAction.Controller 与 MailboxProcessor 交互。例如,当输入数据处理完毕或遇到反压时,它可以调用 controller.suspendDefaultAction() 来暂时挂起默认动作的执行,让 MailboxProcessor 优先处理邮箱中的其他 Mail
    • 代码见 StreamTask.processInput():
      // ... existing code ...
      protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {DataInputStatus status = inputProcessor.processInput();switch (status) {// ...case END_OF_INPUT:// Suspend the mailbox processor, it would be resumed in afterInvoke and finished// after all records processed by the downstream tasks. We also suspend the default// actions to avoid repeat executing the empty default operation (namely process// records).controller.suspendDefaultAction(); // 通过 Controller 控制 MailboxProcessormailboxProcessor.suspend();return;}// ...
      }
      // ... existing code ...
      
  5. 生命周期同步:

    • StreamTask 在其生命周期的不同阶段(如 cancelTaskafterInvoke)会调用 MailboxProcessor 的相应方法(如 prepareClosecloseallActionsCompleted)来同步状态和清理资源。
    • 例如,在任务正常结束或需要最终 Checkpoint 完成后,会调用 mailboxProcessor.allActionsCompleted():

      StreamTask.java

      // ... existing code ...FutureUtils.waitForAll(terminationConditions).thenRun(mailboxProcessor::allActionsCompleted);// Resumes the mailbox processor. The mailbox processor would be completed// after all records are processed by the downstream tasks.mailboxProcessor.runMailboxLoop();
      // ... existing code ...
      

TaskMailboxImpl 

虽然这个类的核心结构是“一个锁(ReentrantLock)加一个队列(Deque<Mail>)”,但它的实现中包含了一些针对 Flink Task 执行模型的特定优化和设计,使其不仅仅是一个简单的线程安全队列。

@ThreadSafe
public class TaskMailboxImpl implements TaskMailbox {/** Lock for all concurrent ops. */private final ReentrantLock lock = new ReentrantLock();/** Internal queue of mails. */@GuardedBy("lock")private final Deque<Mail> queue = new ArrayDeque<>();/** Condition that is triggered when the mailbox is no longer empty. */@GuardedBy("lock")private final Condition notEmpty = lock.newCondition();/** The state of the mailbox in the lifecycle of open, quiesced, and closed. */@GuardedBy("lock")private State state = OPEN;/** Reference to the thread that executes the mailbox mails. */@Nonnull private final Thread taskMailboxThread;/*** The current batch of mails. A new batch can be created with {@link #tryBuildBatch()} and* consumed with {@link #tryTakeFromBatch()}.*/private final Deque<Mail> batch = new ArrayDeque<>();/*** Performance optimization where hasNewMail == !queue.isEmpty(). Will not reflect the state of* {@link #batch}.*/private volatile boolean hasNewMail = false;/*** Performance optimization where there is new urgent mail. When there is no urgent mail in the* batch, it should be checked every time mail is taken, including taking mail from batch queue.*/private volatile boolean hasNewUrgentMail = false;public TaskMailboxImpl(@Nonnull final Thread taskMailboxThread) {this.taskMailboxThread = taskMailboxThread;}@VisibleForTestingpublic TaskMailboxImpl() {this(Thread.currentThread());}

核心结构:

  1. lock: ReentrantLock: 这是实现线程安全的核心。所有对内部队列 queue 和状态 state 的并发访问都由此锁保护。

  2. queue: Deque<Mail>:

    • 实际存储待处理 Mail 对象的双端队列。Mail 对象封装了需要执行的动作(通常是一个 Runnable)及其优先级。
    • 使用 ArrayDeque 作为底层实现。
  3. notEmpty: Condition:

    • 与 lock 关联的条件变量。当邮箱从空变为非空时(即有新的 Mail 被放入),会通过 notEmpty.signal() 或 notEmpty.signalAll() 来唤醒可能正在等待获取 Mail 的线程(主要是邮箱处理线程)。
    • 在 take() 方法中,如果队列为空,线程会调用 notEmpty.await() 等待。
  4. state: State (enum: OPENQUIESCEDCLOSED):

    • 表示邮箱的生命周期状态:
      • OPEN: 邮箱正常工作,可以接收和发送邮件。
      • QUIESCED: 邮箱处于静默状态,不再接受新的邮件(put 操作会失败),但仍然可以取出已有的邮件。通常在任务准备关闭时进入此状态。
      • CLOSED: 邮箱已关闭,不能进行任何操作。所有未处理的邮件会被清空。
    • 状态转换由 quiesce() 和 close() 方法控制,并且这些操作也受 lock 保护。
  5. taskMailboxThread: Thread:

    • 一个非常重要的字段,它存储了被指定为“邮箱线程”的线程引用。
    • 很多操作(如 taketryTakehasMailcreateBatchtryTakeFromBatchquiesceclose)都强制要求调用者必须是这个 taskMailboxThread,通过 checkIsMailboxThread() 进行检查。这是因为 Flink 的 Task 执行模型是单线程的,MailboxProcessor 会在其专用的线程中处理邮箱中的邮件和默认动作。
  6. batch: Deque<Mail>:

    • 这是一个性能优化的设计。MailboxProcessor 在其主循环中,会先调用 createBatch() 将主队列 queue 中的所有邮件一次性转移到这个 batch 队列中。然后,MailboxProcessor 会优先从 batch 中通过 tryTakeFromBatch() 获取邮件进行处理。
    • 目的: 减少锁的竞争。createBatch() 在持有锁的情况下将一批邮件转移出来,之后 MailboxProcessor 处理 batch 中的邮件时就不再需要频繁获取锁去访问主队列 queue。这对于高吞吐量的场景非常重要。
    • batch 的操作也仅限于 taskMailboxThread
  7. hasNewMail: volatile boolean:

    • 这是另一个性能优化。它大致反映了主队列 queue 是否为空 (!queue.isEmpty())。
    • volatile 关键字确保了不同线程对它的可见性。
    • 目的: 允许邮箱线程在不获取锁的情况下快速检查是否有新邮件。例如,在 hasMail() 和 tryTake() 方法中,会先检查 batch,然后检查 hasNewMail,只有当 hasNewMail 为 true 时,才尝试获取锁并检查主队列 queue
    • 当有新邮件通过 put() 或 putFirst()(从非邮箱线程调用时)添加到 queue 时,hasNewMail 会被设置为 true。当邮件从 queue 中被取出或通过 createBatch() 转移到 batch 时,hasNewMail 会被更新。

特别需要注意的点:

  1. 单消费者(邮箱线程)设计:

    • 尽管 put() 和 putFirst() 方法允许从任何线程添加邮件(是线程安全的),但所有取邮件的操作(taketryTakecreateBatchtryTakeFromBatch)以及生命周期管理方法(quiesceclose)都必须由 taskMailboxThread 调用。这是 Flink Mailbox 模型的核心设计,确保了任务逻辑的单线程执行。
  2. 批处理优化 (batch 队列):

    • 理解 batch 队列的作用对于分析性能至关重要。它不是一个独立的邮箱,而是主队列 queue 的一个临时缓存,用于减少锁争用。MailboxProcessor 会周期性地将 queue 中的内容“批发”到 batch 中。
  3. hasNewMail 优化hasNewMail 变量提供了一种轻量级的检查机制,避免了邮箱线程在主队列可能为空时仍频繁获取锁。

  4. 优先级处理 (takeOrNull 方法):

    • takeOrNull(Deque<Mail> queue, int priority) 方法实现了从队列中根据优先级取出邮件的逻辑。它会遍历队列,找到第一个优先级大于或等于指定 priority 的邮件并返回。这意味着高优先级的邮件(如控制命令、Checkpoint barrier)可以被优先处理。
  5. putFirst() 的特殊行为:

    • putFirst(@Nonnull Mail mail) 方法很有意思:
      • 如果调用者是 taskMailboxThread,邮件会直接被添加到 batch 队列的头部。这是因为邮箱线程是当前批次邮件的消费者,将邮件直接放入批处理队列的头部可以使其被更快处理,而无需等待下一轮 createBatch
      • 如果调用者不是 taskMailboxThread,邮件会被添加到主队列 queue 的头部,并通过 notEmpty.signal() 唤醒邮箱线程。
  6. 生命周期管理 (statequiesce()close()):

    • 邮箱的生命周期状态转换是严格控制的,并且与任务的生命周期紧密相关。
    • quiesce(): 使邮箱不再接受新邮件,但允许处理完已有的邮件。
    • close(): 彻底关闭邮箱,清空所有邮件,并唤醒所有可能在等待的线程(通过 notEmpty.signalAll()),通常是为了让它们感知到关闭状态并退出。
  7. 锁的粒度和使用:

    • ReentrantLock 用于保护对共享数据(queuestate)的访问。
    • Condition (notEmpty) 用于实现生产者-消费者模式中的等待和通知机制。
    • lock.lockInterruptibly() 在 take() 方法中使用,允许等待的邮箱线程响应中断。
  8. runExclusively(Runnable runnable):

    • 提供了一种机制,允许以独占方式在邮箱的锁保护下执行一段代码。这对于需要原子地执行多个邮箱操作(例如,检查状态然后根据状态放入邮件)的场景非常有用,可以避免竞态条件。

总而言之,TaskMailboxImpl 虽然基于简单的锁和队列,但通过引入批处理、hasNewMail 标志、严格的线程模型以及精细的生命周期管理,为 Flink 的 MailboxProcessor 提供了一个高效且功能完备的邮件调度机制。这些设计都是为了在保证单线程执行模型的前提下,最大化吞吐量并减少不必要的同步开销。

Mail 类分析

Mail 类是 Apache Flink 流处理运行时任务邮箱机制中的一个核心组件。它代表一个可执行的任务单元,绑定到特定的操作符链中,可以被下游邮箱处理器选择执行。

主要属性

  • mailOptions: 邮件选项,用于配置邮件的行为,如是否可延迟执行。
  • runnable: 要执行的操作,是一个 ThrowingRunnable 类型的实例,可以抛出异常。
  • priority: 邮件的优先级。优先级并不直接决定执行顺序,而是用于避免上下游操作符之间的活锁或死锁问题。
  • descriptionFormat 和 descriptionArgs: 用于调试和错误报告的邮件描述信息。
  • actionExecutor: 用于执行 runnable 的执行器。

Mail 类提供了三个构造函数,允许灵活地创建邮件对象:

  1. 最简单的构造函数只需要 runnablepriority 和描述信息。
  2. 可以指定 MailboxExecutor.MailOptions 来配置邮件选项。
  3. 可以指定 StreamTaskActionExecutor 来控制操作的执行方式。

核心方法

  • getMailOptions(): 获取邮件选项。
  • getPriority(): 获取邮件的优先级。如果邮件是可延迟的,则返回最小优先级。
  • tryCancel(): 尝试取消邮件的执行。
  • toString(): 返回邮件的描述信息。
  • run(): 执行邮件中的操作。

Mail 类在 Flink 的流处理任务中扮演着重要角色。它允许将任务分解为小的、可执行的单元,并通过邮箱机制进行调度和执行。这种设计有助于提高任务的并发性和响应性,同时避免复杂的同步问题。

在实际使用中,可以通过创建 Mail 对象来封装需要执行的操作,并将其提交到邮箱中等待执行。通过设置不同的优先级和选项,可以控制操作的执行顺序和行为。

MailboxExecutorImpl

MailboxExecutorImpl 实现了 flink.api.common.operators.MailboxExecutor 接口,它充当了向 Flink Task 的邮箱(TaskMailbox)提交执行单元(Runnable 或 Callable)的一个入口或门面。它的核心目标是允许其他组件将代码片段(封装为 Mail 对象)放入邮箱,这些代码片段最终会由 MailboxProcessor 在其专用的单线程中执行。

核心成员变量:

  • mailbox: TaskMailbox: 这是实际存储待执行邮件的邮箱实例。MailboxExecutorImpl 将通过它来提交新的邮件。

    MailboxExecutorImpl.java

    // ... existing code .../** The mailbox that manages the submitted runnable objects. */@Nonnull private final TaskMailbox mailbox;
    // ... existing code ...
    
  • priority: int: 与此 MailboxExecutorImpl 实例关联的邮件的默认优先级。当通过这个执行器提交任务时,任务会带上这个优先级。
    // ... existing code ...private final int priority;
    // ... existing code ...
    
  • actionExecutor: StreamTaskActionExecutor: 这是一个执行器,用于实际运行封装在 Mail 对象中的命令。Mail 对象在被 MailboxProcessor 取出后,其 run() 方法会使用这个 actionExecutor 来执行具体的逻辑。
    // ... existing code ...private final StreamTaskActionExecutor actionExecutor;
    // ... existing code ...
    
  • mailboxProcessor: MailboxProcessor (可能为 null): 指向驱动邮箱循环的 MailboxProcessor。主要用于 isIdle() 方法的判断。
    // ... existing code ...private final MailboxProcessor mailboxProcessor;
    // ... existing code ...
    

构造函数:

  • 提供了两个构造函数,主要的区别在于是否传入 MailboxProcessor
    // ... existing code ...public MailboxExecutorImpl(@Nonnull TaskMailbox mailbox, int priority, StreamTaskActionExecutor actionExecutor) {this(mailbox, priority, actionExecutor, null);}public MailboxExecutorImpl(@Nonnull TaskMailbox mailbox,int priority,StreamTaskActionExecutor actionExecutor,MailboxProcessor mailboxProcessor) {this.mailbox = mailbox;this.priority = priority;this.actionExecutor = Preconditions.checkNotNull(actionExecutor);this.mailboxProcessor = mailboxProcessor;}
    // ... existing code ...
    
    它们初始化了执行器的核心组件。

主要方法分析:

execute

// ... existing code ...@Overridepublic void execute(MailOptions mailOptions,final ThrowingRunnable<? extends Exception> command,final String descriptionFormat,final Object... descriptionArgs) {try {mailbox.put(new Mail(mailOptions,command,priority,actionExecutor,descriptionFormat,descriptionArgs));} catch (MailboxClosedException mbex) {throw new RejectedExecutionException(mbex);}}
// ... existing code ...
  • 这是向邮箱提交任务的核心方法。
  • 它接收一个 ThrowingRunnable 作为要执行的命令,以及 MailOptions(用于配置邮件行为,例如是否可延迟)、描述信息等。
  • 内部会创建一个新的 Mail 对象,该对象封装了传入的 command、此执行器实例的 priorityactionExecutor 以及描述信息。
  • 然后调用 mailbox.put(new Mail(...)) 将这个新创建的 Mail 对象放入 TaskMailbox 中。
  • 如果邮箱已经关闭(MailboxClosedException),则会抛出 RejectedExecutionException,这是 Executor 服务在无法接受新任务时的标准行为。

yield 

此方法设计为由邮箱线程自身调用

  • 它尝试从 mailbox 中获取一个至少具有 此执行器 priority 的邮件 (mailbox.take(priority))。这是一个阻塞操作,如果当前没有符合条件的邮件,它会等待。
  • 一旦获取到 Mail 对象,它会立即在当前线程(即邮箱线程)中执行 mail.run()
  • 目的: 允许当前正在邮箱线程中执行的某个可能耗时较长的操作(例如用户函数)主动暂停,让邮箱中其他待处理的邮件(特别是具有相同或更高优先级的邮件,如 Checkpoint Barrier)有机会执行。这是一种协作式多任务处理机制,对于保证邮箱系统的响应性至关重要。
    @Overridepublic void yield() throws InterruptedException {Mail mail = mailbox.take(priority);try {mail.run();} catch (Exception ex) {throw WrappingRuntimeException.wrapIfNecessary(ex);}}

  1. tryYield():

    • 与 yield() 类似,但是一个非阻塞版本。
    • 它调用 mailbox.tryTake(priority) 尝试获取邮件。
    • 如果成功获取到邮件,则执行它并返回 true
    • 如果没有符合条件的邮件,则立即返回 false,不会阻塞。
    • 需要注意的是,根据 MailboxExecutor 接口的约定和 MailOptions.deferrable() 的设计,yield() 和 tryYield() 通常不会执行被标记为 "deferrable"(可延迟)的邮件。这是为了在需要快速让出执行权(例如为了处理 Checkpoint)时,避免执行那些可以稍后处理的低优先级或非紧急任务。
  2. shouldInterrupt():

    • 此方法用于指示当前正在邮箱线程中执行的操作是否应该被中断(例如,一个长时间运行的用户函数)。
    • 目前的实现是简单地检查 mailbox.hasMail(),即只要邮箱中还有任何待处理的邮件,就建议中断。
    • 代码中的 TODO: FLINK-35051 注释表明,这是一个待优化的点。理想情况下,只有当邮箱中有时间敏感的邮件(例如与 Checkpoint 相关的邮件)时,才应该建议中断,以避免不必要的性能开销。
  3. isIdle():

    // ... existing code ...public boolean isIdle() {return !mailboxProcessor.isDefaultActionAvailable()&& !mailbox.hasMail()&& mailbox.getState().isAcceptingMails();}
    // ... existing code ...
    
    • 检查关联的 MailboxProcessor 是否处于空闲状态。
    • 判断条件为:
      • MailboxProcessor 的默认操作(通常是 processInput)当前不可用(即被挂起)。
      • TaskMailbox 中没有待处理的邮件。
      • TaskMailbox 的状态仍然是接受邮件的状态(即不是 QUIESCED 或 CLOSED)。
    • 这个方法需要 mailboxProcessor 成员不为 null

总结与作用

MailboxExecutorImpl 为 Flink 的异步操作和事件驱动模型提供了一个关键的接口。它使得系统中的不同部分(例如 Timer Service、Checkpoint Coordinator,甚至是算子自身)能够安全地将需要在 Task 主执行线程(即邮箱线程)中执行的逻辑提交到邮箱队列。

  • 封装提交逻辑: 它将创建 Mail 对象并将其放入 TaskMailbox 的细节封装起来,提供了一个更简洁的 Executor 风格的 API。
  • 支持优先级: 允许为通过特定执行器实例提交的任务指定一个默认优先级。
  • 协作式调度 (yield/tryYield): 这是 Mailbox 模型单线程执行模式下实现并发感和响应性的核心机制。它允许长时间运行的任务主动让出控制权,确保高优先级任务(如系统事件)能够及时处理。
  • 中断提示 (shouldInterrupt): 为长时间运行的用户代码提供了一个检查点,以便在需要时(例如为了执行 Checkpoint)能够优雅地中断。

通过 MailboxExecutorImpl,Flink 能够确保所有关键的 Task 级别操作(数据处理、状态访问、Checkpoint、Timer 回调等)都在同一个线程中有序执行,从而避免了复杂的并发控制问题,简化了状态管理和一致性保证。

MailboxProcessor 细节分析

MailboxProcessor 封装了基于 Mailbox 的执行模型的完整逻辑。它的核心是一个事件循环 (runMailboxLoop),该循环持续执行两个主要任务:

  1. 处理邮箱中的邮件 (Mail): 检查 TaskMailbox 中是否有待处理的邮件(例如 Checkpoint 触发、Timer 事件、用户通过 MailboxExecutor 提交的自定义逻辑等),并按优先级顺序执行它们。
  2. 执行默认动作 (MailboxDefaultAction): 如果邮箱中没有邮件,或者邮件处理完毕后,它会执行一个“默认动作”。在 StreamTask 的上下文中,这个默认动作通常是 processInput(),即处理来自上游的数据。

这种设计确保了 Task 内部所有操作(数据处理、Checkpoint、Timer 等)的单线程执行,从而极大地简化了并发控制和状态管理。

主要结构组件:

  1. mailbox: TaskMailbox: 这是实际存储和管理 Mail 对象的组件。MailboxProcessor 从它那里获取邮件。

  2. mailboxDefaultAction: MailboxDefaultAction: 代表在邮箱空闲时重复执行的默认操作。它通过 MailboxDefaultAction.Controller 与 MailboxProcessor 交互,例如在没有输入数据时通知 MailboxProcessor 暂停调用默认动作。

  3. actionExecutor: StreamTaskActionExecutor: 用于实际执行 Mail 中封装的 RunnableMail 对象本身不直接执行逻辑,而是委托给这个执行器。

  4. 控制标志 (Control Flags) - 这些标志必须只能从邮箱线程访问,以避免竞态条件:

    • mailboxLoopRunning: boolean: 控制主事件循环是否应该继续运行。当设置为 false 时,循环会在当前迭代完成后终止。
    • suspended: boolean: 控制邮箱处理器是否被临时挂起。如果为 truerunMailboxLoop 会退出,但之后可以被重新调用以恢复。
    • suspendedDefaultAction: DefaultActionSuspension: 记录当前默认动作是否被挂起。如果非 null,表示默认动作已挂起,MailboxProcessor 不会调用它。
    // ... existing code .../*** Control flag to terminate the mailbox processor. Once it was terminated could not be* restarted again. Must only be accessed from mailbox thread.*/private boolean mailboxLoopRunning;/*** Control flag to temporary suspend the mailbox loop/processor. After suspending the mailbox* processor can be still later resumed. Must only be accessed from mailbox thread.*/private boolean suspended;/*** Remembers a currently active suspension of the default action. Serves as flag to indicate a* suspended default action (suspended if not-null) and to reuse the object as return value in* consecutive suspend attempts. Must only be accessed from mailbox thread.*/private DefaultActionSuspension suspendedDefaultAction;
    // ... existing code ...
    
  5. mailboxMetricsControl: MailboxMetricsController: 用于管理和暴露与邮箱相关的度量指标。

MailboxProcessor 提供了多个构造函数,允许不同程度的定制。核心的构造函数接收 MailboxDefaultActionTaskMailboxStreamTaskActionExecutor 和 MailboxMetricsController

一个常见的用法是传入一个 MailboxDefaultAction,然后 MailboxProcessor 会使用默认的 TaskMailboxImpl(与当前线程绑定)和 StreamTaskActionExecutor.IMMEDIATE

// ... existing code ...public MailboxProcessor(MailboxDefaultAction mailboxDefaultAction,TaskMailbox mailbox,StreamTaskActionExecutor actionExecutor,MailboxMetricsController mailboxMetricsControl) {this.mailboxDefaultAction = Preconditions.checkNotNull(mailboxDefaultAction);this.actionExecutor = Preconditions.checkNotNull(actionExecutor);this.mailbox = Preconditions.checkNotNull(mailbox);this.mailboxLoopRunning = true;this.suspendedDefaultAction = null;this.mailboxMetricsControl = mailboxMetricsControl;}
// ... existing code ...

runMailboxLoop()

// ... existing code ...public void runMailboxLoop() throws Exception {suspended = !mailboxLoopRunning;final TaskMailbox localMailbox = mailbox;checkState(localMailbox.isMailboxThread(),"Method must be executed by declared mailbox thread!");assert localMailbox.getState() == TaskMailbox.State.OPEN : "Mailbox must be opened!";final MailboxController mailboxController = new MailboxController(this);while (isNextLoopPossible()) {// The blocking `processMail` call will not return until default action is available.processMail(localMailbox, false);if (isNextLoopPossible()) {mailboxDefaultAction.runDefaultAction(mailboxController); // lock is acquired inside default action as needed}}}private boolean isNextLoopPossible() {// 'Suspended' can be false only when 'mailboxLoopRunning' is true.return !suspended;}
// ... existing code ...
  • 这是 MailboxProcessor 的心脏。它在一个 while (isNextLoopPossible()) 循环中运行。
  • 前置检查: 确保该方法由指定的邮箱线程执行,并且邮箱处于 OPEN 状态。
  • 创建 MailboxControllerMailboxController 是 MailboxDefaultAction 与 MailboxProcessor 交互的桥梁。
  • 循环体:
    • processMail(localMailbox, false): 调用此方法处理邮箱中的邮件。这是一个关键步骤,它会尝试非阻塞地处理一批邮件。如果默认操作被挂起,它可能会阻塞地等待邮件或默认操作变为可用。false 表示不是单步执行。
    • if (isNextLoopPossible()) { mailboxDefaultAction.runDefaultAction(mailboxController); }: 如果循环仍然可以继续(例如,没有被挂起或关闭),并且默认动作是可用的,则执行默认动作。
  • 设计理念: 注释中提到,runMailboxLoop 的设计目标是保持热路径(默认动作,邮箱中没有邮件)尽可能快。因此,对控制标志(如 mailboxLoopRunningsuspendedDefaultAction)的检查通常与 mailbox.hasMail() 为 true 相关联。这意味着,如果要在邮箱线程内部更改这些标志,必须确保邮箱中至少有一个邮件,以便更改能被及时感知。

processMail

// ... existing code ...private boolean processMail(TaskMailbox mailbox, boolean singleStep) throws Exception {// Doing this check is an optimization to only have a volatile read in the expected hot// path, locks are only// acquired after this point.boolean isBatchAvailable = mailbox.createBatch();// Take mails in a non-blockingly and execute them.boolean processed = isBatchAvailable && processMailsNonBlocking(singleStep);if (singleStep) {return processed;}// If the default action is currently not available, we can run a blocking mailbox execution// until the default action becomes available again.processed |= processMailsWhenDefaultActionUnavailable();return processed;}private boolean processMailsNonBlocking(boolean singleStep) throws Exception {long processedMails = 0;Optional<Mail> maybeMail;while (isNextLoopPossible() && (maybeMail = mailbox.tryTakeFromBatch()).isPresent()) {if (processedMails++ == 0) {maybePauseIdleTimer();}runMail(maybeMail.get());if (singleStep) {break;}}if (processedMails > 0) {maybeRestartIdleTimer();return true;} else {return false;}}
// ... existing code ...

其中 processMailsNonBlocking 和 processMailsWhenDefaultActionUnavailable 内部会调用 runMail(Mail mail) 来实际执行邮件:

// ... existing code ...private void runMail(Mail mail) throws Exception {mailboxMetricsControl.getMailCounter().inc();mail.run();
// ... existing code ...
  • 此方法负责处理邮箱中的邮件。
  • mailbox.createBatch(): 首先尝试从主队列创建一批邮件到 TaskMailbox 的内部批处理队列。这是一个优化,减少锁竞争。
  • processMailsNonBlocking(singleStep): 非阻塞地处理批处理队列中的邮件。如果 singleStep 为 true,则只处理一个邮件(用于测试或调试)。
  • processMailsWhenDefaultActionUnavailable(): 如果默认动作当前不可用(例如,由于反压或没有输入),此方法会尝试从邮箱中获取并处理邮件。它可能会阻塞地等待新邮件的到来,直到默认动作再次可用或循环终止。
  • 返回 true 如果至少处理了一封邮件。

suspend()

// ... existing code .../** Suspend the running of the loop which was started by {@link #runMailboxLoop()}}. */public void suspend() {sendPoisonMail(() -> suspended = true);}/** Send mail in first priority for internal needs. */private void sendPoisonMail(RunnableWithException mail) {mailbox.runExclusively(() -> {// keep state check and poison mail enqueuing atomic, such that no intermediate// #close may cause a// MailboxStateException in #sendPriorityMail.if (mailbox.getState() == TaskMailbox.State.OPEN) {sendControlMail(mail, "poison mail");}});public void runExclusively(Runnable runnable) {lock.lock();try {runnable.run();} finally {lock.unlock();}}
// ... existing code ...
  • 用于从外部(非邮箱线程)请求挂起邮箱循环。
  • 它通过 sendPoisonMail() 向邮箱头部插入一个高优先级的“毒丸”邮件。当这个邮件被处理时,它会将 suspended 标志设置为 true,从而导致 runMailboxLoop 在下一次检查 isNextLoopPossible() 时退出。
  • Poison Mail: 是一种特殊控制邮件,用于改变 MailboxProcessor 的内部状态。

allActionsCompleted()

// ... existing code .../*** This method must be called to end the stream task when all actions for the tasks have been* performed.*/public void allActionsCompleted() {sendPoisonMail(() -> {mailboxLoopRunning = false;suspended = true;});}
// ... existing code ...
  • 当 Task 的所有动作都已完成,需要终止邮箱循环时调用此方法。
  • 与 suspend() 类似,它也通过 sendPoisonMail() 发送一个毒丸邮件。该邮件会将 mailboxLoopRunning 设置为 false 并将 suspended 设置为 true,从而彻底停止事件循环。

sendPoisonMail 和 sendControlMail(...):

// ... existing code .../** Send mail in first priority for internal needs. */private void sendPoisonMail(RunnableWithException mail) {mailbox.runExclusively(() -> {// keep state check and poison mail enqueuing atomic, such that no intermediate// #close may cause a// MailboxStateException in #sendPriorityMail.if (mailbox.getState() == TaskMailbox.State.OPEN) {sendControlMail(mail, "poison mail");}});}/*** Sends the given <code>mail</code> using {@link TaskMailbox#putFirst(Mail)} . Intended use is* to control this <code>MailboxProcessor</code>; no interaction with tasks should be performed;*/private void sendControlMail(RunnableWithException mail, String descriptionFormat, Object... descriptionArgs) {mailbox.putFirst(new Mail(mail,Integer.MAX_VALUE /*not used with putFirst*/,descriptionFormat,descriptionArgs));}
// ... existing code ...
  • sendPoisonMail: 确保在邮箱 OPEN 状态下,通过 sendControlMail 发送一个控制邮件。它使用 mailbox.runExclusively 来原子地检查状态和入队。
  • sendControlMail: 将一个具有最高优先级的 Mail 对象(通过 mailbox.putFirst())放入邮箱。这些邮件用于内部控制,如挂起、终止、报告错误等。

生命周期方法 (prepareClose()close()):

// ... existing code .../** Lifecycle method to close the mailbox for action submission. */public void prepareClose() {mailbox.quiesce();}/*** Lifecycle method to close the mailbox for action submission/retrieval. This will cancel all* instances of {@link java.util.concurrent.RunnableFuture} that are still contained in the* mailbox.*/@Overridepublic void close() {List<Mail> droppedMails = mailbox.close();
// ... existing code ...
  • prepareClose(): 调用 mailbox.quiesce()。这会使邮箱进入静默状态,不再接受新的邮件,但允许处理已有的邮件。这是关闭过程的第一步。
  • close(): 调用 mailbox.close()。这会彻底关闭邮箱,清空所有未处理的邮件,并尝试取消仍在邮箱中的 RunnableFuture 实例。

与 MailboxDefaultAction 的交互 (通过 MailboxController):

// ... existing code ...
protected static final class MailboxController implements MailboxDefaultAction.Controller {private final MailboxProcessor mailboxProcessor;protected MailboxController(MailboxProcessor mailboxProcessor) {this.mailboxProcessor = mailboxProcessor;}@Overridepublic void allActionsCompleted() {mailboxProcessor.allActionsCompleted();}@Overridepublic MailboxDefaultAction.Suspension suspendDefaultAction(PeriodTimer suspensionPeriodTimer) {return mailboxProcessor.suspendDefaultAction(suspensionPeriodTimer);}
// ... existing code ...
}// ... existing code ...
private final class DefaultActionSuspension implements MailboxDefaultAction.Suspension {@Nullable private final PeriodTimer suspensionTimer;public DefaultActionSuspension(@Nullable PeriodTimer suspensionTimer) {this.suspensionTimer = suspensionTimer;}@Overridepublic void resume() {if (mailbox.isMailboxThread()) {resumeInternal();} else {try {sendControlMail(this::resumeInternal, "resume default action");} catch (MailboxClosedException ex) {// Ignored}}}private void resumeInternal() {// This method must be called from the mailbox thread.if (mailboxProcessor.suspendedDefaultAction == this) {mailboxProcessor.suspendedDefaultAction = null;if (suspensionTimer != null) {suspensionTimer.markEnd();}}}}
// ... existing code ...
  • MailboxController 是一个内部类,实现了 MailboxDefaultAction.Controller 接口。
  • MailboxDefaultAction 通过这个 Controller 来与 MailboxProcessor 通信。
  • suspendDefaultAction(): 当默认动作(如 processInput)发现当前没有工作可做时(例如,没有输入数据或下游反压),它会调用 controller.suspendDefaultAction()
  • MailboxProcessor.suspendDefaultAction(@Nullable PeriodTimer suspensionTimer):
    • 此方法(只能由邮箱线程调用)将 suspendedDefaultAction 设置为一个新的 DefaultActionSuspension 实例。
    • DefaultActionSuspension 实现了 MailboxDefaultAction.Suspension 接口,其 resume() 方法用于恢复默认动作的执行。resume() 可以从任何线程调用,如果不是邮箱线程,它会发送一个控制邮件来确保恢复逻辑在邮箱线程中执行。

获取 MailboxExecutor:

// ... existing code ...public MailboxExecutor getMainMailboxExecutor() {return new MailboxExecutorImpl(mailbox, MIN_PRIORITY, actionExecutor);}/*** Returns an executor service facade to submit actions to the mailbox.** @param priority the priority of the {@link MailboxExecutor}.*/public MailboxExecutor getMailboxExecutor(int priority) {return new MailboxExecutorImpl(mailbox, priority, actionExecutor, this);}
// ... existing code ...
  • getMainMailboxExecutor(): 返回一个具有最低优先级的 MailboxExecutor
  • getMailboxExecutor(int priority): 返回一个指定优先级的 MailboxExecutor。这些 MailboxExecutor 实例允许其他组件向此 MailboxProcessor 的邮箱提交任务。

总结

MailboxProcessor 是 Flink Task 单线程执行模型的核心。它通过一个事件循环来协调处理高优先级的控制/事件邮件和低优先级的默认数据处理动作。这种机制确保了:

  • 单线程执行: 所有关键逻辑都在同一个线程中执行,避免了复杂的并发同步。
  • 响应性: 高优先级邮件(如 Checkpoint barriers)可以抢占默认动作,保证系统事件的及时处理。
  • 可控性: 提供了挂起、恢复、终止事件循环的机制。
  • 可扩展性: 通过 MailboxExecutor 允许外部组件向邮箱提交自定义任务。

processInput

processInput 方法是 StreamTask 执行其核心数据处理逻辑的地方。它是作为 MailboxProcessor默认动作 (MailboxDefaultAction) 来执行的。这意味着,当 MailboxProcessor 的邮箱中没有更高优先级的“邮件”(如 Checkpoint 触发、Timer 事件等)需要处理时,它就会循环调用这个 processInput 方法。

下面是对 processInput 方法的详细分析:

  1. 方法职责与设计理念:

    1. 处理输入事件: 其核心职责是从输入源(由 inputProcessor 代表)获取一个事件(通常是一条记录或一组记录),并将其传递给后续的算子链进行处理。

    2. 非阻塞性: 注释中强调“Implementations should (in general) be non-blocking”。这是非常关键的一点。因为 MailboxProcessor 是单线程执行其邮箱中的邮件和默认动作的,如果 processInput 长时间阻塞,将会导致 Checkpoint barriers、Timer 等重要事件无法及时处理,影响任务的正确性和性能。

    3. 与 MailboxProcessor 协作: 通过 MailboxDefaultAction.Controller controller 参数,processInput 可以与 MailboxProcessor 进行交互,例如在没有数据或遇到反压时,通知 MailboxProcessor 暂停调用默认动作。

  2. 处理输入 (inputProcessor.processInput()):

  • 方法首先调用 inputProcessor.processInput()InputProcessor 负责从上游读取数据、反序列化,并将数据喂给当前 Task 的第一个 Operator。
  • processInput() 的返回值 DataInputStatus 描述了本次输入处理的结果。

根据 DataInputStatus 进行分支处理:

DataInputStatus status = inputProcessor.processInput();switch (status) {case MORE_AVAILABLE:if (taskIsAvailable()) {return;}break;case NOTHING_AVAILABLE:break;case END_OF_RECOVERY:throw new IllegalStateException("We should not receive this event here.");case STOPPED:endData(StopMode.NO_DRAIN);return;case END_OF_DATA:endData(StopMode.DRAIN);notifyEndOfData();return;case END_OF_INPUT:// Suspend the mailbox processor, it would be resumed in afterInvoke and finished// after all records processed by the downstream tasks. We also suspend the default// actions to avoid repeat executing the empty default operation (namely process// records).controller.suspendDefaultAction();mailboxProcessor.suspend();return;}

MORE_AVAILABLE: 表示 inputProcessor 中还有更多数据可以立即处理。

  1. if (taskIsAvailable()) { return; }: 如果当前任务本身也是可用的(例如,下游没有反压),则直接返回。MailboxProcessor 会很快再次调用 processInput 来处理更多数据。

  2. NOTHING_AVAILABLE: 表示 inputProcessor 当前没有可用的数据。此时,方法不会立即返回,而是会继续检查是否存在反压等情况,可能需要暂停默认动作的执行。

  3. END_OF_RECOVERY: 这是一个不期望在此处出现的状态,表示任务恢复逻辑可能存在问题,因此抛出 IllegalStateException

  4. STOPPED: 表示输入流被强制停止(例如任务被取消,且不需要流干数据)。

    • endData(StopMode.NO_DRAIN): 通知算子链以非排空模式结束处理。

    • return;: 结束当前 processInput 调用。

  5. END_OF_DATA: 表示当前输入流的所有数据都已到达(例如,有限流Source结束)。

    • endData(StopMode.DRAIN): 通知算子链以排空模式结束处理(处理完所有已缓冲的数据)。

    • notifyEndOfData(): 通知 TaskManager 当前任务的数据已结束。

    • return;: 结束当前 processInput 调用。

  6. END_OF_INPUT: 表示该 Task 的所有输入都已经结束。这是一个更强的结束信号。

    • controller.suspendDefaultAction(): 通知 MailboxProcessor 暂停调用 processInput。因为已经没有新的输入了,再继续调用也没有意义。

    • mailboxProcessor.suspend(): 暂停整个 MailboxProcessor 的事件循环。任务此时会等待下游处理完所有数据,并完成最终的 Checkpoint 等操作。

    • return;: 结束当前 processInput 调用。

处理反压和等待逻辑 (当 NOTHING_AVAILABLE 或其他需要等待的情况): 如果 inputProcessor.processInput() 返回 NOTHING_AVAILABLE,或者虽然有数据但任务本身不可用(例如下游反压),代码会进入等待逻辑:

// 如果前面没有returnTaskIOMetricGroup ioMetrics = getEnvironment().getMetricGroup().getIOMetricGroup();PeriodTimer timer;CompletableFuture<?> resumeFuture;if (!recordWriter.isAvailable()) {timer = new GaugePeriodTimer(ioMetrics.getSoftBackPressuredTimePerSecond());resumeFuture = recordWriter.getAvailableFuture();} else if (!inputProcessor.isAvailable()) {timer = new GaugePeriodTimer(ioMetrics.getIdleTimeMsPerSecond());resumeFuture = inputProcessor.getAvailableFuture();} else if (changelogWriterAvailabilityProvider != null&& !changelogWriterAvailabilityProvider.isAvailable()) {// waiting for changelog availability is reported as busytimer = new GaugePeriodTimer(ioMetrics.getChangelogBusyTimeMsPerSecond());resumeFuture = changelogWriterAvailabilityProvider.getAvailableFuture();} else {// data availability has changed in the meantime; retry immediatelyreturn;}assertNoException(resumeFuture.thenRun(new ResumeWrapper(controller.suspendDefaultAction(timer), timer)));
  1. TaskIOMetricGroup ioMetrics = getEnvironment().getMetricGroup().getIOMetricGroup();: 获取IO相关的度量指标组。

  2. PeriodTimer timer; CompletableFuture<?> resumeFuture;: 声明计时器和用于恢复的 Future。

  3. 检查输出是否可用 (!recordWriter.isAvailable()):

    • 如果 recordWriter(负责将处理结果写到下游)不可用,说明下游存在反压。

    • timer = new GaugePeriodTimer(ioMetrics.getSoftBackPressuredTimePerSecond());: 启动一个计时器,用于度量由于下游反压导致的等待时间。

    • resumeFuture = recordWriter.getAvailableFuture();: 获取一个 Future,当 recordWriter 再次可用时,该 Future 会完成。

  4. 检查输入处理器是否可用 (!inputProcessor.isAvailable()):

    • 如果 inputProcessor 本身不可用(例如,等待网络缓冲区的到来)。

    • timer = new GaugePeriodTimer(ioMetrics.getIdleTimeMsPerSecond());: 启动一个计时器,用于度量由于上游输入不可用导致的空闲时间。

    • resumeFuture = inputProcessor.getAvailableFuture();: 获取一个 Future,当 inputProcessor 再次可用时,该 Future 会完成。

  5. 检查 Changelog Writer 是否可用:

    • 如果使用了 Changelog State Backend,并且其 changelogWriterAvailabilityProvider 表示不可用。

    • timer = new GaugePeriodTimer(ioMetrics.getChangelogBusyTimeMsPerSecond());: 启动计时器,度量等待 Changelog Writer 的繁忙时间。

    • resumeFuture = changelogWriterAvailabilityProvider.getAvailableFuture();: 获取 Future,等待其可用。

  6. 数据可用性已改变 (else { return; }):

    • 如果以上等待条件都不满足,说明在 inputProcessor.processInput() 调用之后,数据的可用性可能已经发生了变化(例如,新的数据刚刚到达)。此时直接 return,让 MailboxProcessor 立即重试 processInput

  7. 挂起默认动作并等待恢复:

    • controller.suspendDefaultAction(timer): 调用 controllersuspendDefaultAction 方法,并传入之前启动的 timer。这会通知 MailboxProcessor 暂时停止调用 processInputMailboxProcessor 会使用这个 timer 来记录挂起的时间(用于监控和度量)。该方法返回一个 MailboxDefaultAction.Suspension 对象。

    • resumeFuture.thenRun(new ResumeWrapper(controller.suspendDefaultAction(timer), timer)): 当 resumeFuture 完成时(即等待的条件解除,例如下游不再反压或上游数据到达),会执行 ResumeWrapper 中的逻辑。ResumeWrapper 会调用 Suspension 对象的 resume() 方法,这会通知 MailboxProcessor 可以重新开始调用 processInput 了。同时,timer 也会被停止。

    • assertNoException(...): 确保 thenRun 中的操作不会抛出未捕获的异常。

Checkpoint

StreamTask 通过其内部的 MailboxProcessor 和相关的 MailboxExecutor 来发送和处理与 Checkpoint 相关的邮件(即需要在 Task 主线程中执行的 Checkpoint 操作)。

以下是 StreamTask 如何发送和处理与 Checkpoint 相关邮件的关键机制分析:

  1. MailboxProcessor 和 MailboxExecutor:

    • 每个 StreamTask 都有一个 MailboxProcessor 实例 (mailboxProcessor),它负责驱动 Task 的事件循环。
    • StreamTask 可以通过 mailboxProcessor.getMailboxExecutor(priority) 获取一个 MailboxExecutor。这个 MailboxExecutor 提供了 execute(...) 方法,可以将一个 Runnable(封装了 Checkpoint 相关逻辑)作为 Mail 提交到邮箱中。
    • 这些邮件会被 MailboxProcessor 在其主循环中按优先级取出并执行。
  2. SubtaskCheckpointCoordinator:

    • StreamTask 包含一个 SubtaskCheckpointCoordinator 实例 (subtaskCheckpointCoordinator)。这个协调器负责处理 Task 级别的 Checkpoint 逻辑,例如触发操作符的快照、处理 Barrier 对齐、通知 Checkpoint 完成或中止等。
    • 很多 Checkpoint 相关的操作会首先由 SubtaskCheckpointCoordinator 发起或处理,然后它可能会通过 StreamTask 的 MailboxExecutor 将具体的执行步骤提交到邮箱。
  3. actionExecutor:

    • StreamTask 还有一个 StreamTaskActionExecutor 实例 (actionExecutor)。虽然 MailboxExecutor 用于将任务 放入 邮箱,但当 Mail 从邮箱中被取出后,其内部的 Runnable 通常会通过这个 actionExecutor 来实际执行。对于 Checkpoint 相关的操作,这确保了它们在正确的 Task 主线程上下文中运行。

发送 Checkpoint 相关邮件的典型场景和方法:

  • 触发 Checkpoint (triggerCheckpointAsync):

    • 当 JobManager 向 TaskManager 发送触发 Checkpoint 的 RPC 时,Task(通常是 StreamTask 的父类或其本身)会调用 triggerCheckpointAsync 方法。
    • 这个方法会将实际的 Checkpoint 执行逻辑封装成一个 Runnable,并通过 mainMailboxExecutor(一个具有默认优先级的 MailboxExecutor)提交到邮箱。
    • 这样做是为了确保 Checkpoint 的所有阶段(例如调用操作符的 snapshotState)都在 Task 的主线程中执行,从而避免与正常的数据处理流程发生并发冲突。

    StreamTask.java

    // ... existing code ...
    @Override
    public CompletableFuture<Boolean> triggerCheckpointAsync(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {checkForcedFullSnapshotSupport(checkpointOptions);CompletableFuture<Boolean> result = new CompletableFuture<>();mainMailboxExecutor.execute(() -> {try {// Lock the mailbox to ensure that the checkpoint is not concurrent with other// actionssynchronized (mailboxProcessor) {result.complete(triggerUnfinishedChannelsCheckpoint(checkpointMetaData, checkpointOptions));}} catch (Exception ex) {// Report the failure both via the Future result but also to the mailboxresult.completeExceptionally(ex);throw ex;}},"checkpoint %s with %s",checkpointMetaData,checkpointOptions);return result;
    }
    // ... existing code ...
    

    在上面的代码片段中,mainMailboxExecutor.execute(...) 就是将 Checkpoint 触发逻辑(triggerUnfinishedChannelsCheckpoint)作为邮件发送到邮箱的关键步骤。

  • 通知 Checkpoint 完成 (notifyCheckpointCompleteAsync):

    • 当 Task 完成一个 Checkpoint 并收到 JobManager 的确认后,会调用此方法。
    • 同样,通知操作符 Checkpoint 完成的逻辑也会被封装并通过 MailboxExecutor 提交到邮箱。

    StreamTask.java

    // ... existing code ...
    @Override
    public Future<Void> notifyCheckpointCompleteAsync(long checkpointId) {return notifyCheckpointOperation(() -> notifyCheckpointComplete(checkpointId),String.format("checkpoint %d completed", checkpointId));
    }
    // ... existing code ...
    

    而 notifyCheckpointOperation 内部会使用 MailboxExecutor

    StreamTask.java

    // ... existing code ...
    private Future<Void> notifyCheckpointOperation(RunnableWithException runnable, String description) {CompletableFuture<Void> result = new CompletableFuture<>();mailboxProcessor.getMailboxExecutor(TaskMailbox.MAX_PRIORITY).execute(() -> {try {runnable.run();} catch (Exception ex) {result.completeExceptionally(ex);throw ex;}result.complete(null);},description);return result;
    }
    // ... existing code ...
    

    这里使用了 TaskMailbox.MAX_PRIORITY,表明这是一个高优先级的操作。

  • 通知 Checkpoint 中止 (notifyCheckpointAbortAsync):

    • 当一个 Checkpoint 因为各种原因(超时、错误、被新的 Checkpoint 取代)需要中止时,会调用此方法。
    • 中止逻辑,包括清理操作符可能产生的临时状态,也会通过邮件发送到邮箱执行。

    StreamTask.java

    // ... existing code ...
    @Override
    public Future<Void> notifyCheckpointAbortAsync(long checkpointId, long latestCompletedCheckpointId) {return notifyCheckpointOperation(() -> {if (latestCompletedCheckpointId > 0) {notifyCheckpointComplete(latestCompletedCheckpointId);}if (isCurrentSyncSavepoint(checkpointId)) {throw new FlinkRuntimeException("Stop-with-savepoint failed.");}subtaskCheckpointCoordinator.notifyCheckpointAborted(checkpointId, operatorChain, this::isRunning);},String.format("checkpoint %d aborted", checkpointId));
    }
    // ... existing code ...
    

    同样,它也使用了 notifyCheckpointOperation 方法,将中止逻辑放入邮箱。

  • 处理 Barrier 对齐时的 Timer 回调:

    • 当使用 Barrier 对齐策略时,如果一个 InputGate 等待某个 Channel 的 Barrier 超时,SubtaskCheckpointCoordinator 会注册一个 Timer。当这个 Timer 触发时,其回调逻辑(例如取消 Checkpoint 或强制触发 Checkpoint)也会被封装成邮件并通过 MailboxExecutor 提交到邮箱执行。
    • 在 BarrierAlignmentUtil.createRegisterTimerCallback 中可以看到相关的逻辑,它会返回一个 BiConsumer<Long, Long>,这个 Consumer 内部会使用 mainMailboxExecutor 来执行超时处理。
  • Source Task 的特定行为:

    • 例如在 SourceOperatorStreamTask 中,notifyCheckpointAbortAsync 和 notifyCheckpointSubsumedAsync 方法会直接使用 mainMailboxExecutor 来执行清理 Checkpoint 的逻辑。

    SourceOperatorStreamTask.java

    // ... existing code ...
    @Override
    public Future<Void> notifyCheckpointAbortAsync(long checkpointId, long latestCompletedCheckpointId) {mainMailboxExecutor.execute(() -> cleanupCheckpoint(checkpointId), "Cleanup checkpoint %d", checkpointId);return super.notifyCheckpointAbortAsync(checkpointId, latestCompletedCheckpointId);
    }@Override
    public Future<Void> notifyCheckpointSubsumedAsync(long checkpointId) {mainMailboxExecutor.execute(() -> cleanupCheckpoint(checkpointId), "Cleanup checkpoint %d", checkpointId);return super.notifyCheckpointSubsumedAsync(checkpointId);
    // ... existing code ...
    

总结:

StreamTask 依赖其 MailboxProcessor 和通过它获取的 MailboxExecutor 来确保所有与 Checkpoint 相关的关键操作(触发、通知完成/中止、Barrier 处理等)都在 Task 的主事件循环线程中串行执行。这避免了复杂的并发控制,保证了 Checkpoint 过程与正常数据处理流程的一致性和正确性。当需要执行一个 Checkpoint 相关操作时,通常会将其封装为一个 Runnable,然后通过 MailboxExecutor.execute() 方法将其作为一封邮件提交到邮箱队列中,等待 MailboxProcessor 的调度执行。


文章转载自:

http://hZHYiXp7.wpqwk.cn
http://grq6YXEU.wpqwk.cn
http://VnWmMAo4.wpqwk.cn
http://chR8l3HJ.wpqwk.cn
http://5bNETSvf.wpqwk.cn
http://o7mbfaYc.wpqwk.cn
http://iIfJDQgJ.wpqwk.cn
http://D75n7gJe.wpqwk.cn
http://c9TcJjAX.wpqwk.cn
http://okWBIrNB.wpqwk.cn
http://O6Xt5Fhe.wpqwk.cn
http://ARca9gzv.wpqwk.cn
http://VMeiLBVi.wpqwk.cn
http://BmSb6rcs.wpqwk.cn
http://2JHp6aS9.wpqwk.cn
http://J6G0ZOFo.wpqwk.cn
http://10sVldlk.wpqwk.cn
http://W6doSKyo.wpqwk.cn
http://7tQlg4Zq.wpqwk.cn
http://fjNHPSUq.wpqwk.cn
http://WSEkEpUb.wpqwk.cn
http://IfpZfz7p.wpqwk.cn
http://45ua8uUz.wpqwk.cn
http://IP9BvLFh.wpqwk.cn
http://nX2s7fDY.wpqwk.cn
http://gQUXRHKt.wpqwk.cn
http://6UzlpUWn.wpqwk.cn
http://sU8zhb8g.wpqwk.cn
http://Eyx3oEKL.wpqwk.cn
http://4N0O05zf.wpqwk.cn
http://www.dtcms.com/a/372844.html

相关文章:

  • ActiveMQ classic ,artemis ,artemis console ,nms clients,cms client详解
  • 【论文阅读】Far3D: Expanding the Horizon for Surround-view 3D Object Detection
  • Three.js使用outlinePass描边后,描边颜色和背景叠加变淡
  • GPT系列--类GPT2源码剖析
  • 反编译分析C#闭包
  • DTO与POJO:核心差异与最佳实践
  • #C语言——刷题攻略:牛客编程入门训练(九):攻克 分支控制(三)、循环控制(一),轻松拿捏!
  • Android 中 自定义 RecyclerView 控件限制显示高度
  • Codesy中的UDP发送信息
  • Hadoop进程:深入理解分布式计算引擎的核心机制
  • SQL Server死锁排查实战指南
  • 自学嵌入式第三十八天:数据库
  • 【开题答辩全过程】以 基于springboot的酒店管理系统设计与实现为例,包含答辩的问题和答案
  • SpringBoot控制层接收参数处理、Logback日志入门和使用
  • Python快速入门专业版(十三):Python变量进阶:全局变量与局部变量(含global关键字用法)
  • 深度学习(二):神经元与神经网络
  • 如何在不同 iOS 设备上测试和上架 uni-app 应用 实战全流程解析
  • iOS 开发全流程实战 基于 uni-app 的 iOS 应用开发、打包、测试与上架流程详解
  • [论文阅读] 人工智能 + 软件工程 | 大模型破局跨平台测试!LLMRR让iOS/安卓/鸿蒙脚本无缝迁移
  • 汇编基础1
  • CSS @scope与12个降低css冲突方法
  • pytorch 中是如何实现embeding 的
  • 【.Net技术栈梳理】02-核心框架与运行时(GC管理)
  • 洗完头后根据个人需求选择合适的自然风干 | 电吹风 (在保护发质的同时,也能兼顾到生活的便利和舒适。)
  • 人才教育导向下:老年生活照护实训室助力提升学生老年照护服务能力
  • Typescript入门-类型断言讲解
  • 使用Pycharm进行远程ssh(以Featurize为例)
  • 云原生:微服务与Serverless指南
  • 时序数据库选型指南:大数据与物联网时代下的深度剖析与 Apache IoTDB 实践
  • Python 实现 HTML 转 Word 和 PDF