Redis分布式锁

发布于:2023-01-22 ⋅ 阅读:(457) ⋅ 点赞:(0)

引言

随着业务发展的需要,原单体单机部署的系统被演化成分布式集群系统后,由于分布式系统多线程、多进程并且分布在不同机器上,这将使原单机部署情况下的并发控制锁策略失效,单纯的Java API并不能提供分布式锁的能力。为了解决这个问题就需要-种跨JVM的互斥机制来控制共享资源的访问,这就是分布式锁要解决的问题!

一、分布式锁的概念

分布式锁指的是,所有服务中的所有线程都去获得同一把锁,但只有一个线程可以成功的获得锁,其他没有获得锁的线程必须全部等待,等到获得锁的线程释放掉锁之后获得了锁才能进行操作。Redis官网中,set key value有个带有NX参数的命令,这是一个原子性加锁的命令,指的是此key没有被lock时,当前线程才能加锁,如果已经被占用,就不能加锁。

                                                                   图1

 二、Redis分布式锁在java代码的实现

在虚拟机开启redis服务,设置num为0,根据图1,可以初步写出如下java代码:

​
@GetMapping("testLock")  
public void testLock(){
//1获取锁,setne
Boolean lock =redisTemplate.opsForValue().setIfAbsent( k: “lock", v: "111");
//2获取锁成功、查询num的值 
if(lock){
Object value =redisTemplate.opsForValue().get("num");
//2.1判断num为空return
if(StringUtils.isEmptv(value)){
return:}
//2.2有值就转成成int
int num = Integer.parseInt(s:value+"");
/12.3把redis的num加1
redisTemplate.opsForValue().set("num”,++num);
//2.4释放锁
redisTemplate. delete( key:“lock");
}}else{
//3获取锁失败、每隔0.1秒再获取 
try{
Thread. sleep( millis: 100): 
testLock();
} catch (InterruptedException e) {
e.printStackTraceO;
}

​

 在上述代码中,看似没有问题,但是如果在上锁之后,释放锁之前的代码的执行出现问题,那么就无法释放锁了,所以此时应该在步骤1获取锁的时候给锁设置一个过期时间,如下代码:

​
@GetMapping("testLock")  
public void testLock(){
//1获取锁,setne
Boolean lock =redisTemplate.opsForValue().setIfAbsent( k: “lock", v: "111",3, TimeUnit. SECONDS,);
//2获取锁成功、查询num的值 
if(lock){
Object value =redisTemplate.opsForValue().get("num");
//2.1判断num为空return
if(StringUtils.isEmptv(value)){
return:}
//2.2有值就转成成int
int num = Integer.parseInt(s:value+"");
/12.3把redis的num加1
redisTemplate.opsForValue().set("num”,++num);
//2.4释放锁
redisTemplate. delete( key:“lock");
}}else{
//3获取锁失败、每隔0.1秒再获取 
try{
Thread. sleep( millis: 100): 
testLock();
} catch (InterruptedException e) {
e.printStackTraceO;
}

​

此时也会有一个问题,什么问题呢,接下来我举个例子

如上图所示,设置锁10秒过期,a进行操作,先上锁,然后进行具体操作,但是此时服务器突然出现卡顿,导致a的具体操作还没完成,锁就先过期自动释放。

接下来,锁一旦被释放,就被b抢占了,然后进行具体操作,但是此时a操作的服务器反应过来了,进行了锁的释放,就会导致操作b的锁被操作a释放了,进而导致锁又再次被其他操作抢占。进而导致这种混乱的场面。

因此,我们就得保证a操作只能释放自己的锁而不能去释放其他操作持有的锁,此时,我们就可以在每个操作进行上锁时,给锁设置一个uuid,然后在完成具体操作后,在java代码中通过判断该锁的uuid是不是自己操作时的uuid来判断是否释放锁了,代码如下:


@GetMapping("testLock")  
public void testLock(){
//1获取锁,setne
String uuid=UUID.randomUUID().toString();
Boolean lock =redisTemplate.opsForValue().setIfAbsent( “lock", uuid,3, TimeUnit. SECONDS,);
//2获取锁成功、查询num的值 
if(lock){
Object value =redisTemplate.opsForValue().get("num");
//2.1判断num为空return
if(StringUtils.isEmptv(value)){
return:}
//2.2有值就转成成int
int num = Integer.parseInt(s:value+"");
/12.3把redis的num加1
redisTemplate.opsForValue().set("num”,++num);
//2.4释放锁
String lockUuid=(String)redisTemplate.opsForValue().get("lock");
if(lockUuid.equals(uuid)){
redisTemplate. delete("lock");
}
}}else{
//3获取锁失败、每隔0.1秒再获取 
try{
Thread. sleep( millis: 100): 
testLock();
} catch (InterruptedException e) {
e.printStackTraceO;
}
}

​

此时,你可能会觉得这下应该没有问题了吧,那只能说你太单纯了,不了解社会的险恶,上述代码还是会出现问题,下面我就来解释一下吧

当a操作进行上锁,完成具体操作后,然后进行uuid的比较,判断得锁的uuid与自己操作时设置的uuid一致,进行释放锁的操作,但如果在正要释放锁而没有释放锁的时候,锁刚好过期,这就导致锁会被b操作抢占,然后进行具体操作,当操作a完成了对锁的释放,也就是意味着操作b的锁被操作a释放了。

为了解决这一问题,我们就得保证在uuid判断完成后进行解锁这一过程具有原子性,即在此过程中不能被其他操作抢占,这时,我们就需要用到lua脚本了。

那么lua脚本又什么用呢?它可以将复杂的或者多步的redis操作,写为一个脚本,一次提交给redis执行,减少反复连接 redis 的次数。提升性能。lua脚本是类似 redis 事务,有一定的原子性,不会被其他命令插队,可以完成一些 redis 事务性的操作。但是注意 redis的lua脚本功能,只有在Redis2.6 以上的版本才可以使用。

因此,我们只需将判断uuid和释放锁的这一过程写成lua脚本(使这一过程具有原子性)就可以解决问题了,代码如下:

​@GetMapping("testLock")  
public void testLock(){
//1获取锁,setne
String uuid=UUID.randomUUID().toString();
Boolean lock =redisTemplate.opsForValue().setIfAbsent( “lock", uuid,3, TimeUnit. SECONDS,);
//2获取锁成功、查询num的值 
if(lock){
Object value =redisTemplate.opsForValue().get("num");
//2.1判断num为空return
if(StringUtils.isEmptv(value)){
return:}
//2.2有值就转成成int
int num = Integer.parseInt(s:value+"");
/12.3把redis的num加1
redisTemplate.opsForValue().set("num”,++num);
//2.4释放锁
String lockUuid=(String)redisTemplate.opsForValue().get("lock");
/*使用lua脚本来锁*/
//定义lua脚本
String script ="if redis.call('get',KEYS[1])== ARGV[1] then return redis.call('del’ KEYS[1]) else return 0 end";
//使用redis执行lua执行
DefaultRedisSeript<Long> redisSeript=new DefaultRedisSeript<>(): redisScript.setScriptText(script);
//设置一下返回值类型为Long
//因为删除判断的时候,返回的0,给其封装为数据类型。如果不封装那么默认返回String 类型
//那么返回字符串与会有发生错误。
redisSeript.setResultType(Long.class);
//第一个要是script脚本,第二个需要判断的key,第三个就是key所对应的值。 redisTemplate.execute(redisScript.Arravs.aslist(locKev),uuid);
}else{
//3获取锁失败、每隔0.1秒再获取 
try{
Thread. sleep( millis: 100): 
testLock();
} catch (InterruptedException e) {
e.printStackTraceO;
}
}

​

​

本文含有隐藏内容,请 开通VIP 后查看