当前位置: 首页 > news >正文

16.TaskExecutor启动

TaskExecutor

  • TaskExecutor 是 Flink 中实际执行任务的组件,负责与 ResourceManager 通信、初始化自身资源并上报,接收客户端提交的 JobGraph 并进行执行。

    本文重点分析 TaskExecutor 的启动与初始化流程。

TaskManagerRunner

  • Standalone 模式下,TaskManagerRunner 作为 TaskExecutor 的启动入口,核心执行流程为 runTaskManagerProcessSecurely 方法,最终通过 runTaskManager 方法正式启动。

    public static int runTaskManager(Configuration configuration, PluginManager pluginManager)throws Exception {final TaskManagerRunner taskManagerRunner;try {//初始化一个 taskManagertaskManagerRunner =new TaskManagerRunner(configuration,pluginManager,//TaskExecutorService适配器,可以理解在TaskExecutor和TaskManagerRunner之间的设配器TaskManagerRunner::createTaskExecutorService);//调用 taskManager.start 方法taskManagerRunner.start();} catch (Exception exception) {throw new FlinkException("Failed to start the TaskManagerRunner.", exception);}try {return taskManagerRunner.getTerminationFuture().get().getExitCode();} catch (Throwable t) {throw new FlinkException("Unexpected failure during runtime of TaskManagerRunner.",ExceptionUtils.stripExecutionException(t));}}public void start() throws Exception {synchronized (lock) {//这一步是初始化服务的前期服务,保活rpc通信等startTaskManagerRunnerServices();//这一步才是启动taskExecutortaskExecutorService.start();}}//核心方法:该方法完成了资源初始化与通信组件搭建
    private void startTaskManagerRunnerServices() throws Exception {synchronized (lock) {//导入pekkoRpcrpcSystem = RpcSystem.load(configuration);//线程池this.executor =Executors.newScheduledThreadPool(Hardware.getNumberCPUCores(),new ExecutorThreadFactory("taskmanager-future"));//高可用highAvailabilityServices =HighAvailabilityServicesUtils.createHighAvailabilityServices(configuration,executor,AddressResolution.NO_ADDRESS_RESOLUTION,rpcSystem,this);//JMX的监控JMXService.startInstance(configuration.get(JMXServerOptions.JMX_SERVER_PORT));//正式开启一个rpc通信rpcService = createRpcService(configuration, highAvailabilityServices, rpcSystem);//获取该资源名称this.resourceId =getTaskManagerResourceID(configuration, rpcService.getAddress(), rpcService.getPort());this.workingDirectory =ClusterEntrypointUtils.createTaskManagerWorkingDirectory(configuration, resourceId);LOG.info("Using working directory: {}", workingDirectory);//心跳服务HeartbeatServices heartbeatServices =HeartbeatServices.fromConfiguration(configuration);metricRegistry =new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(configuration,rpcSystem.getMaximumMessageSizeInBytes(configuration)),ReporterSetup.fromConfiguration(configuration, pluginManager),TraceReporterSetup.fromConfiguration(configuration, pluginManager));final RpcService metricQueryServiceRpcService =MetricUtils.startRemoteMetricsRpcService(configuration,rpcService.getAddress(),configuration.get(TaskManagerOptions.BIND_HOST),rpcSystem);metricRegistry.startQueryService(metricQueryServiceRpcService, resourceId.unwrap());blobCacheService =BlobUtils.createBlobCacheService(configuration,Reference.borrowed(workingDirectory.unwrap().getBlobStorageDirectory()),highAvailabilityServices.createBlobStore(),null);final ExternalResourceInfoProvider externalResourceInfoProvider =ExternalResourceUtils.createStaticExternalResourceInfoProviderFromConfig(configuration, pluginManager);final DelegationTokenReceiverRepository delegationTokenReceiverRepository =new DelegationTokenReceiverRepository(configuration, pluginManager);//工厂类,正式创建一个 taskExecutorServicetaskExecutorService =taskExecutorServiceFactory.createTaskExecutor(this.configuration,this.resourceId.unwrap(),rpcService,highAvailabilityServices,heartbeatServices,metricRegistry,blobCacheService,false,externalResourceInfoProvider,workingDirectory.unwrap(),this,delegationTokenReceiverRepository);handleUnexpectedTaskExecutorServiceTermination();MemoryLogger.startIfConfigured(LOG, configuration, terminationFuture.thenAccept(ignored -> {}));}}
    

startTaskManagerRunnerServices

  • 该方法作为 TaskExecutor启动的核心方法:

    • RPC 服务搭建(基于 Pekko
    • 高可用机制、高性能线程池、心跳机制、指标监控等的初始化
    • 资源目录与 ID 配置
    • 创建并启动核心的 TaskExecutor
  • 理论上,可以从 createRpcService(configuration, highAvailabilityServices, rpcSystem) 进一步分析,但实质上该过程与 JobManager 类似,都是将组件作为 RpcEndpoint 挂载到 Pekko Actor 系统上,因此细节可以略过。

  • 由于 TaskExecutor 作为一个标准的 RpcEndpoint,在启动过程中也会回调其 onStart 方法,后续逻辑由该方法展开。

TaskExecutor

  • TaskExecutor 是 Flink 中专门负责执行任务的核心组件。由于继承自 RpcEndpoint,它本身具备远程通信能力。同时,TaskExecutor 不涉及 fencingToken 机制,因此在通信过程中无需关注“新旧”状态的问题(即无需处理领导权交替的影响)。

    启动流程从 TaskExecutoronStart 方法开始。下面就从其 onStart 方法入手,分析 TaskExecutor 的启动过程。

 @Overridepublic void onStart() throws Exception {try {//正式启动 taskExecutorstartTaskExecutorServices();} catch (Throwable t) {final TaskManagerException exception =new TaskManagerException(String.format("Could not start the TaskExecutor %s", getAddress()), t);onFatalError(exception);throw exception;}startRegistrationTimeout();}private void startTaskExecutorServices() throws Exception {try {// 启动 ResouceManagerresourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());// 初始化 taskSlot,需要报告给ResourceManagertaskSlotTable.start(new SlotActionsImpl(), getMainThreadExecutor());// jobLeader,专门负责jobMasterjobLeaderService.start(getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl());// 4️⃣ 文件缓存fileCache =new FileCache(taskManagerConfiguration.getTmpDirectories(),taskExecutorBlobService.getPermanentBlobService());// 5️⃣ 加载本地 slot 分配快照tryLoadLocalAllocationSnapshots();} catch (Exception e) {handleStartTaskExecutorServicesException(e);}}

总结:

  • TaskExecutor 启动过程中,核心的初始化步骤主要集中在以下三个组件的启动上:
    • resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener())
      启动 ResourceManager 领导者监听,用于动态发现 ResourceManager,并建立注册关系。
    • taskSlotTable.start(new SlotActionsImpl(), getMainThreadExecutor())
      启动 Slot 管理与上报机制,负责本地资源槽的管理与状态同步。
    • jobLeaderService.start(getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl())
      启动 Job 领导者发现与注册机制,用于与各个 JobMaster 建立会话与通信。
  • 上述三部分构成了 TaskExecutor 启动过程的核心逻辑
  • 接下来的分析,将围绕这三大模块进一步拆解:TaskExecutorResourceManagerJobMaster 的交互机制
http://www.dtcms.com/a/289846.html

相关文章:

  • Windows批量修改文件属性方法
  • pyhton基础【27】课后拓展
  • 【华为机试】169. 多数元素
  • C++ STL中迭代器学习笔记
  • day057-docker-compose案例与docker镜像仓库
  • 元学习算法的数学本质:从MAML到Reptile的理论统一与深度分析
  • Vision Transformer (ViT) 介绍
  • 面试高频题 力扣 417. 太平洋大西洋水流问题 洪水灌溉(FloodFill) 深度优先遍历(dfs) 暴力搜索 C++解题思路 每日一题
  • 使用unsloth模型微调过程
  • 软件反调试(5)- 基于注册表实时调试器检测
  • MYSQL:从增删改查到高级查询
  • 数据结构-线性表的链式表示
  • 《P3398 仓鼠找 sugar》
  • 【1】YOLOv13 AI大模型-可视化图形用户(GUI)界面系统开发
  • 【实证分析】会计稳健性指标分析-ACF、CScore、Basu模型(2000-2023年)
  • MySQL锁(二) 共享锁与互斥锁
  • Filter快速入门 Java web
  • Compose笔记(三十七)--FilterChip
  • TVLT:无文本视觉-语言Transformer
  • c++ duiLib 显示一个简单的窗口
  • AMD处理器 5700G 矿卡RX580-8G 打英雄联盟怎么样
  • 洛谷 P10287 [GESP样题 七级] 最长不下降子序列-普及/提高-
  • 《P2680 [NOIP 2015 提高组] 运输计划》
  • 【66】MFC入门到精通——(CComboBox)下拉框选项顺序与添加顺序不一致
  • 前端静态资源免费cdn服务推荐
  • Dify极简部署手册
  • 30天打好数模基础-逻辑回归讲解
  • 7-大语言模型—指令理解:指令微调训练+模型微调
  • 【算法训练营Day15】二叉树part5
  • 编程研发工作日记