文章目录
本章代码已分享至Gitee: https://gitee.com/lengcz/curator01
zookeeper 的安装
关于zookeeper的安装请见: dubbo(2):zookeeper和dubbo-admin的安装
Curator 介绍
Curator 是Apache Zookeeper 的java 客户端库
常见的Zookeeper Java API
- 原生Java API
- ZkClient
- Curator
Curator 项目的目标是简化Zookeeper 客户端的使用。
Curator 最初是Netfix 研发的,后来捐献给了Apache基金会,目前属于 Apache顶级项目。
Curator 官网: https://curator.apache.org/
Curator API 常用操作
- 建立连接
- 添加节点
- 删除节点
- 修改节点
- 查询节点
- Watch事件监听
- 分布式锁实现
本章必要的相关依赖和配置
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.10</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.21</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.21</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba.fastjson2/fastjson2 -->
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>2.0.57</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
建立连接
第一种方式
//1 第一种方式
/*
* String connectString, 连接字符串 zk server 地址和端口 ip1:port1,ip2:port2....
* int sessionTimeoutMs, 会话超时时间 单位ms
* int connectionTimeoutMs, 连接超时时间,单位ms
* RetryPolicy retryPolicy 重拾策略
*/
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
String connectString = "localhost:2181";
CuratorFramework client =
CuratorFrameworkFactory.newClient(connectString, 60_000, 15_000, retryPolicy);
client.start();
第二种方式
//2 第二种方式,链式编程
CuratorFramework client2 = CuratorFrameworkFactory.builder()
.connectString(connectString)
.sessionTimeoutMs(60_000)
.connectionTimeoutMs(15_000)
.retryPolicy(retryPolicy).namespace("demo01").build();
client2.start();
namespace表示根节点,表示这个客户端的操作都在这个节点之下,后续使用中不需要从根开始声明。
- 节点不需要手动创建,连接是会自动创建
关闭连接
client.close();
添加节点
创建节点
@Test
public void testCreate() throws Exception {
//如果创建节点,没有指定数据,则默认将当前客户端的ip作为数据存储
String path = client.create().forPath("/app1");
logger.info(path);
}
创建节点并设置值和类型
@Test
public void testCreate() throws Exception {
//如果创建节点,没有指定数据,则默认将当前客户端的ip作为数据存储
// String path = client.create().forPath("/app1");
// logger.info(path);
// withMode 指定节点类型,是临时的,还是临时的,顺序的
String path2 = client.create().withMode(CreateMode.EPHEMERAL).forPath("/app3","hello world".getBytes()); //创建节点并设置值
logger.info(path2);
}
withMode :模式。EPHEMERAL 临时的,当会话关闭时,节点会被删除。
创建多级节点
// creatingParentContainersIfNeeded 如果节点不存在则创建
String path3 = client.create().creatingParentContainersIfNeeded().forPath("/app4/p1"); //创建多级节点
logger.info(path3);
- creatingParentContainersIfNeeded 如果节点不存在则创建
查询节点
查询数据
byte[] bytes = client.getData().forPath("/app1");
logger.info(new String(bytes));
查询所有子节点
//获取子节点
List<String> paths = client.getChildren().forPath("/app4");
paths.forEach(c ->
logger.info(c));
List<String> paths2 = client.getChildren().forPath("/");//查询根节点
查询节点信息
@Test
public void testGetNodeInfo() throws Exception {
// 查询节点状态信息 ls -s
Stat status = new Stat();
client.getData().storingStatIn(status).forPath("/app1");
logger.info(JSONObject.toJSONString(status));
}
修改节点
@Test
public void testSet() throws Exception {
// 修改数据
client.setData().forPath("/app1","hello".getBytes());
}
修改节点(乐观锁修改,根据版本号)
@Test
public void testSetForVersion() throws Exception {
Stat status = new Stat();
client.getData().storingStatIn(status).forPath("/app1");
int version = status.getVersion();//查询版本
logger.info("版本号:"+version);
// 修改数据,如果版本不一致,则不修改数据,乐观锁模式
client.setData().withVersion(version).forPath("/app1","hello".getBytes());
}
图片表示版本号
删除节点
删除节点
/**
* 1 删除节点 delete
* 2 删除节点带有子节点的节点 deleteall
* 3 必须成功的删除
* 4 回调
* @throws Exception
*/
@Test
public void testDelete() throws Exception {
// 删除节点d
client.delete().forPath("/app1");
}
删除带有子节点的节点
@Test
public void testDelete2() throws Exception {
// 删除节点带有子节点的节点
client.delete().deletingChildrenIfNeeded().forPath("/app4");
}
必须删除成功
@Test
public void testDelete3() throws Exception {
// 必须删除成功
client.delete().guaranteed().forPath("/app4");
}
必须成功就是重试,防止网络抖动
删除后回调
@Test
public void testCallback() throws Exception{
// 回调
client.delete().guaranteed().inBackground(new BackgroundCallback() {
@Override
public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
logger.info("删除后的消息回调");
}
}).forPath("/app4");
}