CyclicBarrier允许一组 goroutine 彼此等待,到达一个共同的执行点。同时,因为它可以被重复使用,所以叫循环栅栏。具体的机制是,大家都在栅栏前等待,等全部都到齐了,就抬起栅栏放行。
事实上,这个 CyclicBarrier 是参考Java CyclicBarrier和C# Barrier的功能实现的。Java 提供了 CountDownLatch(倒计时器)和 CyclicBarrier(循环栅栏)两个类似的用于保证多线程到达同一个执行点的类,只不过前者是到达 0 的时候放行,后者是到达某个指定的数的时候放行。C# Barrier 功能也是类似的,你可以查看链接,了解它的具体用法。
你可能会觉得,CyclicBarrier 和 WaitGroup 的功能有点类似,确实是这样。不过,CyclicBarrier 更适合用在“固定数量的 goroutine 等待同一个执行点”的场景中,而且在放行 goroutine 之后,CyclicBarrier 可以重复利用,不像 WaitGroup 重用的时候,必须小心翼翼避免 panic。处理可重用的多 goroutine 等待同一个执行点的场景的时候,CyclicBarrier 和 WaitGroup 方法调用的对应关系如下:

可以看到,如果使用 WaitGroup 实现的话,调用比较复杂,不像 CyclicBarrier 那么清爽。更重要的是,如果想重用 WaitGroup,你还要保证,将 WaitGroup 的计数值重置到 n 的时候不会出现并发问题。
WaitGroup 更适合用在“一个 goroutine 等待一组 goroutine 到达同一个执行点”的场景中,或者是不需要重用的场景中。
实现原理
CyclicBarrier 有两个初始化方法:
- 第一个是 New 方法,它只需要一个参数,来指定循环栅栏参与者的数量;
- 第二个方法是 NewWithAction,它额外提供一个函数,可以在每一次到达执行点的时候执行一次。具体的时间点是在最后一个参与者到达之后,但是其它的参与者还未被放行之前。我们可以利用它,做放行之前的一些共享状态的更新等操作。
这两个方法的签名如下:
1
2
|
func New(parties int) CyclicBarrier
func NewWithAction(parties int, barrierAction func() error) CyclicBarrier
|
CyclicBarrier 是一个接口,定义的方法如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
type CyclicBarrier interface {
// 等待所有的参与者到达,如果被ctx.Done()中断,会返回ErrBrokenBarrier
Await(ctx context.Context) error
// 重置循环栅栏到初始化状态。如果当前有等待者,那么它们会返回ErrBrokenBarrier
Reset()
// 返回当前等待者的数量
GetNumberWaiting() int
// 参与者的数量
GetParties() int
// 循环栅栏是否处于中断状态
IsBroken() bool
}
|
循环栅栏的使用也很简单。循环栅栏的参与者只需调用 Await 等待,等所有的参与者都到达后,再执行下一步。当执行下一步的时候,循环栅栏的状态又恢复到初始的状态了,可以迎接下一轮同样多的参与者。
并发趣题:一氧化二氢制造工厂
题目是这样的:
有一个名叫大自然的搬运工的工厂,生产一种叫做一氧化二氢的神秘液体。这种液体的分子是由一个氧原子和两个氢原子组成的,也就是水。
这个工厂有多条生产线,每条生产线负责生产氧原子或者是氢原子,每条生产线由一个 goroutine 负责。
这些生产线会通过一个栅栏,只有一个氧原子生产线和两个氢原子生产线都准备好,才能生成出一个水分子,否则所有的生产线都会处于等待状态。也就是说,一个水分子必须由三个不同的生产线提供原子,而且水分子是一个一个按照顺序产生的,每生产一个水分子,就会打印出 HHO、HOH、OHH 三种形式的其中一种。HHH、OOH、OHO、HOO、OOO 都是不允许的。
生产线中氢原子的生产线为 2N 条,氧原子的生产线为 N 条。
你可以先想一下,我们怎么来实现呢?
首先,我们来定义一个 H2O 辅助数据类型,它包含两个信号量的字段和一个循环栅栏。
- semaH 信号量:控制氢原子。一个水分子需要两个氢原子,所以,氢原子的空槽数资源数设置为
- semaO 信号量:控制氧原子。一个水分子需要一个氧原子,所以资源数的空槽数设置为
- 循环栅栏:等待两个氢原子和一个氧原子填补空槽,直到任务完成。
我们来看下具体的代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
package water
import (
"context"
"github.com/marusama/cyclicbarrier"
"golang.org/x/sync/semaphore"
)
// 定义水分子合成的辅助数据结构
type H2O struct {
semaH *semaphore.Weighted // 氢原子的信号量
semaO *semaphore.Weighted // 氧原子的信号量
b cyclicbarrier.CyclicBarrier // 循环栅栏,用来控制合成
}
func New() *H2O {
return &H2O{
semaH: semaphore.NewWeighted(2), //氢原子需要两个
semaO: semaphore.NewWeighted(1), // 氧原子需要一个
b: cyclicbarrier.New(3), // 需要三个原子才能合成
}
}
|
接下来,我们看看各条流水线的处理情况。
流水线分为氢原子处理流水线和氧原子处理流水线,首先,我们先看一下氢原子的流水线:如果有可用的空槽,氢原子的流水线的处理方法是 hydrogen,hydrogen 方法就会占用一个空槽(h2o.semaH.Acquire),输出一个 H 字符,然后等待栅栏放行。等其它的 goroutine 填补了氢原子的另一个空槽和氧原子的空槽之后,程序才可以继续进行。
1
2
3
4
5
6
7
8
|
func (h2o *H2O) hydrogen(releaseHydrogen func()) {
h2o.semaH.Acquire(context.Background(), 1)
releaseHydrogen() // 输出H
h2o.b.Await(context.Background()) //等待栅栏放行
h2o.semaH.Release(1) // 释放氢原子空槽
}
|
在栅栏放行之前,只有两个氢原子的空槽位和一个氧原子的空槽位。只有等栅栏放行之后,这些空槽位才会被释放。栅栏放行,就意味着一个水分子组成成功。
这个算法是不是正确呢?我们来编写一个单元测试检测一下。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
|
package water
import (
"math/rand"
"sort"
"sync"
"testing"
"time"
)
func TestWaterFactory(t *testing.T) {
//用来存放水分子结果的channel
var ch chan string
releaseHydrogen := func() {
ch <- "H"
}
releaseOxygen := func() {
ch <- "O"
}
// 300个原子,300个goroutine,每个goroutine并发的产生一个原子
var N = 100
ch = make(chan string, N*3)
h2o := New()
// 用来等待所有的goroutine完成
var wg sync.WaitGroup
wg.Add(N * 3)
// 200个氢原子goroutine
for i := 0; i < 2*N; i++ {
go func() {
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
h2o.hydrogen(releaseHydrogen)
wg.Done()
}()
}
// 100个氧原子goroutine
for i := 0; i < N; i++ {
go func() {
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
h2o.oxygen(releaseOxygen)
wg.Done()
}()
}
//等待所有的goroutine执行完
wg.Wait()
// 结果中肯定是300个原子
if len(ch) != N*3 {
t.Fatalf("expect %d atom but got %d", N*3, len(ch))
}
// 每三个原子一组,分别进行检查。要求这一组原子中必须包含两个氢原子和一个氧原子,这样才能正确组成一个水分子。
var s = make([]string, 3)
for i := 0; i < N; i++ {
s[0] = <-ch
s[1] = <-ch
s[2] = <-ch
sort.Strings(s)
water := s[0] + s[1] + s[2]
if water != "HHO" {
t.Fatalf("expect a water molecule but got %s", water)
}
}
}
|
如果你没有学习 CyclicBarrier,你可能只会想到,用 WaitGroup 来实现这个水分子制造工厂的例子。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
type H2O struct {
semaH *semaphore.Weighted
semaO *semaphore.Weighted
wg sync.WaitGroup //将循环栅栏替换成WaitGroup
}
func New() *H2O {
var wg sync.WaitGroup
wg.Add(3)
return &H2O{
semaH: semaphore.NewWeighted(2),
semaO: semaphore.NewWeighted(1),
wg: wg,
}
}
func (h2o *H2O) hydrogen(releaseHydrogen func()) {
h2o.semaH.Acquire(context.Background(), 1)
releaseHydrogen()
// 标记自己已达到,等待其它goroutine到达
h2o.wg.Done()
h2o.wg.Wait()
h2o.semaH.Release(1)
}
func (h2o *H2O) oxygen(releaseOxygen func()) {
h2o.semaO.Acquire(context.Background(), 1)
releaseOxygen()
// 标记自己已达到,等待其它goroutine到达
h2o.wg.Done()
h2o.wg.Wait()
//都到达后重置wg
h2o.wg.Add(3)
h2o.semaO.Release(1)
}
|
你一看代码就知道了,使用 WaitGroup 非常复杂,而且,重用和 Done 方法的调用有并发的问题,程序可能 panic,远远没有使用循环栅栏更加简单直接。
所以,我建议你多了解一些并发原语,甚至是从其它编程语言、操作系统中学习更多的并发原语,这样可以让你的知识库更加丰富,在面对并发场景的时候,你也能更加游刃有余。