JobManager Initialization Flow Explained

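The core role of the JobManager

  • The JobManager is the central process of a Flink cluster. It exists in every deployment mode (Standalone, YARN, Kubernetes, and so on) and is responsible for resource management, task scheduling, and job lifecycle management.
  • It contains several sub-components. The main ones are the ResourceManager, the Dispatcher, and the JobMaster, all of which run inside the JobManager process.
  • This article walks through JobManager initialization, starting at the point where the cluster receives a job submitted by the client.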

JobSubmitHandler: receiving the job submission

The JobSubmitHandler class receives jobs submitted by the client. Its core logic lives in the handleRequest method:

@Override
protected CompletableFuture<JobSubmitResponseBody> handleRequest(
        @Nonnull HandlerRequest<JobSubmitRequestBody> request,
        @Nonnull DispatcherGateway gateway)
        throws RestHandlerException {
    // Collect the uploaded files, keyed by file name
    final Collection<File> uploadedFiles = request.getUploadedFiles();
    final Map<String, Path> nameToFile =
            uploadedFiles.stream()
                    .collect(Collectors.toMap(File::getName, Path::fromLocalFile));

    // Verify that the uploaded file names are unique
    if (uploadedFiles.size() != nameToFile.size()) {
        throw new RestHandlerException(
                String.format(
                        "The number of uploaded files does not match the expected count. Expected: %s Actual: %s",
                        nameToFile.size(),
                        uploadedFiles.size()),
                HttpResponseStatus.BAD_REQUEST);
    }

    final JobSubmitRequestBody requestBody = request.getRequestBody();

    // Verify that the JobGraph file name was provided
    if (requestBody.jobGraphFileName == null) {
        throw new RestHandlerException(
                String.format(
                        "The %s field must not be omitted or be null.",
                        JobSubmitRequestBody.FIELD_NAME_JOB_GRAPH),
                HttpResponseStatus.BAD_REQUEST);
    }

    // Load the JobGraph
    CompletableFuture<JobGraph> jobGraphFuture = loadJobGraph(requestBody, nameToFile);

    // Determine the JAR and artifact files that have to be uploaded
    Collection<Path> jarFiles = getJarFilesToUpload(requestBody.jarFileNames, nameToFile);
    Collection<Tuple2<String, Path>> artifacts =
            getArtifactFilesToUpload(requestBody.artifactFileNames, nameToFile);

    // Upload the files referenced by the JobGraph and finalize it
    CompletableFuture<JobGraph> finalizedJobGraphFuture =
            uploadJobGraphFiles(gateway, jobGraphFuture, jarFiles, artifacts, configuration);

    // Submit the job
    CompletableFuture<Acknowledge> jobSubmissionFuture =
            finalizedJobGraphFuture.thenCompose(
                    jobGraph -> gateway.submitJob(jobGraph, timeout));

    // Return the submission result
    return jobSubmissionFuture.thenCombine(
            jobGraphFuture,
            (ack, jobGraph) -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID()));
}
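
The future chaining in handleRequest can be tried out with the plain JDK. The sketch below is not Flink code: SubmitPipelineSketch, loadGraph, uploadFiles and submit are hypothetical stand-ins, and a String takes the place of the JobGraph. It only illustrates how thenCompose sequences the submission after the upload, and how thenCombine builds the response from the acknowledgement together with the originally loaded graph.

```java
import java.util.concurrent.CompletableFuture;

final class SubmitPipelineSketch {

    // Hypothetical stand-in for loadJobGraph: yields the job id asynchronously.
    static CompletableFuture<String> loadGraph(String jobId) {
        return CompletableFuture.supplyAsync(() -> jobId);
    }

    // Hypothetical stand-in for uploadJobGraphFiles: returns the "finalized" graph.
    static CompletableFuture<String> uploadFiles(CompletableFuture<String> graphFuture) {
        return graphFuture.thenApply(graph -> graph);
    }

    // Hypothetical stand-in for gateway.submitJob: acknowledges the submission.
    static CompletableFuture<String> submit(String graph) {
        return CompletableFuture.completedFuture("ACK");
    }

    static CompletableFuture<String> handleRequest(String jobId) {
        CompletableFuture<String> graphFuture = loadGraph(jobId);
        CompletableFuture<String> finalizedFuture = uploadFiles(graphFuture);
        // thenCompose flattens the future returned by submit into the chain.
        CompletableFuture<String> ackFuture =
                finalizedFuture.thenCompose(SubmitPipelineSketch::submit);
        // thenCombine waits for both futures, then builds the response path.
        return ackFuture.thenCombine(graphFuture, (ack, graph) -> "/jobs/" + graph);
    }

    public static void main(String[] args) {
        System.out.println(handleRequest("job-42").join());
    }
}
```

Because the response is derived from both futures, a failure in either the load step or the submit step surfaces in the returned future.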

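Dispatcher: job submission and management

The Dispatcher is the central coordinator of job submission. Its submitJob method validates the submission before processing it: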

@Override
public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout) {
    final JobID jobID = jobGraph.getJobID();
    log.info("Received job submission '{}' ({}).", jobGraph.getName(), jobID);
    return isInGloballyTerminalState(jobID)
            .thenComposeAsync(
                    isTerminated -> {
                        if (isTerminated) {
                            // The job already terminated globally: reject the submission
                            log.warn(
                                    "Ignoring job submission '{}' ({}) because the job already reached a globally terminal state.",
                                    jobGraph.getName(),
                                    jobID);
                            return FutureUtils.completedExceptionally(
                                    DuplicateJobSubmissionException.ofGloballyTerminated(jobID));
                        } else if (jobManagerRunnerRegistry.isRegistered(jobID)
                                || submittedAndWaitingTerminationJobIDs.contains(jobID)) {
                            // The job is already registered or pending: reject the duplicate
                            return FutureUtils.completedExceptionally(
                                    DuplicateJobSubmissionException.of(jobID));
                        } else if (isPartialResourceConfigured(jobGraph)) {
                            // Resources are configured for only some of the vertices
                            return FutureUtils.completedExceptionally(
                                    new JobSubmissionException(
                                            jobID,
                                            "Jobs with only partially configured vertex resources are currently not supported. This limitation will be removed in a future version."));
                        } else {
                            // Run the actual submission logic
                            return internalSubmitJob(jobGraph);
                        }
                    },
                    getMainThreadExecutor());
}
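
The order of the checks matters: a globally terminated job is reported differently from a job that is merely still known to the Dispatcher. The following is a minimal sketch of that ordering under simplifying assumptions. SubmissionGuardSketch and its two sets are hypothetical, a String replaces the JobGraph, and IllegalStateException stands in for Flink's DuplicateJobSubmissionException. The class is deliberately not thread-safe, which mirrors the fact that the real checks all run on the Dispatcher's main thread executor.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

final class SubmissionGuardSketch {
    // Hypothetical stand-ins for the Dispatcher's bookkeeping.
    private final Set<String> globallyTerminated = new HashSet<>();
    private final Set<String> registeredOrPending = new HashSet<>();

    void markGloballyTerminated(String jobId) {
        registeredOrPending.remove(jobId);
        globallyTerminated.add(jobId);
    }

    CompletableFuture<String> submitJob(String jobId) {
        if (globallyTerminated.contains(jobId)) {
            // Checked first: the job ran to completion earlier.
            return failed(new IllegalStateException(
                    "Job " + jobId + " already reached a globally terminal state."));
        } else if (registeredOrPending.contains(jobId)) {
            // Checked second: the job is running or its submission is pending.
            return failed(new IllegalStateException(
                    "Job " + jobId + " has already been submitted."));
        } else {
            registeredOrPending.add(jobId);
            return CompletableFuture.completedFuture("ACK");
        }
    }

    // Builds an exceptionally completed future (works on Java 8 as well).
    private static <T> CompletableFuture<T> failed(Throwable cause) {
        CompletableFuture<T> future = new CompletableFuture<>();
        future.completeExceptionally(cause);
        return future;
    }

    public static void main(String[] args) {
        SubmissionGuardSketch dispatcher = new SubmissionGuardSketch();
        System.out.println(dispatcher.submitJob("job-1").join());
        System.out.println(dispatcher.submitJob("job-1").isCompletedExceptionally());
    }
}
```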

The internalSubmitJob method then drives the core submission flow:

private CompletableFuture<Acknowledge> internalSubmitJob(JobGraph jobGraph) {
    applyParallelismOverrides(jobGraph);
    log.info("Submitting job '{}' ({}).", jobGraph.getName(), jobGraph.getJobID());

    // Mark the job as submitted and waiting
    submittedAndWaitingTerminationJobIDs.add(jobGraph.getJobID());

    // Persist and run the job, then remove the marker whether or not it succeeded
    return waitForTerminatingJob(jobGraph.getJobID(), jobGraph, this::persistAndRunJob)
            .handle((ignored, throwable) -> handleTermination(jobGraph.getJobID(), throwable))
            .thenCompose(Function.identity())
            .whenComplete(
                    (ignored, throwable) ->
                            submittedAndWaitingTerminationJobIDs.remove(jobGraph.getJobID()));
}
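
The handle / thenCompose(Function.identity()) / whenComplete combination is worth a closer look: handle returns a future of a future, thenCompose flattens it, and whenComplete clears the marker on both the success and the failure path. Below is a minimal JDK-only sketch of that pattern. PendingMarkerSketch, pendingJobIds and this simplified handleTermination are hypothetical and only loosely modeled on the method above.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

final class PendingMarkerSketch {
    // Hypothetical analogue of submittedAndWaitingTerminationJobIDs.
    static final Set<String> pendingJobIds = ConcurrentHashMap.newKeySet();

    static CompletableFuture<String> submit(String jobId, CompletableFuture<Void> persistAndRun) {
        pendingJobIds.add(jobId);
        return persistAndRun
                // handle sees both outcomes and itself returns a future ...
                .handle((ignored, throwable) -> handleTermination(jobId, throwable))
                // ... so the nested future has to be flattened.
                .thenCompose(Function.identity())
                // Runs on success and on failure, so the marker never leaks.
                .whenComplete((ignored, throwable) -> pendingJobIds.remove(jobId));
    }

    // Simplified: turns a failure into a failed future, success into an acknowledgement.
    static CompletableFuture<String> handleTermination(String jobId, Throwable throwable) {
        if (throwable != null) {
            CompletableFuture<String> failed = new CompletableFuture<>();
            failed.completeExceptionally(
                    new IllegalStateException("Failed to submit job " + jobId, throwable));
            return failed;
        }
        return CompletableFuture.completedFuture("ACK");
    }

    public static void main(String[] args) {
        System.out.println(submit("job-1", CompletableFuture.completedFuture(null)).join());
        System.out.println(pendingJobIds.isEmpty());
    }
}
```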

The persistAndRunJob method persists the job and creates the JobManagerRunner:

private void persistAndRunJob(JobGraph jobGraph) throws Exception {
    // Persist the JobGraph
    jobGraphWriter.putJobGraph(jobGraph);
    // Initialize the job client expiration time
    initJobClientExpiredTime(jobGraph);
    // Create and run the JobManagerRunner
    runJob(createJobMasterRunner(jobGraph), ExecutionType.SUBMISSION);
}

Creating and starting the JobManagerRunner

The createJobMasterRunner method creates the JobManagerRunner through a factory:

private JobManagerRunner createJobMasterRunner(JobGraph jobGraph) throws Exception {
    Preconditions.checkState(!jobManagerRunnerRegistry.isRegistered(jobGraph.getJobID()));
    // Create the JobManagerRunner through the factory
    return jobManagerRunnerFactory.createJobManagerRunner(
            jobGraph,
            configuration,
            getRpcService(),
            highAvailabilityServices,
            heartbeatServices,
            jobManagerSharedServices,
            new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup),
            fatalErrorHandler,
            failureEnrichers,
            System.currentTimeMillis());
}

The runJob method starts the JobManagerRunner and registers it:

private void runJob(JobManagerRunner jobManagerRunner, ExecutionType executionType)
        throws Exception {
    // Start the JobManagerRunner
    jobManagerRunner.start();
    // Register the JobManagerRunner
    jobManagerRunnerRegistry.register(jobManagerRunner);

    // Handle the runner's result once the JobMaster has finished
    final JobID jobId = jobManagerRunner.getJobID();
    final CompletableFuture<CleanupJobState> cleanupJobStateFuture =
            jobManagerRunner
                    .getResultFuture()
                    .handleAsync(
                            (jobManagerRunnerResult, throwable) -> {
                                Preconditions.checkState(
                                        jobManagerRunnerRegistry.isRegistered(jobId)
                                                && jobManagerRunnerRegistry.get(jobId)
                                                        == jobManagerRunner,
                                        "The running job entry must be bound to the lifetime of the JobManagerRunner.");

                                if (jobManagerRunnerResult != null) {
                                    return handleJobManagerRunnerResult(
                                            jobManagerRunnerResult, executionType);
                                } else {
                                    return CompletableFuture.completedFuture(
                                            jobManagerRunnerFailed(
                                                    jobId, JobStatus.FAILED, throwable));
                                }
                            },
                            getMainThreadExecutor())
                    .thenCompose(Function.identity());

    // Clean up after the job has terminated
    final CompletableFuture<Void> jobTerminationFuture =
            cleanupJobStateFuture.thenCompose(
                    cleanupJobState ->
                            removeJob(jobId, cleanupJobState)
                                    .exceptionally(
                                            throwable ->
                                                    logCleanupErrorWarning(jobId, throwable)));

    // Escalate uncaught exceptions to the fatal error handler
    FutureUtils.handleUncaughtException(
            jobTerminationFuture,
            (thread, throwable) -> fatalErrorHandler.onFatalError(throwable));
    registerJobManagerRunnerTerminationFuture(jobId, jobTerminationFuture);
}

JobMasterServiceLeadershipRunnerFactory

This factory assembles everything needed to build a JobMaster and returns the JobManagerRunner that will later start it:

@Override
public JobManagerRunner createJobManagerRunner(
        JobGraph jobGraph,
        Configuration configuration,
        RpcService rpcService,
        HighAvailabilityServices highAvailabilityServices,
        HeartbeatServices heartbeatServices,
        JobManagerSharedServices jobManagerServices,
        JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
        FatalErrorHandler fatalErrorHandler,
        Collection<FailureEnricher> failureEnrichers,
        long initializationTimestamp)
        throws Exception {
    checkArgument(jobGraph.getNumberOfVertices() > 0, "The given job is empty");

    // Create the JobMaster configuration
    final JobMasterConfiguration jobMasterConfiguration =
            JobMasterConfiguration.fromConfiguration(configuration);

    // Get the job result store
    final JobResultStore jobResultStore = highAvailabilityServices.getJobResultStore();

    // Get the leader election service for this job's JobManager
    final LeaderElection jobManagerLeaderElection =
            highAvailabilityServices.getJobManagerLeaderElection(jobGraph.getJobID());

    // Create the slot pool service / scheduler factory (responsible for resource allocation)
    final SlotPoolServiceSchedulerFactory slotPoolServiceSchedulerFactory =
            DefaultSlotPoolServiceSchedulerFactory.fromConfiguration(
                    configuration, jobGraph.getJobType(), jobGraph.isDynamic());

    // Reactive mode is only compatible with the adaptive scheduler
    if (jobMasterConfiguration.getConfiguration().get(JobManagerOptions.SCHEDULER_MODE)
            == SchedulerExecutionMode.REACTIVE) {
        Preconditions.checkState(
                slotPoolServiceSchedulerFactory.getSchedulerType()
                        == JobManagerOptions.SchedulerType.Adaptive,
                "Reactive mode requires the adaptive scheduler");
    }

    // Register a class loader lease for the job
    final LibraryCacheManager.ClassLoaderLease classLoaderLease =
            jobManagerServices
                    .getLibraryCacheManager()
                    .registerClassLoaderLease(jobGraph.getJobID());

    // Resolve the user code class loader
    final ClassLoader userCodeClassLoader =
            classLoaderLease
                    .getOrResolveClassLoader(
                            jobGraph.getUserJarBlobKeys(), jobGraph.getClasspaths())
                    .asClassLoader();

    // Create the JobMaster service factory
    final DefaultJobMasterServiceFactory jobMasterServiceFactory =
            new DefaultJobMasterServiceFactory(
                    jobManagerServices.getIoExecutor(),
                    rpcService,
                    jobMasterConfiguration,
                    jobGraph,
                    highAvailabilityServices,
                    slotPoolServiceSchedulerFactory,
                    jobManagerServices,
                    heartbeatServices,
                    jobManagerJobMetricGroupFactory,
                    fatalErrorHandler,
                    userCodeClassLoader,
                    failureEnrichers,
                    initializationTimestamp);

    // Create the JobMaster service process factory
    final DefaultJobMasterServiceProcessFactory jobMasterServiceProcessFactory =
            new DefaultJobMasterServiceProcessFactory(
                    jobGraph.getJobID(),
                    jobGraph.getName(),
                    jobGraph.getCheckpointingSettings(),
                    initializationTimestamp,
                    jobMasterServiceFactory);

    // Return the leadership-aware runner
    return new JobMasterServiceLeadershipRunner(
            jobMasterServiceProcessFactory,
            jobManagerLeaderElection,
            jobResultStore,
            classLoaderLease,
            fatalErrorHandler);
}

JobMasterServiceLeadershipRunner

JobMasterServiceLeadershipRunner implements JobManagerRunner and LeaderContender. Its core logic is in the grantLeadership method, which starts the JobMaster once leadership has been granted:

@Override
public void grantLeadership(UUID leaderSessionID) {
    runIfStateRunning(
            // Start the JobMasterServiceProcess
            () -> startJobMasterServiceProcessAsync(leaderSessionID),
            "starting a new JobMasterServiceProcess");
}

@GuardedBy("lock")
private void startJobMasterServiceProcessAsync(UUID leaderSessionId) {
    sequentialOperation =
            sequentialOperation.thenCompose(
                    unused ->
                            jobResultStore
                                    .hasJobResultEntryAsync(getJobID())
                                    .thenCompose(
                                            hasJobResult -> {
                                                if (hasJobResult) {
                                                    return handleJobAlreadyDoneIfValidLeader(
                                                            leaderSessionId);
                                                } else {
                                                    // Create a new JobMaster service process
                                                    return createNewJobMasterServiceProcessIfValidLeader(
                                                            leaderSessionId);
                                                }
                                            }));
    handleAsyncOperationError(sequentialOperation, "Could not start the job manager.");
}

private CompletableFuture<Void> createNewJobMasterServiceProcessIfValidLeader(
        UUID leaderSessionId) {
    return runIfValidLeader(
            leaderSessionId,
            () ->
                    // Create the JobMaster service process
                    ThrowingRunnable.unchecked(
                                    () -> createNewJobMasterServiceProcess(leaderSessionId))
                            .run(),
            "creating a new JobMaster service process");
}

@GuardedBy("lock")
private void createNewJobMasterServiceProcess(UUID leaderSessionId) throws FlinkException {
    Preconditions.checkState(jobMasterServiceProcess.closeAsync().isDone());

    LOG.info(
            "{} for job {} was granted leadership with leader id {}. Creating new {}.",
            getClass().getSimpleName(),
            getJobID(),
            leaderSessionId,
            JobMasterServiceProcess.class.getSimpleName());

    // Create the JobMaster service process
    jobMasterServiceProcess = jobMasterServiceProcessFactory.create(leaderSessionId);

    // Wire up the asynchronous callbacks
    forwardIfValidLeader(
            leaderSessionId,
            jobMasterServiceProcess.getJobMasterGatewayFuture(),
            jobMasterGatewayFuture,
            "JobMasterGatewayFuture from JobMasterServiceProcess");
    forwardResultFuture(leaderSessionId, jobMasterServiceProcess.getResultFuture());
    confirmLeadership(leaderSessionId, jobMasterServiceProcess.getLeaderAddressFuture());
}
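
Two ideas carry this class: operations are appended to a single sequentialOperation future so they run one after another, and each operation re-checks that its leader session is still the current one before acting. A minimal sketch of both ideas follows, using only the JDK. LeadershipRunnerSketch, its constructor argument (a stand-in for an earlier pending asynchronous step such as the job result store lookup) and startedSessions are hypothetical and not part of Flink.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

final class LeadershipRunnerSketch {
    private final Object lock = new Object();
    private final List<String> started = new ArrayList<>();
    private CompletableFuture<Void> sequentialOperation;
    private UUID currentLeaderSessionId;

    // pendingOperation models an earlier async step that has not finished yet.
    LeadershipRunnerSketch(CompletableFuture<Void> pendingOperation) {
        this.sequentialOperation = pendingOperation;
    }

    void grantLeadership(UUID leaderSessionId) {
        synchronized (lock) {
            currentLeaderSessionId = leaderSessionId;
            // Append to the chain so operations run strictly in order.
            sequentialOperation =
                    sequentialOperation.thenRun(() -> startProcessIfValidLeader(leaderSessionId));
        }
    }

    private void startProcessIfValidLeader(UUID leaderSessionId) {
        synchronized (lock) {
            // A stale session (leadership changed in the meantime) is skipped.
            if (leaderSessionId.equals(currentLeaderSessionId)) {
                started.add(leaderSessionId.toString());
            }
        }
    }

    List<String> startedSessions() {
        synchronized (lock) {
            return new ArrayList<>(started);
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Void> gate = new CompletableFuture<>();
        LeadershipRunnerSketch runner = new LeadershipRunnerSketch(gate);
        runner.grantLeadership(UUID.randomUUID()); // becomes stale
        runner.grantLeadership(UUID.randomUUID()); // current session
        gate.complete(null);
        System.out.println(runner.startedSessions().size());
    }
}
```

In this sketch only the second session starts a process, because the first one is no longer the current leader session by the time its queued operation runs.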

Creating the JobMaster

DefaultJobMasterServiceProcess and DefaultJobMasterServiceFactory together perform the final creation of the JobMaster:

// Constructor of DefaultJobMasterServiceProcess
public DefaultJobMasterServiceProcess(
        JobID jobId,
        UUID leaderSessionId,
        JobMasterServiceFactory jobMasterServiceFactory,
        Function<Throwable, ArchivedExecutionGraph> failedArchivedExecutionGraphFactory) {
    this.jobId = jobId;
    this.leaderSessionId = leaderSessionId;

    // Create the JobMasterService
    this.jobMasterServiceFuture =
            jobMasterServiceFactory.createJobMasterService(leaderSessionId, this);

    // Handle the outcome of the creation
    jobMasterServiceFuture.whenComplete(
            (jobMasterService, throwable) -> {
                if (throwable != null) {
                    final JobInitializationException jobInitializationException =
                            new JobInitializationException(
                                    jobId, "Could not start the JobMaster.", throwable);

                    LOG.debug(
                            "Initialization of the JobMasterService for job {} under leader id {} failed.",
                            jobId,
                            leaderSessionId,
                            jobInitializationException);

                    resultFuture.complete(
                            JobManagerRunnerResult.forInitializationFailure(
                                    new ExecutionGraphInfo(
                                            failedArchivedExecutionGraphFactory.apply(
                                                    jobInitializationException)),
                                    jobInitializationException));
                } else {
                    registerJobMasterServiceFutures(jobMasterService);
                }
            });
}

// DefaultJobMasterServiceFactory creates the JobMaster
@Override
public CompletableFuture<JobMasterService> createJobMasterService(
        UUID leaderSessionId, OnCompletionActions onCompletionActions) {
    return CompletableFuture.supplyAsync(
            FunctionUtils.uncheckedSupplier(
                    () -> internalCreateJobMasterService(leaderSessionId, onCompletionActions)),
            executor);
}

private JobMasterService internalCreateJobMasterService(
        UUID leaderSessionId, OnCompletionActions onCompletionActions) throws Exception {
    // Create the JobMaster instance
    final JobMaster jobMaster =
            new JobMaster(
                    rpcService,
                    JobMasterId.fromUuidOrNull(leaderSessionId),
                    jobMasterConfiguration,
                    ResourceID.generate(),
                    jobGraph,
                    haServices,
                    slotPoolServiceSchedulerFactory,
                    jobManagerSharedServices,
                    heartbeatServices,
                    jobManagerJobMetricGroupFactory,
                    onCompletionActions,
                    fatalErrorHandler,
                    userCodeClassloader,
                    shuffleMaster,
                    lookup ->
                            new JobMasterPartitionTrackerImpl(
                                    jobGraph.getJobID(), shuffleMaster, lookup),
                    new DefaultExecutionDeploymentTracker(),
                    DefaultExecutionDeploymentReconciler::new,
                    BlocklistUtils.loadBlocklistHandlerFactory(
                            jobMasterConfiguration.getConfiguration()),
                    failureEnrichers,
                    initializationTimestamp);

    // Start the JobMaster
    jobMaster.start();
    return jobMaster;
}
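
Two details of the code above are easy to miss: the checked exception thrown during construction is rethrown unchecked inside supplyAsync, and an initialization failure completes the result future normally with a failure value rather than exceptionally. A minimal JDK-only sketch of both follows. ServiceCreationSketch, createAsync and resultOf are hypothetical, a String stands in for the JobMasterService, and the success branch is simplified (the real code only registers further futures at that point).

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;

final class ServiceCreationSketch {

    // Hypothetical analogue of supplyAsync + FunctionUtils.uncheckedSupplier:
    // checked exceptions from the factory are rethrown unchecked.
    static <T> CompletableFuture<T> createAsync(Callable<T> factory, Executor executor) {
        return CompletableFuture.supplyAsync(
                () -> {
                    try {
                        return factory.call();
                    } catch (Exception e) {
                        throw new CompletionException(e);
                    }
                },
                executor);
    }

    // Mirrors the constructor's whenComplete: a creation failure becomes a
    // regular result value instead of an exceptional completion.
    static CompletableFuture<String> resultOf(CompletableFuture<String> serviceFuture) {
        CompletableFuture<String> resultFuture = new CompletableFuture<>();
        serviceFuture.whenComplete(
                (service, throwable) -> {
                    if (throwable != null) {
                        resultFuture.complete("INITIALIZATION_FAILED");
                    } else {
                        // Simplified: the real code wires up more futures here.
                        resultFuture.complete("STARTED:" + service);
                    }
                });
        return resultFuture;
    }

    public static void main(String[] args) {
        Executor direct = Runnable::run; // runs the task on the calling thread
        System.out.println(resultOf(createAsync(() -> "jobmaster", direct)).join());
    }
}
```

Reporting the failure as a value lets the caller handle a failed initialization through the same path as any other job result.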

Once created, the JobMaster connects to the ResourceManager and coordinates resource allocation. A follow-up article will cover that logic in detail.

