使用
sync.WaitGroup
可以用来阻塞等待一组并发任务完成
下面是如何使用sync.WaitGroup的使用
最重要的就是不能并发调用Add()
和Wait()
var wg sync.WaitGroup
for ... {
wg.Add(1) // 不能和wg.Wait()并发执行
go func() {
// 不能在启动的函数里面执行wg.Add(), 否则会panic
defer wg.Done()
// dosomething
} ()
}
wg.Wait()
下面是官方示例
package main
import (
"sync"
)
type httpPkg struct{}
func (httpPkg) Get(url string) {}
var http httpPkg
func main() {
var wg sync.WaitGroup
var urls = []string{
"http://www.golang.org/",
"http://www.google.com/",
"http://www.example.com/",
}
for _, url := range urls {
// Increment the WaitGroup counter.
wg.Add(1)
// Launch a goroutine to fetch the URL.
go func(url string) {
// Decrement the counter when the goroutine completes.
defer wg.Done()
// Fetch the URL.
http.Get(url)
}(url)
}
// Wait for all HTTP fetches to complete.
wg.Wait()
}
源码解读
结构体
type WaitGroup struct {
noCopy noCopy
state atomic.Uint64 // high 32 bits are counter, low 32 bits are waiter count.
sema uint32
}
noCopy
标识结构体不能被复制
state
的高32位表示counter的值,低32位表示waiter的值
sema
用来阻塞和唤醒goroutine
方法
<font style="color:rgb(36, 41, 46);background-color:rgb(233, 236, 239);">func(wg *WaitGroup) Done()</font>
<font style="color:rgb(36, 41, 46);background-color:rgb(233, 236, 239);">func(wg *WaitGroup) Add(delta int)</font>
<font style="color:rgb(36, 41, 46);background-color:rgb(233, 236, 239);">func(wg *WaitGroup) Wait()</font>
Done()
// Done 将计数器(counter)值减 1
func (wg *WaitGroup) Done() {
wg.Add(-1)
}
调用了add方法
Add()
func (wg *WaitGroup) Add(delta int) {
state := wg.state.Add(uint64(delta) << 32)
v := int32(state >> 32) // counter数量
w := uint32(state) // waiter数量
// counter < 0 -->> panic
if v < 0 {
panic("sync: negative WaitGroup counter")
}
// 并发调用Add、Wait-->> panic
if w != 0 && delta > 0 && v == int32(delta) {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// 操作成功
if v > 0 || w == 0 {
return
}
// 并发调用panic
if wg.state.Load() != state {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// v == 0 && counter != 0
// 释放等待的waiter,将state的状态置为0
wg.state.Store(0)
for ; w != 0; w-- {
runtime_Semrelease(&wg.sema, false, 0)
}
}
从源码的panic来看,不能并发调用Add()
&Wait()
不能让counter < 0
Wait()
func (wg *WaitGroup) Wait() {
// CAS操作失败后
for {
state := wg.state.Load()
v := int32(state >> 32)
w := uint32(state)
if v == 0 {
// counter == 0 直接返回,没有需要等待的goroutine
return
}
//
if wg.state.CompareAndSwap(state, state+1) {
runtime_SemacquireWaitGroup(&wg.sema)
// 检验,唤醒前就已经将state置为0
if wg.state.Load() != 0 {
panic("sync: WaitGroup is reused before previous Wait has returned")
}
return
}
}
}