信号量

信号量的概念是荷兰计算机科学家 Edsger Dijkstra 在 1963 年左右提出来的,广泛应用在不同的操作系统中。在系统中,会给每一个进程一个信号量,代表每个进程目前的状态。

未得到控制权的进程,会在特定的地方被迫停下来,等待可以继续进行的信号到来。

最简单的信号量就是一个变量加一些并发控制的能力,这个变量是 0 到 n 之间的一个数值。当 goroutine 完成对此信号量的等待(wait)时,该计数值就减 1,当 goroutine 完成对此信号量的释放(release)时,该计数值就加 1。当计数值为 0 的时候,goroutine 调用 wait 等待该信号量是不会成功的,除非计数器又大于 0,等待的 goroutine 才有可能成功返回。

Dijkstra 在他的论文中为信号量定义了两个操作 P 和 V。P 操作(descrease、wait、acquire)是减少信号量的计数值,而 V 操作(increase、signal、release)是增加信号量的计数值。

使用伪代码表示如下(中括号代表原子操作):

1
2
3
4
5
6
7
8
9

function V(semaphore S, integer I):
    [SS + I]

function P(semaphore S, integer I):
    repeat:
        [if SI:
        SSI
        break]

可以看到,初始化信号量 S 有一个指定数量(n)的资源,它就像是一个有 n 个资源的池子。P 操作相当于请求资源,如果资源可用,就立即返回;如果没有资源或者不够,那么,它可以不断尝试或者阻塞等待。V 操作会释放自己持有的资源,把资源返还给信号量。信号量的值除了初始化的操作以外,只能由 P/V 操作改变。

现在,我们来总结下信号量的实现。

  1. 初始化信号量:设定初始的资源的数量。
  2. P 操作:将信号量的计数值减去 1,如果新值已经为负,那么调用者会被阻塞并加入到等待队列中。否则,调用者会继续执行,并且获得一个资源。
  3. V 操作:将信号量的计数值加 1,如果先前的计数值为负,就说明有等待的 P 操作的调用者。它会从等待队列中取出一个等待的调用者,唤醒它,让它继续执行。

Go扩展库Semaphore

Go 在它的扩展包中提供了信号量semaphore,不过这个信号量的类型名并不叫 Semaphore,而是叫 Weighted。

我们来分析下这个信号量的几个实现方法。

  1. Acquire 方法:相当于 P 操作,你可以一次获取多个资源,如果没有足够多的资源,调用者就会被阻塞。它的第一个参数是 Context,这就意味着,你可以通过 Context 增加超时或者 cancel 的机制。如果是正常获取了资源,就返回 nil;否则,就返回 ctx.Err(),信号量不改变。
  2. Release 方法:相当于 V 操作,可以将 n 个资源释放,返还给信号量。
  3. TryAcquire 方法:尝试获取 n 个资源,但是它不会阻塞,要么成功获取 n 个资源,返回 true,要么一个也不获取,返回 false。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

var (
    maxWorkers = runtime.GOMAXPROCS(0)                    // worker数量
    sema       = semaphore.NewWeighted(int64(maxWorkers)) //信号量
    task       = make([]int, maxWorkers*4)                // 任务数,是worker的四倍
)

func main() {
    ctx := context.Background()

    for i := range task {
        // 如果没有worker可用,会阻塞在这里,直到某个worker被释放
        if err := sema.Acquire(ctx, 1); err != nil {
            break
        }

        // 启动worker goroutine
        go func(i int) {
            defer sema.Release(1)
            time.Sleep(100 * time.Millisecond) // 模拟一个耗时操作
            task[i] = i + 1
        }(i)
    }

    // 请求所有的worker,这样能确保前面的worker都执行完
    if err := sema.Acquire(ctx, int64(maxWorkers)); err != nil {
        log.Printf("获取所有的worker失败: %v", err)
    }

    fmt.Println(task)
}

在这段代码中,main goroutine 相当于一个 dispacher,负责任务的分发。它先请求信号量,如果获取成功,就会启动一个 goroutine 去处理计算,然后,这个 goroutine 会释放这个信号量(有意思的是,信号量的获取是在 main goroutine,信号量的释放是在 worker goroutine 中),如果获取不成功,就等到有信号量可以使用的时候,再去获取。

需要提醒你的是,其实,在这个例子中,还有一个值得我们学习的知识点,就是最后的那一段处理(第 25 行)。如果在实际应用中,你想等所有的 Worker 都执行完,就可以获取最大计数值的信号量。

源码分析

Go 扩展库中的信号量是使用互斥锁 +List 实现的。互斥锁实现其它字段的保护,而 List 实现了一个等待队列,等待者的通知是通过 Channel 的通知机制实现的。

Weighted

我们来看一下信号量 Weighted 的数据结构:

1
2
3
4
5
6
type waiter struct {
	// 信号量的权重
	n     int64
	// 获得信号量后关闭
	ready chan<- struct{}
}
1
2
3
4
5
6
type Weighted struct {
    size    int64         // 最大资源数
    cur     int64         // 当前已被使用的资源
    mu      sync.Mutex    // 互斥锁,对字段的保护
    waiters list.List     // 等待队列
}
1
2
3
4
5
6
// NewWeighted使用给定的值创建一个新的加权信号量
// 并发访问的最大组合权重。
func NewWeighted(n int64) *Weighted {
	w := &Weighted{size: n}
	return w
}

Acquire

在信号量的几个实现方法里,Acquire 是代码最复杂的一个方法,它不仅仅要监控资源是否可用,而且还要检测 Context 的 Done 是否已关闭。我们来看下它的实现代码。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
// Acquire获取权重为n的信号量,阻塞直到资源可用或ctx完成。
// 成功时,返回nil。失败时返回 ctx.Err()并保持信号量不变。
// 如果ctx已经完成,则Acquire仍然可以成功执行而不会阻塞
func (s *Weighted) Acquire(ctx context.Context, n int64) error {
    s.mu.Lock()
    // fast path, 如果有足够的资源,都不考虑ctx.Done的状态,将cur加上n就返回
    // 如果资源足够,并且没有排队等待的waiters
	// cur+n,直接返回
    if s.size-s.cur >= n && s.waiters.Len() == 0 {
      s.cur += n
      s.mu.Unlock()
      return nil
    }

    // 如果是不可能完成的任务,请求的资源数大于能提供的最大的资源数
    // 资源不够,err返回
    if n > s.size {
        // 不要其他的Acquire,阻塞在此
      s.mu.Unlock()
        // 依赖ctx的状态返回,否则一直等待
      <-ctx.Done()
      return ctx.Err()
    }

    // 否则就需要把调用者加入到等待队列中
    // 创建了一个ready chan,以便被通知唤醒
    ready := make(chan struct{})
    // 组装waiter
    w := waiter{n: n, ready: ready}
    // 插入waiters中
    elem := s.waiters.PushBack(w)
    s.mu.Unlock()


    // 等待
    // 阻塞等待,直到资源可用或ctx完成
    select {
    case <-ctx.Done(): // context的Done被关闭
      err := ctx.Err()
      s.mu.Lock()
      select {
      case <-ready:
        // 如果被唤醒了,忽略ctx的状态
        // 在canceled之后获取了信号量,不要试图去修复队列,假装没看到取消
        err = nil
      default: // 通知waiter
        isFront := s.waiters.Front() == elem
        s.waiters.Remove(elem)
        // 通知其它的waiters,检查是否有足够的资源
        if isFront && s.size > s.cur {
          s.notifyWaiters()
        }
      }
      s.mu.Unlock()
      return err
    case <-ready: // 被唤醒了
      return nil
    }
  }

其实,为了提高性能,这个方法中的 fast path 之外的代码,可以抽取成 acquireSlow 方法,以便其它 Acquire 被内联。

TryAcquire

TryAcquire获取权重为n的信号量而不阻塞,相比Acquire少了等待队列的处理。

TryAcquire 非阻塞地获取指定权重的资源,如果当前没有空闲资源,会直接返回false。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
// TryAcquire获取权重为n的信号量而不阻塞。
// 成功时返回true。 失败时,返回false并保持信号量不变。
func (s *Weighted) TryAcquire(n int64) bool {
	s.mu.Lock()
	success := s.size-s.cur >= n && s.waiters.Len() == 0
	if success {
		s.cur += n
	}
	s.mu.Unlock()
	return success
}

Release

Release 方法将当前计数值减去释放的资源数 n,并唤醒等待队列中的调用者,看是否有足够的资源被获取。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11

func (s *Weighted) Release(n int64) {
    s.mu.Lock()
    s.cur -= n
    if s.cur < 0 {
      s.mu.Unlock()
      panic("semaphore: released more than held")
    }
    s.notifyWaiters()
    s.mu.Unlock()
}

notifyWaiters

notifyWaiters 方法就是逐个检查等待的调用者,如果资源不够,或者是没有等待者了,就返回:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func (s *Weighted) notifyWaiters() {
    // 如果有阻塞的waiters,尝试去进行一一唤醒
	// 唤醒的时候,先进先出,避免被资源比较大的waiter被饿死
    for {
      next := s.waiters.Front()
      // 已经没有waiter了
      if next == nil {
        break // No more waiters blocked.
      }


      w := next.Value.(waiter)
      // waiter需要的资源不足
      if s.size-s.cur < w.n {
        // 没有足够的令牌供下一个waiter使用。我们可以继续(尝试
		// 查找请求较小的waiter),但在负载下可能会导致
		// 饥饿的大型请求;相反,我们留下所有剩余的waiter阻塞
		//
		// 考虑一个用作读写锁的信号量,带有N个令牌,N个reader和一位writer
		// 每个reader都可以通过Acquire(1)获取读锁。
		// writer写入可以通过Acquire(N)获得写锁定,但不包括所有的reader。
		// 如果我们允许读者在队列中前进,writer将会饿死-总是有一个令牌可供每个读者。
        //避免饥饿,这里还是按照先入先出的方式处理
        break
      }

      s.cur += w.n
      s.waiters.Remove(next)
      close(w.ready)
    }
  }

notifyWaiters 方法是按照先入先出的方式唤醒调用者。当释放 100 个资源的时候,如果第一个等待者需要 101 个资源,那么,队列中的所有等待者都会继续等待,即使有的等待者只需要 1 个资源。这样做的目的是避免饥饿,否则的话,资源可能总是被那些请求资源数小的调用者获取,这样一来,请求资源数巨大的调用者,就没有机会获得资源了。

总结

Acquire和TryAcquire都可用于获取资源,Acquire是可以阻塞的获取资源,TryAcquire只能非阻塞的获取资源;

Release对于waiters的唤醒原则,总是先进先出,避免资源需求比较大的waiter被饿死;

使用信号量的常见错误

保证信号量不出错的前提是正确地使用它,否则,公平性和安全性就会受到损害,导致程序 panic。

在使用信号量时,最常见的几个错误如下:

  • 请求了资源,但是忘记释放它;
  • 释放了从未请求的资源;
  • 长时间持有一个资源,即使不需要它;
  • 不持有一个资源,却直接使用它。

不过,即使你规避了这些坑,在同时使用多种资源,不同的信号量控制不同的资源的时候,也可能会出现死锁现象,比如哲学家就餐问题。

就 Go 扩展库实现的信号量来说,在调用 Release 方法的时候,你可以传递任意的整数。 但是,如果你传递一个比请求到的数量大的错误的数值,程序就会 panic。如果传递一个负数,会导致资源永久被持有。如果你请求的资源数比最大的资源数还大,那么,调用者可能永远被阻塞。

所以,使用信号量遵循的原则就是请求多少资源,就释放多少资源。你一定要注意,必须使用正确的方法传递整数,不要“耍小聪明”,而且,请求的资源数一定不要超过最大资源数。

其它信号量的实现

除了官方扩展库的实现,实际上,我们还有很多方法实现信号量,比较典型的就是使用 Channel 来实现。

根据之前的 Channel 类型的介绍以及 Go 内存模型的定义,你应该能想到,使用一个 buffer 为 n 的 Channel 很容易实现信号量,比如下面的代码,我们就是使用 chan struct{}类型来实现的。

在初始化这个信号量的时候,我们设置它的初始容量,代表有多少个资源可以使用。它使用 Lock 和 Unlock 方法实现请求资源和释放资源,正好实现了 Locker 接口。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

  // Semaphore 数据结构,并且还实现了Locker接口
  type semaphore struct {
    sync.Locker
    ch chan struct{}
  }

  // 创建一个新的信号量
  func NewSemaphore(capacity int) sync.Locker {
    if capacity <= 0 {
      capacity = 1 // 容量为1就变成了一个互斥锁
    }
    return &semaphore{ch: make(chan struct{}, capacity)}
  }

  // 请求一个资源
  func (s *semaphore) Lock() {
    s.ch <- struct{}{}
  }

  // 释放资源
  func (s *semaphore) Unlock() {
    <-s.ch
  }

当然,你还可以自己扩展一些方法,比如在请求资源的时候使用 Context 参数(Acquire(ctx))、实现 TryLock 等功能。

看到这里,你可能会问,这个信号量的实现看起来非常简单,而且也能应对大部分的信号量的场景,为什么官方扩展库的信号量的实现不采用这种方法呢?其实,具体是什么原因,我也不知道,但是我必须要强调的是,官方的实现方式有这样一个功能:它可以一次请求多个资源,这是通过 Channel 实现的信号量所不具备的。

参考

go中x/sync/semaphore解读