要真正驾驭 Flink 并构建出高效、稳定、可扩展的流处理应用,仅仅停留在 API 的表面使用是远远不够的。深入理解其内部的运行机制,洞悉数据从代码到分布式执行的完整生命周期,以及明晰各个核心组件之间错综复杂而又协同工作的关系,对于我们进行性能调优、故障排查以及设计更优的应用程序架构至关重要。
本文将带领大家一起揭开 Flink 的神秘面纱,我们将首先详细梳理一个 Flink 作业从客户端提交到在 TaskManager 上具体执行的完整启动流程,理解 StreamGraph
、JobGraph
到 ExecutionGraph
的演变。
紧接着,我们将深入剖析 Flink 中那些我们既熟悉又可能感到困惑的核心概念,如 DataStream
如何通过 Transformation
承载用户的 UDF
,最终又是如何在 StreamOperator
和 StreamTask
中焕发生机,以及它们之间是如何相互关联、协同工作的。希望通过这次探索,能帮助构建起对 Flink 内部原理更为清晰和系统的认识。
Flink 任务启动流程
客户端(Client)准备与提交作业:
- 用户通过 Flink 客户端(例如,执行
flink run
命令的 CLI,或者通过 REST API 提交的程序,或者在 IDE 中直接运行)提交一个 Flink 应用程序(例如,用 DataStream API 编写的程序)。 - 在客户端,用户的程序(例如
StreamExecutionEnvironment.execute()
)首先会将用户定义的DataStream
操作转换为一个StreamGraph
。StreamGraph
是作业的最初的、面向流的逻辑表示,它包含了所有的算子、UDF、并行度设置、数据流向等信息。 - 客户端随后将这个
StreamGraph
提交给 JobManager(具体来说是 JobManager 中的 Dispatcher 组件)。
- 用户通过 Flink 客户端(例如,执行
JobManager 接收与处理作业:【stream Graph转化Job Graph 版本有变动,但是通信链路是一致的】
- Dispatcher: JobManager 中的 Dispatcher 接收到
StreamGraph
后,会为这个作业启动一个新的 JobMaster。Dispatcher 负责作业的提交、JobMaster 的生命周期管理,并提供 REST 接口。 - JobMaster: 每个作业都有其专属的 JobMaster。JobMaster 负责该作业的整个生命周期管理。
StreamGraph
->JobGraph
: JobMaster 首先将接收到的StreamGraph
转换为JobGraph
。JobGraph
是一个更通用的、并行的作业表示,它将StreamGraph
中的算子链(Operator Chains)优化考虑在内,并确定了JobVertex
(逻辑上的并行算子)。每个JobVertex
对应StreamGraph
中的一个或多个(链式)算子。- 如
docs/content.zh/docs/internals/job_scheduling.md
中提到:“JobManager 会接收到一个 JobGraph,用来描述由多个算子顶点 (JobVertex) 组成的数据流图”。
- 如
JobGraph
->ExecutionGraph
: JobMaster 接着将JobGraph
转换为ExecutionGraph
。ExecutionGraph
是作业的物理执行计划,是JobGraph
的并行化版本。- 它将每个
JobVertex
根据其并行度展开为多个并行的ExecutionVertex
。 - 每个
ExecutionVertex
代表了一个逻辑算子(或算子链)的一个并行实例。 ExecutionGraph
中的每个ExecutionVertex
会有一个或多个Execution
对象来跟踪其执行尝试(例如,初次执行、故障恢复后的重试)。ExecutionGraph
是 JobMaster 调度和监控作业执行的核心数据结构。
- 它将每个
- 调度器 (Scheduler): JobMaster 内部的调度器负责将
ExecutionGraph
中的任务(具体来说是Execution
对象代表的执行尝试)部署到可用的 TaskManager Slot 上。- 调度器会向 ResourceManager (如果使用了如 YARN, Kubernetes 等资源管理器,或者是 Flink 自身的 Standalone ResourceManager) 请求所需的 Task Slot。
- 一旦 Slot 分配成功,调度器就会将任务部署到相应的 TaskManager。
- Dispatcher: JobManager 中的 Dispatcher 接收到
TaskManager 执行任务:
- TaskManager 接收到 JobManager (JobMaster) 分配的任务部署指令后,会在其管理的某个空闲的 Task Slot 中为该任务启动执行。
- Slot 与线程: 一个 Task Slot 代表了 TaskManager 提供的一份固定的计算资源(通常与 CPU核心数相关)。一个 Task Slot 会运行一个或多个(如果启用了 Slot Sharing Group 且任务属于同一共享组)任务,每个任务(Task)都在其自己的独立线程中执行。 这个线程本身不属于槽而属于task,Slot 是资源的划分,线程是执行的载体。
StreamTask
: 在 TaskManager 内部,每个被部署的流处理任务的实际体现就是一个StreamTask
(或其特定子类,如SourceTask
,OneInputStreamTask
,TwoInputStreamTask
等)的实例。- 如
docs/content/docs/internals/task_lifecycle.md
所述: "TheStreamTask
is the base for all different task sub-types in Flink's streaming engine." StreamTask
负责初始化和运行其内部的StreamOperator
链(OperatorChain
),处理输入数据,执行用户定义的函数 (UDF),并产生输出数据。StreamTask
的生命周期包括创建、部署、运行、取消、完成或失败等状态。- 如
flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
文件所示,StreamTask
包含了大量的逻辑来管理算子链 (operatorChain
)、配置 (configuration
)、状态后端 (stateBackend
)、检查点协调 (subtaskCheckpointCoordinator
) 等。
- 如
关键补充点总结:
StreamGraph
: 在客户端生成,是最初的逻辑图。- Dispatcher 与 JobMaster: JobManager 内部组件,Dispatcher 负责接收作业并为每个作业启动一个 JobMaster。JobMaster 负责单个作业的完整生命周期。
JobGraph
: 由StreamGraph
转换而来,是并行的逻辑图。ExecutionGraph
: 由JobGraph
转换而来,是物理执行图,包含ExecutionVertex
和Execution
。- ResourceManager: 负责 Task Slot 的分配。
- Task Slot 与线程: Slot 是资源单位,Task 在 Slot 内的独立线程中运行。
OperatorChain
:StreamTask
内部可以运行一个算子链,这是 Flink 的一项重要优化。
Flink 核心概念关系:从 API 到执行
DataStream
API 与UDF
(User-Defined Function - 用户定义函数):- 起点: 用户通过
DataStream
API (例如dataStream.map(myMapFunction).filter(myFilterFunction)
) 来声明式地构建数据处理流程。 - 业务逻辑:
UDF
(例如MyMapFunction
,MyFilterFunction
,KeyedProcessFunction
) 是用户编写的包含具体业务处理逻辑的 Java/Scala 函数或类。它们是数据转换的核心。
- 起点: 用户通过
Transformation
(转换):- 逻辑蓝图: 每当在
DataStream
上调用一个操作(如map
,filter
,keyBy
),就会创建一个或多个Transformation
对象。 Transformation
树是 Flink 作业的逻辑表示或蓝图。它详细描述了数据如何从一个操作流向下一个操作,包括操作类型、应用的UDF
、输入/输出数据类型、并行度设置等。它本身不执行计算。
- 逻辑蓝图: 每当在
逻辑
Operator
(算子) 与逻辑Subtask
(子任务):- 逻辑处理单元: 在
Transformation
层面,我们可以认为每个转换操作对应一个逻辑Operator
(例如 Map Operator, Filter Operator)。 - 并行实例 (逻辑): 如果一个逻辑
Operator
的并行度(parallelism)被设置为N
,那么在逻辑规划中,这个Operator
就拥有N
个并行的逻辑实例,这些逻辑实例通常被称为Subtask
。每个逻辑Subtask
代表了该Operator
的一个独立、并行的处理单元。
- 逻辑处理单元: 在
StreamOperator
(运行时算子/流算子):- 物理执行体:
StreamOperator
是 Flink 运行时的核心组件,是Transformation
中定义的逻辑算子在物理执行时的具体实现和承载体。例如,StreamMap
、KeyedProcessOperator
都是StreamOperator
的具体实现。 - 封装 UDF:
StreamOperator
负责封装用户提供的UDF
。它管理UDF
的生命周期(如调用open()
,close()
方法)并调用UDF
的核心处理方法(如map()
,filter()
,processElement()
)来处理数据。 AbstractUdfStreamOperator
是许多包含UDF
的StreamOperator
的通用基类,它简化了UDF
的管理。
- 物理执行体:
StreamTask
(流任务/物理任务):- 执行单元:
StreamTask
是 Flink 在 TaskManager 上进行物理执行的基本单元。它是一个实现了java.lang.Runnable
的对象,在 TaskManager 的一个 Slot 中的一个独立线程内运行。 - 核心职责:
StreamTask
负责:- 管理其内部一个或多个
StreamOperator
的完整生命周期(初始化、打开、运行数据处理循环、响应 Checkpoint、关闭、清理)。 - 处理数据的输入(从网络或上游 Task)和输出(到网络或下游 Task)。
- 协调 Checkpoint 过程。
- 管理其内部一个或多个
- 执行入口:
StreamTask
的invoke()
方法是其执行逻辑的入口点,它启动了数据处理的主循环。
- 执行单元:
算子链 (Operator Chaining) 的影响:
- 优化: Flink 会尽可能地将满足条件的多个逻辑
Operator
(及其对应的逻辑Subtask
) 链接(chain)在一起。例如,连续的map -> filter -> map
操作,如果并行度相同且数据传输直接(非重分区),它们通常会被链接。 - 结果:
- 被链接起来的一系列
StreamOperator
实例会运行在同一个StreamTask
内部,由一个OperatorChain
对象管理。 - 这意味着一个
StreamTask
可能只包含一个单独的StreamOperator
(如果没有发生链接),或者包含一串链式连接的StreamOperator
。 - 这样做能显著减少线程切换、数据序列化/反序列化以及网络传输的开销,提升性能。
- 被链接起来的一系列
- 优化: Flink 会尽可能地将满足条件的多个逻辑
总结关系流程:
- 用户使用
DataStream
API 编写代码,并提供UDF
来定义业务逻辑。 - 这些 API 调用会构建一个
Transformation
树,这是作业的逻辑蓝图。 - Flink 编译器将
Transformation
树转换为物理执行图 (JobGraph -> ExecutionGraph):- 每个逻辑
Operator
(在Transformation
中定义) 根据其并行度被实例化为多个逻辑Subtask
。 - 满足条件的逻辑
Subtask
(来自不同的逻辑Operator
) 会被优化策略链接(chain)起来。
- 每个逻辑
- 在运行时 (TaskManager):
- 每个(可能经过链接的)
Subtask
序列,作为一个整体,被调度为一个StreamTask
实例,并在一个独立的线程中执行。 StreamTask
内部运行着一个或多个(如果发生链接,则形成OperatorChain
)StreamOperator
实例。- 每个
StreamOperator
实例则封装并调用用户编写的UDF
来对流经它的数据执行具体的业务逻辑处理。
- 每个(可能经过链接的)
结合源码说明
我们来看一些关键的源码片段来理解这个流程:
StreamTask 的构造与初始化: 当 TaskManager 接收到部署任务的指令后,会创建
StreamTask
实例。// ... existing code ... public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> implements TaskInvokable, CheckpointableTask, CoordinatedTask, AsyncExceptionHandler, ContainingTaskDetails { // ... existing code ... protected StreamTask( Environment environment, @Nullable TimerService timerService, Thread.UncaughtExceptionHandler uncaughtExceptionHandler, StreamTaskActionExecutor actionExecutor, TaskMailbox taskMailbox) throws Exception { this.environment = Preconditions.checkNotNull(environment); this.configuration = new StreamConfig(environment.getTaskConfiguration()); this.recordWriter = createRecordWriterDelegate(configuration, environment); this.resourceCloser = new AutoCloseableRegistry(); this.mailboxProcessor = new MailboxProcessor( this::processInput, taskMailbox, actionExecutor, resourceCloser, this::shouldBeTerminated, this::handleAsyncExceptionDuringNormalExecution); // ... existing code ... this.asyncOperationsThreadPool = MdcUtils.scopeToJob( getEnvironment().getJobID(), new ThreadPoolExecutor( 0, configuration.getMaxConcurrentCheckpoints() + 1, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new ExecutorThreadFactory( "AsyncOperations", uncaughtExceptionHandler))); // ... existing code ... this.stateBackend = createStateBackend(); this.checkpointStorage = createCheckpointStorage(stateBackend); // ... existing code ... this.subtaskCheckpointCoordinator = new SubtaskCheckpointCoordinatorImpl( checkpointStorageAccess, getName(), actionExecutor, getAsyncOperationsThreadPool(), environment, this, this::prepareInputSnapshot, configuration.getMaxConcurrentCheckpoints(), channelStateWriter, configuration .getConfiguration() .get( CheckpointingOptions .ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH), BarrierAlignmentUtil.createRegisterTimerCallback( mainMailboxExecutor, systemTimerService), environment.getTaskStateManager().getFileMergingSnapshotManager()); // ... existing code ... }
在构造函数中,
StreamTask
会初始化运行环境、配置、状态后端、Checkpoint 存储和协调器等。StreamTask 的执行入口
invoke()
: 这是 TaskManager 启动 Task 后调用的核心方法。// ... existing code ... @Override public final void invoke() throws Exception { SubTaskInitializationMetricsBuilder initializationMetrics = SubTaskInitializationMetricsBuilder.create(getEnvironment().getMetricGroup()); final long initializationStarted = SystemClock.getInstance().absoluteTimeMillis(); initializationMetrics.addTimestampMetric(INITIALIZATION_START_TIMESTAMP, initializationStarted); // 初始化 OperatorChain,这里会创建 StreamOperator 实例 // RegularOperatorChain 或 FinishedOperatorChain try { operatorChain = getEnvironment().getTaskStateManager().isTaskDeployedAsFinished() ? new FinishedOperatorChain<>(this, recordWriter) : new RegularOperatorChain<>(this, recordWriter); mainOperator = operatorChain.getMainOperator(); getEnvironment() .getTaskStateManager() .getRestoreCheckpointId() .ifPresent(restoreId -> latestReportCheckpointId = restoreId); // task specific initialization,调用子类实现的 init() 方法 init(); configuration.clearInitialConfigs(); // save the work of reloading state, etc, if the task is already canceled ensureNotCanceled(); // -------- Invoke -------- LOG.debug("Invoking {}", getName()); // we need to make sure that any triggers scheduled in open() cannot be // executed before all operators are opened // 恢复状态并打开算子 CompletableFuture<Void> allGatesRecoveredFuture = actionExecutor.call(() -> restoreStateAndGates(initializationMetrics)); // Run mailbox until all gates will be recovered. // 启动邮箱处理循环,这是任务处理数据的主要逻辑 mailboxProcessor.runMailboxLoop(); // ... existing code ... // make sure this is executed in any case! LOG.debug("Finished task {}", getName()); } finally { // ... cleanup ... actionExecutor.runThrowing( () -> { // only set the StreamTask to not running after all operators have been // finished! // ... existing code ... disableInterruptOnCancel(); // ... existing code ... // clean up everything we initialized isRunning = false; // ... existing code ... try { resourceCloser.close(); } catch (Throwable t) { Exception e = t instanceof Exception ? (Exception) t : new Exception(t); throw firstOrSuppressed(e, cancelException); } } }
在
invoke()
方法中:- 创建
OperatorChain
(RegularOperatorChain
或FinishedOperatorChain
),它包含了这个StreamTask
要执行的一个或多个StreamOperator
。 - 调用
init()
方法进行特定于任务类型的初始化(例如,SourceOperatorStreamTask
会在这里启动 SourceReader)。 - 调用
restoreStateAndGates()
,其中会调用operatorChain.initializeStateAndOpenOperators()
。
- 创建
Operator 的初始化和开:
OperatorChain
的initializeStateAndOpenOperators
方法会遍历链上的所有算子,调用它们的initializeState()
和open()
方法。// ... existing code ... operatorChain.initializeStateAndOpenOperators( createStreamTaskStateInitializer(initializationMetrics)); initializeStateEndTs = SystemClock.getInstance().absoluteTimeMillis(); // ... existing code ...
而
createStreamTaskStateInitializer
会创建一个StreamTaskStateInitializerImpl
实例,用于初始化算子的状态。// ... existing code ... public StreamTaskStateInitializer createStreamTaskStateInitializer( SubTaskInitializationMetricsBuilder initializationMetrics) { InternalTimeServiceManager.Provider timerServiceProvider = configuration.getTimerServiceProvider(getUserCodeClassLoader()); return new StreamTaskStateInitializerImpl( getEnvironment(), stateBackend, // ... existing code ...
UDF 的生命周期调用: 以
AbstractUdfStreamOperator
为例,它的initializeState()
和open()
方法会进一步调用 UDF 的相应方法。// ... existing code ... @Override public void initializeState(StateInitializationContext context) throws Exception { super.initializeState(context); StreamingFunctionUtils.restoreFunctionState(context, userFunction); } @Override public void open() throws Exception { super.open(); FunctionUtils.openFunction(userFunction, DefaultOpenContext.INSTANCE); } @Override public void finish() throws Exception { super.finish(); if (userFunction instanceof SinkFunction) { ((SinkFunction<?>) userFunction).finish(); } } @Override public void close() throws Exception { super.close(); FunctionUtils.closeFunction(userFunction); } // ... existing code ...
这里
userFunction
就是用户定义的 UDF。FunctionUtils.openFunction
会调用 UDF 的open()
方法,并传入RuntimeContext
。数据处理循环:
StreamTask
的mailboxProcessor.runMailboxLoop()
启动后,会不断调用processInput()
方法(如果邮箱中有待处理的邮件或默认操作可以执行)。// ... existing code ... protected void processInput(MailboxDefaultAction.Controller controller) throws Exception { DataInputStatus status = inputProcessor.processInput(); switch (status) { case MORE_AVAILABLE: if (taskIsAvailable()) { return; } break; case NOTHING_AVAILABLE: break; case END_OF_RECOVERY: throw new IllegalStateException("We should not receive this event here."); // ... existing code ...
inputProcessor.processInput()
会从输入源读取数据,并通过OperatorChain
将数据传递给第一个StreamOperator
,然后数据会在算子链中依次处理,每个算子会调用其内部 UDF 的处理逻辑(如map()
,filter()
,processElement()
)。
文档中的相关描述
Task Lifecycle:
/flink/docs/content/docs/internals/task_lifecycle.md
描述了StreamTask
和Operator
的生命周期。The
StreamTask
is the base for all different task sub-types in Flink's streaming engine. ... OPERATOR::setup -> UDF::setRuntimeContext OPERATOR::initializeState OPERATOR::open -> UDF::open OPERATOR::processElement -> UDF::runFlink Architecture - Tasks and Operator Chains:
flink/docs/content/docs/concepts/flink-architecture.md
For distributed execution, Flink chains operator subtasks together into tasks. Each task is executed by one thread. Chaining operators together into tasks is a useful optimization...
通过以上分析,应该对 Flink 任务的启动流程以及 Subtask
、StreamTask
、Operator
和 UDF
之间的关系有了更清晰的理解。
StreamTask
是核心的执行单元,它承载了 Operator
,而 Operator
又驱动着 UDF
的执行。整个过程由 JobManager 调度,并在 TaskManager 上实际运行。