当前位置: 首页 > news >正文

Flink task、Operator 和 UDF 之间的关系

要真正驾驭 Flink 并构建出高效、稳定、可扩展的流处理应用,仅仅停留在 API 的表面使用是远远不够的。深入理解其内部的运行机制,洞悉数据从代码到分布式执行的完整生命周期,以及明晰各个核心组件之间错综复杂而又协同工作的关系,对于我们进行性能调优、故障排查以及设计更优的应用程序架构至关重要。

本文将带领大家一起揭开 Flink 的神秘面纱,我们将首先详细梳理一个 Flink 作业从客户端提交到在 TaskManager 上具体执行的完整启动流程,理解 StreamGraphJobGraph 到 ExecutionGraph 的演变。

紧接着,我们将深入剖析 Flink 中那些我们既熟悉又可能感到困惑的核心概念,如 DataStream 如何通过 Transformation 承载用户的 UDF,最终又是如何在 StreamOperator 和 StreamTask 中焕发生机,以及它们之间是如何相互关联、协同工作的。希望通过这次探索,能帮助构建起对 Flink 内部原理更为清晰和系统的认识。

Flink 任务启动流程

  1. 客户端(Client)准备与提交作业:

    • 用户通过 Flink 客户端(例如,执行 flink run 命令的 CLI,或者通过 REST API 提交的程序,或者在 IDE 中直接运行)提交一个 Flink 应用程序(例如,用 DataStream API 编写的程序)。
    • 在客户端,用户的程序(例如 StreamExecutionEnvironment.execute())首先会将用户定义的 DataStream 操作转换为一个 StreamGraphStreamGraph 是作业的最初的、面向流的逻辑表示,它包含了所有的算子、UDF、并行度设置、数据流向等信息。
    • 客户端随后将这个 StreamGraph 提交给 JobManager(具体来说是 JobManager 中的 Dispatcher 组件)。
  2. JobManager 接收与处理作业:【stream Graph转化Job Graph 版本有变动,但是通信链路是一致的】

    • Dispatcher: JobManager 中的 Dispatcher 接收到 StreamGraph 后,会为这个作业启动一个新的 JobMaster。Dispatcher 负责作业的提交、JobMaster 的生命周期管理,并提供 REST 接口。
    • JobMaster: 每个作业都有其专属的 JobMaster。JobMaster 负责该作业的整个生命周期管理。
      • StreamGraph -> JobGraph: JobMaster 首先将接收到的 StreamGraph 转换为 JobGraphJobGraph 是一个更通用的、并行的作业表示,它将 StreamGraph 中的算子链(Operator Chains)优化考虑在内,并确定了 JobVertex(逻辑上的并行算子)。每个 JobVertex 对应 StreamGraph 中的一个或多个(链式)算子。
        • 如 docs/content.zh/docs/internals/job_scheduling.md 中提到:“JobManager 会接收到一个 JobGraph,用来描述由多个算子顶点 (JobVertex) 组成的数据流图”。
      • JobGraph -> ExecutionGraph: JobMaster 接着将 JobGraph 转换为 ExecutionGraphExecutionGraph 是作业的物理执行计划,是 JobGraph 的并行化版本。
        • 它将每个 JobVertex 根据其并行度展开为多个并行的 ExecutionVertex
        • 每个 ExecutionVertex 代表了一个逻辑算子(或算子链)的一个并行实例。
        • ExecutionGraph 中的每个 ExecutionVertex 会有一个或多个 Execution 对象来跟踪其执行尝试(例如,初次执行、故障恢复后的重试)。
        • ExecutionGraph 是 JobMaster 调度和监控作业执行的核心数据结构。
    • 调度器 (Scheduler): JobMaster 内部的调度器负责将 ExecutionGraph 中的任务(具体来说是 Execution 对象代表的执行尝试)部署到可用的 TaskManager Slot 上。
      • 调度器会向 ResourceManager (如果使用了如 YARN, Kubernetes 等资源管理器,或者是 Flink 自身的 Standalone ResourceManager) 请求所需的 Task Slot。
      • 一旦 Slot 分配成功,调度器就会将任务部署到相应的 TaskManager。
  3. TaskManager 执行任务:

    • TaskManager 接收到 JobManager (JobMaster) 分配的任务部署指令后,会在其管理的某个空闲的 Task Slot 中为该任务启动执行。
    • Slot 与线程: 一个 Task Slot 代表了 TaskManager 提供的一份固定的计算资源(通常与 CPU核心数相关)。一个 Task Slot 会运行一个或多个(如果启用了 Slot Sharing Group 且任务属于同一共享组)任务,每个任务(Task)都在其自己的独立线程中执行。 这个线程本身不属于槽而属于task,Slot 是资源的划分,线程是执行的载体。
    • StreamTask: 在 TaskManager 内部,每个被部署的流处理任务的实际体现就是一个 StreamTask(或其特定子类,如 SourceTaskOneInputStreamTaskTwoInputStreamTask 等)的实例。
      • 如 docs/content/docs/internals/task_lifecycle.md 所述: "The StreamTask is the base for all different task sub-types in Flink's streaming engine."
      • StreamTask 负责初始化和运行其内部的 StreamOperator 链(OperatorChain),处理输入数据,执行用户定义的函数 (UDF),并产生输出数据。
      • StreamTask 的生命周期包括创建、部署、运行、取消、完成或失败等状态。
      • 如 flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java 文件所示,StreamTask 包含了大量的逻辑来管理算子链 (operatorChain)、配置 (configuration)、状态后端 (stateBackend)、检查点协调 (subtaskCheckpointCoordinator) 等。

关键补充点总结:

  • StreamGraph: 在客户端生成,是最初的逻辑图。
  • Dispatcher 与 JobMaster: JobManager 内部组件,Dispatcher 负责接收作业并为每个作业启动一个 JobMaster。JobMaster 负责单个作业的完整生命周期。
  • JobGraph: 由 StreamGraph 转换而来,是并行的逻辑图。
  • ExecutionGraph: 由  JobGraph 转换而来,是物理执行图,包含 ExecutionVertex 和 Execution
  • ResourceManager: 负责 Task Slot 的分配。
  • Task Slot 与线程: Slot 是资源单位,Task 在 Slot 内的独立线程中运行。
  • OperatorChainStreamTask 内部可以运行一个算子链,这是 Flink 的一项重要优化。

Flink 核心概念关系:从 API 到执行

  1. DataStream API 与 UDF (User-Defined Function - 用户定义函数):

    • 起点: 用户通过 DataStream API (例如 dataStream.map(myMapFunction).filter(myFilterFunction)) 来声明式地构建数据处理流程。
    • 业务逻辑UDF (例如 MyMapFunctionMyFilterFunctionKeyedProcessFunction) 是用户编写的包含具体业务处理逻辑的 Java/Scala 函数或类。它们是数据转换的核心。
  2. Transformation (转换):

    • 逻辑蓝图: 每当在 DataStream 上调用一个操作(如 mapfilterkeyBy),就会创建一个或多个 Transformation 对象。
    • Transformation 树是 Flink 作业的逻辑表示或蓝图。它详细描述了数据如何从一个操作流向下一个操作,包括操作类型、应用的 UDF、输入/输出数据类型、并行度设置等。它本身不执行计算。
  3. 逻辑 Operator (算子) 与逻辑 Subtask (子任务):

    • 逻辑处理单元: 在 Transformation 层面,我们可以认为每个转换操作对应一个逻辑 Operator (例如 Map Operator, Filter Operator)。
    • 并行实例 (逻辑): 如果一个逻辑 Operator 的并行度(parallelism)被设置为 N,那么在逻辑规划中,这个 Operator 就拥有 N 个并行的逻辑实例,这些逻辑实例通常被称为 Subtask。每个逻辑 Subtask 代表了该 Operator 的一个独立、并行的处理单元。
  4. StreamOperator (运行时算子/流算子):

    • 物理执行体StreamOperator 是 Flink 运行时的核心组件,是 Transformation 中定义的逻辑算子在物理执行时的具体实现和承载体。例如,StreamMapKeyedProcessOperator 都是 StreamOperator 的具体实现。
    • 封装 UDFStreamOperator 负责封装用户提供的 UDF。它管理 UDF 的生命周期(如调用 open()close() 方法)并调用 UDF 的核心处理方法(如 map()filter()processElement())来处理数据。
    • AbstractUdfStreamOperator 是许多包含 UDF 的 StreamOperator 的通用基类,它简化了 UDF 的管理。
  5. StreamTask (流任务/物理任务):

    • 执行单元StreamTask 是 Flink 在 TaskManager 上进行物理执行的基本单元。它是一个实现了 java.lang.Runnable 的对象,在 TaskManager 的一个 Slot 中的一个独立线程内运行。
    • 核心职责StreamTask 负责:
      • 管理其内部一个或多个 StreamOperator 的完整生命周期(初始化、打开、运行数据处理循环、响应 Checkpoint、关闭、清理)。
      • 处理数据的输入(从网络或上游 Task)和输出(到网络或下游 Task)。
      • 协调 Checkpoint 过程。
    • 执行入口StreamTask 的 invoke() 方法是其执行逻辑的入口点,它启动了数据处理的主循环。
  6. 算子链 (Operator Chaining) 的影响:

    • 优化: Flink 会尽可能地将满足条件的多个逻辑 Operator (及其对应的逻辑 Subtask) 链接(chain)在一起。例如,连续的 map -> filter -> map 操作,如果并行度相同且数据传输直接(非重分区),它们通常会被链接。
    • 结果:
      • 被链接起来的一系列 StreamOperator 实例会运行在同一个 StreamTask 内部,由一个 OperatorChain 对象管理。
      • 这意味着一个 StreamTask 可能只包含一个单独的 StreamOperator (如果没有发生链接),或者包含一串链式连接的 StreamOperator
      • 这样做能显著减少线程切换、数据序列化/反序列化以及网络传输的开销,提升性能。

总结关系流程:

  1. 用户使用 DataStream API 编写代码,并提供 UDF 来定义业务逻辑。
  2. 这些 API 调用会构建一个 Transformation 树,这是作业的逻辑蓝图。
  3. Flink 编译器将 Transformation 树转换为物理执行图 (JobGraph -> ExecutionGraph):
    • 每个逻辑 Operator (在 Transformation 中定义) 根据其并行度被实例化为多个逻辑 Subtask
    • 满足条件的逻辑 Subtask (来自不同的逻辑 Operator) 会被优化策略链接(chain)起来。
  4. 在运行时 (TaskManager):
    • 每个(可能经过链接的)Subtask 序列,作为一个整体,被调度为一个 StreamTask 实例,并在一个独立的线程中执行。
    • StreamTask 内部运行着一个或多个(如果发生链接,则形成 OperatorChainStreamOperator 实例。
    • 每个 StreamOperator 实例则封装并调用用户编写的 UDF 来对流经它的数据执行具体的业务逻辑处理。

结合源码说明

我们来看一些关键的源码片段来理解这个流程:

  1. StreamTask 的构造与初始化: 当 TaskManager 接收到部署任务的指令后,会创建 StreamTask 实例。

    // ... existing code ...
    public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>implements TaskInvokable,CheckpointableTask,CoordinatedTask,AsyncExceptionHandler,ContainingTaskDetails {// ... existing code ...protected StreamTask(Environment environment,@Nullable TimerService timerService,Thread.UncaughtExceptionHandler uncaughtExceptionHandler,StreamTaskActionExecutor actionExecutor,TaskMailbox taskMailbox)throws Exception {this.environment = Preconditions.checkNotNull(environment);this.configuration = new StreamConfig(environment.getTaskConfiguration());this.recordWriter = createRecordWriterDelegate(configuration, environment);this.resourceCloser = new AutoCloseableRegistry();this.mailboxProcessor =new MailboxProcessor(this::processInput,taskMailbox,actionExecutor,resourceCloser,this::shouldBeTerminated,this::handleAsyncExceptionDuringNormalExecution);
    // ... existing code ...this.asyncOperationsThreadPool =MdcUtils.scopeToJob(getEnvironment().getJobID(),new ThreadPoolExecutor(0,configuration.getMaxConcurrentCheckpoints() + 1,60L,TimeUnit.SECONDS,new LinkedBlockingQueue<>(),new ExecutorThreadFactory("AsyncOperations", uncaughtExceptionHandler)));
    // ... existing code ...this.stateBackend = createStateBackend();this.checkpointStorage = createCheckpointStorage(stateBackend);
    // ... existing code ...this.subtaskCheckpointCoordinator =new SubtaskCheckpointCoordinatorImpl(checkpointStorageAccess,getName(),actionExecutor,getAsyncOperationsThreadPool(),environment,this,this::prepareInputSnapshot,configuration.getMaxConcurrentCheckpoints(),channelStateWriter,configuration.getConfiguration().get(CheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH),BarrierAlignmentUtil.createRegisterTimerCallback(mainMailboxExecutor, systemTimerService),environment.getTaskStateManager().getFileMergingSnapshotManager());
    // ... existing code ...
    }
    

    在构造函数中,StreamTask 会初始化运行环境、配置、状态后端、Checkpoint 存储和协调器等。

  2. StreamTask 的执行入口 invoke(): 这是 TaskManager 启动 Task 后调用的核心方法。

    // ... existing code ...
    @Override
    public final void invoke() throws Exception {SubTaskInitializationMetricsBuilder initializationMetrics =SubTaskInitializationMetricsBuilder.create(getEnvironment().getMetricGroup());final long initializationStarted = SystemClock.getInstance().absoluteTimeMillis();initializationMetrics.addTimestampMetric(INITIALIZATION_START_TIMESTAMP, initializationStarted);// 初始化 OperatorChain,这里会创建 StreamOperator 实例// RegularOperatorChain 或 FinishedOperatorChaintry {operatorChain =getEnvironment().getTaskStateManager().isTaskDeployedAsFinished()? new FinishedOperatorChain<>(this, recordWriter): new RegularOperatorChain<>(this, recordWriter);mainOperator = operatorChain.getMainOperator();getEnvironment().getTaskStateManager().getRestoreCheckpointId().ifPresent(restoreId -> latestReportCheckpointId = restoreId);// task specific initialization,调用子类实现的 init() 方法init();configuration.clearInitialConfigs();// save the work of reloading state, etc, if the task is already canceledensureNotCanceled();// -------- Invoke --------LOG.debug("Invoking {}", getName());// we need to make sure that any triggers scheduled in open() cannot be// executed before all operators are opened// 恢复状态并打开算子CompletableFuture<Void> allGatesRecoveredFuture =actionExecutor.call(() -> restoreStateAndGates(initializationMetrics));// Run mailbox until all gates will be recovered.// 启动邮箱处理循环,这是任务处理数据的主要逻辑mailboxProcessor.runMailboxLoop();// ... existing code ...// make sure this is executed in any case!LOG.debug("Finished task {}", getName());} finally {// ... cleanup ...actionExecutor.runThrowing(() -> {// only set the StreamTask to not running after all operators have been// finished!
    // ... existing code ...disableInterruptOnCancel();// ... existing code ...// clean up everything we initializedisRunning = false;// ... existing code ...try {resourceCloser.close();} catch (Throwable t) {Exception e = t instanceof Exception ? (Exception) t : new Exception(t);throw firstOrSuppressed(e, cancelException);}}
    }
    

    在 invoke() 方法中:

    • 创建 OperatorChain (RegularOperatorChain 或 FinishedOperatorChain),它包含了这个 StreamTask 要执行的一个或多个 StreamOperator
    • 调用 init() 方法进行特定于任务类型的初始化(例如,SourceOperatorStreamTask 会在这里启动 SourceReader)。
    • 调用 restoreStateAndGates(),其中会调用 operatorChain.initializeStateAndOpenOperators()
  3. Operator 的初始化和开OperatorChain 的 initializeStateAndOpenOperators 方法会遍历链上的所有算子,调用它们的 initializeState() 和 open() 方法。

    // ... existing code ...operatorChain.initializeStateAndOpenOperators(createStreamTaskStateInitializer(initializationMetrics));initializeStateEndTs = SystemClock.getInstance().absoluteTimeMillis();
    // ... existing code ...
    

    而 createStreamTaskStateInitializer 会创建一个 StreamTaskStateInitializerImpl 实例,用于初始化算子的状态。

    // ... existing code ...
    public StreamTaskStateInitializer createStreamTaskStateInitializer(SubTaskInitializationMetricsBuilder initializationMetrics) {InternalTimeServiceManager.Provider timerServiceProvider =configuration.getTimerServiceProvider(getUserCodeClassLoader());return new StreamTaskStateInitializerImpl(getEnvironment(),stateBackend,
    // ... existing code ...
    
  4. UDF 的生命周期调用: 以 AbstractUdfStreamOperator 为例,它的 initializeState() 和 open() 方法会进一步调用 UDF 的相应方法。

    // ... existing code ...
    @Override
    public void initializeState(StateInitializationContext context) throws Exception {super.initializeState(context);StreamingFunctionUtils.restoreFunctionState(context, userFunction);
    }@Override
    public void open() throws Exception {super.open();FunctionUtils.openFunction(userFunction, DefaultOpenContext.INSTANCE);
    }@Override
    public void finish() throws Exception {super.finish();if (userFunction instanceof SinkFunction) {((SinkFunction<?>) userFunction).finish();}
    }@Override
    public void close() throws Exception {super.close();FunctionUtils.closeFunction(userFunction);
    }
    // ... existing code ...
    

    这里 userFunction 就是用户定义的 UDF。FunctionUtils.openFunction 会调用 UDF 的 open() 方法,并传入 RuntimeContext

  5. 数据处理循环StreamTask 的 mailboxProcessor.runMailboxLoop() 启动后,会不断调用 processInput() 方法(如果邮箱中有待处理的邮件或默认操作可以执行)。

    // ... existing code ...
    protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {DataInputStatus status = inputProcessor.processInput();switch (status) {case MORE_AVAILABLE:if (taskIsAvailable()) {return;}break;case NOTHING_AVAILABLE:break;case END_OF_RECOVERY:throw new IllegalStateException("We should not receive this event here.");
    // ... existing code ...
    

    inputProcessor.processInput() 会从输入源读取数据,并通过 OperatorChain 将数据传递给第一个 StreamOperator,然后数据会在算子链中依次处理,每个算子会调用其内部 UDF 的处理逻辑(如 map()filter()processElement())。

文档中的相关描述

  • Task Lifecycle/flink/docs/content/docs/internals/task_lifecycle.md 描述了 StreamTask 和 Operator 的生命周期。

    The StreamTask is the base for all different task sub-types in Flink's streaming engine. ... OPERATOR::setup -> UDF::setRuntimeContext OPERATOR::initializeState OPERATOR::open -> UDF::open OPERATOR::processElement -> UDF::run

  • Flink Architecture - Tasks and Operator Chainsflink/docs/content/docs/concepts/flink-architecture.md

    For distributed execution, Flink chains operator subtasks together into tasks. Each task is executed by one thread. Chaining operators together into tasks is a useful optimization...

通过以上分析,应该对 Flink 任务的启动流程以及 SubtaskStreamTaskOperator 和 UDF 之间的关系有了更清晰的理解。

StreamTask 是核心的执行单元,它承载了 Operator,而 Operator 又驱动着 UDF 的执行。整个过程由 JobManager 调度,并在 TaskManager 上实际运行。

相关文章:

  • 【论文解读】OmegaPRM:MCTS驱动的自动化过程监督,赋能LLM数学推理新高度
  • [学习] 牛顿迭代法:从数学原理到实战
  • 50天50个小项目 (Vue3 + Tailwindcss V4) ✨ | AnimatedNavigation(动态导航)
  • 【Spring源码核心篇-08】spring中配置类底层原理和源码实现
  • Vite 预构建机制深度解析(Vite缺点之一)
  • BeckHoff <--> Festo Cmmt AS驱动器 EtherCat通讯
  • C++基础算法————二分查找
  • JavaScript 事件循环
  • CH579 CH573 CH582 CH592 蓝牙主机(Central)实例应用讲解(二)——Central消息事件机制初探
  • Python学习笔记面向对象编程
  • 钉钉机器人-自定义卡片推送快速入门
  • keil一键烧录boot和app程序
  • jojojojojo
  • Hexo-butterfly友情链接页面优化
  • MySQL-DQL数据查询语句深度解析与实战指南
  • 保护地线与串扰-信号完整性分析
  • day 51 python打卡
  • Redis事务与驱动的学习(一)
  • Unity Demo-3DRaceCar详解
  • MiniCPM4端侧AI模型
  • 嘉兴做微网站设计/搜索关键词
  • 溧阳网站优化/2022搜索引擎
  • 平台推广员是干嘛的/账号seo是什么
  • 内蒙古集宁建设厅官方网站/刷排名seo软件
  • wordpress名著/厦门seo排名扣费
  • 网站流水怎么做/怎么买域名自己做网站