Flink 1.13 Source Code Analysis: Graph Transformations and Building the JobGraph

Published: 2022-11-28

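Flink 1.13 Source Code Analysis: Table of Contents

Flink 1.13 Source Code Analysis: Graph Transformations and Building the StreamGraph

Flink 1.13 Source Code Analysis: Graph Transformations and Building the ExecutionGraph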

Contents

Preface

Overview

Building the Flink JobGraph

Summary


Preface

In the previous chapter we walked through how Flink builds the StreamGraph. In this chapter we look at how the StreamGraph is converted into a JobGraph.

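Overview

Flink has four layers of graph: StreamGraph, JobGraph, ExecutionGraph, and the physical execution graph. The StreamGraph and the JobGraph are both built on the client side, that is, while org.apache.flink.client.cli.CliFrontend invokes the main method of our job via reflection. Once the JobGraph has been built, it is sent as a file to the Dispatcher component of the JobManager, and the conversion to an ExecutionGraph begins there.

Start with the StreamGraph. Each vertex in a StreamGraph is a StreamNode, which essentially represents one operator, and two StreamNodes are connected by a StreamEdge object.

While the StreamGraph is converted into a JobGraph, StreamNodes are optimized: under certain conditions (covered when we read the source below), adjacent StreamNodes are merged, and each merged group becomes one JobVertex, a vertex of the JobGraph. The output of a JobVertex is an IntermediateDataSet, which holds what the vertex produces, and the edge that connects an upstream vertex's output to a downstream vertex is called a JobEdge.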
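To make the relationship between these three objects concrete, here is a minimal sketch in plain Java. The classes below only borrow the names JobVertex, IntermediateDataSet and JobEdge; they are toy stand-ins written for this article, not Flink's real classes, and the connect helper is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the JobGraph building blocks; these are NOT Flink's real classes. */
final class JobGraphShapeSketch {

    /** A vertex of the JobGraph: one or more chained operators. */
    static final class JobVertex {
        final String name;
        final List<IntermediateDataSet> producedDataSets = new ArrayList<>();
        final List<JobEdge> inputs = new ArrayList<>();

        JobVertex(String name) {
            this.name = name;
        }
    }

    /** The output of a JobVertex. */
    static final class IntermediateDataSet {
        final JobVertex producer;
        final List<JobEdge> consumers = new ArrayList<>();

        IntermediateDataSet(JobVertex producer) {
            this.producer = producer;
        }
    }

    /** Connects an upstream data set to the downstream vertex that reads it. */
    static final class JobEdge {
        final IntermediateDataSet source;
        final JobVertex target;

        JobEdge(IntermediateDataSet source, JobVertex target) {
            this.source = source;
            this.target = target;
        }
    }

    /** Hypothetical helper: wire upstream -> data set -> edge -> downstream. */
    static JobEdge connect(JobVertex upstream, JobVertex downstream) {
        IntermediateDataSet dataSet = new IntermediateDataSet(upstream);
        upstream.producedDataSets.add(dataSet);
        JobEdge edge = new JobEdge(dataSet, downstream);
        dataSet.consumers.add(edge);
        downstream.inputs.add(edge);
        return edge;
    }

    public static void main(String[] args) {
        JobVertex source = new JobVertex("Source -> Flat Map");
        JobVertex sum = new JobVertex("Keyed Aggregation -> Sink");
        JobEdge edge = connect(source, sum);
        System.out.println(edge.source.producer.name + " ==> " + edge.target.name);
    }
}
```

The point of the sketch is only the shape: a vertex owns its output data sets, and an edge links one data set to one consuming vertex.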

 

Building the Flink JobGraph

Let's go back to Flink's streaming example, flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java, and find the env.execute call:

DataStream<Tuple2<String, Integer>> counts =
        // split up the lines in pairs (2-tuples) containing: (word,1)
        // TODO
        text.flatMap(new Tokenizer())
                // group by the tuple field "0" and sum up tuple field "1"
                .keyBy(value -> value.f0)
                .sum(1);
// emit result
if (params.has("output")) {
    counts.writeAsText(params.get("output"));
} else {
    System.out.println("Printing result to stdout. Use --output to specify output path.");
    counts.print();
}
// execute program
// TODO
env.execute("Streaming WordCount");

Step into env.execute:

    public JobExecutionResult execute(String jobName) throws Exception {
        Preconditions.checkNotNull(jobName, "Streaming Job name should not be null.");

        // TODO Obtain the StreamGraph and execute it
        return execute(getStreamGraph(jobName));
    }

In the previous chapter we saw that the StreamGraph is built by the getStreamGraph call here. This time we go straight to the execute method to see how the StreamGraph is executed:

    public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
        // Execute the StreamGraph asynchronously
        final JobClient jobClient = executeAsync(streamGraph);

        try {
            final JobExecutionResult jobExecutionResult;

            if (configuration.getBoolean(DeploymentOptions.ATTACHED)) {
                // TODO Block on get() until the job execution result is available
                jobExecutionResult = jobClient.getJobExecutionResult().get();
            } else {
                jobExecutionResult = new DetachedJobExecutionResult(jobClient.getJobID());
            }

            jobListeners.forEach(
                    jobListener -> jobListener.onJobExecuted(jobExecutionResult, null));

            return jobExecutionResult;
        } catch (Throwable t) {
            // get() on the JobExecutionResult Future will throw an ExecutionException. This
            // behaviour was largely not there in Flink versions before the PipelineExecutor
            // refactoring so we should strip that exception.
            Throwable strippedException = ExceptionUtils.stripExecutionException(t);

            jobListeners.forEach(
                    jobListener -> {
                        jobListener.onJobExecuted(null, strippedException);
                    });
            ExceptionUtils.rethrowException(strippedException);

            // never reached, only make javac happy
            return null;
        }
    }
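The catch block above strips the ExecutionException that Future.get() wraps around the real failure. The following plain-JDK sketch shows the idea with a CompletableFuture. The class and method names are made up for illustration, and the strip method only approximates what the article's code delegates to ExceptionUtils.stripExecutionException; it is not Flink's implementation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/** Plain-JDK sketch of unwrapping the ExecutionException thrown by Future.get(). */
final class StripExecutionExceptionSketch {

    /** Peel off ExecutionException wrappers until the real cause is reached. */
    static Throwable strip(Throwable t) {
        Throwable current = t;
        while (current instanceof ExecutionException && current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    /** Block on the future; return the stripped failure, or null on success. */
    static Throwable waitAndStrip(CompletableFuture<?> future) {
        try {
            future.get();
            return null;
        } catch (ExecutionException e) {
            return strip(e);
        } catch (InterruptedException e) {
            // Restore the interrupt flag before handing the exception back.
            Thread.currentThread().interrupt();
            return e;
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("job failed"));
        System.out.println(waitAndStrip(failed));
    }
}
```

Listeners that receive the stripped exception see the original failure type rather than the generic wrapper, which is why the quoted code strips before notifying them.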

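This code was covered in the earlier chapter on job submission, so we will not repeat the details. The part that matters here is executeAsync, the method that executes the StreamGraph asynchronously. Step into it: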

    @Internal
    public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
        checkNotNull(streamGraph, "StreamGraph cannot be null.");
        checkNotNull(
                configuration.get(DeploymentOptions.TARGET),
                "No execution.target specified in your configuration file.");

        final PipelineExecutorFactory executorFactory =
                executorServiceLoader.getExecutorFactory(configuration);

        checkNotNull(
                executorFactory,
                "Cannot find compatible factory for specified execution.target (=%s)",
                configuration.get(DeploymentOptions.TARGET));

        /*
        TODO Submit asynchronously and get a future back
         */
        CompletableFuture<JobClient> jobClientFuture =
                executorFactory
                        .getExecutor(configuration)
                        .execute(streamGraph, configuration, userClassloader);

        try {
            // TODO Block until the submission returns a JobClient
            JobClient jobClient = jobClientFuture.get();
            jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(jobClient, null));
            return jobClient;
        } catch (ExecutionException executionException) {
            final Throwable strippedException =
                    ExceptionUtils.stripExecutionException(executionException);
            jobListeners.forEach(
                    jobListener -> jobListener.onJobSubmitted(null, strippedException));

            throw new FlinkException(
                    String.format("Failed to execute job '%s'.", streamGraph.getJobName()),
                    strippedException);
        }
    }

Next we follow the asynchronous submission. Step into the execute call in this snippet:

 /*
 TODO Submit asynchronously and get a future back
  */
 CompletableFuture<JobClient> jobClientFuture =
         executorFactory
                 .getExecutor(configuration)
                 .execute(streamGraph, configuration, userClassloader);

Choose the AbstractSessionClusterExecutor implementation:

    // TODO The pipeline parameter here is the StreamGraph
    @Override
    public CompletableFuture<JobClient> execute(
            @Nonnull final Pipeline pipeline,
            @Nonnull final Configuration configuration,
            @Nonnull final ClassLoader userCodeClassloader)
            throws Exception {
        // TODO Build the JobGraph from the StreamGraph
        final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration);

        /*
        TODO At this point the JobGraph has been built; submission of the JobGraph starts next
         */

        // TODO
        try (final ClusterDescriptor<ClusterID> clusterDescriptor =
                clusterClientFactory.createClusterDescriptor(configuration)) {
            final ClusterID clusterID = clusterClientFactory.getClusterId(configuration);
            checkState(clusterID != null);

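            /*
            TODO Provider used to create the RestClusterClient: ClusterClientProvider
             1. Internally it initializes a RestClusterClient
             2. Initializing the RestClusterClient also initializes its member field: RestClient
             3. Initializing the RestClient in turn initializes its internal Netty client

             TODO Client that submits the job: the Netty client inside the RestClient of the RestClusterClient
             TODO Server that receives the job: the Netty server in the WebMonitorEndpoint started by the JobManager
             */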
            final ClusterClientProvider<ClusterID> clusterClientProvider =
                    clusterDescriptor.retrieve(clusterID);
            ClusterClient<ClusterID> clusterClient = clusterClientProvider.getClusterClient();

            /*
            TODO Submit for execution
            1. MiniClusterClient: local execution
            2. RestClusterClient: submit to the Flink REST server for handling
             */
            return clusterClient
                    // TODO Submit through the Netty client inside the RestClient
                    .submitJob(jobGraph)
                    .thenApplyAsync(
                            FunctionUtils.uncheckedFunction(
                                    jobId -> {
                                        ClientUtils.waitUntilJobInitializationFinished(
                                                () -> clusterClient.getJobStatus(jobId).get(),
                                                () -> clusterClient.requestJobResult(jobId).get(),
                                                userCodeClassloader);
                                        return jobId;
                                    }))
                    .thenApplyAsync(
                            jobID ->
                                    (JobClient)
                                            new ClusterClientJobClientAdapter<>(
                                                    clusterClientProvider,
                                                    jobID,
                                                    userCodeClassloader))
                    .whenCompleteAsync((ignored1, ignored2) -> clusterClient.close());
        }
    }
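The return statement above is a chain of CompletableFuture stages: submit, wait for initialization, wrap the job id in a client, and finally close the cluster client. The sketch below reproduces only that shape with the JDK. The stage bodies, the string ids and the log list are invented for illustration and do not correspond to any Flink call.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

/** JDK-only sketch of a submit -> wait -> adapt -> close future chain. */
final class SubmitPipelineSketch {

    /** Hypothetical stand-in for the staged submission; each stage records itself in the log. */
    static CompletableFuture<String> submit(List<String> log) {
        return CompletableFuture.supplyAsync(
                        () -> {
                            log.add("submit");
                            return "job-1";
                        })
                .thenApplyAsync(
                        jobId -> {
                            log.add("wait-for-initialization");
                            return jobId;
                        })
                // Wrap the id in something the caller can use, like a JobClient.
                .thenApplyAsync(jobId -> "client-for-" + jobId)
                // Runs on success and on failure, like the clusterClient.close() stage.
                .whenCompleteAsync((result, error) -> log.add("close-cluster-client"));
    }

    public static void main(String[] args) {
        List<String> log = new CopyOnWriteArrayList<>();
        String client = submit(log).join();
        System.out.println(client + " " + log);
    }
}
```

Because the future returned by whenCompleteAsync completes only after its action has run, a caller that waits on the result can rely on the cleanup stage having finished.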

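This method builds the JobGraph and submits it to the Dispatcher in the JobManager. We will not go through the submission path here; if you are interested, see the job submission chapters listed in the table of contents. We continue with how the JobGraph is built. Step into getJobGraph: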

    public static JobGraph getJobGraph(
            @Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration)
            throws MalformedURLException {
        checkNotNull(pipeline);
        checkNotNull(configuration);

        final ExecutionConfigAccessor executionConfigAccessor =
                ExecutionConfigAccessor.fromConfiguration(configuration);
        // TODO Build the JobGraph
        final JobGraph jobGraph =
                FlinkPipelineTranslationUtil.getJobGraph(
                        pipeline, configuration, executionConfigAccessor.getParallelism());

        configuration
                .getOptional(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID)
                .ifPresent(strJobID -> jobGraph.setJobID(JobID.fromHexString(strJobID)));

        jobGraph.addJars(executionConfigAccessor.getJars());
        jobGraph.setClasspaths(executionConfigAccessor.getClasspaths());
        jobGraph.setSavepointRestoreSettings(executionConfigAccessor.getSavepointRestoreSettings());

        return jobGraph;
    }

Step further into FlinkPipelineTranslationUtil.getJobGraph:

    /** Transmogrifies the given {@link Pipeline} to a {@link JobGraph}. */
    public static JobGraph getJobGraph(
            Pipeline pipeline, Configuration optimizerConfiguration, int defaultParallelism) {

        // TODO Obtain the FlinkPipelineTranslator
        FlinkPipelineTranslator pipelineTranslator = getPipelineTranslator(pipeline);

        // TODO Use the FlinkPipelineTranslator to translate the pipeline into a JobGraph
        // TODO Here pipeline = StreamGraph
        return pipelineTranslator.translateToJobGraph(
                pipeline, optimizerConfiguration, defaultParallelism);
    }

Here Flink first obtains a translator that converts a StreamGraph into a JobGraph and then uses it to produce the JobGraph. Step into pipelineTranslator.translateToJobGraph and choose the StreamGraphTranslator implementation:

    @Override
    public JobGraph translateToJobGraph(
            Pipeline pipeline, Configuration optimizerConfiguration, int defaultParallelism) {
        checkArgument(
                pipeline instanceof StreamGraph, "Given pipeline is not a DataStream StreamGraph.");

        StreamGraph streamGraph = (StreamGraph) pipeline;
        // TODO Convert the StreamGraph into a JobGraph
        return streamGraph.getJobGraph(null);
    }

Step into streamGraph.getJobGraph(null):

    public JobGraph getJobGraph(@Nullable JobID jobID) {
        // TODO
        return StreamingJobGraphGenerator.createJobGraph(this, jobID);
    }

One more level:

    public static JobGraph createJobGraph(StreamGraph streamGraph, @Nullable JobID jobID) {
        // TODO
        return new StreamingJobGraphGenerator(streamGraph, jobID).createJobGraph();
    }

And again:

    private JobGraph createJobGraph() {
        // TODO Validate configuration parameters
        preValidate();
        jobGraph.setJobType(streamGraph.getJobType());

        jobGraph.enableApproximateLocalRecovery(
                streamGraph.getCheckpointConfig().isApproximateLocalRecoveryEnabled());

        // Generate deterministic hashes for the nodes in order to identify them across
        // submission iff they didn't change.
        Map<Integer, byte[]> hashes =
                defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);

        // Generate legacy version hashes for backwards compatibility
        List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
        for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
            legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
        }

        /* TODO
            Set up chaining: chain together the StreamNodes that can be chained.
            This creates the corresponding JobVertex, JobEdge and IntermediateDataSet objects.
            Operators that can be chained are merged into an OperatorChain.
         */
        setChaining(hashes, legacyHashes);

        // TODO Set the physical edges: serialize each JobVertex's in-edges into that JobVertex's StreamConfig
        setPhysicalEdges();

        setSlotSharingAndCoLocation();

        setManagedMemoryFraction(
                Collections.unmodifiableMap(jobVertices),
                Collections.unmodifiableMap(vertexConfigs),
                Collections.unmodifiableMap(chainedConfigs),
                id -> streamGraph.getStreamNode(id).getManagedMemoryOperatorScopeUseCaseWeights(),
                id -> streamGraph.getStreamNode(id).getManagedMemorySlotScopeUseCases());

        configureCheckpointing();

        jobGraph.setSavepointRestoreSettings(streamGraph.getSavepointRestoreSettings());

        final Map<String, DistributedCache.DistributedCacheEntry> distributedCacheEntries =
                JobGraphUtils.prepareUserArtifactEntries(
                        streamGraph.getUserArtifacts().stream()
                                .collect(Collectors.toMap(e -> e.f0, e -> e.f1)),
                        jobGraph.getJobID());

        for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry :
                distributedCacheEntries.entrySet()) {
            jobGraph.addUserArtifact(entry.getKey(), entry.getValue());
        }

        // set the ExecutionConfig last when it has been finalized
        try {
            jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
        } catch (IOException e) {
            throw new IllegalConfigurationException(
                    "Could not serialize the ExecutionConfig."
                            + "This indicates that non-serializable types (like custom serializers) were registered");
        }

        return jobGraph;
    }

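We have finally reached the core logic. The method first validates some parameters and then starts merging StreamNodes: when two adjacent StreamNodes are judged chainable, they are merged into one OperatorChain.

Roughly, the logic visits the nodes one by one:
1. If the node is the head of a chain, a JobVertex is created for it.
2. If it is not the head, its configuration is merged into the head node's, and the head node is connected to this node's output edges. A node that cannot be chained is treated as a chain that consists of only a head node.

Why chain:
Chaining reduces thread switching, avoids serializing and deserializing records between operators, and reduces buffer exchanges, which lowers latency while raising overall throughput.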

Next, let's see how Flink decides whether two nodes can be chained. Step into setChaining:

    private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes) {
        // we separate out the sources that run as inputs to another operator (chained inputs)
        // from the sources that needs to run as the main (head) operator.
        final Map<Integer, OperatorChainInfo> chainEntryPoints =
                buildChainedInputsAndGetHeadInputs(hashes, legacyHashes);
        final Collection<OperatorChainInfo> initialEntryPoints =
                chainEntryPoints.entrySet().stream()
                        .sorted(Comparator.comparing(Map.Entry::getKey))
                        .map(Map.Entry::getValue)
                        .collect(Collectors.toList());


        // iterate over a copy of the values, because this map gets concurrently modified
        for (OperatorChainInfo info : initialEntryPoints) {
            // TODO
            createChain(
                    info.getStartNodeId(),
                    1, // operators start at position 1 because 0 is for chained source inputs
                    info,
                    chainEntryPoints);
        }
    }

Now step into createChain:

    private List<StreamEdge> createChain(
            final Integer currentNodeId,
            final int chainIndex,
            final OperatorChainInfo chainInfo,
            final Map<Integer, OperatorChainInfo> chainEntryPoints) {

        Integer startNodeId = chainInfo.getStartNodeId();
        if (!builtVertices.contains(startNodeId)) {

            List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();

            // TODO Holds the chainable StreamEdges
            List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
            // TODO Holds the non-chainable StreamEdges
            List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();

            // TODO The StreamNode currently being processed
            StreamNode currentNode = streamGraph.getStreamNode(currentNodeId);

            // TODO Iterate over the out-edges of the current StreamNode; each edge gives access to the
            // StreamNodes on both ends, which are then checked for chainability
            for (StreamEdge outEdge : currentNode.getOutEdges()) {
                // TODO Check whether the upstream and downstream operators (StreamNodes) of this edge can be chained
                if (isChainable(outEdge, streamGraph)) {
                    // TODO Add to the chainable list
                    chainableOutputs.add(outEdge);
                } else {
                    // TODO Add to the non-chainable list
                    nonChainableOutputs.add(outEdge);
                }
            }

            // TODO Chain the operators on both ends of each chainable StreamEdge into one OperatorChain
            for (StreamEdge chainable : chainableOutputs) {
                // TODO Recurse along the chain; for a chainable edge the chainIndex is incremented by 1
                // TODO In other words, once two StreamNodes are chained, the walk goes on to check whether
                // the StreamNodes further downstream can join the same chain
                transitiveOutEdges.addAll(
                        createChain(
                                chainable.getTargetId(),
                                chainIndex + 1,
                                chainInfo,
                                chainEntryPoints));
            }

            // TODO Edges that cannot be chained: the target node starts a new chain
            for (StreamEdge nonChainable : nonChainableOutputs) {
                transitiveOutEdges.add(nonChainable);
                createChain(
                        nonChainable.getTargetId(),
                        1, // operators start at position 1 because 0 is for chained source inputs
                        chainEntryPoints.computeIfAbsent(
                                nonChainable.getTargetId(),
                                (k) -> chainInfo.newChain(nonChainable.getTargetId())),
                        chainEntryPoints);
            }

            chainedNames.put(
                    currentNodeId,
                    createChainedName(
                            currentNodeId,
                            chainableOutputs,
                            Optional.ofNullable(chainEntryPoints.get(currentNodeId))));
            chainedMinResources.put(
                    currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs));
            chainedPreferredResources.put(
                    currentNodeId,
                    createChainedPreferredResources(currentNodeId, chainableOutputs));

            OperatorID currentOperatorId =
                    chainInfo.addNodeToChain(currentNodeId, chainedNames.get(currentNodeId));

            if (currentNode.getInputFormat() != null) {
                getOrCreateFormatContainer(startNodeId)
                        .addInputFormat(currentOperatorId, currentNode.getInputFormat());
            }

            if (currentNode.getOutputFormat() != null) {
                getOrCreateFormatContainer(startNodeId)
                        .addOutputFormat(currentOperatorId, currentNode.getOutputFormat());
            }

            // TODO If this is the first node of the chain, create the JobVertex
            StreamConfig config =
                    currentNodeId.equals(startNodeId)
                            ? createJobVertex(startNodeId, chainInfo)
                            : new StreamConfig(new Configuration());

            setVertexConfig(
                    currentNodeId,
                    config,
                    chainableOutputs,
                    nonChainableOutputs,
                    chainInfo.getChainedSources());

            if (currentNodeId.equals(startNodeId)) {

                config.setChainStart();
                config.setChainIndex(chainIndex);
                config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
               
                // TODO Create the JobEdge and IntermediateDataSet
                for (StreamEdge edge : transitiveOutEdges) {
                    connect(startNodeId, edge);
                }

                config.setOutEdgesInOrder(transitiveOutEdges);
                config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));

            } else {
                chainedConfigs.computeIfAbsent(
                        startNodeId, k -> new HashMap<Integer, StreamConfig>());

                config.setChainIndex(chainIndex);
                StreamNode node = streamGraph.getStreamNode(currentNodeId);
                config.setOperatorName(node.getOperatorName());
                chainedConfigs.get(startNodeId).put(currentNodeId, config);
            }

            config.setOperatorID(currentOperatorId);

            if (chainableOutputs.isEmpty()) {
                config.setChainEnd();
            }
            return transitiveOutEdges;

        } else {
            return new ArrayList<>();
        }
    }

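The method is long, so let's break it down. Its main steps are:

1. Initialize two lists that hold the chainable and the non-chainable StreamEdges.

2. Get the StreamNode currently being processed.

3. Iterate over the out-edges of the current StreamNode and check whether the upstream and downstream StreamNodes of each edge can be chained.

4. Put each StreamEdge into the chainable or the non-chainable list accordingly.

5. For each chainable edge, recurse into the downstream StreamNode within the same chain (chainIndex + 1), so that the nodes end up in one OperatorChain and the walk keeps checking whether nodes further downstream can join it.

6. For each non-chainable edge, recurse into the downstream StreamNode as the head of a new chain, which in turn checks whether it can chain with its own downstream nodes.

7. After the recursion returns, check whether the current node is the first StreamNode of its chain; if so, create the JobVertex.

8. Under the same check, if the current node is the chain head, create the JobEdge and IntermediateDataSet objects for the chain's outgoing edges.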
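The steps above can be sketched as a small, self-contained Java program. This is a toy model, not Flink code: nodes are plain integers, each edge carries a precomputed chainable flag (in Flink that decision comes from isChainable), and the chains and jobEdges collections are made-up stand-ins for the JobVertex and JobEdge objects the real method creates.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Toy version of the recursive chain walk; node ids and edges are made up. */
final class ChainWalkSketch {

    /** Hypothetical edge whose chainability has already been decided. */
    static final class Edge {
        final int source;
        final int target;
        final boolean chainable;

        Edge(int source, int target, boolean chainable) {
            this.source = source;
            this.target = target;
            this.chainable = chainable;
        }
    }

    private final Map<Integer, List<Edge>> outEdges = new LinkedHashMap<>();
    private final Set<Integer> builtHeads = new HashSet<>();
    /** Chain head id -> ids of the nodes merged into that chain (one JobVertex per entry). */
    final Map<Integer, List<Integer>> chains = new LinkedHashMap<>();
    /** "head->target" for every edge that leaves a chain (one JobEdge per entry). */
    final List<String> jobEdges = new ArrayList<>();

    void addEdge(int source, int target, boolean chainable) {
        outEdges.computeIfAbsent(source, k -> new ArrayList<>())
                .add(new Edge(source, target, chainable));
    }

    List<Edge> createChain(int current, int head) {
        if (builtHeads.contains(head)) {
            return new ArrayList<>();
        }
        List<Edge> transitiveOutEdges = new ArrayList<>();
        List<Edge> chainable = new ArrayList<>();
        List<Edge> nonChainable = new ArrayList<>();
        for (Edge edge : outEdges.getOrDefault(current, Collections.emptyList())) {
            (edge.chainable ? chainable : nonChainable).add(edge);
        }
        // Chainable edges: the target joins the current chain.
        for (Edge edge : chainable) {
            transitiveOutEdges.addAll(createChain(edge.target, head));
        }
        // Non-chainable edges: the target becomes the head of a new chain.
        for (Edge edge : nonChainable) {
            transitiveOutEdges.add(edge);
            createChain(edge.target, edge.target);
        }
        // Recorded after the recursion, so insert at the front to keep the head first.
        chains.computeIfAbsent(head, k -> new ArrayList<>()).add(0, current);
        if (current == head) {
            builtHeads.add(head); // the real code creates the JobVertex at this point
            for (Edge edge : transitiveOutEdges) {
                jobEdges.add(head + "->" + edge.target);
            }
        }
        return transitiveOutEdges;
    }

    public static void main(String[] args) {
        ChainWalkSketch graph = new ChainWalkSketch();
        graph.addEdge(1, 2, true);  // source -> flatMap, assumed chainable
        graph.addEdge(2, 3, false); // flatMap -> sum, keyBy repartitions so not chainable
        graph.addEdge(3, 4, true);  // sum -> sink, assumed chainable
        graph.createChain(1, 1);
        System.out.println(graph.chains + " " + graph.jobEdges);
    }
}
```

In this WordCount-shaped example the four nodes collapse into two chains, and the single non-chainable edge becomes the only connection between them, always attached to the chain head rather than to the node where the edge originates.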

Now for the criteria that decide whether two nodes can be chained. There are nine checks in total. Step into isChainable:

    public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
        // TODO Get the downstream vertex
        StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);

        // TODO The downstream vertex must have exactly one in-edge before the remaining checks apply
        // TODO The remaining checks are in isChainableInput
        return downStreamVertex.getInEdges().size() == 1 && isChainableInput(edge, streamGraph);
    }

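The first check is that the downstream vertex has exactly one in-edge. Only then are the remaining conditions evaluated (eight more in this article's count). Step into isChainableInput(edge, streamGraph):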

    private static boolean isChainableInput(StreamEdge edge, StreamGraph streamGraph) {
        // TODO Get the upstream and downstream vertices
        StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
        StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);

        // TODO Decide whether the two can be chained
        if (!(
                // TODO Upstream and downstream operators are in the same SlotSharingGroup
                upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
                        // TODO Three more conditions are checked in here
                && areOperatorsChainable(upStreamVertex, downStreamVertex, streamGraph)
                        // TODO The partitioner between the two operators is a ForwardPartitioner
                && (edge.getPartitioner() instanceof ForwardPartitioner)
                        // TODO The shuffle mode between the two operators is not BATCH
                && edge.getShuffleMode() != ShuffleMode.BATCH
                        // TODO Upstream and downstream operators have the same parallelism
                && upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
                        // TODO Chaining is enabled
                && streamGraph.isChainingEnabled())) {

            return false;
        }

        // check that we do not have a union operation, because unions currently only work
        // through the network/byte-channel stack.
        // we check that by testing that each "type" (which means input position) is used only once
        for (StreamEdge inEdge : downStreamVertex.getInEdges()) {
            if (inEdge != edge && inEdge.getTypeNumber() == edge.getTypeNumber()) {
                return false;
            }
        }
        return true;
    }

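There are five conditions here:

1. The upstream and downstream operators are in the same SlotSharingGroup.

2. The partitioner between the two operators is a ForwardPartitioner.

3. The shuffle mode between the two operators is not BATCH.

4. The upstream and downstream operators have the same parallelism.

5. Chaining is enabled.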
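As a compact restatement, the edge-level checks (together with the in-edge count from isChainable) can be written as a single predicate. The EdgeFacts class and its field names are invented for this sketch and flatten what Flink reads from StreamEdge, StreamNode and StreamGraph; the operatorsChainable flag stands in for the result of areOperatorsChainable.

```java
/** Toy restatement of the edge-level checks; field names are made up, not Flink's API. */
final class EdgeChainCheckSketch {

    /** Hypothetical flattened view of one StreamEdge and its two endpoints. */
    static final class EdgeFacts {
        int downstreamInEdgeCount = 1;
        boolean sameSlotSharingGroup = true;
        boolean operatorsChainable = true; // outcome of the chaining-strategy checks
        boolean forwardPartitioner = true;
        boolean batchShuffle = false;
        int upstreamParallelism = 1;
        int downstreamParallelism = 1;
        boolean chainingEnabled = true;
    }

    /** All conditions must hold; a single failing one keeps the operators apart. */
    static boolean isChainable(EdgeFacts f) {
        return f.downstreamInEdgeCount == 1
                && f.sameSlotSharingGroup
                && f.operatorsChainable
                && f.forwardPartitioner
                && !f.batchShuffle
                && f.upstreamParallelism == f.downstreamParallelism
                && f.chainingEnabled;
    }

    public static void main(String[] args) {
        EdgeFacts forward = new EdgeFacts();
        EdgeFacts afterKeyBy = new EdgeFacts();
        afterKeyBy.forwardPartitioner = false; // keyBy repartitions by key, so the edge is not forward
        System.out.println(isChainable(forward) + " " + isChainable(afterKeyBy));
    }
}
```

This is why, in the WordCount example, the operator after keyBy always starts a new chain: the edge into it is not a forward edge, whatever the other settings are.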

Three more conditions live in the areOperatorsChainable() method. Step into it:

    @VisibleForTesting
    static boolean areOperatorsChainable(
            StreamNode upStreamVertex, StreamNode downStreamVertex, StreamGraph streamGraph) {
        // TODO Neither the upstream nor the downstream operator may be null
        StreamOperatorFactory<?> upStreamOperator = upStreamVertex.getOperatorFactory();
        StreamOperatorFactory<?> downStreamOperator = downStreamVertex.getOperatorFactory();
        if (downStreamOperator == null || upStreamOperator == null) {
            return false;
        }

        // yielding operators cannot be chained to legacy sources
        // unfortunately the information that vertices have been chained is not preserved at this
        // point
        if (downStreamOperator instanceof YieldingOperatorFactory
                && getHeadOperator(upStreamVertex, streamGraph).isLegacySource()) {
            return false;
        }

        // we use switch/case here to make sure this is exhaustive if ever values are added to the
        // ChainingStrategy enum
        boolean isChainable;

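        // TODO The upstream chaining strategy must be ALWAYS, HEAD or HEAD_WITH_SOURCES
        // (HEAD can chain to operators downstream but not upstream; sources default to HEAD)
        switch (upStreamOperator.getChainingStrategy()) {
            // TODO NEVER: the operator is not chained to preceding or succeeding operators
            case NEVER:
                isChainable = false;
                break;
                // TODO ALWAYS: operators are chained eagerly whenever possible
            case ALWAYS:
                // TODO HEAD: the operator is not chained to its predecessor, but successors may chain to it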
            case HEAD:
            case HEAD_WITH_SOURCES:
                isChainable = true;
                break;
            default:
                throw new RuntimeException(
                        "Unknown chaining strategy: " + upStreamOperator.getChainingStrategy());
        }

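        // TODO The downstream chaining strategy must be ALWAYS (can chain both upstream and
        // downstream; map, flatMap, filter and similar default to ALWAYS), or HEAD_WITH_SOURCES
        // with a source upstream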
        switch (downStreamOperator.getChainingStrategy()) {
            case NEVER:
            case HEAD:
                isChainable = false;
                break;
            case ALWAYS:
                // keep the value from upstream
                break;
            case HEAD_WITH_SOURCES:
                // only if upstream is a source
                isChainable &= (upStreamOperator instanceof SourceOperatorFactory);
                break;
            default:
                throw new RuntimeException(
                        "Unknown chaining strategy: " + upStreamOperator.getChainingStrategy());
        }

        return isChainable;
    }

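Three checks happen here:

1. Neither the upstream nor the downstream operator may be null.

2. The upstream node's chaining strategy must be ALWAYS, HEAD or HEAD_WITH_SOURCES; it must not be NEVER.

3. The downstream node's chaining strategy must be ALWAYS, or HEAD_WITH_SOURCES with a source operator upstream; it must not be HEAD or NEVER.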
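The two strategy checks reduce to a small decision function. The sketch below uses its own Strategy enum that mirrors the four values seen in the switch statements, and a boolean upstreamIsSource parameter that stands in for the instanceof SourceOperatorFactory test; both are assumptions of this sketch, and the null check and the yielding-operator check from the real method are left out.

```java
/** Toy version of the two chaining-strategy checks; the enum only mirrors Flink's values. */
final class ChainingStrategySketch {

    enum Strategy { ALWAYS, NEVER, HEAD, HEAD_WITH_SOURCES }

    static boolean strategiesAllowChaining(
            Strategy upstream, Strategy downstream, boolean upstreamIsSource) {
        // Upstream: anything except NEVER may have a successor chained to it.
        boolean chainable = upstream != Strategy.NEVER;

        // Downstream: decides whether it accepts being chained to its predecessor.
        switch (downstream) {
            case NEVER:
            case HEAD:
                return false;
            case ALWAYS:
                return chainable;
            case HEAD_WITH_SOURCES:
                return chainable && upstreamIsSource;
            default:
                throw new IllegalStateException("Unknown chaining strategy: " + downstream);
        }
    }

    public static void main(String[] args) {
        System.out.println(strategiesAllowChaining(Strategy.HEAD, Strategy.ALWAYS, false));
        System.out.println(strategiesAllowChaining(Strategy.ALWAYS, Strategy.HEAD, false));
    }
}
```

Read this way, HEAD is asymmetric: an operator with HEAD can open a chain for the operators after it, but it never joins the chain of the operator before it.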

That covers the core of the StreamGraph to JobGraph conversion.

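Summary

In the conversion from StreamGraph to JobGraph, I think the two most important points are how StreamNodes are merged and which conditions decide whether they should be merged. To summarize:

The nine chaining conditions:

1. The downstream vertex has exactly one in-edge.

2. The upstream and downstream operators are in the same SlotSharingGroup.

3. The partitioner between the two operators is a ForwardPartitioner.

4. The shuffle mode between the two operators is not BATCH.

5. The upstream and downstream operators have the same parallelism.

6. Chaining is enabled.

7. Neither the upstream nor the downstream operator may be null.

8. The upstream node's chaining strategy must be ALWAYS, HEAD or HEAD_WITH_SOURCES; it must not be NEVER.

9. The downstream node's chaining strategy must be ALWAYS, or HEAD_WITH_SOURCES with a source operator upstream; it must not be HEAD or NEVER.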

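How the merge works:

1. Initialize two lists that hold the chainable and the non-chainable StreamEdges.

2. Get the StreamNode currently being processed.

3. Iterate over the out-edges of the current StreamNode and check whether the upstream and downstream StreamNodes of each edge can be chained.

4. Put each StreamEdge into the chainable or the non-chainable list accordingly.

5. For each chainable edge, recurse into the downstream StreamNode within the same chain (chainIndex + 1), so that the nodes end up in one OperatorChain and the walk keeps checking whether nodes further downstream can join it.

6. For each non-chainable edge, recurse into the downstream StreamNode as the head of a new chain, which in turn checks whether it can chain with its own downstream nodes.

7. After the recursion returns, check whether the current node is the first StreamNode of its chain; if so, create the JobVertex.

8. Under the same check, if the current node is the chain head, create the JobEdge and IntermediateDataSet objects for the chain's outgoing edges.

In the next chapter we analyze the core of the conversion from JobGraph to ExecutionGraph.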
