目录
接上篇深入解析ZooKeeper核心机制与应用(上)主要介绍了原理部分,本篇更注重应用部分
Curator
官网介绍 https://curator.apache.org/
Curator是Netflix公司开源的一套Zookeeper客户端框架,Curator是对Zookeeper支持最好的客户端框架。Curator封装了大部分Zookeeper的功能,比如:Leader选举、分布式锁等等,极大的减轻了开发者在使用Zookeeper时的底层细节开发工作。
引入Curator的Maven依赖
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.7.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>5.2.0</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
配置文件
@Data
@Component
@ConfigurationProperties(prefix = "curator")
public class ZkProperties {
// 重试次数
private int retryCount;
// 重试的间隔时间(单位:毫秒)
private int sleepBetweenRetries;
// zk连接地址(多个zk的时候,用逗号分割)
private String connect;
// 会话超时时间(单位:毫秒)
private int sessionTimout;
// 连接超时时间(单位:毫秒)
private int connectionTimeout;
}
@Configuration
public class CuratorConfig {
@Resource
private ZkProperties zkProperties;
@Bean(initMethod = "start")
public CuratorFramework curatorFramework() {
RetryPolicy retryPolicy = new RetryNTimes(zkProperties.getRetryCount(), zkProperties.getSleepBetweenRetries());
CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(
zkProperties.getConnect(),
zkProperties.getSessionTimout(),
zkProperties.getConnectionTimeout(), retryPolicy);
return curatorFramework;
}
}
项目示例演示
@Slf4j
@SpringBootTest
class ZookeeperDemoApplicationTests {
private final static String NODE_NAME = "/curator-node";
private final static String EPHEMERAL_NODE_NAME = "/curator-ephemeral-node-";
private final static String PARENT_NODE_NAME = "/animal/dog/whiteDog";
private final static String WATCH_NODE_NAME = "/curator-watch-node";
private final static byte[] VALUE_BYTES = "muse".getBytes();
private final static byte[] NEW_VALUE_BYTES = "muse-new".getBytes();
private final static Gson GSON = new Gson();
@Resource
private CuratorFramework curatorFramework;
/**
* 创建永久节点/curator-node,并存储值“muse”
*/
@Test
void createNode() throws Throwable {
String path = curatorFramework.create().forPath(NODE_NAME, VALUE_BYTES);
log.info("createNode success! path={}", path);
}
/**
* 创建临时序号节点 /curator-ephemeral-node-[序号]
*/
@Test
@SneakyThrows
void createEphemeralSeqNode() {
String path = curatorFramework.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).
forPath(EPHEMERAL_NODE_NAME, VALUE_BYTES);
log.info("createEphemeralSeqNode success! path={}", path);
Thread.sleep(20000); // 线程睡眠20秒钟,这个时间可以查询到临时节点,方法执行完毕,临时节点就不存在了
}
/**
* 如果父节点不存在,则连带着创建父类节点 /animal/dog/whiteDog
*/
@Test
@SneakyThrows
void createWithParent() {
String path = curatorFramework.create().creatingParentsIfNeeded().forPath(PARENT_NODE_NAME, VALUE_BYTES);
log.info("createWithParent success! path={}", path);
}
/**
* 获取节点/curator-node上存储的值
*/
@Test
@SneakyThrows
void getData() {
byte[] valueByte = curatorFramework.getData().forPath(NODE_NAME);
log.info("getData success! valueByte={}", new String(valueByte));
}
/**
* 修改节点/curator-node的值为“muse-new”
*/
@Test
@SneakyThrows
void setData() {
curatorFramework.setData().forPath(NODE_NAME, NEW_VALUE_BYTES);
}
/**
* 删除节点/curator-node及其包含的子节点
*/
@Test
@SneakyThrows
void deleteNode() {
curatorFramework.delete().guaranteed().deletingChildrenIfNeeded().forPath(NODE_NAME);
}
}
ZK的读写锁设计
读锁概述
读锁(Read Lock)并发的时候,多个线程都可以去执行读操作,彼此不会阻塞。
加读锁成功的前提是:没有对其待访问的资源加写锁。
写锁(Write Lock)并发时如果多个线程都要去获得写锁,那么只有一条线程可以获得写锁,彼此会发生阻塞。
加写锁成功的前提是:没有对其待访问的资源加任和锁(无论是写锁or读锁)。
如何实现读写锁?
首先,在/lock路径下创建临时序号节点/lock/WRITE- 或 /lock/READ-,该节点就代表将要获取的Write/Read锁节点。其次:获取/lock下的子节点,并按照临时节点的顺序号排序。最后:检查此Read/Write锁之前是否有Write锁,若有,则先注册对该Write锁前一个锁的监听,然后阻塞该Read/Write锁的获取。若监听到该Read/Write锁前一个Write锁已释放,则打开阻塞,继续执行。
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.2.0</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
/**
* 读锁(需要引入curator-recipes依赖)
*/
@Test
@SneakyThrows
void getReadLock() {
InterProcessReadWriteLock rwlock = new InterProcessReadWriteLock(curatorFramework, "/read-lock");
InterProcessMutex readLock = rwlock.readLock(); /** 获得读锁实例对象 */
for (int i = 0; i< 10 ; i++) {
new Thread(()-> {
String threadName = Thread.currentThread().getName();
try {
readLock.acquire(); // 获取读锁
log.info("线程={}:等待获取Read锁成功!开始执行业务代码...", threadName);
Thread.sleep(1000);
} catch (Throwable e) {
e.printStackTrace();
} finally {
try {
readLock.release(); // 释放锁
} catch (Throwable e) {
e.printStackTrace();
}
}
}).start();
}
Thread.sleep(2000);
}
/**
* 写锁(需要引入curator-recipes依赖)
*/
@Test
@SneakyThrows
void getWriteLock() {
InterProcessReadWriteLock rwlock = new InterProcessReadWriteLock(curatorFramework, "/write-lock");
InterProcessMutex writeLock = rwlock.writeLock(); /** 获得写锁实例对象 */
for (int i = 0; i< 5 ; i++) {
new Thread(()-> {
String threadName = Thread.currentThread().getName();
try {
writeLock.acquire(); // 获取写锁
log.info("线程={}:等待获取Write锁成功!开始执行业务代码...", threadName);
Thread.sleep(1000);
} catch (Throwable e) {
e.printStackTrace();
} finally {
try {
writeLock.release(); // 释放锁
} catch (Throwable e) {
e.printStackTrace();
}
}
}).start();
}
Thread.sleep(6000);
}
/**
* 监听/curator-watch-node节点
*/
@Test
@SneakyThrows
void watch() {
// 如果不存在节点,则创建
if (null == curatorFramework.checkExists().forPath(WATCH_NODE_NAME)) {
curatorFramework.create().forPath(WATCH_NODE_NAME, VALUE_BYTES);
}
CuratorCache curatorCache = CuratorCache.builder(curatorFramework, WATCH_NODE_NAME).build();
CuratorCacheListener listener = CuratorCacheListener.builder().forNodeCache(
() -> log.info("-----forNodeCache-----{} node is changed!", WATCH_NODE_NAME)).forAll(
(type, oldData, data) -> log.info("-----forAll-----{} node is changed!type={} oldDate={} date={}",
WATCH_NODE_NAME, GSON.toJson(type), GSON.toJson(oldData), GSON.toJson(data))).build();
curatorCache.listenable().addListener(listener);
curatorCache.start();
System.in.read();
}
Watch机制
概述
我们可以把Watch理解成是注册在指定Znode上的触发器。
当被Watch的这个Znode发生了变化(即:create、delete、setData方法)时,将会触发Znode上注册的对应监听事件,请求Watch的客户端会接收到异步回调通知。
客户端使用了NIO通信模式监听服务的调用。
监听节点内容的变化,我们可以使用get -w [节点]
监听节点目录的变化,我们可以使用ls -w [节点]
监听所有级别子目录变化,我么可以使用ls -w -R [节点]
基于Curator使用WatchZookeeperDemoApplicationTests类的watch()方法
ZK集群
集群搭建
步骤一:创建四个数据存储目录
步骤二:分别在zk1~zk4目录中创建myid文件
步骤三:创建四个zoo.cfg配置文件,分别为zoo1.conf,zoo2.conf,zoo3.conf,zoo4.conf
步骤四:启动四台ZooKeeper
步骤五:查看四台ZooKeeper的角色
步骤六:连接集群./zkCli.sh -server 127.0.0.1:2181127.0.0.1:2182 127.0.0.1:2183 127.0.0.1:2184
Zookeeper集群的leader选举
ZooKeeper作为非常重要的分布式协调组件,需要进行集群部署,集群中会以一主多从的形式进行部署。为了保证数据的一致性,使用了ZAB(ZooKeeper Atomic Broadcase)协议,这个协议解决了ZooKeeper的崩溃恢复和主从数据同步的问题。
ZAB协议定义了如下四种节点状态
Leader选举流程
注释:基于 FastLeaderElection 算法(Zookeeper 默认选举算法)
选举触发
- 初始启动:当 Zookeeper 集群中各服务器首次启动时,所有服务器都不知道谁是 leader,此时会触发 leader 选举流程。
- leader 故障:在集群运行过程中,如果 follower 节点在一定时间内没有收到 leader 的心跳(通过心跳机制检测,默认 tickTime 时间间隔内未收到心跳视为 leader 故障),则认为 leader 出现故障,follower 节点会发起新一轮的 leader 选举。
第一轮选举投票
- 初始化投票:
- 每个服务器启动后,都会初始化自己的投票信息,将自己推举为 leader,投票信息包含服务器的 myid(唯一标识,在配置文件中指定)、zxid(事务 ID,反映服务器上数据的最新状态,越大表示数据越新)以及 epoch(选举轮次,初始值为 0,每次选举都会递增)。例如,服务器 S1 的 myid 为 1,zxid 为 100,它会向集群中的其他服务器发送投票 (1, 100, 0)。
- 接收与比较投票:
- 服务器在接收到其他服务器的投票后,会按照一定规则进行比较。首先比较 epoch,epoch 大的投票优先;若 epoch 相同,则比较 zxid,zxid 大的优先;若 zxid 也相同,再比较 myid,myid 大的优先。
- 假设服务器 S1 接收到服务器 S2 的投票 (2, 105, 0),由于 S2 的 zxid 大于 S1,S1 会更新自己的投票,将票改投给 S2,即把自己的投票信息更新为 (2, 105, 0)。
- 统计投票:
- 每台服务器都会统计自己收到的投票信息,判断是否有超过半数的服务器投给了某个节点。例如,一个由 5 台服务器组成的集群,当一台服务器收到至少 3 张((5 / 2) + 1 = 3)指向同一节点的有效投票时,就初步认为该节点可能成为 leader。
- 第一轮选举结果:
- 在第一轮选举中,可能出现两种情况。一种是有某个节点获得超过半数的投票,初步当选为 leader;另一种是没有节点获得超过半数的投票,此时所有服务器会进入下一轮选举。
第二轮选举投票
- 递增 epoch:
- 如果第一轮选举没有选出 leader,所有服务器会将自己的 epoch 值加 1,进入下一轮选举。例如,第一轮选举的 epoch 为 0,第二轮选举时,所有服务器的 epoch 变为 1。
- 重新投票:
- 服务器基于新的 epoch 值重新封装自己的投票信息,并向集群中其他服务器发送。此时,服务器会根据当前的状态和接收到的其他服务器信息,重新确定自己的投票对象。例如,服务器 S1 在第二轮选举中,会综合第一轮选举后获取的新信息,可能会改变自己的投票选择,重新向其他服务器发送新的投票信息。
- 重复比较与统计:
- 后续服务器接收投票、比较投票以及统计投票的过程与第一轮类似。各服务器按照 epoch、zxid、myid 的顺序依次比较接收到的投票信息,更新自己的投票,并统计投票结果。
- 在每一轮选举中,只要有节点获得超过半数的投票,就会当选为 leader。若仍然没有节点获得超过半数的投票,则继续递增 epoch,进行下一轮选举,直到选出 leader 为止。
确定 leader后
- 当某个节点在某一轮选举中获得超过半数的投票时,该节点就正式当选为 leader。例如,服务器 S3 在第二轮选举中获得了 3 台服务器的投票,它就成为了 leader。
Leader选举出来之后,会周期性不断的向Follower发送心跳 (ping命令没有内容的socket)。
当Leader崩溃后,Follower发现socket通道已经关闭,那么Follower就会从Following状态进入到Looking状态,然后重新开始进行Leader的选举,在Leader选举的这个过程中,zk集群不能堆外提供服务。
zk集群同步
- leader 当选后,会向其他服务器发送通知,告知自己成为 leader。其他服务器收到通知后,更新自己的状态,确认 leader,并开始与 leader 进行数据同步。通过 ZAB 协议的原子广播机制,leader 将最新的数据状态同步给 follower,使整个集群的数据状态达成一致,从而保证集群的一致性和可用性。
如果客户端连接了Leader节点,则直接将数据写入到主节点;如果客户端连接到了Follower节点,那么Follower节点会将数据转发给Leader节点,Leader节点再将数据写入到本节点中。