WaitGroup

sync.WaitGroup 可以达到并发 Goroutine 的执行屏障的效果,等待多个 Goroutine 执行完毕。

sync.WaitGroup 结构体中只包含两个成员变量:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// A WaitGroup waits for a collection of goroutines to finish.
// The main goroutine calls Add to set the number of
// goroutines to wait for. Then each of the goroutines
// runs and calls Done when finished. At the same time,
// Wait can be used to block until all goroutines have finished.
//
// A WaitGroup must not be copied after first use.
// WaitGroup 用于等待一组 Goroutine 执行完毕。
// 主 Goroutine 调用 Add 来设置需要等待的 Goroutine 的数量
// 然后每个 Goroutine 运行并调用 Done 来确认已经执行网完毕
// 同时,Wait 可以用于阻塞并等待所有 Goroutine 完成。
//
// WaitGroup 在第一次使用后不能被复制
type WaitGroup struct {
	// 避免复制使用的一个技巧,可以告诉vet工具违反了复制使用的规则
	noCopy noCopy

	// 64-bit value: high 32 bits are counter, low 32 bits are waiter count.
	// 64-bit atomic operations require 64-bit alignment, but 32-bit
	// compilers do not ensure it. So we allocate 12 bytes and then use
	// the aligned 8 bytes in them as state, and the other 4 as storage
	// for the sema.
	// 64 位值: 高 32 位用于计数,低 32 位用于等待计数
	// 64 位的原子操作要求 64 位对齐,但 32 位编译器无法保证这个要求
	// 因此分配 12 字节然后将他们对齐,其中 8 字节作为状态,其他 4 字节用于存储原语
	// 64bit(8bytes)的值分成两段,高32bit是计数值,低32bit是waiter的计数
	// 另外32bit是用作信号量的
	// 因为64bit值的原子操作需要64bit对齐,但是32bit编译器不支持,所以数组中的元素在不同的架构中不一样,具体处理看下面的方法
	// 总之,会找到对齐的那64bit作为state,其余的32bit做信号量

	state1 [3]uint32
}

可以看到,WaitGroup 内部仅仅只是一个 uint32 类型的数组。由于需要考虑 32 位机器的兼容性, 这里采用了 uint32 结构的数组,保证在不同类型的机器上都是 12 个字节。

  • noCopy 的辅助字段,主要就是辅助 vet 工具检查是否通过 copy 赋值这个 WaitGroup 实例
  • state1,一个具有复合意义的字段,包含 WaitGroup 的计数、阻塞在检查点的 waiter 数和信号量。

通过 state() 函数来确定实际的存储情况:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
// 得到state的地址和信号量的地址
// state returns pointers to the state and sema fields stored within wg.state1.
// state 返回 wg.state1 中存储的状态和原语字段
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
	if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
		// 如果地址是64bit对齐的,数组前两个元素做state,后一个元素做信号量
		return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2]
	} else {
		// 如果地址是32bit对齐的,数组后两个元素用来做state,它可以用来做64bit的原子操作
		return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]
	}
}
  • 在 64 位机器上 state1[0] 和 state1[1] 分别用于等待计数和计数,而最后一个 state1[2] 用于存储原语。
  • 在 32 位机器上 state1[0] 作为存储原语,而 state[1] 和 state[2] 用于等待计数和计数

因为对 64 位整数的原子操作要求整数的地址是 64 位对齐的,所以针对 64 位和 32 位环境的 state 字段的组成是不一样的。

在 64 位环境下,state1 的第一个元素是 waiter 数,第二个元素是 WaitGroup 的计数值,第三个元素是信号量。

在 32 位环境下,如果 state1 不是 64 位对齐的地址,那么 state1 的第一个元素是信号量,后两个元素分别是 waiter 数和计数值。

Add&Done

Add 方法主要操作的是 state 的计数部分。你可以为计数值增加一个 delta 值,内部通过原子操作把这个值加到计数值上。需要注意的是,这个 delta 也可以是个负数,相当于为计数值减去一个值,Done 方法内部其实就是通过 Add(-1) 实现的。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88

// Add adds delta, which may be negative, to the WaitGroup counter.
// If the counter becomes zero, all goroutines blocked on Wait are released.
// If the counter goes negative, Add panics.
//
// Note that calls with a positive delta that occur when the counter is zero
// must happen before a Wait. Calls with a negative delta, or calls with a
// positive delta that start when the counter is greater than zero, may happen
// at any time.
// Typically this means the calls to Add should execute before the statement
// creating the goroutine or other event to be waited for.
// If a WaitGroup is reused to wait for several independent sets of events,
// new Add calls must happen after all previous Wait calls have returned.
// See the WaitGroup example.
// Add 将 delta(可能为负)加到 WaitGroup 的计数器上
// 如果计数器归零,则所有阻塞在 Wait 的 Goroutine 被释放
// 如果计数器为负,则 panic
//
// 请注意,当计数器为 0 时发生的带有正的 delta 的调用必须在 Wait 之前。
// 当计数器大于 0 时,带有负 delta 的调用或带有正 delta 调用可能在任何时候发生。
// 通常,这意味着 Add 调用必须发生在 Goroutine 创建之前或其他被等待事件之前。
// 如果一个 WaitGroup 被复用于等待几个不同的独立事件集合,必须在前一个 Wait 调用返回后才能调用 Add。
func (wg *WaitGroup) Add(delta int) {
	// 首先获取状态指针和存储指针
	statep, semap := wg.state()
	if race.Enabled {
		_ = *statep // trigger nil deref early
		if delta < 0 {
			// Synchronize decrements with Wait.
			race.ReleaseMerge(unsafe.Pointer(wg))
		}
		race.Disable()
		defer race.Enable()
	}
	// 高32bit是计数值v,所以把delta左移32,增加到计数上
	// 将 delta 加到 statep 的前 32 位上,即加到计数器上
	state := atomic.AddUint64(statep, uint64(delta)<<32)
	// 计数器的值
	v := int32(state >> 32)// 当前计数值
	// 等待器的值
	w := uint32(state)
	if race.Enabled && delta > 0 && v == int32(delta) {
		// The first increment must be synchronized with Wait.
		// Need to model this as a read, because there can be
		// several concurrent wg.counter transitions from 0.
		race.Read(unsafe.Pointer(semap))
	}
	// 如果实际计数为负则直接 panic,因此是不允许计数为负值的
	if v < 0 {
		//WaitGroup 的计数器的值必须大于等于 0。我们在更改这个计数值的时候,WaitGroup 会先做检查,如果计数值被设置为负数,就会导致 panic。
		panic("sync: negative WaitGroup counter")
	}
	// 如果等待器不为零,但 delta 是处于增加的状态,而且存储计数与 delta 的值相同,则立即 panic
	if w != 0 && delta > 0 && v == int32(delta) {
		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
	}
	// 如果计数器 > 0 或者等待器为 0 则一切都很好,直接返回
	if v > 0 || w == 0 {
		return
	}
	// This goroutine has set counter to 0 when waiters > 0.
	// Now there can't be concurrent mutations of state:
	// - Adds must not happen concurrently with Wait,
	// - Wait does not increment waiters if it sees counter == 0.
	// Still do a cheap sanity check to detect WaitGroup misuse.
	// 这时 Goroutine 已经将计数器清零,且等待器大于零(并发调用导致)
	// 这时不允许出现并发使用导致的状态突变,否则就应该 panic
	// - Add 不能与 Wait 并发调用
	// - Wait 在计数器已经归零的情况下,不能再继续增加等待器了
	// 仍然检查来保证 WaitGroup 不会被滥用
	if *statep != state {
		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
	}
	// Reset waiters count to 0.
	// 如果计数值v为0并且waiter的数量w不为0,那么state的值就是waiter的数量
    // 将waiter的数量设置为0,因为计数值v也是0,所以它们俩的组合*statep直接设置为0即可。此时需要并唤醒所有的waiter
	// 结束后将等待器清零
	*statep = 0
	// 等待器大于零,减少 runtime_Semrelease 产生的阻塞
	for ; w != 0; w-- {
		runtime_Semrelease(semap, false, 0)
	}
}
// Done decrements the WaitGroup counter by one.
func (wg *WaitGroup) Done() {
	// Done方法实际就是计数器减1
	wg.Add(-1)
}

Add 将 statep 的值作为两段来处理,前 32 位处理为计数器,后 32 位处理为等待器。

  • 在初始阶段,等待器为 0 ,计数器随着 Add 正数的调用而增加。
  • 如果 Add 使用错误导致计数器为负,则会立即 panic
  • 由于并发的效果,计数器和等待器的值是分开操作的,因此可能出现计数器已经为零(说明当前 Add 的操作为负,即 Done),但等待器为正的情况,依次调用存储原语释放产生的阻塞(本质上为加 1 操作)

我们来考虑一个使用场景,首先刚创建的 WaitGroup 所有值为零:

1
statep 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000

这时候调用 Add(1):

注意,有符号数为补码表示,最高位为符号位

1
2
3
4
int64(delta)      0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0001
int64(delta)<<32  0000 0000 0000 0000 0000 0000 0000 0001 0000 0000 0000 0000 0000 0000 0000 0000
statep            0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000
state             0000 0000 0000 0000 0000 0000 0000 0001 0000 0000 0000 0000 0000 0000 0000 0000

那么这时候的 v (计数器)为 1,而 w 等待器为 0。

再来执行一遍减一的操作。再减一之前:

1
statep            0000 0000 0000 0000 0000 0000 0000 0001 0000 0000 0000 0000 0000 0000 0000 0000

减一:

注意,有符号数为补码表示,最高位为符号位

1
2
3
4
int64(delta)      1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111
int64(delta)<<32  1111 1111 1111 1111 1111 1111 1111 1111 0000 0000 0000 0000 0000 0000 0000 0000
statep            0000 0000 0000 0000 0000 0000 0000 0001 0000 0000 0000 0000 0000 0000 0000 0000
state            10000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000

即计数器归零。

Wait

Wait 方法的实现逻辑是:不断检查 state 的值。如果其中的计数值变为了 0,那么说明所有的任务已完成,调用者不必再等待,直接返回。如果计数值大于 0,说明此时还有任务没完成,那么调用者就变成了等待者,需要加入 waiter 队列,并且阻塞住自己。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
// Wait blocks until the WaitGroup counter is zero.
// Wait 会保持阻塞直到 WaitGroup 计数器归零
func (wg *WaitGroup) Wait() {
	// 先获得计数器和存储原语
	statep, semap := wg.state()
	if race.Enabled {
		_ = *statep // trigger nil deref early
		race.Disable()
	}
	// 一个简单的死循环,只有当计数器归零才会结束
	for {
		// 原子读
		state := atomic.LoadUint64(statep)
		// 计数
		v := int32(state >> 32)// 当前计数值
		w := uint32(state)	// waiter的数量
		// 如果计数器已经归零,则直接退出循环
		if v == 0 {
			// Counter is 0, no need to wait.
			// 如果计数值为0, 调用这个方法的goroutine不必再等待,继续执行它后面的逻辑即可
			if race.Enabled {
				race.Enable()
				race.Acquire(unsafe.Pointer(wg))
			}
			return
		}
		// Increment waiters count.
		// 否则把waiter数量加1。期间可能有并发调用Wait的情况,所以最外层使用了一个for循环
		// 增加等待计数,此处的原语会比较 statep 和 state 的值,如果相同则等待计数加 1
		if atomic.CompareAndSwapUint64(statep, state, state+1) {
			if race.Enabled && w == 0 {
				// Wait must be synchronized with the first Add.
				// Need to model this is as a write to race with the read in Add.
				// As a consequence, can do the write only for the first waiter,
				// otherwise concurrent Waits will race with each other.
				race.Write(unsafe.Pointer(semap))
			}
			// 阻塞休眠等待
			// 会阻塞到存储原语是否 > 0(即睡眠),如果 *semap > 0 则会减 1,因此最终的 semap 理论为 0
			runtime_Semacquire(semap)
			// 在这种情况下,如果 *semap 不等于 0 ,则说明使用失误,直接 panic
			if *statep != 0 {
				//WaitGroup 虽然可以重用,但是是有一个前提的,那就是必须等到上一轮的 Wait 完成之后,才能重用 WaitGroup 执行下一轮的 Add/Wait,如果你在 Wait 还没执行完的时候就调用下一轮 Add 方法,就有可能出现 panic
				panic("sync: WaitGroup is reused before previous Wait has returned")
			}
			// 被唤醒,不再阻塞,返回
			if race.Enabled {
				race.Enable()
				race.Acquire(unsafe.Pointer(wg))
			}
			return
		}
	}
}

可以看到 Wait 使用的是一个简单的死循环来进行操作。在循环体中,每次先读取计数器和等待器的值。 然后增加等待计数,如果增加成功,会调用 runtime_Semacquire 来阻塞当前的死循环, 直到存储原语的值被 runtime_Semrelease 减少后才会解除阻塞状态进入下一个循环。

整体流程

我们来完成考虑一下整个流程:

1
2
3
4
wg := sync.WaitGroup{}
wg.Add(1)
go func() { wg.Done() }()
wg.Wait()

在 wg 创建之初,计数器、等待器、存储原语的值均初始化为零值。不妨假设调用 wg.Add(1),则计数器加 1 等待器、存储原语保持不变,均为 0。

wg.Done() 和 wg.Wait() 的调用顺序可能成两种情况:

情况 1:先调用 wg.Done() 再调用 wg.Wait()。

这时候 wg.Done() 使计数器减 1 ,这时计数器、等待器、存储原语均为 0,由于等待器为 0 则 runtime_Semrelease 不会被调用。 于是当 wg.Wait() 开始调用时,读取到计数器已经为 0,循环退出,wg.Wait() 调用完毕。

情况 2:先调用 wg.Wait() 再调用 wg.Done()。

这时候 wg.Wait() 开始调用时,读取到计数器为 1,则为等待器加 1,并调用 runtime_Semacquire 开始阻塞在存储原语为 0 的状态。

在阻塞的过程中,Goroutine 被调度器调度,开始执行 wg.Done(),于是计数器清零,但由于等待器为 1 大于零。 这时将等待器也清零,并调用与等待器技术相同次数(此处为 1 次)的 runtime_Semrelease,这导致存储原语的值变为 1,计数器和等待器均为零。 这时,runtime_Semacquire 在存储原语大于零后被唤醒,这时检查计数器和等待器是否为零(如果不为零则说明 Add 与 Wait 产生并发调用,直接 panic),这时他们为 0,因此进入下一个循环,当再次读取计数器时,发现计数器已经清理,于是退出 wg.Wait() 调用,结束阻塞。

noCopy

vet 会对实现 Locker 接口的数据类型做静态检查,一旦代码中有复制使用这种数据类型的情况,就会发出警告。

通过给 WaitGroup 添加一个 noCopy 字段,我们就可以为 WaitGroup 实现 Locker 接口,这样 vet 工具就可以做复制检查了。而且因为 noCopy 字段是未输出类型,所以 WaitGroup 不会暴露 Lock/Unlock 方法。

noCopy 字段的类型是 noCopy,它只是一个辅助的、用来帮助 vet 检查用的类型:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11

// noCopy may be embedded into structs which must not be copied
// after the first use.
//
// See https://golang.org/issues/8005#issuecomment-190753527
// for details.
type noCopy struct{}

// Lock is a no-op used by -copylocks checker from `go vet`.
func (*noCopy) Lock()   {}
func (*noCopy) Unlock() {}

如果你想要自己定义的数据结构不被复制使用,或者说,不能通过 vet 工具检查出复制使用的报警,就可以通过嵌入 noCopy 这个数据类型来实现。

参考

https://colobu.com/2017/07/11/dive-into-sync-Map

https://segmentfault.com/a/1190000015242373

https://pathbox.github.io/2018/04/05/understand-sync.Map-in-Goalng/

http://www.qiuxiaobing.cn/%E7%BC%96%E7%A8%8B%E8%AF%AD%E8%A8%80/2018/03/09/go-sync-map.html

http://www.gogodjzhu.com/index.php/code/basic/397/

http://russellluo.com/2017/06/go-sync-map-diagram.html

5.4 条件变量

5.5 同步组

Go 标准库源码分析 - sync 包的Pool

5.7 并发安全散列表

6.1 上下文 Context

go context剖析之源码分析

5.3 原子操作