文章目录
1.全局唯一ID
当用户抢购时,就会生成订单并保存到tb_voucher_order这张表中,而订单表如果使用数据库自增ID就存在一些问题:
- id的规律性太明显
- 受单表数据量的限制
场景分析:如果我们的id具有太明显的规则,用户或者说商业对手很容易猜测出来我们的一些敏感信息,比如商城在一天时间内,卖出了多少单,这明显不合适。
场景分析二:随着我们商城规模越来越大,mysql的单表的容量不宜超过500W,数据量过大之后,我们要进行拆库拆表,但拆分表了之后,他们从逻辑上讲他们是同一张表,所以他们的id是不能一样的, 于是乎我们需要保证id的唯一性。
全局ID生成器,是一种在分布式系统下用来生成全局唯一ID的工具,一般要满足下列特性:
- UUID MySQL8.0以后,UUID可以自增且减少了存储空间
- Redis自增
- snowflake算法 依赖于系统时间,需要机器间的时间一致
- 数据库自增 专门弄一个数据库表用于自增id的获取,类似Redis,但是性能差于redis
- 为了增加ID的安全性,我们可以不直接使用Redis自增的数值,而是拼接一些其它信息:
Redis自增做全局唯一ID:
- 组成部分:
- 符号位:1bit,永远为0
- 时间戳:31bit,以秒为单位,可以使用69年
- 序列号:32bit,秒内的计数器,支持每秒产生232个不同ID
Redis 实现全局唯一ID
@Component
public class RedisIdWorker {
@Resource
private StringRedisTemplate stringRedisTemplate;
/**
* 开始时间戳
*/
public static final long BEGIN_TIMESTAMP = 1640995299L;//2021年1月1日时间戳
/**
* 序列号的位数
*/
public static final int COUNT_BITS = 32;
/**
* 1位符号位 + 31位时间戳 + 32位序列号
* @param keyPrefix
* @return
*/
public long nextId(String keyPrefix) {
//1 生成时间戳
LocalDateTime now = LocalDateTime.now();
long nowSecond = now.toEpochSecond(ZoneOffset.UTC);
long timestamp = nowSecond - BEGIN_TIMESTAMP;
//2 生成序列号
//2.1 获取当前日期,精确到天,每天一个key,方便统计
String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
//2.2 自增长
long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);
//拼接并返回
return timestamp << COUNT_BITS | count;
}
}
2.优惠券秒杀
2.1 添加优惠券
由于没有后台,这里我们直接在数据库内添加秒杀优惠券
也可以通过postman进行提交请求
2.2 实现优惠券秒杀下单
@Override
@Transactional
public Result seckillVoucher(Long voucherId) {
//查询优惠券信息
SeckillVoucher seckillVoucher = seckillVoucherService.getById(voucherId);
//判断秒杀是否开始
if (seckillVoucher.getBeginTime().isAfter(LocalDateTime.now())) {
//未开始
return Result.fail("秒杀尚未开始");
}
if (seckillVoucher.getEndTime().isBefore(LocalDateTime.now())) {
//已经结束
return Result.fail("秒杀已经结束");
}
//判断库存是否充足
if (seckillVoucher.getStock()<1) {
return Result.fail("库存不足");
}
//扣减库存
boolean success = seckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId).update();
if (!success){
return Result.fail("下单失败");
}
//创建订单
VoucherOrder voucherOrder = new VoucherOrder();
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
Long userId = UserHolder.getUser().getId();
voucherOrder.setUserId(userId);
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);
//返回订单id
return Result.ok(orderId);
}
表面上我们已经解决了优惠券的够买,实际上它还存在一些缺陷
2.3 优惠券超卖问题
这里我们假设有多个线程,线程一对数据库进行查询,发现库存为1,则进行购买优惠券,这时线程二也对数据库进行查询,而线程一还未完成库存的减少操作。此时线程二查到的同样库存为1这就出现了超卖问题(多线程安全问题)。
解决方案:
2.3.1 悲观锁
2.3.2 乐观锁
乐观锁又有两种方法
版本号法
我们可以查询库存以及版本号,我们在进行数据库的修改操作之前先进行判断版本号是否发生了改变(与查询到的版本号是否一致)即sql语句
set stock =stock-1,version=version+1 where id=10 and version= 1;
CAS法
当我们查询库存时,可以在对库存进行修改之前,可以先判断库存是否与刚刚查到的一致如果不一致,说明有人修改过了,需要进行重试或抛出异常。即sql语句
set stock=stock-1 where id= 10 and stock=1;
但同时又引入了另一个问题,此时如果很多线程同时与线程一争抢,那么只有线程一会成功,其他的全部失败。这就是乐观锁的弊端:成功率低。
那么作为库存,我们是不是可以直接让最后的判断条件为stock>0呢。这样同时争抢时,只要库存大于0,就可以抢到。
代码实现:
//扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1")
.eq("voucher_id", voucherId)
.gt("stock",0)
.update();
2.3.3 总结
2.4 单体下的一人一单
@Override
@Transactional
public Result seckillVoucher(Long voucherId) {
//查询优惠券信息
SeckillVoucher seckillVoucher = seckillVoucherService.getById(voucherId);
//判断秒杀是否开始
if (seckillVoucher.getBeginTime().isAfter(LocalDateTime.now())) {
//未开始
return Result.fail("秒杀尚未开始");
}
if (seckillVoucher.getEndTime().isBefore(LocalDateTime.now())) {
//已经结束
return Result.fail("秒杀已经结束");
}
//判断库存是否充足
if (seckillVoucher.getStock()<1) {
return Result.fail("库存不足");
}
//6.一人一单
//用户id
Long userId = UserHolder.getUser().getId();
//6.1查询订单
int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
//6.2判断用户是否已经购买过了
if (count>0){
//购买过了
return Result.fail("您已经购买过了");
}
//扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1")
.eq("voucher_id", voucherId)
.gt("stock",0)
.update();
if (!success){
return Result.fail("下单失败");
}
//创建订单
VoucherOrder voucherOrder = new VoucherOrder();
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
voucherOrder.setUserId(userId);
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);
//返回订单id
return Result.ok(orderId);
}
但是,此时我们开启多线程测试就会发现,并没有真正实现一人一单。原因是:当多个线程同时开启时,同时都去查询数据库,可能有多个线程查到当前用户没有买过优惠券。这时候我们没办法使用乐观锁,因为第一次查到的为空,无法进行条件判断。这里我们使用悲观锁。
代码实现:
@Override
public Result seckillVoucher(Long voucherId) {
//查询优惠券信息
SeckillVoucher seckillVoucher = seckillVoucherService.getById(voucherId);
//判断秒杀是否开始
if (seckillVoucher.getBeginTime().isAfter(LocalDateTime.now())) {
//未开始
return Result.fail("秒杀尚未开始");
}
if (seckillVoucher.getEndTime().isBefore(LocalDateTime.now())) {
//已经结束
return Result.fail("秒杀已经结束");
}
//判断库存是否充足
if (seckillVoucher.getStock()<1) {
return Result.fail("库存不足");
}
Long userId=UserHolder.getUser().getId();
synchronized (userId.toString().intern()){
//获取代理对象
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
}
}
/**
* 创建订单
* @param voucherId
* @return
*/
@Transactional
public Result createVoucherOrder(Long voucherId) {
//6.一人一单
//用户id
Long userId = UserHolder.getUser().getId();
//6.1查询订单
int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
//6.2判断用户是否已经购买过了
if (count > 0) {
//购买过了
return Result.fail("您已经购买过了");
}
//扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1")
.eq("voucher_id", voucherId)
.gt("stock", 0)
.update();
if (!success) {
return Result.fail("下单失败");
}
//创建订单
VoucherOrder voucherOrder = new VoucherOrder();
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
voucherOrder.setUserId(userId);
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);
//返回订单id
return Result.ok(orderId);
}
实现细节:
锁的范围尽量小。synchronized尽量锁代码块,而不是方法,锁的范围越大性能越低
锁的对象一定要是一个不变的值。我们不能直接锁 Long 类型的 userId,每请求一次都会创建一个新的 userId 对象,synchronized 要锁不变的值,所以我们要将 Long 类型的 userId 通过 toString()方法转成 String 类型的 userId,toString()方法底层(可以点击去看源码)是直接 new 一个新的String对象,显然还是在变,所以我们要使用 intern() 方法从常量池中寻找与当前 字符串值一致的字符串对象,这就能够保障一个用户 发送多次请求,每次请求的 userId 都是不变的,从而能够完成锁的效果(并行变串行)
我们要锁住整个事务,而不是锁住事务内部的代码。如果我们锁住事务内部的代码会导致其它线程能够进入事务,当我们事务还未提交,锁一旦释放,仍然会存在超卖问题
Spring的@Transactional注解要想事务生效,必须使用动态代理。Service中一个方法中调用另一个方法,另一个方法使用了事务,此时会导致@Transactional失效,所以我们需要创建一个代理对象,使用代理对象来调用方法。
这篇文章对@Transactional注解事务失效的常见场景进行了一个总结:【Java面试篇】Spring中@Transactional注解事务失效的常见场景
让代理对象生效的步骤:
①引入AOP依赖,动态代理是AOP的常见实现之一
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
</dependency>
②暴露动态代理对象,默认是关闭的
@EnableAspectJAutoProxy(exposeProxy = true)
2.5 集群下的并发安全问题
1)搭建集群并实现负载均衡
首先,在IDEA中启动两个SpringBoot程序,一个端口号是8081,另一个端口是8082:
然后在Nginx中配置负载均衡:
这时我们开启两个程序使用postman向两个端口分别发送带有同一个用户token的请求。这时候我们会发现,又出现了超卖现象。原因如下:
这两个线程由于不属于同一个JVM导致他们的锁监视器也不是同一个。此时锁就失效了,只能锁住同一个JVM中的线程。
因此我们就需要一个新的方法来解决这个问题。
3.分布式锁
分布式锁指的是:满足分布式系统或集群模式下多进程可见并且互斥的锁
特点:
- 多进程可见
- 互斥
- 高可用性
- 高性能
- 安全性
3.1 基于Redis的分布式锁
实现分布式锁时需要实现的两个基本方法:
获取锁:
- 互斥:确保只能有一个线程获取锁
- 非阻塞:尝试一次,成功返回true,失败返回false
- setnx和expire合并为一个语句,避免只执行setnx后宕机
释放锁:
- 手动释放
- 超时释放:获取锁时添加一个超时时间
3.1.1 基于Redis实现分布式锁初级版本
public static final String KEY_PREFIX = "lock:";
private String name;
private StringRedisTemplate stringRedisTemplate;
public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
this.name = name;
this.stringRedisTemplate = stringRedisTemplate;
}
@Override
public boolean tryLock(Long timeoutSec) {
// 获取锁
long threadID = Thread.currentThread().getId();
Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadID + "", timeoutSec, TimeUnit.SECONDS);
return BooleanUtil.isTrue(success);
}
public void unlock() {
//通过del删除锁
stringRedisTemplate.delete(KEY_PREFIX + name);
}
极端情况:
当线程1获取到锁之后,由于某个原因业务阻塞,直到超时锁失效,此时线程二也来获取锁,然后线程1完成业务进行删除锁,然后线程三这时候就也能获取锁了。这再次造成了并发问题。
3.1.2 解决误删锁的问题
需求:修改之前的分布式锁实现,满足:在获取锁时存入线程标示(可以用UUID表示)
在释放锁时先获取锁中的线程标示,判断是否与当前线程标示一致
- 如果一致则释放锁
- 如果不一致则不释放锁
核心逻辑:在存入锁时,放入自己线程的标识,在删除锁时,判断当前这把锁的标识是不是自己存入的,如果是,则进行删除,如果不是,则不进行删除。
public class SimpleRedisLock implements ILock{
public static final String KEY_PREFIX = "lock:";
/**
* 使用public static final的UUID作为JVM的区分,同一个JVM获取到的SimpleRedisLock实例ID_PREFIX相同
* 用于作为锁的value的前缀,避免不同JVM下threadId相同的情况下锁被别的线程误删
*/
public static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";
private String name;
private StringRedisTemplate stringRedisTemplate;
public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
this.name = name;
this.stringRedisTemplate = stringRedisTemplate;
}
@Override
public boolean tryLock(Long timeoutSec) {
// 获取锁
String threadID = ID_PREFIX + Thread.currentThread().getId();
Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadID, timeoutSec, TimeUnit.SECONDS);
return BooleanUtil.isTrue(success);
}
@Override
public void unlock() {
//获取线程标识
String threadId = ID_PREFIX + Thread.currentThread().getId();
//获取锁中的标识(value)
String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
//判断标识是否一致
if (threadId.equals(id)) {
//释放锁
stringRedisTemplate.delete(KEY_PREFIX + name);
}
}
}
极端情况
当线程1获取到锁,执行完业务后,判断锁标识一致要释放锁时,受到了阻塞,这段时间锁超时失效,线程二获取锁,获取成功之后线程一又结束了阻塞,这时候由于已经判断过锁标识,则会直接释放线程二的锁。这时候线程三就可以再次获取锁,造成并发问题
3.1.3 Lua脚本解决多条命令原子性问题
Redis提供了Lua脚本功能,在一个脚本中编写多条Redis命令,确保多条命令执行时的原子性。Lua是一种编程语言,它的基本语法大家可以参考网站:https://www.runoob.com/lua/lua-tutorial.html,这里重点介绍Redis提供的调用函数,我们可以使用lua去操作redis,又能保证他的原子性,这样就可以实现拿锁比锁删锁是一个原子性动作了,作为Java程序员这一块并不作一个简单要求,并不需要大家过于精通,只需要知道他有什么作用即可。
这里重点介绍Redis提供的调用函数,语法如下:
redis.call('命令名称', 'key', '其它参数', ...)
例如,我们要执行set name jack,则脚本是这样:
执行 set name jack
redis.call('set', 'name', 'jack')
例如,我们要先执行set name Rose,再执行get name,则脚本如下:
先执行 set name jack
redis.call('set', 'name', 'Rose')
再执行 get name
local name = redis.call('get', 'name')
返回
return name
写好脚本以后,执行脚本用EVAL执行脚本
接下来我们来回一下我们释放锁的逻辑:
释放锁的业务流程是这样的
1、获取锁中的线程标示
2、判断是否与指定的标示(当前线程标示)一致
3、如果一致则释放锁(删除)
4、如果不一致则什么都不做
如果用Lua脚本来表示则是这样的:
最终我们操作redis的拿锁比锁删锁的lua脚本就会变成这样
-- 这里的 KEYS[1] 就是锁的key,这里的ARGV[1] 就是当前线程标示
-- 获取锁中的标示,判断是否与当前线程标示一致
if (redis.call('GET', KEYS[1]) == ARGV[1]) then
-- 一致,则删除锁
return redis.call('DEL', KEYS[1])
end
-- 不一致,则直接返回
return 0
修改后的代码
public class SimpleRedisLock implements ILock {
private String name;
private StringRedisTemplate stringRedisTemplate;
public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
this.name = name;
this.stringRedisTemplate = stringRedisTemplate;
}
private static final String KEY_PREFIX = "lock:";
private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
static {
UNLOCK_SCRIPT = new DefaultRedisScript<>();
UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
UNLOCK_SCRIPT.setResultType(Long.class);
}
@Override
public boolean tryLock(long timeoutSec) {
// 获取线程标示
String threadId = ID_PREFIX + Thread.currentThread().getId();
// 获取锁
Boolean success = stringRedisTemplate.opsForValue()
.setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success);
}
@Override
public void unlock() {
// 调用lua脚本
stringRedisTemplate.execute(
UNLOCK_SCRIPT,
Collections.singletonList(KEY_PREFIX + name),
ID_PREFIX + Thread.currentThread().getId());
}
/*@Override
public void unlock() {
// 获取线程标示
String threadId = ID_PREFIX + Thread.currentThread().getId();
// 获取锁中的标示
String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
// 判断标示是否一致
if(threadId.equals(id)) {
// 释放锁
stringRedisTemplate.delete(KEY_PREFIX + name);
}
}*/
}
3.1.4 总结
基于Redis的分布式锁实现思路:
- 利用set nx ex获取锁,并设置过期时间,保存线程标示
- 释放锁时先判断线程标示是否与自己一致,一致则删除锁
- 特性:
- 利用set nx满足互斥性
- 利用set ex保证故障时锁依然能释放,避免死锁,提高安全性
- 利用Redis集群保证高可用和高并发特性
- 特性:
3.2 Redissson优化分布式锁
基于setnx实现的分布式锁存在下面的问题:
要解决这些问题就要使用Redisson
引入依赖:
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.6</version>
</dependency>
配置Redisson客户端:
@Configuration
public class RedissonConfig {
@Bean
public RedissonClient redissonClient(){
// 配置
Config config = new Config();
// 若是集群使用useClusterServers()
config.useSingleServer().setAddress("redis://192.168.111.100:6379").setPassword("123456");
// 创建RedissonClient对象
return Redisson.create(config);
}
}
在 VoucherOrderServiceImpl
注入RedissonClient
@Resource
private RedissonClient redissonClient;
@Override
public Result seckillVoucher(Long voucherId) {
// 1.查询优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
// 2.判断秒杀是否开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
// 尚未开始
return Result.fail("秒杀尚未开始!");
}
// 3.判断秒杀是否已经结束
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
// 尚未开始
return Result.fail("秒杀已经结束!");
}
// 4.判断库存是否充足
if (voucher.getStock() < 1) {
// 库存不足
return Result.fail("库存不足!");
}
Long userId = UserHolder.getUser().getId();
//创建锁对象 这个代码不用了,因为我们现在要使用分布式锁
//SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);
RLock lock = redissonClient.getLock("lock:order:" + userId);
//获取锁对象
boolean isLock = lock.tryLock();
//加锁失败
if (!isLock) {
return Result.fail("不允许重复下单");
}
try {
//获取代理对象(事务)
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
} finally {
//释放锁
lock.unlock();
}
}