从Redisson源码角度深入理解Redis分布式锁的正确实现

发布于:2025-08-11 ⋅ 阅读:(18) ⋅ 点赞:(0)


1. 为什么使用分布式锁

拿实际开发商品库存的例子来说,我们在开发中进行删减库存可能会写出这样的代码:

但是这样无疑会出现并发问题

如图所示,两个请求进行删减库存, 最后数据库存储的数据却只删减了一次

解决这个问题的直接方法就是加锁

但是加什么锁合适? 直接使用Java的synchronized行不行? 就像这样

@Service
public class ProductService {

    @Autowired
    private StringRedisTemplate stringRedisTemplate;


    public void deductStock(String productId) {
        int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock:" + productId));
        synchronized (this) {
            if (stock > 0) {
                int realStock = stock - 1;
                stringRedisTemplate.opsForValue().set("stock:" + productId, realStock + "");
                System.out.println("扣除成功,剩余库存:" + realStock);
            }else {
                System.out.println("扣除失败,库存不足");
            }
        }
    }
}

在单机环境下确实能解决多线程并发问题

但是现在的互联网架构大多都是集群架构,也就是不止一台机器,而synchronized只在单个JVM进行内有效,多台服务器部署时无法跨进程同步.

同时,由于针对的是this加锁,那么一旦别的线程删减库存的商品id与获取锁的线程不一致,也会进行阻塞,但是这是没必要的,我们只需要针对一个商品id的操作进行加锁即可

因此我们需要引入分布式锁,本质上就是使用一个公共服务器来记录加锁状态

这个公共的服务器可以是Redis,也可以是其他组件(如Mysql或者ZooKeeper等) 还可以是我们自己写的一个服务

2. 简单实现分布式锁

redis中提供了setnx命令, 当且进仅当key不存在的时候,将key的值设置为value, 如果key已经存在,不做任何操作

与我们锁实现的功能基本一致

因此我们可以使用这个命令来针对某个productId进行加锁

    public String deductStock(String productId) {
        String lockKey = "lock:product:" + productId;
        boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey,"lock");
        if(!result) {
            return "waiting";
        }
        int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock:" + productId));
        if (stock > 0) {
            int realStock = stock - 1;
            stringRedisTemplate.opsForValue().set("stock:" + productId, realStock + "");
            System.out.println("扣除成功,剩余库存:" + realStock);
        }else {
            System.out.println("扣除失败,库存不足");
        }
        stringRedisTemplate.delete(lockKey);
        return "success";
    }

但是这样的做法会出现死锁问题

最简单直接的现象就是,如果某个线程获取锁后,执行删减库存的过程中,出现了异常, 那么锁将无法释放

因此我们需要将释放锁的步骤放在finally中,确保锁能释放

  public String deductStock(String productId) {
        String lockKey = "lock:product:" + productId;
        boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey,"lock");
        if(!result) {
            return "waiting";
        }
        try {
            int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock:" + productId));
            if (stock > 0) {
                int realStock = stock - 1;
                stringRedisTemplate.opsForValue().set("stock:" + productId, realStock + "");
                System.out.println("扣除成功,剩余库存:" + realStock);
            }else {
                System.out.println("扣除失败,库存不足");
            }
        }finally {
            stringRedisTemplate.delete(lockKey);
        }
        return "success";
    }

但是,如果在执行删减的过程中,服务器突然宕机了呢?? finally也不能确保执行到

因此为了保险,我们需要加上过期时间,并且需要通过原子命令进行,确保加锁的同时设置过期时间

3. 防止误删的分布式锁

我们上面实现的加上过期时间的分布式锁,实际上还会出现误删的问题,如下:

由于线程1设置的锁提前过期,导致线程2可以占据锁,但是线程1业务执行完后, 又将锁给删除了,但是实际上线程1删除的锁是线程2后来设置的锁, 导致线程2设置的锁直接失效了

此时别的线程又可以占据锁, 而线程2执行完业务逻辑后又将锁给删除了, 由此反复,可能导致锁一直不生效

解决这个方法的核心就是,确保当前线程删除的锁是自己设置的锁, 不能释放别的线程设置的锁

我们可以使用UUID作为唯一标识

能不能使用ThreadId作为唯一标识?? 当然不行,最简单的问题就是, 分布式环境下,不同机器的多个线程id之间可能是重复的

但是很容易发现,释放锁的步骤,不是原子的

这种不是原子的操作,在极端的场景下很可能出现问题

我们将删除锁的步骤单独拿出来说

但是在极端情况下:

也就是由于获取锁对应的value进行判断, 和删除锁, 由于不是原子操作,导致中间如果出现其他情况(如卡顿) , 而此时锁更好过期, 别的线程又刚好获取到锁, 此时线程1就把别的线程的锁释放掉了

究其原因就是 释放锁的步骤不是原子的

但是redis又没有原子的命令能够保证这两个步骤是原子的

实际上通过lua脚本就可以实现(文章后面会讲)

4. 自动锁续命的分布式锁

我们上面的误删问题,本质上还是因为在业务逻辑还没执行完之前, 锁就释放了

那如果我们可以让业务逻辑执行完之前,不断让锁进行续命, 也就是增加超时时间, 不让锁提前过期

那如果直接增加超时时间,如从10s提升到30s? 实际上这样不能从根本上解决问题,因为业务执行的时间是不确定的

因此我们可以在执行业务逻辑的同时,通过后台线程定时去判断锁是否还存在(这个定时时间一定要小于超时时间)

如果还存在,说明业务逻辑还没执行完, 那么就要进行续命

这就是看门狗机制(Watchdog)的思想

会不会出现业务机器宕机,导致一直续命?? 实际上续命的线程和业务机器是同一台,因此当业务机器宕机了, 看门狗自然也会消失

我们当然可以自己实现Watchdog解决方案,但是市面上已经存在很多成熟的开源方案

Redisson就是一个热门完善的实现方案,是一个基于Java的Redis客户端,专门用于分布式场景的Redis操作,如分布式对象,分布式集合,分布式锁等

引入依赖:

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>3.50.0</version>
</dependency>

现在我们通过Redisson来使用分布式锁


    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    @Autowired
    private RedissonClient  redissonClient;

    public String deductStock(String productId) {
        String lockKey = "lock:product:" + productId;
        RLock lock = redissonClient.getLock(lockKey);
        lock.lock();
        try {
            int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock:" + productId));
            if (stock > 0) {
                int realStock = stock - 1;
                stringRedisTemplate.opsForValue().set("stock:" + productId, realStock + "");
                System.out.println("扣除成功,剩余库存:" + realStock);
            }else {
                System.out.println("扣除失败,库存不足");
            }
        }finally {
            lock.unlock();
        }
        return "success";
    }

5. 深度理解Redisson底层逻辑

redisson一个简单的lock,帮我们实现了阻塞等待, 看门狗续命,防误删等操作

我们先来看他的大概逻辑

5.1. 大概逻辑

5.2. 获取锁的实现

接下来我们看源码,直接查看Redisson中lock的实现方法

可以看到,如果我们不传参数leaseTime那么默认就是-1

第二个lock方法再次调用了重载的lock方法

最核心的方法就是这里的tryAcquire方法

这里传进的waiting是-1,threadId是当前线程的id

接着一层层往下走,就会找到这个方法

由于我们的leaseTime是-1,那么我们执行else方法的逻辑

可以看到,与if分治不同的地方在于,leastTime被替换成了别的变量

一层一层往上找,就能发现这个变量的值默认为30秒

所以我们接下来的重心就在与tryLockInnerAsync方法

这些命令实际上就是我们上面提了一嘴的lua脚本

Lua 的语法类似于 JS, 是一个动态弱类型的语言. Lua 的解释器一般使用 C 语言实现. Lua 语法

简单精炼, 执行速度快, 解释器也比较轻量(Lua 解释器的可执行程序体积只有 200KB 左右).

因此 Lua 经常作为其他程序内部嵌入的脚本语言. Redis 本身就支持 Lua 作为内嵌脚本.

最重要的是,一个 lua 脚本会被 Redis 服务器以原子的方式来执行.

我们先来看参数的对应关系

实际上我们逻辑就类似于

public Long execute(long expireTime,String key,String hKey) {
    if (!redisTemplate.hasKey(hKey) || redisTemplate.opsForHash().hasKey(key, hKey)) {
        redisTemplate.opsForHash().increment(key, hKey, 1);
        redisTemplate.expire(key, expireTime, TimeUnit.MILLISECONDS);
        return null;
    }else {
        return redisTemplate.getExpire(key);
    }
}

但是通过Lua脚本实现就是原子的

这里的key就是源代码里的getRawName方法的返回结果,实际上也就对应着我们一开始传进来的key,也就是我们的锁

而ARG[2],也就是我们自己实现的hKey,是getLockName方法的返回结果,实际上就是一个随机值

我们先可以简单理解, 这个脚本就是尝试去加锁,只是加的锁是通过hash结构存储的(至于为什么后面会讲)

这个锁的超时时间是30s,如果我们自己指定那就是我们指定的值

这里要注意,如果我们获取锁成功,那么返回值就是null, 如果获取失败,那么返回值就是这个锁的剩余时间

5.3. 看门狗的实现

我们回到这个方法调用的地方

通过方法名也可以看出,这个方法实际上是异步执行的

当异步执行完后,会执行这一部分区域的内容

我们上面说过,如果获取锁成功, 返回值就是null

因此我们会进入第一个if分支,又由于leaseTime是-1, 因此会执行scheduleExpirationRenewal方法

从方法名可以猜出, 这个方法就是重置超时时间, 实际上也就对应着看门狗的逻辑!

这里可以看出,看门狗的定时任务逻辑就封装在LockTask中

并且这里的add相当于开启了一个定时任务

我们来看看这个schedule方法

此时会通过<font style="color:rgba(0, 0, 0, 0.85);">schedule</font>方法设置第一个定时任务,间隔为<font style="color:rgba(0, 0, 0, 0.85);">internalLockLeaseTime / 3</font>,如果是默认值,也就是10s

定时任务的<font style="color:rgba(0, 0, 0, 0.85);">run</font>方法(<font style="color:rgba(0, 0, 0, 0.85);">TimerTask</font>接口的核心方法)会在定时时间到达后被执行,执行完成后会再次调用<font style="color:rgba(0, 0, 0, 0.85);">schedule</font>方法,形成循环:

这是定时任务的执行,那么定式任务具体做了什么呢??? 我们要回到添加定时任务前,也就是下面这个方法

定时任务的逻辑就封装在这里面

也就是这个方法

我们来看看下面的Lua脚本

简单说,这个循环的目的是遍历所有传入脚本的键(KEYS 数组中的每个键),对每个键执行后续的<font style="color:rgba(0, 0, 0, 0.85);">hexists</font><font style="color:rgba(0, 0, 0, 0.85);">pexpire</font>操作。也就是实现了批量key的续命

到此,Watchdog的逻辑我们大概就能理解了

5.4. 等待锁的实现

回到一开始的地方,如果我们的线程没有抢占到锁呢??

我们上面讲过会得到这会锁剩余的超时时间,那么接下来会执行什么逻辑??

上面我们讲过,如果ttl是null,那么就说明获取锁成功

因此如果我们获取锁失败,执行的是下面的逻辑

我们来看最主要的while循环

这里实际上就得到了一个信号量

Semaphore维护了一个计数器,线程可以通过调用acquire()方法来获取Semaphore中的许可证,当计数器为0时,调用acquire()的线程将被阻塞,直到有其他线程释放许可证;线程可以通过调用release()方法来释放Semaphore中的许可证,这会使Semaphore中的计数器增加,从而允许更多的线程访问共享资源。

简单来说,如果线程没有加锁成功, 那么就会通过Semaphore进行阻塞ttl的时间,这段时间内是不会阻塞CPU的,

等到ttl到了以后,再通过while循环再次尝试获取锁,由此不断的间歇性的抢占锁

那么可能到这里就会有疑问,如果锁提前释放了呢? 也是会傻等ttl的时间吗??? 那性能也太差了. 实际上不是这样的

实际上,在并发编程里,如果这里阻塞住,一定会有其他地方会在一定条件下唤醒这个线程

实际上就在我们上面的逻辑中

实际上这里就是使用了redis的发布订阅的功能

简单来说,这些没抢到锁的线程, 会在这里订阅了一个频道,相当于进行了监听

这个类实际上就是对发布订阅的封装

那这个频道什么时候会通知这个线程呢?? 可想而知肯定是在解锁的逻辑中

我们通过伪代码来解释

/**
 * 分布式锁解锁核心逻辑(伪代码)
 * @param threadId 当前线程ID
 * @param requestId 本次请求标识(用于生成解锁信号键)
 * @param timeout 超时时间(毫秒)
 * @return 异步结果:1=完全解锁,0=部分解锁,null=解锁失败
 */
protected CompletableFuture<Boolean> unlockInnerAsync(long threadId, String requestId, int timeout) {
    // 1. 定义关键参数
    String lockKey = getRawName(); // 锁的主键(哈希表)
    String channelKey = getChannelName(); // 通知频道(用于发布解锁消息)
    String unlockLatchKey = getUnlockLatchName(requestId); // 解锁信号键(用于同步)
    String threadLockId = getLockName(threadId); // 当前线程的锁标识(如"threadId:uuid")
    long leaseTime = internalLockLeaseTime; // 锁的默认租期
    String publishCmd = getSubscribeService().getPublishCommand(); // 发布命令("PUBLISH")
    String unlockMsg = LockPubSub.UNLOCK_MESSAGE; // 解锁消息内容

    // 2. 执行核心逻辑
    return CompletableFuture.supplyAsync(() -> {
        // 2.1 检查解锁信号状态(对应Lua中"get KEYS[3]")
        String latchVal = redis.get(unlockLatchKey);
        if (latchVal != null) {
            return Long.parseLong(latchVal) > 0; // 若有信号,返回信号值
        }

        // 2.2 验证是否是当前线程持有锁(防止误删)
        boolean hasLock = redis.hexists(lockKey, threadLockId);
        if (!hasLock) {
            return null; // 未持有锁,解锁失败
        }

        // 2.3 重入计数器减1(对应Lua中"hincrby -1")
        long counter = redis.hincrBy(lockKey, threadLockId, -1);

        if (counter > 0) {
            // 2.4 部分释放(仍有重入):更新过期时间,标记部分释放
            redis.pexpire(lockKey, leaseTime);
            redis.set(unlockLatchKey, "0", "PX", timeout); // 标记"部分释放"
            return false; // 对应原逻辑返回0
        } else {
            // 2.5 完全释放:删除锁,发布解锁消息
            redis.del(lockKey); // 删除锁
            redis.executeCommand(publishCmd, channelKey, unlockMsg); // 通知等待线程
            redis.set(unlockLatchKey, "1", "PX", timeout); // 标记"完全释放"
            return true; // 对应原逻辑返回1
        }
    });
}

这里可以看到, 如果hash中对应的counter > 0, 那么就是-1, 实际上就是实现了可重入锁

因此可以回到我们一开始加锁的lua脚本上,如果当前hash的hKey存在,那么就在原来的基础上加1

解锁的时候也是, 只是进行-1, 直到为0 ,才代表完全释放锁

这里同时也解决了我们上面留下的一个问题, 就是我们自己写的代码,在释放锁的时候不是原子的

那么将释放锁的逻辑放到lua脚本之后, 就能作为一个原子操作,自然不会出现上面的问题!

因此,当锁释放后,会释放出一个信号, 通知等待的线程, 当前锁释放了,可以去尝试获取锁了

那么接收到消息的线程会执行什么逻辑呢?

我们来看看上面发布订阅类的onMessage方法

至此,形成了一个完美的闭环!!


网站公告

今日签到

点亮在社区的每一天
去签到