13. Flink 高可用机制简述(Standalone 模式)

发布于:2025-07-14 ⋅ 阅读:(18) ⋅ 点赞:(0)

Flink 高可用机制简述(Standalone 模式)

  • 在 Flink 的高可用(HA)架构中,核心是对 JobManager 的主节点(Leader) 的管理与选举。Flink 通过一套可插拔的服务接口,来实现高可用功能,主要涉及以下几个关键类:

    1. HighAvailabilityServices

    • Flink 高可用功能的统一入口。
    • 提供用于 Leader 选举Leader 监听 的服务实例。
    • 不同部署模式(如 ZooKeeper、Kubernetes、Standalone)通过不同实现类来生成选举服务。

    2. LeaderRetrievalService

    • 用于“监听”当前的 Leader。
    • 当检测到 Leader 发生变化时,会调用 LeaderRetrievalListener.notifyLeaderAddress() 方法,通知对应组件更新地址。
    • 常用于 ResourceManagerTaskManager 等组件,获取最新的 Dispatcher / JobManager 地址。

    3. LeaderElection

    • 是对 LeaderElectionService 的封装,代表一次具体的“参选过程”。
    • 被动接收 LeaderElectionService 的选举结果,并代理调用 LeaderContender 的方法,完成注册、注销、确认等操作。

    4. LeaderElectionService

    • 提供具体的选举服务(例如基于 ZooKeeperKubernetes 实现)。
    • 负责整个选举生命周期:启动、竞选、发现已有 Leader 等。
    • 是 HA 的核心抽象组件。
    • 注意:
      • Standalone 模式 下,Flink 实际并不会使用 LeaderElectionService 接口的实现类。也就是说,在 Standalone 模式下的源码路径中,LeaderElectionService 是被绕过的,这也是为什么你在调试启动流程时没有看到它被真正调用的原因。

    5. LeaderContender

    • 这是一个接口,用于表示“竞选 Leader 的参与者”。
    • 想要支持高可用的组件(例如:JobManagerDispatcher)都需要实现这个接口。
    • 当被选为 Leader 时,会被回调 grantLeadership() 方法。

说明

Flink 默认提供了两种 HA 实现:

  • 基于 ZooKeeper 的实现:最常见、最成熟,适用于生产环境。
  • 基于 Kubernetes API 的实现:用于在原生 K8s 环境下管理 HA。

本笔记聚焦于 Standalone 模式 下的源码解析,因此不会深入 ZooKeeper / K8s 的实现。未来可能会单独撰写关于基于 ZooKeeperFlink HA 实现细节。

StandaloneHaServices

Standalone 模式下的 HighAvailabilityServices 实现分析

在 Flink 中,HighAvailabilityServices 是高可用服务的顶层接口,提供如下核心能力:

  • 返回各组件的 LeaderRetrievalService:用于监听 Leader 地址变化;
  • 返回各组件的 LeaderElectionService:用于注册参与 Leader 选举;
  • 提供任务状态(如 checkpoint/savepoint)存储相关服务;
  • 提供作业图(JobGraph)持久化服务;
  • 提供运行中 Job 的注册与发现服务。

StandaloneHaServices 的继承结构

HighAvailabilityServices (接口)
    ↑
AbstractNonHaServices (抽象类)
    ↑
StandaloneHaServices (实现类)

AbstractNonHaServices 表示“非高可用”的通用实现抽象;

StandaloneHaServices 是 Flink 在 standalone 模式 下提供的具体实现,本质上是一个“伪高可用”实现,用于本地或测试环境

StandaloneHaServices#getResourceManagerLeaderRetriever

@Override
public LeaderRetrievalService getResourceManagerLeaderRetriever() {
    return new StandaloneLeaderRetrievalService(resourceManagerRpcAddress, resourceManagerHostname);
}

直接返回了一个 StandaloneLeaderRetrievalService 实例;

该服务不会监听 ZooKeeper 或 Kubernetes 等外部系统的变化,而是返回 预设的地址

一旦配置写死,整个系统中所有组件都会认为这个地址是 ResourceManager 的 Leader,不存在自动 failover 的机制。

StandaloneHaServices的Leader初始化流程

在 Flink 的 Standalone 模式 中,不依赖 ZooKeeper 或 Kubernetes 来做 Leader 选举。取而代之的是一套轻量级的本地模拟机制,核心体现在以下几个方面:

1. Leader 地址字段初始化

StandaloneHaServices 中维护了如下成员变量:

private final String resourceManagerAddress;
private final String dispatcherAddress;
private final String clusterRestEndpointAddress;
  • 这三个地址是从 Flink 配置中读取的(即当前启动节点的 RPC 地址);

  • 因为没有外部协调组件,这些地址就是系统默认认可的 Leader 地址。

2. Leader Election 的伪实现

调用 getResourceManagerLeaderElection() 时,会返回:

@Override
public LeaderElection getResourceManagerLeaderElection(LeaderContender contender) {
    return new StandaloneLeaderElection(contender);
}

  • 使用的是 StandaloneLeaderElection,是一个非真正选举的实现
  • 内部自动分配一个 UUID 作为 leader session ID;
  • 然后立即调用 contender.grantLeadership(UUID),将该 contender 设置为 Leader。

也就是说:谁启动,谁就是 Leader,没有真正的竞选过程。

3.StandaloneHaServices源码
public class StandaloneHaServices extends AbstractNonHaServices {

    /** The fix address of the ResourceManager. */
    private final String resourceManagerAddress;

    /** The fix address of the Dispatcher. */
    private final String dispatcherAddress;

    private final String clusterRestEndpointAddress;

    /**
     * Creates a new services class for the fix pre-defined leaders.
     *
     * @param resourceManagerAddress The fix address of the ResourceManager
     * @param clusterRestEndpointAddress
     */
    public StandaloneHaServices(
            String resourceManagerAddress,
            String dispatcherAddress,
            String clusterRestEndpointAddress) {
        this.resourceManagerAddress =
                checkNotNull(resourceManagerAddress, "resourceManagerAddress");
        this.dispatcherAddress = checkNotNull(dispatcherAddress, "dispatcherAddress");
        this.clusterRestEndpointAddress =
                checkNotNull(clusterRestEndpointAddress, clusterRestEndpointAddress);
    }

    // ------------------------------------------------------------------------
    //  Services
    // ------------------------------------------------------------------------

    @Override
    public LeaderRetrievalService getResourceManagerLeaderRetriever() {
        synchronized (lock) {
            checkNotShutdown();

            return new StandaloneLeaderRetrievalService(resourceManagerAddress, DEFAULT_LEADER_ID);
        }
    }

    @Override
    public LeaderRetrievalService getDispatcherLeaderRetriever() {
        synchronized (lock) {
            checkNotShutdown();

            return new StandaloneLeaderRetrievalService(dispatcherAddress, DEFAULT_LEADER_ID);
        }
    }

    @Override
    public LeaderElection getResourceManagerLeaderElection() {
        synchronized (lock) {
            checkNotShutdown();

            return new StandaloneLeaderElection(DEFAULT_LEADER_ID);
        }
    }

    @Override
    public LeaderElection getDispatcherLeaderElection() {
        synchronized (lock) {
            checkNotShutdown();

            return new StandaloneLeaderElection(DEFAULT_LEADER_ID);
        }
    }

    @Override
    public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) {
        synchronized (lock) {
            checkNotShutdown();

            return new StandaloneLeaderRetrievalService("UNKNOWN", DEFAULT_LEADER_ID);
        }
    }

    @Override
    public LeaderRetrievalService getJobManagerLeaderRetriever(
            JobID jobID, String defaultJobManagerAddress) {
        synchronized (lock) {
            checkNotShutdown();

            return new StandaloneLeaderRetrievalService(
                    defaultJobManagerAddress, DEFAULT_LEADER_ID);
        }
    }

    @Override
    public LeaderElection getJobManagerLeaderElection(JobID jobID) {
        synchronized (lock) {
            checkNotShutdown();

            return new StandaloneLeaderElection(DEFAULT_LEADER_ID);
        }
    }

    @Override
    public LeaderRetrievalService getClusterRestEndpointLeaderRetriever() {
        synchronized (lock) {
            checkNotShutdown();

            return new StandaloneLeaderRetrievalService(
                    clusterRestEndpointAddress, DEFAULT_LEADER_ID);
        }
    }

    @Override
    public LeaderElection getClusterRestEndpointLeaderElection() {
        synchronized (lock) {
            checkNotShutdown();

            return new StandaloneLeaderElection(DEFAULT_LEADER_ID);
        }
    }
}

StandaloneLeaderElection

Flink 的高可用架构中,LeaderElection 扮演了连接 LeaderContenderLeaderElectionService 的桥梁角色,而在 Standalone 模式下,StandaloneLeaderElection 则是该机制的本地实现版本。

StandaloneLeaderElection 简要结构

该类继承自 LeaderElection,其内部包含两个核心成员变量:

  • UUID leaderSessionID:模拟的 Leader 标识;
  • LeaderContender contender:高可用组件(如 ResourceManager、Dispatcher 等),实现了 LeaderContender 接口。

启动逻辑:

  • 通过调用 startLeaderElection() 方法:
    • 直接触发 contender.grantLeadership(leaderSessionID)
    • 模拟该组件成为 Leader;
  • 无需任何外部系统协作。

这也是为什么在 Standalone 模式中,启动即是 Leader,完全不依赖真正的选举过程

为什么说 LeaderElection 是连接的“代理层”

在真正的高可用场景(如 ZooKeeper 模式)下:

  • LeaderElectionService
    • 负责与外部系统打交道(例如创建 ZNode、监听节点变化);
    • 判断当前节点是否有资格成为 Leader;
    • 是具体的选举机制实现者。
  • LeaderElection
    • 是一个薄封装;
    • 屏蔽了选举服务的复杂性,只关注:是否被选中,以及如何通知 LeaderContender
    • 在内部会调用 startLeaderElection 的方法,同时向外部组件暴露一个统一接口。

因此,LeaderElection 才是实现“选举逻辑”与“业务组件”解耦的关键抽象

StandaloneLeaderElection源码

public class StandaloneLeaderElection implements LeaderElection {

    private final Object lock = new Object();

    private final UUID sessionID;
    @Nullable private LeaderContender leaderContender;

    public StandaloneLeaderElection(UUID sessionID) {
        //获取生成的uuid
        this.sessionID = sessionID;
    }

    @Override
    public void startLeaderElection(LeaderContender contender) throws Exception {
        synchronized (lock) {
            Preconditions.checkState(
                    leaderContender == null,
                    "No LeaderContender should have been registered with this LeaderElection, yet.");
            this.leaderContender = contender;
			
            //这里调用 具体的逻辑 启动 contender
            this.leaderContender.grantLeadership(sessionID);
        }
    }

    @Override
    public CompletableFuture<Void> confirmLeadershipAsync(
            UUID leaderSessionID, String leaderAddress) {
        return FutureUtils.completedVoidFuture();
    }

    @Override
    public CompletableFuture<Boolean> hasLeadershipAsync(UUID leaderSessionId) {
        synchronized (lock) {
            return CompletableFuture.completedFuture(
                    this.leaderContender != null && this.sessionID.equals(leaderSessionId));
        }
    }

    @Override
    public void close() throws Exception {
        synchronized (lock) {
            if (this.leaderContender != null) {
                this.leaderContender.revokeLeadership();
                this.leaderContender = null;
            }
        }
    }
}

网站公告

今日签到

点亮在社区的每一天
去签到