ZooKeeper系列:zk中的watch

发布于:2022-11-28 ⋅ 阅读:(195) ⋅ 点赞:(0)

Watch就是监听,观察。

其实就是客户端注册watch,然后服务端发生节点数据变化的时候会触发watch事件,接着回调客户端

创建工程和实现类

创建java 的maven工程,然后在pom中添加对应的zookeeper的maven信息,其版本需要和安装的zookeeper版本一致

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.8.0</version>
</dependency>
复制代码

与zookeeper集群建立连接,其中使用了三个参数:

  • connectString: 连接地址, 集群中各个节点的地址和端口,使用逗号隔开。
  • sessionTimeout: 超时时间,session的超时
  • watcher: 监听,在session中注册监听,此 watcher是session级别的

连接配置以及watch相关的使用参考下面的代码

public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        ZooKeeper zk = new ZooKeeper("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183", 3000, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                Event.KeeperState state = watchedEvent.getState();
                System.out.println(watchedEvent.toString());
​
                switch (state) {
                    case Unknown:
                        break;
                    case Disconnected:
                        break;
                    case NoSyncConnected:
                        break;
                    case SyncConnected:
                        System.out.println("连接成功:connected");
                        countDownLatch.countDown();
                        break;
                    case AuthFailed:
                        break;
                    case ConnectedReadOnly:
                        break;
                    case SaslAuthenticated:
                        break;
                    case Expired:
                        break;
                    case Closed:
                        break;
                }
            }
        });
​
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        ZooKeeper.States states = zk.getState();
​
        switch (states) {
            case CONNECTING:
                System.out.println("CONNECTING......");
                break;
            case ASSOCIATING:
                break;
            case CONNECTED:
                System.out.println("CONNECTED......");
                break;
            case CONNECTEDREADONLY:
                break;
            case CLOSED:
                break;
            case AUTH_FAILED:
                break;
            case NOT_CONNECTED:
                break;
        }
​
        zk.create("/name","纪先生".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        Stat stat = new Stat();
        zk.getData("/name", new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                System.out.println("getData watch..."+ watchedEvent.toString());
                try {
                    zk.getData("/name",true,stat);
//                    zk.getData("/name",this,stat);
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },stat);
        Stat stat1 = zk.setData("/name","无先生".getBytes(),0);
        zk.setData("/name","有先生".getBytes(),stat1.getVersion());
​
    }
复制代码

总共有两种watch:

第一类:new Zookeeper时候传入的watch,此watch是session级别的,跟path,node没关系

第二类:getData的时候传入的watch,此watch是和path,node相关的,当path发生变化的时候会触发watch。

watch的特点:

  1. watch的注册只发生在读类型的调用,如getData,exists,getChildren
  2. watch的注册是一次性的,如果想每次操作都有回调,需要每次读操作都要注册一次

例如:

注册watch除了直接new watch之外,还有下面两种简单的方式

方式一:代码中的zk.getData("/name",true,stat),其中的true参数就是代表注册进去的watch是default watch,即new zk的那个watch。

方式二:zk.getData("/name",this,stat);,这里的this代表的是父级的watch,就是它外层注册的watch。

zk.getData("/name", new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                System.out.println("getData watch..."+ watchedEvent.toString());
                try {
                    zk.getData("/name",true,stat);
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },stat);
复制代码

如果不在watch中添加一个zk.getData("/name",true,stat),那么只有在第一次setData的时候会触发设置的watch,第二次setData的时候则不会(watch 是一次性的)

为什么会使用CountDownLatch

仔细点话会发现代码中增加countDownLatch,这个是为了检测zk创建成功之后才会返回连接成功(CONNECTED),zk的启动是异步的,new之后立即获取zk的状态是CONNECTING的,无法正常使用。

我是纪先生,用输出倒逼输入而持续学习,持续分享技术系列文章,以及全网值得收藏好文,欢迎关注公众号,做一个持续成长的技术人。

 

本文含有隐藏内容,请 开通VIP 后查看

网站公告

今日签到

点亮在社区的每一天
去签到