Flink-1.19.0 Source Code Explained 5 - JobGraph Generation - Part 1

Published: 2025-07-03

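        Flink is an open-source, distributed, unified stream and batch processing framework under the Apache Software Foundation. It provides both real-time stream processing and high-throughput batch processing for big data. This column records and shares analyses of the Flink source code.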

        The source version analyzed in this article is flink-1.19.0.

1. Overview of JobGraph Generation

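        The previous article, "Flink-1.19.0 Source Code Explained 4 - StreamGraph Generation", covered how Flink builds the StreamGraph. Flink traverses the collection of Transformations, generates StreamNodes and StreamEdges step by step, and assembles them into the StreamGraph. Once the StreamGraph has been built, StreamExecutionEnvironment continues in its execute() method and starts JobGraph generation.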

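        Starting from StreamExecutionEnvironment.execute(), this article walks through the source code of JobGraph generation (the red part of the flowchart below). It shows how Flink traverses every StreamNode of the StreamGraph and, step by step, produces JobVertex nodes, JobEdge edges and IntermediateDataSet data sets until the JobGraph is complete.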

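        At its core, JobGraph generation builds operator chains. Chainable StreamNodes are linked into a single JobVertex, which keeps computation inside one task and reduces network transfer between operators. The result is a more efficient JobGraph.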

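        Concretely, Flink traverses every StreamNode in the StreamGraph and merges chainable StreamNodes into one JobVertex. Each StreamEdge that crosses a chain boundary is converted into two objects: a JobEdge, which records the connection, and an IntermediateDataSet, which records the producing vertex's output. The processing logic, the operator chain information, the chain-internal edges and the non-chainable outputs are all serialized into the JobVertex's configuration. The result is a dataflow graph that the JobMaster of the Flink cluster (running on YARN in this series) can schedule.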

The concrete steps of JobGraph generation are as follows:

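        1. Recursively build operator chains: starting from the Source nodes and recursing toward the Sink, check whether each output edge is chainable, merge chainable nodes step by step, and create the operator chain.

        2. Create JobVertex nodes: if the node being visited is the head of a chain, create a JobVertex that encapsulates the operator information and computation logic. Record the node and its chain information in the data structures of StreamingJobGraphGenerator.

        3. Create JobEdges: for every chain-head JobVertex, iterate over all of its non-chainable StreamEdges and create an IntermediateDataSet and a JobEdge for each one.

        4. Serialize the operator chain, chain outputs and chain-internal edges: take the chain information of each chain head, serialize it, and add it to the JobVertex's configuration.

        5. Produce the complete JobGraph: once every StreamNode has been visited and the JobVertices, JobEdges and IntermediateDataSets have been generated, the complete JobGraph is assembled.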
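        To make step 3 concrete, here is a minimal plain-Java sketch. It is a simplified model, not Flink code. The records StreamEdgeModel and JobEdgeModel, the headOf map (StreamNode id to the id of its chain head) and the "ds-..." data set naming are all hypothetical. The sketch shows only the core idea. An edge whose two ends share a chain head stays inside one JobVertex. Every other edge becomes an IntermediateDataSet produced by the upstream head plus a JobEdge consumed by the downstream head.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of turning StreamEdges into JobEdges; not Flink's classes.
final class EdgeConversionSketch {

    // A StreamEdge between two StreamNode ids.
    record StreamEdgeModel(int source, int target) {}

    // One converted edge: the producer head writes a data set that the consumer head reads.
    record JobEdgeModel(int producerHead, String dataSetId, int consumerHead) {}

    // headOf maps every StreamNode id to the id of the head node of its chain.
    static List<JobEdgeModel> convert(List<StreamEdgeModel> edges, Map<Integer, Integer> headOf) {
        List<JobEdgeModel> jobEdges = new ArrayList<>();
        for (StreamEdgeModel e : edges) {
            int producer = headOf.get(e.source());
            int consumer = headOf.get(e.target());
            if (producer == consumer) {
                continue; // chained edge: it stays inside a single JobVertex
            }
            // Non-chained edge: one intermediate data set plus one JobEdge consuming it.
            String dataSetId = "ds-" + e.source() + "-" + e.target();
            jobEdges.add(new JobEdgeModel(producer, dataSetId, consumer));
        }
        return jobEdges;
    }
}
```

        Consider the pipeline source(1) -> map(2) -> window(3) -> sink(4), where only 1->2 and 3->4 are chained. Only the edge 2->3 survives, as a single JobEdge from head 1 to head 3.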

Source code diagram of JobGraph generation:

Full code walkthrough:

2. Executing StreamExecutionEnvironment.execute()

      After the StreamGraph has been generated, Flink continues in StreamExecutionEnvironment.execute() and executes the StreamGraph, which leads to JobGraph generation.

Source code diagram:



Source of StreamExecutionEnvironment.execute(String jobName):

public JobExecutionResult execute(String jobName) throws Exception {
    final List<Transformation<?>> originalTransformations = new ArrayList<>(transformations);
    
    //create the StreamGraph
    StreamGraph streamGraph = getStreamGraph();
    //...
    
    //execute the StreamGraph, which leads to JobGraph generation and job submission
    return execute(streamGraph);
}

        Two more levels of calls follow, eventually reaching AbstractSessionClusterExecutor.execute().

Source of StreamExecutionEnvironment.execute(StreamGraph streamGraph):

public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
    //delegate to executeAsync()
    final JobClient jobClient = executeAsync(streamGraph);
    //...
}

Source of StreamExecutionEnvironment.executeAsync():

public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
    //...
    
    //the Executor executes the StreamGraph asynchronously; each deployment mode has its own Executor implementation (AbstractSessionClusterExecutor is the example here)
    CompletableFuture<JobClient> jobClientFuture =
            executor.execute(streamGraph, configuration, userClassloader);
    //...
}

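        The Executor executes the StreamGraph asynchronously, and each deployment mode has its own Executor implementation. This article takes AbstractSessionClusterExecutor as the example. It is the base class of the session-mode executors, such as the one used in Flink's YARN session mode. We now enter AbstractSessionClusterExecutor.execute().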

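        AbstractSessionClusterExecutor.execute() is a key method for job submission on the Flink client. It calls PipelineExecutorUtils.getJobGraph() to generate the JobGraph, obtains a ClusterClient, and submits the JobGraph through that ClusterClient to the Flink cluster running on YARN.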
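        As a rough illustration of "one Executor per deployment mode", the sketch below models the dispatch in plain Java. All names (PipelineExecutorModel, SessionExecutorModel, forTarget) are hypothetical. As far as we know, Flink itself looks up the executor through a PipelineExecutorServiceLoader based on the execution.target option. The target strings "remote" and "yarn-session" are used here only as illustrative keys. The session executor mirrors the two steps of AbstractSessionClusterExecutor.execute(): build the JobGraph on the client, then submit it and return a future.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical model of deployment-specific executors; not Flink's PipelineExecutor API.
final class ExecutorDispatchSketch {

    interface PipelineExecutorModel {
        // Turns the pipeline into a "JobGraph", submits it, and completes asynchronously.
        CompletableFuture<String> execute(String streamGraph);
    }

    // Models a session-mode executor: build the JobGraph on the client,
    // then submit it to an already running cluster.
    static final class SessionExecutorModel implements PipelineExecutorModel {
        @Override
        public CompletableFuture<String> execute(String streamGraph) {
            // stands in for PipelineExecutorUtils.getJobGraph(...)
            String jobGraph = "JobGraph(" + streamGraph + ")";
            // stands in for clusterClient.submitJob(jobGraph)
            return CompletableFuture.supplyAsync(() -> "submitted " + jobGraph);
        }
    }

    // Illustrative registry keyed by deployment target.
    private static final Map<String, PipelineExecutorModel> EXECUTORS =
            Map.of("remote", new SessionExecutorModel(), "yarn-session", new SessionExecutorModel());

    static PipelineExecutorModel forTarget(String target) {
        PipelineExecutorModel executor = EXECUTORS.get(target);
        if (executor == null) {
            throw new IllegalArgumentException("No executor for target: " + target);
        }
        return executor;
    }
}
```

        The caller only sees a CompletableFuture. This matches how executeAsync() receives a CompletableFuture<JobClient> regardless of the deployment mode.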

Source of AbstractSessionClusterExecutor.execute():

public CompletableFuture<JobClient> execute(
        @Nonnull final Pipeline pipeline,
        @Nonnull final Configuration configuration,
        @Nonnull final ClassLoader userCodeClassloader)
        throws Exception {
    //generate the JobGraph
    final JobGraph jobGraph =
            PipelineExecutorUtils.getJobGraph(pipeline, configuration, userCodeClassloader);
    //...
    
    //obtain the ClusterClient used to submit the JobGraph to the cluster
    ClusterClient<ClusterID> clusterClient = clusterClientProvider.getClusterClient();
    
    //the client submits the JobGraph to the cluster (the returned future is chained further; elided below)
    clusterClient.submitJob(jobGraph)
    //...         
}

3. Entering the JobGraph Generation Logic

        AbstractSessionClusterExecutor.execute() calls PipelineExecutorUtils.getJobGraph(), which enters the JobGraph generation logic.

Source code diagram:

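        PipelineExecutorUtils first obtains Flink's execution configuration. It then generates the JobGraph through FlinkPipelineTranslationUtil.getJobGraph(). Finally, it sets the jars, classpaths and savepoint restore settings on the JobGraph.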

Source of PipelineExecutorUtils.getJobGraph():

public static JobGraph getJobGraph(
        @Nonnull final Pipeline pipeline,
        @Nonnull final Configuration configuration,
        @Nonnull ClassLoader userClassloader)
        throws MalformedURLException {
    checkNotNull(pipeline);
    checkNotNull(configuration);

    //get the execution config accessor (wraps the configuration)
    final ExecutionConfigAccessor executionConfigAccessor =
            ExecutionConfigAccessor.fromConfiguration(configuration);
            
    //generate the JobGraph via FlinkPipelineTranslationUtil
    final JobGraph jobGraph =
            FlinkPipelineTranslationUtil.getJobGraph(
                    userClassloader,
                    pipeline,
                    configuration,
                    executionConfigAccessor.getParallelism());

    //...

    //set the jars, classpaths and savepoint restore settings on the JobGraph
    jobGraph.addJars(executionConfigAccessor.getJars());
    jobGraph.setClasspaths(executionConfigAccessor.getClasspaths());
    jobGraph.setSavepointRestoreSettings(executionConfigAccessor.getSavepointRestoreSettings());

    //return the JobGraph
    return jobGraph;
}

      FlinkPipelineTranslationUtil.getJobGraph() then goes through several more levels of calls. It finally creates a StreamingJobGraphGenerator, and that generator produces the JobGraph.

Source of FlinkPipelineTranslationUtil.getJobGraph():

public static JobGraph getJobGraph(
        ClassLoader userClassloader,
        Pipeline pipeline,
        Configuration optimizerConfiguration,
        int defaultParallelism) {

    //get the pipeline translator (the Pipeline here is the StreamGraph)
    FlinkPipelineTranslator pipelineTranslator =
            getPipelineTranslator(userClassloader, pipeline);

    //generate the JobGraph via pipelineTranslator; there are several translator implementations,
    //and stream processing uses StreamGraphTranslator
    JobGraph jobGraph =
            pipelineTranslator.translateToJobGraph(
                    pipeline, optimizerConfiguration, defaultParallelism);
    
    optimizerConfiguration
            .getOptional(PipelineOptions.PARALLELISM_OVERRIDES)
            .ifPresent(
                    map ->
                            jobGraph.getJobConfiguration()
                                    .set(PipelineOptions.PARALLELISM_OVERRIDES, map));

    return jobGraph;
}

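        FlinkPipelineTranslator has several implementations. StreamGraphTranslator handles DataStream programs (the StreamGraph), and PlanTranslator handles DataSet batch programs. This article analyzes the streaming JobGraph generation logic, so we enter StreamGraphTranslator.translateToJobGraph().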
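        The choice of translator can be pictured as asking each candidate whether it can handle the given pipeline. The sketch below is a simplified, hypothetical model: Pipeline, StreamGraphModel, DataSetPlanModel, Translator and pick are illustrative names, not Flink classes. It loosely follows what we understand getPipelineTranslator() to do in 1.19. The DataSet plan translator is tried first, then the StreamGraph translator, and the call fails if neither applies.

```java
import java.util.List;

// Hypothetical model of translator selection; these are not Flink's classes.
final class TranslatorDispatchSketch {

    interface Pipeline {}

    record StreamGraphModel(String name) implements Pipeline {}

    record DataSetPlanModel(String name) implements Pipeline {}

    interface Translator {
        boolean canTranslate(Pipeline pipeline);

        String translateToJobGraph(Pipeline pipeline);
    }

    // Plays the role of PlanTranslator (DataSet programs).
    static final Translator PLAN =
            new Translator() {
                @Override
                public boolean canTranslate(Pipeline pipeline) {
                    return pipeline instanceof DataSetPlanModel;
                }

                @Override
                public String translateToJobGraph(Pipeline pipeline) {
                    return "JobGraph from Plan " + ((DataSetPlanModel) pipeline).name();
                }
            };

    // Plays the role of StreamGraphTranslator (DataStream programs).
    static final Translator STREAM =
            new Translator() {
                @Override
                public boolean canTranslate(Pipeline pipeline) {
                    return pipeline instanceof StreamGraphModel;
                }

                @Override
                public String translateToJobGraph(Pipeline pipeline) {
                    return "JobGraph from StreamGraph " + ((StreamGraphModel) pipeline).name();
                }
            };

    // Ask each candidate in turn; fail if no translator accepts the pipeline.
    static Translator pick(Pipeline pipeline) {
        for (Translator translator : List.of(PLAN, STREAM)) {
            if (translator.canTranslate(pipeline)) {
                return translator;
            }
        }
        throw new IllegalStateException("No translator for " + pipeline);
    }
}
```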

        StreamGraphTranslator.translateToJobGraph() calls StreamGraph.getJobGraph() to generate the JobGraph.

Source of StreamGraphTranslator.translateToJobGraph():

public JobGraph translateToJobGraph(
        Pipeline pipeline, Configuration optimizerConfiguration, int defaultParallelism) {
    checkArgument(
            pipeline instanceof StreamGraph, "Given pipeline is not a DataStream StreamGraph.");
    //cast the pipeline to a StreamGraph
    StreamGraph streamGraph = (StreamGraph) pipeline;
    
    //generate the JobGraph via StreamGraph.getJobGraph()
    return streamGraph.getJobGraph(userClassloader, null);
}

        StreamGraph.getJobGraph() in turn calls StreamingJobGraphGenerator.createJobGraph() to generate the JobGraph.

Source of StreamGraph.getJobGraph():

public JobGraph getJobGraph(ClassLoader userClassLoader, @Nullable JobID jobID) {
    //the JobGraph is ultimately generated by StreamingJobGraphGenerator
    return StreamingJobGraphGenerator.createJobGraph(userClassLoader, this, jobID);
}

       Finally, the static StreamingJobGraphGenerator.createJobGraph() creates a StreamingJobGraphGenerator instance, and that instance generates the JobGraph.

Source of the static StreamingJobGraphGenerator.createJobGraph():

public static JobGraph createJobGraph(
        ClassLoader userClassLoader, StreamGraph streamGraph, @Nullable JobID jobID) {
    // TODO Currently, we construct a new thread pool for the compilation of each job. In the
    // future, we may refactor the job submission framework and make it reusable across jobs.
    
    //thread pool used for serialization
    final ExecutorService serializationExecutor =
            Executors.newFixedThreadPool(
                    Math.max(
                            1,
                            Math.min(
                                    Hardware.getNumberCPUCores(),
                                    streamGraph.getExecutionConfig().getParallelism())),
                    new ExecutorThreadFactory("flink-operator-serialization-io"));
    try { 
        //create a StreamingJobGraphGenerator instance and call its createJobGraph() to generate the JobGraph
        return new StreamingJobGraphGenerator(
                        userClassLoader, streamGraph, jobID, serializationExecutor)
                .createJobGraph();
    } finally {
        serializationExecutor.shutdown();
    }
}
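        The pool size expression deserves a closer look: Math.max(1, Math.min(cores, parallelism)). ExecutionConfig.getParallelism() returns -1 (ExecutionConfig.PARALLELISM_DEFAULT) when no job-wide parallelism has been set, so the outer Math.max guarantees at least one thread. The plain-Java sketch below reproduces only that arithmetic. Runtime.availableProcessors() stands in for Flink's Hardware.getNumberCPUCores(), and the class and method names are hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical helper reproducing the pool-size arithmetic of the static createJobGraph().
final class SerializationPoolSizing {

    // At least 1 thread; otherwise the smaller of the CPU core count and the job parallelism.
    // An unset parallelism (-1) therefore yields a single thread.
    static int poolSize(int cpuCores, int parallelism) {
        return Math.max(1, Math.min(cpuCores, parallelism));
    }

    // Creates a fixed pool sized like the one in createJobGraph(); the caller must shut it down,
    // just as createJobGraph() does in its finally block.
    static ExecutorService newSerializationPool(int parallelism) {
        int cores = Runtime.getRuntime().availableProcessors(); // stand-in for Hardware.getNumberCPUCores()
        return Executors.newFixedThreadPool(poolSize(cores, parallelism));
    }
}
```

        With 8 cores, a parallelism of 4 gives 4 threads, a parallelism of 32 gives 8, and an unset parallelism (-1) gives 1.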

4. StreamingJobGraphGenerator Generates the JobGraph

        The instance method createJobGraph() of StreamingJobGraphGenerator is where the JobGraph is actually generated. The detailed logic is shown in the source code diagram below.

Source code diagram:

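        createJobGraph() contains all of the key steps of JobGraph creation:
        - It builds the operator chains, linking chainable nodes and creating the JobVertices.
        - It creates JobEdges for the non-chainable outputs to connect upstream and downstream JobVertices.
        - It serializes the operator information into the JobVertices.
        - Finally, it configures the JobGraph's SlotSharingGroups, checkpointing, savepoint restore settings and ExecutionConfig.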

Source code diagram:

Source of StreamingJobGraphGenerator.createJobGraph():

private JobGraph createJobGraph() {
    //validate the checkpoint settings, then set the job type and the dynamic flag
    preValidate();
    jobGraph.setJobType(streamGraph.getJobType());
    jobGraph.setDynamic(streamGraph.isDynamic());
    jobGraph.enableApproximateLocalRecovery(
            streamGraph.getCheckpointConfig().isApproximateLocalRecoveryEnabled());

    // Generate deterministic hashes for the nodes in order to identify them across
    // submission iff they didn't change.
    //generate the hashes that identify each node
    Map<Integer, byte[]> hashes =
            defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);
    // Generate legacy version hashes for backwards compatibility
    List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
    for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
        legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
    }

    //chain the chainable nodes together (this also creates the JobVertices)
    setChaining(hashes, legacyHashes);

    if (jobGraph.isDynamic()) {
        setVertexParallelismsForDynamicGraphIfNecessary();
    }

    // Note that we set all the non-chainable outputs configuration here because the
    // "setVertexParallelismsForDynamicGraphIfNecessary" may affect the parallelism of job
    // vertices and partition-reuse
    final Map<Integer, Map<StreamEdge, NonChainedOutput>> opIntermediateOutputs =
            new HashMap<>();
            
    //configure (serialize) every non-chainable operator output
    setAllOperatorNonChainedOutputsConfigs(opIntermediateOutputs);
    
    //iterate over the non-chainable outputs of each JobVertex and create the JobEdges
    setAllVertexNonChainedOutputsConfigs(opIntermediateOutputs);

    //set the physical edges: serialize each JobVertex's input edges into that vertex's StreamConfig
    setPhysicalEdges();

    markSupportingConcurrentExecutionAttempts();

    validateHybridShuffleExecuteInBatchMode();

    //assign each JobVertex to its SlotSharingGroup
    setSlotSharingAndCoLocation();

    setManagedMemoryFraction(
            Collections.unmodifiableMap(jobVertices),
            Collections.unmodifiableMap(vertexConfigs),
            Collections.unmodifiableMap(chainedConfigs),
            id -> streamGraph.getStreamNode(id).getManagedMemoryOperatorScopeUseCaseWeights(),
            id -> streamGraph.getStreamNode(id).getManagedMemorySlotScopeUseCases());

    //configure checkpointing
    configureCheckpointing();

    //configure the savepoint restore settings
    jobGraph.setSavepointRestoreSettings(streamGraph.getSavepointRestoreSettings());

    final Map<String, DistributedCache.DistributedCacheEntry> distributedCacheEntries =
            JobGraphUtils.prepareUserArtifactEntries(
                    streamGraph.getUserArtifacts().stream()
                            .collect(Collectors.toMap(e -> e.f0, e -> e.f1)),
                    jobGraph.getJobID());

   
    for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry :
            distributedCacheEntries.entrySet()) {
        jobGraph.addUserArtifact(entry.getKey(), entry.getValue());
    }

    //configure the ExecutionConfig
    // set the ExecutionConfig last when it has been finalized
    try {
        jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
    } catch (IOException e) {
        throw new IllegalConfigurationException(
                "Could not serialize the ExecutionConfig."
                        + "This indicates that non-serializable types (like custom serializers) were registered");
    }
    jobGraph.setJobConfiguration(streamGraph.getJobConfiguration());

    addVertexIndexPrefixInVertexName();

    setVertexDescription();

    //for each chain head, serialize its operator chain into the vertex configuration
    // Wait for the serialization of operator coordinators and stream config.
    try {
        FutureUtils.combineAll(
                        vertexConfigs.values().stream()
                                .map(
                                        config ->
                                                config.triggerSerializationAndReturnFuture(
                                                        serializationExecutor))
                                .collect(Collectors.toList()))
                .get();

        waitForSerializationFuturesAndUpdateJobVertices();
    } catch (Exception e) {
        throw new FlinkRuntimeException("Error in serialization.", e);
    }

    if (!streamGraph.getJobStatusHooks().isEmpty()) {
        jobGraph.setJobStatusHooks(streamGraph.getJobStatusHooks());
    }
    
    //return the completed JobGraph
    return jobGraph;
}
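        The last step of createJobGraph() fans serialization out to the thread pool, then blocks until every vertex config is done (FutureUtils.combineAll(...).get()). Below is a minimal plain-Java analogue that uses CompletableFuture.allOf. It assumes a vertex config can be modelled as a serializable HashMap. The class and method names are hypothetical, and Java serialization stands in for whatever StreamConfig does internally.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;

// Hypothetical sketch: serialize every vertex config on a pool, then wait for all of them.
final class ParallelSerializationSketch {

    // Plain Java serialization of one config object.
    static byte[] serialize(Serializable config) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
                ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(config);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static Map<Integer, byte[]> serializeAll(
            Map<Integer, HashMap<String, String>> vertexConfigs, ExecutorService pool) {
        // Trigger one asynchronous serialization per vertex config.
        Map<Integer, CompletableFuture<byte[]>> futures = new HashMap<>();
        vertexConfigs.forEach(
                (id, config) ->
                        futures.put(id, CompletableFuture.supplyAsync(() -> serialize(config), pool)));

        // Counterpart of FutureUtils.combineAll(...).get(): block until every future is done.
        CompletableFuture.allOf(futures.values().toArray(new CompletableFuture<?>[0])).join();

        Map<Integer, byte[]> serialized = new HashMap<>();
        futures.forEach((id, future) -> serialized.put(id, future.join()));
        return serialized;
    }
}
```

        Serializing on a pool and joining once keeps the expensive work parallel, while the caller still sees a single blocking point. The real code has that same shape.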

5. Recursively Creating Operator Chains

        In StreamingJobGraphGenerator.createJobGraph(), setChaining(hashes, legacyHashes) is the key step of JobGraph generation. It recursively builds the operator chains and the JobVertex nodes.

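        Flink recurses through the StreamGraph from the Source nodes toward the Sink. It checks whether each StreamNode's output edges are chainable and merges chainable nodes step by step into operator chains.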

Source code diagram:

       In StreamingJobGraphGenerator.setChaining(), Flink collects the source operators that serve as chain entry points. It then recursively visits all operators from source to sink and chains the chainable ones.

Source of StreamingJobGraphGenerator.setChaining():

private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes) {
    // we separate out the sources that run as inputs to another operator (chained inputs)
    // from the sources that needs to run as the main (head) operator.
    
    //collect the chain entry points (head sources; sources chained as inputs of another operator are separated out)
    final Map<Integer, OperatorChainInfo> chainEntryPoints =
            buildChainedInputsAndGetHeadInputs(hashes, legacyHashes);
            
    //sort the entries by key (node id) and collect the values into a list
    final Collection<OperatorChainInfo> initialEntryPoints =
            chainEntryPoints.entrySet().stream()
                    .sorted(Comparator.comparing(Map.Entry::getKey))
                    .map(Map.Entry::getValue)
                    .collect(Collectors.toList());

    //recursively visit all operators from source to sink, chaining the chainable ones
    // iterate over a copy of the values, because this map gets concurrently modified
    for (OperatorChainInfo info : initialEntryPoints) {
        createChain(
                info.getStartNodeId(),
                1, // operators start at position 1 because 0 is for chained source inputs
                info,
                chainEntryPoints);
    }
}

        The recursive traversal from source to sink that builds the operator chains is implemented in StreamingJobGraphGenerator.createChain().

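        StreamingJobGraphGenerator iterates over every output edge of the current StreamNode and checks whether the edge is chainable. A chainable target joins the current chain. The target of a non-chainable edge becomes the head of a new chain. In this way StreamingJobGraphGenerator recursively visits all StreamNodes from source to sink.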

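        If the node being visited is the start node of the current chain, createJobVertex() is called to create the chain-head JobVertex. Otherwise only a StreamConfig recording the operator is created. For a chain-head JobVertex, the StreamConfigs of all nodes in the chain are serialized and written into the head's configuration.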

Source of StreamingJobGraphGenerator.createChain():

private List<StreamEdge> createChain(
        final Integer currentNodeId,
        final int chainIndex,
        final OperatorChainInfo chainInfo,
        final Map<Integer, OperatorChainInfo> chainEntryPoints) {

    //id of the start node (chain head) of the current chain
    Integer startNodeId = chainInfo.getStartNodeId();
    
    //only build the chain if no JobVertex exists yet for its start node
    if (!builtVertices.contains(startNodeId)) {

        //initialize the data structures
        List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();
        List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
        List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();

        //get the current node
        StreamNode currentNode = streamGraph.getStreamNode(currentNodeId);
        
        //classify every output edge of the current node as chainable or non-chainable
        for (StreamEdge outEdge : currentNode.getOutEdges()) {
            if (isChainable(outEdge, streamGraph)) {
                chainableOutputs.add(outEdge);
            } else {
                nonChainableOutputs.add(outEdge);
            }
        }
        
        //chainable edge: extend the current chain by recursing into the target (chainIndex + 1) and collect the transitive out edges found downstream
        for (StreamEdge chainable : chainableOutputs) {
            transitiveOutEdges.addAll(
                    createChain(
                            chainable.getTargetId(),
                            chainIndex + 1,
                            chainInfo,
                            chainEntryPoints));
        }

        //non-chainable edge: record it as a transitive out edge and start a new chain at the target node (chainIndex restarts at 1)
        for (StreamEdge nonChainable : nonChainableOutputs) {
            transitiveOutEdges.add(nonChainable);
            createChain(
                    nonChainable.getTargetId(),
                    1, // operators start at position 1 because 0 is for chained source inputs
                    chainEntryPoints.computeIfAbsent(
                            nonChainable.getTargetId(),
                            (k) -> chainInfo.newChain(nonChainable.getTargetId())),
                    chainEntryPoints);
        }
         
        chainedNames.put(
                currentNodeId,
                createChainedName(
                        currentNodeId,
                        chainableOutputs,
                        Optional.ofNullable(chainEntryPoints.get(currentNodeId))));
        chainedMinResources.put(
                currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs));
        chainedPreferredResources.put(
                currentNodeId,
                createChainedPreferredResources(currentNodeId, chainableOutputs));

        //add the current node (with its hash) to the chain info of the chain head; this yields the node's OperatorID
        OperatorID currentOperatorId =
                chainInfo.addNodeToChain(
                        currentNodeId,
                        streamGraph.getStreamNode(currentNodeId).getOperatorName());

        if (currentNode.getInputFormat() != null) {
            getOrCreateFormatContainer(startNodeId)
                    .addInputFormat(currentOperatorId, currentNode.getInputFormat());
        }

        if (currentNode.getOutputFormat() != null) {
            getOrCreateFormatContainer(startNodeId)
                    .addOutputFormat(currentOperatorId, currentNode.getOutputFormat());
        }

        //if the current node is the start node of this chain, createJobVertex() creates the JobVertex; otherwise only a StreamConfig recording the operator is created
        StreamConfig config =
                currentNodeId.equals(startNodeId)
                        ? createJobVertex(startNodeId, chainInfo)
                        : new StreamConfig(new Configuration());
      
        //configure the current node
        tryConvertPartitionerForDynamicGraph(chainableOutputs, nonChainableOutputs);
        setOperatorConfig(currentNodeId, config, chainInfo.getChainedSources());

        //configure and serialize the chained outputs of the current node
        setOperatorChainedOutputsConfig(config, chainableOutputs);

        //cache the non-chained outputs
        // we cache the non-chainable outputs here, and set the non-chained config later
        opNonChainableOutputsCache.put(currentNodeId, nonChainableOutputs);

        //chaining is done: write the chain config depending on whether the current node is the chain head
        if (currentNodeId.equals(startNodeId)) {
            //record the non-chained (transitive) out edges of this chain
            chainInfo.setTransitiveOutEdges(transitiveOutEdges);
            chainInfos.put(startNodeId, chainInfo);

            //mark this node as the chain start
            config.setChainStart();
            config.setChainIndex(chainIndex);
            config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
            //write the StreamConfigs of all chained (non-head) nodes into the head node's config
            config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));

        } else {
            chainedConfigs.computeIfAbsent(
                    startNodeId, k -> new HashMap<Integer, StreamConfig>());

            config.setChainIndex(chainIndex);
            StreamNode node = streamGraph.getStreamNode(currentNodeId);
            config.setOperatorName(node.getOperatorName());
            chainedConfigs.get(startNodeId).put(currentNodeId, config);
        }

        config.setOperatorID(currentOperatorId);

        if (chainableOutputs.isEmpty()) {
            config.setChainEnd();
        }
        return transitiveOutEdges;

    } else {
        return new ArrayList<>();
    }
}
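        The recursion is easier to follow on a toy graph. The sketch below is a simplified, hypothetical model of createChain(); CreateChainSketch, Edge, headOf, chainIndexOf and vertexCreationOrder are illustrative names. Chainability is passed in as a predicate instead of isChainable(), and "creating a JobVertex" is reduced to recording the head id. The sketch keeps the properties described above:
        - Chainable edges recurse with chainIndex + 1.
        - Non-chainable edges start a new chain at index 1.
        - A head's vertex is created only after its whole chain has been visited (post-order).
        - The builtVertices check stops a chain that is reachable along two paths from being built twice.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

// Simplified, hypothetical model of createChain(); not Flink's classes.
final class CreateChainSketch {

    record Edge(int source, int target) {}

    final Map<Integer, List<Edge>> outEdges = new HashMap<>();
    final Predicate<Edge> chainable; // stands in for isChainable()
    final Set<Integer> builtVertices = new HashSet<>(); // chain heads whose vertex exists
    final Map<Integer, Integer> headOf = new HashMap<>(); // node -> head of its chain
    final Map<Integer, Integer> chainIndexOf = new HashMap<>(); // node -> position in its chain
    final Map<Integer, List<Edge>> transitiveOutEdgesOfHead = new HashMap<>();
    final List<Integer> vertexCreationOrder = new ArrayList<>();

    CreateChainSketch(List<Edge> edges, Predicate<Edge> chainable) {
        this.chainable = chainable;
        for (Edge e : edges) {
            outEdges.computeIfAbsent(e.source(), k -> new ArrayList<>()).add(e);
        }
    }

    // Like setChaining(): start one chain per entry point, in sorted order.
    void build(List<Integer> sources) {
        sources.stream().sorted().forEach(s -> createChain(s, 1, s));
    }

    List<Edge> createChain(int current, int chainIndex, int startNode) {
        if (builtVertices.contains(startNode)) {
            return new ArrayList<>(); // chain already built via another path
        }
        List<Edge> transitiveOutEdges = new ArrayList<>();
        List<Edge> chainableOutputs = new ArrayList<>();
        List<Edge> nonChainableOutputs = new ArrayList<>();
        for (Edge e : outEdges.getOrDefault(current, List.of())) {
            if (chainable.test(e)) {
                chainableOutputs.add(e);
            } else {
                nonChainableOutputs.add(e);
            }
        }
        // Chainable: same chain, next position.
        for (Edge e : chainableOutputs) {
            transitiveOutEdges.addAll(createChain(e.target(), chainIndex + 1, startNode));
        }
        // Non-chainable: remember the edge, start a new chain at the target (position 1).
        for (Edge e : nonChainableOutputs) {
            transitiveOutEdges.add(e);
            createChain(e.target(), 1, e.target());
        }
        headOf.put(current, startNode);
        chainIndexOf.put(current, chainIndex);
        if (current == startNode) {
            // Post-order: the head's "JobVertex" is created only after its whole chain was visited.
            builtVertices.add(startNode);
            vertexCreationOrder.add(startNode);
            transitiveOutEdgesOfHead.put(startNode, transitiveOutEdges);
        }
        return transitiveOutEdges;
    }
}
```

        Take source(1) -> map(2) -> window(3) -> sink(4), where 1->2 and 3->4 are chainable. The heads are 1 and 3, vertex 3 is created before vertex 1, and the only transitive out edge of chain 1 is 2->3. In a diamond 1->2, 1->3, 2->4, 3->4 with nothing chainable, node 4 is built once, on the first path that reaches it.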

        isChainable(outEdge, streamGraph) decides whether an edge can be chained. StreamingJobGraphGenerator.isChainable() first checks that the downstream node has exactly one input edge. It then calls isChainableInput() for further checks.

Source of StreamingJobGraphGenerator.isChainable():

public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
    StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);
    
    //chainable only if the downstream node has exactly one input edge and isChainableInput() returns true
    return downStreamVertex.getInEdges().size() == 1 && isChainableInput(edge, streamGraph);
}

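        StreamingJobGraphGenerator.isChainableInput() checks that all of the following hold:
        - Chaining is enabled.
        - The upstream and downstream operators are in the same slot sharing group.
        - Their ChainingStrategies allow chaining: the upstream one must be HEAD or ALWAYS, and the downstream one must be ALWAYS.
        - The partitioner is forward, with a chainable exchange mode.
        - No other input of the downstream node uses the same input position (the union check).
        If every check passes, the two nodes can be chained.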

Source of StreamingJobGraphGenerator.isChainableInput():

private static boolean isChainableInput(StreamEdge edge, StreamGraph streamGraph) {
    StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
    StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);
    
    //chaining is enabled in the streamGraph
    if (!(streamGraph.isChainingEnabled()
            //upstream and downstream operators are in the same slot sharing group
            && upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
            //ChainingStrategy check: upstream must be HEAD/ALWAYS, downstream must be ALWAYS
            && areOperatorsChainable(upStreamVertex, downStreamVertex, streamGraph)
            //the partitioner must be forward (and the exchange mode chainable)
            && arePartitionerAndExchangeModeChainable(
                    edge.getPartitioner(), edge.getExchangeMode(), streamGraph.isDynamic()))) {

        return false;
    }

    // check that we do not have a union operation, because unions currently only work
    // through the network/byte-channel stack.
    // we check that by testing that each "type" (which means input position) is used only once
    for (StreamEdge inEdge : downStreamVertex.getInEdges()) {
        if (inEdge != edge && inEdge.getTypeNumber() == edge.getTypeNumber()) {
            return false;
        }
    }
    return true;
}
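        The checks above can be condensed into a small plain-Java model. Everything here is a simplified assumption rather than Flink's API. Nodes carry only a ChainingStrategy, a slot sharing group name and a parallelism, and the partitioner is a string. In addition to the conditions listed in the text, the sketch also requires equal parallelism. To our understanding, areOperatorsChainable() enforces this in recent Flink versions. The sketch ignores the HEAD_WITH_SOURCES special case and the exchange-mode check.

```java
import java.util.List;

// Simplified, hypothetical model of isChainable()/isChainableInput(); not Flink's API.
final class ChainabilitySketch {

    enum ChainingStrategy { ALWAYS, NEVER, HEAD, HEAD_WITH_SOURCES }

    record Node(ChainingStrategy strategy, String slotSharingGroup, int parallelism) {}

    // typeNumber models the input position the edge uses on the downstream node.
    record Edge(Node upstream, Node downstream, String partitioner, int typeNumber) {}

    // Mirrors isChainable(): exactly one input edge on the downstream node, then isChainableInput().
    static boolean isChainable(Edge edge, List<Edge> downstreamInEdges, boolean chainingEnabled) {
        return downstreamInEdges.size() == 1
                && isChainableInput(edge, downstreamInEdges, chainingEnabled);
    }

    static boolean isChainableInput(Edge edge, List<Edge> downstreamInEdges, boolean chainingEnabled) {
        Node up = edge.upstream();
        Node down = edge.downstream();
        if (!(chainingEnabled
                && up.slotSharingGroup().equals(down.slotSharingGroup())
                && areOperatorsChainable(up, down)
                && "FORWARD".equals(edge.partitioner()))) {
            return false;
        }
        // Union check: no other input edge may use the same input position.
        for (Edge in : downstreamInEdges) {
            if (in != edge && in.typeNumber() == edge.typeNumber()) {
                return false;
            }
        }
        return true;
    }

    // Upstream: anything but NEVER (HEAD, ALWAYS, HEAD_WITH_SOURCES); downstream: ALWAYS only.
    // Equal parallelism is an extra condition assumed from areOperatorsChainable().
    static boolean areOperatorsChainable(Node up, Node down) {
        return up.strategy() != ChainingStrategy.NEVER
                && down.strategy() == ChainingStrategy.ALWAYS
                && up.parallelism() == down.parallelism();
    }
}
```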

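        Back in StreamingJobGraphGenerator.createChain(), once chainability has been decided, Flink keeps recursing into the downstream nodes. For a non-chainable edge, chainIndex is reset to 1, which starts a new operator chain. For a chainable edge, chainIndex becomes chainIndex + 1, and the recursion keeps extending the current chain.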

Source of StreamingJobGraphGenerator.createChain() (excerpt):

private List<StreamEdge> createChain(
        final Integer currentNodeId,
        final int chainIndex,
        final OperatorChainInfo chainInfo,
        final Map<Integer, OperatorChainInfo> chainEntryPoints) {

        //...
        
        //classify every output edge of the current node as chainable or non-chainable
        for (StreamEdge outEdge : currentNode.getOutEdges()) {
            if (isChainable(outEdge, streamGraph)) {
                chainableOutputs.add(outEdge);
            } else {
                nonChainableOutputs.add(outEdge);
            }
        }
        
        //chainable edge: extend the current chain by recursing into the target (chainIndex + 1) and collect the transitive out edges found downstream
        for (StreamEdge chainable : chainableOutputs) {
            transitiveOutEdges.addAll(
                    createChain(
                            chainable.getTargetId(),
                            chainIndex + 1,
                            chainInfo,
                            chainEntryPoints));
        }

        //non-chainable edge: record it as a transitive out edge and start a new chain at the target node (chainIndex restarts at 1)
        for (StreamEdge nonChainable : nonChainableOutputs) {
            transitiveOutEdges.add(nonChainable);
            createChain(
                    nonChainable.getTargetId(),
                    1, // operators start at position 1 because 0 is for chained source inputs
                    chainEntryPoints.computeIfAbsent(
                            nonChainable.getTargetId(),
                            (k) -> chainInfo.newChain(nonChainable.getTargetId())),
                    chainEntryPoints);
        }

        
        //if the current node is the start node of this chain, createJobVertex() creates the JobVertex; otherwise only a StreamConfig recording the operator is created
        StreamConfig config =
                currentNodeId.equals(startNodeId)
                        ? createJobVertex(startNodeId, chainInfo)
                        : new StreamConfig(new Configuration());

        //...

}

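        If the node being visited is the chain head (the start node of the current chain), createJobVertex() is called to create the JobVertex. Otherwise only a StreamConfig recording the operator is created.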

6. Creating the Chain-Head JobVertex

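        If the node being visited is a chain head, Flink creates a JobVertex that encapsulates the operator information and computation logic. It also records the node and its chain information in the data structures of StreamingJobGraphGenerator.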

        The chain-head JobVertex is created in StreamingJobGraphGenerator.createJobVertex(). This method instantiates the JobVertex, configures it, and adds it to the JobGraph.

Source code diagram:

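        When creating the vertex, Flink checks whether the chain contains an InputFormat or OutputFormat (that is, format-based sources or sinks). If it does, Flink creates an InputOutputFormatVertex; otherwise it creates a plain JobVertex.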

Source of StreamingJobGraphGenerator.createJobVertex():

private StreamConfig createJobVertex(Integer streamNodeId, OperatorChainInfo chainInfo) {
    //..

    //create an InputOutputFormatVertex if the chain has input/output formats, otherwise a plain JobVertex
    if (chainedInputOutputFormats.containsKey(streamNodeId)) {
        //chain with an InputFormat/OutputFormat: create an InputOutputFormatVertex
        jobVertex =
                new InputOutputFormatVertex(
                        chainedNames.get(streamNodeId), jobVertexId, operatorIDPairs);
        chainedInputOutputFormats
                .get(streamNodeId)
                .write(new TaskConfig(jobVertex.getConfiguration()));
    } else {
        //ordinary operator chain: create a plain JobVertex
        jobVertex = new JobVertex(chainedNames.get(streamNodeId), jobVertexId, operatorIDPairs);
    }
    //..

    //configure the JobVertex
    jobVertex.addIntermediateDataSetIdToConsume(streamNode.getConsumeClusterDatasetId());
    jobVertex.setResources(
            chainedMinResources.get(streamNodeId), chainedPreferredResources.get(streamNodeId));
    jobVertex.setInvokableClass(streamNode.getJobVertexClass());
    jobVertex.setParallelism(parallelism);
    jobVertex.setMaxParallelism(streamNode.getMaxParallelism());

    //register the JobVertex and add it to the JobGraph
    jobVertices.put(streamNodeId, jobVertex);
    builtVertices.add(streamNodeId);
    jobGraph.addVertex(jobVertex);

    jobVertex.setParallelismConfigured(
            chainInfo.getAllChainedNodes().stream()
                    .anyMatch(StreamNode::isParallelismConfigured));

    //the returned StreamConfig wraps the JobVertex's configuration
    return new StreamConfig(jobVertex.getConfiguration());
}
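        Two details of createJobVertex() are worth modelling. First, the vertex type depends on whether the chain carries input/output formats. Second, the returned StreamConfig wraps the vertex's own Configuration object, so everything createChain() later writes through that config lands in the JobVertex. The sketch below is hypothetical, not the Flink implementation. JobVertexModel and InputOutputFormatVertexModel are illustrative names, and a Map<String, String> stands in for Configuration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of createJobVertex(); not Flink's classes.
final class CreateJobVertexSketch {

    static class JobVertexModel {
        final String name;
        // Stands in for the vertex's Configuration object.
        final Map<String, String> configuration = new HashMap<>();

        JobVertexModel(String name) {
            this.name = name;
        }
    }

    // Stands in for InputOutputFormatVertex: used when the chain has an InputFormat/OutputFormat.
    static final class InputOutputFormatVertexModel extends JobVertexModel {
        InputOutputFormatVertexModel(String name) {
            super(name);
        }
    }

    final Set<Integer> chainsWithFormats; // chain heads whose chain contains a format-based source/sink
    final Map<Integer, JobVertexModel> jobVertices = new HashMap<>();
    final Set<Integer> builtVertices = new HashSet<>();
    final List<JobVertexModel> jobGraphVertices = new ArrayList<>();

    CreateJobVertexSketch(Set<Integer> chainsWithFormats) {
        this.chainsWithFormats = chainsWithFormats;
    }

    Map<String, String> createJobVertex(int streamNodeId, String chainedName, int parallelism) {
        // 1) pick the vertex type
        JobVertexModel vertex =
                chainsWithFormats.contains(streamNodeId)
                        ? new InputOutputFormatVertexModel(chainedName)
                        : new JobVertexModel(chainedName);
        // 2) configure it
        vertex.configuration.put("parallelism", Integer.toString(parallelism));
        // 3) register it; builtVertices is what createChain() checks to avoid building a chain twice
        jobVertices.put(streamNodeId, vertex);
        builtVertices.add(streamNodeId);
        jobGraphVertices.add(vertex);
        // 4) return the vertex's own config object (not a copy),
        //    like new StreamConfig(jobVertex.getConfiguration())
        return vertex.configuration;
    }
}
```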

        After createJobVertex() has created the JobVertex, control returns to StreamingJobGraphGenerator.createChain(), which configures and serializes the vertex further. The operator chain information of the chain head is recorded in the head's StreamConfig.

Source of StreamingJobGraphGenerator.createChain() (continued):

private List<StreamEdge> createChain(
        final Integer currentNodeId,
        final int chainIndex,
        final OperatorChainInfo chainInfo,
        final Map<Integer, OperatorChainInfo> chainEntryPoints) {
        //...
        
        //if the current node is the start node of this chain, createJobVertex() creates the JobVertex; otherwise only a StreamConfig recording the operator is created
        StreamConfig config =
                currentNodeId.equals(startNodeId)
                        ? createJobVertex(startNodeId, chainInfo)
                        : new StreamConfig(new Configuration());
      
        //configure the current node
        tryConvertPartitionerForDynamicGraph(chainableOutputs, nonChainableOutputs);
        setOperatorConfig(currentNodeId, config, chainInfo.getChainedSources());

        //configure and serialize the chained outputs of the current node
        setOperatorChainedOutputsConfig(config, chainableOutputs);

        //cache the non-chained outputs
        // we cache the non-chainable outputs here, and set the non-chained config later
        opNonChainableOutputsCache.put(currentNodeId, nonChainableOutputs);

        //chaining is done: write the chain config depending on whether the current node is the chain head
        if (currentNodeId.equals(startNodeId)) {
            //record the non-chained (transitive) out edges of this chain
            chainInfo.setTransitiveOutEdges(transitiveOutEdges);
            chainInfos.put(startNodeId, chainInfo);

            //mark this node as the chain start
            config.setChainStart();
            config.setChainIndex(chainIndex);
            config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
            //write the StreamConfigs of all chained (non-head) nodes into the head node's config
            config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));

        } else {
            chainedConfigs.computeIfAbsent(
                    startNodeId, k -> new HashMap<Integer, StreamConfig>());

            config.setChainIndex(chainIndex);
            StreamNode node = streamGraph.getStreamNode(currentNodeId);
            config.setOperatorName(node.getOperatorName());
            chainedConfigs.get(startNodeId).put(currentNodeId, config);
        }

        config.setOperatorID(currentOperatorId);

        if (chainableOutputs.isEmpty()) {
            config.setChainEnd();
        }
        return transitiveOutEdges;

    } else {
        return new ArrayList<>();
    }
}

7. Conclusion

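        This concludes the first part of the JobGraph generation source analysis, which covered the recursive creation of operator chains and of the chain-head JobVertices. The second part will cover the creation of JobEdges and IntermediateDataSets and the serialization of the operator chains.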

