ResourceManager
`ResourceManager` 是 Flink 集群中的核心组件之一,负责资源的申请、分配与回收。在理解以下底层机制之后:
- `HighAvailabilityServices`:提供 Leader 选举、地址监听等高可用能力;
- `RpcSystem`:为 Flink 分布式组件之间的通信提供统一抽象;

我们就具备了阅读 `ResourceManager` 启动流程源码的基础。
`DefaultDispatcherResourceManagerComponentFactory.create` 方法
- 在此之前初始化 `RpcSystem`、`BlobServer` 等组件的过程这里不再展开,读者可自行阅读相关初始化代码。
/**
 * Assembles the Dispatcher/ResourceManager component.
 *
 * <p>It creates and starts the REST endpoint (WebMonitorEndpoint), then creates the
 * ResourceManagerService and the DispatcherRunner. After that it starts the
 * ResourceManagerService and the two leader retrieval services, and bundles everything into a
 * DispatcherResourceManagerComponent.
 *
 * <p>Initialisation of RpcSystem, BlobServer etc. happens before this method is called and is
 * not covered here.
 *
 * @param configuration cluster configuration. It is read for the REST server thread settings,
 *     the metric fetcher update interval and the async operation store duration.
 * @param highAvailabilityServices source of the leader retrievers and leader elections used below
 * @param fatalErrorHandler handed to the REST endpoint, the ResourceManagerService, the
 *     Dispatcher services and the resulting component
 * @return the assembled component
 * @throws Exception a FlinkException wrapping the original failure if any step fails. Before it
 *     is thrown, every component created so far is stopped or closed, and cleanup failures are
 *     merged in via ExceptionUtils.firstOrSuppressed.
 */
@Override
public DispatcherResourceManagerComponent create(
Configuration configuration,
ResourceID resourceId,
Executor ioExecutor,
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
BlobServer blobServer,
HeartbeatServices heartbeatServices,
DelegationTokenManager delegationTokenManager,
MetricRegistry metricRegistry,
ExecutionGraphInfoStore executionGraphInfoStore,
MetricQueryServiceRetriever metricQueryServiceRetriever,
Collection<FailureEnricher> failureEnrichers,
FatalErrorHandler fatalErrorHandler)
throws Exception {
// Declared outside the try block (initially null) so that the catch block can clean up
// exactly those components that were created before a failure.
LeaderRetrievalService dispatcherLeaderRetrievalService = null;
LeaderRetrievalService resourceManagerRetrievalService = null;
WebMonitorEndpoint<?> webMonitorEndpoint = null;
ResourceManagerService resourceManagerService = null;
DispatcherRunner dispatcherRunner = null;
try {
// Leader retrieval services for the Dispatcher and the ResourceManager.
// They are only created here; they are started near the end of this method.
dispatcherLeaderRetrievalService =
highAvailabilityServices.getDispatcherLeaderRetriever();
resourceManagerRetrievalService =
highAvailabilityServices.getResourceManagerLeaderRetriever();
// Gateway retrievers for the current leaders' RPC gateways, using an exponential
// backoff retry strategy.
// NOTE(review): the arguments presumably mean 12 retries with a backoff between
// 10 ms and 50 ms; confirm against ExponentialBackoffRetryStrategy.
final LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever =
new RpcGatewayRetriever<>(
rpcService,
DispatcherGateway.class,
DispatcherId::fromUuid,
new ExponentialBackoffRetryStrategy(
12, Duration.ofMillis(10), Duration.ofMillis(50)));
final LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever =
new RpcGatewayRetriever<>(
rpcService,
ResourceManagerGateway.class,
ResourceManagerId::fromUuid,
new ExponentialBackoffRetryStrategy(
12, Duration.ofMillis(10), Duration.ofMillis(50)));
// Executor shared by the REST endpoint and the metric fetcher.
final ScheduledExecutorService executor =
WebMonitorEndpoint.createExecutorService(
configuration.get(RestOptions.SERVER_NUM_THREADS),
configuration.get(RestOptions.SERVER_THREAD_PRIORITY),
"DispatcherRestEndpoint");
// An update interval of 0 selects VoidMetricFetcher.INSTANCE instead of a real fetcher.
final long updateInterval =
configuration.get(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL);
final MetricFetcher metricFetcher =
updateInterval == 0
? VoidMetricFetcher.INSTANCE
: MetricFetcherImpl.fromConfiguration(
configuration,
metricQueryServiceRetriever,
dispatcherGatewayRetriever,
executor);
webMonitorEndpoint =
restEndpointFactory.createRestEndpoint(
configuration,
dispatcherGatewayRetriever,
resourceManagerGatewayRetriever,
blobServer,
executor,
metricFetcher,
highAvailabilityServices.getClusterRestEndpointLeaderElection(),
fatalErrorHandler);
log.debug("Starting Dispatcher REST endpoint.");
webMonitorEndpoint.start();
final String hostname = RpcUtils.getHostname(rpcService);
resourceManagerService =
// Key step: create the ResourceManagerService. It is NOT started here (see
// resourceManagerService.start() below). As the ResourceManagerServiceImpl code
// shows, the actual ResourceManager is only created through
// resourceManagerFactory.createResourceManager(...) once leadership is granted.
ResourceManagerServiceImpl.create(
resourceManagerFactory,
configuration,
resourceId,
rpcService,
highAvailabilityServices,
heartbeatServices,
delegationTokenManager,
fatalErrorHandler,
new ClusterInformation(hostname, blobServer.getPort()),
webMonitorEndpoint.getRestBaseUrl(),
metricRegistry,
hostname,
ioExecutor);
final HistoryServerArchivist historyServerArchivist =
HistoryServerArchivist.createHistoryServerArchivist(
configuration, webMonitorEndpoint, ioExecutor);
final DispatcherOperationCaches dispatcherOperationCaches =
new DispatcherOperationCaches(
configuration.get(RestOptions.ASYNC_OPERATION_STORE_DURATION));
final PartialDispatcherServices partialDispatcherServices =
new PartialDispatcherServices(
configuration,
highAvailabilityServices,
resourceManagerGatewayRetriever,
blobServer,
heartbeatServices,
() ->
JobManagerMetricGroup.createJobManagerMetricGroup(
metricRegistry, hostname),
executionGraphInfoStore,
fatalErrorHandler,
historyServerArchivist,
metricRegistry.getMetricQueryServiceGatewayRpcAddress(),
ioExecutor,
dispatcherOperationCaches,
failureEnrichers);
log.debug("Starting Dispatcher.");
dispatcherRunner =
dispatcherRunnerFactory.createDispatcherRunner(
highAvailabilityServices.getDispatcherLeaderElection(),
fatalErrorHandler,
new HaServicesJobPersistenceComponentFactory(highAvailabilityServices),
ioExecutor,
rpcService,
partialDispatcherServices);
log.debug("Starting ResourceManagerService.");
// Only here is the ResourceManagerService actually started. ResourceManagerServiceImpl#start
// marks it running and calls leaderElection.startLeaderElection(this).
resourceManagerService.start();
// Start leader retrieval, passing the gateway retrievers as listeners.
// NOTE(review): presumably this lets them track leader address changes; verify in
// LeaderRetrievalService.
resourceManagerRetrievalService.start(resourceManagerGatewayRetriever);
dispatcherLeaderRetrievalService.start(dispatcherGatewayRetriever);
return new DispatcherResourceManagerComponent(
dispatcherRunner,
resourceManagerService,
dispatcherLeaderRetrievalService,
resourceManagerRetrievalService,
webMonitorEndpoint,
fatalErrorHandler,
dispatcherOperationCaches);
} catch (Exception exception) {
// clean up all started components
// Stop the retrieval services first. They may not have been started yet if the failure
// happened earlier. Failures during cleanup are attached to the original exception
// rather than replacing it.
if (dispatcherLeaderRetrievalService != null) {
try {
dispatcherLeaderRetrievalService.stop();
} catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(e, exception);
}
}
if (resourceManagerRetrievalService != null) {
try {
resourceManagerRetrievalService.stop();
} catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(e, exception);
}
}
// Close the remaining components asynchronously, then wait for all of them to finish.
final Collection<CompletableFuture<Void>> terminationFutures = new ArrayList<>(3);
if (webMonitorEndpoint != null) {
terminationFutures.add(webMonitorEndpoint.closeAsync());
}
if (resourceManagerService != null) {
terminationFutures.add(resourceManagerService.closeAsync());
}
if (dispatcherRunner != null) {
terminationFutures.add(dispatcherRunner.closeAsync());
}
final FutureUtils.ConjunctFuture<Void> terminationFuture =
FutureUtils.completeAll(terminationFutures);
try {
terminationFuture.get();
} catch (Exception e) {
// NOTE(review): an InterruptedException from get() lands here, and the thread's
// interrupt flag is not restored.
exception = ExceptionUtils.firstOrSuppressed(e, exception);
}
throw new FlinkException(
"Could not create the DispatcherResourceManagerComponent.", exception);
}
}
`ResourceManagerServiceImpl`

在 Flink 的集群管理中,`ResourceManagerServiceImpl` 是资源管理组件 `ResourceManager` 的封装服务,用于支持高可用部署(HA)。它实现了两个重要接口:
1. `ResourceManagerService`
   - 提供对外的生命周期控制(如 `start()`、`closeAsync()`);
   - 是 `DispatcherResourceManagerComponent` 所依赖的通用接口;
   - 作为 `ResourceManager` 的启动入口。
2. `LeaderContender`
   - 参与高可用 Leader 选举的核心接口;
   - 当当前节点被选为 Leader 时,Flink HA 模块会调用 `grantLeadership(UUID)` 启动 `ResourceManager`;
   - 当失去 Leader 身份时,调用 `revokeLeadership()` 触发回收逻辑。
内部结构:延迟创建 `ResourceManager`

`ResourceManagerServiceImpl` 内部并不会立即创建 `ResourceManager` 实例,而是持有一个 `ResourceManagerFactory<?> resourceManagerFactory` 字段,在需要时通过工厂方法延迟生成。

`ResourceManagerFactory` 是一个工厂接口,YARN、K8s、Standalone 等不同部署环境各有对应的工厂实现。

真正的 `ResourceManager` 实例(如 `StandaloneResourceManager`)是在当选 Leader 时,由该工厂调用 `createResourceManager()` 创建出来的。随后,启动流程会启动生成的实例并注册 RPC 服务。
`ResourceManagerServiceImpl` 部分源码解析
/**
 * Builds the service in its initial, non-running state.
 *
 * <p>The ResourceManager itself is not created here; only the factory and the process context
 * are kept for later. NOTE(review): presumably this is invoked by the static create(...) factory;
 * that factory's body is not shown here.
 *
 * @param resourceManagerFactory factory later used to create the leader ResourceManager; must not
 *     be null
 * @param rmProcessContext supplies the HA services, fatal error handler and I/O executor; must not
 *     be null
 * @throws Exception as declared by the original signature
 */
private ResourceManagerServiceImpl(
        ResourceManagerFactory<?> resourceManagerFactory,
        ResourceManagerProcessContext rmProcessContext)
        throws Exception {
    // Mandatory dependencies; checked in declaration order.
    this.resourceManagerFactory = checkNotNull(resourceManagerFactory);
    this.rmProcessContext = checkNotNull(rmProcessContext);

    // Collaborators taken from the process context.
    // In standalone mode the HA services hand back a default LeaderElection here.
    this.leaderElection =
            this.rmProcessContext
                    .getHighAvailabilityServices()
                    .getResourceManagerLeaderElection();
    this.fatalErrorHandler = this.rmProcessContext.getFatalErrorHandler();
    this.ioExecutor = this.rmProcessContext.getIoExecutor();

    // Leadership events are processed one at a time on this single-threaded executor.
    this.handleLeaderEventExecutor = Executors.newSingleThreadExecutor();

    // Initial lifecycle state: not running, and no leader ResourceManager or session yet.
    this.running = false;
    this.leaderSessionID = null;
    this.leaderResourceManager = null;

    // Starts out already completed, so the first leader ResourceManager need not wait.
    this.previousResourceManagerTerminationFuture = FutureUtils.completedVoidFuture();
    this.serviceTerminationFuture = new CompletableFuture<>();
}
/**
 * Starts the service by marking it running and entering leader election with this instance as
 * the contender.
 *
 * <p>Repeated calls after the first only log at debug level and return.
 *
 * @throws Exception if starting the leader election fails
 */
@Override
public void start() throws Exception {
    final boolean wasAlreadyRunning;
    synchronized (lock) {
        wasAlreadyRunning = running;
        if (wasAlreadyRunning) {
            LOG.debug("Resource manager service has already started.");
        } else {
            running = true;
        }
    }
    if (wasAlreadyRunning) {
        return;
    }

    LOG.info("Starting resource manager service.");
    // Pass `this` as the contender. Leadership callbacks such as grantLeadership are
    // presumably delivered to it from now on.
    leaderElection.startLeaderElection(this);
}
/**
 * Starts the service: marks it running and enters leader election with this instance as the
 * contender. Later calls only log at debug level.
 *
 * <p>NOTE(review): this listing repeats the start() snippet immediately before it, apparently a
 * duplicated paste. A real class can contain only one of the two definitions.
 *
 * @throws Exception if starting the leader election fails
 */
@Override
public void start() throws Exception {
    synchronized (lock) {
        if (!running) {
            running = true;
        } else {
            LOG.debug("Resource manager service has already started.");
            return;
        }
    }
    LOG.info("Starting resource manager service.");
    leaderElection.startLeaderElection(this);
}
/**
 * Leadership-granted callback.
 *
 * <p>Handling is deferred to handleLeaderEventExecutor and done under the lock. If the service is
 * not running, the grant is logged and ignored. Otherwise a new leader ResourceManager is created
 * for the session. Any failure is reported to the fatal error handler as a FlinkException.
 *
 * @param newLeaderSessionID the session ID of the newly granted leadership
 */
@Override
public void grantLeadership(UUID newLeaderSessionID) {
    final Runnable onLeadershipGranted =
            () -> {
                synchronized (lock) {
                    if (running) {
                        LOG.info(
                                "Resource manager service is granted leadership with session id {}.",
                                newLeaderSessionID);
                        try {
                            // This is where the real ResourceManager gets created and started.
                            startNewLeaderResourceManager(newLeaderSessionID);
                        } catch (Throwable cause) {
                            fatalErrorHandler.onFatalError(
                                    new FlinkException("Cannot start resource manager.", cause));
                        }
                    } else {
                        LOG.info(
                                "Resource manager service is not running. Ignore granting leadership with session ID {}.",
                                newLeaderSessionID);
                    }
                }
            };
    handleLeaderEventExecutor.execute(onLeadershipGranted);
}
/**
 * Replaces the current leader ResourceManager (if any) with a new one for the given session.
 *
 * <p>The new ResourceManager is created synchronously but started asynchronously. Starting waits
 * for previousResourceManagerTerminationFuture, then runs on handleLeaderEventExecutor under the
 * lock, and only happens if the new ResourceManager is still the leader. If so, leadership is
 * confirmed on ioExecutor with the new ResourceManager's address.
 *
 * @param newLeaderSessionID the leader session ID that was just granted
 * @throws Exception propagated from stopLeaderResourceManager() or
 *     resourceManagerFactory.createResourceManager(...)
 */
@GuardedBy("lock")
private void startNewLeaderResourceManager(UUID newLeaderSessionID) throws Exception {
// NOTE(review): presumably tears down the current leader ResourceManager, if any; the body
// is not shown here.
stopLeaderResourceManager();
this.leaderSessionID = newLeaderSessionID;
// Create the actual ResourceManager for this session via the factory. It is not started yet.
this.leaderResourceManager =
resourceManagerFactory.createResourceManager(rmProcessContext, newLeaderSessionID);
// Capture the new instance in a local, so that the async callbacks below act on THIS
// ResourceManager even if the field is reassigned by a later leadership change.
final ResourceManager<?> newLeaderResourceManager = this.leaderResourceManager;
// previousResourceManagerTerminationFuture starts out as an already-completed future
// (FutureUtils.completedVoidFuture() in the constructor), so on the first grant the chain
// proceeds immediately.
// NOTE(review): judging by its name, it presumably tracks the previous ResourceManager's
// termination afterwards, so that a new one starts only after the old one has terminated.
// Confirm where it is reassigned.
previousResourceManagerTerminationFuture
// On handleLeaderEventExecutor and under the lock: start the new ResourceManager if it is
// still the leader. thenComposeAsync flattens the returned CompletableFuture<Boolean>.
.thenComposeAsync(
(ignore) -> {
synchronized (lock) {
return startResourceManagerIfIsLeader(newLeaderResourceManager);
}
},
handleLeaderEventExecutor)
// Then on ioExecutor: confirm leadership with the new ResourceManager's address, but only
// if it was still the leader when the start was attempted.
.thenAcceptAsync(
(isStillLeader) -> {
if (isStillLeader) {
leaderElection.confirmLeadershipAsync(
newLeaderSessionID, newLeaderResourceManager.getAddress());
}
},
ioExecutor);
// NOTE(review): the future returned by this chain is discarded, so an exceptional
// completion of the chain is not observed here.
}
/**
 * Starts the given ResourceManager, but only if isLeader reports it as the current leader.
 *
 * @param resourceManager the candidate leader ResourceManager
 * @return a future that completes with {@code true} once the ResourceManager's started future
 *     completes, or an already-completed {@code false} if it is no longer the leader
 */
@GuardedBy("lock")
private CompletableFuture<Boolean> startResourceManagerIfIsLeader(
        ResourceManager<?> resourceManager) {
    if (!isLeader(resourceManager)) {
        return CompletableFuture.completedFuture(false);
    }

    // Key step: start the ResourceManager endpoint.
    // NOTE(review): presumably this is where the underlying RPC server is started; confirm in
    // RpcEndpoint#start.
    resourceManager.start();
    // NOTE(review): presumably links this ResourceManager's termination into the service's
    // bookkeeping; the body is not shown here.
    forwardTerminationFuture(resourceManager);
    return resourceManager.getStartedFuture().thenApply(started -> true);
}