10 缓存+分布式锁优化根据专辑id查询专辑详情接口
10.1 缓存的选择和模型
缓存的选择:
- 本地缓存
- 特点:
- 速度快:本地缓存是存储在应用程序所在的服务器或进程内,数据读取和写入操作不需要通过网络传输,因此响应速度极快;
- 数据可能会丢失:由于本地缓存通常是基于内存的,当应用程序重启、服务器宕机或内存不足时,缓存中的数据可能会丢失;
- 选择:
- Map:Map 是一种键值对的数据结构,可以用来实现简单的本地缓存。它的优点是使用方便、轻量级,但功能相对简单,没有过期时间、淘汰策略等高级特性;
- 咖啡因(Caffeine):Caffeine 是一个高性能的 Java 缓存库,提供了丰富的功能,如自动过期、淘汰策略(如 LRU、LFU 等)、异步加载等。它在性能上表现出色,是很多 Java 应用中本地缓存的首选;
- 特点:
- 分布式缓存
- 特点:
- 速度相比本地缓存来说没有那么快:分布式缓存的数据存储在独立的缓存服务器集群中,数据的读取和写入需要通过网络传输,因此速度会比本地缓存慢一些;
- 数据不会丢失:分布式缓存通常会将数据持久化到磁盘或采用主从复制、集群等机制来保证数据的可靠性,即使部分节点出现故障,数据也不会轻易丢失;
- 选择:
- MongoDB:MongoDB 是一种文档型的 NoSQL 数据库,虽然它主要被用作数据库,但也可以通过一些配置和优化来作为分布式缓存使用。它具有灵活的数据模型、强大的查询功能和良好的扩展性;
- ES(Elasticsearch):Elasticsearch 是一个基于 Lucene 的搜索和分析引擎,它也可以被用作分布式缓存。ES 具有实时搜索、分布式存储和高可用性等特点,适用于需要复杂查询和分析的缓存场景;
- Redis:Redis 是一个开源的内存数据结构存储系统,它支持多种数据结构(如字符串、哈希、列表、集合等),并提供了丰富的功能,如持久化、复制、事务、Lua 脚本等。Redis 在分布式缓存领域非常流行,具有高性能、低延迟和丰富的功能特性;
- 特点:
- 本地缓存
没有缓存:
本地缓存:
分布式缓存:
10.2 仅使用异步和使用本地缓存测试专辑详情的接口响应时间
修改:将异步查询数据库,根据专辑id查询专辑详情抽取成
getAlbumInfoFromDb()
方法;/** * 从数据库根据专辑id查询专辑详情 * @param albumId * @return */ @NotNull private Map<String, Object> getAlbumInfoFromDb(Long albumId) { // 创建Map对象 Map<String, Object> map = new HashMap<>(); CompletableFuture<Void> albumStatCompletableFuture = CompletableFuture.runAsync(new Runnable() { @Override public void run() { System.out.println("查询专辑的统计信息使用的线程:" + Thread.currentThread().getName()); // 专辑的统计信息 Result<AlbumStatVo> albumStatResult = albumInfoFeignClient.getAlbumStat(albumId); AlbumStatVo albumStatVoData = albumStatResult.getData(); if (albumStatVoData == null) { throw new ShisanException(201, "远程查询专辑微服务,获取专辑统计信息失败"); } map.put("albumStatVo", albumStatVoData); } }); CompletableFuture<Void> viewCompletableFuture = CompletableFuture.runAsync(new Runnable() { @Override public void run() { System.out.println("查询专辑的分类使用的线程:" + Thread.currentThread().getName()); // 专辑的分类(分类的名字) Result<BaseCategoryView> albumCategoryResult = albumInfoFeignClient.getAlbumCategory(albumId); BaseCategoryView baseCategoryViewData = albumCategoryResult.getData(); if (baseCategoryViewData == null) { throw new ShisanException(201, "远程查询专辑微服务,获取专辑分类信息失败"); } map.put("baseCategoryView", baseCategoryViewData); } }); CompletableFuture<Long> albumInfoCompletableFuture = CompletableFuture.supplyAsync(new Supplier<Long>() { @Override public Long get() { System.out.println("查询专辑的基本数据使用的线程:" + Thread.currentThread().getName()); // 专辑基本数据 Result<AlbumInfo> albumInfoAndAttrValueResult = albumInfoFeignClient.getAlbumInfoAndAttrValue(albumId); AlbumInfo albumInfoData = albumInfoAndAttrValueResult.getData(); if (albumInfoData == null) { throw new ShisanException(201, "远程查询专辑微服务,获取专辑基本信息失败"); } map.put("albumInfo", albumInfoData); return albumInfoData.getUserId(); } }, threadPoolExecutor); // CompletableFuture<Void> userInfoCompletableFuture = CompletableFuture.runAsync(new Runnable() { // @SneakyThrows // @Override // public void run() { // // 专辑的主播信息 // Long userId = albumInfoCompletableFuture.get(); // Result<UserInfoVo> userInfoResult = userInfoFeignClient.getUserInfo(userId); // UserInfoVo userInfoData = userInfoResult.getData(); // if (userInfoData == null) { // throw new ShisanException(201, "远程查询用户微服务,获取专辑对应主播信息失败"); // } // map.put("announcer", userInfoData); // } // }); // 对于上面注释的另一种写法 CompletableFuture<Void> userInfoCompletableFuture = albumInfoCompletableFuture.thenAcceptAsync(new Consumer<Long>() { @Override public void accept(Long userId) { System.out.println("查询专辑的主播信息使用的线程:" + Thread.currentThread().getName()); // 专辑的主播信息 Result<UserInfoVo> userInfoResult = userInfoFeignClient.getUserInfo(userId); UserInfoVo userInfoData = userInfoResult.getData(); if (userInfoData == null) { throw new ShisanException(201, "远程查询用户微服务,获取专辑对应主播信息失败"); } map.put("announcer", userInfoData); } }); // 等待所有异步线程执行完毕,Tomcat的线程才能继续执行 CompletableFuture.allOf( albumStatCompletableFuture, viewCompletableFuture, albumInfoCompletableFuture, userInfoCompletableFuture) .join(); return map; }
仅使用异步线程:
public class ItemServiceImpl implements ItemService { // ……其它逻辑 HashMap<Long, Map<String, Object>> localCache = new HashMap<>(); /** * 根据专辑id查询专辑详情(仅使用异步) * @param albumId * @return */ @Override public Map<String, Object> getAlbumInfo(Long albumId) { Long startTime = System.currentTimeMillis(); Map<String, Object> albumInfoFromDb = getAlbumInfoFromDb(albumId); Long endTime = System.currentTimeMillis(); log.info("仅使用异步,耗时:{}毫秒", endTime - startTime); return albumInfoFromDb; } }
使用本地缓存:
public class ItemServiceImpl implements ItemService { // ……其它逻辑 // 创建一个本地缓存 HashMap<Long, Map<String, Object>> localCache = new HashMap<>(); /** * 根据专辑id查询专辑详情(使用本地缓存) * @param albumId * @return */ @Override public Map<String, Object> getAlbumInfo(Long albumId) { Long startTime = System.currentTimeMillis(); // 查询本地缓存 boolean b = localCache.containsKey(albumId); // 本地缓存中存在,直接返回 if (b) { log.info("本地缓存命中,耗时:{}毫秒", System.currentTimeMillis() - startTime); return localCache.get(albumId); } // 本地缓存中不存在,查询数据库,然后保存到本地缓存中 Map<String, Object> albumInfoFromDb = getAlbumInfoFromDb(albumId); localCache.put(albumId, albumInfoFromDb); Long endTime = System.currentTimeMillis(); log.info("本地缓存未命中,耗时:{}毫秒", endTime - startTime); // 同时将查询到的数据返回给前端 return albumInfoFromDb; } }
10.3 使用分布式缓存Redis测试专辑详情接口的响应时间
使用Redis分布式缓存:
/** * 根据专辑id查询专辑详情(使用分布式缓存Redis) * @param albumId * @return */ @Override public Map<String, Object> getAlbumInfo(Long albumId) { Long startTime = System.currentTimeMillis(); // 查询Redis分布式缓存 String s = redisTemplate.opsForValue().get(albumId.toString()); // 判断分布式缓存是否命中 if (!StringUtils.isEmpty(s)) { log.info("Redis分布式缓存命中,耗时:{}毫秒", System.currentTimeMillis() - startTime); return JSONObject.parseObject(s, Map.class); } // Redis分布式缓存未命中,回源查询数据库 Map<String, Object> albumInfoFromDb = getAlbumInfoFromDb(albumId); // 将数据库的数据同步到分布式缓存Redis中 redisTemplate.opsForValue().set(String.valueOf(albumId), JSONObject.toJSONString(albumInfoFromDb)); Long endTime = System.currentTimeMillis(); log.info("Redis分布式缓存未命中,耗时:{}毫秒", endTime - startTime); return albumInfoFromDb; }
10.4 双缓存架构
使用双缓存架构:本地Map缓存+分布式缓存Redis
/** * 根据专辑id查询专辑详情(使用双缓存架构:本地Map缓存+分布式缓存Redis) * @param albumId * @return */ @Override public Map<String, Object> getAlbumInfo(Long albumId) { Map<String, Object> result = new HashMap<>(); Long startTime = System.currentTimeMillis(); // 查询一级缓存,即本地Map缓存 boolean b = localCache.containsKey(albumId); if (b) { log.info("一级缓存,即本地Map缓存命中,耗时:{}毫秒", System.currentTimeMillis() - startTime); return localCache.get(albumId); } log.info("一级缓存,即本地Map缓存未命中,查询二级缓存,即分布式缓存Redis"); // 查询二级缓存,即分布式缓存Redis String s = redisTemplate.opsForValue().get(albumId.toString()); if (!StringUtils.isEmpty(s)) { // 分布式缓存命中 // 将Redis的数据反序列化成为Map对象 result = JSONObject.parseObject(s, Map.class); // 将数据同步到一级缓存 localCache.put(albumId, result); log.info("二级缓存,即分布式缓存Redis命中,耗时:{}毫秒", System.currentTimeMillis() - startTime); return result; } else { // 分布式缓存未命中 result = getAlbumInfoFromDb(albumId); // 将数据库查询的数据同步到二级缓存Redis和一级缓存本地缓存Map redisTemplate.opsForValue().set(String.valueOf(albumId), JSONObject.toJSONString(result)); localCache.put(albumId, result); log.info("二级缓存,即分布式缓存Redis未命中,耗时:{}毫秒", System.currentTimeMillis() - startTime); return result; } }
- 此时因为Redis中已经有了专辑详情的数据,所以查询二级缓存,即分布式缓存Reids直接命中。
10.5 高并发下缓存的四大问题
10.5.1 缓存击穿
定义
- 缓存击穿指某个热点 key 在缓存中过期的瞬间,大量并发请求直接穿透到数据库,导致数据库短时间承受巨大压力;
- 例如:某商品秒杀活动中,商品 ID 对应的缓存突然过期,此时数万用户同时请求该商品,所有请求都会直接访问数据库;
核心原因
- 热点 key 缓存过期(如设置了较短的 TTL);
- 高并发集中访问该 key,且缓存未及时重建;
解决方案
- 热点 key 永不过期:对核心热点数据(如秒杀商品)不设置 TTL,避免过期问题。需注意:需通过主动更新(如定时任务)保证数据新鲜度;
- 互斥锁:当缓存失效时,只允许一个请求获取锁并查询数据库,其他请求等待锁释放后从缓存获取数据;
- 示例:使用 Redis 的
SET NX
命令实现分布式锁,第一个请求拿到锁后更新缓存,其他请求自旋等待;
- 示例:使用 Redis 的
- 提前预热 + 延迟过期
- 预热:活动前将热点数据主动加载到缓存;
- 延迟过期:在缓存过期前(如过期前 5 分钟),通过后台线程主动更新缓存,避免过期瞬间的请求穿透。
10.5.2 缓存穿透
定义:
- 缓存穿透指请求的 key 在缓存和数据库中都不存在,导致所有请求直接穿透到数据库,若恶意高频请求此类 key(如伪造 ID),可能压垮数据库;
- 例如:黑客用不存在的用户 ID 高频请求用户信息,缓存和数据库均无数据,所有请求都会访问数据库;
核心原因
- 请求不存在的数据(可能是业务正常场景,也可能是恶意攻击);
- 缓存无法“拦截”无效请求(因缓存中无对应 key);
解决方案
- 缓存空值(Null Value):对查询结果为 null 的请求,在缓存中存储空值(如
key:null
),并设置较短 TTL(如 5 分钟);- 注意:需避免空值缓存占用过多空间,可限制空值 key 的数量或设置较短过期时间;
- 布隆过滤器(Bloom Filter):布隆过滤器是一种高效的概率性数据结构,可提前判断 key 是否存在于数据库中;
- 原理:将数据库中所有有效 key 提前存入布隆过滤器,请求先经过过滤器:
- 若过滤器判断“不存在”,直接返回空(无需访问缓存/数据库);
- 若判断“可能存在”,再走正常缓存 + 数据库流程(存在极小误判率,可忽略)。
- 适用场景:数据总量固定(如用户 ID、商品 ID),且需拦截大量无效请求;
- 原理:将数据库中所有有效 key 提前存入布隆过滤器,请求先经过过滤器:
- 接口层校验:对输入参数进行合法性校验(如用户 ID 必须为正整数),直接拦截明显无效的请求(如负数 ID、超长字符串)。
- 缓存空值(Null Value):对查询结果为 null 的请求,在缓存中存储空值(如
10.5.3 缓存雪崩
定义:
- 缓存雪崩指缓存中大量 key 同时过期,或缓存服务整体故障(如 Redis 集群宕机),导致所有请求瞬间穿透到数据库,引发数据库雪崩;
- 例如:缓存集群因网络故障宕机,所有请求直接访问数据库,数据库连接池被占满,系统无法响应。
核心原因
大量 key 设置了相同 TTL,导致同一时间集体过期;
缓存服务不可用(如集群崩溃、网络中断);
解决方案
- 过期时间随机化:对批量 key 设置 TTL 时,在基础过期时间上增加随机值(如
TTL = 30 分钟 + 随机 0-5 分钟
),避免同时过期; - 缓存集群高可用 :
- 部署 Redis 集群(主从 + 哨兵),主节点故障时自动切换到从节点;
- 跨机房部署,避免单机房故障导致缓存不可用;
- 熔断降级(服务保护):使用 Sentinel、Hystrix 等工具,当数据库压力超过阈值时,自动拒绝部分请求(返回默认值或提示“系统繁忙”),避免数据库崩溃;
- 多级缓存:引入本地缓存(如 Caffeine)+ 分布式缓存(如 Redis)
- 本地缓存可拦截部分请求,减少分布式缓存压力;
- 即使分布式缓存故障,本地缓存仍能提供部分服务。
- 过期时间随机化:对批量 key 设置 TTL 时,在基础过期时间上增加随机值(如
10.5.4 数据一致性
定义:
- 数据一致性指缓存中的数据与数据库中的数据保持一致。高并发下,若更新数据库后未正确同步缓存,可能导致用户读取到旧数据(脏读);
- 例如:用户修改昵称后,数据库已更新,但缓存未同步,其他用户仍看到旧昵称;
核心矛盾
- 缓存更新策略与数据库更新策略的“时序”和“原子性”难以保证;
- 高并发下,更新操作可能被并发请求干扰(如 A 请求更新数据库,B 请求同时读取旧缓存);
常见方案(需根据业务场景选择)
Cache Aside Pattern(旁路缓存模式) 。这是最常用的策略,核心逻辑:
读操作:先查缓存,命中则返回;未命中则查数据库,再写入缓存;
写操作:先更新数据库,再删除缓存(而非更新缓存);
为何删除而非更新?避免“更新缓存”与“更新数据库”的并发冲突(如 A 更新数据库,B 同时更新缓存,可能导致缓存数据旧于数据库);
风险:若删除缓存失败,会导致缓存数据旧于数据库,需配合“缓存过期”兜底(设置 TTL);
Write Through(写透模式)。写操作时,先更新缓存,再更新数据库,保证两者同时成功;
- 优点:强一致性(缓存与数据库同步更新);
- 缺点:写性能低(需等待两次 IO),适合一致性要求极高的场景(如金融交易);
最终一致性方案(异步更新)。写操作时先更新数据库,再通过消息队列(如 Kafka)异步通知缓存删除/更新;
- 优点:不阻塞主流程,适合高并发场景;
- 风险:短暂不一致(数据库更新后,缓存未及时更新),需接受“最终一致”(如几秒内同步);
版本号/时间戳校验:在数据中加入版本号,更新缓存时校验版本:只有当缓存版本低于数据库版本时,才更新缓存,避免旧数据覆盖新数据。
10.5.5 总结:四大问题对比
问题 | 核心场景 | 风险点 | 核心解决方案 |
---|---|---|---|
缓存击穿 | 热点 key 过期瞬间 | 数据库瞬时压力骤增 | 分布式锁、热点 key 永不过期 |
缓存穿透 | 请求不存在的 key | 数据库被无效请求压垮 | 布隆过滤器、缓存空值 |
缓存雪崩 | 大量 key 同时过期或缓存故障 | 数据库整体崩溃 | 随机 TTL、缓存集群高可用 |
数据一致性 | 缓存与数据库数据不同步 | 脏读影响用户体验 | 旁路缓存模式、异步更新 + 过期兜底 |
- 核心原则:没有万能方案,需结合业务场景选择(如一致性优先选写透模式,性能优先选旁路模式),并通过“多层防护”(如缓存 + 限流 + 熔断)提升系统稳定性。
10.6 缓存击穿的过程&锁的选择
缓存击穿的过程:
锁的选择
10.7 本地锁与分布式锁的本质区别&加锁过程
本质区别:锁对象不一样(没有对象就没有锁);
本地锁的锁对象:永远都是来自于应用对应的 JVM 虚拟机中创建出来的对象(
new
这个动作程序员自己做);import java.util.concurrent.locks.ReentrantLock; public class ApiTest { private int i = 0; public static void main(String[] args) { ApiTest apiTest = new ApiTest(); apiTest.testApi1(); // 锁对象:apiTest apiTest.testApi2(); // 锁对象:apiTest } // 使用 ReentrantLock 实现同步 public void testApi1() { ReentrantLock reentrantLock = new ReentrantLock(); reentrantLock.lock(); try { i++; // 共享资源 } finally { reentrantLock.unlock(); } } // 使用 synchronized 修饰方法实现同步 public synchronized void testApi2() { i++; // 共享资源 } // 使用 synchronized 代码块(锁类对象)实现同步 public void testApi3() { synchronized (ApiTest.class) { // 锁对象:ApiTest.class i++; // 共享资源 } } }
对于
Synchronized
来说:加锁和解锁的本质就是修改调用同步方法的这个对象的对象头中的锁标识位。分布式锁的锁对象:永远都是来自于第三方中间件创建出来的对象(这个锁对象直接使用,不需要程序员自己
new
);
Java 中一个对象的组成:对象头、对象中的实例数据、padding(填充字段,可选,主要为了让对象的大小是8bit的整数倍);
以 32 位虚拟机为例:
加锁过程:
- 多线程到底能不能对要访问的保护资源互斥:关键要看多线程操作的锁对象是不是同一个,如果是同一个,一定互斥(即锁住了),反之一定则不互斥(即没锁住);
- 注意:锁对象不要放到某个需要访问保护资源的线程里面去创建,不然一定锁不住!因为该锁对象是该线程独享的,其它线程无法访问到这个锁对象;
- 加锁的本质:就是修改对象的对象头的锁标识位;
- 解锁的本质:也是修改对象的对象头的锁标识位;
- 多线程到底能不能对要访问的保护资源互斥:关键要看多线程操作的锁对象是不是同一个,如果是同一个,一定互斥(即锁住了),反之一定则不互斥(即没锁住);
一个微服务两个实例,如果使用本地锁:
一个微服务两个实例,如果使用分布式锁:
高性能和锁的关系:
- 本地锁的性能 > 分布式锁的性能;
- 锁的粒度越大,性能越低;锁的粒度越小,性性能越高;
- 高性能和强一致不能同时都具备,只能二选一或者平衡一下。
10.8 分布式缓存Redis+Redis版本的分布式锁,以解决缓存击穿问题V1~V3
/**
* V1:分布式缓存Redis+Redis版本的分布式锁,以解决缓存击穿问题
* @param albumId
* @return
*/
@Nullable
private Map opsDistroCacheAndLockV1(Long albumId) {
String cacheKey = RedisConstant.CACHE_INFO_PREFIX + albumId; // 缓存key
String lockKey = RedisConstant.ALBUM_LOCK_SUFFIX + albumId; // 分布式锁key
// 查询分布式缓存
String resultFromCache = redisTemplate.opsForValue().get(cacheKey);
// 判断缓存是否存在
if (!StringUtils.isEmpty(resultFromCache)) {
return JSONObject.parseObject(resultFromCache, Map.class);
}
// 缓存未命中,则回源查询数据库
// 加分布式锁
Boolean aBoolean = redisTemplate.opsForValue().setIfAbsent(lockKey, "lock");
if (aBoolean) { // 若抢得到锁(即加锁成功)
Map<String, Object> albumInfoFromDb;
// 回源查询数据库
albumInfoFromDb = getAlbumInfoFromDb(albumId);
// 将数据库查询的数据同步到Redis
redisTemplate.opsForValue().set(cacheKey, JSONObject.toJSONString(albumInfoFromDb));
// 释放分布式锁
redisTemplate.delete(lockKey);
// 返回数据给前端
return albumInfoFromDb;
} else { // 若未抢到锁(即加锁失败)
try {
Thread.sleep(200); // 休眠200毫秒,然后重新获取数据(此时上面先抢到锁的请求已经把数据放入了Redis)。这个时间要通过压测来获取
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
String resultFromCacheStr = redisTemplate.opsForValue().get(cacheKey);
return JSONObject.parseObject(resultFromCacheStr, Map.class);
}
}
/**
* V2:在极端情况下,抢到锁的线程刚要执行业务,服务器突然宕机。此时Redis中的锁没有被释放,此时会导致死锁发生
* 解决:
* 1、客户端主动删除锁:redisTemplate.delete(lockKey);
* 2、Redis服务端给锁的key设置过期时间:redisTemplate.expire(lockKey, 30, TimeUnit.SECONDS);
* 底层实现:Redis将设置key和给这个key设置过期时间的操作用一个lua脚本包装起来,然后用一个Redis连接执行这个lua脚本,从而保证这两个动作的原子性
* @param albumId
* @return
*/
@Nullable
private Map getDistroCacheAndLockV2(Long albumId) {
String cacheKey = RedisConstant.CACHE_INFO_PREFIX + albumId;
String lockKey = RedisConstant.ALBUM_LOCK_SUFFIX + albumId;
// 查询分布式缓存
String resultFromCache = redisTemplate.opsForValue().get(cacheKey);
// 判断缓存是否存在
if (!StringUtils.isEmpty(resultFromCache)) {
return JSONObject.parseObject(resultFromCache, Map.class);
}
// 缓存未命中,则回源查询数据库
// 加分布式锁。此时给锁传入一个过期时间,防止死锁发生
Boolean aBoolean = redisTemplate.opsForValue().setIfAbsent(lockKey, "lock", 30, TimeUnit.SECONDS);
if (aBoolean) { // 若抢得到锁(即加锁成功)
Map<String, Object> albumInfoFromDb;
try {
// 回源查询数据库
albumInfoFromDb = getAlbumInfoFromDb(albumId);
// 将数据库查询的数据同步给缓存Redis
redisTemplate.opsForValue().set(cacheKey, JSONObject.toJSONString(albumInfoFromDb));
} finally {
// 释放分布式锁
redisTemplate.delete(lockKey);
}
// 返回数据给前端
return albumInfoFromDb;
} else { // 若未抢到锁(即加锁失败)
try {
Thread.sleep(200);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
String resultFromCacheStr = redisTemplate.opsForValue().get(cacheKey);
return JSONObject.parseObject(resultFromCacheStr, Map.class);
}
}
/**
* v3:在极端情况下,抢到锁的线程1业务的执行时间超过了锁的过期时间,此时会导致锁提前释放,从而会导致其他线程(线程2)抢到锁,然后线程1执行完业务后,释放锁,此时释放的是线程2的锁,而不是线程1的锁,这就是锁的误删问题
* 原因:锁和释放锁不是原子操作
* 解决思路:每个线程加锁的时候都给一个锁标识,然后在释放锁的时候,先判断一下这个锁是不是自己加的,如果是则删除,反之则不删除
* 解决方法:自定义一个lua脚本,该脚本来判断加锁和释放锁是否是同一个线程
* @param albumId
* @return
*/
@Nullable
private Map getDistroCacheAndLockV3(Long albumId) {
String cacheKey = RedisConstant.CACHE_INFO_PREFIX + albumId;
String lockKey = RedisConstant.ALBUM_LOCK_SUFFIX + albumId;
String token = UUID.randomUUID().toString().replace("-", ""); // 生成一个随机字符串,作为锁的标识
// 查询分布式缓存
String resultFromCache = redisTemplate.opsForValue().get(cacheKey);
// 判断缓存是否存在
if (!StringUtils.isEmpty(resultFromCache)) {
return JSONObject.parseObject(resultFromCache, Map.class);
}
// 缓存未命中,则回源查询数据库
// 加分布式锁。此时给锁传入一个过期时间,防止死锁发生
Boolean aBoolean = redisTemplate.opsForValue().setIfAbsent(lockKey, token, 30, TimeUnit.SECONDS);
if (aBoolean) { // 若抢得到锁(即加锁成功)
Map<String, Object> albumInfoFromDb;
try {
// 回源查询数据库
albumInfoFromDb = getAlbumInfoFromDb(albumId);
// 将数据库查询的数据同步给缓存Redis
redisTemplate.opsForValue().set(cacheKey, JSONObject.toJSONString(albumInfoFromDb));
} finally {
// 释放分布式锁
// 判断一下这个锁是不是自己加的,如果是则删除,反之则不删除
String lockValueFromCache = redisTemplate.opsForValue().get(lockKey); // 获取Redis中的锁
// 自定义 Lua 脚本。该脚本的意思是:如果 Redis 中键为KEYS[1]的值等于参数ARGV[1],则删除该键并返回 1,否则直接返回 0
String luaScript = "IF redis.call('get',KEYS[1])==ARGV[1] THEN return redis.call('del',KEYS[1]) ELSE return 0 END";
// 执行 Lua 脚本
// new DefaultRedisScript<Long>(luaScript, Long.class):封装了之前的 Lua 脚本,并指定返回值类型为Long(对应 Lua 脚本中的return 0或return 1)
// Arrays.asList(lockKey):对应 Lua 脚本中的KEYS[1],即要操作的 Redis 键(锁的名称)
// token对应 Lua 脚本中的ARGV[1],即锁的唯一标识
Long execute = redisTemplate.execute(new DefaultRedisScript<Long>(luaScript, Long.class), Arrays.asList(lockKey), token);
if (execute == 0) {
log.error("释放锁失败");
} else {
log.info("释放锁成功");
}
// 如果没有上面的lua脚本。有这么一种情况:
// 执行完业务,锁还未过期
// 正要释放锁的时候,锁刚好过期,此时其他线程刚好抢到了锁,释放的就是其他线程的锁,一样会出现锁的误删问题
if (token.equals(lockValueFromCache)) {
redisTemplate.delete(lockKey); // 此时其它线程还未抢到锁,直接删除
}
}
// 返回数据给前端
return albumInfoFromDb;
} else { // 若未抢到锁(即加锁失败)
try {
Thread.sleep(200);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
String resultFromCacheStr = redisTemplate.opsForValue().get(cacheKey);
return JSONObject.parseObject(resultFromCacheStr, Map.class);
}
}
10.9 分布式缓存Redis+Redis版本的分布式锁,以解决缓存击穿问题Finally Version
新建:该执行器用于管理线程对Redis中的锁key续期
package com.shisan.tingshu.search.executor; import com.shisan.tingshu.common.constant.RedisConstant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.data.redis.core.StringRedisTemplate; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; /** * 该执行器用于管理线程对Redis中的锁key续期 */ public class ExpireThreadExecutor { Logger logger = LoggerFactory.getLogger(this.getClass()); private Long ttl; private TimeUnit timeUnit; private StringRedisTemplate redisTemplate; private Long taskId; static ScheduledExecutorService scheduledExecutorService; static ScheduledFuture<?> scheduledFuture = null; static { scheduledExecutorService = Executors.newScheduledThreadPool(2); } public ExpireThreadExecutor(StringRedisTemplate redisTemplate, Long taskId) { this.redisTemplate = redisTemplate; this.taskId = taskId; } /** * 定义一个续期方法 */ public void renewal(Long ttl, TimeUnit timeUnit) { scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { logger.info("续期线程开始续期:"); // 续期 redisTemplate.expire(RedisConstant.ALBUM_LOCK_SUFFIX + taskId, ttl, timeUnit); } }, ttl / 3, ttl / 3, TimeUnit.SECONDS); } /** * 中断续期任务方法 */ public Boolean cancelRenewal() { boolean cancel = scheduledFuture.cancel(true); // 取消给线程池的任务 logger.info("续期线程结束续期:"); return cancel; } }
Finally Version:
ThreadLocal<String> reentrantLockTokenThreadLocal = new ThreadLocal<>(); /** * Finally Version:抢到锁的线程在将从数据库中查询到的数据写入Redis时失败,此时锁不会释放,从而会导致其他线程无法获取到锁,从而无法访问数据库 * @param albumId * @return * 解决思路:只要抢到锁的线程没有把自己的活干完,这个抢到锁的线程对应的这个锁key就不能释放掉。只有等抢到锁的线程把活干完了或者干活期间出异常,才让这个锁过期 * 即:活没干完,则给锁key续期;活干完或者干活期间出现异常,不用再给锁key续期(注意:只有抢到锁,才续期,没抢到就别续) * 续期:每隔10s将Redis中的锁key设置为30s * 解决方法:启动一个线程,该线程负责完成续期任务 * 方法1:直接new一个Thread线程,让这个线程一直做续期任务,并且让这个线程作为守护线程。最后再利用Thread的中断机制,完成对续期线程的取消 * 方法2:用线程池(这个线程池有定时或者延时功能)完成续期 */ private Map getDistroCacheAndLockFinallyVersion(Long albumId) { String cacheKey = RedisConstant.CACHE_INFO_PREFIX + albumId; // 缓存key String lockKey = RedisConstant.ALBUM_LOCK_SUFFIX + albumId; // 分布式锁key String token = ""; // 锁的标识 Boolean acquireLockFlag = false; // 获取锁标志 // 查询分布式缓存 String resultFromCache = redisTemplate.opsForValue().get(cacheKey); // 判断缓存是否存在,若存在,则直接返回给前端 if (!StringUtils.isEmpty(resultFromCache)) { return JSONObject.parseObject(resultFromCache, Map.class); } // 缓存未命中,则回源查询数据库 // 从ThreadLocal中获取令牌值(解决递归的线程进来) String s = reentrantLockTokenThreadLocal.get(); // 判断是否是递归进来的线程(即已经持有锁的线程) if (!StringUtils.isEmpty(s)) { token = s; // 如果是递归线程,使用已有的token acquireLockFlag = true; // 如果是递归线程,直接获取锁成功 } else { // 如果是第一次进来的线程,生成新的token token = UUID.randomUUID().toString().replace("-", ""); // 生成一个随机字符串,作为锁的标识 // 加分布式锁 acquireLockFlag = redisTemplate.opsForValue().setIfAbsent(lockKey, token, 30, TimeUnit.SECONDS); } if (acquireLockFlag) { // 若抢得到锁(即加锁成功) // 创建并启动续期线程(每10秒将锁的过期时间重置为30秒) ExpireThreadExecutor expireThreadExecutor = new ExpireThreadExecutor(redisTemplate, albumId); expireThreadExecutor.renewal(30l, TimeUnit.SECONDS); Map<String, Object> albumInfoFromDb; try { // 回源查询数据库 albumInfoFromDb = getAlbumInfoFromDb(albumId); // 将数据库查询的数据同步到Redis缓存 redisTemplate.opsForValue().set(cacheKey, JSONObject.toJSONString(albumInfoFromDb)); } finally { // 释放分布式锁 // 判断一下这个锁是不是自己加的,如果是则删除,反之则不删除 // 自定义 Lua 脚本。该脚本的意思是:如果 Redis 中键为KEYS[1]的值等于参数ARGV[1],则删除该键并返回 1,否则直接返回 0 String luaScript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; // 执行 Lua 脚本 // new DefaultRedisScript<Long>(luaScript, Long.class):封装了之前的 Lua 脚本,并指定返回值类型为Long(对应 Lua 脚本中的return 0或return 1) // Arrays.asList(lockKey):对应 Lua 脚本中的KEYS[1],即要操作的 Redis 键(锁的名称) // token对应 Lua 脚本中的ARGV[1],即锁的唯一标识 Long execute = redisTemplate.execute(new DefaultRedisScript<Long>(luaScript, Long.class), Arrays.asList(lockKey), token); if (execute == 0) { log.error("释放锁失败"); } else { log.info("释放锁成功"); } // 从ThreadLocal移除令牌,防止内存泄漏 reentrantLockTokenThreadLocal.remove(); // 结束续期任务 expireThreadExecutor.cancelRenewal(); } // 返回数据给前端 return albumInfoFromDb; } else { // 若未抢到锁(即加锁失败) try { Thread.sleep(200); // 休眠200毫秒,然后重新获取数据(此时上面先抢到锁的请求已经把数据放入了Redis)。这个时间要通过压测来获取 } catch (InterruptedException e) { throw new RuntimeException(e); } // 查询缓存。正常99%的情况下200ms之后的缓存一定是有数据,所以直接返回给前端即可 String firstCacheStr = redisTemplate.opsForValue().get(cacheKey); if (!StringUtils.isEmpty(firstCacheStr)) { return JSONObject.parseObject(firstCacheStr, Map.class); } // 剩余1%极端情况:抢到锁的线程在将数据库中的数据同步到缓存的时候出现了问题,导致缓存没有数据 while (true) { // 再次查询缓存 String doubleCacheStr = redisTemplate.opsForValue().get(cacheKey); if (!StringUtils.isEmpty(doubleCacheStr)) { return JSONObject.parseObject(doubleCacheStr); } // 尝试获取锁 Boolean acquireLock = redisTemplate.opsForValue().setIfAbsent(lockKey, token, 30, TimeUnit.SECONDS); if (acquireLock) { // 获取锁成功,将token存入ThreadLocal(实现可重入) reentrantLockTokenThreadLocal.set(token); break; // 退出循环 } } // 递归调用自身(此时当前线程已持有锁) return getAlbumInfo(albumId); } }
- 缓存优先:先查缓存,命中则直接返回
- 锁续期机制:获取锁成功后启动续期线程,防止处理时间过长导致锁过期
- 可重入锁:通过ThreadLocal实现,防止同一线程递归时重复获取锁
- 原子释放锁:使用Lua脚本确保只有锁的持有者才能释放锁
- 自旋重试:未获取锁的线程先短暂等待,然后进入自旋(while + 可重入)重试
- 异常处理:finally块确保锁一定会被释放,续期任务一定会被取消
据专辑id查询专辑详情的接口:
/** * 根据专辑id查询专辑详情 * @param albumId * @return */ @Override public Map<String, Object> getAlbumInfo(Long albumId) { return getDistroCacheAndLockFinallyVersion(albumId); }