zookeeper 实现分布式锁

发布于:2025-09-06 ⋅ 阅读:(12) ⋅ 点赞:(0)

目录

核心思想

模拟实现

建立连接

 Watcher

EventType

keeperState

加锁

解锁

测试


zookeeper 是一个高性能、高可用的分布式协调服务。依赖 ZNode 节点的有序性 Watcher(监听)机制,可以实现多个客户端之间的互斥访问

接下来,我们就来看如何使用 zookeeper 实现分布式锁

核心思想

基于 zookeeper 的临时顺序节点监听机制,我们可以实现分布式的公平锁

1. 当客户端尝试获取锁时,会在 zookeeper 上创建一个临时顺序节点(EPHEMERAL_SEQUENTIAL)

2. 所有节点按照顺序编号(如:/seq-000000001、/seq-000000002、/seq-000000003)

3. 客户端检查自己创建的节点是否是当前所有子节点中序号最小

        若是,获取锁

        若不是,监听前一个序号节点的删除事件(等待前一个客户端释放锁)

4. 当客户端处理完毕后,删除创建节点,此时 zookeeper 就会通知下一个等待的客户端获取锁

接下来,我们就可以根据上述思路,自己实现分布式锁

模拟实现

我们需要来梳理一下需要实现的功能:

1. 要对 zookeeper 节点进行操作,首先需要与 zookeeper 建立连接

2. 进行加锁,创建临时顺序节点,检查当前是否是最小节点,若是,立即获取到锁;若不是,监听前一节点

3. 处理完成后,进行解锁,删除创建的临时顺序节点

接下来,我们就来分别实现这些功能

建立连接

要使用官方提供的 zookeeper 客户端库,首先需要添加 Maven 依赖:

        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.8.4</version> <!-- 尽量与服务端版本一致 -->
        </dependency>

创建 zookeeper 连接:

public class DistributedLock {
    // zookeeper 服务器地址
    private static final String CONNECT_STRING = "47.112.48.236:2182";
    // 会话超时时间(毫秒)
    private static final int SESSION_TIMEOUT = 300000;
    private ZooKeeper zooKeeper;
    // 加锁节点
    private String lockPath;
    // 等待连接建立
    private CountDownLatch connectLatch = new CountDownLatch(1);
    
    public void connect() throws InterruptedException, IOException {
        zooKeeper = new ZooKeeper(CONNECT_STRING, SESSION_TIMEOUT, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
                    // 连接成功,释放阻塞
                    connectLatch.countDown();
                }
            }
        });
        // 等待连接建立
        connectLatch.await();
        System.out.println("zookeeper 连接成功");
    }
}

我们使用 CountDownLatch 来等待连接建立,连接建立成功后再执行后续逻辑,其中:

SESSION_TIMEOUT 是 zookeeper 会话超时时间,过期后服务器会自动清除临时节点

我们重点来看 Watcher

 Watcher

Watcher 是 zookeeper 的 一次性事件通知机制,用于监听 ZNode 的数据变化(setData)、子节点变化(create / delete 子节点)、节点删除(delete)以及客户端连接状态变化(连接、断开、重连等)

Watcher 中的核心组件:

EventType:事件类型表示发生了什么类型的变更

keeperState:通知状态表示 ZooKeeper 客户端与服务端的连接状态

path:监听路径,即被监听的 ZNode 路径

EventType

org.apache.zookeeper.Watcher.Event.EventType 枚举类型

None:特殊类型,表示连接状态变化,如 SyncConnected、Disconnected

NodeCreated:目标节点被创建

NodeDeleted:目标节点被删除

NodeDataChanged:目标节点的数据被修改

NodeChildrenChanged:目标节点的子节点被创建或删除(不通知具体哪个子节点变化,只表示子节点有变化)

keeperState

 Watcher.Event.KeeperState 定义,表示客户端与 zookeeper 集群的连接状态:

SyncConnected:客户端与服务器正常连接

Disconnected:客户端与服务器断开(客户端视角)

Expired:会话超时,服务器已清除该客户端的 session 和临时节点

AuthFailed:认证失败

Watcher 的工作流程:

+------------------+     +------------------+
|   客户端注册      | --> |  服务器记录 Watcher |
|   zk.exists(path, |     |                  |
|        watcher)   |     +------------------+
+------------------+
         |
         v
+------------------+
|  事件发生         | --> 节点被创建/删除/修改
+------------------+
         |
         v
+------------------+
|  服务器发送通知   | --> 异步通知客户端
+------------------+
         |
         v
+------------------+
|  客户端触发       | --> watcher.process(event)
|   process() 方法  |
+------------------+
         |
         v
+------------------+
|  Watcher 失效     | --> 必须重新注册才能继续监听
+------------------+

Watcher 是一次性触发,触发一次后自动失效,若需要持续监听,需要重新注册,Watcher 本身不携带数据,只通知 "发生了什么"

由于 Watcher 是一次性的,要实现持续监听,必须在每次触发后 重新注册,可以使用 Curator Framework,提供了 Cached 机制(如 PathChildrenCache、NodeCache),能够自动管理 Watcher 重注册

接下来,我们来看如何进行加锁

加锁

加锁流程:

1. 判断目标节点是否存在

2. 创建临时顺序节点

3. 获取目标节点(/lock) 下的所有节点

4. 判断当前节点是否是最小节点

        是 --> 获取到锁

        否 --> 监听前一节点 

public class DistributedLock {
    // zookeeper 服务器地址
    private static final String CONNECT_STRING = "47.112.48.236:2182";
    // 会话超时时间(毫秒)
    private static final int SESSION_TIMEOUT = 300000;
    private ZooKeeper zooKeeper;
    // 加锁节点
    private String lockPath;
    // 等待连接建立
    private CountDownLatch connectLatch = new CountDownLatch(1);
    // 创建的临时顺序节点
    private String curNode;
    // 顺序节点前缀
    private static final String CREATE_NODE_PREFIX = "seq-";
    // 监听节点
    private String waitNode;
    private CountDownLatch waitLatch = new CountDownLatch(1);

    /**
     * 建立 zookeeper 连接
     * @throws InterruptedException
     * @throws IOException
     */
    public void connect() throws InterruptedException, IOException {
        zooKeeper = new ZooKeeper(CONNECT_STRING, SESSION_TIMEOUT, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
                    // 连接成功,释放阻塞
                    connectLatch.countDown();
                }
                // 若前一节点删除, 获取到锁
                if (watchedEvent.getType() == Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitNode)) {
                    waitLatch.countDown();
                }
            }
        });
        // 等待连接建立
        connectLatch.await();
        System.out.println("zookeeper 连接成功");
    }

    /**
     * 加锁
     * @throws InterruptedException
     * @throws KeeperException
     */
    public void zkLock() throws InterruptedException, KeeperException {
        // 目标节点是否存在
        if (zooKeeper.exists(lockPath, null) == null) {
            throw new RuntimeException("目标节点: " + lockPath + " 不存在!");
        }
        /**
         * 创建临时顺序节点
         * String path: 创建节点路径
         * byte[] data: 节点数据
         * List<ACL> acl: 访问控制列表(Access Control List),定义谁可以访问该节点
         * CreateMode createMode: 创建节点类型
         */
        curNode = zooKeeper.create(lockPath + "/" + CREATE_NODE_PREFIX, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        // 获取 目标节点 下的节点
        List<String> children = zooKeeper.getChildren(lockPath, null);
        // 计算当前节点位置
        Collections.sort(children);
        String thisNode = curNode.substring((lockPath + "/").length());
        int curIndex = children.indexOf(thisNode);
        if (curIndex == 0) {
            // 获取到锁
             System.out.println(curNode + "获取到锁...");
        } else if (curIndex > 0) {
            // 监听前一节点
            waitNode = lockPath + "/" + children.get(curIndex - 1);
            Stat stat = zooKeeper.exists(waitNode, true);
            // 前一节点删除, 获取锁
            if (stat == null) {
                System.out.println("节点" + waitNode +"删除, 获取到锁...");
            } else {
                // 等待前一节点释放锁
                waitLatch.await();
            }
        } else {
            throw new RuntimeException("节点获取失败! children: " + children + " node: " + curNode);
        }
    }
}

若目标节点存在,我们就可以在 目标节点 下创建临时顺序节点 curNode

然后获取目标节点下的所有子节点,并进行排序

获取 curNode 的次序:

若为第一个节点(index = 0),则直接获取到锁;

若为后续节点(index > 0),则监听前一节点,等待前一节点删除后获取锁

若次序获取失败(index < 0),则表明可能出现节点创建失败等异常情况

在这里,我们使用 CountDownLatch 来等待前一节点删除

zooKeeper.exists(waitNode, true):设置 watch true,表示我们使用创建 zookeeper 时注册的 Watcher,因此,我们需要在 process 中添加对应的处理逻辑,也就是当 waitNode 删除时,将 CountDownLatch 的计数 -1

接下来,我们来继续看如何解锁

解锁

解锁,也就是将当前节点释放:

    public void zkUnlock() throws InterruptedException, KeeperException {
        zooKeeper.delete(curNode, -1);
        System.out.println(curNode + " 删除, 释放锁...");
    }

完整代码:

public class DistributedLock {
    // zookeeper 服务器地址
    private static final String CONNECT_STRING = "47.112.48.236:2182";
    // 会话超时时间(毫秒)
    private static final int SESSION_TIMEOUT = 300000;
    private ZooKeeper zooKeeper;
    // 加锁节点
    private String lockPath;
    // 等待连接建立
    private CountDownLatch connectLatch = new CountDownLatch(1);
    // 创建的临时顺序节点
    private String curNode;
    // 顺序节点前缀
    private static final String CREATE_NODE_PREFIX = "seq-";
    // 监听节点
    private String waitNode;
    private CountDownLatch waitLatch = new CountDownLatch(1);

    public DistributedLock(String lockPath) {
        this.lockPath = lockPath;
    }

    /**
     * 建立 zookeeper 连接
     * @throws InterruptedException
     * @throws IOException
     */
    public void connect() throws InterruptedException, IOException {
        zooKeeper = new ZooKeeper(CONNECT_STRING, SESSION_TIMEOUT, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
                    // 连接成功,释放阻塞
                    connectLatch.countDown();
                }
                // 若前一节点删除, 获取到锁
                if (watchedEvent.getType() == Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitNode)) {
                    System.out.println("节点" + waitNode +"删除, 获取到锁...");
                    waitLatch.countDown();
                }
            }
        });
        // 等待连接建立
        connectLatch.await();
        System.out.println("zookeeper 连接成功");
    }

    /**
     * 加锁
     * @throws InterruptedException
     * @throws KeeperException
     */
    public void zkLock() throws InterruptedException, KeeperException {
        // 目标节点是否存在
        if (zooKeeper.exists(lockPath, null) == null) {
            throw new RuntimeException("目标节点: " + lockPath + " 不存在!");
        }
        /**
         * 创建临时顺序节点
         * String path: 创建节点路径
         * byte[] data: 节点数据
         * List<ACL> acl: 访问控制列表(Access Control List),定义谁可以访问该节点
         * CreateMode createMode: 创建节点类型
         */
        curNode = zooKeeper.create(lockPath + "/" + CREATE_NODE_PREFIX, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        // 获取 目标节点 下的节点
        List<String> children = zooKeeper.getChildren(lockPath, null);
        // 计算当前节点位置
        Collections.sort(children);
        String thisNode = curNode.substring((lockPath + "/").length());
        int curIndex = children.indexOf(thisNode);
        if (curIndex == 0) {
            // 获取到锁
            System.out.println(curNode + "获取到锁...");
        } else if (curIndex > 0) {
            // 监听前一节点
            waitNode = lockPath + "/" + children.get(curIndex - 1);
            Stat stat = zooKeeper.exists(waitNode, true);
            // 前一节点删除, 获取锁
            if (stat == null) {
                System.out.println("节点" + waitNode +"删除, 获取到锁...");
            } else {
                // 等待前一节点释放锁
                waitLatch.await();
            }
        } else {
            throw new RuntimeException("节点获取失败! children: " + children + " node: " + curNode);
        }
    }

    public void zkUnlock() throws InterruptedException, KeeperException {
        zooKeeper.delete(curNode, -1);
        System.out.println(curNode + " 删除, 释放锁...");
    }
}

接下来,我们对其进行测试

测试

public class DistributedLockTest {
    public static void main(String[] args) {
        // 记得在 zookeeper 上创建对应节点
        String path = "/lock";
        final DistributedLock lock1 = new DistributedLock(path);
        final DistributedLock lock2 = new DistributedLock(path);
        new Thread(() -> {
            try {
                lock1.connect();
                lock1.zkLock();
                System.out.println("线程" + Thread.currentThread().getName() + " 加锁...");
                Thread.sleep(5 * 1000);
                System.out.println("线程" + Thread.currentThread().getName() + " 释放锁...");
                lock1.zkUnlock();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } catch (IOException e) {
                throw new RuntimeException(e);
            } catch (KeeperException e) {
                throw new RuntimeException(e);
            }
        }).start();
        new Thread(() -> {
            try {
                lock2.connect();
                lock2.zkLock();
                System.out.println("线程" + Thread.currentThread().getName() + " 加锁...");
                Thread.sleep(5 * 1000);
                System.out.println("线程" + Thread.currentThread().getName() + " 释放锁...");
                lock2.zkUnlock();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } catch (IOException e) {
                throw new RuntimeException(e);
            } catch (KeeperException e) {
                throw new RuntimeException(e);
            }
        }).start();
    }
}

运行,并观察结果:

结果符合预期,说明上述代码实现了基本的加锁和解锁功能

上述实现的 分布式锁 只是基于 zookeeper  临时顺序节点 Watcher 监听前驱节点 的基本实现,但在实际生产环境中,可能会遇到多种异常情况,如:

1. 会话超时或网络中断,zookeeper 服务端会认为客户端已“死亡”,并自动删除其创建的 EPHEMERAL 节点,但客户端可能未及时感知,仍认为自己持有锁

2. 未建立连接就调用 zkLock(),可能忘记调用 connect() 就直接 zkLock(),此时 zooKeeper == null,抛出 NullPointerException

3. zkLock() 未被重复调用,即 不支持可重入,若同一个线程重复调用 zkLock(),会创建多个临时节点,导致死锁或异常

4. 调用 zkUnlock() 之前由于网络中断等原因,节点已被删除,此时再次尝试删除,抛出 NoNodeException

......

在下一篇文章中,我们就来看 Curator 中的 InterProcessMutex 是如何实现分布式锁的,以及具体是如何支持可重入、进行重试等...


网站公告

今日签到

点亮在社区的每一天
去签到