Redisson滑动窗口限流器原理
初始化滑动窗口限流器
核心方法tryAcquire 尝试获取令牌
public class SlidingWindowRateLimiter implements RateLimiter {
// Redisson客户端,用于与Redis集群进行交互
private RedissonClient redissonClient;
// 限流键的前缀,用于在Redis中标识限流相关的键
private static final String LIMIT_KEY_PREFIX = "nft:turbo:limit:";
// 构造函数,接收RedissonClient实例
public SlidingWindowRateLimiter(RedissonClient redissonClient) {
this.redissonClient = redissonClient;
}
/**
* 尝试获取令牌,判断当前请求是否允许通过
* @param key 限流的唯一标识,例如用户ID、IP地址或接口名称等
* @param limit 时间窗口内允许的最大请求次数
* @param windowSize 时间窗口大小,单位为秒
* @return 如果允许请求返回true,否则返回false
*/
@Override
public Boolean tryAcquire(String key, int limit, int windowSize) {
// 从Redisson获取指定键的限流器
RRateLimiter rRateLimiter = redissonClient.getRateLimiter(LIMIT_KEY_PREFIX + key);
// 如果限流器在Redis中不存在,则进行初始化配置
if (!rRateLimiter.isExists()) {
// 设置限流规则:OVERALL表示全局限流,即所有节点共享同一个限流策略
// limit为窗口内最大请求数,windowSize为窗口大小,单位为秒
rRateLimiter.trySetRate(RateType.OVERALL, limit, windowSize, RateIntervalUnit.SECONDS);
}
// 尝试获取令牌,成功返回true,失败返回false
return rRateLimiter.tryAcquire();
}
}
tryAcquire实现原理
-- 速率
local rate = redis.call("hget", KEYS[1], "rate")
-- 时间区间(ms)
local interval = redis.call("hget", KEYS[1], "interval")
local type = redis.call("hget", KEYS[1], "type")
assert(rate ~= false and interval ~= false and type ~= false, "RateLimiter is not initialized")
-- {name}:value 分析后面的代码,这个key记录的是当前令牌桶中的令牌数
local valueName = KEYS[2]
-- {name}:permits 这个key是一个zset,记录了请求的令牌数,score则为请求的时间戳
local permitsName = KEYS[4]
-- 单机限流才会用到,集群模式不用关注
if type == "1" then
valueName = KEYS[3]
permitsName = KEYS[5]
end
-- 原版本有bug(https://github.com/redisson/redisson/issues/3197),最新版将这行代码提前了
-- rate为1 arg1这里是 请求的令牌数量(默认是1)。rate必须比请求的令牌数大
assert(tonumber(rate) >= tonumber(ARGV[1]), "Requested permits amount could not exceed defined rate")
-- 第一次执行这里应该是null,会进到else分支
-- 第二次执行到这里由于else分支中已经放了valueName的值进去,所以第二次会进if分支
local currentValue = redis.call("get", valueName)
if currentValue ~= false then
-- 从第一次设的zset中取数据,范围是0 ~ (第二次请求时间戳 - 令牌生产的时间)
-- 可以看到,如果第二次请求时间距离第一次请求时间很短(小于令牌产生的时间),那么这个差值将小于上一次请求的时间,取出来的将会是空列表。反之,能取出之前的请求信息
-- 这里作者将这个取出来的数据命名为expiredValues,可认为指的是过期的数据
local expiredValues = redis.call("zrangebyscore", permitsName, 0, tonumber(ARGV[2]) - interval)
local released = 0
-- lua迭代器,遍历expiredValues,如果有值,那么released等于之前所有请求的令牌数之和,表示应该释放多少令牌
for i, v in ipairs(expiredValues) do
local random, permits = struct.unpack("fI", v)
released = released + permits
end
-- 没有过期请求的话,released还是0,这个if不会进,有过期请求才会进
if released > 0 then
-- 移除zset中所有元素,重置周期
redis.call("zrem", permitsName, unpack(expiredValues))
currentValue = tonumber(currentValue) + released
redis.call("set", valueName, currentValue)
end
-- 这里简单分析下上面这段代码:
-- 1. 只有超过了1个令牌生产周期后的请求,expiredValues才会有值。
-- 2. 以rate为3举例,如果之前发生了两个请求那么现在released为2,currentValue为1 + 2 = 3
-- 以此可以看到,redisson的令牌桶放令牌操作是通过请求时间窗来做的,如果距离上一个请求的时间已经超过了一个令牌生产周期时间,那么令牌桶中的令牌应该得到重置,表示生产rate数量的令牌。
-- 如果当前令牌数 < 请求的令牌数
if tonumber(currentValue) < tonumber(ARGV[1]) then
-- 从zset中找到距离当前时间最近的那个请求,也就是上一次放进去的请求信息
local nearest = redis.call('zrangebyscore', permitsName, '(' .. (tonumber(ARGV[2]) - interval), tonumber(ARGV[2]), 'withscores', 'limit', 0, 1);
local random, permits = struct.unpack("fI", nearest[1])
-- 返回 上一次请求的时间戳 - (当前时间戳 - 令牌生成的时间间隔) 这个值表示还需要多久才能生产出足够的令牌
return tonumber(nearest[2]) - (tonumber(ARGV[2]) - interval)
else
-- 如果当前令牌数 ≥ 请求的令牌数,表示令牌够多,更新zset
redis.call("zadd", permitsName, ARGV[2], struct.pack("fI", ARGV[3], ARGV[1]))
-- valueName存的是当前总令牌数,-1表示取走一个
redis.call("decrby", valueName, ARGV[1])
return nil
end
else
-- set一个key-value数据 记录当前限流器的令牌数
redis.call("set", valueName, rate)
-- 建了一个以当前限流器名称相关的zset,并存入 以score为当前时间戳,以lua格式化字符串{当前时间戳为种子的随机数、请求的令牌数}为value的值。
-- struct.pack第一个参数表示格式字符串,f是浮点数、I是长整数。所以这个格式字符串表示的是把一个浮点数和长整数拼起来的结构体。我的理解就是往zset里记录了最后一次请求的时间戳和请求的令牌数
redis.call("zadd", permitsName, ARGV[2], struct.pack("fI", ARGV[3], ARGV[1]))
-- 从总共的令牌数 减去 请求的令牌数。
redis.call("decrby", valueName, ARGV[1])
return nil
end
脚本核心逻辑
参数和初始化
local rate = redis.call("hget", KEYS[1], "rate")
local interval = redis.call("hget", KEYS[1], "interval")
local type = redis.call("hget", KEYS[1], "type")
assert(rate ~= false and interval ~= false and type ~= false, "RateLimiter is not initialized")
从哈希表中获取限流器配置(速率, 时间间隔, 类型)
断言 确保数据已经被初始化
-- {name}:value 分析后面的代码,这个key记录的是当前令牌桶中的令牌数
local valueName = KEYS[2]
-- {name}:permits 这个key是一个zset,记录了请求的令牌数,score则为请求的时间戳
local permitsName = KEYS[4]
-- 单机限流才会用到,集群模式不用关注
if type == "1" then
valueName = KEYS[3]
permitsName = KEYS[5]
end
核心逻辑
第一次调用 初始化令牌桶
local currentValue = redis.call("get", valueName)
if currentValue ~= false then
-- 处理已有令牌的情况
else
-- 初始化令牌桶 设置令牌桶的初始数量
redis.call("set", valueName, rate)
-- 将第一次的请求 记录在有序集合中
redis.call("zadd", permitsName, ARGV[2], struct.pack("fI", ARGV[3], ARGV[1]))
-- 从令牌桶中扣除本次的令牌
redis.call("decrby", valueName, ARGV[1])
return nil
end
后续调用 进入令牌刷新机制
通过zset筛选出过期的令牌 放回令牌桶
-- 从第一次设的zset中取数据,范围是0 ~ (第二次请求时间戳 - 令牌生产的时间)
-- 可以看到,如果第二次请求时间距离第一次请求时间很短(小于令牌产生的时间),那么这个差值将小于上一次请求的时间,取出来的将会是空列表。反之,能取出之前的请求信息
-- 这里作者将这个取出来的数据命名为expiredValues,可认为指的是过期的数据
local expiredValues = redis.call("zrangebyscore", permitsName, 0, tonumber(ARGV[2]) - interval)
local released = 0
-- lua迭代器,遍历expiredValues,如果有值,那么released等于之前所有请求的令牌数之和,表示应该释放多少令牌
for i, v in ipairs(expiredValues) do
local random, permits = struct.unpack("fI", v)
released = released + permits
end
-- 没有过期请求的话,released还是0,这个if不会进,有过期请求才会进
if released > 0 then
-- 移除zset中所有元素,重置周期
redis.call("zrem", permitsName, unpack(expiredValues))
currentValue = tonumber(currentValue) + released
redis.call("set", valueName, currentValue)
end
令牌请求
令牌不足时 返回过期时间
令牌足够时 扣减令牌 记录请求
-- 这里简单分析下上面这段代码:
-- 1. 只有超过了1个令牌生产周期后的请求,expiredValues才会有值。
-- 2. 以rate为3举例,如果之前发生了两个请求那么现在released为2,currentValue为1 + 2 = 3
-- 以此可以看到,redisson的令牌桶放令牌操作是通过请求时间窗来做的,如果距离上一个请求的时间已经超过了一个令牌生产周期时间,那么令牌桶中的令牌应该得到重置,表示生产rate数量的令牌。
-- 如果当前令牌数 < 请求的令牌数
if tonumber(currentValue) < tonumber(ARGV[1]) then
-- 从zset中找到距离当前时间最近的那个请求,也就是上一次放进去的请求信息
local nearest = redis.call('zrangebyscore', permitsName, '(' .. (tonumber(ARGV[2]) - interval), tonumber(ARGV[2]), 'withscores', 'limit', 0, 1);
local random, permits = struct.unpack("fI", nearest[1])
-- 返回 上一次请求的时间戳 - (当前时间戳 - 令牌生成的时间间隔) 这个值表示还需要多久才能生产出足够的令牌
return tonumber(nearest[2]) - (tonumber(ARGV[2]) - interval)
else
-- 如果当前令牌数 ≥ 请求的令牌数,表示令牌够多,更新zset
redis.call("zadd", permitsName, ARGV[2], struct.pack("fI", ARGV[3], ARGV[1]))
-- valueName存的是当前总令牌数,-1表示取走一个
redis.call("decrby", valueName, ARGV[1])
return nil
end
模拟lua脚本执行
假设我们有一个令牌桶限流器,配置如下:
速率 (rate): 5 个令牌 / 秒
时间间隔 (interval): 1000 毫秒
类型 (type): 单机模式 (1)
首次请求(获取 1 个令牌)
ARGV [1]: 1 (请求 1 个令牌)
ARGV [2]: 1630000000000 (时间戳,单位毫秒)
ARGV [3]: 0.12345 (随机数) 随机数目的等下说
当前令牌数 (KEYS[3]):
GET rate_limiter:orders:value
→ “4” // 初始为5,扣除1个后剩余4请求记录 (KEYS[5]):
ZRANGE rate_limiter:orders:permits 0 -1 WITHSCORES
→ 1630000000000 → “\x3F\x9F\xC1\xB8\x00\x00\x00\x01” // 二进制格式:0.12345 (float) + 1 (int)
第二次请求(100 毫秒后,获取 2 个令牌)
ARGV [1]: 2 (请求 2 个令牌)
ARGV [2]: 1630000000100 (时间戳)
ARGV [3]: 0.67890 (随机数)
当前令牌数:
GET rate_limiter:orders:value
→ “2” // 4 - 2 = 2请求记录:
ZRANGE rate_limiter:orders:permits 0 -1 WITHSCORES
→ 1630000000000 → “\x3F\x9F\xC1\xB8\x00\x00\x00\x01”
1630000000100 → “\x3F\xAD\xF7\xB2\x00\x00\x00\x02” // 0.67890 (float) + 2 (int)
第三次请求(500 毫秒后,获取 3 个令牌)
ARGV [1]: 3 (请求 3 个令牌)
ARGV [2]: 1630000000600 (时间戳)
ARGV [3]: 0.98765 (随机数)
当前令牌数:
GET rate_limiter:orders:value
→ “2” // 令牌不足,未扣减请求记录:
ZRANGE rate_limiter:orders:permits 0 -1 WITHSCORES
→ 1630000000000 → “\x3F\x9F\xC1\xB8\x00\x00\x00\x01”
1630000000100 → “\x3F\xAD\xF7\xB2\x00\x00\x00\x02” // 保持不变
第四次请求(1100 毫秒后,获取 1 个令牌)
ARGV [1]: 1 (请求 1 个令牌)
ARGV [2]: 1630000001200 (时间戳)
ARGV [3]: 0.54321 (随机数)
当前令牌数:
GET rate_limiter:orders:value
→ “4” // 过期请求释放3个令牌,加上原有2个,共5个,扣除1个后剩余4请求记录:
ZRANGE rate_limiter:orders:permits 0 -1 WITHSCORES
→ 1630000001200 → “\x3F\x8B\xAE\x47\x00\x00\x00\x01” // 旧请求被清理,仅保留最新请求
总体使用一个string结构来存储当前令牌数量 一个zset存储请求的历史记录 (score: 毫秒级时间戳 value: 二进制数据 struct.pack(“fI”, random, permits)生成 包含客户端生成的随机数和本次请求的令牌数量)