ZooKeeper 深度实践:从原理到 Spring Boot 全栈落地

发布于:2025-08-05 ⋅ 阅读:(17) ⋅ 点赞:(0)

在 Kubernetes 为主流注册发现的今天,给出如何在 Spring Boot 中基于 ZooKeeper 实现服务注册/发现、分布式锁、配置中心以及集群协调的完整代码与最佳实践。所有示例均可直接复制运行。


1. ZooKeeper 架构与核心原理

1.1 角色

  • Leader:处理写请求,广播事务(ZAB 协议)。
  • Follower / Observer:处理读请求,Follower 参与选举,Observer 仅扩展读能力。

1.2 一致性协议:ZAB(ZooKeeper Atomic Broadcast)

  1. 所有写请求统一由 Leader 生成全局递增的 zxid
  2. 两阶段提交(Proposal → ACK → Commit)。
  3. 崩溃恢复阶段:根据 zxid 选举新 Leader,保证已 Commit 的事务不丢失。

1.3 数据模型

/
├── services
│   ├── user-service
│   │   ├── 192.168.1.10#8080  (EPHEMERAL_SEQUENTIAL)
│   │   └── 192.168.1.11#8080
│   └── order-service
├── configs
│   └── application.yml
└── locks
    ├── pay_lock_0000000001 (EPHEMERAL_SEQUENTIAL)
    └── pay_lock_0000000002
  • EPHEMERAL:会话断则节点自动删除,天然适合心跳/服务实例。
  • SEQUENTIAL:节点名后缀自增,用于公平锁、队列。

2. Spring Boot 集成 ZooKeeper

场景:K8s 已有 Service 发现,但团队需要异构语言互通强一致配置分布式锁,于是引入 ZooKeeper。

2.1 依赖

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>5.5.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-x-discovery</artifactId>
    <version>5.5.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>5.5.0</version>
</dependency>

2.2 自动配置(Spring Boot 3.x)

@Configuration
@EnableConfigurationProperties(ZkProps.class)
public class ZkConfig {

    @Bean(initMethod = "start", destroyMethod = "close")
    public CuratorFramework curator(ZkProps p) {
        return CuratorFrameworkFactory.builder()
                .connectString(p.getUrl())
                .sessionTimeoutMs(30_000)
                .connectionTimeoutMs(10_000)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .build();
    }

    @Bean
    public ServiceDiscovery<InstanceDetails> discovery(CuratorFramework client) throws Exception {
        ServiceDiscovery<InstanceDetails> sd = ServiceDiscoveryBuilder
                .builder(InstanceDetails.class)
                .client(client)
                .basePath("/services")
                .serializer(new JsonInstanceSerializer<>(InstanceDetails.class))
                .build();
        sd.start();
        return sd;
    }
}

2.3 服务注册(应用启动时自动注册)

@Component
@RequiredArgsConstructor
public class ZkRegistrar implements ApplicationRunner {

    private final ServiceDiscovery<InstanceDetails> discovery;
    private final ZkProps props;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        InstanceDetails payload = new InstanceDetails(props.getProfile());
        ServiceInstance<InstanceDetails> instance = ServiceInstance.<InstanceDetails>builder()
                .name(props.getAppName())
                .id(props.getPodIp() + ":" + props.getPort())
                .address(props.getPodIp())
                .port(props.getPort())
                .payload(payload)
                .build();
        discovery.registerService(instance);
    }
}

2.4 服务发现(负载均衡示例)

@Component
@RequiredArgsConstructor
public class ZkLoadBalancer {

    private final ServiceDiscovery<InstanceDetails> discovery;

    public InstanceDetails choose(String serviceName) throws Exception {
        Collection<ServiceInstance<InstanceDetails>> instances =
                discovery.queryForInstances(serviceName);
        if (instances.isEmpty()) throw new IllegalStateException("No instances");
        // 轮询
        return instances.stream()
                .skip(ThreadLocalRandom.current().nextInt(instances.size()))
                .findFirst()
                .orElseThrow()
                .getPayload();
    }
}

3. 分布式锁:Curator Recipes

Curator 提供 InterProcessMutex(可重入)、InterProcessSemaphoreMutex(不可重入)等实现。

3.1 配置

@Bean
public InterProcessMutex payLock(CuratorFramework client) {
    return new InterProcessMutex(client, "/locks/pay");
}

3.2 业务中使用

@Service
@RequiredArgsConstructor
public class PayService {

    private final InterProcessMutex payLock;

    public void pay(String orderId) throws Exception {
        if (payLock.acquire(10, TimeUnit.SECONDS)) {
            try {
                // 幂等扣款逻辑
            } finally {
                payLock.release();
            }
        } else {
            throw new RuntimeException("获取锁超时");
        }
    }
}

3.3 高级:读写锁

@Bean
public InterProcessReadWriteLock rwLock(CuratorFramework client) {
    return new InterProcessReadWriteLock(client, "/locks/rw");
}

4. 配置中心(动态刷新)

4.1 存储

/configs/application.yml

4.2 监听与热更新

@Component
@RequiredArgsConstructor
public class ConfigWatcher {

    private final CuratorFramework client;
    private final Environment env;

    @PostConstruct
    public void watch() throws Exception {
        TreeCache cache = TreeCache.newBuilder(client, "/configs").build();
        cache.getListenable().addListener((cf, event) -> {
            if (event.getType() == TreeCacheEvent.Type.NODE_UPDATED) {
                String path = event.getData().getPath();
                if (path.endsWith("application.yml")) {
                    byte[] data = event.getData().getData();
                    // 这里触发 Spring Environment 刷新
                    ((ConfigurableEnvironment) env).getPropertySources()
                            .replace("zk-config", new MapPropertySource("zk-config",
                                    new Yaml().load(new String(data))));
                }
            }
        });
        cache.start();
    }
}

5. 最佳实践与注意事项

维度 建议
部署 3 或 5 节点奇数集群,独立 SSD,JVM 堆 4-8G,开启快照自动清理。
会话 会话超时 < 客户端 GC 时间;避免长时间 STW。
节点 数据节点 < 1MB,子节点 < 10 万;使用 Observer 扩展读。
锁路径独立;锁内逻辑幂等、可重试;设置超时避免死锁。
K8s StatefulSet 部署 ZooKeeper;Headless Service 使 Pod 稳定 DNS。
迁移 若未来迁到 etcd,可通过 Curator-to-etcd Bridge 逐步替换。

6. 小结

功能 K8s 原生 ZooKeeper 方案优势
服务发现 CoreDNS 跨语言、精细权重、健康检查可扩展
分布式锁 强一致、可重入、读写锁
配置中心 ConfigMap 监听粒度细、版本化、变更审计
集群协调 Leader 选举、队列、屏障(Barrier)

K8s 为主的今天,ZooKeeper 并非过时,而是作为强一致协调层的补充,特别适合金融交易、库存扣减、大规模异构系统


参考阅读

如需进一步探讨性能压测脚本K8s Operator 部署方案,欢迎留言!


网站公告

今日签到

点亮在社区的每一天
去签到