19.多级缓存

发布于:2025-07-21 ⋅ 阅读:(20) ⋅ 点赞:(0)

19.1 整体认识

19.1.1 背景

  • 传统的缓存策略一般是请求到达 Tomcat 后,先查询 Redis,如果未命中则查询数据库,如图:

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

  • 问题:

    • 请求要经过 Tomcat 处理,Tomcat 的性能成为整个系统的瓶颈

    • Redis 缓存失效时,会对数据库产生冲击

19.1.2 是什么

多级缓存就是充分利用请求处理的每个环节,分别添加缓存,减轻 Tomcat 压力,提升服务性能:

  • 浏览器访问静态资源时,优先读取浏览器本地缓存

  • 访问非静态资源(ajax 查询数据)时,访问服务端

  • 请求到达 Nginx 后,优先读取 Nginx 本地缓存

  • 如果 Nginx 本地缓存未命中,则去直接查询 Redis(不经过 Tomcat)

  • 如果 Redis 查询未命中,则查询 Tomcat

  • 请求进入 Tomcat 后,优先查询 JVM 进程缓存

  • 如果 JVM 进程缓存未命中,则查询数据库

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

在多级缓存架构中,Nginx 内部需要编写本地缓存查询、Redis 查询、Tomcat 查询的业务逻辑,因此这样的 nginx 服务不再是一个反向代理服务器,而是一个编写业务的 Web 服务器了。

因此这样的业务 Nginx 服务也需要搭建集群来提高并发,再有专门的 nginx 服务来做反向代理,如图:

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

另外,我们的 Tomcat 服务将来也会部署为集群模式:

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

可见,多级缓存的关键有两个

  • 一个是在 nginx 中编写业务,实现 nginx 本地缓存、Redis、Tomcat 的查询

  • 另一个就是在 Tomcat 中实现 JVM 进程缓存

其中 Nginx 编程则会用到 OpenResty 框架结合 Lua 这样的语言。

19.2 JVM 缓存

19.2.1 Caffeine

缓存在日常开发中启动至关重要的作用,由于是存储在内存中,数据的读取速度是非常快的,能大量减少对数据库的访问,减少数据库的压力。我们把缓存分为两类:

  • 分布式缓存,例如 Redis:

    • 优点:存储容量更大、可靠性更好、可以在集群间共享

    • 缺点:访问缓存有网络开销

    • 场景:缓存数据量较大、可靠性要求较高、需要在集群间共享

  • 进程本地缓存,例如 HashMap、GuavaCache:

    • 优点:读取本地内存,没有网络开销,速度更快

    • 缺点:存储容量有限、可靠性较低、无法共享

    • 场景:性能要求较高,缓存数据量较小

我们今天会利用 Caffeine 框架来实现 JVM 进程缓存。

Caffeine 是一个基于 Java8 开发的,提供了近乎最佳命中率的高性能的本地缓存库。目前 Spring 内部的缓存使用的就是 Caffeine。GitHub 地址:<https://github.com/ben-manes/caffeine>

Caffeine 的性能非常好,下图是官方给出的性能对比:

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

  • 缓存使用的基本 API:

    @Test
    void testBasicOps() {
        // 构建cache对象
        Cache<String, String> cache = Caffeine.newBuilder().build();
    
        // 存数据
        cache.put("gf", "迪丽热巴");
    
        // 取数据
        String gf = cache.getIfPresent("gf");
        System.out.println("gf = " + gf);
    
        // 取数据,包含两个参数:
        // 参数一:缓存的key
        // 参数二:Lambda表达式,表达式参数就是缓存的key,方法体是查询数据库的逻辑
        // 优先根据key查询JVM缓存,如果未命中,则执行参数二的Lambda表达式
        String defaultGF = cache.get("defaultGF", key -> {
            // 根据key去数据库查询数据
            return "柳岩";
        });
        System.out.println("defaultGF = " + defaultGF);
    }
    

    Caffeine 既然是缓存的一种,肯定需要有缓存的清除策略,不然的话内存总会有耗尽的时候。

  • Caffeine 提供了三种缓存驱逐策略:

    • 基于容量:设置缓存的数量上限
      // 创建缓存对象
      Cache<String, String> cache = Caffeine.newBuilder()
          .maximumSize(1) // 设置缓存大小上限为 1
          .build();
    
    • 基于时间:设置缓存的有效时间
      // 创建缓存对象
      Cache<String, String> cache = Caffeine.newBuilder()
          // 设置缓存有效期为 10 秒,从最后一次写入开始计时
          .expireAfterWrite(Duration.ofSeconds(10))
          .build();
    
    • 基于引用:设置缓存为软引用或弱引用,利用 GC 来回收缓存数据。性能较差,不建议使用。

    注意:在默认情况下,当一个缓存元素过期的时候,Caffeine 不会自动立即将其清理和驱逐。而是在一次读或写操作后,或者在空闲时间完成对失效数据的驱逐。

19.2.2 实现 JVM 进程缓存

  • 利用 Caffeine 实现下列需求:

    • 给根据 id 查询商品的业务添加缓存,缓存未命中时查询数据库

    • 给根据 id 查询商品库存的业务添加缓存,缓存未命中时查询数据库

    • 缓存初始大小为 100

    • 缓存上限为 10000

  • 首先,我们需要定义两个 Caffeine 的缓存对象,分别保存商品、库存的缓存数据。

  • 在 item-service 的 com.heima.item.config 包下定义 CaffeineConfig 类:

@Configuration
public class CaffeineConfig {

    @Bean
    public Cache<Long, Item> itemCache(){
        return Caffeine.newBuilder()
                .initialCapacity(100)
                .maximumSize(10_000)
                .build();
    }

    @Bean
    public Cache<Long, ItemStock> stockCache(){
        return Caffeine.newBuilder()
                .initialCapacity(100)
                .maximumSize(10_000)
                .build();
    }
}
  • 然后,修改 item-service 中的 com.heima.item.web 包下的 ItemController 类,添加缓存逻辑:
@RestController
@RequestMapping("item")
public class ItemController {

    @Autowired
    private IItemService itemService;
    @Autowired
    private IItemStockService stockService;

    @Autowired
    private Cache<Long, Item> itemCache;
    @Autowired
    private Cache<Long, ItemStock> stockCache;

    // ...其它略

    @GetMapping("/{id}")
    public Item findById(@PathVariable("id") Long id) {
        // 先查询缓存,没有则查询数据库,然后缓存
        return itemCache.get(id, key -> itemService.query()
                .ne("status", 3).eq("id", key)
                .one()
        );
    }

    @GetMapping("/stock/{id}")
    public ItemStock findStockById(@PathVariable("id") Long id) {
        return stockCache.get(id, key -> stockService.getById(key));
    }
}

19.3 Lua

Lua 是一种轻量小巧的脚本语言,用标准 C 语言编写并以源代码形式开放,其设计目的是为了嵌入应用程序中,从而为应用程序提供灵活的扩展和定制功能。官网:<https://www.lua.org/>

19.3.1 数据类型

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

19.3.2 声明变量

  • Lua 声明变量的时候无需指定数据类型,而是用 local 来声明变量为局部变量:

    -- 声明字符串,可以用单引号或双引号,
    local str = 'hello'
    -- 字符串拼接可以使用 ..
    local str2 = 'hello' .. 'world'
    -- 声明数字
    local num = 21
    -- 声明布尔类型
    local flag = true
    
  • Lua 中的 table 类型既可以作为数组,又可以作为 Java 中的 map 来使用。数组就是特殊的 table,key 是数组角标而已:

    -- 声明数组 ,key为角标的 table
    local arr = {'java', 'python', 'lua'}
    -- 声明table,类似java的map
    local map =  {name='Jack', age=21}
    
  • Lua 中的数组角标是从 1 开始,访问的时候与 Java 中类似:

    -- 访问数组,lua数组的角标从1开始
    print(arr[1])
    
  • Lua 中的 table 可以用 key 来访问:

    -- 访问table
    print(map['name'])
    print(map.name)
    
  • 遍历普通 table

    -- 声明map,也就是table
    local map = {name='Jack', age=21}
    -- 遍历table
    for key,value in pairs(map) do
       print(key, value)
    end
    

19.3.3 循环

  • 对于 table,我们可以利用 for 循环来遍历。不过数组和普通 table 遍历略有差异。

    -- 声明数组 key为索引的 table
    local arr = {'java', 'python', 'lua'}
    -- 遍历数组
    for index,value in ipairs(arr) do
        print(index, value)
    end
    
  • 遍历普通 table

    -- 声明map,也就是table
    local map = {name='Jack', age=21}
    -- 遍历table
    for key,value in pairs(map) do
       print(key, value)
    end
    

19.4 OpenResty

OpenResty® 是一个基于 Nginx 的高性能 Web 平台,用于方便地搭建能够处理超高并发、扩展性极高的动态 Web 应用、Web 服务和动态网关。具备下列特点:

  • 具备 Nginx 的完整功能

  • 基于 Lua 语言进行扩展,集成了大量精良的 Lua 库、第三方模块

  • 允许使用 Lua 自定义业务逻辑、自定义库

官方网站:<https://openresty.org/cn/>

19.4.1 快速入门

我们希望达到的多级缓存架构如图:

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

其中:

  • windows 上的 nginx 用来做反向代理服务,将前端的查询商品的 ajax 请求代理到 OpenResty 集群

  • OpenResty 集群用来编写多级缓存业务

19.4.2 反向代理

现在,商品详情页使用的是假的商品数据。不过在浏览器中,可以看到页面有发起 ajax 请求查询真实商品数据。

  • 这个请求如下:

    外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

  • 请求地址是 localhost,端口是 80,就被 windows 上安装的 Nginx 服务给接收到了。然后代理给了 OpenResty 集群:

    外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

  • 我们需要在 OpenResty 中编写业务,查询商品数据并返回到浏览器。

  • 但是这次,我们先在 OpenResty 接收请求,返回假的商品数据。

19.4.3 监听请求

  • OpenResty 的很多功能都依赖于其目录下的 Lua 库,需要在 nginx.conf 中指定依赖库的目录,并导入依赖:
  1. 添加对 OpenResty 的 Lua 模块的加载,修改 `/usr/local/openresty/nginx/conf/nginx.conf` 文件,在其中的 http 下面,添加下面代码:

    #lua 模块
    lua_package_path "/usr/local/openresty/lualib/?.lua;;";
    #c模块
    lua_package_cpath "/usr/local/openresty/lualib/?.so;;";
    
  2. 监听/api/item 路径,修改 `/usr/local/openresty/nginx/conf/nginx.conf` 文件,在 nginx.conf 的 server 下面,添加对/api/item 这个路径的监听:

    location  /api/item {
        # 默认的响应类型
        default_type application/json;
        # 响应结果由lua/item.lua文件来决定
        content_by_lua_file lua/item.lua;
    }
    
  • 这个监听,就类似于 SpringMVC 中的 @GetMapping("/api/item") 做路径映射。

  • content_by_lua_file lua/item.lua 则相当于调用 item.lua 这个文件,执行其中的业务,把结果返回给用户。相当于 java 中调用 service。

19.4.4 请求参数处理

  • 要返回真实数据,必须根据前端传递来的商品 id,查询商品信息才可以。那么如何获取前端传递的商品参数呢?

  • 获取参数的 API

    外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

  • 获取参数并返回

    • 获取商品 id,修改 `/usr/loca/openresty/nginx/nginx.conf` 文件中监听/api/item 的代码,利用正则表达式获取 ID:

      location ~ /api/item/(\d+) {
          # 默认的响应类型
          default_type application/json;
          # 响应结果由lua/item.lua文件来决定
          content_by_lua_file lua/item.lua;
      }
      
    • 拼接 ID 并返回,修改 `/usr/loca/openresty/nginx/lua/item.lua` 文件,获取 id 并拼接到结果中返回:

      -- 获取商品id
      local id = ngx.var[1]
      -- 拼接并返回
      ngx.say('{"id":' .. id .. ',"name":"SALSA AIR","title":"RIMOWA 21寸托运箱拉杆箱 SALSA AIR系列果绿色 820.70.36.4","price":17900,"image":"https://m.360buyimg.com/mobilecms/s720x720_jfs/t6934/364/1195375010/84676/e9f2c55f/597ece38N0ddcbc77.jpg!q70.jpg.webp","category":"拉杆箱","brand":"RIMOWA","spec":"","status":1,"createTime":"2019-04-30T16:00:00.000+00:00","updateTime":"2019-04-30T16:00:00.000+00:00","stock":2999,"sold":31290}')
      
    • 重新加载并测试

      nginx -s reload
      
    • 刷新页面可以看到结果中已经带上了 ID:

    外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

19.4.5 查询 Tomcat

  • 拿到商品 ID 后,本应去缓存中查询商品信息,不过目前我们还未建立 nginx、redis 缓存。因此,这里我们先根据商品 id 去 tomcat 查询商品信息。我们实现如图部分:

    外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

  • 需要注意的是,我们的 OpenResty 是在虚拟机,Tomcat 是在 Windows 电脑上。两者 IP 一定不要搞错了。

    外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

  • 发送 http 请求的 API

    local resp = ngx.location.capture("/path",{
        method = ngx.HTTP_GET,   -- 请求方式
        args = {a=1,b=2},  -- get方式传参数
    })
    
  • 返回的响应内容包括:

    • resp.status:响应状态码

    • resp.header:响应头,是一个 table

    • resp.body:响应体,就是响应数据

  • 注意:这里的 path 是路径,并不包含 IP 和端口。这个请求会被 nginx 内部的 server 监听并处理。

  • 但是我们希望这个请求发送到 Tomcat 服务器,所以还需要编写一个 server 来对这个路径做反向代理:

     location /path {
         # 这里是windows电脑的ip和Java服务端口,需要确保windows防火墙处于关闭状态
         proxy_pass http://192.168.150.1:8081;
     }
    

    外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

  • OpenResty 提供了一个 cjson 的模块用来处理 JSON 的序列化和反序列化。

    • 官方地址:<https://github.com/openresty/lua-cjson/>

19.4.6 基于 ID 的负载均衡

  • 刚才的代码中,我们的 tomcat 是单机部署。而实际开发中,tomcat 一定是集群模式:

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

  • 因此,OpenResty 需要对 tomcat 集群做负载均衡。

  • 而默认的负载均衡规则是轮询模式,当我们查询/item/10001 时:

  • 第一次会访问 8081 端口的 tomcat 服务,在该服务内部就形成了 JVM 进程缓存

  • 第二次会访问 8082 端口的 tomcat 服务,该服务内部没有 JVM 缓存(因为 JVM 缓存无法共享),会查询数据库

  • 因为轮询的原因,第一次查询 8081 形成的 JVM 缓存并未生效,直到下一次再次访问到 8081 时才可以生效,缓存命中率太低了。

怎么办?

如果能让同一个商品,每次查询时都访问同一个 tomcat 服务,那么 JVM 缓存就一定能生效了。也就是说,我们需要根据商品 id 做负载均衡,而不是轮询。

  1. 原理

    • nginx 提供了基于请求路径做负载均衡的算法:nginx 根据请求路径做 hash 运算,把得到的数值对 tomcat 服务的数量取余,余数是几,就访问第几个服务,实现负载均衡。

    • 例如:

      • 我们的请求路径是 /item/10001

      • tomcat 总数为 2 台(8081、8082)

      • 对请求路径/item/1001 做 hash 运算求余的结果为 1

      • 则访问第一个 tomcat 服务,也就是 8081

    • 只要 id 不变,每次 hash 运算结果也不会变,那就可以保证同一个商品,一直访问同一个 tomcat 服务,确保 JVM 缓存生效。

  2. 实现

    1. 修改 /usr/local/openresty/nginx/conf/nginx.conf 文件,实现基于 ID 做负载均衡。

    2. 首先,定义 tomcat 集群,并设置基于路径做负载均衡:

      upstream tomcat-cluster {
          hash $request_uri;
          server 192.168.150.1:8081;
          server 192.168.150.1:8082;
      }
      
    3. 然后,修改对 tomcat 服务的反向代理,目标指向 tomcat 集群:

      location /item {
          proxy_pass http://tomcat-cluster;
      }
      
    4. 重新加载 OpenResty

      nginx -s reload
      

19.5 Redis 缓存预热

Redis 缓存会面临冷启动问题:

冷启动:服务刚刚启动时,Redis 中并没有缓存,如果所有商品数据都在第一次查询时添加缓存,可能会给数据库带来较大压力。

缓存预热:在实际开发中,我们可以利用大数据统计用户访问的热点数据,在项目启动时将这些热点数据提前查询并保存到 Redis 中。

我们数据量较少,并且没有数据统计相关功能,目前可以在启动时将所有数据都放入缓存中。

  • 编写初始化类

    • 缓存预热需要在项目启动时完成,并且必须是拿到 RedisTemplate 之后。

    • 这里我们利用 InitializingBean 接口来实现,因为 InitializingBean 可以在对象被 Spring 创建并且成员变量全部注入后执行。

      @Component
      public class RedisHandler implements InitializingBean {
      
          @Autowired
          private StringRedisTemplate redisTemplate;
      
          @Autowired
          private IItemService itemService;
          @Autowired
          private IItemStockService stockService;
      
          private static final ObjectMapper MAPPER = new ObjectMapper();
      
          @Override
          public void afterPropertiesSet() throws Exception {
              // 初始化缓存
              // 1.查询商品信息
              List<Item> itemList = itemService.list();
              // 2.放入缓存
              for (Item item : itemList) {
                  // 2.1.item序列化为JSON
                  String json = MAPPER.writeValueAsString(item);
                  // 2.2.存入redis
                  redisTemplate.opsForValue().set("item:id:" + item.getId(), json);
              }
      
              // 3.查询商品库存信息
              List<ItemStock> stockList = stockService.list();
              // 4.放入缓存
              for (ItemStock stock : stockList) {
                  // 2.1.item序列化为JSON
                  String json = MAPPER.writeValueAsString(stock);
                  // 2.2.存入redis
                  redisTemplate.opsForValue().set("item:stock:id:" + stock.getId(), json);
              }
          }
      }
      
  • 封装 redis 工具

    • OpenResty 提供了操作 Redis 的模块,我们只要引入该模块就能直接使用。但是为了方便,我们将 Redis 操作封装到之前的 common.lua 工具库中。

    • 修改 /usr/local/openresty/lualib/common.lua 文件:

    • 引入 Redis 模块,并初始化 Redis 对象

      -- 导入redis
      local redis = require('resty.redis')
      -- 初始化redis
      local red = redis:new()
      red:set_timeouts(1000, 1000, 1000)
      
    • 封装函数,用来释放 Redis 连接,其实是放入连接池

      -- 关闭redis连接的工具方法,其实是放入连接池
      local function close_redis(red)
          local pool_max_idle_time = 10000 -- 连接的空闲时间,单位是毫秒
          local pool_size = 100 --连接池大小
          local ok, err = red:set_keepalive(pool_max_idle_time, pool_size)
          if not ok then
              ngx.log(ngx.ERR, "放入redis连接池失败: ", err)
          end
      end
      
    • 封装函数,根据 key 查询 Redis 数据

      -- 查询redis的方法 ip和port是redis地址,key是查询的key
      local function read_redis(ip, port, key)
          -- 获取一个连接
          local ok, err = red:connect(ip, port)
          if not ok then
              ngx.log(ngx.ERR, "连接redis失败 : ", err)
              return nil
          end
          -- 查询redis
          local resp, err = red:get(key)
          -- 查询失败处理
          if not resp then
              ngx.log(ngx.ERR, "查询Redis失败: ", err, ", key = " , key)
          end
          --得到的数据为空处理
          if resp == ngx.null then
              resp = nil
              ngx.log(ngx.ERR, "查询Redis数据为空, key = ", key)
          end
          close_redis(red)
          return resp
      end
      
    • 导出

      -- 将方法导出
      local _M = {
          read_http = read_http,
          read_redis = read_redis
      }
      return _M
      
    • 完整的 common.lua:

      -- 导入redis
      local redis = require('resty.redis')
      -- 初始化redis
      local red = redis:new()
      red:set_timeouts(1000, 1000, 1000)
      
      -- 关闭redis连接的工具方法,其实是放入连接池
      local function close_redis(red)
          local pool_max_idle_time = 10000 -- 连接的空闲时间,单位是毫秒
          local pool_size = 100 --连接池大小
          local ok, err = red:set_keepalive(pool_max_idle_time, pool_size)
          if not ok then
              ngx.log(ngx.ERR, "放入redis连接池失败: ", err)
          end
      end
      
      -- 查询redis的方法 ip和port是redis地址,key是查询的key
      local function read_redis(ip, port, key)
          -- 获取一个连接
          local ok, err = red:connect(ip, port)
          if not ok then
              ngx.log(ngx.ERR, "连接redis失败 : ", err)
              return nil
          end
          -- 查询redis
          local resp, err = red:get(key)
          -- 查询失败处理
          if not resp then
              ngx.log(ngx.ERR, "查询Redis失败: ", err, ", key = " , key)
          end
          --得到的数据为空处理
          if resp == ngx.null then
              resp = nil
              ngx.log(ngx.ERR, "查询Redis数据为空, key = ", key)
          end
          close_redis(red)
          return resp
      end
      
      -- 封装函数,发送http请求,并解析响应
      local function read_http(path, params)
          local resp = ngx.location.capture(path,{
              method = ngx.HTTP_GET,
              args = params,
          })
          if not resp then
              -- 记录错误信息,返回404
              ngx.log(ngx.ERR, "http查询失败, path: ", path , ", args: ", args)
              ngx.exit(404)
          end
          return resp.body
      end
      -- 将方法导出
      local _M = {
          read_http = read_http,
          read_redis = read_redis
      }
      return _M
      
  • 实现 Redis 查询

    • 查询逻辑是:

      • 根据 id 查询 Redis

      • 如果查询失败则继续查询 Tomcat

      • 将查询结果返回

    -- 导入common函数库
    local common = require('common')
    local read_http = common.read_http
    local read_redis = common.read_redis
    -- 导入cjson库
    local cjson = require('cjson')
    
    -- 封装查询函数
    function read_data(key, path, params)
        -- 查询本地缓存
        local val = read_redis("127.0.0.1", 6379, key)
        -- 判断查询结果
        if not val then
            ngx.log(ngx.ERR, "redis查询失败,尝试查询http, key: ", key)
            -- redis查询失败,去查询http
            val = read_http(path, params)
        end
        -- 返回数据
        return val
    end
    
    -- 获取路径参数
    local id = ngx.var[1]
    
    -- 查询商品信息
    local itemJSON = read_data("item:id:" .. id,  "/item/" .. id, nil)
    -- 查询库存信息
    local stockJSON = read_data("item:stock:id:" .. id, "/item/stock/" .. id, nil)
    
    -- JSON转化为lua的table
    local item = cjson.decode(itemJSON)
    local stock = cjson.decode(stockJSON)
    -- 组合数据
    item.stock = stock.stock
    item.sold = stock.sold
    
    -- 把item序列化为json 返回结果
    ngx.say(cjson.encode(item))
    

19.6 Nginx 本地缓存

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

19.6.1 本地缓存 API

  • OpenResty 为 Nginx 提供了 shard dict 的功能,可以在 nginx 的多个 worker 之间共享数据,实现缓存功能。
  1. 开启共享字典,在 nginx.conf 的 http 下添加配置:

     # 共享字典,也就是本地缓存,名称叫做:item_cache,大小150m
     lua_shared_dict item_cache 150m;
    
  2. 操作共享字典:

    -- 获取本地缓存对象
    local item_cache = ngx.shared.item_cache
    -- 存储, 指定key、value、过期时间,单位s,默认为0代表永不过期
    item_cache:set('key', 'value', 1000)
    -- 读取
    local val = item_cache:get('key')
    

19.6.2 实现本地缓存查询

  1. 修改 /usr/local/openresty/lua/item.lua 文件,修改 read_data 查询函数,添加本地缓存逻辑:

    -- 导入共享词典,本地缓存
    local item_cache = ngx.shared.item_cache
    
    -- 封装查询函数
    function read_data(key, expire, path, params)
        -- 查询本地缓存
        local val = item_cache:get(key)
        if not val then
            ngx.log(ngx.ERR, "本地缓存查询失败,尝试查询Redis, key: ", key)
            -- 查询redis
            val = read_redis("127.0.0.1", 6379, key)
            -- 判断查询结果
            if not val then
                ngx.log(ngx.ERR, "redis查询失败,尝试查询http, key: ", key)
                -- redis查询失败,去查询http
                val = read_http(path, params)
            end
        end
        -- 查询成功,把数据写入本地缓存
        item_cache:set(key, val, expire)
        -- 返回数据
        return val
    end
    
  2. 修改 item.lua 中查询商品和库存的业务,实现最新的 read_data 函数:

    外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

    • 其实就是多了缓存时间参数,过期后 nginx 缓存会自动删除,下次访问即可更新缓存。

    • 这里给商品基本信息设置超时时间为 30 分钟,库存为 1 分钟。

    • 因为库存更新频率较高,如果缓存时间过长,可能与数据库差异较大。

  3. 完整的 item.lua 文件:

    -- 导入common函数库
    local common = require('common')
    local read_http = common.read_http
    local read_redis = common.read_redis
    -- 导入cjson库
    local cjson = require('cjson')
    -- 导入共享词典,本地缓存
    local item_cache = ngx.shared.item_cache
    
    -- 封装查询函数
    function read_data(key, expire, path, params)
        -- 查询本地缓存
        local val = item_cache:get(key)
        if not val then
            ngx.log(ngx.ERR, "本地缓存查询失败,尝试查询Redis, key: ", key)
            -- 查询redis
            val = read_redis("127.0.0.1", 6379, key)
            -- 判断查询结果
            if not val then
                ngx.log(ngx.ERR, "redis查询失败,尝试查询http, key: ", key)
                -- redis查询失败,去查询http
                val = read_http(path, params)
            end
        end
        -- 查询成功,把数据写入本地缓存
        item_cache:set(key, val, expire)
        -- 返回数据
        return val
    end
    
    -- 获取路径参数
    local id = ngx.var[1]
    
    -- 查询商品信息
    local itemJSON = read_data("item:id:" .. id, 1800,  "/item/" .. id, nil)
    -- 查询库存信息
    local stockJSON = read_data("item:stock:id:" .. id, 60, "/item/stock/" .. id, nil)
    
    -- JSON转化为lua的table
    local item = cjson.decode(itemJSON)
    local stock = cjson.decode(stockJSON)
    -- 组合数据
    item.stock = stock.stock
    item.sold = stock.sold
    
    -- 把item序列化为json 返回结果
    ngx.say(cjson.encode(item))
    

19.7 缓存同步

大多数情况下,浏览器查询到的都是缓存数据,如果缓存数据与数据库数据存在较大差异,可能会产生比较严重的后果。

所以我们必须保证数据库数据、缓存数据的一致性,这就是缓存与数据库的同步。

19.7.1 数据同步策略

缓存数据同步的常见方式有三种:

设置有效期:给缓存设置有效期,到期后自动删除。再次查询时更新

  • 优势:简单、方便

  • 缺点:时效性差,缓存过期之前可能不一致

  • 场景:更新频率较低,时效性要求低的业务

同步双写:在修改数据库的同时,直接修改缓存

  • 优势:时效性强,缓存与数据库强一致

  • 缺点:有代码侵入,耦合度高;

  • 场景:对一致性、时效性要求较高的缓存数据

异步通知:修改数据库时发送事件通知,相关服务监听到通知后修改缓存数据

  • 优势:低耦合,可以同时通知多个缓存服务

  • 缺点:时效性一般,可能存在中间不一致状态

  • 场景:时效性要求一般,有多个服务需要同步

而异步实现又可以基于 MQ 或者 Canal 来实现:

  1. 基于 MQ 的异步通知:

    外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

    解读:

    • 商品服务完成对数据的修改后,只需要发送一条消息到 MQ 中。

    • 缓存服务监听 MQ 消息,然后完成对缓存的更新

    • 依然有少量的代码侵入。

  2. 基于 Canal 的通知

    外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

    解读:

    • 商品服务完成商品修改后,业务直接结束,没有任何代码侵入

    • Canal 监听 MySQL 变化,当发现变化后,立即通知缓存服务

    • 缓存服务接收到 canal 通知,更新缓存

    • 代码零侵入

19.7.2 Canal

Canal [kə’næl],译意为水道/管道/沟渠,canal 是阿里巴巴旗下的一款开源项目,基于 Java 开发。基于数据库增量日志解析,提供增量数据订阅&消费。GitHub 的地址:https://github.com/alibaba/canal

Canal 是基于 mysql 的主从同步来实现的,MySQL 主从同步的原理如下:

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

  • 1)MySQL master 将数据变更写入二进制日志 (binary log),其中记录的数据叫做 binary log events

  • 2)MySQL slave 将 master 的 binary log events 拷贝到它的中继日志 (relay log)

  • 3)MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

而 Canal 就是把自己伪装成 MySQL 的一个 slave 节点,从而监听 master 的 binary log 变化。再把得到的变化信息通知给 Canal 的客户端,进而完成对其它数据库的同步。

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

我们可以利用 Canal 提供的 Java 客户端,监听 Canal 通知消息。当收到变化的消息时,完成对缓存的更新。

不过这里我们会使用 GitHub 上的第三方开源的 canal-starter 客户端。地址:https://github.com/NormanGyllenhaal/canal-client

与 SpringBoot 完美整合,自动装配,比官方客户端要简单好用很多。

<dependency>
    <groupId>top.javatool</groupId>
    <artifactId>canal-spring-boot-starter</artifactId>
    <version>1.2.1-RELEASE</version>
</dependency>
canal:
  destination: heima # canal的集群名字,要与安装canal时设置的名称一致
  server: 192.168.150.101:11111 # canal服务地址
@Data
@TableName("tb_item")
public class Item {
    @TableId(type = IdType.AUTO)
    @Id
    private Long id;//商品id
    @Column(name = "name")
    private String name;//商品名称
    private String title;//商品标题
    private Long price;//价格(分)
    private String image;//商品图片
    private String category;//分类名称
    private String brand;//品牌名称
    private String spec;//规格
    private Integer status;//商品状态 1-正常,2-下架
    private Date createTime;//创建时间
    private Date updateTime;//更新时间
    @TableField(exist = false)
    @Transient
    private Integer stock;
    @TableField(exist = false)
    @Transient
    private Integer sold;
}

通过@Id、@Column、等注解完成 Item 与数据库表字段的映射:

通过实现 EntryHandler<T> 接口编写监听器,监听 Canal 消息。注意两点:

  • 实现类通过 @CanalTable("tb_item") 指定监听的表信息

  • EntryHandler 的泛型是与表对应的实体类

@CanalTable("tb_item")
@Component
public class ItemHandler implements EntryHandler<Item> {

    @Autowired
    private RedisHandler redisHandler;
    @Autowired
    private Cache<Long, Item> itemCache;

    @Override
    public void insert(Item item) {
        // 写数据到JVM进程缓存
        itemCache.put(item.getId(), item);
        // 写数据到redis
        redisHandler.saveItem(item);
    }

    @Override
    public void update(Item before, Item after) {
        // 写数据到JVM进程缓存
        itemCache.put(after.getId(), after);
        // 写数据到redis
        redisHandler.saveItem(after);
    }

    @Override
    public void delete(Item item) {
        // 删除数据到JVM进程缓存
        itemCache.invalidate(item.getId());
        // 删除数据到redis
        redisHandler.deleteItemById(item.getId());
    }
}

在这里对 Redis 的操作都封装到了 RedisHandler 这个对象中,是我们之前做缓存预热时编写的一个类,内容如下:

@Component
public class RedisHandler implements InitializingBean {

    @Autowired
    private StringRedisTemplate redisTemplate;

    @Autowired
    private IItemService itemService;
    @Autowired
    private IItemStockService stockService;

    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public void afterPropertiesSet() throws Exception {
        // 初始化缓存
        // 1.查询商品信息
        List<Item> itemList = itemService.list();
        // 2.放入缓存
        for (Item item : itemList) {
            // 2.1.item序列化为JSON
            String json = MAPPER.writeValueAsString(item);
            // 2.2.存入redis
            redisTemplate.opsForValue().set("item:id:" + item.getId(), json);
        }

        // 3.查询商品库存信息
        List<ItemStock> stockList = stockService.list();
        // 4.放入缓存
        for (ItemStock stock : stockList) {
            // 2.1.item序列化为JSON
            String json = MAPPER.writeValueAsString(stock);
            // 2.2.存入redis
            redisTemplate.opsForValue().set("item:stock:id:" + stock.getId(), json);
        }
    }

    public void saveItem(Item item) {
        try {
            String json = MAPPER.writeValueAsString(item);
            redisTemplate.opsForValue().set("item:id:" + item.getId(), json);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }

    public void deleteItemById(Long id) {
        redisTemplate.delete("item:id:" + id);
    }
}

网站公告

今日签到

点亮在社区的每一天
去签到