Redis系列-5 Redis分布式锁

发布于:2024-06-10 ⋅ 阅读:(167) ⋅ 点赞:(0)

背景:

本文介绍Redis分布式锁的内容,包括Redis相关命令和Lua脚本的介绍,以及操作分布式锁的流程与消息,最后结合Redission源码介绍分布式锁的实现原理。

1.基本命令

1.1 基本键值对的设置

设值: set key value
取值: get key
删除: del key

>set key1 value1
"OK"
>get key1
"value1"
>del key1
"1"

1.2 setnx用法

setnx key value:
当key不存在时,进行设置,返回1(表示操作成功)
当key存在时,不进行设置,返回0(表示操作失败)

>setnx key1 value1
"1"
>setnx key1 value1
"0"

1.3 setex和psetex

setex key seoconds value
等价于原子性地执行了 set key value和expire key seconds
psetex用法与setex相同,区别是setex单位为秒,而psetex是毫秒;

>setex key1 1000 value1
"OK"
>ttl key1
"997"

1.4 set扩展用法

set key value [EX seconds | PX millSeconds] [NX | XX]
seconds EX 表示设置过期时间以秒为单位,millSeconds PX 表示设置过期时间以毫秒为单位;
NX表示当键不存在时执行,并返回OK;否则返回null
XX表示当键存在时执行,并返回OK;否则返回null

>set key1 value1 EX 1000 NX
"OK"
>set key1 value1 EX 1000 NX
null
>set key1 value1 EX 2000 XX
"OK"
>ttl key1
"1996"

2.lua脚本

由于redis是单线程执行的,因此可以原子性地执行lua脚本。因此可通过lua脚本对基本命令进行组合。
格式如下:

EVAL "lua脚本" n KEY... , ARGV...

(1) 通过EVAL命令执行lua脚本;
(2) 可对脚本进行传参,可以传多个KEY和多个ARGV,KEY和ARGV建议使用逗号(,)隔开;
(3) 需要显示指定KEY个数;
(4) lua脚本通过KEYS[i] 和 ARGV[j] 获取传入的参数,下标从1开始;
以下通过案例的方式介绍一下lua脚本的使用。

2.1 加锁

分布式锁的数据结构可以被定义为如下格式:

{
 "lockKey":  {
        "uuid: threadId": num
    }
}

lockKey表示分布式锁:数据库存中存在lockKey键时,表示已有客户端占据了lockKey锁,否则表示lockKey锁未被获取。
uuid: threadId结构包含了UUID唯一字符串,num为获取锁的次数。UUID用于保证上锁和解锁是同一个客户端,num用于实现锁的可重入。
案例:

// 如果锁不存在,则加锁并设置过期时间
if (redis.call('exists', KEYS[1]) == 0) then
    // 设置锁记录锁的获取次数为1
    redis.call('hincrby', KEYS[1], ARGV[2], 1);
    // 设置锁的过期时间
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return nil;
end;

// 如果锁存在,且为自己,锁+1,并重新设置过期时间
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
    // 设置锁记录锁的获取次数+1
    redis.call('hincrby', KEYS[1], ARGV[2], 1);
    // 重置锁的过期时间
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return nil;
end;

// 锁已存在,不是自己,则返回锁到期时间
return redis.call('pttl', KEYS[1]);

说明:
上述LUA脚本返回空,说明锁获取成功;否则获取失败并得到锁的过期时间(毫秒)。

其中,redis.call('exists', KEYS[1])表示KEY[1]键是否存在,存在返回1,不存在返回0;redis.call('hincrby', KEYS[1], ARGV[2], 1)表示对哈希类型数据KEYS[1]和ARGV[2]键对应的值加1;redis.call('pexpire', KEYS[1], ARGV[1])表示设置KEYS[1]键的有效期为ARGV[1],单位毫秒;

在redis客户端进行如下操作:

>EVAL "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);" 1 myLock 60000 80e6bb0b-b3bb-4ce0-b120-9c7ce917b767:12
null

>hget myLock 80e6bb0b-b3bb-4ce0-b120-9c7ce917b767:12
"1"

>ttl myLock
"54"

>EVAL "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);" 1 myLock 60000 80e6bb0b-b3bb-4ce0-b120-9c7ce917b767:12
null

>ttl myLock
"56"

>hget myLock 80e6bb0b-b3bb-4ce0-b120-9c7ce917b767:12
"2"

给上述lua脚本的传参为1 myLock 60000 80e6bb0b-b3bb-4ce0-b120-9c7ce917b767:12

1表示只有一个Key, 其他为ARGV, 即
KEY[1] = myLock
ARGV[1]=60000
ARGV[2]=80e6bb0b-b3bb-4ce0-b120-9c7ce917b767:12

得到的结果如下:

{
 "myLock":  {
        "80e6bb0b-b3bb-4ce0-b120-9c7ce917b767:12": 2
    }
}

表示"myLock"分布式锁已被占用, 获取锁的次数为2次。

2.2 解锁

案例:

// 解锁成功返回1,失败返回0
if (redis.call('del', KEYS[1]) == 1) then 
    // 向Redis发布消息
    redis.call('publish', KEYS[2], ARGV[1]); 
    return 1 
else 
    return 0 
end

说明:
解锁成功后,该lua脚本返回1,解锁失败返回0;
其中: redis.call('del', KEYS[1])表示根据KEYS[1]键删除数据;redis.call('publish', KEYS[2], ARGV[1])表示发布消息 KEYS[2], ARGV[1];

2.3 释放一层锁

// 锁不是被自己占有,直接返回
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
    return nil;
end;

// 锁数量-1,如果还大于0,重新设置过期时间;否则删除锁
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
if (counter > 0) then
    // 重置过期时间
    redis.call('pexpire', KEYS[1], ARGV[2]);
    return 0;
else
    redis.call('del', KEYS[1]);
    redis.call('publish', KEYS[2], ARGV[1]);
    return 1;
end;
return nil;

上述Lua脚本返回1表示删除锁成功,返回0表示锁释放一层,返回空表示释放失败。

2.4 续期

// 锁被自己占用,重新设置过期时间,返回1;否则返回0if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return 1;
end;
return 0;

上述Lua脚本返回1表示续期成功,返回0表示续期失败(当前未获取锁)。

3.Redission用法

分布式锁可以直接使用开源的Redission
引入依赖:

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.16.1</version>
</dependency>

编码如下:

public static void lock1() {
    RedissonClient redisson = getRedissonClient();
    RLock lock = redisson.getLock("myLock");
    // 获取锁
    lock.lock();
    try {
        // 业务逻辑
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        // 释放锁
        lock.unlock();
    }
    redisson.shutdown();
    System.out.println("Begin end");
}

// 获取redis客户端实例
private static RedissonClient getRedissonClient() {
    Config config = new Config();
    config.setLockWatchdogTimeout(600*1000);
    config.useSingleServer().setAddress("redis://127.0.0.1:6001").setPassword("xxx");
    RedissonClient redisson = Redisson.create(config);
    return redisson;
}

说明:redisson.getLock(“myLock”)中的myLock即为分布式锁的键,多个客户端实例需要保证键相同。
lock.lock()用于执行获取锁的逻辑,获取成功后直接返回;获取失败后进入等待队列阻塞;lock.unlock();用于手动解锁。
getRedissonClient方法用于获取redis客户端实例,其中的setLockWatchdogTimeout方法用于设置看门狗的超时时间,单位毫秒,默认为30000(30秒)。
使用lock.lock()方法获取锁时不需要设置锁的过期时间,在获取锁成功后,Redisson通过看门狗机制,进行锁的续期,每经过WatchdogTimeout/3时间执行一次续期操作。
当lock.unlock()释放锁时,会同时关闭看门狗。

4.流程和消息

4.1 流程介绍

屏蔽底层Redis对锁的实现方式,仅用Lock和UnLock表示获取锁和释放锁,分布式锁的竞争流程可表示如下图所示:
在这里插入图片描述
[1] 客户端ClientA向Redis发送获取锁的消息,锁key为myLock(自定义);
[2] Redis响应成功,表示占锁成功;
[3] 客户端ClientB向Redis发送获取锁的消息,key为myLock;
[4] 服务器判断此时myLock锁已被ClientA占有,Redis响应失败;
[5] Client B 向Redis发送订阅消息订阅myChannel频道,等待收到通知;
[6-7] ClientA释放锁同时发布消息至Redis的myChannel频道;
[8] Redis收到publish消息后,向所有订阅了myChannel频道的客户端发送message通知消息;
[9] ClientB收到订阅的消息后,知道锁已被释放,再次获取锁;
其中:消息6和消息7是lua脚本执行的,因此具备原子性;当客户端收到message消息时,表明锁已被释放,可以重新竞争锁。
另外,对于客户端ClientA,在消息2-6之间,Redis的看门狗机制会自动为myLock续期。

4.2 消息介绍

Auth消息:

*2
$4
AUTH
$8
Root@123

+OK

其中:*2 表示由两个输入字符串;
$4表示第一个字符串长度为4,即AUTH;
$8表示第二个字符串长度为8,即Root@123;
+OK为Redis返回的结构,表示鉴权成功;
解析后为:

client: AUTH Root@123
Redis: OK

PING/PONG消息:

*1
$4
PING

+PONG

客户端向Redis发送PING心跳消息,Redis响应PONG消息。

QUITE消息:

*1
$4
QUIT

+OK

客户端向Redis发送QUITE退出消息,Redis响应OK消息。

以下分场景介绍Redis消息,包括成功获取锁—锁的续期—锁的释放和发布通知以及获取锁失败—锁的订阅—收到通知消息—取消订阅等,为简化篇幅,将省略AUTH、PING/PONG、AUITE等重复的内容。

4.2.1 成功获取锁

通过wireshark抓包以及经过tcp.port == 16379 && tcp contains "exists"过滤条件,得到:

*6
$4 
EVAL
$339 
if (redis.call('exists', KEYS[1]) == 0) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);
$1
1
$6
myLock
$4
6000
$39
80e6bb0b-b3bb-4ce0-b120-9c7ce917b767:12

解析后为:

EVAL "lua脚本" 1 myLock 6000 80e6bb0b-b3bb-4ce0-b120-9c7ce917b767:12

4.2.2 获取锁后,锁的续期

通过wireshark抓包以及经过tcp.port == 16379 && tcp contains "pexpire" && tcp contains "hexists""过滤条件,得到:

*6
$4
EVAL
$120
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;
$1
1
$6
myLock
$4
6000
$39
80e6bb0b-b3bb-4ce0-b120-9c7ce917b767:12

解析后为:

EVAL "lua脚本" 1 myLock 6000 80e6bb0b-b3bb-4ce0-b120-9c7ce917b767:12

4.2.3 释放锁

通过wireshark抓包以及经过tcp.port == 16379 && tcp contains "pexpire" && tcp contains "hexists""过滤条件,得到:

*8
$4
EVAL
$305
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;
$1
2
$6
myLock
$31
redisson_lock__channel:{myLock}
$1
0
$4
6000
$39
80e6bb0b-b3bb-4ce0-b120-9c7ce917b767:12

解析后为:

EVAL "lua脚本" 2 myLock redisson_lock__channel:{myLock} 0 6000 80e6bb0b-b3bb-4ce0-b120-9c7ce917b767:12

4.2.4 获取锁失败后订阅

通过wireshark抓包以及经过tcp.port == 16379 && tcp contains "pexpire" && tcp contains "hexists""过滤条件,得到:

*2
$9
SUBSCRIBE
$31
redisson_lock__channel:{myLock}

解析后为:

EVAL SUBSCRIBE redisson_lock__channel:{myLock}

4.2.5 订阅后收到通知消息

通过wireshark抓包以及经过tcp.port == 16379 && tcp contains "pexpire" && tcp contains "hexists""过滤条件,得到:

*3
$7
message
$31
redisson_lock__channel:{myLock}
$1
0

解析后为:

message redisson_lock__channel:{myLock} 0

4.2.6 取消订阅

通过wireshark抓包以及经过tcp.port == 16379 && tcp contains "pexpire" && tcp contains "hexists""过滤条件,得到:

*2
$11
UNSUBSCRIBE
$31
redisson_lock__channel:{myLock}

解析后为:

EVAL UNSUBSCRIBE redisson_lock__channel:{myLock}

5.源码

源码介绍围绕下图展开,如果对下图的逻辑线比较属性,直接跳过本章内容。
在这里插入图片描述
在介绍源码前,有必要了解一下两个概念: Redis的订阅发布机制和Semaphore.
Redis订阅和发布机制:
打开两个Redis客户端,分别执行subscribe myChannel订阅myChannel频道的消息:

>subscribe myChannel
切换到推送/订阅模式,关闭标签页来停止接收信息。
1) "subscribe"
2) "myChannel"
3) "1"

再打开一个客户端,执行publish myChannel key1告诉Redis,向订阅了myChannel的客户端发送消息:

>publish myChannel key1
"2"

返回值2表示有两个订阅客户端。

客户端收到Redis的通知消息:

1) "message"
2) "myChannel"
3) "key1"

Semaphore:

说明:由于在线程专题已经详细介绍过AQS,这里涉及AQS的内容不再展开介绍。

Semaphore是JUC中的一个并发工具类,内部维持了一个state的整数记录状态值,并提供了acquireXXX和release方法用于获取和释放锁(共享锁)。当state的值小于acquireXXX时,线程会进入AQS的等待队列,处于阻塞状态; release方法被时,state属性会增加,如果大于0,则从等待队列中唤醒一个。

如下案例中,Semaphore创建时,state设置为0,当客户端ClientA调用acquire方法获取锁时,进入Semaphore的等待队列处于阻塞状态;
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-7H3sF8Ns-1717209303379)(C:\Users\0216001379\AppData\Roaming\Typora\typora-user-images\1716255100424.png)]
当客户端ClientB调用release方法释放锁时(本质是对state值进行加法运算),此时Semaphore会自动唤醒处于等待队列中的ClientA.

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-n602PIUM-1717209303380)(C:\Users\0216001379\AppData\Roaming\Typora\typora-user-images\1716255109685.png)]
客户端A唤醒时,会再次尝试获取锁,此时Semaphore拥有共享锁的数量为1,与acquire方法获取数量相同(默认获取1个),获取锁正常。
在这里插入图片描述接下来,根据如下案例进行源码介绍:

public static void main(String[] args) {
    RedissonClient client = getClient();
    RLock lock = client.getLock("myLock");
    System.out.println("Lock1 Begin exec");
    lock.lock();
    try {
        System.out.println("Lock1 Begin...");
        Thread.sleep(1000 * 60);
        System.out.println("Lock1 End.");
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        lock.unlock();
    }
    client.shutdown();
}

5.1 获取锁

源码入口lock.lock()->lock(-1, null, false)

说明:为减少代码重复度,会提取公共部分代码形成模板方法,模板方法相对于提取前的方法因存在扩展逻辑,导致可读性降低(虽然整体可维护性提升)。如lock(long leaseTime, TimeUnit unit, boolean interruptibly)方法支持响应中断与忽略中断两种情况,支持设置超时时间与不设置超时时间两种情况。

说明:是否响应中断(即被中断时抛出异常或者不抛出异常)不影响解析主线逻辑,在介绍源码时,认为interruptibly为false, 对interruptibly=true分为不进行说明。

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
 long threadId = Thread.currentThread().getId();
    // part-1.执行lua脚本获取锁
 Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
 if (ttl == null) {
  return;
 }

    // part-2.向Redis发布订阅请求
 RFuture<RedissonLockEntry> future = subscribe(threadId);
 commandExecutor.syncSubscription(future);

 try {
        // part-3.while死循环中获取锁
  while (true) {
   ttl = tryAcquire(-1, leaseTime, unit, threadId);
   if (ttl == null) {
    break;
   }

   if (ttl >= 0) {
    try {
     future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
     future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
    }
   } else {
    future.getNow().getLatch().acquireUninterruptibly();
   }
  }
 } finally {
        // part-4.获取锁成功后,取消订阅
  unsubscribe(future, threadId);
 }
}

上述lock方法的主体逻辑可以分为4个部分:
[1] part-1:执行Lua脚本尝试获取锁,获取锁成功,则直接返回;
[2] part-2: 获取锁失败后,向Redis发布订阅;
[3] part-3: while死循环,获取锁或者抛出异常后退出循环;
[4] part-4: 退出while循环(获取锁成功或抛出异常),向Redis发送取消订阅消息;
整体流程比较清晰,从逻辑上可以切分为两个部分:获取锁成功场景和获取锁失败场景。

5.1.1 获取锁成功:

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
 long threadId = Thread.currentThread().getId();
    // part-1.执行lua脚本获取锁
 Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
 if (ttl == null) {
  return;
 }
 // Ignore ...
}

tryAcquire(-1, leaseTime, unit, threadId)返回null时,获取锁成功,退出lock方法。进入tryAcquire方法:

private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}

其中tryAcquireAsync返回一个Future对象,get方法阻塞等待(通过Future的await方法)该Future执行完成并返回结果或者抛出异常(包装后的RedisException)。进入tryAcquireAsync方法:

private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    // 1.尝试从Redis获取锁,返回一个异步的Future对象
    RFuture<Long> ttlRemainingFuture;
    if (leaseTime != -1) {
        ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    } else {
        ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                                               TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    }
    // 2.在Future对象添加回调函数,完成时回调
    ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
        if (e != null) {
            return;
        }

        // lock acquired
        if (ttlRemaining == null) {
            if (leaseTime != -1) {
                internalLockLeaseTime = unit.toMillis(leaseTime);
            } else {
                // 3.通过看门狗对锁进行自动续期
                scheduleExpirationRenewal(threadId);
            }
        }
    });
    return ttlRemainingFuture;
}

tryAcquireAsync方法整体逻辑较为简单:尝试从Redis获取锁,返回一个异步的Future对象;然后在Future对象添加回调逻辑,在Future完成时回调。

tryAcquireAsync方法仍然是个模板方法,支持设置过期时间和不设置过期时间:

[1]不设置时间: 向Redis申请锁时携带看门狗的超时时间(在3.Redission用法章节中通过Config对象的setLockWatchdogTimeout方法设置),之后通过看门狗续期。

[2]设置过期时间: 向Redis申请锁时携带执行的超时时间,超时后自动释放锁,因此不需要看门狗。

这里有两个重点方法需要关注一下:tryLockInnerAsync向Redis申请锁和scheduleExpirationRenewal开启看门狗。

tryLockInnerAsync方法就是将Lua脚本的执行包装为异步执行,返回一个Future对象:

<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
                          "if (redis.call('exists', KEYS[1]) == 0) then " +
                          "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                          "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                          "return nil; " +
                          "end; " +
                          "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                          "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                          "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                          "return nil; " +
                          "end; " +
                          "return redis.call('pttl', KEYS[1]);",
                          Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}

这里的lua脚本在2.1章节已进行结合,不再赘述。

scheduleExpirationRenewal功能是开启一个看门狗线程,定期(1/3的看门狗超时时间)向Redis续期,主线逻辑如下所示:

protected void scheduleExpirationRenewal(long threadId) {
   //...
   // scheduleExpirationRenewal的核心逻辑在于调用renewExpiration
   renewExpiration();
   //...
}


private void renewExpiration() {
    //...
    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {          
            //...
            // 调用lua脚本-为锁续期
            RFuture<Boolean> future = renewExpirationAsync(threadId);
            future.onComplete((res, e) -> {   
                //...
                // 每internalLockLeaseTime/3时间,回调自身,开始循环
                renewExpiration();   
                //...                    
            });
        }
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
    
    //...
}

// 对lua脚本异步执行的封装,与2.4中介绍的lua脚本相同
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return 1; " +
                    "end; " +
                    "return 0;",
            Collections.singletonList(getRawName()),
            internalLockLeaseTime, getLockName(threadId));
}

5.1.2 获取锁失败:

当锁已被其他客户端占有,获取锁失败,进入订阅-阻塞等待锁释放流程:

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
 //...

    // part-2.向Redis发布订阅请求
 RFuture<RedissonLockEntry> future = subscribe(threadId);
 commandExecutor.syncSubscription(future);

 try {
        // part-3.while死循环中获取锁
  while (true) {
   ttl = tryAcquire(-1, leaseTime, unit, threadId);
   if (ttl == null) {
    break;
   }

   if (ttl >= 0) {
    try {
     future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
     future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
    }
   } else {
    future.getNow().getLatch().acquireUninterruptibly();
   }
  }
 } finally {
        // part-4.获取锁成功后,取消订阅
  unsubscribe(future, threadId);
 }
}

[1] 先看一下向Redis发布订阅请求部分:

// 向lua发送订阅消息,并注册一个监听器监听订阅频道的通知消息
RFuture<RedissonLockEntry> future = subscribe(threadId);
// 与前文结合的get方法逻辑相似,阻塞等待Future执行完成,即等待订阅消息发送给Redis并说道订阅成功消息
commandExecutor.syncSubscription(future);

重点在于第一个方法subscribe(threadId):向Redis发送订阅消息,并注册一个监听器,监听通知消息。
向redis发送订阅消息, 消息内容如章节4.2.4中的SUBSCRIBE redisson_lock__channel:{myLock}, 该消息表示客户端订阅redisson_lock__channel:{myLock}频道, 当Redis服务器收到该频道的通知消息后,会以Message类型的消息通知给订阅的客户端,消息内容为:message redisson_lock__channel:{myLock} 0.
当Redssion客户端收到message redisson_lock__channel:{myLock} 0后,监听器被调用,监听器的注册和监听器的内容后面介绍。
[2] 接着进入while死循环,只有抛出异常或者获取锁成功才会退出:

while (true) {
    // tryAcquire前文已介绍:获取锁成功返回null, 否则返回锁的超时时间
    ttl = tryAcquire(-1, leaseTime, unit, threadId);
    if (ttl == null) {
        break;
    }

    if (ttl >= 0) {
        try {
            // 根据锁的超时时间,定时阻塞等待锁,超时后,自动苏醒
            future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
        }
    } else {
        // 当锁没有设置超时时间时,阻塞等待,直到被唤醒
        future.getNow().getLatch().acquireUninterruptibly();
    }
}

这里的future.getNow().getLatch()返回的是一个Semaphore对象,初始化时state设置为0,因此调用acquire方法会陷入等待队列。唤醒逻辑在5.3 订阅和通知章节中介绍。
[3] unsubscribe(future, threadId)用于取消订阅,能进入finnally说明有异常抛出或者已经获取锁,从而不需要再监听redis的通知,unsubscribe核心是删除注册的监听器。

5.2 释放锁

Redssion提供的分布式锁支持可重入,因此多次获取需要多次释放。根据案例的lock.unlock()进入unlock方法:

public void unlock() {
    try {
        get(unlockAsync(Thread.currentThread().getId()));
    } catch (RedisException e) {
        if (e.getCause() instanceof IllegalMonitorStateException) {
            throw (IllegalMonitorStateException) e.getCause();
        } else {
            throw e;
        }
    }
}

get是5.1章中结合过,这里直接进入unlockAsync方法,主线逻辑如下:

public RFuture<Void> unlockAsync(long threadId) {
    RPromise<Void> result = new RedissonPromise<>();
    //  向Redis发送解锁消息
    RFuture<Boolean> future = unlockInnerAsync(threadId);
    future.onComplete((opStatus, e) -> {
        // 关闭看门狗
        cancelExpirationRenewal(threadId);
  //...
    });
    return result;
}

核心逻辑在于unlockInnerAsync:

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                          "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                          "return nil;" +
                          "end; " +
                          "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                          "if (counter > 0) then " +
                          "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                          "return 0; " +
                          "else " +
                          "redis.call('del', KEYS[1]); " +
                          "redis.call('publish', KEYS[2], ARGV[1]); " +
                          "return 1; " +
                          "end; " +
                          "return nil;",
                          Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}

这里的lua脚本用于释放一层锁,如果释放完一层锁后,锁的数量为0,则删除对应的key(释放锁), 此时还会发布一条消息通知Redis,可参考2.3 释放一层锁

5.3 订阅和通知

章节5.2中介绍了客户端获取锁失败后Redis订阅,然后进入等待队列阻塞;在章节5.3中介绍了释放锁以及向Redis发布通知消息,本章节的内容是将二者衔接起来。

订阅和通知的核心功能是:阻塞在等待队列中的客户端将会因为通知消息而被唤醒。

Redis通知Redission:

客户端与Redis服务器底层的通讯是基于TCP链,Redssion使用Netty进行了封装,在pipeline中添加了CommandPubSubDecoder解码器,该解码器中存在如下逻辑:

protected void decodeResult(CommandData<Object, Object> data, List<Object> parts, Channel channel,Object result) throws IOException {
 //...
 if (result instanceof Message) {
  //...
  if (result instanceof PubSubMessage) {
   pubSubConnection.onMessage((PubSubMessage) result);
  }
  //...
 }
 // ...
}

当接收到Message且是PubSubMessage类型的消息(即前文介绍的message redisson_lock__channel:{myLock} 0消息)时,调用pubSubConnection.onMessage((PubSubMessage) result),进入该方法:

public void onMessage(PubSubMessage message) {
    for (RedisPubSubListener<Object> redisPubSubListener : listeners) {
        redisPubSubListener.onMessage(message.getChannel(), message.getValue());
    }
}

这里会调用注册的监听器,包括5.1.2 获取锁失败章节中注册的监听器。

Redission注册监听器:
继续看一下5.1.2 获取锁失败章节RFuture<RedissonLockEntry> future = subscribe(threadId)逻辑:

protected RFuture<RedissonLockEntry> subscribe(long threadId) {
    return pubSub.subscribe(getEntryName(), getChannelName());
}

public RFuture<E> subscribe(String entryName, String channelName) {
    //...
    RedisPubSubListener<Object> listener = createListener(channelName, value);
    service.subscribe(LongCodec.INSTANCE, channelName, semaphore, listener);
    //...
}

通过createListener创建监听器,然后将监听器注册到RedisPubSubConnection对象的listeners属性中:

public class RedisPubSubConnection extends RedisConnection {
    // 监听器容器
    final Queue<RedisPubSubListener<Object>> listeners = new ConcurrentLinkedQueue<RedisPubSubListener<Object>>();
    
    // 添加监听器的方法
    public void addListener(RedisPubSubListener<?> listener) {
        listeners.add((RedisPubSubListener<Object>) listener);
    }
}

至于创建listener后,如何调用RedisPubSubConnection的addListener方法可通过代码追踪和Bebug进行了解,不是重点内容;这里重点关注的是这个监听器的内部逻辑,即当这个监听器被调用时触发的逻辑:

private RedisPubSubListener<Object> createListener(String channelName, E value) {
    RedisPubSubListener<Object> listener = new BaseRedisPubSubListener() {
        @Override
        public void onMessage(CharSequence channel, Object message) {
            if (!channelName.equals(channel.toString())) {
                return;
            }
            PublishSubscribe.this.onMessage(value, (Long) message);
        }

      //...
    };
    return listener;
}

channelName.equals(channel.toString())用于判断收到的消息是否是订阅的消息,如前文介绍的订阅的频道是redisson_lock__channel:{myLock}, 此时会校验channelName。
核心逻辑在于PublishSubscribe.this.onMessage(value, (Long) message):

protected void onMessage(RedissonLockEntry value, Long message) {
    //...
    value.getLatch().release();
    //...
}

value.getLatch()获取的是前文介绍的Semaphore对象,调用release时会修改state值,并唤醒一个等待的线程。
基于上述介绍:线程获取分布式锁失败后,向Redis订阅消息,并注册监听器,然后陷入Semaphore的等待队列;当其他客户端释放锁时,同时会发布通知消息给Redis服务器;Redis服务器收到消息后,向订阅的客户端发送通知消息;客户端收到通知消息后,触发监听器逻辑,监听器基于Semaphore机制,唤醒阻塞的线程。线程被唤醒后再次尝试获取分布式锁。