Redis篇--常见问题篇1--缓存穿透(缓存空值,布隆过滤器,接口限流)

发布于:2024-12-22 ⋅ 阅读:(70) ⋅ 点赞:(0)

1、概述

缓存穿透是指客户端请求的数据既不在Redis缓存中,也不在数据库中。换句话说,缓存和数据库中都不存在该数据,但客户端仍然发起了查询请求。这种情况下,缓存无法命中,请求会直接穿透到数据库,而数据库中也找不到相应的数据,最终返回null或者空结果。如果这种请求量非常大的话,不仅会浪费网络带宽、CPU和I/O等资源,还会额外增大数据库的压力,甚至会导致数据库崩溃。

2、具体场景

1、恶意攻击或爬虫
某些恶意用户或爬虫可能会故意构造大量不存在的键(如随机生成的 ID),频繁请求这些不存在的数据,导致缓存和数据库的负载增加。
2、业务逻辑问题
某些业务场景下,可能存在无效的请求,导致客户端请求了系统中不存在的数据。
3、缓存未命中
正常情况下,如果数据存在但没有被缓存,应该从数据库中加载并更新缓存。但如果数据本身在数据库中也不存在,则会导致缓存穿透。

3、解决方案

(1)、缓存空对象

对于查询不存在的数据,可以在缓存中存储一个空值(如null或特殊的标识符),并设置一个较短的过期时间。这样,后续相同的请求可以直接从缓存中获取空结果,而不会再次访问数据库。

(2)、布隆过滤器 (Bloom Filter)

使用布隆过滤器来快速判断某个键是否可能存在。布隆过滤器是一种空间效率很高的概率型数据结构,它可以告诉你某个元素是否存在于集合中。
虽然它可能会有少量的误判(即认为某个元素存在但实际上不存在),但它可以大大减少无效的数据库查询。

(3)、接口限流 (Rate Limiting)

通过限制每个客户端的请求频率,防止恶意用户或爬虫频繁请求不存在的数据。可以使用令牌桶算法、漏桶算法等限流策略来控制请求速率。但可能会对正常用户的请求产生影响,因此需要合理配置限流规则。

(4)、合理的参数校验 (Parameter Validation)

在业务逻辑层对请求参数进行严格的校验。如:规定key的格式(固定格式:年月日:key等),只有当请求的key满足格式时,才能进入缓存和数据库查询流程。

(5)、黑白名单机制 (Blacklist/Whitelist)

对于已知的恶意请求或无效请求,可以将其加入黑名单,禁止其访问系统。对于合法的请求,可以加入白名单,优先处理。

4、解决示例

(1)、缓存空对象方案

第一步:设置RedisTemplate序列化器

import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

@Configuration
public class RedisConfig {

    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(redisConnectionFactory);

        // 设置键的序列化器为 StringRedisSerializer
        template.setKeySerializer(new StringRedisSerializer());
        template.setHashKeySerializer(new StringRedisSerializer());

        // 设置值的序列化器为 Jackson2JsonRedisSerializer
        Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
        ObjectMapper objectMapper = new ObjectMapper();
        jackson2JsonRedisSerializer.setObjectMapper(objectMapper);

        template.setValueSerializer(jackson2JsonRedisSerializer);
        template.setHashValueSerializer(jackson2JsonRedisSerializer);

        template.afterPropertiesSet();
        return template;
    }
}

第二步:编写测试类

import com.zw.base.BaseController;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.TimeUnit;

@RestController
@RequestMapping(value = "redis", method = {RequestMethod.POST, RequestMethod.GET})
public class RedisController extends BaseController {

    @Autowired
    private RedisTemplate redisTemplate;

    // 模拟数据库查询
    private Object queryFromDatabase(String key) {
        // 模拟数据库查询逻辑
        System.out.println("Querying from database for key: " + key);
        return null; // 假设数据库中没有该数据
    }

    @RequestMapping("/get")
    public Object getTest() {
        String key = "key001";
        // 1. 尝试从缓存中获取数据
        Object cachedValue = redisTemplate.opsForValue().get(key);
        if (cachedValue != null) {
            return cachedValue;
        }
        // 2. 如果缓存中没有数据,查询数据库
        Object dbValue = queryFromDatabase(key);

        // 3. 如果数据库中也没有数据,将空结果缓存一段时间
        if (dbValue == null) {
            redisTemplate.opsForValue().set(key, "null", 60, TimeUnit.SECONDS); // 缓存空结果 60 秒
            return "null";
        }

        // 4. 如果数据库中有数据,缓存并返回
        redisTemplate.opsForValue().set(key, dbValue, 600, TimeUnit.SECONDS); // 缓存 10 分钟
        return dbValue;
    }
}

第三步:验证结果
在这里插入图片描述
通过断点调试可以清楚看到,设置null缓存后,会直接走Redis的缓存null,不会在查询数据库了。

(2)、布隆过滤器

本例以Redission实现的布隆过滤器为例。当然通过其他组件实现的布隆过滤器或自定义实现的布隆过滤器都可以。

第一步:导入依赖

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

<dependency>
   <groupId>org.redisson</groupId>
   <artifactId>redisson</artifactId>
   <version>3.20.0</version>
</dependency>

第二步:配置类,注入RedissionClient

import com.fasterxml.jackson.databind.ObjectMapper;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

@Configuration
public class RedisConfig {

    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(redisConnectionFactory);
        // 设置键的序列化器为 StringRedisSerializer
        template.setKeySerializer(new StringRedisSerializer());
        template.setHashKeySerializer(new StringRedisSerializer());
        // 设置值的序列化器为 Jackson2JsonRedisSerializer
        Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
        ObjectMapper objectMapper = new ObjectMapper();
        jackson2JsonRedisSerializer.setObjectMapper(objectMapper);

        template.setValueSerializer(jackson2JsonRedisSerializer);
        template.setHashValueSerializer(jackson2JsonRedisSerializer);

        template.afterPropertiesSet();
        return template;
    }

    @Bean
    public RedissonClient redissonClient() {
        Config config = new Config();
        config.useSingleServer()
                .setAddress("redis://localhost:6379");     // 这里测试,直接写死,实际使用需要改成配置文件注入
        return Redisson.create(config);
    }
}

第三步:实现布隆过滤器工具类
工具类中至少包含初始化布隆过滤器,插入key和校验key的三个方法。

import org.redisson.api.RBloomFilter;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @Description: redission布隆过滤器工具类
 **/
@Component
public class RedisBloomFilterUtil {
    @Autowired
    RedissonClient redissonClient;
    /**
     * 布隆过滤器初始化
     * @param bloomFilterName 过滤器名称
     */
    public void bloomFilterInit(String bloomFilterName) {
        RBloomFilter<String> bloomFilter = redissonClient.getBloomFilter(bloomFilterName);
        bloomFilter.tryInit(100_967_256L, 0.000001);     // 初始化大小和误判率
    }

    /**
     * 布隆过滤器添加数据
     * @param bloomFilterName 过滤器名称
     * @param value           要添加的key
     */
    public void bloomFilterAdd(String bloomFilterName, String value) {
        RBloomFilter<Object> bloomFilter = redissonClient.getBloomFilter(bloomFilterName);
        bloomFilter.add(value);
    }

    /**
     * 布隆过滤器数据统计
     * @param bloomFilterName 过滤器名称
     * @param value     要校验的key
     */
    public boolean bloomFilterContains(String bloomFilterName, String value) {
        RBloomFilter<String> bloomFilter = redissonClient.getBloomFilter(bloomFilterName);
        return bloomFilter.contains(value);
    }
}

第四步:测试类

import com.zw.base.BaseController;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.TimeUnit;

@RestController
@RequestMapping(value = "redis2", method = {RequestMethod.POST, RequestMethod.GET})
public class RedisController2 extends BaseController {

    final String bloomFilterName = "zw001";
    
    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    @Autowired
    private RedisBloomFilterUtil bloomFilterService;

    // 模拟数据库查询
    private Object queryFromDatabase(String key) {
        // 模拟数据库查询逻辑
        System.out.println("Querying from database for key: " + key);
        return null; // 假设数据库中没有该数据
    }

// 初始化一个布隆过滤器
    @RequestMapping("/init")
    public void initTest() {
        // 初始化创建布隆过滤器,指定布隆过滤器的名称
        bloomFilterService.bloomFilterInit(bloomFilterName);   // 初始化布隆过滤器
    }

// 插入元素
    @RequestMapping("/set")
    public void setTest(@RequestBody DData dData) {
        // 1、设置保存数据
        // 实际业务处理:应该先保存到数据库,数据库保存成功后,将key添加到布隆过滤器中,最后在删除缓存的key
        // 本例模拟布隆过滤器功能,未使用数据库,所以就直接设置缓存了
        bloomFilterService.bloomFilterAdd(bloomFilterName, dData.getKey());    // 将key添加到布隆过滤器中
        redisTemplate.opsForValue().set(dData.getKey(), dData.getValue().toString());    // 添加Redis缓存
    }

// 查询元素
    @RequestMapping("/get")
    public Object getTest(String key) {
        // 1. 先检查布隆过滤器
        if (!bloomFilterService.bloomFilterContains(bloomFilterName, key)) {
            // 如果布隆过滤器认为该key不存在,则直接返回 null
            return "null";
        }

        // 2. 查询Redis获取数据(布隆过滤器认为key存在)
        String cachedData = redisTemplate.opsForValue().get(key);
        if (cachedData != null) {
            // 如果缓存中有数据,直接返回
            return cachedData;
        }

        // 3. 查询数据库(可能Redis中的缓存已经过期了,需要重新查询数据库)
        Object data = queryFromDatabase(key);
        if (data == null) {
            // 如果数据库中也没有数据,说明该key确实不存在。这里集合空对象缓存的策略,防止穿透行为。
            redisTemplate.opsForValue().set(key, "null", 60, TimeUnit.SECONDS);
        }

        // 4. 将数据缓存到Redis中
        redisTemplate.opsForValue().set(key, data.toString(), 60, TimeUnit.SECONDS);
        return data;
    }
}

第五步:测试验证
(1)、调用init方法,创建一个布隆过滤器。
这一步也可以放到项目初始化时去完成,或者自己生成都可以。
在这里插入图片描述
当调用后,查看Redis,可以看到出现了一个zw001的布隆过滤器配置。
在这里插入图片描述
(2)、设置数据,存入布隆过滤器
在这里插入图片描述
如上设置key003的数据,查看Redis发现数据设置成功。
在这里插入图片描述

(3)、查询数据,校验布隆过滤器
查看设置的key003校验是否存在,在校验没有设置的key004对比
在这里插入图片描述
在这里插入图片描述
如上发现查询key003时,正常查询到数据。查询不存在的key004时,布隆过滤器校验失败会直接返回null。

(3)、接口限流

RateLimiter是通过令牌桶算法来控制请求的速率的一个工具类,可以用于控制请求的速率。你可以指定每秒最多可以发放多少个令牌(即 QPS,可以是小数),当有请求到来时,RateLimiter会检查是否有足够的令牌可用。如果有,则允许请求通过,同时令牌减一;如果没有,则返回false,用户可以拒绝请求或让请求等待。

代码示例如:

import com.google.common.util.concurrent.RateLimiter;
import java.util.concurrent.ConcurrentHashMap;
import javax.servlet.http.HttpServletRequest;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping(value = "redis3", method = {RequestMethod.POST, RequestMethod.GET})
public class RateLimitController {

    // 使用 ConcurrentHashMap存储每个客户端的 RateLimiter
    private static final ConcurrentHashMap<String, RateLimiter> rateLimiters = new ConcurrentHashMap<>();

    // 每个客户端每5秒最多允许1次请求
    private static final double QPS = 0.2;    // 0.2:每秒允许0.2次请求,即每5秒允许一次请求;如果配置为5,则表示每秒允许5次请求,这里配置0.2方便测试验证

    // 获取客户端 IP 地址
    private String getClientIp(HttpServletRequest request) {
        String ip = request.getHeader("X-Forwarded-For");
        if (ip == null || ip.isEmpty() || "unknown".equalsIgnoreCase(ip)) {
            ip = request.getRemoteAddr();
        }
        return ip;
    }

    @GetMapping("/rateLimit")
    public String getData(HttpServletRequest request) {
        // 获取客户端 IP
        String clientIp = getClientIp(request);

        // 如果该客户端没有 RateLimiter,则创建一个新的 RateLimiter,并指定创建令牌速率;如果已存在则不会创建
        rateLimiters.computeIfAbsent(clientIp, key -> RateLimiter.create(QPS));

        // 获取该客户端的 RateLimiter
        RateLimiter rateLimiter = rateLimiters.get(clientIp);

        // 尝试获取令牌(如果令牌桶中有令牌会获取成功,反之会获取失败,令牌桶生成令牌的速率是创建时指定的QPS)
        if (rateLimiter.tryAcquire()) {
            // 如果获取到令牌,即没有达到限流标准,返回数据。
            return "Data fetched successfully for client: " + clientIp;
        } else {
            // 如果没有获取到令牌,返回限流提示
            return "Too many requests. Please try again later.";
        }
    }
}

测试验证:
如上的示例,我们配置QOS为0.2。简单理解就是每秒允许0.2次请求,实际为每5秒允许一次请求。我们再5秒时间内触发两次请求。
如下为第一次返回成功。
在这里插入图片描述
在5秒之中的第2次请求是拒绝。
在这里插入图片描述

附录:

扩展一下布隆过滤器:

Bloom 过滤器基于多个哈希函数和位数组实现。它的核心思想是使用多个哈希函数将元素映射到位数组中,并将对应的位设置为1。当查询一个元素时,通过对该元素进行相同的哈希计算,检查对应的位是否都为1。如果其中有任何一位为0,则可以确定该元素不在集合中;如果所有位都为1,则该元素可能在集合中,但并不确定,存在一定的概率误判。

结构图示例:
在这里插入图片描述

布隆过滤器解决的穿透的原理:
Redis会预热把key的数据存到布隆过滤器中,请求数据时,通过布隆过滤器查询key是否存在,不存在就返回,存在才会走redis。
在这里插入图片描述

布隆过滤器存在的问题:
当Redis中的键过期了或数据库数据删除了,但布隆过滤器仍然保留着该键的映射。
这种情况下,布隆过滤器仍然认为该键可能存在,从而导致误判。进而造会错误地允许某些请求穿透到数据库,而这些请求实际上应该返回 null 或者 “数据不存在”。

布隆过滤器的特点:
布隆过滤器是一个概率型数据结构,它只能添加元素,不能删除元素。一旦某个元素被添加到布隆过滤器中,它将永远存在于过滤器中,即使Redis中对应的键已经过期。
为什么不能删除呢?
因为hash算法算出的数组位置必须限制在数组的长度之内,所以当key比较多时,有很大概率造成映射位置重复。如果删除某个key的映射位置值(标记为0),就存在误删了其他key的可能。

布隆过滤器问题优化方案:
1、结合缓存空对象方法。即使布隆过滤器放行了过期的key,通过Redis缓存空对象的方法,也可以大大减轻数据库的压力。
2、Redis事件监听机制。前面篇章我们介绍过,当Redis的key过期或其他事件发生,Redis会以发布/订阅模式发布出来。Java是可以做到监听这些事件的,当key过期达到一定数量时,重新获取未过期的key,重新初始化布隆过滤器。