20. TaskExecutor与ResourceManager心跳

发布于:2025-07-22 ⋅ 阅读:(11) ⋅ 点赞:(0)

20. TaskExecutor与ResourceManager心跳

  • 现在,需要回过头看 ResourceManager是如何产生心跳管理服务的。
  • cluster.initializeServices 方法的 heartbeatServices = createHeartbeatServices(configuration);产生一个 HeartbeatServicesImpl
  • jobmanager的 resourceManager启动的时候,会启动 startHeartbeatServices();

startHeartbeatServices方法

  • 该方法产生2个对象。1个taskManagerHeartbeatManager和1个jobManagerHeartbeatManager。这次主要看前面1个。
private void startHeartbeatServices() {
        taskManagerHeartbeatManager =
                heartbeatServices.createHeartbeatManagerSender(
                        resourceId,
                        new TaskManagerHeartbeatListener(),
                        getMainThreadExecutor(),
                        log);

        jobManagerHeartbeatManager =
                heartbeatServices.createHeartbeatManagerSender(
                        resourceId,
                        new JobManagerHeartbeatListener(),
                        getMainThreadExecutor(),
                        log);
    }
  • taskManagerHeartbeatManager:负责监控所有已注册的 TaskExecutor,是 ResourceManager 与 TaskExecutor 间心跳通信的核心组件。
  • jobManagerHeartbeatManager:与 JobManager 保持心跳(此处暂不关注)。

HeartbeatManagerSenderImpl

  • HeartbeatManagerSenderImpl 是 ResourceManager 端的心跳发送管理器,具备以下特点:
    • 继承 HeartbeatManagerImpl,是具体的心跳机制实现;
    • 实现 Runnable 接口,自身是一个周期性执行的任务。
  • 核心点:
    • 周期性遍历 getHeartbeatTargets()(即所有注册的 TaskExecutor);
    • 逐个向它们发送心跳请求(requestHeartbeat 方法)。
HeartbeatManagerSenderImpl(
            long heartbeatPeriod,
            long heartbeatTimeout,
            int failedRpcRequestsUntilUnreachable,
            ResourceID ownResourceID,
            HeartbeatListener<I, O> heartbeatListener,
            ScheduledExecutor mainThreadExecutor,
            Logger log,
            HeartbeatMonitor.Factory<O> heartbeatMonitorFactory) {
        super(
                heartbeatTimeout,
                failedRpcRequestsUntilUnreachable,
                ownResourceID,
                heartbeatListener,
                mainThreadExecutor,
                log,
                 heartbeatMonitorFactory);

        this.heartbeatPeriod = heartbeatPeriod;
    	//这里表明周期调度
        mainThreadExecutor.schedule(this, 0L, TimeUnit.MILLISECONDS);
    }

   @Override
    public void run() {
        if (!stopped) {
            log.debug("Trigger heartbeat request.");
            for (HeartbeatMonitor<O> heartbeatMonitor : getHeartbeatTargets().values()) {
                //这里就是循环将前面taskExecutor注册心跳取出来,进行心跳
                requestHeartbeat(heartbeatMonitor);
            }
			
            //这里单线程周期调度
            getMainThreadExecutor().schedule(this, heartbeatPeriod, TimeUnit.MILLISECONDS);
        }
    }

requestHeartbeat 方法

//heartbeatMonitor封装了一个taskexecutor网关。说白了就是heartbeatTarget就是调用 taskexecutor方法进行交互。
private void requestHeartbeat(HeartbeatMonitor<O> heartbeatMonitor) {
        O payload = getHeartbeatListener().retrievePayload(heartbeatMonitor.getHeartbeatTargetId());
        final HeartbeatTarget<O> heartbeatTarget = heartbeatMonitor.getHeartbeatTarget();

        heartbeatTarget
                //TaskExecutorHeartbeatSender 就是前面讲的rpc调用
                .requestHeartbeat(getOwnResourceID(), payload)
                .whenCompleteAsync(
                        handleHeartbeatRpc(heartbeatMonitor.getHeartbeatTargetId()),
                        getMainThreadExecutor());
    }
  • heartbeatTarget

    • 实际上是封装的 TaskExecutorHeartbeatSender

    • 通过 RPC 接口向对应 TaskExecutor 发出心跳请求;

    • 本质上是对 TaskExecutor 方法的远程调用。

  • TaskExecutorHeartbeatSender 实质上是一个封装了 TaskExecutor RPC 网关的对象,负责通过 RPC 调用向 TaskExecutor 发送心跳请求。推荐配合 Debug 调试理解整个心跳交互过程,有助于深入掌握 ResourceManager 与 TaskExecutor 之间的通信机制。

小结

  • ResourceManager 在启动阶段就为所有 TaskExecutor 准备好了心跳监控;

  • 依靠单线程周期调度器,实现对所有 TaskExecutor 的心跳请求发送;

  • 心跳本质是对 TaskExecutor RPC 方法的远程调用

  • 如果某个 TaskExecutor 心跳超时或失败,会触发资源回收与故障恢复机制。


网站公告

今日签到

点亮在社区的每一天
去签到