SingleFlight

SingleFlight 是 Go 开发组提供的一个扩展并发原语。它的作用是,在处理多个 goroutine 同时调用同一个函数的时候,只让一个 goroutine 去调用这个函数,等到这个 goroutine 返回结果的时候,再把结果返回给这几个同时调用的 goroutine,这样可以减少并发调用的数量。

这里我想先回答一个问题:标准库中的 sync.Once 也可以保证并发的 goroutine 只会执行一次函数 f,那么,SingleFlight 和 sync.Once 有什么区别呢?

其实,sync.Once 不是只在并发的时候保证只有一个 goroutine 执行函数 f,而是会保证永远只执行一次,而 SingleFlight 是每次调用都重新执行,并且在多个请求同时调用的时候只有一个执行。它们两个面对的场景是不同的,sync.Once 主要是用在单次初始化场景中,而 SingleFlight 主要用在合并并发请求的场景中,尤其是缓存场景。

如果你学会了 SingleFlight,在面对秒杀等大并发请求的场景,而且这些请求都是读请求时,你就可以把这些请求合并为一个请求,这样,你就可以将后端服务的压力从 n 降到 1。尤其是在面对后端是数据库这样的服务的时候,采用 SingleFlight 可以极大地提高性能。那么,话不多说,就让我们开始学习 SingleFlight 吧。

实现原理

SingleFlight 使用互斥锁 Mutex 和 Map 来实现。Mutex 提供并发时的读写保护,Map 用来保存同一个 key 的正在处理(in flight)的请求。

SingleFlight 的数据结构是 Group,它提供了三个方法。

  • Do:这个方法执行一个函数,并返回函数执行的结果。你需要提供一个 key,对于同一个 key,在同一时间只有一个在执行,同一个 key 并发的请求会等待。第一个执行的请求返回的结果,就是它的返回结果。函数 fn 是一个无参的函数,返回一个结果或者 error,而 Do 方法会返回函数执行的结果或者是 error,shared 会指示 v 是否返回给多个请求。
  • DoChan:类似 Do 方法,只不过是返回一个 chan,等 fn 函数执行完,产生了结果以后,就能从这个 chan 中接收这个结果。
  • Forget:告诉 Group 忘记这个 key。这样一来,之后这个 key 请求会执行 f,而不是等待前一个未完成的 fn 函数的结果。

下面,我们来看具体的实现方法。

首先,SingleFlight 定义一个辅助对象 call,这个 call 就代表正在执行 fn 函数的请求或者是已经执行完的请求。Group 代表 SingleFlight。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
// errGoexit indicates the runtime.Goexit was called in
// the user given function.
var errGoexit = errors.New("runtime.Goexit was called")

// A panicError is an arbitrary value recovered from a panic
// with the stack trace during the execution of given function.
type panicError struct {
	value interface{}
	stack []byte
}

// Error implements error interface.
func (p *panicError) Error() string {
	return fmt.Sprintf("%v\n\n%s", p.value, p.stack)
}

func newPanicError(v interface{}) error {
	stack := debug.Stack()

	// The first line of the stack trace is of the form "goroutine N [status]:"
	// but by the time the panic reaches Do the goroutine may no longer exist
	// and its status will have changed. Trim out the misleading line.
	if line := bytes.IndexByte(stack[:], '\n'); line >= 0 {
		stack = stack[line+1:]
	}
	return &panicError{value: v, stack: stack}
}

// call is an in-flight or completed singleflight.Do call
// call 代表一个正在执行或已完成的函数调用。
type call struct {
	wg sync.WaitGroup // 用于阻塞调用这个 call 的其他请求

	// These fields are written once before the WaitGroup is done
	// and are only read after the WaitGroup is done.
    // 这两个字段在 WaitGroup.Done() 之前被写入(仅写一次),并在 WaitGroup.Done() 之后被读取。
	val interface{}
	err error   // 函数执行后的error

	// forgotten indicates whether Forget was called with this call's key
	// while the call was still in flight.
    // 指示当call在处理时是否要忘掉这个key
	forgotten bool

	// These fields are read and written with the singleflight
	// mutex held before the WaitGroup is done, and are read but
	// not written after the WaitGroup is done.
	dups  int
	chans []chan<- Result
}

// Group represents a class of work and forms a namespace in
// which units of work can be executed with duplicate suppression.
// group代表一个singleflight对象
type Group struct {
     // 保护锁
	mu sync.Mutex       // protects m
     // 懒加载
	m  map[string]*call // lazily initialized
}

// Result holds the results of Do, so they can be passed
// on a channel.
type Result struct {
	Val    interface{}
	Err    error
	Shared bool
}

Do

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// Do executes and returns the results of the given function, making
// sure that only one execution is in-flight for a given key at a
// time. If a duplicate comes in, the duplicate caller waits for the
// original to complete and receives the same results.
// The return value shared indicates whether v was given to multiple callers.
// 在 Do 函数中,函数先是判断这个 key 是否是第一次调用,如果是,就会进入 doCall 调用回调函数获取结果,
// 后续的请求就会阻塞在 c.wg.Wait() 这里,等待回调函数返回以后,直接拿到结果。
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
    // 有可能要修改 g.m ,所以先上锁进行保护
	g.mu.Lock()
    // key-call 映射表不存在就创建(懒加载)
	if g.m == nil {
		g.m = make(map[string]*call)
	}
    // 如果 g.m 中已经存在对该 key 的请求,则新协程不会重复处理 key 的请求 ,所以释放锁,然后阻塞等待已存的 key 请求得到的结果。
	if c, ok := g.m[key]; ok {
        //如果已经存在相同的key
        // 释放 mu 锁并 wait 在 wg 上,在 wg.wait() 返回后,把结果返回。
		c.dups++
		g.mu.Unlock()
        //等待这个key的第一个请求完成
        // 如果已存的 key 请求完成,阻塞状态会解除,wg.Wait() 返回
		c.wg.Wait()

		if e, ok := c.err.(*panicError); ok {
			panic(e)
		} else if c.err == errGoexit {
			runtime.Goexit()
		}
		return c.val, c.err, true //使用第一个key的请求结果
	}
    // 如果没有在处理,则创建一个 call ,把 wg 加 1,把 call 存到 m 中表示已经有在请求了,然后释放锁
	c := new(call) // 第一个请求,创建一个call
	c.wg.Add(1)
	g.m[key] = c    //加入到key map中
	g.mu.Unlock()
    // 执行真正的请求函数,得到对该 key 请求的结果
	g.doCall(c, key, fn)    // 调用方法
    // 返回请求结果
	return c.val, c.err, c.dups > 0
}

doCall

doCall 方法会实际调用函数 fn:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
// doCall handles the single call for a key.
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
	normalReturn := false
	recovered := false

	// use double-defer to distinguish panic from runtime.Goexit,
	// more details see https://golang.org/cl/134395
	defer func() {
		// the given function invoked runtime.Goexit
		if !normalReturn && !recovered {
			c.err = errGoexit
		}
        // 释放 wg,会唤醒外部的 wg.wait() 阻塞
		c.wg.Done()
        // 共享变量 g.m 的写保护
		g.mu.Lock()
		defer g.mu.Unlock()
		if !c.forgotten {
            // 删除 key
			delete(g.m, key)
		}

		if e, ok := c.err.(*panicError); ok {
			// In order to prevent the waiting channels from being blocked forever,
			// needs to ensure that this panic cannot be recovered.
			if len(c.chans) > 0 {
				go panic(e)
				select {} // Keep this goroutine around so that it will appear in the crash dump.
			} else {
				panic(e)
			}
		} else if c.err == errGoexit {
			// Already in the process of goexit, no need to call again
		} else {
			// Normal return
            // 结果递送
			for _, ch := range c.chans {
				ch <- Result{c.val, c.err, c.dups > 0}
			}
		}
	}()

	func() {
		defer func() {
			if !normalReturn {
				// Ideally, we would wait to take a stack trace until we've determined
				// whether this is a panic or a runtime.Goexit.
				//
				// Unfortunately, the only way we can distinguish the two is to see
				// whether the recover stopped the goroutine from terminating, and by
				// the time we know that, the part of the stack trace relevant to the
				// panic has been discarded.
				if r := recover(); r != nil {
					c.err = newPanicError(r)
				}
			}
		}()
         // 执行 fn,记录结果到 c.val, c.err 中
		c.val, c.err = fn()
		normalReturn = true
	}()

	if !normalReturn {
		recovered = true
	}
}

在默认情况下,forgotten==false,所以下一行默认会被调用,也就是说,第一个请求完成后,后续的同一个 key 的请求又重新开始新一次的 fn 函数的调用。

DoChan

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// DoChan is like Do but returns a channel that will receive the
// results when they are ready.
//
// The returned channel will not be closed.
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
	ch := make(chan Result, 1)
    // 有可能要修改 g.m ,所以先上锁进行保护
	g.mu.Lock()
    // key-call 映射表不存在就创建(懒加载)
	if g.m == nil {
		g.m = make(map[string]*call)
	}
    // 如果 g.m 中已经存在对该 key 的请求,则新协程不会重复处理 key 的请求 ,所以释放锁,然后阻塞等待已存的 key 请求得到的结果。
	if c, ok := g.m[key]; ok {
		c.dups++
		c.chans = append(c.chans, ch) // 追加 chan 用来接收返回结果
		g.mu.Unlock()
		return ch
	}
    // 如果没有在处理,则创建一个 call ,把 wg 加 1,把 call 存到 m 中表示已经有在请求了,然后释放锁
	c := &call{chans: []chan<- Result{ch}}
	c.wg.Add(1)
	g.m[key] = c
	g.mu.Unlock()

	go g.doCall(c, key, fn)

	return ch
}

Forget

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
// Forget tells the singleflight to forget about a key.  Future calls
// to Do for this key will call the function rather than waiting for
// an earlier call to complete.
func (g *Group) Forget(key string) {
	g.mu.Lock()
	if c, ok := g.m[key]; ok {
		c.forgotten = true
	}
	delete(g.m, key)
	g.mu.Unlock()
}

DoChan使用场景

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"

	"golang.org/x/sync/singleflight"
)

type Result string

func find(ctx context.Context, query string) (Result, error) {
	return Result(fmt.Sprintf("result for %q", query)), nil
}

func main() {
	var g singleflight.Group
	const n = 5
	waited := int32(n)
	done := make(chan struct{})
	key := "https://weibo.com/1227368500/H3GIgngon"
	for i := 0; i < n; i++ {
		go func(j int) {
			v, _, shared := g.Do(key, func() (interface{}, error) {
				ret, err := find(context.Background(), key)
				return ret, err
			})
			fmt.Printf("index: %d, val: %v, shared: %v\n", j, v, shared)
			if atomic.AddInt32(&waited, -1) == 0 {
				close(done)
			}
		}(i)
	}

	select {
	case <-done:
	case <-time.After(time.Hour):
		fmt.Println("Do hangs")
	}
}

输出结果如下:

1
2
3
4
5
6
go run main.go
index: 0, val: result for "https://weibo.com/1227368500/H3GIgngon", shared: true
index: 2, val: result for "https://weibo.com/1227368500/H3GIgngon", shared: true
index: 1, val: result for "https://weibo.com/1227368500/H3GIgngon", shared: true
index: 3, val: result for "https://weibo.com/1227368500/H3GIgngon", shared: true
index: 4, val: result for "https://weibo.com/1227368500/H3GIgngon", shared: true

如果函数执行一切正常,则所有请求都能顺利获得正确的数据。相反,如果函数执行遇到问题呢?由于 singleflight 是以阻塞读的方式来控制向下游请求的并发量,在第一个下游请求没有返回之前,所有请求都将被阻塞。

作为 Do() 的替代函数,singleflight 提供了 DoChan()。两者实现上完全一样,不同的是,DoChan() 通过 channel 返回结果。因此可以使用 select 语句实现超时控制

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
ch := g.DoChan(key, func() (interface{}, error) {
    ret, err := find(context.Background(), key)
    return ret, err
})
// Create our timeout
timeout := time.After(500 * time.Millisecond)

var ret singleflight.Result
select {
case <-timeout: // Timeout elapsed
        fmt.Println("Timeout")
    return
case ret = <-ch: // Received result from channel
    fmt.Printf("index: %d, val: %v, shared: %v\n", j, ret.Val, ret.Shared)
}

Forget使用场景

在一些对可用性要求极高的场景下,往往需要一定的请求饱和度来保证业务的最终成功率。一次请求还是多次请求,对于下游服务而言并没有太大区别,此时使用 singleflight 只是为了降低请求的数量级,那么使用 Forget() 提高下游请求的并发:

1
2
3
4
5
6
7
8
9
v, _, shared := g.Do(key, func() (interface{}, error) {
    go func() {
        time.Sleep(10 * time.Millisecond)
        fmt.Printf("Deleting key: %v\n", key)
        g.Forget(key)
    }()
    ret, err := find(context.Background(), key)
    return ret, err
})

当有一个并发请求超过 10ms,那么将会有第二个请求发起,此时只有 10ms 内的请求最多发起一次请求,即最大并发:100 QPS。单次请求失败的影响大大降低。

参考

sync.singleflight 到底怎么用才对?