架构
基于Go kit的应用程序体系结构包含三个主要组件:
- Transport
- Endpoint
- Service
Transport
当您构建基于微服务的分布式系统时,服务通常使用诸如HTTP或gRPC之类的具体传输方式,或者使用诸如NATS之类的发布/订阅系统彼此通信。Go工具包中的Transport层绑定到具体的Transport。Go kit支持使用HTTP,gRPC,NATS,AMQP和Thrift为服务提供各种传输。由于Go kit服务仅专注于实现业务逻辑,而对具体传输没有任何知识,因此您可以为同一服务提供多个Transport。例如,可以使用HTTP和gRPC公开单个Go kit服务。
Endpoint
Endpoint是服务器和客户端的基本构建块。在Go kit中,主要消息传递模式是RPC。Endpoint表示单个RPC方法。Go工具包服务中的每个服务方法都转换为Endpoint,以在服务器和客户端之间进行RPC样式的通信。每个Endpoint通过使用诸如HTTP或gRPC之类的具体传输,通过传输层将服务方法公开给外界。可以通过使用多个传输来暴露单个Endpoint。
Service
业务逻辑在服务中实现。Go kit服务被建模为接口。服务中的业务逻辑包含核心业务逻辑,这些核心业务逻辑不应对Endpoint或具体传输(如HTTP或gRPC)或请求和响应消息类型的编码和解码有任何了解。这将鼓励您遵循基于Go kit的服务的干净架构。每种服务方法都通过使用适配器转换为Endpoint,并通过使用具体的传输方式公开。由于采用了干净的体系结构,因此可以使用多个传输方式来公开单个Go kit服务。
Proto
创建一个新项目, 并且新建 add.proto 文件, 使用 pb3 定义服务, 本文的示例文件来自 go-kit 的例子:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
syntax = "proto3";
package pb;
// The Add service definition.
service Add {
// Sums two integers.
rpc Sum (SumRequest) returns (SumReply) {}
// Concatenates two strings
rpc Concat (ConcatRequest) returns (ConcatReply) {}
}
// The sum request contains two parameters.
message SumRequest {
int64 a = 1;
int64 b = 2;
}
// The sum response contains the result of the calculation.
message SumReply {
int64 v = 1;
}
// The Concat request contains two parameters.
message ConcatRequest {
string a = 1;
string b = 2;
}
// The Concat response contains the result of the concatenation.
message ConcatReply {
string v = 1;
}
|
这里定义名为 Add 的服务, 里面有 Sum 和 Concat 两个方法, 本例中就实现这两个方法.
使用以下命令编译服务定义, 得到 add.pb.go 文件
1
|
protoc add.proto --go_out=plugins=grpc:.
|
Service
新建 service.go 定义并实现服务:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
package server
import "context"
//interface
type AddService interface {
Sum(_ context.Context, a, b int) (v int)
Concat(_ context.Context, a, b string) (v string)
}
//service struct
type addService struct{}
//returns a implementation
func New() AddService {
return addService{}
}
func (addService) Sum(_ context.Context, a, b int) (v int) {
return a + b
}
func (addService) Concat(_ context.Context, a, b string) (v string) {
return a + b
}
|
Endpoint
Request模型:接收http客户端的请求后,把请求参数转为请求模型对象,用于后续业务逻辑处理。观察Service接口可以发现四个接口方法的输入参数均为两个整数,区别在于运算类型不同,所以请求模型只需包含三个字段,即:请求类型、第一个整数、第二个整数。
Response模型:用于向客户端响应结果。对于响应模型可以设置两个字段:一是结果,用于表示正常情况下的运算结果;二是错误描述,用于表示异常时的错误描述。
Endpoint:Endpoint是可以包装到http.Handler中的特殊方法,gokit采用装饰器模式,把Service应该执行的逻辑封装到Endpoint方法中执行。
Endpoint的作用是:调用Service中相应的方法处理请求对象(ArithmeticRequest),返回响应对象(ArithmeticResponse)。所以MakeEndpoint
的参数是svc Service
,是为了在函数中直接调用service层的函数
新建 endpoints.go 实现需要的两个 endpoint.您需要注意的一件事是,在Endpoints结构实现了Service接口。创建GRPC客户端连接时需要此机制。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
|
package server
import (
"context"
"github.com/go-kit/kit/endpoint"
)
//all endpoints required by AddService.
type Endpoints struct {
SumEndpoint endpoint.Endpoint
ConcatEndpoint endpoint.Endpoint
}
type sumRequest struct {
A int
B int
}
type sumResponse struct {
V int
}
// MakeSumEndpoint returns an endpoint that invokes Sum on the AddService
// for server
func MakeSumEndpoint(svc AddService) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (interface{}, error) {
req := request.(sumRequest)
v := svc.Sum(ctx, req.A, req.B)
return sumResponse{v}, nil
}
}
// Sum implements AddService
//for client
func (e Endpoints) Sum(ctx context.Context, a, b int) int {
req := sumRequest{A:a, B:b}
res, err := e.SumEndpoint(ctx, req)
if err != nil {
return sumResponse{0}.V
}
return res.(sumResponse).V
}
type concatRequest struct {
A string
B string
}
type concatResponse struct {
V string
}
// MakeConcatEndpoint returns an endpoint that invokes Sum on the AddService
// for server
func MakeConcatEndpoint(svc AddService) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (interface{}, error) {
req := request.(concatRequest)
v := svc.Concat(ctx, req.A, req.B)
return concatResponse{v}, nil
}
}
// Concat implements AddService
//for client
func (e Endpoints) Concat(ctx context.Context, a, b string) string {
req := concatRequest{A:a, B:b}
res, err := e.ConcatEndpoint(ctx, req)
if err != nil {
return concatResponse{"error"}.V
}
return res.(concatResponse).V
}
|
Transport
Transport层用于接收用户网络请求并将其转为Endpoint可以处理的对象,然后交由Endpoint执行,最后将处理结果转为响应对象向用户响应。为了完成这项工作,Transport需要具备两个工具方法:
- 解码器:把用户的请求内容转换为请求对象(ArithmeticRequest);
- 编码器:把处理结果转换为响应对象(ArithmeticResponse);
HTTP
对于 http 传输层比较简单, 实现 http handler 和响应的 encode 和 decode. 新建 http_transport.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
|
package server
import (
"context"
"encoding/json"
httptransport "github.com/go-kit/kit/transport/http"
"net/http"
)
func MakeHTTPHandler(endpoints Endpoints) http.Handler {
m := http.NewServeMux()
m.Handle("/sum",
httptransport.NewServer(
endpoints.SumEndpoint,
DecodeHTTPSumRequest,
EncodeHTTPResponse,
))
m.Handle("/concat",
httptransport.NewServer(
endpoints.ConcatEndpoint,
DecodeHTTPConcatRequest,
EncodeHTTPResponse,
))
return m
}
func DecodeHTTPSumRequest(_ context.Context, r *http.Request) (interface{}, error) {
var request sumRequest
if err := json.NewDecoder(r.Body).Decode(&request); err != nil {
return nil, err
}
return request, nil
}
func DecodeHTTPConcatRequest(_ context.Context, r *http.Request) (interface{}, error) {
var request concatRequest
if err := json.NewDecoder(r.Body).Decode(&request); err != nil {
return nil, err
}
return request, nil
}
func EncodeHTTPResponse(_ context.Context, w http.ResponseWriter, response interface{}) error {
return json.NewEncoder(w).Encode(response)
}
|
gRPC
gRPC 也类似, 实现 server 和响应的 encode 和 decode 方法即可, 但需要对所有的请求和相应定义 encode 和 decode 方法, 部分供 gRPC 服务端使用, 部分供 gRPC 客户端使用. 新建 grpc_transport.go
此处 encode / decode 比较繁琐, 注意不要写错,这些代码可以单独创建一个新文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
|
package server
//Server-side bindings for the gRPC transport
import (
"context"
grpctransport "github.com/go-kit/kit/transport/grpc"
"go-kit-demo/pb"
)
//returns a set of handlers available as a gRPC AddServer.
func MakeGRPCServer(endpoints Endpoints) pb.AddServer {
return &grpcServer{
sum: grpctransport.NewServer(
endpoints.SumEndpoint,
DecodeGRPCSumRequest,
EncodeGRPCSumResponse,
),
concat: grpctransport.NewServer(
endpoints.ConcatEndpoint,
DecodeGRPCConcatRequest,
EncodeGRPCConcatResponse,
),
}
}
type grpcServer struct {
sum grpctransport.Handler
concat grpctransport.Handler
}
func (s *grpcServer) Sum(ctx context.Context, req *pb.SumRequest) (*pb.SumReply, error) {
_, resp, err := s.sum.ServeGRPC(ctx, req)
if err != nil {
return nil, err
}
return resp.(*pb.SumReply), nil
}
func (s *grpcServer) Concat(ctx context.Context, req *pb.ConcatRequest) (*pb.ConcatReply, error) {
_, resp, err := s.concat.ServeGRPC(ctx, req)
if err != nil {
return nil, err
}
return resp.(*pb.ConcatReply), nil
}
//Encode & Decode Func
//https://github.com/go-kit/kit/blob/master/transport/grpc/encode_decode.go
// DecodeGRPCSumRequest is a transport/grpc.DecodeRequestFunc
// for server
func DecodeGRPCSumRequest(_ context.Context, request interface{}) (interface{}, error) {
req := request.(*pb.SumRequest)
return sumRequest{int(req.A), int(req.B)}, nil
}
// EncodeGRPCSumRequest is a transport/grpc.EncodeRequestFunc
// for client
func EncodeGRPCSumRequest(_ context.Context, request interface{}) (interface{}, error) {
req := request.(sumRequest)
return &pb.SumRequest{A: int64(req.A), B: int64(req.B)}, nil
}
// EncodeGRPCSumResponse is a transport/grpc.EncodeResponseFunc
// for server
func EncodeGRPCSumResponse(_ context.Context, response interface{}) (interface{}, error) {
resp := response.(sumResponse)
return &pb.SumReply{V: int64(resp.V)}, nil
}
// DecodeGRPCSumResponse is a transport/grpc.DecodeResponseFunc
// for client
func DecodeGRPCSumResponse(_ context.Context, response interface{}) (interface{}, error) {
resp := response.(*pb.SumReply)
return sumResponse{int(resp.V)}, nil
}
func DecodeGRPCConcatRequest(_ context.Context, request interface{}) (interface{}, error) {
req := request.(*pb.ConcatRequest)
return concatRequest{req.A, req.B}, nil
}
func EncodeGRPCConcatRequest(_ context.Context, request interface{}) (interface{}, error) {
req := request.(concatRequest)
return &pb.ConcatRequest{A: req.A, B: req.B}, nil
}
func EncodeGRPCConcatResponse(_ context.Context, response interface{}) (interface{}, error) {
resp := response.(concatResponse)
return &pb.ConcatReply{V: resp.V}, nil
}
func DecodeGRPCConcatResponse(_ context.Context, response interface{}) (interface{}, error) {
resp := response.(*pb.ConcatReply)
return concatResponse{resp.V}, nil
}
|
Server
最后实现 HTTP 和 gRPC 服务即可, 新建 main.go, 例子中对服务同时实现了 HTTP 传输层与 gRPC 传输层. 运行 main.go 即可启动服务.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
|
package main
import (
"flag"
"fmt"
"github.com/go-kit/kit/log"
"go-kit-demo/pb"
"go-kit-demo/server"
"google.golang.org/grpc"
"net"
"net/http"
"os"
"os/signal"
"syscall"
)
func main() {
httpAddr := flag.String("HTTP", ":8890", "HTTP server")
gRPCAddr := flag.String("gRPC", ":8891", "gRPC server")
flag.Parse()
var logger log.Logger
{
logger = log.NewLogfmtLogger(os.Stdout)
logger = log.With(logger, "ts", log.DefaultTimestampUTC)
logger = log.With(logger, "caller", log.DefaultCaller)
}
logger.Log("msg", "Server Start...")
defer logger.Log("msg", "Closed")
svc := server.New()
endpoints := server.Endpoints{
SumEndpoint: server.MakeSumEndpoint(svc),
ConcatEndpoint: server.MakeConcatEndpoint(svc),
}
// Error channel.
errc := make(chan error)
// Interrupt handler.
go func() {
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
errc <- fmt.Errorf("%s", <-c)
}()
// HTTP transport.
go func() {
logger := log.With(logger, "transport", "HTTP")
logger.Log("addr", *httpAddr)
handler := server.MakeHTTPHandler(endpoints)
errc <- http.ListenAndServe(*httpAddr, handler)
}()
// gRPC transport.
go func() {
logger := log.With(logger, "transport", "gRPC")
logger.Log("addr", *gRPCAddr)
listener, err := net.Listen("tcp", *gRPCAddr)
if err != nil {
errc <- err
return
}
srv := server.MakeGRPCServer(endpoints)
s := grpc.NewServer()
pb.RegisterAddServer(s, srv)
errc <- s.Serve(listener)
}()
logger.Log("exit", <-errc)
}
|
Client
对于 HTTP Server, 我们可以直接使用 curl 进行调试
1
2
3
4
5
6
7
|
#!/usr/bin/env sh -v
#Sum Response
curl -d '{"a":11111, "b":22222}' http://127.0.0.1:8890/sum
#Concat Response
curl -d '{"a":"11111", "b":"22222"}' http://127.0.0.1:8890/concat
|
同时实现一个 gRPC Client, 此处需要用到 grpc_transport.go 中定义的 encode / decode 方法.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
package grpc
import (
grpctransport "github.com/go-kit/kit/transport/grpc"
"go-kit-demo/pb"
"go-kit-demo/server"
"google.golang.org/grpc"
)
func New(conn *grpc.ClientConn) server.AddService {
sumEndpoint := grpctransport.NewClient(
conn, "pb.Add", "Sum",
server.EncodeGRPCSumRequest,
server.DecodeGRPCSumResponse,
pb.SumReply{},
).Endpoint()
concatEndpoint := grpctransport.NewClient(
conn, "pb.Add", "Concat",
server.EncodeGRPCConcatRequest,
server.DecodeGRPCConcatResponse,
pb.ConcatReply{},
).Endpoint()
return server.Endpoints{
SumEndpoint: sumEndpoint,
ConcatEndpoint: concatEndpoint,
}
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
package main
import (
"context"
"flag"
grpcclient "go-kit-demo/client/grpc"
"go-kit-demo/server"
"google.golang.org/grpc"
"log"
"time"
)
func main() {
gRPCAddr := flag.String("gRPC", ":8891", "gRPC client")
flag.Parse()
conn, err := grpc.Dial(
*gRPCAddr, grpc.WithInsecure(),
grpc.WithTimeout(time.Second),
)
if err != nil {
log.Fatalln("gRPC dial error:", err)
}
defer conn.Close()
addService := grpcclient.New(conn)
println("Sum Response:")
output := addService.Sum(context.Background(), 11111, 22222)
println(output)
println("Concat Response:")
output := addService.Concat(context.Background(), "11111", "22222")
println(output)
}
|