go并发原语源码系列(二)sync.WaitGroup

发布于:2025-02-10 ⋅ 阅读:(56) ⋅ 点赞:(0)

waitGroup使用场景有限制,只要记住核心的4个要点就行:

1:Done操作本质上是一个Add操作,只不过是add负值

2:禁止add操作后使得counter值小于0

3:禁止任何Add操作与Wait操作并发。就是说当done操作-1时,检测到v=0要开始唤醒waiter前会检测是否有其他人修改了state,如果有就抛异常,假设此时正好有其他goroute并发调用了wait,那么state就会被修改,而waitgroup是禁止这种使用方式的

4:禁止在wait完成前reuse。这个reuse错误实际是两个Add之间的操作并发问题,就是说当一个Done操作扣完1后,检测到v=0要开始唤醒waiter前会检测是否有其他人修改了state,如果有就抛异常,假设此时正好有其他goroute并发调用了add,那么state就会被修改,而waitgroup是禁止这种使用方式的

其中第三条和第四条总结下来就是:总的就是说在最后一个goroute把v扣减到0之后准备执行唤醒之前,并且所有waiter都被唤醒并返回之前,禁止任何对state的修改,不管是add还是wait

下面是add源码中的注释的翻译:

// Add adds delta, which may be negative, to the WaitGroup counter.
// If the counter becomes zero, all goroutines blocked on Wait are released.
// If the counter goes negative, Add panics.
//
// Note that calls with a positive delta that occur when the counter is zero
// must happen before a Wait. Calls with a negative delta, or calls with a
// positive delta that start when the counter is greater than zero, may happen
// at any time.
// Typically this means the calls to Add should execute before the statement
// creating the goroutine or other event to be waited for.
// If a WaitGroup is reused to wait for several independent sets of events,
// new Add calls must happen after all previous Wait calls have returned.
// See the WaitGroup example.
// Add 会将 delta(可以为负数)添加到 WaitGroup 的计数器中。
// 如果计数器变为零,所有阻塞在 Wait 上的 goroutine 将被释放。
// 如果计数器变为负数,Add 会引发 panic。
//
// 注意,当计数器为零时,带正 delta 的调用必须发生在 Wait 之前。
// 带负 delta 的调用或计数器大于零时带正 delta 的调用可以在任何时间发生。
// 通常,这意味着对 Add 的调用应在创建 goroutine 或其他等待事件的语句之前执行。
// 如果一个 WaitGroup 被重复用于等待几个独立的事件集,那么新的 Add 调用必须在所有先前的 Wait 调用返回之后进行

waitgroup源码注解

package tmp

import (
	"internal/race"
	"sync/atomic"
	"unsafe"
)

type WaitGroup struct {
	noCopy noCopy
	//高32位:counter,低32位:waiter
	state atomic.Uint64
	//信号量
	sema uint32
}

// Add 会将 delta(可以为负数)添加到 WaitGroup 的计数器中。
// 如果计数器变为零,所有阻塞在 Wait 上的 goroutine 将被释放。
// 如果计数器变为负数,Add 会引发 panic。
//
// 注意,当计数器为零时,带正 delta 的调用必须发生在 Wait 之前。
// 带负 delta 的调用或计数器大于零时带正 delta 的调用可以在任何时间发生。
// 通常,这意味着对 Add 的调用应在创建 goroutine 或其他等待事件的语句之前执行。
// 如果一个 WaitGroup 被重复用于等待几个独立的事件集,
// 那么新的 Add 调用必须在所有先前的 Wait 调用返回之后进行
func (wg *WaitGroup) Add(delta int) {
	if race.Enabled {
		if delta < 0 {
			// Synchronize decrements with Wait.
			race.ReleaseMerge(unsafe.Pointer(wg))
		}
		race.Disable()
		defer race.Enable()
	}
	//修改state
	//!!!切记,这是add原子操作,不是cas操作
	//!!!也就是说,会存在多个add操作并发修改state,就有可能造成冲突
	//!!!但是没关系,后面v<0、v>0||w==0会过滤掉很多场景
	state := wg.state.Add(uint64(delta) << 32)
	//获取修改后的counter
	v := int32(state >> 32)
	//获取此时的waiter数量
	w := uint32(state)
	if race.Enabled && delta > 0 && v == int32(delta) {
		// The first increment must be synchronized with Wait.
		// Need to model this as a read, because there can be
		// several concurrent wg.counter transitions from 0.
		race.Read(unsafe.Pointer(&wg.sema))
	}
	//waitgroup禁止任何时候v小于0,如果有则报错
	//比如v==1,a执行done后变成了0,如果又有人又执行done,那么v就会小于0,而导致报错
	//所以说done操作次数不能超过add值
	if v < 0 {
		panic("sync: negative WaitGroup counter")
	}
	//waitgroup是禁止add操作和wait操作并发进行的
	//这里如果add以后v==delta,说明add前v==0
	//而waiter数不为0,则说明waiter在wait前v肯定是大于0的
	//add前v==0则说明肯定是有人执行了add负值比如done操作来扣减的
	//扣减操作扣完后会检测到v==0,此时就会唤醒waiter
	//waiter唤醒后就会修改state,waitegroup是禁止add和wait操作并发进行的
	//所以这里就直接抛异常
	if w != 0 && delta > 0 && v == int32(delta) {
		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
	}
	//如果add完后v>0则表明现在还不能唤醒waiter,所以返回
	//如果w==0则表明waiter数为0,此时无需唤醒waiter
	//举个例子:a线程add(x),然后for循环开一堆goroute 去done即add(-1),
	//然后a执行wait操作等待state的v扣减为0
	//当最后一个goroute b执行done操作后就会让state的v==0
	//当add操作扣减完v后发现v==0后就会去唤醒waiter
	//因为a线程是先执行for开一堆goroute,然后再执行wait
	//所以最后一个goroute b的done操作和a线程的wait操作是有可能并发的,
	//存在谁先谁后执行的问题,但都是安全的
	//1:假设a的wait操作先运行,那么a的wait操作就会检测到v>0,然后修改state后阻塞
	//那么最后一个goroute b在执行state := wg.state.Add(uint64(delta) << 32)修改记数后
	//a的wait操作不会修改记数,因为a的wait操作已经在此之前执行完了,
	//所以add操作执行if wg.state.Load() != state时就不会检测到冲突
	//2:a的wait操作先执行,然后a wait先检测到v==1,所以a会修改state即waiter+1
	//假设在a检测到1后,但是修改state=state+1前,最后一个goroute b执行了
	//然后最后一个goroute b执行state := wg.state.Add(uint64(delta) << 32)修改记数后
	//a再通过cas执行state=state+1,那么a就会检测到冲突,就会重试,
	//a在下一轮检测中就会发现v==0,就会直接返回,就不会去修改state了
	//对于b来说,b执行state := wg.state.Add(uint64(delta) << 32)修改记数后
	//虽然v==0了,但是,切记,我们这里是waiter是从0变到1,所以这里的w==0就会检测到,
	//所以b就会直接返回,不会去执行唤醒操作,
	//所以说:a先add然后开一堆goroute去done,然后a再wait,这是一个安全的操作,不会有任何问题
	//v>0这个条件,使得多个add(正值)是可以并发的,但是同时add正值和add负值,那么就可能回pannic
	//举个例子:此时v==1,a执行add(-1)那么就会跳过这个检测,就会执行唤醒wait操作,
	//假设b执行add(-1),那么v==-1就会小于0,在上面的检测中就会检测到而panice
	//如果b执行add(1),那么v===1,那么 a在执行if wg.state.Load() != state 时就会检测到冲突
	//就会panic
	if v > 0 || w == 0 {
		return
	}
	// This goroutine has set counter to 0 when waiters > 0.
	// Now there can't be concurrent mutations of state:
	// - Adds must not happen concurrently with Wait,
	// - Wait does not increment waiters if it sees counter == 0.
	// Still do a cheap sanity check to detect WaitGroup misuse.
	//waitgroup禁止未清零之前reuse,即禁止此时的add并发修改
	//举个例子:v==1,然后a 执行done操作-1,发现v==0就要执行唤醒操作
	//如果此时有另一个goroute执行了add操作,那么这里就会抛异常
	//waitgroup禁止wait和add操作并发修改
	//举个例子:在执行到这里时,a要执行唤醒操作了,突然又有b goroute执行wait操作
	//那么这里就会检测到并发修改,抛异常
	//总的就是说在最后一个goroute把v扣减到0之后准备执行唤醒之前,并且所有waiter都被唤醒并返回之前,
	//禁止任何对state的修改,不管是add还是wait,
	//这里是禁止把v扣减到0之后准备执行唤醒之前进行任何并发修改
	//wait里也有一个if wg.state.Load() != 0 会禁止所有waiter都被唤醒并返回之前的任何并发修改
	if wg.state.Load() != state {
		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
	}
	// Reset waiters count to 0.
	//清零waiter,因为我们会一次性唤醒w个waiter
	//注意,这里还不算完全清零,得等所有waiter都被唤醒后并返回后才算清零
	wg.state.Store(0)
	//唤醒w个waiter
	for ; w != 0; w-- {
		runtime_Semrelease(&wg.sema, false, 0)
	}
}

// done操作本质就是一个add-1操作
func (wg *WaitGroup) Done() {
	wg.Add(-1)
}

// 执行wait,就一个while cas操作
func (wg *WaitGroup) Wait() {
	if race.Enabled {
		race.Disable()
	}
	for {
		//读取state
		state := wg.state.Load()
		v := int32(state >> 32)
		w := uint32(state)
		//如果v==0,就直接返回
		if v == 0 {
			// Counter is 0, no need to wait.
			if race.Enabled {
				race.Enable()
				race.Acquire(unsafe.Pointer(wg))
			}
			return
		}
		//!!!cas操作修改state,而不是通过原子add,因为这个操作会冲突
		//!!!add操作里是原子add操作,那个有些场景需要检测并发修改,
		//但是有些场景的并发修改会通过if过滤掉许多场景,比如v<0、v>0、w==0
		//最后要检测并发修改时,会通过if wg.state.Load() != state来检测并发修改
		if wg.state.CompareAndSwap(state, state+1) {
			if race.Enabled && w == 0 {
				// Wait must be synchronized with the first Add.
				// Need to model this is as a write to race with the read in Add.
				// As a consequence, can do the write only for the first waiter,
				// otherwise concurrent Waits will race with each other.
				race.Write(unsafe.Pointer(&wg.sema))
			}
			//执行阻塞
			runtime_Semacquire(&wg.sema)
			//!!!只有所有wait都被唤醒并执行完这个if检测后才算做真正清零,
			//!!!在完成这个if检测后才能reuse这个waitergroup
			//!!!否则会报错,因为此时任何add wait操作都会让state不为0
			if wg.state.Load() != 0 {
				panic("sync: WaitGroup is reused before previous Wait has returned")
			}
			if race.Enabled {
				race.Enable()
				race.Acquire(unsafe.Pointer(wg))
			}
			return
		}
	}
}


网站公告

今日签到

点亮在社区的每一天
去签到