服务端trace

增加trace代码

// Create the Zipkin reporter that sends spans to the collector over HTTP.
reporter := http.NewReporter("http://localhost:9411/api/v2/spans")

// Close the reporter when the program ends so that its background
// for-select goroutine is stopped.
defer reporter.Close()

// Create the tracer that records spans through the reporter.
zkTracer, err := opzipkin.NewTracer(reporter)
if err != nil {
	// Without a tracer none of the tracing below can work, so fail fast.
	panic(err)
}

// Build the go-kit server option that attaches the tracing logic to the
// before / after / finalizer hooks of each gRPC request.
zkServerTrace := zipkin.GRPCServerTrace(zkTracer)

// Enable tracing by passing it to the server as an option.
bookListHandler := grpctransport.NewServer(
	bookListEndPoint,
	decodeRequest,
	encodeResponse,
	zkServerTrace,
)

完整代码

我们在之前代码的基础上加入trace相关的代码:

package main  
  
import (
	"context"
	"math/rand"
	"net"
	"time"

	"github.com/go-kit/kit/endpoint"
	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/ratelimit"
	"github.com/go-kit/kit/sd/etcdv3"
	"github.com/go-kit/kit/tracing/zipkin"
	grpctransport "github.com/go-kit/kit/transport/grpc"
	opzipkin "github.com/openzipkin/zipkin-go"
	"github.com/openzipkin/zipkin-go/reporter/http"
	"golang.org/x/time/rate"
	"google.golang.org/grpc"

	"grpc-test/pb" // generated protobuf code; referenced as package `book` below
)
  
// BookServer is the gRPC service implementation. It holds one go-kit gRPC
// handler per RPC and forwards every incoming call to the matching handler.
type BookServer struct {
	// Handlers for the GetBookList and GetBookInfo RPCs respectively.
	bookListHandler, bookInfoHandler grpctransport.Handler
}
  
// GetBookInfo is the gRPC entry point for the GetBookInfo RPC. It is a pure
// pass-through: the request is handed to the bookInfoHandler's ServeGRPC so
// that go-kit processes it, and the handler's reply is returned as-is.
func (s *BookServer) GetBookInfo(ctx context.Context, in *book.BookInfoParams) (*book.BookInfo, error) {
	_, reply, err := s.bookInfoHandler.ServeGRPC(ctx, in)
	if err == nil {
		return reply.(*book.BookInfo), nil
	}
	return nil, err
}
  
// GetBookList is the gRPC entry point for the GetBookList RPC. It is a pure
// pass-through: the request is handed to the bookListHandler's ServeGRPC so
// that go-kit processes it, and the handler's reply is returned as-is.
func (s *BookServer) GetBookList(ctx context.Context, in *book.BookListParams) (*book.BookList, error) {
	_, reply, err := s.bookListHandler.ServeGRPC(ctx, in)
	if err == nil {
		return reply.(*book.BookList), nil
	}
	return nil, err
}
  
// makeGetBookListEndpoint builds the endpoint that serves the book list.
//
// Each call sleeps for a random 0-199 ms (presumably to simulate work so the
// traces show varying durations) and then returns a fixed two-book list.
func makeGetBookListEndpoint() endpoint.Endpoint {
	// Seed the global generator once, when the endpoint is built. Reseeding
	// on every request with a second-resolution timestamp made all requests
	// arriving within the same second sleep for exactly the same duration.
	rand.Seed(time.Now().UnixNano())

	return func(ctx context.Context, request interface{}) (interface{}, error) {
		delay := time.Duration(rand.Int63n(200)) * time.Millisecond
		time.Sleep(delay)

		// A list request always answers with the same two books.
		bl := new(book.BookList)
		bl.BookList = append(bl.BookList,
			&book.BookInfo{BookId: 1, BookName: "21天精通php"},
			&book.BookInfo{BookId: 2, BookName: "21天精通java"},
		)
		return bl, nil
	}
}
  
// makeGetBookInfoEndpoint builds the endpoint that serves a single book.
//
// Each call sleeps for a random short delay and then returns a BookInfo that
// echoes the requested BookId with a fixed book name. The request must be a
// *book.BookInfoParams; any other type makes the type assertion panic.
func makeGetBookInfoEndpoint() endpoint.Endpoint {
	// Seed the global generator once, when the endpoint is built. Reseeding
	// on every request with a second-resolution timestamp made all requests
	// arriving within the same second sleep for exactly the same duration.
	rand.Seed(time.Now().UnixNano())

	return func(ctx context.Context, request interface{}) (interface{}, error) {
		// NOTE(review): the list endpoint sleeps in milliseconds while this
		// one uses microseconds; confirm whether Millisecond was intended.
		delay := time.Duration(rand.Int63n(200)) * time.Microsecond
		time.Sleep(delay)

		// A detail request answers with the book matching the requested id.
		req := request.(*book.BookInfoParams)
		b := new(book.BookInfo)
		b.BookId = req.BookId
		b.BookName = "21天精通php"
		return b, nil
	}
}
  
// decodeRequest is the identity request decoder: the incoming gRPC request is
// handed to the endpoint unchanged and no error is ever reported. The context
// is ignored.
func decodeRequest(_ context.Context, grpcReq interface{}) (request interface{}, err error) {
	request = grpcReq
	return
}
  
// encodeResponse is the identity response encoder: the endpoint's response is
// returned to the gRPC layer unchanged and no error is ever reported. The
// context is ignored.
func encodeResponse(_ context.Context, endpointRsp interface{}) (response interface{}, err error) {
	response = endpointRsp
	return
}
  
// main starts the book gRPC service: it registers this instance in etcd,
// creates the Zipkin tracer, wraps both endpoints with a rate limiter,
// attaches the tracing option to both go-kit gRPC handlers and finally serves
// gRPC on serviceAddress. Any setup failure aborts the program via panic.
func main() {
	var (
		// Address of the etcd server.
		etcdServer = "127.0.0.1:2379"
		// Directory (key prefix) that holds the service's instances.
		prefix = "/services/book/"
		// Address of the service instance being started.
		instance = "127.0.0.1:50051"
		// Key under which this instance is registered.
		key = prefix + instance
		// Value stored for this instance.
		value = instance
		ctx   = context.Background()
		// Address the gRPC server listens on.
		serviceAddress = ":50051"
	)

	// Connection parameters for etcd.
	options := etcdv3.ClientOptions{
		DialTimeout:   time.Second * 3,
		DialKeepAlive: time.Second * 3,
	}
	// Connect to etcd.
	client, err := etcdv3.NewClient(ctx, []string{etcdServer}, options)
	if err != nil {
		panic(err)
	}

	// Create the registrar and register this instance; remove the
	// registration again when main returns.
	registrar := etcdv3.NewRegistrar(client, etcdv3.Service{
		Key:   key,
		Value: value,
	}, log.NewNopLogger())
	registrar.Register()
	defer registrar.Deregister()

	// Zipkin reporter and tracer shared by both handlers.
	reporter := http.NewReporter("http://localhost:9411/api/v2/spans")
	defer reporter.Close()
	zkTracer, err := opzipkin.NewTracer(reporter)
	if err != nil {
		panic(err)
	}
	zkServerTrace := zipkin.GRPCServerTrace(zkTracer)

	bookServer := new(BookServer)

	// Rate limiter shared by both endpoints: one new token per second with a
	// burst size of 100000.
	limiter := rate.NewLimiter(rate.Every(time.Second*1), 100000)

	// Wrap bookListEndPoint in the DelayingLimiter middleware so that calls
	// pass through the rate limiter first.
	bookListEndPoint := makeGetBookListEndpoint()
	bookListEndPoint = ratelimit.NewDelayingLimiter(limiter)(bookListEndPoint)
	bookListHandler := grpctransport.NewServer(
		bookListEndPoint,
		decodeRequest,
		encodeResponse,
		zkServerTrace,
	)
	bookServer.bookListHandler = bookListHandler

	// Wrap bookInfoEndPoint in the same rate-limiting middleware.
	bookInfoEndPoint := makeGetBookInfoEndpoint()
	bookInfoEndPoint = ratelimit.NewDelayingLimiter(limiter)(bookInfoEndPoint)
	bookInfoHandler := grpctransport.NewServer(
		bookInfoEndPoint,
		decodeRequest,
		encodeResponse,
		zkServerTrace,
	)
	bookServer.bookInfoHandler = bookInfoHandler

	ls, err := net.Listen("tcp", serviceAddress)
	if err != nil {
		panic(err)
	}
	gs := grpc.NewServer(grpc.UnaryInterceptor(grpctransport.Interceptor))
	book.RegisterBookServiceServer(gs, bookServer)
	// Serve blocks until the server stops.
	if err := gs.Serve(ls); err != nil {
		panic(err)
	}
}

客户端trace

增加trace代码

与服务端trace的区别在于:客户端使用 zipkin.GRPCClientTrace(服务端使用的是 zipkin.GRPCServerTrace)

// Create the Zipkin reporter and make sure it is closed when done.
reporter := http.NewReporter("http://localhost:9411/api/v2/spans")
defer reporter.Close()

// Create the tracer; without it no client option can be built.
zkTracer, err := opzipkin.NewTracer(reporter)
if err != nil {
	panic(err)
}
// Client-side counterpart of GRPCServerTrace, passed to the gRPC client as
// an option.
zkClientTrace := zipkin.GRPCClientTrace(zkTracer)

可以先创建一个父span并把它放入context,之后用这个context发起的请求所产生的span都会挂在它下面,从而组装出span结构树:

// Start the parent span that groups the calls made with ctx below.
parentSpan := zkTracer.StartSpan("bookCaller")
// Finish (rather than Flush) so that the span's duration is recorded before
// it is handed to the reporter.
defer parentSpan.Finish()
// Store the parent span in the context used for the subsequent requests.
ctx = opzipkin.NewContext(context.Background(), parentSpan)

完整代码

package main  
  
import (  
   "context"  
   "github.com/go-kit/kit/sd/etcdv3" 
   "time" 
   "github.com/go-kit/kit/sd" 
   "github.com/go-kit/kit/log" 
   "github.com/go-kit/kit/endpoint" 
   "io" 
   "github.com/go-kit/kit/sd/lb" 
   "fmt" 
   "google.golang.org/grpc" 
   "github.com/afex/hystrix-go/hystrix" 
   "github.com/go-kit/kit/circuitbreaker"  
   opzipkin "github.com/openzipkin/zipkin-go"  
   "github.com/go-kit/kit/tracing/zipkin"  
   grpctransport "github.com/go-kit/kit/transport/grpc"  
   "grpc-test/pb" 
   "github.com/openzipkin/zipkin-go/reporter/http"
)  
  
// main is the client entry point. It configures a hystrix circuit breaker,
// discovers the book service's instances through etcd, builds a round-robin
// balancer with retries on top of them, wraps the resulting endpoint with the
// circuit breaker and issues a single request, printing any error.
func main() {
	const (
		// Address of the registry (etcd).
		etcdServer = "127.0.0.1:2379"
		// Key prefix of the service being watched.
		prefix = "/services/book/"
		// Name of the hystrix command guarding the endpoint.
		commandName = "my-endpoint"
	)

	hystrix.ConfigureCommand(commandName, hystrix.CommandConfig{
		Timeout:                30 * 1000,
		MaxConcurrentRequests:  1000,
		RequestVolumeThreshold: 5,
		SleepWindow:            10000,
		ErrorPercentThreshold:  1,
	})
	breakerMw := circuitbreaker.Hystrix(commandName)

	// Connect to the registry.
	ctx := context.Background()
	client, err := etcdv3.NewClient(ctx, []string{etcdServer}, etcdv3.ClientOptions{
		DialTimeout:   3 * time.Second,
		DialKeepAlive: 3 * time.Second,
	})
	if err != nil {
		panic(err)
	}

	logger := log.NewNopLogger()

	// The instancer watches the prefix directory in etcd and keeps its cached
	// list of service instances up to date.
	instancer, err := etcdv3.NewInstancer(client, prefix, logger)
	if err != nil {
		panic(err)
	}

	// The endpointer turns every discovered instance into an endpoint via
	// reqFactory and follows the instancer's updates.
	endpointer := sd.NewEndpointer(instancer, reqFactory, logger)

	// Round-robin load balancer over the discovered endpoints.
	balancer := lb.NewRoundRobin(endpointer)

	// An endpoint could also be taken straight from the balancer:
	//
	//	reqEndPoint, _ := balancer.Endpoint()
	//
	// Here the balancer is wrapped with a retry policy instead (3 attempts,
	// 100 s overall timeout), and the circuit-breaker middleware goes on top.
	reqEndPoint := breakerMw(lb.Retry(3, 100*time.Second, balancer))

	// The endpoint is ready; send one request.
	const rounds = 1
	for i := 0; i < rounds; i++ {
		if _, callErr := reqEndPoint(ctx, struct{}{}); callErr != nil {
			fmt.Println(callErr)
		}
	}
}
  
  
// reqFactory builds the request endpoint for the service instance listening
// on instanceAddr.
//
// The gRPC connection is dialed once per instance and returned as the
// io.Closer so that it can be closed by whoever owns the endpoint; previously
// a new connection was dialed on every request and never closed. A dial
// failure is returned as an error instead of panicking.
//
// The returned endpoint calls GetBookInfo and then GetBookList under a common
// parent span named "bookCaller", prints both results and returns (nil, nil).
// The first failing step aborts the call and its error is returned.
func reqFactory(instanceAddr string) (endpoint.Endpoint, io.Closer, error) {
	conn, err := grpc.Dial(instanceAddr, grpc.WithInsecure())
	if err != nil {
		return nil, nil, fmt.Errorf("dial %s: %w", instanceAddr, err)
	}

	call := func(ctx context.Context, request interface{}) (interface{}, error) {
		fmt.Println("请求服务: ", instanceAddr, "当前时间: ", time.Now().Format("2006-01-02 15:04:05.99"))

		// The reporter is created and closed per call. NOTE(review): this
		// presumably ensures the spans are sent before the short-lived demo
		// exits — confirm before hoisting it out of the endpoint.
		reporter := http.NewReporter("http://localhost:9411/api/v2/spans")
		defer reporter.Close()

		zkTracer, err := opzipkin.NewTracer(reporter)
		if err != nil {
			return nil, fmt.Errorf("create zipkin tracer: %w", err)
		}
		zkClientTrace := zipkin.GRPCClientTrace(zkTracer)

		// NOTE(review): both request encoders ignore their input and return
		// nil, so the caller's request value never reaches the server —
		// confirm this is intended.
		bookInfoRequest := grpctransport.NewClient(
			conn,
			"BookService",
			"GetBookInfo",
			func(_ context.Context, in interface{}) (interface{}, error) { return nil, nil },
			func(_ context.Context, out interface{}) (interface{}, error) {
				return out, nil
			},
			book.BookInfo{},
			zkClientTrace,
		).Endpoint()

		bookListRequest := grpctransport.NewClient(
			conn,
			"BookService",
			"GetBookList",
			func(_ context.Context, in interface{}) (interface{}, error) { return nil, nil },
			func(_ context.Context, out interface{}) (interface{}, error) {
				return out, nil
			},
			book.BookList{},
			zkClientTrace,
		).Endpoint()

		// Parent span shared by both calls. Finish (rather than Flush) records
		// the span's duration; being deferred last, it runs before the
		// reporter is closed.
		parentSpan := zkTracer.StartSpan("bookCaller")
		defer parentSpan.Finish()
		ctx = opzipkin.NewContext(ctx, parentSpan)

		infoRet, err := bookInfoRequest(ctx, request)
		if err != nil {
			return nil, err
		}
		bi, ok := infoRet.(*book.BookInfo)
		if !ok {
			return nil, fmt.Errorf("unexpected GetBookInfo response type %T", infoRet)
		}
		fmt.Println("获取书籍详情")
		fmt.Println("bookId: 1", " => ", "bookName:", bi.BookName)

		listRet, err := bookListRequest(ctx, request)
		if err != nil {
			return nil, err
		}
		bl, ok := listRet.(*book.BookList)
		if !ok {
			return nil, fmt.Errorf("unexpected GetBookList response type %T", listRet)
		}
		fmt.Println("获取书籍列表")
		for _, b := range bl.BookList {
			fmt.Println("bookId:", b.BookId, " => ", "bookName:", b.BookName)
		}

		return nil, nil
	}

	return call, conn, nil
}