限流
限流实现
gokit 基于go包 golang.org/x/time/rate 内置了一种实现.
本次实现基于gokit内建的类型endpoint.Middleware,该类型实际上是一个function,使用装饰者模式实现对Endpoint的封装。定义如下:
1
2
|
// Middleware is a chainable behavior modifier for endpoints.
type Middleware func(Endpoint) Endpoint
|
新建go文件命名为instrument.go,实现限流方法:参数为令牌桶(bkt)返回endpoint.Middleware。使用令牌桶的TakeAvaiable方法获取令牌,若获取成功则继续执行,若获取失败则返回异常(即限流)。
在instrument.go中添加方法NewTokenBucketLimitterWithBuildIn,在其中使用x/time/rate实现限流方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
package main
import (
"context"
"errors"
"github.com/go-kit/kit/endpoint"
"golang.org/x/time/rate"
)
var ErrLimitExceed = errors.New("Rate limit exceed!")
// NewTokenBucketLimitterWithBuildIn 使用x/time/rate创建限流中间件
func NewTokenBucketLimiter(bkt *rate.Limiter) endpoint.Middleware {
return func(next endpoint.Endpoint) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (response interface{}, err error) {
if !bkt.Allow() {
return nil, ErrLimitExceed
}
return next(ctx, request)
}
}
}
|
接下来就是使用golang.org/x/time/rate
创建令牌桶(每秒刷新一次,容量为3),然后调用限流方法对Endpoint进行装饰。在main方法中增加如下代码。
1
2
3
|
//add ratelimit,refill every second,set capacity 3
rateBucket := rate.NewLimiter(rate.Every(time.Second*1), 3)
endpoint = NewTokenBucketLimiter(rateBucket)(endpoint)
|
也可以直接调用gokit提供的方法:
1
2
3
4
5
6
|
import kitratelim "github.com/go-kit/kit/ratelimit"
//add ratelimit,refill every second,set capacity 3
rateBucket := rate.NewLimiter(rate.Every(time.Second*1), 3)
endpoint = kitratelim.NewErroringLimiter(rateBucket)(endpoint)
|
熔断
go-kit 提供了三种熔断
-
gobreaker
-
handy
-
hystrix-go
hystrix在java中用的比较多,我们来介绍下go-kit中hystrix的使用方法
go-kit的hystrix
Middleware的实现
-
Hystrix返回Middleware 此中间件会在原来的endPoint包一层Hystrix的endPoint
-
hystrix通过传入的commanName获取对应的Hystrix的设置,并设置run失败时运行的fallback函数为nil
客户端hystrix配置
- Timeout: 执行command的超时时间。默认时间是1000毫秒
- MaxConcurrentRequests:command的最大并发量 默认值是10
- SleepWindow:当熔断器被打开后,SleepWindow的时间就是控制过多久后去尝试服务是否可用了。默认值是5000毫秒
- RequestVolumeThreshold: 一个统计窗口10秒内请求数量。达到这个请求数量后才去判断是否要开启熔断。默认值是20
- ErrorPercentThreshold:错误百分比,请求数量大于等于RequestVolumeThreshold并且错误率到达这个百分比后就会启动熔断 默认值是50
RequestVolumeThreshold与ErrorPercentThreshold通常组合使用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
import "github.com/go-kit/kit/circuitbreaker"
commandName := "my-endpoint"
hystrix.ConfigureCommand(commandName, hystrix.CommandConfig{
Timeout: 1000 * 30,
ErrorPercentThreshold: 1,
SleepWindow: 10000,
MaxConcurrentRequests: 1000,
RequestVolumeThreshold: 5,
})
breakerMw := circuitbreaker.Hystrix(commandName)
....
//增加熔断中间件
reqEndPoint = breakerMw(reqEndPoint)
|
增加熔断中间件的包装,circuitbreaker.Hystrix函数由gokit提供
1
2
3
|
breakerMw := circuitbreaker.Hystrix(commandName)
//增加熔断中间件
reqEndPoint = breakerMw(reqEndPoint)
|
circuitbreaker.Hystrix源码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
// Hystrix returns an endpoint.Middleware that implements the circuit
// breaker pattern using the afex/hystrix-go package.
//
// When using this circuit breaker, please configure your commands separately.
//
// See https://godoc.org/github.com/afex/hystrix-go/hystrix for more
// information.
func Hystrix(commandName string) endpoint.Middleware {
return func(next endpoint.Endpoint) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (response interface{}, err error) {
var resp interface{}
if err := hystrix.Do(commandName, func() (err error) {
resp, err = next(ctx, request)
return err
}, nil); err != nil {
return nil, err
}
return resp, nil
}
}
}
|
client代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
func main() {
commandName := "my-endpoint"
hystrix.ConfigureCommand(commandName, hystrix.CommandConfig{
Timeout: 1000 * 30,
ErrorPercentThreshold: 1,
SleepWindow: 10000,
MaxConcurrentRequests: 1000,
RequestVolumeThreshold: 5,
})
breakerMw := circuitbreaker.Hystrix(commandName)
//增加熔断中间件
reqEndPoint = breakerMw(reqEndPoint)
//现在我们可以通过 endPoint 发起请求了
req := struct{}{}
for i := 1; i <= 20; i++ {
if _, err = reqEndPoint(ctx, req); err != nil {
fmt.Println("当前时间: ", time.Now().Format("2006-01-02 15:04:05.99"))
fmt.Println(err)
time.Sleep(1 * time.Second)
}
}
}
|
错误处理
当client连接server被限流策略拒绝后,会收到"Rate limit exceed!"
,这个信息是我们在限流中间件返回的error包含的信息,为什么Gokit自动给client返回这个信息呢?这和Gokit的错误处理有关.
异常处理由kithttp.ServerOption决定,ServerOption是一个切片类型,包含错误处理,日志处理,链路跟踪等配置信息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
import kithttp "github.com/go-kit/kit/transport/http"
options := []kithttp.ServerOption{
kithttp.ServerErrorHandler(transport.NewLogErrorHandler(logger)),
kithttp.ServerErrorEncoder(kithttp.DefaultErrorEncoder), //错误处理
zipkinServer,
}
http.Handle("/calculate", kithttp.NewServer(
endpoint,
decodeArithmeticRequest,
encodeArithmeticResponse,
options...,
))
|
下面这行代码设定了错误处理规则
1
|
kithttp.ServerErrorEncoder(kithttp.DefaultErrorEncoder), //错误处理
|
默认错误处理规则如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
// DefaultErrorEncoder writes the error to the ResponseWriter, by default a
// content type of text/plain, a body of the plain text of the error, and a
// status code of 500. If the error implements Headerer, the provided headers
// will be applied to the response. If the error implements json.Marshaler, and
// the marshaling succeeds, a content type of application/json and the JSON
// encoded form of the error will be used. If the error implements StatusCoder,
// the provided StatusCode will be used instead of 500.
func DefaultErrorEncoder(_ context.Context, err error, w http.ResponseWriter) {
contentType, body := "text/plain; charset=utf-8", []byte(err.Error())
if marshaler, ok := err.(json.Marshaler); ok {
if jsonBody, marshalErr := marshaler.MarshalJSON(); marshalErr == nil {
contentType, body = "application/json; charset=utf-8", jsonBody
}
}
w.Header().Set("Content-Type", contentType)
if headerer, ok := err.(Headerer); ok {
for k, values := range headerer.Headers() {
for _, v := range values {
w.Header().Add(k, v)
}
}
}
code := http.StatusInternalServerError //500
if sc, ok := err.(StatusCoder); ok {
code = sc.StatusCode()
}
w.WriteHeader(code)
w.Write(body)
}
|
如果有需要,我们也可以自己实现,不再赘述.
参考:https://juejin.im/post/5c6eb44d518825760d1ed8f8