17.TaskExecutor与ResourceManager交互

发布于:2025-07-22 ⋅ 阅读:(11) ⋅ 点赞:(0)

TaskExecutorResourceManager交互

  • TaskExecutorResourceManager 之间的交互机制较为复杂,核心包含首次发现与注册连接建立以及心跳维持三个阶段。后续会将这些内容拆解为两个部分详细说明。本文重点介绍领导者发现与注册阶段。

LeaderRetrievalService

  • LeaderRetrievalService(领导者检索服务)是专门用于监听领导者变更的组件,前文已有介绍。
  • Standalone 模式下,其实现类为 StandaloneLeaderRetrievalService。该服务启动后会持续监听指定组件的领导者信息变化。
  • 对于 TaskExecutor 而言,这一监听服务主要用于监控ResourceManager 的领导者变更,监听的回调对象为:
    ResourceManagerLeaderListener

ResourceManagerLeaderListener

  • ResourceManagerLeaderListenerLeaderRetrievalService 中的回调监听器。
  • 其职责是:
    • ResourceManager 领导者节点发生变更时,通知 TaskExecutor,以便后者与新的领导者节点重新建立连接与注册关系。
    • 换句话说,ResourceManagerLeaderListener 是**TaskExecutor 感知 ResourceManager 变动**的关键入口。
/** The listener for leader changes of the resource manager. */
    private final class ResourceManagerLeaderListener implements LeaderRetrievalListener {

        @Override
        public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
            runAsync(
                    () ->
                			//这一步通知,并建立连接
                            notifyOfNewResourceManagerLeader(
                                    leaderAddress,
                                    ResourceManagerId.fromUuidOrNull(leaderSessionID)));
        }

        @Override
        public void handleError(Exception exception) {
            onFatalError(exception);
        }
    }

 // ------------------------------------------------------------------------
    //  Internal resource manager connection methods
    // ------------------------------------------------------------------------

    private void notifyOfNewResourceManagerLeader(
            String newLeaderAddress, ResourceManagerId newResourceManagerId) {
        resourceManagerAddress =
               //解析获取对应的 akka下的 地址
                createResourceManagerAddress(newLeaderAddress, newResourceManagerId);
        //连接 ResourceManager
        reconnectToResourceManager(
                new FlinkException(
                        String.format(
                                "ResourceManager leader changed to new address %s",
                                resourceManagerAddress)));
    }

private void reconnectToResourceManager(Exception cause) {
        //取消之前的连接
        closeResourceManagerConnection(cause);
        //超时控制
        startRegistrationTimeout();
        //尝试建立连接
        tryConnectToResourceManager();
    }
  • 先看一下 startRegistrationTimeout方法的实现:

    • 用于控制注册过程的最大等待时间。如果在设定的超时时间内未完成注册,系统会触发超时回调,重新尝试或报错退出。
    • 超时机制采用 UUID 标记以避免并发问题,只有当前有效的请求会触发超时逻辑
     private void startRegistrationTimeout() {
            final Duration maxRegistrationDuration =
                    taskManagerConfiguration.getMaxRegistrationDuration();
    
            if (maxRegistrationDuration != null) {
                //生成新的uuid
                final UUID newRegistrationTimeoutId = UUID.randomUUID();
                currentRegistrationTimeoutId = newRegistrationTimeoutId;
                scheduleRunAsync(
                         //定时异步检查,如果 currentRegistrationTimeoutId 和 newRegistrationTimeoutId 相等,就说明没连上
                        //说明在连接了以后,肯定有某个地方取消了 currentRegistrationTimeoutId。
                        () -> registrationTimeout(newRegistrationTimeoutId), maxRegistrationDuration);
            }
        }
    
  • 再看一下 tryConnectToResourceManager方法

    • 前提条件:监听到领导者后,resourceManagerAddress 已被解析生成。
    • 核心动作:调用 connectToResourceManager() 方法建立实际连接。
private void tryConnectToResourceManager() {
        if (resourceManagerAddress != null) {
            connectToResourceManager();
        }
    }

private void connectToResourceManager() {
    assert (resourceManagerAddress != null);
    assert (establishedResourceManagerConnection == null);
    assert (resourceManagerConnection == null);

    log.info("Connecting to ResourceManager {}.", resourceManagerAddress);

    // 封装 TaskExecutor 自身的基本信息,作为注册请求的数据
    final TaskExecutorRegistration taskExecutorRegistration = new TaskExecutorRegistration(
            getAddress(),                            // TaskExecutor RPC 地址
            getResourceID(),                         // TaskExecutor 唯一资源 ID
            unresolvedTaskManagerLocation.getDataPort(),
            JMXService.getPort().orElse(-1),         // JMX 端口(用于监控)
            hardwareDescription,                     // 硬件信息
            memoryConfiguration,                     // 内存资源信息
            taskManagerConfiguration.getDefaultSlotResourceProfile(),  // 默认 slot 配置
            taskManagerConfiguration.getTotalResourceProfile(),        // 总资源配置
            unresolvedTaskManagerLocation.getNodeId()                  // 节点 ID
    );

    // 构建与 ResourceManager 通信的专用连接管理类
    resourceManagerConnection = new TaskExecutorToResourceManagerConnection(
            log,
            getRpcService(),
            taskManagerConfiguration.getRetryingRegistrationConfiguration(),
            resourceManagerAddress.getAddress(),                   // ResourceManager 地址
            resourceManagerAddress.getResourceManagerId(),         // ResourceManager session ID
            getMainThreadExecutor(),
            new ResourceManagerRegistrationListener(),             // 注册结果监听
            taskExecutorRegistration                               // 注册数据
    );

    // 正式启动连接与注册流程
    resourceManagerConnection.start();
}

核心方法:connectToResourceManager

关键组件说明
TaskExecutorRegistration
  • 封装了当前 TaskExecutor 节点的所有关键信息。
  • 这些信息将在后续通过 RPC 发送给 ResourceManager,用于注册与资源上报。
TaskExecutorToResourceManagerConnection
  • 这是一个专门负责管理与 ResourceManager 通信的类。
  • 核心职责:
    • 负责发送注册请求;
    • 管理连接状态与生命周期;
    • 支持断线重连、注册重试;
    • 通过监听器获取注册结果(成功/失败)。

resourceManagerConnection.start()
  • 启动该连接管理类:
    • 立即发起首次注册请求
    • 如果失败,依据配置进行重试
    • 注册成功后,建立起稳定的通信会话
    • 后续的心跳机制slot 报告均通过此连接进行。

RegisteredRpcConnection

  • 这是一个用于封装两个组件之间 RPC 通信逻辑的通用工具类
  • 在本文的场景中,主要体现为 TaskExecutorToResourceManagerConnection,负责管理 TaskExecutorResourceManager 之间的连接生命周期和注册流程。

start方法


    // ------------------------------------------------------------------------
    //  Life cycle
    // ------------------------------------------------------------------------

    public void start() {
        checkState(!closed, "The RPC connection is already closed");
        checkState(
                !isConnected() && pendingRegistration == null,
                "The RPC connection is already started");
		
        // 创建一个重试注册策略对象,用于管理连接和注册的重试逻辑
        final RetryingRegistration<F, G, S, R> newRegistration = createNewRegistration();

        if (REGISTRATION_UPDATER.compareAndSet(this, null, newRegistration)) {
            //启动该策略
            newRegistration.startRegistration();
        } else {
            // concurrent start operation
            newRegistration.cancel();
        }
    }

关键点解析

  • start() 方法是连接生命周期的入口,负责启动注册和连接建立过程。
  • 通过原子更新器 (REGISTRATION_UPDATER) 保证注册策略只被启动一次,防止并发重复执行。
  • 注册过程封装为 RetryingRegistration,它实现了自动重试机制,确保在网络抖动或服务不可达时,能够持续尝试连接。
  • 如果已经有激活的注册流程,新的启动请求会被取消,避免资源浪费和竞态条件。

createNewRegistration()方法

  • 该方法负责生成一个带有回调机制的注册任务(RetryingRegistration),用以管理与目标组件(此处为 ResourceManager)的连接和注册流程。

    private RetryingRegistration<F, G, S, R> createNewRegistration() {
        RetryingRegistration<F, G, S, R> newRegistration = checkNotNull(generateRegistration());
    
        CompletableFuture<RetryingRegistration.RetryingRegistrationResult<G, S, R>> future =
                newRegistration.getFuture();
    
        future.whenCompleteAsync(
            (RetryingRegistration.RetryingRegistrationResult<G, S, R> result, Throwable failure) -> {
                if (failure != null) {
                    if (failure instanceof CancellationException) {
                        // 忽略取消异常,表示注册任务被主动取消
                        log.debug("Retrying registration towards {} was cancelled.", targetAddress);
                    } else {
                        // 非取消异常视为严重错误,触发失败处理
                        onRegistrationFailure(failure);
                    }
                } else {
                    // 注册结果处理
                    if (result.isSuccess()) {
                        targetGateway = result.getGateway();
                        onRegistrationSuccess(result.getSuccess());
                    } else if (result.isRejection()) {
                        onRegistrationRejection(result.getRejection());
                    } else {
                        throw new IllegalArgumentException(
                            String.format("Unknown retrying registration response: %s.", result));
                    }
                }
            },
            executor);
    
        return newRegistration;
    }
    

    核心流程说明:

    • 生成一个新的注册任务(RetryingRegistration)。

      • 该类中包含一个重要的成员变量:CompletableFuture<RetryingRegistrationResult<G, S, R>> completionFuture。

        • 这是整个注册过程的异步结果通知机制。
        • 通知的方法是 onRegistrationSuccess(result.getSuccess());
      • 若注册失败且非主动取消,触发失败处理逻辑。

      • 若注册成功,保存返回的 Gateway 并触发成功处理。

      • 若注册被拒绝,调用拒绝处理逻辑。

    • 整体通过异步回调执行,保证非阻塞执行。

startRegistration() 方法

  • 该方法是 RetryingRegistration 启动注册尝试的入口,完成目标地址解析及正式发起注册请求。
@SuppressWarnings("unchecked")
public void startRegistration() {
    if (canceled) {
        // 已被取消,直接返回
        return;
    }

    try {
        // 解析目标地址,获取 RPC Gateway 代理
        final CompletableFuture<G> rpcGatewayFuture;

        if (FencedRpcGateway.class.isAssignableFrom(targetType)) {
            rpcGatewayFuture =
                //接下来重点看  rpcService.connect 方法
                (CompletableFuture<G>) rpcService.connect(
                    targetAddress,
                    fencingToken,
                    targetType.asSubclass(FencedRpcGateway.class));
        } else {
            rpcGatewayFuture = rpcService.connect(targetAddress, targetType);
        }

        // 成功解析后,开始执行注册流程
        CompletableFuture<Void> rpcGatewayAcceptFuture = rpcGatewayFuture.thenAcceptAsync(
            (G rpcGateway) -> {
                log.info("Resolved {} address, beginning registration", targetName);
                register(
                    rpcGateway,
                    1,
                    retryingRegistrationConfiguration.getInitialRegistrationTimeoutMillis());
            },
            rpcService.getScheduledExecutor());

        // 如果解析失败且未取消,则根据配置延迟重试
        rpcGatewayAcceptFuture.whenCompleteAsync(
            (Void v, Throwable failure) -> {
                if (failure != null && !canceled) {
                    final Throwable strippedFailure = ExceptionUtils.stripCompletionException(failure);
                    if (log.isDebugEnabled()) {
                        log.debug(
                            "Could not resolve {} address {}, retrying in {} ms.",
                            targetName,
                            targetAddress,
                            retryingRegistrationConfiguration.getErrorDelayMillis(),
                            strippedFailure);
                    } else {
                        log.info(
                            "Could not resolve {} address {}, retrying in {} ms: {}",
                            targetName,
                            targetAddress,
                            retryingRegistrationConfiguration.getErrorDelayMillis(),
                            strippedFailure.getMessage());
                    }

                    startRegistrationLater(retryingRegistrationConfiguration.getErrorDelayMillis());
                }
            },
            rpcService.getScheduledExecutor());
    } catch (Throwable t) {
        // 出现异常,完成异常通知并取消注册
        completionFuture.completeExceptionally(t);
        cancel();
    }
}

重点解析

  • 首先判断是否已经取消,如果是则跳过。
  • 调用 rpcService.connect 解析目标地址,获取对应的 RPC Gateway。
    • 对支持 fencing 的 Gateway,会传入 fencingToken 进行鉴权。
  • 解析成功后,调用 register() 方法发起正式的注册请求,传入初始重试次数和超时时间。
  • 解析失败时,根据配置的延迟时间进行重试。
  • 异常捕获保证了在意外错误发生时,能正确地完成异常流程并取消注册。

总结:

  • 至此,TaskExecutorResourceManager注册阶段流程梳理完毕:
    • 监听 ResourceManager 领导者变更;
    • 建立连接并发送注册请求;
    • 注册过程支持自动重试与超时控制;
    • 注册成功后建立正式会话,后续的心跳与 slot 汇报基于该会话完成。
  • 后续章节将重点介绍:
    • 注册细节
      • 包括 TaskExecutorResourceManager 双方的网关对象生成机制;
    • 心跳机制
      • 注册成功后,如何通过心跳维持与 ResourceManager 的活跃连接;

网站公告

今日签到

点亮在社区的每一天
去签到