ErrGroup
ErrGroup是 Go 官方提供的一个同步扩展库。我们经常会碰到需要将一个通用的父任务拆成几个小任务并发执行的场景,其实,将一个大的任务拆成几个小任务并发执行,可以有效地提高程序的并发度。就像你在厨房做饭一样,你可以在蒸米饭的同时炒几个小菜,米饭蒸好了,菜同时也做好了,很快就能吃到可口的饭菜。
ErrGroup 就是用来应对这种场景的。它和 WaitGroup 有些类似,但是它提供功能更加丰富:
- 和 Context 集成;
- error 向上传播,可以把子任务的错误传递给 Wait 的调用者。
基本用法
golang.org/x/sync/errgroup 包下定义了一个 Group struct,它就是我们要介绍的 ErrGroup 并发原语,底层也是基于 WaitGroup 实现的。
在使用 ErrGroup 时,我们要用到三个方法,分别是 WithContext、Go 和 Wait。
WithContext
在创建一个 Group 对象时,需要使用 WithContext 方法:
1
|
func WithContext(ctx context.Context) (*Group, context.Context)
|
这个方法返回一个 Group 实例,同时还会返回一个使用 context.WithCancel(ctx) 生成的新 Context。一旦有一个子任务返回错误,或者是 Wait 调用返回,这个新 Context 就会被 cancel。
Group 的零值也是合法的,只不过,你就没有一个可以监控是否 cancel 的 Context 了。
注意,如果传递给 WithContext 的 ctx 参数,是一个可以 cancel 的 Context 的话,那么,它被 cancel 的时候,并不会终止正在执行的子任务。
Go
我们再来学习下执行子任务的 Go 方法:
1
|
func (g *Group) Go(f func() error)
|
传入的子任务函数 f 是类型为 func() error 的函数,如果任务执行成功,就返回 nil,否则就返回 error,并且会 cancel 那个新的 Context。
一个任务可以分成好多个子任务,而且,可能有多个子任务执行失败返回 error,不过,Wait 方法只会返回第一个错误,所以,如果想返回所有的错误,需要特别的处理,我先留个小悬念,一会儿再讲。
Wait
类似 WaitGroup,Group 也有 Wait 方法,等所有的子任务都完成后,它才会返回,否则只会阻塞等待。如果有多个子任务返回错误,它只会返回第一个出现的错误,如果所有的子任务都执行成功,就返回 nil:
1
|
func (g *Group) Wait() error
|
ErrGroup 使用例子
好了,知道了基本用法,下面我来给你介绍几个例子,帮助你全面地掌握 ErrGroup 的使用方法和应用场景。
简单例子:返回第一个错误
先来看一个简单的例子。在这个例子中,启动了三个子任务,其中,子任务 2 会返回执行失败,其它两个执行成功。在三个子任务都执行后,group.Wait 才会返回第 2 个子任务的错误。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
package main
import (
"errors"
"fmt"
"time"
"golang.org/x/sync/errgroup"
)
func main() {
var g errgroup.Group
// 启动第一个子任务,它执行成功
g.Go(func() error {
time.Sleep(5 * time.Second)
fmt.Println("exec #1")
return nil
})
// 启动第二个子任务,它执行失败
g.Go(func() error {
time.Sleep(10 * time.Second)
fmt.Println("exec #2")
return errors.New("failed to exec #2")
})
// 启动第三个子任务,它执行成功
g.Go(func() error {
time.Sleep(15 * time.Second)
fmt.Println("exec #3")
return nil
})
// 等待三个任务都完成
if err := g.Wait(); err == nil {
fmt.Println("Successfully exec all")
} else {
fmt.Println("failed:", err)
}
}
|
如果执行下面的这个程序,会显示三个任务都执行了,而 Wait 返回了子任务 2 的错误:

更进一步,返回所有子任务的错误
Group 只能返回子任务的第一个错误,后续的错误都会被丢弃。但是,有时候我们需要知道每个任务的执行情况。怎么办呢?这个时候,我们就可以用稍微有点曲折的方式去实现。我们使用一个 result slice 保存子任务的执行结果,这样,通过查询 result,就可以知道每一个子任务的结果了。
下面的这个例子,就是使用 result 记录每个子任务成功或失败的结果。其实,你不仅可以使用 result 记录 error 信息,还可以用它记录计算结果。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
|
package main
import (
"errors"
"fmt"
"time"
"golang.org/x/sync/errgroup"
)
func main() {
var g errgroup.Group
var result = make([]error, 3)
// 启动第一个子任务,它执行成功
g.Go(func() error {
time.Sleep(5 * time.Second)
fmt.Println("exec #1")
result[0] = nil // 保存成功或者失败的结果
return nil
})
// 启动第二个子任务,它执行失败
g.Go(func() error {
time.Sleep(10 * time.Second)
fmt.Println("exec #2")
result[1] = errors.New("failed to exec #2") // 保存成功或者失败的结果
return result[1]
})
// 启动第三个子任务,它执行成功
g.Go(func() error {
time.Sleep(15 * time.Second)
fmt.Println("exec #3")
result[2] = nil // 保存成功或者失败的结果
return nil
})
if err := g.Wait(); err == nil {
fmt.Printf("Successfully exec all. result: %v\n", result)
} else {
fmt.Printf("failed: %v\n", result)
}
}
|
任务执行流水线 Pipeline
Go 官方文档中还提供了一个 pipeline 的例子。这个例子是说,由一个子任务遍历文件夹下的文件,然后把遍历出的文件交给 20 个 goroutine,让这些 goroutine 并行计算文件的 md5。
这个例子中的计算逻辑你不需要重点掌握,我来把这个例子简化一下(如果你想看原始的代码,可以看这里):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
|
package main
import (
......
"golang.org/x/sync/errgroup"
)
// 一个多阶段的pipeline.使用有限的goroutine计算每个文件的md5值.
func main() {
m, err := MD5All(context.Background(), ".")
if err != nil {
log.Fatal(err)
}
for k, sum := range m {
fmt.Printf("%s:\t%x\n", k, sum)
}
}
type result struct {
path string
sum [md5.Size]byte
}
// 遍历根目录下所有的文件和子文件夹,计算它们的md5的值.
func MD5All(ctx context.Context, root string) (map[string][md5.Size]byte, error) {
g, ctx := errgroup.WithContext(ctx)
paths := make(chan string) // 文件路径channel
g.Go(func() error {
defer close(paths) // 遍历完关闭paths chan
return filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
...... //将文件路径放入到paths
return nil
})
})
// 启动20个goroutine执行计算md5的任务,计算的文件由上一阶段的文件遍历子任务生成.
c := make(chan result)
const numDigesters = 20
for i := 0; i < numDigesters; i++ {
g.Go(func() error {
for path := range paths { // 遍历直到paths chan被关闭
...... // 计算path的md5值,放入到c中
}
return nil
})
}
go func() {
g.Wait() // 20个goroutine以及遍历文件的goroutine都执行完
close(c) // 关闭收集结果的chan
}()
m := make(map[string][md5.Size]byte)
for r := range c { // 将md5结果从chan中读取到map中,直到c被关闭才退出
m[r.path] = r.sum
}
// 再次调用Wait,依然可以得到group的error信息
if err := g.Wait(); err != nil {
return nil, err
}
return m, nil
}
|
通过这个例子,你可以学习到多阶段 pipeline 的实现(这个例子是遍历文件夹和计算 md5 两个阶段),还可以学习到如何控制执行子任务的 goroutine 数量。
源码剖析
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
|
// Copyright 2016 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package errgroup provides synchronization, error propagation, and Context
// cancelation for groups of goroutines working on subtasks of a common task.
package errgroup
import (
"context"
"sync"
)
// A Group is a collection of goroutines working on subtasks that are part of
// the same overall task.
//
// A zero Group is valid and does not cancel on error.
// Group 是处理同一总体任务的子任务的 goroutines 集合
// 变量wg可以保证在所有go程结束,这时候返回err的值。在赋值的时候,只需要第一个值,这里用到errOnce变量保证。cancel有值的情况下,回调函数出错会被调用(context.Context现在有点chan里面软中断的意味)。
type Group struct {
// 这里保存的是contex.WithCancel返回的第二个参数。
cancel func()
//可以等待所有go程结束
wg sync.WaitGroup
//errOnce可以保证它的回调函数只执行一次,这正是我们需要的,只返回第一个错误
errOnce sync.Once
//保存第一个错误
err error
}
// WithContext returns a new Group and an associated Context derived from ctx.
//
// The derived Context is canceled the first time a function passed to Go
// returns a non-nil error or the first time Wait returns, whichever occurs
// first.
// WithContext 返回一个从 ctx 派生的 context 的新的 Group
// 当函数第一次传递给 Go 时,派生 context 被取消,返回非零错误或第一次 Wait 返回,以发生者为优先。
// 这接口主要是保存context.WithCancel返回的结果
func WithContext(ctx context.Context) (*Group, context.Context) {
ctx, cancel := context.WithCancel(ctx)
return &Group{cancel: cancel}, ctx
}
// Wait blocks until all function calls from the Go method have returned, then
// returns the first non-nil error (if any) from them.
// 利用 waitGroup 的 wait
// 这里使用了g.wg.Wait函数等待所有go程结束,err有值,意味着出错,我们通过return告诉调用端。如果设置了cancel,我们通过调用cancel关闭这个context.Context
func (g *Group) Wait() error {
g.wg.Wait()
if g.cancel != nil {
g.cancel()
}
// 这里返回的error其实是从众多goroutine中返回第一个非nil的错误信息,所以这个错误信息如果全部都是一样的话,你是不知道到底是哪个goroutine中报的错,应该在goroutine内部就写清楚错误信息的别分类似可以加入id值这种。
return g.err
}
// Go calls the given function in a new goroutine.
//
// The first call to return a non-nil error cancels the group; its error will be
// returned by Wait.
// Go 在一个新的 goroutine 里面调用给的 function
// 第一次调用返回一个非零值 error,它的错误将会被返回在 Wait
// 首先,要保证g.wg.Wait调用正确,在go程起之前调用g.wg.Add(1)加上1个,结束之后通过defer g.wg.Done()减去1个。
// 无错是相安无事,这里看下出错。g.errOnce.DO保证回调函数会被执行一次。只做一次的事,就是把第一个错误收集起来g.err = err, 如果设置了cancel也关闭了g.cancel()
func (g *Group) Go(f func() error) {
g.wg.Add(1)
go func() {
defer g.wg.Done()
if err := f(); err != nil {
// 只会执行一次,一旦遇到错误,就停止总任务
g.errOnce.Do(func() {
g.err = err
if g.cancel != nil {
g.cancel()
}
})
}
}()
}
|
举例:gogrep
为了测试 sync.ErrGroup 的所有特性, 我已经写了一个递归搜索指定目录的Go程序。并且我还加了一个超时的机制 。当程序超时了,所有的 goroutines 将被取消,程序退出。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
|
package main
import (
"bytes"
"flag"
"fmt"
"io/ioutil"
"log"
"os"
"path/filepath"
"strings"
"time"
"golang.org/x/net/context"
"golang.org/x/sync/errgroup"
)
var (
Version = "N/A"
BuildTime = "N/A"
)
func main() {
duration := flag.Duration("timeout", 500*time.Millisecond, "timeout in milliseconds")
flag.Usage = func() {
fmt.Printf("%s by Brian Ketelsen\n", os.Args[0])
fmt.Printf("Version %s, Built: %s \n", Version, BuildTime)
fmt.Println("Usage:")
fmt.Printf(" gogrep [flags] path pattern \n")
fmt.Println("Flags:")
flag.PrintDefaults()
}
flag.Parse()
if flag.NArg() != 2 {
flag.Usage()
os.Exit(-1)
}
path := flag.Arg(0)
pattern := flag.Arg(1)
//这里,我给 context.Context 加了一个超时的时间。这个超时变量是通过 duration 来设置的。当超时时间到了,"ctx" 将接受到 channel 的超时警告。WithTimeout 同样也会返回一个取消的方法,为了防止context泄露,我们需要用defer调用cf.
ctx, cf := context.WithTimeout(context.Background(), *duration)
defer cf()
m, err := search(ctx, path, pattern)
if err != nil {
log.Fatal(err)
}
for _, name := range m {
fmt.Println(name)
}
fmt.Println(len(m), "hits")
}
//search() 方法的参数有 context, search path, 和 search pattern。最后把找到的文件和数量输出到终端上。
func search(ctx context.Context, root string, pattern string) ([]string, error) {
//首先 search() 函数创建了一个新的 errgroup。
g, ctx := errgroup.WithContext(ctx)
// 创建了 channel 用来传递被搜索到的文件。稍后我们将发送搜索到的文件到 channel 中去判断这些文件是否符合 pattern 参数。这个 channel 开启的buffer 数为100
paths := make(chan string, 100)
// get all the paths
g.Go(func() error {
//当所有的目录搜索完成时,我们将用 defer 来关闭 "paths" channel。后续我们将在更多的 goroutines 中使用这些文件。
defer close(paths)
return filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
//使用 filepath 包提供的 Walk() 方法去递归查找指定目录的所有文件。我们将检查这些文件是否可读,是否带有 ".go" 后缀的文件
if err != nil {
return err
}
if !info.Mode().IsRegular() {
return nil
}
if !info.IsDir() && !strings.HasSuffix(info.Name(), ".go") {
return nil
}
//我在 select 里放了两个条件。首先发送 path 到 paths channel 里,另外的 goroutine 将会接收这个 channel 里的数据。第二个事件就是等待 context 超时发生。只要没到超时时间,就会继续处理文件。当超时后,context 的 Done channel 将发送数据导致 goroutine 返回,这个返回将停止文件搜索。
select {
case paths <- path:
case <-ctx.Done():
return ctx.Err()
}
return nil
})
})
//下面我将创建一个 channel 去处理文件是否符合 patter 参数。
c := make(chan string, 100)
for path := range paths {
p := path
g.Go(func() error {
data, err := ioutil.ReadFile(p)
if err != nil {
return err
}
if !bytes.Contains(data, []byte(pattern)) {
return nil
}
select {
case c <- p:
case <-ctx.Done():
return ctx.Err()
}
return nil
})
}
//这个函数将会等待所有的 errgroup 的 goroutines 全部完成后关闭结果的 channel。
go func() {
g.Wait()
close(c)
}()
//现在我们可以收集到所有的文件了。
var m []string
for r := range c {
m = append(m, r)
}
return m, g.Wait()
}
|
执行程序的结果如下:
1
2
3
|
$ gogrep -timeout 1000ms . fmt
gogrep.go
1 hits
|
如果你没有使用正确的参数,将会输出正确的使用方法:
1
2
3
4
5
6
|
gogrep by Brian Ketelsen
Flags:
-timeout duration
timeout in milliseconds (default 500ms)
Usage:
gogrep [flags] path pattern
|
kratos/errgroup
如果我们无限制地直接调用 ErrGroup 的 Go 方法,就可能会创建出非常多的 goroutine,太多的 goroutine 会带来调度和 GC 的压力,而且也会占用更多的内存资源。就像go#34457指出的那样,当前 Go 运行时创建的 g 对象只会增长和重用,不会回收,所以在高并发的情况下,也要尽可能减少 goroutine 的使用。
常用的一个手段就是使用 worker pool(goroutine pool),或者是类似containerd/stargz-snapshotter的方案,使用前面我们讲的信号量,信号量的资源的数量就是可以并行的 goroutine 的数量。但是在这一讲,我来介绍一些其它的手段,比如下面介绍的 bilibili 实现的 errgroup。
bilibili 实现了一个扩展的 ErrGroup,可以使用一个固定数量的 goroutine 处理子任务。如果不设置 goroutine 的数量,那么每个子任务都会比较“放肆地”创建一个 goroutine 并发执行。
除了可以控制并发 goroutine 的数量,它还提供了 2 个功能:
- cancel,失败的子任务可以 cancel 所有正在执行任务;
- recover,而且会把 panic 的堆栈信息放到 error 中,避免子任务 panic 导致的程序崩溃。
但是,有一点不太好的地方就是,一旦你设置了并发数,超过并发数的子任务需要等到调用者调用 Wait 之后才会执行,而不是只要 goroutine 空闲下来,就去执行。如果不注意这一点的话,可能会出现子任务不能及时处理的情况,这是这个库可以优化的一点。
另外,这个库其实是有一个并发问题的。在高并发的情况下,如果任务数大于设定的 goroutine 的数量,并且这些任务被集中加入到 Group 中,这个库的处理方式是把子任务加入到一个数组中,但是,这个数组不是线程安全的,有并发问题,问题就在于,下面图片中的标记为 96 行的那一行,这一行对 slice 的 append 操作不是线程安全的:

我们可以写一个简单的程序来测试这个问题:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
package main
import (
"context"
"fmt"
"sync/atomic"
"time"
"github.com/bilibili/kratos/pkg/sync/errgroup"
)
func main() {
var g errgroup.Group
g.GOMAXPROCS(1) // 只使用一个goroutine处理子任务
var count int64
g.Go(func(ctx context.Context) error {
time.Sleep(time.Second) //睡眠5秒,把这个goroutine占住
return nil
})
total := 10000
for i := 0; i < total; i++ { // 并发一万个goroutine执行子任务,理论上这些子任务都会加入到Group的待处理列表中
go func() {
g.Go(func(ctx context.Context) error {
atomic.AddInt64(&count, 1)
return nil
})
}()
}
// 等待所有的子任务完成。理论上10001个子任务都会被完成
if err := g.Wait(); err != nil {
panic(err)
}
got := atomic.LoadInt64(&count)
if got != int64(total) {
panic(fmt.Sprintf("expect %d but got %d", total, got))
}
}
|
运行这个程序的话,你就会发现死锁问题,因为我们的测试程序是一个简单的命令行工具,程序退出的时候,Go runtime 能检测到死锁问题。如果是一直运行的服务器程序,死锁问题有可能是检测不出来的,程序一直会 hang 在 Wait 的调用上。
使用方式
errgroup 包含三种常用方式
1、直接使用 此时不会因为一个任务失败导致所有任务被 cancel:
1
2
3
4
5
|
g := &errgroup.Group{}
g.Go(func(ctx context.Context) {
// NOTE: 此时 ctx 为 context.Background()
// do something
})
|
2、WithContext 使用 WithContext 时不会因为一个任务失败导致所有任务被 cancel:
1
2
3
4
5
|
g := errgroup.WithContext(ctx)
g.Go(func(ctx context.Context) {
// NOTE: 此时 ctx 为 errgroup.WithContext 传递的 ctx
// do something
})
|
3、WithCancel 使用 WithCancel 时如果有一个人任务失败会导致所有未进行或进行中的任务被 cancel:
1
2
3
4
5
|
g := errgroup.WithCancel(ctx)
g.Go(func(ctx context.Context) {
// NOTE: 此时 ctx 是从 errgroup.WithContext 传递的 ctx 派生出的 ctx
// do something
})
|
设置最大并行数 GOMAXPROCS 对以上三种使用方式均起效
NOTE: 由于 errgroup 实现问题,设定 GOMAXPROCS 的 errgroup 需要立即调用 Wait() 例如:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
g := errgroup.WithCancel(ctx)
g.GOMAXPROCS(2)
// task1
g.Go(func(ctx context.Context) {
fmt.Println("task1")
})
// task2
g.Go(func(ctx context.Context) {
fmt.Println("task2")
})
// task3
g.Go(func(ctx context.Context) {
fmt.Println("task3")
})
// NOTE: 此时设置的 GOMAXPROCS 为2, 添加了三个任务 task1, task2, task3 此时 task3 是不会运行的!
// 只有调用了 Wait task3 才有运行的机会
g.Wait() // task3 运行
|
源码分析
errgroup 在sync.WaitGroup的功能之上添加了错误传递,以及在发生不可恢复的错误时取消整个goroutine集合的功能(返回值cancel)。
kratos的加强版errgroup从统一goroutine控制,defer错误捕获,并发数量控制等方面对errgroup进行了功能扩充,利用匿名函数的参数context.Context的参数传递从整体上控制goroutine的生命周期。
Group 结构
与原生 errgroup 库相比,增加了三个成员:
1
2
3
4
5
6
7
8
9
10
11
12
|
type Group struct {
err error
wg sync.WaitGroup
errOnce sync.Once
workerOnce sync.Once
ch chan func(ctx context.Context) error
chs []func(ctx context.Context) error
ctx context.Context
cancel func()
}
|
我们先从结构体定义的角度来看待加强点。
- ch、chs、workerOnce用于控制goroutine的并发数量,在基础版的代码中我们发现在使用
Go(function()error)
函数的调用过程中是全开放的,即对于同时进行的goroutine数量并没有做限制。kratos在基础版本的基础上添加了一个chan控制并发数量,一个slice来缓存为并发的函数指针。
- kratos将产生的context对象缓存,并且更改了方法Go的函数签名加入了context参数,即
func (g *Group) Go(f func(ctx context.Context) error)
。在基础版本中,当error发生的是时候函数,仍然需要等到所有goroutine运行结束才会返回,kratos的Group可以使用成员函数ctx作为参数,从而控制全部并发的生命周期。
Go 方法改造
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
// Go calls the given function in a new goroutine.
//
// The first call to return a non-nil error cancels the group; its error will be
// returned by Wait.
func (g *Group) Go(f func(ctx context.Context) error) {
g.wg.Add(1)
if g.ch != nil {
select {
case g.ch <- f:
default:
g.chs = append(g.chs, f)
}
return
}
go g.do(f)
}
|
从Go函数中我们看到,当g.ch != nil时,f函数首先尝试进入g.ch中,当g.ch满的时候存入g.chs中,这就是上面提到的,利用chan控制并发数量,利用slice作为函数指针的缓存。
再看看 do 方法的实现,do 方法在 defer 结构中加入了捕获异常的逻辑,具体步骤是:
- 执行用户方法 err = f(ctx)
- defer 中实现对堆栈信息的存储(如果发生错误)及 err 的保存
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
func (g *Group) do(f func(ctx context.Context) error) {
ctx := g.ctx
if ctx == nil {
ctx = context.Background()
}
var err error
defer func() {
if r := recover(); r != nil {
buf := make([]byte, 64<<10)
buf = buf[:runtime.Stack(buf, false)]
err = fmt.Errorf("errgroup: panic recovered: %s\n%s", r, buf)
}
if err != nil {
g.errOnce.Do(func() {
g.err = err
if g.cancel != nil {
g.cancel()
}
})
}
g.wg.Done()
}()
err = f(ctx)
}
|
此外,Kratos 还提供了 errgroup.GOMAXPROCS 方法,用来设置最大并行数 GOMAXPROCS,其原理也比较简单,利用一个长度为 n 来 channel 来并发调用 do 方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
// GOMAXPROCS set max goroutine to work.
func (g *Group) GOMAXPROCS(n int) {
if n <= 0 {
panic("errgroup: GOMAXPROCS must great than 0")
}
g.workerOnce.Do(func() {
// 利用 channel 实现限制
g.ch = make(chan func(context.Context) error, n)
for i := 0; i < n; i++ {
go func() {
for f := range g.ch {
g.do(f)
}
}()
}
})
}
|
GOMAXPROCE 函数初始化g.ch用于开启并发数量控制的开关。并且启动n个goroutine来消费传入的函数。
Wait函数中会不断将缓存中的函数不断压入chan中进行消费。
Wait 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
// Wait blocks until all function calls from the Go method have returned, then
// returns the first non-nil error (if any) from them.
func (g*Group) Wait() error {
if g.ch != nil {
for _, f := range g.chs {
g.ch <- f
}
}
g.wg.Wait()
if g.ch != nil {
close(g.ch) // let all receiver exit
}
if g.cancel != nil {
g.cancel()
}
return g.err
}
|
neilotoole/errgroup 是今年年中新出现的一个 ErrGroup 扩展库,它可以直接替换官方的 ErrGroup,方法都一样,原有功能也一样,只不过增加了可以控制并发 goroutine 的功能。它的方法集如下:
1
2
3
4
5
|
type Group
func WithContext(ctx context.Context) (*Group, context.Context)
func WithContextN(ctx context.Context, numG, qSize int) (*Group, context.Context)
func (g *Group) Go(f func() error)
func (g *Group) Wait() error
|
新增加的方法 WithContextN,可以设置并发的 goroutine 数,以及等待处理的子任务队列的大小。当队列满的时候,如果调用 Go 方法,就会被阻塞,直到子任务可以放入到队列中才返回。如果你传给这两个参数的值不是正整数,它就会使用 runtime.NumCPU 代替你传入的参数。
当然,你也可以把 bilibili 的 recover 功能扩展到这个库中,以避免子任务的 panic 导致程序崩溃。
facebookgo/errgroup
Facebook 提供的这个 ErrGroup,其实并不是对 Go 扩展库 ErrGroup 的扩展,而是对标准库 WaitGroup 的扩展。不过,因为它们的名字一样,处理的场景也类似,所以我把它也列在了这里。
标准库的 WaitGroup 只提供了 Add、Done、Wait 方法,而且 Wait 方法也没有返回子 goroutine 的 error。而 Facebook 提供的 ErrGroup 提供的 Wait 方法可以返回 error,而且可以包含多个 error。子任务在调用 Done 之前,可以把自己的 error 信息设置给 ErrGroup。接着,Wait 在返回的时候,就会把这些 error 信息返回给调用者。
我们来看下 Group 的方法:
1
2
3
4
5
|
type Group
func (g *Group) Add(delta int)
func (g *Group) Done()
func (g *Group) Error(e error)
func (g *Group) Wait() error
|
关于 Wait 方法,我刚刚已经介绍了它和标准库 WaitGroup 的不同,我就不多说了。这里还有一个不同的方法,就是 Error 方法,
我举个例子演示一下 Error 的使用方法。
在下面的这个例子中,第 26 行的子 goroutine 设置了 error 信息,第 39 行会把这个 error 信息输出出来。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
|
package main
import (
"errors"
"fmt"
"time"
"github.com/facebookgo/errgroup"
)
func main() {
var g errgroup.Group
g.Add(3)
// 启动第一个子任务,它执行成功
go func() {
time.Sleep(5 * time.Second)
fmt.Println("exec #1")
g.Done()
}()
// 启动第二个子任务,它执行失败
go func() {
time.Sleep(10 * time.Second)
fmt.Println("exec #2")
g.Error(errors.New("failed to exec #2"))
g.Done()
}()
// 启动第三个子任务,它执行成功
go func() {
time.Sleep(15 * time.Second)
fmt.Println("exec #3")
g.Done()
}()
// 等待所有的goroutine完成,并检查error
if err := g.Wait(); err == nil {
fmt.Println("Successfully exec all")
} else {
fmt.Println("failed:", err)
}
}
|
关于 ErrGroup,你掌握这些就足够了,接下来,我再介绍几种有趣而实用的 Group 并发原语。这些并发原语都是控制一组子 goroutine 执行的面向特定场景的并发原语,当你遇见这些特定场景时,就可以参考这些库。
SizedGroup/ErrSizedGroup
go-pkgz/syncs提供了两个 Group 并发原语,分别是 SizedGroup 和 ErrSizedGroup。
SizedGroup 内部是使用信号量和 WaitGroup 实现的,它通过信号量控制并发的 goroutine 数量,或者是不控制 goroutine 数量,只控制子任务并发执行时候的数量(通过)。
它的代码实现非常简洁,你可以到它的代码库中了解它的具体实现,你一看就明白了,我就不多说了。下面我重点说说它的功能。
默认情况下,SizedGroup 控制的是子任务的并发数量,而不是 goroutine 的数量。在这种方式下,每次调用 Go 方法都不会被阻塞,而是新建一个 goroutine 去执行。
如果想控制 goroutine 的数量,你可以使用 syncs.Preemptive 设置这个并发原语的可选项。如果设置了这个可选项,但在调用 Go 方法的时候没有可用的 goroutine,那么调用者就会等待,直到有 goroutine 可以处理这个子任务才返回,这个控制在内部是使用信号量实现的。
我们来看一个使用 SizedGroup 的例子:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
package main
import (
"context"
"fmt"
"sync/atomic"
"time"
"github.com/go-pkgz/syncs"
)
func main() {
// 设置goroutine数是10
swg := syncs.NewSizedGroup(10)
// swg := syncs.NewSizedGroup(10, syncs.Preemptive)
var c uint32
// 执行1000个子任务,只会有10个goroutine去执行
for i := 0; i < 1000; i++ {
swg.Go(func(ctx context.Context) {
time.Sleep(5 * time.Millisecond)
atomic.AddUint32(&c, 1)
})
}
// 等待任务完成
swg.Wait()
// 输出结果
fmt.Println(c)
}
|
ErrSizedGroup 为 SizedGroup 提供了 error 处理的功能,它的功能和 Go 官方扩展库的功能一样,就是等待子任务完成并返回第一个出现的 error。不过,它还提供了额外的功能,我来介绍一下。
第一个额外的功能,就是可以控制并发的 goroutine 数量,这和 SizedGroup 的功能一样。
第二个功能是,如果设置了 termOnError,子任务出现第一个错误的时候会 cancel Context,而且后续的 Go 调用会直接返回,Wait 调用者会得到这个错误,这相当于是遇到错误快速返回。如果没有设置 termOnError,Wait 会返回所有的子任务的错误。
不过,ErrSizedGroup 和 SizedGroup 设计得不太一致的地方是,SizedGroup 可以把 Context 传递给子任务,这样可以通过 cancel 让子任务中断执行,但是 ErrSizedGroup 却没有实现。我认为,这是一个值得加强的地方。
gollback
gollback也是用来处理一组子任务的执行的,不过它解决了 ErrGroup 收集子任务返回结果的痛点。使用 ErrGroup 时,如果你要收到子任务的结果和错误,你需要定义额外的变量收集执行结果和错误,但是这个库可以提供更便利的方式。
刚刚在说官方扩展库 ErrGroup 的时候,举了一些例子(返回第一个错误的例子和返回所有子任务错误的例子),在例子中,如果想得到每一个子任务的结果或者 error,我们需要额外提供一个 result slice 进行收集。使用 gollback 的话,就不需要这些额外的处理了,因为它的方法会把结果和 error 信息都返回。
接下来,我们看一下它提供的三个方法,分别是 All、Race 和 Retry。
All 方法
All 方法的签名如下:
1
|
func All(ctx context.Context, fns ...AsyncFunc) ([]interface{}, []error)
|
它会等待所有的异步函数(AsyncFunc)都执行完才返回,而且返回结果的顺序和传入的函数的顺序保持一致。第一个返回参数是子任务的执行结果,第二个参数是子任务执行时的错误信息。
其中,异步函数的定义如下:
1
|
type AsyncFunc func(ctx context.Context) (interface{}, error)
|
可以看到,ctx 会被传递给子任务。如果你 cancel 这个 ctx,可以取消子任务。
我们来看一个使用 All 方法的例子:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
package main
import (
"context"
"errors"
"fmt"
"github.com/vardius/gollback"
"time"
)
func main() {
rs, errs := gollback.All( // 调用All方法
context.Background(),
func(ctx context.Context) (interface{}, error) {
time.Sleep(3 * time.Second)
return 1, nil // 第一个任务没有错误,返回1
},
func(ctx context.Context) (interface{}, error) {
return nil, errors.New("failed") // 第二个任务返回一个错误
},
func(ctx context.Context) (interface{}, error) {
return 3, nil // 第三个任务没有错误,返回3
},
)
fmt.Println(rs) // 输出子任务的结果
fmt.Println(errs) // 输出子任务的错误信息
}
|
Race 方法
Race 方法跟 All 方法类似,只不过,在使用 Race 方法的时候,只要一个异步函数执行没有错误,就立马返回,而不会返回所有的子任务信息。如果所有的子任务都没有成功,就会返回最后一个 error 信息。
Race 方法签名如下:
1
|
func Race(ctx context.Context, fns ...AsyncFunc) (interface{}, error)
|
如果有一个正常的子任务的结果返回,Race 会把传入到其它子任务的 Context cancel 掉,这样子任务就可以中断自己的执行。
Race 的使用方法也跟 All 方法类似,我就不再举例子了,你可以把 All 方法的例子中的 All 替换成 Race 方式测试下。
Retry 方法
Retry 不是执行一组子任务,而是执行一个子任务。如果子任务执行失败,它会尝试一定的次数,如果一直不成功 ,就会返回失败错误 ,如果执行成功,它会立即返回。如果 retires 等于 0,它会永远尝试,直到成功。
1
|
func Retry(ctx context.Context, retires int, fn AsyncFunc) (interface{}, error)
|
再来看一个使用 Retry 的例子:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
package main
import (
"context"
"errors"
"fmt"
"github.com/vardius/gollback"
"time"
)
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// 尝试5次,或者超时返回
res, err := gollback.Retry(ctx, 5, func(ctx context.Context) (interface{}, error) {
return nil, errors.New("failed")
})
fmt.Println(res) // 输出结果
fmt.Println(err) // 输出错误信息
}
|
Hunch
Hunch提供的功能和 gollback 类似,不过它提供的方法更多,而且它提供的和 gollback 相应的方法,也有一些不同。我来一一介绍下。
它定义了执行子任务的函数,这和 gollback 的 AyncFunc 是一样的,它的定义如下:
1
|
type Executable func(context.Context) (interface{}, error)
|
All 方法
All 方法的签名如下:
1
|
func All(parentCtx context.Context, execs ...Executable) ([]interface{}, error)
|
它会传入一组可执行的函数(子任务),返回子任务的执行结果。和 gollback 的 All 方法不一样的是,一旦一个子任务出现错误,它就会返回错误信息,执行结果(第一个返回参数)为 nil。
Take 方法
Take 方法的签名如下:
1
2
|
func Take(parentCtx context.Context, num int, execs ...Executable) ([]interface{}, error)
|
你可以指定 num 参数,只要有 num 个子任务正常执行完没有错误,这个方法就会返回这几个子任务的结果。一旦一个子任务出现错误,它就会返回错误信息,执行结果(第一个返回参数)为 nil。
Last 方法
Last 方法的签名如下:
1
|
func Last(parentCtx context.Context, num int, execs ...Executable) ([]interface{}, error)
|
它只返回最后 num 个正常执行的、没有错误的子任务的结果。一旦一个子任务出现错误,它就会返回错误信息,执行结果(第一个返回参数)为 nil。
比如 num 等于 1,那么,它只会返回最后一个无错的子任务的结果。
Retry 方法
Retry 方法的签名如下:
1
|
func Retry(parentCtx context.Context, retries int, fn Executable) (interface{}, error)
|
它的功能和 gollback 的 Retry 方法的功能一样,如果子任务执行出错,就会不断尝试,直到成功或者是达到重试上限。如果达到重试上限,就会返回错误。如果 retries 等于 0,它会不断尝试。
Waterfall 方法
Waterfall 方法签名如下:
1
|
func Waterfall(parentCtx context.Context, execs ...ExecutableInSequence) (interface{}, error)
|
它其实是一个 pipeline 的处理方式,所有的子任务都是串行执行的,前一个子任务的执行结果会被当作参数传给下一个子任务,直到所有的任务都完成,返回最后的执行结果。一旦一个子任务出现错误,它就会返回错误信息,执行结果(第一个返回参数)为 nil。
gollback 和 Hunch 是属于同一类的并发原语,对一组子任务的执行结果,可以选择一个结果或者多个结果,这也是现在热门的微服务常用的服务治理的方法。
schedgroup
接下来,我再介绍一个和时间相关的处理一组 goroutine 的并发原语 schedgroup。
schedgroup是 Matt Layher 开发的 worker pool,可以指定任务在某个时间或者某个时间之后执行。Matt Layher 也是一个知名的 Gopher,经常在一些会议上分享一些他的 Go 开发经验,他在 GopherCon Europe 2020 大会上专门介绍了这个并发原语:schedgroup: a timer-based goroutine concurrency primitive ,课下你可以点开这个链接看一下,下面我来给你介绍一些重点。
这个并发原语包含的方法如下:
1
2
3
4
5
|
type Group
func New(ctx context.Context) *Group
func (g *Group) Delay(delay time.Duration, fn func())
func (g *Group) Schedule(when time.Time, fn func())
func (g *Group) Wait() error
|
我来介绍下这些方法。
先说 Delay 和 Schedule。
它们的功能其实是一样的,都是用来指定在某个时间或者之后执行一个函数。只不过,Delay 传入的是一个 time.Duration 参数,它会在 time.Now()+delay 之后执行函数,而 Schedule 可以指定明确的某个时间执行。
再来说说 Wait 方法。
这个方法调用会阻塞调用者,直到之前安排的所有子任务都执行完才返回。如果 Context 被取消,那么,Wait 方法会返回这个 cancel error。
在使用 Wait 方法的时候,有 2 点需要注意一下。
第一点是,如果调用了 Wait 方法,你就不能再调用它的 Delay 和 Schedule 方法,否则会 panic。
第二点是,Wait 方法只能调用一次,如果多次调用的话,就会 panic。
你可能认为,简单地使用 timer 就可以实现这个功能。其实,如果只有几个子任务,使用 timer 不是问题,但一旦有大量的子任务,而且还要能够 cancel,那么,使用 timer 的话,CPU 资源消耗就比较大了。所以,schedgroup 在实现的时候,就使用 container/heap,按照子任务的执行时间进行排序,这样可以避免使用大量的 timer,从而提高性能。
我们来看一个使用 schedgroup 的例子,下面代码会依次输出 1、2、3:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
sg := schedgroup.New(context.Background())
// 设置子任务分别在100、200、300之后执行
for i := 0; i < 3; i++ {
n := i + 1
sg.Delay(time.Duration(n)*100*time.Millisecond, func() {
log.Println(n) //输出任务编号
})
}
// 等待所有的子任务都完成
if err := sg.Wait(); err != nil {
log.Fatalf("failed to wait: %v", err)
}
|
参考
Kratos 源码分析:Errgroup 机制
【GoCN酷Go推荐】 errgroup 并发小工具