每秒扛住10万请求?RedissonRateLimiter 分布式限流器详解

发布于:2025-09-04 ⋅ 阅读:(13) ⋅ 点赞:(0)


种一棵树最好的时间是10年前,其次就是现在,加油!
                                                                                   --by蜡笔小柯南

RedissonRateLimiter作为方便好用的限流工具,在某些场景下,极简了我们的开发,通过简单几行代码,就能搞定限流。那么,如何好用的限流器,底层是如何实现的呢?接下来,让我们一起去探索!

Java源码分析

这是 RedissonRateLimiter 类下的 tryAcquireAsync 方法,限流的主要逻辑在这里面

private <T> RFuture<T> tryAcquireAsync(RedisCommand<T> command, Long value) {
        byte[] random = getServiceManager().generateIdArray();

        return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
                "local rate = redis.call('hget', KEYS[1], 'rate');"
              + "local interval = redis.call('hget', KEYS[1], 'interval');"
              + "local type = redis.call('hget', KEYS[1], 'type');"
              + "assert(rate ~= false and interval ~= false and type ~= false, 'RateLimiter is not initialized')"
              
              + "local valueName = KEYS[2];"
              + "local permitsName = KEYS[4];"
              + "if type == '1' then "
                  + "valueName = KEYS[3];"
                  + "permitsName = KEYS[5];"
              + "end;"

              + "assert(tonumber(rate) >= tonumber(ARGV[1]), 'Requested permits amount could not exceed defined rate'); "

              + "local currentValue = redis.call('get', valueName); "
              + "local res;"
              + "if currentValue ~= false then "
                     + "local expiredValues = redis.call('zrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval); "
                     + "local released = 0; "
                     + "for i, v in ipairs(expiredValues) do "
                          + "local random, permits = struct.unpack('Bc0I', v);"
                          + "released = released + permits;"
                     + "end; "

                     + "if released > 0 then "
                          + "redis.call('zremrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval); "
                          + "if tonumber(currentValue) + released > tonumber(rate) then "
                               + "currentValue = tonumber(rate) - redis.call('zcard', permitsName); "
                          + "else "
                               + "currentValue = tonumber(currentValue) + released; "
                          + "end; "
                          + "redis.call('set', valueName, currentValue);"
                     + "end;"

                     + "if tonumber(currentValue) < tonumber(ARGV[1]) then "
                         + "local firstValue = redis.call('zrange', permitsName, 0, 0, 'withscores'); "
                         + "res = 3 + interval - (tonumber(ARGV[2]) - tonumber(firstValue[2]));"
                     + "else "
                         + "redis.call('zadd', permitsName, ARGV[2], struct.pack('Bc0I', string.len(ARGV[3]), ARGV[3], ARGV[1])); "
                         + "redis.call('decrby', valueName, ARGV[1]); "
                         + "res = nil; "
                     + "end; "
              + "else "
                     + "redis.call('set', valueName, rate); "
                     + "redis.call('zadd', permitsName, ARGV[2], struct.pack('Bc0I', string.len(ARGV[3]), ARGV[3], ARGV[1])); "
                     + "redis.call('decrby', valueName, ARGV[1]); "
                     + "res = nil; "
              + "end;"

              + "local ttl = redis.call('pttl', KEYS[1]); "
              + "if ttl > 0 then "
                  + "redis.call('pexpire', valueName, ttl); "
                  + "redis.call('pexpire', permitsName, ttl); "
              + "end; "
              + "return res;",
                Arrays.asList(getRawName(), getValueName(), getClientValueName(), getPermitsName(), getClientPermitsName()),
                value, System.currentTimeMillis(), random);
    }

可以看到,使用了大量的 lua 脚本,其中,KEYS[1]、ARGV[1]分别表示取的key的第1个值,以及参数的第1个值,中括号里面的数字,表示取的是第几个值。

我们看一下 evalWriteAsync 方法的定义,如下:

<T, R> RFuture<R> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params);

最后两个参数,一个是 List<Object> keys,一个是 Object... params。KEYS[1]就是从 keys 中取第1个值,ARGV[1]就是从 params 中取第1个值。

Lua脚本分析

创建限流器

使用 Hash 结构来保存

保存和获取的命令分别为:

  • hset key field value
  • hget key field
redis.call('hsetnx', KEYS[1], 'rate', ARGV[1]);
redis.call('hsetnx', KEYS[1], 'interval', ARGV[2]);
return redis.call('hsetnx', KEYS[1], 'type', ARGV[3]);

创建限流器,向redis中保存了 rateintervaltype 信息

尝试获取令牌

通过 Lua 脚本实现,我把 Lua 脚本拷贝出来,写了注释

-- 速率,即规定的时间内,能够通过多少个
local rate = redis.call('hget', KEYS[1], 'rate');
-- 时间间隔,即规定的时间,如60秒
local interval = redis.call('hget', KEYS[1], 'interval');
-- 类型,用于区分单机限流还是集群限流,默认type=0,所以这里不用关注
local type = redis.call('hget', KEYS[1], 'type');
-- 校验,rate!=false && interval!=false && type!=false,其中任何一个不满足则抛出异常:RateLimiter is not initialized
assert(rate ~= false and interval ~= false and type ~= false, 'RateLimiter is not initialized')

-- {keyName}:value,大括号内是key的名称,后面+冒号+value共同组成valueName
local valueName = KEYS[2];

-- {keyName}:permits,大括号内是key的名称,后面+冒号+ permits共同组成permitsName
local permitsName = KEYS[4];

-- type即为第6行获取到的type的值,当type=1时才会走下面的逻辑,由于type默认为0,所以这里不用关注
if type == '1' then 
  valueName = KEYS[3];
  permitsName = KEYS[5];
end;

-- ARGV[1]是请求的令牌数量,rate必须要比请求的大
-- 如:60秒允许1个,rate=1,如果请求的令牌数是2,即rate<ARGV[1],则抛出异常,异常信息为:Requested permits amount could not exceed defined rate
assert(tonumber(rate) >= tonumber(ARGV[1]), 'Requested permits amount could not exceed defined rate'); 

-- get valueName --> get {keyName}:value,第一次执行redis中什么都还没有,所以currentValue为null
-- 第二次执行,currentValue有值
local currentValue = redis.call('get', valueName); 
local res;

-- 第一次:currentValue为null,进入else分支
-- 第二次,currentValue不为null,进入下面的if分支
if currentValue ~= false then 
  -- 从zset中取数据,{keyName}:permits为key,取的区间是 0 ~ (当前时间戳 - 时间间隔)
  -- 如:第一次请求的时间是8:00:00,第二次请求的时间是8:00:30,当前时间戳就是8点30秒,时间间隔是60秒
  -- 8:00:30减去60秒,7:59:30,则取的数据就是0 ~ 7:59:30 这区间的数据
  -- 由于第一次的请求时间是8:00:00,所以取出的expiredValues值为空,也就是过期的数据
  local expiredValues = redis.call('zrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval); 
  local released = 0; 
  -- 遍历expiredValues,如果expiredValues有值,released=之前所有请求的令牌数之和
  for i, v in ipairs(expiredValues) do 
    local random, permits = struct.unpack('Bc0I', v);
    -- 表示释放或者回收已过期的令牌,供下次请求使用
    released = released + permits;
  end; 

  -- released大于0,说明有过期的数据,如果没有,不会进入此if分支
  if released > 0 then 
    -- {keyName}:permits为key,区间是0 ~ (当前时间戳 - 时间间隔),将所有过期的数据从zset中移除
    redis.call('zremrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval); 
    -- 如果当前令牌数 + 释放(回收)的令牌数 > rate
    if tonumber(currentValue) + released > tonumber(rate) then 
      -- 当前的令牌数 = rate - zset中的元素个数
      currentValue = tonumber(rate) - redis.call('zcard', permitsName); 
    else 
      -- 当前的令牌数 = 当前的令牌数 + 释放(回收)的令牌数
      currentValue = tonumber(currentValue) + released; 
    end; 
    -- 将更新后的当前令牌数的值,重新保存到{keyName}:value中,表示当前可用的令牌总数
    redis.call('set', valueName, currentValue);
  end;

  -- 如果当前的令牌数小于请求的令牌数
  if tonumber(currentValue) < tonumber(ARGV[1]) then 
    -- 获取zset中最早的一次请求,返回的firstValue中包含此次请求的令牌数和请求时间
    local firstValue = redis.call('zrange', permitsName, 0, 0, 'withscores'); 
    -- ARGV[2]:当前时间戳 firstValue[2]:最早请求的时间戳,这里不同的版本源码有区别
    -- 最终表示的是,距离下次令牌产生还需要多长时间
    res = 3 + interval - (tonumber(ARGV[2]) - tonumber(firstValue[2]));
    
  -- 表示当前的令牌数>=请求的令牌数
  else 
    -- 更新zset,当前请求的令牌数以及当前时间戳
    redis.call('zadd', permitsName, ARGV[2], struct.pack('Bc0I', string.len(ARGV[3]), ARGV[3], ARGV[1])); 
    -- 对{keyName}:value做自减,总的令牌数 - 请求的令牌数
    redis.call('decrby', valueName, ARGV[1]); 
    res = nil; 
  end; 

else 
  -- set一个key-value数据,记录当前限流器的令牌数量
  -- set {keyName}:value rate
  redis.call('set', valueName, rate); 
  -- 保存一个zset数据,{keyName}:permits为key,score是当前时间戳,member是经过格式化的,在redis可视化工具中
  -- 显示的是编码。实际上记录的就是这次请求的令牌数。所以,这个zset的数据,保存的就是请求的时间戳和请求的令牌数量
  redis.call('zadd', permitsName, ARGV[2], struct.pack('Bc0I', string.len(ARGV[3]), ARGV[3], ARGV[1])); 
  -- 对{keyName}:value做自减,总的令牌数 - 请求的令牌数
  redis.call('decrby', valueName, ARGV[1]); 
  res = nil; 
end;

-- 获取key的过期时间,如果设置了过期时间,则进行续期
local ttl = redis.call('pttl', KEYS[1]); 
if ttl > 0 then 
  redis.call('pexpire', valueName, ttl); 
  redis.call('pexpire', permitsName, ttl); 
end; 
return res;

redisson使用了zset 数据结构来保存请求的信息,这样可以通过比较 score 分数,也就是请求的时间戳,来判断令牌需不需要生产。如果当前请求距离上一次请求超过了一个令牌生产周期,则说明令牌需要生产,之前用掉了多少个,就生产多少个,而之前用掉了多少个令牌的信息也在 zset 中保存了;如果没有超过一个令牌周期,则判断剩余可用的令牌数是否满足请求的令牌数,如果满足,则通过,如果不满足,则拒绝。

案例实战

比如:我们设置,60秒内只允许1个请求通过,也就是说,每60秒,才会生产1个令牌,每次只允许请求1个令牌

这里我们使用发短信场景,60秒内,只允许发送1条短信

key为:telephone:limit:13612345678
rate为:1
interval为:60秒

创建完成后,redis中保存的数据结构是这样的,我们传入的是60秒,在实际保存时会转换为毫秒,所以interval的值是60000

在这里插入图片描述

第一次请求进入

分析源码可得,Lua 脚本中,KEYS[1]的值就是key的值,即KEYS[1] = telephone:limit:13612345678

local rate = redis.call('hget', KEYS[1], 'rate');
local interval = redis.call('hget', KEYS[1], 'interval');
local type = redis.call('hget', KEYS[1], 'type');

Lua 脚本中,这3条语句,转换为redis的实际命令就是:

  • hget telephone:limit:13612345678 rate
  • hget telephone:limit:13612345678 interval
  • hget telephone:limit:13612345678 type

获取到的值就是redis中hash结构保存的信息,得到:

  • rate:1
  • interval:60000
  • type:0

下面,继续执行这条语句:

assert(rate ~= false and interval ~= false and type ~= false, 'RateLimiter is not initialized')

这条语句对上面获取到的3个字段做了校验,rate、interval、type 必须同时不为空,否则,抛出异常,异常信息为:RateLimiter is not initialized

下面,继续执行以下语句:

local valueName = KEYS[2];
local permitsName = KEYS[4];
if type == '1' then
	valueName = KEYS[3];
	permitsName = KEYS[5];
end;

经源码分析可得
KEYS[2] = {telephone:limit:13612345678}:value
KEYS[4] = {telephone:limit:13612345678}:permits

即:
valueName = {telephone:limit:13612345678}:value
permitsName = {telephone:limit:13612345678}:permits

由于 type = 0 ,所以这里的if分支不进入。继续执行后面语句

assert(tonumber(rate) >= tonumber(ARGV[1]), 'Requested permits amount could not exceed defined rate'

这条语句是校验:请求的令牌数不能超过设置的 rate 数,如:我们 rate 设为了1,请求的令牌数是2,则抛出异常,异常信息为:Requested permits amount could not exceed defined rate

继续执行后续语句,

local currentValue = redis.call('get', valueName); 
local res;
if currentValue ~= false then

currentValue 实际执行的redis命令是:

  • get {telephone:limit:13612345678}:value

由于是第一次请求,此时redis中除了保存的 hash 结构的数据外,没有其他任何数据,所以 currentValue 获取到的就是null,再判断 currentValue,由于它为null,if 分支进不去,直接到else分支

else 
  redis.call('set', valueName, rate); 
  redis.call('zadd', permitsName, ARGV[2], struct.pack('Bc0I', string.len(ARGV[3]), ARGV[3], ARGV[1])); 
  redis.call('decrby', valueName, ARGV[1]); 
  res = nil; 
end;

set valueName rate 转换为redis命令就是:

set {telephone:limit:13612345678}:value 1

此时,currentValue才保存到redis中

再向redis中保存一个 zset ,key是permitsName,即{telephone:limit:13612345678}:permits,redis中 zadd 的命令格式是:
zadd key socre member ,最终,score就是当前的时间戳,member就是当前请求的令牌数

在这里插入图片描述

最后,使用 decrby 命令对 valueName 进行自减
decrby {telephone:limit:13612345678}:value 1

自减1后,此时,redis中的值为0

在这里插入图片描述

继续执行后续语句:

local ttl = redis.call('pttl', KEYS[1]);
if ttl > 0 then
	redis.call('pexpire', valueName, ttl);
	redis.call('pexpire', permitsName, ttl);
end
return res;

获取过期时间,默认我们没有手动设置过期时间,所以获取到的 ttl = -1 ,不满足 if 条件,if 分支不执行。后续我们直接略过此段代码。

至此,第一次请求完成,向redis中新增的key有:

  • {telephone:limit:13612345678}:value
    • String 结构,值为0
  • {telephone:limit:13612345678}:permits
    • hash 结构,member为请求的令牌数,score为这次请求的时间戳

第二次请求进入

当执行语句到local currentValue = redis.call('get', valueName);时,currentValue有值,为0,就会进入if currentValue ~= false分支

后续执行的语句为:

if currentValue ~= false then
	local expiredValues = redis.call('zrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval);
	local released = 0;
	for i, v in ipairs(expiredValues) do
		local random, permits = struct.unpack('Bc0I', v);
		released = released + permits;
end;

redis.call 实际执行的redis命令是:zrangebyscore {telephone:limit:13612345678}:permits 0 当前时间-interval,从zset中取数据,key是{telephone:limit:13612345678}:permits,范围是 0 ~ (当前时间戳 - interval)。

例如:第一次请求的时间是8:00:00,请求的令牌数量是1个,那么zset中就会记录第一次请求的时间和请求的令牌数量;第二次的请求时间是8:00:30,interval是60秒,第二次请求的时间减去interval 就是7:59:30,获取的zset数据范围是 0 ~ 7:59:30,因为第一次请求是8:00:00,所以这次获取不到数据,expiredValues就是空,released = 0,for循环不会进入,此语句块执行结束。

因为 released = 0,所以 if released > 0 的分支不会进入,直接走到后面的语句。

if tonumber(currentValue) < tonumber(ARGV[1]) then
	local firstValue = redis.call('zrange', permitsName, 0, 0, 'withscores');
	res = 3 + interval - (tonumber(ARGV[2]) - tonumber(firstValue[2]));
else
	redis.call('zadd', permitsName, ARGV[2], struct.pack('Bc0I', string.len(ARGV[3]), ARGV[3], ARGV[1]));
	redis.call('decrby', valueName, ARGV[1]);
	res = nil;
end;

currentValue是当前的令牌数,ARGV[1] 是请求的令牌数,如果当前的令牌数 < 请求的令牌数,则获取zset中按score排序后,score最小的那一条数据,即时间最早的那一条数据,转换为redis命令是:zrange {telephone:limit:13612345678}:permits 0 0 withscores,如果firstValue有值,说明这次请求应该被拒绝,没有可用的令牌提供给这次请求,返回距离下次令牌产生还需要多长时间。

如:60秒允许1个请求通过,9:00:00第一个请求,请求1个令牌,成功通过,令牌分配给第一个请求,此时当前可用令牌为0,9:00:05第二个请求,请求1个令牌,当前已没有空闲可用的令牌,得需要下一个时间周期后才会产生新的令牌,也就是60秒后,第二个请求只比第一个请求晚了5秒,小于令牌产生的周期,所以,第二个请求则不通过,返回距离下次令牌产生还需要多长时间。

下面是else分支执行的语句:

else
	redis.call('zadd', permitsName, ARGV[2], struct.pack('Bc0I', string.len(ARGV[3]), ARGV[3], ARGV[1])); 
	redis.call('decrby', valueName, ARGV[1]); 
	res = nil; 
end; 

如果当前的令牌数 >= 请求的令牌数,走入else分支,这里就比较简单,向zset中保存数据,对valueName做自减1,结束。

下面讨论released > 0 的情况:

if currentValue ~= false then
	local expiredValues = redis.call('zrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval);
	local released = 0;
	for i, v in ipairs(expiredValues) do
		local random, permits = struct.unpack('Bc0I', v);
		released = released + permits;
end;
if released > 0 then
	redis.call('zremrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval);
	if tonumber(currentValue) + released > tonumber(rate) then
		currentValue = tonumber(rate) - redis.call('zcard', permitsName);
	else
		currentValue = tonumber(currentValue) + released;
	end
	redis.call('set', valueName, currentValue);
end

released > 0 说明 firstValue有值,进入for循环后,给released进行了赋值。

例如:第一次请求的时间是8:00:00,请求的令牌数量是1个,那么zset中就会记录第一次请求的时间和请求的令牌数量;第二次的请求时间是8:01:01,interval是60秒,第二次请求的时间减去interval 就是8:00:01,获取的zset数据范围是 0 ~ 8:00:01,因为第一次请求是8:00:00,所以能获取到数据,expiredValues中记录了请求的令牌数以及时间,就是已过期的数据。

进入for循环,permits就是需要释放的令牌数,因为第一次请求的令牌数是1,所以permits = 1,released = released + permits,released = 0 + 1 = 1,released > 0,进入 if 分支。

使用 zremrangebyscore 移除过期的数据,移除的就是expiredValues,即数据范围是0 ~ 8:00:01的数据。下面又做了一层校验,如果 当前令牌数 + 释放的令牌数 > rate,则 当前令牌数 = rate - zset中的元素个数,保证总数不能超过设置的rate;否则,进入else分支,将释放的令牌数加到当前令牌数中,对currentValue进行赋值,更新当前令牌数的值。


完结散花!这一套组合拳分析下来,你学废了吗?

如果你有任何疑问或经验分享,可以在评论区留言哦~~

不管在任何时候,我希望你永远不要害怕挑战,不要畏惧失败。每一个错误都是向成功迈出的一步,每一个挑战都是成长的机会,因为每一次的努力,都会使我们离梦想更近一点。只要你行动起来,任何时候都不算晚。最后,把座右铭送给大家:种一棵树最好的时间是10年前,其次就是现在,加油!共勉 💪。

网站公告

今日签到

点亮在社区的每一天
去签到