singlefligt使用方法和源码解读
介绍
- sync.once保证其整个生命周期内只调用一次;而singleflight则可以保证在一定范围内其只调用一次。
背景|使用场景
- 应对缓存击穿:加锁可以解决这个问题,但是加锁不太灵活(不能控制访问频率之类的),singlefilght可以通过定时清除的方式限制频率
- 去除重复请求:当一定时间范围内存在了大量的重复请求,可以考虑使用:一致性hash负载均衡+singlefilght收束请求。用户根据key使用一致性hash请求到特定的服务机器上,服务对请求执行
singlefilght
后,再去请求下游,以此收束重复请求。
用法
基础用法:合并请求,合并第一个请求执行过程中到达的所有请求。
从下面的代码和输出可以看出每次的输出query...
都是基本上每10ms一次,而其中穿插了很多次打印结果,代表着在函数真正执行过程中所有的请求都是被拦截住了,当函数执行完时,会将所有的**结果共享给这个期间到来的所有请求**。
package main
import (
"fmt"
"golang.org/x/sync/singleflight"
"log"
"math/rand"
"time"
)
func getData(id int64) string {
log.Printf("query...%d\n", id)
time.Sleep(10 * time.Millisecond) // 模拟一个比较耗时的操作,10ms
return "liwenzhou.com" + fmt.Sprintf("%d", id)
}
func main() {
log.SetFlags(log.Lmicroseconds)
g := new(singleflight.Group)
var i int64 = 0
for {
time.Sleep(time.Duration(rand.Intn(5)) * time.Millisecond)
// 调用
go func() {
icopy := i
i++
v1, _, shared := g.Do("getData", func() (interface{}, error) {
ret := getData(icopy)
return ret, nil
})
log.Printf("call: v1:%v, shared:%v , i:%d\n", v1, shared, i)
}()
}
}
/**
query...1
1st call: v1:liwenzhou.com1, shared:true
2nd call: v2:liwenzhou.com1, shared:true
*/
➜ test21 git:(main) ✗ go run main.go
20:11:31.995346 query...0
20:11:32.005800 call: v1:liwenzhou.com0, shared:true , i:4
20:11:32.005799 call: v1:liwenzhou.com0, shared:true , i:4
20:11:32.005804 call: v1:liwenzhou.com0, shared:true , i:4
20:11:32.005807 call: v1:liwenzhou.com0, shared:true , i:4
20:11:32.006386 query...4
20:11:32.016671 call: v1:liwenzhou.com4, shared:true , i:9
20:11:32.016687 call: v1:liwenzhou.com4, shared:true , i:9
20:11:32.016691 call: v1:liwenzhou.com4, shared:true , i:9
20:11:32.016693 call: v1:liwenzhou.com4, shared:true , i:9
20:11:32.016694 call: v1:liwenzhou.com4, shared:true , i:9
20:11:32.017366 query...9
20:11:32.027418 call: v1:liwenzhou.com9, shared:true , i:16
20:11:32.027433 call: v1:liwenzhou.com9, shared:true , i:16
20:11:32.027436 call: v1:liwenzhou.com9, shared:true , i:16
20:11:32.027437 call: v1:liwenzhou.com9, shared:true , i:16
进阶用法1:超时控制DoChan
在基础方法中,在某一次请求执行过程中,所有到来的新的请求都会阻塞等待这个请求的执行结果。如果真正执行的过程超时出错了,其他并发的请求就只能等待。
考虑到singleflight库通常用于避免缓存击穿,需要查询外部数据库,这样的出错、超时的场景是必须要考虑的。
比如正常是10ms,但是超时时间设置的是50ms。由于并发数量并不需要真正为1,因此想12ms就停止阻塞。
// 使用DoChan进行超时控制
func CtrTimeout(ctx context.Context, req interface{}){
ch := g.DoChan(key, func() (interface{}, error) {
return call(ctx, req)
})
select {
case <-time.After(500 * time.Millisecond):
return
case <-ctx.Done()
return
case ret := <-ch:
go handle(ret)
}
}
进阶用法2:手动合并请求频率控制
在基础用法中,请求合并的频率是 一个请求的执行时间 ,希望达到自己控制合并的时间,不限制为一个请求的执行时间。
方法:另起一个协程删除对应的key
,一般是在go.Do中另起Forget
一次即可。
// 另外启用协程定时删除key,提高请求下游次数,提高成功率
func CtrRate(ctx context.Context, req interface{}){
res, _, shared := g.Do(key, func() (interface{}, error) {
// 另外其一个goroutine,等待一段时间后,删除key
// 删除key后的调用,会重新执行Do
go func() {
time.Sleep(10 * time.Millisecond)
g.Forget(key)
}()
return call(ctx, req)
})
handle(res)
}
总结
singlefligt可以合并多个请求达到限频率的目的。可以使用DoChan
方法或Forget
来手动控制请求的频率和超时返回。
源码解读
singleflight的源码就一个文件,因此就放在备注里了:
// call is an in-flight or completed singleflight.Do call
type call struct {
wg sync.WaitGroup //并发调用的的时候,一个协程执行其它协程阻塞,用于执行完之后通知其他阻塞的协程
// These fields are written once before the WaitGroup is done
// and are only read after the WaitGroup is done.
val interface{} //保存执行的结果值
err error //保存执行过程中的错误,只会写入一次
// These fields are read and written with the singleflight
// mutex held before the WaitGroup is done, and are read but
// not written after the WaitGroup is done.
dups int //记录并发数量,用于执行后返回时候共享的结果(Shared)
chans []chan<- Result //用于支持结果通过channel返回出去
}
// Group represents a class of work and forms a namespace in
// which units of work can be executed with duplicate suppression.
type Group struct {
mu sync.Mutex // protects m 保证m的并发安全
m map[string]*call // lazily initialized //m代表任务,一个key-val代表一个任务正在执行
// 任务执行完之后会从map里面被删除
}
// Result holds the results of Do, so they can be passed
// on a channel.
// 保存结果的结构体,这样才能支持通过channel返回结果
type Result struct {
Val interface{}
Err error
Shared bool //是否和其他协程共享的结果
}
// Do executes and returns the results of the given function, making
// sure that only one execution is in-flight for a given key at a
// time. If a duplicate comes in, the duplicate caller waits for the
// original to complete and receives the same results.
// The return value shared indicates whether v was given to multiple callers.
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {
c.dups++
g.mu.Unlock()
c.wg.Wait()
if e, ok := c.err.(*panicError); ok {
panic(e)
} else if c.err == errGoexit {
runtime.Goexit()
}
return c.val, c.err, true
}
c := new(call)
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
g.doCall(c, key, fn)
return c.val, c.err, c.dups > 0
}
// DoChan is like Do but returns a channel that will receive the
// results when they are ready.
//
// The returned channel will not be closed.
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
ch := make(chan Result, 1)
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {
c.dups++
c.chans = append(c.chans, ch)
g.mu.Unlock()
return ch
}
c := &call{chans: []chan<- Result{ch}}
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
go g.doCall(c, key, fn)
return ch
}
// doCall handles the single call for a key.
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
normalReturn := false
recovered := false
// use double-defer to distinguish panic from runtime.Goexit,
// more details see https://golang.org/cl/134395
// 这块设计非常有意思,主要是针对两种异常退出的情况使用了双重defer来保证正确处理,具体见下面注释
defer func() {
// the given function invoked runtime.Goexit
if !normalReturn && !recovered { //结合下面代码分析,一定是出现了exit
c.err = errGoexit
}
g.mu.Lock()
defer g.mu.Unlock()
c.wg.Done() //通知其他协程可以拿结果了
if g.m[key] == c { //删掉任务
delete(g.m, key)
}
if e, ok := c.err.(*panicError); ok {
// In order to prevent the waiting channels from being blocked forever,
// needs to ensure that this panic cannot be recovered.
if len(c.chans) > 0 {
go panic(e)
select {} // Keep this goroutine around so that it will appear in the crash dump.
} else {
panic(e)
}
} else if c.err == errGoexit {
// Already in the process of goexit, no need to call again
} else {
// Normal return
for _, ch := range c.chans {
ch <- Result{c.val, c.err, c.dups > 0}
}
}
}()
func() {
defer func() {
if !normalReturn {//到这里说明:要么发生panic,要么发生exit
// Ideally, we would wait to take a stack trace until we've determined
// whether this is a panic or a runtime.Goexit.
//
// Unfortunately, the only way we can distinguish the two is to see
// whether the recover stopped the goroutine from terminating, and by
// the time we know that, the part of the stack trace relevant to the
// panic has been discarded.
if r := recover(); r != nil { //panic就赋值
c.err = newPanicError(r)
}
}
}()
c.val, c.err = fn()
normalReturn = true //到这里说明:没有发生panic,任务里面没有go exit
}()
//执行不到这里的话说明是exit
if !normalReturn { //要么发生panic,要么发生exit
recovered = true //结合分析------> 一定是panic,所以赋值
}
}
// Forget tells the singleflight to forget about a key. Future calls
// to Do for this key will call the function rather than waiting for
// an earlier call to complete.
func (g *Group) Forget(key string) {
g.mu.Lock()
delete(g.m, key)
g.mu.Unlock()
}
双重defer来确定是exit
还是发生了panic。
这块设计非常有意思,主要是针对两种异常退出的情况使用了双重defer来保证
总结
singleflight使用map
来隔离不同的任务,map
的key存在性标识任务是否在执行(执行完会马上从map中删除)。
对于一个任务,使用call结构体进行管理,其主要用于:控制并发(waitGroup
),执行,和传递结果(支持channel
传递)。
执行中很有意思的一点是使用了双重defer来判断异常退出是panic
退出还是调用了exit()
退出。