基本原理
gRPC开源组件官方并未直接提供服务注册与发现的功能实现,但其设计文档已提供实现的思路,并在不同语言的gRPC代码API中已提供了命名解析和负载均衡接口供扩展。

其基本实现原理:
- 服务启动后gRPC客户端向命名服务器发出名称解析请求,名称将解析为一个或多个IP地址,每个IP地址标示它是服务器地址还是负载均衡器地址,以及标示要使用那个客户端负载均衡策略或服务配置。
- 客户端实例化负载均衡策略,如果解析返回的地址是负载均衡器地址,则客户端将使用grpclb策略,否则客户端使用服务配置请求的负载均衡策略。
- 负载均衡策略为每个服务器地址创建一个子通道(channel)。
- 当有rpc请求时,负载均衡策略决定那个子通道即grpc服务器将接收请求,当可用服务器为空时客户端的请求将被阻塞。
根据gRPC官方提供的设计思路,基于进程内LB方案(即第2个案,阿里开源的服务框架 Dubbo 也是采用类似机制),结合分布式一致的组件(如Zookeeper、Consul、Etcd),可找到gRPC服务发现和负载均衡的可行解决方案。接下来以GO语言为例,简单介绍下基于Etcd3的关键代码实现:
服务发现
gRPC已提供了简单的负载均衡策略(如:Round Robin),我们只需实现它提供的Builder和Resolver接口,就能完成gRPC客户端负载均衡。
1
2
3
4
5
6
7
8
9
10
11
|
// Builder creates a resolver that will be used to watch name resolution updates.
type Builder interface {
// Build creates a new resolver for the given target.
//
// gRPC dial calls Build synchronously, and fails if the returned error is
// not nil.
Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)
// Scheme returns the scheme supported by this resolver.
// Scheme is defined at https://github.com/grpc/grpc/blob/master/doc/naming.md.
Scheme() string
}
|
Build方法的第一个参数target的类型为Target定义如下,创建ClientConn调用grpc.DialContext的第二个参数target经过解析后需要符合这个结构定义,target定义格式为: scheme://authority/endpoint_name
1
2
3
4
5
|
type Target struct {
Scheme string // 表示要使用的名称系统
Authority string // 表示一些特定于方案的引导信息
Endpoint string // 指出一个具体的名字
}
|
Build方法返回的Resolver也是一个接口类型。定义如下
1
2
3
4
|
type Resolver interface {
ResolveNow(ResolveNowOptions)
Close()
}
|
1
2
3
4
5
6
7
8
9
10
11
|
// Resolver watches for the updates on the specified target.
// Updates include address updates and service config updates.
type Resolver interface {
// ResolveNow will be called by gRPC to try to resolve the target name
// again. It's just a hint, resolver can ignore this if it's not necessary.
//
// It could be called multiple times concurrently.
ResolveNow(ResolveNowOptions)
// Close closes the resolver.
Close()
}
|
- Resolver接口:监视指定目标的更新,包括地址更新和服务配置更新。
- ResolveNow方法:被 gRPC 调用,以尝试再次解析目标名称。只用于提示,可忽略该方法。
- Close方法:关闭resolver
根据以上两个接口,我们把服务发现的功能写在Build方法中,把获取到的负载均衡服务地址返回到客户端,并监视服务更新情况,以修改客户端连接。
实现resolver
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
|
package etcdv3
import (
"context"
"log"
"sync"
"time"
"github.com/coreos/etcd/mvcc/mvccpb"
"go.etcd.io/etcd/clientv3"
"google.golang.org/grpc/resolver"
)
const schema = "grpclb"
//ServiceDiscovery 服务发现
type ServiceDiscovery struct {
cli *clientv3.Client //etcd client
cc resolver.ClientConn
serverList sync.Map //服务列表
}
//NewServiceDiscovery 新建发现服务
func NewServiceDiscovery(endpoints []string) resolver.Builder {
cli, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: 5 * time.Second,
})
if err != nil {
log.Fatal(err)
}
return &ServiceDiscovery{
cli: cli,
}
}
//Build 为给定目标创建一个新的`resolver`,当调用`grpc.Dial()`时执行
func (s *ServiceDiscovery) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) {
log.Println("Build")
s.cc = cc
prefix := "/" + target.Scheme + "/" + target.Endpoint + "/"
//根据前缀获取现有的key
resp, err := s.cli.Get(context.Background(), prefix, clientv3.WithPrefix())
if err != nil {
return nil, err
}
for _, ev := range resp.Kvs {
s.SetServiceList(string(ev.Key), string(ev.Value))
}
s.cc.UpdateState(resolver.State{Addresses: s.getServices()})
//监视前缀,修改变更的server
go s.watcher(prefix)
return s, nil
}
// ResolveNow 监视目标更新
func (s *ServiceDiscovery) ResolveNow(rn resolver.ResolveNowOption) {
log.Println("ResolveNow")
}
//Scheme return schema
func (s *ServiceDiscovery) Scheme() string {
return schema
}
//Close 关闭
func (s *ServiceDiscovery) Close() {
log.Println("Close")
s.cli.Close()
}
//watcher 监听前缀
func (s *ServiceDiscovery) watcher(prefix string) {
rch := s.cli.Watch(context.Background(), prefix, clientv3.WithPrefix())
log.Printf("watching prefix:%s now...", prefix)
for wresp := range rch {
for _, ev := range wresp.Events {
switch ev.Type {
case mvccpb.PUT: //新增或修改
s.SetServiceList(string(ev.Kv.Key), string(ev.Kv.Value))
case mvccpb.DELETE: //删除
s.DelServiceList(string(ev.Kv.Key))
}
}
}
}
//SetServiceList 新增服务地址
func (s *ServiceDiscovery) SetServiceList(key, val string) {
s.serverList.Store(key, resolver.Address{Addr: val})
s.cc.UpdateState(resolver.State{Addresses: s.getServices()})
log.Println("put key :", key, "val:", val)
}
//DelServiceList 删除服务地址
func (s *ServiceDiscovery) DelServiceList(key string) {
s.serverList.Delete(key)
s.cc.UpdateState(resolver.State{Addresses: s.getServices()})
log.Println("del key:", key)
}
//GetServices 获取服务地址
func (s *ServiceDiscovery) getServices() []resolver.Address {
addrs := make([]resolver.Address, 0, 10)
s.serverList.Range(func(k, v interface{}) bool {
addrs = append(addrs, v.(resolver.Address))
return true
})
return addrs
}
|
代码主要修改以下地方:
- 把获取的服务地址转成resolver.Address,供gRPC客户端连接。
- 根据schema的定义规则,修改key格式。
实现register
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
|
package etcdv3
import (
"context"
"log"
"time"
"go.etcd.io/etcd/clientv3"
)
//ServiceRegister 创建租约注册服务
type ServiceRegister struct {
cli *clientv3.Client //etcd client
leaseID clientv3.LeaseID //租约ID
//租约keepalieve相应chan
keepAliveChan <-chan *clientv3.LeaseKeepAliveResponse
key string //key
val string //value
}
//NewServiceRegister 新建注册服务
func NewServiceRegister(endpoints []string, serName, addr string, lease int64) (*ServiceRegister, error) {
cli, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: 5 * time.Second,
})
if err != nil {
log.Fatal(err)
}
ser := &ServiceRegister{
cli: cli,
key: "/" + schema + "/" + serName + "/" + addr,
val: addr,
}
//申请租约设置时间keepalive
if err := ser.putKeyWithLease(lease); err != nil {
return nil, err
}
return ser, nil
}
//设置租约
func (s *ServiceRegister) putKeyWithLease(lease int64) error {
//设置租约时间
resp, err := s.cli.Grant(context.Background(), lease)
if err != nil {
return err
}
//注册服务并绑定租约
_, err = s.cli.Put(context.Background(), s.key, s.val, clientv3.WithLease(resp.ID))
if err != nil {
return err
}
//设置续租 定期发送需求请求
leaseRespChan, err := s.cli.KeepAlive(context.Background(), resp.ID)
if err != nil {
return err
}
s.leaseID = resp.ID
s.keepAliveChan = leaseRespChan
log.Printf("Put key:%s val:%s success!", s.key, s.val)
return nil
}
//ListenLeaseRespChan 监听 续租情况
func (s *ServiceRegister) ListenLeaseRespChan() {
for leaseKeepResp := range s.keepAliveChan {
log.Println("续约成功", leaseKeepResp)
}
log.Println("关闭续租")
}
// Close 注销服务
func (s *ServiceRegister) Close() error {
//撤销租约
if _, err := s.cli.Revoke(context.Background(), s.leaseID); err != nil {
return err
}
log.Println("撤销租约")
return s.cli.Close()
}
|
实现代码
客户端修改gRPC连接服务的部分代码即可:
1
2
3
4
5
6
7
8
9
10
11
12
|
func main() {
r := etcdv3.NewServiceDiscovery(EtcdEndpoints)
resolver.Register(r)
// 连接服务器
conn, err := grpc.Dial(r.Scheme()+"://8.8.8.8/simple_grpc", grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`), grpc.WithInsecure())
if err != nil {
log.Fatalf("net.Connect err: %v", err)
}
defer conn.Close()
// 建立gRPC连接
grpcClient = pb.NewSimpleClient(conn)
|
gRPC内置了简单的负载均衡策略round_robin,根据负载均衡地址,以轮询的方式进行调用服务。
服务端启动时,把服务地址注册到etcd中即可:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
func main() {
// 监听本地端口
listener, err := net.Listen(Network, Address)
if err != nil {
log.Fatalf("net.Listen err: %v", err)
}
log.Println(Address + " net.Listing...")
// 新建gRPC服务器实例
grpcServer := grpc.NewServer()
// 在gRPC服务器注册我们的服务
pb.RegisterSimpleServer(grpcServer, &SimpleService{})
//把服务注册到etcd
ser, err := etcdv3.NewServiceRegister(EtcdEndpoints, SerName, Address, 5)
if err != nil {
log.Fatalf("register service err: %v", err)
}
defer ser.Close()
//用服务器 Serve() 方法以及我们的端口信息区实现阻塞等待,直到进程被杀死或者 Stop() 被调用
err = grpcServer.Serve(listener)
if err != nil {
log.Fatalf("grpcServer.Serve err: %v", err)
}
}
|
负载均衡
官方只提供了pick_first和round_robin两种负载均衡策略,轮询法round_robin不能满足因服务器配置不同而承担不同负载量,这篇文章将介绍如何实现自定义负载均衡策略–加权随机法。
加权随机法可以根据服务器的处理能力而分配不同的权重,从而实现处理能力高的服务器可承担更多的请求,处理能力低的服务器少承担请求。
Banlancer
Banlancer Builder 接口定义如下:
1
2
3
4
5
6
7
8
|
// Builder creates a balancer.
type Builder interface {
// Build creates a new balancer with the ClientConn.
Build(cc ClientConn, opts BuildOptions) Balancer
// Name returns the name of balancers built by this builder.
// It will be used to pick balancers (for example in service config).
Name() string
}
|
Banlancer 接口定义如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
// Balancer takes input from gRPC, manages SubConns, and collects and aggregates
// the connectivity states.
//
// It also generates and updates the Picker used by gRPC to pick SubConns for RPCs.
//
// HandleSubConnectionStateChange, HandleResolvedAddrs and Close are guaranteed
// to be called synchronously from the same goroutine.
// There's no guarantee on picker.Pick, it may be called anytime.
type Balancer interface {
// HandleSubConnStateChange is called by gRPC when the connectivity state
// of sc has changed.
// Balancer is expected to aggregate all the state of SubConn and report
// that back to gRPC.
// Balancer should also generate and update Pickers when its internal state has
// been changed by the new state.
HandleSubConnStateChange(sc SubConn, state connectivity.State)
// HandleResolvedAddrs is called by gRPC to send updated resolved addresses to
// balancers.
// Balancer can create new SubConn or remove SubConn with the addresses.
// An empty address slice and a non-nil error will be passed if the resolver returns
// non-nil error to gRPC.
HandleResolvedAddrs([]resolver.Address, error)
// Close closes the balancer. The balancer is not required to call
// ClientConn.RemoveSubConn for its existing SubConns.
Close()
}
|
banlancer/base 中实现了大多数 banlancer 功能,不同选择算法的实现,基于 base 实现 pickBuilder 与 picker 接口即可。
base.NewBalancerBuilder 函数定义如下:
1
2
3
4
5
6
7
8
9
|
// Base 实现的函数
// NewBalancerBuilder returns a balancer builder. The balancers
// built by this builder will use the picker builder to build pickers.
func NewBalancerBuilder(name string, pb PickerBuilder) balancer.Builder {
return &baseBuilder{
name: name,
pickerBuilder: pb,
}
}
|
RR Banlancer 的实现基于 balancer/base 基础实现,核心功能主体在 balancer/base 中实现,而 RR Banlancer 基于 base.NewBalancerBuilder 实现了 balancer.Builder 接口,可以用于注册。
PickerBuilder & Picker
gRPC提供了PickerBuilder和Picker接口让我们实现自己的负载均衡策略。
1
2
3
4
5
6
7
8
9
10
11
12
|
// PickerBuildInfo contains information needed by the picker builder to
// construct a picker.
type PickerBuildInfo struct {
// ReadySCs is a map from all ready SubConns to the Addresses used to
// create them.
ReadySCs map[balancer.SubConn]SubConnInfo
}
// PickerBuilder creates balancer.Picker.
type PickerBuilder interface {
// Build returns a picker that will be used by gRPC to pick a SubConn.
Build(info PickerBuildInfo) balancer.Picker
}
|
- PickerBuilder接口:创建子连接选择器。
- Build方法:返回一个选择器,将用于gRPC选择子连接。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
// PickInfo contains additional information for the Pick operation.
type PickInfo struct {
// FullMethodName is the method name that NewClientStream() is called
// with. The canonical format is /service/Method.
FullMethodName string
// Ctx is the RPC's context, and may contain relevant RPC-level information
// like the outgoing header metadata.
Ctx context.Context
}
// PickResult contains information related to a connection chosen for an RPC.
type PickResult struct {
// SubConn is the connection to use for this pick, if its state is Ready.
// If the state is not Ready, gRPC will block the RPC until a new Picker is
// provided by the balancer (using ClientConn.UpdateState). The SubConn
// must be one returned by ClientConn.NewSubConn.
SubConn SubConn
// Done is called when the RPC is completed. If the SubConn is not ready,
// this will be called with a nil parameter. If the SubConn is not a valid
// type, Done may not be called. May be nil if the balancer does not wish
// to be notified when the RPC completes.
Done func(DoneInfo)
}
// Picker is used by gRPC to pick a SubConn to send an RPC.
// Balancer is expected to generate a new picker from its snapshot every time its
// internal state has changed.
//
// The pickers used by gRPC can be updated by ClientConn.UpdateState().
type Picker interface {
// Pick returns the connection to use for this RPC and related information.
//
// Pick should not block. If the balancer needs to do I/O or any blocking
// or time-consuming work to service this call, it should return
// ErrNoSubConnAvailable, and the Pick call will be repeated by gRPC when
// the Picker is updated (using ClientConn.UpdateState).
//
// If an error is returned:
//
// - If the error is ErrNoSubConnAvailable, gRPC will block until a new
// Picker is provided by the balancer (using ClientConn.UpdateState).
//
// - If the error is a status error (implemented by the grpc/status
// package), gRPC will terminate the RPC with the code and message
// provided.
//
// - For all other errors, wait for ready RPCs will wait, but non-wait for
// ready RPCs will be terminated with this error's Error() string and
// status code Unavailable.
Pick(info PickInfo) (PickResult, error)
}
|
- Picker接口:用于gRPC选择子连接去发送请求。
- Pick方法:子连接选择
实现RoundRobin
RR Banlancer 的实现基于 balancer/base 基础实现,核心功能主体在 balancer/base 中实现,而 RR Banlancer 基于 base.NewBalancerBuilder 实现了 balancer.Builder 接口,可以用于注册。
1
2
3
4
5
6
7
8
9
10
11
|
// Name is the name of round_robin balancer.
const Name = "round_robin"
// newBuilder creates a new roundrobin balancer builder.
func newBuilder() balancer.Builder {
return base.NewBalancerBuilder(Name, &rrPickerBuilder{})
}
func init() {
balancer.Register(newBuilder())
}
|
RoundRobin Banlancer Builder 实现了 PickerBuilder 的接口:
1
2
3
4
5
6
7
8
9
10
11
|
func (*rrPickerBuilder) Build(readySCs map[resolver.Address]balancer.SubConn) balancer.Picker {
grpclog.Infof("roundrobinPicker: newPicker called with readySCs: %v", readySCs)
// 将 map 接口转化成 slice[] 结构,并使用期构造 rrPicker 并返回
var scs []balancer.SubConn
for _, sc := range readySCs {
scs = append(scs, sc)
}
return &rrPicker{
subConns: scs,
}
}
|
banlancer.Picker 由 rrPicker 来实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
type rrPicker struct {
// subConns is the snapshot of the roundrobin balancer when this picker was
// created. The slice is immutable. Each Get() will do a round robin
// selection from it and return the selected SubConn.
subConns []balancer.SubConn
mu sync.Mutex
// 用于记录下一个位移量
next int
}
func (p *rrPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
p.mu.Lock()
sc := p.subConns[p.next]
p.next = (p.next + 1) % len(p.subConns)
p.mu.Unlock()
return balancer.PickResult{SubConn: sc}, nil
}
|
实现WeightPicker
问题来了,我们需要把服务器地址的权重添加进去,但是地址resolver.Address并没有提供权重的属性。官方给的答复是:把权重存储到地址的元数据metadata中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
// attributeKey is the type used as the key to store AddrInfo in the Attributes
// field of resolver.Address.
type attributeKey struct{}
// AddrInfo will be stored inside Address metadata in order to use weighted balancer.
type AddrInfo struct {
Weight int
}
// SetAddrInfo returns a copy of addr in which the Attributes field is updated
// with addrInfo.
func SetAddrInfo(addr resolver.Address, addrInfo AddrInfo) resolver.Address {
addr.Attributes = attributes.New()
addr.Attributes = addr.Attributes.WithValues(attributeKey{}, addrInfo)
return addr
}
// GetAddrInfo returns the AddrInfo stored in the Attributes fields of addr.
func GetAddrInfo(addr resolver.Address) AddrInfo {
v := addr.Attributes.Value(attributeKey{})
ai, _ := v.(AddrInfo)
return ai
}
|
定义AddrInfo结构体并添加权重Weight属性,Set方法把Weight存储到resolver.Address中,Get方法从resolver.Address获取Weight。
解决权重存储问题后,接下来我们实现加权随机法负载均衡策略。
首先实现PickerBuilder接口,返回子连接选择器。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
func (*rrPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker {
grpclog.Infof("weightPicker: newPicker called with info: %v", info)
if len(info.ReadySCs) == 0 {
return base.NewErrPicker(balancer.ErrNoSubConnAvailable)
}
var scs []balancer.SubConn
for subConn, addr := range info.ReadySCs {
node := GetAddrInfo(addr.Address)
if node.Weight <= 0 {
node.Weight = minWeight
} else if node.Weight > 5 {
node.Weight = maxWeight
}
for i := 0; i < node.Weight; i++ {
scs = append(scs, subConn)
}
}
return &rrPicker{
subConns: scs,
}
}
|
加权随机法中,我使用空间换时间的方式,把权重转成地址个数(例如addr1的权重是3,那么添加3个子连接到切片中;addr2权重为1,则添加1个子连接;选择子连接时候,按子连接切片长度生成随机数,以随机数作为下标就是选中的子连接),避免重复计算权重。考虑到内存占用,权重定义从1到5权重。
接下来实现子连接的选择,获取随机数,选择子连接
1
2
3
4
5
6
7
8
9
10
11
12
|
type rrPicker struct {
subConns []balancer.SubConn
mu sync.Mutex
}
func (p *rrPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
p.mu.Lock()
index := rand.Intn(len(p.subConns))
sc := p.subConns[index]
p.mu.Unlock()
return balancer.PickResult{SubConn: sc}, nil
}
|
关键代码完成后,我们把加权随机法负载均衡策略命名为weight,并注册到gRPC的负载均衡策略中。
1
2
3
4
5
6
7
8
9
10
|
// Name is the name of weight balancer.
const Name = "weight"
// NewBuilder creates a new weight balancer builder.
func newBuilder() balancer.Builder {
return base.NewBalancerBuilder(Name, &rrPickerBuilder{}, base.Config{HealthCheck: false})
}
func init() {
balancer.Register(newBuilder())
}
|
最后,我们只需要在服务端注册服务时候附带权重,然后客户端在服务发现时把权重Set到resolver.Address中,最后客户端把负载论衡策略改成weight就完成了。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
//SetServiceList 设置服务地址
func (s *ServiceDiscovery) SetServiceList(key, val string) {
s.lock.Lock()
defer s.lock.Unlock()
//获取服务地址
addr := resolver.Address{Addr: strings.TrimPrefix(key, s.prefix)}
//获取服务地址权重
nodeWeight, err := strconv.Atoi(val)
if err != nil {
//非数字字符默认权重为1
nodeWeight = 1
}
//把服务地址权重存储到resolver.Address的元数据中
addr = weight.SetAddrInfo(addr, weight.AddrInfo{Weight: nodeWeight})
s.serverList[key] = addr
s.cc.UpdateState(resolver.State{Addresses: s.getServices()})
log.Println("put key :", key, "wieght:", val)
}
|
客户端使用weight负载均衡策略
1
2
3
4
5
6
7
8
9
10
11
12
13
|
func main() {
r := etcdv3.NewServiceDiscovery(EtcdEndpoints)
resolver.Register(r)
// 连接服务器
conn, err := grpc.Dial(
fmt.Sprintf("%s:///%s", r.Scheme(), SerName),
grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"weight"}`),
grpc.WithInsecure(),
)
if err != nil {
log.Fatalf("net.Connect err: %v", err)
}
defer conn.Close()
|
参考
gRPC服务发现&负载均衡
gRPC负载均衡(客户端负载均衡)
gRPC负载均衡(自定义负载均衡策略)
gRPC 之 LoadBalancer