信号量
信号量的概念是荷兰计算机科学家 Edsger Dijkstra 在 1963 年左右提出来的,广泛应用在不同的操作系统中。在系统中,会给每一个进程一个信号量,代表每个进程目前的状态。
未得到控制权的进程,会在特定的地方被迫停下来,等待可以继续进行的信号到来。
最简单的信号量就是一个变量加一些并发控制的能力,这个变量是 0 到 n 之间的一个数值。当 goroutine 完成对此信号量的等待(wait)时,该计数值就减 1,当 goroutine 完成对此信号量的释放(release)时,该计数值就加 1。当计数值为 0 的时候,goroutine 调用 wait 等待该信号量是不会成功的,除非计数器又大于 0,等待的 goroutine 才有可能成功返回。
Dijkstra 在他的论文中为信号量定义了两个操作 P 和 V。P 操作(descrease、wait、acquire)是减少信号量的计数值,而 V 操作(increase、signal、release)是增加信号量的计数值。
使用伪代码表示如下(中括号代表原子操作):
1
2
3
4
5
6
7
8
9
|
function V(semaphore S, integer I):
[S ← S + I]
function P(semaphore S, integer I):
repeat:
[if S ≥ I:
S ← S − I
break]
|
可以看到,初始化信号量 S 有一个指定数量(n)的资源,它就像是一个有 n 个资源的池子。P 操作相当于请求资源,如果资源可用,就立即返回;如果没有资源或者不够,那么,它可以不断尝试或者阻塞等待。V 操作会释放自己持有的资源,把资源返还给信号量。信号量的值除了初始化的操作以外,只能由 P/V 操作改变。
现在,我们来总结下信号量的实现。
- 初始化信号量:设定初始的资源的数量。
- P 操作:将信号量的计数值减去 1,如果新值已经为负,那么调用者会被阻塞并加入到等待队列中。否则,调用者会继续执行,并且获得一个资源。
- V 操作:将信号量的计数值加 1,如果先前的计数值为负,就说明有等待的 P 操作的调用者。它会从等待队列中取出一个等待的调用者,唤醒它,让它继续执行。
Go扩展库Semaphore
Go 在它的扩展包中提供了信号量semaphore,不过这个信号量的类型名并不叫 Semaphore,而是叫 Weighted。
我们来分析下这个信号量的几个实现方法。
- Acquire 方法:相当于 P 操作,你可以一次获取多个资源,如果没有足够多的资源,调用者就会被阻塞。它的第一个参数是 Context,这就意味着,你可以通过 Context 增加超时或者 cancel 的机制。如果是正常获取了资源,就返回 nil;否则,就返回 ctx.Err(),信号量不改变。
- Release 方法:相当于 V 操作,可以将 n 个资源释放,返还给信号量。
- TryAcquire 方法:尝试获取 n 个资源,但是它不会阻塞,要么成功获取 n 个资源,返回 true,要么一个也不获取,返回 false。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
var (
maxWorkers = runtime.GOMAXPROCS(0) // worker数量
sema = semaphore.NewWeighted(int64(maxWorkers)) //信号量
task = make([]int, maxWorkers*4) // 任务数,是worker的四倍
)
func main() {
ctx := context.Background()
for i := range task {
// 如果没有worker可用,会阻塞在这里,直到某个worker被释放
if err := sema.Acquire(ctx, 1); err != nil {
break
}
// 启动worker goroutine
go func(i int) {
defer sema.Release(1)
time.Sleep(100 * time.Millisecond) // 模拟一个耗时操作
task[i] = i + 1
}(i)
}
// 请求所有的worker,这样能确保前面的worker都执行完
if err := sema.Acquire(ctx, int64(maxWorkers)); err != nil {
log.Printf("获取所有的worker失败: %v", err)
}
fmt.Println(task)
}
|
在这段代码中,main goroutine 相当于一个 dispacher,负责任务的分发。它先请求信号量,如果获取成功,就会启动一个 goroutine 去处理计算,然后,这个 goroutine 会释放这个信号量(有意思的是,信号量的获取是在 main goroutine,信号量的释放是在 worker goroutine 中),如果获取不成功,就等到有信号量可以使用的时候,再去获取。
需要提醒你的是,其实,在这个例子中,还有一个值得我们学习的知识点,就是最后的那一段处理(第 25 行)。如果在实际应用中,你想等所有的 Worker 都执行完,就可以获取最大计数值的信号量。
源码分析
Go 扩展库中的信号量是使用互斥锁 +List 实现的。互斥锁实现其它字段的保护,而 List 实现了一个等待队列,等待者的通知是通过 Channel 的通知机制实现的。
Weighted
我们来看一下信号量 Weighted 的数据结构:
1
2
3
4
5
6
|
type waiter struct {
// 信号量的权重
n int64
// 获得信号量后关闭
ready chan<- struct{}
}
|
1
2
3
4
5
6
|
type Weighted struct {
size int64 // 最大资源数
cur int64 // 当前已被使用的资源
mu sync.Mutex // 互斥锁,对字段的保护
waiters list.List // 等待队列
}
|
1
2
3
4
5
6
|
// NewWeighted使用给定的值创建一个新的加权信号量
// 并发访问的最大组合权重。
func NewWeighted(n int64) *Weighted {
w := &Weighted{size: n}
return w
}
|
Acquire
在信号量的几个实现方法里,Acquire 是代码最复杂的一个方法,它不仅仅要监控资源是否可用,而且还要检测 Context 的 Done 是否已关闭。我们来看下它的实现代码。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
|
// Acquire获取权重为n的信号量,阻塞直到资源可用或ctx完成。
// 成功时,返回nil。失败时返回 ctx.Err()并保持信号量不变。
// 如果ctx已经完成,则Acquire仍然可以成功执行而不会阻塞
func (s *Weighted) Acquire(ctx context.Context, n int64) error {
s.mu.Lock()
// fast path, 如果有足够的资源,都不考虑ctx.Done的状态,将cur加上n就返回
// 如果资源足够,并且没有排队等待的waiters
// cur+n,直接返回
if s.size-s.cur >= n && s.waiters.Len() == 0 {
s.cur += n
s.mu.Unlock()
return nil
}
// 如果是不可能完成的任务,请求的资源数大于能提供的最大的资源数
// 资源不够,err返回
if n > s.size {
// 不要其他的Acquire,阻塞在此
s.mu.Unlock()
// 依赖ctx的状态返回,否则一直等待
<-ctx.Done()
return ctx.Err()
}
// 否则就需要把调用者加入到等待队列中
// 创建了一个ready chan,以便被通知唤醒
ready := make(chan struct{})
// 组装waiter
w := waiter{n: n, ready: ready}
// 插入waiters中
elem := s.waiters.PushBack(w)
s.mu.Unlock()
// 等待
// 阻塞等待,直到资源可用或ctx完成
select {
case <-ctx.Done(): // context的Done被关闭
err := ctx.Err()
s.mu.Lock()
select {
case <-ready:
// 如果被唤醒了,忽略ctx的状态
// 在canceled之后获取了信号量,不要试图去修复队列,假装没看到取消
err = nil
default: // 通知waiter
isFront := s.waiters.Front() == elem
s.waiters.Remove(elem)
// 通知其它的waiters,检查是否有足够的资源
if isFront && s.size > s.cur {
s.notifyWaiters()
}
}
s.mu.Unlock()
return err
case <-ready: // 被唤醒了
return nil
}
}
|
其实,为了提高性能,这个方法中的 fast path 之外的代码,可以抽取成 acquireSlow 方法,以便其它 Acquire 被内联。
TryAcquire
TryAcquire获取权重为n的信号量而不阻塞,相比Acquire少了等待队列的处理。
TryAcquire 非阻塞地获取指定权重的资源,如果当前没有空闲资源,会直接返回false。
1
2
3
4
5
6
7
8
9
10
11
|
// TryAcquire获取权重为n的信号量而不阻塞。
// 成功时返回true。 失败时,返回false并保持信号量不变。
func (s *Weighted) TryAcquire(n int64) bool {
s.mu.Lock()
success := s.size-s.cur >= n && s.waiters.Len() == 0
if success {
s.cur += n
}
s.mu.Unlock()
return success
}
|
Release
Release 方法将当前计数值减去释放的资源数 n,并唤醒等待队列中的调用者,看是否有足够的资源被获取。
1
2
3
4
5
6
7
8
9
10
11
|
func (s *Weighted) Release(n int64) {
s.mu.Lock()
s.cur -= n
if s.cur < 0 {
s.mu.Unlock()
panic("semaphore: released more than held")
}
s.notifyWaiters()
s.mu.Unlock()
}
|
notifyWaiters
notifyWaiters 方法就是逐个检查等待的调用者,如果资源不够,或者是没有等待者了,就返回:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
func (s *Weighted) notifyWaiters() {
// 如果有阻塞的waiters,尝试去进行一一唤醒
// 唤醒的时候,先进先出,避免被资源比较大的waiter被饿死
for {
next := s.waiters.Front()
// 已经没有waiter了
if next == nil {
break // No more waiters blocked.
}
w := next.Value.(waiter)
// waiter需要的资源不足
if s.size-s.cur < w.n {
// 没有足够的令牌供下一个waiter使用。我们可以继续(尝试
// 查找请求较小的waiter),但在负载下可能会导致
// 饥饿的大型请求;相反,我们留下所有剩余的waiter阻塞
//
// 考虑一个用作读写锁的信号量,带有N个令牌,N个reader和一位writer
// 每个reader都可以通过Acquire(1)获取读锁。
// writer写入可以通过Acquire(N)获得写锁定,但不包括所有的reader。
// 如果我们允许读者在队列中前进,writer将会饿死-总是有一个令牌可供每个读者。
//避免饥饿,这里还是按照先入先出的方式处理
break
}
s.cur += w.n
s.waiters.Remove(next)
close(w.ready)
}
}
|
notifyWaiters 方法是按照先入先出的方式唤醒调用者。当释放 100 个资源的时候,如果第一个等待者需要 101 个资源,那么,队列中的所有等待者都会继续等待,即使有的等待者只需要 1 个资源。这样做的目的是避免饥饿,否则的话,资源可能总是被那些请求资源数小的调用者获取,这样一来,请求资源数巨大的调用者,就没有机会获得资源了。
总结
Acquire和TryAcquire都可用于获取资源,Acquire是可以阻塞的获取资源,TryAcquire只能非阻塞的获取资源;
Release对于waiters的唤醒原则,总是先进先出,避免资源需求比较大的waiter被饿死;
使用信号量的常见错误
保证信号量不出错的前提是正确地使用它,否则,公平性和安全性就会受到损害,导致程序 panic。
在使用信号量时,最常见的几个错误如下:
- 请求了资源,但是忘记释放它;
- 释放了从未请求的资源;
- 长时间持有一个资源,即使不需要它;
- 不持有一个资源,却直接使用它。
不过,即使你规避了这些坑,在同时使用多种资源,不同的信号量控制不同的资源的时候,也可能会出现死锁现象,比如哲学家就餐问题。
就 Go 扩展库实现的信号量来说,在调用 Release 方法的时候,你可以传递任意的整数。
但是,如果你传递一个比请求到的数量大的错误的数值,程序就会 panic。如果传递一个负数,会导致资源永久被持有。如果你请求的资源数比最大的资源数还大,那么,调用者可能永远被阻塞。
所以,使用信号量遵循的原则就是请求多少资源,就释放多少资源。你一定要注意,必须使用正确的方法传递整数,不要“耍小聪明”,而且,请求的资源数一定不要超过最大资源数。
其它信号量的实现
除了官方扩展库的实现,实际上,我们还有很多方法实现信号量,比较典型的就是使用 Channel 来实现。
根据之前的 Channel 类型的介绍以及 Go 内存模型的定义,你应该能想到,使用一个 buffer 为 n 的 Channel 很容易实现信号量,比如下面的代码,我们就是使用 chan struct{}类型来实现的。
在初始化这个信号量的时候,我们设置它的初始容量,代表有多少个资源可以使用。它使用 Lock 和 Unlock 方法实现请求资源和释放资源,正好实现了 Locker 接口。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
// Semaphore 数据结构,并且还实现了Locker接口
type semaphore struct {
sync.Locker
ch chan struct{}
}
// 创建一个新的信号量
func NewSemaphore(capacity int) sync.Locker {
if capacity <= 0 {
capacity = 1 // 容量为1就变成了一个互斥锁
}
return &semaphore{ch: make(chan struct{}, capacity)}
}
// 请求一个资源
func (s *semaphore) Lock() {
s.ch <- struct{}{}
}
// 释放资源
func (s *semaphore) Unlock() {
<-s.ch
}
|
当然,你还可以自己扩展一些方法,比如在请求资源的时候使用 Context 参数(Acquire(ctx))、实现 TryLock 等功能。
看到这里,你可能会问,这个信号量的实现看起来非常简单,而且也能应对大部分的信号量的场景,为什么官方扩展库的信号量的实现不采用这种方法呢?其实,具体是什么原因,我也不知道,但是我必须要强调的是,官方的实现方式有这样一个功能:它可以一次请求多个资源,这是通过 Channel 实现的信号量所不具备的。
参考
go中x/sync/semaphore解读