超时控制

超时控制,我们的组件能够快速失效(fail fast),因为我们不希望等到断开的实例直到超时。没有什么比挂起的请求和无响应的界面更令人失望。这不仅浪费资源,而且还会让用户体验变得更差。我们的服务是互相调用的,所以在这些延迟叠加前,应该特别注意防止那些超时的操作。

  • 网路传递具有不确定性。
  • 客户端和服务端不一致的超时策略导致资源浪费。
  • “默认值”策略。
  • 高延迟服务导致 client 浪费资源等待,使用超时传递: 进程间传递 + 跨进程传递。

超时控制是微服务可用性的第一道关,良好的超时策略,可以尽可能让服务不堆积请求,尽快清空高延迟的请求,释放 Goroutine。

超时定义

实际业务开发中,我们依赖的微服务的超时策略并不清楚,或者随着业务迭代耗时超生了变化,意外的导致依赖者出现了超时。

  • 服务提供者定义好 latency SLO,更新到 gRPC Proto 定义中,服务后续迭代,都应保证 SLO。

    避免出现意外的默认超时策略,或者意外的配置超时策略。

  • kit 基础库兜底默认超时,比如 100ms,进行配置防御保护,避免出现类似 60s 之类的超大超时策略。

  • 配置中心公共模版,对于未配置的服务使用公共配置。

1
2
3
4
5
6
7
8
package google.example.library.v1;

service LibraryService {
    // Lagency SLO: 95th in 100ms, 99th in 150ms.
    rpc CreateBook(CreateBookRequest) returns (Book);
    rpc GetBook(GetBookRequest) returns Book);
    rpc ListBooks(ListBooksRequest) returns (ListBooksResponse);
}

内部超时传递

超时传递: 当上游服务已经超时返回 504,但下游服务仍然在执行,会导致浪费资源做无用功。超时传递指的是把当前服务的剩余 Quota 传递到下游服务中,继承超时策略,控制请求级别的全局超时控制。 进程内超时控制

一个请求在每个阶段(网络请求)开始前,就要检查是否还有足够的剩余来处理请求,以及继承他的超时策略,使用 Go 标准库的 context.WithTimeout。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
func (c *asiiConn) Get(ctx context.Context, key string) (result*Item, err error) {
	c.conn.SetWriteDeadline(shrinkDeadline(ctx, c.writeTimeout))
	if _, err = fmt.Fprintf(c.rw, "gets %s\r\n", key); err != nil {

func shrinkDeadline(ctx context.Context, timeout time.Duration) time.Time {
	var timeoutTime = time.Now().Add(timeout)
	if ctx == nil {
		return timeoutTime
	}
	if deadline, ok := ctx.Deadline(); ok && timeoutTime.After(deadline) {
		return deadline
	}
	return timeoutTime
}
func TestShrinkDeadline(t *testing.T) {
	t.Run("test not deadline", func(t *testing.T) {
		timeout := time.Second
		timeoutTime := time.Now().Add(timeout)
		tm := shrinkDeadline(context.Background(), timeout)
		assert.True(t, tm.After(timeoutTime))
	})
	t.Run("test big deadline", func(t *testing.T) {
		timeout := time.Second
		timeoutTime := time.Now().Add(timeout)
		deadlineTime := time.Now().Add(2 * time.Second)
		ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
		defer cancel()

		tm := shrinkDeadline(ctx, timeout)
		assert.True(t, tm.After(timeoutTime) && tm.Before(deadlineTime))
	})
	t.Run("test small deadline", func(t *testing.T) {
		timeout := time.Second
		deadlineTime := time.Now().Add(500 * time.Millisecond)
		ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
		defer cancel()

		tm := shrinkDeadline(ctx, timeout)
		assert.True(t, tm.After(deadlineTime) && tm.Before(time.Now().Add(timeout)))
	})
}

GRPC超时传递

  1. A gRPC 请求 B,1s超时。
  2. B 使用了300ms 处理请求,再转发请求 C。
  3. C 配置了600ms 超时,但是实际只用了500ms。
  4. 到其他的下游,发现余量不足,取消传递。

在需要强制执行时,下游的服务可以覆盖上游的超时传递和配额。

在 gRPC 框架中,会依赖 gRPC Metadata Exchange,基于 HTTP2 的 Headers 传递 grpc-timeout 字段,自动传递到下游,构建带 timeout 的 context。

耗时分布

  • 双峰分布: 95%的请求耗时在100ms内,5%的请求可能永远不会完成(长超时)。
  • 对于监控不要只看mean,可以看看耗时分布统计,比如 95th,99th。
  • 设置合理的超时,拒绝超长请求,或者当Server 不可用要主动失败。

超时决定着服务线程耗尽。

超时 - Case Stduy

  • SLB 入口 Nginx 没配置超时导致连锁故障。
  • 服务依赖的 DB 连接池漏配超时,导致请求阻塞,最终服务集体 OOM。
  • 下游服务发版耗时增加,而上游服务配置超时过短,导致上游请求失败。

源码分析

我们发现,不仅是 Go gRPC 服务之间超时可以传递(如果你拿到上游的 ctx 继续往下透传的话)。Go 和 Java 服务之间,超时也会随着调用链传递。那么 gRPC 的超时是如何做到跨进程跨语言传递的?

有朋友可能想到了 metadata,是否 gRPC 请求链上游设置了超时后,gRPC 框架底层将过期时间放在 metadata 里了?遗憾的是我们打印 metadata 后发现并未发现 timeout 字段踪迹。那么超时时间到底是怎样传递的呢?以 grpc-go 源码为例,我们来找下线索。

我们知道 gRPC 基于 HTTP2,HTTP2 传输的最小单位是 Frame(帧)。HTTP2 的帧包含很多类型:DATA Frame、HEADERS Frame、PRIORITY Frame、RST_STREAM Frame、CONTINUATON Frame 等。一个 HTTP2 请求/响应可以被拆成多个帧并行发送,每一帧都有一个 StreamID 来标记属于哪个 Stream。服务端收到 Frame 后,根据 StreamID 组装出原始请求数据。

对于 gRPC 而言,Data Frame 用来存放请求的 response payload;Headers Frame 可用来存放一些需要进行跨进程传递的数据,比如 grpc-status(RPC 请求状态码) 、 :path(RPC 完整路径) 等。那么超时时间是否也通过 HEADERS Frame 传递呢?

客户端设置 timeout

我们知道,用户定义好 protobuf 并通过 protoc 生成桩代码后,桩代码中已经包含了 gRPCCient 接口的实现,每一个在 protobuf 中定义的 RPC,底层都会通过 ClientConn. Invoke 向服务端发起调用:

比如对于这样的 protobuf:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
syntax = "proto3";

package proto;

service DemoService {
  rpc SayHi(HiRequest) returns (HiResponse);
}

message HiRequest {
  string name = 1;
}

message HiResponse {
  string message = 1;
}

生成的桩代码中已经包含了 Client 实现:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
type DemoServiceClient interface {
  SayHiOK(ctx context.Context, in *HiRequest, opts ...grpc.CallOption) (*HiResponse, error)
}

type demoServiceClient struct {
  cc *grpc.ClientConn
}

func NewDemoServiceClient(cc *grpc.ClientConn) DemoServiceClient {
  return &demoServiceClient{cc}
}

func (c *demoServiceClient) SayHiOK(ctx context.Context, in *HiRequest, opts ...grpc.CallOption) (*HiResponse, error) {
  out := new(HiResponse)
  // 调用 grpc.ClientConn.Invoke() 函数,grpc.ClientConn.Invoke() 内部最终会调用 invoke() 函数
  err := c.cc.Invoke(ctx, "/proto.DemoService/SayHi", in, out, opts...)
  if err != nil {
    return nil, err
  }
  return out, nil
}

客户端发起 gRPC 请求时,最终会调用 invoke() 方法,invoke() 源码大概如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
  // 构造 clientStream
  cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)
  if err != nil {
    return err
  }
  // 发送 RPC 请求
  if err := cs.SendMsg(req); err != nil {
    return err
  }
  return cs.RecvMsg(reply)
}

我们看下 newClientStream 源码,newClientStream 源码比较复杂,我们挑重点看:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
   ......
  // 前面代码都不关注,这里生成clientStream对象,关注其ctx字段.
	cs := &clientStream{
		callHdr:      callHdr,
		ctx:          ctx,
		methodConfig: &mc,
		opts:         opts,
		callInfo:     c,
		cc:           cc,
		desc:         desc,
		codec:        c.codec,
		cp:           cp,
		comp:         comp,
		cancel:       cancel,
		beginTime:    beginTime,
		firstAttempt: true,
	}
	if !cc.dopts.disableRetry {
		cs.retryThrottler = cc.retryThrottler.Load().(*retryThrottler)
	}
	cs.binlog = binarylog.GetMethodLogger(method)

    // 在newAttemptLocked中会通过负载均衡算法来选择Ready状态的连接.
	// Only this initial attempt has stats/tracing.
	// TODO(dfawley): move to newAttempt when per-attempt stats are implemented.
  // 构造新的 *csAttempt,newAttemptLocked 内部会获取 grpc.ClientTransport 并赋值给 *csAttemp.t
	if err := cs.newAttemptLocked(sh, trInfo); err != nil {
		cs.finish(err)
		return nil, err
	}

  // 在withRetry中最终会调用op函数,该函数会调用到csAttempt.newStream方法.
	op := func(a *csAttempt) error { return a.newStream() }
	if err := cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) }); err != nil {
		cs.finish(err)
		return nil, err
	}

    ......
    // 下面的也不用关注了.
	return cs, nil

其中 csAttempt.newStream 实现如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
type csAttempt struct {
  cs   *clientStream
  t    transport.ClientTransport // 客户端 Transport
  s    *transport.Stream         // 真正处理RPC 的 Stream
  ...
}

func (a *csAttempt) newStream() error {
	cs := a.cs
    cs.callHdr.PreviousAttempts = cs.numRetries
    // 这里会调用NewStream方法,t指向的是http2Client对象,cs是指向clientStream对象的.
	s, err := a.t.NewStream(cs.ctx, cs.callHdr)
	if err != nil {
		if _, ok := err.(transport.PerformedIOError); ok {
			// Return without converting to an RPC error so retry code can
			// inspect.
			return err
		}
		return toRPCErr(err)
	}
	cs.attempt.s = s
	cs.attempt.p = &parser{r: s}
	return nil
}

transport.ClientTransport 是一个接口,gRPC 内部 internal/transport.http2Client 实现了此接口。

http2Client.NewStream() 源码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) {
  ctx = peer.NewContext(ctx, t.getPeer())
  // 重点关注createHeaderFields,这个方法会处理HEADERS的数据,来传播表头数据.
  headerFields, err := t.createHeaderFields(ctx, callHdr)
  ...
  hdr := &headerFrame{
    hf:        headerFields,
    endStream: false,
    ...
  }
  ...
  for {
    success, err := t.controlBuf.executeAndPut(func(it interface{}) bool {
      if !checkForStreamQuota(it) {
        return false
      }
      if !checkForHeaderListSize(it) {
        return false
      }
      return true
    }, hdr)
    ...
  return s, nil
}

createHeaderFields 实现如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr) ([]hpack.HeaderField, error) {
    // 主要处理HEADERS头部数据.
    ......

    // 支持的字段.
	headerFields := make([]hpack.HeaderField, 0, hfLen)
	headerFields = append(headerFields, hpack.HeaderField{Name: ":method", Value: "POST"})
	headerFields = append(headerFields, hpack.HeaderField{Name: ":scheme", Value: t.scheme})
	headerFields = append(headerFields, hpack.HeaderField{Name: ":path", Value: callHdr.Method})
	headerFields = append(headerFields, hpack.HeaderField{Name: ":authority", Value: callHdr.Host})
	headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: grpcutil.ContentType(callHdr.ContentSubtype)})
	headerFields = append(headerFields, hpack.HeaderField{Name: "user-agent", Value: t.userAgent})
	headerFields = append(headerFields, hpack.HeaderField{Name: "te", Value: "trailers"})
	if callHdr.PreviousAttempts > 0 {
		headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-previous-rpc-attempts", Value: strconv.Itoa(callHdr.PreviousAttempts)})
	}

	if callHdr.SendCompress != "" {
		headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: callHdr.SendCompress})
		headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-accept-encoding", Value: callHdr.SendCompress})
    }

  // 重点在这里,获取ctx的deadline时间,然后放入头部中的grpc-timeout字段中,客户端就是利用这个来吧超时时间传递到服务端的.
  // 如果透传过来的 ctx 被设置了 timeout/deadline,则在 HTTP2 headers frame 中添加 grpc-timeout 字段,
  // grpc-timeout 字段值被转化成 XhYmZs 字符串形式的超时时间
	if dl, ok := ctx.Deadline(); ok {
		// Send out timeout regardless its value. The server can detect timeout context by itself.
		// TODO(mmukhi): Perhaps this field should be updated when actually writing out to the wire.
		timeout := time.Until(dl)
		headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-timeout", Value: grpcutil.EncodeDuration(timeout)})
	}
	for k, v := range authData {
		headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
	}
	for k, v := range callAuthData {
		headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
	}
	if b := stats.OutgoingTags(ctx); b != nil {
		headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-tags-bin", Value: encodeBinHeader(b)})
	}
	if b := stats.OutgoingTrace(ctx); b != nil {
		headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-trace-bin", Value: encodeBinHeader(b)})
	}

	if md, added, ok := metadata.FromOutgoingContextRaw(ctx); ok {
		var k string
		for k, vv := range md {
			// HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set.
			if isReservedHeader(k) {
				continue
			}
			for _, v := range vv {
				headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
			}
		}
		for _, vv := range added {
			for i, v := range vv {
				if i%2 == 0 {
					k = strings.ToLower(v)
					continue
				}
				// HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set.
				if isReservedHeader(k) {
					continue
				}
				headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
			}
		}
	}
	if md, ok := t.md.(*metadata.MD); ok {
		for k, vv := range *md {
			if isReservedHeader(k) {
				continue
			}
			for _, v := range vv {
				headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
			}
		}
	}
	return headerFields, nil
}

总结:

  1. 在调用rpc接口时设置了超时的context,会最终传递到http2Client对象中,然后在处理HEADERS时会把deadline时间设置到头部参数grpc-timeout中,传递到服务端.
  2. http2Client对象是在调用Dial或DialContext过程中生成的,每个Endpoint对应一个该对象.

服务端解析 timeout

服务端通过 Serve 方法启动 grpc Server,监听来自客户端连接。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
func (s *Server) Serve(lis net.Listener) error {
  ...
  for {
    // 接收客户端的连接
    rawConn, err := lis.Accept()
    ...
    s.serveWG.Add(1)
    go func() {
      // 对每一个客户端的连接单独开一个协程来处理
      s.handleRawConn(rawConn)
      s.serveWG.Done()
    }()
  }
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func (s*Server) handleRawConn(rawConn net.Conn) {
  ...
  // 构造 HTTP2 Transport
  st := s.newHTTP2Transport(conn, authInfo)
  go func() {
    // 处理 HTTP2 Stream
    s.serveStreams(st)
    s.removeConn(st)
  }()
}

func (s *Server) serveStreams(st transport.ServerTransport) {
  defer st.Close()
  var wg sync.WaitGroup
  // http2Server 实现了 transport.ServerTransport 接口,此处会调用 http2Server.HandleSteams方法
// st.HandleStreams 方法签名中第一个参数 handle func(stream*transport.Stream) {}为函数类型,
  // handle 随后会在 operateHeaders 中被调用
  st.HandleStreams(func(stream *transport.Stream) {
    wg.Add(1)
    go func() {
      defer wg.Done()
      // 解析出 gPRC Service, gRPC method, gRPC request message,执行注册到 gRPC.Server 中的 RPC 方法
      s.handleStream(st, stream, s.traceInfo(st, stream))
    }()
  }, ...)
  wg.Wait()
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
// http2Server.HandleStreams 会调用传入的 handle 处理 HTTP2 Stream
func (t*http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
  defer close(t.readerDone)
  for {
    // 该函数主要接收客户端发送过来的数据,重点关注处理HEADERS部分的数据.
    t.controlBuf.throttle()
    frame, err := t.framer.fr.ReadFrame()
    ...
    switch frame := frame.(type) {
      // 如果是 Headers 帧,则调用 operateHeaders 方法处理 Headers
      case*http2.MetaHeadersFrame:
      if t.operateHeaders(frame, handle, traceCtx) {
        t.Close()
        break
      }
    // 如果是 Data 帧,则调用 handleData 方法处理
    case *http2.DataFrame:
      t.handleData(frame)
      ...
    }
  }
}

// operateHeaders 解析 Headers 帧
func (t *http2Server) operateHeaders(frame*http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool) {
  // 从HTTP2 Headers 帧中获取 StreamID
  streamID := frame.Header().StreamID
  state := &decodeState{
    serverSide: true,
  }
  // 从HTTP2 Headers 帧中解析出Header。如果其中包含 grpc-timeout HEADER,
  // 则解析出其值并赋值给 state.data.timeout,并将 state.data.timeoutSet 设成 true
  if err := state.decodeHeader(frame); err != nil {
    if se, ok := status.FromError(err); ok {
      ...
  }

  buf := newRecvBuffer()
  // 构造 HTTP2 Stream
  s := &Stream{
    id:             streamID,
    st:             t,
    buf:            buf,
    fc:             &inFlow{limit: uint32(t.initialWindowSize)},
    recvCompress:   state.data.encoding,
    method:         state.data.method,
    contentSubtype: state.data.contentSubtype,
  }
  ...
  // 如果 state.data.timeoutSet 为 true,则构造一个新的带 timeout 的 ctx 覆盖原 s.ctx
  // s.ctx 最终会透传到用户实现的 gRPC Handler 中,参与业务逻辑处理
  // 见 server.go 中 processUnaryRPC 内:
  //    ctx := NewContextWithServerTransportStream(stream.Context(), stream)
  //    reply, appErr := md.Handler(srv.server, ctx, df, s.opts.unaryInt)
  // 此处不再赘述
  if state.data.timeoutSet {
    s.ctx, s.cancel = context.WithTimeout(t.ctx, state.data.timeout)
  } else {
    s.ctx, s.cancel = context.WithCancel(t.ctx)
  }
  ...
  t.controlBuf.put(&registerStream{
    streamID: s.id,
    wq:       s.wq,
  })
  // 调用 serveStreams 定义好的 handle,执行gRPC调用
  handle(s)
  return false
}

decodeHeader 会遍历 frame 中所有 Fields,并调用 processHeaderField 对 HTTP2 HEADERS 帧中的特定的 Field 进行处理。

  • 比如可以从 :path 中解析出包含 protobuf package、service name 和 RPC method name 的完整路径;
  • 比如可以从 grpc-timeout 中解析出上游传递过来的 timeout;

decodeHeader 内部实现如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func (d *decodeState) decodeHeader(frame*http2.MetaHeadersFrame) error {
  ...
  // 遍历Headers帧,解析Field
  for _, hf := range frame.Fields {
    d.processHeaderField(hf)
  }
}

func (d *decodeState) processHeaderField(f hpack.HeaderField) {
  switch f.Name {
    ...
    // 解析出 grpc-timeout
    case "grpc-timeout":
      d.data.timeoutSet = true
      var err error
      if d.data.timeout, err = decodeTimeout(f.Value); err != nil {
        d.data.grpcErr = status.Errorf(codes.Internal, "transport: malformed time-out: %v", err)
      }
    ...
    // 解析出 grpc 带 protobuf package path、Service name、RPC method name 的完整路径
    // 形如 /package.service/method
    case ":path":
      d.data.method = f.Value
  }
}

至此可以看到,gRPC 框架确实是通过 HTTP2 HEADERS Frame 中的 grpc-timeout 字段来实现跨进程传递超时时间。

总结

  • 客户端客户端发起 RPC 调用时传入了带 timeout 的 ctx
  • gRPC 框架底层通过 HTTP2 协议发送 RPC 请求时,将 timeout 值写入到 grpc-timeout HEADERS Frame 中
  • 服务端接收 RPC 请求时,gRPC 框架底层解析 HTTP2 HEADERS 帧,读取 grpc-timeout 值,并覆盖透传到实际处理 RPC 请求的业务 gPRC Handle 中
  • 如果此时服务端又发起对其他 gRPC 服务的调用,且使用的是透传的 ctx,这个 timeout 会减去在本进程中耗时,从而导致这个 timeout 传递到下一个 gRPC 服务端时变短,这样即实现了所谓的 超时传递 。

具体使用

建立连接时超时控制

客户端建立连接时,使用的Dial()函数,它位于 google.golang.org/grpc/clientconn.go 中,我们看看这个函数内容:

1
2
3
func Dial(target string, opts ...DialOption) (*ClientConn, error) {
   return DialContext(context.Background(), target, opts...)
}

它里面调用的 DialContext() 函数,这个函数非常长,他们在同一个文件中,它是实际执行的函数,这里面就有context的timeout和Done相关操作。你也可以到google.golang.org/grpc/clientconn.go文件中去看看这个函数DialContext具体是干嘛的。

使用的时候传入设置timeout的context,如下:

1
2
3
ctx, cancel := context.Timeout(context.Bakcground(), time.Second*5)
defer cancel()
conn, err := grpc.DialContext(ctx, address, grpc.WithBlock(), grpc.WithInsecure())
  • grpc.WithInsecure() gRPC是建立在HTTP/2上的,所以对TLS提供了很好的支持。如果在客户端建立连接过程中设置 grpc.WithInsecure() 就可以跳过对服务器证书的验证。
  • grpc.WithBlock() 这个参数会阻塞等待握手成功。 因为用Dial连接时是异步连接,连接状态为正在连接,如果设置了这个参数就是同步连接,会阻塞等待握手成功。 这个还和超时设置有关,如果你没有设置这个参数,那么context超时控制将会失效。

调用Dial建立连接默认只是返回一个ClientConn的指针,相当于new了一个ClientConn 把指针返回给你。并不是一定要建立真实的h2连接.至于真实的连接建立实际上是一个异步的过程。当然了如果你想等真实的链接完全建立再返回ClientConn可以通过WithBlock传入Options来实现,当然了这样的话链接如果建立不成功就会一直阻塞直到Contex超时。

调用时超时

函数的调用超时控制

1
2
3
ctx, cancel := context.WithTimeout(context.TODO(), time.Second*5)
defer cancel()
result, err := c.SayHello(ctx, &pb.HelloRequest{Name: name})

举例

Client

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
func main() {
    ...
	ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Duration(5 * time.Second)))
	defer cancel()

	client := pb.NewSearchServiceClient(conn)
	resp, err := client.Search(ctx, &pb.SearchRequest{
		Request: "gRPC",
	})
	if err != nil {
		statusErr, ok := status.FromError(err)
		if ok {
			if statusErr.Code() == codes.DeadlineExceeded {
				log.Fatalln("client.Search err: deadline")
			}
		}

		log.Fatalf("client.Search err: %v", err)
	}

	log.Printf("resp: %s", resp.GetResponse())
}

context.WithDeadline:会返回最终上下文截止时间。第一个形参为父上下文,第二个形参为调整的截止时间。若父级时间早于子级时间,则以父级时间为准,否则以子级时间为最终截止时间

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) {
	if cur, ok := parent.Deadline(); ok && cur.Before(d) {
		// The current deadline is already sooner than the new one.
		return WithCancel(parent)
	}
	c := &timerCtx{
		cancelCtx: newCancelCtx(parent),
		deadline:  d,
	}
	propagateCancel(parent, c)
	dur := time.Until(d)
	if dur <= 0 {
		c.cancel(true, DeadlineExceeded) // deadline has already passed
		return c, func() { c.cancel(true, Canceled) }
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.err == nil {
		c.timer = time.AfterFunc(dur, func() {
			c.cancel(true, DeadlineExceeded)
		})
	}
	return c, func() { c.cancel(true, Canceled) }
}

context.WithTimeout:很常见的另外一个方法,是便捷操作。实际上是对于 WithDeadline 的封装

1
2
3
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
	return WithDeadline(parent, time.Now().Add(timeout))
}

status.FromError:返回 GRPCStatus 的具体错误码,若为非法,则直接返回 codes.Unknown

Server

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
type SearchService struct{}

func (s *SearchService) Search(ctx context.Context, r *pb.SearchRequest) (*pb.SearchResponse, error) {
	for i := 0; i < 5; i++  {
		if ctx.Err() == context.Canceled {
			return nil, status.Errorf(codes.Canceled, "SearchService.Search canceled")
		}

		time.Sleep(1 * time.Second)
	}

	return &pb.SearchResponse{Response: r.GetRequest() + " Server"}, nil
}

func main() {
	...
}

而在 Server 端,由于 Client 已经设置了截止时间。Server 势必要去检测它

否则如果 Client 已经结束掉了,Server 还傻傻的在那执行,这对资源是一种极大的浪费

因此在这里需要用 ctx.Err() == context.Canceled 进行判断,为了模拟场景我们加了循环和睡眠.

参考

gRPC 系列——grpc超时传递原理

Golang gRPC学习(04): Deadlines超时限制

「连载九」gRPC Deadlines