Go 限流控制《滑动窗口&令牌桶》:time/rate、TokenLimit、PeriodLimit

发布于:2022-12-29 ⋅ 阅读:(940) ⋅ 点赞:(0)

一、前言

流量控制基本是《微服务》和高并发系统设计的入门课,即便是在早期各种负载均衡和网络组件(如nginx、iptable、TC)都有提供基础的QPS限制能力,如今演进到微服务框架、Sentinel、Service Mesh和Serverless都已经具备完备的配置化的限流的能力已经能够满足大多数场景了,但如果我们在一些不以服务作为颗粒的方式可能就不太适用了,比如以下几个场景:

  • 调用外部三方服务存在频率限制
  • 队列消费控速
    • 设置最大消费线程数、每次拉取消息条数拉取间隔时间
    • 这样能够能精准控制队列的处理频率吗?(有些主流MQ中间件SDK有实现匀速器,如RocketMQ,但是单点限流不能多消费者分布式共享)
  • 当被超过频率限制时执行一段兜底逻辑

常用限流主要就是滑动窗口和令牌桶,本文基于golang官方包的time/rate和go-zero中的TokenLimit、PeriodLimit来介绍如何使用

二、time/rate、TokenLimit、PeriodLimit差异

限流器 存储介质 执行效率 突发流量 wait支持
time/rate 内存 支持 支持
PeriodLimit redis 一般 支持 不支持
TokenLimit redis 一般 不支持 不支持

突发流量指的是:当流量有一个小高峰,因为令牌桶(TokenLimit)当前桶中存在一定的token每秒有在生成token所以如果是一个10容量10并发的桶,第一秒能够支持20QPS,而滑动窗口(PeriodLimit)可以理解成一个没有容量的桶只能现用现生产。

wait支持指的是:阻塞程序执行等待token的生成而不是立马返回失败,比如队列限流使用场景下就比较合适,异步时间不敏感类型。

PS:go-zero提供的令牌桶当redis故障是会切换到内存令牌桶time/rate来降低影响,但是会影响限流的精准性

令牌桶基本原理:

  • 单位时间按照一定速率匀速的生产 token 放入桶内,直到达到桶容量上限。
  • 处理请求,每次尝试获取一个或多个令牌,如果拿到则处理请求,失败则拒绝请求。
    在这里插入图片描述

为了保证原子性基于redis实现分布式令牌桶,lua脚本详解:

-- 每秒生成token数量即token生成速度
local rate = tonumber(ARGV[1])
-- 桶容量
local capacity = tonumber(ARGV[2])
-- 当前时间戳
local now = tonumber(ARGV[3])
-- 当前请求token数量
local requested = tonumber(ARGV[4])
-- 需要多少秒才能填满桶
local fill_time = capacity/rate
-- 向下取整,ttl为填满时间的2倍
local ttl = math.floor(fill_time*2)
-- 当前时间桶容量
local last_tokens = tonumber(redis.call("get", KEYS[1]))
-- 如果当前桶容量为0,说明是第一次进入,则默认容量为桶的最大容量
if last_tokens == nil then
last_tokens = capacity
end
-- 上一次刷新的时间
local last_refreshed = tonumber(redis.call("get", KEYS[2]))
-- 第一次进入则设置刷新时间为0
if last_refreshed == nil then
last_refreshed = 0
end
-- 距离上次请求的时间跨度
local delta = math.max(0, now-last_refreshed)
-- 距离上次请求的时间跨度,总共能生产token的数量,如果超多最大容量则丢弃多余的token
local filled_tokens = math.min(capacity, last_tokens+(delta*rate))
-- 本次请求token数量是否足够
local allowed = filled_tokens >= requested
-- 桶剩余数量
local new_tokens = filled_tokens
-- 允许本次token申请,计算剩余数量
if allowed then
new_tokens = filled_tokens - requested
end
-- 设置剩余token数量
redis.call("setex", KEYS[1], ttl, new_tokens)
-- 设置刷新时间
redis.call("setex", KEYS[2], ttl, now)

return allowed

四、例子🌰

go-zero的限流相关组件在 core/limit 包下面

time/rate

func Test_TimeRate(t *testing.T) {

	// New tokenLimiter
	limiter := rate.NewLimiter(10, 100)
	timer := time.NewTimer(time.Second * 10)
	quit := make(chan struct{})
	defer timer.Stop()
	go func() {
		<-timer.C
		close(quit)
	}()

	var allowed, denied int32
	var wait sync.WaitGroup
	for i := 0; i < runtime.NumCPU(); i++ {
		wait.Add(1)
		go func() {
			for {
				select {
				case <-quit:
					wait.Done()
					return
				default:
					ctx, cancel := context.WithTimeout(context.TODO(), time.Second)
					err := limiter.Wait(ctx)
					if err == nil {
						atomic.AddInt32(&allowed, 1)
					} else {
						fmt.Println(err)
						atomic.AddInt32(&denied, 1)
					}
					cancel()
				}
			}
		}()
	}

	wait.Wait()
	fmt.Printf("allowed: %d, denied: %d, qps: %d\n", allowed, denied, (allowed+denied)/10)
}

TokenLimit

func Test_TokenLimit(t *testing.T) {
	s, err := miniredis.Run()
	assert.Nil(t, err)

	//store := redis.New("localhost:6379")
	store := redis.New(s.Addr())

	const (
		burst   = 100
		rate    = 10
		seconds = 10
	)

	fmt.Println(store.Ping())
	// New tokenLimiter
	limiter := limit.NewTokenLimiter(rate, burst, store, "rate-test")
	timer := time.NewTimer(time.Second * seconds)
	quit := make(chan struct{})
	defer timer.Stop()
	go func() {
		<-timer.C
		close(quit)
	}()

	var allowed, denied int32
	var wait sync.WaitGroup
	for i := 0; i < runtime.NumCPU(); i++ {
		wait.Add(1)
		go func() {
			for {
				select {
				case <-quit:
					wait.Done()
					return
				default:
					if limiter.Allow() {
						atomic.AddInt32(&allowed, 1)
					} else {
						atomic.AddInt32(&denied, 1)
					}
				}
			}
		}()
	}

	wait.Wait()
	fmt.Printf("allowed: %d, denied: %d, qps: %d\n", allowed, denied, (allowed+denied)/seconds)
}

PeriodLimit

func Test_PeriodLimit(t *testing.T) {

	s, err := miniredis.Run()
	assert.Nil(t, err)

	//store := redis.New("localhost:6379")
	store := redis.New(s.Addr())

	const (
		seconds = 1
		total   = 100
		quota   = 5
	)
	l := limit.NewPeriodLimit(seconds, quota, store, "periodlimit")
	var allowed, hitQuota, overQuota int
	for i := 0; i < total; i++ {
		val, err := l.Take("first")
		if err != nil {
			t.Error(err)
		}
		switch val {
		case limit.Allowed:
			allowed++
		case limit.HitQuota:
			hitQuota++
		case limit.OverQuota:
			overQuota++
		default:
			t.Error("unknown status")
		}
	}

	assert.Equal(t, quota-1, allowed)
	assert.Equal(t, 1, hitQuota)
	assert.Equal(t, total-quota, overQuota)
}

网站公告

今日签到

点亮在社区的每一天
去签到