当前位置: 首页 > news >正文

15.dispatcherRunner启动

dispatcherRunner

职责
Flink 中,dispatcherRunner 主要负责作业提交、作业分发以及创建 JobMaster。它还集成了高可用机制HA),保证在 failover 场景下作业可以恢复。

创建入口
通过 DispatcherRunnerFactory.createDispatcherRunner() 构建。不同部署模式(如 standaloneYarnKubernetes)会用不同的 factory 实现(如 DefaultDispatcherRunnerFactory)。

DefaultDispatcherRunnerFactory

作用
专门用于生成 DefaultDispatcherRunner,该类同时继承了 DispatcherRunnerLeaderContender,也就是说它是个具备高可用选举能力的 dispatcher。

Leader 选举与启动

  1. leaderElection.startLeaderElection(this)
    start() 方法启动选举。
  2. grantLeadership(UUID leaderSessionID)
    获得 leader 权限时触发,启动新的 DispatcherLeaderProcess。
    这里内部会构建:
    • SessionDispatcherLeaderProcess (典型模式下)
    • 实际调用 AbstractDispatcherLeaderProcess 管理作业恢复、JobGraph 生成等。

源码

dispatcherRunner =dispatcherRunnerFactory.createDispatcherRunner(//standalone模式下,直接获得leader组件。就是给container赋默认uuid值highAvailabilityServices.getDispatcherLeaderElection(),fatalErrorHandler,//高可用模式下作业的恢复。等后续高可用模式并且作业提交的时候,可以看下。目前启动用不上。new HaServicesJobPersistenceComponentFactory(highAvailabilityServices),ioExecutor,//需要rpc通信rpcService,partialDispatcherServices);

dispatcherRunner 的运行机制

  • resourceManager 类似:
    • 都作为集群启动的核心组件
    • 都具备高可用选举机制。
    • 获得 leader 后启动内部流程(如资源管理、作业调度等)。
  • 但区别是:
    • dispatcher 专注于作业管理(提交、恢复、分发到 jobMaster)。
    • resourceManager 负责资源调度Slot 管理、TaskExecutor 注册等)。

入口源码

@Overridepublic DispatcherRunner createDispatcherRunner(LeaderElection leaderElection,FatalErrorHandler fatalErrorHandler,JobPersistenceComponentFactory jobPersistenceComponentFactory,Executor ioExecutor,RpcService rpcService,PartialDispatcherServices partialDispatcherServices)throws Exception {final DispatcherLeaderProcessFactory dispatcherLeaderProcessFactory =dispatcherLeaderProcessFactoryFactory.createFactory(jobPersistenceComponentFactory,ioExecutor,rpcService,partialDispatcherServices,fatalErrorHandler);return DefaultDispatcherRunner.create(leaderElection, fatalErrorHandler, dispatcherLeaderProcessFactory);}@Overridepublic void grantLeadership(UUID leaderSessionID) {runActionIfRunning(() -> {LOG.info("{} was granted leadership with leader id {}. Creating new {}.",getClass().getSimpleName(),leaderSessionID,DispatcherLeaderProcess.class.getSimpleName());//这里就和前面的resourceManager一致。再提一下,主要启动 SessionDispatcherLeaderProcess。因为dispatcher还包含了作业恢复等操作,所以还有一个AbstractDispatcherLeaderProcess用于dispatcherLeader的启动。这个类启动之前会先启动对应的jobGraph等startNewDispatcherLeaderProcess(leaderSessionID);});}

总结

目前,JobManager 相关的组件中,已分析了 ResourceManagerDispatcherMetrics 等核心部分。剩余未讲解的包括 MetricsReportBlobServerHaServicesJobMaster 等组件。以下是这部分的简要总结与梳理:

MetricsReport

  • 作用
    负责将集群和作业的各类指标(如 CPU 使用率、内存占用、Task 运行状态等)主动上报到外部系统(如 Prometheus、JMX)。
    与 Dispatcher 和 ResourceManager 内部的 Metrics 模块不同,MetricsReport 属于指标数据的输出侧,通过配置的 reporter 实现对外发布。
  • 特点
    • 内部采集:通过 JMX Bean、系统监控等采集 CPU、内存等信息。
    • 对外上报:采用定时主动推送机制,而非 RPC 被动查询(与 Akka 交互的内部监控不同)。

BlobServer

  • 作用
    提供作业资源的分发与缓存。Flink 作业提交过程中所依赖的 JAR 包、配置文件等资源会上传到 BlobServer,并存储在本地目录下,供 TaskExecutor 下载使用。
  • 本质
    就是一个简化版的文件服务,为作业资源提供上传、下载、缓存能力。

HaServices

  • 作用
    提供高可用服务,包括:
    • Leader 选举(如 Dispatcher、ResourceManager 等的 HA 切换)。
    • 心跳管理(与 TaskExecutor 保持连接与状态监控)。
    • JobGraph 持久化(用于作业恢复)。
  • 说明
    虽然 HaServices 是在 JobManager 中初始化的,但它不仅服务于 JobManager 组件本身,也用于与 TaskExecutor 保持集群通信及作业状态一致性。
  • 后续
    HaServices 更适合在 TaskExecutor 相关部分展开细讲,例如如何参与心跳机制及失联检测。

JobMaster

  • 作用
    JobMaster 是 Flink 中每个作业的核心调度器,由 Dispatcher 在作业提交时动态创建。
    负责:
    • 管理作业生命周期;
    • 调度任务到各 TaskExecutor;
    • 监控任务运行状态;
    • 汇报作业进度及失败信息。
  • 关键点
    • 一个作业对应一个独立的 JobMaster 实例;
    • 与 Dispatcher、ResourceManager 通过 RPC 保持通信;
    • 管理 Slot 分配与 Task 启动,调度逻辑完全在 JobMaster 内部控制。

总结整理(模块职责)

组件作用说明
MetricsReport指标采集与对外推送主动推送(如 Prometheus、JMX)
BlobServer作业资源分发与缓存本地文件服务,供 TaskExecutor 下载
HaServices高可用与通信服务提供选举、心跳、作业持久化支持
JobMaster作业调度与管理每个作业一个,负责调度与状态管理

补充:Future 阻塞机制

Flink 框架整体采用 CompletableFuture 来管理异步线程和流程控制。不同于传统基于 NIOIO 构建的服务端程序(通常在主线程中依靠一个无限循环来保持服务常驻),Flink 并不依赖 while(true) 结构维持主流程。

returnCode = clusterEntrypoint.getTerminationFuture().get().processExitCode();

这里的 .get() 方法会阻塞当前主线程,直到整个集群终止(正常关闭或发生致命异常)。这使得主线程可以简洁地等待集群结束,而不需要显式的死循环来维持进程存活。

以下是关键源码片段:

public static void  runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) {final String clusterEntrypointName = clusterEntrypoint.getClass().getSimpleName();try {clusterEntrypoint.startCluster();} catch (ClusterEntrypointException e) {LOG.error(String.format("Could not start cluster entrypoint %s.", clusterEntrypointName),e);System.exit(STARTUP_FAILURE_RETURN_CODE);}int returnCode;Throwable throwable = null;try {//这一步保证服务端不退出returnCode = clusterEntrypoint.getTerminationFuture().get().processExitCode();} catch (Throwable e) {throwable = ExceptionUtils.stripExecutionException(e);returnCode = RUNTIME_FAILURE_RETURN_CODE;}LOG.info("Terminating cluster entrypoint process {} with exit code {}.",clusterEntrypointName,returnCode,throwable);System.exit(returnCode);}
http://www.dtcms.com/a/282116.html

相关文章:

  • 图机器学习(10)——监督学习中的图神经网络
  • LLM大语言模型不适合统计算数,可以让大模型根据数据自己建表、插入数据、编写查询sql统计
  • ether.js的calldata
  • 探索阿里云DMS:解锁高效数据管理新姿势
  • 【WRFDA数据教程第一期】LITTLE_R 格式详细介绍
  • 常用 Benchmark 总结-GPT 4.1、GPT 4.5、DeepSeek模型
  • 【游戏引擎之路】登神长阶(十七):Humanoid动画——长风破浪会有时,直挂云帆济沧海
  • 联网工人安全解决方案:技术赋能下的安全新范式
  • Django REST Framework 入门指南:从 0 到 1 实现 RESTful API
  • 【LLM】OpenRouter调用Anthropic Claude上下文缓存处理
  • cudaOccupancyMaxActiveBlocksPerMultiprocessor配置内核的线程块大小
  • Linux运维新手的修炼手扎之第18天
  • 二刷 黑马点评 分布式锁-redission
  • 【芯片设计中的WDT IP:守护系统安全的电子警犬】
  • HDFS基本操作训练(创建、上传、下载、删除)
  • CSS面试题及详细答案140道之(21-40)
  • 智租换电与中国电信达成战略合作!共筑数字能源新基建
  • LeetCode|Day15|125. 验证回文串|Python刷题笔记
  • GaussDB 预写日志回收参数设置
  • Uniapp中双弹窗为什么无法显示?
  • Java虚拟机——JVM
  • uniapp各端通过webview实现互相通信
  • UniApp 多端人脸认证图片上传实现
  • AI Agent:重构智能边界的终极形态——从技术内核到未来图景全景解析
  • uniapp写好的弹窗组件
  • 【uni-ui】hbuilderx的uniapp 配置 -小程序左滑出现删除等功能
  • kafka3.6下载安装(传统架构/KRaft模式)+实例测试
  • uniapp小程序实现地图多个标记点
  • 《设计模式之禅》笔记摘录 - 7.中介者模式
  • C#中Lambda表达式与=>运算符