限流算法
在开发中我们可能会遇到接口访问频次过高,这时候就需要做流量限制,你可能是用的 Nginx 这种 Web Server 来控制也可能是用了一些流行的类库实现。在分布式系统中更是如此,限流是高并发系统的一大杀器,在设计限流算法之前我们先来了解一下它们是什么。
限流的英文是 Rate limit(速率限制),维基百科中的定义比较简单。我们编写的程序可以被外部调用,Web 应用通过浏览器或者其他方式的 HTTP 方式访问,接口的访问频率可能会非常快,如果我们没有对接口访问频次做限制可能会导致服务器无法承受过高的压力挂掉,这时候也可能会产生数据丢失。
而限流算法就可以帮助我们去控制每个接口或程序的函数被调用频率,它有点儿像保险丝,防止系统因为超过访问频率或并发量而引起瘫痪。我们可能在调用某些第三方的接口的时候会看到类似这样的响应头:
1
2
3
|
X-RateLimit-Limit: 60 //每秒60次请求
X-RateLimit-Remaining: 23 //当前还剩下多少次
X-RateLimit-Reset: 1540650789 //限制重置时间
|
上面的 HTTP Response 是通过响应头告诉调用方服务端的限流频次是怎样的,保证后端的接口访问上限。为了解决限流问题出现了很多的算法,它们都有不同的用途,通常的策略就是拒绝超出的请求,或者让超出的请求排队等待。
漏桶算法
漏桶可以看作是一个带有常量服务时间的单服务器队列,如果漏桶(包缓存)溢出,那么数据包会被丢弃。 在网络中,漏桶算法可以控制端口的流量输出速率,平滑网络上的突发流量,实现流量整形,从而为网络提供一个稳定的流量。
如图所示,把请求比作是水,水来了都先放进桶里,并以限定的速度出水,当水来得过猛而出水不够快时就会导致水直接溢出,即拒绝服务。

可以看出,漏桶算法可以很好的控制流量的访问速度,一旦超过该速度就拒绝服务。
这张图中有 2 个变量,一个是桶的大小(capacity),另一个是水桶漏洞的大小(rate),那么我们可以写下如下代码来实现:
漏桶算法有以下特点:
- 一个固定容量的漏桶,按照常量固定速率流出水滴。
- 如果桶是空的,则不需流出水滴。
- 可以以任意速率流入水滴到漏桶。
- 如果流入水滴超出了桶的容量,则流入的水滴溢出了(被丢弃),而漏桶容量是不变的。
作为计量工具(The Leaky Bucket Algorithm as a Meter)时,可以用于流量整形(Traffic Shaping)和流量控制(TrafficPolicing),漏桶算法的描述如下:
令牌桶算法
令牌桶算法是网络流量整形(Traffic Shaping)和速率限制(Rate Limiting)中最常使用的一种算法。典型情况下,令牌桶算法用来控制发送到网络上的数据的数目,并允许突发数据的发送。
令牌桶算法的原理是系统会以一个恒定的速度往桶里放入令牌,而如果请求需要被处理,则需要先从桶里获取一个令牌,当桶里没有令牌可取时,则拒绝服务。
从原理上看,令牌桶算法和漏桶算法是相反的,一个“进水”,一个是“漏水”。一开始桶是空的,系统按固定的时间(rate)往桶里添加令牌,直到桶里的令牌数满,多余的请求会被丢弃。当请求来的时候,从桶里移除一个令牌,如果桶是空的则拒绝请求或者阻塞。
令牌桶是一个存放固定容量令牌的桶,按照固定速率往桶里添加令牌。令牌桶算法的描述如下:
- 假设限制2r/s,则按照500毫秒的固定速率往桶中添加令牌。
- 桶中最多存放 b 个令牌,当桶满时,新添加的令牌被丢弃或拒绝。
- 当一个 n 个字节大小的数据包到达,将从桶中删除n 个令牌,接着数据包被发送到网络上。
- 如果桶中的令牌不足 n 个,则不会删除令牌,且该数据包将被限流(要么丢弃,要么缓冲区等待)。

算法比较
漏桶算法与令牌桶算法在表面看起来类似,很容易将两者混淆。但事实上,这两者具有截然不同的特性,且为不同的目的而使用。漏桶算法与令牌桶算法的区别在于,漏桶算法能够强行限制数据的传输速率,令牌桶算法能够在限制数据的平均传输速率的同时还允许某种程度的突发传输:

需要注意的是,在某些情况下,漏桶算法不能够有效地使用网络资源,因为漏桶的漏出速率是固定的,所以即使网络中没有发生拥塞,漏桶算法也不能使某一个单独的数据流达到端口速率。因此,漏桶算法对于存在突发特性的流量来说缺乏效率。而令牌桶算法则能够满足这些具有突发特性的流量。
令牌桶算法:time-rate
golang 提供了拓展库(golang.org/x/time/rate)提供了令牌桶算法限流器.
我们可以使用以下方法构造一个限流器对象:
1
|
limiter := NewLimiter(10, 1);
|
这里有两个参数:
- 第一个参数是 r Limit。代表每秒可以向 Token 桶中产生多少 token。Limit 实际上是 float64 的别名。
- 第二个参数是 b int。b 代表 Token 桶的容量大小。
那么,对于以上例子来说,其构造出的限流器含义为,其令牌桶大小为 1, 以每秒 10 个 Token 的速率向桶中放置 Token。
除了直接指定每秒产生的 Token 个数外,还可以用 Every 方法来指定向 Token 桶中放置 Token 的间隔,例如:
1
2
|
limit := Every(100 * time.Millisecond);
limiter := NewLimiter(limit, 1);
|
以上就表示每 100ms 往桶中放一个 Token。本质上也就是一秒钟产生 10 个。
Limiter 提供了三类方法供用户消费 Token,用户可以每次消费一个 Token,也可以一次性消费多个 Token。而每种方法代表了当 Token 不足时,各自不同的对应手段。
Wait/WaitN
1
2
|
func (lim *Limiter) Wait(ctx context.Context) (err error)
func (lim *Limiter) WaitN(ctx context.Context, n int) (err error)
|
Wait 实际上就是 WaitN(ctx,1)
。
当使用 Wait 方法消费 Token 时,如果此时桶内 Token 数组不足 (小于 N),那么 Wait 方法将会阻塞一段时间,直至 Token 满足条件。如果充足则直接返回。
这里可以看到,Wait 方法有一个 context 参数。
我们可以设置 context 的 Deadline 或者 Timeout,来决定此次 Wait 的最长时间。
Allow/AllowN
1
2
|
func (lim *Limiter) Allow() bool
func (lim *Limiter) AllowN(now time.Time, n int) bool
|
Allow 实际上就是 AllowN(time.Now(),1)
。
AllowN 方法表示,截止到某一时刻,目前桶中数目是否至少为 n 个,满足则返回 true,同时从桶中消费 n 个 token。反之返回不消费 Token,false。
通常对应这样的线上场景,如果请求速率过快,就直接丢到某些请求。
Reserve/ReserveN
1
2
|
func (lim *Limiter) Reserve() *Reservation
func (lim *Limiter) ReserveN(now time.Time, n int) *Reservation
|
Reserve 相当于 ReserveN(time.Now(), 1)
。
ReserveN 的用法就相对来说复杂一些,当调用完成后,无论 Token 是否充足,都会返回一个 Reservation * 对象。
你可以调用该对象的 Delay() 方法,该方法返回了需要等待的时间。如果等待时间为 0,则说明不用等待。必须等到等待时间之后,才能进行接下来的工作。
或者,如果不想等待,可以调用 Cancel() 方法,该方法会将 Token 归还。
举一个简单的例子,我们可以这么使用 Reserve 方法。
1
2
3
4
5
6
7
|
r := lim.Reserve()
f !r.OK() {
// Not allowed to act! Did you remember to set lim.burst to be > 0 ?
return
}
time.Sleep(r.Delay())
Act() // 执行相关逻辑
|
动态调整速率
Limiter 支持可以调整速率和桶大小:
SetLimit(Limit) 改变放入 Token 的速率
SetBurst(int) 改变 Token 桶大小
有了这两个方法,可以根据现有环境和条件以及我们的需求,动态地改变 Token 桶大小和速率。
API
Constants
1
|
const Inf = Limit(math.MaxFloat64)
|
Inf是无限速率限制;它允许所有事件(即使burst为零)。
1
|
const InfDuration = time.Duration(1<<63 - 1)
|
InfDuration是预留不正确时由Delay返回的持续时间。
type Limit
Limit定义了某些事件的最大频率。限制表示为每秒的事件数。zero Limit 不允许发生任何事件。
func Every
1
|
func Every(interval time.Duration) Limit
|
Every将事件之间的最小时间间隔转换为一个Limit。
type Limiter
1
2
3
|
type Limiter struct {
// contains filtered or unexported fields
}
|
Limiter控制事件发生的频率。它实现了一个大小为b的“令牌桶”,最初是满的,并以每秒r个令牌的速率重新填充。非正式地,在足够大的时间间隔内,Limiter将速率限制为每秒r个令牌,最大突发大小为b个事件。作为一种特殊情况,如果r == Inf(无限速率),则b被忽略。
零值是有效的Limiter,但它将拒绝所有事件。使用NewLimiter创建非零限制器。
Limiter有三种主要方法,Allow, Reserve, Wait。大多数情况应使用“Wait”。
三种方法中的每一种都消耗一个令牌。当没有令牌可用时,它们的行为不同。
- 如果没有可用的令牌,则Allow返回false。
- 如果没有可用的令牌,Reserve将返回 Reservation,包括以及调用者在使用它之前必须等待的时间。
- 如果没有可用的令牌,则Wait阻塞,直到获得一个令牌或其关联的context被取消为止。
方法AllowN,ReserveN和WaitN消耗n个令牌。
func NewLimiter
1
|
func NewLimiter(r Limit, b int) *Limiter
|
NewLimiter返回一个新的限制器,该限制器允许事件的发生率达到r,并允许突发最多b个令牌。
Limter限制时间的发生频率,采用令牌池的算法实现。这个池子一开始容量为b,装满b个令牌,然后每秒往里面填充r个令牌。
由于令牌池中最多有b个令牌,所以一次最多只能允许b个事件发生,一个事件花费掉一个令牌。
func (*Limiter) Allow
1
|
func (lim *Limiter) Allow() bool
|
Allow是AllowN(time.Now(),1)的简写。
func (*Limiter) AllowN
1
|
func (lim *Limiter) AllowN(now time.Time, n int) bool
|
AllowN标识在时间now的时候,n个事件是否可以同时发生(也意思就是now的时候是否可以从令牌池中取n个令牌)
如果你需要在事件超出频率的时候丢弃或跳过事件,就使用AllowN,否则使用Reserve或Wait.
func (*Limiter) Burst
1
|
func (lim *Limiter) Burst() int
|
Burst返回最大突发大小。突发是一次调用Allow, Reserve, Wait时可以消耗的最大令牌数,因此较高的Burst值允许一次发生更多事件。零Burst不带任何事件,除非limit == Inf。
func (*Limiter) Limit
1
|
func (lim *Limiter) Limit() Limit
|
限制返回最大总事件发生率。
func (*Limiter) Reserve
1
|
func (lim *Limiter) Reserve() *Reservation
|
Reserve是ReserveN(time.Now(),1)的简写。
func (*Limiter) ReserveN
1
|
func (lim *Limiter) ReserveN(now time.Time, n int) *Reservation
|
ReserveN返回一个Reservation,该Reservation指示调用者在n个事件发生之前必须等待多长时间。
当允许将来发生事件时,限制器会将此Reservation考虑在内。
如果n超过Limiter的burst大小,ReserveN返回false。用法示例:
1
2
3
4
5
6
7
|
r := lim.ReserveN(time.Now(), 1)
if !r.OK() {
// Not allowed to act! Did you remember to set lim.burst to be > 0 ?
return
}
time.Sleep(r.Delay())
Act()
|
如果您希望根据速率限制等待并放慢速度而不丢失事件,请使用此方法。如果您需要遵守最后期限或取消延迟,请使用wait。要丢弃或跳过超出速率限制的事件,请改用allow。
我认为这里要表达的意思就是如果事件发生的频率是可以由调用者控制的话,可以用ReserveN 来控制事件发生的速度而不丢掉事件。如果要使用context的截止日期或cancel方法的话,使用WaitN。
func (*Limiter) SetBurst
1
|
func (lim *Limiter) SetBurst(newBurst int)
|
SetBurst是SetBurstAt(time.Now(),newBurst)的简写。
func (*Limiter) SetBurstAt
1
|
func (lim *Limiter) SetBurstAt(now time.Time, newBurst int)
|
SetBurstAt为限制器设置新的突发大小。
func (*Limiter) SetLimit
1
|
func (lim *Limiter) SetLimit(newLimit Limit)
|
SetLimit是SetLimitAt(time.Now(),newLimit)的简写。
func (*Limiter) SetLimitAt
1
|
func (lim *Limiter) SetLimitAt(now time.Time, newLimit Limit)
|
SetLimitAt为限制器设置新的限制。保留(使用Reserve或Wait)但在调用SetLimitAt之前尚未执行的操作可能会违反或未充分利用新的Limit和Burst。
func (*Limiter) Wait
1
|
func (lim *Limiter) Wait(ctx context.Context) (err error)
|
Wait是WaitN(ctx,1)的简写。
func (*Limiter) WaitN
1
|
func (lim *Limiter) WaitN(ctx context.Context, n int) (err error)
|
WaitN阻塞,直到lim允许n个事件发生。
如WaitN 阻塞当前直到Limiter允许n个事件的发生。
- 如果n超过了令牌池的容量大小则报错。
- 如果Context被取消了则报错。
- 如果lim的等待时间超过了Context的超时时间则报错。
如果速率限制为Inf,则忽略突发限制。
type Reservation
1
2
3
|
type Reservation struct {
// contains filtered or unexported fields
}
|
Reserve保留有关限制器允许在延迟后发生的事件的信息。Reserve可以被取消,这可以使限制器允许其他事件。
func (*Reservation) Cancel
1
|
func (r *Reservation) Cancel()
|
Cancel是CancelAt(time.Now())的简写。
func (*Reservation) CancelAt
1
|
func (r *Reservation) CancelAt(now time.Time)
|
CancelAt表示预订持有者将不执行Reserve操作,并考虑可能已经进行了其他Reserve,因此尽可能地降低此Reserve对速率限制的影响。
func (*Reservation) Delay
1
|
func (r *Reservation) Delay() time.Duration
|
Delay是DelayFrom(time.Now())的简写。
func (*Reservation) DelayFrom
1
|
func (r *Reservation) DelayFrom(now time.Time) time.Duration
|
DelayFrom返回Reservation所有者在执行Reservation操作之前必须等待的持续时间。持续时间为零意味着立即采取行动。
InfDuration表示限制器无法在最大等待时间内授予此Reservation中请求的令牌。
func (*Reservation) OK
1
|
func (r *Reservation) OK() bool
|
确定返回限制器是否可以在最大等待时间内提供请求数量的令牌。如果OK为false,则Delay返回InfDuration,而Cancel不执行任何操作。
样例
基本用法
Limter提供三中主要的函数 Allow, Reserve, Wait. 大部分时候使用Wait
Wait
Wait/WaitN 当没有可用事件时,将阻塞等待
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
package main
import (
"context"
"fmt"
"golang.org/x/time/rate"
"time"
)
func main() {
l := rate.NewLimiter(1, 3) // 一个参数为每秒发生多少次事件,第二个参数是最大可运行多少个事件(burst)
c, _ := context.WithCancel(context.TODO())
for {
l.Wait(c)
fmt.Println(time.Now().Format("04:05.000"))
}
}
|
输出
1
2
3
4
5
6
|
25:08.243
25:08.243
25:08.243
25:09.248
25:10.248
25:11.248
|
Allow
Allow/AllowN 当没有可用事件时,返回false
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
package main
import (
"fmt"
"golang.org/x/time/rate"
"time"
)
func main() {
l := rate.NewLimiter(1, 3) // 一个参数为每秒发生多少次事件,第二个参数是最大可运行多少个事件(burst)
for {
if l.AllowN(time.Now(), 1) {
fmt.Println(time.Now().Format("04:05.000"))
} else {
time.Sleep(1 * time.Second / 10)
fmt.Println(time.Now().Format("Second 04:05.000"))
}
}
}
|
Reserve
Reserve/ReserveN 当没有可用事件时,返回 Reservation,和要等待多久才能获得足够的事件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
package main
import (
"fmt"
"golang.org/x/time/rate"
"time"
)
func main() {
l := rate.NewLimiter(1, 3) // 一个参数为每秒发生多少次事件,第二个参数是最大可运行多少个事件(burst)
for {
r := l.ReserveN(time.Now(), 1)
s := r.Delay()
time.Sleep(s)
fmt.Println(s, time.Now().Format("04:05.000"))
}
}
|
ReserveN 的高级用法
使用Reserve可以根据Delay的时间自行控制程序是否执行.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
|
package main
import (
"bytes"
"fmt"
"io"
"time"
"golang.org/x/time/rate"
)
type reader struct {
r io.Reader
limiter *rate.Limiter
}
// Reader returns a reader that is rate limited by
// the given token bucket. Each token in the bucket
// represents one byte.
func NewReader(r io.Reader, l *rate.Limiter) io.Reader {
return &reader{
r: r,
limiter: l,
}
}
func (r *reader) Read(buf []byte) (int, error) {
n, err := r.r.Read(buf)
if n <= 0 {
return n, err
}
now := time.Now()
rv := r.limiter.ReserveN(now, n)
if !rv.OK() {
return 0, fmt.Errorf("Exceeds limiter's burst")
}
delay := rv.DelayFrom(now)
fmt.Printf("Read %d bytes, delay %f\n", n, delay.Seconds())
time.Sleep(delay)
return n, err
}
func main() {
// Source holding 1MB
src := bytes.NewReader(make([]byte, 1024*1024))
// Destination
dst := &bytes.Buffer{}
// Bucket adding 100KB every second, holding max 100KB
limit := rate.NewLimiter(100*1024, 100*1024)
start := time.Now()
buf := make([]byte, 10*1024)
// Copy source to destination, but wrap our reader with rate limited one
//io.CopyBuffer(dst, NewReader(src, limit), buf)
r := NewReader(src, limit)
for {
if n, err := r.Read(buf); err == nil {
dst.Write(buf[0:n])
} else {
break
}
}
fmt.Printf("Copied %d bytes in %s\n", dst.Len(), time.Since(start))
}
|
值得注意的是,这里不能直接用io.Copy,bytes.Buffer实现了ReaderFrom,每次Read的时候,buf的长度是变化的,会导致len(buf)超过rate.Limiter.burst。对于这种情况,rv.DelayFrom(now)会返回InfDuration。
源码剖析
设计思路
这个库并没有使用定时器来发放 token 而是用了 lazyload 的方式,等需要消费 token 的时候才通过时间去计算然后更新 token 的数量,下面我们先通过一个例子来看一下这个流程是怎么跑的

如上图所示,假设我们有一个限速器,它的 token 生成速度为 1,也就是一秒一个,桶的大小为 10,每个格子表示一秒的时间间隔
- last 表示上一次更新 token 时还有 2 个 token。
- 现在我有一个请求进来,我总共需要 7 个 token 才能完成这个请求
- now 表示我现在进来的时间,距离 last 已经过去了 2s,那么现在就有 4 个 token
- 所以我如果需要 7 个 token 那么也就还需要等待 3s 中才真的有 7 个,所以这就是 timeToAct 所在的时间节点
- 预约成功之后更新 last = now 、token = -3 因为 token 已经被预约出去了所以现在剩下的就是负数了
数据结构
常量变量
1
2
3
4
5
6
|
//定义某个时间的最大频率
//表示每秒的事件数
type Limit float64
//Inf表示无速率限制
const Inf = Limit(math.MaxFloat64)
|
Limiter
1
2
3
4
5
6
7
8
9
10
11
12
13
|
type Limiter struct {
limit Limit //每秒允许处理的事件数量,即每秒处理事件的频率
burst int //令牌桶的最大数量, 如果burst为0,则除非limit == Inf,否则不允许处理任何事件。
mu sync.Mutex
tokens float64 //令牌桶中可用的令牌数量
// last is the last time the limiter's tokens field was updated
//记录上次limiter的tokens被更新的时间
last time.Time
// lastEvent is the latest time of a rate-limited event (past or future)
//lastEvent记录速率受限制(桶中没有令牌)的时间点,该时间点可能是过去的,也可能是将来的(Reservation预定的结束时间点)
lastEvent time.Time
}
|
Limiter是限流器中最核心的结构体,用于限流(控制事件发生的频率),在初始化后默认是满的,并以每秒r个令牌的速率重新填充直到达到桶的容量(burst),如果r == Inf表示无限制速率。
Limiter有三个主要的方法 Allow、Reserve和Wait,最常用的是Wait和Allow方法
这三个方法每调用一次都会消耗一个令牌,这三个方法的区别在于没有令牌时,他们的处理方式不同
Allow: 如果没有令牌,则直接返回false
Reserve:如果没有令牌,则返回一个reservation,
Wait:如果没有令牌,则等待直到获取一个令牌或者其上下文被取消。
tokens更新的策略:
- 成功获取到令牌或成功预约(Reserve)到令牌
- 预约取消时(Cancel)并且需要还原令牌到令牌桶中时
- 重新设置限流器的速率时(SetLimit)
- 重新设置限流器的容量时(SetBurst)
lastEvent表示速率受限制的时间点,它可能时过去的时间,也可能时将来的时间。
- 如果没有预约令牌的话,该时间等于last,是过去的
- 如果有预约令牌的话,该时间等于最新的预约的截至时间。
注意:由于令牌桶的令牌可以预约,所有令牌桶中的tokens可能为负数。
Reservation
1
2
3
4
5
6
7
8
|
type Reservation struct {
ok bool //到截至时间是否可以获取足够的令牌
lim *Limiter
tokens int //需要获取的令牌数量
timeToAct time.Time //需要等待的时间点
// This is the Limit at reservation time, it can change later.
limit Limit
}
|
Reservation可以理解成预定令牌的操作,timeToAct是本次预约需要等待到的指定时间点才有足够预约的令牌。
- r.tokens 指的是本次消费的 Token 数
- r.timeToAct 指的是 Token 桶可以满足本次消费数目的时刻,也就是消费的时刻 + 等待的时长。
- r.lim.lastEvent 指的是最近一次消费的 timeToAct 值
Limiter相关的方法
Limiter初始化
1
2
3
4
5
6
|
func NewLimiter(r Limit, b int) *Limiter {
return &Limiter{
limit: r,
burst: b,
}
}
|
初始化Limiter,指定每秒允许处理事件的上限为r,允许令牌桶的最大容量为b
1
2
3
4
5
6
|
func Every(interval time.Duration) Limit {
if interval <= 0 {
return Inf
}
return 1 / Limit(interval.Seconds())
}
|
Every将事件的最小时间间隔转换为限制
Limiter使用
1
2
3
|
func (lim *Limiter) Allow() bool {
return lim.AllowN(time.Now(), 1)
}
|
从令牌桶中获取一个令牌,成功获取到则返回true
1
2
3
|
func (lim *Limiter) AllowN(now time.Time, n int) bool {
return lim.reserveN(now, n, 0).ok
}
|
从令牌桶中获取n个令牌,成功获取到则返回true.
Wait 函数可以通过 Context 进行取消或者超时等,当通过 Context 进行取消或超时时,此时消费的 Token 数也会归还给 Token 桶。
1
2
3
|
func (lim *Limiter) Wait(ctx context.Context) (err error) {
return lim.WaitN(ctx, 1)
}
|
获取一个令牌,如果没有则等待直到获取令牌或者上下文ctx取消
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
|
func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) {
//同步获取令牌桶的最大容量burst和限流器的速率limit
lim.mu.Lock()
burst := lim.burst
limit := lim.limit
lim.mu.Unlock()
//如果n大于令牌桶的最大容量,则返回error
if n > burst && limit != Inf {
return fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, lim.burst)
}
// Check if ctx is already cancelled
//判断上下文ctx是否已经被取消,如果已经取消则返回error
select {
case <-ctx.Done():
return ctx.Err()
default:
}
// Determine wait limit
now := time.Now()
waitLimit := InfDuration
if deadline, ok := ctx.Deadline(); ok { //如果可以获取上下文的截至时间,则更新可以等待的时间waitLimit
waitLimit = deadline.Sub(now)
}
// Reserve
//调用reserveN获取Reversation
r := lim.reserveN(now, n, waitLimit)
if !r.ok { //没有足够的时间获取令牌,则返回error
return fmt.Errorf("rate: Wait(n=%d) would exceed context deadline", n)
}
// Wait if necessary
//需要等待的时间
delay := r.DelayFrom(now)
if delay == 0 {
return nil
}
t := time.NewTimer(delay)
defer t.Stop()
select {
case <-t.C:
// We can proceed.
return nil
case <-ctx.Done():
// Context was canceled before we could proceed. Cancel the
// reservation, which may permit other events to proceed sooner.
r.Cancel()
return ctx.Err()
}
}
|
WaitN方法获取n个令牌,直到成功获取或者ctx取消
如果n大于令牌桶的最大容量则返回error
如果上下文被取消或者等待的时间大于上下文的截至时间,则返回error
如果速率限制为Inf则不会限流
无论是Wait、Allow或者Reserve其实都会调用advance和reserveN方法,所以这两个方法是整个限流器rate实现的核心。
在正式讲之前,我们先了解一个简单的概念:
在 time/rate 中,NewLimiter 的第一个参数是速率 limit,代表了一秒钟可以产生多少 Token。那么简单换算一下,我们就可以知道一个 Token 的生成间隔是多少。有了这个生成间隔,我们就可以轻易地得到两个数据:
- 生成 N 个新的 Token 一共需要多久。time/rate 中对应的实现函数为 durationFromTokens。
- 给定一段时长,这段时间一共可以生成多少个 Token。time/rate 中对应的实现函数为 tokensFromDuration。
那么,有了这些转换函数,整个过程就很清晰了,如下:
-
计算从上次取 Token 的时间到当前时刻,期间一共新产生了多少 Token:
- 我们只在取 Token 之前生成新的 Token,也就意味着每次取 Token 的间隔,实际上也是生成 Token 的间隔。我们可以利用 tokensFromDuration, 轻易的算出这段时间一共产生 Token 的数目。
- 那么,当前 Token 数目 = 新产生的 Token 数目 + 之前剩余的 Token 数目 - 要消费的 Token 数目。
-
如果消费后剩余 Token 数目大于零,说明此时 Token 桶内仍不为空,此时 Token 充足,无需调用侧等待。
- 如果 Token 数目小于零,则需等待一段时间。
- 那么,我们可以利用 durationFromTokens 将当前负值的 Token 数转化为需要等待的时间。
将需要等待的时间等相关结果返回给调用方。
从上面可以看出,其实整个过程就是利用了 Token 数可以和时间相互转化 的原理。而如果 Token 数为负,则需要等待相应时间即可。
注意 如果当消费时,Token 桶中的 Token 数目已经为负值了,依然可以按照上述流程进行消费。随着负值越来越小,等待的时间将会越来越长。从结果来看,这个行为跟用 Timer+BlockQueue 实现是一样的。
此外,整个过程为了保证线程安全,更新令牌桶相关数据时都用了 mutex 加锁。
我们模拟下请求与 Token 数变化的关系:
- 当某一时间,桶内 Token 数为 3, 此时 A 线程请求 5 个 Token。那么此时桶内 Token 不足,因此 A 线程需要等待 2 个 Token 的时间。且此时桶内 Token 数变为 - 2。
- 同时,B 线程请求 4 个 Token,此时桶内 Token 数为 - 2,因此 B 线程需要等待 2+4=6 个 Token 的时间,且此时桶内 Token 数变为 - 6。
对于 Allow 函数实现时,只要判断需要等待的时间是否为 0 即可,如果大于 0 说明需要等待,则返回 False,反之返回 True。
对于 Wait 函数,直接 t := time.NewTimer(delay)
,等待对应的时间即可。
advance方法的作用是更新令牌桶的状态,计算出令牌桶未更新的时间(elapsed),根据elapsed算出需要向桶中加入的令牌数delta,然后算出桶中可用的令牌数newTokens
我最开始的理解是,直接可以这么做:
1
2
3
4
5
6
7
8
9
|
// elapsed 表示过去的时间差
elapsed := now.Sub(lim.last)
// delta 表示这段时间一共新产生了多少 Token
delta = tokensFromDuration(now.Sub(lim.last))
tokens := lim.tokens + delta
if(token> lim.burst){
token = lim.burst
}
|
其中,lim.tokens 是当前剩余的 Token,lim.last 是上次取 token 的时刻。lim.burst 是 Token 桶的大小。
使用 tokensFromDuration 计算出新生成了多少 Token,累加起来后,不能超过桶的容量即可。
这么做看起来也没什么问题,然而并不是这样。
在 time/rate 里面是这么做的,如下代码所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
// now 是传入的当前的时间点,返回的 newNow 其实就是传入的参数,没有任何改变
// newLast 是更新 token 的时间
// newTokens 是 token 的数量
func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) {
//last不能在当前时间now之后,否则计算出来的elapsed为负数,会导致令牌桶数量减少
last := lim.last
if now.Before(last) {
last = now
}
// Avoid making delta overflow below when last is very old.
//根据令牌桶的缺数计算出令牌桶未进行更新的最大时间
maxElapsed := lim.limit.durationFromTokens(float64(lim.burst) - lim.tokens)
elapsed := now.Sub(last) //令牌桶未进行更新的时间段
if elapsed > maxElapsed {
elapsed = maxElapsed
}
// Calculate the new number of tokens, due to time that passed.
//根据未更新的时间(未向桶中加入令牌的时间段)计算出产生的令牌数。
delta := lim.limit.tokensFromDuration(elapsed)
tokens := lim.tokens + delta //计算出可用的令牌数
if burst := float64(lim.burst); tokens > burst {
tokens = burst
}
return now, last, tokens
}
|
与我们最开始的代码不一样的是,它没有直接用 now.Sub(lim.last)
来转化为对应的 Token 数,而是
先用 lim.limit.durationFromTokens(float64(lim.burst) - lim.tokens)
,计算把桶填满的时间 maxElapsed
。取 elapsed
和maxElapsed
的最小值。
这么做算出的结果肯定是正确的,但是这么做相比于我们的做法,好处在哪里?
对于我们的代码,当 last 非常小的时候(或者当其为初始值 0 的时候),此时 now.Sub(lim.last) 的值就会非常大,如果 lim.limit 即每秒生成的 Token 数目也非常大时,直接将二者进行乘法运算,结果有可能会溢出。
因此,time/rate 先计算了把桶填满的时间,将其作为时间差值的上限,这样就规避了溢出的问题。
reserveN方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
|
// now: 需要消费 token 的时间点
// n: 需要多少个 token
// maxFutureReserve: 能够等待的最长时间
func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation {
lim.mu.Lock()
//如果没有限流则直接返回
if lim.limit == Inf {
lim.mu.Unlock()
return Reservation{
ok: true, //桶中有足够的令牌
lim: lim,
tokens: n,
timeToAct: now,
}
}
//更新令牌桶的状态,tokens为目前可用的令牌数量
now, last, tokens := lim.advance(now)
// Calculate the remaining number of tokens resulting from the request.
//可用的令牌数tokens减去需要获取的令牌数(n)
tokens -= float64(n)
// Calculate the wait duration
//如果tokens小于0,则说明桶中没有足够的令牌,计算出产生这些缺数的令牌需要多久(waitDuration)
//计算出产生出缺数的令牌(即-tokens)需要多长时间
var waitDuration time.Duration
if tokens < 0 {
waitDuration = lim.limit.durationFromTokens(-tokens)
}
// Decide result
//如果n小于等于令牌桶的容量,并且可以等待到足够的令牌(即 waitDuration <= maxFutureReserve),则ok为true。表示可以获取到足够的令牌
ok := n <= lim.burst && waitDuration <= maxFutureReserve
// Prepare reservation
r := Reservation{
ok: ok,
lim: lim,
limit: lim.limit,
}
// 如果可以的话,就把 token 分配给预约者
if ok {
r.tokens = n // 需要的令牌数
r.timeToAct = now.Add(waitDuration) //计算获取到足够令牌的结束时间点
}
// Update state
// 更新各个字段的状态
if ok {
lim.last = now //更新tokens的时间
lim.tokens = tokens //更新令牌桶目前可用的令牌数tokens
lim.lastEvent = r.timeToAct //下次事件时间(即获取到足够令牌的时刻)
} else {
//last不能在当前时间now之后,否则计算出来的elapsed为负数,会导致令牌桶数量减少
lim.last = last
}
lim.mu.Unlock()
return r
}
|
reserveN是 AllowN, ReserveN及 WaitN的辅助方法,用于判断在maxFutureReserve时间内是否有足够的令牌。
durationFromTokens和tokensFromDuration工具转换方法
在 Token 和时间的相互转化函数 durationFromTokens 和 tokensFromDuration 中,涉及到 float64 的乘除运算。一谈到 float 的乘除,我们就需要小心精度问题了。
而 Golang 在这里也踩了坑,以下是 tokensFromDuration 最初的实现版本
1
2
3
|
func (limit Limit) tokensFromDuration(d time.Duration) float64 {
return d.Seconds() * float64(limit)
}
|
这个操作看起来一点问题都没:每秒生成的 Token 数乘于秒数。
然而,这里的问题在于,d.Seconds() 已经是小数了。两个小数相乘,会带来精度的损失。
修改后新的版本如下:
获取指定期间d内产生的令牌数量
1
2
3
4
5
6
7
8
9
|
// tokensFromDuration is a unit conversion function from a time duration to the number of tokens
// which could be accumulated during that duration at a rate of limit tokens per second.
func (limit Limit) tokensFromDuration(d time.Duration) float64 {
// Split the integer and fractional parts ourself to minimize rounding errors.
// See golang.org/issues/34861.
sec := float64(d/time.Second) *float64(limit)
nsec := float64(d%time.Second)* float64(limit)
return sec + nsec/1e9
}
|
time.Duration 是 int64 的别名,代表纳秒。分别求出秒的整数部分和小数部分,进行相乘后再相加,这样可以得到最精确的精度。
根据根据令牌数量tokens计算出产生该数量的令牌需要的时长
1
2
3
4
|
func (limit Limit) durationFromTokens(tokens float64) time.Duration {
seconds := tokens / float64(limit)
return time.Nanosecond *time.Duration(1e9*seconds)
}
|
另外:
Limiter的Limit方法用于获取限流的速率即结构体中limit的值,Burst方法用于返回桶的最大容量。
Reservation相关的方法
Reservation相关的方法即预约令牌需要使用的方法
Reserve和ReserveN分别用于预约1个或者n个令牌
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
func (lim *Limiter) Reserve()*Reservation {
return lim.ReserveN(time.Now(), 1)
}
// ReserveN returns a Reservation that indicates how long the caller must wait before n events happen.
// The Limiter takes this Reservation into account when allowing future events.
// ReserveN returns false if n exceeds the Limiter's burst size.
// Usage example:
// r := lim.ReserveN(time.Now(), 1)
// if !r.OK() {
// // Not allowed to act! Did you remember to set lim.burst to be > 0 ?
// return
// }
// time.Sleep(r.Delay())
// Act()
// Use this method if you wish to wait and slow down in accordance with the rate limit without dropping events.
// If you need to respect a deadline or cancel the delay, use Wait instead.
// To drop or skip events exceeding rate limit, use Allow instead.
func (lim *Limiter) ReserveN(now time.Time, n int)*Reservation {
r := lim.reserveN(now, n, InfDuration)
return &r
}
|
返回一个Reservation,该Reservation指示调用者在n个事件发生前需要等待多长事件.对于 Reserve 函数,返回的结果中,我们可以通过 Reservation.Delay() 函数,得到需要等待时间。同时调用方可以根据返回条件和现有情况,可以调用 Reservation.Cancel() 函数,取消此次消费。当调用 Cancel() 函数时,消费的 Token 数将会尽可能归还给 Token 桶。
如果n超出了限流器的burst,则返回false
1
2
3
|
func (r *Reservation) OK() bool {
return r.ok
}
|
返回限流器limiter是否可以在最大等待时间内提供请求数量的令牌。
如果Ok为false,则Delay返回InfDuration,Cancel不执行任何操作
1
2
3
|
func (r *Reservation) Delay() time.Duration {
return r.DelayFrom(time.Now())
}
|
返回到截至时间的时间段 ,即需要等待的时间
1
2
3
4
5
6
7
8
9
10
|
func (r *Reservation) DelayFrom(now time.Time) time.Duration {
if !r.ok { //ok为false,则返回InfDuration
return InfDuration
}
delay := r.timeToAct.Sub(now) //截止时间
if delay < 0 { //如果截至时间已过,则返回0
return 0
}
return delay
}
|
DelayFrom方法用于返回当前时间now到截至时间的时间段
如果为0,表示有足够的令牌,需要立即执行
如果返回InfDuration,表示到截至时间时仍然没有足够的令牌
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
|
func (r *Reservation) Cancel() {
r.CancelAt(time.Now())
return
}
func (r *Reservation) CancelAt(now time.Time) {
if !r.ok {
return
}
r.lim.mu.Lock()
defer r.lim.mu.Unlock()
/*
1. 如果无需限流
2. tokens为0 (需要获取的令牌数量为0)
3. 已经过了截至时间
以上三种情况无需处理取消操作
*/
if r.lim.limit == Inf || r.tokens == 0 || r.timeToAct.Before(now) {
return
}
// calculate tokens to restore
// The duration between lim.lastEvent and r.timeToAct tells us how many tokens were reserved
// after r was obtained. These tokens should not be restored.
// 计算出需要还原的令牌数量
// 这里的r.lim.lastEvent可能是本次Reservation的结束时间,也可能是后来的Reservation的结束时间,所以要把本次结束时间点(r.timeToAct)之后产生的令牌数减去
restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct))
if restoreTokens <= 0 {
return
}
// advance time to now
//从新计算令牌桶的状态
now, _, tokens := r.lim.advance(now)
// calculate new number of tokens
//还原当前令牌桶的令牌数量,当前的令牌数tokens加上需要还原的令牌数restoreTokens
tokens += restoreTokens
//如果tokens大于桶的最大容量,则将tokens置为桶的最大容量
if burst := float64(r.lim.burst); tokens > burst {
tokens = burst
}
// update state
r.lim.last = now //记录桶的更新时间
r.lim.tokens = tokens //更新令牌数量
//还原lastEvent,即上次速率受限制的时间
if r.timeToAct == r.lim.lastEvent {
prevEvent := r.timeToAct.Add(r.limit.durationFromTokens(float64(-r.tokens)))
if !prevEvent.Before(now) {
r.lim.lastEvent = prevEvent
}
}
return
}
|
CancelAt用于取消预约令牌操作,如果有需要还原的令牌,则将需要还原的令牌重新放入到令牌桶中。
问题
研究golang rate限频的源码时候,发现rate的Wait,WaitN存在一个问题。当waiter提前return后,当前和后面waiter无法进行延迟时间重排,回收的问题。
问题:
我们知道go rate waitN是可以传递context的。比如,在一个场景里,5秒放一个token,池的大小就一个。开始协程g1拿到了token,g2、g3也需要token, 但是rate池子里已经没有token, g2、g3自然就需要wait方法去等待新token的产生, 那么g2,、g3需要等待多久? 最好的计划肯定是等待到下次产生token的时候,简单说g2等待5s, g3需要等待到10s。
当我们通过传递context来主动关闭g2的等待,但协程 g3 还是在等待10s。也就是说,g2退出了,按理来说后面的rate waiter应该调整下时间。但go rate没有做这方面的处理。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
func reserveBug() {
l := rate.NewLimiter(1, 1)
for index := 0; index < 10; index++ {
wg.Add(1)
go func() {
r := l.ReserveN(time.Now(), 1)
time.Sleep(200 * time.Millisecond)
r.Cancel()
wg.Done()
}()
}
wg.Wait()
r := l.ReserveN(time.Now(), 1)
fmt.Println("reserve need wait: ", r.Delay())
}
|
rate作为go的标准库,他的实现确实很巧妙。使用预计token的生产时间来分配一个个waiter等待者的延迟时间。但这样带来的问题,一方面充斥了许多的定时器,另一方面不能让waiter快速的收敛排队。这类定时器在高并发下无疑是性能的杀手。
如何解决?
- 修改go rate源码,可以把wait里的timer放在heap里,某waiter退出后,我们可以把大于该waiter等待时间的timer,重新reset一下。当然这个复杂度有点大了。
- 所有协程统一按照下次token的生产时间来等待,但这个问题就有点忙轮询和竞争了。或者可以自定义等待时间加配 rate allow 非阻塞方法。
- 自己去实现限频模块,new一个协程专门来生产token,可以用chan来做通知。
我最后采用的是第三种,就是自己实现令牌桶。
漏桶算法:uber-go/ratelimiter
漏桶和令牌桶的最大的区别就是,令牌桶是支持突发流量的,但是漏桶是不支持的。但是 uber 的这个库通过引入弹性时间的方式也让漏桶算法有了类似令牌桶能够应对部分突发流量的能力,并且实现上还非常的简单,值得学习。
1
2
3
4
5
6
7
8
|
rl := ratelimit.New(100) // per second
prev := time.Now()
for i := 0; i < 10; i++ {
now := rl.Take()
fmt.Println(i, now.Sub(prev))
prev = now
}
|
在这个例子中,我们给定限流器每秒可以通过 100 个请求,也就是平均每个请求间隔 10ms。
因此,最终会每 10ms 打印一行数据。输出结果如下:
1
2
3
4
5
6
7
8
9
10
11
|
// Output:
// 0 0
// 1 10ms
// 2 10ms
// 3 10ms
// 4 10ms
// 5 10ms
// 6 10ms
// 7 10ms
// 8 10ms
// 9 10ms
|
API
1
2
3
4
5
6
7
8
|
type Clock
type Limiter
func New(rate int, opts ...Option) Limiter
func NewUnlimited() Limiter
type Option
func Per(per time.Duration) Option
func WithClock(clock Clock) Option
func WithSlack(slack int) Option
|
Clock 是一个接口,计时器的最小实现,有两个方法,分别是当前的时间和睡眠
1
2
3
4
|
type Clock interface {
Now() time.Time
Sleep(time.Duration)
}
|
Limiter 也是一个接口,只有一个 Take 方法,执行这个方法的时候如果触发了 rps 限制则会阻塞住
1
2
3
4
|
type Limiter interface {
// Take should block to make sure that the RPS is met.
Take() time.Time
}
|
NewLimter 和 NewUnlimited 会分别初始化一个无锁的限速器和没有任何限制的限速器
Option 有三个方法
- Per 可以修改时间单位,默认是秒所以我们默认限制的是 rps,如果改成分钟那么就是 rpm 了
- WithClock 可以修改时钟,这个用于在测试的时候可以 mock 掉不使用真实的时间
- WithSlack 用于修改松弛时间,也就是可以允许的突发流量的大小,默认是 Pre / 10 ,这个后面会讲到
双版本
ratelimit 是漏桶的一个典型实现。提供了 atomic 和 mutex 两个版本:
核心结构
有锁版本:
1
2
3
4
5
6
7
8
|
type mutexLimiter struct {
sync.Mutex
last time.Time
sleepFor time.Duration
perRequest time.Duration
maxSlack time.Duration
clock Clock
}
|
无锁版本:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
type state struct {
last time.Time
sleepFor time.Duration
}
type atomicLimiter struct {
state unsafe.Pointer
//lint:ignore U1000 Padding is unused but it is crucial to maintain performance
// of this rate limiter in case of collocation with other frequently accessed memory.
padding [56]byte // cache line size - state pointer size = 64 - 8; created to avoid false sharing.
perRequest time.Duration
maxSlack time.Duration
clock Clock
}
|
atomicLimiter 结构体:
- state 是一个状态的指针,用于存储上一次的执行的时间,以及需要 sleep 的时间
- padding 是一个无意义的填充数据,为了提高性能,避免 cpu 缓存的 false sharing,这里为了尽可能提高性能,填充了 56 字节的无意义数据,因为 state 是一个指针占用了 8 个字节,所以 64 - 8 = 56
剩下三个字段和 Option 中的三个方法意义对应
- perRequest 就是单位,默认是秒
- maxSlack 松弛时间,也就是可以允许的突发流量的大小,默认是 Pre / 10 ,这个后面会讲到
- clock 时钟,这个用于在测试的时候可以 mock 掉不使用真实的时间
初始化
默认 ratelimter 的限速周期是 time.Second 即 1s,参数中 rate 是限速周期内的限速总数,直面上理解为将 1s 化为 rate 个区间,maxSlack 为最大松弛量,下面的篇幅再做解释。
1
2
3
4
5
6
7
8
9
10
11
|
// New returns a Limiter that will limit to the given RPS.
func newMutexBased(rate int, opts ...Option) Limiter {
l := &mutexLimiter{
perRequest: time.Second / time.Duration(rate),
maxSlack: -10 * time.Second / time.Duration(rate), // 最大松弛量
}
if l.clock == nil {
l.clock = clock.New()
}
return l
}
|
在初始化代码中:limiter.perRequest = time.Second / time.Duration(rate)
,这里 limiter.perRequest 指的就是每个请求之间的间隔时间。
newMutexBased 方法中传入的参数 rate 就是每秒允许请求量 (RPS) 的值。
核心限速逻辑 Take
本小节分析下 Uber Leaky Bucket 的核心限速逻辑, New(10) 传入的 10 指的是 1s 内只有能有 10 个请求通过, 于是算出来每个请求之间应该间隔 100 ms. 如果两个请求之间间隔时间过短, 那么需要第二个请求 sleep 一段时间, 这样保证请求能够匀速从桶内流出.
如下图,当请求 1 处理结束后, 我们记录下请求 1 的处理完成的时刻, 记为 limiter.last。稍后请求 2 到来, 如果此刻的时间与 limiter.last 相比并没有达到 perRequest 的间隔大小,那么 sleep 一段时间即可。

对应 ratelimit 的实现代码如下:
1
2
3
4
5
6
7
|
sleepFor = t.perRequest - now.Sub(t.last)
if sleepFor > 0 {
t.clock.Sleep(sleepFor)
t.last = now.Add(sleepFor)
} else {
t.last = now
}
|
由于漏桶的核心限速逻辑是实现每秒固定速率的目的,其实还是比较简单的。 t.last 保存了上一次请求通过的最后时间
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
// Take blocks to ensure that the time spent between multiple
// Take calls is on average time.Second/rate.
func (t *mutexLimiter) Take() time.Time {
t.Lock()
defer t.Unlock()
now := t.clock.Now()
// If this is our first request, then we allow it.
// 如果是第一次请求, 直接放行即可
if t.last.IsZero() {
t.last = now
return t.last
}
// sleepFor calculates how much time we should sleep based on
// the perRequest budget and how long the last request took.
// Since the request may take longer than the budget, this number
// can get negative, and is summed across requests.
// 注意这段代码,累加 sleepFor 的值
t.sleepFor += t.perRequest - now.Sub(t.last)
// We shouldn't allow sleepFor to get too negative, since it would mean that
// a service that slowed down a lot for a short period of time would get
// a much higher RPS following that.
if t.sleepFor < t.maxSlack {
t.sleepFor = t.maxSlack
}
// If sleepFor is positive, then we should sleep now.
// 判断是否桶溢出. 如果桶溢出了, 需要 sleep 一段时间
if t.sleepFor > 0 {
t.clock.Sleep(t.sleepFor)
t.last = now.Add(t.sleepFor)
t.sleepFor = 0
} else {
t.last = now
}
return t.last
}
|
无锁版本:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
|
func (t *atomicLimiter) Take() time.Time {
var (
// 状态
newState state
// 用于表示原子操作是否成功
taken bool
// 需要 sleep 的时间
interval time.Duration
)
// 如果 CAS 操作不成功就一直尝试
for !taken {
// 获取当前的时间
now := t.clock.Now()
// load 出上一次调用的时间
previousStatePointer := atomic.LoadPointer(&t.state)
oldState := (*state)(previousStatePointer)
newState = state{
last: now,
sleepFor: oldState.sleepFor,
}
// 如果 last 是零值的话,表示之前就没用过,直接保存返回即可
if oldState.last.IsZero() {
taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState))
continue
}
// sleepFor 是需要睡眠的时间,由于引入了松弛时间,所以 sleepFor 可能是一个
// maxSlack ~ 0 之间的一个值,所以这里需要将现在的需要 sleep 的时间和上一次
// sleepFor 的值相加
newState.sleepFor += t.perRequest - now.Sub(oldState.last)
// 如果距离上一次调用已经很久了,sleepFor 可能会是一个很小的值
// 最小值只能是 maxSlack 的大小
if newState.sleepFor < t.maxSlack {
newState.sleepFor = t.maxSlack
}
// 如果 sleepFor 大于 0 的话,计算出需要 sleep 的时间
// 然后将 state.sleepFor 置零
if newState.sleepFor > 0 {
newState.last = newState.last.Add(newState.sleepFor)
interval, newState.sleepFor = newState.sleepFor, 0
}
// 保存状态
taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState))
}
// sleep interval
t.clock.Sleep(interval)
return newState.last
}
|
maxSlack 应用
我们讲到,传统的 Leaky Bucket,每个请求的间隔是固定的,然而,在实际上的互联网应用中,流量经常是突发性的。对于这种情况,uber-go 对 Leaky Bucket 做了一些改良,引入了最大松弛量 (maxSlack) 的概念。
我们先理解下整体背景: 假如我们要求每秒限定 100 个请求,平均每个请求间隔 10ms。但是实际情况下,有些请求间隔比较长,有些请求间隔比较短。如下图所示:

请求 1 完成后,15ms 后,请求 2 才到来,可以对请求 2 立即处理。请求 2 完成后,5ms 后,请求 3 到来,这个时候距离上次请求还不足 10ms,因此还需要等待 5ms。
但是,对于这种情况,实际上三个请求一共消耗了 25ms 才完成,并不是预期的 20ms。在 uber-go 实现的 ratelimit 中,可以把之前间隔比较长的请求的时间,匀给后面的使用,保证每秒请求数 (RPS) 即可。
对于以上 case,因为请求 2 相当于多等了 5ms,我们可以把这 5ms 移给请求 3 使用。加上请求 3 本身就是 5ms 之后过来的,一共刚好 10ms,所以请求 3 无需等待,直接可以处理。此时三个请求也恰好一共是 20ms。
如下图所示:

在 ratelimit 的对应实现中很简单,是把每个请求多余出来的等待时间累加起来,以给后面的抵消使用。
1
2
3
4
5
6
7
8
|
t.sleepFor += t.perRequest - now.Sub(t.last)
if t.sleepFor > 0 {
t.clock.Sleep(t.sleepFor)
t.last = now.Add(t.sleepFor)
t.sleepFor = 0
} else {
t.last = now
}
|
注意:这里跟上述代码不同的是,这里是 +=。而同时 t.perRequest - now.Sub(t.last) 是可能为负值的,负值代表请求间隔时间比预期的长。
当 t.sleepFor > 0,代表此前的请求多余出来的时间,无法完全抵消此次的所需量,因此需要 sleep 相应时间, 同时将 t.sleepFor 置为 0。
当 t.sleepFor < 0,说明此次请求间隔大于预期间隔,将多出来的时间累加到 t.sleepFor 即可。
但是,对于某种情况,请求 1 完成后,请求 2 过了很久到达 (好几个小时都有可能),那么此时对于请求 2 的请求间隔 now.Sub(t.last),会非常大。以至于即使后面大量请求瞬时到达,也无法抵消完这个时间。那这样就失去了限流的意义。
为了防止这种情况,ratelimit 就引入了最大松弛量 (maxSlack) 的概念, 该值为负值,表示允许抵消的最长时间,防止以上情况的出现。
1
2
3
|
if t.sleepFor < t.maxSlack {
t.sleepFor = t.maxSlack
}
|
ratelimit 中 maxSlack 的值为 10 * time.Second / time.Duration(rate)
, 是十个请求的间隔大小。我们也可以理解为 ratelimit 允许的最大瞬时请求为 10。
高级用法
ratelimit 的 New 函数,除了可以配置每秒请求数 (QPS), 其实还提供了一套可选配置项 Option。
1
|
func New(rate int, opts ...Option) Limiter
|
Option 的类型为 type Option func(l *limiter)
, 也就是说我们可以提供一些这样类型的函数,作为 Option,传给 ratelimit, 定制相关需求。
但实际上,自定义 Option 的用处比较小,因为 limiter 结构体本身就是个私有类型,我们并不能拿它做任何事情。
我们只需要了解 ratelimit 目前提供的两个配置项即可:
WithoutSlack
我们上文讲到 ratelimit 中引入了最大松弛量的概念,而且默认的最大松弛量为 10 个请求的间隔时间。
但是确实会有这样需求场景,需要严格的限制请求的固定间隔。那么我们就可以利用 WithoutSlack 来取消松弛量的影响。
1
2
|
limiter := ratelimit.New(100, ratelimit.WithoutSlack)
WithClock(clock Clock)
|
我们上文讲到,ratelimit 的实现时,会计算当前时间与上次请求时间的差值,并 sleep 相应时间。
在 ratelimit 基于 go 标准库的 time 实现时间相关计算。如果有精度更高或者特殊需求的计时场景,可以用 WithClock 来替换默认时钟。
通过该方法,只要实现了 Clock 的 interface,就可以自定义时钟了。
1
2
3
4
5
6
|
type Clock interface {
Now() time.Time
Sleep(time.Duration)
}
clock &= MyClock{}
limiter := ratelimit.New(100, ratelimit.WithClock(clock))
|
缺点
漏斗桶/令牌桶确实能够保护系统不被拖垮, 但不管漏斗桶还是令牌桶, 其防护思路都是设定一个指标, 当超过该指标后就阻止或减少流量的继续进入,当系统负载降低到某一水平后则恢复流量的进入。但其通常都是被动的,其实际效果取决于限流阈值设置是否合理,但往往设置合理不是一件容易的事情。
- 集群增加机器或者减少机器限流阈值是否要重新设置?
- 设置限流阈值的依据是什么?
- 人力运维成本是否过高?
- 当调用方反馈429时, 这个时候重新设置限流, 其实流量高峰已经过了重新评估限流是否有意义?
这些其实都是采用漏斗桶/令牌桶的缺点, 总体来说就是太被动, 不能快速适应流量变化。
参考
https://www.jianshu.com/p/4ce68a31a71d
https://www.jianshu.com/p/1ecb513f7632
https://blog.csdn.net/u010066807/article/details/79961957
https://segmentfault.com/a/1190000015347065
https://www.jianshu.com/p/a59c13e70582
https://blog.biezhi.me/2018/10/rate-limit-algorithm.html
time/rate 限流源码解析
Golang 标准库限流器 time/rate 实现剖析
uber-go 漏桶限流器使用与原理分析
Golang rate无法延迟重排的BUG
Go可用性(四) 漏桶算法