Redis下篇--分布式锁

发布于:2025-06-24 ⋅ 阅读:(15) ⋅ 点赞:(0)

Redis下篇–分布式锁

本文示例代码见GITEE仓库中的【redis-analysis】

地址:https://gitee.com/quercus-sp204/new-technology/tree/master/redis-analysis

1.基本介绍

Redis分布式锁是一种基于Redis实现的跨进程互斥机制,用于在分布式系统中控制多个服务/节点对共享资源的并发访问,确保同一时刻只有一个客户端能执行关键操作(如修改共享数据、执行任务等)。

在单机系统中,我们可以用线程锁(如Java的synchronizedReentrantLock)保证并发安全。但在分布式系统中【项目集群部署】,服务部署在多台机器上,跨进程的共享资源无法通过本地锁保护。如下图1所示:

在这里插入图片描述

此时就需要分布式锁。我们需要用它来:

  1. 解决分布式竞争条件:当多个服务实例同时操作共享资源(如库存扣减、订单状态更新)时,可能因并发写入导致数据错误(例如超卖)。分布式锁确保同一时刻只有一个实例能执行操作。

  2. 替代低效方案:传统方案(如数据库行锁)在并发高时性能差,而Redis作为内存数据库,高性能(10万+ QPS)原子操作特性使其成为理想的分布式锁实现基础。

  3. 保证操作原子性:分布式锁将临界区操作(如“查询+修改”)封装为原子操作,避免多个客户端交叉执行引发的逻辑错误。

大致示意图如下图2所示:(就是多加了一层!)

在这里插入图片描述

上面说了,用于分布式系统的,我在这里抛出一个小问题:现在也有很多应用不是微服务架构的,我们是不是没有必要上分布式锁呢?熟悉java的人都知道,我们可以用java自带的synchronized、ReentrantLock锁呀,我们为什么要这个分布式锁呢?好像确实有些许道理。这个问题先放一放。

介绍完了其基本信息,下面给出其在实际生产中的应用场景:

  1. 防止重复操作:用户重复提交订单:方案:用订单ID+操作类型作为锁key,确保同一订单的支付操作只执行一次。
  2. 高并发库存扣减
# 伪代码示例【大致流程】
lock_key = "stock_lock:product_123"
if redis.set(lock_key, "1", nx=True, ex=5):  # 获取锁
    try:
        stock = db.query("SELECT stock FROM products WHERE id=123")
        if stock > 0:
            db.execute("UPDATE products SET stock = stock - 1 WHERE id=123")
    finally:
        redis.del(lock_key)  # 释放锁
  1. 分布式任务调度:多个节点竞争执行定时任务(如每天0点统计报表,或者是每天定时数据库做一些数据修改之类的),在要执行定时任务的时候,我们可以用任务名作为key,抢到锁的节点执行任务。

等等场景都可以用到分布式锁。其实分布式锁实现方式不只有Redis可以实现,还可以用其他方案来实现,比如说

我们可以用数据库来实现分布式锁,通过SELECT ... FOR UPDATE对数据库记录加行锁,亦或者是在表中增加版本号字段,更新时校验版本号(CAS机制);但是频繁加锁导致数据库IO压力大,高并发下延迟显著,同时事务未提交或超时可能引发死锁。

还可以使用Zookeeper来创建临时结点来达到类似的效果,创建临时顺序节点,最小序号节点获锁,其他节点监听前序节点删除事件,节点宕机时临时节点自动删除,避免死锁。但是相比于redis来说,节点创建/删除及事件通知的开销较大(低于Redis),同时可能需维护ZooKeeper集群,开发成本较高。

实际生产要将成本和系统复杂度综合起来考虑,最后再决定采用哪种方案,但是Redis这个中间件对于现在的很多系统来说,我认为是用得非常普遍了吧。在综合考虑性能、系统复杂度、以及业界普遍方案,本文就以Redis实现分布式锁展开了。

2. Redis实现分布式锁

2.1 基础版setnx

SETNX lock_key 1  # 尝试获取锁
DEL lock_key      # 释放锁
    
@Component
public class BaseLock {
    @Resource
    private RedisTemplate<String, Object> redisTemplate;

    public boolean tryLock(String lockName) {
        Boolean lock = redisTemplate.opsForValue().setIfAbsent(lockName, "lock");
        return lock != null && lock;
    }

    public void unLock(String lockName) {
        redisTemplate.delete(lockName);
    }
}

// 测试使用
public void test() {
    // 1.加锁
    try {
        bool res = baseLock.tryLock(key);
        if ( res ) {
            do......
        } else {
            获取锁失败
        }
    } catch( e ) {
        ........
    } finally{
        baseLock.unLock(key) // 如果这一行死活执行不了
    }
}

这绝对是有一点问题的,如果有异常原因,在执行baseLock.unLock(key) 这行代码的时候出现异常怎么办,可能这个时候网络异常?Redis客户端断开了?等等原因,导致该key没有被删掉,那么就会有死锁问题。下面为了避免这个问题,我们把自动删除key的操作,加一个过期时间,让redis-server为我们兜一下底。

2.2 过期时间

在设置过期时间这一点上,需要提前说明一下,Redis是有EXPIRE这个命令的,它为锁设置过期时间,避免死锁(如 EXPIRE lock_key 10),如果是先setnx,再expire,这就是两条命令了,所以我们需要保证两条命令的原子性。但是Redis如今都发展到版本7.x了,Redis早就提供了原子命令:

SET lock_key unique_value NX EX 10 #同时完成锁设置和过期时间,避免非原子性问题

本文就直接用上面这个了。

// 方案二:set + nx + ex
public boolean tryLock2(String lockName, int t) {
    Boolean lock = redisTemplate.opsForValue().setIfAbsent(lockName, "lock", Duration.of(t, ChronoUnit.SECONDS));
    return lock != null && lock;
}
public void unLock2(String lockName) {
    redisTemplate.delete(lockName);
}

// 测试使用
public void test() {
    // 1.加锁
    try {
        // 指定key的有效时间 假设time=5
        bool res = baseLock.tryLock2(key, time);
        if ( res ) {
            do...... // 【但是这里执行6秒喔】
        } else {
            获取锁失败
        }
    } catch( e ) {
        ........
    } finally{
        baseLock.unLock2(key) //【释放锁处】
    }
}

上面这个方案,就算是【释放锁处】死活执行不了,有过期时间为我们兜底,不用担心死锁问题,过期时间到了就行了。上面仍然会有问题,且听细细道来。

看上面测试代码的注释 do...... // 【但是这里执行6秒喔】,就是说业务执行时间超过了锁的过期时间,就会导致如下问题:

在这里插入图片描述

请按照顺序看上面的流程,我们可以知道发生了这样一件事情:那就是线程1把线程2的锁给删了,出现误删的现象,然后后面就可能会引发一连串的错误了。

下面来看方案3,前面两个方案的value我们都没有用到,现在要用到了:

// 方案三:set + uuid
public boolean tryLock3(String lockName, String value, int t) {
    Boolean lock = redisTemplate.opsForValue().setIfAbsent(lockName, value, Duration.of(t, ChronoUnit.SECONDS));
    return lock != null && lock;
}
public void unLock3(String lockName, String value) {
    String lock = (String) redisTemplate.opsForValue().get(lockName);
    if ( lock != null && lock.equals(value)) { // 比较一下value,是不是自己的
        redisTemplate.delete(lockName);
    }
}

public void test() {
    String lockName = "lock";
    String uuid = UUID.randomUUID().toString();
    try {
        boolean b = tryLock3(lockName, uuid, 5);
        if (b) {
            // 获取锁成功
            // 逻辑代码
            Thread.sleep(5000);
        } else {
            // 获取锁失败
        }
    } catch (Exception e ) {
        // ...
    } finally {
        unLock3(lockName, uuid);
    }
}

虽然这样看起来不会释放别人的锁了,但是如果业务超时,还是会出现有多个线程进入临界区的情况,这是不希望看到的。还有一个小问题,就是unLock3的那两步不是原子操作,极端情况也会出现误删问题:下面给出deepseek的极端情况推演图
在这里插入图片描述

2.3 lua脚本

所以要保证原子性。我们使用redis lua脚本来保证这两部操作的原子性

private static final String UNLOCK_SCRIPT =
            "if redis.call('get', KEYS[1]) == ARGV[1] then " +
                    "   return redis.call('del', KEYS[1]) " +
                    "else " +
                    "   return 0 " +
                    "end";
// 删除锁的时候用lua脚本 unLock3优化版
public void unLock3(String lockName, String value) {
    DefaultRedisScript<Long> script = new DefaultRedisScript<>(UNLOCK_SCRIPT, Long.class);
    redisTemplate.execute(script, Collections.singletonList(lockName), value);
}

这里顺带提一嘴,Redis不是还有事务吗?我们可以用事务吗:

public void unsafeUnlockWithTransaction(String lockName, String value) {
    redisTemplate.execute(new SessionCallback<>() {
        @Override
        public Object execute(RedisOperations operations) {
            operations.watch(lockName);
            String lockValue = (String) operations.opsForValue().get(lockName);
            if (value.equals(lockValue)) {
                operations.multi();
                operations.delete(lockName);
                operations.exec(); // 事务提交
            } else {
                operations.unwatch();
            }
            return null;
        }
    });
}

那么,这个Redis事务在第三章补充一下,在此处先放一下。再次回到2.2小节的问题,那就是业务时间 > 锁持有时间,还是会导致多个线程进入临界区的情况,所以这个过期时间是我们要着重考虑的问题了。

2.4 锁续期

对于这个问题,我们可以有这样的实现思路,启动一个线程在后台对key进行续期不就好了?可以我们可以利用Java的周期任务线程来完成这个效果,简单示例代码如下:

@Component
@Slf4j
public class AutoExtensionLock {
    @Resource
    private RedisTemplate<String, Object> redisTemplate;
    // 锁的队列
    private static ConcurrentLinkedQueue<RedisLockHolder> keys = new ConcurrentLinkedQueue<>();

    private static final ScheduledExecutorService SCHEDULER =
            new ScheduledThreadPoolExecutor(1,
                new BasicThreadFactory.Builder().daemon(true).namingPattern("redis-AutoExtensionLock-%d").build());

    @PostConstruct
    public void init() {
        // 定时检查锁的到期时间
        SCHEDULER.scheduleAtFixedRate(() -> {
            Iterator<RedisLockHolder> iterator = keys.iterator();
            while (iterator.hasNext()) {
                RedisLockHolder redisLockHolder = iterator.next();
                log.info("redis-AutoExtensionLock-info, {}", redisLockHolder.toString());
                try { //try-catch起来否者报错后定时任务将不会再运行
                    if (redisLockHolder.getEndTime() >= System.currentTimeMillis()) {
                        Object v = redisTemplate.opsForValue().get(redisLockHolder.getKey());
                        if ( v == null ) { // 锁不存在
                            iterator.remove();
                        } else {
                            long holderExpireTime = redisLockHolder.getEndTime();
                            long now = System.currentTimeMillis();
                            long during = redisLockHolder.getDuring();
                            int maxRetry = redisLockHolder.getMaxRetry();
                            int nowCount = redisLockHolder.getNowCount();
                            long l = holderExpireTime - now; // 锁的剩余时间
                            if ( nowCount == maxRetry ) {
                                log.info("redis-AutoExtensionLock-error, {}", "重试次数已满");
                                iterator.remove();
                            } else if ( l <= 0 ) {
                                log.info("redis-AutoExtensionLock-error, {}", "锁已过期");
                                iterator.remove();
                            }
                            // 如果 l <= during / 3
                            else if ( l <= during / 3.0 ) {
                                log.info("redis-AutoExtensionLock-info, {}", "锁即将过期,开始自动续期");
                                redisTemplate.expire(redisLockHolder.getKey(), redisLockHolder.getDuring(), TimeUnit.SECONDS);
                                redisLockHolder.setNowCount(nowCount + 1);
                                redisLockHolder.setEndTime(now + during);
                            }
                        }
                    }
                    else {
                        iterator.remove();
                    }
                } catch (Exception e) {
                    // ....
                    log.info("redis-AutoExtensionLock-error, {}", e.toString());
                    iterator.remove();
                }
            }

        }, 0, 3000, TimeUnit.MILLISECONDS);
    }

    // 尝试获取锁
    public boolean tryLock4(String lockName, String value, long t ) {
        Boolean b = redisTemplate.opsForValue().setIfAbsent(lockName, value, Duration.of(t, ChronoUnit.SECONDS));
        if (b != null && b) {
            keys.add(new RedisLockHolder(lockName, value, t * 1000,System.currentTimeMillis() + (t * 1000), 0, 5));
            return true;
        }
        return false;
    }

    // 释放锁
    private static final String UNLOCK_SCRIPT =
            "if redis.call('get', KEYS[1]) == ARGV[1] then " +
                    "   return redis.call('del', KEYS[1]) " +
                    "else " +
                    "   return 0 " +
                    "end";
    public void unlock4(String lockName, String value) {
        DefaultRedisScript<Long> script = new DefaultRedisScript<>(UNLOCK_SCRIPT, Long.class);
        redisTemplate.execute(script, Collections.singletonList(lockName), value);
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    private static class RedisLockHolder {
        public static final int MAX_RETRY = 5;

        private String key;
        private String value;
        private long during; // 锁的持续时间 --- 毫秒
        private long endTime; // 到期时间
        private int nowCount; // 当前重试次数
        private int maxRetry; // 重试次数
    }
}

在这里插入图片描述

像这种守护线程去续期锁的机制,叫做“看门狗机制”。

2.5 Redisson

2.5.1 基本介绍

如果自行去实现各种api,相比是有点麻烦的,所以Redisson 是一个基于 Redis 的高性能 Java 客户端库,专为分布式系统设计。它不仅封装了 Redis 的基础操作(如键值存储),还提供了丰富的分布式数据结构和服务,简化了分布式环境下的开发复杂度。Redisson 的核心价值在于 将分布式系统的通用能力(锁、队列、缓存等)抽象为易用的 Java API,开发者无需重复造轮子即可构建高可靠分布式应用。Redisson的宗旨是促进使用者对 Redis 的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上

如果我们的项目中已使用 Redis,那么Redisson可能是比原生客户端更高效的选择。

  • 分布式锁

它在分布式锁的场景下,实现了可重入锁(Reentrant Lock):同一线程多次获取同一锁不会死锁,通过 Hash 结构存储线程 ID 和重入计数器;还有公平锁(Fair Lock)【默认情况下redisson分布式锁是非公平的,即任意时刻任意一个请求都可以在锁释放后争抢分布式锁】:基于 Redis 的 BRPOPLPUSH 命令,按请求顺序分配锁,避免饥饿问题;它也有自带的看门狗喔(自动续期(Watchdog)):后台线程自动延长锁超时时间,防止业务未完成时锁过期。

Redisson的看门狗机制提供的默认超时时间是30*1000毫秒,也就是30秒

如果一个线程获取锁后,运行程序到释放锁所花费的时间大于锁自动释放时间(也就是看门狗机制提供的超时时间30s),那么Redission会自动给redis中的目标锁延长超时时间。在Redission中想要启动看门狗机制,那么我们就不用获取锁的时候自己定义leaseTime(锁自动释放时间)。如果自己定义了锁自动释放时间的话,无论是通过lock还是tryLock方法,都无法启用看门狗机制。但是,如果传入的leaseTime为-1,也是会开启看门狗机制的。

这里抛出一个小问题:如果 业务一直没执行完,那岂不是一直续期,相当于死锁了吗,这种情况怎么处理?

乍一看,对喔,有道理啊,但是仔细一想,这是问题吗?业务代码怎么可能一直没执行完(一直没执行完难道不是业务程序的逻辑有问题吗)?而且,看门狗是给拿到锁的客户端续期(有限时间),在看门狗或拿到锁的客户端宕机后(或是其它异常)就会停止续期,最后一次续期的时间就是他的有效期,到期自动释放。这个锅中间件不能背喔。

RLock lock = redisson.getLock("orderLock");
lock.lock(); // 获取锁(自动续期)
try {
    // 业务操作
} finally {
    lock.unlock(); // 原子化释放
}

在分布式锁的基础上还提供了联锁(MultiLock),读写锁(ReadWriteLock),公平锁(Fair Lock),红锁(RedLock),信号量(Semaphore),可过期性信号量(PermitExpirableSemaphore)和闭锁(CountDownLatch)这些实际当中对多线程高并发应用至关重要的基本部件

  • 分布式数据结构
类型 实现类示例 功能说明
Map RMap 分布式 HashMap,支持本地缓存优化
Queue RBlockingQueueRDelayedQueue 阻塞队列,支持任务调度;延迟队列
AtomicLong RAtomicLong 分布式原子计数器
Bloom Filter RBloomFilter 高效大数据去重

布隆过滤器见:

CSDN:Redis中篇

微信公众号:https://mp.weixin.qq.com/s/tcUQeX1PFOrW_ifSBytefQ

在普通数据结构上,将原生的Redis Hash,List,Set,String,Geo,HyperLogLog等数据结构封装为Java里大家最熟悉的映射(Map),列表(List),集(Set),通用对象桶(Object Bucket),地理空间对象桶(Geospatial Bucket),基数估计算法(HyperLogLog)等结构,

  • 网络通信上

Redisson采用了基于NIO的Netty框架,不仅能作为Redis底层驱动客户端,具备提供对Redis各种组态形式的连接功能,对Redis命令能以同步发送、异步形式发送、异步流形式发送或管道形式发送的功能,LUA脚本执行处理,以及处理返回结果的功能

可以说,Redisson算是Redis的一款开发利器了!怪不得叫他“瑞士军刀”。。

2.5.2 分布式锁相关原理分析
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>3.45.1</version>
</dependency>

首先是这个版本。

加锁大致过程:指定一个 key 作为锁标记,存入 Redis 中,指定一个 唯一的用户标识 作为 value;当 key 不存在时才能设置值,确保同一时间只有一个客户端进程获得锁,满足 互斥性 特性;设置一个过期时间,防止因系统异常导致没能删除这个 key,满足 防死锁 特性;当处理完业务之后需要清除这个 key 来释放锁,清除 key 时需要校验 value 值,需要满足 只有加锁的人才能释放锁

现在看这个tryLock方法**【无参数的】**

public boolean tryLock(String lockName) {
    return redissonClient.getLock(lockName).tryLock();
}

public void unLock(String lockName) {
    redissonClient.getLock(lockName).unlock();
}

// 带超时的锁
public boolean tryLock(String lockName, long waitTime, long leaseTime) {
    try {
        return redissonClient.getLock(lockName).tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
        log.error("tryLock 出现异常", e);
        return false;
    }
}

可以看到使用起来非常简单吧。那我们就从加锁、释放锁看起。

我们先看加锁,也就是这里:redissonClient.getLock(lockName).tryLock();

// 首先是getLock
// 【Redisson.java】
public final class Redisson implements RedissonClient {
    @Override
    public RLock getLock(String name) {
        // 返回的是RLock类型【具体类型是RedissonLock】
        return new RedissonLock(commandExecutor, name);
    }
}
// 然后就是tryLock方法了
// 【RedissonLock.java】
public class RedissonLock extends RedissonBaseLock {
    @Override
    public boolean tryLock() {
        // 1.tryLockAsync()
        // 2.get( RFuture<Boolean> )
        // get( xxx ) 阻塞到tryLockAsync()返回
        return get(tryLockAsync());
    }
   
}

// 【RedissonBaseLock.java】
public abstract class RedissonBaseLock extends RedissonExpirable implements RLock {
    @Override
    public RFuture<Boolean> tryLockAsync() {
        // 这里调用的tryLockAsync()方法,具体实现在【RedissonLock.java】
        // 这个方法是RLockAsync接口里面定义的
        // RLock接口继承了RLockAsync接口,所以这里可以看到这个方法
        return tryLockAsync(Thread.currentThread().getId());
    }
}

// 现在又回到 【RedissonLock.java】
@Override
public RFuture<Boolean> tryLockAsync(long threadId) {
    // 先 () -> tryAcquireOnceAsync(-1, -1, null, threadId)
    // 返回RFuture<Boolean>是execute方法返回的
    return getServiceManager().execute(() -> tryAcquireOnceAsync(-1, -1, null, threadId));
}
private <T> void execute(AtomicInteger attempts, CompletableFuture<T> result, Supplier<CompletionStage<T>> supplier) {
    // 这里一行执行的是传进来的supplier.....也就是() -> tryAcquireOnceAsync(-1, -1, null, threadId)
    CompletionStage<T> future = supplier.get(); 
    future.whenComplete((r, e) -> { 
        if (e != null) { // 有异常
            if (.....) {
                ....
				// 重试
                newTimeout(t -> execute(attempts, result, supplier),
                           config.getRetryInterval(), TimeUnit.MILLISECONDS);
                return;
            }
            result.completeExceptionally(e);
            return;
        }
		// 没有异常,ok,future设置为完成
        result.complete(r);
    });
}
// () -> tryAcquireOnceAsync(-1, -1, null, threadId)
private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    CompletionStage<Boolean> acquiredFuture;
    // 如果给的所持有时间 > 0
    if (leaseTime > 0) {
        //场景:用户显式指定锁超时时间(如 lock.lock(10, TimeUnit.SECONDS))
        acquiredFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
    } else { 
        // 如果 <= 0, 由于我们最外层是调用的tryLock(),没有带任何参数,走到这里的话,leaseTime传过来的是-1
        // 在该方法下面看tryLockInnerAsync方法
        acquiredFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
             TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
    }

    acquiredFuture = handleNoSync(threadId, acquiredFuture);

    CompletionStage<Boolean> f = acquiredFuture.thenApply(acquired -> {
        // lock acquired
        if (acquired) {
            if (leaseTime > 0) { // 如果传进来的锁持有时间>0
                internalLockLeaseTime = unit.toMillis(leaseTime);
            } else { 
                // 如果传进来的锁持有时间 <= 0
                // 会启动看门狗喔
                scheduleExpirationRenewal(threadId);
            }
        }
        return acquired;
    });
    return new CompletableFutureWrapper<>(f);
}
// 可以看到底层是用了lua脚本喔
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    return evalWriteSyncedNoRetryAsync(getRawName(), LongCodec.INSTANCE, command,
        "if ((redis.call('exists', KEYS[1]) == 0) " + // 情况1:锁不存在
                  "or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then " + // 情况2:锁已被当前线程持有
              "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + // 重入计数+1
              "redis.call('pexpire', KEYS[1], ARGV[1]); " + // 设置/刷新过期时间
              "return nil; " + // 返回nil表示成功
         "end; " +
         "return redis.call('pttl', KEYS[1]);", // 返回锁剩余时间(毫秒)
       Collections.singletonList(getRawName()), // KEYS[1] = 锁名称
       unit.toMillis(leaseTime),  // ARGV[1] = 锁持有时间(毫秒)
       getLockName(threadId));  // ARGV[2] = 线程标识符(UUID:threadId)
}

到这里我们知道,Redisson不是用的string数据类型的,而是hash类型的。在Redisson里面是这样的:【可以看到是一个可重入锁】

key: test [hash类型的]
value: 
----field: 285475da-9152-4c83-822a-67ee2f116a79:52 [线程ID]
----val: 1 [重入次数]

看得有点儿长了,我们接着看看门狗是怎么运行的:

// 接上文
// 会启动看门狗喔
// 【RedissonLock.java】
scheduleExpirationRenewal(threadId);
// 这个方法其实是 【RedissonBaseLock.java】里面的
protected void scheduleExpirationRenewal(long threadId) {
    renewalScheduler.renewLock(getRawName(), threadId, getLockName(threadId));
}
// 【LockRenewalScheduler.java】
public void renewLock(String name, Long threadId, String lockName) {
    reference.compareAndSet(null, new LockTask(internalLockLeaseTime, executor, batchSize));
    LockTask task = reference.get();
    task.add(name, lockName, threadId);
}
// ...
// 到这里
// RenewalTask.java
final void add(String rawName, String lockName, long threadId, LockEntry entry) {
    addSlotName(rawName);
    LockEntry oldEntry = name2entry.putIfAbsent(rawName, entry);
    if (oldEntry != null) {
        oldEntry.addThreadId(threadId, lockName);
    } else {
        if (tryRun()) {
            schedule(); // 这里
        }
    }
}
public void schedule() {
    if (!running.get()) {
        return;
    }
    long internalLockLeaseTime = executor.getServiceManager().getCfg().getLockWatchdogTimeout();
    //定时任务 是 lockWatchdogTimeout 的1/3时间去执行 renewExpirationAsync【默认就是10秒嘛】
    executor.getServiceManager().newTimeout(this, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
}
// ServiceManager.java
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
    try {
        return timer.newTimeout(task, delay, unit);
    } catch (IllegalStateException e) {
        ....
    }
}
// 然后我们发现,来到netty包下面了 io.netty.util
// public class HashedWheelTimer implements Timer {....}
@Override
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
    // delay参数是internalLockLeaseTime / 3
}

我发现这个Netty时间轮算法,我还不会,在这里就不继续深究了。见后续文章吧。

上面是tryLock无参数的,下面来看tryLock带时间参数的,方法有一点长,这里仅给出关键部分:

// 【RedissonLock.java】
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
    // 阶段1:初始化计时
    long time = unit.toMillis(waitTime);  // 总等待时间(ms)
    long current = System.currentTimeMillis();  // 起始时间戳
    long threadId = Thread.currentThread().getId();  // 线程唯一ID
    // 阶段2:首次尝试获取锁
    // 最终会走到上面的见过的方法 tryAcquireOnceAsync -- 执行Lua脚本
    Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
    if (ttl == null) {  // 加锁成功
        return true;
    }
    // 阶段3:检查剩余等待时间
    time -= System.currentTimeMillis() - current;  // 扣除已用时间
    if (time <= 0) {  // 已超时
        acquireFailed(waitTime, unit, threadId);  // 记录失败指标
        return false;
    }
    // 阶段4:订阅锁释放事件
    // 通过 Redis Pub/Sub 监听锁释放事件(通道名:redisson_lock__channel:{lockName}
    current = System.currentTimeMillis();
    CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
    try {
        subscribeFuture.get(time, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
        // 订阅超时处理(关键点2)
        if (!subscribeFuture.completeExceptionally(new RedisTimeoutException(...))) {
            subscribeFuture.whenComplete((res, ex) -> {
                if (ex == null) unsubscribe(res, threadId);  // 异步取消订阅
            });
        }
        // 若订阅操作超时(TimeoutException),取消订阅并返回失败
        acquireFailed(waitTime, unit, threadId);
        return false;
    } catch (ExecutionException e) {
        // 订阅异常处理
        acquireFailed(waitTime, unit, threadId);
        return false;
    }
    try {
        // 阶段5:二次时间检查
        time -= System.currentTimeMillis() - current;
        if (time <= 0) {
            acquireFailed(waitTime, unit, threadId);
            return false;
        }
        // 阶段6:循环尝试获取锁(核心逻辑)
        while (true) {
            long currentTime = System.currentTimeMillis();
            ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
            if (ttl == null) {  // 获取成功
                return true;
            }
            // 更新时间余额
            time -= System.currentTimeMillis() - currentTime;
            if (time <= 0) {  // 总等待超时
                acquireFailed(waitTime, unit, threadId);
                return false;
            }
            // 阶段7:精准阻塞等待(设计精髓)
            currentTime = System.currentTimeMillis();
            if (ttl >= 0 && ttl < time) {  // 锁将先于等待时间过期
                getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
            } else {  // 等待时间先结束
                getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
            }
            // 最终时间检查
            time -= System.currentTimeMillis() - currentTime;
            if (time <= 0) {
                acquireFailed(waitTime, unit, threadId);
                return false;
            }
        }
    } finally {
        // 阶段8:资源清理
        unsubscribe(commandExecutor.getNow(subscribeFuture), threadId);
    }
}

接下来看一下释放锁的:redissonClient.getLock(lockName).unlock();

// 【RedissonBaseLock.java】
@Override
public void unlock() {
    try {
        // 同理哦
        // 看这个unlockAsync(Thread.currentThread().getId())
        get(unlockAsync(Thread.currentThread().getId()));
    } ...
}
@Override
public RFuture<Void> unlockAsync(long threadId) {
    // 通过上面的加锁源码分析,这一步也很能理解了
    // () -> unlockAsync0(threadId)
    return getServiceManager().execute(() -> unlockAsync0(threadId));
}

private RFuture<Void> unlockAsync0(long threadId) {
    // 这个
    CompletionStage<Boolean> future = unlockInnerAsync(threadId);
    CompletionStage<Void> f = future.handle((res, e) -> {
        cancelExpirationRenewal(threadId, res);
        if (e != null) {
            // 抛异常
        }
        if (res == null) {
            。。。。
        }
        return null;
    });

    return new CompletableFutureWrapper<>(f);
}
// 然后绕进去,最后会看到这里
protected RFuture<Boolean> unlockInnerAsync(long threadId, String requestId, int timeout) {
    return evalWriteSyncedNoRetryAsync(
        getRawName(),  // 锁名称(Redis Key)
        LongCodec.INSTANCE, // 编解码器
        RedisCommands.EVAL_BOOLEAN, // 期望返回布尔值
        // 以下是关键 Lua 脚本
         "local val = redis.call('get', KEYS[3]); " +
         "if val ~= false then " +
         	  "return tonumber(val);" +
         "end; " +
         "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
         	  "return nil;" +
         "end; " +
         "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + // 减少重入计数
         "if (counter > 0) then " + // 重入计数 > 0
              "redis.call('pexpire', KEYS[1], ARGV[2]); " + // 更新锁过期时间
              "redis.call('set', KEYS[3], 0, 'px', ARGV[5]); " +
              "return 0; " +
         "else " + // 重入计数 = 0
             "redis.call('del', KEYS[1]); " +  // 删除锁
             "redis.call(ARGV[4], KEYS[2], ARGV[1]); " +  // 发布解锁消息
             "redis.call('set', KEYS[3], 1, 'px', ARGV[5]); " +
             "return 1; " +
         "end; ",
         // KEYS 参数列表
         // KEYS[1] = 锁名称(如 myLock) 
         // KEYS[2] = 发布订阅频道(如 redisson_lock__channel:myLock)
         // KEYS[3] = key(如 redisson_unlock_latch:{UUID})
         Arrays.asList(getRawName(), getChannelName(), getUnlockLatchName(requestId)),
         LockPubSub.UNLOCK_MESSAGE,  // ARGV[1] = 解锁消息(固定为0)
         internalLockLeaseTime,// ARGV[2] = 锁租约时间(毫秒)
         getLockName(threadId), // ARGV[3] = 锁持有者ID(UUID:threadId)
         getSubscribeService().getPublishCommand(),  // ARGV[4] = 发布命令(PUBLISH)
         timeout);// ARGV[5] = 超时时间
}

3. 补充点

3.1 redis事务

上文说到了Redis的事务问题,本章探讨一下Redis的事务:

事务本质是一组命令的集合,支持一次执行多个命令,一个事务中所有命令都会被序列化。在redis事务的执行过程,会按照顺序串行化执行队列中的命令,其他客户端提交的命令请求不会插入到事务执行命令序列中。

redis的事务指令有3个关键字,分别是:

  1. multi:开启事务
  2. exec:执行事务
  3. discard:取消事务
  4. watch:监视Key改变,用于实现乐观锁。如果监视的Key的值改变,事务最终会执行失败。在事务开启前使用
  5. unwatch:放弃监视。

通过multi,当前客户端就会开启事务,后续用户键入的都指令都会保证到队列中暂不执行,当用户键入exec后,这些指令都会按顺序执行。 需要注意的是,若开启multi后输入若干指令,客户端输入discard,则之前的指令通通取消执行。

总结说:Redis事务就是一次性、顺序性、排他性的执行一个队列中的一系列命令

127.0.0.1:6379[2]> keys *
(empty array)
127.0.0.1:6379[2]> MULTI # 开启事务
OK
127.0.0.1:6379[2](TX)> set test1 1
QUEUED
127.0.0.1:6379[2](TX)> set test2 2qwe
QUEUED
127.0.0.1:6379[2](TX)> EXEC # 执行
1) OK
2) OK
127.0.0.1:6379[2]> keys *
1) "test2"
2) "test1"

那么Redis事务满足我们熟悉的事务四大特性吗?

对于隔离性来说,Redis是单线程执行命令的,并且执行事务时是对事务队列中的命令依次执行,因此Redis不会出现隔离性问题。

持久性那就不必多说了。

重点看一下原子性:

# 情况一:语法错误【编译器就可以检查到】,可以看到还是可以保证原子性的
127.0.0.1:6379[2]> keys *
(empty array)
127.0.0.1:6379[2]> multi
OK
127.0.0.1:6379[2](TX)> set test1 1
QUEUED
127.0.0.1:6379[2](TX)> seeqw test2 2
(error) ERR unknown command 'seeqw', with args beginning with: 'test2' '2' 
127.0.0.1:6379[2](TX)> exec
(error) EXECABORT Transaction discarded because of previous errors.
127.0.0.1:6379[2]> keys *
(empty array)

# 情况二:运行时错误
# 这种情况并没有保证原子性
# 除了运行出错的命令,其他命令都会执行喔
127.0.0.1:6379[2]> keys *
(empty array)
127.0.0.1:6379[2]> MULTI
OK
127.0.0.1:6379[2](TX)> set test1 1
QUEUED
127.0.0.1:6379[2](TX)> set test2 2qwe # 不是纯数字
QUEUED
127.0.0.1:6379[2](TX)> incr test2 # 这一步肯定是出错
QUEUED
127.0.0.1:6379[2](TX)> incr test1
QUEUED
127.0.0.1:6379[2](TX)> EXEC # 执行事务
1) OK
2) OK
3) (error) ERR value is not an integer or out of range
4) (integer) 2
127.0.0.1:6379[2]> keys * # 发现有俩key
1) "test2"
2) "test1"
127.0.0.1:6379[2]> get test1 # 对test1的自增成功了
"2"

当命令输入错误会在执行时直接报错,这种情况下能够满足原子性

当运行时出现错误时,会执行到具体命令时才报错,这种情况下除了报错的命令不执行,事务中其他正常的命令会执行,不能满足原子性

为什么这么做?

  • 使用Redis命令语法错误,或是将命令运用在错误的数据类型键上(如对字符串进行加减乘除等),从而导致业务数据有问题,这种情况认为是编程导致的错误,应该在开发过程中解决,避免在生产环境中发生;
  • 由于不用支持回滚功能,Redis内部简单化,而且还比较快;

多数事务失败是由语法错误或者数据结构类型错误导致的,语法错误说明在命令入队前就进行检测的,而类型错误是在执行时检测的,Redis为提升性能而采用这种简单的事务,这是不同于关系型数据库的,特别要注意区分。Redis之所以保持这样简易的事务,完全是为了保证高并发下的核心问题——性能

接下来看一下watch命令:【最前面的数字,表示命令执行顺序】

客户端一:

【6】127.0.0.1:6379[2]> keys *
1) "test1"
【7】127.0.0.1:6379[2]> get test1
"100"
【8】127.0.0.1:6379[2]> set test1 150
OK

客户端二:

【1】127.0.0.1:6379[2]> keys *
(empty array)
【2】127.0.0.1:6379[2]> set test1 100
OK
【3】127.0.0.1:6379[2]> watch test1
OK
【4】127.0.0.1:6379[2]> multi
OK
【5】127.0.0.1:6379[2](TX)> set test1 1000
QUEUEDime-seconds|PXAT unix-time-milliseconds|KEEPTTL]
【9】127.0.0.1:6379[2](TX)> exec
(nil) # 在执行前被其他客户端修改了
【10】127.0.0.1:6379[2]> get test1
"150"

上面通过watch监视指定Redis Key ( test1 ),如果在事务执行之前没有改变,就执行成功,如果发现对应值发生改变,事务就会执行失败

看到这里,在2.3节可以用事务吗?我认为在逻辑不出错的情况下,是可以用的,需要我们编程人员编码的时候,来确认每一条命令都确保逻辑上都是正确的。

但是我们能用Lua脚本解决的原子性问题,优先用Lua

3.2 tryLock 和 lock

上面源码分析的都是tryLock的,其实还有lock方法,那么这二者有什么区别呢?

(1)返回值: lock() 是没有返回值的;tryLock() 的返回值是 boolean。

(2)时机:lock() 一直等锁释放;tryLock() 获取到锁返回true,获取不到锁并直接返回false

(3)tryLock() 是可以被打断的,被中断的;lock是不可以。

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
	// 获取当前线程 ID
    long threadId = Thread.currentThread().getId();
    // 获取锁,正常获取锁则ttl为null,竞争锁时返回锁的过期时间
    Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
    if (ttl == null) {
        return;
    }
    // 订阅锁释放事件
    // 如果当前线程通过 Redis 的 channel 订阅锁的释放事件获取得知已经被释放,则会发消息通知待等待的线程进行竞争
    CompletableFuture<RedissonLockEntry> future = subscribe(threadId);
    pubSub.timeout(future);
    RedissonLockEntry entry;
    if (interruptibly) {
        entry = commandExecutor.getInterrupted(future);
    } else {
        entry = commandExecutor.get(future);
    }
    try {
        while (true) {
            // 循环重试获取锁,直至重新获取锁成功才跳出循环
            // 此种做法阻塞进程,一直处于等待锁手动释放或者超时才继续线程    
            ttl = tryAcquire(-1, leaseTime, unit, threadId);
            if (ttl == null) {
                break;
            }
            ....
        }
    } finally {
        // 最后释放订阅事件
        unsubscribe(future, threadId);
    }
}

实际二者使用

RLock lock = redisson.getLock("myLock");
boolean isLocked = lock.tryLock(); // 不会阻塞
if (isLocked) { // 可以根据返回结果作不同的操作
    try {
        ...逻辑
    } finally {
        lock.unlock();
    }
} else {
    获取锁失败
}

RLock lock = redisson.getLock("myLock");
lock.lock(); // 如果获取不到锁,在这里阻塞住了
try {
    ....do
} finally {
    lock.unlock();
}

end. 参考

  1. https://mp.weixin.qq.com/s/nrCO8GZBJrLQis98bMaRhg
  2. https://mp.weixin.qq.com/s/UzMTAqVy5MXxmV9rXdgyPg
  3. https://www.cnblogs.com/jackson0714/p/redisson.html
  4. https://mp.weixin.qq.com/s/qVPT-e-gOXsqQ3pMVIEK7A 【Redis事务】
  5. https://segmentfault.com/a/1190000044686369 【思否- redis事务
  6. https://blog.csdn.net/jiayi_yao/article/details/124689937 【Redis事务】

网站公告

今日签到

点亮在社区的每一天
去签到