Flink is an open-source distributed unified stream and batch computing framework under the Apache Software Foundation, providing big data computing capabilities for real-time stream processing and high-throughput batch processing. This column records and shares notes from reading the Flink source code.
The Flink source code version analyzed in this article is flink-1.19.0.
1. Overview of JobGraph Generation
The previous article, 《Flink-1.19.0源码详解4-StreamGraph生成》, covered the generation of Flink's StreamGraph, walking through the full source code in which Flink traverses the Transformation collection, creates StreamNodes and StreamEdges step by step, and assembles the StreamGraph. Once the StreamGraph has been generated, Flink calls StreamExecutionEnvironment.execute(), which kicks off JobGraph generation.
Starting from the call to StreamExecutionEnvironment.execute(), this article analyzes the source code of Flink's JobGraph generation (the red part of the flow chart below): how Flink traverses each StreamNode of the StreamGraph and step by step creates JobVertex nodes, JobEdge edges, and IntermediateDataSet result sets to build the complete JobGraph.
The essence of JobGraph generation is operator chaining: chainable StreamNodes are merged into a single JobVertex, which keeps the computation of the chained operators local, reduces network transfer between operators, and yields a more efficient JobGraph.
Concretely, Flink traverses every StreamNode of the StreamGraph, links chainable StreamNodes into one JobVertex, converts StreamEdges into JobEdges that record the connections and IntermediateDataSets that record each node's result output, and serializes the processing logic, operator chain information, chainable internal edges, and non-chainable outputs into each JobVertex's configuration, producing a dataflow graph that the JobMaster of the Flink cluster on YARN can schedule.
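To make the chaining idea concrete, here is a small, hypothetical DataStream job (an illustrative sketch, not taken from the Flink source): with parallelism 1, the source, map and filter below can normally be chained into one JobVertex, while keyBy() introduces a hash partitioner and therefore starts a new chain:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ChainingIntuition {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        env.fromElements("a", "b", "a")
                .map(String::toUpperCase)   // forward edge, same parallelism: chainable with the source
                .filter(s -> !s.isEmpty())  // chainable with map
                .keyBy(s -> s)              // hash partitioner: not chainable, breaks the chain
                .reduce((x, y) -> x + y)    // head of the second chain
                .print();                   // chainable with reduce

        // execute() triggers StreamGraph generation and then the JobGraph generation analyzed in this article.
        env.execute("chaining-intuition");
    }
}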
The concrete steps of JobGraph generation are as follows:
1. Recursively build operator chains: recurse from the source nodes toward the sinks, check whether each output edge is chainable, and progressively merge chainable nodes into operator chains.
2. Create JobVertex nodes: if the node visited in the current recursion is a chain head, create a JobVertex that encapsulates the operator information and processing logic, and record the node and chain information in StreamingJobGraphGenerator's data structures.
3. Create JobEdges: for every chain-head JobVertex, traverse all of its non-chainable StreamEdges and create the corresponding IntermediateDataSet and JobEdge.
4. Serialize the operator chain, chain outputs, and in-chain edges: take each chain head's operator chain information, serialize it, and add it to the JobVertex's configuration.
5. Produce the final JobGraph: once all StreamNodes have been visited, the JobVertices, JobEdges, and IntermediateDataSets generated along the way form the complete JobGraph.
Source code diagram of JobGraph generation:
Full code walkthrough:
2. Executing StreamExecutionEnvironment.execute()
After the StreamGraph has been generated, Flink calls StreamExecutionEnvironment.execute() to execute the StreamGraph and proceed with JobGraph generation.
Source code diagram:
Source of StreamExecutionEnvironment.execute():
public JobExecutionResult execute(String jobName) throws Exception {
final List<Transformation<?>> originalTransformations = new ArrayList<>(transformations);
//Create the StreamGraph
StreamGraph streamGraph = getStreamGraph();
//...
//Execute the StreamGraph, proceeding with JobGraph generation and scheduling
return execute(streamGraph);
}
Two further levels of calls follow before reaching AbstractSessionClusterExecutor.execute().
Source of StreamExecutionEnvironment.execute(StreamGraph):
public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
//Delegate to executeAsync()
final JobClient jobClient = executeAsync(streamGraph);
//...
}
Source of StreamExecutionEnvironment.executeAsync():
public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
//...
//The StreamGraph is executed asynchronously by an executor; the concrete implementation depends on the deployment mode, e.g. AbstractSessionClusterExecutor
CompletableFuture<JobClient> jobClientFuture =
executor.execute(streamGraph, configuration, userClassloader);
//...
}
The StreamGraph is executed asynchronously by an executor whose concrete implementation depends on the deployment mode. Taking AbstractSessionClusterExecutor (Flink's YARN session mode) as the example, the call enters AbstractSessionClusterExecutor.execute().
AbstractSessionClusterExecutor.execute() is the key client-side method: it calls PipelineExecutorUtils.getJobGraph() to generate the JobGraph, then creates a ClusterClient and submits the JobGraph through it to the Flink cluster running on YARN.
Source of AbstractSessionClusterExecutor.execute():
public CompletableFuture<JobClient> execute(
@Nonnull final Pipeline pipeline,
@Nonnull final Configuration configuration,
@Nonnull final ClassLoader userCodeClassloader)
throws Exception {
//Generate the JobGraph
final JobGraph jobGraph =
PipelineExecutorUtils.getJobGraph(pipeline, configuration, userCodeClassloader);
//...
//Create the ClusterClient used to submit the JobGraph to the cluster
ClusterClient<ClusterID> clusterClient = clusterClientProvider.getClusterClient();
//Submit the JobGraph to the cluster
clusterClient.submitJob(jobGraph)
//...
}
3. Entering the JobGraph Generation Logic
AbstractSessionClusterExecutor.execute() calls PipelineExecutorUtils.getJobGraph(), which enters the JobGraph generation logic.
Source code diagram:
PipelineExecutorUtils first obtains the execution configuration, then generates the JobGraph through FlinkPipelineTranslationUtil.getJobGraph(), and finally sets the JARs, classpaths, and savepoint restore settings on the JobGraph.
Source of PipelineExecutorUtils.getJobGraph():
public static JobGraph getJobGraph(
@Nonnull final Pipeline pipeline,
@Nonnull final Configuration configuration,
@Nonnull ClassLoader userClassloader)
throws MalformedURLException {
checkNotNull(pipeline);
checkNotNull(configuration);
//Obtain the execution config accessor (wraps the configuration)
final ExecutionConfigAccessor executionConfigAccessor =
ExecutionConfigAccessor.fromConfiguration(configuration);
//Generate the JobGraph via FlinkPipelineTranslationUtil
final JobGraph jobGraph =
FlinkPipelineTranslationUtil.getJobGraph(
userClassloader,
pipeline,
configuration,
executionConfigAccessor.getParallelism());
//...
//Set JARs, classpaths, and savepoint restore settings on the JobGraph
jobGraph.addJars(executionConfigAccessor.getJars());
jobGraph.setClasspaths(executionConfigAccessor.getClasspaths());
jobGraph.setSavepointRestoreSettings(executionConfigAccessor.getSavepointRestoreSettings());
//Return the JobGraph
return jobGraph;
}
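For context, the JAR, classpath and savepoint values read through ExecutionConfigAccessor come from the surrounding Configuration. A minimal sketch, assuming the standard PipelineOptions/SavepointConfigOptions keys (the paths are hypothetical and the snippet is not part of PipelineExecutorUtils):
import java.util.Arrays;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;

public class PipelineConfigSketch {
    public static void main(String[] args) {
        Configuration configuration = new Configuration();
        // Read back by executionConfigAccessor.getJars() / getClasspaths() / getSavepointRestoreSettings()
        configuration.set(PipelineOptions.JARS, Arrays.asList("file:///opt/flink/usrlib/my-job.jar"));        // hypothetical path
        configuration.set(PipelineOptions.CLASSPATHS, Arrays.asList("file:///opt/flink/usrlib/"));            // hypothetical path
        configuration.set(SavepointConfigOptions.SAVEPOINT_PATH, "hdfs:///flink/savepoints/savepoint-0000");  // hypothetical path
        System.out.println(configuration);
    }
}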
FlinkPipelineTranslationUtil.getJobGraph() goes through several more layers of calls, finally creating a StreamingJobGraphGenerator and using it to generate the JobGraph.
Source of FlinkPipelineTranslationUtil.getJobGraph():
public static JobGraph getJobGraph(
ClassLoader userClassloader,
Pipeline pipeline,
Configuration optimizerConfiguration,
int defaultParallelism) {
//Obtain the pipeline translator (here the Pipeline is the StreamGraph)
FlinkPipelineTranslator pipelineTranslator =
getPipelineTranslator(userClassloader, pipeline);
//Generate the JobGraph with the pipelineTranslator; FlinkPipelineTranslator has multiple
//implementations, and streaming jobs use StreamGraphTranslator
JobGraph jobGraph =
pipelineTranslator.translateToJobGraph(
pipeline, optimizerConfiguration, defaultParallelism);
optimizerConfiguration
.getOptional(PipelineOptions.PARALLELISM_OVERRIDES)
.ifPresent(
map ->
jobGraph.getJobConfiguration()
.set(PipelineOptions.PARALLELISM_OVERRIDES, map));
return jobGraph;
}
FlinkPipelineTranslator has multiple implementations: streaming jobs use StreamGraphTranslator, while batch (DataSet) plans use PlanTranslator. This article analyzes the streaming JobGraph generation logic and therefore enters StreamGraphTranslator.translateToJobGraph().
StreamGraphTranslator.translateToJobGraph() calls StreamGraph.getJobGraph() to generate the JobGraph.
Source of StreamGraphTranslator.translateToJobGraph():
public JobGraph translateToJobGraph(
Pipeline pipeline, Configuration optimizerConfiguration, int defaultParallelism) {
checkArgument(
pipeline instanceof StreamGraph, "Given pipeline is not a DataStream StreamGraph.");
//Cast the pipeline to a StreamGraph
StreamGraph streamGraph = (StreamGraph) pipeline;
//Call StreamGraph.getJobGraph() to generate the JobGraph
return streamGraph.getJobGraph(userClassloader, null);
}
StreamGraph.getJobGraph() in turn calls StreamingJobGraphGenerator.createJobGraph() to generate the JobGraph.
Source of StreamGraph.getJobGraph():
public JobGraph getJobGraph(ClassLoader userClassLoader, @Nullable JobID jobID) {
//The JobGraph is ultimately generated by StreamingJobGraphGenerator
return StreamingJobGraphGenerator.createJobGraph(userClassLoader, this, jobID);
}
The static StreamingJobGraphGenerator.createJobGraph() method creates a StreamingJobGraphGenerator instance and uses that instance to generate the JobGraph.
Source of StreamingJobGraphGenerator.createJobGraph():
public static JobGraph createJobGraph(
ClassLoader userClassLoader, StreamGraph streamGraph, @Nullable JobID jobID) {
// TODO Currently, we construct a new thread pool for the compilation of each job. In the
// future, we may refactor the job submission framework and make it reusable across jobs.
//Thread pool used for serialization
final ExecutorService serializationExecutor =
Executors.newFixedThreadPool(
Math.max(
1,
Math.min(
Hardware.getNumberCPUCores(),
streamGraph.getExecutionConfig().getParallelism())),
new ExecutorThreadFactory("flink-operator-serialization-io"));
try {
//Create a StreamingJobGraphGenerator instance and call its createJobGraph() to generate the JobGraph
return new StreamingJobGraphGenerator(
userClassLoader, streamGraph, jobID, serializationExecutor)
.createJobGraph();
} finally {
serializationExecutor.shutdown();
}
}
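Before stepping into the generator itself, here is a minimal sketch of how the resulting JobGraph can be inspected from a test or driver program; it reuses the StreamGraph.getJobGraph() entry point shown above (getStreamGraph() is an @Internal API, so this is for experimentation only):
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;

public class InspectJobGraph {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        env.fromElements("a", "b")
                .map(String::toUpperCase)
                .keyBy(s -> s)
                .reduce((x, y) -> x + y)
                .print();

        // Same call chain as StreamGraphTranslator: StreamGraph -> StreamingJobGraphGenerator -> JobGraph
        StreamGraph streamGraph = env.getStreamGraph();
        JobGraph jobGraph = streamGraph.getJobGraph(Thread.currentThread().getContextClassLoader(), null);

        // With chaining, source + map typically form one JobVertex and reduce + sink another
        for (JobVertex vertex : jobGraph.getVertices()) {
            System.out.println(vertex.getID() + " : " + vertex.getName()
                    + " (parallelism = " + vertex.getParallelism() + ")");
        }
    }
}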
4. StreamingJobGraphGenerator Generates the JobGraph
The instance method StreamingJobGraphGenerator.createJobGraph() is where the JobGraph is actually built; its detailed logic is illustrated in the source code diagram below.
Source code diagram:
We start with StreamingJobGraphGenerator.createJobGraph(), which contains all the key steps of JobGraph creation: it builds the operator chains, linking chainable nodes together and creating the JobVertices; creates JobEdges for the non-chainable outputs to connect upstream and downstream JobVertices; serializes the operator information into the JobVertices; and finally configures the JobGraph's SlotSharingGroup, checkpointing, savepoint restore settings, and ExecutionConfig to complete the JobGraph.
Source code diagram:
Source of StreamingJobGraphGenerator.createJobGraph():
private JobGraph createJobGraph() {
//Validate checkpointing and configure JobType and the dynamic flag
preValidate();
jobGraph.setJobType(streamGraph.getJobType());
jobGraph.setDynamic(streamGraph.isDynamic());
jobGraph.enableApproximateLocalRecovery(
streamGraph.getCheckpointConfig().isApproximateLocalRecoveryEnabled());
// Generate deterministic hashes for the nodes in order to identify them across
// submission iff they didn't change.
//Generate deterministic hashes used to identify each node
Map<Integer, byte[]> hashes =
defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);
// Generate legacy version hashes for backwards compatibility
List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
}
//Link chainable nodes into operator chains
setChaining(hashes, legacyHashes);
if (jobGraph.isDynamic()) {
setVertexParallelismsForDynamicGraphIfNecessary();
}
// Note that we set all the non-chainable outputs configuration here because the
// "setVertexParallelismsForDynamicGraphIfNecessary" may affect the parallelism of job
// vertices and partition-reuse
final Map<Integer, Map<StreamEdge, NonChainedOutput>> opIntermediateOutputs =
new HashMap<>();
//Serialize the configuration of every non-chainable operator output
setAllOperatorNonChainedOutputsConfigs(opIntermediateOutputs);
//Create JobEdges for the non-chainable outputs of each JobVertex
setAllVertexNonChainedOutputsConfigs(opIntermediateOutputs);
//Set the physical edges: serialize each JobVertex's incoming edge set into that JobVertex's StreamConfig
setPhysicalEdges();
markSupportingConcurrentExecutionAttempts();
validateHybridShuffleExecuteInBatchMode();
//Assign each JobVertex to its SlotSharingGroup
setSlotSharingAndCoLocation();
setManagedMemoryFraction(
Collections.unmodifiableMap(jobVertices),
Collections.unmodifiableMap(vertexConfigs),
Collections.unmodifiableMap(chainedConfigs),
id -> streamGraph.getStreamNode(id).getManagedMemoryOperatorScopeUseCaseWeights(),
id -> streamGraph.getStreamNode(id).getManagedMemorySlotScopeUseCases());
//Configure checkpointing
configureCheckpointing();
//Configure savepoint restore settings
jobGraph.setSavepointRestoreSettings(streamGraph.getSavepointRestoreSettings());
final Map<String, DistributedCache.DistributedCacheEntry> distributedCacheEntries =
JobGraphUtils.prepareUserArtifactEntries(
streamGraph.getUserArtifacts().stream()
.collect(Collectors.toMap(e -> e.f0, e -> e.f1)),
jobGraph.getJobID());
for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry :
distributedCacheEntries.entrySet()) {
jobGraph.addUserArtifact(entry.getKey(), entry.getValue());
}
//Set the ExecutionConfig
// set the ExecutionConfig last when it has been finalized
try {
jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
} catch (IOException e) {
throw new IllegalConfigurationException(
"Could not serialize the ExecutionConfig."
+ "This indicates that non-serializable types (like custom serializers) were registered");
}
jobGraph.setJobConfiguration(streamGraph.getJobConfiguration());
addVertexIndexPrefixInVertexName();
setVertexDescription();
//For each chain head, serialize its operator chain into the vertex configuration
// Wait for the serialization of operator coordinators and stream config.
try {
FutureUtils.combineAll(
vertexConfigs.values().stream()
.map(
config ->
config.triggerSerializationAndReturnFuture(
serializationExecutor))
.collect(Collectors.toList()))
.get();
waitForSerializationFuturesAndUpdateJobVertices();
} catch (Exception e) {
throw new FlinkRuntimeException("Error in serialization.", e);
}
if (!streamGraph.getJobStatusHooks().isEmpty()) {
jobGraph.setJobStatusHooks(streamGraph.getJobStatusHooks());
}
//Return the completed JobGraph
return jobGraph;
}
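For reference, the settings that configureCheckpointing(), setSavepointRestoreSettings() and setExecutionConfig() serialize into the JobGraph originate from the user program; a small illustrative snippet (not part of StreamingJobGraphGenerator):
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSettingsSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // These values flow into the StreamGraph's CheckpointConfig and are packaged by
        // configureCheckpointing() into the JobGraph's checkpointing settings.
        env.enableCheckpointing(60_000L);
        env.getCheckpointConfig().setCheckpointTimeout(120_000L);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10_000L);

        env.fromElements(1, 2, 3)
                .map(i -> i + 1)
                .print();

        env.execute("checkpointed-job");
    }
}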
5. Recursively Creating Operator Chains
Within StreamingJobGraphGenerator.createJobGraph(), setChaining(hashes, legacyHashes) is the key step of JobGraph generation: recursively creating the operator chains and the JobVertex nodes.
Flink recurses through the StreamGraph from the source nodes toward the sinks, checks whether each StreamNode's output edges are chainable, and progressively merges chainable nodes into operator chains.
Source code diagram:
In StreamingJobGraphGenerator.setChaining(), Flink collects all source operators and then recursively traverses every operator from source to sink, chaining the ones that are chainable.
Source of StreamingJobGraphGenerator.setChaining():
private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes) {
// we separate out the sources that run as inputs to another operator (chained inputs)
// from the sources that needs to run as the main (head) operator.
//Collect all source operators as chain entry points
final Map<Integer, OperatorChainInfo> chainEntryPoints =
buildChainedInputsAndGetHeadInputs(hashes, legacyHashes);
//Sort by key and collect the values into a list
final Collection<OperatorChainInfo> initialEntryPoints =
chainEntryPoints.entrySet().stream()
.sorted(Comparator.comparing(Map.Entry::getKey))
.map(Map.Entry::getValue)
.collect(Collectors.toList());
//Recursively traverse all operators from source to sink and chain the chainable ones
// iterate over a copy of the values, because this map gets concurrently modified
for (OperatorChainInfo info : initialEntryPoints) {
createChain(
info.getStartNodeId(),
1, // operators start at position 1 because 0 is for chained source inputs
info,
chainEntryPoints);
}
}
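For intuition about the entry points, consider a hypothetical job with two sources (illustrative only): buildChainedInputsAndGetHeadInputs() yields one head entry per source, and createChain() is started once from each of them:
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TwoSourceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> left = env.fromElements("a", "b");   // head of the first chain
        DataStream<String> right = env.fromElements("c", "d");  // head of the second chain

        // The map() downstream of union() receives one input edge from each source, so the
        // isChainable() check (the downstream node must have exactly one input edge) fails
        // and map cannot be chained with either source.
        left.union(right)
                .map(String::toUpperCase)
                .print();

        env.execute("two-source-job");
    }
}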
The source-to-sink recursion that builds the operator chains is implemented in StreamingJobGraphGenerator.createChain().
For the current StreamNode, StreamingJobGraphGenerator iterates over every output edge and decides whether it is chainable: chainable targets are added to the current chain, while non-chainable targets start a new chain head. The recursion continues from source to sink until every StreamNode has been visited and chained.
If the node being processed is the start node of the current traversal, createJobVertex() is called to create the chain-head JobVertex; otherwise only a StreamConfig is created to record the operator. For a chain-head JobVertex, the StreamConfigs of all operators in the chain are serialized and stored in the chain head's configuration.
Source of StreamingJobGraphGenerator.createChain():
private List<StreamEdge> createChain(
final Integer currentNodeId,
final int chainIndex,
final OperatorChainInfo chainInfo,
final Map<Integer, OperatorChainInfo> chainEntryPoints) {
//Get the start node id of the current chain
Integer startNodeId = chainInfo.getStartNodeId();
//Only chain if no JobVertex has been built for this start node yet
if (!builtVertices.contains(startNodeId)) {
//Initialize data structures
List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();
List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();
//Get the current StreamNode
StreamNode currentNode = streamGraph.getStreamNode(currentNodeId);
//Iterate over the current node's output edges and check whether each is chainable
for (StreamEdge outEdge : currentNode.getOutEdges()) {
if (isChainable(outEdge, streamGraph)) {
chainableOutputs.add(outEdge);
} else {
nonChainableOutputs.add(outEdge);
}
}
//For chainable edges, continue the current chain: recurse into createChain() on the target and collect its transitive out-edges
for (StreamEdge chainable : chainableOutputs) {
transitiveOutEdges.addAll(
createChain(
chainable.getTargetId(),
chainIndex + 1,
chainInfo,
chainEntryPoints));
}
//For non-chainable edges, record the edge and start a new chain at the target by recursing into createChain()
for (StreamEdge nonChainable : nonChainableOutputs) {
transitiveOutEdges.add(nonChainable);
createChain(
nonChainable.getTargetId(),
1, // operators start at position 1 because 0 is for chained source inputs
chainEntryPoints.computeIfAbsent(
nonChainable.getTargetId(),
(k) -> chainInfo.newChain(nonChainable.getTargetId())),
chainEntryPoints);
}
chainedNames.put(
currentNodeId,
createChainedName(
currentNodeId,
chainableOutputs,
Optional.ofNullable(chainEntryPoints.get(currentNodeId))));
chainedMinResources.put(
currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs));
chainedPreferredResources.put(
currentNodeId,
createChainedPreferredResources(currentNodeId, chainableOutputs));
//Add the current node's hash to the chain info of the chain head
OperatorID currentOperatorId =
chainInfo.addNodeToChain(
currentNodeId,
streamGraph.getStreamNode(currentNodeId).getOperatorName());
if (currentNode.getInputFormat() != null) {
getOrCreateFormatContainer(startNodeId)
.addInputFormat(currentOperatorId, currentNode.getInputFormat());
}
if (currentNode.getOutputFormat() != null) {
getOrCreateFormatContainer(startNodeId)
.addOutputFormat(currentOperatorId, currentNode.getOutputFormat());
}
//If the current node is the start node of this traversal, create a JobVertex via createJobVertex(); otherwise only create a StreamConfig to record the operator
StreamConfig config =
currentNodeId.equals(startNodeId)
? createJobVertex(startNodeId, chainInfo)
: new StreamConfig(new Configuration());
//Configure the current node
tryConvertPartitionerForDynamicGraph(chainableOutputs, nonChainableOutputs);
setOperatorConfig(currentNodeId, config, chainInfo.getChainedSources());
//Configure and serialize the current node's chainable outputs
setOperatorChainedOutputsConfig(config, chainableOutputs);
//Cache the non-chainable outputs
// we cache the non-chainable outputs here, and set the non-chained config later
opNonChainableOutputsCache.put(currentNodeId, nonChainableOutputs);
//Chaining is done; set the chain configuration depending on whether this is the chain head
if (currentNodeId.equals(startNodeId)) {
//Record the non-chainable out-edges
chainInfo.setTransitiveOutEdges(transitiveOutEdges);
chainInfos.put(startNodeId, chainInfo);
//Mark as chain start
config.setChainStart();
config.setChainIndex(chainIndex);
config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
//Write the StreamConfigs of all chained sub-nodes into the head node's chainedTaskFutures (Map)
config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));
} else {
chainedConfigs.computeIfAbsent(
startNodeId, k -> new HashMap<Integer, StreamConfig>());
config.setChainIndex(chainIndex);
StreamNode node = streamGraph.getStreamNode(currentNodeId);
config.setOperatorName(node.getOperatorName());
chainedConfigs.get(startNodeId).put(currentNodeId, config);
}
config.setOperatorID(currentOperatorId);
if (chainableOutputs.isEmpty()) {
config.setChainEnd();
}
return transitiveOutEdges;
} else {
return new ArrayList<>();
}
}
Here, isChainable(outEdge, streamGraph) decides whether an edge can be chained. StreamingJobGraphGenerator.isChainable() first checks that the downstream node has exactly one input edge, and then calls isChainableInput() for the remaining checks.
Source of StreamingJobGraphGenerator.isChainable():
public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);
//Chainable only if the downstream node has exactly one input edge and isChainableInput() also passes
return downStreamVertex.getInEdges().size() == 1 && isChainableInput(edge, streamGraph);
}
StreamingJobGraphGenerator.isChainableInput() checks whether chaining is enabled on the StreamGraph, whether the upstream and downstream operators are in the same slot sharing group, whether their ChainingStrategy allows chaining (the upstream operator must be HEAD/ALWAYS and the downstream operator ALWAYS), and whether the partitioner and exchange mode amount to a forward exchange. Only if all checks pass can the two nodes be chained.
Source of StreamingJobGraphGenerator.isChainableInput():
private static boolean isChainableInput(StreamEdge edge, StreamGraph streamGraph) {
StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);
//Chaining must be enabled on the StreamGraph
if (!(streamGraph.isChainingEnabled()
//Upstream and downstream operators must be in the same slot sharing group
&& upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
//Check the ChainingStrategy: upstream must be HEAD/ALWAYS and downstream ALWAYS
&& areOperatorsChainable(upStreamVertex, downStreamVertex, streamGraph)
//The partitioner and exchange mode must be a forward exchange
&& arePartitionerAndExchangeModeChainable(
edge.getPartitioner(), edge.getExchangeMode(), streamGraph.isDynamic()))) {
return false;
}
// check that we do not have a union operation, because unions currently only work
// through the network/byte-channel stack.
// we check that by testing that each "type" (which means input position) is used only once
for (StreamEdge inEdge : downStreamVertex.getInEdges()) {
if (inEdge != edge && inEdge.getTypeNumber() == edge.getTypeNumber()) {
return false;
}
}
return true;
}
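These conditions can be influenced from the DataStream API. A hedged illustration of the standard user-facing knobs (disableOperatorChaining, startNewChain, disableChaining, slotSharingGroup) and how they map onto the checks above:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ChainingKnobs {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Global switch: streamGraph.isChainingEnabled() becomes false and every edge is rejected.
        // env.disableOperatorChaining();

        env.fromElements("a", "b", "c")
                .map(String::toUpperCase)
                // startNewChain() sets ChainingStrategy.HEAD, so areOperatorsChainable()
                // refuses to chain this operator with its predecessor.
                .map(s -> s + "!")
                .startNewChain()
                // disableChaining() sets ChainingStrategy.NEVER: no chaining with the
                // predecessor or the successor.
                .filter(s -> !s.isEmpty())
                .disableChaining()
                // A different slot sharing group makes isSameSlotSharingGroup() return false.
                .map(s -> s.toLowerCase())
                .slotSharingGroup("isolated")
                .print();

        env.execute("chaining-knobs");
    }
}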
Back in StreamingJobGraphGenerator.createChain(): after the chainability check, Flink keeps recursing into the downstream nodes to build the chains. For a non-chainable target, chainIndex is reset to 1, marking the start of a new chain; for a chainable target, the recursion continues the current chain with chainIndex + 1.
Source of StreamingJobGraphGenerator.createChain() (excerpt):
private List<StreamEdge> createChain(
final Integer currentNodeId,
final int chainIndex,
final OperatorChainInfo chainInfo,
final Map<Integer, OperatorChainInfo> chainEntryPoints) {
//...
//Iterate over the current node's output edges and check whether each is chainable
for (StreamEdge outEdge : currentNode.getOutEdges()) {
if (isChainable(outEdge, streamGraph)) {
chainableOutputs.add(outEdge);
} else {
nonChainableOutputs.add(outEdge);
}
}
//For chainable edges, continue the current chain: recurse into createChain() on the target and collect its transitive out-edges
for (StreamEdge chainable : chainableOutputs) {
transitiveOutEdges.addAll(
createChain(
chainable.getTargetId(),
chainIndex + 1,
chainInfo,
chainEntryPoints));
}
//For non-chainable edges, record the edge and start a new chain at the target by recursing into createChain()
for (StreamEdge nonChainable : nonChainableOutputs) {
transitiveOutEdges.add(nonChainable);
createChain(
nonChainable.getTargetId(),
1, // operators start at position 1 because 0 is for chained source inputs
chainEntryPoints.computeIfAbsent(
nonChainable.getTargetId(),
(k) -> chainInfo.newChain(nonChainable.getTargetId())),
chainEntryPoints);
}
//If the current node is the start node of this recursion, create a JobVertex via createJobVertex(); otherwise only create a StreamConfig to record the operator
StreamConfig config =
currentNodeId.equals(startNodeId)
? createJobVertex(startNodeId, chainInfo)
: new StreamConfig(new Configuration());
//...
}
If the node in the current recursion is the chain head (the start node of this traversal), createJobVertex() is called to create the JobVertex; otherwise only a StreamConfig is created to record the operator.
6. Creating the Chain-Head JobVertex
When the node being processed is a chain head, a JobVertex is created to encapsulate the operator information and processing logic, and the node and chain information is stored in StreamingJobGraphGenerator's data structures.
The chain-head JobVertex is created by StreamingJobGraphGenerator.createJobVertex(), which creates the JobVertex instance, configures it, and adds it to the JobGraph.
Source code diagram:
During creation, the method checks whether the chain contains input/output formats (typically source/sink nodes) and creates an InputOutputFormatVertex in that case, or a plain JobVertex otherwise.
Source of StreamingJobGraphGenerator.createJobVertex():
private StreamConfig createJobVertex(Integer streamNodeId, OperatorChainInfo chainInfo) {
//..
//Create an InputOutputFormatVertex if the chain contains input/output formats, otherwise a plain JobVertex
//chain with input/output formats (source/sink node)
if (chainedInputOutputFormats.containsKey(streamNodeId)) {
//Create an InputOutputFormatVertex
jobVertex = new InputOutputFormatVertex(
chainedNames.get(streamNodeId), jobVertexId, operatorIDPairs);
chainedInputOutputFormats.get(streamNodeId)
.write(new TaskConfig(jobVertex.getConfiguration()));
} else {
//Plain operator chain: create an ordinary JobVertex
jobVertex = new JobVertex(chainedNames.get(streamNodeId),
jobVertexId, operatorIDPairs);
}
//..
//Configure the JobVertex
jobVertex.addIntermediateDataSetIdToConsume(streamNode.getConsumeClusterDatasetId());
jobVertex.setResources(chainedMinResources.get(streamNodeId), chainedPreferredResources.get(streamNodeId));
jobVertex.setInvokableClass(streamNode.getJobVertexClass());
jobVertex.setParallelism(parallelism);
jobVertex.setMaxParallelism(streamNode.getMaxParallelism());
//Add the JobVertex to the JobGraph
jobVertices.put(streamNodeId, jobVertex);
builtVertices.add(streamNodeId);
jobGraph.addVertex(jobVertex);
jobVertex.setParallelismConfigured(
chainInfo.getAllChainedNodes().stream()
.anyMatch(StreamNode::isParallelismConfigured));
//The returned StreamConfig wraps the JobVertex's configuration
return new StreamConfig(jobVertex.getConfiguration());
}
After createJobVertex() has created the JobVertex, control returns to StreamingJobGraphGenerator.createChain(), which further configures the JobVertex and serializes its settings. The chain head's operator chain information is recorded in the chain head's StreamConfig.
Source of StreamingJobGraphGenerator.createChain() (excerpt):
private List<StreamEdge> createChain(
final Integer currentNodeId,
final int chainIndex,
final OperatorChainInfo chainInfo,
final Map<Integer, OperatorChainInfo> chainEntryPoints) {
//...
//If the current node is the start node of this traversal, create a JobVertex via createJobVertex(); otherwise only create a StreamConfig to record the operator
StreamConfig config =
currentNodeId.equals(startNodeId)
? createJobVertex(startNodeId, chainInfo)
: new StreamConfig(new Configuration());
//Configure the current node
tryConvertPartitionerForDynamicGraph(chainableOutputs, nonChainableOutputs);
setOperatorConfig(currentNodeId, config, chainInfo.getChainedSources());
//Configure and serialize the current node's chainable outputs
setOperatorChainedOutputsConfig(config, chainableOutputs);
//Cache the non-chainable outputs
// we cache the non-chainable outputs here, and set the non-chained config later
opNonChainableOutputsCache.put(currentNodeId, nonChainableOutputs);
//Chaining is done; set the chain configuration depending on whether this is the chain head
if (currentNodeId.equals(startNodeId)) {
//Record the non-chainable out-edges
chainInfo.setTransitiveOutEdges(transitiveOutEdges);
chainInfos.put(startNodeId, chainInfo);
//Mark as chain start
config.setChainStart();
config.setChainIndex(chainIndex);
config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
//Write the StreamConfigs of all chained sub-nodes into the head node's chainedTaskFutures (Map)
config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));
} else {
chainedConfigs.computeIfAbsent(
startNodeId, k -> new HashMap<Integer, StreamConfig>());
config.setChainIndex(chainIndex);
StreamNode node = streamGraph.getStreamNode(currentNodeId);
config.setOperatorName(node.getOperatorName());
chainedConfigs.get(startNodeId).put(currentNodeId, config);
}
config.setOperatorID(currentOperatorId);
if (chainableOutputs.isEmpty()) {
config.setChainEnd();
}
return transitiveOutEdges;
} else {
return new ArrayList<>();
}
}
7. Conclusion
This article has now covered the JobGraph generation source code for recursively creating operator chains and generating the chain-head JobVertices. The next article will continue with the full source code for creating JobEdges, creating IntermediateDataSets, and serializing the operator chains.