【Go底层】singleflight包原理

发布于:2025-02-10 ⋅ 阅读:(104) ⋅ 点赞:(0)

1、背景

在处理同一时刻接口的并发请求时,常见的有这几种情况:一个请求正在执行,相同的其它请求等待顺序执行,使用互斥锁就能完成、一个请求正在执行,相同的其它请求都丢弃、一个请求正在执行,相同的其它请求等待拿取相同的结果。使用singleflight包就能达到一个请求正在执行,相同的其它请求过来等待第一个请求执行完,然后共享第一个请求的结果,在处理并发场景时非常好用。

2、下载

go get -u golang.org/x/sync/singleflight

3、原理解释

singleflight底层结构:

type Group struct {
	mu sync.Mutex       //保护map对象m的并发安全
	m  map[string]*call //key-请求的唯一标识,val-请求唯一标识对应要执行的函数
}

call底层结构:

type call struct {
	wg sync.WaitGroup //用来阻塞相同标识对应的请求中的第一个请求之外的请求
	val interface{} //第一个请求的执行结果
	err error //第一个请求返回的错误
	dups  int //第一个请求之外的其它请求数
	chans []chan<- Result //请求结果写入通道
}

关键函数:

//
// Do
//  @Description: 一个唯一标识对应的请求在执行过程中,相同唯一标识对应的请求会被阻塞,等待第一个请求执行完并共享结果
//  @receiver g 
//  @param key 请求唯一标识
//  @param fn 请求要执行的函数
//  @return v 请求要执行函数返回的结果
//  @return err 请求要执行的函数返回的错误
//  @return shared 是否有多个请求共享结果
//
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
	g.mu.Lock() //保护map对象m的并发安全
	if g.m == nil {
		g.m = make(map[string]*call) //初始化m对象
	}
	if c, ok := g.m[key]; ok { //map中key存在说明这个key对应的请求正在执行中,这次请求不是第一个请求
		c.dups++ //等待共享结果数+1
		g.mu.Unlock()
		c.wg.Wait() //阻塞等待第一个请求执行完

		if e, ok := c.err.(*panicError); ok {
			panic(e)
		} else if c.err == errGoexit {
			runtime.Goexit()
		}
		return c.val, c.err, true //返回第一个请求的执行结果
	}
    
    //第一个请求的处理逻辑
	c := new(call)
	c.wg.Add(1) //计数+1
	g.m[key] = c //唯一标识关联对应的函数对象
	g.mu.Unlock()

	g.doCall(c, key, fn) //执行请求对应的函数
	return c.val, c.err, c.dups > 0 //返回执行结果
}

上面Do函数中g.doCall函数也需要大概理解一下,就是会将key对应函数的执行结果写的call对象c里,然后清空wg计数,相同key对应的其它请求就会跳出c.wg.Wait()阻塞,直接从call对象c中读取第一个请求的执行结果和错误信息并返回,源码如下:

func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
	normalReturn := false
	recovered := false
	
	defer func() { //fn函数执行完之后执行
		if !normalReturn && !recovered {
			c.err = errGoexit
		}

		g.mu.Lock()
		defer g.mu.Unlock()
		c.wg.Done() //释放计数
		if g.m[key] == c {
			delete(g.m, key) //删除此次请求的唯一标识相关信息
		}

		if e, ok := c.err.(*panicError); ok {
			if len(c.chans) > 0 {
				go panic(e)
				select {} 
			} else {
				panic(e)
			}
		} else if c.err == errGoexit {
			// Already in the process of goexit, no need to call again
		} else {
			for _, ch := range c.chans { //执行结果写入通道
				ch <- Result{c.val, c.err, c.dups > 0}
			}
		}
	}()

	func() {
		defer func() {
			if !normalReturn {
				if r := recover(); r != nil {
					c.err = newPanicError(r)
				}
			}
		}()

		c.val, c.err = fn() //执行fn函数
		normalReturn = true
	}()

	if !normalReturn {
		recovered = true
	}
}

singleflight中还提供了与Do函数功能相同的函数DoChan函数,唯一区别就是将请求对应的函数执行结果放到通道中进行返回,这两函数一个用于同步场景,一个用于异步场景。还有一个Forget函数:

func (g *Group) Forget(key string) {
	g.mu.Lock()
	delete(g.m, key) //删除map中的key,相同key对应请求进来会重新执行,不等待第一个key对应请求的执行结果
	g.mu.Unlock()
}

4、代码示例

示例如下:

func main() {
	var singleFlight singleflight.Group //初始化一个单次执行对象
	var count uint64                    //用于测试是否被修改

	//为了并发执行,这里测试唯一标识都为xxx
	go func() {
		val1, _, shared1 := singleFlight.Do("xxx", func() (interface{}, error) {
			logger.Info("first count +1")

			atomic.AddUint64(&count, 1) //第一次执行,将count+1

			time.Sleep(5 * time.Second) //增加第一次执行时间

			return count, nil
		})

		//打印第一次执行结果
		logger.Info("first count info", zap.Any("val1", val1), zap.Bool("shared1", shared1), zap.Uint64("count", count))
	}()

	time.Sleep(2 * time.Second) //为了防止下面的Do函数先执行

	val2, _, shared2 := singleFlight.Do("xxx", func() (interface{}, error) {
		logger.Info("second count +1")

		atomic.AddUint64(&count, 1) //第2次执行count+1

		return count, nil
	})

	//打印第二次执行结果
	logger.Info("second count info", zap.Any("val2", val2), zap.Bool("shared2", shared2), zap.Uint64("count", count))
}

控制台输出:

$ go run ./singlefight_demo/main.go
[2025-01-09 17:08:11.169] | INFO  | Goroutine:6  | [singlefight_demo/main.go:19] | first count +1
[2025-01-09 17:08:16.261] | INFO  | Goroutine:6  | [singlefight_demo/main.go:28] | first count info | {"val1": 1, "shared1": true, "count": 1}
[2025-01-09 17:08:16.261] | INFO  | Goroutine:1  | [singlefight_demo/main.go:41] | second count info | {"val2": 1, "shared2": true, "count": 1}

5、总结

看singleflight原码之后,要实现一个请求正在执行,相同的其它请求进来时直接报错的功能也很简单,将singleflight中等待第一个请求的逻辑改为直接返回错误就可以。


网站公告

今日签到

点亮在社区的每一天
去签到