1、JobGraph (JobManager)
JobGraph 生成时,通过 ChainingStrategy
连接算子,最终在 Task 中生成 ChainedDriver
链表。
StreamingJobGraphGenerator
createJobGraph() 构建jobGrapch 包含 JobVertex
setChaining() 构建算子链
isChainable() 是否可以合并算子
-- 算子链没有禁用
-- 下游算子非head
-- 并行度一致
-- 相同slot
-- partitioner Forward
createChain 递归构建JobVertex
JobVertex jobVertex = jobVertexBuildContext.getJobVertex(startNodeId);
if (jobVertex == null) {
jobVertex =
createJobVertex(
chainInfo, serializationExecutor, jobVertexBuildContext);
}
2、ExecutionGraph(JobManager)
ExecutionVertex
:代表一个并行子任务(即一个算子链的实例)
- 作用:将
JobVertex
拆解为并行子任务,每个子任务对应一个ExecutionVertex
。 - 核心对象:
ExecutionJobVertex
:与JobVertex
一一对应,管理并行实例。ExecutionVertex
:代表一个并行子任务(即一个算子链的实例)
3、物理执行计划(TaskManager)
• 作用:将 ExecutionVertex
调度到 TaskManager 的 Slot 中运行。
OperatorChain
管理算子链的结构,负责算子的初始化、状态管理和数据传递。
<pre>
first
\
main (multi-input) -> ... -> tail
/
second
</pre>
@Nullable protected final StreamOperatorWrapper<OUT, OP> mainOperatorWrapper;
@Nullable protected final StreamOperatorWrapper<?, ?> firstOperatorWrapper;
@Nullable protected final StreamOperatorWrapper<?, ?> tailOperatorWrapper;
StreamOperatorWrapper
包装单个算子,维护算子间的链式关系,处理数据在算子间的流转。
private StreamOperatorWrapper<?, ?> previous;
private StreamOperatorWrapper<?, ?> next;
*StreamIterationHead/StreamIterationTail
* 迭代场景下的特殊任务,分别处理迭代的头部(反馈输入)和尾部(反馈输出),通过阻塞队列实现迭代数据循环
4、链化策略类型
ALWAYS
:默认策略,尽可能与上下游算子链化(如map
、filter
)。HEAD
:仅与下游链化,不与上游链化(如Source
算子)。NEVER
:独立成 Task,不与任何算子链化
5、算子链执行流程与优化技术
- 链化过程
- 编译阶段:将满足条件的算子合并为
OperatorChain
,生成一个Task
而非多个独立 Task。 - 运行时:
ChainedDriver
在单线程内按拓扑顺序执行链内算子,数据通过CopyingChainingOutput
(默认深拷贝)或BroadcastingOutput
(对象重用)传递。
- 编译阶段:将满足条件的算子合并为
- 性能权衡
- 优点:减少线程数、网络 IO,提升吞吐并降低延迟。
- 缺点:长链路可能阻塞,需通过
startNewChain()
或disableChaining()
手动拆分