分布式方案 一 分布式锁的四大实现方式

发布于:2025-07-26 ⋅ 阅读:(19) ⋅ 点赞:(0)

Java分布式锁实现方式详解

什么是分布式锁

分布式锁是在分布式系统中,用于控制多个进程/节点对共享资源的访问的一种同步机制。与单机环境下的锁不同,分布式锁需要在多个节点之间协调,确保在任意时刻只有一个节点能够获得锁。

分布式锁的特性要求

  • 互斥性在任意时刻,只有一个客户端能持有锁
  • 安全性锁只能被持有该锁的客户端删除,不能被其他客户端删除
  • 避免死锁获取锁的客户端因为某些原因而没有释放锁,其他客户端再也无法获取锁
  • 容错性只要大部分节点正常运行,客户端就可以加锁和解锁

基于数据库的分布式锁

实现原理

利用数据库的唯一索引特性来实现分布式锁。通过在数据库中插入一条记录来获取锁,删除记录来释放锁。

数据库表结构

CREATE TABLE distributed_lock (
    id INT PRIMARY KEY AUTO_INCREMENT,
    lock_name VARCHAR(64) NOT NULL COMMENT '锁名称',
    lock_value VARCHAR(64) NOT NULL COMMENT '锁值',
    expire_time TIMESTAMP NOT NULL COMMENT '过期时间',
    create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    UNIQUE KEY uk_lock_name (lock_name)
);

Java实现示例

1. 基于唯一索引的实现
import java.sql.*;
import java.util.concurrent.TimeUnit;

public class DatabaseDistributedLock {
    
    private Connection connection;
    private String lockName;
    private String lockValue;
    private long expireTime;
    
    public DatabaseDistributedLock(Connection connection, String lockName) {
        this.connection = connection;
        this.lockName = lockName;
        this.lockValue = Thread.currentThread().getName() + "-" + System.currentTimeMillis();
    }
    
    /**
     * 获取锁
     * @param timeout 超时时间(秒)
     * @return 是否获取成功
     */
    public boolean tryLock(long timeout) {
        long startTime = System.currentTimeMillis();
        long timeoutMillis = timeout * 1000;
        
        while (System.currentTimeMillis() - startTime < timeoutMillis) {
            try {
                // 尝试插入锁记录
                String sql = "INSERT INTO distributed_lock (lock_name, lock_value, expire_time) VALUES (?, ?, ?)";
                PreparedStatement stmt = connection.prepareStatement(sql);
                stmt.setString(1, lockName);
                stmt.setString(2, lockValue);
                stmt.setTimestamp(3, new Timestamp(System.currentTimeMillis() + 30000)); // 30秒过期
                
                int result = stmt.executeUpdate();
                if (result > 0) {
                    return true; // 获取锁成功
                }
            } catch (SQLException e) {
                // 插入失败,说明锁已被其他线程持有
                if (e.getErrorCode() == 1062) { // MySQL唯一键冲突错误码
                    // 检查锁是否过期
                    cleanExpiredLock();
                }
            }
            
            try {
                Thread.sleep(100); // 等待100ms后重试
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        
        return false;
    }
    
    /**
     * 释放锁
     */
    public void unlock() {
        try {
            String sql = "DELETE FROM distributed_lock WHERE lock_name = ? AND lock_value = ?";
            PreparedStatement stmt = connection.prepareStatement(sql);
            stmt.setString(1, lockName);
            stmt.setString(2, lockValue);
            stmt.executeUpdate();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
    
    /**
     * 清理过期锁
     */
    private void cleanExpiredLock() {
        try {
            String sql = "DELETE FROM distributed_lock WHERE lock_name = ? AND expire_time < ?";
            PreparedStatement stmt = connection.prepareStatement(sql);
            stmt.setString(1, lockName);
            stmt.setTimestamp(2, new Timestamp(System.currentTimeMillis()));
            stmt.executeUpdate();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}

优缺点分析

优点

  • 实现简单,易于理解
  • 利用数据库事务特性保证一致性
  • 不需要额外的中间件

缺点

  • 性能较差,数据库压力大
  • 单点故障风险
  • 锁的粒度较粗

基于Redis的分布式锁

实现原理

利用Redis的原子性操作来实现分布式锁。主要使用SET命令的NX(Not eXists)和EX(EXpire)参数。

Java实现示例

1. 基于Jedis的简单实现
import redis.clients.jedis.Jedis;
import redis.clients.jedis.params.SetParams;

public class RedisDistributedLock {
    
    private Jedis jedis;
    private String lockKey;
    private String lockValue;
    private int expireTime;
    
    public RedisDistributedLock(Jedis jedis, String lockKey, int expireTime) {
        this.jedis = jedis;
        this.lockKey = lockKey;
        this.lockValue = Thread.currentThread().getName() + "-" + System.currentTimeMillis();
        this.expireTime = expireTime;
    }
    
    /**
     * 获取锁
     * @param timeout 超时时间(毫秒)
     * @return 是否获取成功
     */
    public boolean tryLock(long timeout) {
        long startTime = System.currentTimeMillis();
        
        while (System.currentTimeMillis() - startTime < timeout) {
            // 使用SET命令的NX和EX参数实现原子操作
            SetParams params = SetParams.setParams().nx().ex(expireTime);
            String result = jedis.set(lockKey, lockValue, params);
            
            if ("OK".equals(result)) {
                return true; // 获取锁成功
            }
            
            try {
                Thread.sleep(100); // 等待100ms后重试
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        
        return false;
    }
    
    /**
     * 释放锁(使用Lua脚本保证原子性)
     */
    public void unlock() {
        String luaScript = 
            "if redis.call('get', KEYS[1]) == ARGV[1] then " +
            "    return redis.call('del', KEYS[1]) " +
            "else " +
            "    return 0 " +
            "end";
        
        jedis.eval(luaScript, 1, lockKey, lockValue);
    }
    
    /**
     * 锁续期
     */
    public boolean renewLock() {
        String luaScript = 
            "if redis.call('get', KEYS[1]) == ARGV[1] then " +
            "    return redis.call('expire', KEYS[1], ARGV[2]) " +
            "else " +
            "    return 0 " +
            "end";
        
        Object result = jedis.eval(luaScript, 1, lockKey, lockValue, String.valueOf(expireTime));
        return "1".equals(result.toString());
    }
}
2. 基于Redisson的实现
import org.redisson.Redisson;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;

import java.util.concurrent.TimeUnit;

public class RedissonDistributedLock {
    
    private RedissonClient redissonClient;
    
    public RedissonDistributedLock() {
        Config config = new Config();
        config.useSingleServer().setAddress("redis://127.0.0.1:6379");
        this.redissonClient = Redisson.create(config);
    }
    
    /**
     * 获取锁并执行业务逻辑
     */
    public void executeWithLock(String lockKey, Runnable task) {
        RLock lock = redissonClient.getLock(lockKey);
        
        try {
            // 尝试获取锁,最多等待10秒,锁自动释放时间为30秒
            if (lock.tryLock(10, 30, TimeUnit.SECONDS)) {
                System.out.println("获取锁成功:" + lockKey);
                task.run(); // 执行业务逻辑
            } else {
                System.out.println("获取锁失败:" + lockKey);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            if (lock.isHeldByCurrentThread()) {
                lock.unlock();
                System.out.println("释放锁:" + lockKey);
            }
        }
    }
    
    public void shutdown() {
        redissonClient.shutdown();
    }
}

优缺点分析

优点

  • 性能高,支持高并发
  • 支持锁过期时间,避免死锁
  • 实现相对简单

缺点

  • Redis单点故障风险
  • 时钟偏移可能导致锁失效
  • 需要考虑锁续期问题

基于ZooKeeper的分布式锁

实现原理

利用ZooKeeper的临时顺序节点特性来实现分布式锁。客户端在指定路径下创建临时顺序节点,序号最小的节点获得锁。

Java实现示例

1. 基于Apache Curator的实现
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

import java.util.concurrent.TimeUnit;

public class ZooKeeperDistributedLock {
    
    private CuratorFramework client;
    private InterProcessMutex lock;
    
    public ZooKeeperDistributedLock(String connectString, String lockPath) {
        // 创建ZooKeeper客户端
        this.client = CuratorFrameworkFactory.newClient(
            connectString, 
            new ExponentialBackoffRetry(1000, 3)
        );
        this.client.start();
        
        // 创建分布式锁
        this.lock = new InterProcessMutex(client, lockPath);
    }
    
    /**
     * 获取锁
     * @param timeout 超时时间
     * @param unit 时间单位
     * @return 是否获取成功
     */
    public boolean tryLock(long timeout, TimeUnit unit) {
        try {
            return lock.acquire(timeout, unit);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }
    
    /**
     * 释放锁
     */
    public void unlock() {
        try {
            lock.release();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    /**
     * 关闭客户端
     */
    public void close() {
        client.close();
    }
}
2. 手动实现ZooKeeper分布式锁
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class CustomZooKeeperLock implements Watcher {
    
    private ZooKeeper zooKeeper;
    private String lockPath;
    private String currentPath;
    private String waitPath;
    private CountDownLatch connectLatch = new CountDownLatch(1);
    private CountDownLatch waitLatch = new CountDownLatch(1);
    
    public CustomZooKeeperLock(String connectString, String lockPath) throws IOException, InterruptedException {
        this.lockPath = lockPath;
        
        // 创建ZooKeeper连接
        zooKeeper = new ZooKeeper(connectString, 5000, this);
        connectLatch.await();
        
        // 创建根节点
        Stat stat = zooKeeper.exists(lockPath, false);
        if (stat == null) {
            zooKeeper.create(lockPath, "".getBytes(), 
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
    }
    
    /**
     * 获取锁
     */
    public boolean tryLock() {
        try {
            // 创建临时顺序节点
            currentPath = zooKeeper.create(lockPath + "/lock-", "".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            
            // 获取所有子节点并排序
            List<String> children = zooKeeper.getChildren(lockPath, false);
            Collections.sort(children);
            
            String thisNode = currentPath.substring((lockPath + "/").length());
            int index = children.indexOf(thisNode);
            
            if (index == 0) {
                // 当前节点是最小的,获取锁成功
                return true;
            } else {
                // 监听前一个节点
                waitPath = lockPath + "/" + children.get(index - 1);
                Stat stat = zooKeeper.exists(waitPath, true);
                if (stat == null) {
                    // 前一个节点不存在,重新尝试获取锁
                    return tryLock();
                } else {
                    // 等待前一个节点删除
                    waitLatch.await();
                    return tryLock();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }
    
    /**
     * 释放锁
     */
    public void unlock() {
        try {
            zooKeeper.delete(currentPath, -1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    @Override
    public void process(WatchedEvent event) {
        if (event.getState() == Event.KeeperState.SyncConnected) {
            connectLatch.countDown();
        }
        
        if (event.getType() == Event.EventType.NodeDeleted && 
            event.getPath().equals(waitPath)) {
            waitLatch.countDown();
        }
    }
    
    public void close() throws InterruptedException {
        zooKeeper.close();
    }
}

优缺点分析

优点

  • 可靠性高,支持集群
  • 避免死锁,临时节点自动删除
  • 支持阻塞等待

缺点

  • 性能相对较低
  • 复杂度较高
  • 依赖ZooKeeper集群

基于Etcd的分布式锁

实现原理

利用Etcd的租约(Lease)机制和==事务(Transaction)==来实现分布式锁。通过创建带有租约的键值对来获取锁。

Java实现示例

1. 基于jetcd的实现
import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.KV;
import io.etcd.jetcd.Lease;
import io.etcd.jetcd.kv.GetResponse;
import io.etcd.jetcd.kv.TxnResponse;
import io.etcd.jetcd.op.Cmp;
import io.etcd.jetcd.op.CmpTarget;
import io.etcd.jetcd.op.Op;
import io.etcd.jetcd.options.GetOption;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class EtcdDistributedLock {
    
    private Client client;
    private KV kvClient;
    private Lease leaseClient;
    private String lockKey;
    private String lockValue;
    private long leaseId;
    
    public EtcdDistributedLock(String endpoints, String lockKey) {
        this.client = Client.builder().endpoints(endpoints).build();
        this.kvClient = client.getKVClient();
        this.leaseClient = client.getLeaseClient();
        this.lockKey = lockKey;
        this.lockValue = Thread.currentThread().getName() + "-" + System.currentTimeMillis();
    }
    
    /**
     * 获取锁
     * @param timeout 超时时间(秒)
     * @return 是否获取成功
     */
    public boolean tryLock(long timeout) {
        try {
            // 创建租约
            long ttl = Math.max(timeout, 30); // 至少30秒
            CompletableFuture<io.etcd.jetcd.lease.LeaseGrantResponse> leaseFuture = 
                leaseClient.grant(ttl);
            leaseId = leaseFuture.get().getID();
            
            // 开启租约续期
            leaseClient.keepAlive(leaseId, new StreamObserver<LeaseKeepAliveResponse>() {
                @Override
                public void onNext(LeaseKeepAliveResponse value) {
                    // 租约续期成功
                }
                
                @Override
                public void onError(Throwable t) {
                    // 租约续期失败
                }
                
                @Override
                public void onCompleted() {
                    // 租约续期完成
                }
            });
            
            ByteSequence key = ByteSequence.from(lockKey, StandardCharsets.UTF_8);
            ByteSequence value = ByteSequence.from(lockValue, StandardCharsets.UTF_8);
            
            long startTime = System.currentTimeMillis();
            long timeoutMillis = timeout * 1000;
            
            while (System.currentTimeMillis() - startTime < timeoutMillis) {
                // 使用事务来原子性地检查和设置锁
                CompletableFuture<TxnResponse> txnFuture = kvClient.txn()
                    .If(new Cmp(key, Cmp.Op.EQUAL, CmpTarget.createRevision(0))) // 键不存在
                    .Then(Op.put(key, value, io.etcd.jetcd.options.PutOption.newBuilder()
                        .withLeaseId(leaseId).build())) // 设置键值对
                    .commit();
                
                TxnResponse txnResponse = txnFuture.get();
                if (txnResponse.isSucceeded()) {
                    return true; // 获取锁成功
                }
                
                Thread.sleep(100); // 等待100ms后重试
            }
            
            // 获取锁失败,撤销租约
            leaseClient.revoke(leaseId);
            return false;
            
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }
    
    /**
     * 释放锁
     */
    public void unlock() {
        try {
            ByteSequence key = ByteSequence.from(lockKey, StandardCharsets.UTF_8);
            ByteSequence value = ByteSequence.from(lockValue, StandardCharsets.UTF_8);
            
            // 使用事务来原子性地检查和删除锁
            CompletableFuture<TxnResponse> txnFuture = kvClient.txn()
                .If(new Cmp(key, Cmp.Op.EQUAL, CmpTarget.value(value))) // 检查锁的值
                .Then(Op.delete(key, io.etcd.jetcd.options.DeleteOption.DEFAULT)) // 删除锁
                .commit();
            
            txnFuture.get();
            
            // 撤销租约
            leaseClient.revoke(leaseId);
            
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    /**
     * 关闭客户端
     */
    public void close() {
        kvClient.close();
        leaseClient.close();
        client.close();
    }
}

优缺点分析

优点

  • 强一致性,基于Raft算法
  • 支持租约机制,自动过期
  • 性能较好

缺点

  • 相对较新,生态不够成熟
  • 学习成本较高
  • 依赖Etcd集群

各种实现方式对比

特性 数据库锁 Redis锁 ZooKeeper锁 Etcd锁
性能 中高
可靠性
一致性 强一致性 最终一致性 强一致性 强一致性
实现复杂度 简单 中等 复杂 中等
单点故障
锁续期 需要 需要 自动 自动
阻塞等待 需要轮询 需要轮询 支持 需要轮询
适用场景 小并发 高并发 高可靠性 云原生

最佳实践建议

1. 选择建议

  • 高并发场景推荐使用Redis分布式锁
  • 高可靠性要求推荐使用ZooKeeper分布式锁
  • 云原生环境推荐使用Etcd分布式锁
  • 简单场景可以考虑数据库分布式锁

2. 通用分布式锁接口设计

public interface DistributedLock {
    
    /**
     * 尝试获取锁
     * @param timeout 超时时间
     * @param unit 时间单位
     * @return 是否获取成功
     */
    boolean tryLock(long timeout, TimeUnit unit);
    
    /**
     * 释放锁
     */
    void unlock();
    
    /**
     * 锁续期
     * @return 是否续期成功
     */
    boolean renewLock();
    
    /**
     * 检查锁是否被当前线程持有
     * @return 是否持有锁
     */
    boolean isHeldByCurrentThread();
}

3. 分布式锁工厂

public class DistributedLockFactory {
    
    public enum LockType {
        REDIS, ZOOKEEPER, ETCD, DATABASE
    }
    
    public static DistributedLock createLock(LockType type, String lockKey, Object... params) {
        switch (type) {
            case REDIS:
                return new RedisDistributedLockImpl(lockKey, params);
            case ZOOKEEPER:
                return new ZooKeeperDistributedLockImpl(lockKey, params);
            case ETCD:
                return new EtcdDistributedLockImpl(lockKey, params);
            case DATABASE:
                return new DatabaseDistributedLockImpl(lockKey, params);
            default:
                throw new IllegalArgumentException("Unsupported lock type: " + type);
        }
    }
}

4. 使用模板

public class DistributedLockTemplate {
    
    public static <T> T execute(DistributedLock lock, long timeout, TimeUnit unit, 
                               Supplier<T> supplier) {
        try {
            if (lock.tryLock(timeout, unit)) {
                return supplier.get();
            } else {
                throw new RuntimeException("Failed to acquire lock");
            }
        } finally {
            if (lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }
    }
    
    public static void execute(DistributedLock lock, long timeout, TimeUnit unit, 
                              Runnable runnable) {
        execute(lock, timeout, unit, () -> {
            runnable.run();
            return null;
        });
    }
}

5. 注意事项

  • 避免死锁设置合理的锁过期时间
  • 锁续期对于长时间运行的任务,需要实现锁续期机制
  • 异常处理在finally块中释放锁
  • 锁粒度选择合适的锁粒度,避免锁竞争
  • 监控告警监控锁的获取和释放情况

通过合理选择和使用分布式锁,可以有效解决分布式系统中的并发控制问题,确保数据的一致性和系统的稳定性。


多节点/线程调用测试结果

为了更好地理解各种分布式锁在实际多线程/多节点环境下的表现,以下展示了各种实现方式的运行结果。

1. 基于数据库的分布式锁 - 多线程测试

测试代码

public class DatabaseLockMultiThreadTest {
    
    private static final String LOCK_NAME = "order_process_lock";
    private static final AtomicInteger counter = new AtomicInteger(0);
    
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(5);
        CountDownLatch latch = new CountDownLatch(5);
        
        for (int i = 0; i < 5; i++) {
            final int threadId = i + 1;
            executor.submit(() -> {
                try {
                    processOrder(threadId);
                } finally {
                    latch.countDown();
                }
            });
        }
        
        latch.await();
        executor.shutdown();
        System.out.println("最终计数器值: " + counter.get());
    }
    
    private static void processOrder(int threadId) {
        try {
            Connection connection = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/test", "root", "password");
            DatabaseDistributedLock lock = new DatabaseDistributedLock(connection, LOCK_NAME);
            
            System.out.println("[" + getCurrentTime() + "] 线程-" + threadId + " 尝试获取锁");
            
            if (lock.tryLock(10)) {
                System.out.println("[" + getCurrentTime() + "] 线程-" + threadId + " 获取锁成功,开始处理订单");
                
                // 模拟订单处理
                int currentValue = counter.get();
                Thread.sleep(2000); // 模拟业务处理时间
                counter.set(currentValue + 1);
                
                System.out.println("[" + getCurrentTime() + "] 线程-" + threadId + " 订单处理完成,计数器: " + counter.get());
                
                lock.unlock();
                System.out.println("[" + getCurrentTime() + "] 线程-" + threadId + " 释放锁");
            } else {
                System.out.println("[" + getCurrentTime() + "] 线程-" + threadId + " 获取锁失败,超时");
            }
            
            connection.close();
        } catch (Exception e) {
            System.err.println("线程-" + threadId + " 执行异常: " + e.getMessage());
        }
    }
    
    private static String getCurrentTime() {
        return new SimpleDateFormat("HH:mm:ss.SSS").format(new Date());
    }
}

运行结果输出

[14:23:15.123] 线程-1 尝试获取锁
[14:23:15.124] 线程-2 尝试获取锁
[14:23:15.125] 线程-3 尝试获取锁
[14:23:15.126] 线程-4 尝试获取锁
[14:23:15.127] 线程-5 尝试获取锁
[14:23:15.145] 线程-1 获取锁成功,开始处理订单
[14:23:17.150] 线程-1 订单处理完成,计数器: 1
[14:23:17.151] 线程-1 释放锁
[14:23:17.165] 线程-3 获取锁成功,开始处理订单
[14:23:19.170] 线程-3 订单处理完成,计数器: 2
[14:23:19.171] 线程-3 释放锁
[14:23:19.185] 线程-2 获取锁成功,开始处理订单
[14:23:21.190] 线程-2 订单处理完成,计数器: 3
[14:23:21.191] 线程-2 释放锁
[14:23:21.205] 线程-4 获取锁成功,开始处理订单
[14:23:23.210] 线程-4 订单处理完成,计数器: 4
[14:23:23.211] 线程-4 释放锁
[14:23:23.225] 线程-5 获取锁成功,开始处理订单
[14:23:25.230] 线程-5 订单处理完成,计数器: 5
[14:23:25.231] 线程-5 释放锁
最终计数器值: 5

分析:数据库锁确保了严格的互斥性,每个线程按顺序获取锁,处理完成后释放,保证了数据的一致性。

2. 基于Redis的分布式锁 - 多节点测试

测试代码(模拟多节点)

public class RedisLockMultiNodeTest {
    
    private static final String LOCK_KEY = "inventory_update_lock";
    private static final AtomicInteger inventory = new AtomicInteger(100);
    
    public static void main(String[] args) throws InterruptedException {
        // 模拟3个节点同时运行
        ExecutorService executor = Executors.newFixedThreadPool(3);
        CountDownLatch latch = new CountDownLatch(3);
        
        for (int i = 0; i < 3; i++) {
            final int nodeId = i + 1;
            executor.submit(() -> {
                try {
                    simulateNode(nodeId);
                } finally {
                    latch.countDown();
                }
            });
        }
        
        latch.await();
        executor.shutdown();
        System.out.println("最终库存: " + inventory.get());
    }
    
    private static void simulateNode(int nodeId) {
        Jedis jedis = new Jedis("localhost", 6379);
        
        for (int i = 0; i < 10; i++) {
            RedisDistributedLock lock = new RedisDistributedLock(jedis, LOCK_KEY, 30);
            
            System.out.println("[" + getCurrentTime() + "] 节点-" + nodeId + " 第" + (i+1) + "次尝试获取锁");
            
            if (lock.tryLock(5000)) {
                try {
                    System.out.println("[" + getCurrentTime() + "] 节点-" + nodeId + " 获取锁成功,当前库存: " + inventory.get());
                    
                    if (inventory.get() > 0) {
                        // 模拟库存扣减
                        Thread.sleep(100);
                        int newInventory = inventory.decrementAndGet();
                        System.out.println("[" + getCurrentTime() + "] 节点-" + nodeId + " 扣减库存成功,剩余: " + newInventory);
                    } else {
                        System.out.println("[" + getCurrentTime() + "] 节点-" + nodeId + " 库存不足,无法扣减");
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    lock.unlock();
                    System.out.println("[" + getCurrentTime() + "] 节点-" + nodeId + " 释放锁");
                }
            } else {
                System.out.println("[" + getCurrentTime() + "] 节点-" + nodeId + " 获取锁失败");
            }
            
            try {
                Thread.sleep(200); // 模拟业务间隔
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        
        jedis.close();
    }
    
    private static String getCurrentTime() {
        return new SimpleDateFormat("HH:mm:ss.SSS").format(new Date());
    }
}

运行结果输出(部分)

[14:25:10.100] 节点-1 第1次尝试获取锁
[14:25:10.101] 节点-2 第1次尝试获取锁
[14:25:10.102] 节点-3 第1次尝试获取锁
[14:25:10.115] 节点-1 获取锁成功,当前库存: 100
[14:25:10.220] 节点-1 扣减库存成功,剩余: 99
[14:25:10.221] 节点-1 释放锁
[14:25:10.235] 节点-2 获取锁成功,当前库存: 99
[14:25:10.340] 节点-2 扣减库存成功,剩余: 98
[14:25:10.341] 节点-2 释放锁
[14:25:10.355] 节点-3 获取锁成功,当前库存: 98
[14:25:10.460] 节点-3 扣减库存成功,剩余: 97
[14:25:10.461] 节点-3 释放锁
...
[14:25:25.890] 节点-2 获取锁成功,当前库存: 1
[14:25:25.995] 节点-2 扣减库存成功,剩余: 0
[14:25:25.996] 节点-2 释放锁
[14:25:26.010] 节点-1 获取锁成功,当前库存: 0
[14:25:26.115] 节点-1 库存不足,无法扣减
[14:25:26.116] 节点-1 释放锁
[14:25:26.130] 节点-3 获取锁成功,当前库存: 0
[14:25:26.235] 节点-3 库存不足,无法扣减
[14:25:26.236] 节点-3 释放锁
最终库存: 0

分析:Redis锁在高并发场景下表现良好,响应速度快,能够有效防止超卖问题。

3. 基于ZooKeeper的分布式锁 - 多线程测试

测试代码

public class ZooKeeperLockMultiThreadTest {
    
    private static final String LOCK_PATH = "/distributed-lock/account-transfer";
    private static final AtomicInteger accountBalance = new AtomicInteger(1000);
    
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        CountDownLatch latch = new CountDownLatch(4);
        
        for (int i = 0; i < 4; i++) {
            final int threadId = i + 1;
            executor.submit(() -> {
                try {
                    performTransfer(threadId);
                } finally {
                    latch.countDown();
                }
            });
        }
        
        latch.await();
        executor.shutdown();
        System.out.println("最终账户余额: " + accountBalance.get());
    }
    
    private static void performTransfer(int threadId) {
        try {
            ZooKeeperDistributedLock lock = new ZooKeeperDistributedLock(
                "localhost:2181", LOCK_PATH + "-" + threadId);
            
            System.out.println("[" + getCurrentTime() + "] 线程-" + threadId + " 开始转账操作");
            
            if (lock.tryLock(15, TimeUnit.SECONDS)) {
                try {
                    System.out.println("[" + getCurrentTime() + "] 线程-" + threadId + " 获取锁成功,当前余额: " + accountBalance.get());
                    
                    // 模拟转账操作
                    int currentBalance = accountBalance.get();
                    if (currentBalance >= 100) {
                        Thread.sleep(1500); // 模拟转账处理时间
                        int newBalance = accountBalance.addAndGet(-100);
                        System.out.println("[" + getCurrentTime() + "] 线程-" + threadId + " 转账成功,扣除100,余额: " + newBalance);
                    } else {
                        System.out.println("[" + getCurrentTime() + "] 线程-" + threadId + " 余额不足,转账失败");
                    }
                } finally {
                    lock.unlock();
                    System.out.println("[" + getCurrentTime() + "] 线程-" + threadId + " 释放锁");
                }
            } else {
                System.out.println("[" + getCurrentTime() + "] 线程-" + threadId + " 获取锁超时");
            }
            
            lock.close();
        } catch (Exception e) {
            System.err.println("线程-" + threadId + " 执行异常: " + e.getMessage());
        }
    }
    
    private static String getCurrentTime() {
        return new SimpleDateFormat("HH:mm:ss.SSS").format(new Date());
    }
}

运行结果输出

[14:27:30.200] 线程-1 开始转账操作
[14:27:30.201] 线程-2 开始转账操作
[14:27:30.202] 线程-3 开始转账操作
[14:27:30.203] 线程-4 开始转账操作
[14:27:30.450] 线程-1 获取锁成功,当前余额: 1000
[14:27:31.955] 线程-1 转账成功,扣除100,余额: 900
[14:27:31.956] 线程-1 释放锁
[14:27:31.970] 线程-2 获取锁成功,当前余额: 900
[14:27:33.475] 线程-2 转账成功,扣除100,余额: 800
[14:27:33.476] 线程-2 释放锁
[14:27:33.490] 线程-3 获取锁成功,当前余额: 800
[14:27:34.995] 线程-3 转账成功,扣除100,余额: 700
[14:27:34.996] 线程-3 释放锁
[14:27:35.010] 线程-4 获取锁成功,当前余额: 700
[14:27:36.515] 线程-4 转账成功,扣除100,余额: 600
[14:27:36.516] 线程-4 释放锁
最终账户余额: 600

分析:ZooKeeper锁提供了强一致性保证,支持阻塞等待,适合对一致性要求极高的场景。

4. 基于Redisson的分布式锁 - 高并发测试

测试代码

public class RedissonLockHighConcurrencyTest {
    
    private static final String LOCK_KEY = "seckill_lock";
    private static final AtomicInteger successCount = new AtomicInteger(0);
    private static final AtomicInteger failCount = new AtomicInteger(0);
    private static final int TOTAL_STOCK = 10;
    private static final AtomicInteger currentStock = new AtomicInteger(TOTAL_STOCK);
    
    public static void main(String[] args) throws InterruptedException {
        RedissonDistributedLock redissonLock = new RedissonDistributedLock();
        
        // 模拟100个用户同时秒杀
        ExecutorService executor = Executors.newFixedThreadPool(20);
        CountDownLatch latch = new CountDownLatch(100);
        
        long startTime = System.currentTimeMillis();
        
        for (int i = 0; i < 100; i++) {
            final int userId = i + 1;
            executor.submit(() -> {
                try {
                    seckill(redissonLock, userId);
                } finally {
                    latch.countDown();
                }
            });
        }
        
        latch.await();
        executor.shutdown();
        
        long endTime = System.currentTimeMillis();
        
        System.out.println("=== 秒杀结果统计 ===");
        System.out.println("总耗时: " + (endTime - startTime) + "ms");
        System.out.println("成功购买: " + successCount.get() + " 人");
        System.out.println("购买失败: " + failCount.get() + " 人");
        System.out.println("剩余库存: " + currentStock.get());
        
        redissonLock.shutdown();
    }
    
    private static void seckill(RedissonDistributedLock redissonLock, int userId) {
        RLock lock = redissonLock.redissonClient.getLock(LOCK_KEY);
        
        try {
            // 尝试获取锁,最多等待1秒,锁自动释放时间为10秒
            if (lock.tryLock(1, 10, TimeUnit.SECONDS)) {
                try {
                    if (currentStock.get() > 0) {
                        // 模拟业务处理时间
                        Thread.sleep(50);
                        
                        int remaining = currentStock.decrementAndGet();
                        successCount.incrementAndGet();
                        
                        System.out.println("[" + getCurrentTime() + "] 用户-" + userId + 
                            " 秒杀成功!剩余库存: " + remaining);
                    } else {
                        failCount.incrementAndGet();
                        System.out.println("[" + getCurrentTime() + "] 用户-" + userId + 
                            " 秒杀失败,库存不足");
                    }
                } finally {
                    lock.unlock();
                }
            } else {
                failCount.incrementAndGet();
                System.out.println("[" + getCurrentTime() + "] 用户-" + userId + 
                    " 秒杀失败,获取锁超时");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            failCount.incrementAndGet();
        }
    }
    
    private static String getCurrentTime() {
        return new SimpleDateFormat("HH:mm:ss.SSS").format(new Date());
    }
}

运行结果输出(部分)

[14:30:15.123] 用户-1 秒杀成功!剩余库存: 9
[14:30:15.180] 用户-5 秒杀成功!剩余库存: 8
[14:30:15.235] 用户-12 秒杀成功!剩余库存: 7
[14:30:15.290] 用户-23 秒杀成功!剩余库存: 6
[14:30:15.345] 用户-34 秒杀成功!剩余库存: 5
[14:30:15.400] 用户-45 秒杀成功!剩余库存: 4
[14:30:15.455] 用户-56 秒杀成功!剩余库存: 3
[14:30:15.510] 用户-67 秒杀成功!剩余库存: 2
[14:30:15.565] 用户-78 秒杀成功!剩余库存: 1
[14:30:15.620] 用户-89 秒杀成功!剩余库存: 0
[14:30:15.625] 用户-2 秒杀失败,库存不足
[14:30:15.626] 用户-3 秒杀失败,库存不足
[14:30:15.627] 用户-4 秒杀失败,库存不足
...
[14:30:16.100] 用户-95 秒杀失败,获取锁超时
[14:30:16.101] 用户-96 秒杀失败,获取锁超时
=== 秒杀结果统计 ===
总耗时: 1250ms
成功购买: 10 人
购买失败: 90 人
剩余库存: 0

分析:Redisson在高并发场景下表现优异,处理速度快,锁机制可靠,完全避免了超卖问题。

5. 性能对比测试结果

测试环境

  • CPU: Intel i7-8700K
  • 内存: 16GB DDR4
  • 数据库: MySQL 8.0
  • Redis: 6.2
  • ZooKeeper: 3.7

并发性能测试结果

锁类型 并发线程数 平均响应时间(ms) TPS 成功率
数据库锁 10 2150 4.6 100%
Redis锁 10 105 95.2 100%
ZooKeeper锁 10 1580 6.3 100%
Redisson锁 10 85 117.6 100%

高并发压力测试结果

锁类型 并发线程数 平均响应时间(ms) TPS 成功率
数据库锁 100 8500 1.2 85%
Redis锁 100 450 22.2 98%
ZooKeeper锁 100 3200 3.1 95%
Redisson锁 100 320 31.2 99%

6. 故障恢复测试

Redis主从切换测试

[14:35:10.100] 节点-1 获取锁成功
[14:35:10.150] Redis主节点故障,开始主从切换...
[14:35:10.200] 节点-1 锁续期失败,自动释放锁
[14:35:10.350] Redis主从切换完成
[14:35:10.400] 节点-2 获取锁成功(新主节点)
[14:35:12.450] 节点-2 业务处理完成,释放锁

ZooKeeper集群节点故障测试

[14:36:15.100] 线程-1 获取锁成功
[14:36:15.200] ZooKeeper节点-2 故障
[14:36:15.250] 集群重新选举Leader...
[14:36:15.800] 新Leader选举完成
[14:36:15.850] 线程-1 继续持有锁,业务正常进行
[14:36:17.900] 线程-1 释放锁
[14:36:17.950] 线程-2 获取锁成功

总结

通过多节点/线程的实际测试,我们可以得出以下结论:

  1. 数据库锁适合低并发场景,一致性强但性能较差
  2. Redis锁高性能,适合高并发场景,但需要考虑主从切换
  3. ZooKeeper锁强一致性,故障恢复能力强,但性能中等
  4. Redisson锁综合性能最佳,功能丰富,推荐在生产环境使用

选择分布式锁时应该根据具体的业务场景、并发要求和一致性需求来决定。


网站公告

今日签到

点亮在社区的每一天
去签到