1.StreamGraph图:
StreamGraph是Flink流处理作业的第一个计算调度流图,它是从用户编写的 DataStream API程序转换而来的逻辑图。StreamGraph由StreamNode与StreamEdge组成,StreamNode为记录数据处理的节点,StreamEdge为连接两个StreamNode的边。
StreamGraph图解:
2.StreamNode节点:
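StreamNode是StreamGraph中的基本构建单元,代表数据处理逻辑中的一个独立操作或阶段。每个StreamNode对应DataStream API中的一个转换操作(Transformation);但分区(Partition)、合并(Union)、旁路输出(SideOutput)等转换不会生成实际的StreamNode,它们只体现在StreamEdge的连接信息(分区器、OutputTag等)上。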
StreamNode图解:
StreamNode主要封装了算子操作逻辑的工厂StreamOperatorFactory(直接传入StreamOperator时会被包装为SimpleOperatorFactory)以及算子的入边与出边集合,还记录了SlotSharingGroup与CoLocationGroup等配置信息。
StreamNode完整源码:
/**
 * A node of the {@code StreamGraph}: one operator together with its parallelism, resource,
 * serialization, scheduling and connectivity settings.
 *
 * <p>Node identity is the integer {@link #getId() id} alone; {@link #equals(Object)} and {@link
 * #hashCode()} consult nothing else.
 */
public class StreamNode {

    /** Identifier of this node; the only state used by equals/hashCode. */
    private final int id;

    /** Parallelism of this node. */
    private int parallelism;

    /**
     * Maximum parallelism for this stream node. The maximum parallelism is the upper limit for
     * dynamic scaling and the number of key groups used for partitioned state.
     */
    private int maxParallelism;

    private ResourceSpec minResources = ResourceSpec.DEFAULT;
    private ResourceSpec preferredResources = ResourceSpec.DEFAULT;

    private final Map<ManagedMemoryUseCase, Integer> managedMemoryOperatorScopeUseCaseWeights =
            new HashMap<>();
    private final Set<ManagedMemoryUseCase> managedMemorySlotScopeUseCases = new HashSet<>();

    private long bufferTimeout;

    private final String operatorName;
    private String operatorDescription;

    // Optional scheduling hints: slot sharing group and co-location group of this node.
    private @Nullable String slotSharingGroup;
    private @Nullable String coLocationGroup;

    private KeySelector<?, ?>[] statePartitioners = new KeySelector[0];
    private TypeSerializer<?> stateKeySerializer;

    // Factory that wraps this node's operator logic.
    private StreamOperatorFactory<?> operatorFactory;

    private TypeSerializer<?>[] typeSerializersIn = new TypeSerializer[0];
    private TypeSerializer<?> typeSerializerOut;

    // Incoming and outgoing edges of this node (exposed as live, mutable lists).
    private final List<StreamEdge> inEdges = new ArrayList<>();
    private final List<StreamEdge> outEdges = new ArrayList<>();

    private final Class<? extends TaskInvokable> jobVertexClass;

    private InputFormat<?, ?> inputFormat;
    private OutputFormat<?> outputFormat;

    private String transformationUID;
    private String userHash;

    private final Map<Integer, StreamConfig.InputRequirement> inputRequirements = new HashMap<>();

    private @Nullable IntermediateDataSetID consumeClusterDatasetId;

    private boolean supportsConcurrentExecutionAttempts = true;

    private boolean parallelismConfigured = false;

    /**
     * Creates a node from a concrete operator instance, which is wrapped via {@code
     * SimpleOperatorFactory.of(operator)}.
     */
    @VisibleForTesting
    public StreamNode(
            Integer id,
            @Nullable String slotSharingGroup,
            @Nullable String coLocationGroup,
            StreamOperator<?> operator,
            String operatorName,
            Class<? extends TaskInvokable> jobVertexClass) {
        this(
                id,
                slotSharingGroup,
                coLocationGroup,
                SimpleOperatorFactory.of(operator),
                operatorName,
                jobVertexClass);
    }

    /**
     * Creates a node backed by the given operator factory.
     *
     * @param id node id (unboxed; must not be null)
     * @param slotSharingGroup optional slot sharing group
     * @param coLocationGroup optional co-location group
     * @param operatorFactory factory producing this node's operator
     * @param operatorName name of the operator; also used as the initial description
     * @param jobVertexClass invokable class for the resulting job vertex
     */
    public StreamNode(
            Integer id,
            @Nullable String slotSharingGroup,
            @Nullable String coLocationGroup,
            StreamOperatorFactory<?> operatorFactory,
            String operatorName,
            Class<? extends TaskInvokable> jobVertexClass) {
        this.id = id;
        this.operatorFactory = operatorFactory;
        this.jobVertexClass = jobVertexClass;
        // The description starts out equal to the name until setOperatorDescription is called.
        this.operatorName = operatorName;
        this.operatorDescription = operatorName;
        this.slotSharingGroup = slotSharingGroup;
        this.coLocationGroup = coLocationGroup;
    }

    /** Fails the {@code checkState} precondition if {@code candidate} equals an existing edge. */
    private static void checkEdgeNotYetRegistered(
            List<StreamEdge> existing, StreamEdge candidate, String messageTemplate) {
        checkState(
                existing.stream().noneMatch(candidate::equals),
                messageTemplate,
                candidate,
                existing);
    }

    /**
     * Registers an incoming edge.
     *
     * @param inEdge edge whose target id must equal this node's id
     * @throws IllegalArgumentException if the edge targets a different node
     */
    public void addInEdge(StreamEdge inEdge) {
        checkEdgeNotYetRegistered(
                inEdges, inEdge, "Adding not unique edge = %s to existing inEdges = %s");
        if (inEdge.getTargetId() != getId()) {
            throw new IllegalArgumentException("Destination id doesn't match the StreamNode id");
        }
        inEdges.add(inEdge);
    }

    /**
     * Registers an outgoing edge.
     *
     * @param outEdge edge whose source id must equal this node's id
     * @throws IllegalArgumentException if the edge starts at a different node
     */
    public void addOutEdge(StreamEdge outEdge) {
        checkEdgeNotYetRegistered(
                outEdges, outEdge, "Adding not unique edge = %s to existing outEdges = %s");
        if (outEdge.getSourceId() != getId()) {
            throw new IllegalArgumentException("Source id doesn't match the StreamNode id");
        }
        outEdges.add(outEdge);
    }

    /** Returns the live (not copied) list of outgoing edges. */
    public List<StreamEdge> getOutEdges() {
        return this.outEdges;
    }

    /** Returns the live (not copied) list of incoming edges. */
    public List<StreamEdge> getInEdges() {
        return this.inEdges;
    }

    /** Returns the target node ids of all outgoing edges, in edge order, as a new list. */
    public List<Integer> getOutEdgeIndices() {
        List<Integer> targetIds = new ArrayList<>(outEdges.size());
        outEdges.forEach(edge -> targetIds.add(edge.getTargetId()));
        return targetIds;
    }

    /** Returns the source node ids of all incoming edges, in edge order, as a new list. */
    public List<Integer> getInEdgeIndices() {
        List<Integer> sourceIds = new ArrayList<>(inEdges.size());
        inEdges.forEach(edge -> sourceIds.add(edge.getSourceId()));
        return sourceIds;
    }

    public int getId() {
        return this.id;
    }

    public int getParallelism() {
        return this.parallelism;
    }

    /**
     * Sets the parallelism and marks it as explicitly configured, unless it equals {@code
     * ExecutionConfig.PARALLELISM_DEFAULT}.
     */
    public void setParallelism(Integer parallelism) {
        setParallelism(parallelism, true);
    }

    /**
     * Sets the parallelism; it counts as configured only when {@code parallelismConfigured} is
     * true and the value differs from {@code ExecutionConfig.PARALLELISM_DEFAULT}.
     */
    void setParallelism(Integer parallelism, boolean parallelismConfigured) {
        final int newParallelism = parallelism;
        this.parallelism = newParallelism;
        this.parallelismConfigured =
                parallelismConfigured && newParallelism != ExecutionConfig.PARALLELISM_DEFAULT;
    }

    /**
     * Get the maximum parallelism for this stream node.
     *
     * @return Maximum parallelism
     */
    int getMaxParallelism() {
        return this.maxParallelism;
    }

    /**
     * Set the maximum parallelism for this stream node.
     *
     * @param maxParallelism Maximum parallelism to be set
     */
    void setMaxParallelism(int maxParallelism) {
        this.maxParallelism = maxParallelism;
    }

    public ResourceSpec getMinResources() {
        return this.minResources;
    }

    public ResourceSpec getPreferredResources() {
        return this.preferredResources;
    }

    /** Replaces both the minimum and the preferred resource specs. */
    public void setResources(ResourceSpec minResources, ResourceSpec preferredResources) {
        this.minResources = minResources;
        this.preferredResources = preferredResources;
    }

    /** Merges the given use-case weights and slot-scope use cases into this node's settings. */
    public void setManagedMemoryUseCaseWeights(
            Map<ManagedMemoryUseCase, Integer> operatorScopeUseCaseWeights,
            Set<ManagedMemoryUseCase> slotScopeUseCases) {
        managedMemoryOperatorScopeUseCaseWeights.putAll(operatorScopeUseCaseWeights);
        managedMemorySlotScopeUseCases.addAll(slotScopeUseCases);
    }

    /** Returns a read-only view of the operator-scope managed memory weights. */
    public Map<ManagedMemoryUseCase, Integer> getManagedMemoryOperatorScopeUseCaseWeights() {
        return Collections.unmodifiableMap(managedMemoryOperatorScopeUseCaseWeights);
    }

    /** Returns a read-only view of the slot-scope managed memory use cases. */
    public Set<ManagedMemoryUseCase> getManagedMemorySlotScopeUseCases() {
        return Collections.unmodifiableSet(managedMemorySlotScopeUseCases);
    }

    public long getBufferTimeout() {
        return this.bufferTimeout;
    }

    /** Sets the buffer timeout (the boxed value is unboxed; must not be null). */
    public void setBufferTimeout(Long bufferTimeout) {
        this.bufferTimeout = bufferTimeout.longValue();
    }

    /**
     * Returns the operator held by this node's factory. Only valid when the factory is a {@code
     * SimpleOperatorFactory}; otherwise the cast throws {@link ClassCastException}.
     */
    @VisibleForTesting
    public StreamOperator<?> getOperator() {
        SimpleOperatorFactory<?> simpleFactory = (SimpleOperatorFactory<?>) operatorFactory;
        return simpleFactory.getOperator();
    }

    public StreamOperatorFactory<?> getOperatorFactory() {
        return this.operatorFactory;
    }

    public String getOperatorName() {
        return this.operatorName;
    }

    public String getOperatorDescription() {
        return this.operatorDescription;
    }

    public void setOperatorDescription(String operatorDescription) {
        this.operatorDescription = operatorDescription;
    }

    /**
     * Sets the input serializers, silently dropping null entries.
     *
     * <p>Upstream code may hand in null serializers; they are removed so that callers of {@link
     * #getTypeSerializersIn()} never receive an array padded with nulls.
     */
    public void setSerializersIn(TypeSerializer<?>... typeSerializersIn) {
        checkArgument(typeSerializersIn.length > 0);
        List<TypeSerializer<?>> nonNullSerializers = new ArrayList<>(typeSerializersIn.length);
        for (TypeSerializer<?> serializer : typeSerializersIn) {
            if (serializer != null) {
                nonNullSerializers.add(serializer);
            }
        }
        this.typeSerializersIn = nonNullSerializers.toArray(new TypeSerializer<?>[0]);
    }

    public TypeSerializer<?>[] getTypeSerializersIn() {
        return this.typeSerializersIn;
    }

    public TypeSerializer<?> getTypeSerializerOut() {
        return this.typeSerializerOut;
    }

    public void setSerializerOut(TypeSerializer<?> typeSerializerOut) {
        this.typeSerializerOut = typeSerializerOut;
    }

    public Class<? extends TaskInvokable> getJobVertexClass() {
        return this.jobVertexClass;
    }

    public InputFormat<?, ?> getInputFormat() {
        return this.inputFormat;
    }

    public void setInputFormat(InputFormat<?, ?> inputFormat) {
        this.inputFormat = inputFormat;
    }

    public OutputFormat<?> getOutputFormat() {
        return this.outputFormat;
    }

    public void setOutputFormat(OutputFormat<?> outputFormat) {
        this.outputFormat = outputFormat;
    }

    public void setSlotSharingGroup(@Nullable String slotSharingGroup) {
        this.slotSharingGroup = slotSharingGroup;
    }

    @Nullable
    public String getSlotSharingGroup() {
        return this.slotSharingGroup;
    }

    public void setCoLocationGroup(@Nullable String coLocationGroup) {
        this.coLocationGroup = coLocationGroup;
    }

    public @Nullable String getCoLocationGroup() {
        return this.coLocationGroup;
    }

    /**
     * Tells whether this node and {@code downstreamVertex} share a slot sharing group. Two unset
     * (null) groups count as the same group.
     */
    public boolean isSameSlotSharingGroup(StreamNode downstreamVertex) {
        final String otherGroup = downstreamVertex.slotSharingGroup;
        if (slotSharingGroup == null) {
            return otherGroup == null;
        }
        return slotSharingGroup.equals(otherGroup);
    }

    /** Renders the node as {@code <operatorName>-<id>}. */
    @Override
    public String toString() {
        return new StringBuilder().append(operatorName).append("-").append(id).toString();
    }

    public KeySelector<?, ?>[] getStatePartitioners() {
        return this.statePartitioners;
    }

    /** Stores the given (non-empty) key selector array by reference. */
    public void setStatePartitioners(KeySelector<?, ?>... statePartitioners) {
        checkArgument(statePartitioners.length > 0);
        this.statePartitioners = statePartitioners;
    }

    public TypeSerializer<?> getStateKeySerializer() {
        return this.stateKeySerializer;
    }

    public void setStateKeySerializer(TypeSerializer<?> stateKeySerializer) {
        this.stateKeySerializer = stateKeySerializer;
    }

    public String getTransformationUID() {
        return this.transformationUID;
    }

    void setTransformationUID(String transformationId) {
        this.transformationUID = transformationId;
    }

    public String getUserHash() {
        return this.userHash;
    }

    public void setUserHash(String userHash) {
        this.userHash = userHash;
    }

    /** Records (or replaces) the requirement for the input at {@code inputIndex}. */
    public void addInputRequirement(
            int inputIndex, StreamConfig.InputRequirement inputRequirement) {
        inputRequirements.put(inputIndex, inputRequirement);
    }

    /** Returns the live (mutable) map of input index to input requirement. */
    public Map<Integer, StreamConfig.InputRequirement> getInputRequirements() {
        return this.inputRequirements;
    }

    /**
     * Returns the coordinator provider when the operator factory is a {@code
     * CoordinatedOperatorFactory}, otherwise an empty optional. The supplied {@code operatorName}
     * argument (not this node's name) is passed to the factory.
     */
    public Optional<OperatorCoordinator.Provider> getCoordinatorProvider(
            String operatorName, OperatorID operatorID) {
        if (!(operatorFactory instanceof CoordinatedOperatorFactory)) {
            return Optional.empty();
        }
        CoordinatedOperatorFactory<?> coordinatedFactory =
                (CoordinatedOperatorFactory<?>) operatorFactory;
        return Optional.of(coordinatedFactory.getCoordinatorProvider(operatorName, operatorID));
    }

    boolean isParallelismConfigured() {
        return this.parallelismConfigured;
    }

    /** Two nodes of the exact same class are equal iff their ids are equal. */
    @Override
    public boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        if (o == null || o.getClass() != getClass()) {
            return false;
        }
        return ((StreamNode) o).id == this.id;
    }

    @Override
    public int hashCode() {
        return Integer.hashCode(id);
    }

    @Nullable
    public IntermediateDataSetID getConsumeClusterDatasetId() {
        return this.consumeClusterDatasetId;
    }

    public void setConsumeClusterDatasetId(
            @Nullable IntermediateDataSetID consumeClusterDatasetId) {
        this.consumeClusterDatasetId = consumeClusterDatasetId;
    }

    public boolean isSupportsConcurrentExecutionAttempts() {
        return this.supportsConcurrentExecutionAttempts;
    }

    public void setSupportsConcurrentExecutionAttempts(
            boolean supportsConcurrentExecutionAttempts) {
        this.supportsConcurrentExecutionAttempts = supportsConcurrentExecutionAttempts;
    }
}
3.StreamEdge边:
StreamEdge是StreamGraph中连接各个StreamNode的边,它定义了数据在StreamNode节点之间的流动方式和分区策略。
StreamEdge图解:
StreamEdge主要封装了其所连接的上下游StreamNode的id,并记录了决定数据分区方式的StreamPartitioner,以及数据交换模式(StreamExchangeMode)、bufferTimeout、旁路输出标签(OutputTag)等信息。
StreamEdge源码:
/**
 * A directed edge of the {@code StreamGraph} that connects an upstream (source) {@link
 * StreamNode} to a downstream (target) one.
 *
 * <p>The edge copies only the ids and operator names of its endpoints and keeps no reference to
 * the nodes themselves. Equality and hashing use an identity string computed at construction time
 * together with the side-output tag.
 */
public class StreamEdge implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Buffer timeout (value {@code 0L}) applied by the five-argument constructor. */
    private static final long ALWAYS_FLUSH_BUFFER_TIMEOUT = 0L;

    /**
     * Identity string built once in the constructor from the endpoints, type number, partitioner
     * and unique id. It is final, so a later {@link #setPartitioner} does not change it and
     * equals/hashCode stay stable.
     */
    private final String edgeId;

    // Ids of the upstream (source) and downstream (target) nodes.
    private final int sourceId;
    private final int targetId;

    /**
     * Note that this field doesn't have to be unique among all {@link StreamEdge}s. It's enough if
     * this field ensures that all logical instances of {@link StreamEdge} are unique, and {@link
     * #hashCode()} are different and {@link #equals(Object)} returns false, for every possible pair
     * of {@link StreamEdge}. Especially among two different {@link StreamEdge}s that are connecting
     * the same pair of nodes.
     */
    private final int uniqueId;

    /** The type number of the input for co-tasks. */
    private final int typeNumber;

    /** The side-output tag (if any) of this {@link StreamEdge}. */
    private final OutputTag outputTag;

    /** The {@link StreamPartitioner} on this {@link StreamEdge}; replaceable via setPartitioner. */
    private StreamPartitioner<?> outputPartitioner;

    /** The name of the operator in the source vertex. */
    private final String sourceOperatorName;

    /** The name of the operator in the target vertex. */
    private final String targetOperatorName;

    private final StreamExchangeMode exchangeMode;

    private long bufferTimeout;

    private boolean supportsUnalignedCheckpoints = true;

    private final IntermediateDataSetID intermediateDatasetIdToProduce;

    /**
     * Creates an edge with buffer timeout {@code ALWAYS_FLUSH_BUFFER_TIMEOUT}, exchange mode
     * {@code StreamExchangeMode.UNDEFINED}, unique id {@code 0} and no intermediate data set id.
     */
    public StreamEdge(
            StreamNode sourceVertex,
            StreamNode targetVertex,
            int typeNumber,
            StreamPartitioner<?> outputPartitioner,
            OutputTag outputTag) {
        this(
                sourceVertex,
                targetVertex,
                typeNumber,
                ALWAYS_FLUSH_BUFFER_TIMEOUT,
                outputPartitioner,
                outputTag,
                StreamExchangeMode.UNDEFINED,
                0,
                null);
    }

    /** Creates an edge whose buffer timeout is taken from {@code sourceVertex}. */
    public StreamEdge(
            StreamNode sourceVertex,
            StreamNode targetVertex,
            int typeNumber,
            StreamPartitioner<?> outputPartitioner,
            OutputTag outputTag,
            StreamExchangeMode exchangeMode,
            int uniqueId,
            IntermediateDataSetID intermediateDatasetId) {
        this(
                sourceVertex,
                targetVertex,
                typeNumber,
                sourceVertex.getBufferTimeout(),
                outputPartitioner,
                outputTag,
                exchangeMode,
                uniqueId,
                intermediateDatasetId);
    }

    /**
     * Creates a fully specified edge.
     *
     * @param sourceVertex upstream node (only its id, name and string form are used)
     * @param targetVertex downstream node (only its id, name and string form are used)
     * @param typeNumber input type number for co-tasks
     * @param bufferTimeout buffer timeout stored as-is (not validated here)
     * @param outputPartitioner partitioner of this edge
     * @param outputTag side-output tag, if any
     * @param exchangeMode exchange mode; must not be null
     * @param uniqueId discriminator between edges connecting the same pair of nodes
     * @param intermediateDatasetId id of the intermediate data set to produce, may be null
     */
    public StreamEdge(
            StreamNode sourceVertex,
            StreamNode targetVertex,
            int typeNumber,
            long bufferTimeout,
            StreamPartitioner<?> outputPartitioner,
            OutputTag outputTag,
            StreamExchangeMode exchangeMode,
            int uniqueId,
            IntermediateDataSetID intermediateDatasetId) {
        // Endpoint data: only ids and operator names are retained.
        this.sourceId = sourceVertex.getId();
        this.sourceOperatorName = sourceVertex.getOperatorName();
        this.targetId = targetVertex.getId();
        this.targetOperatorName = targetVertex.getOperatorName();

        this.typeNumber = typeNumber;
        this.uniqueId = uniqueId;
        this.bufferTimeout = bufferTimeout;
        this.outputPartitioner = outputPartitioner;
        this.outputTag = outputTag;
        this.exchangeMode = checkNotNull(exchangeMode);
        this.intermediateDatasetIdToProduce = intermediateDatasetId;

        // <source>_<target>_<typeNumber>_<partitioner>_<uniqueId>, nulls rendered as "null".
        this.edgeId =
                String.join(
                        "_",
                        String.valueOf(sourceVertex),
                        String.valueOf(targetVertex),
                        String.valueOf(typeNumber),
                        String.valueOf(outputPartitioner),
                        String.valueOf(uniqueId));
    }

    public int getSourceId() {
        return this.sourceId;
    }

    public int getTargetId() {
        return this.targetId;
    }

    public int getTypeNumber() {
        return this.typeNumber;
    }

    public OutputTag getOutputTag() {
        return this.outputTag;
    }

    public StreamPartitioner<?> getPartitioner() {
        return this.outputPartitioner;
    }

    public StreamExchangeMode getExchangeMode() {
        return this.exchangeMode;
    }

    /** Replaces the partitioner; the identity string used by equals/hashCode is unaffected. */
    public void setPartitioner(StreamPartitioner<?> partitioner) {
        this.outputPartitioner = partitioner;
    }

    /** Sets the buffer timeout; values below {@code -1} fail the {@code checkArgument} check. */
    public void setBufferTimeout(long bufferTimeout) {
        checkArgument(bufferTimeout >= -1);
        this.bufferTimeout = bufferTimeout;
    }

    public long getBufferTimeout() {
        return this.bufferTimeout;
    }

    public void setSupportsUnalignedCheckpoints(boolean supportsUnalignedCheckpoints) {
        this.supportsUnalignedCheckpoints = supportsUnalignedCheckpoints;
    }

    public boolean supportsUnalignedCheckpoints() {
        return this.supportsUnalignedCheckpoints;
    }

    @Override
    public int hashCode() {
        return Objects.hash(edgeId, outputTag);
    }

    /** Edges of the exact same class are equal iff their identity strings and output tags match. */
    @Override
    public boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        if (o == null || o.getClass() != getClass()) {
            return false;
        }
        final StreamEdge other = (StreamEdge) o;
        return Objects.equals(edgeId, other.edgeId) && Objects.equals(outputTag, other.outputTag);
    }

    @Override
    public String toString() {
        return new StringBuilder("(")
                .append(sourceOperatorName)
                .append("-")
                .append(sourceId)
                .append(" -> ")
                .append(targetOperatorName)
                .append("-")
                .append(targetId)
                .append(", typeNumber=")
                .append(typeNumber)
                .append(", outputPartitioner=")
                .append(outputPartitioner)
                .append(", exchangeMode=")
                .append(exchangeMode)
                .append(", bufferTimeout=")
                .append(bufferTimeout)
                .append(", outputTag=")
                .append(outputTag)
                .append(", uniqueId=")
                .append(uniqueId)
                .append(')')
                .toString();
    }

    public IntermediateDataSetID getIntermediateDatasetIdToProduce() {
        return this.intermediateDatasetIdToProduce;
    }
}
本文是对StreamGraph的解释与补充,完整StreamGraph创建源码见《Flink-1.19.0源码详解4-StreamGraph生成》。