
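Flink-1.19.0 Source Code Explained 6 - JobGraph Generation - Part 2

The previous post, "Flink-1.19.0 Source Code Explained 5 - JobGraph Generation - Part 1", covered the part of the JobGraph generation source that recursively creates operator chains and generates the chain-head JobVertex nodes. This post continues with the rest of the source: creating JobEdge edges, creating IntermediateDataSet data sets, serializing operator chains, and submitting the request from the client.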

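1. Core method of JobGraph generation

StreamingJobGraphGenerator.createJobGraph() is the core method of JobGraph generation and contains every key step. It first builds the operator chains, chaining together the nodes that can be chained and creating a chain-head JobVertex for each chain. It then creates a JobEdge for every non-chainable output to connect the upstream and downstream JobVertex, serializes the operator information into each JobVertex, and finally configures the SlotSharingGroup, Checkpoint, SavepointRestore, and ExecutionConfig of the JobGraph, which completes its creation.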

Source code diagram:

Source of StreamingJobGraphGenerator.createJobGraph():

private JobGraph createJobGraph() {
    // Validate checkpointing and configure JobType and Dynamic
    preValidate();
    jobGraph.setJobType(streamGraph.getJobType());
    jobGraph.setDynamic(streamGraph.isDynamic());

    jobGraph.enableApproximateLocalRecovery(
            streamGraph.getCheckpointConfig().isApproximateLocalRecoveryEnabled());

    // Generate deterministic hashes for the nodes in order to identify them across
    // submission iff they didn't change.
    // These hashes identify each node.
    Map<Integer, byte[]> hashes =
            defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);

    // Generate legacy version hashes for backwards compatibility
    List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
    for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
        legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
    }

    // Chain the chainable nodes together
    setChaining(hashes, legacyHashes);

    if (jobGraph.isDynamic()) {
        setVertexParallelismsForDynamicGraphIfNecessary();
    }

    // Note that we set all the non-chainable outputs configuration here because the
    // "setVertexParallelismsForDynamicGraphIfNecessary" may affect the parallelism of job
    // vertices and partition-reuse
    final Map<Integer, Map<StreamEdge, NonChainedOutput>> opIntermediateOutputs =
            new HashMap<>();
    // Register every non-chainable output for serialization
    setAllOperatorNonChainedOutputsConfigs(opIntermediateOutputs);
    // Iterate over the JobVertices and create JobEdges for their non-chainable outputs
    setAllVertexNonChainedOutputsConfigs(opIntermediateOutputs);

    // Set the physical edges: the in-edges of each JobVertex are also
    // serialized into the StreamConfig of that JobVertex
    setPhysicalEdges();

    markSupportingConcurrentExecutionAttempts();

    validateHybridShuffleExecuteInBatchMode();

    // Assign each JobVertex to its SlotSharingGroup
    setSlotSharingAndCoLocation();

    setManagedMemoryFraction(
            Collections.unmodifiableMap(jobVertices),
            Collections.unmodifiableMap(vertexConfigs),
            Collections.unmodifiableMap(chainedConfigs),
            id -> streamGraph.getStreamNode(id).getManagedMemoryOperatorScopeUseCaseWeights(),
            id -> streamGraph.getStreamNode(id).getManagedMemorySlotScopeUseCases());

    // Configure checkpointing
    configureCheckpointing();

    // Configure the savepoint restore settings
    jobGraph.setSavepointRestoreSettings(streamGraph.getSavepointRestoreSettings());

    final Map<String, DistributedCache.DistributedCacheEntry> distributedCacheEntries =
            JobGraphUtils.prepareUserArtifactEntries(
                    streamGraph.getUserArtifacts().stream()
                            .collect(Collectors.toMap(e -> e.f0, e -> e.f1)),
                    jobGraph.getJobID());

    for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry :
            distributedCacheEntries.entrySet()) {
        jobGraph.addUserArtifact(entry.getKey(), entry.getValue());
    }

    // Configure the ExecutionConfig
    // set the ExecutionConfig last when it has been finalized
    try {
        jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
    } catch (IOException e) {
        throw new IllegalConfigurationException(
                "Could not serialize the ExecutionConfig."
                        + "This indicates that non-serializable types (like custom serializers) were registered");
    }
    jobGraph.setJobConfiguration(streamGraph.getJobConfiguration());

    addVertexIndexPrefixInVertexName();

    setVertexDescription();

    // For each chain head, serialize its operator chain into the node's configuration
    // Wait for the serialization of operator coordinators and stream config.
    try {
        FutureUtils.combineAll(
                        vertexConfigs.values().stream()
                                .map(
                                        config ->
                                                config.triggerSerializationAndReturnFuture(
                                                        serializationExecutor))
                                .collect(Collectors.toList()))
                .get();

        waitForSerializationFuturesAndUpdateJobVertices();
    } catch (Exception e) {
        throw new FlinkRuntimeException("Error in serialization.", e);
    }

    if (!streamGraph.getJobStatusHooks().isEmpty()) {
        jobGraph.setJobStatusHooks(streamGraph.getJobStatusHooks());
    }

    // Return the finished JobGraph
    return jobGraph;
}

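2. Creating JobEdge edges and IntermediateDataSet data sets

JobEdge and IntermediateDataSet instances are created by iterating over all non-chainable StreamEdges of each chain-head JobVertex and turning every such edge into an IntermediateDataSet and a JobEdge.

The call setAllVertexNonChainedOutputsConfigs(opIntermediateOutputs) enters StreamingJobGraphGenerator.setAllVertexNonChainedOutputsConfigs(), which is where the creation of JobEdge edges and IntermediateDataSet data sets begins.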

Source code diagram:

StreamingJobGraphGenerator.setAllVertexNonChainedOutputsConfigs() first iterates over all JobVertex nodes.

Source of StreamingJobGraphGenerator.setAllVertexNonChainedOutputsConfigs():

private void setAllVertexNonChainedOutputsConfigs(
        final Map<Integer, Map<StreamEdge, NonChainedOutput>> opIntermediateOutputs) {
    // Iterate over every JobVertex and create its JobEdges
    jobVertices
            .keySet()
            .forEach(
                    startNodeId ->
                            setVertexNonChainedOutputsConfig(
                                    startNodeId,
                                    vertexConfigs.get(startNodeId),
                                    chainInfos.get(startNodeId).getTransitiveOutEdges(),
                                    opIntermediateOutputs));
}

For each JobVertex, the method iterates over its non-chainable StreamEdges and calls connect() to create the corresponding edge.

Source of StreamingJobGraphGenerator.setVertexNonChainedOutputsConfig():

private void setVertexNonChainedOutputsConfig(
        Integer startNodeId,
        StreamConfig config,
        List<StreamEdge> transitiveOutEdges,
        final Map<Integer, Map<StreamEdge, NonChainedOutput>> opIntermediateOutputs) {
    // Iterate over all non-chainable StreamEdges of this JobVertex
    // and call connect() to create the edges
    LinkedHashSet<NonChainedOutput> transitiveOutputs = new LinkedHashSet<>();
    for (StreamEdge edge : transitiveOutEdges) {
        NonChainedOutput output = opIntermediateOutputs.get(edge.getSourceId()).get(edge);
        transitiveOutputs.add(output);
        // Call connect()
        connect(startNodeId, edge, output);
    }
}

StreamingJobGraphGenerator.connect() looks up the current JobVertex and the downstream JobVertex, reads the partitioner type and the shuffle mode (ResultPartitionType) from the output, and creates the JobEdge with a distribution pattern of POINTWISE or ALL_TO_ALL, depending on whether the partitioner is pointwise.

Source of StreamingJobGraphGenerator.connect():

private void connect(Integer headOfChain, StreamEdge edge, NonChainedOutput output) {
    // Add this edge to physicalEdgesInOrder
    physicalEdgesInOrder.add(edge);

    Integer downStreamVertexID = edge.getTargetId();

    // Look up the chain-head JobVertex of this chain
    JobVertex headVertex = jobVertices.get(headOfChain);
    // Look up the downstream JobVertex this edge connects to
    JobVertex downStreamVertex = jobVertices.get(downStreamVertexID);

    // Increment the number of network inputs of the downstream JobVertex
    StreamConfig downStreamConfig = new StreamConfig(downStreamVertex.getConfiguration());
    downStreamConfig.setNumberOfNetworkInputs(downStreamConfig.getNumberOfNetworkInputs() + 1);

    // Get the partitioner and the shuffle mode (result partition type)
    StreamPartitioner<?> partitioner = output.getPartitioner();
    ResultPartitionType resultPartitionType = output.getPartitionType();

    checkBufferTimeout(resultPartitionType, edge);

    // Create the JobEdge according to the partitioner's connection type
    JobEdge jobEdge;
    if (partitioner.isPointwise()) {
        jobEdge =
                downStreamVertex.connectNewDataSetAsInput(
                        headVertex,
                        DistributionPattern.POINTWISE,
                        resultPartitionType,
                        output.getDataSetId(),
                        partitioner.isBroadcast());
    } else {
        jobEdge =
                downStreamVertex.connectNewDataSetAsInput(
                        headVertex,
                        DistributionPattern.ALL_TO_ALL,
                        resultPartitionType,
                        output.getDataSetId(),
                        partitioner.isBroadcast());
    }

    // Configure the JobEdge
    // set strategy name so that web interface can show it.
    jobEdge.setShipStrategyName(partitioner.toString());
    jobEdge.setForward(partitioner instanceof ForwardPartitioner);
    jobEdge.setDownstreamSubtaskStateMapper(partitioner.getDownstreamSubtaskStateMapper());
    jobEdge.setUpstreamSubtaskStateMapper(partitioner.getUpstreamSubtaskStateMapper());

    if (LOG.isDebugEnabled()) {
        LOG.debug(
                "CONNECTED: {} - {} -> {}",
                partitioner.getClass().getSimpleName(),
                headOfChain,
                downStreamVertexID);
    }
}
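The branch in connect() can be pictured with a small standalone sketch. The types below are hypothetical stand-ins, not Flink classes, and the enum lists only an assumed subset of partitioners in which FORWARD and RESCALE are the pointwise ones.

```java
/** Hypothetical, simplified stand-ins for Flink's partitioner and DistributionPattern types. */
final class DistributionPatternSketch {

    enum DistributionPattern { POINTWISE, ALL_TO_ALL }

    /** Assumed subset of partitioners; only FORWARD and RESCALE are marked pointwise here. */
    enum Partitioner {
        FORWARD(true),
        RESCALE(true),
        REBALANCE(false),
        HASH(false),
        BROADCAST(false);

        private final boolean pointwise;

        Partitioner(boolean pointwise) {
            this.pointwise = pointwise;
        }

        boolean isPointwise() {
            return pointwise;
        }
    }

    /** Mirrors the if/else in connect(): the pointwise flag alone selects the pattern. */
    static DistributionPattern patternFor(Partitioner partitioner) {
        return partitioner.isPointwise()
                ? DistributionPattern.POINTWISE
                : DistributionPattern.ALL_TO_ALL;
    }

    public static void main(String[] args) {
        // Print the pattern chosen for every partitioner in the sketch.
        for (Partitioner p : Partitioner.values()) {
            System.out.println(p + " -> " + patternFor(p));
        }
    }
}
```

In the sketch, as in connect(), the two branches differ only in the DistributionPattern argument; everything else passed to the edge is identical.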

JobVertex.connectNewDataSetAsInput() is the method that actually creates the JobEdge. It obtains the IntermediateDataSet, creates a JobEdge instance, and uses the JobEdge to connect the consuming JobVertex with the IntermediateDataSet.

Source of JobVertex.connectNewDataSetAsInput():

public JobEdge connectNewDataSetAsInput(
        JobVertex input,
        DistributionPattern distPattern,
        ResultPartitionType partitionType,
        IntermediateDataSetID intermediateDataSetId,
        boolean isBroadcast) {
    // Create the IntermediateDataSet
    IntermediateDataSet dataSet =
            input.getOrCreateResultDataSet(intermediateDataSetId, partitionType);
    // Create the JobEdge
    JobEdge edge = new JobEdge(dataSet, this, distPattern, isBroadcast);
    // Attach the JobEdge to the JobVertex and to the IntermediateDataSet
    this.inputs.add(edge);
    dataSet.addConsumer(edge);
    return edge;
}

The IntermediateDataSet itself is created in JobVertex.getOrCreateResultDataSet().

Source of JobVertex.getOrCreateResultDataSet():

public IntermediateDataSet getOrCreateResultDataSet(
        IntermediateDataSetID id, ResultPartitionType partitionType) {
    return this.results.computeIfAbsent(
            // Create the IntermediateDataSet
            id, key -> new IntermediateDataSet(id, partitionType, this));
}
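The wiring done by these two methods can be sketched with a heavily simplified model. Vertex, DataSet, and Edge below are hypothetical stand-ins for JobVertex, IntermediateDataSet, and JobEdge, and the string IDs are made up for illustration; the point is only that computeIfAbsent returns the existing data set when the same ID is requested again.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified model of JobVertex, IntermediateDataSet and JobEdge. */
final class JobEdgeSketch {

    static final class DataSet {
        final String id;
        final Vertex producer;
        final List<Edge> consumers = new ArrayList<>();

        DataSet(String id, Vertex producer) {
            this.id = id;
            this.producer = producer;
        }
    }

    static final class Edge {
        final DataSet source;
        final Vertex target;

        Edge(DataSet source, Vertex target) {
            this.source = source;
            this.target = target;
        }
    }

    static final class Vertex {
        final String name;
        final Map<String, DataSet> results = new LinkedHashMap<>();
        final List<Edge> inputs = new ArrayList<>();

        Vertex(String name) {
            this.name = name;
        }

        /** Creates the data set on first use and returns the existing one afterwards. */
        DataSet getOrCreateResultDataSet(String id) {
            return results.computeIfAbsent(id, key -> new DataSet(key, this));
        }

        /** Called on the consumer: links the producer's data set to this vertex via a new edge. */
        Edge connectNewDataSetAsInput(Vertex producer, String dataSetId) {
            DataSet dataSet = producer.getOrCreateResultDataSet(dataSetId);
            Edge edge = new Edge(dataSet, this);
            inputs.add(edge);
            dataSet.consumers.add(edge);
            return edge;
        }
    }

    public static void main(String[] args) {
        Vertex source = new Vertex("source");
        Vertex sink = new Vertex("sink");
        Edge edge = sink.connectNewDataSetAsInput(source, "ds-1");
        System.out.println(edge.source.producer.name + " -> " + edge.target.name);
    }
}
```

Note that the edge is stored on the consuming vertex (inputs) while the data set is stored on the producing vertex (results), which matches the direction of the calls in the source above.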

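3. Serializing operator logic, operator chains, chain outputs, and chain-internal edges

The JobGraph is Flink's logical computation graph. It has to carry the computation information of the operators and is sent from the Flink client to the Flink cluster on Yarn for distributed execution. Before the JobGraph is sent, each JobVertex therefore needs to serialize its computation logic, its operator chain, its chain outputs, and its chain-internal edges.

In StreamingJobGraphGenerator.createChain()

First, while createChain() recurses over the nodes to build the operator chains, it puts each node's StreamOperatorFactory (which wraps the computation logic) and its chainable edges (ChainedOutputs) into toBeSerializedConfigObjects, the Map that holds everything to be serialized. The actual serialization happens later.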

Source code diagram:

Source of StreamingJobGraphGenerator.createChain():

private List<StreamEdge> createChain(
        final Integer currentNodeId,
        final int chainIndex,
        final OperatorChainInfo chainInfo,
        final Map<Integer, OperatorChainInfo> chainEntryPoints) {
    // Recursively create the operator chain...
    //...
    // This call contains the code that registers the StreamOperatorFactory for serialization
    setOperatorConfig(currentNodeId, config, chainInfo.getChainedSources());
    // Register the chainable edges for serialization
    setOperatorChainedOutputsConfig(config, chainableOutputs);
    //...
}

Serializing the operator logic

StreamingJobGraphGenerator.setOperatorConfig() calls config.setStreamOperatorFactory(vertex.getOperatorFactory()) to register the StreamOperatorFactory for serialization.

Source of StreamingJobGraphGenerator.setOperatorConfig():

private void setOperatorConfig(
        Integer vertexId, StreamConfig config, Map<Integer, ChainedSourceInfo> chainedSources) {
    //...
    // Register the StreamOperatorFactory for serialization
    config.setStreamOperatorFactory(vertex.getOperatorFactory());
    //...
}

StreamConfig.setStreamOperatorFactory() registers the StreamOperatorFactory for serialization: it puts the factory into the toBeSerializedConfigObjects Map, and the actual serialization is performed later.

Source of StreamConfig.setStreamOperatorFactory():

public void setStreamOperatorFactory(StreamOperatorFactory<?> factory) {
    if (factory != null) {
        // Register the operator factory for serialization
        toBeSerializedConfigObjects.put(SERIALIZED_UDF, factory);
        toBeSerializedConfigObjects.put(SERIALIZED_UDF_CLASS, factory.getClass());
    }
}

Serializing the chainable edges

StreamingJobGraphGenerator.createChain() calls setOperatorChainedOutputsConfig(config, chainableOutputs) to register the node's chainable edges (ChainedOutputs) for serialization.

Source of StreamingJobGraphGenerator.setOperatorChainedOutputsConfig():

private void setOperatorChainedOutputsConfig(
        StreamConfig config, List<StreamEdge> chainableOutputs) {
    // Create a serializer for each side-output edge
    for (StreamEdge edge : chainableOutputs) {
        if (edge.getOutputTag() != null) {
            config.setTypeSerializerSideOut(
                    edge.getOutputTag(),
                    edge.getOutputTag()
                            .getTypeInfo()
                            .createSerializer(
                                    streamGraph.getExecutionConfig().getSerializerConfig()));
        }
    }
    // Register the chainable edges for serialization
    config.setChainedOutputs(chainableOutputs);
}

StreamConfig.setChainedOutputs() registers the node's chainable edges (ChainedOutputs) for serialization: it puts them into the toBeSerializedConfigObjects Map, and the actual serialization is performed later.

Source of StreamConfig.setChainedOutputs():

public void setChainedOutputs(List<StreamEdge> chainedOutputs) {
    // Register the chainable edges for serialization
    toBeSerializedConfigObjects.put(CHAINED_OUTPUTS, chainedOutputs);
}
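The register-now, serialize-later pattern used by these setters can be sketched as follows. DeferredConfigSketch and its key names are hypothetical and are not Flink's StreamConfig; plain Java serialization is assumed for the flush step.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of a config that collects objects first and serializes them later. */
final class DeferredConfigSketch {

    // Objects registered by setters; nothing is serialized at registration time.
    private final Map<String, Object> toBeSerialized = new HashMap<>();
    // The "real" configuration that ends up holding only byte arrays.
    private final Map<String, byte[]> config = new HashMap<>();

    /** Cheap registration step, comparable to the setters shown above. */
    void register(String key, Object value) {
        toBeSerialized.put(key, value);
    }

    boolean isSerialized(String key) {
        return config.containsKey(key);
    }

    /** Expensive step, executed once for all registered entries. */
    void serializeAll() {
        toBeSerialized.forEach(
                (key, object) -> {
                    try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
                            ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                        out.writeObject(object);
                        out.flush();
                        config.put(key, bytes.toByteArray());
                    } catch (IOException e) {
                        throw new UncheckedIOException(
                                "Could not serialize object for key " + key, e);
                    }
                });
    }

    public static void main(String[] args) {
        DeferredConfigSketch sketch = new DeferredConfigSketch();
        sketch.register("operatorFactory", "a serializable factory placeholder");
        System.out.println("before flush: " + sketch.isSerialized("operatorFactory"));
        sketch.serializeAll();
        System.out.println("after flush: " + sketch.isSerialized("operatorFactory"));
    }
}
```

A design consequence visible in the sketch: a value registered twice under the same key is serialized only once, with the last registered value, because the flush happens after graph construction is finished.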

In StreamingJobGraphGenerator.createJobGraph()

createJobGraph() is the main method that builds the JobGraph. While building it, the method iterates over the non-chainable outputs of all nodes and registers them for serialization, serializes the data of each operator chain into the configuration of its chain-head JobVertex, and serializes every entry registered in toBeSerializedConfigObjects, which completes the final serialization of the JobGraph.

Source code diagram:

Source of StreamingJobGraphGenerator.createJobGraph():

private JobGraph createJobGraph() {
    //...
    // Create the operator chains and the chain-head JobVertex of each chain
    setChaining(hashes, legacyHashes);
    // Register all non-chainable outputs for serialization
    setAllOperatorNonChainedOutputsConfigs(opIntermediateOutputs);
    //...
    // Serialize the configuration and operator chain of every node; this is the final serialization
    // Wait for the serialization of operator coordinators and stream config.
    try {
        FutureUtils.combineAll(
                        vertexConfigs.values().stream()
                                .map(
                                        config ->
                                                config.triggerSerializationAndReturnFuture(
                                                        serializationExecutor))
                                .collect(Collectors.toList()))
                .get();
        waitForSerializationFuturesAndUpdateJobVertices();
    } catch (Exception e) {
        throw new FlinkRuntimeException("Error in serialization.", e);
    }
    //...
    // Return the finished JobGraph
    return jobGraph;
}

Serializing the non-chainable outputs:

createJobGraph() calls setAllOperatorNonChainedOutputsConfigs(opIntermediateOutputs) to register all non-chainable outputs of the JobGraph for serialization. The method iterates over all non-chainable outputs and registers each of them in the to-be-serialized configuration of the upstream node that produces it.

Source of StreamingJobGraphGenerator.setAllOperatorNonChainedOutputsConfigs():

private void setAllOperatorNonChainedOutputsConfigs(
        final Map<Integer, Map<StreamEdge, NonChainedOutput>> opIntermediateOutputs) {
    // set non chainable output config
    // Iterate over every non-chainable output
    opNonChainableOutputsCache.forEach(
            (vertexId, nonChainableOutputs) -> {
                Map<StreamEdge, NonChainedOutput> outputsConsumedByEdge =
                        opIntermediateOutputs.computeIfAbsent(
                                vertexId, ignored -> new HashMap<>());
                // Register it in the configuration of the upstream node
                setOperatorNonChainedOutputsConfig(
                        vertexId,
                        vertexConfigs.get(vertexId),
                        nonChainableOutputs,
                        outputsConsumedByEdge);
            });
}

setOperatorNonChainedOutputsConfig() creates the serializers for side outputs and registers the non-chainable outputs in the to-be-serialized configuration of the upstream node.

Source of StreamingJobGraphGenerator.setOperatorNonChainedOutputsConfig():

private void setOperatorNonChainedOutputsConfig(
        Integer vertexId,
        StreamConfig config,
        List<StreamEdge> nonChainableOutputs,
        Map<StreamEdge, NonChainedOutput> outputsConsumedByEdge) {
    // Create a serializer for each side-output edge
    // iterate edges, find sideOutput edges create and save serializers for each outputTag type
    for (StreamEdge edge : nonChainableOutputs) {
        if (edge.getOutputTag() != null) {
            config.setTypeSerializerSideOut(
                    edge.getOutputTag(),
                    edge.getOutputTag()
                            .getTypeInfo()
                            .createSerializer(
                                    streamGraph.getExecutionConfig().getSerializerConfig()));
        }
    }
    // Register the outputs in the configuration of the upstream node
    List<NonChainedOutput> deduplicatedOutputs =
            mayReuseNonChainedOutputs(vertexId, nonChainableOutputs, outputsConsumedByEdge);
    config.setNumberOfOutputs(deduplicatedOutputs.size());
    config.setOperatorNonChainedOutputs(deduplicatedOutputs);
}

StreamConfig.setOperatorNonChainedOutputs() registers the non-chainable outputs (NonChainedOutput) for serialization: it puts them into the toBeSerializedConfigObjects Map, and the actual serialization is performed later.

Source of StreamConfig.setOperatorNonChainedOutputs():

public void setOperatorNonChainedOutputs(List<NonChainedOutput> nonChainedOutputs) {
    // Register the non-chainable outputs for serialization
    toBeSerializedConfigObjects.put(OP_NONCHAINED_OUTPUTS, nonChainedOutputs);
}

Serializing the operator chain and running the final serialization

In createJobGraph(), every JobVertex of the JobGraph is visited, and the operator chain and configuration of each chain head are serialized.

// For each chain head, serialize its operator chain into the node's configuration
FutureUtils.combineAll(
                vertexConfigs.values().stream()
                        .map(
                                config ->
                                        config.triggerSerializationAndReturnFuture(
                                                serializationExecutor))
                        .collect(Collectors.toList()))
        .get();
waitForSerializationFuturesAndUpdateJobVertices();

StreamConfig.triggerSerializationAndReturnFuture() first calls serializeAllConfigs() to serialize the configuration of the JobVertex node and then serializes the operator chain of that JobVertex.

Source of StreamConfig.triggerSerializationAndReturnFuture():

public CompletableFuture<StreamConfig> triggerSerializationAndReturnFuture(
        Executor ioExecutor) {
    FutureUtils.combineAll(chainedTaskFutures.values())
            .thenAcceptAsync(
                    chainedConfigs -> {
                        try {
                            // Serialize all the objects to config.
                            // Serialize the StreamConfig of this node
                            serializeAllConfigs();
                            // Write the operator chain configuration into this node
                            InstantiationUtil.writeObjectToConfig(
                                    chainedConfigs.stream()
                                            .collect(
                                                    Collectors.toMap(
                                                            StreamConfig::getVertexID,
                                                            Function.identity())),
                                    this.config,
                                    CHAINED_TASK_CONFIG);
                            serializationFuture.complete(this);
                        } catch (Throwable throwable) {
                            serializationFuture.completeExceptionally(throwable);
                        }
                    },
                    ioExecutor);
    return serializationFuture;
}
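The combine-then-wait pattern can be approximated with the JDK alone. In the sketch below, combineAll is a hypothetical stand-in for FutureUtils.combineAll built on CompletableFuture.allOf, and the "serialization" is just a string transformation; it is meant to show the control flow, not Flink's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

/** Hypothetical sketch of running per-vertex serialization on an executor and waiting for all. */
final class CombineFuturesSketch {

    /** Stand-in for FutureUtils.combineAll: completes when every input future has completed. */
    static <T> CompletableFuture<List<T>> combineAll(List<CompletableFuture<T>> futures) {
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
                .thenApply(
                        ignored ->
                                futures.stream()
                                        .map(CompletableFuture::join)
                                        .collect(Collectors.toList()));
    }

    /** Triggers one asynchronous task per config name and blocks until all have finished. */
    static List<String> serializeAll(List<String> configNames) {
        ExecutorService ioExecutor = Executors.newFixedThreadPool(2);
        try {
            List<CompletableFuture<String>> futures = new ArrayList<>();
            for (String name : configNames) {
                futures.add(
                        CompletableFuture.supplyAsync(() -> "serialized:" + name, ioExecutor));
            }
            // Results come back in the order of the input list, not in completion order.
            return combineAll(futures).join();
        } finally {
            ioExecutor.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> names = new ArrayList<>();
        names.add("source-chain");
        names.add("sink-chain");
        System.out.println(serializeAll(names));
    }
}
```

Running the work on a dedicated executor lets the configurations of different chain heads be serialized in parallel, while the caller still sees a single blocking wait.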

StreamConfig.serializeAllConfigs() iterates over the entries previously registered in the toBeSerializedConfigObjects Map and serializes them one by one. This is where the StreamOperatorFactory that wraps the node's computation logic, the chainable edges (ChainedOutputs), and the non-chainable outputs (NonChainedOutput) discussed above are actually serialized.

Source of StreamConfig.serializeAllConfigs():

public void serializeAllConfigs() {
    toBeSerializedConfigObjects.forEach(
            (key, object) -> {
                try {
                    // Serialize each registered configuration object
                    InstantiationUtil.writeObjectToConfig(object, this.config, key);
                } catch (IOException e) {
                    throw new StreamTaskException(
                            String.format("Could not serialize object for key %s.", key), e);
                }
            });
}

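InstantiationUtil.writeObjectToConfig() performs the actual serialization: it serializes the configuration object into a byte array and writes the bytes into the configuration of the JobVertex.

Source of InstantiationUtil.writeObjectToConfig():

public static void writeObjectToConfig(Object o, Configuration config, String key)
        throws IOException {
    // Serialize the object
    byte[] bytes = serializeObject(o);
    // Write the serialized bytes into the configuration
    config.setBytes(key, bytes);
}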
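What "serialize to a byte array" means in practice can be illustrated with standard Java serialization. The helper below is a hypothetical sketch, not InstantiationUtil itself; it also shows that an object that does not implement Serializable makes this step fail, which is why operator logic shipped in a JobGraph has to be serializable.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;

/** Hypothetical sketch of an object-to-bytes round trip with Java serialization. */
final class SerializeRoundTripSketch {

    /** Turns a Serializable object into a byte array; fails for non-serializable objects. */
    static byte[] serializeObject(Object o) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
                ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            // NotSerializableException is an IOException and ends up here as well.
            throw new UncheckedIOException(e);
        }
    }

    /** Restores the object, as the receiving side would do after reading the bytes. */
    static Object deserializeObject(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        byte[] bytes = serializeObject("map-operator-placeholder");
        System.out.println(bytes.length + " bytes, restored: " + deserializeObject(bytes));
    }
}
```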

This completes the generation of the JobGraph. The complete JobGraph structure is as follows:

Complete JobGraph structure:

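4. Submitting the job from the client to the cluster

Once the JobGraph has been generated, the Flink CliFrontend client uses a ClusterClient to upload the JobGraph, jars, and artifacts and to send the submission request to the Flink cluster on Yarn. This completes the client-side scheduling; cluster-side scheduling and execution follow.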

Source code diagram:

Back in AbstractSessionClusterExecutor.execute(): once the JobGraph has been generated, Flink creates a RestClusterClient (the concrete ClusterClient implementation) and submits the JobGraph to the cluster.

Source of AbstractSessionClusterExecutor.execute():

public CompletableFuture<JobClient> execute(
        @Nonnull final Pipeline pipeline,
        @Nonnull final Configuration configuration,
        @Nonnull final ClassLoader userCodeClassloader)
        throws Exception {
    // Generate the JobGraph
    final JobGraph jobGraph =
            PipelineExecutorUtils.getJobGraph(pipeline, configuration, userCodeClassloader);
    //...
    // Create the ClusterClient used to submit the JobGraph to the cluster
    ClusterClient<ClusterID> clusterClient = clusterClientProvider.getClusterClient();
    // The client submits the JobGraph to the cluster
    clusterClient.submitJob(jobGraph)
    //...
}

RestClusterClient.submitJob() is the method that actually submits the job. It creates the jobGraph file and writes it to a local directory, registers the local jobGraph file, jars, and artifacts as file uploads for the cluster, builds the JobSubmit request body, and sends the request to the Flink cluster.

Source of RestClusterClient.submitJob():

public CompletableFuture<JobID> submitJob(@Nonnull JobGraph jobGraph) {
    //...
    // Create the jobGraph file (flink-jobgraph-id.bin)
    java.nio.file.Path jobGraphFile =
            Files.createTempFile("flink-jobgraph-" + jobGraph.getJobID(), ".bin");
    //...
    // Write the jobGraph to the local file
    try (ObjectOutputStream objectOut =
            new ObjectOutputStream(Files.newOutputStream(jobGraphFile))) {
        objectOut.writeObject(jobGraph);
    }
    //...
    // Register the local jobGraph file, jars, and artifacts as files to upload
    filesToUpload.add(new FileUpload(jobGraphFile, RestConstants.CONTENT_TYPE_BINARY));
    filesToUpload.add(
            new FileUpload(Paths.get(jar.toUri()), RestConstants.CONTENT_TYPE_JAR));
    filesToUpload.add(
            new FileUpload(
                    Paths.get(artifactFilePath.getPath()),
                    RestConstants.CONTENT_TYPE_BINARY));
    //...
    // Build the body of the JobSubmit request
    final JobSubmitRequestBody requestBody =
            new JobSubmitRequestBody(
                    jobGraphFile.getFileName().toString(), jarFileNames, artifactFileNames);
    //...
    // Send the JobSubmit request to the cluster
    sendRetriableRequest(
            JobSubmitHeaders.getInstance(),
            EmptyMessageParameters.getInstance(),
            requestAndFileUploads.f0,
            requestAndFileUploads.f1,
            isConnectionProblemOrServiceUnavailable(),
            (receiver, error) -> {
                // Exception handling
            });
    //...
    // Delete the locally generated jobGraph file
    Files.delete(jobGraphFile);
}
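The write, upload, delete life cycle of the temporary file can be sketched with the JDK only. Everything below is hypothetical: the class name, the file prefix, and the Consumer that stands in for the upload step. The sketch is also synchronous, which is a simplification made for illustration.

```java
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.function.Consumer;

/** Hypothetical sketch of writing an object to a temp file, handing it off, then cleaning up. */
final class TempFileSubmitSketch {

    /**
     * Serializes the payload into a temporary file, passes the file to the upload callback,
     * and deletes the file afterwards even if the callback throws. Returns the (deleted) path.
     */
    static Path writeUploadAndDelete(Serializable payload, Consumer<Path> upload) {
        try {
            Path file = Files.createTempFile("jobgraph-sketch-", ".bin");
            try {
                try (ObjectOutputStream out =
                        new ObjectOutputStream(Files.newOutputStream(file))) {
                    out.writeObject(payload);
                }
                // Stand-in for attaching the file to the submit request.
                upload.accept(file);
            } finally {
                Files.deleteIfExists(file);
            }
            return file;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Path path =
                writeUploadAndDelete(
                        "job-graph-placeholder",
                        file -> System.out.println("uploading " + file.getFileName()));
        System.out.println("still exists: " + Files.exists(path));
    }
}
```

The finally block is the part worth noting: the temporary file is only a transport vehicle, so it is removed regardless of whether the hand-off succeeded.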

RestClusterClient.sendRetriableRequest() is the method that sends the request. The RestClient sends the request to the Flink cluster on Yarn, which completes the client-side scheduling and starts the cluster-side scheduling.

Source of RestClusterClient.sendRetriableRequest():

private <
                M extends MessageHeaders<R, P, U>,
                U extends MessageParameters,
                R extends RequestBody,
                P extends ResponseBody>
        CompletableFuture<P> sendRetriableRequest(
                M messageHeaders,
                U messageParameters,
                R request,
                Collection<FileUpload> filesToUpload,
                Predicate<Throwable> retryPredicate,
                BiConsumer<String, Throwable> consumer) {
    //...
    restClient.sendRequest(
            webMonitorBaseUrl.getHost(),
            webMonitorBaseUrl.getPort(),
            headers,
            messageParameters,
            request,
            filesToUpload);
}

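This concludes JobGraph generation and the client-side scheduling part of Flink. The complete code walkthrough and the complete JobGraph structure are as follows:

Complete code walkthrough:

Complete JobGraph structure: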

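5. Conclusion

This post and the previous one, "Flink-1.19.0 Source Code Explained 5 - JobGraph Generation - Part 1", have walked through the complete JobGraph generation source. After the JobGraph has been generated, the Flink client submits the scheduling request together with the program jar and its dependencies to the Flink cluster on Yarn, which completes the client-side scheduling. The next post turns to cluster-side scheduling and continues with the source for ExecutionGraph generation and Task execution.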
