文章目录
1. 为什么使用分布式锁
拿实际开发商品库存的例子来说,我们在开发中进行删减库存可能会写出这样的代码:
但是这样无疑会出现并发问题
如图所示,两个请求进行删减库存, 最后数据库存储的数据却只删减了一次
解决这个问题的直接方法就是加锁
但是加什么锁合适? 直接使用Java的synchronized
行不行? 就像这样
@Service
public class ProductService {
@Autowired
private StringRedisTemplate stringRedisTemplate;
public void deductStock(String productId) {
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock:" + productId));
synchronized (this) {
if (stock > 0) {
int realStock = stock - 1;
stringRedisTemplate.opsForValue().set("stock:" + productId, realStock + "");
System.out.println("扣除成功,剩余库存:" + realStock);
}else {
System.out.println("扣除失败,库存不足");
}
}
}
}
在单机环境下确实能解决多线程并发问题
但是现在的互联网架构大多都是集群架构,也就是不止一台机器,而synchronized
只在单个JVM进行内有效,多台服务器部署时无法跨进程同步.
同时,由于针对的是this
加锁,那么一旦别的线程删减库存的商品id与获取锁的线程不一致,也会进行阻塞,但是这是没必要的,我们只需要针对一个商品id的操作进行加锁即可
因此我们需要引入分布式锁,本质上就是使用一个公共服务器来记录加锁状态
这个公共的服务器可以是Redis,也可以是其他组件(如Mysql或者ZooKeeper等) 还可以是我们自己写的一个服务
2. 简单实现分布式锁
redis
中提供了setnx
命令, 当且进仅当key不存在的时候,将key的值设置为value, 如果key已经存在,不做任何操作
与我们锁实现的功能基本一致
因此我们可以使用这个命令来针对某个productId
进行加锁
public String deductStock(String productId) {
String lockKey = "lock:product:" + productId;
boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey,"lock");
if(!result) {
return "waiting";
}
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock:" + productId));
if (stock > 0) {
int realStock = stock - 1;
stringRedisTemplate.opsForValue().set("stock:" + productId, realStock + "");
System.out.println("扣除成功,剩余库存:" + realStock);
}else {
System.out.println("扣除失败,库存不足");
}
stringRedisTemplate.delete(lockKey);
return "success";
}
但是这样的做法会出现死锁
问题
最简单直接的现象就是,如果某个线程获取锁后,执行删减库存的过程中,出现了异常, 那么锁将无法释放
因此我们需要将释放锁的步骤放在finally
中,确保锁能释放
public String deductStock(String productId) {
String lockKey = "lock:product:" + productId;
boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey,"lock");
if(!result) {
return "waiting";
}
try {
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock:" + productId));
if (stock > 0) {
int realStock = stock - 1;
stringRedisTemplate.opsForValue().set("stock:" + productId, realStock + "");
System.out.println("扣除成功,剩余库存:" + realStock);
}else {
System.out.println("扣除失败,库存不足");
}
}finally {
stringRedisTemplate.delete(lockKey);
}
return "success";
}
但是,如果在执行删减的过程中,服务器突然宕机了呢?? finally
也不能确保执行到
因此为了保险,我们需要加上过期时间,并且需要通过原子命令进行,确保加锁的同时设置过期时间
3. 防止误删的分布式锁
我们上面实现的加上过期时间的分布式锁,实际上还会出现误删的问题,如下:
由于线程1设置的锁提前过期,导致线程2可以占据锁,但是线程1业务执行完后, 又将锁给删除了,但是实际上线程1删除的锁是线程2后来设置的锁, 导致线程2设置的锁直接失效了
此时别的线程又可以占据锁, 而线程2执行完业务逻辑后又将锁给删除了, 由此反复,可能导致锁一直不生效
解决这个方法的核心就是,确保当前线程删除的锁是自己设置的锁, 不能释放别的线程设置的锁
我们可以使用UUID作为唯一标识
能不能使用ThreadId作为唯一标识?? 当然不行,最简单的问题就是, 分布式环境下,不同机器的多个线程id之间可能是重复的
但是很容易发现,释放锁的步骤,不是原子的
这种不是原子的操作,在极端的场景下很可能出现问题
我们将删除锁的步骤单独拿出来说
但是在极端情况下:
也就是由于获取锁对应的value进行判断, 和删除锁, 由于不是原子操作,导致中间如果出现其他情况(如卡顿) , 而此时锁更好过期, 别的线程又刚好获取到锁, 此时线程1就把别的线程的锁释放掉了
究其原因就是 释放锁的步骤不是原子的
但是redis又没有原子的命令能够保证这两个步骤是原子的
实际上通过lua脚本就可以实现(文章后面会讲)
4. 自动锁续命的分布式锁
我们上面的误删问题,本质上还是因为在业务逻辑还没执行完之前, 锁就释放了
那如果我们可以让业务逻辑执行完之前,不断让锁进行续命, 也就是增加超时时间, 不让锁提前过期
那如果直接增加超时时间,如从10s提升到30s? 实际上这样不能从根本上解决问题,因为业务执行的时间是不确定的
因此我们可以在执行业务逻辑的同时,通过后台线程定时去判断锁是否还存在(这个定时时间一定要小于超时时间)
如果还存在,说明业务逻辑还没执行完, 那么就要进行续命
这就是看门狗机制(Watchdog)的思想
会不会出现业务机器宕机,导致一直续命?? 实际上续命的线程和业务机器是同一台,因此当业务机器宕机了, 看门狗自然也会消失
我们当然可以自己实现Watchdog解决方案,但是市面上已经存在很多成熟的开源方案
Redisson就是一个热门完善的实现方案,是一个基于Java的Redis客户端,专门用于分布式场景的Redis操作,如分布式对象,分布式集合,分布式锁等
引入依赖:
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.50.0</version>
</dependency>
现在我们通过Redisson来使用分布式锁
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Autowired
private RedissonClient redissonClient;
public String deductStock(String productId) {
String lockKey = "lock:product:" + productId;
RLock lock = redissonClient.getLock(lockKey);
lock.lock();
try {
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock:" + productId));
if (stock > 0) {
int realStock = stock - 1;
stringRedisTemplate.opsForValue().set("stock:" + productId, realStock + "");
System.out.println("扣除成功,剩余库存:" + realStock);
}else {
System.out.println("扣除失败,库存不足");
}
}finally {
lock.unlock();
}
return "success";
}
5. 深度理解Redisson底层逻辑
redisson一个简单的lock,帮我们实现了阻塞等待, 看门狗续命,防误删等操作
我们先来看他的大概逻辑
5.1. 大概逻辑
5.2. 获取锁的实现
接下来我们看源码,直接查看Redisson中lock的实现方法
可以看到,如果我们不传参数leaseTime
那么默认就是-1
第二个lock方法再次调用了重载的lock方法
最核心的方法就是这里的tryAcquire
方法
这里传进的waiting
是-1,threadId
是当前线程的id
接着一层层往下走,就会找到这个方法
由于我们的leaseTime是-1,那么我们执行else方法的逻辑
可以看到,与if分治不同的地方在于,leastTime被替换成了别的变量
一层一层往上找,就能发现这个变量的值默认为30秒
所以我们接下来的重心就在与tryLockInnerAsync
方法
这些命令实际上就是我们上面提了一嘴的lua脚本
Lua 的语法类似于 JS, 是一个动态弱类型的语言. Lua 的解释器一般使用 C 语言实现. Lua 语法
简单精炼, 执行速度快, 解释器也比较轻量(Lua 解释器的可执行程序体积只有 200KB 左右).
因此 Lua 经常作为其他程序内部嵌入的脚本语言. Redis 本身就支持 Lua 作为内嵌脚本.
最重要的是,一个 lua 脚本会被 Redis 服务器以原子的方式来执行.
我们先来看参数的对应关系
实际上我们逻辑就类似于
public Long execute(long expireTime,String key,String hKey) {
if (!redisTemplate.hasKey(hKey) || redisTemplate.opsForHash().hasKey(key, hKey)) {
redisTemplate.opsForHash().increment(key, hKey, 1);
redisTemplate.expire(key, expireTime, TimeUnit.MILLISECONDS);
return null;
}else {
return redisTemplate.getExpire(key);
}
}
但是通过Lua脚本实现就是原子的
这里的key就是源代码里的getRawName
方法的返回结果,实际上也就对应着我们一开始传进来的key,也就是我们的锁
而ARG[2],也就是我们自己实现的hKey,是getLockName
方法的返回结果,实际上就是一个随机值
我们先可以简单理解, 这个脚本就是尝试去加锁,只是加的锁是通过hash结构存储的(至于为什么后面会讲)
这个锁的超时时间是30s,如果我们自己指定那就是我们指定的值
这里要注意,如果我们获取锁成功,那么返回值就是null, 如果获取失败,那么返回值就是这个锁的剩余时间
5.3. 看门狗的实现
我们回到这个方法调用的地方
通过方法名也可以看出,这个方法实际上是异步执行的
当异步执行完后,会执行这一部分区域的内容
我们上面说过,如果获取锁成功, 返回值就是null
因此我们会进入第一个if分支,又由于leaseTime
是-1, 因此会执行scheduleExpirationRenewal
方法
从方法名可以猜出, 这个方法就是重置超时时间, 实际上也就对应着看门狗的逻辑!
这里可以看出,看门狗的定时任务逻辑就封装在LockTask中
并且这里的add相当于开启了一个定时任务
我们来看看这个schedule
方法
此时会通过<font style="color:rgba(0, 0, 0, 0.85);">schedule</font>
方法设置第一个定时任务,间隔为<font style="color:rgba(0, 0, 0, 0.85);">internalLockLeaseTime / 3</font>
,如果是默认值,也就是10s
定时任务的<font style="color:rgba(0, 0, 0, 0.85);">run</font>
方法(<font style="color:rgba(0, 0, 0, 0.85);">TimerTask</font>
接口的核心方法)会在定时时间到达后被执行,执行完成后会再次调用<font style="color:rgba(0, 0, 0, 0.85);">schedule</font>
方法,形成循环:
这是定时任务的执行,那么定式任务具体做了什么呢??? 我们要回到添加定时任务前,也就是下面这个方法
定时任务的逻辑就封装在这里面
也就是这个方法
我们来看看下面的Lua脚本
简单说,这个循环的目的是遍历所有传入脚本的键(KEYS 数组中的每个键),对每个键执行后续的<font style="color:rgba(0, 0, 0, 0.85);">hexists</font>
和<font style="color:rgba(0, 0, 0, 0.85);">pexpire</font>
操作。也就是实现了批量key的续命
到此,Watchdog的逻辑我们大概就能理解了
5.4. 等待锁的实现
回到一开始的地方,如果我们的线程没有抢占到锁呢??
我们上面讲过会得到这会锁剩余的超时时间,那么接下来会执行什么逻辑??
上面我们讲过,如果ttl是null,那么就说明获取锁成功
因此如果我们获取锁失败,执行的是下面的逻辑
我们来看最主要的while循环
这里实际上就得到了一个信号量
Semaphore维护了一个计数器,线程可以通过调用acquire()方法来获取Semaphore中的许可证,当计数器为0时,调用acquire()的线程将被阻塞,直到有其他线程释放许可证;线程可以通过调用release()方法来释放Semaphore中的许可证,这会使Semaphore中的计数器增加,从而允许更多的线程访问共享资源。
简单来说,如果线程没有加锁成功, 那么就会通过Semaphore进行阻塞ttl的时间,这段时间内是不会阻塞CPU的,
等到ttl到了以后,再通过while循环再次尝试获取锁,由此不断的间歇性的抢占锁
那么可能到这里就会有疑问,如果锁提前释放了呢? 也是会傻等ttl的时间吗??? 那性能也太差了. 实际上不是这样的
实际上,在并发编程里,如果这里阻塞住,一定会有其他地方会在一定条件下唤醒这个线程
实际上就在我们上面的逻辑中
实际上这里就是使用了redis的发布订阅的功能
简单来说,这些没抢到锁的线程, 会在这里订阅了一个频道,相当于进行了监听
这个类实际上就是对发布订阅的封装
那这个频道什么时候会通知这个线程呢?? 可想而知肯定是在解锁的逻辑中
我们通过伪代码来解释
/**
* 分布式锁解锁核心逻辑(伪代码)
* @param threadId 当前线程ID
* @param requestId 本次请求标识(用于生成解锁信号键)
* @param timeout 超时时间(毫秒)
* @return 异步结果:1=完全解锁,0=部分解锁,null=解锁失败
*/
protected CompletableFuture<Boolean> unlockInnerAsync(long threadId, String requestId, int timeout) {
// 1. 定义关键参数
String lockKey = getRawName(); // 锁的主键(哈希表)
String channelKey = getChannelName(); // 通知频道(用于发布解锁消息)
String unlockLatchKey = getUnlockLatchName(requestId); // 解锁信号键(用于同步)
String threadLockId = getLockName(threadId); // 当前线程的锁标识(如"threadId:uuid")
long leaseTime = internalLockLeaseTime; // 锁的默认租期
String publishCmd = getSubscribeService().getPublishCommand(); // 发布命令("PUBLISH")
String unlockMsg = LockPubSub.UNLOCK_MESSAGE; // 解锁消息内容
// 2. 执行核心逻辑
return CompletableFuture.supplyAsync(() -> {
// 2.1 检查解锁信号状态(对应Lua中"get KEYS[3]")
String latchVal = redis.get(unlockLatchKey);
if (latchVal != null) {
return Long.parseLong(latchVal) > 0; // 若有信号,返回信号值
}
// 2.2 验证是否是当前线程持有锁(防止误删)
boolean hasLock = redis.hexists(lockKey, threadLockId);
if (!hasLock) {
return null; // 未持有锁,解锁失败
}
// 2.3 重入计数器减1(对应Lua中"hincrby -1")
long counter = redis.hincrBy(lockKey, threadLockId, -1);
if (counter > 0) {
// 2.4 部分释放(仍有重入):更新过期时间,标记部分释放
redis.pexpire(lockKey, leaseTime);
redis.set(unlockLatchKey, "0", "PX", timeout); // 标记"部分释放"
return false; // 对应原逻辑返回0
} else {
// 2.5 完全释放:删除锁,发布解锁消息
redis.del(lockKey); // 删除锁
redis.executeCommand(publishCmd, channelKey, unlockMsg); // 通知等待线程
redis.set(unlockLatchKey, "1", "PX", timeout); // 标记"完全释放"
return true; // 对应原逻辑返回1
}
});
}
这里可以看到, 如果hash中对应的counter > 0, 那么就是-1, 实际上就是实现了可重入锁
因此可以回到我们一开始加锁的lua脚本上,如果当前hash的hKey存在,那么就在原来的基础上加1
解锁的时候也是, 只是进行-1, 直到为0 ,才代表完全释放锁
这里同时也解决了我们上面留下的一个问题, 就是我们自己写的代码,在释放锁的时候不是原子的
那么将释放锁的逻辑放到lua脚本之后, 就能作为一个原子操作,自然不会出现上面的问题!
因此,当锁释放后,会释放出一个信号, 通知等待的线程, 当前锁释放了,可以去尝试获取锁了
那么接收到消息的线程会执行什么逻辑呢?
我们来看看上面发布订阅类的onMessage方法
至此,形成了一个完美的闭环!!