Cond
标准库中的 Cond 并发原语初始化的时候,需要关联一个 Locker 接口的实例,一般我们使用 Mutex 或者 RWMutex。
首先,Cond 关联的 Locker 实例可以通过 c.L 访问,它内部维护着一个先入先出的等待队列。
然后,我们分别看下它的三个方法 Broadcast、Signal 和 Wait 方法。
- Signal 方法,允许调用者 Caller 唤醒一个等待此 Cond 的 goroutine。如果此时没有等待的 goroutine,显然无需通知 waiter;如果 Cond 等待队列中有一个或者多个等待的 goroutine,则需要从等待队列中移除第一个 goroutine 并把它唤醒。在其他编程语言中,比如 Java 语言中,Signal 方法也被叫做 notify 方法。
- 调用 Signal 方法时,不强求你一定要持有 c.L 的锁。
- Broadcast 方法,允许调用者 Caller 唤醒所有等待此 Cond 的 goroutine。如果此时没有等待的 goroutine,显然无需通知 waiter;如果 Cond 等待队列中有一个或者多个等待的 goroutine,则清空所有等待的 goroutine,并全部唤醒。在其他编程语言中,比如 Java 语言中,Broadcast 方法也被叫做 notifyAll 方法。
- 同样地,调用 Broadcast 方法时,也不强求你一定持有 c.L 的锁。
- Wait 方法,会把调用者 Caller 放入 Cond 的等待队列中并阻塞,直到被 Signal 或者 Broadcast 的方法从等待队列中移除并唤醒。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
|
// Cond implements a condition variable, a rendezvous point
// for goroutines waiting for or announcing the occurrence
// of an event.
//
// Each Cond has an associated Locker L (often a *Mutex or *RWMutex),
// which must be held when changing the condition and
// when calling the Wait method.
//
// A Cond must not be copied after first use.
type Cond struct {
noCopy noCopy
// L is held while observing or changing the condition
// 当观察或者修改等待条件的时候需要加锁
L Locker
// 等待队列
notify notifyList
checker copyChecker
}
// NewCond returns a new Cond with Locker l.
func NewCond(l Locker) *Cond {
return &Cond{L: l}
}
// Wait atomically unlocks c.L and suspends execution
// of the calling goroutine. After later resuming execution,
// Wait locks c.L before returning. Unlike in other systems,
// Wait cannot return unless awoken by Broadcast or Signal.
//
// Because c.L is not locked when Wait first resumes, the caller
// typically cannot assume that the condition is true when
// Wait returns. Instead, the caller should Wait in a loop:
//
// c.L.Lock()
// for !condition() {
// c.Wait()
// }
// ... make use of condition ...
// c.L.Unlock()
//
// Wait 把调用者加入到等待队列时会释放锁,在被唤醒之后还会请求锁。在阻塞休眠期间, 调用者是不持有锁的,这样能让其他 goroutine 有机会检查或者更新等待变量。
// Wait 原子式的 unlock c.L, 并暂停执行调用的 goroutine。
// 在稍后执行后,Wait 会在返回前 lock c.L. 与其他系统不同,
// 除非被 Broadcast 或 Signal 唤醒,否则等待无法返回。
//
// 因为等待第一次 resume 时 c.L 没有被锁定,所以当 Wait 返回时,
// 调用者通常不能认为条件为真。相反,调用者应该在循环中使用 Wait():
//
// c.L.Lock()
// for !condition() {
// c.Wait()
// }
// ... make use of condition ...
// c.L.Unlock()
//
func (c *Cond) Wait() {
// 增加到等待队列中
c.checker.check()
t := runtime_notifyListAdd(&c.notify)
c.L.Unlock()
// 阻塞休眠直到被唤醒
runtime_notifyListWait(&c.notify, t)
c.L.Lock()
}
// Signal wakes one goroutine waiting on c, if there is any.
//
// It is allowed but not required for the caller to hold c.L
// during the call.
// Signal 和 Broadcast 只涉及到 notifyList 数据结构,不涉及到锁。
// Signal 唤醒一个等待 c 的 goroutine(如果存在)
//
// 在调用时它可以(不必须)持有一个 c.L
func (c *Cond) Signal() {
c.checker.check()
runtime_notifyListNotifyOne(&c.notify)
}
// Broadcast wakes all goroutines waiting on c.
//
// It is allowed but not required for the caller to hold c.L
// during the call.
// Broadcast 唤醒等待 c 的所有 goroutine
//
// 调用时它可以(不必须)持久有个 c.L
func (c *Cond) Broadcast() {
c.checker.check()
runtime_notifyListNotifyAll(&c.notify)
}
|
copyChecker
copyChecker 非常简单,它实现了一个 check() 方法,这个方法以 copyChecker 的指针作为 reciever, 因为 copyChecker 在一个 Cond 中并非指针,因此当 Cond 发生拷贝行为后,这个 reciever 会 发生变化,从而检测到拷贝行为,使用 panic 以警示用户:
1
2
3
4
5
6
7
8
9
10
11
12
|
// copyChecker holds back pointer to itself to detect object copying.
// copyChecker 是一个辅助结构,可以在运行时检查 Cond 是否被复制使用。
// copyChecker 保存指向自身的指针来检测对象的复制行为。
type copyChecker uintptr
func (c *copyChecker) check() {
if uintptr(*c) != uintptr(unsafe.Pointer(c)) &&
!atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &&
uintptr(*c) != uintptr(unsafe.Pointer(c)) {
panic("sync.Cond is copied")
}
}
|
notifyList
runtime_notifyListXXX 是运行时实现的方法,实现了一个等待 / 通知的队列。
notifyList 结构本质上是一个队列:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
// notifyList 基于 ticket 实现通知列表
type notifyList struct {
// wait 为下一个 waiter 的 ticket 编号
// 在没有 lock 的情况下原子自增
wait uint32
// notify 是下一个被通知的 waiter 的 ticket 编号
// 它可以在没有 lock 的情况下进行读取,但只有在持有 lock 的情况下才能进行写
//
// wait 和 notify 会产生 wrap around,只要它们 "unwrapped"
// 的差别小于 2^31,这种情况可以被正确处理。对于 wrap around 的情况而言,
// 我们需要超过 2^31+ 个 goroutine 阻塞在相同的 condvar 上,这是不可能的。
//
notify uint32
// waiter 列表.
lock mutex
head *sudog
tail *sudog
}
|
当一个 Cond 调用 Wait 方法时候,向 wait 字段加 1,并返回一个 ticket 编号:
1
2
3
4
5
6
7
|
// notifyListAdd 将调用者添加到通知列表,以便接收通知。
// 调用者最终必须调用 notifyListWait 等待这样的通知,并传递返回的 ticket 编号。
//go:linkname notifyListAdd sync.runtime_notifyListAdd
func notifyListAdd(l *notifyList) uint32 {
// 这可以并发调用,例如,当在 read 模式下保持 RWMutex 时从 sync.Cond.Wait 调用时。
return atomic.Xadd(&l.wait, 1) - 1
}
|
而后使用这个 ticket 编号来等待通知,这个过程会将等待通知的 goroutine 进行停泊,进入等待状态, 并将其 M 与 P 解绑,从而将 G 从 M 身上剥离,放入等待队列 sudog 中:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
// notifyListWait 等待通知。如果在调用 notifyListAdd 后发送了一个,则立即返回。否则,它会阻塞。
//go:linkname notifyListWait sync.runtime_notifyListWait
func notifyListWait(l *notifyList, t uint32) {
lock(&l.lock)
// 如果 ticket 编号对应的 goroutine 已经被通知到,则立刻返回
if less(t, l.notify) {
unlock(&l.lock)
return
}
s := acquireSudog()
s.g = getg()
s.ticket = t
s.releasetime = 0
t0 := int64(0)
if blockprofilerate > 0 {
t0 = cputicks()
s.releasetime = -1
}
if l.tail == nil {
l.head = s
} else {
l.tail.next = s
}
l.tail = s
// 将 M/P/G 解绑,并将 G 调整为等待状态,放入 sudog 等待队列中
goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)
if t0 != 0 {
blockevent(s.releasetime-t0, 2)
}
releaseSudog(s)
}
// 将当前 goroutine 置于等待状态并解锁 lock。
// 通过调用 goready(gp) 可让 goroutine 再次 runnable
func goparkunlock(lock *mutex, reason waitReason, traceEv byte, traceskip int) {
gopark(parkunlock_c, unsafe.Pointer(lock), reason, traceEv, traceskip)
}
|
当调用 Signal 时,会有一个在等待的 goroutine 被通知到,具体过程就是从 sudog 列表中找到 要通知的 goroutine,而后将其 goready 来等待调度循环将其调度:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
|
// notifyListNotifyOne 通知列表中的一个条目
//go:linkname notifyListNotifyOne sync.runtime_notifyListNotifyOne
func notifyListNotifyOne(l *notifyList) {
// Fast-path: 如果上次通知后没有新的 waiter
// 则无需加锁
if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
return
}
lock(&l.lock)
// slow-path 的二次检查
t := l.notify
if t == atomic.Load(&l.wait) {
unlock(&l.lock)
return
}
// 更新下一个需要唤醒的 ticket 编号
atomic.Store(&l.notify, t+1)
// 尝试找到需要被通知的 g
// 如果目前还没来得及入队,是无法找到的
// 但是,当它看到通知编号已经发生改变是不会被 park 的
//
// 这个查找过程看起来是线性复杂度,但实际上很快就停了
// 因为 g 的队列与获取编号不同,因而队列中会出现少量重排,但我们希望找到靠前的 g
// 而 g 只有在不再 race 后才会排在靠前的位置,因此这个迭代也不会太久,
// 同时,即便找不到 g,这个情况也成立:
// 它还没有休眠,并且已经失去了我们在队列上找到的(少数)其他 g 的 race。
for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
if s.ticket == t {
n := s.next
if p != nil {
p.next = n
} else {
l.head = n
}
if n == nil {
l.tail = p
}
unlock(&l.lock)
s.next = nil
readyWithTime(s, 4)
return
}
}
unlock(&l.lock)
}
func readyWithTime(s *sudog, traceskip int) {
if s.releasetime != 0 {
s.releasetime = cputicks()
}
goready(s.g, traceskip)
}
|
如果是全员通知,基本类似:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
// notifyListNotifyAll 通知列表里的所有人
//go:linkname notifyListNotifyAll sync.runtime_notifyListNotifyAll
func notifyListNotifyAll(l *notifyList) {
// Fast-path: 如果上次通知后没有新的 waiter
// 则无需加锁
if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
return
}
// 从列表中取一个,保存到局部变量,waiter 则可以在无锁的情况下 ready
lock(&l.lock)
s := l.head
l.head = nil
l.tail = nil
// 更新要通知的下一个 ticket。
// 可以将它设置为等待的当前值,因为任何以前的 waiter 已经在列表中,
// 或者会他们在尝试将自己添加到列表时已经收到通知。
atomic.Store(&l.notify, atomic.Load(&l.wait))
unlock(&l.lock)
// 遍历整个本地列表,并 ready 所有的 waiter
for s != nil {
next := s.next
s.next = nil
readyWithTime(s, 4)
s = next
}
}
|
参考
https://colobu.com/2017/07/11/dive-into-sync-Map
https://segmentfault.com/a/1190000015242373
https://pathbox.github.io/2018/04/05/understand-sync.Map-in-Goalng/
http://www.qiuxiaobing.cn/%E7%BC%96%E7%A8%8B%E8%AF%AD%E8%A8%80/2018/03/09/go-sync-map.html
http://www.gogodjzhu.com/index.php/code/basic/397/
http://russellluo.com/2017/06/go-sync-map-diagram.html
5.4 条件变量
5.5 同步组
Go 标准库源码分析 - sync 包的Pool
5.7 并发安全散列表
6.1 上下文 Context
go context剖析之源码分析
5.3 原子操作