架构

基于Go kit的应用程序体系结构包含三个主要组件:

  • Transport
  • Endpoint
  • Service

Transport

当您构建基于微服务的分布式系统时,服务通常使用诸如HTTP或gRPC之类的具体传输方式,或者使用诸如NATS之类的发布/订阅系统彼此通信。Go工具包中的Transport层绑定到具体的Transport。Go kit支持使用HTTP,gRPC,NATS,AMQP和Thrift为服务提供各种传输。由于Go kit服务仅专注于实现业务逻辑,而对具体传输没有任何知识,因此您可以为同一服务提供多个Transport。例如,可以使用HTTP和gRPC公开单个Go kit服务。

Endpoint

Endpoint是服务器和客户端的基本构建块。在Go kit中,主要消息传递模式是RPC。Endpoint表示单个RPC方法。Go工具包服务中的每个服务方法都转换为Endpoint,以在服务器和客户端之间进行RPC样式的通信。每个Endpoint通过使用诸如HTTP或gRPC之类的具体传输,通过传输层将服务方法公开给外界。可以通过使用多个传输来暴露单个Endpoint。

Service

业务逻辑在服务中实现。Go kit服务被建模为接口。服务中的业务逻辑包含核心业务逻辑,这些核心业务逻辑不应对Endpoint或具体传输(如HTTP或gRPC)或请求和响应消息类型的编码和解码有任何了解。这将鼓励您遵循基于Go kit的服务的干净架构。每种服务方法都通过使用适配器转换为Endpoint,并通过使用具体的传输方式公开。由于采用了干净的体系结构,因此可以使用多个传输方式来公开单个Go kit服务。

Proto

创建一个新项目, 并且新建 add.proto 文件, 使用 pb3 定义服务, 本文的示例文件来自 go-kit 的例子:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
syntax = "proto3";

package pb;

// The Add service definition.
service Add {
  // Sums two integers.
  rpc Sum (SumRequest) returns (SumReply) {}

  // Concatenates two strings
  rpc Concat (ConcatRequest) returns (ConcatReply) {}
}

// The sum request contains two parameters.
message SumRequest {
  int64 a = 1;
  int64 b = 2;
}

// The sum response contains the result of the calculation.
message SumReply {
  int64 v = 1;
}

// The Concat request contains two parameters.
message ConcatRequest {
  string a = 1;
  string b = 2;
}

// The Concat response contains the result of the concatenation.
message ConcatReply {
  string v = 1;
}

这里定义名为 Add 的服务, 里面有 Sum 和 Concat 两个方法, 本例中就实现这两个方法. 使用以下命令编译服务定义, 得到 add.pb.go 文件

1
protoc add.proto --go_out=plugins=grpc:.

Service

新建 service.go 定义并实现服务:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package server

import "context"

//interface
type AddService interface {
   Sum(_ context.Context, a, b int) (v int)
   Concat(_ context.Context, a, b string) (v string)
}

//service struct
type addService struct{}

//returns a implementation
func New() AddService {
   return addService{}
}

func (addService) Sum(_ context.Context, a, b int) (v int) {
   return a + b
}

func (addService) Concat(_ context.Context, a, b string) (v string) {
   return a + b
}

Endpoint

Request模型:接收http客户端的请求后,把请求参数转为请求模型对象,用于后续业务逻辑处理。观察Service接口可以发现四个接口方法的输入参数均为两个整数,区别在于运算类型不同,所以请求模型只需包含三个字段,即:请求类型、第一个整数、第二个整数。

Response模型:用于向客户端响应结果。对于响应模型可以设置两个字段:一是结果,用于表示正常情况下的运算结果;二是错误描述,用于表示异常时的错误描述。

Endpoint:Endpoint是可以包装到http.Handler中的特殊方法,gokit采用装饰器模式,把Service应该执行的逻辑封装到Endpoint方法中执行。

Endpoint的作用是:调用Service中相应的方法处理请求对象(ArithmeticRequest),返回响应对象(ArithmeticResponse)。所以MakeEndpoint的参数是svc Service,是为了在函数中直接调用service层的函数

新建 endpoints.go 实现需要的两个 endpoint.您需要注意的一件事是,在Endpoints结构实现了Service接口。创建GRPC客户端连接时需要此机制。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package server

import (
   "context"
   "github.com/go-kit/kit/endpoint"
)

//all endpoints required by AddService.
type Endpoints struct {
   SumEndpoint       endpoint.Endpoint
   ConcatEndpoint endpoint.Endpoint
}

type sumRequest struct {
   A int
   B int
}

type sumResponse struct {
   V int
}

// MakeSumEndpoint returns an endpoint that invokes Sum on the AddService
// for server
func MakeSumEndpoint(svc AddService) endpoint.Endpoint {
   return func(ctx context.Context, request interface{}) (interface{}, error) {
      req := request.(sumRequest)
      v := svc.Sum(ctx, req.A, req.B)
      return sumResponse{v}, nil
   }
}

// Sum implements AddService
//for client
func (e Endpoints) Sum(ctx context.Context, a, b int) int {
   req := sumRequest{A:a, B:b}
   res, err := e.SumEndpoint(ctx, req)
   if err != nil {
      return sumResponse{0}.V
   }
   return res.(sumResponse).V
}

type concatRequest struct {
   A string
   B string
}

type concatResponse struct {
   V string
}

// MakeConcatEndpoint returns an endpoint that invokes Sum on the AddService
// for server
func MakeConcatEndpoint(svc AddService) endpoint.Endpoint {
   return func(ctx context.Context, request interface{}) (interface{}, error) {
      req := request.(concatRequest)
      v := svc.Concat(ctx, req.A, req.B)
      return concatResponse{v}, nil
   }
}

// Concat implements AddService
//for client
func (e Endpoints) Concat(ctx context.Context, a, b string) string {
   req := concatRequest{A:a, B:b}
   res, err := e.ConcatEndpoint(ctx, req)
   if err != nil {
      return concatResponse{"error"}.V
   }
   return res.(concatResponse).V
}

Transport

Transport层用于接收用户网络请求并将其转为Endpoint可以处理的对象,然后交由Endpoint执行,最后将处理结果转为响应对象向用户响应。为了完成这项工作,Transport需要具备两个工具方法:

  • 解码器:把用户的请求内容转换为请求对象(ArithmeticRequest);
  • 编码器:把处理结果转换为响应对象(ArithmeticResponse);

HTTP

对于 http 传输层比较简单, 实现 http handler 和响应的 encode 和 decode. 新建 http_transport.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package server

import (
   "context"
   "encoding/json"
   httptransport "github.com/go-kit/kit/transport/http"
   "net/http"
)

func MakeHTTPHandler(endpoints Endpoints) http.Handler {
   m := http.NewServeMux()
   m.Handle("/sum",
      httptransport.NewServer(
         endpoints.SumEndpoint,
         DecodeHTTPSumRequest,
         EncodeHTTPResponse,
      ))

   m.Handle("/concat",
      httptransport.NewServer(
         endpoints.ConcatEndpoint,
         DecodeHTTPConcatRequest,
         EncodeHTTPResponse,
      ))

   return m
}

func DecodeHTTPSumRequest(_ context.Context, r *http.Request) (interface{}, error) {
   var request sumRequest
   if err := json.NewDecoder(r.Body).Decode(&request); err != nil {
      return nil, err
   }
   return request, nil
}

func DecodeHTTPConcatRequest(_ context.Context, r *http.Request) (interface{}, error) {
   var request concatRequest
   if err := json.NewDecoder(r.Body).Decode(&request); err != nil {
      return nil, err
   }
   return request, nil
}

func EncodeHTTPResponse(_ context.Context, w http.ResponseWriter, response interface{}) error {
   return json.NewEncoder(w).Encode(response)
}

gRPC

gRPC 也类似, 实现 server 和响应的 encode 和 decode 方法即可, 但需要对所有的请求和相应定义 encode 和 decode 方法, 部分供 gRPC 服务端使用, 部分供 gRPC 客户端使用. 新建 grpc_transport.go

此处 encode / decode 比较繁琐, 注意不要写错,这些代码可以单独创建一个新文件

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
package server

//Server-side bindings for the gRPC transport

import (
   "context"
   grpctransport "github.com/go-kit/kit/transport/grpc"
   "go-kit-demo/pb"
)

//returns a set of handlers available as a gRPC AddServer.
func MakeGRPCServer(endpoints Endpoints) pb.AddServer {
   return &grpcServer{
      sum: grpctransport.NewServer(
         endpoints.SumEndpoint,
         DecodeGRPCSumRequest,
         EncodeGRPCSumResponse,
      ),
      concat: grpctransport.NewServer(
         endpoints.ConcatEndpoint,
         DecodeGRPCConcatRequest,
         EncodeGRPCConcatResponse,
      ),
   }
}

type grpcServer struct {
   sum    grpctransport.Handler
   concat     grpctransport.Handler
}

func (s *grpcServer) Sum(ctx context.Context, req *pb.SumRequest) (*pb.SumReply, error) {
   _, resp, err := s.sum.ServeGRPC(ctx, req)
   if err != nil {
      return nil, err
   }
   return resp.(*pb.SumReply), nil
}

func (s *grpcServer) Concat(ctx context.Context, req *pb.ConcatRequest) (*pb.ConcatReply, error) {
   _, resp, err := s.concat.ServeGRPC(ctx, req)
   if err != nil {
      return nil, err
   }
   return resp.(*pb.ConcatReply), nil
}

//Encode & Decode Func
//https://github.com/go-kit/kit/blob/master/transport/grpc/encode_decode.go

// DecodeGRPCSumRequest is a transport/grpc.DecodeRequestFunc
// for server
func DecodeGRPCSumRequest(_ context.Context, request interface{}) (interface{}, error) {
   req := request.(*pb.SumRequest)
   return sumRequest{int(req.A), int(req.B)}, nil
}

// EncodeGRPCSumRequest is a transport/grpc.EncodeRequestFunc
// for client
func EncodeGRPCSumRequest(_ context.Context, request interface{}) (interface{}, error) {
   req := request.(sumRequest)
   return &pb.SumRequest{A: int64(req.A), B: int64(req.B)}, nil
}

// EncodeGRPCSumResponse is a transport/grpc.EncodeResponseFunc
// for server
func EncodeGRPCSumResponse(_ context.Context, response interface{}) (interface{}, error) {
   resp := response.(sumResponse)
   return &pb.SumReply{V: int64(resp.V)}, nil
}

// DecodeGRPCSumResponse is a transport/grpc.DecodeResponseFunc
// for client
func DecodeGRPCSumResponse(_ context.Context, response interface{}) (interface{}, error) {
   resp := response.(*pb.SumReply)
   return sumResponse{int(resp.V)}, nil
}

func DecodeGRPCConcatRequest(_ context.Context, request interface{}) (interface{}, error) {
   req := request.(*pb.ConcatRequest)
   return concatRequest{req.A, req.B}, nil
}

func EncodeGRPCConcatRequest(_ context.Context, request interface{}) (interface{}, error) {
   req := request.(concatRequest)
   return &pb.ConcatRequest{A: req.A, B: req.B}, nil
}

func EncodeGRPCConcatResponse(_ context.Context, response interface{}) (interface{}, error) {
   resp := response.(concatResponse)
   return &pb.ConcatReply{V: resp.V}, nil
}

func DecodeGRPCConcatResponse(_ context.Context, response interface{}) (interface{}, error) {
   resp := response.(*pb.ConcatReply)
   return concatResponse{resp.V}, nil
}

Server

最后实现 HTTP 和 gRPC 服务即可, 新建 main.go, 例子中对服务同时实现了 HTTP 传输层与 gRPC 传输层. 运行 main.go 即可启动服务.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package main

import (
   "flag"
   "fmt"
   "github.com/go-kit/kit/log"
   "go-kit-demo/pb"
   "go-kit-demo/server"
   "google.golang.org/grpc"
   "net"
   "net/http"
   "os"
   "os/signal"
   "syscall"
)

func main() {
   httpAddr := flag.String("HTTP", ":8890", "HTTP server")
   gRPCAddr := flag.String("gRPC", ":8891", "gRPC server")
   flag.Parse()

   var logger log.Logger
   {
      logger = log.NewLogfmtLogger(os.Stdout)
      logger = log.With(logger, "ts", log.DefaultTimestampUTC)
      logger = log.With(logger, "caller", log.DefaultCaller)
   }
   logger.Log("msg", "Server Start...")
   defer logger.Log("msg", "Closed")

   svc := server.New()

   endpoints := server.Endpoints{
      SumEndpoint:    server.MakeSumEndpoint(svc),
      ConcatEndpoint: server.MakeConcatEndpoint(svc),
   }

   // Error channel.
   errc := make(chan error)

   // Interrupt handler.
   go func() {
      c := make(chan os.Signal, 1)
      signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
      errc <- fmt.Errorf("%s", <-c)
   }()

   // HTTP transport.
   go func() {
      logger := log.With(logger, "transport", "HTTP")
      logger.Log("addr", *httpAddr)

      handler := server.MakeHTTPHandler(endpoints)
      errc <- http.ListenAndServe(*httpAddr, handler)
   }()

   // gRPC transport.
   go func() {
      logger := log.With(logger, "transport", "gRPC")
      logger.Log("addr", *gRPCAddr)

      listener, err := net.Listen("tcp", *gRPCAddr)
      if err != nil {
         errc <- err
         return
      }

      srv := server.MakeGRPCServer(endpoints)
      s := grpc.NewServer()
      pb.RegisterAddServer(s, srv)
      errc <- s.Serve(listener)
   }()

   logger.Log("exit", <-errc)
}

Client

对于 HTTP Server, 我们可以直接使用 curl 进行调试

1
2
3
4
5
6
7
#!/usr/bin/env sh -v

#Sum Response
curl -d '{"a":11111, "b":22222}' http://127.0.0.1:8890/sum

#Concat Response
curl -d '{"a":"11111", "b":"22222"}' http://127.0.0.1:8890/concat

同时实现一个 gRPC Client, 此处需要用到 grpc_transport.go 中定义的 encode / decode 方法.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package grpc

import (
   grpctransport "github.com/go-kit/kit/transport/grpc"
   "go-kit-demo/pb"
   "go-kit-demo/server"
   "google.golang.org/grpc"
)

func New(conn *grpc.ClientConn) server.AddService {
   sumEndpoint := grpctransport.NewClient(
      conn, "pb.Add", "Sum",
      server.EncodeGRPCSumRequest,
      server.DecodeGRPCSumResponse,
      pb.SumReply{},
   ).Endpoint()

   concatEndpoint := grpctransport.NewClient(
      conn, "pb.Add", "Concat",
      server.EncodeGRPCConcatRequest,
      server.DecodeGRPCConcatResponse,
      pb.ConcatReply{},
   ).Endpoint()

   return server.Endpoints{
      SumEndpoint:    sumEndpoint,
      ConcatEndpoint: concatEndpoint,
   }
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package main

import (
   "context"
   "flag"
   grpcclient "go-kit-demo/client/grpc"
   "go-kit-demo/server"
   "google.golang.org/grpc"
   "log"
   "time"
)

func main() {
   gRPCAddr := flag.String("gRPC", ":8891", "gRPC client")
   flag.Parse()

   conn, err := grpc.Dial(
      *gRPCAddr, grpc.WithInsecure(),
      grpc.WithTimeout(time.Second),
   )

   if err != nil {
      log.Fatalln("gRPC dial error:", err)
   }
   defer conn.Close()

   addService := grpcclient.New(conn)

   println("Sum Response:")
   output := addService.Sum(context.Background(), 11111, 22222)
   println(output)
   println("Concat Response:")
   output := addService.Concat(context.Background(), "11111", "22222")
   println(output)
}