限流

限流实现

gokit 基于go包 golang.org/x/time/rate 内置了一种实现.

本次实现基于gokit内建的类型endpoint.Middleware,该类型实际上是一个function,使用装饰者模式实现对Endpoint的封装。定义如下:

1
2
// Middleware is a chainable behavior modifier for endpoints.
type Middleware func(Endpoint) Endpoint

新建go文件命名为instrument.go,实现限流方法:参数为令牌桶(bkt)返回endpoint.Middleware。使用令牌桶的TakeAvaiable方法获取令牌,若获取成功则继续执行,若获取失败则返回异常(即限流)。

在instrument.go中添加方法NewTokenBucketLimitterWithBuildIn,在其中使用x/time/rate实现限流方法:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package main

import (
	"context"
	"errors"
	"github.com/go-kit/kit/endpoint"
	"golang.org/x/time/rate"
)

var ErrLimitExceed = errors.New("Rate limit exceed!")


// NewTokenBucketLimitterWithBuildIn 使用x/time/rate创建限流中间件
func NewTokenBucketLimiter(bkt *rate.Limiter) endpoint.Middleware {
	return func(next endpoint.Endpoint) endpoint.Endpoint {
		return func(ctx context.Context, request interface{}) (response interface{}, err error) {
			if !bkt.Allow() {
				return nil, ErrLimitExceed
			}
			return next(ctx, request)
		}
	}
}

接下来就是使用golang.org/x/time/rate创建令牌桶(每秒刷新一次,容量为3),然后调用限流方法对Endpoint进行装饰。在main方法中增加如下代码。

1
2
3
//add ratelimit,refill every second,set capacity 3
rateBucket := rate.NewLimiter(rate.Every(time.Second*1), 3)
endpoint = NewTokenBucketLimiter(rateBucket)(endpoint)

也可以直接调用gokit提供的方法:

1
2
3
4
5
6
import	kitratelim "github.com/go-kit/kit/ratelimit"

//add ratelimit,refill every second,set capacity 3
rateBucket := rate.NewLimiter(rate.Every(time.Second*1), 3)
endpoint = kitratelim.NewErroringLimiter(rateBucket)(endpoint)
	

熔断

go-kit 提供了三种熔断

  1. gobreaker

  2. handy

  3. hystrix-go

hystrix在java中用的比较多,我们来介绍下go-kit中hystrix的使用方法

go-kit的hystrix

Middleware的实现

  1. Hystrix返回Middleware 此中间件会在原来的endPoint包一层Hystrix的endPoint

  2. hystrix通过传入的commanName获取对应的Hystrix的设置,并设置run失败时运行的fallback函数为nil

客户端hystrix配置

  • Timeout: 执行command的超时时间。默认时间是1000毫秒
  • MaxConcurrentRequests:command的最大并发量 默认值是10
  • SleepWindow:当熔断器被打开后,SleepWindow的时间就是控制过多久后去尝试服务是否可用了。默认值是5000毫秒
  • RequestVolumeThreshold: 一个统计窗口10秒内请求数量。达到这个请求数量后才去判断是否要开启熔断。默认值是20
  • ErrorPercentThreshold:错误百分比,请求数量大于等于RequestVolumeThreshold并且错误率到达这个百分比后就会启动熔断 默认值是50

RequestVolumeThreshold与ErrorPercentThreshold通常组合使用

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15

import	"github.com/go-kit/kit/circuitbreaker"

commandName := "my-endpoint"
hystrix.ConfigureCommand(commandName, hystrix.CommandConfig{
	Timeout:                1000 * 30,
	ErrorPercentThreshold:  1,
	SleepWindow:            10000,
	MaxConcurrentRequests:  1000,
	RequestVolumeThreshold: 5,
})
breakerMw := circuitbreaker.Hystrix(commandName)
....
//增加熔断中间件
reqEndPoint = breakerMw(reqEndPoint)

增加熔断中间件的包装,circuitbreaker.Hystrix函数由gokit提供

1
2
3
breakerMw := circuitbreaker.Hystrix(commandName)
//增加熔断中间件  
reqEndPoint = breakerMw(reqEndPoint)

circuitbreaker.Hystrix源码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
// Hystrix returns an endpoint.Middleware that implements the circuit
// breaker pattern using the afex/hystrix-go package.
//
// When using this circuit breaker, please configure your commands separately.
//
// See https://godoc.org/github.com/afex/hystrix-go/hystrix for more
// information.
func Hystrix(commandName string) endpoint.Middleware {
	return func(next endpoint.Endpoint) endpoint.Endpoint {
		return func(ctx context.Context, request interface{}) (response interface{}, err error) {
			var resp interface{}
			if err := hystrix.Do(commandName, func() (err error) {
				resp, err = next(ctx, request)
				return err
			}, nil); err != nil {
				return nil, err
			}
			return resp, nil
		}
	}
}

client代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
func main() {
	commandName := "my-endpoint"
	hystrix.ConfigureCommand(commandName, hystrix.CommandConfig{
		Timeout:                1000 * 30,
		ErrorPercentThreshold:  1,
		SleepWindow:            10000,
		MaxConcurrentRequests:  1000,
		RequestVolumeThreshold: 5,
	})
	breakerMw := circuitbreaker.Hystrix(commandName)
	//增加熔断中间件
	reqEndPoint = breakerMw(reqEndPoint)
	//现在我们可以通过 endPoint 发起请求了
	req := struct{}{}
	for i := 1; i <= 20; i++ {
		if _, err = reqEndPoint(ctx, req); err != nil {
			fmt.Println("当前时间: ", time.Now().Format("2006-01-02 15:04:05.99"))
			fmt.Println(err)
			time.Sleep(1 * time.Second)
		}
	}
}

错误处理

当client连接server被限流策略拒绝后,会收到"Rate limit exceed!",这个信息是我们在限流中间件返回的error包含的信息,为什么Gokit自动给client返回这个信息呢?这和Gokit的错误处理有关.

异常处理由kithttp.ServerOption决定,ServerOption是一个切片类型,包含错误处理,日志处理,链路跟踪等配置信息

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
import	kithttp "github.com/go-kit/kit/transport/http"

options := []kithttp.ServerOption{
	kithttp.ServerErrorHandler(transport.NewLogErrorHandler(logger)),
	kithttp.ServerErrorEncoder(kithttp.DefaultErrorEncoder), //错误处理
	zipkinServer,
}

http.Handle("/calculate", kithttp.NewServer(
	endpoint,
	decodeArithmeticRequest,
	encodeArithmeticResponse,
	options...,
))

下面这行代码设定了错误处理规则

1
kithttp.ServerErrorEncoder(kithttp.DefaultErrorEncoder), //错误处理

默认错误处理规则如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// DefaultErrorEncoder writes the error to the ResponseWriter, by default a
// content type of text/plain, a body of the plain text of the error, and a
// status code of 500. If the error implements Headerer, the provided headers
// will be applied to the response. If the error implements json.Marshaler, and
// the marshaling succeeds, a content type of application/json and the JSON
// encoded form of the error will be used. If the error implements StatusCoder,
// the provided StatusCode will be used instead of 500.
func DefaultErrorEncoder(_ context.Context, err error, w http.ResponseWriter) {
	contentType, body := "text/plain; charset=utf-8", []byte(err.Error())
	if marshaler, ok := err.(json.Marshaler); ok {
		if jsonBody, marshalErr := marshaler.MarshalJSON(); marshalErr == nil {
			contentType, body = "application/json; charset=utf-8", jsonBody
		}
	}
	w.Header().Set("Content-Type", contentType)
	if headerer, ok := err.(Headerer); ok {
		for k, values := range headerer.Headers() {
			for _, v := range values {
				w.Header().Add(k, v)
			}
		}
	}
	code := http.StatusInternalServerError //500
	if sc, ok := err.(StatusCoder); ok {
		code = sc.StatusCode()
	}
	w.WriteHeader(code)
	w.Write(body)
}

如果有需要,我们也可以自己实现,不再赘述.

参考:https://juejin.im/post/5c6eb44d518825760d1ed8f8