Flink on YARN启动全流程深度解析

发布于:2025-08-15 ⋅ 阅读:(11) ⋅ 点赞:(0)

Flink on YARN 模式启动流程及核心组件协作详解

整个过程分为三个主要阶段:

  1. ​JobManager 启动​​(作业提交与 AM 初始化)

  2. ​TaskManager 资源分配与启动​

  3. ​任务部署与执行​


第一阶段:作业提交与 JobManager (AM) 启动

​目标​​:在 YARN 集群上启动 Flink 的"大脑"——JobManager

组件协作流程

1. 客户端 (Client) -> YARN ResourceManager (RM)
  • ​生成 JobGraph​

    开发环境通过 Flink 命令行/代码提交作业时,客户端分析代码生成 JobGraph数据结构。该静态逻辑视图描述算子(Source/Map/Sink等)及其数据流向。

  • ​申请 ApplicationMaster (AM)​

    客户端连接 YARN RM 请求启动 Flink AM。Per-Job 模式下该 AM 是 ​​JobManager + Flink ResourceManager​​ 的组合体。客户端需上传:

    • JobGraph

    • Flink 框架 Jar 包

    • 用户代码 Jar 包

      (资源存储于 HDFS 等共享存储供 AM 访问)

2. YARN RM -> YARN NodeManager (NM)
  • ​分配容器 (Container)​

    RM 选择计算节点(NodeManager)并分配资源单位 Container(独立 JVM 进程环境)

3. YARN NM -> Flink JobManager (AM)
  • ​启动 AM​

    NodeManager 通过 launch_container.sh启动 ApplicationMaster,入口点为 YarnJobClusterEntrypoint,关键操作包括:

    ​环境初始化​

    • 打印 Flink 版本/JVM/OS 信息

    • 加载 flink-conf.yaml配置

    ​核心服务启动​

    • RpcService:组件间 RPC 通信

    • HAService:JobManager 高可用

    • BlobServer:传输作业 Jar 等大二进制对象

    • HeartbeatServices:心跳检测

    ​Flink 资源管理器启动​

    • YarnResourceManager:与外部 YARN RM 通信(申请/释放 Container)

    • SlotManager:管理 TaskManager 提供的 Slot 资源状态

    ​作业执行核心启动​

    • Dispatcher:接收 JobGraph 并启动对应 JobManager

    • JobManager:将 JobGraph 转换为包含并行度/任务链/资源分配细节的 ExecutionGraph


第二阶段:TaskManager 资源分配与启动

​目标​​:根据作业需求启动计算"工人" TaskManager

组件协作流程

1. JobManager -> Flink ResourceManager
  • ​申请 Slot​

    JobManager 分析 ExecutionGraph 后向 Flink ResourceManager 申请所需计算资源(Slot)

2. Flink ResourceManager -> YARN RM
  • ​资源检查与申请​

    • SlotManager先检查现有空闲 Slot

    • 不足时由 YarnResourceManager向 YARN RM 申请新 Container

3. YARN RM -> YARN NM -> Flink TaskManager
  • ​容器分配与进程启动​

    • YARN RM 在其他 NodeManager 分配新 Container

    • Flink ResourceManager 指示启动 TaskManager(入口类 YarnTaskExecutorRunner

  • ​TaskManager 初始化​

    • 启动核心组件 TaskExecutor(实际任务执行单元)

    • 每个 TaskExecutor 拥有与机器 CPU 核数匹配的 Slot(独立任务线程池)

4. TaskManager -> Flink ResourceManager & JobManager
  • ​资源注册流程​

    • RPC 向 Flink ResourceManager 注册自身

    • 汇报可用 Slot 数量及状态(SlotManager 掌握新资源)

    • 与 JobManager 建立心跳连接(健康监控)


第三阶段:任务部署与执行

​目标​​:将计算任务分发至 TaskManager 并启动持续计算

组件协作流程

1. Flink ResourceManager -> JobManager
  • ​Slot 分配​

    SlotManager 将可用 Slot 分配给等待的 JobManager 请求,并通知目标位置(某 TaskManager 的指定 Slot)

2. JobManager -> TaskManager
  • ​任务提交​

    JobManager 通过 RPC 调用 TaskManager 的 submitTask接口,传输包含:

    • 任务执行逻辑

    • 配置参数

    • 依赖信息

3. TaskManager 内部执行
  • ​任务线程生命周期​

    • 启动专用线程执行任务

    • 调用核心方法 AbstractInvokable.invoke(),按顺序执行:

      1. ​初始化 (initialize)​

        • 创建状态后端

        • 恢复 Checkpoint 状态

        • 调用 RichFunction.open()(如建立数据库连接)

      2. ​运行 (run)​

        • Source Task:循环读取 Kafka/Pulsar 等数据并下发下游

        • Stream Task(中间算子):处理上游数据并转发下游

        • Sink Task:将数据写入外部系统(数据库/文件系统)

      3. ​关闭 (close)​

        • 清理资源连接

        • 调用 RichFunction.close()

至此数据开始在 TaskManager 间流动,Flink 作业进入持续计算状态

TM向JM连接的源码分析

这个过程由 ResourceManager (RM) 作为中间协调者。是 TaskManager (TM) 在收到 ResourceManager 的指令后,主动向 JobManager (JM) 发起连接和注册。 而不是 JM 主动去发现并连接 TM。

下面我们结合源码和 Flink 的工作机制来详细分解这个流程:

连接建立的详细步骤

  1. TM 启动并向 RM 注册 当一个 TM 节点启动时,它的首要任务是找到并向 ResourceManager (RM) 注册自己。这相当于 TM 在向集群的资源管理者报告:“我上线了,我有这些资源(Slots)可供使用”。

    在 TaskExecutor.java 的 onStart() 方法中,我们可以看到这个过程的起点:

    // ... existing code ...
    @Override
    public void onStart() throws Exception {
        try {
            startTaskExecutorServices();
        } catch (Throwable t) {
    // ... existing code ...
        }
    
        startRegistrationTimeout();
    }
    
    private void startTaskExecutorServices() throws Exception {
        try {
            // start by connecting to the ResourceManager
            resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
    
            // tell the task slot table who's responsible for the task slot actions
            taskSlotTable.start(new SlotActionsImpl(), getMainThreadExecutor());
    
            // start the job leader service
            jobLeaderService.start(
                    getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl());
    // ... existing code ...
    

    这里的 resourceManagerLeaderRetriever.start(...) 就是 TM 寻找 RM 的过程。一旦找到 RM,TM 就会将自己的位置信息、资源ID和可用的 Slot 信息注册到 RM。

  2. JM 向 RM 请求资源 当用户提交一个 Job 到 JM 后,JM 会为这个 Job 的所有 Task 向 RM 请求计算资源(Slots)。

  3. RM 分配资源并通知 TM RM 收到 JM 的请求后,会从它管理的、已经注册的 TM 中寻找符合要求的空闲 Slot。找到后,RM 会将这个 Slot 分配给 JM。同时,RM 会向持有该 Slot 的 TM 发送一个指令(例如 offerSlots RPC 调用),告诉它:“你的某个 Slot 现在被分配给了 JobID 为 X 的 Job,这个 Job 的 JobManager 在 Y 地址”。

  4. TM 主动连接并注册到 JM 这是最关键的一步。TM 在收到 RM 的指令后,才得知它需要为哪个 JM 工作以及 JM 的地址。这时,TM 内部的 JobLeaderService 会为这个特定的 jobId 启动一个到目标 JM 的连接和注册过程。

    DefaultJobLeaderService.java 中的代码片段展示了 TM 是如何准备注册信息的:

    // ... existing code ...
    protected RetryingRegistration<
                    JobMasterId,
                    JobMasterGateway,
                    JMTMRegistrationSuccess,
                    JMTMRegistrationRejection>
            generateRegistration() {
        return new DefaultJobLeaderService.JobManagerRetryingRegistration(
                LOG,
                rpcService,
                "JobManager",
                JobMasterGateway.class,
                getTargetAddress(),
                getTargetLeaderId(),
                retryingRegistrationConfiguration,
                jobId,
                TaskManagerRegistrationInformation.create(
                        ownerAddress, ownLocation, taskManagerSession));
    }
    // ... existing code ...
    

    TM 会构建一个 TaskManagerRegistrationInformation 对象,然后通过 RPC 调用 JM 的 registerTaskManager 方法。

    当注册成功后,TM 端的 JobLeaderListenerImpl 会被回调,执行后续的连接建立工作。

    // ... existing code ...
    private final class JobLeaderListenerImpl implements JobLeaderListener {
    
        @Override
        public void jobManagerGainedLeadership(
                final JobID jobId,
                final JobMasterGateway jobManagerGateway,
                final JMTMRegistrationSuccess registrationMessage) {
            runAsync(
                    () ->
                            jobTable.getJob(jobId)
                                    .ifPresent(
                                            job ->
                                                    establishJobManagerConnection(
                                                            job,
                                                            jobManagerGateway,
                                                            registrationMessage)));
        }
    // ... existing code ...
    

    jobManagerGainedLeadership 这个名字意味着 TM 已经成功地与 JM 领导者建立了联系。JMTMRegistrationSuccess 这个参数就是注册成功的凭证。之后 establishJobManagerConnection 方法会建立心跳监控等。

  5. JM 向 TM 提交任务 一旦 TM 在 JM 处注册成功,它们之间的连接就完全建立好了。此时,JM 就可以通过这个连接向 TM 发送 submitTask 请求,将具体的计算任务部署到 TM 的 Slot 中去执行了。

    // ... existing code ...
    @Override
    public CompletableFuture<Acknowledge> submitTask(
            TaskDeploymentDescriptor tdd, JobMasterId jobMasterId, Duration timeout) {
    
        final JobID jobId = tdd.getJobId();
        // todo: consider adding task info
        try (MdcCloseable ignored = MdcUtils.withContext(MdcUtils.asContextData(jobId))) {
    
            final ExecutionAttemptID executionAttemptID = tdd.getExecutionAttemptId();
    
            final JobTable.Connection jobManagerConnection =
                    jobTable.getConnection(jobId)
                            .orElseThrow(
                                    () -> {
                                        final String message =
                                                "Could not submit task because there is no JobManager "
                                                        + "associated for the job "
                                                        + jobId
                                                        + '.';
    
                                        log.debug(message);
    // ... existing code ...
    

    submitTask 方法首先会检查与对应 jobId 的 JM 连接是否存在 (jobTable.getConnection(jobId)), 这也反向证明了连接必须预先建立好。

总结

  • 启动顺序:TM 和 JM 都可以独立启动。
  • 连接发起方TM 是主动方
  • 协调者ResourceManager 在整个过程中扮演了“介绍人”的角色。它告诉 TM 应该去连接哪个 JM。

所以,整个流程可以概括为:TM 向 RM "报到",JM 向 RM "要人",RM "指派" TM 去为 JM 工作,最后 TM 主动去向 JM "报到"(注册)。

Flink 与 YARN 集成

即使使用了 YARN,Flink 自己的 ResourceManager (RM) 依然是必需的,但它的角色和工作方式会发生根本性的变化。

为什么使用 YARN 还需要 Flink 的 ResourceManager?

这是因为 Flink 的 RM 和 YARN 的 RM 处在不同的管理层级,职责也不同:

  • YARN ResourceManager:这是整个 Hadoop 集群的全局资源管理器。它管理集群中所有节点(NodeManager)的 CPU、内存等资源。它的任务是为各种应用(如 Flink、Spark、MapReduce)分配资源单元,这个单元叫做“容器”(Container)。YARN RM 并不知道 Flink 的“Slot”是什么,它只负责分配和调度容器。

  • Flink ResourceManager:这是 Flink 集群专属的资源管理器。当以 Flink on YARN 模式运行时,它会作为 ApplicationMaster (AM) 在 YARN 分配的第一个容器中启动。它的核心职责是:

    • 作为桥梁:代表 Flink 应用,向 YARN RM 申请/释放容器。
    • 管理 Flink 资源:当从 YARN RM 获取到容器后,它会在这些容器中启动 Flink 的 TaskManager (TM) 进程。
    • 维护 Slot:管理所有 TM 上的 Slot 状态,并将可用的 Slot 信息提供给 JobManager (JM),用于执行任务。

所以,你可以这样理解它们的关系:Flink RM 向 YARN RM “要地”(申请容器),然后在这些“地”上“盖房子”(启动 TM),并管理这些“房子”里的“房间”(Slots)

ResourceManager.java 文件正是 Flink RM 的核心抽象类定义,这证明了 Flink RM 是 Flink 运行时的一个独立且关键的组件。

// ... existing code ...
public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
        extends FencedRpcEndpoint<ResourceManagerId>
        implements DelegationTokenManager.Listener, ResourceManagerGateway {
// ... existing code ...

引入 YARN 到底发生了什么变化?

引入 YARN 最大的变化是 Flink 实现了资源管理的动态化和自动化

  • Standalone 模式(没有 YARN):你需要手动启动固定数量的 TaskManager。资源是静态的,有多少 TM 和 Slot 在集群启动时就确定了,不会自动增减。
  • YARN 模式
    • 动态资源分配:Flink RM 可以根据作业的实际需求(例如,作业并行度增加了),动态地向 YARN RM 申请更多的容器来启动新的 TM。当资源空闲时,也可以将容器释放回 YARN,供其他应用使用。这大大提高了集群的资源利用率。
    • 进程管理外包:TM 的生命周期由 Flink RM 和 YARN NodeManager 管理。如果一个 TM 挂了,Flink RM 会向 YARN RM 重新申请一个容器来替换它。如果 Flink RM (ApplicationMaster) 挂了,YARN RM 会负责重启它,从而实现了一定程度的自动容错。

在 flink-yarn 模块中,YarnResourceManagerFactory 就是专门用于在 YARN 环境下创建 Flink RM 实例的工厂类,它创建的 RM 具备了与 YARN 交互的能力。

// ... existing code ...
public class YarnResourceManagerFactory extends ActiveResourceManagerFactory<YarnWorkerNode> {

    private static final YarnResourceManagerFactory INSTANCE = new YarnResourceManagerFactory();

    private YarnResourceManagerFactory() {}

// ... existing code ...

Flink 是如何与 YARN 集成的?

Flink 没有 复制 YARN 的源码。它是通过标准的 YARN 客户端 API 来与 YARN 集群进行通信的,这是一种标准的、解耦的集成方式。

整个集成过程大致如下:

  1. 你在客户端执行 flink run -t yarn-session ... 或 flink run-application ... 命令。
  2. Flink 的 YARN 客户端会将 Flink 的 Jar 包(包括 ApplicationMaster 的代码)和作业 Jar 包上传到 HDFS。
  3. YARN 客户端向 YARN RM 提交一个应用创建请求。
  4. YARN RM 收到请求后,会在某个 NodeManager 上分配一个容器,并在这个容器里启动 Flink 的 ApplicationMaster(AM)进程(里面就运行着 Flink RM)。
  5. 启动后的 Flink AM (Flink RM) 会使用 YARN 提供的 AMRMClientAsync 客户端与 YARN RM 建立心跳和通信,根据需要申请更多的容器来运行 TaskManager。
  6. 当 YARN RM 批准请求并分配了新的容器后,Flink AM 会使用 NMClientAsync 客户端与对应的 NodeManager 通信,在新的容器中启动 TaskManager 进程。

YarnResourceManagerClientFactory.java 这个文件就明确展示了 Flink 是如何创建 YARN RM 客户端的,其中 AMRMClientAsync 就是 YARN 提供的标准客户端接口。

// ... existing code ...
public interface YarnResourceManagerClientFactory {

    /**
     * Create YARN ResourceManager client with the given callback handler.
     *
     * @param yarnHeartbeatIntervalMillis heartbeat interval between the client and the YARN
     *     ResourceManager.
     * @param callbackHandler which handles the events from YARN ResourceManager.
     * @return an {@link AMRMClientAsync} instance.
     */
    AMRMClientAsync<AMRMClient.ContainerRequest> createResourceManagerClient(
            int yarnHeartbeatIntervalMillis,
            AMRMClientAsync.AbstractCallbackHandler callbackHandler);
}

这种方式使得 Flink 可以很好地兼容不同版本的 Hadoop YARN,而不需要关心 YARN 的内部实现细节。

Standalone和Yarn

TM 进程的来源以及 RM 的角色

  • Standalone 模式(原始模式):TM 是由用户手动启动的。RM 在这里扮演一个被动的“登记中心”角色,它只是等待 TM 来注册并上报资源。
  • YARN 模式:TM 是由 Flink RM 通过 YARN 动态申请并创建的。RM 在这里扮演一个主动的“资源调度者”角色,它负责向 YARN 申请资源并拉起 TM。

下面我们深入源码来看这两种模式的具体流程。

Standalone 模式:被动等待与注册

在这种模式下,Flink RM 完全不关心 TM 进程是怎么来的。

TM 的产生: 由运维人员或启动脚本在集群的各个节点上,手动执行 bin/taskmanager.sh start 来启动一个或多个 TaskManager 进程。

RM 如何知道 Slot:

  1. TM 主动注册:每个手动启动的 TM,在初始化后会根据 flink-conf.yaml 中的 jobmanager.rpc.address (在 Flink 1.x 中,现在是 rest.address) 找到 RM 的地址,然后调用 RM 的 registerTaskExecutor RPC 方法来注册自己。

  2. 上报 SlotReport:在注册请求中,TM 会附带一个 SlotReport 对象。这个对象详细描述了该 TM 拥有多少个 Slot,以及每个 Slot 的资源规格(ResourceProfile)。

  3. RM 登记资源:RM 的 registerTaskExecutor 方法接收到请求后,会做几件事:

    • 验证这个 TM 是否合法。
    • 将这个 TM 的连接信息(TaskExecutorGateway)和其 Worker 信息记录在 taskExecutors 这个 Map 中。
    • 最关键的,它会将 SlotReport 交给内部的 SlotManager 组件。
    • SlotManager 会解析 SlotReport,将这些新的 Slot 加入到自己的可用资源池中,等待 JobManager 来申请。

StandaloneResourceManager.java 是此模式下的具体实现。你会发现它的 initialize 方法非常简单,因为它不需要做任何主动申请资源的操作,只是启动内部服务,然后“坐等”TM 来连接。

// ... existing code ...
    @Override
    protected void initialize() throws ResourceManagerException {
        // 启动一个启动周期,在这个周期内,它会等待TMs注册。
        // 它不会主动做任何事情来创建TM。
        startStartupPeriod();
    }

    @Override
    protected void terminate() {
        // noop
    }

    @Override
    protected void internalDeregisterApplication(
            ApplicationStatus finalStatus, @Nullable String diagnostics) {}
// ... existing code ...

YARN 模式:主动申请与创建

在 YARN 模式下,情况完全反转,RM 变得非常“主动”。这里的 RM 是 ActiveResourceManager 的一个实例。

TM 的产生:

  1. JM 申请 Slot:一个 Job 提交后,JobManager 会根据作业的并行度向 Flink RM 申请所需数量的 Slot。

  2. SlotManager 发现资源不足:Flink RM 内部的 SlotManager 收到请求,发现当前已注册的 TM 上的空闲 Slot 不足以满足需求。

  3. RM 主动申请资源SlotManager 会通过 ResourceEventListener 回调通知 ActiveResourceManager:“我需要更多资源!”。ActiveResourceManager 不会等待,而是会立即行动。它通过一个叫做 ResourceManagerDriver 的组件(在 YARN 模式下,具体实现是 YarnResourceManagerDriver)来和外部资源系统(YARN)交互。它会调用 resourceManagerDriver.requestResource(...) 方法。

  4. YARN 分配容器YarnResourceManagerDriver 会使用 YARN 的客户端 API 向 YARN RM 发送一个容器申请。YARN RM 会在某个 NodeManager 上分配一个容器。

  5. RM 在容器中启动 TM:当 YarnResourceManagerDriver 收到 YARN RM 分配容器的通知后,它会连接到对应的 NodeManager,并命令它在刚刚分配的容器中执行一个启动命令,这个命令会拉起一个 TaskExecutor 进程。

ActiveResourceManager.java 是实现这一逻辑的核心。它的 initialize 方法会启动 ResourceManagerDriver,为与外部系统通信做好准备。

// ... existing code ...
    @Override
    protected void initialize() throws ResourceManagerException {
        try {
            // 初始化 ResourceManagerDriver,它封装了与YARN/K8s等外部系统交互的逻辑
            resourceManagerDriver.initialize(
                    this,
                    new GatewayMainThreadExecutor(),
                    ioExecutor,
                    blocklistHandler::getAllBlockedNodeIds);
        } catch (Exception e) {
            throw new ResourceManagerException("Cannot initialize resource provider.", e);
        }
    }
// ... existing code ...

当需要新 worker 时,ActiveResourceManager 会调用 startNewWorker,最终会调用到 resourceManagerDriver.requestResource

RM 如何知道 Slot: 这一步和 Standalone 模式完全一样。 一旦 TM 进程被 YARN 的 NodeManager 在容器中启动,这个 TM 就会执行和 Standalone 模式下完全相同的初始化逻辑:找到 Flink RM (此时是 ApplicationMaster),然后调用 registerTaskExecutor RPC 方法,并上报自己的 SlotReport

总结

特性

Standalone 模式

YARN 模式

RM 角色

被动 (Passive)

主动 (Active)

TM 来源

用户手动启动

Flink RM 通过 YARN 申请并创建

资源弹性

静态,资源固定

动态,按需伸缩

核心实现

StandaloneResourceManager

ActiveResourceManager + YarnResourceManagerDriver

所以,引入 YARN 的本质变化是将资源管理的职责从“用户手动操作”转移到了“程序自动协调”。Flink RM 从一个简单的“登记员”升级为了一个聪明的“项目经理”,能够主动地向 YARN 这个“资源供应商”申请和释放资源,实现了整个集群的弹性伸缩。


网站公告

今日签到

点亮在社区的每一天
去签到