目录
2.3.4 添加刷新Token拦截器逻辑 (只做判断,不做拦截)
2.3.6 【故障排查】关于一登陆后前端立马闪退回登录界面的问题
3.7.2 基于互斥锁解决缓存击穿案例——以请求店铺信息为例
3.7.3 基于逻辑过期解决缓存击穿案例——以请求店铺信息为例
3.7.4【故障排查】关于逻辑过期时间策略无法查询店铺信息的现象
4.8 (拓展知识点) Redisson ——成熟的锁工具包
4.8.8 (TODO)建立Redis集群环境,测试主从一致性问题
4.10.3(三者之最) 基于Stream的消息队列原理及其使用
4.10.4 基于Redis的Stream结果作为消息队列,优化异步秒杀下单功能
扩展阅读推荐:
Redis学习Day1——配置Linux环境下的运行环境-CSDN博客
黑马程序员Redis入门到实战教程_哔哩哔哩_bilibili
使用git命令行将本地仓库代码上传到gitee/github远程仓库-CSDN博客
布隆(Bloom Filter)过滤器——全面讲解,建议收藏-CSDN博客
实战篇-07.优惠券秒杀-实现一人一单功能_哔哩哔哩_bilibili
Redisson帮助文档————Home · redisson/redisson Wiki (github.com)
深入了解分布式锁 Redisson 原理—可重入原理、可重试原理、主从一致性原理、解决超时锁失效
【黑马点评】已解决java.lang.NullPointerException异常-CSDN博客
一、项目介绍及其初始化
学习Redis的过程,我们还将遇到各种实际问题,例如缓存穿透、雪崩、击穿等问题,只有在实际的项目实践中解决这些问题,才能更好的掌握和理解Redis的企业开发思维。
以下是本次【黑马点评】项目的主要内容:


1.1 数据库连接配置
在yml配置文件中,配置好自己的数据库连接信息

1.2 工程启动演示
想要成功启动该项目,需要以下步骤:
1. 打开VM虚拟机,激活Linux系统中事先配置好的Redis数据库
2. 启动nignx服务器(注意nignx服务器必须放在无中文的目录下)
3. 启动后端程序(观察端口,访问初始工程)
4. 访问请求地址,验证工程启动正确http://localhost:8081/shop-type/list


二、短信登录功能实现
2.1 基于传统Session实现的短信登录及其校验
2.1.1 基于Session登录校验的流程设计
2.1.2 实现短信验证码发送功能
请求接口 | /user/code |
请求类型 | post |
请求参数 | phone |
返回值 | 无 |
/**
* 发送手机验证码
*/
@PostMapping("/code")
public Result sendCode(@RequestParam("phone") String phone, HttpSession session) {
log.info("发送验证码, 手机号:{}", phone);
return userService.sendCode(phone, session);
}
/**
* 发送验证码
* @param phone
* @param session
* @return
*/
@Override
public Result sendCode(String phone, HttpSession session) {
// 1. 校验手机号码
if(RegexUtils.isPhoneInvalid(phone)){
return Result.fail("手机号码格式错误!");
}
// 2. 生成验证码
String code = RandomUtil.randomNumbers(6);
// 3. 将验证码保存到Session中
session.setAttribute("code", code);
//TODO 4. 调用阿里云 将短信信息发送到指定手机
log.info("发送短信验证码成功,验证码:{}", code);
return Result.ok();
}
2.1.3 实现登录、注册功能
请求接口 | /user/login |
请求类型 | post |
请求参数 | LoginForm---> phone,code,[password] |
返回值 | 无 |
/**
* 登录功能
* @param loginForm 登录参数,包含手机号、验证码;或者手机号、密码
*/
@PostMapping("/login")
public Result login(@RequestBody LoginFormDTO loginForm, HttpSession session){
log.info("用户登录, 参数:{}", loginForm);
return userService.login(loginForm, session);
}
/**
* 登录功能
* @param loginForm
* @param session
* @return
*/
@Override
public Result login(LoginFormDTO loginForm, HttpSession session) {
// 1. 校验手机号
String phone = loginForm.getPhone();
if(RegexUtils.isPhoneInvalid(phone)){
return Result.fail("手机号码格式错误!");
}
// 2. 校验验证码
Object cacheCode = session.getAttribute("code");
String code = loginForm.getCode();
if(cacheCode==null || !cacheCode.toString().equals(code)){
return Result.fail("验证码错误!");
}
// 3. 根据手机号查询用户 select * from tb_user where phone = ?
User user = query().eq("phone", phone).one();
// if 0 :创建新用户,保存数据库,将用户信息存储到Session
//
if(user == null){
user = createUserWithPhone(phone);
}
//else: 登录成功,将用户信息存储到Session
session.setAttribute("user", BeanUtil.copyProperties(user, UserDTO.class));
return Result.ok();
}
/**
* 根据手机号创建用户
* @param phone
* @return
*/
private User createUserWithPhone(String phone) {
//创建用户
User user = new User();
user.setPhone(phone);
user.setNickName(USER_NICK_NAME_PREFIX + RandomUtil.randomString(10));
//保存用户
save(user);
// 返回
return user;
}
2.1.4 实现登录状态校验拦截器
由于日后项目功能会越来越多,需要登录才能进行访问的界面也会越来越多,我们必须想办法将登录状态校验抽离出来形成一个前置校验的条件,再放行到后续逻辑。
1. 封装TreadLocal工具类
将用户信息保存到 TreadLocal中 并封装TreadLocal工具类用于 保存用户、获取用户、移除用户
在 urils / UserHolder
/**
* TreadLocal工具类
*/
public class UserHolder {
private static final ThreadLocal<UserDTO> tl = new ThreadLocal<>();
// 保存用户
public static void saveUser(UserDTO user){
tl.set(user);
}
// 获取ThreadLocal中的用户
public static UserDTO getUser(){
return tl.get();
}
// 清空ThreadLocal
public static void removeUser(){
tl.remove();
}
}
2. 创建登录拦截器
在 urils / LoginInterceptor
public class LoginInterceptor implements HandlerInterceptor {
/**
* 前置拦截器
*/
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
// 1. 获取session
HttpSession session = request.getSession();
// 2. 获取session中的用户
Object user = session.getAttribute("user");
// 3. 判断用户是否存在
if(user == null){
response.setStatus(401);
return false;
}
// 4. 如果存在,用户信息保存到 ThreadLocal 并放行
UserHolder.saveUser((UserDTO) user);
return true;
}
/**
* 后置拦截器(移除用户)
*/
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
// 移除用户
UserHolder.removeUser();
}
}
3. 添加配置,生效拦截器,并配置放行路径
在 config/ MvcConfig
@Configuration
public class MvcConfig implements WebMvcConfigurer {
/**
* 添加拦截器
* @param registry
*/
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(new LoginInterceptor())
.excludePathPatterns(
"/user/code",
"/user/login",
"/blog/host",
"/shop/**",
"/shop-type/**",
"/voucher/**"
);
}
}
2.1.5 实现获取用户请求
前端点击我的,发送请求到后端,获取当前登录状态,方能进入个人中心
/**
* 获取当前登录的用户
* @return
*/
@GetMapping("/me")
public Result me(){
UserDTO user = UserHolder.getUser();
return Result.ok(user);
}
2.1.6 (附加)用户信息脱敏处理
为防止出现以下这种情况(将用户隐私信息暴露过多),我们采用UserDTO对象对用户信息脱敏处理:
@Data public class UserDTO { private Long id; private String nickName; private String icon; }
并借助拷贝工具 进行对象拷贝
session.setAttribute("user", BeanUtil.copyProperties(user, UserDTO.class));
2.2 传统Session在集群环境下的弊端
Session共享问题
多台Tomcat并不共享session存储空间,当请求切换到不同tomcat服务时导致数据丢失的问题。
解决策略
1. 让Session可以共享
Tomcat提供了Session拷贝功能,但是这会增加服务器的额外内存开销,并且带来数据一致性问题
2. 【推荐】使用Redis进行替代
数据共享、内存存储(快)、key-value结构
2.3 基于Redis实现短信登录功能
2.3.1 基于Redis实现短信登录流程设计


对于验证码,使用 手机号码作为KEY,确保了正确的手机对应着正确的短信验证码。
对于用户信息唯一标识,使用 UUID生成的Token作为 KEY,而不使用手机号码,从而提高了用户数据安全性。
2.3.2 修改发送短信验证码功能
只需要在Session的基础上,将第三步保存到Redis中
格式:
key | value | TTL |
login:code:[手机号码] | [验证码] | 120S |
// // 3. 将验证码保存到Session中
// session.setAttribute("code", code);
// 3. 将验证码保存到Redis中
stringRedisTemplate.opsForValue().set(LOGIN_CODE_KEY + phone,code,LOGIN_CODE_TTL, TimeUnit.MINUTES);
2.3.3 修改登录、注册功能
1. 手机号校验
2. 从Redis中取出验证码进行校验
3.查询用户信息
4. 将用户信息存储到Redis ---> 需要以Hash结构进行存储 ----> 需要将user对象转成 Map对象
5. 将token返回给客户端 ,充当Session的标识作用
/**
* 登录功能
* @param loginForm
* @param session
* @return
*/
@Override
public Result login(LoginFormDTO loginForm, HttpSession session) {
// 1. 校验手机号
String phone = loginForm.getPhone();
if(RegexUtils.isPhoneInvalid(phone)){
return Result.fail("手机号码格式错误!");
}
// 2. 校验验证码 REDIS
// Object cacheCode = session.getAttribute("code");
// 2.1 从Redis中获取验证码
String redisCode = stringRedisTemplate.opsForValue().get(LOGIN_CODE_KEY + phone);
// 2.2 校验验证码
String code = loginForm.getCode();
if(redisCode==null || !redisCode.equals(code)){
return Result.fail("验证码错误!");
}
// 3. 根据手机号查询用户 select * from tb_user where phone = ?
User user = query().eq("phone", phone).one();
// if 0 :创建新用户,保存数据库,将用户信息存储到Session
if(user == null){
user = createUserWithPhone(phone);
}
// //else: 登录成功,将用户信息存储到Session
// session.setAttribute("user", BeanUtil.copyProperties(user, UserDTO.class));
// 4. 将用户信息存储到Redis中
// 1. 随机生成token,作为登录令牌 ---> UUID导入工具包中的方法,不要导入java自带的
String token = UUID.randomUUID().toString(true);
// 2. 以hash结构进行存储
UserDTO userDTO = BeanUtil.copyProperties(user,UserDTO.class);
//TODO 这里报错了,因为UserDTO中有个id属性,不是字符串,在Redis序列化下报错
// Map<String,Object> userMap = BeanUtil.beanToMap(userDTO);
Map<String,Object> userMap = BeanUtil.beanToMap(userDTO, new HashMap<>(),
CopyOptions.create()
.setIgnoreNullValue(true)
.setFieldValueEditor((fieldName,fieldValue)-> fieldValue.toString()));
// 3. 存储到Redis中
stringRedisTemplate.opsForHash().putAll(LOGIN_USER_KEY + token,userMap);
// 给token设置有效期
// 超过30分钟不访问任何界面就会剔除,所以还需要设置在访问过程中不断更新token的有效期
// 实现方式: 在登录拦截器中进行处理
stringRedisTemplate.expire(LOGIN_USER_KEY + token, LOGIN_USER_TTL, TimeUnit.MINUTES);
// 5. 返回token到客户端,客户端保存到浏览器中
return Result.ok(token);
}
2.3.4 添加刷新Token拦截器逻辑 (只做判断,不做拦截)
首先,由于需要在自定义的拦截器中使用StringRedisTemplate对象,由于不是交由spring管理的,所以我们需要自己写构造函数进行导入。同时在MvcConfig中直接交给Spring管理
其次,这里选择了新建一个专门负责刷新Token的“拦截器”,只做判断不做拦截。确保请求在经过登录校验拦截器之前,会统一先被该“拦截器”获取,并对Token进行判断,如果没有Token,则会被接下来的登录拦截器进行拦截
package com.hmdp.config;
import com.hmdp.Interceptor.LoginInterceptor;
import com.hmdp.Interceptor.RefreshTokenInterceptor;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
import javax.annotation.Resource;
@Configuration
public class MvcConfig implements WebMvcConfigurer {
@Resource
private StringRedisTemplate stringRedisTemplate;
/**
* 添加拦截器
* @param registry
*/
public void addInterceptors(InterceptorRegistry registry) {
// 刷新token拦截器 全部拦截 只做判断
registry.addInterceptor(new RefreshTokenInterceptor(stringRedisTemplate)).addPathPatterns("/**");
registry.addInterceptor(new LoginInterceptor())
.excludePathPatterns(
"/user/code",
"/user/login",
"/blog/host",
"/shop/**",
"/shop-type/**",
"/voucher/**"
);
}
}
package com.hmdp.Interceptor;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.util.StrUtil;
import com.hmdp.dto.UserDTO;
import com.hmdp.utils.UserHolder;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.servlet.HandlerInterceptor;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static com.hmdp.utils.RedisConstants.LOGIN_USER_KEY;
import static com.hmdp.utils.RedisConstants.LOGIN_USER_TTL;
/**
* Token缓存刷新拦截器 只会放行不会拦截
*/
public class RefreshTokenInterceptor implements HandlerInterceptor {
private StringRedisTemplate stringRedisTemplate;
/**
* 手动创建的对象,需要手动注入,所以需要构造方法
* @param stringRedisTemplate
*/
public RefreshTokenInterceptor(StringRedisTemplate stringRedisTemplate) {
this.stringRedisTemplate = stringRedisTemplate;
}
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
// 1.获取请求头中的token
String token = request.getHeader("authorization");
if (StrUtil.isBlank(token)) {
return true;
}
// 2.基于TOKEN获取redis中的用户
String key = LOGIN_USER_KEY + token;
Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(key);
// 3.判断用户是否存在
if (userMap.isEmpty()) {
return true;
}
// 5.将查询到的hash数据转为UserDTO
UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false);
// 6.存在,保存用户信息到 ThreadLocal
UserHolder.saveUser(userDTO);
// 7.刷新token有效期
stringRedisTemplate.expire(key, LOGIN_USER_TTL, TimeUnit.MINUTES);
// 8.放行
return true;
}
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
// 移除用户
UserHolder.removeUser();
}
}
2.3.5 (补充)退出登录功能实现
前端点击退出登录时发送logout请求,我们只需要将TreadLocal的用户对象给清除掉,这样一来前端的请求就获取不到用户信息,强制被拦截到登录界面了
/**
* 登出功能
* @return 无
*/
@PostMapping("/logout")
public Result logout(){
// 清除用户登录状态
UserHolder.removeUser();
return Result.ok();
}
2.3.6 (补充)查看用户首页功能实现
点击用户头像,可以进入用户的首页

/**
* 根据id查询用户
* @param userId
* @return
*/
@GetMapping("/{id}")
public Result getUserById(@PathVariable("id") Long userId){
User user = userService.getById(userId);
if(user == null){
return Result.ok();
}
UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);
return Result.ok(userDTO);
}
2.3.7【故障排查】关于一登陆后前端立马闪退回登录界面的问题
在跟着视频练习的过程中,在所以代码开发完成后,我发现了每当自己点击登录校验成功后,前端又会重复的闪退回登录界面。对此我进行了以下排除手段:
1. 检查Redis是否 将 短信验证码及其用户token信息保存成功,有则证明这两个大环节没有问题
2. 猜测是拦截器问题:
一开始,我们模仿老师将刷新Token的逻辑一并写到了登录校验拦截器上。但是!登录校验拦截器在开发的过程中,有一些需要Token【或是说需要校验TreadLocal对象】的接口并没有被拦截器拦截下来,导致前端认为该用户操作并未携带Token【没被存储到TreadLocal中】,从而误判为未登录状态,从而剔除该用户,强制跳转到登录界面。
为此,秉承单一职责原则,对于Token,我们需要新建一个将所有界面都“拦截”的拦截器,这样子可以保证在进入后续拦截器的请求,不会再有被误判的情况出现。
而且,也确保了不管用户进行了什么操作,Token都能刷新时长
(该故障经过拆分拦截器已正常解决,但是题主并没有深刻去揪到底是拿一些请求被误判了,这个故障原因也是我分析,如有高见请分享一下)
2.4 (TODO) 基于阿里云完善验证码功能
TODO
三、商户查询缓存系列功能实现
3.1 缓存的理解
我们的程序如果想要用户有一个比较良好的使用体验,在请求数据速度上必然要有所突出。因此我们一般会在项目中运用缓存策略,从而提高我们程序的响应速度。与此同时,在添加缓存策略之后,数据一致性、缓存穿透、雪崩、热Key、维护成本提高等相继出现。如何平衡好这种关系,成为了我们学习Redis的重要所在。
3.2 查询商户店铺---添加Redis缓存
3.2.1 添加缓存逻辑理解
由于Redis访问快的优点,我们在客户端与数据库之间添加一层 Redis数据层。用户发送的请求首先打到Redis进行查询,
- 如果在Redis上查询命中,则直接返回给用户,从而减轻了底层数据库服务器数据压力。
- 如果没能命中,则请求打到数据库进行查询,查询未命中则说明此次请求为 错误请求
- 数据库命中的话,就将数据写入Redis中 ,接着返回给用户。

3.2.2 添加缓存逻辑实现
/**
* 根据id查询商铺信息
* @param id 商铺id
* @return 商铺详情数据
*/
@GetMapping("/{id}")
public Result queryShopById(@PathVariable("id") Long id) {
return shopService.queryById(id);
}
/**
* 根据id查询店铺(添加Redis缓存版)
* @param id
* @return
*/
@Override
public Result queryById(Long id) {
//1. 根据id到Redis中查询用户信息
//2. Redis命中 ----------------> 返回商户信息 -------> 结束
//3. Redis未命中 查询数据库
//4. 查询数据库未命中 ----------------> 返回错误信息 ------> 结束
//5. 数据库命中 ---------------> 将数据写入Redis ------> 返回商户信息 -------> 结束
String stopJson = stringRedisTemplate.opsForValue().get(RedisConstants.CACHE_SHOP_KEY + id);
if(StrUtil.isNotBlank(stopJson)){
// 存在直接返回
Shop shop = JSONUtil.toBean(stopJson, Shop.class); //将JSON字符串转换为对象
return Result.ok(shop);
}
// query().eq("shop_id",id);
// 查询数据库
Shop shop = getById(id);
if(shop == null){
Result.fail("店铺不存在");
}
stringRedisTemplate.opsForValue().set(
RedisConstants.CACHE_SHOP_KEY + id,
JSONUtil.toJsonStr(shop),
RedisConstants.CACHE_SHOP_TTL,
TimeUnit.MINUTES);
return Result.ok(shop);
}
3.2.3 缓存功能效果展示
第一次查询商户店铺,未缓存Redis,后台查询数据库
第二次再次查询商户店铺,已缓存Redis,后台没有查询店铺的数据库语句
3.3(课后作业)查询商户分类列表---添加Redis缓存
3.3.1 使用String存储类型实现
/**
* 查询店铺类型 (添加Redis版)
* @return
*/
@GetMapping("list")
public Result queryTypeList() {
// List<ShopType> typeList = typeService
// .query().orderByAsc("sort").list();
return typeService.queryTypeList();
}
/**
* 查询店铺类型列表(添加Redis版)
* String 实现版
* @return
*/
@Override
public Result queryTypeList() {
// 1. 在Redis中查询店铺类型列表
// 2. Redis命中 ------> 直接返回店铺类型数据 -------> 结束
// 3. Redis未命中, 查询数据库
// 4. 数据库未命中 -------> 返回报错信息 --------> 结束
// 5. 数据库命中,-------> 将数据存入Redis --------> 返回店铺类型数据 --------> 结束
String Key = RedisConstants.CACHE_SHOP_TYPE_KEY;
String shopTypeJSON = stringRedisTemplate.opsForValue().get(Key);
// 将字符串转换为对象
List<ShopType> shopTypeList = null;
if(StrUtil.isNotBlank(shopTypeJSON)){
shopTypeList = JSONUtil.toList(shopTypeJSON, ShopType.class);
return Result.ok(shopTypeList); // 返回店铺类型数据
}
// 查询数据库
shopTypeList = query().orderByAsc("sort").list();
// 将对象转换为字符串
shopTypeJSON = JSONUtil.toJsonStr(shopTypeList);
// 将数据存入Redis
stringRedisTemplate.opsForValue().set(Key, shopTypeJSON);
return Result.ok(shopTypeList);
}
3.3.2 使用List存储类型实现
/**
* 查询店铺类型列表(添加Redis版)
* List 实现版
* @return
*/
@Override
public Result queryTypeList() {
// 1. 在Redis中查询店铺类型列表
// 2. Redis命中 ------> 直接返回店铺类型数据 -------> 结束
// 3. Redis未命中, 查询数据库
// 4. 数据库未命中 -------> 返回报错信息 --------> 结束
// 5. 数据库命中,-------> 将数据存入Redis --------> 返回店铺类型数据 --------> 结束
String Key = RedisConstants.CACHE_SHOP_TYPE_KEY;
// 获取列表中所有元素(字符串格式)
List<String> shopTypeJSON = stringRedisTemplate.opsForList().range(Key, 0, -1); // 获取列表中所有元素
if(shopTypeJSON != null && !shopTypeJSON.isEmpty()){
// Redis中存在数据,需要将所有的Value转换成 ShopType对象
// 将字符串转换为对象
List<ShopType> shopTypeList = new ArrayList<>();
for(String str : shopTypeJSON){
shopTypeList.add(JSONUtil.toBean(str, ShopType.class));
}
return Result.ok(shopTypeList); // 返回店铺类型数据
}
// 查询数据库
List<ShopType> shopTypeList = query().orderByAsc("sort").list();
if(shopTypeList == null || shopTypeList.isEmpty()){
return Result.fail("店铺类型不存在");
}
// 将对象转换为字符串(每一项都是)
for (ShopType shopType : shopTypeList) {
stringRedisTemplate.opsForList().rightPushAll(Key, JSONUtil.toJsonStr(shopType));
}
// 设置过期时间
stringRedisTemplate.expire(Key, RedisConstants.CACHE_SHOP_TTL, TimeUnit.MINUTES);
return Result.ok(shopTypeList);
}
3.3.3 缓存功能效果展示


3.4 (知识点)缓存更新策略最佳实践
3.4.1 数据一致性问题
前面引入Redis时已经讲过了,缓存的使用可以降低后端负载、降低响应时间、提高读写效率。但也会带来系列问题,数据一致性便是其中最为常见的问题之一。
数据一致性问题的根本原因就是缓存和数据库中的数据不同步,因此,我们需要提出一套良好的缓存更新方案,尽可能的使得缓存和数据库中的数据进行及时同步。
3.4.2 缓存更新策略的最佳实践
- 内存淘汰机制【全自动触发】
所谓内存淘汰机制,就是当Redis出现内存不足的情况下,会选择性的剔除一部分数据。这种全自动触发的淘汰机制不受控制,且触发概率不高,因此只适用于一些低一致性需求的业务
- 超时剔除机制【半自动触发】
超时剔除机制通常被我们当作一个保底策略,就是习惯性的给缓存数据设置一定的过期时间TTL。到期后,由Redis自动进行剔除,从而更好的利用缓存空间
- 主动更新机制【手动触发】
手动编码实现缓存更新,在修改数据库的同时更新缓存,实现双写。
特别灵活,可控性好
- 读写穿透(Read / Write Through Pattern)
缓存与数据库整合成为一个服务,由服务来维护统一性。调用者调用该服务,无需关心缓存的一致性问题
问题:想实现困难,市面上也很难找到这种服务
- 写回方案 (Write Behind Caching Pattern)
调用者只操作缓存,由其他一个独立线程异步的将缓存数据持久化到数据库,从而实现最终保持一致
问题:异步任务复杂,一致性难以保证(异步非同步),宕机及丢失
- 双写方案 (Cache Aside Pattern)
由缓存的调用者,在更新数据库的同时去更新缓存
- 更新缓存模式
无效写操作较多,不推荐使用:
例如对于一个数据,我们要进行100次的修改更新。如果每次都去更新,实际上只有最后一次更新是有效操作。所有对于查询少的情况下,更新缓存模式性能反而不太好。
- 删除缓存模式
无效写操作相对较少,推荐使用
- 先操作缓存
先操作缓存再操作数据库在多线程环境下出错情况说明:
【线程1 做更新缓存操作 线程2 做查询操作】
由于先操作缓存,线程1执行删除缓存操作
如果 从删除缓存 到 更新完成这一个过程复杂 耗时特别长
同一时间,线程2执行查询操作,由于缓存未命中,查询数据库
此时还未更新完成,查询到旧的值,并把旧的值写回了缓存
这样一来,缓存数据和数据库数据产生了不一致
这个概率比较大:
线程1先删掉缓存,然后执行一个耗时长的更新动作
而线程2 则是进行查询缓存 和 写入缓存的动作,耗时短
- 先操作数据库
先操作数据库再删除缓存出错情况说明:
【线程1 做查询操作 线程2 做更新操作】
假设恰好缓存失效了
线程1来查,刚好处于未命中状态,于是查询数据库【旧值】
并准备把数据库数据写入缓存
恰好线程2 执行更新数据库操作,将数据库的值更新成【新值】
最后线程1终于开始执行写入缓存操作了,但是写的是【旧值】
这个概率微乎其微:
两个恰好,其次在线程1查询后 写缓存的这个微妙级别的操作之内,
必须恰好有另外的线程完成了 更新数据库这一耗时长的操作。
- 先操作缓存
- 更新缓存模式
- 读写穿透(Read / Write Through Pattern)
总结:最佳的缓存更新策略:
使用超时剔除机制作为保底策略,采用主动更新机制中的双写模式,选择删除缓存模式,先操作数据库后操作缓存。
3.4.3 实现查询商户店铺详细的缓存与数据库双写一致
超时剔除机制保底:
// 在原先的 queryById 方法中 添加过期时间
// 将数据写入Redis + 设置过期时间
stringRedisTemplate.opsForValue().set(
RedisConstants.CACHE_SHOP_KEY + id,
JSONUtil.toJsonStr(shop),
RedisConstants.CACHE_SHOP_TTL,
TimeUnit.MINUTES);
更新数据时,主动删除缓存:
注意添加事务,确保更新数据库和删除缓存的原子性
/**
* 更新店铺信息
* @param shop
* @return
*/
@Override
@Transactional
public Result update(Shop shop) {
Long id = shop.getId();
if(id == null){
return Result.fail("店铺id不能为空");
}
// 1. 更新数据库
updateById(shop);
// 2. 删除缓存
stringRedisTemplate.delete(RedisConstants.CACHE_SHOP_KEY + shop.getId());
// 3. 返回结果
return Result.ok();
}
3.5 (知识点)缓存穿透与解决策略
3.5.1 什么是缓存穿透?
缓存穿透是指客户端请求的数据在缓存中和在数据库中都不存在,这样缓存永远不会生效,这些请求都会直接被达到数据库上。
3.5.2 缓存穿透的危害?
若是有大量这种无效、恶意请求直接打到数据库。会极大地增加数据库本身的压力,很可能造成数据库宕机。就像DDOS攻击,导致正常的客户端请求无法得到及时的响应。
3.5.3 缓存穿透的解决措施
以下介绍的两种方式都是被动的解决缓存穿透方案。除此之外,我们还可以采用主动的方案预防缓存穿透,比如:增强id的复杂度避免被猜测id规律、做好数据的基础格式校验、加强用户权限校验
3.5.3.1 缓存空对象
优点: | 缺点: |
实现简单,维护方便 | 1. 额外的内存消耗 2. 可能造成短期的不一致问题 |

3.5.3.2 布隆过滤器
当一个元素加入布隆过滤器中的时候,会进行如下操作:
- 使用布隆过滤器中的哈希函数对元素值进行计算,得到哈希值(有几个哈希函数得到几个哈希值)。
- 根据得到的哈希值,在位数组中把对应下标的值置为 1。
当我们需要判断一个元素是否存在于布隆过滤器的时候,会进行如下操作:
- 对给定元素再次进行相同的哈希计算;
- 得到值之后判断位数组中的每个元素是否都为 1,如果值都为 1,那么说明这个值在布隆过滤器中,如果存在一个值不为 1,说明该元素不在布隆过滤器中。
优点: | 缺点: |
内存占用少,没有多余的Key | 1. 实现复杂 2. 存在误判可能 |

3.5.3.3 解决缓存穿透——以请求店铺信息为例

缓存空对象实现代码
/**
* 根据id查询店铺(添加Redis缓存版 + 解决缓存穿透[缓存空对象])
* @param id
* @return
*/
@Override
public Result queryById(Long id) {
//1. 根据id到Redis中查询用户信息
//2. Redis命中 ----------------> 返回商户信息 -------> 结束
//2.改:Redis命中 ----------------> 判断是否为空对象-------> 结束
// | 非空
// ------> 返回商户信息 -------> 结束
//3. Redis未命中 查询数据库
//4. 查询数据库未命中 ----------------> 返回错误信息 ------> 结束
//4.改: 查询数据库未命中----------> 将空对象写入Redis ------> 结束
//5. 数据库命中 ---------------> 将数据写入Redis ------> 返回商户信息 -------> 结束
String stopJson = stringRedisTemplate.opsForValue().get(RedisConstants.CACHE_SHOP_KEY + id);
// 判断Redis中是否存在数据
if(StrUtil.isNotBlank(stopJson)){
// 存在直接返回
Shop shop = JSONUtil.toBean(stopJson, Shop.class); //将JSON字符串转换为对象
return Result.ok(shop);
}
// 判断Redis中是否存在空对象
if (RedisConstants.CACHE_PENETRATION_NULL_VALUE.equals(stopJson)){
return Result.fail("店铺信息不存在");
}
// query().eq("shop_id",id);
// 查询数据库
Shop shop = getById(id);
// 查询数据库不存在
if(shop == null){
// 将空对象写入Redis
stringRedisTemplate.opsForValue().set(
RedisConstants.CACHE_SHOP_KEY + id,
RedisConstants.CACHE_PENETRATION_NULL_VALUE,
RedisConstants.CACHE_NULL_TTL,
TimeUnit.MINUTES
);
return Result.fail("店铺不存在");
}
// 将数据写入Redis + 设置过期时间
stringRedisTemplate.opsForValue().set(
RedisConstants.CACHE_SHOP_KEY + id,
JSONUtil.toJsonStr(shop),
RedisConstants.CACHE_SHOP_TTL,
TimeUnit.MINUTES
);
return Result.ok(shop);
}
测试结果


3.6 (知识点)缓存雪崩与解决策略
缓存雪崩是指在同一时间段大量缓存key同时失效或者Redis服务宕机,导致大量请求打到数据库,带来巨大压力

解决策略:
- 事前:Redis集群部署,主从 + 哨兵机制, 避免全盘崩溃宕机
- 事中:本地缓存 + 限流和降级,避免数据库压力过大造成宕机
- 事后:Redis持久化,机器重启后可以自动从磁盘加载数据,恢复缓存数据
3.7 (知识点)缓存击穿与解决策略
缓存击穿问题又叫热点Key问题,是指一个高访问量并且缓存重建业务较为复杂的key突然失效了,导致无数请求直接打到数据库,造成巨大压力的情况
如图,在线程1进行缓存重建的过程,由于重建业务耗时较长,在重建业务期间,有其他大量线程执行查询操作,由于缓存未命中,均尝试执行查询数据库重建缓存的重复操作。
3.7.1 缓存击穿的解决方案
3.7.1.1 基于互斥锁的解决方案
为了防止在缓存重建的过程中,其余线程也去进行查询数据库重建缓存。互斥锁策略则是采用给第一个尝试重建缓存的线程添加互斥锁,其余的线程则在不断地进行 “尝试获取锁 --- 休眠等待”。从而减少了查询数据库造成的压力问题。

3.7.1.2 基于逻辑过期的解决方案
首先,我们会给这种高访问量业务的Key设置一个逻辑过期时间(到期不会被Redis自动删除,以确保缓存必定命中)。
然后,线程每次访问时,会先对当前时间与逻辑过期时间进行判断,过期则获取一个互斥锁,来表明自己是第一个发现需要缓存重建的线程。
接着,该线程就会开启一个独立线程,专门用于执行缓存重建的任务。自己则是先返回旧的数据使用。
在缓存重建线程执行完成之前,互斥锁不会释放,此时其他线程在访问的过程中获取锁失败,则直接返回旧数据。
最后,当缓存重建线程执行完毕后,释放互斥锁。
3.7.1.3 总结比较
互斥锁追求的是对一致性要求较高的业务,但是代价是多线程等待,性能受到了一定的影响
逻辑过期追求的是高性能的服务,但是却牺牲了一致性。
3.7.2 基于互斥锁解决缓存击穿案例——以请求店铺信息为例
3.7.2.1 互斥锁解决缓存击穿思路流程

3.7.2.2 代码实现——抽取方法
在实现功能的过程中,我们人为的在进行缓存重建的过程中添加了Thread休眠,这样使得整个缓存重建的时长变长,模拟复杂的重建业务,从而更容易能展示出互斥锁在高并发情况下减缓数据库压力的作用
模拟重建的延迟情况 : Thread.sleep(100);
public Result queryById(Long id) {
//1. 根据id到Redis中查询用户信息
//2. Redis命中 ----------------> 返回商户信息 -------> 结束
//2.改:Redis命中 ----------------> 判断是否为空对象-------> 结束
// | 非空
// ------> 返回商户信息 -------> 结束
//3. Redis未命中 -----> 查询数据库
//3.改:Redis未命中 -------> 尝试获取互斥锁 ------> 获取成功 -----> 查询数据库 -------> 将数据库结果写入Redis -------> 释放锁 ------> 返回商户信息 -------> 结束
// | 获取失败
// ------> 休眠一段时间后重试
//4. 查询数据库未命中 ----------------> 返回错误信息 ------> 结束
//4.改: 查询数据库未命中----------> 将空对象写入Redis ------> 结束
//5. 数据库命中 ---------------> 将数据写入Redis ------> 返回商户信息 -------> 结束
// -------------------------------------------上述思路将被分装成两个方法分别解决 缓存穿透 和 缓存击穿 问题------------------------------------------------------------------------------------------------ //
// 基于互斥锁解决缓存击穿问题
Shop shop = queryWithMutex(id);
if(shop == null){
return Result.fail("店铺不存在");
}
return Result.ok(shop);
}
/**
* 获取互斥锁 setNx ----- setIfAbsent
* @param key
* @return
*/
private boolean tryLock(String key){
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key,"1",RedisConstants.LOCK_SHOP_TTL,TimeUnit.SECONDS);
return BooleanUtil.isTrue(flag);
}
/**
* 释放互斥锁
* @param key
*/
private void unLock(String key){
stringRedisTemplate.delete(key);
}
/**
* 基于互斥锁解决缓存击穿问题保存代码
* @param id
* @return
*/
private Shop queryWithMutex(Long id){
String stopJson = stringRedisTemplate.opsForValue().get(RedisConstants.CACHE_SHOP_KEY + id);
// 判断Redis中是否存在数据
if(StrUtil.isNotBlank(stopJson)){
// 存在直接返回
Shop shop = JSONUtil.toBean(stopJson, Shop.class); //将JSON字符串转换为对象
// return Result.ok(shop);
return shop;
}
// 判断Redis中是否存在空对象
if (RedisConstants.CACHE_PENETRATION_NULL_VALUE.equals(stopJson)){
// return Result.fail("店铺信息不存在");
return null;
}
// 4. 实现缓存重建
//4.1 尝试获取互斥锁
String lockKey = RedisConstants.LOCK_SHOP_KEY + id;
Shop shop = null;
try {
boolean isLock = tryLock(lockKey);
//4.2 获取互斥锁失败 休眠一段时间后 重新查询Redis(自旋)
if(!isLock){
Thread.sleep(50);
return queryWithMutex(id);
}
//4.3 获取互斥锁成功 查询数据库 将商铺信息写入Redis
// query().eq("shop_id",id);
// 查询数据库
shop = getById(id);
// 模拟重建的延迟情况
Thread.sleep(100);
// 查询数据库不存在
if(shop == null){
// 将空对象写入Redis
stringRedisTemplate.opsForValue().set(
RedisConstants.CACHE_SHOP_KEY + id,
RedisConstants.CACHE_PENETRATION_NULL_VALUE,
RedisConstants.CACHE_NULL_TTL,
TimeUnit.MINUTES
);
// return Result.fail("店铺不存在");
return null;
}
// 将数据写入Redis + 设置过期时间
stringRedisTemplate.opsForValue().set(
RedisConstants.CACHE_SHOP_KEY + id,
JSONUtil.toJsonStr(shop),
RedisConstants.CACHE_SHOP_TTL,
TimeUnit.MINUTES
);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}finally {
//4.4 释放互斥锁
unLock(lockKey);
}
//4.5 返回商铺信息
// return Result.ok(shop);
return shop;
}
3.7.2.3 功能测试——基于JMeter的压力测试
配置JMeter测试任务
配置访问后端数据地址

清空缓存店铺信息

执行测试任务

后台查询数据库只执行了一次
3.7.3 基于逻辑过期解决缓存击穿案例——以请求店铺信息为例
3.7.3.1 逻辑过期解决缓存击穿思路流程

3.7.3.2 构建RedisData对象——组合优于继承
我们先前定义的Shop对象实际上是没有逻辑过期时间字段的。如何解决这个问题呢?明显有两种方法:
基于继承的方法:【对Shop有侵入性】
创建一个父类,内涵 expireTime字段,让原先的Shop对象继承父类,从而获得逻辑过期时间字段。
基于组合的方法:
定义一个组合对象RedisData,包含一个逻辑过期时间字段和一个Object对象
3.7.3.3 代码实现
代码包括:
1. 开启一个大小为10的线程池
2. 查询店铺方法
3. 封装RedisData对象的公用方法【需要用于单元测试获取数据,所以要定义成公用方法】
4. 封装利用逻辑过期时间解决缓存击穿的私有方法
单元测试代码
@SpringBootTest
class HmDianPingApplicationTests {
@Resource
private ShopServiceImpl shopService;
@Test
void testSaveShop() throws InterruptedException {
shopService.saveShop2Redis(1L,10L);
}
}
/**
* 开启线程池
*/
private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);
/**
* 根据id查询店铺(添加Redis缓存版 + 解决缓存穿透【缓存空对象】+ 互斥锁实现方案)
* @param id
* @return
*/
@Override
public Result queryById(Long id) {
// 基于逻辑过期解决缓存击穿问题
Shop shop = queryWithLogicalExpire(id);
if(shop == null){
return Result.fail("热点店铺不存在");
}
return Result.ok(shop);
}
/**
* 创建RedisData对象 【原Shop + 逻辑过期时间字段】
* @param id
* @param expireTime
*/
public void saveShop2Redis(Long id, Long expireTime) throws InterruptedException {
// 1.查询店铺数据
Shop shop = getById(id);
// 模拟休息情况
Thread.sleep(100);
// 2.封装逻辑过期时间
RedisData redisData = new RedisData();
redisData.setData(shop);
redisData.setExpireTime(LocalDateTime.now().plusSeconds(expireTime));
// 3.写入Redis
stringRedisTemplate.opsForValue().set(RedisConstants.CACHE_SHOP_KEY + id, JSONUtil.toJsonStr(redisData));
}
/**
* 基于逻辑过期解决缓存击穿问题 保存代码、
* 为什么不用判断缓存穿透问题,因为一定有Key,如果没有Key必然不对,不需要让它继续访问数据库了,直接打回就可
* @param id
* @return
*/
private Shop queryWithLogicalExpire(Long id){
String stopJson = stringRedisTemplate.opsForValue().get(RedisConstants.CACHE_SHOP_KEY + id);
//1. 判断Redis中是否存在数据
if(StrUtil.isBlank(stopJson)){
//1.1 未命中直接结束
return null;
}
//2. 命中了,判断缓存是否过期
//2.1 将JSON字符串反序列为对象
RedisData redisData = JSONUtil.toBean(stopJson,RedisData.class);
JSONObject data =(JSONObject) redisData.getData();
Shop shop = JSONUtil.toBean(data,Shop.class);
LocalDateTime expireTime = redisData.getExpireTime();
if(expireTime.isAfter(LocalDateTime.now())){ // 未过期
// 直接返回
return shop;
}
// 3. 已过期,需要缓存重建
// 4. 尝试获取互斥锁
String lockKey = RedisConstants.LOCK_SHOP_KEY + id;
boolean isLock = tryLock(lockKey);
if(isLock){
//5. 获取互斥锁成功,开启独立线程,查询数据库,重建缓存
CACHE_REBUILD_EXECUTOR.submit(() ->{
try {
// 重建缓存
this.saveShop2Redis(id, 20L);
} catch (Exception e) {
throw new RuntimeException(e);
}finally {
// 6. 释放互斥锁
unLock(lockKey);
}
});
}
return shop;
}
3.7.3.4 功能测试——基于JMeter的压力测试
想要测试【逻辑过期时间】策略,必须先在缓存中准备好数据,否则测试不成功,具体原因请看3.7.4故障排除
1. 执行测试方法获取Redis缓存店铺数据
等到该缓存数据过期后再执行方法
2. 修改后台数据库中店铺名,用于后续更好的观察该策略的效果
3. JMemet测试
全部请求都通过了,但是前面的请求查询到的数据还是 “102茶餐厅”【旧数据】
从这个请求开始,重建线程已经完成,后续请求都和后台数据库同步了,都是“103茶餐厅”
3.7.4【故障排查】关于逻辑过期时间策略无法查询店铺信息的现象
当我们使用逻辑过期时间策略时,在直接访问店铺数据时会发现无论怎么样,店铺数据都不会被存储到缓存中,也不会去查询数据库,导致了店铺数据一直无法访问。
这是出于逻辑过期时间策略 默认要求 热点Key数据是由管理员事先存放到Redis中设置好,等到活动结束后再人为的删除掉的。
所以在第一次访问时,如果没有提前将店铺数据存放在Redis,该策略直接返回空对象后结束,因而造成了这种现象的发生。


3.8 封装Redis缓存工具
3.8.1 封装代码
@Component
@Slf4j
public class CacheClient {
private final StringRedisTemplate stringRedisTemplate;
/**
* 开启线程池
*/
private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);
/**
* 获取互斥锁 setNx ----- setIfAbsent
* @param key
* @return
*/
private boolean tryLock(String key){
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key,"1",RedisConstants.LOCK_SHOP_TTL,TimeUnit.SECONDS);
return BooleanUtil.isTrue(flag);
}
/**
* 释放互斥锁
* @param key
*/
private void unLock(String key){
stringRedisTemplate.delete(key);
}
/**
* 构造方法
* @param stringRedisTemplate
*/
public CacheClient(StringRedisTemplate stringRedisTemplate) {
this.stringRedisTemplate = stringRedisTemplate;
}
/**
* 将任意Java对象序列化为json并存储在String类型的key中,并且可以设置过期时间
* @param key
* @param value 任意java对象
* @param time 过期时间
* @param unit 过期时间单位
*/
public void set(String key, Object value, Long time, TimeUnit unit){
// 序列化
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(value), time, unit);
}
/**
* 将任意java对象序列化成json并存储在String类型的key中,并且可以设置逻辑过期时间,用于处理缓存击穿问题
* @param key
* @param value 任意java对象
* @param time 逻辑过期时间
* @param unit 逻辑过期时间单位
*/
public void setWithLogicalExpire(String key, Object value, Long time, TimeUnit unit){
// 封装逻辑过期时间
RedisData redisData = new RedisData();
redisData.setData(value);
// 利用uint的 toSeconds 转换成Second
redisData.setExpireTime(LocalDateTime.now().plusSeconds(unit.toSeconds(time)));
// 写入Redis【逻辑过期】
stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(redisData));
}
/**
* 根据指定的key查询缓存,并反序列为指定的类型,利用缓存空值来解决缓存穿透问题
* @param keyPrefix key前缀
* @param id 查询数据库id
* @param type 返回值类型
* @param dbFallback 数据库查询方法
* @param time 过期时间
* @param unit 过期时间单位
* @return
* @param <R> 返回值类型
* @param <ID> 查询数据库id类型
*/
public <R,ID> R queryWithPassThrough(
String keyPrefix,
ID id,
Class<R> type,
Function<ID,R> dbFallback,
Long time,
TimeUnit unit
) {
String key = keyPrefix + id;
//1. 查询Redis
String json = stringRedisTemplate.opsForValue().get(key);
//2. 判断是否存在
if(StrUtil.isNotBlank(json)){
//3. 存在,直接返回
// 反序列化回 type 类型
return JSONUtil.toBean(json,type);
}
//判断是否为空值
if(json != null){
// 返回
return null;
}
//4. 不存在,根据id查询数据库
R r = dbFallback.apply(id);
//5. 不存在,返回错误
if(r == null){
// 写入空值
stringRedisTemplate.opsForValue().set(key,CACHE_PENETRATION_NULL_VALUE,CACHE_NULL_TTL,TimeUnit.MINUTES);
return null;
}
//6. 存在,写入Redis
this.set(key,r,time,unit);
//7. 返回
return r;
}
/**
* 根据指定的key查询缓存,并反序列成指定类型,利用逻辑过期解决缓存击穿问题
* @param KeyPrefix key前缀
* @param id 查询数据库id
* @param type 返回值类型
* @param dbFallback 数据库查询方法
* @param time 逻辑过期时间
* @param unit 逻辑过期时间单位
* @return
* @param <R> 返回值类型
* @param <ID> 查询数据库id类型
*/
public <R,ID> R queryWithLogicalExpire(
String KeyPrefix,
ID id,
Class<R> type,
Function<ID,R> dbFallback,
Long time,
TimeUnit unit
) {
// 查询Redis
String json = stringRedisTemplate.opsForValue().get(KeyPrefix + id);
//1. 判断Redis中是否存在数据
if(StrUtil.isBlank(json)){
//1.1 不存在直接结束
return null;
}
//2. 命中了,判断缓存是否过期
//2.1 将JSON字符串反序列为对象
RedisData redisData = JSONUtil.toBean(json,RedisData.class);
JSONObject data =(JSONObject) redisData.getData();
R r = JSONUtil.toBean(data,type);
LocalDateTime expireTime = redisData.getExpireTime();
if(expireTime.isAfter(LocalDateTime.now())){ // 未过期
// 直接返回
return r;
}
// 3. 已过期,需要缓存重建
// 4. 尝试获取互斥锁
String lockKey = RedisConstants.LOCK_SHOP_KEY + id;
boolean isLock = tryLock(lockKey);
if(isLock){
//5. 获取互斥锁成功,开启独立线程,查询数据库,重建缓存
CACHE_REBUILD_EXECUTOR.submit(() ->{
try {
// 重建缓存
//1. 查数据库
R r1 = dbFallback.apply(id);
//2. 写redis
this.setWithLogicalExpire(KeyPrefix,r1,time,unit);
} catch (Exception e) {
throw new RuntimeException(e);
}finally {
// 6. 释放互斥锁
unLock(lockKey);
}
});
}
return r;
}
}
3.8.2 使用方法
/**
* 根据id查询店铺(添加Redis缓存版 + 解决缓存穿透【缓存空对象】+ 互斥锁实现方案)
* @param id
* @return
*/
@Override
public Result queryById(Long id) {
// 解决缓存穿透问题
// Shop shop = queryWithPassThrough(id);
// 基于互斥锁解决缓存击穿问题
// Shop shop = queryWithMutex(id);
// 基于逻辑过期解决缓存击穿问题 【需要提前准备好Redis缓存数据】
// Shop shop = queryWithLogicalExpire(id);
// 基于自定义封装的Redis缓存工具解决缓存穿透问题
// Shop shop = cacheClient.queryWithPassThrough(
// CACHE_SHOP_KEY,
// id,
// Shop.class,
// this::getById,
// CACHE_SHOP_TTL,
// TimeUnit.SECONDS);
// 基于自定义封装的Redis缓存工具解决缓存击穿问题【逻辑过期时间】
Shop shop = cacheClient.queryWithLogicalExpire(
CACHE_SHOP_KEY,
id,
Shop.class,
this::getById,
CACHE_SHOP_TTL,
TimeUnit.MINUTES);
if (shop == null){
return Result.fail("店铺不存在");
}
return Result.ok(shop);
}
四、 优惠卷秒杀系列功能实现
4.1 全局ID生成器
4.1.1 全局ID生成器的选型
为什么需要全局ID生成器呢?
由于秒杀业务需要,有时候我们会生成数量极为庞大的业务数据。但是对于单张表,往往会存在存储数据的上限(随着存储数据的增加,查询的效率会越来越低),我们往往会进行分表操作。
这么一来,分表带来的 ID不唯一、ID自增的规律性太强导致安全性低等问题就产生了。为了解决这些问题,我们可以利用全局ID生成器进行开发。
全局ID生成器需要具备的特征:
使用Redis符合上述五点需求
高可用、高性能: Redis的查询能力本身就比数据库要快得多
唯一性: 由于Redis数据库可以全局通用一张表,因此可以更好地维护ID的唯一性
递增性:Redis拥有自增操作,INCREMENT
安全性:通过设置ID的自增规律,可以提高ID的安全性
全局ID生成器的格式规范:
4.1.2 全局ID生成器的实现
步骤:
1. 获取一个基准时间戳,标记为BEGIN_TIMESTAMP
2. 获取当前时间戳标记为now
3. 计算全局ID的时间戳部分
4. 以天为单位,分割key【可以更好的统计每天的订单数量】
5. 获取全局ID的序列号【调用Redis的自增方法】
6. 利用运算符将时间戳 与 序列号 进行拼接并返回
在utils包下新建RedisWorkers用于实现全局ID生成:
@Component
public class RedisIdWorker {
// 开始时间戳 2024-09-12-0-0-0
private static final long BEGIN_TIMESTAMP = 1726099200L;
// 序列号位数
private static final int COUNT_BITS = 32;
private StringRedisTemplate stringRedisTemplate;
public RedisIdWorker(StringRedisTemplate stringRedisTemplate) {
this.stringRedisTemplate = stringRedisTemplate;
}
public Long nextId(String keyPrefix){
// 1. 时间戳
LocalDateTime now = LocalDateTime.now();
long nowSecond = now.toEpochSecond(ZoneOffset.UTC);
long timestamp = nowSecond-BEGIN_TIMESTAMP;
// 2. 序列号
//2.1 获取当前日期,精确到天
String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);
// 3. 拼接返回
return timestamp << COUNT_BITS | count;
}
/**
* 获取当前时间戳
* @param args
*/
public static void main(String[] args) {
LocalDateTime time = LocalDateTime.of(2024,9,12,0,0,0);
Long second = time.toEpochSecond(ZoneOffset.UTC); // 1726099200
System.out.println("second=" + second);
}
}
4.1.3 全局ID生成器的测试
开启3000个线程任务,每个线程插入100条订单数据,运行查看效果
@SpringBootTest
class HmDianPingApplicationTests {
@Resource
private RedisIdWorker redisIdWorker;
private ExecutorService es = Executors.newFixedThreadPool(500);
@Test
void testIdWorker() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(3000);
Runnable task = () ->{
for(int i=0;i<100;i++){
long id = redisIdWorker.nextId("order");
System.out.println("id = " + id);
}
latch.countDown();
};
long begin = System.currentTimeMillis();
for(int i=0;i<3000;i++){
es.submit(task);
}
latch.await();
long end = System.currentTimeMillis();
System.out.println("time = " + (end-begin));
}
}

以日期作为key分割,可以统计当天订单数

4.1.4 其他ID生成器的拓展
除了采用Redis自增生成全局ID,还有以下方法可行:
1. UUID 【无序】
2. 数据库自增 【数据量不能太多】
3. 雪花算法 (本次项目使用的Redis自增实际上就是运用了雪花算法的思想)
Redis自增策略
1. 每天一个key,方便统计订单量
2. ID构造是 时间戳 + 计数器
4.2 利用PostMan模拟管理员后台添加秒杀优惠卷信息


【代码实现】
/**
* 新增秒杀优惠券
* 多表添加 加事务
* @param voucher
*/
@Override
@Transactional
public void addSeckillVoucher(Voucher voucher) {
// 保存优惠券
save(voucher);
// 保存秒杀信息
SeckillVoucher seckillVoucher = new SeckillVoucher();
// 关联优惠券id
seckillVoucher.setVoucherId(voucher.getId());
// 秒杀库存
seckillVoucher.setStock(voucher.getStock());
// 开始时间
seckillVoucher.setBeginTime(voucher.getBeginTime());
// 结束时间
seckillVoucher.setEndTime(voucher.getEndTime());
seckillVoucherService.save(seckillVoucher);
}
【PostMan测试】
4.3 优惠卷秒杀下单功能基础逻辑实现
【功能说明】
实现基本的下单逻辑
请求方式 |
POST |
请求路径 | /voucher-order/seckill/{id} |
请求参数 | 优惠卷id |
返回值 | 订单Id |
【代码实现】
利用全局Id生成订单id
@Resource
private ISeckillVoucherService seckillVoucherService;
/**
* 全局ID生成器
*/
@Resource
private RedisIdWorker redisIdWorker;
/**
* 秒杀优惠券下单
* @param voucherId
* @return
*/
@Override
@Transactional
public Result seckillVoucher(Long voucherId) {
//1. 提交优惠卷id 查询优惠卷id信息
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
if(voucher == null){
return Result.fail("优惠券不存在!");
}
//2. 判断优惠卷是否在抢购时间内
LocalDateTime beginTime = voucher.getBeginTime();
LocalDateTime endTime = voucher.getEndTime();
long stockCount =voucher.getStock();
if(beginTime.isAfter(LocalDateTime.now())){
// 否----> 返回异常信息---->结束
return Result.fail("秒杀尚未开始!");
}
if(endTime.isBefore(LocalDateTime.now())){
// 否----> 返回异常信息---->结束
return Result.fail("秒杀已经结束!");
}
//3. 判断优惠卷库存是否充足
if(stockCount <= 0){
// 否----> 返回异常信息---->结束
return Result.fail("库存不足!");
}
//4. 扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock-1")
.eq("voucher_id",voucherId).update();
if(!success){
return Result.fail("库存不足!");
}
//5. 创建优惠卷订单
VoucherOrder voucherOrder = new VoucherOrder();
//5.1 订单id 【全局ID生成器】4
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
//5.2 用户id 【当前登录用户】
long userId = UserHolder.getUser().getId();
voucherOrder.setUserId(userId);
//5.3 优惠卷id 【传递过来的优惠卷id】
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);
//6. 返回订单id ----> 结束
return Result.ok(orderId);
}
【功能测试】
点击抢购,成功生成订单信息,库存扣减1
4.4 (优化)解决超卖问题
4.4.1 超卖现象重现及其原因
步骤:
利用jmeter进行多线程模拟高并发抢购优惠卷时的情况
现象:
发现优惠卷有超卖的现象
原因:
在多线程高并发访问的环境下,当库存为1时,正常情况下最后一个线程在执行完最后一个扣减操作的过程中,有其余的线程涌入,此时由于扣减操作未完成,库存量还是为1,从而误判库存剩余,最后导致超卖。
现象回顾




回看数据库报告
正常来说异常为50%才是正确的,但是这里的异常没有到50%。在高并发访问的过程中出现了超卖现象,即同一时刻两个线程同时完成导致优惠卷最后的数量异常。


4.4.2 解决超卖问题的方案比较
悲观锁方案
- 悲观锁,认为线程安全问题一定会发生,因此操作数据库之前都需要先获取锁,确保线程串行执行。常见的悲观锁有:
synchronized
、lock
- 悲观锁,适合写入操作较多、冲突频繁的场景
乐观锁方案
- 乐观锁,认为线程安全问题不一定发生,因此不加锁,而是判断有没有其他线程在自己进行操作时修改了数据,有则重试。常见的实现方式有:版本号法、CAS操作.
- 乐观锁,适合读取操作较多、冲突较少的场景。
4.4.3 乐观锁两大方案比较
版本号法(涉及ABA问题时使用)
首先,我们要为数据库表新增一个版本号字段 version
然后,线程开始查询库存及其库存版本号,记录版本号信息。
接着,在通过判断后,执行扣减时,检查当前版本号与先前记录的版本号信息是否一致,如若一致(例如线程1),则可以顺利执行扣减更新语句。如若不一致(例如线程2),则无法执行扣减更新语句。
CAS法(不涉及ABA问题时使用)
在版本号的基础上,可以发现其实库存量直接可以充当版本号的作用,于是就出现了更加简洁,不用额外添加字段的方法——CAS法。
尽管CAS操作具有诸多优点,但它也存在一个潜在的问题,即ABA问题。ABA问题是指在CAS操作过程中,一个线程a将数值改成了b,接着又改成了a,此时CAS认为是没有变化,其实是已经变化过了。这个问题在多线程环境下可能会导致程序行为不符合预期,从而引发并发错误。
4.4.4 基于CAS法解决超卖问题
只需要在扣减库存这里加多一个库存值的判断,同时记得如果判断出问题了要重试!
//4. 扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock-1")
.eq("voucher_id",voucherId).eq("stock",voucher.getStock())
.update();
上述代码的异常率特别高
解决失败率高的问题
//4. 扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock-1")
.eq("voucher_id",voucherId).gt("stock",0) // 库存大于0就行了
.update();
4.5 (优化)实现单机情况下 限购一人一单
4.5.1 限购功能整体概述
为什么要一人一单?
对于秒杀优惠卷,我们当然不希望一个人承包大部分甚至所有的优惠卷,这种行径无疑是电商黄牛。我们希望每个顾客至多只能抢购一张券。
限购一人一单实现逻辑?
在扣减库存之前,我们对优惠卷数据进行查询,查看当前用户(用户id)是否在数据库中已经存在抢购数据,如果有了,则不再给予抢购机会。否则才会进行扣减库存的操作
实现限购功能需要克服的潜在风险?
存在线程并发问题: 假设有100个相同用户标识的线程同时查询数据库,发现数据库没有数据时,100个线程同时通过了判断,结果导致1个用户抢了多单
4.5.2 限购功能实现及说明
【基础实现】
根据前面的概述,我们大概明白了整个功能的逻辑,实际上就是简单的增加一个判断。
//4. 限制一人一单【悲观锁方案】
Long userId = UserHolder.getUser().getId();
//4.1 查询订单
int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
//4.2 判断订单是否存在
// 是 -----> 返回异常信息---->结束
if (count > 0) {
return Result.fail("该用户已经购买过一次了!");
}
【基础锁实现】
但是单单只添加判断逻辑,还会造成线程并发问题,因此我们需要给判断过程加锁,最简单的方式就是将一人一单功能抽取成一个方法,在方法上添加synchronized关键字
@Transactional
public synchronized Result createVoucherOrder(Long voucherId) {
//4. 限制一人一单【悲观锁方案】
Long userId = UserHolder.getUser().getId();
//4.1 查询订单
int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
//4.2 判断订单是否存在
// 是 -----> 返回异常信息---->结束
if (count > 0) {
return Result.fail("该用户已经购买过一次了!");
}
//5. 扣减库存——解决超卖问题【乐观锁方案】
boolean success = seckillVoucherService.update()
.setSql("stock = stock-1")
.eq("voucher_id", voucherId).gt("stock", 0) // 库存大于0就行了
.update();
if (!success) {
return Result.fail("库存不足!");
}
//6. 创建优惠卷订单
VoucherOrder voucherOrder = new VoucherOrder();
//6.1 订单id 【全局ID生成器】4
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
//6.2 用户id 【当前登录用户】
// long userId = UserHolder.getUser().getId();
voucherOrder.setUserId(userId);
//6.3 优惠卷id 【传递过来的优惠卷id】
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);
//7. 返回订单id ----> 结束
return Result.ok(orderId);
}
【降低锁粒度实现】
以上代码还无法实现全部功能,由于我们将整个方法都锁住了,相当于每个用户进行抢购时,都需要判断该方法是否已经被执行,这样会导致整个抢购秒杀过程退化成串行执行。 而我们想实现的是同一个用户标识线程串行执行,不同用户之间并行执行。因此我们需要降低锁的粒度,将锁作用在用户上。
【关于如何锁用户】
userId.toString().intern()
由于同一个用户标识的不同线程创建的User对象实际上并不是同一个,为了能达成用户级锁,我们必须是按值锁,因此需要调用toString().intern()方法
@Transactional
public Result createVoucherOrder(Long voucherId) {
//4. 限制一人一单【悲观锁方案】
Long userId = UserHolder.getUser().getId();
synchronized (userId.toString().intern()) {
//4.1 查询订单
int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
//4.2 判断订单是否存在
// 是 -----> 返回异常信息---->结束
if (count > 0) {
return Result.fail("该用户已经购买过一次了!");
}
//5. 扣减库存——解决超卖问题【乐观锁方案】
boolean success = seckillVoucherService.update()
.setSql("stock = stock-1")
.eq("voucher_id", voucherId).gt("stock", 0) // 库存大于0就行了
.update();
if (!success) {
return Result.fail("库存不足!");
}
//6. 创建优惠卷订单
VoucherOrder voucherOrder = new VoucherOrder();
//6.1 订单id 【全局ID生成器】4
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
//6.2 用户id 【当前登录用户】
// long userId = UserHolder.getUser().getId();
voucherOrder.setUserId(userId);
//6.3 优惠卷id 【传递过来的优惠卷id】
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);
//7. 返回订单id ----> 结束
return Result.ok(orderId);
}
}
【扩大锁范围实现】
上述代码实现了将锁的粒度成功的缩小到了用户级,但是还不是最优的代码,还存在一定的风险。具体的,由于@Transactional事务是作用在方法上的,而现在我们的锁仅仅是作用在方法内部,这样一来就会存在一种异常情况:当方法内部都执行完毕了,锁会先释放,然后才是Spring自动提交事务。在提交事务完成之前,锁已经释放掉了,这也就意味着在这期间其他的线程完全有机会趁虚而入。为了解决这个问题,我们必须把锁的范围重新扩大到将方法包裹其中。
/**
* 秒杀优惠券下单
* @param voucherId
* @return
*/
@Override
public Result seckillVoucher(Long voucherId) {
//1. 提交优惠卷id 查询优惠卷id信息
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
if(voucher == null){
return Result.fail("优惠券不存在!");
}
//2. 判断优惠卷是否在抢购时间内
LocalDateTime beginTime = voucher.getBeginTime();
LocalDateTime endTime = voucher.getEndTime();
long stockCount =voucher.getStock();
if(beginTime.isAfter(LocalDateTime.now())){
// 否----> 返回异常信息---->结束
return Result.fail("秒杀尚未开始!");
}
if(endTime.isBefore(LocalDateTime.now())){
// 否----> 返回异常信息---->结束
return Result.fail("秒杀已经结束!");
}
//3. 判断优惠卷库存是否充足
if(stockCount <= 0){
// 否----> 返回异常信息---->结束
return Result.fail("库存不足!");
}
/* 为什么要加在createVoucherOrder方法中?
* 因为createVoucherOrder方法中已经加了事务,如果加了锁,事务回滚,锁也会释放,这样就会导致锁失效
* 为什么要指定锁用户id?
* 将锁的粒度降低,将锁的粒度降低到用户粒度,这样就可以保证一个用户只会有一个订单
* 添加.intern() 确保是按值加锁 而不是按对象加锁
* 为什么要使用代理对象?
* @Transactional注解是Spring的事务注解,它只能在Spring管理的Bean中生效,
* 如果直接在ServiceImpl中调用createVoucherOrder方法,
* 那么@Transactional注解就不会生效,
* 因为createVoucherOrder方法不是在Spring管理的Bean中调用的,
* 所以需要使用代理对象来调用createVoucherOrder方法,这样@Transactional注解就会生效
*/
Long userId = UserHolder.getUser().getId();
synchronized (userId.toString().intern()) {
return createVoucherOrder(voucherId);
}
}
【解决代理对象事务】
上述代码看起来很完美了,但是事实上还是存在问题@Transactional事务机制是失效的。这是由于我们创建的createVoucherOrder()方法并不受Spring管理,所以没有办法使用Spring的事务机制。要解决这个问题,我们需要引入Spring中AspectJ注解支持来暴露代理对象,通过代理对象机制实现事务功能。具体操作如下:
1. 导入AspectJ注解支持依赖
2. 获取createVoucherOrder()方法的代理对象
3. 通过代理对象调用方法
4. 在启动类上添加@EnableAspectJAutoProxy(exposeProxy = true)注解,暴露代理对象
<!--动态代理模式-->
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
</dependency>
/**
* 启动类添加@EnableAspectJAutoProxy(exposeProxy = true) 暴露代理对象
*
*/
@MapperScan("com.hmdp.mapper")
@SpringBootApplication
// 开启AspectJ注解支持 暴露代理对象
@EnableAspectJAutoProxy(exposeProxy = true)
public class HmDianPingApplication {
public static void main(String[] args) {
SpringApplication.run(HmDianPingApplication.class, args);
}
}
Long userId = UserHolder.getUser().getId();
synchronized (userId.toString().intern()) {
// 获取代理对象
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
}
4.5.3 限购功能测试检验
两百个线程共用一个token向后端发送请求,来模拟同一用户高并发抢购的情况
200个线程只有1个通过了请求,数据库中优惠卷剩余99张,优惠卷订单只有1个。证明我们的功能实现了
4.6 (拓展)集群模式下限购一人一单
4.6.1 单体模式下限购功能实现的局限性
在4.5时,我们成功的完成了限购一人一单的业务功能。事实上还是有相对较大的局限性。由于秒杀业务的高并发性,为了防止单台服务器压力过大的原因,我们会考虑将多台服务器部署成一个集群环境。这时候,我们在单体环境中写的加用户锁方案就有了并发安全问题。
如下图,我们知道。Java提供的锁方案事实上是由JVM下的锁监视器去进行监听的。因此在单体环境中,由于锁监视器唯一,可以正常的执行线程监听任务。但是放在集群环境中,每一个服务器都有一个JVM平台,都有一个锁监视器。在nginx服务器发送轮询请求时,可以有多个线程同时获得锁,同时执行。于是乎限购功能出现了并发安全问题。
4.6.2 配置集群环境,测试限购代码的并发问题
配置两台Tomcat服务器
配置nginx服务器轮询访问
并重新启动nginx服务器
测试集群环境
访问:localhost:8080/api/voucher/list/1 两次
8081、8082 各被轮询了1次
4.6.3 解决策略——分布式锁
解决策略无非是将监视器统一,且必须要在集成环境中唯一共享。分布式锁正是这样一种策略。
我们将会在下一小结详细介绍分布式锁的相关知识,并使用分布式锁在集群环境下解决并发安全问题
4.7 (知识点)分布式锁
4.7.1 分布式锁介绍
分布式锁:满足分布式系统或集群模式下多进程可见并且互斥的锁。
分布式锁的特点:
1、高可用的获取锁与释放锁;
2、高性能的获取锁与释放锁;
3、多进程可见
4、具备可重入特性;
5、具备锁失效机制,防止死锁;
6、具备非阻塞锁特性,即没有获取到锁将直接返回获取锁失败。
分布式锁的实现方式:
1. 基于数据库的实现:
使用数据库的事务和锁机制来实现分布式锁。可靠性高,支持事务机制和锁机制,但是性能较低,对数据库性能有一定影响。适用于需要强一致性和高可靠性的场景。
2. 基于缓存的实现:
使用缓存系统(如Redis)的原子操作来实现分布式锁。性能较高,支持原子操作,但是可靠性相对较低,需要处理缓存故障和失效的情况。适用于高并发场景。
3. 基于分布式协调服务的实现:
使用分布式协调服务(如ZooKeeper、etcd)来实现分布式锁。通过创建临时顺序节点和监听节点变化来控制锁的获取和释放。可靠性高,支持分布式环境,但是性能较低,对分布式协调服务的性能有一定影响。适用于需要高可靠性和一致性的场景。
4. 基于消息队列的实现:
使用消息队列来实现分布式锁。通过发送和接收消息来控制锁的获取和释放。
优点简单易用,适用于简单的场景。缺点性能较低,不适用于高并发和高吞吐量的场景。
4.7.2 基于Redis的分布式锁方案
获取锁 + 添加过期时间 【且确保原子性操作】
EX 设置过期时间 NX 不存在才添加
SETNX lock thread1 EX 10 NX
释放锁
DEL key
4.7.3 基于Redis实现分布式锁的初级版本
利用Redis,完成基本的分布式锁构建
4.7.3.1 代码实现
创建Lock接口
//父类
public interface ILock {
/**
* 获取锁
* @param timeoutSec
* @return
*/
boolean tryLock(long timeoutSec);
/**
* 释放锁
*/
void unlock();
}
实现Redis分布式锁方法
//实现 ILock接口
public class SimpleRedisLock implements ILock{
private String name;
private StringRedisTemplate stringRedisTemplate;
// 锁前缀
private static final String KEY_PREFIX = "lock:";
/**
* 获取锁
* @param timeoutSec
* @return
*/
@Override
public boolean tryLock(long timeoutSec) {
// 获取线程标识
long threadId = Thread.currentThread().getId();
Boolean success = stringRedisTemplate.opsForValue()
.setIfAbsent(KEY_PREFIX + name ,threadId+"",timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success);
}
/**
* 释放锁
*/
@Override
public void unlock() {
stringRedisTemplate.delete(KEY_PREFIX + name);
}
}
使用分布式锁
/*
* 分布式锁方案
*/
// 1. 创建锁对象
SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);
// 2. 尝试获取锁
boolean isLock = lock.tryLock(1200);
// 3. 判断锁是否获取成功
if(! isLock){
return Result.fail("不允许重复下单");
}
try {
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
} finally {
// 4. 释放锁
lock.unlock();
}
4.7.3.2 接口测试
利用postman进行测试
发送两个请求,获取锁成功,一个获取锁失败,线程Id为72
4.7.4 (优化)解决分布式锁出现的误删问题
4.7.4.1 出现误删的原因判断
线程1获取分布式锁后运行,结果发生了业务阻塞,阻塞时间大于设定的超时释放锁时间。于是在线程1还未执行完业务时,分布式锁已经被释放了。
这时候线程2获取锁开始执行业务,恰好线程1结束了阻塞,也顺利的结束了业务,于是将锁释放了。这样一来线程2的锁又没有了,线程3恰巧又来了......如此反复,导致线程1释放线程2拿到的的锁;线程2释放线程3拿到的锁,造成并发安全问题
4.7.4.2 解决方案——检查锁是不是自己的:
如何知道这个锁是不是自己的呢? -----在获取锁时存入线程标识【UUID】
流程:
4.7.4.3 代码实现及其测试
// 获取线程标识
String threadId = ID_PREFIX + Thread.currentThread().getId();// 判断锁是不是自己的
if(threadId.equals(id)){ // 如果是自己的锁,则释放锁
// 释放锁
stringRedisTemplate.delete(KEY_PREFIX + name);
}
// 如果不是自己的锁,则不管
/**
* 获取锁
* @param timeoutSec
* @return
*/
@Override
public boolean tryLock(long timeoutSec) {
// 获取线程标识
String threadId = ID_PREFIX + Thread.currentThread().getId();
// 获取锁
Boolean success = stringRedisTemplate.opsForValue()
.setIfAbsent(KEY_PREFIX + name ,threadId,timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success);
}
/**
* 释放锁
*/
@Override
public void unlock() {
// 获取线程标识
String threadId = ID_PREFIX + Thread.currentThread().getId();
// 获取锁中的标识
String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
// 判断锁是不是自己的
if(threadId.equals(id)){ // 如果是自己的锁,则释放锁
// 释放锁
stringRedisTemplate.delete(KEY_PREFIX + name);
}
// 如果不是自己的锁,则不管
}
集群环境测试


4.7.5 (优化)解决分布式锁的原子性问题
4.7.5.1 非原子性操作带来的并发安全问题
在我们完善了Redis分布式锁误删问题后,程序还是没有达到完美状态。以下这种情况依旧存在并发安全问题:
线程1获取锁执行业务,在执行业务完成后判断当前锁的标识是否和自己的一致,发现一致后,执行释放锁操作。但恰好在执行释放锁操作时出现了阻塞,知道超过了锁的超时释放时间。锁被释放了,于是乎线程2提前拿到了锁,结果线程1阻塞结束,又把线程2刚刚拿到的锁给释放掉了
4.7.5.2 解决方案——引入Rua脚本实现命令原子化
出现上述问题,究其原因就是 判断锁标识 和 释放锁 是两个动作,不符合原子性。因此若是在两者间发生了阻塞,就会造成并发安全问题。
所以解决策略也很简单, 将判断锁的操作和释放锁的操作组合成一个原子性操作,一起执行,要阻塞都阻塞,要通过都通过
在Redis中, 提供了 Lua 脚本功能,在一个脚本中编写多条 Redis 命令,确保多条命令执行时的原子性。
4.7.5.3 Rua脚本在Redis中的基本使用
1. 执行Redis命令
redis.call('命名名称','key','其他参数') redi.call('set','name','jack')
2. 执行脚本
EVAL "脚本内容" 脚本需要的key类型的参数个数 # 执行无参脚本 EVAL "return redis.call('set','name','jack') " 0 # EVAL执行带参脚本 redis 127.0.0.1:6379> eval "return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}" 2 key1 key2 first second 1) "key1" 2) "key2" 3) "first" 4) "second"
4.7.5.4 使用Rua编写释放锁脚本
-- 释放锁的业务
-- 1. 获取锁中的线程标识
-- 2. 判断是否与指定的标识相同
-- 3. 一致则释放锁
----------------------开始脚本-------------------------
-- 锁的key
--local key = "lock:order:5"
local key = KEYS[1]
-- 当前线程标识
--local threadId = "asdasdasdasd"
local threadId = ARGV[1]
-- 获取锁中的线程标识
local id = redis.call("GET", key)
-- 比较锁中的线程标识与当前线程标识是否一致
if(id == threadId) then
-- 释放锁
redis.call("DEL", key)
end
return 0
----------------------结束脚本-------------------------
4.7.5.5 在IDEA编写使用lua脚本的代码
在SimpleRedisLock方法内修改:
// 提前读取脚本
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
static {
UNLOCK_SCRIPT = new DefaultRedisScript<>();
// 指定脚本位置
UNLOCK_SCRIPT.setLocation(new ClassPathResource("lua/unlock.lua"));
// 指定返回类型
UNLOCK_SCRIPT.setResultType(Long.class);
}
/**
* 释放锁 --- 基于lua脚本
*/
@Override
public void unlock() {
/*
* 调用Lua脚本
* 参数1: 预加载的lua脚本
* 参数2: KEYS[]
* 参数3: ARGV[]
*/
String key = KEY_PREFIX + name;
String argv = ID_PREFIX + Thread.currentThread().getId();
stringRedisTemplate.execute(
UNLOCK_SCRIPT,
Collections.singletonList(key),
argv
);
}
4.7.5.6 功能测试
1.启动集群环境,先让8081 获取锁成功
2. 模拟锁超时,将Redis中的锁删掉,让8082获得一把新的锁
3. 让失去锁的8081执行lua脚本,观察8082的锁会不会被清除
结果:不会被清除
4. 让获得锁的8082执行lua脚本,观察8082的锁会不会被清除
结果: 被清除了
4.8 (拓展知识点) Redisson ——成熟的锁工具包
4.8.1 基于setnx实现的分布式锁还存在的问题
1. 不可重入问题:同一个线程无法多次获取相同的一把锁
2. 不可重试问题:获取锁只尝试一次就返回false,没有重试机制
3. 超时释放问题:业务耗时过长,还是有可能导致锁释放,设置超时时间很讲究
4. 主从一致性问题:主从同步存在延迟,如果在主节点设置锁,在还没有同步到从节点时,主节点宕机。
当然,以上问题指针对部分有需要的业务才会有问题,我们前面完善的Redis + lua脚本的分布式锁方案已经很优秀了
4.8.2 Redisson介绍
官网:Home · redisson/redisson Wiki (github.com)
Redisson 是一个在 Redis 的基础上实现的一个 Java 驻内存数据网格(In-Memory Data Grid, IMDG)。它提供了丰富的分布式和可扩展的 Java 数据结构,包括分布式锁和同步器。Redisson 的可重试锁(Retryable Lock)是基于 Redis 的特性来实现的一种锁机制,它允许在锁不可用时进行重试,直到成功获取锁或者达到重试次数上限。
4.8.3 Redisson入门
1. 引入依赖
<!--Redisson--> <dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.13.6</version> </dependency>
2. 创建Config文件
@Configuration public class RedissonConfig { @Bean public RedissonClient redissonClient(){ // 配置 Config config = new Config(); // 单机模式 // 集群模式:config.useClusterServers().addNodeAddress("redis://192.168.186.136:7000"); config.useSingleServer().setAddress("redis://192.168.186.136:6379").setPassword("123321"); // 创建RedissonClient对象 return Redisson.create(config); } }
3. 使用Redisson——以秒杀优惠卷为例
/* * Redisson方案 */ //1. 创建锁对象 RLock lock = redissonClient.getLock("lock:order:" + userId); //2. 尝试获取锁 boolean isLock = lock.tryLock(); // 3. 判断锁是否获取成功 if(! isLock){ return Result.fail("不允许重复下单"); } try { IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy(); return proxy.createVoucherOrder(voucherId); } finally { // 4. 释放锁 lock.unlock(); }
4.8.4 Redisson的可重入原理
不可重入锁的基本流程Demo:
在method1中获取锁成功后,其调用的method2没办法获取锁。在这种嵌套的业务中,不可重入锁具有局限性
如何实现可重入?
添加一个当前获取锁的次数字段 —— hash结构存储
![]()
同一个线程内的业务方法在获取锁的时候,发现如果锁存在了就将锁的次数+1,锁的次数就代表了当前获取锁的方法个数。在进行删除锁时,需要先检查锁的重入次数-1 后是否为 0 ,是才能释放锁,否则只做-1操作。可重入锁的设计流程
可重入锁的脚本实现
--- --- Generated by EmmyLua(https://github.com/EmmyLua) --- Created by good boy. --- DateTime: 2024/9/15 16:30 --- ------------------------获取可重入锁的Lua脚本------------------------------------- local key = KEYS[1] -- 获取锁的key local threadId = ARGV[1] -- 获取线程唯一标识 local releaseTime = ARGV[2] -- 获取锁的自动释放时间 -- 判断锁是否存在 -- 不存在的逻辑 if(redis.call('exists',key) == 0 ) then -- 不存在则,获取锁,标记获取锁的次数 redis.call('hset', key, threadId, '1'); -- 设置过期时间 redis.call('expire', key, releaseTime); -- 返回获取锁成功 return 1; end; -- 锁已经存在,判断线程标识是不是自己的 -- 是自己的逻辑 if(redis.call('hexists', key, threadId) == 1) then -- 是自己的,则获取锁的次数+1 redis.call('hincrby', key, threadId, 1); -- 重置有效期 redis.call('expire', key, releaseTime); -- 返回获取锁成功 return 1; end; -- 走到这里说明获取到的锁并不是自己的 return 0; ----------------释放可重入锁的Lua脚本----------------------------- local key = KEYS[1] --获取锁的key local threadId = ARGV[1] --获取线程唯一标识 local releaseTime = ARGV[2] --获取锁的自动释放时间 -- 判断锁是不是自己的 -- 锁不是自己的逻辑 if(redis.call('hexists', key, threadId) == 0 ) then return nil; -- 锁不是自己的,直接返回nil,表示自己的锁已经被释放了 end; -- 锁是自己的逻辑 -- 将锁的计数器-1 local count = redis.call('hincrby', key, threadId, -1); -- 判断锁的计数器是否为0 if(count > 0) then -- 大于0,说明还有别的方法获取了锁,不用释放锁,只需要重置过期时间,让其他业务有足够的时间执行 redis.call('expire', key, releaseTime); return nil; else -- 等于0,说明最外层业务已经完成了,可以释放锁 redis.call('del', key); return nil; end;
4.8.5 (TODO)Redisson的可重试原理
基于看门狗 + 消息订阅模型实现
(看晕了,源码留后面啃)
4.8.6(TODO) Redisson的超时释放原理
基于看门狗的定时续约机制
简单来说,在成功获取锁之后,系统会立即启动一个自动续期机制(通常通过
scheduleExpirationRenewal(threadId)
方法实现),该机制利用一个映射(map)来关联业务名称与定时任务。这个定时任务被设置为每隔一定时间(例如10秒)执行一次,其主要职责是重置锁的最大超时时间。通过递归或循环调用重置锁时间的逻辑,确保锁在业务执行期间不会因为超时而被自动释放,从而实现了锁的“永久持有”效果,直到业务逻辑执行完毕。当业务逻辑执行完成并显式释放锁时,系统会同时取消之前设置的定时任务,以避免不必要的资源消耗。这种设计确保了锁的持有与业务执行周期紧密相关,既提高了系统的灵活性,又增强了资源使用的效率。
4.8.7 Redisson的主从一致性原理
Redis主从集群模式造成一致性问题的原因
Java应用发送命令到主节点,主节点崩溃,Redis哨兵机制选取更新最近主节点的从节点,将其变成新的主节点。但是还是存在先前存储的命令失效问题
Redisson解决主从一致性问题方案——连锁方案
取消单主多从方案,采用高可用集群 + 主从分布
4.8.8 (TODO)建立Redis集群环境,测试主从一致性问题
实战篇-21.分布式锁-Redisson的multiLock原理_哔哩哔哩_bilibili
太吃电脑性能了,得晚点实现,或者等上docker
4.9 Redis优化秒杀系列功能实现
4.9.1 业务性能瓶颈分析
优化前的方案
业务耗时为Tomcat六步总和,其中查询优惠卷、查询订单、减库存、创建订单都去数据库查询,效率慢。此外,目前我们还没实现MySQL集群,就更慢了
优化后的方案
将校验秒杀资格的两个环节提前到Redis异步执行,从而减少处理的环节,提高整体效率。
4.9.2 优化流程分析
Redis部分——lua脚本实现
1. 判断秒杀库存------使用String结构存储
2. 校验一人一单------使用Set结构存储userIdJava部分
1. 判断lua脚本执行结果
2. 返回“小票”,优惠卷id、用户id、订单id
4.9.3 改进秒杀业务代码改造及详细解释
1. 将优惠卷信息存入Redis中
/** * 新增秒杀优惠券 * 多表添加 加事务 * @param voucher */ @Override @Transactional public void addSeckillVoucher(Voucher voucher) { // 保存优惠券 save(voucher); // 保存秒杀信息 SeckillVoucher seckillVoucher = new SeckillVoucher(); // 关联优惠券id seckillVoucher.setVoucherId(voucher.getId()); // 秒杀库存 seckillVoucher.setStock(voucher.getStock()); // 开始时间 seckillVoucher.setBeginTime(voucher.getBeginTime()); // 结束时间 seckillVoucher.setEndTime(voucher.getEndTime()); seckillVoucherService.save(seckillVoucher); // 保存库存到Redis stringRedisTemplate.opsForValue().set(RedisConstants.SECKILL_STOCK_KEY + voucher.getId(),voucher.getStock().toString()); }
![]()
添加优惠卷 ![]()
将优惠卷保存到Redis
2. 基于Lua脚本,判断秒杀资格
--- --- Generated by EmmyLua(https://github.com/EmmyLua) --- Created by good boy. --- DateTime: 2024/9/15 22:30 --- -----------判断秒杀资格的lua脚本--------------------------------- local voucherId = ARGV[1] -- 优惠卷id local userId = ARGV[2] -- 用户id local stockKey = 'seckill:stock:' .. voucherId -- 库存key local orderKey = 'seckill:order:' .. voucherId -- 订单key -- 脚本开始 -- 1. 判断库存是否充足 if( tonumber(redis.call('get', stockKey)) <=0) then -- 库存不足返回1 return 1 end -- 2. 判断用户是否下单 if( redis.call('sismember', orderKey, userId) == 1 ) then -- 用户已经下单返回2 return 2 end -- 3. 扣减库存 incrby -1 redis.call('incrby', stockKey, -1) -- 4. 下单(保存用户) redis.call('sadd', orderKey, userId) -- 5. 返回0表示下单成功 return 0
/** * 秒杀优惠券下单------秒杀优化代码----lua脚本 * @param voucherId * @return */ @Override public Result seckillVoucher(Long voucherId) { // 获取用户 Long userId = UserHolder.getUser().getId(); //1.执行Lua脚本 Long result = stringRedisTemplate.execute( SECKILL_SCRIPT, Collections.emptyList(), voucherId.toString(),userId.toString() ); //2.判断结果是否为0 int r = result.intValue(); if(r != 0){ //3.不为0,代表没有购买资格 return Result.fail(r == 1 ? "库存不足!" : "不能重复下单!"); } long orderId = redisIdWorker.nextId("order"); //TODO 4.为0,代表有购买资格,将下单信息保存至阻塞队列 //5.返回订单id return Result.ok(orderId); }
![]()
测试基本功能:第一次请求成功 ![]()
第二次请求失败
3. 抢购成功,封装信息到阻塞队列中,实现异步下单
调用创建订单逻辑,首先会进入到seckillVoucher秒杀优惠卷,进入seckillVouvher方法后,首先调用Lua脚本去判断该用户请求是否具备秒杀资格,如果具备秒杀资格,则创建订单信息,并将订单信息保存到阻塞队列中去.
/** * 预加载lua脚本 */ private static DefaultRedisScript<Long> SECKILL_SCRIPT; static { SECKILL_SCRIPT = new DefaultRedisScript<>(); SECKILL_SCRIPT.setLocation(new ClassPathResource("lua/Seckill.lua")); SECKILL_SCRIPT.setResultType(Long.class); }
/** * 秒杀优惠券下单------秒杀优化代码----lua脚本---主线程 * @param voucherId * @return */ private IVoucherOrderService proxy; @Override public Result seckillVoucher(Long voucherId) { // 获取用户 Long userId = UserHolder.getUser().getId(); //1.执行Lua脚本 Long result = stringRedisTemplate.execute( SECKILL_SCRIPT, Collections.emptyList(), voucherId.toString(),userId.toString() ); //2.判断结果是否为0 int r = result.intValue(); if(r != 0){ //3.不为0,代表没有购买资格 return Result.fail(r == 1 ? "库存不足!" : "不能重复下单!"); } long orderId = redisIdWorker.nextId("order"); //4.为0,代表有购买资格,将下单信息保存至阻塞队列 VoucherOrder voucherOrder = new VoucherOrder(); voucherOrder.setId(orderId); voucherOrder.setUserId(userId); voucherOrder.setVoucherId(voucherId); // 放入阻塞队列 orderTasks.add(voucherOrder); //提前 获取代理对象 proxy = (IVoucherOrderService) AopContext.currentProxy(); //5.返回订单id return Result.ok(orderId); }
注意标号顺序,我们提前定义了阻塞队列和线程池,并将线程任务放到初始化方法中,只要程序一启动,就会执行init()中的方法。
当seckillVoucher方法执行成功将订单信息存储到阻塞队列中后。只要阻塞队列有值,我们就会通过线程任务方法VoucherOrderHandler()方法取出队首元素,并且调用handleVocherOrder()方法创建订单。
进入到handleVocherOrder()方法,我们再次判断是否正常,判断通过后调用创建订单方法createVoucherOrder()
由于该方法添加了事务,在子线程中事务及其代理对象都失效了,为了能继续使用代理对象进行调用,我们在主方法seckillVoucher()中提前声明并赋值了代理对象,这样在子线程任务中就可以获取到代理对象执行创建订单的方法了。
进入到最后的创建订单方法createVoucherOrder() ,首先对一人一单、库存超卖问题再次进行判断,均通过以后创建订单。
通过上述步骤实现了异步创建订单,提高了并发效率//1. 创建-- 阻塞队列 private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024); //2,创建-- 秒杀线程池 private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor(); //5. 初始化方法 一初始化就执行 @PostConstruct public void init(){ SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler()); } //4. 创建-- 订单外部类 private void handleVocherOrder(VoucherOrder voucherOrder){ // 获取用户 Long userId = voucherOrder.getUserId(); // 1. 创建锁对象 RLock lock = redissonClient.getLock("lock:order:" + userId); //2. 尝试获取锁 boolean isLock = lock.tryLock(); // 3. 判断锁是否获取成功 if(! isLock){ log.error("不允许重复下单"); } try { proxy.createVoucherOrder(voucherOrder); } finally { // 4. 释放锁 lock.unlock(); } } //3. 创建-- 秒杀线程任务 private class VoucherOrderHandler implements Runnable{ @Override public void run() { while (true) { try { //1. 获取队列中的订单信息 VoucherOrder voucherOrder = orderTasks.take(); //2. 创建订单 handleVocherOrder(voucherOrder); } catch (Exception e) { log.error("获取订单异常",e); } } } }
/** * 秒杀优惠券下单------秒杀优化代码----创建订单 * @param voucherOrder */ @Transactional public void createVoucherOrder(VoucherOrder voucherOrder) { //4. 限制一人一单【悲观锁方案】 Long userId = voucherOrder.getUserId(); //4.1 查询订单 int count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count(); //4.2 判断订单是否存在 // 是 -----> 返回异常信息---->结束 if (count > 0) { log.error("用户已经购买了一次了"); } //5. 扣减库存——解决超卖问题【乐观锁方案】 boolean success = seckillVoucherService.update() .setSql("stock = stock-1") .eq("voucher_id", voucherOrder.getVoucherId()).gt("stock", 0) // 库存大于0就行了 .update(); if (!success) { log.error("库存不足"); } //6. 创建订单 save(voucherOrder); }
4.9.4 Redis优化秒杀功能测试
1. 简单测试功能是否实现,使用postman发送请求
【错误提醒】千万不要把Redis中的 seckill:stock:11 删掉啊!这个优惠卷信息是我们创建优惠卷时顺便存进去的,你要是将它清除了,后面的测试就进行不了了,lua脚本会报nil错误的(俺排了大半个小时)
![]()
第一次请求成功 ![]()
第二次请求失败
2. 利用jmeter测试高并发情况
这里我生成了1000个token的文件,实际上是10个token重复了100次,用于模拟1000个请求(10个用户),同时请求jmeter的情况。结果和视频中做出来的不太一样,但是因为没有1000不同用户,所有我也没办法求证我的程序性能到底得到优化没有
![]()
准备10个用户token 优惠卷改成了5张
1000个token
![]()
准备1000个token的文件 ![]()
配置基本路径信息 ![]()
配置请求头读取 ![]()
配置读取文件 开启测试
测试1s内发送1000个请求 10个不同用户token,抢购5张券
![]()
QPS:379,平均耗时:1076 ![]()
没有出现超卖现象 测试 1s内发送100个请求10个用户 抢5张券
![]()
吞吐:595,平均耗时:39
测试5秒内 发送10000个请求 10个不同用户token 抢5张券
![]()
吞吐:621,平均耗时:1934 测试1秒内 发送10000个请求 10个不同用户token 抢5张券【有点不行】
![]()
吞吐:934,平均耗时:4308
4.9.5 总结
简单来说,就是Java阻塞队列还不够好
4.10(知识点) 学习Redis消息队列实现异步秒杀
消息队列(Message Queue):存放消息的队列。最简单的消息队列模型包括3个角色:
消息队列:存储和管理消息,也被称为消息代理(Message Broker)
生产者:发送消息到消息队列
消费者:从消息队列获取消息并处理消息Redis提供了三种不同的方式来实现消息队列:
list结构:基于List结构模拟消息队列
PubSub:基本的点对点消息模型
Stream:比较完善的消息队列模型基础的消息队列模型
4.10.1 基于List类型模拟的消息队列原理及其使用
在Day2中我就总结过Redis中各个数据结构使用的业务场景,其中基于双向链表实现的list结构则可以用于模拟消息队列的功能(Redis学习Day2——Redis基础使用-CSDN博客)
实现原理
- 使用 LPUSH + RPOP 或 RPUSH + LPOP 实现消息队列
- 使用 BLPOP 或 BRPOP 实现带有阻塞效果的消息队列
实现效果
基于List结构实现的消息队列比较简单,只支持单消费者的模式
![]()
单消费者 优点
- 相比于java实现的阻塞队列,不会受限于JVM的存储上限,没有未知的内存溢出风险
- 属于Redis的基础数据结构,可以实现持久化存储,数据安全性也有所保障
- List队列,先进先出,消息处理有序
缺点
- 消息丢失无法避免
- 只支持单消费者模式
缺点说明
假设从Redis中取出一条消息,但是还没来得及处理就挂掉了,结果等到恢复正常后,刚从队列中取出来的消息已经不可复现了,因此存在消息丢失的风险。
由于消息一旦被某个消费者拿走了,就会从消息队列中移除,因此该模式仅仅支持单消费者
常用命令
- 阻塞监听
- 存放消息
4.10.2 基于PubSub的消息队列原理及其使用
PubSub(发布订阅)是Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息。
实现原理
- 使用 SUBSCRIBE channel [channel] :订阅一个或多个频道
- 使用 PUBLISH channel msg :向一个频道发送消息
- 使用 PSUBSCRIBE pattern[pattern] :订阅与pattern格式匹配的所有频道
实现效果
发布者 + 订阅者模式,也就是说,可以实现多消费者的消息队列模型
![]()
多消费者 优点
- 采用发布订阅模型,支持多生产者、多消费者的模型
缺点
- 不支持数据持久化
- 无法避免消息丢失
- 消息堆积有上限,超出是数据丢失
缺点说明
PubSub本身设计出来是用于消息发送的,不具备存储功能,因而没有持久化策略。当某个频道没有任何人订阅,在该频道发送的数据直接丢失了。此外PubSub发送的信息过多未处理完容易造成堆积丢失。因此PubSub不适用于可靠性要求高的场景。
常见命令使用
- 订阅单个频道【自带阻塞】
- 订阅多个频道
- 发布消息
4.10.3(三者之最) 基于Stream的消息队列原理及其使用
Stream是 Redis 5.0 引入的一种新数据类型,可以实现一个功能非常完善的消息队列。
主要特征
数据结构:Redis Stream是一个由有序消息组成的日志数据结构,每个消息都有一个全局唯一的ID,确保消息的顺序性和可追踪性。
消息ID:消息的ID由两部分组成,分别是毫秒级时间戳和序列号。这种设计确保了消息ID的单调递增有序性。
消费者组:Redis Stream新增消费者组的概念,允许多个消费者以组的形式订阅Stream,并且每个消息只会被组内的一个消费者处理,避免了消息的重复消费。
优点
- 消息可回溯,可被多个消费者读取,可阻塞读取
- 消息读完了不消失,会永久的保持在队列当中。
缺点
- 任然有消息漏读的风险,在处理消息的过程中,如果同时来了多条消息,最后可能只能读到最后一条新消息,从而造成了消息漏读
常见命令使用
- 发送消息 —— XADD
- 读取消息—— XREAD
![]()
- 读取第一条消息,且可重复读
- 读取最新消息
- 读取并阻塞等待最新消息
- BUG——漏读消息
基于Stream的消息队列——消费者组
消费者组(Consumer Group):
将多个消费者划分到一个组中,监听同一个队列。具备下列的特点:
消息分流:队列中的消息会分流给组内的不同消费者,而不是重复消费,从而加快消息处理的速度
消息标识:消费者组会维护一个标示,记录最后一个被处理的消息,哪怕消费者宕机重启,还会从标示之后读取消息。确保每一个消息都会被消费
消息确认:消费者获取消息后,消息处于pending状态,并存入一个pending-list。当处理完成后需要通过XACK来确认消息,标记消息为已处理,才会从pending-list移除。【解决消息丢失问题】
消费者组常见命令
- 创建消费者组——XGROUP CREATE
- 删除消费者组——XGROUP DESTORY
- 添加消费者到消费组——XGROUP CREATECONSUMER
- 将消费者移除消费组——XGROUP DELCONSUMER
- 从消费者组读取信息——XREADGROUP
4.10.4 基于Redis的Stream结果作为消息队列,优化异步秒杀下单功能
【步骤说明】
①创建一个Stream类型的消息队列,名为stream.orders
②修改之前的秒杀下单Lua脚本,在认定有抢购资格后,直接向stream.orders中添加消息,内容包含voucherId、userId、orderId
③项目启动时,开启一个线程任务,尝试获取stream.orders中的消息,完成下单
4.10.4.1 创建Stream消息队列
1. 登录redis
2. 创建队列和消费者组
4.10.4.2 修改秒杀资格判断的Lua脚本
--- --- Generated by EmmyLua(https://github.com/EmmyLua) --- Created by good boy. --- DateTime: 2024/9/16 20:32 --- -----------判断秒杀资格的lua脚本--------------------------------- local voucherId = ARGV[1] -- 优惠卷id local userId = ARGV[2] -- 用户id local orderId = ARGV[3] -- 订单id local stockKey = 'seckill:stock:' .. voucherId -- 库存key local orderKey = 'seckill:order:' .. voucherId -- 订单key -- 脚本开始 -- 1. 判断库存是否充足 if( tonumber(redis.call('get', stockKey)) <=0 ) then -- 库存不足返回1 return 1 end -- 2. 判断用户是否下单 if( redis.call('sismember', orderKey, userId) == 1 ) then -- 用户已经下单返回2 return 2 end -- 3. 扣减库存 incrby -1 redis.call('incrby', stockKey, -1) -- 4. 下单(保存用户) redis.call('sadd', orderKey, userId) -- 5. 发送消息到队列中 XADD [队列名]stream.orders * k1 v1 k2 v2 ... redis.call('XADD','stream.orders','*','userId',userId,'voucherId',voucherId,'id',orderId) -- 6. 返回0 return 0
4.10.4.3 完善秒杀下单代码及其代码说明
1. 首先是修改预加载Lua脚本信息
/** 方案二、三公共代码 * 预加载lua脚本 */ private static DefaultRedisScript<Long> SECKILL_SCRIPT; static { SECKILL_SCRIPT = new DefaultRedisScript<>(); // 这是第二种方案需要执行的lua脚本 // SECKILL_SCRIPT.setLocation(new ClassPathResource("lua/seckill.lua")); // 这是第三种方案需要执行的lua脚本 SECKILL_SCRIPT.setLocation(new ClassPathResource("lua/streamSeckill.lua")); SECKILL_SCRIPT.setResultType(Long.class); }
2. 主程序改变逻辑,将消息队列逻辑放入到了lua脚本
/** * 秒杀优惠券下单------秒杀优化代码----lua脚本---主线程---使用Redis stream的消息队列完成的 */ private IVoucherOrderService proxy; @Override public Result seckillVoucher(Long voucherId) { // 获取用户 Long userId = UserHolder.getUser().getId(); // 获取订单id long orderId = redisIdWorker.nextId("order"); //1.执行Lua脚本 Long result = stringRedisTemplate.execute( SECKILL_SCRIPT, Collections.emptyList(), voucherId.toString(),userId.toString(),String.valueOf(orderId) ); //2.判断结果是否为0 int r = result.intValue(); if(r != 0){ //3.不为0,代表没有购买资格 return Result.fail(r == 1 ? "库存不足!" : "不能重复下单!"); } //5.返回订单id return Result.ok(orderId); }
3. 重头戏,开启线程任务!!
【注意特意标明】【黑马点评】已解决java.lang.NullPointerException异常-CSDN博客
项目启动时,init()方法执行,开启线程任务VoucherOrderHandler()方法。
进入到VoucherOrderHandler()方法,我们不断循环尝试去去消息队列中的信息,
获取成功: 执行handleVocherOrder()方法创建订单;
获取失败: 说明没有消息 ---->继续循环
出现异常: 执行handlePendingList()方法处理PendingList的异常
handleVocherOrder()方法和之前java阻塞队列方案写法一致,不过多赘述。
进入到handlePendingList()方法后,一样循环获取PendingList中的消息
获取成功: 那就反过来调用handleVocherOrder()方法,执行订单创建
获取失败: 说明Pending List没有消息 ---->结束循环
出现异常: 休眠一段时间后自动回到VoucherOrderHandler()方法中的下一次循环
// 1,创建-- 秒杀线程池 private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor(); //2. 初始化方法 一初始化就执行 @PostConstruct public void init(){ SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler()); } //3. 创建线程任务用于接收消息队列的信息 private class VoucherOrderHandler implements Runnable{ // 消息队列名称 private String queueName = "stream.orders"; @Override public void run() { while (true) { try{ //1. 获取队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS stream.oredes > // 指定队列名称,组名称,消费者名称,读取模式,读取数量,阻塞时间,队列名称,读取位置 List<MapRecord<String,Object,Object>> list = stringRedisTemplate.opsForStream().read( Consumer.from("g1", "c1"), StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)), StreamOffset.create(queueName, ReadOffset.lastConsumed()) ); //2. 判断消息获取是否成功 if( list == null || list.isEmpty()){ //2.1 获取失败 说明没有消息 ---->继续循环 continue; } // 解析消息中的订单信息 MapRecord<String,Object,Object> record = list.get(0); // 获取键值对集合 Map<Object,Object> values = record.getValue(); // 获取订单信息 VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true); //3. 获取成功,执行订单创建 handleVocherOrder(voucherOrder); //4. ACK确认 SACK stream.orders g1 id stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId()); }catch (Exception e) { // 消息没有被ACK确认 进入Pending List log.error("订单处理出现异常",e); handlePendingList(); } } } // 5.取不到订单————— 处理Pending List中的订单信息 private void handlePendingList(){ while (true) { try { //1. 获取Pending List中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 STREAMS stream.oredes 0 // 指定队列名称,组名称,消费者名称,读取模式,读取数量,阻塞时间,队列名称,读取位置 List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read( Consumer.from("g1", "c1"), StreamReadOptions.empty().count(1), StreamOffset.create(queueName, ReadOffset.from("0")) ); //2. 判断消息获取是否成功 if (list == null || list.isEmpty()) { //2.1 获取失败 说明Pending List没有消息 ---->结束循环 break; } // 解析消息中的订单信息 MapRecord<String, Object, Object> record = list.get(0); // 获取键值对集合 Map<Object, Object> values = record.getValue(); // 获取订单信息 VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true); //3. 获取成功,执行订单创建 handleVocherOrder(voucherOrder); //4. ACK确认 SACK stream.orders g1 id stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId()); } catch (Exception e) { log.error("Pending List订单处理出现异常", e); try { Thread.sleep(20); }catch (InterruptedException interruptedException){ interruptedException.printStackTrace(); } } } } } // 4. 取到了订单—————创建订单 private void handleVocherOrder(VoucherOrder voucherOrder){ // 获取用户 Long userId = voucherOrder.getUserId(); // 1. 创建锁对象 RLock lock = redissonClient.getLock("lock:order:" + userId); //2. 尝试获取锁 boolean isLock = lock.tryLock(); // 3. 判断锁是否获取成功 if(! isLock){ log.error("不允许重复下单"); } try { proxy.createVoucherOrder(voucherOrder); } finally { // 4. 释放锁 lock.unlock(); } }
4.10.4.4 秒杀下单功能测试
1. 简单测试功能是否实现,使用postman发送请求
2. 利用jmeter测试高并发情况
老规矩10个token
这次我们先测试一下 10个线程请求 10个不同用户 抢5张卷的情况
测试一下 1000个线程请求 10个不同用户 抢5张卷的情况
测试一下 10000个线程请求 10个不同用户 抢5张卷的情况
![]()
吞吐669
4.11 总结情况
思路好像跟着来的,但是耗时和吞吐量始终是上不去,我上网看了看大家做的情况,好像都没有黑马一千多甚至两千的吞吐量。我感觉其实stream实现的消息队列相比java实现的阻塞队列性能上其实也没有优化到哪去,只是更加灵活可靠了吧。
五、达人探店系列功能实现
5.1 发布和查看探店笔记
5.1.1 发布探店笔记
这块代码黑马已经完成了,在发布探店笔记界面,有两块内容是需要上传的。一是笔记内容,二是笔记配图。其中笔记配图部分黑马使用的是上传到本地前端服务器上面的。我我觉得可以将图片文件发布在阿里云的OSS存储也行,该功能等后续会完成。等黑马点评后端部分完成后、再去分析黑马写的前端代码。然后再将该功能实现。
文件上传功能实现
在系统常量中修改文件上传的地址(改成你自己的)
// 图片上传路径 public static final String IMAGE_UPLOAD_DIR = "D:\\software\\hm-dianping-nginx\\nginx-1.18.0\\html\\hmdp\\imgs";
@Slf4j
@RestController
@RequestMapping("upload")
public class UploadController {
@PostMapping("blog")
public Result uploadImage(@RequestParam("file") MultipartFile image) {
try {
// 获取原始文件名称
String originalFilename = image.getOriginalFilename();
// 生成新文件名
String fileName = createNewFileName(originalFilename);
// 保存文件
image.transferTo(new File(SystemConstants.IMAGE_UPLOAD_DIR, fileName));
// 返回结果
log.debug("文件上传成功,{}", fileName);
return Result.ok(fileName);
} catch (IOException e) {
throw new RuntimeException("文件上传失败", e);
}
}
private String createNewFileName(String originalFilename) {
// 获取后缀
String suffix = StrUtil.subAfter(originalFilename, ".", true);
// 生成目录
String name = UUID.randomUUID().toString();
int hash = name.hashCode();
int d1 = hash & 0xF;
int d2 = (hash >> 4) & 0xF;
// 判断目录是否存在
File dir = new File(SystemConstants.IMAGE_UPLOAD_DIR, StrUtil.format("/blogs/{}/{}", d1, d2));
if (!dir.exists()) {
dir.mkdirs();
}
// 生成文件名
return StrUtil.format("/blogs/{}/{}/{}.{}", d1, d2, name, suffix);
}
笔记发布功能实现
@PostMapping
public Result saveBlog(@RequestBody Blog blog) {
// 获取登录用户
UserDTO user = UserHolder.getUser();
blog.setUserId(user.getId());
// 保存探店博文
blogService.save(blog);
// 返回id
return Result.ok(blog.getId());
}
发布功能测试
5.1.2 查看探店笔记
添加TableField注解表示这三个字段不是数据库中的字段。我们查看探店笔记时,需要将创作者的信息也显示出来,但是又不能显示过多暴露,只需要有头像、姓名之类的就好了。
/** * 用户图标 */ @TableField(exist = false) private String icon; /** * 用户姓名 */ @TableField(exist = false) private String name;
查看探店笔记代码实现
/**
* 根据id查询探店笔记
* @param id
* @return
*/
@GetMapping("/{id}")
public Result queryBlogById(@PathVariable("id") Long id) {
return blogService.queryBlogById(id);
}
/**
* 根据id查博客笔记
* @param id
* @return
*/
@Override
public Result queryBlogById(Long id) {
//1. 查询blog
Blog blog = getById(id);
if(blog==null){
return Result.fail("博客不存在");
}
//2.查用户
queryBlogUser(blog);
return Result.ok(blog);
}
/**
* 复用方法封装
* @param blog
*/
private void queryBlogUser(Blog blog){
Long userId = blog.getUserId();
User user = userService.getById(userId);
blog.setName(user.getNickName());
blog.setIcon(user.getIcon());
}
查看功能测试
5.2 点赞功能实现
5.2.1 当前点赞功能存在的问题
不校验点赞用户信息,一个人可以无限刷赞
/**
* 修改点赞数量
* @param id
* @return
*/
@PutMapping("/like/{id}")
public Result likeBlog(@PathVariable("id") Long id) {
// 修改点赞数量
// update tb_blog set liked liked + 1 where id = #{id}
blogService.update()
.setSql("liked = liked + 1").eq("id", id).update();
return Result.ok();
}
5.2.2 完善点赞功能需求与实现步骤
需求:
1. 同一用户只能点赞一次,再次点击即取消点赞
2. 如果当前用户已经点赞,则点赞按钮高亮显示(isLike 告知前端即可)
步骤:
1. 给Blog类添加一个isList字段,标记当前用户是否点赞
2. 修改点赞功能,利用Redis的set集合特性存储当前笔记的点赞用户id,用于判断该用户是否给笔记点过赞,为点过则赞 +1 ,否则赞-1
3. 在根据id查询Blog业务时,就判断当前登录用户有无点赞记录,赋值给isList字段.
4. 在分页查询Blog业务时,也去判断并赋值
5.2.3 点赞功能实现
添加一个isList字段
/**
* 是否点赞过了
*/
@TableField(exist = false)
private Boolean isLike;
修改点赞功能
/**
* 点赞
* @param id
* @return
*/
@Override
public Result likeBlog(Long id) {
//1.判断当前用户有没点赞
Long userId = UserHolder.getUser().getId();
String key = RedisConstants.BLOG_LIKED_KEY + id;
Boolean isMember = stringRedisTemplate.opsForSet().isMember(key,userId.toString());
if(BooleanUtil.isFalse(isMember)){
//2.更新数据库信息 点赞+1
boolean isSuccess = update().setSql("liked = liked + 1").eq("id",id).update();
//3. 保存用户到Redis的set集合
if(isSuccess){
stringRedisTemplate.opsForSet().add(key,userId.toString());
}
}else{
//4. 点赞-1
boolean isSuccess = update().setSql("liked = liked - 1").eq("id",id).update();
//5。 移除Redis
if(isSuccess){
stringRedisTemplate.opsForSet().remove(key,userId.toString());
}
}
return Result.ok();
}
添加判断功能
/**
* 查询点赞状态
* @param blog
*/
private void isBlogLiked(Blog blog) {
Long userId = UserHolder.getUser().getId();
String key = RedisConstants.BLOG_LIKED_KEY + blog.getId();
Boolean isMember = stringRedisTemplate.opsForSet().isMember(key,userId.toString());
blog.setIsLike(BooleanUtil.isTrue(isMember));
}
/**
* 查多个
* @param current
* @return
*/
@Override
public Result queryHotBlog(Integer current) {
// 根据用户查询
Page<Blog> page = query()
.orderByDesc("liked")
.page(new Page<>(current, SystemConstants.MAX_PAGE_SIZE));
// 获取当前页数据
List<Blog> records = page.getRecords();
// 查询用户
records.forEach(blog -> {
this.queryBlogUser(blog);
// 查询点赞状态
this.isBlogLiked(blog);
});
return Result.ok(records);
}
/**
* 根据id查博客笔记
* @param id
* @return
*/
@Override
public Result queryBlogById(Long id) {
//1. 查询blog
Blog blog = getById(id);
if(blog==null){
return Result.fail("博客不存在");
}
//2.查用户
queryBlogUser(blog);
//3. 查询点赞状态
isBlogLiked(blog);
return Result.ok(blog);
}
5.2.4 点赞功能测试
5.3 点赞排行榜功能实现
5.3.1 数据结构选型说明
需求是能排序且去重的排行榜功能,因此我们选用zset集合来实现这些需求
5.3.2 排行功能实现
修改点赞及点赞状态逻辑
/**
* 查询点赞状态
* @param blog
*/
private void isBlogLiked(Blog blog) {
Long userId = UserHolder.getUser().getId();
String key = RedisConstants.BLOG_LIKED_KEY + blog.getId();
Double score = stringRedisTemplate.opsForZSet().score(key,userId.toString());
blog.setIsLike(score != null);
}
/**
* 点赞
* @param id
* @return
*/
@Override
public Result likeBlog(Long id) {
//1.判断当前用户有没点赞
Long userId = UserHolder.getUser().getId();
String key = RedisConstants.BLOG_LIKED_KEY + id;
Double score = stringRedisTemplate.opsForZSet().score(key,userId.toString());
if(score == null){
//2.更新数据库信息 点赞+1
boolean isSuccess = update().setSql("liked = liked + 1").eq("id",id).update();
//3. 保存用户到Redis的set集合
if(isSuccess){
stringRedisTemplate.opsForZSet().add(key,userId.toString(),System.currentTimeMillis());
}
}else{
//4. 点赞-1
boolean isSuccess = update().setSql("liked = liked - 1").eq("id",id).update();
//5。 移除Redis
if(isSuccess){
stringRedisTemplate.opsForZSet().remove(key,userId.toString());
}
}
return Result.ok();
}
实现排行榜功能
ZRANGE 查询范围内的元素
/**
* 查询点赞排行榜
* @param id
* @return
*/
@Override
public Result queryBlogLikes(Long id) {
// 使用zrange查询TOP5
Set<String> top5StrIds = stringRedisTemplate.opsForZSet().range(RedisConstants.BLOG_LIKED_KEY + id, 0, 4);
if(top5StrIds==null || top5StrIds.isEmpty()){
return Result.ok(Collections.emptyList());
}
List<Long> ids = top5StrIds.stream().map(Long::valueOf).collect(Collectors.toList());
// 根据用户id查询数据库
List<User> users = userService.listByIds(ids);
// DTO
List<UserDTO> userDTOList = users
.stream()
.map(user -> BeanUtil.copyProperties(user, UserDTO.class))
.collect(Collectors.toList());
return Result.ok(userDTOList);
}
5.3.3 排行功能测试
优化
发现查询点赞排行榜的顺序不正确——原因:数据库in使用时,无法保障顺序
添加ORDER BY FIELD 自定义顺序
// 根据用户id查询数据库 // 自定义sql查询 List<User> users = userService.query().in("id", ids).last("ORDER BY FIELD(id," + StrUtil.join(",", ids) + ")").list();
5.4 查看用户笔记列表功能实现
在点击用户头像进入首页时,可以查询该用户的博客列表进行展示


/**
* 根据用户id查询探店笔记列表
* @param current
* @param id
* @return
*/
@GetMapping("/of/user")
public Result queryBlogByUserid(
@RequestParam(value = "current",defaultValue = "1") Integer current,
@RequestParam("id") Long id) {
// 根据用户查询
Page<Blog> page = blogService.query()
.eq("user_id", id)
.page(new Page<>(current,SystemConstants.MAX_PAGE_SIZE));
// 获取当前页数据
List<Blog> records = page.getRecords();
return Result.ok(records);
}