超时控制
超时控制,我们的组件能够快速失效(fail fast),因为我们不希望等到断开的实例直到超时。没有什么比挂起的请求和无响应的界面更令人失望。这不仅浪费资源,而且还会让用户体验变得更差。我们的服务是互相调用的,所以在这些延迟叠加前,应该特别注意防止那些超时的操作。
- 网路传递具有不确定性。
- 客户端和服务端不一致的超时策略导致资源浪费。
- “默认值”策略。
- 高延迟服务导致 client 浪费资源等待,使用超时传递: 进程间传递 + 跨进程传递。
超时控制是微服务可用性的第一道关,良好的超时策略,可以尽可能让服务不堆积请求,尽快清空高延迟的请求,释放 Goroutine。

超时定义
实际业务开发中,我们依赖的微服务的超时策略并不清楚,或者随着业务迭代耗时超生了变化,意外的导致依赖者出现了超时。
-
服务提供者定义好 latency SLO,更新到 gRPC Proto 定义中,服务后续迭代,都应保证 SLO。
避免出现意外的默认超时策略,或者意外的配置超时策略。
-
kit 基础库兜底默认超时,比如 100ms,进行配置防御保护,避免出现类似 60s 之类的超大超时策略。
-
配置中心公共模版,对于未配置的服务使用公共配置。
1
2
3
4
5
6
7
8
|
package google.example.library.v1;
service LibraryService {
// Lagency SLO: 95th in 100ms, 99th in 150ms.
rpc CreateBook(CreateBookRequest) returns (Book);
rpc GetBook(GetBookRequest) returns Book);
rpc ListBooks(ListBooksRequest) returns (ListBooksResponse);
}
|
内部超时传递
超时传递: 当上游服务已经超时返回 504,但下游服务仍然在执行,会导致浪费资源做无用功。超时传递指的是把当前服务的剩余 Quota 传递到下游服务中,继承超时策略,控制请求级别的全局超时控制。
进程内超时控制
一个请求在每个阶段(网络请求)开始前,就要检查是否还有足够的剩余来处理请求,以及继承他的超时策略,使用 Go 标准库的 context.WithTimeout。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
func (c *asiiConn) Get(ctx context.Context, key string) (result*Item, err error) {
c.conn.SetWriteDeadline(shrinkDeadline(ctx, c.writeTimeout))
if _, err = fmt.Fprintf(c.rw, "gets %s\r\n", key); err != nil {
func shrinkDeadline(ctx context.Context, timeout time.Duration) time.Time {
var timeoutTime = time.Now().Add(timeout)
if ctx == nil {
return timeoutTime
}
if deadline, ok := ctx.Deadline(); ok && timeoutTime.After(deadline) {
return deadline
}
return timeoutTime
}
func TestShrinkDeadline(t *testing.T) {
t.Run("test not deadline", func(t *testing.T) {
timeout := time.Second
timeoutTime := time.Now().Add(timeout)
tm := shrinkDeadline(context.Background(), timeout)
assert.True(t, tm.After(timeoutTime))
})
t.Run("test big deadline", func(t *testing.T) {
timeout := time.Second
timeoutTime := time.Now().Add(timeout)
deadlineTime := time.Now().Add(2 * time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
tm := shrinkDeadline(ctx, timeout)
assert.True(t, tm.After(timeoutTime) && tm.Before(deadlineTime))
})
t.Run("test small deadline", func(t *testing.T) {
timeout := time.Second
deadlineTime := time.Now().Add(500 * time.Millisecond)
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
tm := shrinkDeadline(ctx, timeout)
assert.True(t, tm.After(deadlineTime) && tm.Before(time.Now().Add(timeout)))
})
}
|

GRPC超时传递
- A gRPC 请求 B,1s超时。
- B 使用了300ms 处理请求,再转发请求 C。
- C 配置了600ms 超时,但是实际只用了500ms。
- 到其他的下游,发现余量不足,取消传递。
在需要强制执行时,下游的服务可以覆盖上游的超时传递和配额。
在 gRPC 框架中,会依赖 gRPC Metadata Exchange,基于 HTTP2 的 Headers 传递 grpc-timeout 字段,自动传递到下游,构建带 timeout 的 context。

耗时分布
- 双峰分布: 95%的请求耗时在100ms内,5%的请求可能永远不会完成(长超时)。
- 对于监控不要只看mean,可以看看耗时分布统计,比如 95th,99th。
- 设置合理的超时,拒绝超长请求,或者当Server 不可用要主动失败。
超时决定着服务线程耗尽。

超时 - Case Stduy
- SLB 入口 Nginx 没配置超时导致连锁故障。
- 服务依赖的 DB 连接池漏配超时,导致请求阻塞,最终服务集体 OOM。
- 下游服务发版耗时增加,而上游服务配置超时过短,导致上游请求失败。
源码分析
我们发现,不仅是 Go gRPC 服务之间超时可以传递(如果你拿到上游的 ctx 继续往下透传的话)。Go 和 Java 服务之间,超时也会随着调用链传递。那么 gRPC 的超时是如何做到跨进程跨语言传递的?
有朋友可能想到了 metadata,是否 gRPC 请求链上游设置了超时后,gRPC 框架底层将过期时间放在 metadata 里了?遗憾的是我们打印 metadata 后发现并未发现 timeout 字段踪迹。那么超时时间到底是怎样传递的呢?以 grpc-go 源码为例,我们来找下线索。
我们知道 gRPC 基于 HTTP2,HTTP2 传输的最小单位是 Frame(帧)。HTTP2 的帧包含很多类型:DATA Frame、HEADERS Frame、PRIORITY Frame、RST_STREAM Frame、CONTINUATON Frame 等。一个 HTTP2 请求/响应可以被拆成多个帧并行发送,每一帧都有一个 StreamID 来标记属于哪个 Stream。服务端收到 Frame 后,根据 StreamID 组装出原始请求数据。

对于 gRPC 而言,Data Frame 用来存放请求的 response payload;Headers Frame 可用来存放一些需要进行跨进程传递的数据,比如 grpc-status(RPC 请求状态码) 、 :path(RPC 完整路径) 等。那么超时时间是否也通过 HEADERS Frame 传递呢?
客户端设置 timeout
我们知道,用户定义好 protobuf 并通过 protoc 生成桩代码后,桩代码中已经包含了 gRPCCient 接口的实现,每一个在 protobuf 中定义的 RPC,底层都会通过 ClientConn. Invoke 向服务端发起调用:
比如对于这样的 protobuf:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
syntax = "proto3";
package proto;
service DemoService {
rpc SayHi(HiRequest) returns (HiResponse);
}
message HiRequest {
string name = 1;
}
message HiResponse {
string message = 1;
}
|
生成的桩代码中已经包含了 Client 实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
type DemoServiceClient interface {
SayHiOK(ctx context.Context, in *HiRequest, opts ...grpc.CallOption) (*HiResponse, error)
}
type demoServiceClient struct {
cc *grpc.ClientConn
}
func NewDemoServiceClient(cc *grpc.ClientConn) DemoServiceClient {
return &demoServiceClient{cc}
}
func (c *demoServiceClient) SayHiOK(ctx context.Context, in *HiRequest, opts ...grpc.CallOption) (*HiResponse, error) {
out := new(HiResponse)
// 调用 grpc.ClientConn.Invoke() 函数,grpc.ClientConn.Invoke() 内部最终会调用 invoke() 函数
err := c.cc.Invoke(ctx, "/proto.DemoService/SayHi", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
|
客户端发起 gRPC 请求时,最终会调用 invoke() 方法,invoke() 源码大概如下:
1
2
3
4
5
6
7
8
9
10
11
12
|
func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
// 构造 clientStream
cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)
if err != nil {
return err
}
// 发送 RPC 请求
if err := cs.SendMsg(req); err != nil {
return err
}
return cs.RecvMsg(reply)
}
|
我们看下 newClientStream 源码,newClientStream 源码比较复杂,我们挑重点看:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
......
// 前面代码都不关注,这里生成clientStream对象,关注其ctx字段.
cs := &clientStream{
callHdr: callHdr,
ctx: ctx,
methodConfig: &mc,
opts: opts,
callInfo: c,
cc: cc,
desc: desc,
codec: c.codec,
cp: cp,
comp: comp,
cancel: cancel,
beginTime: beginTime,
firstAttempt: true,
}
if !cc.dopts.disableRetry {
cs.retryThrottler = cc.retryThrottler.Load().(*retryThrottler)
}
cs.binlog = binarylog.GetMethodLogger(method)
// 在newAttemptLocked中会通过负载均衡算法来选择Ready状态的连接.
// Only this initial attempt has stats/tracing.
// TODO(dfawley): move to newAttempt when per-attempt stats are implemented.
// 构造新的 *csAttempt,newAttemptLocked 内部会获取 grpc.ClientTransport 并赋值给 *csAttemp.t
if err := cs.newAttemptLocked(sh, trInfo); err != nil {
cs.finish(err)
return nil, err
}
// 在withRetry中最终会调用op函数,该函数会调用到csAttempt.newStream方法.
op := func(a *csAttempt) error { return a.newStream() }
if err := cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) }); err != nil {
cs.finish(err)
return nil, err
}
......
// 下面的也不用关注了.
return cs, nil
|
其中 csAttempt.newStream 实现如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
type csAttempt struct {
cs *clientStream
t transport.ClientTransport // 客户端 Transport
s *transport.Stream // 真正处理RPC 的 Stream
...
}
func (a *csAttempt) newStream() error {
cs := a.cs
cs.callHdr.PreviousAttempts = cs.numRetries
// 这里会调用NewStream方法,t指向的是http2Client对象,cs是指向clientStream对象的.
s, err := a.t.NewStream(cs.ctx, cs.callHdr)
if err != nil {
if _, ok := err.(transport.PerformedIOError); ok {
// Return without converting to an RPC error so retry code can
// inspect.
return err
}
return toRPCErr(err)
}
cs.attempt.s = s
cs.attempt.p = &parser{r: s}
return nil
}
|
transport.ClientTransport 是一个接口,gRPC 内部 internal/transport.http2Client 实现了此接口。
http2Client.NewStream() 源码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) {
ctx = peer.NewContext(ctx, t.getPeer())
// 重点关注createHeaderFields,这个方法会处理HEADERS的数据,来传播表头数据.
headerFields, err := t.createHeaderFields(ctx, callHdr)
...
hdr := &headerFrame{
hf: headerFields,
endStream: false,
...
}
...
for {
success, err := t.controlBuf.executeAndPut(func(it interface{}) bool {
if !checkForStreamQuota(it) {
return false
}
if !checkForHeaderListSize(it) {
return false
}
return true
}, hdr)
...
return s, nil
}
|
createHeaderFields 实现如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
|
func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr) ([]hpack.HeaderField, error) {
// 主要处理HEADERS头部数据.
......
// 支持的字段.
headerFields := make([]hpack.HeaderField, 0, hfLen)
headerFields = append(headerFields, hpack.HeaderField{Name: ":method", Value: "POST"})
headerFields = append(headerFields, hpack.HeaderField{Name: ":scheme", Value: t.scheme})
headerFields = append(headerFields, hpack.HeaderField{Name: ":path", Value: callHdr.Method})
headerFields = append(headerFields, hpack.HeaderField{Name: ":authority", Value: callHdr.Host})
headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: grpcutil.ContentType(callHdr.ContentSubtype)})
headerFields = append(headerFields, hpack.HeaderField{Name: "user-agent", Value: t.userAgent})
headerFields = append(headerFields, hpack.HeaderField{Name: "te", Value: "trailers"})
if callHdr.PreviousAttempts > 0 {
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-previous-rpc-attempts", Value: strconv.Itoa(callHdr.PreviousAttempts)})
}
if callHdr.SendCompress != "" {
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: callHdr.SendCompress})
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-accept-encoding", Value: callHdr.SendCompress})
}
// 重点在这里,获取ctx的deadline时间,然后放入头部中的grpc-timeout字段中,客户端就是利用这个来吧超时时间传递到服务端的.
// 如果透传过来的 ctx 被设置了 timeout/deadline,则在 HTTP2 headers frame 中添加 grpc-timeout 字段,
// grpc-timeout 字段值被转化成 XhYmZs 字符串形式的超时时间
if dl, ok := ctx.Deadline(); ok {
// Send out timeout regardless its value. The server can detect timeout context by itself.
// TODO(mmukhi): Perhaps this field should be updated when actually writing out to the wire.
timeout := time.Until(dl)
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-timeout", Value: grpcutil.EncodeDuration(timeout)})
}
for k, v := range authData {
headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
}
for k, v := range callAuthData {
headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
}
if b := stats.OutgoingTags(ctx); b != nil {
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-tags-bin", Value: encodeBinHeader(b)})
}
if b := stats.OutgoingTrace(ctx); b != nil {
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-trace-bin", Value: encodeBinHeader(b)})
}
if md, added, ok := metadata.FromOutgoingContextRaw(ctx); ok {
var k string
for k, vv := range md {
// HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set.
if isReservedHeader(k) {
continue
}
for _, v := range vv {
headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
}
}
for _, vv := range added {
for i, v := range vv {
if i%2 == 0 {
k = strings.ToLower(v)
continue
}
// HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set.
if isReservedHeader(k) {
continue
}
headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
}
}
}
if md, ok := t.md.(*metadata.MD); ok {
for k, vv := range *md {
if isReservedHeader(k) {
continue
}
for _, v := range vv {
headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
}
}
}
return headerFields, nil
}
|
总结:
- 在调用rpc接口时设置了超时的context,会最终传递到http2Client对象中,然后在处理HEADERS时会把deadline时间设置到头部参数grpc-timeout中,传递到服务端.
- http2Client对象是在调用Dial或DialContext过程中生成的,每个Endpoint对应一个该对象.
服务端解析 timeout
服务端通过 Serve 方法启动 grpc Server,监听来自客户端连接。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
func (s *Server) Serve(lis net.Listener) error {
...
for {
// 接收客户端的连接
rawConn, err := lis.Accept()
...
s.serveWG.Add(1)
go func() {
// 对每一个客户端的连接单独开一个协程来处理
s.handleRawConn(rawConn)
s.serveWG.Done()
}()
}
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
func (s*Server) handleRawConn(rawConn net.Conn) {
...
// 构造 HTTP2 Transport
st := s.newHTTP2Transport(conn, authInfo)
go func() {
// 处理 HTTP2 Stream
s.serveStreams(st)
s.removeConn(st)
}()
}
func (s *Server) serveStreams(st transport.ServerTransport) {
defer st.Close()
var wg sync.WaitGroup
// http2Server 实现了 transport.ServerTransport 接口,此处会调用 http2Server.HandleSteams方法
// st.HandleStreams 方法签名中第一个参数 handle func(stream*transport.Stream) {}为函数类型,
// handle 随后会在 operateHeaders 中被调用
st.HandleStreams(func(stream *transport.Stream) {
wg.Add(1)
go func() {
defer wg.Done()
// 解析出 gPRC Service, gRPC method, gRPC request message,执行注册到 gRPC.Server 中的 RPC 方法
s.handleStream(st, stream, s.traceInfo(st, stream))
}()
}, ...)
wg.Wait()
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
|
// http2Server.HandleStreams 会调用传入的 handle 处理 HTTP2 Stream
func (t*http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
defer close(t.readerDone)
for {
// 该函数主要接收客户端发送过来的数据,重点关注处理HEADERS部分的数据.
t.controlBuf.throttle()
frame, err := t.framer.fr.ReadFrame()
...
switch frame := frame.(type) {
// 如果是 Headers 帧,则调用 operateHeaders 方法处理 Headers
case*http2.MetaHeadersFrame:
if t.operateHeaders(frame, handle, traceCtx) {
t.Close()
break
}
// 如果是 Data 帧,则调用 handleData 方法处理
case *http2.DataFrame:
t.handleData(frame)
...
}
}
}
// operateHeaders 解析 Headers 帧
func (t *http2Server) operateHeaders(frame*http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool) {
// 从HTTP2 Headers 帧中获取 StreamID
streamID := frame.Header().StreamID
state := &decodeState{
serverSide: true,
}
// 从HTTP2 Headers 帧中解析出Header。如果其中包含 grpc-timeout HEADER,
// 则解析出其值并赋值给 state.data.timeout,并将 state.data.timeoutSet 设成 true
if err := state.decodeHeader(frame); err != nil {
if se, ok := status.FromError(err); ok {
...
}
buf := newRecvBuffer()
// 构造 HTTP2 Stream
s := &Stream{
id: streamID,
st: t,
buf: buf,
fc: &inFlow{limit: uint32(t.initialWindowSize)},
recvCompress: state.data.encoding,
method: state.data.method,
contentSubtype: state.data.contentSubtype,
}
...
// 如果 state.data.timeoutSet 为 true,则构造一个新的带 timeout 的 ctx 覆盖原 s.ctx
// s.ctx 最终会透传到用户实现的 gRPC Handler 中,参与业务逻辑处理
// 见 server.go 中 processUnaryRPC 内:
// ctx := NewContextWithServerTransportStream(stream.Context(), stream)
// reply, appErr := md.Handler(srv.server, ctx, df, s.opts.unaryInt)
// 此处不再赘述
if state.data.timeoutSet {
s.ctx, s.cancel = context.WithTimeout(t.ctx, state.data.timeout)
} else {
s.ctx, s.cancel = context.WithCancel(t.ctx)
}
...
t.controlBuf.put(®isterStream{
streamID: s.id,
wq: s.wq,
})
// 调用 serveStreams 定义好的 handle,执行gRPC调用
handle(s)
return false
}
|
decodeHeader 会遍历 frame 中所有 Fields,并调用 processHeaderField 对 HTTP2 HEADERS 帧中的特定的 Field 进行处理。
- 比如可以从 :path 中解析出包含 protobuf package、service name 和 RPC method name 的完整路径;
- 比如可以从 grpc-timeout 中解析出上游传递过来的 timeout;
decodeHeader 内部实现如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
func (d *decodeState) decodeHeader(frame*http2.MetaHeadersFrame) error {
...
// 遍历Headers帧,解析Field
for _, hf := range frame.Fields {
d.processHeaderField(hf)
}
}
func (d *decodeState) processHeaderField(f hpack.HeaderField) {
switch f.Name {
...
// 解析出 grpc-timeout
case "grpc-timeout":
d.data.timeoutSet = true
var err error
if d.data.timeout, err = decodeTimeout(f.Value); err != nil {
d.data.grpcErr = status.Errorf(codes.Internal, "transport: malformed time-out: %v", err)
}
...
// 解析出 grpc 带 protobuf package path、Service name、RPC method name 的完整路径
// 形如 /package.service/method
case ":path":
d.data.method = f.Value
}
}
|
至此可以看到,gRPC 框架确实是通过 HTTP2 HEADERS Frame 中的 grpc-timeout 字段来实现跨进程传递超时时间。
总结
- 客户端客户端发起 RPC 调用时传入了带 timeout 的 ctx
- gRPC 框架底层通过 HTTP2 协议发送 RPC 请求时,将 timeout 值写入到 grpc-timeout HEADERS Frame 中
- 服务端接收 RPC 请求时,gRPC 框架底层解析 HTTP2 HEADERS 帧,读取 grpc-timeout 值,并覆盖透传到实际处理 RPC 请求的业务 gPRC Handle 中
- 如果此时服务端又发起对其他 gRPC 服务的调用,且使用的是透传的 ctx,这个 timeout 会减去在本进程中耗时,从而导致这个 timeout 传递到下一个 gRPC 服务端时变短,这样即实现了所谓的 超时传递 。
具体使用
建立连接时超时控制
客户端建立连接时,使用的Dial()函数,它位于
google.golang.org/grpc/clientconn.go 中,我们看看这个函数内容:
1
2
3
|
func Dial(target string, opts ...DialOption) (*ClientConn, error) {
return DialContext(context.Background(), target, opts...)
}
|
它里面调用的 DialContext() 函数,这个函数非常长,他们在同一个文件中,它是实际执行的函数,这里面就有context的timeout和Done相关操作。你也可以到google.golang.org/grpc/clientconn.go文件中去看看这个函数DialContext具体是干嘛的。
使用的时候传入设置timeout的context,如下:
1
2
3
|
ctx, cancel := context.Timeout(context.Bakcground(), time.Second*5)
defer cancel()
conn, err := grpc.DialContext(ctx, address, grpc.WithBlock(), grpc.WithInsecure())
|
- grpc.WithInsecure()
gRPC是建立在HTTP/2上的,所以对TLS提供了很好的支持。如果在客户端建立连接过程中设置 grpc.WithInsecure() 就可以跳过对服务器证书的验证。
- grpc.WithBlock()
这个参数会阻塞等待握手成功。
因为用Dial连接时是异步连接,连接状态为正在连接,如果设置了这个参数就是同步连接,会阻塞等待握手成功。
这个还和超时设置有关,如果你没有设置这个参数,那么context超时控制将会失效。
调用Dial建立连接默认只是返回一个ClientConn的指针,相当于new了一个ClientConn 把指针返回给你。并不是一定要建立真实的h2连接.至于真实的连接建立实际上是一个异步的过程。当然了如果你想等真实的链接完全建立再返回ClientConn可以通过WithBlock传入Options来实现,当然了这样的话链接如果建立不成功就会一直阻塞直到Contex超时。
调用时超时
函数的调用超时控制
1
2
3
|
ctx, cancel := context.WithTimeout(context.TODO(), time.Second*5)
defer cancel()
result, err := c.SayHello(ctx, &pb.HelloRequest{Name: name})
|
举例
Client
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
func main() {
...
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Duration(5 * time.Second)))
defer cancel()
client := pb.NewSearchServiceClient(conn)
resp, err := client.Search(ctx, &pb.SearchRequest{
Request: "gRPC",
})
if err != nil {
statusErr, ok := status.FromError(err)
if ok {
if statusErr.Code() == codes.DeadlineExceeded {
log.Fatalln("client.Search err: deadline")
}
}
log.Fatalf("client.Search err: %v", err)
}
log.Printf("resp: %s", resp.GetResponse())
}
|
context.WithDeadline:会返回最终上下文截止时间。第一个形参为父上下文,第二个形参为调整的截止时间。若父级时间早于子级时间,则以父级时间为准,否则以子级时间为最终截止时间
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) {
if cur, ok := parent.Deadline(); ok && cur.Before(d) {
// The current deadline is already sooner than the new one.
return WithCancel(parent)
}
c := &timerCtx{
cancelCtx: newCancelCtx(parent),
deadline: d,
}
propagateCancel(parent, c)
dur := time.Until(d)
if dur <= 0 {
c.cancel(true, DeadlineExceeded) // deadline has already passed
return c, func() { c.cancel(true, Canceled) }
}
c.mu.Lock()
defer c.mu.Unlock()
if c.err == nil {
c.timer = time.AfterFunc(dur, func() {
c.cancel(true, DeadlineExceeded)
})
}
return c, func() { c.cancel(true, Canceled) }
}
|
context.WithTimeout:很常见的另外一个方法,是便捷操作。实际上是对于 WithDeadline 的封装
1
2
3
|
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
return WithDeadline(parent, time.Now().Add(timeout))
}
|
status.FromError:返回 GRPCStatus 的具体错误码,若为非法,则直接返回 codes.Unknown
Server
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
type SearchService struct{}
func (s *SearchService) Search(ctx context.Context, r *pb.SearchRequest) (*pb.SearchResponse, error) {
for i := 0; i < 5; i++ {
if ctx.Err() == context.Canceled {
return nil, status.Errorf(codes.Canceled, "SearchService.Search canceled")
}
time.Sleep(1 * time.Second)
}
return &pb.SearchResponse{Response: r.GetRequest() + " Server"}, nil
}
func main() {
...
}
|
而在 Server 端,由于 Client 已经设置了截止时间。Server 势必要去检测它
否则如果 Client 已经结束掉了,Server 还傻傻的在那执行,这对资源是一种极大的浪费
因此在这里需要用 ctx.Err() == context.Canceled 进行判断,为了模拟场景我们加了循环和睡眠.
参考
gRPC 系列——grpc超时传递原理
Golang gRPC学习(04): Deadlines超时限制
「连载九」gRPC Deadlines