server

服务注册:

1、连接注册中心

2、注册当前服务

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package main

import (
	"context"
	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/sd"
	"github.com/go-kit/kit/sd/etcdv3"

	"time"
)

func Register() (registar sd.Registrar) {
	var (
		//etcd服务地址
		etcdServer = "127.0.0.1:2379"
		//服务的信息目录
		prefix = "/services/arithmetic/"
		//当前启动服务实例的地址
		instance = "127.0.0.1:9000"
		//服务实例注册的路径
		key = prefix + instance
		//服务实例注册的val
		value = instance
		ctx   = context.Background()
	)

	//etcd的连接参数
	options := etcdv3.ClientOptions{
		DialTimeout:   time.Second * 3,
		DialKeepAlive: time.Second * 3,
	}
	//创建etcd连接
	client, err := etcdv3.NewClient(ctx, []string{etcdServer}, options)
	if err != nil {
		panic(err)
	}

	// 创建注册器
	registrar := etcdv3.NewRegistrar(client, etcdv3.Service{
		Key:   key,
		Value: value,
	}, log.NewNopLogger())

	// 注册器启动注册
	registrar.Register()
	return
}

client

client要完成的工作:

  1. 在endpoint中查询已经在etcd中注册的服务实例
  2. 选择合适的服务实例向其发起Add操作,获取返回值

查阅go-kit源码可知,kit/sd/Endpointer提供了一套服务发现机制,其定义和创建接口如下所示:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
// Endpointer listens to a service discovery system and yields a set of
// identical endpoints on demand. An error indicates a problem with connectivity
// to the service discovery system, or within the system itself; an Endpointer
// may yield no endpoints without error.
type Endpointer interface {
	Endpoints() ([]endpoint.Endpoint, error)
}

// NewEndpointer creates an Endpointer that subscribes to updates from Instancer src
// and uses factory f to create Endpoints. If src notifies of an error, the Endpointer
// keeps returning previously created Endpoints assuming they are still good, unless
// this behavior is disabled via InvalidateOnError option.
func NewEndpointer(src Instancer, f Factory, logger log.Logger, options ...EndpointerOption) *DefaultEndpointer

通过代码注释我们可以知道: Endpointer通过监听服务发现系统的事件信息,并且通过factory按需创建服务终结点(Endpoint)。

所以,我们需要通过Endpointer来实现服务发现功能。在微服务模式下,同一个服务可能存在多个实例,所以需要通过负载均衡机制完成实例选择,这里使用go-kit工具集中的kit/sd/lb组件(该组件实现RoundRibbon,并具备Retry功能)。

创建endpoint

创建go文件discover/enpoints.go。根据上述分析,在该endpoint实现对服务发现系统的监听,实现实例选择,最终返回可执行的endpoint.Endpoint。下面根据代码注释说明实现过程:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package main

import (
	"context"
	"github.com/go-kit/kit/endpoint"
	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/sd"
	"github.com/go-kit/kit/sd/etcdv3"
	"github.com/go-kit/kit/sd/lb"
	"time"
)

// MakeDiscoverEndpoint 使用etcd.Client创建服务发现Endpoint
// 为了方便这里默认了一些参数
func MakeDiscoverEndpoint(ctx context.Context, factory sd.Factory,logger log.Logger) endpoint.Endpoint {
	var (
		//监听的服务前缀
		prefix = "/services/arithmetic/"
		//超时重试时间
		duration = 500 * time.Millisecond
		//注册中心地址
		etcdServer = "127.0.0.1:2379"
	)
	options := etcdv3.ClientOptions{
		DialTimeout:   time.Second * 3,
		DialKeepAlive: time.Second * 3,
	}
	//连接注册中心
	client, err := etcdv3.NewClient(ctx, []string{etcdServer}, options)
	if err != nil {
		panic(err)
	}
	//创建实例管理器, 此管理器会Watch监听etc中prefix的目录变化更新缓存的服务实例数据
	instancer, err := etcdv3.NewInstancer(client, prefix, logger)
	if err != nil {
		panic(err)
	}

	//使用etcd连接实例(发现服务系统)、factory创建sd.Factory
	endpointer := sd.NewEndpointer(instancer, factory, logger)

	//创建RoundRibbon负载均衡器
	balancer := lb.NewRoundRobin(endpointer)

	//为负载均衡器增加重试功能,同时该对象为endpoint.Endpoint
	retry := lb.Retry(1, duration, balancer)

	return retry
}

创建factory

在discover目录中创建go文件factory.go,实现sd.Factory的逻辑,即把服务实例转换为endpoint,在该endpoint中实现对于目标服务的调用过程。

go-kit包中存在kithttp.Client结构体,可以根据enc,dec,method,path生成endpoint.Endpoint

这里直接针对算术运算服务进行封装,代码如下所示:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package main

import (
	"context"
	"encoding/json"
	"errors"
	"github.com/go-kit/kit/endpoint"
	"github.com/go-kit/kit/sd"
	kithttp "github.com/go-kit/kit/transport/http"
	"io"
	"net/http"
	"net/url"
	"strconv"
	"strings"
)

//处理instance,合并path,转化为tgt
//将enc,dec,tgt为endpoint.Endpoint
func SumFactory(_ context.Context, method, path string) sd.Factory {
	return func(instance string) (endpoint endpoint.Endpoint, closer io.Closer, err error) {
		if !strings.HasPrefix(instance, "http") {
			instance = "http://" + instance
		}
		tgt, err := url.Parse(instance)
		if err != nil {
			return nil, nil, err
		}
		//path也可以放在enc中做
		tgt.Path = path
		var (
			enc kithttp.EncodeRequestFunc
			dec kithttp.DecodeResponseFunc
		)
		enc, dec = EecodeHTTPSumRequest, DecodeHTTPSumResponse
		//合并path,enc,dec,tgt为endpoint.Endpoint
		return kithttp.NewClient(method, tgt, enc, dec).Endpoint(), nil, nil
	}
}

NewRoundRobin

sd.NewEndpointer返回的endpointer可以是endpoint.Endpoint切片,因为可能发现多个注册同一prefix的服务,instancer可能有多个,通过sd.NewEndpointer返回的Endpoint也可能有多个,若想不通过负载均衡,直接返回第一个实例,可以编写如下代码:

1
2
3
endpoints, _ := endpointer.Endpoints()
fmt.Println("服务有", len(endpoints), "条")
endpoint := endpoints[0]

如果请求多个服务实例,可以遍历endpoints来执行.

NewRoundRobin是轮询算法实现的负载均衡器:负载均衡后,可以调用balancer.Endpoint()来生成endpoint.Endpoint

1
2
3
//创建RoundRibbon负载均衡器
balancer := lb.NewRoundRobin(endpointer)
balancer.Endpoint()

Gokit中还有随机算法实现的负载均衡器,需要传入随机数seeds,建议采用纳秒形式,随机性更强:

1
2
balancer := lb.NewRandom(endpointer,time.Now().UnixNano())
balancer.Endpoint()

一般采用轮询来负载均衡,请求分摊更加平均.

main.go

在main.go中创建etcd连接,执行endpoint函数并输出返回值:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package main

import (
	"context"
	"time"

	"fmt"
	"github.com/go-kit/kit/log"

	"os"
)

func main() {
	//创建日志组件
	var logger log.Logger
	{
		logger = log.NewLogfmtLogger(os.Stderr)
		logger = log.With(logger, "ts", log.DefaultTimestampUTC)
		logger = log.With(logger, "caller", log.DefaultCaller)
	}
	ctx := context.Background()
	//创建Endpoint
	//针对calculate接口创建sd.Factory
	factory := SumFactory(ctx, "GET", "sum")
	SumDiscoverEndpoint := MakeDiscoverEndpoint(ctx, factory ,logger)
	req := SumRequest{
		RequestType: "Add",
		A:           1,
		B:           2,
	}
   //执行endpoint
	res, err := SumDiscoverEndpoint(context.Background(), req)
	fmt.Println(res, err)
}

源码分析

etcdv3目录结构

1
2
3
4
5
6
7
8
├── client.go 客户端
├── doc.go
├── example_test.go
├── instancer.go 服务实例
├── instancer_test.go
├── integration_test.go
├── registrar.go 注册器
└── registrar_test.go

目录中主要的是这三个文件,client.go instancer.go registrar.go

client.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
type Client interface {
    //获取一组value通过key前缀
    GetEntries(prefix string) ([]string, error)
    //watch指定前缀的key
    WatchPrefix(prefix string, ch chan struct{})
    //注册服务
    Register(s Service) error
    //注销服务
    Deregister(s Service) error
    //etcd
    LeaseID() int64
}

type client struct {
    //etcd客户端使用v3版本api
    cli *clientv3.Client
    ctx context.Context
    //etcd key/value 操作实例
    kv clientv3.KV
    // etcd watcher 操作实例
    watcher clientv3.Watcher
    // watcher context
    wctx context.Context
    // watcher cancel func
    wcf context.CancelFunc
    // leaseID will be 0 (clientv3.NoLease) if a lease was not created
    leaseID clientv3.LeaseID

    //etcdKeepAlive实现心跳检测
    hbch <-chan *clientv3.LeaseKeepAliveResponse
    // etcd Lease 操作实例
    leaser clientv3.Lease
}
func NewClient(ctx context.Context, machines []string, options ClientOptions) (Client, error)
func (c *client) GetEntries(key string) ([]string, error)
func (c *client) WatchPrefix(prefix string, ch chan struct{})
func (c *client) Register(s Service) error
func (c *client) Deregister(s Service) error

主要包含以下6个函数

  • NewClient 创建etcd客户端,赋值给 client.cli
  • GetEntries 通过 client.kv 获取value
  • WatchPrefix 通过 client.watcher 监听key
  • Deregister 通过 client.cli 服务绑定的key
  • LeaseID return client.leaseID
  • Register
    • 初始化 client.leaser
    • 初始化 client.watcher
    • 初始化 client.kv
    • 通过 client.kv 操作写入etcd,服务注册的key和value
    • 创建 client.leaseID,默认心跳3秒,lease TTL9秒
    • client.leaser调用KeepAlive

registrar.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
type Registrar struct {
    //etcd客户端
    client  Client
    //注册的服务
    service Service
    logger  log.Logger

    //服务Deregister并发锁
    quitmtx sync.Mutex
    //服务退出通道
    quit    chan struct{}
}

//服务的key和地址
type Service struct {
    Key   string // unique key, e.g. "/service/foobar/1.2.3.4:8080"
    Value string // returned to subscribers, e.g. "http://1.2.3.4:8080"
    TTL   *TTLOption
}

//服务心跳检测
type TTLOption struct {
    heartbeat time.Duration // e.g. time.Second * 3
    ttl       time.Duration // e.g. time.Second * 10
}

func NewTTLOption(heartbeat, ttl time.Duration) *TTLOption
func NewRegistrar(client Client, service Service, logger log.Logger) *Registrar
func (r *Registrar) Register()
func (r *Registrar) Deregister()

主要包含以下4个函数

  • NewTTLOption 心跳检测参数
  • NewRegistrar 创建 Registrar
  • Register 调用 client.go中 Register 方法
  • Deregister 调用 client.go中 Deregister 方法

instancer.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
type Instancer struct {
  //实例缓存
    cache  *instance.Cache
    //etcd客户端
    client Client
    //实例前缀
    prefix string
    logger log.Logger
    //Instancer 主动退出 通道
    quitc  chan struct{}
}

func NewInstancer(c Client, prefix string, logger log.Logger) (*Instancer, error)
func (s *Instancer) loop()
func (s *Instancer) Stop()
func (s *Instancer) Register(ch chan<- sd.Event)
func (s *Instancer) Deregister(ch chan<- sd.Event)

主要包含以下5个函数

  • NewInstancer
    • 调用 client.go GetEntries函数,获取对应的一组服务地址
    • 查询到的服务地址写入缓存 Instancer.cache
  • loop 监听服务对应的key
  • Stop 关闭服务监听
  • Register
  • Deregister

参考:

http://tongzhao.red/15612106758500.html

https://juejin.im/post/5c740a335188257c1e2c86a7