【Zookeeper】两种基于原生zk客户端的分布式锁的实现

发布于:2024-06-26 ⋅ 阅读:(159) ⋅ 点赞:(0)

基于zk的分布式锁的实现主要依赖zk节点的原子性,可以基于原生zk来自己实现分布式锁,更多的是基于Curator这个框架来直接使用基于zk的分布式锁[1]。这里我们仅仅讨论基于原生zk客户端依赖自己实现的zk分布式锁。

原生zk客户端中的一些调用如getChildren方法,可以是同步返回,也可以通过实现AsyncCallback的内部接口来重写异步回调处理逻辑。这里我们举例同步和异步两种方式的实现。

同步实现[1],这篇文章中缺少了关于"Watcher关注的前面节点状态改变后CountDown"的逻辑,即缺少了Watcher的回调。这里我补上了回调并做了一些调整,代码如下:

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class SyncZkLock implements Watcher {
    // zookeeper server 列表
    private String connectString =
            "192.168.1.128:2181,192.168.1.129:2181,192.168.1.130:2181";
    // 超时时间
    private int sessionTimeout = 2000;
    private ZooKeeper zk;
    private String rootNode = "locks";
    private String subNode = "seq-";
    // 当前 client 等待的子节点
    private String waitPath;
    // ZooKeeper 连接等待
    private CountDownLatch connectLatch = new CountDownLatch(1);
    // ZooKeeper 节点等待
    private CountDownLatch waitLatch = new CountDownLatch(1);
    // 当前 client 创建的子节点
    private String currentNode;

    // 和 zk 服务建立连接,并创建根节点
    public SyncZkLock() throws IOException, InterruptedException, KeeperException {
        zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                // 连接建立时, 打开 latch, 唤醒 wait 在该 latch 上的线程
                if (event.getState() == Event.KeeperState.SyncConnected) {
                    connectLatch.countDown();
                }
                // 发生了 waitPath 的删除事件
                if (event.getType() == Event.EventType.NodeDeleted &&
                        event.getPath().equals(waitPath)) {
                    waitLatch.countDown();
                }
            }
        });

        // 等待连接建立
        connectLatch.await();
        //获取根节点状态
        Stat stat = zk.exists("/" + rootNode, false);
        //如果根节点不存在,则创建根节点,根节点类型为永久节点
        if (stat == null) {
            System.out.println("根节点不存在");
            zk.create("/" + rootNode, new byte[0],
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
    }

    // 加锁方法
    public void zkLock() {
        try {
            //在根节点下创建临时顺序节点,返回值为创建的节点路径
            currentNode = zk.create("/" + rootNode + "/" + subNode, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            checkAndLockOrAwait(false);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }

    // 解锁方法
    public void zkUnlock() {
        try {
            zk.delete(this.currentNode, -1);
        } catch (InterruptedException | KeeperException e) {
            e.printStackTrace();
        }
    }

    //watch被触发
    @Override
    public void process(WatchedEvent event) {
        switch (event.getType()) {
            case None:
                break;
            case NodeCreated:
                break;
            case NodeDeleted:
                checkAndLockOrAwait(true);
                break;
            case NodeDataChanged:
                break;
            case NodeChildrenChanged:
                break;
        }
    }

    //查看当前节点状态,Lock结束或者添加Watcher并等待
    private void checkAndLockOrAwait(boolean flag) {
        try {
            // 注意, 没有必要监听"/locks"的子节点的变化情况
            List<String> childrenNodes = zk.getChildren("/" + rootNode, false);
            // 列表中只有一个子节点, 那肯定就是 currentNode , 说明client 获得锁
            if (childrenNodes.size() == 1) {
                return;
            } else {
                //对根节点下的所有临时顺序节点进行从小到大排序
                Collections.sort(childrenNodes);
                //当前节点名称
                String thisNode = currentNode.substring(("/" + rootNode + "/").length());
                //获取当前节点的位置
                int index = childrenNodes.indexOf(thisNode);
                if (index == -1) {
                    System.out.println("数据异常");
                } else if (index == 0) {
                    //刚创建时flag为false,不需要countDown。
                    //watch触发时flag为true,需要countDown。
                    if (flag){
                        waitLatch.countDown();
                    }
                    // index == 0, 说明 thisNode 在列表中最小, 当前client 获得锁
                    return;
                } else {
                    // 获得排名比 currentNode 前 1 位的节点
                    this.waitPath = "/" + rootNode + "/" + childrenNodes.get(index - 1);
                    // 在 waitPath 上注册监听器, 当 waitPath 被删除时, zookeeper 会回调监听器的 process 方法
                    zk.getData(waitPath, true, new Stat());
                    //进入等待锁状态
                    waitLatch.await();
                    return;
                }
            }
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }

}

异步的实现,代码如下:

public class AsyncZkLock implements Watcher, AsyncCallback.StringCallback ,AsyncCallback.Children2Callback ,AsyncCallback.StatCallback {

    private ZooKeeper zk ;
    private String threadName;
    private CountDownLatch cc = new CountDownLatch(1);
    private String pathName;
    private final String ctx = "zk_lock";

    public String getPathName() {
        return pathName;
    }

    public void setPathName(String pathName) {
        this.pathName = pathName;
    }

    public String getThreadName() {
        return threadName;
    }

    public void setThreadName(String threadName) {
        this.threadName = threadName;
    }

    public ZooKeeper getZk() {
        return zk;
    }

    public void setZk(ZooKeeper zk) {
        this.zk = zk;
    }

    public void tryLock(){
        try {

            System.out.println(threadName + "  create....");
            zk.create("/lock",threadName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,this, ctx);
            cc.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void unLock(){
        try {
            zk.delete(pathName,-1);
            System.out.println(threadName + " over work....");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }

    //给前一个节点加的Watcher被触发的回调
    @Override
    public void process(WatchedEvent event) {

        switch (event.getType()) {
            case None:
                break;
            case NodeCreated:
                break;
            case NodeDeleted:
                //这个getChildren是个异步方法,通过重写AsyncCallback.Children2Callback的processResult方法,处理回调
                zk.getChildren("/",false,this ,ctx);
                break;
            case NodeDataChanged:
                break;
            case NodeChildrenChanged:
                break;
        }

    }

    //string callback
    //zk.create方法的异步回调
    @Override
    public void processResult(int rc, String path, Object ctx, String name) {
        if(name != null ){
            System.out.println(threadName  +"  create node : " +  name );
            pathName =  name ;
            zk.getChildren("/",false,this , ctx);
        }
    }

    //getChildren  call back
    //zk.getChildren方法的异步回调
    @Override
    public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {

        Collections.sort(children);
        int i = children.indexOf(pathName.substring(1));

        //是不是第一个
        if(i == 0){
            //yes
            System.out.println(threadName +" i am first....");
            try {
                zk.setData("/",threadName.getBytes(),-1);
                cc.countDown();

            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }else{
            //no
            //监控前面节点,创建监控前面节点的Watcher
            zk.exists("/"+children.get(i-1),this,this, ctx);
        }
    }

    //statCallback
    //zk.exists的异步回调
    @Override
    public void processResult(int rc, String path, Object ctx, Stat stat) {
        //这里默认添加watch成功,没有做失败的处理。
        //假设有依次A B C D E ,C取消了,D接受到回调,取到了children列表 A B D E,
        //但是此时B也取消了,而D此时给前面节点B添加watch,会出现问题,
        //因此这里如果添加失败,应该重新获取children列表,
        // 依靠getChildren的回调逻辑:如果是第一个就结束,不是第一个,就找到前一个节点并给前一个添加监控
        //来重新添加watch
    }
}

参考文章:
[1],Zookeeper + Curator实现分布式锁


网站公告

今日签到

点亮在社区的每一天
去签到