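Flink 1.19.0 Source Code Deep Dive - Supplement 4 - The JobGraph
1. The JobGraph
The JobGraph is the scheduling-oriented dataflow graph that Flink generates from the StreamGraph by merging chainable operators into operator chains. It describes the logical data structure of a Flink job and can be serialized together with the computation logic, so that it can be sent from the Flink client to the Flink cluster for scheduling and execution.
A JobGraph consists of JobVertex nodes that encapsulate the computation logic, IntermediateDataSet data sets that represent the intermediate results produced by the nodes, and JobEdge edges that connect the nodes.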
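The relationship between these three building blocks can be sketched with a few plain Java classes. This is only a simplified stand-in model and not the Flink API: the class names Vertex, DataSet and Edge and the helpers connect, describe and demo are hypothetical, chosen to mirror JobVertex, IntermediateDataSet and JobEdge.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for JobVertex, IntermediateDataSet and JobEdge.
// These classes are illustrative only and are not part of Flink.
final class JobGraphShapeSketch {

    static final class Vertex {
        final String name;
        final List<Edge> inputs = new ArrayList<>();      // incoming edges
        final List<DataSet> results = new ArrayList<>();  // produced data sets

        Vertex(String name) {
            this.name = name;
        }
    }

    static final class DataSet {
        final Vertex producer;
        final List<Edge> consumers = new ArrayList<>();

        DataSet(Vertex producer) {
            this.producer = producer;
        }
    }

    static final class Edge {
        final DataSet source;
        final Vertex target;

        Edge(DataSet source, Vertex target) {
            this.source = source;
            this.target = target;
        }
    }

    // Wires producer -> new data set -> new edge -> consumer.
    static Edge connect(Vertex producer, Vertex consumer) {
        DataSet dataSet = new DataSet(producer);
        producer.results.add(dataSet);
        Edge edge = new Edge(dataSet, consumer);
        dataSet.consumers.add(edge);
        consumer.inputs.add(edge);
        return edge;
    }

    // Follows the first output of each vertex and renders the path as text.
    static String describe(Vertex start) {
        StringBuilder sb = new StringBuilder(start.name);
        Vertex current = start;
        while (!current.results.isEmpty()) {
            DataSet dataSet = current.results.get(0);
            if (dataSet.consumers.isEmpty()) {
                break;
            }
            current = dataSet.consumers.get(0).target;
            sb.append(" -> [data set] -> ").append(current.name);
        }
        return sb.toString();
    }

    static String demo() {
        Vertex source = new Vertex("Source");
        Vertex map = new Vertex("Map");
        Vertex sink = new Vertex("Sink");
        connect(source, map);
        connect(map, sink);
        return describe(source);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this model every hop between two vertices passes through a data set and an edge, which is the same shape the real JobGraph has.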
JobGraph diagram:
2. JobVertex nodes:
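A JobVertex is a logical execution unit in the JobGraph and represents one independent operator chain in the data processing logic. Each JobVertex corresponds to a series of StreamNode operations in the StreamGraph that can be linked into a single operator chain.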
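How several StreamNodes collapse into one JobVertex can be illustrated on a linear pipeline. The sketch below is a deliberately reduced model: the Node class and the chaining rule (forward edge plus equal parallelism) are assumptions made for illustration, whereas Flink's actual chaining check considers further conditions such as the chaining strategy and the slot sharing group.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative only: groups a linear pipeline of nodes into operator chains.
final class ChainingSketch {

    // Hypothetical, simplified stand-in for a StreamNode.
    static final class Node {
        final String name;
        final int parallelism;
        final boolean forwardToNext; // true if the edge to the next node is a forward edge

        Node(String name, int parallelism, boolean forwardToNext) {
            this.name = name;
            this.parallelism = parallelism;
            this.forwardToNext = forwardToNext;
        }
    }

    // Simplified assumption: chain only across forward edges with equal parallelism.
    static boolean isChainable(Node upstream, Node downstream) {
        return upstream.forwardToNext && upstream.parallelism == downstream.parallelism;
    }

    // Each returned inner list plays the role of one JobVertex.
    static List<List<String>> buildChains(List<Node> pipeline) {
        List<List<String>> chains = new ArrayList<>();
        List<String> current = new ArrayList<>();
        for (int i = 0; i < pipeline.size(); i++) {
            Node node = pipeline.get(i);
            current.add(node.name);
            boolean isLast = i == pipeline.size() - 1;
            if (isLast || !isChainable(node, pipeline.get(i + 1))) {
                chains.add(current);
                current = new ArrayList<>();
            }
        }
        return chains;
    }

    static List<List<String>> demo() {
        List<Node> pipeline = Arrays.asList(
                new Node("Source", 2, true),
                new Node("Map", 2, false),   // a repartitioning edge follows, which breaks the chain
                new Node("Window", 2, true),
                new Node("Sink", 2, false));
        return buildChains(pipeline);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [[Source, Map], [Window, Sink]]
    }
}
```

Four nodes end up as two chains here, so the resulting graph would contain two vertices connected by a single edge.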
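A JobVertex keeps its upstream input JobEdges in inputs (List<JobEdge>) and its downstream output IntermediateDataSets in results (Map<IntermediateDataSetID, IntermediateDataSet>), which together record its logical relationship with the upstream and downstream vertices. The operator logic, the operator chain, the chainable internal edges and the non-chainable outputs are all serialized into the configuration field of the JobVertex.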
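The idea of shipping computation logic inside a configuration object can be sketched with plain Java serialization. Everything here is an assumption for illustration: the Map<String, byte[]> stands in for the vertex configuration, the key sketch.operator and the Doubler class are hypothetical, and Flink uses its own configuration classes and keys for this.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Illustrative only: stores serialized operator logic in a configuration-like map.
final class VertexConfigSketch {

    // Hypothetical key; Flink's real configuration keys differ.
    static final String OPERATOR_KEY = "sketch.operator";

    // Stand-in for user-defined operator logic.
    static final class Doubler implements Function<Integer, Integer>, Serializable {
        private static final long serialVersionUID = 1L;

        @Override
        public Integer apply(Integer value) {
            return value * 2;
        }
    }

    static byte[] serialize(Object value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new IllegalStateException("Could not serialize value", e);
        }
    }

    static Object deserialize(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Could not deserialize value", e);
        }
    }

    @SuppressWarnings("unchecked")
    static int demo() {
        // "Client side": write the operator into the vertex configuration.
        Map<String, byte[]> configuration = new HashMap<>();
        configuration.put(OPERATOR_KEY, serialize(new Doubler()));

        // "Cluster side": restore the operator from the configuration and run it.
        Function<Integer, Integer> restored =
                (Function<Integer, Integer>) deserialize(configuration.get(OPERATOR_KEY));
        return restored.apply(21);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // 42
    }
}
```

Because the vertex carries only bytes and a class name, the graph itself stays independent of the concrete user code until a task deserializes it with the user classloader.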
Complete JobVertex source:
public class JobVertex implements java.io.Serializable {

    private static final long serialVersionUID = 1L;

    private static final String DEFAULT_NAME = "(unnamed vertex)";

    public static final int MAX_PARALLELISM_DEFAULT = -1;

    // --------------------------------------------------------------------------------------------
    // Members that define the structure / topology of the graph
    // --------------------------------------------------------------------------------------------

    /** The ID of the vertex. */
    private final JobVertexID id;

    /**
     * The IDs of all operators contained in this vertex.
     *
     * <p>The ID pairs are stored depth-first post-order; for the forking chain below the ID's would
     * be stored as [D, E, B, C, A].
     *
     * <pre>
     * A - B - D
     *   \    \
     *    C    E
     * </pre>
     *
     * <p>This is the same order that operators are stored in the {@code StreamTask}.
     */
    private final List<OperatorIDPair> operatorIDs;

    // Output IntermediateDataSets
    /** Produced data sets, one per writer. */
    private final Map<IntermediateDataSetID, IntermediateDataSet> results = new LinkedHashMap<>();

    // Input JobEdges
    /** List of edges with incoming data. One per Reader. */
    private final List<JobEdge> inputs = new ArrayList<>();

    /** The list of factories for operator coordinators. */
    private final List<SerializedValue<OperatorCoordinator.Provider>> operatorCoordinators =
            new ArrayList<>();

    // Parallelism
    /** Number of subtasks to split this task into at runtime. */
    private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;

    /** Maximum number of subtasks to split this task into a runtime. */
    private int maxParallelism = MAX_PARALLELISM_DEFAULT;

    /** The minimum resource of the vertex. */
    private ResourceSpec minResources = ResourceSpec.DEFAULT;

    /** The preferred resource of the vertex. */
    private ResourceSpec preferredResources = ResourceSpec.DEFAULT;

    // Configuration holding the serialized computation information
    /** Custom configuration passed to the assigned task at runtime. */
    private Configuration configuration;

    // Class encapsulating the computation logic
    /** The class of the invokable. */
    private String invokableClassName;

    /** Indicates of this job vertex is stoppable or not. */
    private boolean isStoppable = false;

    /** Optionally, a source of input splits. */
    private InputSplitSource<?> inputSplitSource;

    /**
     * The name of the vertex. This will be shown in runtime logs and will be in the runtime
     * environment.
     */
    private String name;

    /**
     * Optionally, a sharing group that allows subtasks from different job vertices to run
     * concurrently in one slot.
     */
    @Nullable private SlotSharingGroup slotSharingGroup;

    /** The group inside which the vertex subtasks share slots. */
    @Nullable private CoLocationGroupImpl coLocationGroup;

    /**
     * Optional, the name of the operator, such as 'Flat Map' or 'Join', to be included in the JSON
     * plan.
     */
    private String operatorName;

    /**
     * Optional, the description of the operator, like 'Hash Join', or 'Sorted Group Reduce', to be
     * included in the JSON plan.
     */
    private String operatorDescription;

    /** Optional, pretty name of the operator, to be displayed in the JSON plan. */
    private String operatorPrettyName;

    /**
     * Optional, the JSON for the optimizer properties of the operator result, to be included in the
     * JSON plan.
     */
    private String resultOptimizerProperties;

    /**
     * The intermediateDataSetId of the cached intermediate dataset that the job vertex consumes.
     */
    private final List<IntermediateDataSetID> intermediateDataSetIdsToConsume = new ArrayList<>();

    /**
     * Indicates whether this job vertex supports multiple attempts of the same subtask executing at
     * the same time.
     */
    private boolean supportsConcurrentExecutionAttempts = true;

    private boolean parallelismConfigured = false;

    // --------------------------------------------------------------------------------------------

    /**
     * Constructs a new job vertex and assigns it with the given name.
     *
     * @param name The name of the new job vertex.
     */
    public JobVertex(String name) {
        this(name, null);
    }

    /**
     * Constructs a new job vertex and assigns it with the given name.
     *
     * @param name The name of the new job vertex.
     * @param id The id of the job vertex.
     */
    public JobVertex(String name, JobVertexID id) {
        this.name = name == null ? DEFAULT_NAME : name;
        this.id = id == null ? new JobVertexID() : id;
        OperatorIDPair operatorIDPair =
                OperatorIDPair.generatedIDOnly(OperatorID.fromJobVertexID(this.id));
        this.operatorIDs = Collections.singletonList(operatorIDPair);
    }

    /**
     * Constructs a new job vertex and assigns it with the given name.
     *
     * @param name The name of the new job vertex.
     * @param primaryId The id of the job vertex.
     * @param operatorIDPairs The operator ID pairs of the job vertex.
     */
    public JobVertex(String name, JobVertexID primaryId, List<OperatorIDPair> operatorIDPairs) {
        this.name = name == null ? DEFAULT_NAME : name;
        this.id = primaryId == null ? new JobVertexID() : primaryId;
        this.operatorIDs = Collections.unmodifiableList(operatorIDPairs);
    }

    // --------------------------------------------------------------------------------------------

    /**
     * Returns the ID of this job vertex.
     *
     * @return The ID of this job vertex
     */
    public JobVertexID getID() {
        return this.id;
    }

    /**
     * Returns the name of the vertex.
     *
     * @return The name of the vertex.
     */
    public String getName() {
        return this.name;
    }

    /**
     * Sets the name of the vertex.
     *
     * @param name The new name.
     */
    public void setName(String name) {
        this.name = name == null ? DEFAULT_NAME : name;
    }

    /**
     * Returns the number of produced intermediate data sets.
     *
     * @return The number of produced intermediate data sets.
     */
    public int getNumberOfProducedIntermediateDataSets() {
        return this.results.size();
    }

    /**
     * Returns the number of inputs.
     *
     * @return The number of inputs.
     */
    public int getNumberOfInputs() {
        return this.inputs.size();
    }

    public List<OperatorIDPair> getOperatorIDs() {
        return operatorIDs;
    }

    /**
     * Returns the vertex's configuration object which can be used to pass custom settings to the
     * task at runtime.
     *
     * @return the vertex's configuration object
     */
    public Configuration getConfiguration() {
        if (this.configuration == null) {
            this.configuration = new Configuration();
        }
        return this.configuration;
    }

    public void setInvokableClass(Class<? extends TaskInvokable> invokable) {
        Preconditions.checkNotNull(invokable);
        this.invokableClassName = invokable.getName();
    }

    // This method can only be called once when jobGraph generated
    public void setParallelismConfigured(boolean parallelismConfigured) {
        this.parallelismConfigured = parallelismConfigured;
    }

    public boolean isParallelismConfigured() {
        return parallelismConfigured;
    }

    /**
     * Returns the name of the invokable class which represents the task of this vertex.
     *
     * @return The name of the invokable class, <code>null</code> if not set.
     */
    public String getInvokableClassName() {
        return this.invokableClassName;
    }

    /**
     * Returns the invokable class which represents the task of this vertex.
     *
     * @param cl The classloader used to resolve user-defined classes
     * @return The invokable class, <code>null</code> if it is not set
     */
    public Class<? extends TaskInvokable> getInvokableClass(ClassLoader cl) {
        if (cl == null) {
            throw new NullPointerException("The classloader must not be null.");
        }
        if (invokableClassName == null) {
            return null;
        }

        try {
            return Class.forName(invokableClassName, true, cl).asSubclass(TaskInvokable.class);
        } catch (ClassNotFoundException e) {
            throw new RuntimeException("The user-code class could not be resolved.", e);
        } catch (ClassCastException e) {
            throw new RuntimeException(
                    "The user-code class is no subclass of " + TaskInvokable.class.getName(), e);
        }
    }

    /**
     * Gets the parallelism of the task.
     *
     * @return The parallelism of the task.
     */
    public int getParallelism() {
        return parallelism;
    }

    /**
     * Sets the parallelism for the task.
     *
     * @param parallelism The parallelism for the task.
     */
    public void setParallelism(int parallelism) {
        if (parallelism < 1 && parallelism != ExecutionConfig.PARALLELISM_DEFAULT) {
            throw new IllegalArgumentException(
                    "The parallelism must be at least one, or "
                            + ExecutionConfig.PARALLELISM_DEFAULT
                            + " (unset).");
        }
        this.parallelism = parallelism;
    }

    /**
     * Gets the maximum parallelism for the task.
     *
     * @return The maximum parallelism for the task.
     */
    public int getMaxParallelism() {
        return maxParallelism;
    }

    /**
     * Sets the maximum parallelism for the task.
     *
     * @param maxParallelism The maximum parallelism to be set. must be between 1 and
     *     Short.MAX_VALUE + 1.
     */
    public void setMaxParallelism(int maxParallelism) {
        this.maxParallelism = maxParallelism;
    }

    /**
     * Gets the minimum resource for the task.
     *
     * @return The minimum resource for the task.
     */
    public ResourceSpec getMinResources() {
        return minResources;
    }

    /**
     * Gets the preferred resource for the task.
     *
     * @return The preferred resource for the task.
     */
    public ResourceSpec getPreferredResources() {
        return preferredResources;
    }

    /**
     * Sets the minimum and preferred resources for the task.
     *
     * @param minResources The minimum resource for the task.
     * @param preferredResources The preferred resource for the task.
     */
    public void setResources(ResourceSpec minResources, ResourceSpec preferredResources) {
        this.minResources = checkNotNull(minResources);
        this.preferredResources = checkNotNull(preferredResources);
    }

    public InputSplitSource<?> getInputSplitSource() {
        return inputSplitSource;
    }

    public void setInputSplitSource(InputSplitSource<?> inputSplitSource) {
        this.inputSplitSource = inputSplitSource;
    }

    public List<IntermediateDataSet> getProducedDataSets() {
        return new ArrayList<>(results.values());
    }

    public List<JobEdge> getInputs() {
        return this.inputs;
    }

    public List<SerializedValue<OperatorCoordinator.Provider>> getOperatorCoordinators() {
        return Collections.unmodifiableList(operatorCoordinators);
    }

    public void addOperatorCoordinator(
            SerializedValue<OperatorCoordinator.Provider> serializedCoordinatorProvider) {
        operatorCoordinators.add(serializedCoordinatorProvider);
    }

    /**
     * Associates this vertex with a slot sharing group for scheduling. Different vertices in the
     * same slot sharing group can run one subtask each in the same slot.
     *
     * @param grp The slot sharing group to associate the vertex with.
     */
    public void setSlotSharingGroup(SlotSharingGroup grp) {
        checkNotNull(grp);

        if (this.slotSharingGroup != null) {
            this.slotSharingGroup.removeVertexFromGroup(this.getID());
        }

        grp.addVertexToGroup(this.getID());
        this.slotSharingGroup = grp;
    }

    /**
     * Gets the slot sharing group that this vertex is associated with. Different vertices in the
     * same slot sharing group can run one subtask each in the same slot.
     *
     * @return The slot sharing group to associate the vertex with
     */
    public SlotSharingGroup getSlotSharingGroup() {
        if (slotSharingGroup == null) {
            // create a new slot sharing group for this vertex if it was in no other slot sharing
            // group.
            // this should only happen in testing cases at the moment because production code path
            // will
            // always set a value to it before used
            setSlotSharingGroup(new SlotSharingGroup());
        }
        return slotSharingGroup;
    }

    /**
     * Tells this vertex to strictly co locate its subtasks with the subtasks of the given vertex.
     * Strict co-location implies that the n'th subtask of this vertex will run on the same parallel
     * computing instance (TaskManager) as the n'th subtask of the given vertex.
     *
     * <p>NOTE: Co-location is only possible between vertices in a slot sharing group.
     *
     * <p>NOTE: This vertex must (transitively) depend on the vertex to be co-located with. That
     * means that the respective vertex must be a (transitive) input of this vertex.
     *
     * @param strictlyCoLocatedWith The vertex whose subtasks to co-locate this vertex's subtasks
     *     with.
     * @throws IllegalArgumentException Thrown, if this vertex and the vertex to co-locate with are
     *     not in a common slot sharing group.
     * @see #setSlotSharingGroup(SlotSharingGroup)
     */
    public void setStrictlyCoLocatedWith(JobVertex strictlyCoLocatedWith) {
        if (this.slotSharingGroup == null
                || this.slotSharingGroup != strictlyCoLocatedWith.slotSharingGroup) {
            throw new IllegalArgumentException(
                    "Strict co-location requires that both vertices are in the same slot sharing group.");
        }

        CoLocationGroupImpl thisGroup = this.coLocationGroup;
        CoLocationGroupImpl otherGroup = strictlyCoLocatedWith.coLocationGroup;

        if (otherGroup == null) {
            if (thisGroup == null) {
                CoLocationGroupImpl group = new CoLocationGroupImpl(this, strictlyCoLocatedWith);
                this.coLocationGroup = group;
                strictlyCoLocatedWith.coLocationGroup = group;
            } else {
                thisGroup.addVertex(strictlyCoLocatedWith);
                strictlyCoLocatedWith.coLocationGroup = thisGroup;
            }
        } else {
            if (thisGroup == null) {
                otherGroup.addVertex(this);
                this.coLocationGroup = otherGroup;
            } else {
                // both had yet distinct groups, we need to merge them
                thisGroup.mergeInto(otherGroup);
            }
        }
    }

    @Nullable
    public CoLocationGroup getCoLocationGroup() {
        return coLocationGroup;
    }

    public void updateCoLocationGroup(CoLocationGroupImpl group) {
        this.coLocationGroup = group;
    }

    // --------------------------------------------------------------------------------------------

    public IntermediateDataSet getOrCreateResultDataSet(
            IntermediateDataSetID id, ResultPartitionType partitionType) {
        return this.results.computeIfAbsent(
                id, key -> new IntermediateDataSet(id, partitionType, this));
    }

    public JobEdge connectNewDataSetAsInput(
            JobVertex input, DistributionPattern distPattern, ResultPartitionType partitionType) {
        return connectNewDataSetAsInput(input, distPattern, partitionType, false);
    }

    public JobEdge connectNewDataSetAsInput(
            JobVertex input,
            DistributionPattern distPattern,
            ResultPartitionType partitionType,
            boolean isBroadcast) {
        return connectNewDataSetAsInput(
                input, distPattern, partitionType, new IntermediateDataSetID(), isBroadcast);
    }

    public JobEdge connectNewDataSetAsInput(
            JobVertex input,
            DistributionPattern distPattern,
            ResultPartitionType partitionType,
            IntermediateDataSetID intermediateDataSetId,
            boolean isBroadcast) {

        IntermediateDataSet dataSet =
                input.getOrCreateResultDataSet(intermediateDataSetId, partitionType);

        JobEdge edge = new JobEdge(dataSet, this, distPattern, isBroadcast);
        this.inputs.add(edge);
        dataSet.addConsumer(edge);
        return edge;
    }

    // --------------------------------------------------------------------------------------------

    public boolean isInputVertex() {
        return this.inputs.isEmpty();
    }

    public boolean isStoppable() {
        return this.isStoppable;
    }

    public boolean isOutputVertex() {
        return this.results.isEmpty();
    }

    public boolean hasNoConnectedInputs() {
        return inputs.isEmpty();
    }

    public void setSupportsConcurrentExecutionAttempts(
            boolean supportsConcurrentExecutionAttempts) {
        this.supportsConcurrentExecutionAttempts = supportsConcurrentExecutionAttempts;
    }

    public boolean isSupportsConcurrentExecutionAttempts() {
        return supportsConcurrentExecutionAttempts;
    }

    // --------------------------------------------------------------------------------------------

    /**
     * A hook that can be overwritten by sub classes to implement logic that is called by the master
     * when the job starts.
     *
     * @param context Provides contextual information for the initialization
     * @throws Exception The method may throw exceptions which cause the job to fail immediately.
     */
    public void initializeOnMaster(InitializeOnMasterContext context) throws Exception {}

    /**
     * A hook that can be overwritten by sub classes to implement logic that is called by the master
     * after the job completed.
     *
     * @param context Provides contextual information for the initialization
     * @throws Exception The method may throw exceptions which cause the job to fail immediately.
     */
    public void finalizeOnMaster(FinalizeOnMasterContext context) throws Exception {}

    public interface InitializeOnMasterContext {
        /** The class loader for user defined code. */
        ClassLoader getClassLoader();

        /**
         * The actual parallelism this vertex will be run with. In contrast, the {@link
         * #getParallelism()} is the original parallelism set when creating the {@link JobGraph} and
         * might be updated e.g. by the {@link
         * org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler}.
         */
        int getExecutionParallelism();
    }

    /** The context exposes some runtime infos for finalization. */
    public interface FinalizeOnMasterContext {
        /** The class loader for user defined code. */
        ClassLoader getClassLoader();

        /**
         * The actual parallelism this vertex will be run with. In contrast, the {@link
         * #getParallelism()} is the original parallelism set when creating the {@link JobGraph} and
         * might be updated e.g. by the {@link
         * org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler}.
         */
        int getExecutionParallelism();

        /**
         * Get the finished attempt number of subtask.
         *
         * @param subtaskIndex the subtask index.
         * @return the finished attempt.
         * @throws IllegalArgumentException Thrown, if subtaskIndex is invalid.
         */
        int getFinishedAttempt(int subtaskIndex);
    }

    // --------------------------------------------------------------------------------------------

    public String getOperatorName() {
        return operatorName;
    }

    public void setOperatorName(String operatorName) {
        this.operatorName = operatorName;
    }

    public String getOperatorDescription() {
        return operatorDescription;
    }

    public void setOperatorDescription(String operatorDescription) {
        this.operatorDescription = operatorDescription;
    }

    public void setOperatorPrettyName(String operatorPrettyName) {
        this.operatorPrettyName = operatorPrettyName;
    }

    public String getOperatorPrettyName() {
        return operatorPrettyName;
    }

    public String getResultOptimizerProperties() {
        return resultOptimizerProperties;
    }

    public void setResultOptimizerProperties(String resultOptimizerProperties) {
        this.resultOptimizerProperties = resultOptimizerProperties;
    }

    public void addIntermediateDataSetIdToConsume(IntermediateDataSetID intermediateDataSetId) {
        intermediateDataSetIdsToConsume.add(intermediateDataSetId);
    }

    public List<IntermediateDataSetID> getIntermediateDataSetIdsToConsume() {
        return intermediateDataSetIdsToConsume;
    }

    // --------------------------------------------------------------------------------------------

    @Override
    public String toString() {
        return this.name + " (" + this.invokableClassName + ')';
    }
}
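The Javadoc on operatorIDs states that the IDs are stored in depth-first post-order and gives [D, E, B, C, A] for the forking chain A - B - D / C / E. The following sketch reproduces that ordering on a tiny tree; the Op class and the postOrder helper are hypothetical names used only for this illustration and do not appear in Flink.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative only: depth-first post-order over a forking operator chain.
final class OperatorOrderSketch {

    // Hypothetical stand-in for a chained operator and its chained outputs.
    static final class Op {
        final String id;
        final List<Op> outputs;

        Op(String id, Op... outputs) {
            this.id = id;
            this.outputs = Arrays.asList(outputs);
        }
    }

    // Visits all chained outputs first, then records the operator itself.
    static void postOrder(Op op, List<String> collected) {
        for (Op output : op.outputs) {
            postOrder(output, collected);
        }
        collected.add(op.id);
    }

    static List<String> demo() {
        // A feeds B and C; B feeds D and E (the chain from the Javadoc).
        Op d = new Op("D");
        Op e = new Op("E");
        Op b = new Op("B", d, e);
        Op c = new Op("C");
        Op a = new Op("A", b, c);

        List<String> ids = new ArrayList<>();
        postOrder(a, ids);
        return ids;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [D, E, B, C, A]
    }
}
```

With this ordering the head operator of the chain always comes last in the list.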
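3. JobEdge edges:
A JobEdge is an edge that connects JobVertex nodes in the JobGraph. It defines how data flows between JobVertex nodes and how it is partitioned, determines how data is passed from the upstream operator to the downstream operator, and influences the parallel computation, data partitioning and task scheduling of the job.
JobEdge diagram: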
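A JobEdge keeps the connection between its upstream input IntermediateDataSet and its downstream output JobVertex in source (IntermediateDataSet) and target (JobVertex). The distributionPattern field records whether the upstream and downstream subtasks are connected ALL_TO_ALL or POINTWISE. In addition, the JobEdge records connection settings such as whether the edge is a forward edge (isForward) or a broadcast edge (isBroadcast).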
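The difference between the two distribution patterns can be made concrete by asking which upstream subtasks a given downstream subtask reads from. The sketch below is an approximation under stated assumptions: ALL_TO_ALL connects every producer to every consumer, while the proportional split used for POINTWISE is an illustrative rule and may differ in detail from the wiring Flink's runtime performs. The names Pattern and upstreamSubtasks are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: which producer subtasks feed one consumer subtask.
final class DistributionSketch {

    // Local stand-in for Flink's DistributionPattern enum.
    enum Pattern { ALL_TO_ALL, POINTWISE }

    static List<Integer> upstreamSubtasks(
            Pattern pattern, int producers, int consumers, int consumerIndex) {
        List<Integer> result = new ArrayList<>();
        if (pattern == Pattern.ALL_TO_ALL) {
            // Every consumer reads from every producer.
            for (int p = 0; p < producers; p++) {
                result.add(p);
            }
        } else if (producers >= consumers) {
            // Assumed rule: each consumer takes a contiguous slice of producers.
            int start = consumerIndex * producers / consumers;
            int end = (consumerIndex + 1) * producers / consumers;
            for (int p = start; p < end; p++) {
                result.add(p);
            }
        } else {
            // Assumed rule: several consumers share one producer.
            result.add(consumerIndex * producers / consumers);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(upstreamSubtasks(Pattern.ALL_TO_ALL, 2, 3, 1)); // [0, 1]
        System.out.println(upstreamSubtasks(Pattern.POINTWISE, 3, 3, 1));  // [1]
        System.out.println(upstreamSubtasks(Pattern.POINTWISE, 4, 2, 1));  // [2, 3]
        System.out.println(upstreamSubtasks(Pattern.POINTWISE, 2, 4, 3));  // [1]
    }
}
```

The practical consequence is that ALL_TO_ALL needs producers times consumers connections, whereas POINTWISE keeps the number of connections close to the larger of the two parallelisms.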
JobEdge source:
public class JobEdge implements java.io.Serializable {

    private static final long serialVersionUID = 1L;

    // Downstream JobVertex
    /** The vertex connected to this edge. */
    private final JobVertex target;

    // Connection pattern
    /** The distribution pattern that should be used for this job edge. */
    private final DistributionPattern distributionPattern;

    /** The channel rescaler that should be used for this job edge on downstream side. */
    private SubtaskStateMapper downstreamSubtaskStateMapper = SubtaskStateMapper.ROUND_ROBIN;

    /** The channel rescaler that should be used for this job edge on upstream side. */
    private SubtaskStateMapper upstreamSubtaskStateMapper = SubtaskStateMapper.ROUND_ROBIN;

    // Upstream IntermediateDataSet
    /** The data set at the source of the edge, may be null if the edge is not yet connected. */
    private final IntermediateDataSet source;

    /**
     * Optional name for the data shipping strategy (forward, partition hash, rebalance, ...), to be
     * displayed in the JSON plan.
     */
    private String shipStrategyName;

    // Whether this is a broadcast edge
    private final boolean isBroadcast;

    // Whether this is a forward edge
    private boolean isForward;

    /**
     * Optional name for the pre-processing operation (sort, combining sort, ...), to be displayed
     * in the JSON plan.
     */
    private String preProcessingOperationName;

    /** Optional description of the caching inside an operator, to be displayed in the JSON plan. */
    private String operatorLevelCachingDescription;

    /**
     * Constructs a new job edge, that connects an intermediate result to a consumer task.
     *
     * @param source The data set that is at the source of this edge.
     * @param target The operation that is at the target of this edge.
     * @param distributionPattern The pattern that defines how the connection behaves in parallel.
     * @param isBroadcast Whether the source broadcasts data to the target.
     */
    public JobEdge(
            IntermediateDataSet source,
            JobVertex target,
            DistributionPattern distributionPattern,
            boolean isBroadcast) {
        if (source == null || target == null || distributionPattern == null) {
            throw new NullPointerException();
        }
        this.target = target;
        this.distributionPattern = distributionPattern;
        this.source = source;
        this.isBroadcast = isBroadcast;
    }

    /**
     * Returns the data set at the source of the edge. May be null, if the edge refers to the source
     * via an ID and has not been connected.
     *
     * @return The data set at the source of the edge
     */
    public IntermediateDataSet getSource() {
        return source;
    }

    /**
     * Returns the vertex connected to this edge.
     *
     * @return The vertex connected to this edge.
     */
    public JobVertex getTarget() {
        return target;
    }

    /**
     * Returns the distribution pattern used for this edge.
     *
     * @return The distribution pattern used for this edge.
     */
    public DistributionPattern getDistributionPattern() {
        return this.distributionPattern;
    }

    /**
     * Gets the ID of the consumed data set.
     *
     * @return The ID of the consumed data set.
     */
    public IntermediateDataSetID getSourceId() {
        return source.getId();
    }

    // --------------------------------------------------------------------------------------------

    /**
     * Gets the name of the ship strategy for the represented input, like "forward", "partition
     * hash", "rebalance", "broadcast", ...
     *
     * @return The name of the ship strategy for the represented input, or null, if none was set.
     */
    public String getShipStrategyName() {
        return shipStrategyName;
    }

    /**
     * Sets the name of the ship strategy for the represented input.
     *
     * @param shipStrategyName The name of the ship strategy.
     */
    public void setShipStrategyName(String shipStrategyName) {
        this.shipStrategyName = shipStrategyName;
    }

    /** Gets whether the edge is broadcast edge. */
    public boolean isBroadcast() {
        return isBroadcast;
    }

    /** Gets whether the edge is forward edge. */
    public boolean isForward() {
        return isForward;
    }

    /** Sets whether the edge is forward edge. */
    public void setForward(boolean forward) {
        isForward = forward;
    }

    /**
     * Gets the channel state rescaler used for rescaling persisted data on downstream side of this
     * JobEdge.
     *
     * @return The channel state rescaler to use, or null, if none was set.
     */
    public SubtaskStateMapper getDownstreamSubtaskStateMapper() {
        return downstreamSubtaskStateMapper;
    }

    /**
     * Sets the channel state rescaler used for rescaling persisted data on downstream side of this
     * JobEdge.
     *
     * @param downstreamSubtaskStateMapper The channel state rescaler selector to use.
     */
    public void setDownstreamSubtaskStateMapper(SubtaskStateMapper downstreamSubtaskStateMapper) {
        this.downstreamSubtaskStateMapper = checkNotNull(downstreamSubtaskStateMapper);
    }

    /**
     * Gets the channel state rescaler used for rescaling persisted data on upstream side of this
     * JobEdge.
     *
     * @return The channel state rescaler to use, or null, if none was set.
     */
    public SubtaskStateMapper getUpstreamSubtaskStateMapper() {
        return upstreamSubtaskStateMapper;
    }

    /**
     * Sets the channel state rescaler used for rescaling persisted data on upstream side of this
     * JobEdge.
     *
     * @param upstreamSubtaskStateMapper The channel state rescaler selector to use.
     */
    public void setUpstreamSubtaskStateMapper(SubtaskStateMapper upstreamSubtaskStateMapper) {
        this.upstreamSubtaskStateMapper = checkNotNull(upstreamSubtaskStateMapper);
    }

    /**
     * Gets the name of the pro-processing operation for this input.
     *
     * @return The name of the pro-processing operation, or null, if none was set.
     */
    public String getPreProcessingOperationName() {
        return preProcessingOperationName;
    }

    /**
     * Sets the name of the pre-processing operation for this input.
     *
     * @param preProcessingOperationName The name of the pre-processing operation.
     */
    public void setPreProcessingOperationName(String preProcessingOperationName) {
        this.preProcessingOperationName = preProcessingOperationName;
    }

    /**
     * Gets the operator-level caching description for this input.
     *
     * @return The description of operator-level caching, or null, is none was set.
     */
    public String getOperatorLevelCachingDescription() {
        return operatorLevelCachingDescription;
    }

    /**
     * Sets the operator-level caching description for this input.
     *
     * @param operatorLevelCachingDescription The description of operator-level caching.
     */
    public void setOperatorLevelCachingDescription(String operatorLevelCachingDescription) {
        this.operatorLevelCachingDescription = operatorLevelCachingDescription;
    }

    // --------------------------------------------------------------------------------------------

    @Override
    public String toString() {
        return String.format("%s --> %s [%s]", source.getId(), target, distributionPattern.name());
    }
}
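4. IntermediateDataSet data sets:
An IntermediateDataSet represents the result data set produced by a JobVertex. It is connected to the downstream JobVertex through JobEdges and serves as the input data of the downstream operators.
IntermediateDataSet diagram: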
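An IntermediateDataSet keeps the connection between its upstream producing JobVertex and its downstream consuming JobEdges in producer (JobVertex) and consumers (List<JobEdge>). The distributionPattern field records whether the upstream and downstream subtasks are connected ALL_TO_ALL or POINTWISE, and resultType (ResultPartitionType) records the type of result partition to use at runtime.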
IntermediateDataSet source:
public class IntermediateDataSet implements java.io.Serializable {

    private static final long serialVersionUID = 1L;

    private final IntermediateDataSetID id; // the identifier

    // Upstream JobVertex
    private final JobVertex producer; // the operation that produced this data set

    // Downstream JobEdges
    // All consumers must have the same partitioner and parallelism
    private final List<JobEdge> consumers = new ArrayList<>();

    // Result partition type
    // The type of partition to use at runtime
    private final ResultPartitionType resultType;

    // Connection pattern
    private DistributionPattern distributionPattern;

    private boolean isBroadcast;

    // --------------------------------------------------------------------------------------------

    public IntermediateDataSet(
            IntermediateDataSetID id, ResultPartitionType resultType, JobVertex producer) {
        this.id = checkNotNull(id);
        this.producer = checkNotNull(producer);
        this.resultType = checkNotNull(resultType);
    }

    // --------------------------------------------------------------------------------------------

    public IntermediateDataSetID getId() {
        return id;
    }

    public JobVertex getProducer() {
        return producer;
    }

    public List<JobEdge> getConsumers() {
        return this.consumers;
    }

    public boolean isBroadcast() {
        return isBroadcast;
    }

    public DistributionPattern getDistributionPattern() {
        return distributionPattern;
    }

    public ResultPartitionType getResultType() {
        return resultType;
    }

    // --------------------------------------------------------------------------------------------

    public void addConsumer(JobEdge edge) {
        // sanity check
        checkState(id.equals(edge.getSourceId()), "Incompatible dataset id.");

        if (consumers.isEmpty()) {
            distributionPattern = edge.getDistributionPattern();
            isBroadcast = edge.isBroadcast();
        } else {
            checkState(
                    distributionPattern == edge.getDistributionPattern(),
                    "Incompatible distribution pattern.");
            checkState(isBroadcast == edge.isBroadcast(), "Incompatible broadcast type.");
        }
        consumers.add(edge);
    }

    // --------------------------------------------------------------------------------------------

    @Override
    public String toString() {
        return "Intermediate Data Set (" + id + ")";
    }
}
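The consistency rule in addConsumer, where the first consumer fixes the distribution pattern and broadcast flag and every later consumer must match, can be tried out in isolation. The sketch below mirrors that logic with hypothetical stand-in classes (Pattern, Edge, DataSet); it leaves out the data set ID check and is not the Flink implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: the first consumer defines the settings, later ones must agree.
final class ConsumerCheckSketch {

    // Local stand-in for Flink's DistributionPattern enum.
    enum Pattern { ALL_TO_ALL, POINTWISE }

    // Hypothetical stand-in for a JobEdge.
    static final class Edge {
        final Pattern pattern;
        final boolean broadcast;

        Edge(Pattern pattern, boolean broadcast) {
            this.pattern = pattern;
            this.broadcast = broadcast;
        }
    }

    // Hypothetical stand-in for an IntermediateDataSet.
    static final class DataSet {
        private final List<Edge> consumers = new ArrayList<>();
        private Pattern pattern;
        private boolean broadcast;

        void addConsumer(Edge edge) {
            if (consumers.isEmpty()) {
                // The first consumer decides pattern and broadcast flag.
                pattern = edge.pattern;
                broadcast = edge.broadcast;
            } else {
                if (pattern != edge.pattern) {
                    throw new IllegalStateException("Incompatible distribution pattern.");
                }
                if (broadcast != edge.broadcast) {
                    throw new IllegalStateException("Incompatible broadcast type.");
                }
            }
            consumers.add(edge);
        }

        int consumerCount() {
            return consumers.size();
        }
    }

    static String demo() {
        DataSet dataSet = new DataSet();
        dataSet.addConsumer(new Edge(Pattern.ALL_TO_ALL, false));
        dataSet.addConsumer(new Edge(Pattern.ALL_TO_ALL, false));
        try {
            dataSet.addConsumer(new Edge(Pattern.POINTWISE, false));
            return "accepted";
        } catch (IllegalStateException e) {
            return dataSet.consumerCount() + " consumers, rejected: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The rejected edge is never added, so the data set keeps exactly the two compatible consumers.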
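This article explains and supplements the JobGraph. For a complete walkthrough of the source code that creates the JobGraph, see "Flink 1.19.0 Source Code Deep Dive 5 - JobGraph Generation - Part 1".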