Zookeeper实现分布式锁实战应用示例
1. 分布式锁概述
在分布式系统中,当多个进程或服务需要互斥地访问共享资源时,就需要分布式锁来协调。Zookeeper因其强一致性和临时节点特性,非常适合实现分布式锁。
2. Zookeeper实现分布式锁的核心原理
- 临时顺序节点:创建的节点是临时的,客户端断开连接后自动删除
- 节点顺序性:Zookeeper会为节点名称添加递增序号
- 最小节点获取锁:所有客户端监听比自己序号小的节点,序号最小的获取锁
- 监听机制:通过Watcher机制实现锁释放通知
3. 实战代码示例(Java)
3.1 引入依赖
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.6.3</version>
</dependency>
3.2 分布式锁实现
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class ZkDistributedLock implements Watcher {
private ZooKeeper zk;
private String lockPath;
private String currentLock;
private String waitPath;
private CountDownLatch latch;
private CountDownLatch connectedLatch = new CountDownLatch(1);
private static final String LOCK_ROOT = "/locks";
private static final int SESSION_TIMEOUT = 30000;
public ZkDistributedLock(String zkServers) throws IOException, InterruptedException, KeeperException {
zk = new ZooKeeper(zkServers, SESSION_TIMEOUT, this);
connectedLatch.await();
// 确保根节点存在
Stat stat = zk.exists(LOCK_ROOT, false);
if (stat == null) {
zk.create(LOCK_ROOT, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}
@Override
public void process(WatchedEvent event) {
if (event.getState() == Event.KeeperState.SyncConnected) {
connectedLatch.countDown();
}
if (this.latch != null && event.getType() == Event.EventType.NodeDeleted) {
this.latch.countDown();
}
}
public boolean tryLock() throws KeeperException, InterruptedException {
// 创建临时顺序节点
currentLock = zk.create(LOCK_ROOT + "/lock_", new byte[0],
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
// 获取所有子节点
List<String> children = zk.getChildren(LOCK_ROOT, false);
Collections.sort(children);
// 当前节点是最小节点,则获取锁
if (currentLock.equals(LOCK_ROOT + "/" + children.get(0))) {
return true;
}
// 不是最小节点,找到前一个节点
int currentIndex = Collections.binarySearch(children,
currentLock.substring(LOCK_ROOT.length() + 1));
waitPath = LOCK_ROOT + "/" + children.get(currentIndex - 1);
// 监听前一个节点
Stat stat = zk.exists(waitPath, true);
if (stat != null) {
this.latch = new CountDownLatch(1);
this.latch.await();
this.latch = null;
}
return true;
}
public boolean tryLock(long timeout, TimeUnit unit) throws Exception {
long start = System.currentTimeMillis();
long end = start + unit.toMillis(timeout);
while (System.currentTimeMillis() < end) {
if (tryLock()) {
return true;
}
Thread.sleep(100);
}
return false;
}
public void unlock() throws KeeperException, InterruptedException {
zk.delete(currentLock, -1);
currentLock = null;
if (zk != null) {
zk.close();
}
}
}
3.3 使用示例
public class DistributedLockExample {
public static void main(String[] args) {
String zkServers = "localhost:2181";
String resourceKey = "order_123";
for (int i = 0; i < 5; i++) {
new Thread(() -> {
ZkDistributedLock lock = null;
try {
lock = new ZkDistributedLock(zkServers);
System.out.println(Thread.currentThread().getName() + " 尝试获取锁");
if (lock.tryLock(5, TimeUnit.SECONDS)) {
System.out.println(Thread.currentThread().getName() + " 获取锁成功");
// 模拟业务处理
Thread.sleep(2000);
} else {
System.out.println(Thread.currentThread().getName() + " 获取锁超时");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (lock != null) {
try {
lock.unlock();
System.out.println(Thread.currentThread().getName() + " 释放锁");
} catch (Exception e) {
e.printStackTrace();
}
}
}
}, "Thread-" + i).start();
}
}
}
4. 实际应用场景
4.1 电商秒杀系统
public class SeckillService {
private ZkDistributedLock lock;
public SeckillService(String zkServers) throws Exception {
this.lock = new ZkDistributedLock(zkServers);
}
public boolean seckill(String productId, String userId) {
try {
if (lock.tryLock(3, TimeUnit.SECONDS)) {
// 1. 查询库存
int stock = getStockFromDB(productId);
if (stock <= 0) {
return false;
}
// 2. 扣减库存
reduceStock(productId);
// 3. 创建订单
createOrder(productId, userId);
return true;
}
return false;
} catch (Exception e) {
e.printStackTrace();
return false;
} finally {
try {
lock.unlock();
} catch (Exception e) {
e.printStackTrace();
}
}
}
// 其他方法...
}
4.2 分布式定时任务
public class DistributedScheduler {
private ZkDistributedLock lock;
public DistributedScheduler(String zkServers) throws Exception {
this.lock = new ZkDistributedLock(zkServers);
}
public void scheduleTask() {
try {
if (lock.tryLock(0, TimeUnit.SECONDS)) {
// 只有获取锁的节点执行任务
executeTask();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
lock.unlock();
} catch (Exception e) {
e.printStackTrace();
}
}
}
private void executeTask() {
// 执行定时任务逻辑
System.out.println("执行定时任务: " + new Date());
}
}
5. 优化与注意事项
- 锁重入问题:如果需要支持同一线程重入,需要额外记录线程信息
- 锁等待队列:公平锁实现需要考虑等待队列的顺序
- 超时处理:合理设置锁获取超时时间,避免死锁
- 连接恢复:处理Zookeeper连接断开后的重连机制
- 锁释放:确保锁最终能被释放,避免死锁
- 性能考虑:高频锁操作场景下,Zookeeper可能成为性能瓶颈
6. 其他实现方案对比
Curator框架:Apache Curator提供了更高级的分布式锁实现
InterProcessMutex lock = new InterProcessMutex(client, "/lock_path"); lock.acquire(); try { // 业务逻辑 } finally { lock.release(); }
Redis分布式锁:基于SETNX命令实现,性能更高但一致性较弱
数据库分布式锁:基于唯一索引或乐观锁实现,简单但性能较差
Zookeeper分布式锁适合对一致性要求高的场景,而Redis分布式锁适合高性能但对一致性要求相对宽松的场景。