TaskExecutor获取ResourceManagerGateway
TaskExecutor
与ResourceManager
之间的交互机制较为复杂,核心可以拆分为三个阶段:- 首次发现与注册
- 连接建立
- 心跳维持
本文聚焦连接建立阶段,详细分析底层
RPC
连接的实现原理。
回顾:startRegistration
方法
在注册过程中,TaskExecutor
会调用如下逻辑,通过 rpcService.connect
与 ResourceManager
建立远程通信连接:
//其中,targetType 是 ResourceManagerGateway.class。重点关注 rpcService.connect 方法。
if (FencedRpcGateway.class.isAssignableFrom(targetType)) {
rpcGatewayFuture = (CompletableFuture<G>)
rpcService.connect(
targetAddress,
fencingToken,
targetType.asSubclass(FencedRpcGateway.class)
);
}
PekkoRpcService
- Flink 内部的 RPC 框架由 Pekko(Akka) 支撑,
PekkoRpcService
就是基于 Pekko 实现的通信组件,负责不同组件之间的远程通信。
@Override
public <F extends Serializable, C extends FencedRpcGateway<F>> CompletableFuture<C> connect(
String address, F fencingToken, Class<C> clazz) {
return connectInternal(
address,
clazz,
(ActorRef actorRef) -> {
Tuple2<String, String> addressHostname = extractAddressHostname(actorRef);
return new FencedPekkoInvocationHandler<>( // 关键:创建 InvocationHandler
addressHostname.f0,
addressHostname.f1,
actorRef,
configuration.getTimeout(),
configuration.getMaximumFramesize(),
configuration.isForceRpcInvocationSerialization(),
null,
() -> fencingToken, // 支持 fencingToken 防止旧节点通信
captureAskCallstacks,
flinkClassLoader
);
}
);
}
核心方法:connectInternal
connectInternal
方法的任务是:
- 通过目标组件的 RPC 地址,获取 ActorRef(类似 NIO 中的 selector-channel);
- 与目标 Actor(如 ResourceManager)完成一次握手校验;
- 基于
InvocationHandler
生成远程组件的代理对象(Gateway),用于后续透明 RPC 调用。
private <C extends RpcGateway> CompletableFuture<C> connectInternal(
String address,
Class<C> clazz,
Function<ActorRef, InvocationHandler> invocationHandlerFactory) {
final CompletableFuture<ActorRef> actorRefFuture = resolveActorAddress(address);
final CompletableFuture<HandshakeSuccessMessage> handshakeFuture =
actorRefFuture.thenCompose(
(ActorRef actorRef) -> Patterns.ask(
actorRef,
new RemoteHandshakeMessage(clazz, getVersion()), // 发送握手请求
configuration.getTimeout().toMillis()
).mapTo(ClassTag$.MODULE$.apply(HandshakeSuccessMessage.class))
);
final CompletableFuture<C> gatewayFuture =
actorRefFuture.thenCombineAsync(
handshakeFuture,
(ActorRef actorRef, HandshakeSuccessMessage ignored) -> {
InvocationHandler invocationHandler = invocationHandlerFactory.apply(actorRef);
@SuppressWarnings("unchecked")
C proxy = (C) Proxy.newProxyInstance(
getClass().getClassLoader(),
new Class<?>[] {clazz},
invocationHandler
);
return proxy; // 返回 ResourceManagerGateway 动态代理
},
actorSystem.dispatcher()
);
return guardCompletionWithContextClassLoader(gatewayFuture, flinkClassLoader);
}
获取 ActorRef:resolveActorAddress
private CompletableFuture<ActorRef> resolveActorAddress(String address) {
final ActorSelection actorSel = actorSystem.actorSelection(address);
return actorSel.resolveOne(configuration.getTimeout())
.toCompletableFuture()
.exceptionally(error -> {
throw new CompletionException(
new RpcConnectionException(
String.format("Could not connect to rpc endpoint under address %s.", address),
error
)
);
});
}
- 根据
address
定位到目标组件的 Actor(类似根据地址寻找远程服务端点)。 - 异步获取目标组件的
ActorRef
,这是后续所有远程消息传递的核心通信对象。 - 如果解析失败,立即包装为
RpcConnectionException
抛出,阻断注册链路。 - 特别注意:
该方法返回的CompletableFuture<ActorRef>
是由Akka
内部线程异步完成的(即依赖ActorSystem
自身的调度机制)。
因此,无需显式调用executor
来管理异步逻辑,整个链式流程天然是异步的,并由Akka
自身的事件机制驱动完成。
这也是Akka
(Pekko
)模型的设计优势:
➔ 组件间通信与任务调度完全解耦,基于 ActorRef 的消息传递自动异步非阻塞。
RemoteHandshakeMessage:初次握手阶段
作用:
在建立正式通信前,TaskExecutor
必须先与ResourceManager
进行协议层握手,确保:- 版本一致;
- 所请求的网关类型(如 ResourceManagerGateway)是对方支持的。
源码
private void handleHandshakeMessage(RemoteHandshakeMessage handshakeMessage) {
if (!isCompatibleVersion(handshakeMessage.getVersion())) {
sendErrorIfSender(
new HandshakeException(
String.format(
"Version mismatch between source (%s) and target (%s) rpc component. Please verify that all components have the same version.",
handshakeMessage.getVersion(), getVersion())));
} else if (!isGatewaySupported(handshakeMessage.getRpcGateway())) {
sendErrorIfSender(
new HandshakeException(
String.format(
"The rpc endpoint does not support the gateway %s.",
handshakeMessage.getRpcGateway().getSimpleName())));
} else {
//告诉taskExecutor,可以连接
getSender().tell(new Status.Success(HandshakeSuccessMessage.INSTANCE), getSelf());
}
}
握手处理逻辑:PekkoRpcActor.handleHandshakeMessage()
版本校验
- 如果通信双方的 Flink 版本不一致(可能是跨版本集群或配置错误),直接拒绝握手并返回错误信息。
接口类型校验
- 检查请求方希望通信的 Gateway 接口(即 ResourceManagerGateway)是否被当前端点支持。
- 不支持的网关说明连接请求本质错误,拒绝握手。
握手成功
前两步校验都通过,表明可以安全建立通信。
此时向对方返回:
java 复制编辑 getSender().tell(new Status.Success(HandshakeSuccessMessage.INSTANCE), getSelf());
即告诉发起方(如 TaskExecutor):可以正式建立连接。
actorRefFuture.thenCombineAsync
:构建 ResourceManagerGateWay代理
核心目的:
- 根据 ResourceManager 的 ActorRef 构建其 RPC 代理(即 ResourceManagerGateway 的动态代理对象)
源码解析
final CompletableFuture<C> gatewayFuture =
actorRefFuture.thenCombineAsync(
handshakeFuture,
(ActorRef actorRef, HandshakeSuccessMessage ignored) -> {
InvocationHandler invocationHandler =
invocationHandlerFactory.apply(actorRef);
ClassLoader classLoader = getClass().getClassLoader();
@SuppressWarnings("unchecked")
C proxy = (C)
Proxy.newProxyInstance(
classLoader,
new Class<?>[] {clazz},
invocationHandler);
return proxy;
},
actorSystem.dispatcher());
整体过程
- 等待 actorRefFuture 和 handshakeFuture 都完成:
- actorRefFuture:已经获取了 ResourceManager 的 ActorRef。
- handshakeFuture:握手成功,确认可以通信。
- 生成 InvocationHandler:
- 实际是封装了 actorRef 和通信参数的一个代理调用处理器。
- 后续所有发往 ResourceManager 的方法调用,都会被转化成消息,通过这个 handler 发送到 actorRef 对应的远程组件。
- 构建代理对象:
- 使用 JDK 动态代理(
Proxy.newProxyInstance
)创建了一个ResourceManagerGateway 的动态代理。 - 对用户代码来说,这个代理就是一个普通的 ResourceManagerGateway,只是内部通过 actorRef 做远程消息发送。
- 使用 JDK 动态代理(
- 返回代理对象(proxy):
- proxy 就是一个“可直接远程调用 ResourceManager”的接口对象。
总结
- TaskExecutor 已获取 ResourceManager 的代理网关(即
ResourceManagerGateway
代理对象); - 该代理对象封装了与 ResourceManager 通信所需的 actorRef 和 RPC 协议细节;
- TaskExecutor 接下来只需要通过该网关对象,正式发起注册请求。
这一阶段的核心工作是:
- 建立连接(即通过
rpcService.connect
拿到 ResourceManager 的 actorRef 并创建代理网关); - 完成握手(确保版本兼容和接口匹配);
- 生成代理(通过动态代理对外提供 ResourceManagerGateway 接口)。
下一步就是:
- TaskExecutor 通过该网关对象向 ResourceManager 发起注册;
- ResourceManager 受理注册请求;
- 建立心跳与 slot 汇报等稳定的会话机制。