
Flink 1.19.0 Source Code Walkthrough 5: JobGraph Generation (Part 1)

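        Flink is an open-source distributed computing framework under the Apache Software Foundation that unifies stream and batch processing, offering both real-time stream computation and high-throughput batch processing. This column records and shares my analysis of the Flink source code.

        The Flink source version analyzed in this article is flink-1.19.0.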

1. Overview of JobGraph Generation

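        The previous article, "Flink 1.19.0 Source Code Walkthrough 4: StreamGraph Generation", covered how Flink generates the StreamGraph: it traverses the Transformation collection, creates StreamNodes and StreamEdges step by step, and assembles them into the StreamGraph. Once the StreamGraph is complete, Flink calls the execute() method of StreamExecutionEnvironment and begins generating the JobGraph.

        This article starts from StreamExecutionEnvironment.execute() and walks through the JobGraph generation source code (the red part of the flowchart below): how Flink traverses every StreamNode of the StreamGraph, creates JobVertex nodes, JobEdge edges and IntermediateDataSet data sets step by step, and builds the complete JobGraph.

        In essence, JobGraph generation builds operator chains: chainable StreamNodes are merged into a single JobVertex, so that computation stays inside one task and network transfer between operators is reduced, producing a JobGraph that executes more efficiently.

        Flink generates the JobGraph mainly by traversing every StreamNode in the StreamGraph. It merges chainable StreamNodes into one JobVertex, converts each StreamEdge into a JobEdge (which records the connection) and an IntermediateDataSet (which records the node's output), and serializes the processing logic, the operator-chain information, the chain-internal edges and the non-chainable outputs into the configuration of the JobVertex. The result is a dataflow graph that the JobMaster of the Flink cluster on Yarn can schedule.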

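The concrete steps of JobGraph generation are:

        1. Recursively build operator chains: recurse from the Source nodes toward the Sinks, check whether each out-edge is chainable, and merge chainable nodes step by step into operator chains.

        2. Create JobVertex nodes: if the node being visited is the head of a chain, create a JobVertex that wraps the operator information and computation logic, and store the node and chain information in the data structures of StreamingJobGraphGenerator.

        3. Create JobEdges: for each chain-head JobVertex, iterate over all of its non-chainable StreamEdges and create an IntermediateDataSet and a JobEdge for each.

        4. Serialize the operator chains, chain outputs and chain-internal edges: take the chain information of every chain head, serialize it, and add it to the configuration of the JobVertex.

        5. Produce the complete JobGraph: once all StreamNodes have been visited and the JobVertex, JobEdge and IntermediateDataSet objects have been created, the complete JobGraph is assembled.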
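
        The steps above can be illustrated with a small, self-contained toy model. This is only a sketch under stated assumptions: ChainingSketch, Edge, vertices and jobEdges are hypothetical names invented for illustration, chainability is given as a precomputed flag, and serialization (step 4) is left out. It is not Flink's implementation, which is walked through in the following sections.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Toy model of operator chaining. None of these types are Flink classes. */
class ChainingSketch {

    /** Hypothetical stand-in for a StreamEdge whose chainability is already known. */
    static final class Edge {
        final int target;
        final boolean chainable;

        Edge(int target, boolean chainable) {
            this.target = target;
            this.chainable = chainable;
        }
    }

    /** Node id -> out-edges (the toy "StreamGraph"). */
    final Map<Integer, List<Edge>> outEdges = new LinkedHashMap<>();
    /** Chain head id -> ids of all nodes merged into that toy "JobVertex". */
    final Map<Integer, List<Integer>> vertices = new LinkedHashMap<>();
    /** Toy "JobEdges", written as "upstreamHead->downstreamHead". */
    final List<String> jobEdges = new ArrayList<>();
    /** Heads whose chain has been completed. */
    private final Set<Integer> builtHeads = new HashSet<>();

    void addEdge(int source, int target, boolean chainable) {
        outEdges.computeIfAbsent(source, k -> new ArrayList<>()).add(new Edge(target, chainable));
        outEdges.computeIfAbsent(target, k -> new ArrayList<>());
    }

    /** Steps 1 to 3: recurse from a head, merge chainable nodes, cut at non-chainable edges. */
    void createChain(int current, int head) {
        if (builtHeads.contains(head)) {
            return; // this chain was already built via another path
        }
        vertices.computeIfAbsent(head, k -> new ArrayList<>()).add(current);
        for (Edge edge : outEdges.get(current)) {
            if (edge.chainable) {
                createChain(edge.target, head); // the same chain continues
            } else {
                jobEdges.add(head + "->" + edge.target); // non-chainable output becomes a JobEdge
                createChain(edge.target, edge.target); // the target starts a new chain
            }
        }
        if (current == head) {
            builtHeads.add(head); // the head is finished last, after its whole chain
        }
    }

    public static void main(String[] args) {
        ChainingSketch graph = new ChainingSketch();
        graph.addEdge(1, 2, true); // source -> map: chainable
        graph.addEdge(2, 3, false); // map -> window: not chainable (for example a keyBy shuffle)
        graph.addEdge(3, 4, true); // window -> sink: chainable
        graph.createChain(1, 1);
        System.out.println(graph.vertices);
        System.out.println(graph.jobEdges);
    }
}
```

        In this toy graph, nodes 1 and 2 end up in one vertex, nodes 3 and 4 in another, and a single edge connects the two heads.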

Source-code diagram of JobGraph generation:

Full code walkthrough:

2. Running StreamExecutionEnvironment.execute()

        After the StreamGraph has been generated, Flink goes on to call StreamExecutionEnvironment.execute() with that StreamGraph and proceeds to JobGraph generation.

Source-code diagram:



Source of StreamExecutionEnvironment.execute(String):

public JobExecutionResult execute(String jobName) throws Exception {
    final List<Transformation<?>> originalTransformations = new ArrayList<>(transformations);
    // Create the StreamGraph
    StreamGraph streamGraph = getStreamGraph();
    // ...
    // Execute the StreamGraph, which leads to JobGraph generation and scheduling
    return execute(streamGraph);
}

        After two more levels of calls, execution finally reaches AbstractSessionClusterExecutor.execute().

Source of StreamExecutionEnvironment.execute(StreamGraph):

public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
    // Delegate to executeAsync()
    final JobClient jobClient = executeAsync(streamGraph);
    // ...
}

Source of StreamExecutionEnvironment.executeAsync():

public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
    // ...
    // The Executor runs the StreamGraph asynchronously; its concrete implementation depends on
    // the deployment mode (AbstractSessionClusterExecutor is used as the example here)
    CompletableFuture<JobClient> jobClientFuture =
            executor.execute(streamGraph, configuration, userClassloader);
    // ...
}

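        The Executor runs the StreamGraph asynchronously, and its concrete implementation depends on the deployment mode. Taking AbstractSessionClusterExecutor (Flink's Yarn Session mode) as the example, execution enters AbstractSessionClusterExecutor.execute().

        AbstractSessionClusterExecutor.execute() is the key method of client-side scheduling: it calls PipelineExecutorUtils.getJobGraph() to generate the JobGraph, creates a ClusterClient, and uses that client to submit the JobGraph to the Flink cluster on Yarn.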

Source of AbstractSessionClusterExecutor.execute():

public CompletableFuture<JobClient> execute(
        @Nonnull final Pipeline pipeline,
        @Nonnull final Configuration configuration,
        @Nonnull final ClassLoader userCodeClassloader)
        throws Exception {
    // Generate the JobGraph
    final JobGraph jobGraph =
            PipelineExecutorUtils.getJobGraph(pipeline, configuration, userCodeClassloader);
    // ...
    // Create the ClusterClient used to submit the JobGraph to the cluster
    ClusterClient<ClusterID> clusterClient = clusterClientProvider.getClusterClient();
    // The client submits the JobGraph to the cluster
    clusterClient.submitJob(jobGraph)
    // ...
}

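3. Entering the JobGraph Generation Logic

        AbstractSessionClusterExecutor.execute() calls PipelineExecutorUtils.getJobGraph(), which is the entry point of the JobGraph generation logic.

Source-code diagram:

        PipelineExecutorUtils first obtains the execution configuration, then generates the JobGraph through FlinkPipelineTranslationUtil.getJobGraph(), and finally sets the jars, classpaths and savepoint restore settings on the JobGraph.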

Source of PipelineExecutorUtils.getJobGraph():

public static JobGraph getJobGraph(
        @Nonnull final Pipeline pipeline,
        @Nonnull final Configuration configuration,
        @Nonnull ClassLoader userClassloader)
        throws MalformedURLException {
    checkNotNull(pipeline);
    checkNotNull(configuration);

    // Obtain the execution config accessor (wraps the configuration)
    final ExecutionConfigAccessor executionConfigAccessor =
            ExecutionConfigAccessor.fromConfiguration(configuration);
    // Generate the JobGraph through FlinkPipelineTranslationUtil
    final JobGraph jobGraph =
            FlinkPipelineTranslationUtil.getJobGraph(
                    userClassloader,
                    pipeline,
                    configuration,
                    executionConfigAccessor.getParallelism());
    // ...
    // Set jars, classpaths and savepoint restore settings on the JobGraph
    jobGraph.addJars(executionConfigAccessor.getJars());
    jobGraph.setClasspaths(executionConfigAccessor.getClasspaths());
    jobGraph.setSavepointRestoreSettings(executionConfigAccessor.getSavepointRestoreSettings());

    // Return the JobGraph
    return jobGraph;
}

        During JobGraph generation, FlinkPipelineTranslationUtil.getJobGraph() goes through several more levels of calls and eventually creates a StreamingJobGraphGenerator, which generates the JobGraph.

Source of FlinkPipelineTranslationUtil.getJobGraph():

public static JobGraph getJobGraph(
        ClassLoader userClassloader,
        Pipeline pipeline,
        Configuration optimizerConfiguration,
        int defaultParallelism) {
    // Obtain the pipeline translator (the Pipeline here is the StreamGraph)
    FlinkPipelineTranslator pipelineTranslator =
            getPipelineTranslator(userClassloader, pipeline);

    // The translator generates the JobGraph; there are several implementations,
    // and stream processing uses StreamGraphTranslator
    JobGraph jobGraph =
            pipelineTranslator.translateToJobGraph(
                    pipeline, optimizerConfiguration, defaultParallelism);

    optimizerConfiguration
            .getOptional(PipelineOptions.PARALLELISM_OVERRIDES)
            .ifPresent(
                    map ->
                            jobGraph.getJobConfiguration()
                                    .set(PipelineOptions.PARALLELISM_OVERRIDES, map));

    return jobGraph;
}

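        FlinkPipelineTranslator has several implementations: stream processing uses StreamGraphTranslator, while batch (DataSet Plan) programs use PlanTranslator. This article analyzes JobGraph generation for stream processing, so we step into StreamGraphTranslator.translateToJobGraph().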
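
        The selection of a translator by pipeline type can be sketched as a simple "first candidate that can translate wins" loop. This is a minimal illustration only: Pipeline, StreamPipeline, BatchPlan, Translator and the two translator classes below are hypothetical stand-ins written for this example, not Flink's classes, and the returned strings merely stand in for a JobGraph.

```java
import java.util.Arrays;
import java.util.List;

/** Toy model of translator selection. All types here are hypothetical stand-ins. */
class TranslatorSelectionSketch {

    interface Pipeline {}

    static final class StreamPipeline implements Pipeline {}

    static final class BatchPlan implements Pipeline {}

    interface Translator {
        boolean canTranslate(Pipeline pipeline);

        String translate(Pipeline pipeline);
    }

    static final class StreamTranslator implements Translator {
        public boolean canTranslate(Pipeline pipeline) {
            return pipeline instanceof StreamPipeline;
        }

        public String translate(Pipeline pipeline) {
            return "JobGraph(from stream pipeline)";
        }
    }

    static final class BatchTranslator implements Translator {
        public boolean canTranslate(Pipeline pipeline) {
            return pipeline instanceof BatchPlan;
        }

        public String translate(Pipeline pipeline) {
            return "JobGraph(from batch plan)";
        }
    }

    /** Returns the first translator that accepts the pipeline, or fails loudly. */
    static Translator select(Pipeline pipeline) {
        List<Translator> candidates = Arrays.asList(new BatchTranslator(), new StreamTranslator());
        for (Translator candidate : candidates) {
            if (candidate.canTranslate(pipeline)) {
                return candidate;
            }
        }
        throw new IllegalArgumentException(
                "No translator for " + pipeline.getClass().getSimpleName());
    }

    public static void main(String[] args) {
        Pipeline pipeline = new StreamPipeline();
        System.out.println(select(pipeline).translate(pipeline));
    }
}
```

        Keeping the type check inside each translator means the caller never needs to know which pipeline type it is holding.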

        StreamGraphTranslator.translateToJobGraph() calls StreamGraph.getJobGraph() to generate the JobGraph.

Source of StreamGraphTranslator.translateToJobGraph():

public JobGraph translateToJobGraph(
        Pipeline pipeline, Configuration optimizerConfiguration, int defaultParallelism) {
    checkArgument(
            pipeline instanceof StreamGraph, "Given pipeline is not a DataStream StreamGraph.");

    // Obtain the StreamGraph
    StreamGraph streamGraph = (StreamGraph) pipeline;
    // Call StreamGraph.getJobGraph() to generate the JobGraph
    return streamGraph.getJobGraph(userClassloader, null);
}

        StreamGraph.getJobGraph() in turn calls StreamingJobGraphGenerator.createJobGraph() to generate the JobGraph.

Source of StreamGraph.getJobGraph():

public JobGraph getJobGraph(ClassLoader userClassLoader, @Nullable JobID jobID) {
    // The JobGraph is ultimately generated by StreamingJobGraphGenerator
    return StreamingJobGraphGenerator.createJobGraph(userClassLoader, this, jobID);
}

        Finally, the static StreamingJobGraphGenerator.createJobGraph() method creates a StreamingJobGraphGenerator instance and uses that instance to generate the JobGraph.

Source of the static StreamingJobGraphGenerator.createJobGraph():

public static JobGraph createJobGraph(
        ClassLoader userClassLoader, StreamGraph streamGraph, @Nullable JobID jobID) {
    // TODO Currently, we construct a new thread pool for the compilation of each job. In the
    // future, we may refactor the job submission framework and make it reusable across jobs.
    // Thread pool used for serialization
    final ExecutorService serializationExecutor =
            Executors.newFixedThreadPool(
                    Math.max(
                            1,
                            Math.min(
                                    Hardware.getNumberCPUCores(),
                                    streamGraph.getExecutionConfig().getParallelism())),
                    new ExecutorThreadFactory("flink-operator-serialization-io"));
    try {
        // Create a StreamingJobGraphGenerator instance and call its createJobGraph()
        return new StreamingJobGraphGenerator(
                        userClassLoader, streamGraph, jobID, serializationExecutor)
                .createJobGraph();
    } finally {
        serializationExecutor.shutdown();
    }
}
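
        The pool-sizing expression and the try/finally shutdown in the method above can be tried out in isolation with plain JDK classes. The sketch below is an illustration only: SerializationPoolSketch, poolSize and withPool are hypothetical names, and the reading that Math.max(1, ...) protects against a non-positive (unset) parallelism is my assumption rather than something stated in the source.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

/** JDK-only sketch of the pool sizing and shutdown pattern; not Flink code. */
class SerializationPoolSketch {

    /** At least one thread, and never more than the CPU cores or the job parallelism. */
    static int poolSize(int cpuCores, int parallelism) {
        return Math.max(1, Math.min(cpuCores, parallelism));
    }

    /** Runs the work with a fixed pool and always shuts the pool down afterwards. */
    static <T> T withPool(int size, Function<ExecutorService, T> work) {
        ExecutorService pool = Executors.newFixedThreadPool(size);
        try {
            return work.apply(pool);
        } finally {
            pool.shutdown(); // runs even if the work throws
        }
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        int size = poolSize(cores, 4);
        String result = withPool(size, pool -> "built with " + size + " thread(s)");
        System.out.println(result);
    }
}
```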

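4. Generating the JobGraph with StreamingJobGraphGenerator

        The createJobGraph() instance method of StreamingJobGraphGenerator is where the JobGraph is actually built. The detailed logic is shown in the source-code diagram below.

Source-code diagram:

        We start with the createJobGraph() method of StreamingJobGraphGenerator, which contains every key step of JobGraph creation. It first builds the operator chains, linking chainable nodes and creating the JobVertex nodes. It then creates JobEdges for the non-chainable outputs to connect upstream and downstream JobVertex nodes, and serializes the operator information into each JobVertex. Finally it configures the SlotSharingGroup, Checkpoint, SavepointRestore and ExecutionConfig settings of the JobGraph, which completes its creation.

Source-code diagram: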

Source of StreamingJobGraphGenerator.createJobGraph():

private JobGraph createJobGraph() {
    // Validate checkpointing and set the JobType and the dynamic flag
    preValidate();
    jobGraph.setJobType(streamGraph.getJobType());
    jobGraph.setDynamic(streamGraph.isDynamic());

    jobGraph.enableApproximateLocalRecovery(
            streamGraph.getCheckpointConfig().isApproximateLocalRecoveryEnabled());

    // Generate deterministic hashes for the nodes in order to identify them across
    // submission iff they didn't change.
    // Create the hash that identifies each node
    Map<Integer, byte[]> hashes =
            defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);

    // Generate legacy version hashes for backwards compatibility
    List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
    for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
        legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
    }

    // Chain the chainable nodes together
    setChaining(hashes, legacyHashes);

    if (jobGraph.isDynamic()) {
        setVertexParallelismsForDynamicGraphIfNecessary();
    }

    // Note that we set all the non-chainable outputs configuration here because the
    // "setVertexParallelismsForDynamicGraphIfNecessary" may affect the parallelism of job
    // vertices and partition-reuse
    final Map<Integer, Map<StreamEdge, NonChainedOutput>> opIntermediateOutputs =
            new HashMap<>();
    // Serialize every output that cannot be chained
    setAllOperatorNonChainedOutputsConfigs(opIntermediateOutputs);
    // Traverse the non-chained outputs of each JobVertex and create the JobEdges
    setAllVertexNonChainedOutputsConfigs(opIntermediateOutputs);

    // Set the physical edges: the in-edges of each JobVertex are also serialized
    // into the StreamConfig of that JobVertex
    setPhysicalEdges();

    markSupportingConcurrentExecutionAttempts();

    validateHybridShuffleExecuteInBatchMode();

    // Assign each JobVertex to its SlotSharingGroup
    setSlotSharingAndCoLocation();

    setManagedMemoryFraction(
            Collections.unmodifiableMap(jobVertices),
            Collections.unmodifiableMap(vertexConfigs),
            Collections.unmodifiableMap(chainedConfigs),
            id -> streamGraph.getStreamNode(id).getManagedMemoryOperatorScopeUseCaseWeights(),
            id -> streamGraph.getStreamNode(id).getManagedMemorySlotScopeUseCases());

    // Configure checkpointing
    configureCheckpointing();

    // Configure the savepoint restore settings
    jobGraph.setSavepointRestoreSettings(streamGraph.getSavepointRestoreSettings());

    final Map<String, DistributedCache.DistributedCacheEntry> distributedCacheEntries =
            JobGraphUtils.prepareUserArtifactEntries(
                    streamGraph.getUserArtifacts().stream()
                            .collect(Collectors.toMap(e -> e.f0, e -> e.f1)),
                    jobGraph.getJobID());

    for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry :
            distributedCacheEntries.entrySet()) {
        jobGraph.addUserArtifact(entry.getKey(), entry.getValue());
    }

    // Configure the ExecutionConfig
    // set the ExecutionConfig last when it has been finalized
    try {
        jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
    } catch (IOException e) {
        throw new IllegalConfigurationException(
                "Could not serialize the ExecutionConfig."
                        + "This indicates that non-serializable types (like custom serializers) were registered");
    }
    jobGraph.setJobConfiguration(streamGraph.getJobConfiguration());

    addVertexIndexPrefixInVertexName();

    setVertexDescription();

    // For each chain head, serialize its operator chain into the vertex configuration
    // Wait for the serialization of operator coordinators and stream config.
    try {
        FutureUtils.combineAll(
                        vertexConfigs.values().stream()
                                .map(
                                        config ->
                                                config.triggerSerializationAndReturnFuture(
                                                        serializationExecutor))
                                .collect(Collectors.toList()))
                .get();

        waitForSerializationFuturesAndUpdateJobVertices();
    } catch (Exception e) {
        throw new FlinkRuntimeException("Error in serialization.", e);
    }

    if (!streamGraph.getJobStatusHooks().isEmpty()) {
        jobGraph.setJobStatusHooks(streamGraph.getJobStatusHooks());
    }

    // Return the finished JobGraph
    return jobGraph;
}
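
        The last try block of the method above triggers one asynchronous serialization per vertex config and then blocks until all of them have finished. A rough JDK-only analogue of that pattern is shown below. It is a sketch under assumptions: CombineAllSketch and serializeAll are hypothetical names, strings stand in for StreamConfig objects, and CompletableFuture.allOf is used here in place of Flink's FutureUtils.combineAll.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** JDK-only sketch of "trigger all serializations, then wait for all"; not Flink code. */
class CombineAllSketch {

    /** Starts one async task per config on the pool and waits until every task is done. */
    static List<String> serializeAll(List<String> configs, ExecutorService pool) {
        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (String config : configs) {
            // The string concatenation stands in for the real serialization work
            futures.add(CompletableFuture.supplyAsync(() -> "serialized(" + config + ")", pool));
        }
        // Block until all futures have completed; rethrows if any of them failed
        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).join();

        List<String> results = new ArrayList<>();
        for (CompletableFuture<String> future : futures) {
            results.add(future.join()); // already complete, so this does not block
        }
        return results;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            System.out.println(serializeAll(Arrays.asList("head-1", "head-3"), pool));
        } finally {
            pool.shutdown();
        }
    }
}
```

        Results are collected in the order the tasks were submitted, independent of which task finishes first.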

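5. Recursively Creating Operator Chains

        Within createJobGraph() of StreamingJobGraphGenerator, setChaining(hashes, legacyHashes) is the key step of JobGraph generation: it recursively creates the operator chains and the JobVertex nodes.

        Flink recurses through the StreamGraph from the Source nodes toward the Sinks, checks whether each out-edge of every StreamNode is chainable, and merges chainable nodes step by step into operator chains.

Source-code diagram:

        In StreamingJobGraphGenerator.setChaining(), which creates the operator chains, Flink collects all source operators, recursively traverses all operators from source to sink, and chains the operators that can be chained.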

Source of StreamingJobGraphGenerator.setChaining():

private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes) {
    // we separate out the sources that run as inputs to another operator (chained inputs)
    // from the sources that needs to run as the main (head) operator.
    // Collect all source operators
    final Map<Integer, OperatorChainInfo> chainEntryPoints =
            buildChainedInputsAndGetHeadInputs(hashes, legacyHashes);
    // Sort by key and convert the values to a list
    final Collection<OperatorChainInfo> initialEntryPoints =
            chainEntryPoints.entrySet().stream()
                    .sorted(Comparator.comparing(Map.Entry::getKey))
                    .map(Map.Entry::getValue)
                    .collect(Collectors.toList());

    // Recursively traverse all operators from source to sink and chain the chainable ones
    // iterate over a copy of the values, because this map gets concurrently modified
    for (OperatorChainInfo info : initialEntryPoints) {
        createChain(
                info.getStartNodeId(),
                1, // operators start at position 1 because 0 is for chained source inputs
                info,
                chainEntryPoints);
    }
}

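        The source-to-sink recursion that builds the operator chains is performed by StreamingJobGraphGenerator.createChain().

        createChain() iterates over every out-edge of the current StreamNode and checks whether it is chainable. A chainable edge extends the current chain, while a non-chainable edge makes its target the head of a new chain. In this way the recursion visits every StreamNode from source to sink.

        If the node reached by the recursion is the start node of the current traversal, createJobVertex() is called to create the chain-head JobVertex; otherwise only a StreamConfig is created to record the operator. For a chain-head JobVertex, the StreamConfig of every node in the chain is serialized and stored in the configuration of the head JobVertex.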

Source of StreamingJobGraphGenerator.createChain():

private List<StreamEdge> createChain(
        final Integer currentNodeId,
        final int chainIndex,
        final OperatorChainInfo chainInfo,
        final Map<Integer, OperatorChainInfo> chainEntryPoints) {

    // Id of the start node of the current traversal
    Integer startNodeId = chainInfo.getStartNodeId();
    // Only build the chain if no JobVertex has been created for the start node yet
    if (!builtVertices.contains(startNodeId)) {
        // Initialize the working collections
        List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();
        List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
        List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();

        // Get the current node
        StreamNode currentNode = streamGraph.getStreamNode(currentNodeId);

        // Check every out-edge of the current node for chainability
        for (StreamEdge outEdge : currentNode.getOutEdges()) {
            if (isChainable(outEdge, streamGraph)) {
                chainableOutputs.add(outEdge);
            } else {
                nonChainableOutputs.add(outEdge);
            }
        }

        // Chainable edges: extend the chain, collect the transitive out-edges, and call
        // createChain() recursively to check whether the downstream node is chainable
        for (StreamEdge chainable : chainableOutputs) {
            transitiveOutEdges.addAll(
                    createChain(
                            chainable.getTargetId(),
                            chainIndex + 1,
                            chainInfo,
                            chainEntryPoints));
        }

        // Non-chainable edges: the target becomes a new chain head; add the edge to the
        // transitive out-edges and call createChain() recursively on the downstream node
        for (StreamEdge nonChainable : nonChainableOutputs) {
            transitiveOutEdges.add(nonChainable);
            createChain(
                    nonChainable.getTargetId(),
                    1, // operators start at position 1 because 0 is for chained source inputs
                    chainEntryPoints.computeIfAbsent(
                            nonChainable.getTargetId(),
                            (k) -> chainInfo.newChain(nonChainable.getTargetId())),
                    chainEntryPoints);
        }

        chainedNames.put(
                currentNodeId,
                createChainedName(
                        currentNodeId,
                        chainableOutputs,
                        Optional.ofNullable(chainEntryPoints.get(currentNodeId))));
        chainedMinResources.put(
                currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs));
        chainedPreferredResources.put(
                currentNodeId,
                createChainedPreferredResources(currentNodeId, chainableOutputs));

        // Add the current node (identified by its hash) to the chain info of the chain head
        OperatorID currentOperatorId =
                chainInfo.addNodeToChain(
                        currentNodeId,
                        streamGraph.getStreamNode(currentNodeId).getOperatorName());

        if (currentNode.getInputFormat() != null) {
            getOrCreateFormatContainer(startNodeId)
                    .addInputFormat(currentOperatorId, currentNode.getInputFormat());
        }

        if (currentNode.getOutputFormat() != null) {
            getOrCreateFormatContainer(startNodeId)
                    .addOutputFormat(currentOperatorId, currentNode.getOutputFormat());
        }

        // If the current node is the start node of this traversal, call createJobVertex() to
        // create the JobVertex; otherwise only create a StreamConfig to record the operator
        StreamConfig config =
                currentNodeId.equals(startNodeId)
                        ? createJobVertex(startNodeId, chainInfo)
                        : new StreamConfig(new Configuration());

        // Configure the current node
        tryConvertPartitionerForDynamicGraph(chainableOutputs, nonChainableOutputs);

        setOperatorConfig(currentNodeId, config, chainInfo.getChainedSources());

        // Configure and serialize the chainable out-edges of the current node
        setOperatorChainedOutputsConfig(config, chainableOutputs);

        // Cache the outputs that were not chained
        // we cache the non-chainable outputs here, and set the non-chained config later
        opNonChainableOutputsCache.put(currentNodeId, nonChainableOutputs);

        // Chaining is done; write the chain settings depending on whether this node is the head
        if (currentNodeId.equals(startNodeId)) {
            // Record the non-chainable edges
            chainInfo.setTransitiveOutEdges(transitiveOutEdges);
            chainInfos.put(startNodeId, chainInfo);

            // Mark the node as chain head
            config.setChainStart();
            config.setChainIndex(chainIndex);
            config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
            // Write the StreamConfig of every child node into the head node's
            // chainedTaskFutures (Map)
            config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));

        } else {
            chainedConfigs.computeIfAbsent(
                    startNodeId, k -> new HashMap<Integer, StreamConfig>());

            config.setChainIndex(chainIndex);
            StreamNode node = streamGraph.getStreamNode(currentNodeId);
            config.setOperatorName(node.getOperatorName());
            chainedConfigs.get(startNodeId).put(currentNodeId, config);
        }

        config.setOperatorID(currentOperatorId);

        if (chainableOutputs.isEmpty()) {
            config.setChainEnd();
        }
        return transitiveOutEdges;

    } else {
        return new ArrayList<>();
    }
}

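        Here, isChainable(outEdge, streamGraph) is the code that decides whether two operators can be chained. StreamingJobGraphGenerator.isChainable() first checks that the downstream node has exactly one in-edge, and then calls isChainableInput() for the remaining checks.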

Source of StreamingJobGraphGenerator.isChainable():

public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
    StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);
    // Chainable only if the downstream node has exactly one in-edge
    // and isChainableInput() also passes
    return downStreamVertex.getInEdges().size() == 1 && isChainableInput(edge, streamGraph);
}

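        StreamingJobGraphGenerator.isChainableInput() checks that chaining is enabled for the job, that the upstream and downstream operators are in the same slot sharing group, that their ChainingStrategy values allow chaining (HEAD or ALWAYS for the upstream operator, ALWAYS for the downstream operator), and that the Partitioner is a forward partitioner. It also rejects union inputs, which currently only work through the network stack. If all conditions hold, the two nodes can be chained.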

Source of StreamingJobGraphGenerator.isChainableInput():

private static boolean isChainableInput(StreamEdge edge, StreamGraph streamGraph) {
    StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
    StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);

    // Chaining must be enabled on the StreamGraph
    if (!(streamGraph.isChainingEnabled()
            // Upstream and downstream operators must share the same slot sharing group
            && upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
            // Check the ChainingStrategy: HEAD or ALWAYS upstream, ALWAYS downstream
            && areOperatorsChainable(upStreamVertex, downStreamVertex, streamGraph)
            // Check that the Partitioner is a forward partitioner
            && arePartitionerAndExchangeModeChainable(
                    edge.getPartitioner(), edge.getExchangeMode(), streamGraph.isDynamic()))) {

        return false;
    }

    // check that we do not have a union operation, because unions currently only work
    // through the network/byte-channel stack.
    // we check that by testing that each "type" (which means input position) is used only once
    for (StreamEdge inEdge : downStreamVertex.getInEdges()) {
        if (inEdge != edge && inEdge.getTypeNumber() == edge.getTypeNumber()) {
            return false;
        }
    }
    return true;
}
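
        The conditions described above can be condensed into a single predicate on a toy model. This is a simplified sketch, not Flink's logic: ChainabilitySketch, Node, Edge and the three-value Strategy enum are hypothetical types made up for the example, the partitioner check is reduced to a boolean flag, and the union (type number) check and exchange-mode handling are left out.

```java
/** Toy chainability predicate. The types are hypothetical and heavily simplified. */
class ChainabilitySketch {

    enum Strategy {
        ALWAYS,
        HEAD,
        NEVER
    }

    static final class Node {
        final String slotSharingGroup;
        final Strategy strategy;
        final int inEdgeCount;

        Node(String slotSharingGroup, Strategy strategy, int inEdgeCount) {
            this.slotSharingGroup = slotSharingGroup;
            this.strategy = strategy;
            this.inEdgeCount = inEdgeCount;
        }
    }

    static final class Edge {
        final Node upstream;
        final Node downstream;
        final boolean forwardPartitioner;

        Edge(Node upstream, Node downstream, boolean forwardPartitioner) {
            this.upstream = upstream;
            this.downstream = downstream;
            this.forwardPartitioner = forwardPartitioner;
        }
    }

    /** All conditions must hold for the two nodes to be merged into one chain. */
    static boolean isChainable(Edge edge, boolean chainingEnabled) {
        return edge.downstream.inEdgeCount == 1 // downstream has exactly one input
                && chainingEnabled // chaining is enabled for the job
                && edge.upstream.slotSharingGroup.equals(edge.downstream.slotSharingGroup)
                && edge.upstream.strategy != Strategy.NEVER // HEAD or ALWAYS upstream
                && edge.downstream.strategy == Strategy.ALWAYS
                && edge.forwardPartitioner; // data is forwarded, not shuffled
    }

    public static void main(String[] args) {
        Node source = new Node("default", Strategy.HEAD, 0);
        Node map = new Node("default", Strategy.ALWAYS, 1);
        System.out.println(isChainable(new Edge(source, map, true), true));
        System.out.println(isChainable(new Edge(source, map, false), true));
    }
}
```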

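        Back in StreamingJobGraphGenerator.createChain(): once chainability has been decided, Flink keeps recursing into the downstream nodes. For a non-chainable edge, chainIndex is reset to 1, which starts a new operator chain. For a chainable edge, chainIndex + 1 is passed down and the recursion continues building the current chain.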

Excerpt of StreamingJobGraphGenerator.createChain():

private List<StreamEdge> createChain(
        final Integer currentNodeId,
        final int chainIndex,
        final OperatorChainInfo chainInfo,
        final Map<Integer, OperatorChainInfo> chainEntryPoints) {
    // ...
    // Check every out-edge of the current node for chainability
    for (StreamEdge outEdge : currentNode.getOutEdges()) {
        if (isChainable(outEdge, streamGraph)) {
            chainableOutputs.add(outEdge);
        } else {
            nonChainableOutputs.add(outEdge);
        }
    }

    // Chainable edges: extend the chain, collect the transitive out-edges, and call
    // createChain() recursively to check whether the downstream node is chainable
    for (StreamEdge chainable : chainableOutputs) {
        transitiveOutEdges.addAll(
                createChain(
                        chainable.getTargetId(),
                        chainIndex + 1,
                        chainInfo,
                        chainEntryPoints));
    }

    // Non-chainable edges: the target becomes a new chain head; add the edge to the
    // transitive out-edges and call createChain() recursively on the downstream node
    for (StreamEdge nonChainable : nonChainableOutputs) {
        transitiveOutEdges.add(nonChainable);
        createChain(
                nonChainable.getTargetId(),
                1, // operators start at position 1 because 0 is for chained source inputs
                chainEntryPoints.computeIfAbsent(
                        nonChainable.getTargetId(),
                        (k) -> chainInfo.newChain(nonChainable.getTargetId())),
                chainEntryPoints);
    }

    // If the current node is the start node of this recursion, call createJobVertex() to
    // create the JobVertex; otherwise only create a StreamConfig to record the operator
    StreamConfig config =
            currentNodeId.equals(startNodeId)
                    ? createJobVertex(startNodeId, chainInfo)
                    : new StreamConfig(new Configuration());
    // ...
}
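
        The way chainIndex evolves along a linear pipeline can be shown with a few lines of plain Java. This is an illustrative sketch only: ChainIndexSketch and chainIndexes are hypothetical names, and the pipeline is reduced to a list of flags saying whether each edge to the next operator is chainable.

```java
import java.util.Arrays;

/** Toy illustration of how chainIndex restarts at 1 for every new chain; not Flink code. */
class ChainIndexSketch {

    /**
     * chainableToNext[i] says whether the edge from operator i to operator i + 1 is chainable.
     * Returns the chain index of every operator in the pipeline.
     */
    static int[] chainIndexes(boolean[] chainableToNext) {
        int[] indexes = new int[chainableToNext.length + 1];
        indexes[0] = 1; // the first operator is a chain head at position 1
        for (int i = 0; i < chainableToNext.length; i++) {
            // chainable edge: continue the chain; otherwise start a new chain at 1
            indexes[i + 1] = chainableToNext[i] ? indexes[i] + 1 : 1;
        }
        return indexes;
    }

    public static void main(String[] args) {
        // source -> map (chainable), map -> window (not chainable), window -> sink (chainable)
        int[] indexes = chainIndexes(new boolean[] {true, false, true});
        System.out.println(Arrays.toString(indexes));
    }
}
```

        Every operator whose index is 1 is a chain head, so in this example the source and the window operator would each get their own JobVertex.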

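        If the node being visited is the chain head (the start node of the current recursion), createJobVertex() is called to create the JobVertex; otherwise only a StreamConfig is created to record the operator.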

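6. Creating the Chain-Head JobVertex

        If the node being visited is a chain head, Flink creates a JobVertex that wraps the operator information and computation logic, and stores the node and chain information in the data structures of StreamingJobGraphGenerator.

        The chain-head JobVertex is created by StreamingJobGraphGenerator.createJobVertex(). The method creates the JobVertex instance, configures it, and adds it to the JobGraph.

Source-code diagram:

        During creation, Flink checks whether the node is a source/sink node that carries an InputFormat or OutputFormat. If so, it creates an InputOutputFormatVertex; otherwise it creates a plain JobVertex.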

Source of StreamingJobGraphGenerator.createJobVertex():

private StreamConfig createJobVertex(Integer streamNodeId, OperatorChainInfo chainInfo) {
    // ...
    // Depending on whether the StreamNode is a source/sink node with an input/output format,
    // create an InputOutputFormatVertex or a plain JobVertex
    // Source/sink node
    if (chainedInputOutputFormats.containsKey(streamNodeId)) {
        // Create an InputOutputFormatVertex
        jobVertex =
                new InputOutputFormatVertex(
                        chainedNames.get(streamNodeId), jobVertexId, operatorIDPairs);
        chainedInputOutputFormats
                .get(streamNodeId)
                .write(new TaskConfig(jobVertex.getConfiguration()));
    } else {
        // Regular operator: create a plain JobVertex
        jobVertex = new JobVertex(chainedNames.get(streamNodeId), jobVertexId, operatorIDPairs);
    }
    // ...
    // Configure the JobVertex
    jobVertex.addIntermediateDataSetIdToConsume(streamNode.getConsumeClusterDatasetId());
    jobVertex.setResources(
            chainedMinResources.get(streamNodeId), chainedPreferredResources.get(streamNodeId));
    jobVertex.setInvokableClass(streamNode.getJobVertexClass());
    jobVertex.setParallelism(parallelism);
    jobVertex.setMaxParallelism(streamNode.getMaxParallelism());

    // Add the JobVertex to the JobGraph
    jobVertices.put(streamNodeId, jobVertex);
    builtVertices.add(streamNodeId);
    jobGraph.addVertex(jobVertex);

    jobVertex.setParallelismConfigured(
            chainInfo.getAllChainedNodes().stream()
                    .anyMatch(StreamNode::isParallelismConfigured));

    // The returned StreamConfig wraps the configuration of the JobVertex
    return new StreamConfig(jobVertex.getConfiguration());
}

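        After StreamingJobGraphGenerator.createJobVertex() has created the JobVertex, control returns to StreamingJobGraphGenerator.createChain(), which configures and serializes the JobVertex further. The operator-chain information of the chain head is recorded in the StreamConfig of the head node.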

Excerpt of StreamingJobGraphGenerator.createChain():

private List<StreamEdge> createChain(
        final Integer currentNodeId,
        final int chainIndex,
        final OperatorChainInfo chainInfo,
        final Map<Integer, OperatorChainInfo> chainEntryPoints) {
        // ...
        // If the current node is the start node of this traversal, call createJobVertex() to
        // create the JobVertex; otherwise only create a StreamConfig to record the operator
        StreamConfig config =
                currentNodeId.equals(startNodeId)
                        ? createJobVertex(startNodeId, chainInfo)
                        : new StreamConfig(new Configuration());

        // Configure the current node
        tryConvertPartitionerForDynamicGraph(chainableOutputs, nonChainableOutputs);

        setOperatorConfig(currentNodeId, config, chainInfo.getChainedSources());

        // Configure and serialize the chainable out-edges of the current node
        setOperatorChainedOutputsConfig(config, chainableOutputs);

        // Cache the outputs that were not chained
        // we cache the non-chainable outputs here, and set the non-chained config later
        opNonChainableOutputsCache.put(currentNodeId, nonChainableOutputs);

        // Chaining is done; write the chain settings depending on whether this node is the head
        if (currentNodeId.equals(startNodeId)) {
            // Record the non-chainable edges
            chainInfo.setTransitiveOutEdges(transitiveOutEdges);
            chainInfos.put(startNodeId, chainInfo);

            // Mark the node as chain head
            config.setChainStart();
            config.setChainIndex(chainIndex);
            config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
            // Write the StreamConfig of every child node into the head node's
            // chainedTaskFutures (Map)
            config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));

        } else {
            chainedConfigs.computeIfAbsent(
                    startNodeId, k -> new HashMap<Integer, StreamConfig>());

            config.setChainIndex(chainIndex);
            StreamNode node = streamGraph.getStreamNode(currentNodeId);
            config.setOperatorName(node.getOperatorName());
            chainedConfigs.get(startNodeId).put(currentNodeId, config);
        }

        config.setOperatorID(currentOperatorId);

        if (chainableOutputs.isEmpty()) {
            config.setChainEnd();
        }
        return transitiveOutEdges;

    } else {
        return new ArrayList<>();
    }
}

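7. Conclusion

        At this point, the article has covered two parts of the JobGraph generation source code: the recursive creation of operator chains and the creation of chain-head JobVertex nodes. Part 2 will continue with the creation of JobEdges and IntermediateDataSets and the serialization of operator chains.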
