waitGroup使用场景有限制,只要记住核心的4个要点就行:
1:Done操作本质上是一个Add操作,只不过是add负值
2:禁止add操作后使得counter值小于0
3:禁止任何Add操作与Wait操作并发。就是说当done操作-1时,检测到v=0要开始唤醒waiter前会检测是否有其他人修改了state,如果有就抛异常,假设此时正好有其他goroute并发调用了wait,那么state就会被修改,而waitgroup是禁止这种使用方式的
4:禁止在wait完成前reuse。这个reuse错误实际是两个Add之间的操作并发问题,就是说当一个Done操作扣完1后,检测到v=0要开始唤醒waiter前会检测是否有其他人修改了state,如果有就抛异常,假设此时正好有其他goroute并发调用了add,那么state就会被修改,而waitgroup是禁止这种使用方式的
其中第三条和第四条总结下来就是:总的就是说在最后一个goroute把v扣减到0之后准备执行唤醒之前,并且所有waiter都被唤醒并返回之前,禁止任何对state的修改,不管是add还是wait
下面是add源码中的注释的翻译:
// Add adds delta, which may be negative, to the WaitGroup counter.
// If the counter becomes zero, all goroutines blocked on Wait are released.
// If the counter goes negative, Add panics.
//
// Note that calls with a positive delta that occur when the counter is zero
// must happen before a Wait. Calls with a negative delta, or calls with a
// positive delta that start when the counter is greater than zero, may happen
// at any time.
// Typically this means the calls to Add should execute before the statement
// creating the goroutine or other event to be waited for.
// If a WaitGroup is reused to wait for several independent sets of events,
// new Add calls must happen after all previous Wait calls have returned.
// See the WaitGroup example.
// Add 会将 delta(可以为负数)添加到 WaitGroup 的计数器中。
// 如果计数器变为零,所有阻塞在 Wait 上的 goroutine 将被释放。
// 如果计数器变为负数,Add 会引发 panic。
//
// 注意,当计数器为零时,带正 delta 的调用必须发生在 Wait 之前。
// 带负 delta 的调用或计数器大于零时带正 delta 的调用可以在任何时间发生。
// 通常,这意味着对 Add 的调用应在创建 goroutine 或其他等待事件的语句之前执行。
// 如果一个 WaitGroup 被重复用于等待几个独立的事件集,那么新的 Add 调用必须在所有先前的 Wait 调用返回之后进行
waitgroup源码注解
package tmp
import (
"internal/race"
"sync/atomic"
"unsafe"
)
type WaitGroup struct {
noCopy noCopy
//高32位:counter,低32位:waiter
state atomic.Uint64
//信号量
sema uint32
}
// Add 会将 delta(可以为负数)添加到 WaitGroup 的计数器中。
// 如果计数器变为零,所有阻塞在 Wait 上的 goroutine 将被释放。
// 如果计数器变为负数,Add 会引发 panic。
//
// 注意,当计数器为零时,带正 delta 的调用必须发生在 Wait 之前。
// 带负 delta 的调用或计数器大于零时带正 delta 的调用可以在任何时间发生。
// 通常,这意味着对 Add 的调用应在创建 goroutine 或其他等待事件的语句之前执行。
// 如果一个 WaitGroup 被重复用于等待几个独立的事件集,
// 那么新的 Add 调用必须在所有先前的 Wait 调用返回之后进行
func (wg *WaitGroup) Add(delta int) {
if race.Enabled {
if delta < 0 {
// Synchronize decrements with Wait.
race.ReleaseMerge(unsafe.Pointer(wg))
}
race.Disable()
defer race.Enable()
}
//修改state
//!!!切记,这是add原子操作,不是cas操作
//!!!也就是说,会存在多个add操作并发修改state,就有可能造成冲突
//!!!但是没关系,后面v<0、v>0||w==0会过滤掉很多场景
state := wg.state.Add(uint64(delta) << 32)
//获取修改后的counter
v := int32(state >> 32)
//获取此时的waiter数量
w := uint32(state)
if race.Enabled && delta > 0 && v == int32(delta) {
// The first increment must be synchronized with Wait.
// Need to model this as a read, because there can be
// several concurrent wg.counter transitions from 0.
race.Read(unsafe.Pointer(&wg.sema))
}
//waitgroup禁止任何时候v小于0,如果有则报错
//比如v==1,a执行done后变成了0,如果又有人又执行done,那么v就会小于0,而导致报错
//所以说done操作次数不能超过add值
if v < 0 {
panic("sync: negative WaitGroup counter")
}
//waitgroup是禁止add操作和wait操作并发进行的
//这里如果add以后v==delta,说明add前v==0
//而waiter数不为0,则说明waiter在wait前v肯定是大于0的
//add前v==0则说明肯定是有人执行了add负值比如done操作来扣减的
//扣减操作扣完后会检测到v==0,此时就会唤醒waiter
//waiter唤醒后就会修改state,waitegroup是禁止add和wait操作并发进行的
//所以这里就直接抛异常
if w != 0 && delta > 0 && v == int32(delta) {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
//如果add完后v>0则表明现在还不能唤醒waiter,所以返回
//如果w==0则表明waiter数为0,此时无需唤醒waiter
//举个例子:a线程add(x),然后for循环开一堆goroute 去done即add(-1),
//然后a执行wait操作等待state的v扣减为0
//当最后一个goroute b执行done操作后就会让state的v==0
//当add操作扣减完v后发现v==0后就会去唤醒waiter
//因为a线程是先执行for开一堆goroute,然后再执行wait
//所以最后一个goroute b的done操作和a线程的wait操作是有可能并发的,
//存在谁先谁后执行的问题,但都是安全的
//1:假设a的wait操作先运行,那么a的wait操作就会检测到v>0,然后修改state后阻塞
//那么最后一个goroute b在执行state := wg.state.Add(uint64(delta) << 32)修改记数后
//a的wait操作不会修改记数,因为a的wait操作已经在此之前执行完了,
//所以add操作执行if wg.state.Load() != state时就不会检测到冲突
//2:a的wait操作先执行,然后a wait先检测到v==1,所以a会修改state即waiter+1
//假设在a检测到1后,但是修改state=state+1前,最后一个goroute b执行了
//然后最后一个goroute b执行state := wg.state.Add(uint64(delta) << 32)修改记数后
//a再通过cas执行state=state+1,那么a就会检测到冲突,就会重试,
//a在下一轮检测中就会发现v==0,就会直接返回,就不会去修改state了
//对于b来说,b执行state := wg.state.Add(uint64(delta) << 32)修改记数后
//虽然v==0了,但是,切记,我们这里是waiter是从0变到1,所以这里的w==0就会检测到,
//所以b就会直接返回,不会去执行唤醒操作,
//所以说:a先add然后开一堆goroute去done,然后a再wait,这是一个安全的操作,不会有任何问题
//v>0这个条件,使得多个add(正值)是可以并发的,但是同时add正值和add负值,那么就可能回pannic
//举个例子:此时v==1,a执行add(-1)那么就会跳过这个检测,就会执行唤醒wait操作,
//假设b执行add(-1),那么v==-1就会小于0,在上面的检测中就会检测到而panice
//如果b执行add(1),那么v===1,那么 a在执行if wg.state.Load() != state 时就会检测到冲突
//就会panic
if v > 0 || w == 0 {
return
}
// This goroutine has set counter to 0 when waiters > 0.
// Now there can't be concurrent mutations of state:
// - Adds must not happen concurrently with Wait,
// - Wait does not increment waiters if it sees counter == 0.
// Still do a cheap sanity check to detect WaitGroup misuse.
//waitgroup禁止未清零之前reuse,即禁止此时的add并发修改
//举个例子:v==1,然后a 执行done操作-1,发现v==0就要执行唤醒操作
//如果此时有另一个goroute执行了add操作,那么这里就会抛异常
//waitgroup禁止wait和add操作并发修改
//举个例子:在执行到这里时,a要执行唤醒操作了,突然又有b goroute执行wait操作
//那么这里就会检测到并发修改,抛异常
//总的就是说在最后一个goroute把v扣减到0之后准备执行唤醒之前,并且所有waiter都被唤醒并返回之前,
//禁止任何对state的修改,不管是add还是wait,
//这里是禁止把v扣减到0之后准备执行唤醒之前进行任何并发修改
//wait里也有一个if wg.state.Load() != 0 会禁止所有waiter都被唤醒并返回之前的任何并发修改
if wg.state.Load() != state {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// Reset waiters count to 0.
//清零waiter,因为我们会一次性唤醒w个waiter
//注意,这里还不算完全清零,得等所有waiter都被唤醒后并返回后才算清零
wg.state.Store(0)
//唤醒w个waiter
for ; w != 0; w-- {
runtime_Semrelease(&wg.sema, false, 0)
}
}
// done操作本质就是一个add-1操作
func (wg *WaitGroup) Done() {
wg.Add(-1)
}
// 执行wait,就一个while cas操作
func (wg *WaitGroup) Wait() {
if race.Enabled {
race.Disable()
}
for {
//读取state
state := wg.state.Load()
v := int32(state >> 32)
w := uint32(state)
//如果v==0,就直接返回
if v == 0 {
// Counter is 0, no need to wait.
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(wg))
}
return
}
//!!!cas操作修改state,而不是通过原子add,因为这个操作会冲突
//!!!add操作里是原子add操作,那个有些场景需要检测并发修改,
//但是有些场景的并发修改会通过if过滤掉许多场景,比如v<0、v>0、w==0
//最后要检测并发修改时,会通过if wg.state.Load() != state来检测并发修改
if wg.state.CompareAndSwap(state, state+1) {
if race.Enabled && w == 0 {
// Wait must be synchronized with the first Add.
// Need to model this is as a write to race with the read in Add.
// As a consequence, can do the write only for the first waiter,
// otherwise concurrent Waits will race with each other.
race.Write(unsafe.Pointer(&wg.sema))
}
//执行阻塞
runtime_Semacquire(&wg.sema)
//!!!只有所有wait都被唤醒并执行完这个if检测后才算做真正清零,
//!!!在完成这个if检测后才能reuse这个waitergroup
//!!!否则会报错,因为此时任何add wait操作都会让state不为0
if wg.state.Load() != 0 {
panic("sync: WaitGroup is reused before previous Wait has returned")
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(wg))
}
return
}
}
}