Flink task、Operator 和 UDF 之间的关系

发布于:2025-06-15 ⋅ 阅读:(17) ⋅ 点赞:(0)

要真正驾驭 Flink 并构建出高效、稳定、可扩展的流处理应用,仅仅停留在 API 的表面使用是远远不够的。深入理解其内部的运行机制,洞悉数据从代码到分布式执行的完整生命周期,以及明晰各个核心组件之间错综复杂而又协同工作的关系,对于我们进行性能调优、故障排查以及设计更优的应用程序架构至关重要。

本文将带领大家一起揭开 Flink 的神秘面纱,我们将首先详细梳理一个 Flink 作业从客户端提交到在 TaskManager 上具体执行的完整启动流程,理解 StreamGraphJobGraph 到 ExecutionGraph 的演变。

紧接着,我们将深入剖析 Flink 中那些我们既熟悉又可能感到困惑的核心概念,如 DataStream 如何通过 Transformation 承载用户的 UDF,最终又是如何在 StreamOperator 和 StreamTask 中焕发生机,以及它们之间是如何相互关联、协同工作的。希望通过这次探索,能帮助构建起对 Flink 内部原理更为清晰和系统的认识。

Flink 任务启动流程

  1. 客户端(Client)准备与提交作业:

    • 用户通过 Flink 客户端(例如,执行 flink run 命令的 CLI,或者通过 REST API 提交的程序,或者在 IDE 中直接运行)提交一个 Flink 应用程序(例如,用 DataStream API 编写的程序)。
    • 在客户端,用户的程序(例如 StreamExecutionEnvironment.execute())首先会将用户定义的 DataStream 操作转换为一个 StreamGraphStreamGraph 是作业的最初的、面向流的逻辑表示,它包含了所有的算子、UDF、并行度设置、数据流向等信息。
    • 客户端随后将这个 StreamGraph 提交给 JobManager(具体来说是 JobManager 中的 Dispatcher 组件)。
  2. JobManager 接收与处理作业:【stream Graph转化Job Graph 版本有变动,但是通信链路是一致的】

    • Dispatcher: JobManager 中的 Dispatcher 接收到 StreamGraph 后,会为这个作业启动一个新的 JobMaster。Dispatcher 负责作业的提交、JobMaster 的生命周期管理,并提供 REST 接口。
    • JobMaster: 每个作业都有其专属的 JobMaster。JobMaster 负责该作业的整个生命周期管理。
      • StreamGraph -> JobGraph: JobMaster 首先将接收到的 StreamGraph 转换为 JobGraphJobGraph 是一个更通用的、并行的作业表示,它将 StreamGraph 中的算子链(Operator Chains)优化考虑在内,并确定了 JobVertex(逻辑上的并行算子)。每个 JobVertex 对应 StreamGraph 中的一个或多个(链式)算子。
        • 如 docs/content.zh/docs/internals/job_scheduling.md 中提到:“JobManager 会接收到一个 JobGraph,用来描述由多个算子顶点 (JobVertex) 组成的数据流图”。
      • JobGraph -> ExecutionGraph: JobMaster 接着将 JobGraph 转换为 ExecutionGraphExecutionGraph 是作业的物理执行计划,是 JobGraph 的并行化版本。
        • 它将每个 JobVertex 根据其并行度展开为多个并行的 ExecutionVertex
        • 每个 ExecutionVertex 代表了一个逻辑算子(或算子链)的一个并行实例。
        • ExecutionGraph 中的每个 ExecutionVertex 会有一个或多个 Execution 对象来跟踪其执行尝试(例如,初次执行、故障恢复后的重试)。
        • ExecutionGraph 是 JobMaster 调度和监控作业执行的核心数据结构。
    • 调度器 (Scheduler): JobMaster 内部的调度器负责将 ExecutionGraph 中的任务(具体来说是 Execution 对象代表的执行尝试)部署到可用的 TaskManager Slot 上。
      • 调度器会向 ResourceManager (如果使用了如 YARN, Kubernetes 等资源管理器,或者是 Flink 自身的 Standalone ResourceManager) 请求所需的 Task Slot。
      • 一旦 Slot 分配成功,调度器就会将任务部署到相应的 TaskManager。
  3. TaskManager 执行任务:

    • TaskManager 接收到 JobManager (JobMaster) 分配的任务部署指令后,会在其管理的某个空闲的 Task Slot 中为该任务启动执行。
    • Slot 与线程: 一个 Task Slot 代表了 TaskManager 提供的一份固定的计算资源(通常与 CPU核心数相关)。一个 Task Slot 会运行一个或多个(如果启用了 Slot Sharing Group 且任务属于同一共享组)任务,每个任务(Task)都在其自己的独立线程中执行。 这个线程本身不属于槽而属于task,Slot 是资源的划分,线程是执行的载体。
    • StreamTask: 在 TaskManager 内部,每个被部署的流处理任务的实际体现就是一个 StreamTask(或其特定子类,如 SourceTaskOneInputStreamTaskTwoInputStreamTask 等)的实例。
      • 如 docs/content/docs/internals/task_lifecycle.md 所述: "The StreamTask is the base for all different task sub-types in Flink's streaming engine."
      • StreamTask 负责初始化和运行其内部的 StreamOperator 链(OperatorChain),处理输入数据,执行用户定义的函数 (UDF),并产生输出数据。
      • StreamTask 的生命周期包括创建、部署、运行、取消、完成或失败等状态。
      • 如 flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java 文件所示,StreamTask 包含了大量的逻辑来管理算子链 (operatorChain)、配置 (configuration)、状态后端 (stateBackend)、检查点协调 (subtaskCheckpointCoordinator) 等。

关键补充点总结:

  • StreamGraph: 在客户端生成,是最初的逻辑图。
  • Dispatcher 与 JobMaster: JobManager 内部组件,Dispatcher 负责接收作业并为每个作业启动一个 JobMaster。JobMaster 负责单个作业的完整生命周期。
  • JobGraph: 由 StreamGraph 转换而来,是并行的逻辑图。
  • ExecutionGraph: 由  JobGraph 转换而来,是物理执行图,包含 ExecutionVertex 和 Execution
  • ResourceManager: 负责 Task Slot 的分配。
  • Task Slot 与线程: Slot 是资源单位,Task 在 Slot 内的独立线程中运行。
  • OperatorChainStreamTask 内部可以运行一个算子链,这是 Flink 的一项重要优化。

Flink 核心概念关系:从 API 到执行

  1. DataStream API 与 UDF (User-Defined Function - 用户定义函数):

    • 起点: 用户通过 DataStream API (例如 dataStream.map(myMapFunction).filter(myFilterFunction)) 来声明式地构建数据处理流程。
    • 业务逻辑UDF (例如 MyMapFunctionMyFilterFunctionKeyedProcessFunction) 是用户编写的包含具体业务处理逻辑的 Java/Scala 函数或类。它们是数据转换的核心。
  2. Transformation (转换):

    • 逻辑蓝图: 每当在 DataStream 上调用一个操作(如 mapfilterkeyBy),就会创建一个或多个 Transformation 对象。
    • Transformation 树是 Flink 作业的逻辑表示或蓝图。它详细描述了数据如何从一个操作流向下一个操作,包括操作类型、应用的 UDF、输入/输出数据类型、并行度设置等。它本身不执行计算。
  3. 逻辑 Operator (算子) 与逻辑 Subtask (子任务):

    • 逻辑处理单元: 在 Transformation 层面,我们可以认为每个转换操作对应一个逻辑 Operator (例如 Map Operator, Filter Operator)。
    • 并行实例 (逻辑): 如果一个逻辑 Operator 的并行度(parallelism)被设置为 N,那么在逻辑规划中,这个 Operator 就拥有 N 个并行的逻辑实例,这些逻辑实例通常被称为 Subtask。每个逻辑 Subtask 代表了该 Operator 的一个独立、并行的处理单元。
  4. StreamOperator (运行时算子/流算子):

    • 物理执行体StreamOperator 是 Flink 运行时的核心组件,是 Transformation 中定义的逻辑算子在物理执行时的具体实现和承载体。例如,StreamMapKeyedProcessOperator 都是 StreamOperator 的具体实现。
    • 封装 UDFStreamOperator 负责封装用户提供的 UDF。它管理 UDF 的生命周期(如调用 open()close() 方法)并调用 UDF 的核心处理方法(如 map()filter()processElement())来处理数据。
    • AbstractUdfStreamOperator 是许多包含 UDF 的 StreamOperator 的通用基类,它简化了 UDF 的管理。
  5. StreamTask (流任务/物理任务):

    • 执行单元StreamTask 是 Flink 在 TaskManager 上进行物理执行的基本单元。它是一个实现了 java.lang.Runnable 的对象,在 TaskManager 的一个 Slot 中的一个独立线程内运行。
    • 核心职责StreamTask 负责:
      • 管理其内部一个或多个 StreamOperator 的完整生命周期(初始化、打开、运行数据处理循环、响应 Checkpoint、关闭、清理)。
      • 处理数据的输入(从网络或上游 Task)和输出(到网络或下游 Task)。
      • 协调 Checkpoint 过程。
    • 执行入口StreamTask 的 invoke() 方法是其执行逻辑的入口点,它启动了数据处理的主循环。
  6. 算子链 (Operator Chaining) 的影响:

    • 优化: Flink 会尽可能地将满足条件的多个逻辑 Operator (及其对应的逻辑 Subtask) 链接(chain)在一起。例如,连续的 map -> filter -> map 操作,如果并行度相同且数据传输直接(非重分区),它们通常会被链接。
    • 结果:
      • 被链接起来的一系列 StreamOperator 实例会运行在同一个 StreamTask 内部,由一个 OperatorChain 对象管理。
      • 这意味着一个 StreamTask 可能只包含一个单独的 StreamOperator (如果没有发生链接),或者包含一串链式连接的 StreamOperator
      • 这样做能显著减少线程切换、数据序列化/反序列化以及网络传输的开销,提升性能。

总结关系流程:

  1. 用户使用 DataStream API 编写代码,并提供 UDF 来定义业务逻辑。
  2. 这些 API 调用会构建一个 Transformation 树,这是作业的逻辑蓝图。
  3. Flink 编译器将 Transformation 树转换为物理执行图 (JobGraph -> ExecutionGraph):
    • 每个逻辑 Operator (在 Transformation 中定义) 根据其并行度被实例化为多个逻辑 Subtask
    • 满足条件的逻辑 Subtask (来自不同的逻辑 Operator) 会被优化策略链接(chain)起来。
  4. 在运行时 (TaskManager):
    • 每个(可能经过链接的)Subtask 序列,作为一个整体,被调度为一个 StreamTask 实例,并在一个独立的线程中执行。
    • StreamTask 内部运行着一个或多个(如果发生链接,则形成 OperatorChainStreamOperator 实例。
    • 每个 StreamOperator 实例则封装并调用用户编写的 UDF 来对流经它的数据执行具体的业务逻辑处理。

结合源码说明

我们来看一些关键的源码片段来理解这个流程:

  1. StreamTask 的构造与初始化: 当 TaskManager 接收到部署任务的指令后,会创建 StreamTask 实例。

    // ... existing code ...
    public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
            implements TaskInvokable,
                    CheckpointableTask,
                    CoordinatedTask,
                    AsyncExceptionHandler,
                    ContainingTaskDetails {
    
        // ... existing code ...
        protected StreamTask(
                Environment environment,
                @Nullable TimerService timerService,
                Thread.UncaughtExceptionHandler uncaughtExceptionHandler,
                StreamTaskActionExecutor actionExecutor,
                TaskMailbox taskMailbox)
                throws Exception {
            this.environment = Preconditions.checkNotNull(environment);
            this.configuration = new StreamConfig(environment.getTaskConfiguration());
            this.recordWriter = createRecordWriterDelegate(configuration, environment);
            this.resourceCloser = new AutoCloseableRegistry();
            this.mailboxProcessor =
                    new MailboxProcessor(
                            this::processInput,
                            taskMailbox,
                            actionExecutor,
                            resourceCloser,
                            this::shouldBeTerminated,
                            this::handleAsyncExceptionDuringNormalExecution);
    // ... existing code ...
            this.asyncOperationsThreadPool =
                    MdcUtils.scopeToJob(
                            getEnvironment().getJobID(),
                            new ThreadPoolExecutor(
                                    0,
                                    configuration.getMaxConcurrentCheckpoints() + 1,
                                    60L,
                                    TimeUnit.SECONDS,
                                    new LinkedBlockingQueue<>(),
                                    new ExecutorThreadFactory(
                                            "AsyncOperations", uncaughtExceptionHandler)));
    // ... existing code ...
            this.stateBackend = createStateBackend();
            this.checkpointStorage = createCheckpointStorage(stateBackend);
    // ... existing code ...
            this.subtaskCheckpointCoordinator =
                    new SubtaskCheckpointCoordinatorImpl(
                            checkpointStorageAccess,
                            getName(),
                            actionExecutor,
                            getAsyncOperationsThreadPool(),
                            environment,
                            this,
                            this::prepareInputSnapshot,
                            configuration.getMaxConcurrentCheckpoints(),
                            channelStateWriter,
                            configuration
                                    .getConfiguration()
                                    .get(
                                            CheckpointingOptions
                                                    .ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH),
                            BarrierAlignmentUtil.createRegisterTimerCallback(
                                    mainMailboxExecutor, systemTimerService),
                            environment.getTaskStateManager().getFileMergingSnapshotManager());
    // ... existing code ...
    }
    

    在构造函数中,StreamTask 会初始化运行环境、配置、状态后端、Checkpoint 存储和协调器等。

  2. StreamTask 的执行入口 invoke(): 这是 TaskManager 启动 Task 后调用的核心方法。

    // ... existing code ...
    @Override
    public final void invoke() throws Exception {
        SubTaskInitializationMetricsBuilder initializationMetrics =
                SubTaskInitializationMetricsBuilder.create(getEnvironment().getMetricGroup());
        final long initializationStarted = SystemClock.getInstance().absoluteTimeMillis();
        initializationMetrics.addTimestampMetric(INITIALIZATION_START_TIMESTAMP, initializationStarted);
    
        // 初始化 OperatorChain,这里会创建 StreamOperator 实例
        // RegularOperatorChain 或 FinishedOperatorChain
        try {
            operatorChain =
                    getEnvironment().getTaskStateManager().isTaskDeployedAsFinished()
                            ? new FinishedOperatorChain<>(this, recordWriter)
                            : new RegularOperatorChain<>(this, recordWriter);
            mainOperator = operatorChain.getMainOperator();
    
            getEnvironment()
                    .getTaskStateManager()
                    .getRestoreCheckpointId()
                    .ifPresent(restoreId -> latestReportCheckpointId = restoreId);
    
            // task specific initialization,调用子类实现的 init() 方法
            init();
            configuration.clearInitialConfigs();
    
            // save the work of reloading state, etc, if the task is already canceled
            ensureNotCanceled();
    
            // -------- Invoke --------
            LOG.debug("Invoking {}", getName());
    
            // we need to make sure that any triggers scheduled in open() cannot be
            // executed before all operators are opened
            // 恢复状态并打开算子
            CompletableFuture<Void> allGatesRecoveredFuture =
                    actionExecutor.call(() -> restoreStateAndGates(initializationMetrics));
    
            // Run mailbox until all gates will be recovered.
            // 启动邮箱处理循环,这是任务处理数据的主要逻辑
            mailboxProcessor.runMailboxLoop();
    
    // ... existing code ...
            // make sure this is executed in any case!
            LOG.debug("Finished task {}", getName());
        } finally {
            // ... cleanup ...
            actionExecutor.runThrowing(
                    () -> {
                        // only set the StreamTask to not running after all operators have been
                        // finished!
    // ... existing code ...
    
            disableInterruptOnCancel();
    
    // ... existing code ...
            // clean up everything we initialized
            isRunning = false;
    
    // ... existing code ...
            try {
                resourceCloser.close();
            } catch (Throwable t) {
                Exception e = t instanceof Exception ? (Exception) t : new Exception(t);
                throw firstOrSuppressed(e, cancelException);
            }
        }
    }
    

    在 invoke() 方法中:

    • 创建 OperatorChain (RegularOperatorChain 或 FinishedOperatorChain),它包含了这个 StreamTask 要执行的一个或多个 StreamOperator
    • 调用 init() 方法进行特定于任务类型的初始化(例如,SourceOperatorStreamTask 会在这里启动 SourceReader)。
    • 调用 restoreStateAndGates(),其中会调用 operatorChain.initializeStateAndOpenOperators()
  3. Operator 的初始化和开OperatorChain 的 initializeStateAndOpenOperators 方法会遍历链上的所有算子,调用它们的 initializeState() 和 open() 方法。

    // ... existing code ...
        operatorChain.initializeStateAndOpenOperators(
                createStreamTaskStateInitializer(initializationMetrics));
    
        initializeStateEndTs = SystemClock.getInstance().absoluteTimeMillis();
    // ... existing code ...
    

    而 createStreamTaskStateInitializer 会创建一个 StreamTaskStateInitializerImpl 实例,用于初始化算子的状态。

    // ... existing code ...
    public StreamTaskStateInitializer createStreamTaskStateInitializer(
            SubTaskInitializationMetricsBuilder initializationMetrics) {
        InternalTimeServiceManager.Provider timerServiceProvider =
                configuration.getTimerServiceProvider(getUserCodeClassLoader());
    
        return new StreamTaskStateInitializerImpl(
                getEnvironment(),
                stateBackend,
    // ... existing code ...
    
  4. UDF 的生命周期调用: 以 AbstractUdfStreamOperator 为例,它的 initializeState() 和 open() 方法会进一步调用 UDF 的相应方法。

    // ... existing code ...
    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        StreamingFunctionUtils.restoreFunctionState(context, userFunction);
    }
    
    @Override
    public void open() throws Exception {
        super.open();
        FunctionUtils.openFunction(userFunction, DefaultOpenContext.INSTANCE);
    }
    
    @Override
    public void finish() throws Exception {
        super.finish();
        if (userFunction instanceof SinkFunction) {
            ((SinkFunction<?>) userFunction).finish();
        }
    }
    
    @Override
    public void close() throws Exception {
        super.close();
        FunctionUtils.closeFunction(userFunction);
    }
    // ... existing code ...
    

    这里 userFunction 就是用户定义的 UDF。FunctionUtils.openFunction 会调用 UDF 的 open() 方法,并传入 RuntimeContext

  5. 数据处理循环StreamTask 的 mailboxProcessor.runMailboxLoop() 启动后,会不断调用 processInput() 方法(如果邮箱中有待处理的邮件或默认操作可以执行)。

    // ... existing code ...
    protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
        DataInputStatus status = inputProcessor.processInput();
        switch (status) {
            case MORE_AVAILABLE:
                if (taskIsAvailable()) {
                    return;
                }
                break;
            case NOTHING_AVAILABLE:
                break;
            case END_OF_RECOVERY:
                throw new IllegalStateException("We should not receive this event here.");
    // ... existing code ...
    

    inputProcessor.processInput() 会从输入源读取数据,并通过 OperatorChain 将数据传递给第一个 StreamOperator,然后数据会在算子链中依次处理,每个算子会调用其内部 UDF 的处理逻辑(如 map()filter()processElement())。

文档中的相关描述

  • Task Lifecycle/flink/docs/content/docs/internals/task_lifecycle.md 描述了 StreamTask 和 Operator 的生命周期。

    The StreamTask is the base for all different task sub-types in Flink's streaming engine. ... OPERATOR::setup -> UDF::setRuntimeContext OPERATOR::initializeState OPERATOR::open -> UDF::open OPERATOR::processElement -> UDF::run

  • Flink Architecture - Tasks and Operator Chainsflink/docs/content/docs/concepts/flink-architecture.md

    For distributed execution, Flink chains operator subtasks together into tasks. Each task is executed by one thread. Chaining operators together into tasks is a useful optimization...

通过以上分析,应该对 Flink 任务的启动流程以及 SubtaskStreamTaskOperator 和 UDF 之间的关系有了更清晰的理解。

StreamTask 是核心的执行单元,它承载了 Operator,而 Operator 又驱动着 UDF 的执行。整个过程由 JobManager 调度,并在 TaskManager 上实际运行。


网站公告

今日签到

点亮在社区的每一天
去签到