Zookeeper实现分布式锁实战应用

发布于:2025-05-01 ⋅ 阅读:(18) ⋅ 点赞:(0)

Zookeeper实现分布式锁实战应用示例

1. 分布式锁概述

在分布式系统中,当多个进程或服务需要互斥地访问共享资源时,就需要分布式锁来协调。Zookeeper因其强一致性和临时节点特性,非常适合实现分布式锁。

2. Zookeeper实现分布式锁的核心原理

  • 临时顺序节点:创建的节点是临时的,客户端断开连接后自动删除
  • 节点顺序性:Zookeeper会为节点名称添加递增序号
  • 最小节点获取锁:所有客户端监听比自己序号小的节点,序号最小的获取锁
  • 监听机制:通过Watcher机制实现锁释放通知

3. 实战代码示例(Java)

3.1 引入依赖

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.6.3</version>
</dependency>

3.2 分布式锁实现

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class ZkDistributedLock implements Watcher {
    private ZooKeeper zk;
    private String lockPath;
    private String currentLock;
    private String waitPath;
    private CountDownLatch latch;
    private CountDownLatch connectedLatch = new CountDownLatch(1);
    private static final String LOCK_ROOT = "/locks";
    private static final int SESSION_TIMEOUT = 30000;

    public ZkDistributedLock(String zkServers) throws IOException, InterruptedException, KeeperException {
        zk = new ZooKeeper(zkServers, SESSION_TIMEOUT, this);
        connectedLatch.await();
        // 确保根节点存在
        Stat stat = zk.exists(LOCK_ROOT, false);
        if (stat == null) {
            zk.create(LOCK_ROOT, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
    }

    @Override
    public void process(WatchedEvent event) {
        if (event.getState() == Event.KeeperState.SyncConnected) {
            connectedLatch.countDown();
        }
        if (this.latch != null && event.getType() == Event.EventType.NodeDeleted) {
            this.latch.countDown();
        }
    }

    public boolean tryLock() throws KeeperException, InterruptedException {
        // 创建临时顺序节点
        currentLock = zk.create(LOCK_ROOT + "/lock_", new byte[0], 
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        
        // 获取所有子节点
        List<String> children = zk.getChildren(LOCK_ROOT, false);
        Collections.sort(children);
        
        // 当前节点是最小节点,则获取锁
        if (currentLock.equals(LOCK_ROOT + "/" + children.get(0))) {
            return true;
        }
        
        // 不是最小节点,找到前一个节点
        int currentIndex = Collections.binarySearch(children, 
                currentLock.substring(LOCK_ROOT.length() + 1));
        waitPath = LOCK_ROOT + "/" + children.get(currentIndex - 1);
        
        // 监听前一个节点
        Stat stat = zk.exists(waitPath, true);
        if (stat != null) {
            this.latch = new CountDownLatch(1);
            this.latch.await();
            this.latch = null;
        }
        return true;
    }

    public boolean tryLock(long timeout, TimeUnit unit) throws Exception {
        long start = System.currentTimeMillis();
        long end = start + unit.toMillis(timeout);
        
        while (System.currentTimeMillis() < end) {
            if (tryLock()) {
                return true;
            }
            Thread.sleep(100);
        }
        return false;
    }

    public void unlock() throws KeeperException, InterruptedException {
        zk.delete(currentLock, -1);
        currentLock = null;
        if (zk != null) {
            zk.close();
        }
    }
}

3.3 使用示例

public class DistributedLockExample {
    public static void main(String[] args) {
        String zkServers = "localhost:2181";
        String resourceKey = "order_123";
        
        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                ZkDistributedLock lock = null;
                try {
                    lock = new ZkDistributedLock(zkServers);
                    System.out.println(Thread.currentThread().getName() + " 尝试获取锁");
                    
                    if (lock.tryLock(5, TimeUnit.SECONDS)) {
                        System.out.println(Thread.currentThread().getName() + " 获取锁成功");
                        // 模拟业务处理
                        Thread.sleep(2000);
                    } else {
                        System.out.println(Thread.currentThread().getName() + " 获取锁超时");
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    if (lock != null) {
                        try {
                            lock.unlock();
                            System.out.println(Thread.currentThread().getName() + " 释放锁");
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }
            }, "Thread-" + i).start();
        }
    }
}

4. 实际应用场景

4.1 电商秒杀系统

public class SeckillService {
    private ZkDistributedLock lock;
    
    public SeckillService(String zkServers) throws Exception {
        this.lock = new ZkDistributedLock(zkServers);
    }
    
    public boolean seckill(String productId, String userId) {
        try {
            if (lock.tryLock(3, TimeUnit.SECONDS)) {
                // 1. 查询库存
                int stock = getStockFromDB(productId);
                if (stock <= 0) {
                    return false;
                }
                // 2. 扣减库存
                reduceStock(productId);
                // 3. 创建订单
                createOrder(productId, userId);
                return true;
            }
            return false;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        } finally {
            try {
                lock.unlock();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    
    // 其他方法...
}

4.2 分布式定时任务

public class DistributedScheduler {
    private ZkDistributedLock lock;
    
    public DistributedScheduler(String zkServers) throws Exception {
        this.lock = new ZkDistributedLock(zkServers);
    }
    
    public void scheduleTask() {
        try {
            if (lock.tryLock(0, TimeUnit.SECONDS)) {
                // 只有获取锁的节点执行任务
                executeTask();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                lock.unlock();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    
    private void executeTask() {
        // 执行定时任务逻辑
        System.out.println("执行定时任务: " + new Date());
    }
}

5. 优化与注意事项

  1. 锁重入问题:如果需要支持同一线程重入,需要额外记录线程信息
  2. 锁等待队列:公平锁实现需要考虑等待队列的顺序
  3. 超时处理:合理设置锁获取超时时间,避免死锁
  4. 连接恢复:处理Zookeeper连接断开后的重连机制
  5. 锁释放:确保锁最终能被释放,避免死锁
  6. 性能考虑:高频锁操作场景下,Zookeeper可能成为性能瓶颈

6. 其他实现方案对比

  1. Curator框架:Apache Curator提供了更高级的分布式锁实现

    InterProcessMutex lock = new InterProcessMutex(client, "/lock_path");
    lock.acquire();
    try {
        // 业务逻辑
    } finally {
        lock.release();
    }
    
  2. Redis分布式锁:基于SETNX命令实现,性能更高但一致性较弱

  3. 数据库分布式锁:基于唯一索引或乐观锁实现,简单但性能较差

Zookeeper分布式锁适合对一致性要求高的场景,而Redis分布式锁适合高性能但对一致性要求相对宽松的场景。


网站公告

今日签到

点亮在社区的每一天
去签到