当前位置: 首页 > news >正文

23TaskExecutor初始化

TaskExecutor初始化流程梳理

1. 核心入口:onStart方法

TaskExecutor 继承自 RpcEndpoint,其初始化的核心逻辑始于 onStart 方法,主要负责启动 TaskExecutor 的核心服务并处理启动异常:

@Override
public void onStart() throws Exception {try {startTaskExecutorServices(); // 启动核心服务} catch (Throwable t) {final TaskManagerException exception =new TaskManagerException(String.format("Could not start the TaskExecutor %s", getAddress()), t);onFatalError(exception);throw exception;}startRegistrationTimeout(); // 启动注册超时机制
}

2. 核心服务启动:startTaskExecutorServices

private void startTaskExecutorServices() throws Exception {try {// 1. 连接ResourceManager并注册(通过ResourceManagerLeaderListener)resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());// 2. 初始化任务槽表(TaskSlotTable),指定槽操作的实现类taskSlotTable.start(new SlotActionsImpl(), getMainThreadExecutor());// 3. 启动JobLeader服务,用于跟踪JobManager的leaderjobLeaderService.start(getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl());// 4. 初始化文件缓存(FileCache)fileCache =new FileCache(taskManagerConfiguration.getTmpDirectories(),taskExecutorBlobService.getPermanentBlobService());// 5. 加载本地分配快照tryLoadLocalAllocationSnapshots();} catch (Exception e) {handleStartTaskExecutorServicesException(e);}
}

3. TaskManagerRunner:初始化启动器

TaskManagerRunner 是 TaskExecutor 的启动入口,通过匿名函数 TaskManagerRunner::createTaskExecutorService 创建并启动 TaskExecutor 服务:

public static int runTaskManager(Configuration configuration, PluginManager pluginManager) throws Exception {final TaskManagerRunner taskManagerRunner;try {taskManagerRunner =new TaskManagerRunner(configuration,pluginManager,TaskManagerRunner::createTaskExecutorService); // 匿名函数指定服务创建逻辑taskManagerRunner.start();} catch (Exception exception) {throw new FlinkException("Failed to start the TaskManagerRunner.", exception);}// 等待终止并返回退出码try {return taskManagerRunner.getTerminationFuture().get().getExitCode();} catch (Throwable t) {throw new FlinkException("Unexpected failure during runtime of TaskManagerRunner.",ExceptionUtils.stripExecutionException(t));}
}
  • TaskManagerRunnerstartTaskManagerRunnerServices 方法会初始化 RPC 服务、高可用服务、 metrics 等基础组件,并最终调用匿名函数创建 taskExecutorService
private void startTaskManagerRunnerServices() throws Exception {synchronized (lock) {// 初始化RPC系统、线程池、高可用服务等rpcSystem = RpcSystem.load(configuration);executor = Executors.newScheduledThreadPool(...);highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(...);// ... 省略其他基础组件初始化(metrics、Blob服务等)// 调用匿名函数创建TaskExecutor服务taskExecutorService =taskExecutorServiceFactory.createTaskExecutor(this.configuration,this.resourceId.unwrap(),rpcService,highAvailabilityServices,heartbeatServices,metricRegistry,blobCacheService,false,externalResourceInfoProvider,workingDirectory.unwrap(),this,delegationTokenReceiverRepository);}
}

4. TaskExecutor 实例化过程

createTaskExecutorService 方法通过 startTaskManager 实例化 TaskExecutor,并封装为 TaskExecutorService

public static TaskExecutorService createTaskExecutorService(...) throws Exception {final TaskExecutor taskExecutor =startTaskManager(...); // 实例化TaskExecutorreturn TaskExecutorToServiceAdapter.createFor(taskExecutor); // 适配为服务
}

startTaskManager 是实例化 TaskExecutor 的核心方法,主要完成:

  • 初始化资源配置(TaskExecutorResourceSpec
  • 构建任务管理器服务配置(TaskManagerServicesConfiguration
  • 初始化任务管理器服务(TaskManagerServices,包含 IO、Shuffle、任务槽等组件)
  • 最终创建 TaskExecutor 实例
public static TaskExecutor startTaskManager(...) throws Exception {// 1. 从配置中解析资源规格final TaskExecutorResourceSpec taskExecutorResourceSpec =TaskExecutorResourceUtils.resourceSpecFromConfig(configuration);// 2. 构建服务配置TaskManagerServicesConfiguration taskManagerServicesConfiguration =TaskManagerServicesConfiguration.fromConfiguration(...);// 3. 初始化核心服务(包含TaskSlotTable、IO管理器、Shuffle环境等)TaskManagerServices taskManagerServices =TaskManagerServices.fromConfiguration(...);// 4. 构建TaskExecutor配置TaskManagerConfiguration taskManagerConfiguration =TaskManagerConfiguration.fromConfiguration(...);// 5. 实例化TaskExecutorreturn new TaskExecutor(rpcService,taskManagerConfiguration,highAvailabilityServices,taskManagerServices,// ... 其他参数);
}

5. 与 ResourceManager 的交互

TaskExecutor 启动后需与 ResourceManager 建立连接并注册,核心逻辑在 connectToResourceManager 方法:

private void connectToResourceManager() {// 1. 封装TaskExecutor注册信息(地址、资源ID、端口、资源规格等)final TaskExecutorRegistration taskExecutorRegistration =new TaskExecutorRegistration(getAddress(),getResourceID(),unresolvedTaskManagerLocation.getDataPort(),JMXService.getPort().orElse(-1),hardwareDescription,memoryConfiguration,taskManagerConfiguration.getDefaultSlotResourceProfile(),taskManagerConfiguration.getTotalResourceProfile(),unresolvedTaskManagerLocation.getNodeId());// 2. 创建与ResourceManager的连接并启动注册resourceManagerConnection =new TaskExecutorToResourceManagerConnection(log,getRpcService(),taskManagerConfiguration.getRetryingRegistrationConfiguration(),resourceManagerAddress.getAddress(),resourceManagerAddress.getResourceManagerId(),getMainThreadExecutor(),new ResourceManagerRegistrationListener(),taskExecutorRegistration);resourceManagerConnection.start();
}
  • 注册过程通过 TaskExecutorToResourceManagerConnectioninvokeRegistration 调用 ResourceManager 的 registerTaskExecutor 方法完成:
@Override
protected CompletableFuture<RegistrationResponse> invokeRegistration(ResourceManagerGateway resourceManager,ResourceManagerId fencingToken,long timeoutMillis) throws Exception {Time timeout = Time.milliseconds(timeoutMillis);return resourceManager.registerTaskExecutor(taskExecutorRegistration, timeout); // 调用RM注册接口
}
  • 注册成功后,会通过 createNewRegistrationonRegistrationSuccess 方法调用 establishResourceManagerConnection
@Override
public void onRegistrationSuccess(TaskExecutorToResourceManagerConnection connection,TaskExecutorRegistrationSuccess success) {final ResourceID resourceManagerId = success.getResourceManagerId();final InstanceID taskExecutorRegistrationId = success.getRegistrationId();final ClusterInformation clusterInformation = success.getClusterInformation();final ResourceManagerGateway resourceManagerGateway = connection.getTargetGateway();byte[] tokens = success.getInitialTokens();if (tokens != null) {try {log.info("Receive initial delegation tokens from resource manager");delegationTokenReceiverRepository.onNewTokensObtained(tokens);} catch (Throwable t) {log.error("Could not update delegation tokens.", t);ExceptionUtils.rethrowIfFatalError(t);}}runAsync(() -> {// 过滤掉过时的连接//noinspection ObjectEqualityif (resourceManagerConnection == connection) {try {establishResourceManagerConnection(resourceManagerGateway,resourceManagerId,taskExecutorRegistrationId,clusterInformation);} catch (Throwable t) {log.error("Establishing Resource Manager connection in Task Executor failed",t);}}});
}private void establishResourceManagerConnection(ResourceManagerGateway resourceManagerGateway,ResourceID resourceManagerResourceId,InstanceID taskExecutorRegistrationId,ClusterInformation clusterInformation) {// 这一步非常重要,向ResourceManager报告当前TaskExecutor拥有的slot数量final CompletableFuture<Acknowledge> slotReportResponseFuture =resourceManagerGateway.sendSlotReport(getResourceID(),taskExecutorRegistrationId,taskSlotTable.createSlotReport(getResourceID()),Time.fromDuration(taskManagerConfiguration.getRpcTimeout()));slotReportResponseFuture.whenCompleteAsync((acknowledge, throwable) -> {if (throwable != null) {reconnectToResourceManager(new TaskManagerException("Failed to send initial slot report to ResourceManager.",throwable));}},getMainThreadExecutor());// 监控ResourceManager作为心跳目标resourceManagerHeartbeatManager.monitorTarget(resourceManagerResourceId,new ResourceManagerHeartbeatReceiver(resourceManagerGateway));// 设置传播的blob服务器地址final InetSocketAddress blobServerAddress =new InetSocketAddress(clusterInformation.getBlobServerHostname(),clusterInformation.getBlobServerPort());taskExecutorBlobService.setBlobServerAddress(blobServerAddress);establishedResourceManagerConnection =new EstablishedResourceManagerConnection(resourceManagerGateway,resourceManagerResourceId,taskExecutorRegistrationId);stopRegistrationTimeout();
}

establishResourceManagerConnection 方法主要完成:

  1. 向 ResourceManager 发送初始 slot 报告,告知自身可用的资源情况
  2. 建立与 ResourceManager 的心跳监控机制
  3. 配置 Blob 服务地址,用于后续的文件传输
  4. 保存已建立的连接信息并停止注册超时计时器
  • 注册成功后,会通过 createNewRegistrationonRegistrationSuccess 方法调用 establishResourceManagerConnection

6. 任务槽表(TaskSlotTable)初始化

TaskSlotTable 负责管理 TaskExecutor 的任务槽,其初始化在 TaskManagerServices 中完成:

public static TaskManagerServices fromConfiguration(...) throws Exception {// ... 其他服务初始化final TaskSlotTable<Task> taskSlotTable =createTaskSlotTable(taskManagerServicesConfiguration.getNumberOfSlots(), // 槽数量taskManagerServicesConfiguration.getTaskExecutorResourceSpec(), // 资源规格taskManagerServicesConfiguration.getTimerServiceShutdownTimeout(),taskManagerServicesConfiguration.getPageSize(),ioExecutor);// ... 封装到TaskManagerServices并返回
}

TaskSlotTablestartTaskExecutorServices 中被启动,并关联槽操作的实现类 SlotActionsImpl,用于处理槽的分配、释放等操作。

总结

TaskExecutor 的初始化流程可概括为:

  1. TaskManagerRunner 启动,通过匿名函数指定服务创建逻辑;
  2. 初始化基础组件(RPC、高可用、metrics 等);
  3. 调用 onStart 方法启动核心服务,包括连接 ResourceManager、初始化任务槽表等;
  4. 与 ResourceManager 建立连接并注册自身信息(资源、端口等);
  5. 完成初始化后,通过 TaskSlotTable 管理任务槽,准备接收任务分配。
http://www.dtcms.com/a/341529.html

相关文章:

  • Windows 命令行:dir 命令
  • MyBatis 动态查询语句详解:让 SQL 更灵活可控
  • 前端调用阿里云接口语音合成演示
  • 20人团队文件共享选哪款?群晖DS925+ 和 DS1525+深度对比
  • 反射基础知识初入(up晚上回家再补完剩下的)
  • Anomalib:在Linux服务器上安装使用Anomalib 2.1.0
  • 生意参谋-市场竞争分析-提升商品成长效率
  • PostgreSQL 中的金钱计算处理
  • C语言第十章内存函数
  • 《SQLAlchemy 2 In Practice》读后感
  • win与ubuntu双系统安装笔记
  • 小波函数多尺度变换的 Curvelet 变换
  • vue3项目,使用vue2方式来写,可以吗
  • 【嵌入式】CAN通信
  • 基于XGBoost算法的数据回归预测 极限梯度提升算法 XGBoost
  • 虚拟机部署HDFS集群
  • JDK 工具
  • IDEA(十四) IntelliJ Idea 常用快捷键(Mac)
  • 会计人员职业发展框架:核心能力构建与进阶路径
  • ROADS落地的架构蓝图
  • Java 通过 m3u8 链接下载所有 ts 视频切片并合并转换为 mp4 格式
  • Odoo 18 通用图片导入工具:从零到一的企业级开发实战
  • 记录一次ubuntu系统下ovito无法调用显卡驱动报错
  • keepalived的配置
  • Java内置注解
  • 区块链技术:重塑未来互联网的伟大动力
  • 中金所股指期货交易规则
  • c++之指针和引用
  • 第三十三天(信号量)
  • 大模型—— DeepSeek V3.1 Base / Instruct 发布