当前位置: 首页 > news >正文

朝阳网站推广企业网站模板价格

朝阳网站推广,企业网站模板价格,网站建设目标分析,如何建设酒店预订系统网站TaskExecutor初始化流程梳理 1. 核心入口:onStart方法 TaskExecutor 继承自 RpcEndpoint,其初始化的核心逻辑始于 onStart 方法,主要负责启动 TaskExecutor 的核心服务并处理启动异常: Override public void onStart() throws Exc…

TaskExecutor初始化流程梳理

1. 核心入口:onStart方法

TaskExecutor 继承自 RpcEndpoint,其初始化的核心逻辑始于 onStart 方法,主要负责启动 TaskExecutor 的核心服务并处理启动异常:

@Override
public void onStart() throws Exception {try {startTaskExecutorServices(); // 启动核心服务} catch (Throwable t) {final TaskManagerException exception =new TaskManagerException(String.format("Could not start the TaskExecutor %s", getAddress()), t);onFatalError(exception);throw exception;}startRegistrationTimeout(); // 启动注册超时机制
}

2. 核心服务启动:startTaskExecutorServices

private void startTaskExecutorServices() throws Exception {try {// 1. 连接ResourceManager并注册(通过ResourceManagerLeaderListener)resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());// 2. 初始化任务槽表(TaskSlotTable),指定槽操作的实现类taskSlotTable.start(new SlotActionsImpl(), getMainThreadExecutor());// 3. 启动JobLeader服务,用于跟踪JobManager的leaderjobLeaderService.start(getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl());// 4. 初始化文件缓存(FileCache)fileCache =new FileCache(taskManagerConfiguration.getTmpDirectories(),taskExecutorBlobService.getPermanentBlobService());// 5. 加载本地分配快照tryLoadLocalAllocationSnapshots();} catch (Exception e) {handleStartTaskExecutorServicesException(e);}
}

3. TaskManagerRunner:初始化启动器

TaskManagerRunner 是 TaskExecutor 的启动入口,通过匿名函数 TaskManagerRunner::createTaskExecutorService 创建并启动 TaskExecutor 服务:

public static int runTaskManager(Configuration configuration, PluginManager pluginManager) throws Exception {final TaskManagerRunner taskManagerRunner;try {taskManagerRunner =new TaskManagerRunner(configuration,pluginManager,TaskManagerRunner::createTaskExecutorService); // 匿名函数指定服务创建逻辑taskManagerRunner.start();} catch (Exception exception) {throw new FlinkException("Failed to start the TaskManagerRunner.", exception);}// 等待终止并返回退出码try {return taskManagerRunner.getTerminationFuture().get().getExitCode();} catch (Throwable t) {throw new FlinkException("Unexpected failure during runtime of TaskManagerRunner.",ExceptionUtils.stripExecutionException(t));}
}
  • TaskManagerRunnerstartTaskManagerRunnerServices 方法会初始化 RPC 服务、高可用服务、 metrics 等基础组件,并最终调用匿名函数创建 taskExecutorService
private void startTaskManagerRunnerServices() throws Exception {synchronized (lock) {// 初始化RPC系统、线程池、高可用服务等rpcSystem = RpcSystem.load(configuration);executor = Executors.newScheduledThreadPool(...);highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(...);// ... 省略其他基础组件初始化(metrics、Blob服务等)// 调用匿名函数创建TaskExecutor服务taskExecutorService =taskExecutorServiceFactory.createTaskExecutor(this.configuration,this.resourceId.unwrap(),rpcService,highAvailabilityServices,heartbeatServices,metricRegistry,blobCacheService,false,externalResourceInfoProvider,workingDirectory.unwrap(),this,delegationTokenReceiverRepository);}
}

4. TaskExecutor 实例化过程

createTaskExecutorService 方法通过 startTaskManager 实例化 TaskExecutor,并封装为 TaskExecutorService

public static TaskExecutorService createTaskExecutorService(...) throws Exception {final TaskExecutor taskExecutor =startTaskManager(...); // 实例化TaskExecutorreturn TaskExecutorToServiceAdapter.createFor(taskExecutor); // 适配为服务
}

startTaskManager 是实例化 TaskExecutor 的核心方法,主要完成:

  • 初始化资源配置(TaskExecutorResourceSpec
  • 构建任务管理器服务配置(TaskManagerServicesConfiguration
  • 初始化任务管理器服务(TaskManagerServices,包含 IO、Shuffle、任务槽等组件)
  • 最终创建 TaskExecutor 实例
public static TaskExecutor startTaskManager(...) throws Exception {// 1. 从配置中解析资源规格final TaskExecutorResourceSpec taskExecutorResourceSpec =TaskExecutorResourceUtils.resourceSpecFromConfig(configuration);// 2. 构建服务配置TaskManagerServicesConfiguration taskManagerServicesConfiguration =TaskManagerServicesConfiguration.fromConfiguration(...);// 3. 初始化核心服务(包含TaskSlotTable、IO管理器、Shuffle环境等)TaskManagerServices taskManagerServices =TaskManagerServices.fromConfiguration(...);// 4. 构建TaskExecutor配置TaskManagerConfiguration taskManagerConfiguration =TaskManagerConfiguration.fromConfiguration(...);// 5. 实例化TaskExecutorreturn new TaskExecutor(rpcService,taskManagerConfiguration,highAvailabilityServices,taskManagerServices,// ... 其他参数);
}

5. 与 ResourceManager 的交互

TaskExecutor 启动后需与 ResourceManager 建立连接并注册,核心逻辑在 connectToResourceManager 方法:

private void connectToResourceManager() {// 1. 封装TaskExecutor注册信息(地址、资源ID、端口、资源规格等)final TaskExecutorRegistration taskExecutorRegistration =new TaskExecutorRegistration(getAddress(),getResourceID(),unresolvedTaskManagerLocation.getDataPort(),JMXService.getPort().orElse(-1),hardwareDescription,memoryConfiguration,taskManagerConfiguration.getDefaultSlotResourceProfile(),taskManagerConfiguration.getTotalResourceProfile(),unresolvedTaskManagerLocation.getNodeId());// 2. 创建与ResourceManager的连接并启动注册resourceManagerConnection =new TaskExecutorToResourceManagerConnection(log,getRpcService(),taskManagerConfiguration.getRetryingRegistrationConfiguration(),resourceManagerAddress.getAddress(),resourceManagerAddress.getResourceManagerId(),getMainThreadExecutor(),new ResourceManagerRegistrationListener(),taskExecutorRegistration);resourceManagerConnection.start();
}
  • 注册过程通过 TaskExecutorToResourceManagerConnectioninvokeRegistration 调用 ResourceManager 的 registerTaskExecutor 方法完成:
@Override
protected CompletableFuture<RegistrationResponse> invokeRegistration(ResourceManagerGateway resourceManager,ResourceManagerId fencingToken,long timeoutMillis) throws Exception {Time timeout = Time.milliseconds(timeoutMillis);return resourceManager.registerTaskExecutor(taskExecutorRegistration, timeout); // 调用RM注册接口
}
  • 注册成功后,会通过 createNewRegistrationonRegistrationSuccess 方法调用 establishResourceManagerConnection
@Override
public void onRegistrationSuccess(TaskExecutorToResourceManagerConnection connection,TaskExecutorRegistrationSuccess success) {final ResourceID resourceManagerId = success.getResourceManagerId();final InstanceID taskExecutorRegistrationId = success.getRegistrationId();final ClusterInformation clusterInformation = success.getClusterInformation();final ResourceManagerGateway resourceManagerGateway = connection.getTargetGateway();byte[] tokens = success.getInitialTokens();if (tokens != null) {try {log.info("Receive initial delegation tokens from resource manager");delegationTokenReceiverRepository.onNewTokensObtained(tokens);} catch (Throwable t) {log.error("Could not update delegation tokens.", t);ExceptionUtils.rethrowIfFatalError(t);}}runAsync(() -> {// 过滤掉过时的连接//noinspection ObjectEqualityif (resourceManagerConnection == connection) {try {establishResourceManagerConnection(resourceManagerGateway,resourceManagerId,taskExecutorRegistrationId,clusterInformation);} catch (Throwable t) {log.error("Establishing Resource Manager connection in Task Executor failed",t);}}});
}private void establishResourceManagerConnection(ResourceManagerGateway resourceManagerGateway,ResourceID resourceManagerResourceId,InstanceID taskExecutorRegistrationId,ClusterInformation clusterInformation) {// 这一步非常重要,向ResourceManager报告当前TaskExecutor拥有的slot数量final CompletableFuture<Acknowledge> slotReportResponseFuture =resourceManagerGateway.sendSlotReport(getResourceID(),taskExecutorRegistrationId,taskSlotTable.createSlotReport(getResourceID()),Time.fromDuration(taskManagerConfiguration.getRpcTimeout()));slotReportResponseFuture.whenCompleteAsync((acknowledge, throwable) -> {if (throwable != null) {reconnectToResourceManager(new TaskManagerException("Failed to send initial slot report to ResourceManager.",throwable));}},getMainThreadExecutor());// 监控ResourceManager作为心跳目标resourceManagerHeartbeatManager.monitorTarget(resourceManagerResourceId,new ResourceManagerHeartbeatReceiver(resourceManagerGateway));// 设置传播的blob服务器地址final InetSocketAddress blobServerAddress =new InetSocketAddress(clusterInformation.getBlobServerHostname(),clusterInformation.getBlobServerPort());taskExecutorBlobService.setBlobServerAddress(blobServerAddress);establishedResourceManagerConnection =new EstablishedResourceManagerConnection(resourceManagerGateway,resourceManagerResourceId,taskExecutorRegistrationId);stopRegistrationTimeout();
}

establishResourceManagerConnection 方法主要完成:

  1. 向 ResourceManager 发送初始 slot 报告,告知自身可用的资源情况
  2. 建立与 ResourceManager 的心跳监控机制
  3. 配置 Blob 服务地址,用于后续的文件传输
  4. 保存已建立的连接信息并停止注册超时计时器
  • 注册成功后,会通过 createNewRegistrationonRegistrationSuccess 方法调用 establishResourceManagerConnection

6. 任务槽表(TaskSlotTable)初始化

TaskSlotTable 负责管理 TaskExecutor 的任务槽,其初始化在 TaskManagerServices 中完成:

public static TaskManagerServices fromConfiguration(...) throws Exception {// ... 其他服务初始化final TaskSlotTable<Task> taskSlotTable =createTaskSlotTable(taskManagerServicesConfiguration.getNumberOfSlots(), // 槽数量taskManagerServicesConfiguration.getTaskExecutorResourceSpec(), // 资源规格taskManagerServicesConfiguration.getTimerServiceShutdownTimeout(),taskManagerServicesConfiguration.getPageSize(),ioExecutor);// ... 封装到TaskManagerServices并返回
}

TaskSlotTablestartTaskExecutorServices 中被启动,并关联槽操作的实现类 SlotActionsImpl,用于处理槽的分配、释放等操作。

总结

TaskExecutor 的初始化流程可概括为:

  1. TaskManagerRunner 启动,通过匿名函数指定服务创建逻辑;
  2. 初始化基础组件(RPC、高可用、metrics 等);
  3. 调用 onStart 方法启动核心服务,包括连接 ResourceManager、初始化任务槽表等;
  4. 与 ResourceManager 建立连接并注册自身信息(资源、端口等);
  5. 完成初始化后,通过 TaskSlotTable 管理任务槽,准备接收任务分配。
http://www.dtcms.com/a/574145.html

相关文章:

  • ControlNet:Adding Conditional Control to Text-to-Image Diffusion Models
  • 网站维护的方法asp.net 4.0网站开发实例教程
  • web前端 DSL转换技术
  • 有什么好的网站推荐一下成都网页制作要多少钱
  • 厦门建设厅网站站长工具seo综合查询官网
  • 灯塔工厂:如何通过数字化实现制造升级
  • 电商网站 内容优化哪一个网站有做实验的过程
  • 电商平台数据分析市场洞察:API 技术如何打通数据价值闭环
  • 网站设计O2O平台小学学校网站
  • 做企业网站需要准备什么材料手机网站成功案例
  • gitLab如何新建分支(根据某个分支)
  • WEEX平台安全验证技术规范与实践
  • C++ opencv简化轮廓
  • 网站服务器租用和托管wordpress 个人国内收款
  • Java中将System.out内容写入Tomcat日志
  • 做国外的营销的网站官方网站撰写策划书
  • dmp导出到asm可以吗?
  • lprm命令 – 移除打印队列中的任务
  • BuildingAI二开 用户信息增加Coze套餐名称和剩余天数PRD
  • 网站建设有几种方法如何建设一个网站站
  • 格朗图手表网站网站建设服务亮点
  • 银行“两地三中心”网络高可用运维监控方案
  • 基于Vue的学分预警系统98k51(程序 + 源码 + 数据库 + 调试部署 + 开发环境配置),配套论文文档字数达万字以上,文末可获取,系统界面展示置于文末
  • 沈阳餐饮网站建设wordpress搜索功能调用
  • 网站建设公司如何签单上海的设计网站有哪些内容
  • 断层错动和近断层地震动联合作用下软岩隧洞衬砌损伤分析
  • 2025字节跳动工程训练营(测开)
  • 三轴云台之磁场定向控制技术
  • 盘县网站建设福州网站设计公司
  • MiniEngine学习笔记 : RootSignature