Mutex

Go 语言的 sync.Mutex 由两个字段 state 和 sema 组成。其中 state 表示当前互斥锁的状态,而 sema 是用于控制锁状态的信号量。

1
2
3
4
type Mutex struct {
	state int32
	sema  uint32
}

设计理念

这个案例基于两个 goroutine:

  • goroutine 1 每次持锁100ms
  • goroutine 2 每100ms 持有一次锁

都是100ms 的周期,但是由于 goroutine 1 不断的请求锁,可预期它会更频繁的持续到锁。我们基于 Go 1.8 循环了10次,下面是锁的请求占用分布:

Mutex 被 g1 获取了700多万次,而 g2 只获取了10次。

首先,goroutine1 将获得锁并休眠100ms。当goroutine2 试图获取锁时,它将被添加到锁的队列中- FIFO 顺序,goroutine 将进入等待状态。

然后,当 goroutine1 完成它的工作时,它将释放锁。此版本将通知队列唤醒 goroutine2。goroutine2 将被标记为可运行的,并且正在等待 Go 调度程序在线程上运行.

然而,当 goroutine2 等待运行时,goroutine1将再次请求锁。

goroutine2 尝试去获取锁,结果悲剧的发现锁又被人持有了,它自己继续进入到等待模式。

我们看看几种 Mutex 锁的实现:

  • Barging. 这种模式是为了提高吞吐量,当锁被释放时,它会唤醒第一个等待者,然后把锁给第一个等待者或者给第一个请求锁的人。
  • Handsoff. 当锁释放时候,锁会一直持有直到第一个等待者准备好获取锁。它降低了吞吐量,因为锁被持有,即使另一个 goroutine 准备获取它。 一个互斥锁的 handsoff 会完美地平衡两个goroutine 之间的锁分配,但是会降低性能,因为它会迫使第一个 goroutine 等待锁。
  • Spinning. 自旋在等待队列为空或者应用程序重度使用锁时效果不错。Parking 和 Unparking goroutines 有不低的性能成本开销,相比自旋来说要慢得多。

Go 1.8 使用了 Barging 和 Spining 的结合实现。当试图获取已经被持有的锁时,如果本地队列为空并且 P 的数量大于1,goroutine 将自旋几次(用一个 P 旋转会阻塞程序)。自旋后,goroutine park。在程序高频使用锁的情况下,它充当了一个快速路径。

Go 1.9 通过添加一个新的饥饿模式来解决先前解释的问题,该模式将会在释放时候触发 handsoff。所有等待锁超过一毫秒的 goroutine(也称为有界等待)将被诊断为饥饿。当被标记为饥饿状态时,unlock 方法会 handsoff 把锁直接扔给第一个等待者。

在饥饿模式下,自旋也被停用,因为传入的goroutines 将没有机会获取为下一个等待者保留的锁。

状态state

互斥锁的状态比较复杂,如下图所示,最低三位分别表示 mutexLocked、mutexWoken 和 mutexStarving,剩下的位置用来表示当前有多少个 Goroutine 在等待互斥锁的释放:

在默认情况下,互斥锁的所有状态位都是 0,int32 中的不同位分别表示了不同的状态:

  • mutexLocked — 表示互斥锁的锁定状态;
  • mutexWoken — 表示从正常模式被唤醒;
  • mutexStarving — 当前的互斥锁进入饥饿状态;
  • waitersCount — 当前互斥锁上等待的 Goroutine 个数;

正常模式和饥饿模式

sync.Mutex 有两种模式 — 正常模式和饥饿模式。我们需要在这里先了解正常模式和饥饿模式都是什么以及它们有什么样的关系。

正常模式下,waiter 都是进入先入先出队列,被唤醒的 waiter 并不会直接持有锁,而是要和新来的 goroutine 进行竞争。新来的 goroutine 有先天的优势,它们正在 CPU 中运行,可能它们的数量还不少,所以,在高并发情况下,被唤醒的 waiter 可能比较悲剧地获取不到锁,这时,它会被插入到队列的前面。如果 waiter 获取不到锁的时间超过阈值 1 毫 秒,那么,这个 Mutex 就进入到了饥饿模式。

在饥饿模式下,Mutex 的拥有者将直接把锁交给队列最前面的 waiter。新来的 goroutine 不会尝试获取锁,即使看起来锁没有被持有,它也不会去抢,也不会 spin,它会乖乖地加 入到等待队列的尾部。

如果拥有 Mutex 的 waiter 发现下面两种情况的其中之一,它就会把这个 Mutex 转换成正常模式:

  • 此 waiter 已经是队列中的最后一个 waiter 了,没有其它的等待锁的 goroutine 了;
  • 此 waiter 的等待时间小于 1 毫秒。

正常模式拥有更好的性能,因为即使有等待抢锁的 waiter,goroutine 也可以连续多次获取到锁。

饥饿模式是对公平性和性能的一种平衡,它避免了某些 goroutine 长时间的等待锁。在饥饿模式下,优先对待的是那些一直在等待的 waiter。

实现原理

互斥锁的加锁过程比较复杂,它涉及自旋、信号量以及调度等概念:

  • 如果互斥锁处于初始化状态,会通过置位 mutexLocked 加锁;
  • 如果互斥锁处于 mutexLocked 状态并且在普通模式下工作,会进入自旋,执行 30 次 PAUSE 指令消耗 CPU 时间等待锁的释放;
  • 如果当前 Goroutine 等待锁的时间超过了 1ms,互斥锁就会切换到饥饿模式;
  • 互斥锁在正常情况下会通过 runtime.sync_runtime_SemacquireMutex 将尝试获取锁的 Goroutine 切换至休眠状态,等待锁的持有者唤醒;
  • 如果当前 Goroutine 是互斥锁上的最后一个等待的协程或者等待的时间小于 1ms,那么它会将互斥锁切换回正常模式;

互斥锁的解锁过程与之相比就比较简单,其代码行数不多、逻辑清晰,也比较容易理解:

  • 当互斥锁已经被解锁时,调用 sync.Mutex.Unlock 会直接抛出异常;
  • 当互斥锁处于饥饿模式时,将锁的所有权交给队列中的下一个等待者,等待者会负责设置 mutexLocked 标志位;
  • 当互斥锁处于普通模式时,如果没有 Goroutine 等待锁的释放或者已经有被唤醒的 Goroutine 获得了锁,会直接返回;在其他情况下会通过 sync.runtime_Semrelease 唤醒对应的 Goroutine;

Lock

互斥锁的加锁是靠 sync.Mutex.Lock 完成的Lock 对申请锁的情况分为三种:

  • 无冲突,通过 CAS 操作把当前状态设置为加锁状态
  • 有冲突,开始自旋,并等待锁释放,如果其他 goroutine 在这段时间内释放该锁,直接获得该锁;如果没有释放则为下一种情况
  • 有冲突,且已经过了自旋阶段,通过调用 semrelease 让 goroutine 进入等待状态

最新的 Go 语言源代码中已经将 sync.Mutex.Lock 方法进行了简化,方法的主干只保留最常见、简单的情况 — 当锁的状态是 0 时,将 mutexLocked 位置成 1:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
    // Fast path: grab unlocked mutex.
    // 快速路径: 抓取并锁上未锁住状态的互斥锁
	if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
		if race.Enabled {
			race.Acquire(unsafe.Pointer(m))
		}
		return
	}
    // Slow path (outlined so that the fast path can be inlined)
    // 缓慢之路,尝试自旋竞争或饥饿状态下饥饿goroutine竞争
	m.lockSlow()
}

如果互斥锁的状态不是 0 时就会调用 sync.Mutex.lockSlow 尝试通过自旋(Spinnig)等方式等待锁的释放,该方法的主体是一个非常大 for 循环,这里将它分成几个部分介绍获取锁的过程:

  1. 判断当前 Goroutine 能否进入自旋;
  2. 通过自旋等待互斥锁的释放;
  3. 计算互斥锁的最新状态;
  4. 更新互斥锁的状态并获取锁;
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
func (m *Mutex) lockSlow() {
	var waitStartTime int64
	starving := false // 此goroutine的饥饿标记
	awoke := false    // 唤醒标记
	iter := 0         //自旋次数
	old := m.state    //当前的锁的状态
	for {
		// Don't spin in starvation mode, ownership is handed off to waiters
		// so we won't be able to acquire the mutex anyway.
		// 锁是非饥饿状态,锁还没被释放,尝试自旋
		// 对正常状态抢夺锁的 goroutine 尝试 spin,在临界区耗时很短的情况提高性能.
		if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
			// Active spinning makes sense.
			// Try to set mutexWoken flag to inform Unlock
			// to not wake other blocked goroutines.
			if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
				atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
				awoke = true
			}
			runtime_doSpin()
			iter++
			// 再次获取锁的状态,之后会检查是否锁被释放了
			old = m.state
			continue
		}
		new := old
		// Don't try to acquire starving mutex, new arriving goroutines must queue.
		//非饥饿状态下抢锁。怎么抢?就是要把 state 的锁的那一位,置为加锁状态,后续 CAS 如果成功就可能获取到了锁。
		if old&mutexStarving == 0 {
			new |= mutexLocked // 非饥饿状态,加锁
		}
		//如果锁已经被持有或者锁处于饥饿状态,我们最好的归宿就是等待, 所以 waiter 的数量加 1。
		if old&(mutexLocked|mutexStarving) != 0 {
			new += 1 << mutexWaiterShift // waiter数量加1
		}
		// The current goroutine switches mutex to starvation mode.
		// But if the mutex is currently unlocked, don't do the switch.
		// Unlock expects that starving mutex has waiters, which will not
		// be true in this case.
		//如果此 goroutine 已经处在饥饿状态,并且锁还被持有,那么,我们需要把此 Mutex 设置为饥饿状态。
		if starving && old&mutexLocked != 0 {
			new |= mutexStarving // 设置饥饿状态
		}
		//清除 mutexWoken 标记,因为不管是获得了锁还是进入休眠,我们都需要清除 mutexWoken 标记。
		if awoke {
			// The goroutine has been woken from sleep,
			// so we need to reset the flag in either case.
			if new&mutexWoken == 0 {
				throw("sync: inconsistent mutex state")
			}
			new &^= mutexWoken // 新状态清除唤醒标记
		}
		// 成功设置新状态
		// 尝试使用 CAS 设置 state。如果成功,检查原来的锁的 状态是未加锁状态,并且也不是饥饿状态的话就成功获取了锁,返回。
		if atomic.CompareAndSwapInt32(&m.state, old, new) {
			// 原来锁的状态已释放,并且不是饥饿状态,正常请求到了锁,返回
			if old&(mutexLocked|mutexStarving) == 0 {
				break // locked the mutex with CAS
			}
			// If we were already waiting before, queue at the front of the queue.
			// 处理饥饿状态
			// 如果以前就在队列里面,加入到队列头
			//判断是否第一次加入到 waiter 队列。
			queueLifo := waitStartTime != 0
			if waitStartTime == 0 {
				waitStartTime = runtime_nanotime()
			}
			// 阻塞等待
			//将此 waiter 加入到队列,如果是首次,加入到队尾,先进先出。如果不是首次, 那么加入到队首,这样等待最久的 goroutine 优先能够获取到锁。此 goroutine 会进行休眠。
			//如果没有通过 CAS 获得锁,会调用 runtime.sync_runtime_SemacquireMutex 通过信号量保证资源不会被两个 Goroutine 获取。runtime.sync_runtime_SemacquireMutex 会在方法中不断尝试获取锁并陷入休眠等待信号量的释放,一旦当前 Goroutine 可以获取信号量,它就会立刻返回,sync.Mutex.Lock 的剩余代码也会继续执行
			runtime_SemacquireMutex(&m.sema, queueLifo, 1)
			//判断此 goroutine 是否处于饥饿状态。注意,执行这一句的时候,它已经被唤醒了。
			starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
			// 唤醒之后检查锁是否应该处于饥饿状态
			old = m.state
			// 如果锁已经处于饥饿状态,直接抢到锁,返回
			//在饥饿模式下,当前 Goroutine 会获得互斥锁,如果等待队列中只存在当前 Goroutine,互斥锁还会从饥饿模式中退出;
			if old&mutexStarving != 0 {
				// If this goroutine was woken and mutex is in starvation mode,
				// ownership was handed off to us but mutex is in somewhat
				// inconsistent state: mutexLocked is not set and we are still
				// accounted as waiter. Fix that.
				if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
					throw("sync: inconsistent mutex state")
				}
				//加锁并且将waiter数减1
				delta := int32(mutexLocked - 1<<mutexWaiterShift)
				// 最后一个waiter或者已经不饥饿了,清掉饥饿标记
				if !starving || old>>mutexWaiterShift == 1 {
					// Exit starvation mode.
					// Critical to do it here and consider wait time.
					// Starvation mode is so inefficient, that two goroutines
					// can go lock-step infinitely once they switch mutex
					// to starvation mode.
					delta -= mutexStarving
				}
				atomic.AddInt32(&m.state, delta)
				break
			}
			//在正常模式下,这段代码会设置唤醒和饥饿标记、重置迭代次数并重新执行获取锁的循环;
			awoke = true
			iter = 0
		} else {
			old = m.state
		}
	}

	if race.Enabled {
		race.Acquire(unsafe.Pointer(m))
	}
}

自旋

自旋是一种多线程同步机制,当前的进程在进入自旋的过程中会一直保持 CPU 的占用,持续检查某个条件是否为真。在多核的 CPU 上,自旋可以避免 Goroutine 的切换,使用恰当会对性能带来很大的增益,但是使用的不恰当就会拖慢整个程序,所以 Goroutine 进入自旋的条件非常苛刻:

  1. 互斥锁只有在普通模式才能进入自旋;
  2. runtime.sync_runtime_canSpin 需要返回 true:
    1. 运行在多 CPU 的机器上;
    2. 当前 Goroutine 为了获取该锁进入自旋的次数小于四次;
    3. 当前机器上至少存在一个正在运行的处理器 P 并且处理的运行队列为空;

一旦当前 Goroutine 能够进入自旋就会调用runtime.sync_runtime_doSpin 和 runtime.procyield 并执行 30 次的 PAUSE 指令,该指令只会占用 CPU 并消耗 CPU 时间:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
func sync_runtime_canSpin(i int) bool {
	// sync.Mutex is cooperative, so we are conservative with spinning.
	// Spin only few times and only if running on a multicore machine and
	// GOMAXPROCS>1 and there is at least one other running P and local runq is empty.
	// As opposed to runtime mutex we don't do passive spinning here,
	// because there can be work on global runq or on other Ps.
	if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
		return false
	}
	if p := getg().m.p.ptr(); !runqempty(p) {
		return false
	}
	return true
}

一旦当前 Goroutine 能够进入自旋就会调用runtime.sync_runtime_doSpin 和 runtime.procyield 并执行 30 次的 PAUSE 指令,该指令只会占用 CPU 并消耗 CPU 时间:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
func sync_runtime_doSpin() {
	procyield(active_spin_cnt)
}

TEXT runtime·procyield(SB),NOSPLIT,$0-0
	MOVL	cycles+0(FP), AX
again:
	PAUSE
	SUBL	$1, AX
	JNZ	again
	RET

Unlock

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22

// Unlock unlocks m.
// It is a run-time error if m is not locked on entry to Unlock.
//
// A locked Mutex is not associated with a particular goroutine.
// It is allowed for one goroutine to lock a Mutex and then
// arrange for another goroutine to unlock it.
func (m *Mutex) Unlock() {
	if race.Enabled {
		_ = m.state
		race.Release(unsafe.Pointer(m))
	}
    //如果该函数返回的新状态等于 0,当前 Goroutine 就成功解锁了互斥锁;
    //如果该函数返回的新状态不等于 0,这段代码会调用 sync.Mutex.unlockSlow 开始慢速解锁:
	// Fast path: drop lock bit.
	new := atomic.AddInt32(&m.state, -mutexLocked)
	if new != 0 {
		// Outlined slow path to allow inlining the fast path.
		// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
		m.unlockSlow(new)
	}
}

在正常情况下会根据当前互斥锁的状态,分别处理正常模式和饥饿模式下的互斥锁:

  • 在正常模式下,上述代码会使用如下所示的处理过程:

    • 如果互斥锁不存在等待者或者互斥锁的 mutexLocked、mutexStarving、mutexWoken 状态不都为 0,那么当前方法可以直接返回,不需要唤醒其他等待者;
    • 如果互斥锁存在等待者,会通过 sync.runtime_Semrelease 唤醒等待者并移交锁的所有权;
  • 在饥饿模式下,上述代码会直接调用 sync.runtime_Semrelease 将当前锁交给下一个正在尝试获取锁的等待者,等待者被唤醒后会得到锁,在这时互斥锁还不会退出饥饿状态;

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
func (m *Mutex) unlockSlow(new int32) {
    //先校验锁状态的合法性 — 如果当前互斥锁已经被解锁过了会直接抛出异常 “sync: unlock of unlocked mutex” 中止当前程序。
	if (new+mutexLocked)&mutexLocked == 0 {
		throw("sync: unlock of unlocked mutex")
    }

	if new&mutexStarving == 0 {
		old := new
		for {
			// If there are no waiters or a goroutine has already
			// been woken or grabbed the lock, no need to wake anyone.
			// In starvation mode ownership is directly handed off from unlocking
			// goroutine to the next waiter. We are not part of this chain,
			// since we did not observe mutexStarving when we unlocked the mutex above.
            // So get off the way.
            // 如果 Mutex 处于正常状态,如果没有 waiter,或者已经有在处理的情况了,那么释放就好,不做额外的处理
			if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
				return
			}
            // Grab the right to wake someone.
            //否则,waiter 数减 1,mutexWoken 标志设置上,通过 CAS 更新 state 的值
			new = (old - 1<<mutexWaiterShift) | mutexWoken
			if atomic.CompareAndSwapInt32(&m.state, old, new) {
				// 唤醒一个阻塞的 goroutine,但不是唤醒第一个等待者
				runtime_Semrelease(&m.sema, false, 1)
				return
			}
			old = m.state
		}
	} else {
		// Starving mode: handoff mutex ownership to the next waiter, and yield
		// our time slice so that the next waiter can start to run immediately.
		// Note: mutexLocked is not set, the waiter will set it after wakeup.
		// But mutex is still considered locked if mutexStarving is set,
        // so new coming goroutines won't acquire it.
        //如果 Mutex 处于饥饿状态,直接唤醒等待队列中的 waiter。
		// 饥饿模式: 直接将 mutex 所有权交给等待队列最前端的 goroutine
		runtime_Semrelease(&m.sema, true, 1)
	}
}

RWMutex

RWMutex 是很常见的并发原语,很多编程语言的库都提供了类似的并发类型。RWMutex 一般都是基于互斥锁、条件变量(condition variables)或者信号量(semaphores)等并发原语来实现。Go 标准库中的 RWMutex 是基于 Mutex 实现的。

readers-writers 问题一般有三类,基于对读和写操作的优先级,读写锁的设计和实现也分成三类。

  • Read-preferring:读优先的设计可以提供很高的并发性,但是,在竞争激烈的情况下可能会导致写饥饿。这是因为,如果有大量的读,这种设计会导致只有所有的读都释放了锁之后,写才可能获取到锁。

  • Write-preferring:写优先的设计意味着,如果已经有一个 writer 在等待请求锁的话,它会阻止新来的请求锁的 reader 获取到锁,所以优先保障 writer。当然,如果有一些 reader 已经请求了锁的话,新请求的 writer 也会等待已经存在的 reader 都释放锁之后才能获取。所以,写优先级设计中的优先权是针对新来的请求而言的。这种设计主要避免了 writer 的饥饿问题。

  • 不指定优先级:这种设计比较简单,不区分 reader 和 writer 优先级,某些场景下这种不指定优先级的设计反而更有效,因为第一类优先级会导致写饥饿,第二类优先级可能会导致读饥饿,这种不指定优先级的访问不再区分读写,大家都是同一个优先级,解决了饥饿的问题。

Go 标准库中的 RWMutex 设计是 Write-preferring 方案。一个正在阻塞的 Lock 调用会排除新的 reader 请求到锁。

sync.RWMutex 中总共包含以下 5 个字段:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
// There is a modified copy of this file in runtime/rwmutex.go.
// If you make any changes here, see if you should make them there.

// A RWMutex is a reader/writer mutual exclusion lock.
// The lock can be held by an arbitrary number of readers or a single writer.
// The zero value for a RWMutex is an unlocked mutex.
//
// A RWMutex must not be copied after first use.
//
// If a goroutine holds a RWMutex for reading and another goroutine might
// call Lock, no goroutine should expect to be able to acquire a read lock
// until the initial read lock is released. In particular, this prohibits
// recursive read locking. This is to ensure that the lock eventually becomes
// available; a blocked Lock call excludes new readers from acquiring the
// lock.
type RWMutex struct {
	w           Mutex  // held if there are pending writers // 互斥锁解决多个writer的竞争
	writerSem   uint32 // semaphore for writers to wait for completing readers  // writer信号量
	readerSem   uint32 // semaphore for readers to wait for completing writers  // reader信号量
	readerCount int32  // number of pending readers // reader的数量
	readerWait  int32  // number of departing readers   // writer等待完成的reader的数量
}
  • w:为 writer 的竞争锁而设计;
  • readerCount:记录当前 reader 的数量(以及是否有 writer 竞争锁);
  • readerWait:记录 writer 请求锁时需要等待 read 完成的 reader 的数量;
  • writerSem 和 readerSem:都是为了阻塞设计的信号量。

我们会依次分析获取写锁和读锁的实现原理,其中:

  • 写操作使用 sync.RWMutex.Lock 和 sync.RWMutex.Unlock 方法;
  • 读操作使用 sync.RWMutex.RLock 和 sync.RWMutex.RUnlock 方法;

在 Lock 方法中,是先获取内部互斥锁,才会修改的其他字段;而在 Unlock 方法中,是先修改的其他字段,才会释放内部互斥锁,这样才能保证字段的修改也受到互斥锁的保护。

RLock

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
// RLock locks rw for reading.
//
// It should not be used for recursive read locking; a blocked Lock
// call excludes new readers from acquiring the lock. See the
// documentation on the RWMutex type.
func (rw *RWMutex) RLock() {
	if race.Enabled {
		_ = rw.w.state
		race.Disable()
	}
	//reader 计数加 1。
	if atomic.AddInt32(&rw.readerCount, 1) < 0 {
		// A writer is pending, wait for it.
		// rw.readerCount是负值的时候,意味着此时有writer等待请求锁,因为writer优先
		runtime_SemacquireMutex(&rw.readerSem, false, 0)
	}
	if race.Enabled {
		race.Enable()
		race.Acquire(unsafe.Pointer(&rw.readerSem))
	}
}

你可能比较困惑的是,readerCount 怎么还可能为负数呢?其实,这是因为,readerCount 这个字段有双重含义:

  • 没有 writer 竞争或持有锁时,readerCount 和我们正常理解的 reader 的计数是一样的;

  • 但是,如果有 writer 竞争锁或者持有锁时,那么,readerCount 不仅仅承担着 reader 的计数功能,还能够标识当前是否有 writer 竞争或持有锁,在这种情况下,请求锁的 reader 的处理会阻塞等待锁的释放。

RUnlock

调用 RUnlock 的时候,我们需要将 Reader 的计数减去 1,因为 reader 的数量减少了一个。但是, AddInt32 的返回值还有另外一个含义。如果它是负值, 就表示当前有 writer 竞争锁,在这种情况下,还会调用 rUnlockSlow 方法,检查是不是 reader 都释放读锁了,如果读锁都释放了,那么可以唤醒请求写锁的 writer 了。

当一个或者多个 reader 持有锁的时候,竞争锁的 writer 会等待这些 reader 释放完,才可能持有这把锁。

当 writer 请求锁的时候,是无法改变既有的 reader 持有锁的现实的,也不会强制这些 reader 释放锁,它的优先权只是限定后来的 reader 不要和它抢。

所以,rUnlockSlow 将持有锁的 reader 计数减少 1 的时候,会检查既有的 reader 是不是都已经释放了锁,如果都释放了锁,就会唤醒 writer,让 writer 持有锁。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

// RUnlock undoes a single RLock call;
// it does not affect other simultaneous readers.
// It is a run-time error if rw is not locked for reading
// on entry to RUnlock.
func (rw *RWMutex) RUnlock() {
	if race.Enabled {
		_ = rw.w.state
		race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))
		race.Disable()
	}
	if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
		// Outlined slow-path to allow the fast-path to be inlined
		// 有等待的writer
		rw.rUnlockSlow(r)
	}
	if race.Enabled {
		race.Enable()
	}
}

func (rw *RWMutex) rUnlockSlow(r int32) {
	if r+1 == 0 || r+1 == -rwmutexMaxReaders {
		race.Enable()
		throw("sync: RUnlock of unlocked RWMutex")
	}
	// A writer is pending.
	if atomic.AddInt32(&rw.readerWait, -1) == 0 {
		// The last reader unblocks the writer.
		// 最后一个reader了,writer终于有机会获得锁了
		runtime_Semrelease(&rw.writerSem, false, 1)
	}
}

Lock

RWMutex 是一个多 writer 多 reader 的读写锁,所以同时可能有多个 writer 和 reader。

那么,为了避免 writer 之间的竞争,RWMutex 就会使用一个 Mutex 来保证 writer 的互斥。

一旦一个 writer 获得了内部的互斥锁,就会反转 readerCount 字段,把它从原来的正整数 readerCount(>=0) 修改为负数(readerCount-rwmutexMaxReaders),让这个字段保持两个含义(既保存了 reader 的数量,又表示当前有 writer)。

当资源的使用者想要获取写锁时,需要调用 sync.RWMutex.Lock 方法:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// Lock locks rw for writing.
// If the lock is already locked for reading or writing,
// Lock blocks until the lock is available.
func (rw *RWMutex) Lock() {
	if race.Enabled {
		_ = rw.w.state
		race.Disable()
	}
	// First, resolve competition with other writers.
	// 首先解决其他writer竞争问题
	rw.w.Lock()
	// Announce to readers there is a pending writer.
	// 反转readerCount,告诉reader有writer竞争锁
	//记录当前活跃的 reader 数量,所谓活跃的 reader,就是指持有读锁还没有释放的那些 reader。
	r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
	// Wait for active readers.
	// 如果当前有reader持有锁,那么需要等待
	// 如果 readerCount 不是 0,就说明当前有持有读锁的 reader,RWMutex 需要把这个当前 readerCount 赋值给 readerWait 字段保存下来, 同时,这个 writer 进入阻塞等待状态。
	if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
		runtime_SemacquireMutex(&rw.writerSem, false, 0)
	}
	if race.Enabled {
		race.Enable()
		race.Acquire(unsafe.Pointer(&rw.readerSem))
		race.Acquire(unsafe.Pointer(&rw.writerSem))
	}
}

每当一个 reader 释放读锁的时候(调用 RUnlock 方法时),readerWait 字段就减 1,直到所有的活跃的 reader 都释放了读锁,才会唤醒这个 writer。

Unlock

当一个 writer 释放锁的时候,它会再次反转 readerCount 字段。可以肯定的是,因为当前锁由 writer 持有,所以,readerCount 字段是反转过的,并且减去了 rwmutexMaxReaders 这个常数,变成了负数。所以,这里的反转方法就是给它增加 rwmutexMaxReaders 这个常数值。

既然 writer 要释放锁了,那么就需要唤醒之后新来的 reader,不必再阻塞它们了,让它们开开心心地继续执行就好了。

在 RWMutex 的 Unlock 返回之前,需要把内部的互斥锁释放。释放完毕后,其他的 writer 才可以继续竞争这把锁。

写锁的释放会调用 sync.RWMutex.Unlock:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// Unlock unlocks rw for writing. It is a run-time error if rw is
// not locked for writing on entry to Unlock.
//
// As with Mutexes, a locked RWMutex is not associated with a particular
// goroutine. One goroutine may RLock (Lock) a RWMutex and then
// arrange for another goroutine to RUnlock (Unlock) it.
func (rw *RWMutex) Unlock() {
	if race.Enabled {
		_ = rw.w.state
		race.Release(unsafe.Pointer(&rw.readerSem))
		race.Disable()
	}
	// 告诉reader没有活跃的writer了
	// Announce to readers there is no active writer.
	r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
	if r >= rwmutexMaxReaders {
		race.Enable()
		throw("sync: Unlock of unlocked RWMutex")
	}
	// Unblock blocked readers, if any.
	for i := 0; i < int(r); i++ {
		// 唤醒阻塞的reader们
		runtime_Semrelease(&rw.readerSem, false, 0)
	}
	// Allow other writers to proceed.
	// 释放内部的互斥锁
	rw.w.Unlock()
	if race.Enabled {
		race.Enable()
	}
}

与加锁的过程正好相反,写锁的释放分以下几个执行:

  1. 调用 sync/atomic.AddInt32 函数将 readerCount 变回正数,释放读锁;
  2. 通过 for 循环释放所有因为获取读锁而陷入等待的 Goroutine:
  3. 调用 sync.Mutex.Unlock 释放写锁;

获取写锁时会先阻塞写锁的获取,后阻塞读锁的获取,这种策略能够保证读操作不会被连续的写操作『饿死』。

参考

https://colobu.com/2017/07/11/dive-into-sync-Map

https://segmentfault.com/a/1190000015242373

https://pathbox.github.io/2018/04/05/understand-sync.Map-in-Goalng/

http://www.qiuxiaobing.cn/%E7%BC%96%E7%A8%8B%E8%AF%AD%E8%A8%80/2018/03/09/go-sync-map.html

http://www.gogodjzhu.com/index.php/code/basic/397/

http://russellluo.com/2017/06/go-sync-map-diagram.html

5.4 条件变量

5.5 同步组

Go 标准库源码分析 - sync 包的Pool

5.7 并发安全散列表

6.1 上下文 Context

go context剖析之源码分析

5.3 原子操作