1 “超卖”问题
1.1 问题引出
来看下面这段代码:
- 检查库存 -> 扣减库存 -> 写入库存;
stock
是 Redis 中的一个 Key;
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // jedis.get("stock") if (stock > 0) { int realStock = stock - 1; stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value) System.out.println("扣减成功,剩余库存:" + realStock); } else { System.out.println("扣减失败,库存不足"); }
在高并发环境下,多个请求(可能来自不同的服务器或同一服务器的不同线程)会交叉执行这三个步骤,导致数据不一致;
假设初始库存
stock = 1
。现在有两个用户(User A 和 User B)几乎同时发起购买请求:时间序列 Tomcat 服务器 1 (处理 User A 的请求) Tomcat 服务器 2 (处理 User B 的请求) Redis 中的 Stock 值 导致的结果 T1 读取库存: int stock = get("stock");
// 读到 11
T2 读取库存: int stock = get("stock");
// 也读到 11
致命问题: 两个请求都认为库存充足,都准备进行扣减。 T3 计算新库存: realStock = 1 - 1 = 0
1
T4 计算新库存: realStock = 1 - 1 = 0
1
T5 写入库存: set("stock", "0")
0
User A 的请求成功扣减,库存变为 0。 T6 写入库存: set("stock", "0")
0
超卖发生! User B 的请求覆盖了 User A 的写入操作。库存最终为 0,但实际发生了两次扣减,商品多卖了一份。 T7 系统输出: 扣减成功,剩余库存:0
0
T8 系统输出: 扣减成功,剩余库存:0
0
两个用户都收到了“成功”的提示,但库存只够一个人买。
1.2 单机环境下解决
如果是在单机环境下,通过
synchronized
关键字构建一个代码块就可以解决:synchronized (this) { int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // jedis.get("stock") if (stock > 0) { int realStock = stock - 1; stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value) System.out.println("扣减成功,剩余库存:" + realStock); } else { System.out.println("扣减失败,库存不足"); } }
1.3 分布式环境下解决
1.3.1 基本解决
可以用
SETNX
命令来解决,其是一个用于设置键值对的命令,它是 “SET if Not eXists”(如果不存在,则 SET)的缩写;- 语法格式:
setnx key value
,其中key
是要设置的键,value
是对应的值; - 执行逻辑:当执行
SETNX
命令时,Redis 会检查指定的key
是否已经存在于数据库中;- 如果该
key
不存在,那么就将key
的值设置为value
,并且该命令返回1
,表示设置成功; - 如果
key
已经存在,那么SETNX
不会对已有的key-value
对做任何修改,并且返回0
,表示设置失败;
- 如果该
@Autowired private StringRedisTemplate stringRedisTemplate; // 商品ID,可以作为分布式锁的 Key String lockKey = "lock:product_101"; Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "shisan"); if (!result) { return "获取锁失败"; } // 业务逻辑 int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // jedis.get("stock") if (stock > 0) { int realStock = stock - 1; stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value) System.out.println("扣减成功,剩余库存:" + realStock); } else { System.out.println("扣减失败,库存不足"); } //解锁 stringRedisTemplate.delete(lockKey);
- 语法格式:
1.3.2 异常解决
如果在执行业务逻辑时抛出了异常,最后没有执行到解锁的代码,就会造成死锁,可以用
try-finally
解决:@Autowired private StringRedisTemplate stringRedisTemplate; // 商品ID,可以作为分布式锁的 Key String lockKey = "lock:product_101"; Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "shisan"); if (!result) { return "获取锁失败"; } // 业务逻辑 try{ int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // jedis.get("stock") if (stock > 0) { int realStock = stock - 1; stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value) System.out.println("扣减成功,剩余库存:" + realStock); } else { System.out.println("扣减失败,库存不足"); } } finally { //解锁 stringRedisTemplate.delete(lockKey); }
1.3.3 宕机解决
如果在执行业务逻辑时系统宕机,即使将解锁的代码放在了
finally
中,但系统宕机导致该解锁代码也执行不到,一样也会造成死锁,此时可以给锁加一个过期时间:@Autowired private StringRedisTemplate stringRedisTemplate; // 商品ID,可以作为分布式锁的 Key String lockKey = "lock:product_101"; Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "shisan"); stringRedisTemplate.expire(lockKey, 30, TimeUnit.SECONDS); if (!result) { return "获取锁失败"; } // 业务逻辑 try{ int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // jedis.get("stock") if (stock > 0) { int realStock = stock - 1; stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value) System.out.println("扣减成功,剩余库存:" + realStock); } else { System.out.println("扣减失败,库存不足"); } } finally { //解锁 stringRedisTemplate.delete(lockKey); }
1.3.4 “加锁+设置过期时间”的原子性解决
如果在执行完加锁逻辑后系统就发生了宕机,此时还没有执行给锁加过期时间的代码,一样也会出现没有解锁而导致死锁的场景,这是因为这两行代码不是原子性的,可以用一行代码来一次性做完这两个“加锁+设置过期时间”的操作,这样就保证了原子性:
@Autowired private StringRedisTemplate stringRedisTemplate; // 商品ID,可以作为分布式锁的 Key String lockKey = "lock:product_101"; //Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "shisan"); //stringRedisTemplate.expire(lockKey, 10, TimeUnit.SECONDS); Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, shisan, 30, TimeUnit.SECONDS); if (!result) { return "获取锁失败"; } // 业务逻辑 try{ int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // jedis.get("stock") if (stock > 0) { int realStock = stock - 1; stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value) System.out.println("扣减成功,剩余库存:" + realStock); } else { System.out.println("扣减失败,库存不足"); } } finally { //解锁 stringRedisTemplate.delete(lockKey); }
1.3.5 持锁时间过长解决
在高并发环境下,如果某个持有锁的线程执行时间过长(比如那台机器执行较慢),可能会引发一种严重的锁交叉释放问题:
时间线 线程1 (T1) 线程2 (T2) 线程3 (T3) Redis 中锁的状态 ( lock:product_101
)造成的结果与问题 T0 获取锁成功:执行 setIfAbsent(lockKey, "T1-val", 30s)
锁值: “T1-val”值: “T1-val” 剩余TTL: 30s T1 开始执行业务逻辑 T+10s 仍在执行业务逻辑… 值: “T1-val” 剩余TTL: 20s T+25s 仍在执行业务逻辑…(执行缓慢) 值: “T1-val” 剩余TTL: 5s T+30s 仍在执行业务逻辑…(锁已过期,但T1不知情) 获取锁成功 : setIfAbsent(..., "T2-val", 30s)
值: “T2-val” 剩余TTL: 30s(新锁) 锁的互斥性被破坏! T1 和 T2 现在同时进入了临界区代码,可能导致超卖等数据不一致问题 T+32s 开始执行业务逻辑… 值: “T2-val” 剩余TTL: 28s T+35s 终于执行完业务逻辑 进入 finally
块 执行delete(lockKey)
仍在执行业务逻辑… 值: null(线程2的锁被线程1删除) 致命错误:T1 误删了 T2 的锁! T2 失去了保护,其他线程可以获取锁 T+36s 获取锁成功 : setIfAbsent(..., "T3-val", 30s)
值: “T3-val” 剩余TTL: 30s(新锁) T3 开始执行,此时 T2 和 T3 同时在执行临界区代码,问题加剧 T+37s 执行完业务逻辑 进入 finally
块 执行delete(lockKey)
正在执行业务逻辑… 值: null(线程3的锁被线程2删除) 连锁反应:T2 又误删了 T3 的锁! 系统陷入“获取锁 -> 被前一个线程误删 -> 下一个线程获取锁”的恶性循环,分布式锁完全失效 根本原因:自己的锁被别的线程释放掉了,所以可以将每一个锁的 value 设置成一个“身份ID”,在解锁的时候判断一下当前解锁的线程和“身份ID”是否一致,一致才允许解锁:
@Autowired private StringRedisTemplate stringRedisTemplate; // 商品ID,可以作为分布式锁的 Key String lockKey = "lock:product_101"; String clientId = UUID.randomUUID().toString(); Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, clientId, 30, TimeUnit.SECONDS); if (!result) { return "获取锁失败"; } // 业务逻辑 try{ int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // jedis.get("stock") if (stock > 0) { int realStock = stock - 1; stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value) System.out.println("扣减成功,剩余库存:" + realStock); } else { System.out.println("扣减失败,库存不足"); } } finally { // 谁加锁就只允许谁解锁 if (clientId.equals(stringRedisTemplate.opsForValue().get(lockKey))) { stringRedisTemplate.delete(lockKey); } }
1.3.6 锁续命(看门狗机制)
假设有这么一个场景:
- 线程1执行完
if (clientId.equals(stringRedisTemplate.opsForValue().get(lockKey)))
这行验证身份的代码,机器卡顿了一下; - 在卡顿的过程中,锁到了过期时间被自动释放掉了,线程2就获取到了锁且正在执行业务代码;
- 此时线程1的机器从卡顿中恢复,执行释放锁的代码,此时释放的却是线程2的锁,
1.3.5 持锁时间过长解决
的问题又再现了;
- 线程1执行完
根本原因:验证身份的代码和释放锁的代码,没有保证原子性,可以用锁续命来解决:
锁续命,也称为 看门狗机制,是一种在分布式锁场景下,由客户端主动且定期地延长其所持有锁的过期时间的策略。
- 它的核心目的是:防止因为业务执行时间超过锁的初始过期时间而导致的锁提前失效问题,从而确保业务逻辑在执行期间能一直持有锁,维持互斥性;
- 锁续命机制通常由一个独立的守护线程(或定时任务)来实现,其工作流程如下:
- 成功获取锁:业务线程成功获取分布式锁,并设置一个初始的过期时间(例如 10 秒);
- 启动看门狗:同时,客户端会启动一个守护线程(看门狗),这个线程会定期(例如,在过期时间的三分之一时,即 3 秒后)去检查业务线程是否还持有这把锁;
- 定期续期:
- 如果锁还存在且仍是当前线程持有:守护线程就执行
EXPIRE
命令,将锁的过期时间重新设置为初始值(例如,再续上 10 秒); - 如果锁不存在或已被其他线程持有:说明业务已经执行完毕释放了锁,或者锁已经被抢占,则守护线程停止续期;
- 如果锁还存在且仍是当前线程持有:守护线程就执行
- 业务完成,停止续期:当业务线程执行完毕,显式释放锁后,它会通知守护线程停止续期工作。
在实际开发中,我们通常不会自己实现完整的锁续命机制,而是使用成熟的框架,例如 Redisson;
- 官网:Redisson | Valkey & Redis Java client. Ultimate Real-Time Data Platform;
- GitHub:GitHub - redisson/redisson: Redisson - Valkey & Redis Java client. Real-Time Data Platform. Sync/Async/RxJava/Reactive API. Over 50 Valkey and Redis based Java objects and services: Set, Multimap, SortedSet, Map, List, Queue, Deque, Semaphore, Lock, AtomicLong, Map Reduce, Bloom filter, Spring, Tomcat, Scheduler, JCache API, Hibernate, RPC, local cache…。
2 Redisson 框架
2.1 锁续命实现
引入依赖:
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.6.5</version> </dependency>
注入 Bean:
@Bean public Redisson redisson() { // 此为单机模式 Config config = new Config(); config.useSingleServer().setAddress("redis://localhost:6379").setDatabase(0); return (Redisson) Redisson.create(config); }
使用:
@Autowired private Redisson redisson; @Autowired private StringRedisTemplate stringRedisTemplate; // 商品ID,可以作为分布式锁的 Key String lockKey = "lock:product_101"; //获取锁对象 RLock redissonLock = redisson.getLock(lockKey); //加分布式锁 redissonLock.lock(); // 业务逻辑 try{ int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // jedis.get("stock") if (stock > 0) { int realStock = stock - 1; stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value) System.out.println("扣减成功,剩余库存:" + realStock); } else { System.out.println("扣减失败,库存不足"); } } finally { //解锁 redissonLock.unlock(); }
2.2 Redisson 分布式锁原理
线程 1 加锁流程
- 线程 1 通过 Redisson 发起
lock
(加锁)请求; - 若加锁成功,Redisson 会启动一个后台线程:每隔 10 秒检查线程 1 是否仍持有锁。如果仍持有锁,就延长锁的过期时间(即“锁续命”,保证业务执行期间锁一直有效);
- 加锁成功后,线程 1 执行相关业务逻辑;
- 业务逻辑执行完后,通过
unlock
(释放锁)操作,将锁归还给 Redis(Master 节点),再由 Master 同步给 Slave 节点,保证集群数据一致;
- 线程 1 通过 Redisson 发起
线程 2 竞争锁流程
线程 2 也通过 Redisson 发起
lock
请求,尝试加锁;加锁成功?:
- 若成功,后续流程与“线程 1 加锁”一致(启动后台续期线程、执行业务、释放锁);
- 若失败(说明锁已被线程 1 持有),线程 2 会进入
while
循环(自旋):间歇性地再次尝试加锁,直到成功获取锁。
2.3 RedissonLock.class
源码解析
- 通过上面代码的
redissonLock.lock();
,即可追溯到;
2.3.1 加锁逻辑
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
this.internalLockLeaseTime = unit.toMillis(leaseTime); // 锁的过期时间
return this.commandExecutor.evalWriteAsync(
this.getName(), // KEYS[1]:锁的key
LongCodec.INSTANCE,
command,
// Lua 脚本开始
// 锁不存在时,直接加锁
"if (redis.call('exists', KEYS[1]) == 0) then " + // 检查锁 Key 是否存在。如果不存在(值为0),说明当前没有线程持有此锁
"redis.call('hset', KEYS[1], ARGV[2], 1); " + // 使用Hash结构存储锁信息。Field=ARGV[2](线程唯一ID),Value=1(重入次数)
"redis.call('pexpire', KEYS[1], ARGV[1]); " + // 为整个锁 Key 设置毫秒级的过期时间,防止死锁
"return nil; " + // 返回 nil 表示加锁成功
"end; " +
// 锁已存在,且是当前线程持有(重入锁)
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + // 检查锁Key的Hash中,是否存在当前线程的Field。如果存在,说明是重入场景
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " + // 将当前线程对应的重入次数 +1。例如,从 1 变成 2
"redis.call('pexpire', KEYS[1], ARGV[1]); " + // 重置锁的过期时间。这是实现看门狗自动续期的基础。每次重入都会刷新过期时间
"return nil; " + // 返回 nil 表示重入加锁成功
"end; " +
// 锁已存在,且被其他线程持有
"return redis.call('pttl', KEYS[1]);", // 如果前两个条件都不满足,说明锁存在且被其他线程持有。此时返回这个锁的剩余存活时间(毫秒)
// Lua 脚本结束
Collections.singletonList(this.getName()), // KEYS 数组:只有一个元素,就是锁的key
new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)} // ARGV 数组:两个参数
);
}
参数 | 说明 |
---|---|
KEYS[1] | 锁在 Redis 中的 Key 名称 |
ARGV[1] | 锁的租约时间(毫秒),即过期时间 |
ARGV[2] | 锁的唯一标识符,格式为 UUID:threadId |
this.internalLockLeaseTime
是 Redisson 设置的锁的过期时间:其是
RedissonLock.class
中的一个属性:protected long internalLockLeaseTime;
由 Redisson 的构造方法赋值:
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) { super(commandExecutor, name); this.commandExecutor = commandExecutor; this.id = commandExecutor.getConnectionManager().getId(); this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(); }
再根据
getLockWatchdogTimeout()
可以追溯到其默认值为 30000 毫秒,即30秒,对应源码在Config.class
中:public Config() { this.transportMode = TransportMode.NIO; this.lockWatchdogTimeout = 30000L; this.keepPubSubOrder = true; this.addressResolverGroupFactory = new DnsAddressResolverGroupFactory(); }
在注入 Redisson 的 Bean 时,可以修改锁的过期时间:
config.setLockWatchdogTimeout(10000);
this.getLockName(threadId)
是锁信息(相当于1.3.5 持锁时间过长解决
中的clientId
),被上面的 Lua 脚本使用 Hash 结构存储到 Redis 中,其值为:String getLockName(long threadId) { return this.id + ":" + threadId; }
final UUID id;
2.3.2 续命(看门狗)逻辑
当成功加锁后:
- 会返回 Lua 脚本的
nil
,其相当于 Java 的null
; - 即下面代码中
Boolean ttlRemaining = (Boolean)future.getNow();
,ttlRemaining
的值就为true
; - 就会执行
RedissonLock.this.scheduleExpirationRenewal(threadId);
,即执行续命逻辑;
/** * 尝试获取一次锁的异步方法 * @param leaseTime 指定的锁持有时间。-1L 表示使用看门狗机制 * @param unit 时间单位 * @param threadId 当前线程ID * @return RFuture<Boolean> 异步结果,true表示获取成功,false表示失败 */ private RFuture<Boolean> tryAcquireOnceAsync(long leaseTime, TimeUnit unit, final long threadId) { // 分支一:如果调用方明确指定了锁的租约时间(leaseTime != -1L) if (leaseTime != -1L) { // 直接使用用户指定的租约时间获取锁,不启动看门狗机制 // 锁会在指定时间后自动过期,即使业务未执行完 return this.tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN); } // 分支二:如果调用方未指定租约时间(leaseTime == -1L),使用看门狗机制 else { // 使用看门狗默认超时时间(默认30秒)作为初始租约时间尝试加锁 RFuture<Boolean> ttlRemainingFuture = this.tryLockInnerAsync( this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), // 获取配置的看门狗超时时间 TimeUnit.MILLISECONDS, // 时间单位为毫秒 threadId, // 线程ID RedisCommands.EVAL_NULL_BOOLEAN // Redis命令类型 ); // 为加锁操作添加监听器,用于在加锁完成后处理看门狗的启动 ttlRemainingFuture.addListener(new FutureListener<Boolean>() { public void operationComplete(Future<Boolean> future) throws Exception { // 只有当加锁操作成功完成时(无网络异常等错误) if (future.isSuccess()) { // 获取加锁结果:true-成功,false-失败(锁已被占用) Boolean ttlRemaining = (Boolean)future.getNow(); // 如果加锁成功,启动看门狗续期机制 if (ttlRemaining) { // 调度过期时间续期任务:启动看门狗线程定期续期 RedissonLock.this.scheduleExpirationRenewal(threadId); } // 如果加锁失败(ttlRemaining为false),什么都不做,锁已被其他线程持有 } // 如果加锁操作本身失败(如网络异常),也不启动看门狗 } }); // 返回异步结果Future return ttlRemainingFuture; } }
- 会返回 Lua 脚本的
接下来看一下
scheduleExpirationRenewal
方法的源码:/** * 调度锁过期时间续期任务(即启动看门狗机制) * @param threadId 当前线程ID */ private void scheduleExpirationRenewal(final long threadId) { // 使用expirationRenewalMap来确保同一个锁只启动一个看门狗任务,避免重复调度 // this.getEntryName() 获取锁的唯一标识,用于作为Map的Key if (!expirationRenewalMap.containsKey(this.getEntryName())) { // 创建一个定时任务(Timeout),这就是看门狗的核心 Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() { public void run(Timeout timeout) throws Exception { // 1. 执行Lua脚本进行续期操作 RFuture<Boolean> future = RedissonLock.this.commandExecutor.evalWriteAsync( RedissonLock.this.getName(), // KEYS[1]: 锁的Key LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, // Lua脚本:检查锁是否仍被当前线程持有,如果是则续期 "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + // 续期操作 "return 1; " + // 返回1表示续期成功 "end; " + "return 0;", // 返回0表示续期失败(锁不存在或不属于当前线程) Collections.singletonList(RedissonLock.this.getName()), // KEYS数组 new Object[]{RedissonLock.this.internalLockLeaseTime, RedissonLock.this.getLockName(threadId)} // ARGV参数:续期时间和线程标识 ); // 2. 为续期操作添加监听器,处理续期结果 future.addListener(new FutureListener<Boolean>() { public void operationComplete(Future<Boolean> future) throws Exception { // 无论续期成功与否,都先从Map中移除当前任务记录 RedissonLock.expirationRenewalMap.remove(RedissonLock.this.getEntryName()); // 如果续期操作本身失败(如网络异常) if (!future.isSuccess()) { RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", future.cause()); } else { // 获取续期结果:true(1)-成功,false(0)-失败 if ((Boolean)future.getNow()) { // 3. 续期成功:递归调用自己,安排下一次续期任务 RedissonLock.this.scheduleExpirationRenewal(threadId); } // 如果续期失败(返回false),说明锁已被释放或不属于当前线程,不再续期 } } }); } }, // 定时任务的延迟执行时间:租约时间的1/3 // 例如:internalLockLeaseTime=30000ms,则延迟10000ms(10秒)后执行第一次续期 this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS); // 使用putIfAbsent原子性地将任务放入Map,避免并发问题 if (expirationRenewalMap.putIfAbsent(this.getEntryName(), task) != null) { // 如果Map中已存在该任务(其他线程先放入),则取消当前创建的任务,避免重复 task.cancel(); } } // 如果Map中已存在该任务,说明看门狗已经启动,直接返回 }
2.3.3 其它线程自旋获取锁的逻辑
/**
* 可中断的获取锁方法(支持等待且可被中断)
* @param leaseTime 锁的租约时间
* @param unit 时间单位
* @throws InterruptedException 如果等待过程中线程被中断
*/
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
// 获取当前线程ID
long threadId = Thread.currentThread().getId();
// 1. 首先尝试直接获取锁
Long ttl = this.tryAcquire(leaseTime, unit, threadId);
// 如果ttl为null,表示获取锁成功,直接返回(如果不为空,返回的ttl是锁剩余的过期时间)
if (ttl != null) {
// 2. 获取锁失败,订阅锁释放消息通道
RFuture<RedissonLockEntry> future = this.subscribe(threadId);
this.commandExecutor.syncSubscription(future); // 等待订阅完成
try {
// 3. 进入循环等待状态
while(true) {
// 再次尝试获取锁
ttl = this.tryAcquire(leaseTime, unit, threadId);
// 如果获取成功(ttl为null),跳出循环
if (ttl == null) {
return;
}
// 4. 根据剩余时间采取不同的等待策略
if (ttl >= 0L) {
// ttl >= 0: 锁被占用但有明确的剩余时间
// 使用Semaphore尝试在指定时间内等待,如果超时或中断则抛出异常
this.getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
// ttl < 0: 锁被占用但无过期时间(理论上不应该发生)
// 无限期等待,直到被通知或中断
this.getEntry(threadId).getLatch().acquire();
}
// 循环继续,再次尝试获取锁
}
} finally {
// 5. 最终清理:取消订阅,释放资源
this.unsubscribe(future, threadId);
}
}
// 如果第一次就获取成功,直接退出方法
}
2.3.4 解锁逻辑
/**
* 释放锁的核心异步方法(执行Lua脚本)
* @param threadId 当前线程ID
* @return RFuture<Boolean> 异步结果:true-完全释放,false-重入计数减1,null-释放失败
*/
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return this.commandExecutor.evalWriteAsync(
this.getName(), // 锁的Key
LongCodec.INSTANCE,
RedisCommands.EVAL_BOOLEAN,
// Lua 脚本开始
"if (redis.call('exists', KEYS[1]) == 0) then " + // 锁 Key 已经不存在(可能已过期)
"redis.call('publish', KEYS[2], ARGV[1]); " + // 仍然发布解锁消息(通知可能还在等待的线程)
"return 1; " + // 返回1表示释放完成
"end;" +
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + // 当前线程(ARGV[3])不持有该锁
"return nil; " + // 返回nil表示当前线程不持有锁
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + // 将当前线程的重入次数减1
"if (counter > 0) then " + // 重入次数仍大于0(锁仍被当前线程持有)
"redis.call('pexpire', KEYS[1], ARGV[2]); " + // 更新锁的过期时间(续期)
"return 0; " + // 返回0表示重入次数减少但锁仍被持有(即还未释放)
"else " + // 重入次数减到0
"redis.call('del', KEYS[1]); " + // 删除锁 Key
"redis.call('publish', KEYS[2], ARGV[1]); " + // 发布解锁消息,通知等待线程
"return 1; " + // 返回1表示锁被完全释放
"end; " +
"return nil;", // 理论上不会执行到这行
// Lua 脚本结束
// KEYS 数组:两个Key
Arrays.asList(
this.getName(), // KEYS[1]: 锁的Key
this.getChannelName() // KEYS[2]: 发布订阅的频道名,用于通知其他等待线程
),
// ARGV 数组:三个参数
new Object[]{
LockPubSub.unlockMessage, // ARGV[1]: 发布的消息内容(通常是数字1)
this.internalLockLeaseTime, // ARGV[2]: 锁的租约时间(用于续期)
this.getLockName(threadId) // ARGV[3]: 当前线程的唯一标识(UUID:threadId)
}
);
}
2.3.5 其它线程被唤醒获取锁的逻辑
在
2.3.3 其它线程自旋获取锁的逻辑
中,获取锁失败的线程会执行RFuture<RedissonLockEntry> future = this.subscribe(threadId);
去订阅锁释放消息通道;进入到
subscribe
方法中:protected RFuture<RedissonLockEntry> subscribe(long threadId) { return PUBSUB.subscribe( this.getEntryName(), // 锁的唯一标识,作为订阅的入口名 this.getChannelName(), // Redis的发布订阅频道名称 this.commandExecutor.getConnectionManager().getSubscribeService() // 订阅服务 ); }
PUBSUB
是 Redisson 内部的一个静态工具类,它专门负责管理所有基于 Redis 发布订阅(Pub/Sub)功能的连接和订阅关系;protected static final LockPubSub PUBSUB = new LockPubSub();
Ctrl
+鼠标左键点击上面LockPubSub
,可以追溯到LockPubSub.class
中,其中有一个onMessage
方法:/** * 收到锁释放消息时的回调处理方法 * @param value RedissonLockEntry对象,包含等待线程的信息 * @param message 收到的消息内容(应该是unlockMessage) */ protected void onMessage(RedissonLockEntry value, Long message) { // 1. 检查收到的消息是否为解锁消息 if (message.equals(unlockMessage)) { // 2. 释放信号量,唤醒一个正在等待的线程 value.getLatch().release(); // 3. 处理监听器任务(确保公平性) while(true) { Runnable runnableToExecute = null; // 使用synchronized保证对value对象的线程安全访问 synchronized(value) { // 3.1 从监听器队列中取出一个任务 Runnable runnable = (Runnable)value.getListeners().poll(); if (runnable != null) { // 3.2 尝试获取信号量许可(非阻塞方式) if (value.getLatch().tryAcquire()) { // 获取成功:将这个任务标记为待执行 runnableToExecute = runnable; } else { // 获取失败:说明其他线程已经抢到了锁,将任务重新放回队列 value.addListener(runnable); } } } // 4. 执行取出的任务或退出循环 if (runnableToExecute == null) { // 队列为空或无法获取信号量,退出循环 return; } // 5. 执行获取锁的任务(不在同步块内执行,避免阻塞) runnableToExecute.run(); } } // 如果不是解锁消息,忽略处理 }
3 Redis 主从架构下的分布式锁失效问题
3.1 问题
在 Redis 主从复制架构中:
- 主节点(Master):处理所有写操作(如加锁、释放锁);
- 从节点(Slave):异步复制主节点的数据,处理读操作;
下面通过一个时序图来展示锁失效的具体过程:
- Redis 主从复制是异步的,主节点写入成功后立即返回,数据同步到从节点有毫秒级的延迟;
- 当主节点宕机时,从节点可能尚未收到最新的锁数据,但已经被提升为新的主节点;
- 在某些网络分区情况下,可能会出现两个客户端分别向不同的节点申请同一把锁,都成功获得;
3.2 不推荐使用红锁解决
3.2.1 简介
Redlock 算法是 Redis 作者提出的解决方案,核心思想是在多个独立的 Redis 实例上同时获取锁;
算法步骤:
- 客户端获取当前时间戳 T1;
- 依次向 N 个独立的 Redis 实例申请锁(使用相同的 Key 和随机值);
- 客户端计算获取锁花费的时间(当前时间 T2 - T1);
- 只有当在大多数实例(N/2 + 1)上获取成功;
- 且总耗时小于锁的有效时间时,才认为加锁成功;
- 如果加锁失败,向所有实例发送释放锁命令;
3.2.2 使用示例
@RequestMapping("/redlock")
public String redlock() {
String lockKey = "product_001"; // 定义锁的键名
// 创建多个RLock实例(在实际生产中,每个RLock应该对应不同的Redis实例)
// 注意:这里只是伪代码,实际需要连接不同的Redis服务器实例
RLock lock1 = redisson.getLock(lockKey); // 第一个Redis实例的锁
RLock lock2 = redisson.getLock(lockKey); // 第二个Redis实例的锁
RLock lock3 = redisson.getLock(lockKey); // 第三个Redis实例的锁
/**
* 创建RedissonRedLock(红锁)对象
* 这是Redlock算法的核心实现,需要传入多个独立的RLock实例
* 通常要求这些Redis实例是相互独立的主节点,而不是主从关系
*/
RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);
try {
/**
* 尝试获取分布式锁(Redlock算法)
* @param waitTimeout 10 最大等待时间:客户端尝试获取锁的最长等待时间,超过则放弃
* @param leaseTime 30 锁的租约时间:获取成功后锁的自动释放时间,应大于业务执行时间
* @param unit TimeUnit.SECONDS 时间单位
* @return boolean 是否成功获取锁
*
* Redlock算法核心:必须在大多数节点(N/2 + 1)上成功获取锁,且总耗时小于租约时间
*/
boolean res = redLock.tryLock(10, 30, TimeUnit.SECONDS);
if (res) {
// 成功获得锁,在这里处理业务逻辑
// 注意:此时锁已经在多个Redis实例上被成功持有
// 可以安全地执行需要分布式锁保护的临界区代码
} else {
// 获取锁失败,可以记录日志或抛出异常
// 在实际应用中,可能需要重试或返回特定错误信息给客户端
}
} catch (Exception e) {
// 捕获获取锁过程中可能出现的异常(如网络异常、中断异常等)
throw new RuntimeException("lock fail", e); // 包装异常并抛出
} finally {
// 无论如何,最后都要尝试解锁
// RedissonRedLock会向所有参与红锁的Redis实例发送解锁命令
// 即使某些实例上的锁获取失败,也会确保清理其他实例上的锁
redLock.unlock();
}
return "end"; // 返回处理结果
}
3.2.3 不推荐使用的原因一
如果给 redis1,redis2,redis3 后面分别加上一个从节点,然后来看这么一个场景:
第一阶段:线程1成功获取锁
- 线程 1 执行 Redlock 算法,向三个主节点申请锁:
redis1 Master
:加锁成功redis2 Master
:加锁成功redis3 Master
:加锁失败(可能由于网络延迟或竞争)
- 线程1在 2/3 个节点上加锁成功(满足大多数条件),认为获取锁成功,开始执行业务逻辑;
redis2 Master
的锁数据正在异步复制到redis2 Slave
,由于 Redis 复制的异步性,redis2 Slave
可能尚未收到锁数据;
- 线程 1 执行 Redlock 算法,向三个主节点申请锁:
第二阶段:redis2 Master 宕机,故障转移
redis2 Master
突然宕机(在数据完全同步前);- 哨兵(Sentinel)或集群模式检测到主节点宕机,将
redis2 Slave
提升为新的主节点(现在称为redis2 New Master
); - 关键问题:新的
redis2 New Master
上没有线程1的锁数据,因为复制尚未完成;
第三阶段:线程2尝试获取同一把锁
- 线程 2 执行 Redlock 算法,向三个主节点申请同一把锁:
redis1 Master
:加锁失败(线程1仍持有锁)redis2 New Master
:加锁成功(新主节点上没有锁数据)redis3 Master
:加锁成功
- 线程2在 2/3 个节点上加锁成功(满足大多数条件),也认为获取锁成功;
- 线程 2 执行 Redlock 算法,向三个主节点申请同一把锁:
最终结果:锁失效
- 两个线程同时持有锁:
- 线程1:认为自己在
redis1 Master
和redis2 Master
上持有锁 - 线程2:认为自己在
redis2 New Master
和redis3 Master
上持有锁
- 线程1:认为自己在
- 数据竞争:两个线程同时进入临界区,导致数据不一致(超卖等问题);
- 两个线程同时持有锁:
时序图:
3.2.4 不推荐使用的原因二
如果没有从节点:
第一阶段:线程1成功获取锁
线程 1 执行 Redlock 算法,向三个节点申请锁:
redis1
:加锁成功redis2
:加锁成功redis3
:加锁失败
线程1在 2/3 个节点上加锁成功,认为获取锁成功,开始执行业务逻辑;
redis2
使用的是 RDB 持久化,加锁操作(SETNX)只存在于redis2
的内存中,尚未到达 RDB 持久化的时间点,数据未持久化到磁盘;RDB 的持久化方式是每隔一段时间就将 Reids 执行的命令持久化到磁盘中;
第二阶段:redis2 宕机并重启
- 在 RDB 持久化之前,
redis2
突然宕机。由于 RDB 是定时持久化,内存中的数据全部丢失; - redis2 重启后,从最近的 RDB 快照文件恢复数据,但这个快照中不包含线程 1 的锁信息;
- 关键问题:重启后的
redis2
是一个"干净"的状态,完全不知道线程 1 曾经加过锁;
- 在 RDB 持久化之前,
第三阶段:线程2尝试获取同一把锁
- 线程 2 执行 Redlock 算法,向三个节点申请同一把锁:
redis1
:加锁失败(线程1仍持有锁)redis2
:加锁成功(重启后无锁数据)redis3
:加锁成功
- 线程2在 2/3 个节点上加锁成功,也认为获取锁成功;
- 线程 2 执行 Redlock 算法,向三个节点申请同一把锁:
最终结果:锁失效
- 两个线程同时持有锁:
- 线程1:认为自己在
redis1
和redis2
上持有锁(但实际上redis2
的锁已丢失) - 线程2:认为自己在
redis2
和redis3
上持有锁
- 线程1:认为自己在
- 数据竞争:两个线程同时进入临界区,导致数据不一致;
- 两个线程同时持有锁:
时序图:
4 分布式锁性能提升方法之——分段锁
4.1 简介
分段锁是一种将单个粗粒度锁拆分为多个细粒度锁的并发控制技术。其核心思想是:将共享资源划分为多个独立的区间(段),每个区间使用独立的锁进行保护,从而减少锁竞争,提高并发性能;
其思想就是:空间换时间 + 拆分粒度;
Java 的
ConcurrentHashMap
就是分段锁的经典实现:// ConcurrentHashMap 内部结构(Java 7及之前版本) public class ConcurrentHashMap<K, V> { final Segment<K, V>[] segments; // 分段数组 static final class Segment<K, V> extends ReentrantLock { // 每个Segment包含一个HashEntry数组 transient volatile HashEntry<K, V>[] table; } }
4.2 秒杀库存扣减场景举例
假设有一个商品库存
stock = 1000
,传统方式是用一把锁保护整个库存:// 传统方式 - 性能瓶颈 @RestController public class SeckillController { private final String LOCK_KEY = "product_001_lock"; private final String STOCK_KEY = "product_001_stock"; @PostMapping("/seckill") public String seckill() { RLock lock = redisson.getLock(LOCK_KEY); try { lock.lock(); int stock = Integer.parseInt(redisTemplate.opsForValue().get(STOCK_KEY)); if (stock > 0) { redisTemplate.opsForValue().set(STOCK_KEY, String.valueOf(stock - 1)); return "秒杀成功"; } return "库存不足"; } finally { lock.unlock(); } } }
使用分段锁优化,我们将库存拆分为10个分段:
- 在某些业务场景下, 也可以根据地理区域来划分子区域;
// 分段锁实现 @RestController public class SegmentSeckillController { private final int SEGMENT_COUNT = 10; // 分为10个段 private final String SEGMENT_LOCK_PREFIX = "product_001_segment_lock_"; private final String SEGMENT_STOCK_PREFIX = "product_001_segment_stock_"; // 初始化分段库存 @PostConstruct public void initStock() { int totalStock = 1000; int stockPerSegment = totalStock / SEGMENT_COUNT; int remainder = totalStock % SEGMENT_COUNT; for (int i = 0; i < SEGMENT_COUNT; i++) { int segmentStock = stockPerSegment + (i < remainder ? 1 : 0); redisTemplate.opsForValue().set(SEGMENT_STOCK_PREFIX + i, String.valueOf(segmentStock)); } } @PostMapping("/seckill-segment") public String seckillSegment() { // 1. 随机选择一个分段(或根据用户ID哈希) int segmentIndex = ThreadLocalRandom.current().nextInt(SEGMENT_COUNT); // 2. 获取该分段的锁 RLock segmentLock = redisson.getLock(SEGMENT_LOCK_PREFIX + segmentIndex); String segmentStockKey = SEGMENT_STOCK_PREFIX + segmentIndex; try { // 3. 尝试获取分段锁(设置较短超时时间) if (segmentLock.tryLock(100, 500, TimeUnit.MILLISECONDS)) { int stock = Integer.parseInt(redisTemplate.opsForValue().get(segmentStockKey)); if (stock > 0) { redisTemplate.opsForValue().set(segmentStockKey, String.valueOf(stock - 1)); return "秒杀成功,来自分段[" + segmentIndex + "]"; } // 当前分段库存不足,尝试其他分段 return tryOtherSegments(segmentIndex); } return "系统繁忙,请重试"; } finally { segmentLock.unlock(); } } private String tryOtherSegments(int excludedSegment) { // 尝试其他分段的逻辑 for (int i = 0; i < SEGMENT_COUNT; i++) { if (i == excludedSegment) continue; RLock otherLock = redisson.getLock(SEGMENT_LOCK_PREFIX + i); try { if (otherLock.tryLock(50, 200, TimeUnit.MILLISECONDS)) { int stock = Integer.parseInt(redisTemplate.opsForValue() .get(SEGMENT_STOCK_PREFIX + i)); if (stock > 0) { redisTemplate.opsForValue() .set(SEGMENT_STOCK_PREFIX + i, String.valueOf(stock - 1)); return "秒杀成功,来自分段[" + i + "](重试)"; } } } finally { otherLock.unlock(); } } return "库存不足"; } }
对比: