redis解决常见的秒杀问题

发布于:2025-05-16 ⋅ 阅读:(13) ⋅ 点赞:(0)

title: redis解决常见的秒杀问题
date: 2025-03-07 14:24:13
tags: redis
categories: redis的应用

秒杀问题

每个店铺都可以发布优惠券,保存到 tb_voucher 表中;当用户抢购时,生成订单并保存到 tb_voucher_order 表中。

订单表如果使用数据库自增 ID,会存在以下问题:

  • ID 的规律太明显,容易暴露信息。
  • 单表数据量的限制,订单过多时单表很难存储得下。数据量过大后需要拆库拆表,但拆分表了之后,各表从逻辑上是同一张表,所以 id 不能一样, 于是需要保证 ID 的唯一性。

全局唯一ID

全局唯一 ID 的特点

  • 唯一性:Redis 独立于数据库之外,不论有多少个数据库、多少张表,访问 Redis 获取到的 ID 可以保证唯一。
  • 高可用:Redis 高可用(集群等方案)。
  • 高性能:Redis 速度很快。
  • 递增性:例如 String 的 INCR 命令,可以保证递增。
  • 安全性:为了增加 ID 的安全性,在使用 Redis 自增数值的基础上,在拼接一些其他信息。

全局唯一 ID 的组成(存储数值类型占用空间更小,使用 long 存储,8 byte,64 bit)

在这里插入图片描述

  • 符号位:1 bit,永远为 0,代表 ID 是正数。

  • 时间戳:31 bit,以秒为单位,可以使用 69 年。

  • 序列号:32 bit,当前时间戳对应的数量,也就是每秒可以对应 2^32 个不同的 ID。

Redis ID 自增策略:通过设置每天存入一个 Key,方便统计订单数量;ID 构造为 时间戳 + 计数器。

@Component
public class RedisIdWorker {
    /**
     * 指定时间戳(2023年1月1日 0:0:00) LocalDateTime.of(2023, 1, 1, 0, 0, 0).toEpochSecond(ZoneOffset.UTC)
     */
    private static final long BEGIN_TIMESTAMP_2023 = 1672531200L;

    /**
     * 序列号位数
     */
    private static final int BIT_COUNT = 32;

    private final StringRedisTemplate stringRedisTemplate;

    public RedisIdWorker(StringRedisTemplate stringRedisTemplate) {
        this.stringRedisTemplate = stringRedisTemplate;
    }

    public long nextId(String keyPrefix) {
        // 1. 时间戳
        long timestamp = LocalDateTime.now().toEpochSecond(ZoneOffset.UTC) - BEGIN_TIMESTAMP_2023;
        // 2. 生成序列号:自增 1,Key 不存在会自动创建一个 Key。(存储到 Redis 中的 Key 为 keyPrefix:date,Value 为自增的数量)
        Long serialNumber = stringRedisTemplate.opsForValue().increment(keyPrefix + ":" + DateTimeFormatter.ofPattern("yyyy-MM-dd").format(LocalDate.now()));
        // 3. 时间戳左移 32 位,序列号与右边的 32 个 0 进行与运算
        return timestamp << BIT_COUNT | serialNumber;
    }
}

测试(300个线程生成共3w个id)

@Resource
private RedisIdWorker redisIdWorker;

public static final ExecutorService ES = Executors.newFixedThreadPool(500);

@Test
void testGloballyUniqueID() throws Exception {
    // 程序是异步的,分线程全部走完之后主线程再走,使用 CountDownLatch;否则异步程序没有执行完时主线程就已经执行完了
    CountDownLatch latch = new CountDownLatch(300);
    Runnable task = () -> {
        for (int i = 0; i < 100; i++) {
            long globallyUniqueID = redisIdWorker.nextId("sun");
            System.out.println("globallyUniqueID = " + globallyUniqueID);
        }
        latch.countDown();
    };

    long begin = System.currentTimeMillis();
    for (int i = 0; i < 300; i++) {
        ES.submit(task);
    }
    latch.await();
    long end = System.currentTimeMillis();
    System.out.println("Execution Time: " + (end - begin));
}

添加优惠卷

格式类似这种逻辑太简单了略

{
    "shopId":1,
    "title":"100元代金券",
    "subTitle":"周一至周五均可使用",
    "rules":"全场通用\n无需预约\n可无限叠加\n不兑现、不找零\n仅限堂食",
    "payValue":8000,
    "actualValue":10000,
    "type":1,
    "stock":100,
    "beginTime":"2022-11-13T10:09:17",
    "endTime":"2022-11-13T22:10:17"
}

秒杀下单功能

在这里插入图片描述

@Override
@Transactional
public Result seckillVoucher(Long voucherId) {
    //1.查询优惠卷
    SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
    //2.判断秒杀是否开始,是否结束
    if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
        return Result.fail("秒杀尚未开始!");
    }
    if(voucher.getEndTime().isBefore(LocalDateTime.now())){
        return Result.fail("秒杀已结束!");
    }
    //3.判断库存是否充足
    if(voucher.getStock()<=0){
        return Result.fail("优惠券库存不足!");
    }
    //4.扣减库存
    boolean success = seckillVoucherService.update()
            .setSql("stock = stock -1")
            .eq("voucher_id", voucherId).update();
    //5.创建订单
    if(!success){
        return Result.fail("优惠券库存不足!");
    }
    //6.返回订单id
    VoucherOrder voucherOrder = new VoucherOrder();
    //6.1订单id
    long orderId = redisIdWorker.nextId("order");
    voucherOrder.setId(orderId);
    //6.2用户id
    Long userId = UserHolder.getUser().getId();
    voucherOrder.setUserId(userId);
    //6.3代金券id
    voucherOrder.setVoucherId(voucherId);

    //7.订单写入数据库
    save(voucherOrder);
    
    //8.返回订单Id
    return Result.ok(orderId);
}

超卖问题

假设库存为 1,有线程1、2、3,时刻 t1、t2、t3、t4。

  • t1:线程1 查询库存,库存为 1;
  • t2:线程2、线程 3 查询库存,库存为 1;
  • t3:线程1 下单,库存扣减为 0。
  • t4:线程2 和 线程3 下单,库存扣减为 -2。

具体图示:
在这里插入图片描述
在这里插入图片描述

解决超卖问题

悲观锁

太简单了直接加锁保证操作数据是原子操作要串行执行

乐观锁
版本号法:

一般是在数据库表中加上一个 version 字段表示 数据被修改的次数。数据被修改时 version 值加 1。

  1. 线程 A 读取数据,同时读取到 version 值。

  2. 提交更新时,若刚才读到的 version 值未发生变化:则提交更新并且 version 值加 1。

  3. 提交更新时,若刚才读到的 version 值发生了变化:放弃更新,并通过报错、自旋重试等方式进行下一步处理。

在这里插入图片描述

CAS法(简单来说就是直接拿库存当版本号):

CAS 操作需要输入两个数值,一个旧值(操作前的值)和一个新值,操作时先比较下在旧值有没有发生变化,若未发生变化才交换成新值,发生了变化则不交换。

CAS 是原子操作,多线程并发使用 CAS 更新数据时,可以不使用锁。原子操作是最小的不可拆分的操作,操作一旦开始,不能被打断,直到操作完成。也就是多个线程对同一块内存的操作是串行的。

在这里插入图片描述

一人一单问题

在这里插入图片描述

一人一单逻辑:

  1. 发送下单请求,提交优惠券 ID。
  2. 下单前需要判断:秒杀是否开始或结束、库存是否充足
  3. 库存充足:根据优惠券 ID 和用户 ID 查询订单,判断该用户是否购买过该优惠券
  4. 该用户对该优惠券的订单不存在时,扣减库存、创建订单、返回订单 ID。

解决并发安全问题

  1. 单人下单(一个用户),高并发的情况下:该用户的 10 个线程同时执行到 查询该用户 ID 和秒杀券对应的订单数量,10 个线程查询到的值都为 0,即未下单。于是会出现一个用户下 10 单的情况。
  2. **此处仍需加锁,乐观锁适合更新操作,插入操作需要选择悲观锁。**若直接在方法上添加 synchronized 关键字,会让锁的范围(粒度)过大,导致性能较差。因此,采用 一个用户一把锁 的方式。

问题:能否用乐观锁执行?

不能,原因是乐观锁只能操作(修改)单个变量,而创建订单需要操作数据库(难以跟踪状态)

@Override
public CommonResult<Long> seckillVoucher(Long voucherId) {
    // 判断秒杀是否开始或结束、库存是否充足。
    SeckillVoucher seckillVoucher = seckillVoucherService.getById(voucherId);
    ThrowUtils.throwIf(seckillVoucher == null, ErrorCode.NOT_FOUND_ERROR);
    LocalDateTime now = LocalDateTime.now();
    ThrowUtils.throwIf(now.isBefore(seckillVoucher.getBeginTime()), ErrorCode.OPERATION_ERROR, "秒杀尚未开始");
    ThrowUtils.throwIf(now.isAfter(seckillVoucher.getEndTime()), ErrorCode.OPERATION_ERROR, "秒杀已经结束");
    ThrowUtils.throwIf(seckillVoucher.getStock() < 1, ErrorCode.OPERATION_ERROR, "库存不足");

    // 下单
    return this.createVoucherOrder(voucherId);
}

/**
 * 下单(超卖 - CAS、一人一单 - synchronized)
 */
@Override
@Transactional
public CommonResult<Long> createVoucherOrder(Long voucherId) {
    // 1. 判断当前用户是否下过单
    Long userId = UserHolder.getUser().getId();
    Integer count = this.lambdaQuery()
            .eq(VoucherOrder::getVoucherId, voucherId)
            .eq(VoucherOrder::getUserId, userId)
            .count();
    ThrowUtils.throwIf(count > 0, ErrorCode.OPERATION_ERROR, "禁止重复下单");

    // 2. 扣减库存
    boolean result = seckillVoucherService.update()
                .setSql("stock = stock - 1")
                .eq("voucher_id", voucherId)
                .gt("stock", 0)
                .update();
    ThrowUtils.throwIf(!result, ErrorCode.OPERATION_ERROR, "下单失败");

    // 3. 下单
    VoucherOrder voucherOrder = new VoucherOrder();
    voucherOrder.setUserId(userId);
    voucherOrder.setId(redisIdWorker.nextId("seckillVoucherOrder"));
    voucherOrder.setVoucherId(voucherId);
    result = this.save(voucherOrder);
    ThrowUtils.throwIf(!result, ErrorCode.OPERATION_ERROR, "下单失败");
    return CommonResult.success(voucherOrder.getId());
}

集群环境下的并发问题

在这里插入图片描述

分布式锁-原理

不去使用jvm内部的锁监视器,我们要在外部开一个锁监视器,让它监视所有的线程

在这里插入图片描述

常见的分布式锁

MySQL:MySQL 本身带有锁机制,但是由于 MySQL 性能一般,所以采用分布式锁的情况下,使用 MySQL 作为分布式锁比较少见。
Redis:Redis 作为分布式锁比较常见,利用 setnx 方法,如果 Key 插入成功,则表示获取到锁,插入失败则表示无法获取到锁。
Zookeeper:Zookeeper 也是企业级开发中比较好的一个实现分布式锁的方案。

MySQL Redis Zookeeper
互斥 利用 MySQL 本身的互斥锁机制 利用 setnx 互斥命令 利用节点的唯一性和有序性
高可用
高性能 一般 一般
安全性 断开链接,自动释放锁 利用锁超时时间,到期释放 临时节点,断开链接自动释放
# 添加锁(NX 互斥、EX 设置 TTL 时间)
SET lock thread1 NX EX 10

# 手动释放锁
DEL lock
public interface DistributedLock {
    /**
     * 获取锁(只有一个线程能够获取到锁)
     * @param timeout   锁的超时时间,过期后自动释放
     * @return          true 代表获取锁成功;false 代表获取锁失败
     */
    boolean tryLock(long timeout);

    /**
     * 释放锁
     */
    void unlock();
}

public class SimpleDistributedLock4Redis implements DistributedLock {
    private static final String KEY_PREFIX = "lock:";
    private final String name;
    private final StringRedisTemplate stringRedisTemplate;

    public SimpleDistributedLockBased4Redis(String name, StringRedisTemplate stringRedisTemplate) {
        this.name = name;
        this.stringRedisTemplate = stringRedisTemplate;
    }

    @Override
    public boolean tryLock(long timeout) {
        String threadId = Thread.currentThread().getId().toString();
        Boolean result = stringRedisTemplate.opsForValue()
                .setIfAbsent(KEY_PREFIX + name, threadId, timeout, TimeUnit.SECONDS);
      	// result 是 Boolean 类型,直接返回存在自动拆箱,为防止空指针不直接返回
        return Boolean.TRUE.equals(result);
    }

    @Override
    public void unlock() {
        stringRedisTemplate.delete(KEY_PREFIX + name);
    }
}

/**
 * VERSION3.0 - 秒杀下单优惠券(通过分布式锁解决一人一单问题)
 */
@Override
public CommonResult<Long> seckillVoucher(Long voucherId) {
    // 判断秒杀是否开始或结束、库存是否充足。
    ...

    // 下单
    SimpleDistributedLock4Redis lock = new SimpleDistributedLock4Redis("order:" + UserHolder.getUser().getId(), stringRedisTemplate);
    boolean tryLock = lock.tryLock(TTL_TWO);
    ThrowUtils.throwIf(!tryLock, ErrorCode.OPERATION_ERROR, "禁止重复下单");
    try {
        VoucherOrderService voucherOrderService = (VoucherOrderService) AopContext.currentProxy();
        return voucherOrderService.createVoucherOrder(voucherId);
    } finally {
        lock.unlock();
    }
}

误删问题

# 线程 1 获取到锁后执行业务,碰到了业务阻塞。
setnx lock:order:1 thread01

# 业务阻塞的时间超过了该锁的 TTL 时间,触发锁的超时释放。超时释放后,线程 2 获取到锁并执行业务。
setnx lock:order:1 thread02

# 线程 2 执行业务的过程中,线程 1 的业务执行完毕并且释放锁,但是释放的是线程 2 获取到的锁。(线程 2:你 TM 放我锁是吧!)
del lock:order:1

# 线程 3 获取到锁(此时线程 23 并行执行业务)
setnx lock:order:1 thread03

在这里插入图片描述

解决方案:在线程释放锁时,判断当前这把锁是否属于自己,如果不属于自己,就不会进行锁的释放(删除)。

# 线程 1 获取到锁后执行业务,碰到了业务阻塞。
setnx lock:order:1 thread01

# 业务阻塞的时间超过了该锁的 TTL 时间,触发锁的超时释放。超时释放后,线程 2 获取到锁并执行业务。
setnx lock:order:1 thread02

# 线程 2 执行业务的过程中,线程 1 的业务执行完毕并且释放锁。但是线程 1 需要判断这把锁是否属于自己,不属于自己就不会释放锁。
# 于是线程 2 一直持有这把锁直到业务执行结束后才会释放,并且在释放时也需要判断当前要释放的锁是否属于自己。
del lock:order:1

# 线程 3 获取到锁并执行业务
setnx lock:order:1 thread03

在这里插入图片描述

基于 Redis 的分布式锁的实现(解决误删问题)

  1. 相较于最开始分布式锁的实现,只需要增加一个功能:释放锁时需要判断当前锁是否属于自己。(而集群环境下不同 JVM 中的线程 ID 可能相同,增加一个 UUID 区分不同 JVM)

  2. 因此通过分布式锁存入 Redis 中的线程标识包括:UUID (服务器id)+ 线程 ID(线程id)。UUID 用于区分不同服务器中线程 ID 相同的线程,线程 ID 用于区分相同服务器的不同线程。

    public class SimpleDistributedLockBasedOnRedis implements DistributedLock {
        private String name;
        private StringRedisTemplate stringRedisTemplate;
    
        public SimpleDistributedLockBasedOnRedis(String name, StringRedisTemplate stringRedisTemplate) {
            this.name = name;
            this.stringRedisTemplate = stringRedisTemplate;
        }
    
        private static final String KEY_PREFIX = "lock:";
    
      	// ID_PREFIX 在当前 JVM 中是不变的,主要用于区分不同 JVM
        private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";
    
        /**
         * 获取锁
         */
        @Override
        public boolean tryLock(long timeoutSeconds) {
          	// UUID 用于区分不同服务器中线程 ID 相同的线程;线程 ID 用于区分同一个服务器中的线程。
            String threadIdentifier = ID_PREFIX + Thread.currentThread().getId();
            Boolean isSucceeded = stringRedisTemplate.opsForValue()
                    .setIfAbsent(KEY_PREFIX + name, threadIdentifier, timeoutSeconds, TimeUnit.SECONDS);
            return Boolean.TRUE.equals(isSucceeded);
        }
    
        /**
         * 释放锁(释放锁前通过判断 Redis 中的线程标识与当前线程的线程标识是否一致,解决误删问题)
         */
        @Override
        public void unlock() {
            // UUID 用于区分不同服务器中线程 ID 相同的线程;线程 ID 用于区分同一个服务器中的线程。
            String threadIdentifier = THREAD_PREFIX + Thread.currentThread().getId();
            String threadIdentifierFromRedis = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
            // 比较 Redis 中的线程标识与当前的线程标识是否一致
            if (!StrUtil.equals(threadIdentifier, threadIdentifierFromRedis)) {
                throw new BusinessException(ErrorCode.OPERATION_ERROR, "释放锁失败");
            }
            // 释放锁标识
            stringRedisTemplate.delete(KEY_PREFIX + name);
        }
    }
    
    

用Lua脚本解决原子性问题

分布式锁的原子性问题

  1. 线程 1 获取到锁并执行完业务,判断锁标识一致后释放锁,释放锁的过程中阻塞,导致锁没有释放成功,并且阻塞的时间超过了锁的 TTL 释放,导致锁自动释放。

  2. 此时线程 2 获取到锁,执行业务;在线程 2 执行业务的过程中,线程 1 完成释放锁操作。

  3. 之后,线程 3 获取到锁,执行业务,又一次导致此时有两个线程同时在并行执行业务

因此,需要保证 unlock() 方法的原子性,即判断线程标识的一致性和释放锁这两个操作的原子性。

Redis 提供了 Lua 脚本功能,在一个脚本中编写多条 Redis 命令,确保 Redis 多条命令执行时的原子性。

unlock操作

private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;

static{//写成静态代码块,类加载就可以完成初始定义,就不用每次释放锁都去加载这个,性能提高咯
    UNLOCK_SCRIPT = new DefaultRedisScript<>();
    UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));//设置脚本位置
    UNLOCK_SCRIPT.setResultType(Long.class);
}
    public void unlock(){
        //调用lua脚本
        stringRedisTemplate.execute(
                UNLOCK_SCRIPT,
                Collections.singletonList(KEY_PREFIX + name),
                ID_PREFIX + Thread.currentThread().getId()
        );
    }

Lua脚本

-- 锁的key
-- local key = KEYS[1]
-- 当前线程标识
-- local threadId = ARGV[1]
-- 获取锁中的线程标识
if(redis.call('get',KEYS[1]) == ARGV[1]) then return redis.call('del',KEYS[1])
end return 0


网站公告

今日签到

点亮在社区的每一天
去签到