SingleFlight
SingleFlight 是 Go 开发组提供的一个扩展并发原语。它的作用是,在处理多个 goroutine 同时调用同一个函数的时候,只让一个 goroutine 去调用这个函数,等到这个 goroutine 返回结果的时候,再把结果返回给这几个同时调用的 goroutine,这样可以减少并发调用的数量。
这里我想先回答一个问题:标准库中的 sync.Once 也可以保证并发的 goroutine 只会执行一次函数 f,那么,SingleFlight 和 sync.Once 有什么区别呢?
其实,sync.Once 不是只在并发的时候保证只有一个 goroutine 执行函数 f,而是会保证永远只执行一次,而 SingleFlight 是每次调用都重新执行,并且在多个请求同时调用的时候只有一个执行。它们两个面对的场景是不同的,sync.Once 主要是用在单次初始化场景中,而 SingleFlight 主要用在合并并发请求的场景中,尤其是缓存场景。
如果你学会了 SingleFlight,在面对秒杀等大并发请求的场景,而且这些请求都是读请求时,你就可以把这些请求合并为一个请求,这样,你就可以将后端服务的压力从 n 降到 1。尤其是在面对后端是数据库这样的服务的时候,采用 SingleFlight 可以极大地提高性能。那么,话不多说,就让我们开始学习 SingleFlight 吧。
实现原理
SingleFlight 使用互斥锁 Mutex 和 Map 来实现。Mutex 提供并发时的读写保护,Map 用来保存同一个 key 的正在处理(in flight)的请求。
SingleFlight 的数据结构是 Group,它提供了三个方法。
- Do:这个方法执行一个函数,并返回函数执行的结果。你需要提供一个 key,对于同一个 key,在同一时间只有一个在执行,同一个 key 并发的请求会等待。第一个执行的请求返回的结果,就是它的返回结果。函数 fn 是一个无参的函数,返回一个结果或者 error,而 Do 方法会返回函数执行的结果或者是 error,shared 会指示 v 是否返回给多个请求。
- DoChan:类似 Do 方法,只不过是返回一个 chan,等 fn 函数执行完,产生了结果以后,就能从这个 chan 中接收这个结果。
- Forget:告诉 Group 忘记这个 key。这样一来,之后这个 key 请求会执行 f,而不是等待前一个未完成的 fn 函数的结果。
下面,我们来看具体的实现方法。
首先,SingleFlight 定义一个辅助对象 call,这个 call 就代表正在执行 fn 函数的请求或者是已经执行完的请求。Group 代表 SingleFlight。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
|
// errGoexit indicates the runtime.Goexit was called in
// the user given function.
var errGoexit = errors.New("runtime.Goexit was called")
// A panicError is an arbitrary value recovered from a panic
// with the stack trace during the execution of given function.
type panicError struct {
value interface{}
stack []byte
}
// Error implements error interface.
func (p *panicError) Error() string {
return fmt.Sprintf("%v\n\n%s", p.value, p.stack)
}
func newPanicError(v interface{}) error {
stack := debug.Stack()
// The first line of the stack trace is of the form "goroutine N [status]:"
// but by the time the panic reaches Do the goroutine may no longer exist
// and its status will have changed. Trim out the misleading line.
if line := bytes.IndexByte(stack[:], '\n'); line >= 0 {
stack = stack[line+1:]
}
return &panicError{value: v, stack: stack}
}
// call is an in-flight or completed singleflight.Do call
// call 代表一个正在执行或已完成的函数调用。
type call struct {
wg sync.WaitGroup // 用于阻塞调用这个 call 的其他请求
// These fields are written once before the WaitGroup is done
// and are only read after the WaitGroup is done.
// 这两个字段在 WaitGroup.Done() 之前被写入(仅写一次),并在 WaitGroup.Done() 之后被读取。
val interface{}
err error // 函数执行后的error
// forgotten indicates whether Forget was called with this call's key
// while the call was still in flight.
// 指示当call在处理时是否要忘掉这个key
forgotten bool
// These fields are read and written with the singleflight
// mutex held before the WaitGroup is done, and are read but
// not written after the WaitGroup is done.
dups int
chans []chan<- Result
}
// Group represents a class of work and forms a namespace in
// which units of work can be executed with duplicate suppression.
// group代表一个singleflight对象
type Group struct {
// 保护锁
mu sync.Mutex // protects m
// 懒加载
m map[string]*call // lazily initialized
}
// Result holds the results of Do, so they can be passed
// on a channel.
type Result struct {
Val interface{}
Err error
Shared bool
}
|
Do
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
// Do executes and returns the results of the given function, making
// sure that only one execution is in-flight for a given key at a
// time. If a duplicate comes in, the duplicate caller waits for the
// original to complete and receives the same results.
// The return value shared indicates whether v was given to multiple callers.
// 在 Do 函数中,函数先是判断这个 key 是否是第一次调用,如果是,就会进入 doCall 调用回调函数获取结果,
// 后续的请求就会阻塞在 c.wg.Wait() 这里,等待回调函数返回以后,直接拿到结果。
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
// 有可能要修改 g.m ,所以先上锁进行保护
g.mu.Lock()
// key-call 映射表不存在就创建(懒加载)
if g.m == nil {
g.m = make(map[string]*call)
}
// 如果 g.m 中已经存在对该 key 的请求,则新协程不会重复处理 key 的请求 ,所以释放锁,然后阻塞等待已存的 key 请求得到的结果。
if c, ok := g.m[key]; ok {
//如果已经存在相同的key
// 释放 mu 锁并 wait 在 wg 上,在 wg.wait() 返回后,把结果返回。
c.dups++
g.mu.Unlock()
//等待这个key的第一个请求完成
// 如果已存的 key 请求完成,阻塞状态会解除,wg.Wait() 返回
c.wg.Wait()
if e, ok := c.err.(*panicError); ok {
panic(e)
} else if c.err == errGoexit {
runtime.Goexit()
}
return c.val, c.err, true //使用第一个key的请求结果
}
// 如果没有在处理,则创建一个 call ,把 wg 加 1,把 call 存到 m 中表示已经有在请求了,然后释放锁
c := new(call) // 第一个请求,创建一个call
c.wg.Add(1)
g.m[key] = c //加入到key map中
g.mu.Unlock()
// 执行真正的请求函数,得到对该 key 请求的结果
g.doCall(c, key, fn) // 调用方法
// 返回请求结果
return c.val, c.err, c.dups > 0
}
|
doCall
doCall 方法会实际调用函数 fn:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
|
// doCall handles the single call for a key.
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
normalReturn := false
recovered := false
// use double-defer to distinguish panic from runtime.Goexit,
// more details see https://golang.org/cl/134395
defer func() {
// the given function invoked runtime.Goexit
if !normalReturn && !recovered {
c.err = errGoexit
}
// 释放 wg,会唤醒外部的 wg.wait() 阻塞
c.wg.Done()
// 共享变量 g.m 的写保护
g.mu.Lock()
defer g.mu.Unlock()
if !c.forgotten {
// 删除 key
delete(g.m, key)
}
if e, ok := c.err.(*panicError); ok {
// In order to prevent the waiting channels from being blocked forever,
// needs to ensure that this panic cannot be recovered.
if len(c.chans) > 0 {
go panic(e)
select {} // Keep this goroutine around so that it will appear in the crash dump.
} else {
panic(e)
}
} else if c.err == errGoexit {
// Already in the process of goexit, no need to call again
} else {
// Normal return
// 结果递送
for _, ch := range c.chans {
ch <- Result{c.val, c.err, c.dups > 0}
}
}
}()
func() {
defer func() {
if !normalReturn {
// Ideally, we would wait to take a stack trace until we've determined
// whether this is a panic or a runtime.Goexit.
//
// Unfortunately, the only way we can distinguish the two is to see
// whether the recover stopped the goroutine from terminating, and by
// the time we know that, the part of the stack trace relevant to the
// panic has been discarded.
if r := recover(); r != nil {
c.err = newPanicError(r)
}
}
}()
// 执行 fn,记录结果到 c.val, c.err 中
c.val, c.err = fn()
normalReturn = true
}()
if !normalReturn {
recovered = true
}
}
|
在默认情况下,forgotten==false,所以下一行默认会被调用,也就是说,第一个请求完成后,后续的同一个 key 的请求又重新开始新一次的 fn 函数的调用。
DoChan
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
// DoChan is like Do but returns a channel that will receive the
// results when they are ready.
//
// The returned channel will not be closed.
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
ch := make(chan Result, 1)
// 有可能要修改 g.m ,所以先上锁进行保护
g.mu.Lock()
// key-call 映射表不存在就创建(懒加载)
if g.m == nil {
g.m = make(map[string]*call)
}
// 如果 g.m 中已经存在对该 key 的请求,则新协程不会重复处理 key 的请求 ,所以释放锁,然后阻塞等待已存的 key 请求得到的结果。
if c, ok := g.m[key]; ok {
c.dups++
c.chans = append(c.chans, ch) // 追加 chan 用来接收返回结果
g.mu.Unlock()
return ch
}
// 如果没有在处理,则创建一个 call ,把 wg 加 1,把 call 存到 m 中表示已经有在请求了,然后释放锁
c := &call{chans: []chan<- Result{ch}}
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
go g.doCall(c, key, fn)
return ch
}
|
Forget
1
2
3
4
5
6
7
8
9
10
11
|
// Forget tells the singleflight to forget about a key. Future calls
// to Do for this key will call the function rather than waiting for
// an earlier call to complete.
func (g *Group) Forget(key string) {
g.mu.Lock()
if c, ok := g.m[key]; ok {
c.forgotten = true
}
delete(g.m, key)
g.mu.Unlock()
}
|
DoChan使用场景
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
package main
import (
"context"
"fmt"
"sync/atomic"
"time"
"golang.org/x/sync/singleflight"
)
type Result string
func find(ctx context.Context, query string) (Result, error) {
return Result(fmt.Sprintf("result for %q", query)), nil
}
func main() {
var g singleflight.Group
const n = 5
waited := int32(n)
done := make(chan struct{})
key := "https://weibo.com/1227368500/H3GIgngon"
for i := 0; i < n; i++ {
go func(j int) {
v, _, shared := g.Do(key, func() (interface{}, error) {
ret, err := find(context.Background(), key)
return ret, err
})
fmt.Printf("index: %d, val: %v, shared: %v\n", j, v, shared)
if atomic.AddInt32(&waited, -1) == 0 {
close(done)
}
}(i)
}
select {
case <-done:
case <-time.After(time.Hour):
fmt.Println("Do hangs")
}
}
|
输出结果如下:
1
2
3
4
5
6
|
go run main.go
index: 0, val: result for "https://weibo.com/1227368500/H3GIgngon", shared: true
index: 2, val: result for "https://weibo.com/1227368500/H3GIgngon", shared: true
index: 1, val: result for "https://weibo.com/1227368500/H3GIgngon", shared: true
index: 3, val: result for "https://weibo.com/1227368500/H3GIgngon", shared: true
index: 4, val: result for "https://weibo.com/1227368500/H3GIgngon", shared: true
|
如果函数执行一切正常,则所有请求都能顺利获得正确的数据。相反,如果函数执行遇到问题呢?由于 singleflight 是以阻塞读的方式来控制向下游请求的并发量,在第一个下游请求没有返回之前,所有请求都将被阻塞。
作为 Do() 的替代函数,singleflight 提供了 DoChan()。两者实现上完全一样,不同的是,DoChan() 通过 channel 返回结果。因此可以使用 select 语句实现超时控制
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
ch := g.DoChan(key, func() (interface{}, error) {
ret, err := find(context.Background(), key)
return ret, err
})
// Create our timeout
timeout := time.After(500 * time.Millisecond)
var ret singleflight.Result
select {
case <-timeout: // Timeout elapsed
fmt.Println("Timeout")
return
case ret = <-ch: // Received result from channel
fmt.Printf("index: %d, val: %v, shared: %v\n", j, ret.Val, ret.Shared)
}
|
Forget使用场景
在一些对可用性要求极高的场景下,往往需要一定的请求饱和度来保证业务的最终成功率。一次请求还是多次请求,对于下游服务而言并没有太大区别,此时使用 singleflight 只是为了降低请求的数量级,那么使用 Forget() 提高下游请求的并发:
1
2
3
4
5
6
7
8
9
|
v, _, shared := g.Do(key, func() (interface{}, error) {
go func() {
time.Sleep(10 * time.Millisecond)
fmt.Printf("Deleting key: %v\n", key)
g.Forget(key)
}()
ret, err := find(context.Background(), key)
return ret, err
})
|
当有一个并发请求超过 10ms,那么将会有第二个请求发起,此时只有 10ms 内的请求最多发起一次请求,即最大并发:100 QPS。单次请求失败的影响大大降低。
参考
sync.singleflight 到底怎么用才对?