概述
为什么要要分布式锁
在并发编程中,我们通过锁,来避免由于竞争而造成的数据不一致问题。
通常,我们以synchronized 、Lock来使用它。Java中的锁,只能保证在同一个JVM进程内中执行
如果需要在分布式集群环境下的话,便需要分布式锁
分布式锁/线程锁/进程锁区别
分布式锁
:当多个进程不在同一个系统中(jvm),用分布式锁控制多个进程对资源的访问
。
线程锁
:主要用来给方法、代码块加锁。当某个方法或代码使用锁,在同一时刻仅有一个线程执行该方法或该代码段。
- 线程锁
只在同一JVM中有效果
, - 因为线程锁的实现在
根本上是依靠线程之间共享内存实现
的,比如synchronized是共享对象头,显示锁Lock是共享某个变量(state)。
进程锁
:为了控制同一操作系统中多个进程访问某个共享资源,因为进程具有独立性,各个进程无法访问其他进程的资源
,因此无法通过synchronized等线程锁实现进程锁。
分布式锁的使用场景
虽然线程间并发问题和进程间并发问题都可以通过分布式锁解决的,但是不推荐这样去做,因为采用分布式锁解决这些小问题是非常消耗资源
分布式锁应该用来解决分布式情况下的多进程并发的问题
才最合适
情境:线程A和线程B都共享某个变量X。
- 如果是单机情况下(单JVM),线程之间共享内存,只要
使用线程锁就可以解决
并发问题。 - 如果是分布式情况下(多JVM),线程A和线程B很可能不是在同一JVM中,这样线程锁就无法起到作用了,这时候就
要用到分布式锁来解决
分布式锁实现逻辑
分布式锁实现的关键是:在分布式的应用服务器外,搭建一个存储服务器,存储锁的信息
实现要点:
锁信息需要设置过期超时
的,不能让一个线程长期占有一个锁而导致死锁同一时刻只有一个线程可以获取到锁
。
实现方式:
数据库乐观锁
;基于Redis的分布式锁
;使用 Redis 实现锁,主要是
将状态放到 Redis 当中,利用其原子性,当其他线程访问时,如果 Redis 中已经存在这个状态,就不允许之后的一些操作
基于ZooKeeper的分布式锁
分布式锁实现要求
锁的实现同时满足以下四个条件
:
互斥性
。在任意时刻,只有一个客户端能持有锁
。不会发生死锁
。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。也就是设置一个超时时间具有容错性
。只要大部分的Redis节点正常运行,客户端就可以加锁和解锁。解铃还须系铃人
。加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了。
实现redis分布式锁需要的命令/API
redis命令
# “set if not exits”,若该key-value不存在,则成功加入缓存并且返回1,否则返回0。
setnx(key, value)
# 获得key对应的value值,若不存在则返回nil。
get(key)
# 先获取key对应的value值,若不存在则返回nil,然后将旧的value更新为新的value。
getset(key, value)
# 设置key-value的有效期为seconds秒。
expire(key, seconds)
Set
语法:
- key: 要设置的键。
- value: 与键关联的值。
- EX seconds: 设置键的过期时间(以秒为单位)。
- PX milliseconds: 设置键的过期时间(以毫秒为单位)。
- NX: 仅在键不存在时设置键的值。
- XX: 仅在键已经存在时设置键的值
SET key value [EX seconds] [PX milliseconds] [NX|XX]
使用示例
# 基本使用
SET mykey "Hello, Redis!"
# 条件设置(仅在键不存在时保存, 如果 mykey 不存在,返回 (nil)。)
SET mykey "Hello, Redis!" NX
# 条件更新(仅在键已经存在时)
SET mykey "New Value" XX
# 设置过期时间(以毫秒为单位) 5000毫秒后过期
SET mykey "Hello, Redis!" PX 5000
# 设置过期时间(以秒为单位) 5000秒后过期
SET mykey "Hello, Redis!" EX 5000
总结:
- SET 命令 是 Redis 中最常用的命令之一,用于存储键值对。
- 通过 EX 和 PX 设置过期时间
- 通过 NX 和 XX 控制设置的条件`。
Del
在 Redis 中,DEL 命令用于删除一个或多个键。这个命令可以用来清除不再需要的数据。
参数说明
- key: 要删除的键,可以指定一个或多个键。
返回值:返回被删除的键的数量
。如果指定的键不存在,则不会报错,返回值仍然是被删除的键的数量。
DEL key [key ...]
示例
# 删除单个键。如果 mykey 存在,返回 1;如果不存在,返回 0。
DEL mykey
# 删除多个键。如果 key1、key2 和 key3 中的某些键存在,返回被删除的键的数量。
DEL key1 key2 key3
注意事项
- 使用 DEL 命令时,
如果键不存在,不会报错,返回值仍然是被删除的键的数量
。 - DEL 命令是一个 O(1) 操作,但在删除大量键时,可能会影响性能。
call
在 Redis 中,CALL 命令并不是一个直接的命令,而是 Lua 脚本中用于调用 Redis 命令的函数
。
通过 redis.call,你可以在 Lua 脚本中执行 Redis 的原生命令。
redis.call('COMMAND_NAME', arg1, arg2, ...)
详细说明
- KEYS: 在脚本中,KEYS 是一个数组,包含传递给脚本的所有键。
- ARGV: 你也可以使用 ARGV 数组来传递额外的参数。
示例
-- Lua 脚本:将两个键的值相加并返回结果
local value1 = redis.call('GET', KEYS[1]) -- 获取第一个键的值
local value2 = redis.call('GET', KEYS[2]) -- 获取第二个键的值
return value1 + value2 -- 返回两个值的和
总结
redis.call 是在 Lua 脚本中执行 Redis 命令的方式。
- 通过 KEYS 和 ARGV 数组,可以
灵活地传递键和参数
。 Lua 脚本的执行是原子性的,可以提高操作的效率
。
Jedis 接口
pom依赖
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
eval()
在 Jedis 中,eval 方法用于执行 Lua 脚本
。
通过这个方法,你可以在 Redis 服务器上运行 Lua 脚本,从而实现原子操作和复杂的逻辑处理。
String result = jedis.eval(String script, List<String> keys, List<String> args);
参数说明
- script: 要执行的 Lua 脚本,作为字符串传入。
- keys: 需要在脚本中使用的键的列表。键的数量可以在脚本中通过 KEYS 表达式访问。
- args: 传递给脚本的参数列表。参数的数量可以在脚本中通过 ARGV 表达式访问。
注意事项
原子性: Lua 脚本在 Redis 中是原子执行的
,这意味着在脚本执行期间,其他命令不会干扰。性能: 使用 Lua 脚本可以减少网络往返次数
,提高性能,尤其是在需要执行多个命令时。- 调试: Lua 脚本的调试相对较难,因此在编写时要确保逻辑正确。
示例
import redis.clients.jedis.Jedis;
public class RedisLuaExample {
public static void main(String[] args) {
Jedis jedis = new Jedis("localhost");
// Lua 脚本:将两个键的值相加并返回结果
String script = "return redis.call('GET', KEYS[1]) + redis.call('GET', KEYS[2])";
// 需要使用的键
List<String> keys = Arrays.asList("key1", "key2");
// 执行脚本
String result = jedis.eval(script, keys, Collections.emptyList()).toString();
System.out.println("Result: " + result);
jedis.close();
}
}
API(Springboot)
Redisson Pom依赖
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.10.1</version>
</dependency>
redis pom依赖
<!-- 引入redis依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
spring boot使用Redis的操作主要是通过RedisTemplate(或StringRedisTemplate )来实现
RedisTemplate和StringRedisTemplate的区别
RedisTemplate和StringRedisTemplate的区别:
- 两者的关系是
StringRedisTemplate继承RedisTemplate
。 - 两者的
数据是不共通的
;也就是说StringRedisTemplate只能管理StringRedisTemplate里面的数据,RedisTemplate只能管理RedisTemplate中的数据。 - SDR默认采用的序列化策略有两种,一种是String的序列化策略,一种是JDK的序列化策略。
- StringRedisTemplate默认采用的是String的序列化策略,保存的key和value都是采用此策略序列化保存的(StringRedisSerializer)。
- RedisTemplate默认采用的是JDK的序列化策略,保存的key和value都是采用此策略序列化保存的。(JdkSerializationRedisSerializer)
总结:
StringRedisTemplate:当你的redis数据库里面本来
存的是字符串数据或者你要存取的数据就是字符串类型数据的时候
。Redis当中的数据值是以数组形式显示出来的时候,只能使用RedisTemplate才能获取到里面的数据
RedisTemplate:但是如果你的
数据是复杂的对象类型
,而取出的时候又不想做任何的数据转换,直接从Redis里面取出一个对象。Redis当中的
数据值是以可读形式显示出来的时候,只能使用StringRedisTemplate才能获取到里面的数据
redisTemplate
// 将锁状态放入 Redis:setIfAbsent如果键不存在则新增,存在则不改变已经有的值。
redisTemplate.opsForValue().setIfAbsent("lockkey", "value");
// 设置锁的过期时间
redisTemplate.expire("lockkey", 30000, TimeUnit.MILLISECONDS);
//spring-data-redis 2.1 之后版本,加锁的同时设置过期时间,二者是原子性操作
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", "1111",5, TimeUnit.SECONDS);
// 删除/解锁
redisTemplate.delete("lockkey");
// 获取锁
redisTemplate.opsForValue().get("lockkey");
StringRedisTemplate
@Autowired
private StringRedisTemplate stringRedisTemplate;
//在设置值的同时指定过期时间, 时间单位 s
stringRedisTemplate.opsForValue().set("key","value",7200, TimeUnit.SECONDS);
//删除key对应的键值对
stringRedisTemplate.opsForValue().delete("key");
//获取对应key的value
stringRedisTemplate.opsForValue().get("key");
实现redis分布式锁
单节点Redis的分布式锁
如果你的项目中Redis是多机部署的,那么可以尝试使用Redisson实现分布式锁,
加锁 实现
加锁实际上就是在redis中,给Key键设置一个值,为避免死锁,并给定一个过期时间
但是不建议分别使用加锁和设置超时这两个命令去设置值和过期时间,因为违背了原子性,也就是一旦锁被创建,而没有设置过期时间,则锁会一直存在
Jedis 实现
参数说明
第一个为key,我们使用key来当锁,因为key是唯一的
。
第二个为value,我们传的是requestId
,可靠性保证,分布式锁要满足第四个条件解铃还须系铃人,通过给value赋值为requestId,我们就知道这把锁是哪个请求加的了
,在解锁的时候就可以有依据。requestId可以使用UUID.randomUUID().toString()方法生成
。
第三个为NX(键不存在时设置值),这个参数的意思是SET IF NOT EXIST,即当key不存在时,我们进行set操作;若key已经存在,则不做任何操作;表示仅在键不存在时设置值
redis在set时,如果原先有值,SET 命令会返回 "OK"
第四个为PX(毫秒为单位),这个参数我们传的是PX,意思是我们要给这个key加一个过期时间的设置,具体时间由第五个参数决定。
第五个为expireTime,与第四个参数相呼应,代表key的过期时间大小
代码实现
public class RedisTool {
private static final String LOCK_SUCCESS = "OK";
private static final String SET_IF_NOT_EXIST = "NX";
private static final String SET_WITH_EXPIRE_TIME = "PX";
/**
* 尝试获取分布式锁
*
* @param jedis Redis客户端
* @param lockKey 锁
* @param requestId 请求标识
* @param expireTime 超期时间
* @return 是否获取成功
*/
public static boolean tryGetDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) {
String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);
if (LOCK_SUCCESS.equals(result)) {
return true;
}
return false;
}
}
redis命令
-- 设置一个键 myLock,值为 12345,并且希望在 5000 毫秒后过期
SET myLock 12345 NX PX 5000
springBoot
但是在spring-data-redis 2.1 之后的版本,便可以直接设置过期时间了
spring-data-redis 2.1 前
spring-data-redis 2.1 前的版本
:在jedis当中是有这种原子操作的方法的,需要通过 RedisTemplate 的 execute 方法获取到jedis里操作命令的对象设置
String result = template.execute(new RedisCallback<String>() {
@Override
public String doInRedis(RedisConnection connection) throws DataAccessException {
JedisCommands commands = (JedisCommands) connection.getNativeConnection();
return commands.set(key, "锁定的资源", "NX", "PX", 3000);
}
});
spring-data-redis 2.1 后
spring-data-redis 2.1 之后的版本
//加锁的同时设置过期时间,二者是原子性操作
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", "1111",5, TimeUnit.SECONDS);
解锁实现
解锁的过程就是将Key键删除
。但也不能乱删,不能说客户端1的请求将客户端2的锁给删除掉
为什么不直接删除锁?这种不先判断锁的拥有者而直接解锁的方式,会导致任何客户端都可以随时进行解锁,即使这把锁不是它的
Jedis实现
代码
public class RedisTool {
private static final Long RELEASE_SUCCESS = 1L;
/**
* 释放分布式锁
*
* @param jedis Redis客户端
* @param lockKey 锁
* @param requestId 请求标识
* @return 是否释放成功
*/
public static boolean releaseDistributedLock(Jedis jedis, String lockKey, String requestId) {
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));
if (RELEASE_SUCCESS.equals(result)) {
return true;
}
return false;
}
}
为什么不直接删除
为什么不直接使用jedis.del()方法删除锁
?这种不先判断锁的拥有者而直接解锁的方式,会导致任何客户端都可以随时进行解锁,即使这把锁不是它的
jedis.del(lockKey);
为什么不判断是后在删除
如果想判断是不是这个客户端的锁,再去解锁行不行
?不行
,如果调用jedis.del()方法的时候,这把锁已经不属于当前客户端的时候会解除他人加的锁。
比如客户端A加锁,一段时间之后客户端A解锁,在执行jedis.del()之前,锁突然过期了,此时客户端B尝试加锁成功,然后客户端A再执行del()方法,则将客户端B的锁给解除了。
// 判断加锁与解锁是不是同一个客户端
if (requestId.equals(jedis.get(lockKey))) {
// 若在此时,这把锁突然不是这个客户端的,则会误解锁
jedis.del(lockKey);
}
Springboot
public Map<String, List<Catalog2Vo>> getCatalogJsonDbWithRedisLock() {
// 生成一个唯一的 UUID 作为锁的标识
String uuid = UUID.randomUUID().toString();
// 获取 Redis 的 ValueOperations 对象
ValueOperations<String, String> ops = stringRedisTemplate.opsForValue();
// 尝试在 Redis 中设置锁,过期时间为 5 秒
Boolean lock = ops.setIfAbsent("lock", uuid, 5, TimeUnit.SECONDS);
// 如果成功获取到锁
if (lock) {
// 获取分类数据
Map<String, List<Catalog2Vo>> categoriesDb = getCategoryMap();
// 获取当前锁的值
String lockValue = ops.get("lock");
// Lua 脚本,用于安全释放锁
String script = "if redis.call(\"get\",KEYS[1]) == ARGV[1] then\n" +
" return redis.call(\"del\",KEYS[1])\n" +
"else\n" +
" return 0\n" +
"end";
// 执行 Lua 脚本,释放锁
stringRedisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class), Arrays.asList("lock"), lockValue);
// 返回获取的分类数据
return categoriesDb;
} else {
// 如果未能获取到锁,线程休眠 100 毫秒
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace(); // 打印异常堆栈
}
// 递归调用,尝试再次获取锁
return getCatalogJsonDbWithRedisLock();
}
}
多节点的redis分布式锁
概述
如果负责储存这个分布式锁的Redisson节点宕机以后,而且这个锁正好处于锁住的状态时,这个锁会出现锁死的状态。
为了避免这种情况的发生,Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期
。
默认情况下,看门狗的检查锁的超时时间是30秒钟
,也可以通过修改Config.lockWatchdogTimeout来另行指定。
另外Redisson还通过加锁的方法提供了leaseTime的参数来指定加锁的时间。超过这个时间后锁便自动解开了
。
高并发场景下的问题
高并发场景下如下问题:
- 主从切换后,原从库被推举为主库,当在其他请求加锁的时候,连接的redis可能还没有同步到第一次加的锁,造成锁失效。
- 主库发生故障,加锁完成,还未同步到从节点或者集群中其他节点的时候,当前节点挂掉,锁就丢失了。
- 两种情况导致出现的原因就是redis的数据同步是异步的
Redisson
相对于Jedis而言,Redisson强大很多。当然了,随之而来的就是它的复杂性。它里面也实现了分布式锁,而且包含多种类型的锁
,
具体内容:分布式锁和同步器
可重入锁(Reentrant Lock)示例
下述已可重入锁(Reentrant Lock)示例,获取客户端进行加解锁操作如下
public static void main(String[] args) {
// 创建 Redis 配置对象
Config config = new Config();
// 设置 Redis 服务器地址
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
// 设置 Redis 服务器密码
config.useSingleServer().setPassword("redis1234");
// 创建 Redisson 客户端实例
final RedissonClient client = Redisson.create(config);
// 获取名为 "lock1" 的分布式锁
RLock lock = client.getLock("lock1");
try {
// 尝试获取锁,最多等待 10 秒,锁定 30 秒
if (lock.tryLock(10, 30, TimeUnit.SECONDS)) {
try {
// 在此处执行需要加锁的操作
// 例如:处理共享资源或执行关键业务逻辑
} finally {
// 确保在操作完成后释放锁
lock.unlock();
}
} else {
// 如果无法获取锁,输出提示信息
System.out.println("无法获取锁,操作被跳过");
}
} catch (InterruptedException e) {
// 如果线程被中断,恢复中断状态
Thread.currentThread().interrupt();
// 输出中断信息
System.out.println("线程被中断");
}
}
加锁
调用lock方法,定位到lockInterruptibly。在这里,完成了加锁的逻辑。
代码
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
// 当前线程的 ID
long threadId = Thread.currentThread().getId();
// 尝试获取锁,返回值为锁的剩余时间(TTL)
Long ttl = tryAcquire(leaseTime, unit, threadId);
// 如果 ttl 为空,则证明获取锁成功
if (ttl == null) {
return; // 成功获取锁,直接返回
}
// 如果获取锁失败,则订阅到对应这个锁的 channel
RFuture<RedissonLockEntry> future = subscribe(threadId);
commandExecutor.syncSubscription(future); // 同步订阅,等待锁的释放
try {
while (true) {
// 再次尝试获取锁
ttl = tryAcquire(leaseTime, unit, threadId);
// ttl 为空,说明成功获取锁,跳出循环
if (ttl == null) {
break; // 成功获取锁,退出循环
}
// ttl 大于 0 则等待 ttl 时间后继续尝试获取
if (ttl >= 0) {
// 尝试在 ttl 时间内获取锁
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
// ttl 小于 0,表示无限期等待
getEntry(threadId).getLatch().acquire();
}
}
} finally {
// 取消对 channel 的订阅,确保资源释放
unsubscribe(future, threadId);
}
// get(lockAsync(leaseTime, unit)); // 可能是异步获取锁的逻辑,注释掉的部分
}
// 两种处理方式,一种是带有过期时间的锁,一种是不带过期时间的锁。
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
// 如果带有过期时间,则按照普通方式获取锁
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
// 先按照 30 秒的过期时间来执行获取锁的方法
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(
commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
// 如果还持有这个锁,则开启定时任务不断刷新该锁的过期时间
ttlRemainingFuture.addListener(new FutureListener<Long>() {
@Override
public void operationComplete(Future<Long> future) throws Exception {
if (!future.isSuccess()) {
return; // 如果获取锁失败,直接返回
}
Long ttlRemaining = future.getNow();
// 锁已成功获取
if (ttlRemaining == null) {
// 开启定时任务以刷新锁的过期时间
scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture; // 返回异步获取锁的结果
}
// 正执行获取锁的逻辑,它是一段LUA脚本代码,hash数据结构。
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit,
long threadId, RedisStrictCommand<T> command) {
// 将过期时间转换为毫秒
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
// 如果锁不存在,则通过 hset 设置它的值,并设置过期时间
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
// 如果锁已存在,并且锁是当前线程,则通过 hincrby 给数值递增 1
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
// 如果锁已存在,但并非本线程,则返回过期时间 ttl
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()),
internalLockLeaseTime, getLockName(threadId));
}
流程图
解锁
通过调用unlock方法来解锁。
代码如下
// 这是一个公开的异步解锁方法,接受当前线程的 ID 作为参数,返回一个 RFuture<Void> 对象。
public RFuture<Void> unlockAsync(final long threadId) {
// 创建一个 Promise 对象,用于异步操作的结果
final RPromise<Void> result = new RedissonPromise<Void>();
// 调用解锁的内部异步方法
RFuture<Boolean> future = unlockInnerAsync(threadId);
// 添加监听器以处理解锁操作的结果
future.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
// 检查解锁操作是否成功
if (!future.isSuccess()) {
// 如果失败,取消过期时间的续期任务
cancelExpirationRenewal(threadId);
// 将失败原因传递给 Promise
result.tryFailure(future.cause());
return;
}
// 获取解锁操作的返回值
Boolean opStatus = future.getNow();
// 如果返回值为空,表示当前线程未持有锁,抛出异常
if (opStatus == null) {
IllegalMonitorStateException cause =
new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + threadId);
result.tryFailure(cause);
return;
}
// 解锁成功,取消刷新过期时间的定时任务
if (opStatus) {
cancelExpirationRenewal(null);
}
// 将成功结果传递给 Promise
result.trySuccess(null);
}
});
// 返回 Promise 对象
return result;
}
// 执行 Redis 脚本:使用 evalWriteAsync 方法执行 Lua 脚本,进行原子操作。
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, EVAL,
//如果锁已经不存在, 发布锁释放的消息
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end;" +
//如果释放锁的线程和已存在锁的线程不是同一个线程,返回null
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
//通过hincrby递减1的方式,释放一次锁
//若剩余次数大于0 ,则刷新过期时间
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
//否则证明锁已经释放,删除key并发布锁释放的消息
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
Arrays.<Object>asList(getName(), getChannelName()),
LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
}
流程图
redlock
使用流程
redlock的使用流程大致如下:
- 客户端获取到当前的时间戳。
- 客户端按顺序向部署的N个Redis实例执行加锁操作。在设定时间内,不管加锁成功还是失败,都会继续向下一个实例申请加锁操作。
- 若加锁成功的实例个数>= (N/2) + 1,并且加锁的总耗时要<锁设定的过期时间,Redlock就判断加锁成功,反之就是加锁失败。
- 加锁成功了,就继续往下操作,比如操作MySQL资源;若加锁失败,则会向所有节点发起锁释放的操作请求。
设计规则
Redlock的设计规则就是:
客户端要在所有实例上申请加锁,只有保证大多数节点加锁成功了才判定为加锁成功
。加锁的总耗时要 < 锁设定的过期时间
。释放锁的时候,要向所有节点发起锁释放的请求,不管之前加锁是否成功
( 为了确保只释放自己的锁,需要用前面提到的 Lua 脚本来代替直接使用 DEL 命令进行解锁操作)
代码
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
// 实现要点之允许加锁失败节点限制(N-(N/2+1))
int failedLocksLimit = failedLocksLimit();
List<RLock> acquiredLocks = new ArrayList<RLock>(locks.size());
// 实现要点之遍历所有节点通过EVAL命令执行lua加锁
for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {
RLock lock = iterator.next();
boolean lockAcquired;
try {
// 对节点尝试加锁
lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
} catch (RedisConnectionClosedException|RedisResponseTimeoutException e) {
// 如果抛出这类异常,为了防止加锁成功,但是响应失败,需要解锁
unlockInner(Arrays.asList(lock));
lockAcquired = false;
} catch (Exception e) {
// 抛出异常表示获取锁失败
lockAcquired = false;
}
if (lockAcquired) {
// 成功获取锁集合
acquiredLocks.add(lock);
} else {
// 如果达到了允许加锁失败节点限制,那么break,即此次Redlock加锁失败
if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {
break;
}
}
}
return true;
}
RedLock 红锁 已经废弃
RedLock 红锁:RedLock 会对集群的每个节点进行加锁,如果大多数(N/2+1)加锁成功了,则认为获取锁成功
。
- 这个过程中
可能会因为网络问题,或节点超时的问题,影响加锁的性能,故而在最新的 Redisson 版本中中已经正式宣布废弃 RedLock
。
redisTemplate
为了解决多节点的上述问题,可以使用redisTemplate中的setIfAbsent方法
setIfAbsent方法是原子性的
单个 Redis 实例:在单个 Redis 实例中,setIfAbsent 是原子操作
,确保在键不存在时才会设置值。Redis 集群:在 Redis 集群中,setIfAbsent 仍然是原子操作
,但它只在同一个分片(slot)内有效。如果不同的节点(分片)之间存在竞争条件,可能会导致不一致的结果
可以在这个方法中,构造一个和可重用锁差不多的代码,及判断当前线程是否为加锁线程,去实现多节点先的分布式锁