Redisson 实现滑动窗口限流(注解方式)推荐

发布于:2025-07-14 ⋅ 阅读:(23) ⋅ 点赞:(0)

结构目录

在这里插入图片描述

相关代码

<dependency>
   <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>3.17.0</version>
</dependency>
package org.example.redission.config;

import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Redisson configuration.
 *
 * <p>Creates a single-server {@link RedissonClient} from the
 * {@code spring.data.redis.*} properties. Every property has a default
 * (localhost:6379, database 0, no password) so the application can start
 * against a local Redis without any configuration.
 */
@Configuration
public class RedissonConfig {

    /** Redis host name; defaults to {@code localhost}. */
    @Value("${spring.data.redis.host:localhost}")
    private String redisHost;

    /** Redis port; defaults to {@code 6379}. */
    @Value("${spring.data.redis.port:6379}")
    private int redisPort;

    /** Redis password; an empty value means "no authentication". */
    @Value("${spring.data.redis.password:}")
    private String redisPassword;

    /** Redis database index; defaults to {@code 0}. */
    @Value("${spring.data.redis.database:0}")
    private int redisDatabase;

    /**
     * Builds the {@link RedissonClient} used by the rate-limiting aspect.
     *
     * @return a client connected to the configured single Redis server
     */
    @Bean
    public RedissonClient redissonClient() {
        Config config = new Config();

        // Plain concatenation on purpose: String.format("%d", ...) localizes
        // digits according to the JVM default locale, which can yield a
        // malformed address on hosts whose locale does not use ASCII digits.
        String redisUrl = "redis://" + redisHost + ":" + redisPort;
        config.useSingleServer()
                .setAddress(redisUrl)
                .setDatabase(redisDatabase)
                .setConnectionMinimumIdleSize(10)
                .setConnectionPoolSize(64)
                .setConnectTimeout(3000)
                .setTimeout(3000)
                .setRetryAttempts(3)
                .setRetryInterval(1500);

        // Only configure authentication when a password was actually supplied.
        if (redisPassword != null && !redisPassword.isEmpty()) {
            config.useSingleServer().setPassword(redisPassword);
        }

        return Redisson.create(config);
    }
}
package org.example.redission.annotation;

import org.example.redission.handler.ExceptionRejectHandler;
import org.example.redission.handler.RejectHandler;

import java.lang.annotation.*;
import java.util.concurrent.TimeUnit;

/**
 * Rate-limiting annotation for methods (described by the author as a sliding
 * time window), backed by Redisson's {@code RRateLimiter}.
 *
 * <p>Methods carrying this annotation are intercepted by the rate-limiter
 * aspect, which acquires one permit per invocation and delegates to the
 * configured {@link RejectHandler} when no permit is available.
 */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RateLimiter {
    
    /**
     * Rate-limiter key; may be a SpEL expression (e.g. {@code "'user:' + #userId"}).
     * When left empty, the aspect derives the key from the declaring type name
     * and the method name.
     */
    String key() default "";
    
    /**
     * Rate: the number of requests allowed within one time window.
     */
    long rate() default 10;
    
    /**
     * Length of the time window, expressed in {@link #rateIntervalUnit()}.
     */
    long rateInterval() default 1;
    
    /**
     * Unit of the time window.
     */
    TimeUnit rateIntervalUnit() default TimeUnit.SECONDS;
    
    /**
     * Whether to wait for a permit.
     * {@code true}: wait for a permit until {@link #timeout()} elapses;
     * {@code false}: return immediately without waiting.
     */
    boolean waitForPermission() default true;
    
    /**
     * Maximum time to wait for a permit, expressed in {@link #timeoutUnit()}.
     */
    long timeout() default 3;
    
    /**
     * Unit of the wait timeout.
     */
    TimeUnit timeoutUnit() default TimeUnit.SECONDS;
    
    /**
     * Handler class implementing the rejection strategy.
     */
    Class<? extends RejectHandler> rejectHandler() default ExceptionRejectHandler.class;
    
    /**
     * Message passed to the reject handler when a request is rejected.
     */
    String rejectMessage() default "请求过于频繁,请稍后再试";
} 
package org.example.redission.aspect;

import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.example.redission.annotation.RateLimiter;
import org.example.redission.handler.RejectHandler;
import org.redisson.api.RRateLimiter;
import org.redisson.api.RateIntervalUnit;
import org.redisson.api.RateType;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.expression.MethodBasedEvaluationContext;
import org.springframework.core.DefaultParameterNameDiscoverer;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.Expression;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import java.lang.reflect.Method;
import java.util.concurrent.TimeUnit;

/**
 * 限流切面实现
 */
@Aspect
@Component
@Slf4j
public class RateLimiterAspect {
    
    @Autowired
    private RedissonClient redissonClient;
    
    @Autowired
    private ApplicationContext applicationContext;
    
    private final SpelExpressionParser spelExpressionParser = new SpelExpressionParser();
    private final DefaultParameterNameDiscoverer parameterNameDiscoverer = new DefaultParameterNameDiscoverer();
    
    @Around("@annotation(rateLimiter)")
    public Object around(ProceedingJoinPoint joinPoint, RateLimiter rateLimiter) throws Throwable {
        
        // 获取限流key
        String key = generateKey(joinPoint, rateLimiter);
        
        // 获取RRateLimiter实例
        RRateLimiter limiter = redissonClient.getRateLimiter(key);
        
        // 设置限流规则
        RateIntervalUnit rateUnit = convertToRedissonUnit(rateLimiter.rateIntervalUnit());
        limiter.trySetRate(RateType.OVERALL, rateLimiter.rate(), rateLimiter.rateInterval(), rateUnit);
        
        // 尝试获取许可
        boolean acquired = false;
        
        if (rateLimiter.waitForPermission()) {
            // 等待获取许可
            acquired = limiter.tryAcquire(1, rateLimiter.timeout(), rateLimiter.timeoutUnit());
        } else {
            // 立即尝试获取许可
            acquired = limiter.tryAcquire(1);
        }
        
        if (acquired) {
            // 获取许可成功,执行目标方法
            return joinPoint.proceed();
        } else {
            // 获取许可失败,执行拒绝策略
            return handleRejection(joinPoint, rateLimiter);
        }
    }
    
    /**
     * 生成限流key
     */
    private String generateKey(ProceedingJoinPoint joinPoint, RateLimiter rateLimiter) {
        String key = rateLimiter.key();
        
        if (StringUtils.hasText(key)) {
            // 如果key包含SpEL表达式,解析它
            if (key.contains("#") || key.contains("${")) {
                Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
                EvaluationContext context = new MethodBasedEvaluationContext(
                    joinPoint.getTarget(), method, joinPoint.getArgs(), parameterNameDiscoverer);
                
                Expression expression = spelExpressionParser.parseExpression(key);
                key = expression.getValue(context, String.class);
            }
        } else {
            // 使用方法签名作为默认key
            MethodSignature signature = (MethodSignature) joinPoint.getSignature();
            key = signature.getDeclaringTypeName() + "." + signature.getName();
        }
        
        return "rate_limiter:" + key;
    }
    
    /**
     * 处理拒绝策略
     */
    private Object handleRejection(ProceedingJoinPoint joinPoint, RateLimiter rateLimiter) throws Throwable {
        String message = rateLimiter.rejectMessage();
        Class<? extends RejectHandler> handlerClass = rateLimiter.rejectHandler();
        
        // 通过Spring容器获取处理器实例
        RejectHandler handler;
        try {
            handler = applicationContext.getBean(handlerClass);
        } catch (Exception e) {
            // 如果无法从容器获取,则创建新实例
            handler = handlerClass.getDeclaredConstructor().newInstance();
        }
        
        return handler.handleReject(joinPoint, message);
    }
    

    
    /**
     * 转换时间单位
     */
    private RateIntervalUnit convertToRedissonUnit(TimeUnit timeUnit) {
        switch (timeUnit) {
            case SECONDS:
                return RateIntervalUnit.SECONDS;
            case MINUTES:
                return RateIntervalUnit.MINUTES;
            case HOURS:
                return RateIntervalUnit.HOURS;
            case DAYS:
                return RateIntervalUnit.DAYS;
            default:
                return RateIntervalUnit.SECONDS;
        }
    }
} 
package org.example.redission.exception;

/**
 * Unchecked exception signalling that a call was rejected by the rate limiter.
 */
public class RateLimitException extends RuntimeException {
    
    /**
     * @param message description of the rejection
     */
    public RateLimitException(String message) {
        super(message);
    }
    
    /**
     * @param message description of the rejection
     * @param cause   underlying cause
     */
    public RateLimitException(String message, Throwable cause) {
        super(message, cause);
    }
} 

handler:各种拒绝策略的处理器

package org.example.redission.handler;

import org.aspectj.lang.ProceedingJoinPoint;

/**
 * Strategy interface invoked when a rate-limited call is rejected.
 */
public interface RejectHandler {
    
    /**
     * Handles a rejected invocation.
     * 
     * @param joinPoint the intercepted invocation that was rejected
     * @param message the rejection message
     * @return the value to hand back in place of the target method's result
     * @throws Throwable any exception the strategy chooses to raise
     */
    Object handleReject(ProceedingJoinPoint joinPoint, String message) throws Throwable;
} 
package org.example.redission.handler;

import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;

/**
 * Example of a custom reject handler.
 *
 * <p>Demonstrates how rejection logic can vary by method: the handler picks a
 * response based on whether the rejected method's name mentions "user" or
 * "data" (case-insensitively, so camelCase names such as {@code getUserInfo}
 * match too) and on the method's declared return type.
 */
@Component
@Slf4j
public class CustomRejectHandler implements RejectHandler {

    /**
     * Logs the rejection and dispatches to a name-specific strategy.
     *
     * @param joinPoint the rejected invocation
     * @param message   the rejection message
     * @return a replacement result, or {@code null} when the return type is
     *         not one this handler knows how to fill
     */
    @Override
    public Object handleReject(ProceedingJoinPoint joinPoint, String message) throws Throwable {
        MethodSignature signature = (MethodSignature) joinPoint.getSignature();
        String methodName = signature.getName();

        log.warn("自定义拒绝处理器被触发: 方法={}, 消息={}", methodName, message);

        // Compare on a lower-cased copy: Java method names are camelCase, so a
        // case-sensitive search for "user" would miss e.g. "getUserInfo".
        String normalizedName = methodName.toLowerCase(java.util.Locale.ROOT);
        if (normalizedName.contains("user")) {
            return handleUserRelatedMethod(joinPoint, message);
        } else if (normalizedName.contains("data")) {
            return handleDataRelatedMethod(joinPoint, message);
        } else {
            return handleDefaultMethod(joinPoint, message);
        }
    }

    /**
     * Rejection result for user-related methods: a formatted message for
     * {@code String} return types, otherwise {@code null}.
     */
    private Object handleUserRelatedMethod(ProceedingJoinPoint joinPoint, String message) {
        Class<?> returnType = ((MethodSignature) joinPoint.getSignature()).getReturnType();

        if (returnType.equals(String.class)) {
            return String.format("用户访问受限: %s", message);
        }

        return null;
    }

    /**
     * Rejection result for data-related methods: a fixed message for
     * {@code String} return types, an error map for map return types,
     * otherwise {@code null}.
     */
    private Object handleDataRelatedMethod(ProceedingJoinPoint joinPoint, String message) {
        Class<?> returnType = ((MethodSignature) joinPoint.getSignature()).getReturnType();

        if (returnType.equals(String.class)) {
            return "数据服务暂时不可用,请稍后再试";
        } else if (Map.class.isAssignableFrom(returnType) && returnType.isAssignableFrom(HashMap.class)) {
            // Any declared map type that a HashMap can be assigned to
            // (Map, AbstractMap, HashMap), not only the Map interface itself.
            Map<String, Object> errorResponse = new HashMap<>();
            errorResponse.put("error", true);
            errorResponse.put("message", message);
            errorResponse.put("timestamp", System.currentTimeMillis());
            return errorResponse;
        }

        return null;
    }

    /**
     * Rejection result for all other methods: a fixed message for
     * {@code String} return types, otherwise {@code null}.
     */
    private Object handleDefaultMethod(ProceedingJoinPoint joinPoint, String message) {
        Class<?> returnType = ((MethodSignature) joinPoint.getSignature()).getReturnType();

        if (returnType.equals(String.class)) {
            return "服务繁忙,请稍后再试";
        }

        return null;
    }
}
package org.example.redission.handler;

import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.stereotype.Component;

/**
 * Reject handler that answers a rate-limited call with the default value of
 * the method's declared return type.
 */
@Component
public class DefaultValueRejectHandler implements RejectHandler {

    /**
     * Returns the default value for the rejected method's return type.
     *
     * @param joinPoint the rejected invocation
     * @param message   the rejection message (not used by this handler)
     * @return the default value, see {@link #getDefaultValue(ProceedingJoinPoint)}
     */
    @Override
    public Object handleReject(ProceedingJoinPoint joinPoint, String message) throws Throwable {
        return getDefaultValue(joinPoint);
    }

    /**
     * Maps the declared return type to a default value.
     *
     * <p>All primitive types and their wrappers are covered: {@code false},
     * numeric zero, or the NUL character. {@code String} yields an empty
     * string; {@code void} and every other type yield {@code null}.
     * Covering every primitive matters because {@code null} is not a valid
     * result for a method with a primitive return type.
     */
    private Object getDefaultValue(ProceedingJoinPoint joinPoint) {
        MethodSignature signature = (MethodSignature) joinPoint.getSignature();
        Class<?> returnType = signature.getReturnType();

        if (returnType.equals(Void.TYPE)) {
            return null;
        } else if (returnType.equals(Boolean.TYPE) || returnType.equals(Boolean.class)) {
            return false;
        } else if (returnType.equals(Integer.TYPE) || returnType.equals(Integer.class)) {
            return 0;
        } else if (returnType.equals(Long.TYPE) || returnType.equals(Long.class)) {
            return 0L;
        } else if (returnType.equals(Short.TYPE) || returnType.equals(Short.class)) {
            return (short) 0;
        } else if (returnType.equals(Byte.TYPE) || returnType.equals(Byte.class)) {
            return (byte) 0;
        } else if (returnType.equals(Character.TYPE) || returnType.equals(Character.class)) {
            return Character.MIN_VALUE;
        } else if (returnType.equals(Double.TYPE) || returnType.equals(Double.class)) {
            return 0.0;
        } else if (returnType.equals(Float.TYPE) || returnType.equals(Float.class)) {
            return 0.0f;
        } else if (returnType.equals(String.class)) {
            return "";
        } else {
            return null;
        }
    }
}
package org.example.redission.handler;

import org.aspectj.lang.ProceedingJoinPoint;
import org.example.redission.exception.RateLimitException;
import org.springframework.stereotype.Component;

/**
 * Reject handler that raises an exception.
 * When a call is rate-limited it throws a {@link RateLimitException}
 * carrying the rejection message.
 */
@Component
public class ExceptionRejectHandler implements RejectHandler {
    
    /**
     * Always throws; never returns a value.
     *
     * @param joinPoint the rejected invocation (not used)
     * @param message   the rejection message used as the exception message
     * @throws RateLimitException always
     */
    @Override
    public Object handleReject(ProceedingJoinPoint joinPoint, String message) throws Throwable {
        throw new RateLimitException(message);
    }
} 
package org.example.redission.handler;

import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.stereotype.Component;

/**
 * Reject handler that degrades gracefully: when a call is rate-limited it
 * logs a warning and returns a type-specific fallback value.
 */
@Component
@Slf4j
public class FallbackRejectHandler implements RejectHandler {

    /**
     * Logs the rejection and returns the fallback value for the method's
     * return type.
     *
     * @param joinPoint the rejected invocation
     * @param message   the rejection message (logged only)
     * @return the fallback value, see {@link #getFallbackValue(ProceedingJoinPoint)}
     */
    @Override
    public Object handleReject(ProceedingJoinPoint joinPoint, String message) throws Throwable {
        log.warn("Rate limit exceeded, executing fallback for method: {}, message: {}", 
            joinPoint.getSignature().toShortString(), message);

        // Extension point: a dedicated fallback method could be invoked here.
        return getFallbackValue(joinPoint);
    }

    /**
     * Maps the declared return type to a fallback value.
     *
     * <p>Numeric types (primitive and boxed) yield {@code -1} as a failure
     * marker, booleans yield {@code false}, {@code char} yields the NUL
     * character (there is no negative char), and {@code String} yields a
     * user-facing hint. {@code void} and every other type yield {@code null}.
     * Every primitive type is covered because {@code null} is not a valid
     * result for a method with a primitive return type.
     */
    private Object getFallbackValue(ProceedingJoinPoint joinPoint) {
        MethodSignature signature = (MethodSignature) joinPoint.getSignature();
        Class<?> returnType = signature.getReturnType();

        if (returnType.equals(Void.TYPE)) {
            return null;
        } else if (returnType.equals(Boolean.TYPE) || returnType.equals(Boolean.class)) {
            return false;
        } else if (returnType.equals(Integer.TYPE) || returnType.equals(Integer.class)) {
            return -1; // -1 marks a failed (degraded) call
        } else if (returnType.equals(Long.TYPE) || returnType.equals(Long.class)) {
            return -1L;
        } else if (returnType.equals(Short.TYPE) || returnType.equals(Short.class)) {
            return (short) -1;
        } else if (returnType.equals(Byte.TYPE) || returnType.equals(Byte.class)) {
            return (byte) -1;
        } else if (returnType.equals(Character.TYPE) || returnType.equals(Character.class)) {
            return Character.MIN_VALUE;
        } else if (returnType.equals(Double.TYPE) || returnType.equals(Double.class)) {
            return -1.0;
        } else if (returnType.equals(Float.TYPE) || returnType.equals(Float.class)) {
            return -1.0f;
        } else if (returnType.equals(String.class)) {
            return "服务繁忙,请稍后再试"; // friendly hint returned on degradation
        } else {
            return null;
        }
    }
}
package org.example.redission.handler;

import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.springframework.stereotype.Component;

/**
 * Reject handler that only logs.
 * A rate-limited call is recorded at WARN level and answered with {@code null}.
 */
@Component
@Slf4j
public class LogAndReturnNullRejectHandler implements RejectHandler {

    /**
     * Writes a warning naming the rejected method and the rejection message.
     *
     * @param joinPoint the rejected invocation
     * @param message   the rejection message
     * @return always {@code null}
     */
    @Override
    public Object handleReject(ProceedingJoinPoint joinPoint, String message) throws Throwable {
        final String limitedMethod = joinPoint.getSignature().toShortString();
        log.warn("Rate limit exceeded for method: {}, message: {}", limitedMethod, message);
        return null;
    }
}

测试类

package org.example.redission.controller;

import org.example.redission.annotation.RateLimiter;
import org.example.redission.handler.DefaultValueRejectHandler;
import org.example.redission.handler.FallbackRejectHandler;
import org.example.redission.handler.LogAndReturnNullRejectHandler;
import org.example.redission.handler.CustomRejectHandler;
import org.springframework.web.bind.annotation.*;

import java.time.LocalDateTime;
import java.util.concurrent.TimeUnit;

/**
 * Test controller demonstrating the {@link RateLimiter} annotation with
 * different keys, windows and rejection strategies.
 */
@RestController
@RequestMapping("/test")
public class TestController {

    /**
     * Basic rate limit: 1 request per second, keyed by the method itself.
     */
    @GetMapping("/basic")
    @RateLimiter(rate = 1, rateInterval = 1, rateIntervalUnit = TimeUnit.SECONDS)
    public String basicRateLimit() {
        return "Basic rate limit test success at " + LocalDateTime.now();
    }

    /**
     * Custom key: 1 request per second per user id.
     */
    @GetMapping("/user/{userId}")
    @RateLimiter(
        key = "'user:' + #userId",
        rate = 1,
        rateInterval = 1,
        rateIntervalUnit = TimeUnit.SECONDS,
        rejectMessage = "用户请求过于频繁,请稍后再试"
    )
    public String userRateLimit(@PathVariable String userId) {
        return "User " + userId + " rate limit test success at " + LocalDateTime.now();
    }

    /**
     * Fail-fast limit: 1 request per 10 seconds without waiting; rejected
     * calls are answered by {@link DefaultValueRejectHandler}.
     */
    @GetMapping("/nowait")
    @RateLimiter(
        rate = 1,
        rateInterval = 10,
        rateIntervalUnit = TimeUnit.SECONDS,
        waitForPermission = false,
        rejectHandler = DefaultValueRejectHandler.class
    )
    public String noWaitRateLimit() {
        return "No wait rate limit test success at " + LocalDateTime.now();
    }

    /**
     * Return-null strategy: 1 request per 3 seconds without waiting; rejected
     * calls are logged and answered with {@code null}.
     */
    @GetMapping("/null")
    @RateLimiter(
        rate = 1,
        rateInterval = 3,
        waitForPermission = false,
        rateIntervalUnit = TimeUnit.SECONDS,
        rejectHandler = LogAndReturnNullRejectHandler.class
    )
    public String nullRateLimit() {
        return "Null rate limit test success at " + LocalDateTime.now();
    }

    /**
     * Limit keyed by request-body fields: 1 request per minute for each
     * type/category combination.
     */
    @PostMapping("/param")
    @RateLimiter(
        key = "'param:' + #request.type + '_' + #request.category",
        rate = 1,
        rateInterval = 1,
        rateIntervalUnit = TimeUnit.MINUTES
    )
    public String paramRateLimit(@RequestBody TestRequest request) {
        return "Param rate limit test success for " + request.getType() +
               " and " + request.getCategory() + " at " + LocalDateTime.now();
    }

    /**
     * Custom reject handler: 1 request per 5 seconds without waiting.
     */
    @GetMapping("/custom")
    @RateLimiter(
        rate = 1,
        rateInterval = 5,
        rateIntervalUnit = TimeUnit.SECONDS,
        waitForPermission = false,
        rejectHandler = CustomRejectHandler.class,
        rejectMessage = "自定义限流处理"
    )
    public String customRateLimit() {
        return "Custom rate limit test success at " + LocalDateTime.now();
    }

    /**
     * Fallback reject handler: 1 request per 5 seconds without waiting.
     */
    @GetMapping("/fallback")
    @RateLimiter(
        rate = 1,
        rateInterval = 5,
        rateIntervalUnit = TimeUnit.SECONDS,
        waitForPermission = false,
        rejectHandler = FallbackRejectHandler.class,
        rejectMessage = "系统繁忙,执行降级"
    )
    public String fallbackRateLimit() {
        return "Fallback rate limit test success at " + LocalDateTime.now();
    }

    /**
     * Request body used by {@link #paramRateLimit(TestRequest)}.
     */
    public static class TestRequest {
        private String type;
        private String category;

        public String getType() {
            return type;
        }

        public void setType(String type) {
            this.type = type;
        }

        public String getCategory() {
            return category;
        }

        public void setCategory(String category) {
            this.category = category;
        }
    }
}

网站公告

今日签到

点亮在社区的每一天
去签到