ETCD

ETCD是用于共享配置和服务发现的分布式,一致性的KV存储系统。ETCD是CoreOS公司发起的一个开源项目,授权协议为Apache。

核心特性:

  • 将数据存储在集群中的高可用kv存储
  • 允许应用实时监控kv变化
  • 能够容忍单点故障,能够应对网络分区

复杂特性:

  • 底层存储是按照key有序排列的,可以顺序遍历
  • 因为key有序,所以etcd天然支持按目录结果高效遍历
  • 支持复杂事务,提供if…then…else的事务能力
  • 基于租约机制实现key的TTL过期

客户端连接实例

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
var (
        config clientv3.Config
        client *clientv3.Client
        err error
     )

config = clientv3.Config{
    Endpoints: []string{"127.0.0.1:2379"}, // 集群列表
    DialTimeout: 5 * time.Second,
}

if client, err = clientv3.New(config); err != nil {
    log.Printf("clientv3.New error:", err)
    return
}

api介绍

kv操作

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
type KV interface {
    Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)

    Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error)

    Delete(ctx context.Context, key string, opts ...OpOption) (*DeleteResponse, error)

    Compact(ctx context.Context, rev int64, opts ...CompactOption) (*CompactResponse, error)
    Do(ctx context.Context, op Op) (OpResponse, error)

    Txn(ctx context.Context) Txn
}

关于租约

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
type Lease interface {

    Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error)

    Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error)


    TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error)

    KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error)

    KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error)

    Close() error
}

使用方法

KeyValue 请求定义

etcd大部分API都是对KV对的请求和操作。etcd kv的protobuf定义如下:

1
2
3
4
5
6
7
8
message KeyValue {
    bytes key = 1;
    int64 create_revision = 2;
    int64 mod_revision = 3;
    int64 version = 4;
    bytes value = 5;
    int64 lease = 6;
 }

各个字段意义如下:

  • key是字节数组,不可为空;
  • value也是字节数组;
  • version则是key的版本,一个删除动作会把这个值清零,每次更新则会让其值增一;
  • Create_Revision key创建时候的revision;
  • Mod_Revision key最近一次修改时的revision;
  • Lease 与key关联的Lease,如果其值为0则说明没有关联的Lease;

revision是MVCC中的概念,是etcd中cluster级别的计数器,每次修改操作都会让其自增,可以认为是全局逻辑时钟(global logical clock),对所有修改操作进行排序:revision越大说明其值越新,etcd对key索引使用B+树方式进行组织。etcd每个key都有很多revision(修订版本),每次事务操作都会创建一个revision,老的revision在etcd进行compaction操作的时候会被清除。createrevision会在使用mutex lock的时候使用,ModRevision与事务操作和leader选举有关。

revision也与watch有关,当watch的client与server闪断重连后,etcd根据client上次watch相关的revision,把其后的修改再通知给client。

存储值

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package main

import (
	"log"
	"time"

	"golang.org/x/net/context"

	"github.com/coreos/etcd/clientv3"
)

var (
	dialTimeout    = 5 * time.Second
	requestTimeout = 2 * time.Second
	endpoints      = []string{"localhost:2379"}
)

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   endpoints,
		DialTimeout: dialTimeout,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

        // 设置 key1 的值为 value1
	key1, value1 := "key1", `value1`
	if resp, err := cli.Put(context.TODO(), key1, value1); err != nil {
		log.Fatal(err)
	} else {
		log.Println(resp)
	}

        // 设置 key1 的值为 value2, 并返回前一个值
	value2 := "value2"
	if resp, err := cli.Put(context.TODO(), key1, value2, clientv3.WithPrevKV()); err != nil {
		log.Fatal(err)
	} else {
		log.Println(resp)
	}
}

获取到的输出如下:

1
2
2018/05/10 22:00:05 &{cluster_id:14841639068965178418 member_id:10276657743932975437 revision:113 raft_term:2  <nil>}
2018/05/10 22:00:05 &{cluster_id:14841639068965178418 member_id:10276657743932975437 revision:114 raft_term:2  key:"key1" create_revision:113 mod_revision:113 version:1 value:"value1" }

我们看到返回的值中除了 key 和 value 之外,还包括三个字段,它们的定义如下:

  • Version:每次对这个 key 进行修改后增加1,删除这个key之后从0开始
  • Create_Revision:最后一次创建这个 key 的版本号
  • Mod_Revision:最后一次修改这个 key 的版本号

设置值时还可以使用很多的选项,例如 Lease 等。

读取值

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package main

import (
	"log"
	"time"

	"golang.org/x/net/context"

	"github.com/coreos/etcd/clientv3"
)

var (
	dialTimeout    = 5 * time.Second
	requestTimeout = 2 * time.Second
	endpoints      = []string{"localhost:2379"}
)

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   endpoints,
		DialTimeout: dialTimeout,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	key1, value1 := "key1", `value1`
	_ = value1
        // 读取一个已存在的值
	if resp, err := cli.Get(context.TODO(), key1); err != nil {
		log.Fatal(err)
	} else {
		log.Println(resp)
	}
        // 读取一个不存在的值
	if resp, err := cli.Get(context.TODO(), key1+key1); err != nil {
		log.Fatal(err)
	} else {
		log.Println(resp)
	}
}

获取到的输出如下:

1
2
2018/05/10 22:05:25 &{cluster_id:14841639068965178418 member_id:10276657743932975437 revision:114 raft_term:2  [key:"key1" create_revision:113 mod_revision:114 version:2 value:"value2" ] false 1}
2018/05/10 22:05:25 &{cluster_id:14841639068965178418 member_id:10276657743932975437 revision:114 raft_term:2  [] false 0}

返回的数据定义如下:

  • kvs:返回的 key 的列表
  • more:如果指定了 limit,则代表是否还有更多的字段
  • count:符合 range 查询的总数

删除值

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package main

import (
	"fmt"
	"log"
	"time"

	"golang.org/x/net/context"

	"github.com/coreos/etcd/clientv3"
)

var (
	dialTimeout    = 5 * time.Second
	requestTimeout = 2 * time.Second
	endpoints      = []string{"localhost:2379"}
)

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   endpoints,
		DialTimeout: dialTimeout,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	key1, value1 := "key1", `value1`
	_ = value1

	// 删除一个存在的 key
	if resp, err := cli.Delete(context.Background(), key1); err != nil {
		fmt.Println(err)
	} else {
		fmt.Println(resp)
	}

	// 删除一个不存在的 key
	if resp, err := cli.Delete(context.Background(), "key10"); err != nil {
		fmt.Println(err)
	} else {
		fmt.Println(resp)
	}
}

获取到的输出如下:

1
2
&{cluster_id:14841639068965178418 member_id:10276657743932975437 revision:118 raft_term:2  1 []}
&{cluster_id:14841639068965178418 member_id:10276657743932975437 revision:118 raft_term:2  0 []}

返回的数据定义如下:

  • deleted:删除的数量
  • prev_kv:删除的 key

Transaction

请求定义

在一个事务请求内完成),能够防止意外的并行更新,构建原子的compare-and-swap操作,提供了一种更高级的并行控制能力。

事务内revision只增加一次,但是一个事务内对一个kv的写操作只能进行一次。事务要么成功,要么失败,没有中间状态,

事务操作可以认为是一个比较操作链,每个比较动作定义如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
message Compare {
  enum CompareResult {
    EQUAL = 0;
    GREATER = 1;
    LESS = 2;
    NOT_EQUAL = 3;
  }
  enum CompareTarget {
    VERSION = 0;
    CREATE = 1;
    MOD = 2;
    VALUE= 3;
  }
  CompareResult result = 1;
  // target is the key-value field to inspect for the comparison.
  CompareTarget target = 2;
  // key is the subject key for the comparison operation.
  bytes key = 3;
  oneof target_union {
    int64 version = 4;
    int64 create_revision = 5;
    int64 mod_revision = 6;
    bytes value = 7;
  }
}
  • Result - 逻辑比较类型,如相等、小于或者大于;
  • Target - 有待被比较的kv的某个字段,如key的version、创建 revision、修改revision或者value;
  • Key - 用于比较操作的key;
  • Target_Union - 附带比较对象,如给定的key的版本、给定key的创建revision、最后的修改revision和key的value。

定义了比较算子后,事务请求还需要一连串的子请求操作,定义如下:

1
2
3
4
5
6
7
8
message RequestOp {
  // request is a union of request types accepted by a transaction.
  oneof request {
    RangeRequest request_range = 1;
    PutRequest request_put = 2;
    DeleteRangeRequest request_delete_range = 3;
  }
}
  • Request_Range - 一个RangeRequest;
  • Request_Put - 一个PutRequest,keys中每个key都必须唯一不能重复;
  • Request_Delete_Range - 一个DeleteRangeRequest,其操作的key也必须在整个事务中唯一。

最终事务请求定义如下:

1
2
3
4
5
message TxnRequest {
  repeated Compare compare = 1;
  repeated RequestOp success = 2;
  repeated RequestOp failure = 3;
}
  • Compare - 一个比较算子序列;
  • Success - 如果比较成功,则处理这个请求对象序列,响应的结果就是对这些子请求处理的结果;
  • Failure - 如果比较失败,则处理这个请求对象序列,响应的结果就是对这些子请求处理的结果。

事务响应定义如下:

1
2
3
4
5
message TxnResponse {
  ResponseHeader header = 1;
  bool succeeded = 2;
  repeated ResponseOp responses = 3;
}
  • Succeeded - 算子比较的结果,success则为true,fail则为false;
  • Responses - 对所有子请求的处理结果。

ResponseOp定义如下:

1
2
3
4
5
6
7
message ResponseOp {
  oneof response {
    RangeResponse response_range = 1;
    PutResponse response_put = 2;
    DeleteRangeResponse response_delete_range = 3;
  }
}

ResponseOp的成员与RequestOp对应,此处就不再一一列举解释了。

举例

etcd v3 支持 If/Then/Else 形式的多键事务.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package main

import (
	"fmt"
	"log"
	"sync"
	"time"

	"golang.org/x/net/context"

	"github.com/coreos/etcd/clientv3"
)

var (
	dialTimeout    = 5 * time.Second
	requestTimeout = 2 * time.Second
	endpoints      = []string{"localhost:2379"}
)

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   endpoints,
		DialTimeout: dialTimeout,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	key1, value1 := "key1", `value1`
	_ = key1
	_ = value1

	// setnx
	var w sync.WaitGroup
	w.Add(10)
	key10 := "setnx"
	for i := 0; i < 10; i++ {
		go func(i int) {
			time.Sleep(5 * time.Millisecond)
			_, err := cli.Txn(context.Background()).
				If(clientv3.Compare(clientv3.CreateRevision(key10), "=", 0)).
				Then(clientv3.OpPut(key10, fmt.Sprintf("%d", i))).
				Commit()
			if err != nil {
				fmt.Println(err)
			}

			w.Done()
		}(i)
	}

	w.Wait()

	if resp, err := cli.Get(context.TODO(), key10); err != nil {
		log.Fatal(err)
	} else {
		log.Println(resp)
	}
}

获取到的输出如下:

1
2018/05/10 22:28:49 &{cluster_id:14841639068965178418 member_id:10276657743932975437 revision:120 raft_term:2  [key:"setnx" create_revision:120 mod_revision:120 version:1 value:"5" ] false 1}

通过 key 的 Create_Revision 是否为 0 来判断 key 是否存在。其中 If,Then 以及 Else 分支都可以包含多个操作。返回的数据包含一个 succeeded 字段,当为 true 时代表 If 的值为真。

Watch

原理

etcd v2的Watch API实际上是一个标准的HTTP GET请求,与一般的请求不同的是,它多了一个”?wait=true”的URL参数。当etcd v2的Server看到这个参数的时候,就知道这是一个watch请求,并且不会立即返回response,而是一直会等到被watch的这个key有了更新以后该请求才会返回。

1
curl http://127.0.0.1:2379/v2/keys/foo&wait=true

值得注意的是,客户端还可以指定版本号来watch。如果客户端指定了版本号,那么服务器端会返回大于该版本号的第一个更新的数据。例如watch的时候可以指定index=7,示例代码如下所示:

1
curl http://127.0.0.1:2379/v2/keys/foo?wait=true&waitindex=7'

客户端可以指定版本号watch,然而服务器端只保留了最新的1000个变更记录。也就是说,如果客户端指定的版本号,是1000个变更记录之前的,则会watch不到。

etcd v2的watch是基于HTTP的long poll实现的,其请求本质上是一个HTTP1.1的长连接。因此一个watch请求需要维持一个TCP连接。这就导致了服务端需要耗费很多资源用于维持TCP长连接。

watch只能watch某一个key以及其子节点(通过参数recursive设置),一个watch请求不能同时watch多个不同的key。

由于watch的历史记录最多只有1000条,因此很难通过watch机制来实现完整的数据同步(有丢失变更的风险),所以当前的大多数使用方式是通过watch来得知变更,然后通过GET来重新获取数据,并不是完全依赖于watch的变更event。

etcd v3 的watch机制在etcd v2的基础上做了很多改进,一个显著的优化是减小了每个watch所带来的资源消耗,从而能够支持更大规模的watch。首先etcd v3的API采用了gRPC,而gRPC又利用了HTTP/2的TCP链接多路复用(multiple stream per tcp connection),这样同一个Client的不同watch可以共享同一个TCP连接。

etcd会保存每个客户端发来的watch请求,watch请求可以关注一个key (单key),或者一个key前缀(区间),所以watchGroup包含两种Watcher:一种是key Watchers,数据结构是每个key对应一组Watcher,另外一种是range Watchers,数据结构是一个线段树,可以方便地通过区间查找到对应的Watcher。

etcd会有一个线程持续不断地遍历所有的watch请求,每个watch对象都会负责维护其监控的key事件,看其推送到了哪个revision。etcd会根据这个revision.main ID去BoltDB中继续向后遍历,实际上BoltDB类似于leveldb,是一个按key有序排列的Key-Value(K-V)引擎,而BoltDB中的key是由revision.main+revision.sub组成的,所以遍历就会依次经过历史上发生过的所有事务(tx)的记录。

对于遍历经过的每个K-V, etcd会反序列化其中的value,也就是实际etcd存储的Key Value,然后判断其中的key是否为watch请求关注的key,如果是就发送给客户端。

然而每次都对单个的watch对象进行扫描效率太差了,实际上etcd在实现的时候会将watch对象分组,然后根据组内的最小revision去检查,这样一次性可以处理多个watcher,减少扫描次数。

请求定义

Watch API提供了一组基于事件的接口,用于异步获取key的变化后的通知。etcd会把key的每一次变化都通知给观察者,而不像zookeeper那样只通知最近一次的变化。

Event代表了key的一次update,包括update的类型和变化前后的数据,定义如下:

1
2
3
4
5
6
7
8
9
message Event {
  enum EventType {
    PUT = 0;
    DELETE = 1;
  }
  EventType type = 1;
  KeyValue kv = 2;
  KeyValue prev_kv = 3;
}
  • Type - event type,PUT则下面会给出新增加的value,DELETE则指出key已被删除;
  • KV - KeyValue是event相关的value,如果type是PUT则KV是当前更新后的kv对,如果kv.Version值为1则说明kv是新创建的。如果type是* DELETE,则KV的revision就是delete动作发生时的revision;
  • Prev_KV - event动作发生前的kv值,为了节省带宽,如果请求中没有特别指明,这个值内容为空。

Watch是一个长久运行的请求,基于gRPC的stream进行stream数据的传输。

Watch对event作出了如下三项保证:

  • 有序 - event依据revision进行排序,一个event投递一次后就不会再次被投递;
  • 可靠 - 一系列event通知如a/b/c的发生时间a < b < c,则只会依次收到a、b和c,不会只收到a和c;
  • 原子 - 一次操作产生一个revision,server发出一个event,事务的结果也仅仅产生一次event通知。

基于一次gRPC stream连接,可以发出如下watch创建请求:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
message WatchCreateRequest {
  bytes key = 1;
  bytes range_end = 2;
  int64 start_revision = 3;
  bool progress_notify = 4;

  enum FilterType {
    NOPUT = 0;
    NODELETE = 1;
  }
  repeated FilterType filters = 5;
  bool prev_kv = 6;
}
  • Key, Range_End - 被观察的key的range[key, range_end),如果 range_end 没有设置,则只有参数key被观察,如果 range_end 等同于’\0’, 则大于等于参数 key 的所有 key 都将被观察;
  • Start_Revision - 观察的起始的revision,如果不设置则是最新的revision;
  • Progress_Notify - 如果设置为true,则etcd将定期发送不带任何事件的空WatchResponse。当一个watch连接断开后,客户端进行重连时候会指定开始的revision,server会根据当前系统的负载决定把发送watch event的频率;
  • Filters - event过滤器,server给watch客户端发送通知的时候,会先把相关事件过滤掉;
  • Prev_kv - 如果设置为true,则被创建的观察者在事件发生前获取上一次的kv,如果上一次的kv在etcd compaction的时候被删除掉,则不会返回任何值。

watch的响应内容定义如下:

1
2
3
4
5
6
7
8
9
message WatchResponse {
  ResponseHeader header = 1;
  int64 watch_id = 2;
  bool created = 3;
  bool canceled = 4;
  int64 compact_revision = 5;

  repeated mvccpb.Event events = 11;
}
  • Watch_ID - 和watch相关的watcher ID;
  • Created - 如果请求是WatchCreateRequest,则这个值为true,所有发送给同一个watch的event都带有同样的watch_id;
  • Canceled - 如果请求是WatchCancelRequest,则这个值为true,这个Response之后watcher不会再收到任何response;
  • Compact_Revision - 如果watcher试图观察一个旧的不存在的revision时候,server会返回当前存在的最小的有效revision。如果watcher根不是server发出的watch通知的时候,server会发出这个通知并断开watch连接。
  • Events - 针对同一个watch ID返回的一批有序的event集合。

如果一个watcher想停止watch,则可以发出如下请求:

1
2
3
message WatchCancelRequest {
   int64 watch_id = 1;
}
  • Watch_ID - 要取消的watcher的ID,server后面就不会再更多的event。

举例

Watch 用于监测一个键空间中发生的变化。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package main

import (
	"fmt"
	"log"
	"time"

	"golang.org/x/net/context"

	"github.com/coreos/etcd/clientv3"
)

var (
	dialTimeout    = 5 * time.Second
	requestTimeout = 2 * time.Second
	endpoints      = []string{"localhost:2379"}
)

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   endpoints,
		DialTimeout: dialTimeout,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	key1, value1 := "key1", `value1`

	go func() {
		rch := cli.Watch(context.Background(), "", clientv3.WithPrefix())
		for wresp := range rch {
			for _, ev := range wresp.Events {
				fmt.Printf("Watch: %s %q: %q \n", ev.Type, ev.Kv.Key, ev.Kv.Value)
			}
		}
	}()

	if resp, err := cli.Put(context.TODO(), key1, value1); err != nil {
		log.Fatal(err)
	} else {
		log.Println(resp)
	}
}

获取到的输出如下:

1
2
Watch: PUT "key1": "value1"
2018/05/10 22:38:35 &{cluster_id:14841639068965178418 member_id:10276657743932975437 revision:125 raft_term:2  <nil>}

注意:你本地的代码可能获取不到 Watch 的输出,这可能是因为主协程已经退出。

假设你在使用 etcd 存储一些配置,就可以使用 Watch 功能在配置发生变更的时候得到通知。

计数器

etcd v3 并没有直接的方式来提供自增操作,但我们可以使用 key 的 Version 字段来间接实现一个自增计数器。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package main

import (
	"fmt"
	"log"
	"sync"
	"time"

	"golang.org/x/net/context"

	"github.com/coreos/etcd/clientv3"
)

var (
	dialTimeout    = 5 * time.Second
	requestTimeout = 2 * time.Second
	endpoints      = []string{"localhost:2379"}
)

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   endpoints,
		DialTimeout: dialTimeout,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	// incr, 可以使用key的 Version 字段
	var w2 sync.WaitGroup
	w2.Add(100)

	for i := 0; i < 100; i++ {
		go func() {
			defer w2.Done()
			resp, err := cli.Put(context.TODO(), "keyincr", "", clientv3.WithPrevKV())
			if err != nil {
				fmt.Println(err)
			} else {
				if resp.PrevKv != nil {
					// fmt.Println(resp.PrevKv.Version)
				}
			}
		}()
	}
	w2.Wait()
	if resp, err := cli.Get(context.TODO(), "keyincr"); err != nil {
		fmt.Println(err)
	} else {
		fmt.Println(resp.Kvs[0].Version)
	}
}

得到的输出如下:

1
100

但是这种方式只能得到一个每次自增1的计数器,无法控制自增的步长,也不能进行自减操作。如果需要更多的操作,可以使用事务来实现,但是这种方式的并发自增的时候可能经常导致失败。

Range

Get和Put

请求定义

Get

etcd允许一次以range形式操作多个key。etcd对数据的组织不像zookeeper那样以目录层次结构的方式进行,而只有一个层级,range的形式是[a, b),即[key, keyend)。如果keyend为空则请求只有key;如果range是[key, key+0x1)则是请求以key为前缀的所有key;如果key_end是’\0’,则请求所有大于等于key的所有key。

Range请求定义如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
message RangeRequest {
  enum SortOrder {
    NONE = 0; // default, no sorting
    ASCEND = 1; // lowest target value first
    DESCEND = 2; // highest target value first
  }
  enum SortTarget {
    KEY = 0;
    VERSION = 1;
    CREATE = 2;
    MOD = 3;
    VALUE = 4;
  }

  bytes key = 1;
  bytes range_end = 2;
  int64 limit = 3;
  int64 revision = 4;
  SortOrder sort_order = 5;
  SortTarget sort_target = 6;
  bool serializable = 7;
  bool keys_only = 8;
  bool count_only = 9;
  int64 min_mod_revision = 10;
  int64 max_mod_revision = 11;
  int64 min_create_revision = 12;
  int64 max_create_revision = 13;
}

各个字段含义如下:

  • Key, Range_End - key range;
  • Limit - 返回key的数目的最大值,如果为0则说明没有限制;
  • Revision - key修改的时间点(point-in-time),如果其值为0则是获取最新的kv,如果指定的revision已经被compact掉则etcd返回* ErrCompacted错误;
  • Sort_Order - 请求的排序方式;
  • Sort_Target - kv的排序方式;
  • Serializable - sets the range request to use serializable member-local reads. By default, Range is linearizable; * it reflects the current consensus of the cluster. For better performance and availability, in exchange for * possible stale reads, a serializable range request is served locally without needing to reach consensus with * other nodes in the cluster.
  • Keys_Only - 只返回key,无需返回Value;
  • Count_Only - 只返回range内key的数目;
  • Min_Mod_Revision - 最低mod revision值,Mod_Revision低于这个值的kv会被过滤掉;
  • Max_Mod_Revision - 最大mod revision值,Mod_Revision高于这个值的kv会被过滤掉;
  • Min_Create_Revision - 最低create revision值,Mod_Revision低于这个值的kv会被过滤掉;
  • Max_Create_Revision - 最高create revision值,Mod_Revision高于这个值的kv会被过滤掉。

Range请求的响应定义如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
message ResponseHeader {
  uint64 cluster_id = 1;
  uint64 member_id = 2;
  int64 revision = 3;
  uint64 raft_term = 4;
}

message RangeResponse {
  ResponseHeader header = 1;
  repeated mvccpb.KeyValue kvs = 2;
  bool more = 3;
  int64 count = 4;
}

各个字段含义如下:

  • Cluster_ID - etcd cluster ID;
  • Member_ID - 返回响应的cluster member的ID;
  • Revision - 获取当前系统最新的kv Revision;
  • Raft_Term - 这个字段可用于检测当前集群是否已经选举出一个新的leader;
  • Kvs - 请求返回结果,如果Count_Only为true,则这个结果为空;
  • More - 是否有更多值,如果limit为true;
  • Count - Count_Only为true时候的结果。

我们通过一个特别的Get选项,获取/test目录下的所有孩子:

1
rangeResp, err := kv.Get(context.TODO(), "/test/", clientv3.WithPrefix())

WithPrefix()是指查找以/test/为前缀的所有key,因此可以模拟出查找子目录的效果。

我们知道etcd是一个有序的k-v存储,因此/test/为前缀的key总是顺序排列在一起。

withPrefix实际上会转化为范围查询,它根据前缀/test/生成了一个key range,[“/test/”, “/test0”),为什么呢?因为比/大的字符是’0’,所以以/test0作为范围的末尾,就可以扫描到所有的/test/打头的key了。

在之前,我Put了一个/testxxx干扰项,因为不符合/test/前缀(注意末尾的/),所以就不会被这次Get获取到。但是,如果我查询的前缀是/test,那么/testxxx也会被扫描到,这就是etcd k-v模型导致的,编程时一定要特别注意。

Put

PutReqeust定义如下:

1
2
3
4
5
6
7
8
message PutRequest {
  bytes key = 1;
  bytes value = 2;
  int64 lease = 3;
  bool prev_kv = 4;
  bool ignore_value = 5;
  bool ignore_lease = 6;
}

各个字段含义如下:

  • Key - KV对的key;
  • Value - KV对的value;
  • Lease - KV对的超时lease ID,默认值为0;
  • Prev_kv - 如果为true,则response会返回update前的kv值;
  • Ignore_Value - 不更新当前key的value,当key不存在的时候返回一个error;
  • Ignore_Lease - 不更新key的lease,当key不存在的时候返回一个error。

响应定义如下:

1
2
3
4
message PutResponse {
  ResponseHeader header = 1;
  mvccpb.KeyValue prev_kv = 2;
}

prev_kv:Reqeuest中的 prev_kv 被设置为true的时候,这个结果就是update前的kv值;

举例

一个简单的示例如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
package main

import (
	"fmt"
	"log"
	"time"

	"golang.org/x/net/context"

	"github.com/coreos/etcd/clientv3"
)

var (
	dialTimeout    = 5 * time.Second
	requestTimeout = 2 * time.Second
	endpoints      = []string{"localhost:2379"}
)

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   endpoints,
		DialTimeout: dialTimeout,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	go func() {
		rch := cli.Watch(context.Background(), "", clientv3.WithPrefix())
		for wresp := range rch {
			for _, ev := range wresp.Events {
				fmt.Printf("Watch: %s %q: %q \n", ev.Type, ev.Kv.Key, ev.Kv.Value)
			}
		}
	}()

	kvs := map[string]string{
		"key1":  "value1",
		"key10": "value10",
		"key5":  "value5",
		"keyk":  "valuek",
		"key2":  "value2",
	}
	for k, v := range kvs {
		cli.Put(context.TODO(), k, v)
	}

	if resp, err := cli.Get(context.TODO(), "key1", clientv3.WithRange("keyk")); err != nil {
		log.Fatal(err)
	} else {
		log.Println(resp)
	}
}

获取到的输出如下:

1
2
3
4
5
6
Watch: PUT "keyk": "valuek"
Watch: PUT "key2": "value2"
Watch: PUT "key1": "value1"
Watch: PUT "key10": "value10"
Watch: PUT "key5": "value5"
2018/05/11 11:34:02 &{cluster_id:14841639068965178418 member_id:10276657743932975437 revision:21 raft_term:2  [key:"key1" create_revision:2 mod_revision:19 version:4 value:"value1"  key:"key10" create_revision:3 mod_revision:20 version:4 value:"value10"  key:"key2" create_revision:6 mod_revision:18 version:4 value:"value2"  key:"key5" create_revision:4 mod_revision:21 version:4 value:"value5" ] false 4}

可以看到通过指定 startKey 为 key1 以及 endKey 为 keyl,我们查询除了在 [startKey, endKey) 范围内的数据。

Delete

请求定义

删除则可以删除一定范围内的kv对,请求定义如下:

1
2
3
4
5
message DeleteRangeRequest {
  bytes key = 1;
  bytes range_end = 2;
  bool prev_kv = 3;
}
  • Key, Range_End - key range;
  • Prev_kv - 如果设置为true,则返回删除前的kv结果;

响应定义如下:

1
2
3
4
5
message DeleteRangeResponse {
  ResponseHeader header = 1;
  int64 deleted = 2;
  repeated mvccpb.KeyValue prev_kvs = 3;
}
  • Deleted - 被删除的kv数目;
  • Prev_kv - 如果请求中的prev_kv被设为true,则响应中就返回被删除的kv值数组;

举例

Delete 操作也可以通过指定一个 endKey 来删除多个 key:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package main

import (
	"fmt"
	"log"
	"time"

	"golang.org/x/net/context"

	"github.com/coreos/etcd/clientv3"
)

var (
	dialTimeout    = 5 * time.Second
	requestTimeout = 2 * time.Second
	endpoints      = []string{"localhost:2379"}
)

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   endpoints,
		DialTimeout: dialTimeout,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	ch := make(chan struct{})
	go func() {
		rch := cli.Watch(context.Background(), "", clientv3.WithPrefix())
		for wresp := range rch {
			for _, ev := range wresp.Events {
				fmt.Printf("Watch: %s %q: %q \n", ev.Type, ev.Kv.Key, ev.Kv.Value)
			}
		}
		ch <- struct{}{}
	}()

	kvs := map[string]string{
		"key1":  "value1",
		"key10": "value10",
		"key5":  "value5",
		"keyk":  "valuek",
		"key2":  "value2",
	}
	for k, v := range kvs {
		cli.Put(context.TODO(), k, v)
	}

	if resp, err := cli.Delete(context.TODO(), "key1", clientv3.WithRange("keyk")); err != nil {
		log.Fatal(err)
	} else {
		log.Println(resp)
	}

	select {
	case <-ch:
	case <-time.After(5 * time.Second):
	}
}

获取的输出如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
Watch: PUT "key1": "value1"
Watch: PUT "key10": "value10"
Watch: PUT "key5": "value5"
Watch: PUT "keyk": "valuek"
Watch: PUT "key2": "value2"
Watch: DELETE "key1": ""
2018/05/11 11:39:24 &{cluster_id:14841639068965178418 member_id:10276657743932975437 revision:43 raft_term:2  4 []}
Watch: DELETE "key10": ""
Watch: DELETE "key2": ""
Watch: DELETE "key5": ""

从 Watch 的输出可以看到,在 [“key1”, “keyk”) 范围内的 4 个 key 都被删除了。

Pagination

etcd v3 中可以通过 range 来获取多个 key 的数据并同时指定排序,而且可以用 limit 来限制返回的数量。这非常像经常看到的分页功能,但是 etcd v3 并没有提供 offset 操作。

一个简单的示例程序如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package main

import (
	"fmt"
	"log"
	"time"

	"golang.org/x/net/context"

	"github.com/coreos/etcd/clientv3"
)

var (
	dialTimeout    = 5 * time.Second
	requestTimeout = 2 * time.Second
	endpoints      = []string{"localhost:2379"}
)

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   endpoints,
		DialTimeout: dialTimeout,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	ch := make(chan struct{})
	go func() {
		rch := cli.Watch(context.Background(), "", clientv3.WithPrefix())
		for wresp := range rch {
			for _, ev := range wresp.Events {
				fmt.Printf("Watch: %s %q: %q \n", ev.Type, ev.Kv.Key, ev.Kv.Value)
			}
		}
		ch <- struct{}{}
	}()

	kvs := map[string]string{
		"key1":  "value1",
		"key10": "value10",
		"key5":  "value5",
		"keyk":  "valuek",
		"key2":  "value2",
	}
	for k, v := range kvs {
		cli.Put(context.TODO(), k, v)
	}

	startKey := "key1"
	endKey := "keyk"
	for {
		if resp, err := cli.Get(context.TODO(), startKey, clientv3.WithRange(endKey), clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend), clientv3.WithLimit(1)); err != nil {
			log.Fatal(err)
			break
		} else {
			items := len(resp.Kvs)
			if items == 0 {
				break
			}
			startKey = string(resp.Kvs[items-1].Key) + "\x00"

			fmt.Println(resp)
		}

	}

	select {
	case <-ch:
	case <-time.After(5 * time.Second):
	}
}

获取的输出如下:

1
2
3
4
5
6
7
8
9
Watch: PUT "key1": "value1"
Watch: PUT "key10": "value10"
Watch: PUT "key5": "value5"
Watch: PUT "keyk": "valuek"
Watch: PUT "key2": "value2"
&{cluster_id:14841639068965178418 member_id:10276657743932975437 revision:58 raft_term:2  [key:"key1" create_revision:47 mod_revision:54 version:3 value:"value1" ] true 4}
&{cluster_id:14841639068965178418 member_id:10276657743932975437 revision:58 raft_term:2  [key:"key10" create_revision:48 mod_revision:55 version:3 value:"value10" ] true 3}
&{cluster_id:14841639068965178418 member_id:10276657743932975437 revision:58 raft_term:2  [key:"key2" create_revision:46 mod_revision:58 version:3 value:"value2" ] true 2}
&{cluster_id:14841639068965178418 member_id:10276657743932975437 revision:58 raft_term:2  [key:"key5" create_revision:44 mod_revision:56 version:3 value:"value5" ] false 1}

从输出可以看到,我们分为 4 次获取了 range 内的数据,相当于进行了分页。

注意:这种实现方式有一个缺点,即当 sort 的字段不是 key 时,该分页方式无效。

Lease

请求定义

Lease提供了对租约的支持。cluster保证了lease时间内kv的有效性,当lease到期而客户端没有对lease进行续约时,lease就超时了。每个kv只能绑定到一个lease之上,当lease超时后,相关的所有kv都会被删除,每个key的每个watcher都会收到delete event。

创建一个lease请求体如下:

1
2
3
4
message LeaseGrantRequest {
  int64 TTL = 1;
  int64 ID = 2;
}
  • TTL - 一个以秒为单位的超时时间;
  • ID - Lease ID,如果值为0,则etcd会进行赋值。

server创建lease成功后,会返回如下的响应:

1
2
3
4
5
message LeaseGrantResponse {
  ResponseHeader header = 1;
  int64 ID = 2;
  int64 TTL = 3;
}
  • ID - etcd为lease分配的ID;
  • TTL - 以秒为单位的lease时间;

撤销租约请求如下:

1
2
3
message LeaseRevokeRequest {
  int64 ID = 1;
}
  • ID - 将要撤销的lease ID,请求成功后,所有ID相关的key都会被删除。

如果客户端想要对一个lease进行续约,可以发出如下请求:

1
2
3
message LeaseKeepAliveRequest {
  int64 ID = 1;
}
  • ID - 续约的lease ID。

应答消息体定义如下:

1
2
3
4
5
message LeaseKeepAliveResponse {
  ResponseHeader header = 1;
  int64 ID = 2;
  int64 TTL = 3;
}
  • ID - 续约的ID;

  • TTL - 剩余的TTL,以秒为单位。 github.com/coreos/etcd/clientv3/lease.go:Lease 接口提供了以下一些功能函数:

  • Grante: 创建一个 lease 对象;

  • Revoke: 释放一个 lease 对象;

  • TimeToLive: 获取 lease 剩余的 TTL 时间;

  • Leases: 列举 etcd 中的所有 lease;

  • KeepAlive: 自动定时对 lease 续约;

  • KeepAliveOnce: 为 lease 续约一次,代码注释中说大部分情况下都应该使用 KeepAlive;

  • Close: 关闭当前客户端建立的所有 lease;

Put 函数和 KeepAlive 函数都有一个 Lease 对象,如果在进行 Put 或者 KeepAlive 之前 Lease 已经过期,则 etcd 会返回 error。

创建租约

在 etcd v3 中,你可以先创建一个 Lease(租约), 一个 Lease 代表一个 ttl,然后将一个 key 关联到该 Lease。当 Lease 超时之后所关联的 key 会被自动删除。 一个简单的示例如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package main

import (
	"log"
	"time"

	"golang.org/x/net/context"

	"github.com/coreos/etcd/clientv3"
)

var (
	dialTimeout    = 5 * time.Second
	requestTimeout = 2 * time.Second
	endpoints      = []string{"localhost:2379"}
)

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   endpoints,
		DialTimeout: dialTimeout,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	ch := make(chan struct{})
	go func() {
		rch := cli.Watch(context.Background(), "", clientv3.WithPrefix())
		for wresp := range rch {
			for _, ev := range wresp.Events {
				log.Printf("Watch: %s %q: %q \n", ev.Type, ev.Kv.Key, ev.Kv.Value)
			}
		}
		ch <- struct{}{}
	}()

	resp, _ := cli.Grant(context.TODO(), 2)

	kvs := map[string]string{
		"key1": "value1",
		"key2": "value2",
		"key3": "value3",
		"key4": "value4",
	}
	for k, v := range kvs {
		cli.Put(context.TODO(), k, v, clientv3.WithLease(resp.ID))
	}

	cli.Put(context.TODO(), "key2", "value2new")
	cli.Put(context.TODO(), "key3", "value3new", clientv3.WithIgnoreLease())
	resp, _ = cli.Grant(context.TODO(), 4)
	cli.Put(context.TODO(), "key4", "value4new", clientv3.WithLease(resp.ID))

	select {
	case <-ch:
	case <-time.After(5 * time.Second):
	}
}

获取的输出如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
2018/05/11 13:37:48 Watch: PUT "key4": "value4"
2018/05/11 13:37:48 Watch: PUT "key1": "value1"
2018/05/11 13:37:48 Watch: PUT "key2": "value2"
2018/05/11 13:37:48 Watch: PUT "key3": "value3"
2018/05/11 13:37:48 Watch: PUT "key2": "value2new"
2018/05/11 13:37:48 Watch: PUT "key3": "value3new"
2018/05/11 13:37:48 Watch: PUT "key4": "value4new"
2018/05/11 13:37:50 Watch: DELETE "key1": ""
2018/05/11 13:37:50 Watch: DELETE "key3": ""
2018/05/11 13:37:52 Watch: DELETE "key4": ""

在上例中,我们通过 Grant 函数来创建一个 Lease,并在 Put 操作中将 key 关联到 LeaseID。

租约续约

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
   var (
       keepRespChan <-chan *clientv3.LeaseKeepAliveResponse
       keepResp *clientv3.LeaseKeepAliveResponse
   )
//租约续租
if keepRespChan, err = lease.KeepAlive(context.TODO(), leaseId); err != nil {
	fmt.Println(err)
	return
}
//消费keepRespChan
go func() {
	for  {
		select {
		case keepResp = <-keepRespChan:
			if keepRespChan == nil {
				fmt.Println("租约已经失效了")
				goto END
			} else {	// 每秒会续租一次, 所以就会受到一次应答
				fmt.Println("收到自动续租应答:", keepResp.ID)
			}
		}
	}
	END:
}()

打印结果:

撤销租约

撤销租约会使当前租约的所关联的key-value失效

1
2
3
4
5
6
7
//撤销租约
_, err = lease.Revoke(context.TODO(), leaseID)
    if err != nil {
        fmt.Printf("撤销租约失败:%s\n",err.Error())
        os.Exit(RevokeErr)
    }
fmt.Printf("撤销租约成功")

op操作

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
   var (
	config clientv3.Config
	client *clientv3.Client
	err error
	kv clientv3.KV
	op clientv3.Op
	opResp clientv3.OpResponse
)

// 客户端配置
config = clientv3.Config{
	Endpoints: []string{"127.0.0.1:2379"},
	DialTimeout: 5 * time.Second,
}

// 建立连接
if client, err = clientv3.New(config); err != nil {
	fmt.Println(err)
	return
}

// KV
kv = clientv3.NewKV(client)

op = clientv3.OpPut("/cron/jobs/job4", "opPut")
if opResp, err = kv.Do(context.TODO(), op); err != nil {
	log.Printf("clientv3.OpPut error:", err)
}

fmt.Println("写入Revision:", opResp.Put().Header.Revision)

op = clientv3.OpGet("/cron/jobs/job4")
if opResp, err = kv.Do(context.TODO(), op); err != nil {
	log.Printf("clientv3.OpGet error:", err)
}

fmt.Println("读取Revision:", opResp.Get().Kvs[0].ModRevision)
fmt.Println("读取Value:", string(opResp.Get().Kvs[0].Value))

打印结果:

参考:
https://enpsl.top/2019/01/05/2019-01-05-golang-etcd/
https://zhuanlan.zhihu.com/p/36700832
https://zhuanlan.zhihu.com/p/36719209
https://www.beikejiedeliulangmao.top/etcd/ https://alexstocks.github.io/html/etcd.html