singleflight: usage and source code walkthrough

Published: 2025-06-19


Introduction

  • sync.Once guarantees that a function is called only once over the entire lifetime of the Once value; singleflight, by contrast, only guarantees a single call within a certain window, namely while a call for the same key is in flight (a short sketch follows).
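
To make that difference concrete, here is a minimal runnable sketch (the key name and messages are made up for illustration): sync.Once never runs its function again, while singleflight re-executes as soon as the previous call for the same key has completed.

package main

import (
	"fmt"
	"sync"

	"golang.org/x/sync/singleflight"
)

func main() {
	// sync.Once: the function runs at most once for the lifetime of the Once value.
	var once sync.Once
	for i := 0; i < 3; i++ {
		once.Do(func() { fmt.Println("sync.Once: runs exactly once") })
	}

	// singleflight: duplicates are suppressed only while a call for the same
	// key is in flight; sequential calls each execute the function again.
	var g singleflight.Group
	for i := 0; i < 3; i++ {
		v, _, _ := g.Do("demo-key", func() (interface{}, error) {
			return fmt.Sprintf("executed for call %d", i), nil
		})
		fmt.Println("singleflight:", v)
	}
}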

Background | Use cases

  • Guarding against cache breakdown: a lock can solve the problem, but a lock is not very flexible (it cannot control access frequency, for example); singleflight can limit the request rate by periodically clearing keys (a minimal sketch of this pattern follows the list).
  • Deduplicating requests: when a large number of duplicate requests arrive within a short time window, consider combining consistent-hash load balancing with singleflight. Requests are routed by key to a specific service instance via consistent hashing; that instance collapses them with singleflight before calling the downstream service, so the duplicates are funneled into a single call.
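
As a sketch of the cache-breakdown case above (the in-memory cache and the queryDB helper are hypothetical stand-ins, not part of singleflight), all concurrent misses for the same key collapse into one backend query:

package main

import (
	"fmt"
	"sync"

	"golang.org/x/sync/singleflight"
)

var (
	mu    sync.RWMutex
	cache = map[string]string{} // trivial in-memory cache, only for this sketch
	g     singleflight.Group
)

// queryDB is a hypothetical stand-in for the real, expensive backend query.
func queryDB(key string) (string, error) {
	return "value-of-" + key, nil
}

// GetData reads through the cache; on a miss, concurrent callers for the same
// key share a single queryDB call instead of all hitting the backend at once.
func GetData(key string) (string, error) {
	mu.RLock()
	v, ok := cache[key]
	mu.RUnlock()
	if ok {
		return v, nil
	}

	val, err, _ := g.Do(key, func() (interface{}, error) {
		got, err := queryDB(key)
		if err != nil {
			return nil, err
		}
		mu.Lock()
		cache[key] = got
		mu.Unlock()
		return got, nil
	})
	if err != nil {
		return "", err
	}
	return val.(string), nil
}

func main() {
	v, _ := GetData("user:1")
	fmt.Println(v)
}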

Usage

Basic usage: merge requests, i.e. merge all requests that arrive while the first request is still executing.

From the code and output below you can see that a query... line is printed roughly every 10ms, interleaved with many result lines. This means that while the function is actually executing, all incoming requests are held back; when the function finishes, its **result is shared with every request that arrived during that window**.

package main

import (
	"fmt"
	"golang.org/x/sync/singleflight"
	"log"
	"math/rand"
	"sync/atomic"
	"time"
)

func getData(id int64) string {
	log.Printf("query...%d\n", id)
	time.Sleep(10 * time.Millisecond) // simulate a slow operation, about 10ms
	return "liwenzhou.com" + fmt.Sprintf("%d", id)
}

func main() {
	log.SetFlags(log.Lmicroseconds)
	g := new(singleflight.Group)
	var i int64
	for {
		time.Sleep(time.Duration(rand.Intn(5)) * time.Millisecond)
		// fire off one request
		go func() {
			// take a private copy of the counter without a data race
			icopy := atomic.AddInt64(&i, 1) - 1
			v1, _, shared := g.Do("getData", func() (interface{}, error) {
				ret := getData(icopy)
				return ret, nil
			})
			log.Printf("call: v1:%v, shared:%v , i:%d\n", v1, shared, atomic.LoadInt64(&i))
		}()

	}

}


➜  test21 git:(main)go run main.go
20:11:31.995346 query...0
20:11:32.005800 call: v1:liwenzhou.com0, shared:true , i:4
20:11:32.005799 call: v1:liwenzhou.com0, shared:true , i:4
20:11:32.005804 call: v1:liwenzhou.com0, shared:true , i:4
20:11:32.005807 call: v1:liwenzhou.com0, shared:true , i:4
20:11:32.006386 query...4
20:11:32.016671 call: v1:liwenzhou.com4, shared:true , i:9
20:11:32.016687 call: v1:liwenzhou.com4, shared:true , i:9
20:11:32.016691 call: v1:liwenzhou.com4, shared:true , i:9
20:11:32.016693 call: v1:liwenzhou.com4, shared:true , i:9
20:11:32.016694 call: v1:liwenzhou.com4, shared:true , i:9
20:11:32.017366 query...9
20:11:32.027418 call: v1:liwenzhou.com9, shared:true , i:16
20:11:32.027433 call: v1:liwenzhou.com9, shared:true , i:16
20:11:32.027436 call: v1:liwenzhou.com9, shared:true , i:16
20:11:32.027437 call: v1:liwenzhou.com9, shared:true , i:16

Advanced usage 1: timeout control with DoChan

With the basic method, while one request is executing, every newly arriving request blocks and waits for its result. If the actual execution errors out or times out, the other concurrent requests can do nothing but keep waiting.

Since the singleflight library is typically used to avoid cache breakdown, where the shared call queries an external database, these error and timeout scenarios must be handled.

For example, a call normally takes 10ms but the timeout is set to 50ms. Since the degree of concurrency does not truly have to be 1, a waiter may prefer to stop blocking after, say, 12ms instead of waiting out the full timeout.

// Use DoChan for timeout control. g, key, call and handle are assumed to be defined elsewhere.
func CtrTimeout(ctx context.Context, req interface{}) {
	ch := g.DoChan(key, func() (interface{}, error) {
		return call(ctx, req)
	})

	select {
	case <-time.After(500 * time.Millisecond): // give up waiting after 500ms
		return
	case <-ctx.Done(): // the caller's context was cancelled
		return
	case ret := <-ch: // the shared call finished; hand its result off
		go handle(ret)
	}
}
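
For reference, here is a self-contained variant of the same pattern (slowQuery, the 12ms waiting budget and the key name are assumptions for the demo): the waiter stops blocking after 12ms even though the shared call keeps running in the background and can still serve later callers.

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/singleflight"
)

var g singleflight.Group

// slowQuery is a hypothetical downstream call that takes about 50ms.
func slowQuery() (interface{}, error) {
	time.Sleep(50 * time.Millisecond)
	return "result", nil
}

// getWithTimeout waits at most 12ms for the shared call; the call itself keeps
// running in the background and later callers can still receive its result.
func getWithTimeout(ctx context.Context, key string) (interface{}, error) {
	ch := g.DoChan(key, func() (interface{}, error) {
		return slowQuery()
	})

	select {
	case <-time.After(12 * time.Millisecond):
		return nil, fmt.Errorf("singleflight: timed out waiting for %q", key)
	case <-ctx.Done():
		return nil, ctx.Err()
	case r := <-ch:
		return r.Val, r.Err
	}
}

func main() {
	if _, err := getWithTimeout(context.Background(), "data"); err != nil {
		fmt.Println(err) // the waiter gives up after 12ms even though the query takes ~50ms
	}
	time.Sleep(60 * time.Millisecond) // let the background call finish before main exits
}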

Advanced usage 2: manually controlling the merge window (Forget)

In the basic usage, the merge window is exactly the execution time of one request. Sometimes you want to control that window yourself rather than be limited to one request's execution time.

Approach: start another goroutine that deletes the corresponding key; usually it is enough to spawn one goroutine inside the function passed to g.Do that calls Forget after a delay.

// Spawn another goroutine that deletes the key after a delay, so that more requests
// reach the downstream service and the overall success rate improves.
// g, key, call and handle are assumed to be defined elsewhere.
func CtrRate(ctx context.Context, req interface{}) {
	res, _, _ := g.Do(key, func() (interface{}, error) {
		// Start a separate goroutine that waits for a while and then deletes the key.
		// Calls made after the key is deleted will execute Do's function again.
		go func() {
			time.Sleep(10 * time.Millisecond)
			g.Forget(key)
		}()

		return call(ctx, req)
	})

	handle(res)
}
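
Below is a runnable sketch of the same idea (the 100ms downstream latency, the 20ms arrival interval and the execution counter are all made up for the demo): with the Forget goroutine a new downstream call starts roughly every 20ms, whereas without it about two calls would cover the whole run.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"golang.org/x/sync/singleflight"
)

func main() {
	var g singleflight.Group
	var executions int64

	fetch := func() {
		g.Do("key", func() (interface{}, error) {
			// Drop the key after 10ms so that callers arriving later start a
			// fresh downstream call instead of waiting for this slow one.
			go func() {
				time.Sleep(10 * time.Millisecond)
				g.Forget("key")
			}()
			atomic.AddInt64(&executions, 1)
			time.Sleep(100 * time.Millisecond) // slow downstream call
			return "value", nil
		})
	}

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			fetch()
		}()
		time.Sleep(20 * time.Millisecond)
	}
	wg.Wait()
	// Without the Forget goroutine roughly 2 executions would serve all 10 calls;
	// with it, a new downstream call starts about every 20ms.
	fmt.Println("underlying executions:", atomic.LoadInt64(&executions))
}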

Summary

singleflight can merge multiple requests and thereby limit the request rate. The DoChan method and Forget can be used to manually control the request rate and to return early on timeout.

Source code walkthrough

The singleflight source is a single file, so the whole thing is annotated inline below:


// call is an in-flight or completed singleflight.Do call
type call struct {
	wg sync.WaitGroup // when called concurrently, one goroutine executes while the others block; used to notify the blocked goroutines once execution is done

	// These fields are written once before the WaitGroup is done
	// and are only read after the WaitGroup is done.
	val interface{} // the result value of the execution
	err error       // the error from the execution; written only once

	// These fields are read and written with the singleflight
	// mutex held before the WaitGroup is done, and are read but
	// not written after the WaitGroup is done.
	dups  int             // number of duplicate callers; used to compute the shared flag returned to callers
	chans []chan<- Result // channels over which the result can also be delivered
}

// Group represents a class of work and forms a namespace in
// which units of work can be executed with duplicate suppression.
type Group struct {
	mu sync.Mutex       // protects m
	m  map[string]*call // lazily initialized; each key represents a task that is currently executing,
	                    // and the entry is removed from the map once the task finishes
}

// Result holds the results of Do, so they can be passed
// on a channel.
// The values are wrapped in a struct so that the result can be sent over a channel.
type Result struct {
	Val    interface{}
	Err    error
	Shared bool // whether the result was shared with other goroutines
}

// Do executes and returns the results of the given function, making
// sure that only one execution is in-flight for a given key at a
// time. If a duplicate comes in, the duplicate caller waits for the
// original to complete and receives the same results.
// The return value shared indicates whether v was given to multiple callers.
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
	g.mu.Lock()
	if g.m == nil {
		g.m = make(map[string]*call)
	}
	if c, ok := g.m[key]; ok {
		c.dups++
		g.mu.Unlock()
		c.wg.Wait()

		if e, ok := c.err.(*panicError); ok {
			panic(e)
		} else if c.err == errGoexit {
			runtime.Goexit()
		}
		return c.val, c.err, true
	}
	c := new(call)
	c.wg.Add(1)
	g.m[key] = c
	g.mu.Unlock()

	g.doCall(c, key, fn)
	return c.val, c.err, c.dups > 0
}

// DoChan is like Do but returns a channel that will receive the
// results when they are ready.
//
// The returned channel will not be closed.
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
	ch := make(chan Result, 1)
	g.mu.Lock()
	if g.m == nil {
		g.m = make(map[string]*call)
	}
	if c, ok := g.m[key]; ok {
		c.dups++
		c.chans = append(c.chans, ch)
		g.mu.Unlock()
		return ch
	}
	c := &call{chans: []chan<- Result{ch}}
	c.wg.Add(1)
	g.m[key] = c
	g.mu.Unlock()

	go g.doCall(c, key, fn)

	return ch
}

// doCall handles the single call for a key.
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
	normalReturn := false
	recovered := false

	// use double-defer to distinguish panic from runtime.Goexit,
	// more details see https://golang.org/cl/134395
	// This design is interesting: it uses two layers of defer to handle the two
	// abnormal-exit cases (panic and runtime.Goexit) correctly; see the comments below.
	defer func() {
		// the given function invoked runtime.Goexit
		if !normalReturn && !recovered { // combined with the analysis below, this can only mean runtime.Goexit was called
			c.err = errGoexit
		}

		g.mu.Lock()
		defer g.mu.Unlock()
		c.wg.Done() // notify the other goroutines that the result is ready
		if g.m[key] == c { // remove the finished task from the map
			delete(g.m, key)
		}

		if e, ok := c.err.(*panicError); ok {
			// In order to prevent the waiting channels from being blocked forever,
			// needs to ensure that this panic cannot be recovered.
			if len(c.chans) > 0 {
				go panic(e)
				select {} // Keep this goroutine around so that it will appear in the crash dump.
			} else {
				panic(e)
			}
		} else if c.err == errGoexit {
			// Already in the process of goexit, no need to call again
		} else {
			// Normal return
			for _, ch := range c.chans {
				ch <- Result{c.val, c.err, c.dups > 0}
			}
		}
	}()

	func() {
		defer func() {
			if !normalReturn { // reaching here means either a panic or a runtime.Goexit happened
				// Ideally, we would wait to take a stack trace until we've determined
				// whether this is a panic or a runtime.Goexit.
				//
				// Unfortunately, the only way we can distinguish the two is to see
				// whether the recover stopped the goroutine from terminating, and by
				// the time we know that, the part of the stack trace relevant to the
				// panic has been discarded.
				if r := recover(); r != nil { // a panic happened, so record it
					c.err = newPanicError(r)
				}
			}
		}()

		c.val, c.err = fn()
		normalReturn = true // reaching here means fn neither panicked nor called runtime.Goexit
	}()
	// if execution never reaches this point, fn must have called runtime.Goexit
	if !normalReturn { // either a panic or a runtime.Goexit happened
		recovered = true // we got past the recover above, so it must have been a panic
	}
}

// Forget tells the singleflight to forget about a key.  Future calls
// to Do for this key will call the function rather than waiting for
// an earlier call to complete.
func (g *Group) Forget(key string) {
	g.mu.Lock()
	delete(g.m, key)
	g.mu.Unlock()
}

Double defer is used to determine whether the function exited via runtime.Goexit or via a panic.
This design is quite interesting: it uses two layers of defer to make sure both kinds of abnormal exit are handled correctly.

Summary

singleflight uses a map to separate different tasks; the presence of a key indicates that the task is currently executing (the key is deleted from the map as soon as execution finishes).

Each task is managed by a call struct, which is mainly responsible for: controlling concurrency (via a sync.WaitGroup), running the function, and passing back the result (including delivery over channels).

An interesting detail of the execution path is the use of double defer to tell whether an abnormal exit was caused by a panic or by a call to runtime.Goexit.
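
To see the panic handling from doCall in action, here is a small experiment (the key name, the 10ms sleep and the "boom" message are arbitrary): a panic raised inside fn is re-thrown both in the goroutine that actually executed it and in every duplicate caller that was waiting on the same key, so none of them blocks forever.

package main

import (
	"fmt"
	"sync"
	"time"

	"golang.org/x/sync/singleflight"
)

func main() {
	var g singleflight.Group
	var wg sync.WaitGroup

	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			defer func() {
				if r := recover(); r != nil {
					// Every caller of Do for this key sees the panic, not just the one
					// whose goroutine executed fn; the recovered value wraps the original
					// panic value together with a stack trace.
					fmt.Printf("caller %d recovered a panic\n", n)
				}
			}()
			g.Do("key", func() (interface{}, error) {
				time.Sleep(10 * time.Millisecond) // let duplicate callers pile up
				panic("boom")
			})
		}(i)
	}
	wg.Wait()
}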

