Flink-1.19.0源码详解-番外补充3-StreamGraph图

发布于:2025-07-04 ⋅ 阅读:(19) ⋅ 点赞:(0)

1.StreamGraph图:

        StreamGraph是Flink流处理作业生成的第一张图,它是由用户编写的DataStream API程序转换而来的逻辑图。StreamGraph由StreamNode与StreamEdge组成:StreamNode是记录数据处理逻辑的节点,StreamEdge是连接两个StreamNode的边。

StreamGraph图解:

2.StreamNode节点:

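        StreamNode是StreamGraph中的基本构建单元,代表数据处理逻辑中的一个独立操作。每个StreamNode都对应DataStream API中的一个转换操作(Transformation);但并非每个Transformation都会生成StreamNode,例如分区(Partition)、侧输出(SideOutput)等虚拟转换不会生成独立节点,而是作为分区器(StreamPartitioner)、OutputTag等属性记录在StreamEdge上。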

StreamNode图解:

        StreamNode主要封装了创建算子的工厂StreamOperatorFactory(直接传入StreamOperator时会被包装为SimpleOperatorFactory)以及算子的入边与出边集合,同时还记录了并行度、SlotSharingGroup与CoLocationGroup等配置信息。

StreamNode完整源码:

/**
 * A vertex of the StreamGraph. A node wraps the {@link StreamOperatorFactory} of one operation,
 * the lists of its incoming and outgoing {@link StreamEdge}s, and per-node settings such as
 * parallelism, resources, buffer timeout, serializers, slot sharing group and co-location group.
 *
 * <p>Equality and hash code are based solely on the node id.
 */
public class StreamNode {

    /** Id of this node; the only field used by {@link #equals(Object)} and {@link #hashCode()}. */
    private final int id;

    /** Parallelism of this node. */
    private int parallelism;
    /**
     * Maximum parallelism for this stream node. The maximum parallelism is the upper limit for
     * dynamic scaling and the number of key groups used for partitioned state.
     */
    private int maxParallelism;

    private ResourceSpec minResources = ResourceSpec.DEFAULT;
    private ResourceSpec preferredResources = ResourceSpec.DEFAULT;
    private final Map<ManagedMemoryUseCase, Integer> managedMemoryOperatorScopeUseCaseWeights =
            new HashMap<>();
    private final Set<ManagedMemoryUseCase> managedMemorySlotScopeUseCases = new HashSet<>();
    private long bufferTimeout;
    private final String operatorName;
    private String operatorDescription;

    // Names of the slot sharing group and the co-location group; null when not set.
    private @Nullable String slotSharingGroup;
    private @Nullable String coLocationGroup;

    private KeySelector<?, ?>[] statePartitioners = new KeySelector<?, ?>[0];
    private TypeSerializer<?> stateKeySerializer;

    // Factory that wraps/creates the operator of this node.
    private StreamOperatorFactory<?> operatorFactory;
    private TypeSerializer<?>[] typeSerializersIn = new TypeSerializer<?>[0];
    private TypeSerializer<?> typeSerializerOut;

    // Incoming and outgoing edges of this node (exposed directly by the getters below).
    private final List<StreamEdge> inEdges = new ArrayList<>();
    private final List<StreamEdge> outEdges = new ArrayList<>();

    private final Class<? extends TaskInvokable> jobVertexClass;

    private InputFormat<?, ?> inputFormat;
    private OutputFormat<?> outputFormat;

    private String transformationUID;
    private String userHash;

    private final Map<Integer, StreamConfig.InputRequirement> inputRequirements = new HashMap<>();

    private @Nullable IntermediateDataSetID consumeClusterDatasetId;

    private boolean supportsConcurrentExecutionAttempts = true;

    private boolean parallelismConfigured = false;

    /**
     * Creates a node from a plain operator instance, which is wrapped into a {@link
     * SimpleOperatorFactory}.
     */
    @VisibleForTesting
    public StreamNode(
            Integer id,
            @Nullable String slotSharingGroup,
            @Nullable String coLocationGroup,
            StreamOperator<?> operator,
            String operatorName,
            Class<? extends TaskInvokable> jobVertexClass) {
        this(
                id,
                slotSharingGroup,
                coLocationGroup,
                SimpleOperatorFactory.of(operator),
                operatorName,
                jobVertexClass);
    }

    /**
     * Creates a node from an operator factory. The operator description initially equals the
     * operator name.
     */
    public StreamNode(
            Integer id,
            @Nullable String slotSharingGroup,
            @Nullable String coLocationGroup,
            StreamOperatorFactory<?> operatorFactory,
            String operatorName,
            Class<? extends TaskInvokable> jobVertexClass) {
        this.id = id;
        this.operatorName = operatorName;
        this.operatorDescription = operatorName;
        this.operatorFactory = operatorFactory;
        this.jobVertexClass = jobVertexClass;
        this.slotSharingGroup = slotSharingGroup;
        this.coLocationGroup = coLocationGroup;
    }

    /**
     * Registers an incoming edge of this node.
     *
     * <p>The edge is rejected via {@code checkState} if an equal edge has already been added.
     *
     * @param inEdge edge whose target id must equal this node's id
     * @throws IllegalArgumentException if the edge's target id does not match this node's id
     */
    public void addInEdge(StreamEdge inEdge) {
        checkState(
                !inEdges.contains(inEdge),
                "Adding not unique edge = %s to existing inEdges = %s",
                inEdge,
                inEdges);
        if (inEdge.getTargetId() != getId()) {
            throw new IllegalArgumentException("Destination id doesn't match the StreamNode id");
        }
        inEdges.add(inEdge);
    }

    /**
     * Registers an outgoing edge of this node.
     *
     * <p>The edge is rejected via {@code checkState} if an equal edge has already been added.
     *
     * @param outEdge edge whose source id must equal this node's id
     * @throws IllegalArgumentException if the edge's source id does not match this node's id
     */
    public void addOutEdge(StreamEdge outEdge) {
        checkState(
                !outEdges.contains(outEdge),
                "Adding not unique edge = %s to existing outEdges = %s",
                outEdge,
                outEdges);
        if (outEdge.getSourceId() != getId()) {
            throw new IllegalArgumentException("Source id doesn't match the StreamNode id");
        }
        outEdges.add(outEdge);
    }

    /** Returns the internal (mutable) list of outgoing edges, not a copy. */
    public List<StreamEdge> getOutEdges() {
        return outEdges;
    }

    /** Returns the internal (mutable) list of incoming edges, not a copy. */
    public List<StreamEdge> getInEdges() {
        return inEdges;
    }

    /** Returns a new list with the target node id of every outgoing edge, in edge order. */
    public List<Integer> getOutEdgeIndices() {
        List<Integer> outEdgeIndices = new ArrayList<>(outEdges.size());
        for (StreamEdge edge : outEdges) {
            outEdgeIndices.add(edge.getTargetId());
        }
        return outEdgeIndices;
    }

    /** Returns a new list with the source node id of every incoming edge, in edge order. */
    public List<Integer> getInEdgeIndices() {
        List<Integer> inEdgeIndices = new ArrayList<>(inEdges.size());
        for (StreamEdge edge : inEdges) {
            inEdgeIndices.add(edge.getSourceId());
        }
        return inEdgeIndices;
    }

    public int getId() {
        return id;
    }

    public int getParallelism() {
        return parallelism;
    }

    /** Sets the parallelism and marks it as explicitly configured (unless it is the default). */
    public void setParallelism(Integer parallelism) {
        setParallelism(parallelism, true);
    }

    /**
     * Sets the parallelism. The parallelism only counts as configured when {@code
     * parallelismConfigured} is true and the value differs from {@link
     * ExecutionConfig#PARALLELISM_DEFAULT}.
     */
    void setParallelism(Integer parallelism, boolean parallelismConfigured) {
        this.parallelism = parallelism;
        this.parallelismConfigured =
                parallelismConfigured && parallelism != ExecutionConfig.PARALLELISM_DEFAULT;
    }

    /**
     * Get the maximum parallelism for this stream node.
     *
     * @return Maximum parallelism
     */
    int getMaxParallelism() {
        return maxParallelism;
    }

    /**
     * Set the maximum parallelism for this stream node.
     *
     * @param maxParallelism Maximum parallelism to be set
     */
    void setMaxParallelism(int maxParallelism) {
        this.maxParallelism = maxParallelism;
    }

    public ResourceSpec getMinResources() {
        return minResources;
    }

    public ResourceSpec getPreferredResources() {
        return preferredResources;
    }

    public void setResources(ResourceSpec minResources, ResourceSpec preferredResources) {
        this.minResources = minResources;
        this.preferredResources = preferredResources;
    }

    /**
     * Adds managed-memory use cases to this node. Entries are merged into the existing ones
     * (operator-scope weights via {@code putAll}, slot-scope use cases via {@code addAll}).
     */
    public void setManagedMemoryUseCaseWeights(
            Map<ManagedMemoryUseCase, Integer> operatorScopeUseCaseWeights,
            Set<ManagedMemoryUseCase> slotScopeUseCases) {
        managedMemoryOperatorScopeUseCaseWeights.putAll(operatorScopeUseCaseWeights);
        managedMemorySlotScopeUseCases.addAll(slotScopeUseCases);
    }

    /** Returns an unmodifiable view of the operator-scope managed-memory weights. */
    public Map<ManagedMemoryUseCase, Integer> getManagedMemoryOperatorScopeUseCaseWeights() {
        return Collections.unmodifiableMap(managedMemoryOperatorScopeUseCaseWeights);
    }

    /** Returns an unmodifiable view of the slot-scope managed-memory use cases. */
    public Set<ManagedMemoryUseCase> getManagedMemorySlotScopeUseCases() {
        return Collections.unmodifiableSet(managedMemorySlotScopeUseCases);
    }

    public long getBufferTimeout() {
        return bufferTimeout;
    }

    /** Sets the buffer timeout; a {@code null} argument fails on unboxing. */
    public void setBufferTimeout(Long bufferTimeout) {
        this.bufferTimeout = bufferTimeout;
    }

    /**
     * Returns the operator held by this node's factory.
     *
     * @throws ClassCastException if the factory is not a {@link SimpleOperatorFactory}
     */
    @VisibleForTesting
    public StreamOperator<?> getOperator() {
        return ((SimpleOperatorFactory<?>) operatorFactory).getOperator();
    }

    public StreamOperatorFactory<?> getOperatorFactory() {
        return operatorFactory;
    }

    public String getOperatorName() {
        return operatorName;
    }

    public String getOperatorDescription() {
        return operatorDescription;
    }

    public void setOperatorDescription(String operatorDescription) {
        this.operatorDescription = operatorDescription;
    }

    /**
     * Sets the input serializers; at least one argument must be given.
     *
     * <p>Callers may pass {@code null} serializers. They are filtered out so that {@link
     * #getTypeSerializersIn()} does not return an array padded with {@code null} elements.
     */
    public void setSerializersIn(TypeSerializer<?>... typeSerializersIn) {
        checkArgument(typeSerializersIn.length > 0);
        this.typeSerializersIn =
                Arrays.stream(typeSerializersIn)
                        .filter(typeSerializer -> typeSerializer != null)
                        .toArray(TypeSerializer<?>[]::new);
    }

    public TypeSerializer<?>[] getTypeSerializersIn() {
        return typeSerializersIn;
    }

    public TypeSerializer<?> getTypeSerializerOut() {
        return typeSerializerOut;
    }

    public void setSerializerOut(TypeSerializer<?> typeSerializerOut) {
        this.typeSerializerOut = typeSerializerOut;
    }

    public Class<? extends TaskInvokable> getJobVertexClass() {
        return jobVertexClass;
    }

    public InputFormat<?, ?> getInputFormat() {
        return inputFormat;
    }

    public void setInputFormat(InputFormat<?, ?> inputFormat) {
        this.inputFormat = inputFormat;
    }

    public OutputFormat<?> getOutputFormat() {
        return outputFormat;
    }

    public void setOutputFormat(OutputFormat<?> outputFormat) {
        this.outputFormat = outputFormat;
    }

    public void setSlotSharingGroup(@Nullable String slotSharingGroup) {
        this.slotSharingGroup = slotSharingGroup;
    }

    @Nullable
    public String getSlotSharingGroup() {
        return slotSharingGroup;
    }

    public void setCoLocationGroup(@Nullable String coLocationGroup) {
        this.coLocationGroup = coLocationGroup;
    }

    public @Nullable String getCoLocationGroup() {
        return coLocationGroup;
    }

    /**
     * Returns whether this node and {@code downstreamVertex} have the same slot sharing group
     * name. Two unset ({@code null}) groups are considered equal.
     */
    public boolean isSameSlotSharingGroup(StreamNode downstreamVertex) {
        return (slotSharingGroup == null && downstreamVertex.slotSharingGroup == null)
                || (slotSharingGroup != null
                        && slotSharingGroup.equals(downstreamVertex.slotSharingGroup));
    }

    /** Returns {@code "<operatorName>-<id>"}. */
    @Override
    public String toString() {
        return operatorName + "-" + id;
    }

    public KeySelector<?, ?>[] getStatePartitioners() {
        return statePartitioners;
    }

    /** Sets the state partitioners; at least one must be given. The array is stored as-is. */
    public void setStatePartitioners(KeySelector<?, ?>... statePartitioners) {
        checkArgument(statePartitioners.length > 0);
        this.statePartitioners = statePartitioners;
    }

    public TypeSerializer<?> getStateKeySerializer() {
        return stateKeySerializer;
    }

    public void setStateKeySerializer(TypeSerializer<?> stateKeySerializer) {
        this.stateKeySerializer = stateKeySerializer;
    }

    public String getTransformationUID() {
        return transformationUID;
    }

    void setTransformationUID(String transformationId) {
        this.transformationUID = transformationId;
    }

    public String getUserHash() {
        return userHash;
    }

    public void setUserHash(String userHash) {
        this.userHash = userHash;
    }

    /** Records the requirement for the input with the given index, replacing any previous one. */
    public void addInputRequirement(
            int inputIndex, StreamConfig.InputRequirement inputRequirement) {
        inputRequirements.put(inputIndex, inputRequirement);
    }

    /** Returns the internal (mutable) map of input index to input requirement, not a copy. */
    public Map<Integer, StreamConfig.InputRequirement> getInputRequirements() {
        return inputRequirements;
    }

    /**
     * Returns the coordinator provider if the operator factory is a {@link
     * CoordinatedOperatorFactory}, otherwise an empty {@link Optional}.
     */
    public Optional<OperatorCoordinator.Provider> getCoordinatorProvider(
            String operatorName, OperatorID operatorID) {
        if (operatorFactory instanceof CoordinatedOperatorFactory) {
            return Optional.of(
                    ((CoordinatedOperatorFactory) operatorFactory)
                            .getCoordinatorProvider(operatorName, operatorID));
        }
        return Optional.empty();
    }

    boolean isParallelismConfigured() {
        return parallelismConfigured;
    }

    /** Two nodes are equal when they are of the same class and have the same id. */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }

        StreamNode that = (StreamNode) o;
        return id == that.id;
    }

    @Override
    public int hashCode() {
        return id;
    }

    @Nullable
    public IntermediateDataSetID getConsumeClusterDatasetId() {
        return consumeClusterDatasetId;
    }

    public void setConsumeClusterDatasetId(
            @Nullable IntermediateDataSetID consumeClusterDatasetId) {
        this.consumeClusterDatasetId = consumeClusterDatasetId;
    }

    public boolean isSupportsConcurrentExecutionAttempts() {
        return supportsConcurrentExecutionAttempts;
    }

    public void setSupportsConcurrentExecutionAttempts(
            boolean supportsConcurrentExecutionAttempts) {
        this.supportsConcurrentExecutionAttempts = supportsConcurrentExecutionAttempts;
    }
}

3.StreamEdge边:

        StreamEdge是StreamGraph中连接各个StreamNode的边,它定义了数据在StreamNode节点之间的流动方式和分区策略。

StreamEdge图解:

        StreamEdge主要封装了其连接的上下游StreamNode的id,并记录了决定数据分区方式的分区器StreamPartitioner,以及侧输出标签OutputTag、数据交换模式StreamExchangeMode与缓冲超时时间bufferTimeout等属性。

StreamEdge源码:

/**
 * A directed edge of the StreamGraph. It connects the StreamNode with id {@code sourceId} to
 * the StreamNode with id {@code targetId}. Besides the endpoints it records the partitioner,
 * the side-output tag, the exchange mode, the buffer timeout and the intermediate data set to
 * produce.
 *
 * <p>Equality and hash code are based on {@code edgeId} and {@code outputTag}. The edge id is
 * computed once in the constructor from the endpoints, the type number, the partitioner's string
 * form and the unique id. A later {@link #setPartitioner(StreamPartitioner)} therefore does not
 * change the edge's equality or hash code.
 */
public class StreamEdge implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Buffer timeout used by the shortest constructor. */
    private static final long ALWAYS_FLUSH_BUFFER_TIMEOUT = 0L;

    /**
     * Identifier fixed at construction time; used together with {@code outputTag} by equals and
     * hashCode.
     */
    private final String edgeId;

    // Ids of the source (upstream) and target (downstream) StreamNodes.
    private final int sourceId;
    private final int targetId;

    /**
     * Note that this field doesn't have to be unique among all {@link StreamEdge}s. It's enough if
     * this field ensures that all logical instances of {@link StreamEdge} are unique, and {@link
     * #hashCode()} are different and {@link #equals(Object)} returns false, for every possible pair
     * of {@link StreamEdge}. Especially among two different {@link StreamEdge}s that are connecting
     * the same pair of nodes.
     */
    private final int uniqueId;

    /** The type number of the input for co-tasks. */
    private final int typeNumber;
    /** The side-output tag (if any) of this {@link StreamEdge}. */
    private final OutputTag<?> outputTag;

    /**
     * The {@link StreamPartitioner} on this {@link StreamEdge}. It can be replaced via {@link
     * #setPartitioner(StreamPartitioner)}; the replacement is not reflected in {@code edgeId}.
     */
    private StreamPartitioner<?> outputPartitioner;

    /** The name of the operator in the source vertex. */
    private final String sourceOperatorName;

    /** The name of the operator in the target vertex. */
    private final String targetOperatorName;

    private final StreamExchangeMode exchangeMode;

    private long bufferTimeout;

    private boolean supportsUnalignedCheckpoints = true;

    private final IntermediateDataSetID intermediateDatasetIdToProduce;

    /**
     * Creates an edge with {@code ALWAYS_FLUSH_BUFFER_TIMEOUT} (0) as buffer timeout, {@link
     * StreamExchangeMode#UNDEFINED} as exchange mode, unique id 0 and no intermediate data set
     * id.
     */
    public StreamEdge(
            StreamNode sourceVertex,
            StreamNode targetVertex,
            int typeNumber,
            StreamPartitioner<?> outputPartitioner,
            OutputTag outputTag) {

        this(
                sourceVertex,
                targetVertex,
                typeNumber,
                ALWAYS_FLUSH_BUFFER_TIMEOUT,
                outputPartitioner,
                outputTag,
                StreamExchangeMode.UNDEFINED,
                0,
                null);
    }

    /** Creates an edge whose buffer timeout is taken from the source vertex. */
    public StreamEdge(
            StreamNode sourceVertex,
            StreamNode targetVertex,
            int typeNumber,
            StreamPartitioner<?> outputPartitioner,
            OutputTag outputTag,
            StreamExchangeMode exchangeMode,
            int uniqueId,
            IntermediateDataSetID intermediateDatasetId) {

        this(
                sourceVertex,
                targetVertex,
                typeNumber,
                sourceVertex.getBufferTimeout(),
                outputPartitioner,
                outputTag,
                exchangeMode,
                uniqueId,
                intermediateDatasetId);
    }

    /**
     * Creates an edge from {@code sourceVertex} to {@code targetVertex}.
     *
     * @param exchangeMode must not be null
     * @param intermediateDatasetId id of the intermediate data set to produce; may be null
     */
    public StreamEdge(
            StreamNode sourceVertex,
            StreamNode targetVertex,
            int typeNumber,
            long bufferTimeout,
            StreamPartitioner<?> outputPartitioner,
            OutputTag outputTag,
            StreamExchangeMode exchangeMode,
            int uniqueId,
            IntermediateDataSetID intermediateDatasetId) {

        this.sourceId = sourceVertex.getId();
        this.targetId = targetVertex.getId();
        this.uniqueId = uniqueId;
        this.typeNumber = typeNumber;
        this.bufferTimeout = bufferTimeout;
        this.outputPartitioner = outputPartitioner;
        this.outputTag = outputTag;
        this.sourceOperatorName = sourceVertex.getOperatorName();
        this.targetOperatorName = targetVertex.getOperatorName();
        this.exchangeMode = checkNotNull(exchangeMode, "exchangeMode must not be null");
        this.intermediateDatasetIdToProduce = intermediateDatasetId;
        // Uses the StreamNode toString() form ("<operatorName>-<id>") of both endpoints.
        this.edgeId =
                sourceVertex
                        + "_"
                        + targetVertex
                        + "_"
                        + typeNumber
                        + "_"
                        + outputPartitioner
                        + "_"
                        + uniqueId;
    }

    public int getSourceId() {
        return sourceId;
    }

    public int getTargetId() {
        return targetId;
    }

    public int getTypeNumber() {
        return typeNumber;
    }

    public OutputTag getOutputTag() {
        return this.outputTag;
    }

    public StreamPartitioner<?> getPartitioner() {
        return outputPartitioner;
    }

    public StreamExchangeMode getExchangeMode() {
        return exchangeMode;
    }

    /**
     * Replaces the partitioner of this edge. This does not change the edge id, so equals and
     * hashCode are unaffected.
     */
    public void setPartitioner(StreamPartitioner<?> partitioner) {
        this.outputPartitioner = partitioner;
    }

    /**
     * Sets the buffer timeout of this edge.
     *
     * @param bufferTimeout the new timeout; values below -1 are rejected via {@code
     *     checkArgument}
     */
    public void setBufferTimeout(long bufferTimeout) {
        checkArgument(
                bufferTimeout >= -1, "Buffer timeout must be >= -1, but was %s.", bufferTimeout);
        this.bufferTimeout = bufferTimeout;
    }

    public long getBufferTimeout() {
        return bufferTimeout;
    }

    public void setSupportsUnalignedCheckpoints(boolean supportsUnalignedCheckpoints) {
        this.supportsUnalignedCheckpoints = supportsUnalignedCheckpoints;
    }

    public boolean supportsUnalignedCheckpoints() {
        return supportsUnalignedCheckpoints;
    }

    @Override
    public int hashCode() {
        return Objects.hash(edgeId, outputTag);
    }

    /** Two edges are equal when they have the same class, edge id and output tag. */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }

        StreamEdge that = (StreamEdge) o;
        return Objects.equals(edgeId, that.edgeId) && Objects.equals(outputTag, that.outputTag);
    }

    @Override
    public String toString() {
        return "("
                + (sourceOperatorName + "-" + sourceId)
                + " -> "
                + (targetOperatorName + "-" + targetId)
                + ", typeNumber="
                + typeNumber
                + ", outputPartitioner="
                + outputPartitioner
                + ", exchangeMode="
                + exchangeMode
                + ", bufferTimeout="
                + bufferTimeout
                + ", outputTag="
                + outputTag
                + ", uniqueId="
                + uniqueId
                + ')';
    }

    /** Returns the id of the intermediate data set to produce; may be null. */
    public IntermediateDataSetID getIntermediateDatasetIdToProduce() {
        return intermediateDatasetIdToProduce;
    }
}

        本文是对StreamGraph的解释与补充,完整StreamGraph创建源码见《Flink-1.19.0源码详解4-StreamGraph生成》。


网站公告

今日签到

点亮在社区的每一天
去签到