基本原理

gRPC开源组件官方并未直接提供服务注册与发现的功能实现,但其设计文档已提供实现的思路,并在不同语言的gRPC代码API中已提供了命名解析和负载均衡接口供扩展。

其基本实现原理:

  1. 服务启动后gRPC客户端向命名服务器发出名称解析请求,名称将解析为一个或多个IP地址,每个IP地址标示它是服务器地址还是负载均衡器地址,以及标示要使用那个客户端负载均衡策略或服务配置。
  2. 客户端实例化负载均衡策略,如果解析返回的地址是负载均衡器地址,则客户端将使用grpclb策略,否则客户端使用服务配置请求的负载均衡策略。
  3. 负载均衡策略为每个服务器地址创建一个子通道(channel)。
  4. 当有rpc请求时,负载均衡策略决定那个子通道即grpc服务器将接收请求,当可用服务器为空时客户端的请求将被阻塞。

根据gRPC官方提供的设计思路,基于进程内LB方案(即第2个案,阿里开源的服务框架 Dubbo 也是采用类似机制),结合分布式一致的组件(如Zookeeper、Consul、Etcd),可找到gRPC服务发现和负载均衡的可行解决方案。接下来以GO语言为例,简单介绍下基于Etcd3的关键代码实现:

服务发现

gRPC已提供了简单的负载均衡策略(如:Round Robin),我们只需实现它提供的Builder和Resolver接口,就能完成gRPC客户端负载均衡。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
// Builder creates a resolver that will be used to watch name resolution updates.
type Builder interface {
	// Build creates a new resolver for the given target.
	//
	// gRPC dial calls Build synchronously, and fails if the returned error is
	// not nil.
	Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)
	// Scheme returns the scheme supported by this resolver.
	// Scheme is defined at https://github.com/grpc/grpc/blob/master/doc/naming.md.
	Scheme() string
}
  • Builder接口:创建一个resolver(本文称之服务发现),用于监视名称解析更新。
  • Build方法:为给定目标创建一个新的resolver,当调用grpc.Dial()时执行。
  • Scheme方法:返回此resolver支持的方案,Scheme定义可参考:https://github.com/grpc/grpc/blob/master/doc/naming.md

Build方法的第一个参数target的类型为Target定义如下,创建ClientConn调用grpc.DialContext的第二个参数target经过解析后需要符合这个结构定义,target定义格式为: scheme://authority/endpoint_name

1
2
3
4
5
type Target struct {
	Scheme    string // 表示要使用的名称系统
	Authority string // 表示一些特定于方案的引导信息
	Endpoint  string // 指出一个具体的名字
}

Build方法返回的Resolver也是一个接口类型。定义如下

1
2
3
4
type Resolver interface {
	ResolveNow(ResolveNowOptions)
	Close()
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
// Resolver watches for the updates on the specified target.
// Updates include address updates and service config updates.
type Resolver interface {
	// ResolveNow will be called by gRPC to try to resolve the target name
	// again. It's just a hint, resolver can ignore this if it's not necessary.
	//
	// It could be called multiple times concurrently.
	ResolveNow(ResolveNowOptions)
	// Close closes the resolver.
	Close()
}
  • Resolver接口:监视指定目标的更新,包括地址更新和服务配置更新。
  • ResolveNow方法:被 gRPC 调用,以尝试再次解析目标名称。只用于提示,可忽略该方法。
  • Close方法:关闭resolver

根据以上两个接口,我们把服务发现的功能写在Build方法中,把获取到的负载均衡服务地址返回到客户端,并监视服务更新情况,以修改客户端连接。

实现resolver

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
package etcdv3

import (
	"context"
	"log"
	"sync"
	"time"

	"github.com/coreos/etcd/mvcc/mvccpb"
	"go.etcd.io/etcd/clientv3"
	"google.golang.org/grpc/resolver"
)

const schema = "grpclb"

//ServiceDiscovery 服务发现
type ServiceDiscovery struct {
	cli        *clientv3.Client //etcd client
	cc         resolver.ClientConn
	serverList sync.Map //服务列表
}

//NewServiceDiscovery  新建发现服务
func NewServiceDiscovery(endpoints []string) resolver.Builder {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   endpoints,
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}

	return &ServiceDiscovery{
		cli: cli,
	}
}

//Build 为给定目标创建一个新的`resolver`,当调用`grpc.Dial()`时执行
func (s *ServiceDiscovery) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) {
	log.Println("Build")
	s.cc = cc
	prefix := "/" + target.Scheme + "/" + target.Endpoint + "/"
	//根据前缀获取现有的key
	resp, err := s.cli.Get(context.Background(), prefix, clientv3.WithPrefix())
	if err != nil {
		return nil, err
	}

	for _, ev := range resp.Kvs {
		s.SetServiceList(string(ev.Key), string(ev.Value))
	}
	s.cc.UpdateState(resolver.State{Addresses: s.getServices()})
	//监视前缀,修改变更的server
	go s.watcher(prefix)
	return s, nil
}

// ResolveNow 监视目标更新
func (s *ServiceDiscovery) ResolveNow(rn resolver.ResolveNowOption) {
	log.Println("ResolveNow")
}

//Scheme return schema
func (s *ServiceDiscovery) Scheme() string {
	return schema
}

//Close 关闭
func (s *ServiceDiscovery) Close() {
	log.Println("Close")
	s.cli.Close()
}

//watcher 监听前缀
func (s *ServiceDiscovery) watcher(prefix string) {
	rch := s.cli.Watch(context.Background(), prefix, clientv3.WithPrefix())
	log.Printf("watching prefix:%s now...", prefix)
	for wresp := range rch {
		for _, ev := range wresp.Events {
			switch ev.Type {
			case mvccpb.PUT: //新增或修改
				s.SetServiceList(string(ev.Kv.Key), string(ev.Kv.Value))
			case mvccpb.DELETE: //删除
				s.DelServiceList(string(ev.Kv.Key))
			}
		}
	}
}

//SetServiceList 新增服务地址
func (s *ServiceDiscovery) SetServiceList(key, val string) {
	s.serverList.Store(key, resolver.Address{Addr: val})
	s.cc.UpdateState(resolver.State{Addresses: s.getServices()})
	log.Println("put key :", key, "val:", val)
}

//DelServiceList 删除服务地址
func (s *ServiceDiscovery) DelServiceList(key string) {
	s.serverList.Delete(key)
	s.cc.UpdateState(resolver.State{Addresses: s.getServices()})
	log.Println("del key:", key)
}

//GetServices 获取服务地址
func (s *ServiceDiscovery) getServices() []resolver.Address {
	addrs := make([]resolver.Address, 0, 10)
	s.serverList.Range(func(k, v interface{}) bool {
		addrs = append(addrs, v.(resolver.Address))
		return true
	})
	return addrs
}

代码主要修改以下地方:

  1. 把获取的服务地址转成resolver.Address,供gRPC客户端连接。
  2. 根据schema的定义规则,修改key格式。

实现register

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
package etcdv3

import (
	"context"
	"log"
	"time"

	"go.etcd.io/etcd/clientv3"
)

//ServiceRegister 创建租约注册服务
type ServiceRegister struct {
	cli     *clientv3.Client //etcd client
	leaseID clientv3.LeaseID //租约ID
	//租约keepalieve相应chan
	keepAliveChan <-chan *clientv3.LeaseKeepAliveResponse
	key           string //key
	val           string //value
}

//NewServiceRegister 新建注册服务
func NewServiceRegister(endpoints []string, serName, addr string, lease int64) (*ServiceRegister, error) {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   endpoints,
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}

	ser := &ServiceRegister{
		cli: cli,
		key: "/" + schema + "/" + serName + "/" + addr,
		val: addr,
	}

	//申请租约设置时间keepalive
	if err := ser.putKeyWithLease(lease); err != nil {
		return nil, err
	}

	return ser, nil
}

//设置租约
func (s *ServiceRegister) putKeyWithLease(lease int64) error {
	//设置租约时间
	resp, err := s.cli.Grant(context.Background(), lease)
	if err != nil {
		return err
	}
	//注册服务并绑定租约
	_, err = s.cli.Put(context.Background(), s.key, s.val, clientv3.WithLease(resp.ID))
	if err != nil {
		return err
	}
	//设置续租 定期发送需求请求
	leaseRespChan, err := s.cli.KeepAlive(context.Background(), resp.ID)

	if err != nil {
		return err
	}
	s.leaseID = resp.ID
	s.keepAliveChan = leaseRespChan
	log.Printf("Put key:%s  val:%s  success!", s.key, s.val)
	return nil
}

//ListenLeaseRespChan 监听 续租情况
func (s *ServiceRegister) ListenLeaseRespChan() {
	for leaseKeepResp := range s.keepAliveChan {
		log.Println("续约成功", leaseKeepResp)
	}
	log.Println("关闭续租")
}

// Close 注销服务
func (s *ServiceRegister) Close() error {
	//撤销租约
	if _, err := s.cli.Revoke(context.Background(), s.leaseID); err != nil {
		return err
	}
	log.Println("撤销租约")
	return s.cli.Close()
}

实现代码

客户端修改gRPC连接服务的部分代码即可:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
func main() {
    r := etcdv3.NewServiceDiscovery(EtcdEndpoints)
    resolver.Register(r)
    // 连接服务器
    conn, err := grpc.Dial(r.Scheme()+"://8.8.8.8/simple_grpc", grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`), grpc.WithInsecure())
    if err != nil {
        log.Fatalf("net.Connect err: %v", err)
    }
    defer conn.Close()

    // 建立gRPC连接
    grpcClient = pb.NewSimpleClient(conn)

gRPC内置了简单的负载均衡策略round_robin,根据负载均衡地址,以轮询的方式进行调用服务。

服务端启动时,把服务地址注册到etcd中即可:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func main() {
    // 监听本地端口
    listener, err := net.Listen(Network, Address)
    if err != nil {
        log.Fatalf("net.Listen err: %v", err)
    }
    log.Println(Address + " net.Listing...")
    // 新建gRPC服务器实例
    grpcServer := grpc.NewServer()
    // 在gRPC服务器注册我们的服务
    pb.RegisterSimpleServer(grpcServer, &SimpleService{})
    //把服务注册到etcd
    ser, err := etcdv3.NewServiceRegister(EtcdEndpoints, SerName, Address, 5)
    if err != nil {
        log.Fatalf("register service err: %v", err)
    }
    defer ser.Close()
    //用服务器 Serve() 方法以及我们的端口信息区实现阻塞等待,直到进程被杀死或者 Stop() 被调用
    err = grpcServer.Serve(listener)
    if err != nil {
        log.Fatalf("grpcServer.Serve err: %v", err)
    }
}

负载均衡

官方只提供了pick_first和round_robin两种负载均衡策略,轮询法round_robin不能满足因服务器配置不同而承担不同负载量,这篇文章将介绍如何实现自定义负载均衡策略–加权随机法。

加权随机法可以根据服务器的处理能力而分配不同的权重,从而实现处理能力高的服务器可承担更多的请求,处理能力低的服务器少承担请求。

Banlancer

Banlancer Builder 接口定义如下:

1
2
3
4
5
6
7
8
// Builder creates a balancer.
type Builder interface {
    // Build creates a new balancer with the ClientConn.
    Build(cc ClientConn, opts BuildOptions) Balancer
    // Name returns the name of balancers built by this builder.
    // It will be used to pick balancers (for example in service config).
    Name() string
}

Banlancer 接口定义如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// Balancer takes input from gRPC, manages SubConns, and collects and aggregates
// the connectivity states.
//
// It also generates and updates the Picker used by gRPC to pick SubConns for RPCs.
//
// HandleSubConnectionStateChange, HandleResolvedAddrs and Close are guaranteed
// to be called synchronously from the same goroutine.
// There's no guarantee on picker.Pick, it may be called anytime.
type Balancer interface {
    // HandleSubConnStateChange is called by gRPC when the connectivity state
    // of sc has changed.
    // Balancer is expected to aggregate all the state of SubConn and report
    // that back to gRPC.
    // Balancer should also generate and update Pickers when its internal state has
    // been changed by the new state.
    HandleSubConnStateChange(sc SubConn, state connectivity.State)
    // HandleResolvedAddrs is called by gRPC to send updated resolved addresses to
    // balancers.
    // Balancer can create new SubConn or remove SubConn with the addresses.
    // An empty address slice and a non-nil error will be passed if the resolver returns
    // non-nil error to gRPC.
    HandleResolvedAddrs([]resolver.Address, error)
    // Close closes the balancer. The balancer is not required to call
    // ClientConn.RemoveSubConn for its existing SubConns.
    Close()
}

banlancer/base 中实现了大多数 banlancer 功能,不同选择算法的实现,基于 base 实现 pickBuilder 与 picker 接口即可。

base.NewBalancerBuilder 函数定义如下:

1
2
3
4
5
6
7
8
9
// Base 实现的函数
// NewBalancerBuilder returns a balancer builder. The balancers
// built by this builder will use the picker builder to build pickers.
func NewBalancerBuilder(name string, pb PickerBuilder) balancer.Builder {
   return &baseBuilder{
      name:          name,
      pickerBuilder: pb,
   }
}

RR Banlancer 的实现基于 balancer/base 基础实现,核心功能主体在 balancer/base 中实现,而 RR Banlancer 基于 base.NewBalancerBuilder 实现了 balancer.Builder 接口,可以用于注册。

PickerBuilder & Picker

gRPC提供了PickerBuilder和Picker接口让我们实现自己的负载均衡策略。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
// PickerBuildInfo contains information needed by the picker builder to
// construct a picker.
type PickerBuildInfo struct {
	// ReadySCs is a map from all ready SubConns to the Addresses used to
	// create them.
	ReadySCs map[balancer.SubConn]SubConnInfo
}
// PickerBuilder creates balancer.Picker.
type PickerBuilder interface {
	// Build returns a picker that will be used by gRPC to pick a SubConn.
	Build(info PickerBuildInfo) balancer.Picker
}
  • PickerBuilder接口:创建子连接选择器。
  • Build方法:返回一个选择器,将用于gRPC选择子连接。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
// PickInfo contains additional information for the Pick operation.
type PickInfo struct {
	// FullMethodName is the method name that NewClientStream() is called
	// with. The canonical format is /service/Method.
	FullMethodName string
	// Ctx is the RPC's context, and may contain relevant RPC-level information
	// like the outgoing header metadata.
	Ctx context.Context
}
// PickResult contains information related to a connection chosen for an RPC.
type PickResult struct {
	// SubConn is the connection to use for this pick, if its state is Ready.
	// If the state is not Ready, gRPC will block the RPC until a new Picker is
	// provided by the balancer (using ClientConn.UpdateState).  The SubConn
	// must be one returned by ClientConn.NewSubConn.
	SubConn SubConn

	// Done is called when the RPC is completed.  If the SubConn is not ready,
	// this will be called with a nil parameter.  If the SubConn is not a valid
	// type, Done may not be called.  May be nil if the balancer does not wish
	// to be notified when the RPC completes.
	Done func(DoneInfo)
}
// Picker is used by gRPC to pick a SubConn to send an RPC.
// Balancer is expected to generate a new picker from its snapshot every time its
// internal state has changed.
//
// The pickers used by gRPC can be updated by ClientConn.UpdateState().
type Picker interface {
	// Pick returns the connection to use for this RPC and related information.
	//
	// Pick should not block.  If the balancer needs to do I/O or any blocking
	// or time-consuming work to service this call, it should return
	// ErrNoSubConnAvailable, and the Pick call will be repeated by gRPC when
	// the Picker is updated (using ClientConn.UpdateState).
	//
	// If an error is returned:
	//
	// - If the error is ErrNoSubConnAvailable, gRPC will block until a new
	//   Picker is provided by the balancer (using ClientConn.UpdateState).
	//
	// - If the error is a status error (implemented by the grpc/status
	//   package), gRPC will terminate the RPC with the code and message
	//   provided.
	//
	// - For all other errors, wait for ready RPCs will wait, but non-wait for
	//   ready RPCs will be terminated with this error's Error() string and
	//   status code Unavailable.
	Pick(info PickInfo) (PickResult, error)
}
  • Picker接口:用于gRPC选择子连接去发送请求。
  • Pick方法:子连接选择

实现RoundRobin

RR Banlancer 的实现基于 balancer/base 基础实现,核心功能主体在 balancer/base 中实现,而 RR Banlancer 基于 base.NewBalancerBuilder 实现了 balancer.Builder 接口,可以用于注册。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
// Name is the name of round_robin balancer.
const Name = "round_robin"

// newBuilder creates a new roundrobin balancer builder.
func newBuilder() balancer.Builder {
    return base.NewBalancerBuilder(Name, &rrPickerBuilder{})
}

func init() {
    balancer.Register(newBuilder())
}

RoundRobin Banlancer Builder 实现了 PickerBuilder 的接口:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
func (*rrPickerBuilder) Build(readySCs map[resolver.Address]balancer.SubConn) balancer.Picker {
    grpclog.Infof("roundrobinPicker: newPicker called with readySCs: %v", readySCs)
    // 将 map 接口转化成 slice[] 结构,并使用期构造 rrPicker 并返回
    var scs []balancer.SubConn
    for _, sc := range readySCs {
        scs = append(scs, sc)
    }
    return &rrPicker{
        subConns: scs,
    }
}

banlancer.Picker 由 rrPicker 来实现:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
type rrPicker struct {
    // subConns is the snapshot of the roundrobin balancer when this picker was
    // created. The slice is immutable. Each Get() will do a round robin
    // selection from it and return the selected SubConn.
    subConns []balancer.SubConn

    mu   sync.Mutex
    // 用于记录下一个位移量
    next int
}

func (p *rrPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
	p.mu.Lock()
	sc := p.subConns[p.next]
	p.next = (p.next + 1) % len(p.subConns)
	p.mu.Unlock()
	return balancer.PickResult{SubConn: sc}, nil
}

实现WeightPicker

问题来了,我们需要把服务器地址的权重添加进去,但是地址resolver.Address并没有提供权重的属性。官方给的答复是:把权重存储到地址的元数据metadata中。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// attributeKey is the type used as the key to store AddrInfo in the Attributes
// field of resolver.Address.
type attributeKey struct{}

// AddrInfo will be stored inside Address metadata in order to use weighted balancer.
type AddrInfo struct {
	Weight int
}

// SetAddrInfo returns a copy of addr in which the Attributes field is updated
// with addrInfo.
func SetAddrInfo(addr resolver.Address, addrInfo AddrInfo) resolver.Address {
	addr.Attributes = attributes.New()
	addr.Attributes = addr.Attributes.WithValues(attributeKey{}, addrInfo)
	return addr
}

// GetAddrInfo returns the AddrInfo stored in the Attributes fields of addr.
func GetAddrInfo(addr resolver.Address) AddrInfo {
	v := addr.Attributes.Value(attributeKey{})
	ai, _ := v.(AddrInfo)
	return ai
}

定义AddrInfo结构体并添加权重Weight属性,Set方法把Weight存储到resolver.Address中,Get方法从resolver.Address获取Weight。

解决权重存储问题后,接下来我们实现加权随机法负载均衡策略。

首先实现PickerBuilder接口,返回子连接选择器。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
func (*rrPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker {
	grpclog.Infof("weightPicker: newPicker called with info: %v", info)
	if len(info.ReadySCs) == 0 {
		return base.NewErrPicker(balancer.ErrNoSubConnAvailable)
	}
	var scs []balancer.SubConn
	for subConn, addr := range info.ReadySCs {
		node := GetAddrInfo(addr.Address)
		if node.Weight <= 0 {
			node.Weight = minWeight
		} else if node.Weight > 5 {
			node.Weight = maxWeight
		}
		for i := 0; i < node.Weight; i++ {
			scs = append(scs, subConn)
		}
	}
	return &rrPicker{
		subConns: scs,
	}
}

加权随机法中,我使用空间换时间的方式,把权重转成地址个数(例如addr1的权重是3,那么添加3个子连接到切片中;addr2权重为1,则添加1个子连接;选择子连接时候,按子连接切片长度生成随机数,以随机数作为下标就是选中的子连接),避免重复计算权重。考虑到内存占用,权重定义从1到5权重。

接下来实现子连接的选择,获取随机数,选择子连接

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
type rrPicker struct {
	subConns []balancer.SubConn
	mu sync.Mutex
}

func (p *rrPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
	p.mu.Lock()
	index := rand.Intn(len(p.subConns))
	sc := p.subConns[index]
	p.mu.Unlock()
	return balancer.PickResult{SubConn: sc}, nil
}

关键代码完成后,我们把加权随机法负载均衡策略命名为weight,并注册到gRPC的负载均衡策略中。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
// Name is the name of weight balancer.
const Name = "weight"
// NewBuilder creates a new weight balancer builder.
func newBuilder() balancer.Builder {
	return base.NewBalancerBuilder(Name, &rrPickerBuilder{}, base.Config{HealthCheck: false})
}

func init() {
	balancer.Register(newBuilder())
}

最后,我们只需要在服务端注册服务时候附带权重,然后客户端在服务发现时把权重Set到resolver.Address中,最后客户端把负载论衡策略改成weight就完成了。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
//SetServiceList 设置服务地址
func (s *ServiceDiscovery) SetServiceList(key, val string) {
	s.lock.Lock()
	defer s.lock.Unlock()
	//获取服务地址
	addr := resolver.Address{Addr: strings.TrimPrefix(key, s.prefix)}
	//获取服务地址权重
	nodeWeight, err := strconv.Atoi(val)
	if err != nil {
		//非数字字符默认权重为1
		nodeWeight = 1
	}
	//把服务地址权重存储到resolver.Address的元数据中
	addr = weight.SetAddrInfo(addr, weight.AddrInfo{Weight: nodeWeight})
	s.serverList[key] = addr
	s.cc.UpdateState(resolver.State{Addresses: s.getServices()})
	log.Println("put key :", key, "wieght:", val)
}

客户端使用weight负载均衡策略

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
func main() {
	r := etcdv3.NewServiceDiscovery(EtcdEndpoints)
	resolver.Register(r)
	// 连接服务器
	conn, err := grpc.Dial(
		fmt.Sprintf("%s:///%s", r.Scheme(), SerName),
		grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"weight"}`),
		grpc.WithInsecure(),
	)
	if err != nil {
		log.Fatalf("net.Connect err: %v", err)
	}
	defer conn.Close()

参考

gRPC服务发现&负载均衡

gRPC负载均衡(客户端负载均衡)

gRPC负载均衡(自定义负载均衡策略)

gRPC 之 LoadBalancer