ETCD
ETCD是用于共享配置和服务发现的分布式,一致性的KV存储系统。ETCD是CoreOS公司发起的一个开源项目,授权协议为Apache。
核心特性:
- 将数据存储在集群中的高可用kv存储
- 允许应用实时监控kv变化
- 能够容忍单点故障,能够应对网络分区
复杂特性:
- 底层存储是按照key有序排列的,可以顺序遍历
- 因为key有序,所以etcd天然支持按目录结果高效遍历
- 支持复杂事务,提供if…then…else的事务能力
- 基于租约机制实现key的TTL过期
客户端连接实例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
var (
config clientv3.Config
client *clientv3.Client
err error
)
config = clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"}, // 集群列表
DialTimeout: 5 * time.Second,
}
if client, err = clientv3.New(config); err != nil {
log.Printf("clientv3.New error:", err)
return
}
|
api介绍
kv操作
1
2
3
4
5
6
7
8
9
10
11
12
|
type KV interface {
Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)
Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error)
Delete(ctx context.Context, key string, opts ...OpOption) (*DeleteResponse, error)
Compact(ctx context.Context, rev int64, opts ...CompactOption) (*CompactResponse, error)
Do(ctx context.Context, op Op) (OpResponse, error)
Txn(ctx context.Context) Txn
}
|
关于租约
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
type Lease interface {
Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error)
Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error)
TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error)
KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error)
KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error)
Close() error
}
|
使用方法
KeyValue 请求定义
etcd大部分API都是对KV对的请求和操作。etcd kv的protobuf定义如下:
1
2
3
4
5
6
7
8
|
message KeyValue {
bytes key = 1;
int64 create_revision = 2;
int64 mod_revision = 3;
int64 version = 4;
bytes value = 5;
int64 lease = 6;
}
|
各个字段意义如下:
- key是字节数组,不可为空;
- value也是字节数组;
- version则是key的版本,一个删除动作会把这个值清零,每次更新则会让其值增一;
- Create_Revision key创建时候的revision;
- Mod_Revision key最近一次修改时的revision;
- Lease 与key关联的Lease,如果其值为0则说明没有关联的Lease;
revision是MVCC中的概念,是etcd中cluster级别的计数器,每次修改操作都会让其自增,可以认为是全局逻辑时钟(global logical clock),对所有修改操作进行排序:revision越大说明其值越新,etcd对key索引使用B+树方式进行组织。etcd每个key都有很多revision(修订版本),每次事务操作都会创建一个revision,老的revision在etcd进行compaction操作的时候会被清除。createrevision会在使用mutex lock的时候使用,ModRevision与事务操作和leader选举有关。
revision也与watch有关,当watch的client与server闪断重连后,etcd根据client上次watch相关的revision,把其后的修改再通知给client。
存储值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
package main
import (
"log"
"time"
"golang.org/x/net/context"
"github.com/coreos/etcd/clientv3"
)
var (
dialTimeout = 5 * time.Second
requestTimeout = 2 * time.Second
endpoints = []string{"localhost:2379"}
)
func main() {
cli, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: dialTimeout,
})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
// 设置 key1 的值为 value1
key1, value1 := "key1", `value1`
if resp, err := cli.Put(context.TODO(), key1, value1); err != nil {
log.Fatal(err)
} else {
log.Println(resp)
}
// 设置 key1 的值为 value2, 并返回前一个值
value2 := "value2"
if resp, err := cli.Put(context.TODO(), key1, value2, clientv3.WithPrevKV()); err != nil {
log.Fatal(err)
} else {
log.Println(resp)
}
}
|
获取到的输出如下:
1
2
|
2018/05/10 22:00:05 &{cluster_id:14841639068965178418 member_id:10276657743932975437 revision:113 raft_term:2 <nil>}
2018/05/10 22:00:05 &{cluster_id:14841639068965178418 member_id:10276657743932975437 revision:114 raft_term:2 key:"key1" create_revision:113 mod_revision:113 version:1 value:"value1" }
|
我们看到返回的值中除了 key 和 value 之外,还包括三个字段,它们的定义如下:
- Version:每次对这个 key 进行修改后增加1,删除这个key之后从0开始
- Create_Revision:最后一次创建这个 key 的版本号
- Mod_Revision:最后一次修改这个 key 的版本号
设置值时还可以使用很多的选项,例如 Lease 等。
读取值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
package main
import (
"log"
"time"
"golang.org/x/net/context"
"github.com/coreos/etcd/clientv3"
)
var (
dialTimeout = 5 * time.Second
requestTimeout = 2 * time.Second
endpoints = []string{"localhost:2379"}
)
func main() {
cli, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: dialTimeout,
})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
key1, value1 := "key1", `value1`
_ = value1
// 读取一个已存在的值
if resp, err := cli.Get(context.TODO(), key1); err != nil {
log.Fatal(err)
} else {
log.Println(resp)
}
// 读取一个不存在的值
if resp, err := cli.Get(context.TODO(), key1+key1); err != nil {
log.Fatal(err)
} else {
log.Println(resp)
}
}
|
获取到的输出如下:
1
2
|
2018/05/10 22:05:25 &{cluster_id:14841639068965178418 member_id:10276657743932975437 revision:114 raft_term:2 [key:"key1" create_revision:113 mod_revision:114 version:2 value:"value2" ] false 1}
2018/05/10 22:05:25 &{cluster_id:14841639068965178418 member_id:10276657743932975437 revision:114 raft_term:2 [] false 0}
|
返回的数据定义如下:
- kvs:返回的 key 的列表
- more:如果指定了 limit,则代表是否还有更多的字段
- count:符合 range 查询的总数
删除值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
package main
import (
"fmt"
"log"
"time"
"golang.org/x/net/context"
"github.com/coreos/etcd/clientv3"
)
var (
dialTimeout = 5 * time.Second
requestTimeout = 2 * time.Second
endpoints = []string{"localhost:2379"}
)
func main() {
cli, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: dialTimeout,
})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
key1, value1 := "key1", `value1`
_ = value1
// 删除一个存在的 key
if resp, err := cli.Delete(context.Background(), key1); err != nil {
fmt.Println(err)
} else {
fmt.Println(resp)
}
// 删除一个不存在的 key
if resp, err := cli.Delete(context.Background(), "key10"); err != nil {
fmt.Println(err)
} else {
fmt.Println(resp)
}
}
|
获取到的输出如下:
1
2
|
&{cluster_id:14841639068965178418 member_id:10276657743932975437 revision:118 raft_term:2 1 []}
&{cluster_id:14841639068965178418 member_id:10276657743932975437 revision:118 raft_term:2 0 []}
|
返回的数据定义如下:
- deleted:删除的数量
- prev_kv:删除的 key
Transaction
请求定义
在一个事务请求内完成),能够防止意外的并行更新,构建原子的compare-and-swap操作,提供了一种更高级的并行控制能力。
事务内revision只增加一次,但是一个事务内对一个kv的写操作只能进行一次。事务要么成功,要么失败,没有中间状态,
事务操作可以认为是一个比较操作链,每个比较动作定义如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
message Compare {
enum CompareResult {
EQUAL = 0;
GREATER = 1;
LESS = 2;
NOT_EQUAL = 3;
}
enum CompareTarget {
VERSION = 0;
CREATE = 1;
MOD = 2;
VALUE= 3;
}
CompareResult result = 1;
// target is the key-value field to inspect for the comparison.
CompareTarget target = 2;
// key is the subject key for the comparison operation.
bytes key = 3;
oneof target_union {
int64 version = 4;
int64 create_revision = 5;
int64 mod_revision = 6;
bytes value = 7;
}
}
|
- Result - 逻辑比较类型,如相等、小于或者大于;
- Target - 有待被比较的kv的某个字段,如key的version、创建 revision、修改revision或者value;
- Key - 用于比较操作的key;
- Target_Union - 附带比较对象,如给定的key的版本、给定key的创建revision、最后的修改revision和key的value。
定义了比较算子后,事务请求还需要一连串的子请求操作,定义如下:
1
2
3
4
5
6
7
8
|
message RequestOp {
// request is a union of request types accepted by a transaction.
oneof request {
RangeRequest request_range = 1;
PutRequest request_put = 2;
DeleteRangeRequest request_delete_range = 3;
}
}
|
- Request_Range - 一个RangeRequest;
- Request_Put - 一个PutRequest,keys中每个key都必须唯一不能重复;
- Request_Delete_Range - 一个DeleteRangeRequest,其操作的key也必须在整个事务中唯一。
最终事务请求定义如下:
1
2
3
4
5
|
message TxnRequest {
repeated Compare compare = 1;
repeated RequestOp success = 2;
repeated RequestOp failure = 3;
}
|
- Compare - 一个比较算子序列;
- Success - 如果比较成功,则处理这个请求对象序列,响应的结果就是对这些子请求处理的结果;
- Failure - 如果比较失败,则处理这个请求对象序列,响应的结果就是对这些子请求处理的结果。
事务响应定义如下:
1
2
3
4
5
|
message TxnResponse {
ResponseHeader header = 1;
bool succeeded = 2;
repeated ResponseOp responses = 3;
}
|
- Succeeded - 算子比较的结果,success则为true,fail则为false;
- Responses - 对所有子请求的处理结果。
ResponseOp定义如下:
1
2
3
4
5
6
7
|
message ResponseOp {
oneof response {
RangeResponse response_range = 1;
PutResponse response_put = 2;
DeleteRangeResponse response_delete_range = 3;
}
}
|
ResponseOp的成员与RequestOp对应,此处就不再一一列举解释了。
举例
etcd v3 支持 If/Then/Else 形式的多键事务.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
|
package main
import (
"fmt"
"log"
"sync"
"time"
"golang.org/x/net/context"
"github.com/coreos/etcd/clientv3"
)
var (
dialTimeout = 5 * time.Second
requestTimeout = 2 * time.Second
endpoints = []string{"localhost:2379"}
)
func main() {
cli, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: dialTimeout,
})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
key1, value1 := "key1", `value1`
_ = key1
_ = value1
// setnx
var w sync.WaitGroup
w.Add(10)
key10 := "setnx"
for i := 0; i < 10; i++ {
go func(i int) {
time.Sleep(5 * time.Millisecond)
_, err := cli.Txn(context.Background()).
If(clientv3.Compare(clientv3.CreateRevision(key10), "=", 0)).
Then(clientv3.OpPut(key10, fmt.Sprintf("%d", i))).
Commit()
if err != nil {
fmt.Println(err)
}
w.Done()
}(i)
}
w.Wait()
if resp, err := cli.Get(context.TODO(), key10); err != nil {
log.Fatal(err)
} else {
log.Println(resp)
}
}
|
获取到的输出如下:
1
|
2018/05/10 22:28:49 &{cluster_id:14841639068965178418 member_id:10276657743932975437 revision:120 raft_term:2 [key:"setnx" create_revision:120 mod_revision:120 version:1 value:"5" ] false 1}
|
通过 key 的 Create_Revision 是否为 0 来判断 key 是否存在。其中 If,Then 以及 Else 分支都可以包含多个操作。返回的数据包含一个 succeeded 字段,当为 true 时代表 If 的值为真。
Watch
原理
etcd v2的Watch API实际上是一个标准的HTTP GET请求,与一般的请求不同的是,它多了一个”?wait=true”的URL参数。当etcd v2的Server看到这个参数的时候,就知道这是一个watch请求,并且不会立即返回response,而是一直会等到被watch的这个key有了更新以后该请求才会返回。
1
|
curl http://127.0.0.1:2379/v2/keys/foo&wait=true
|
值得注意的是,客户端还可以指定版本号来watch。如果客户端指定了版本号,那么服务器端会返回大于该版本号的第一个更新的数据。例如watch的时候可以指定index=7,示例代码如下所示:
1
|
curl http://127.0.0.1:2379/v2/keys/foo?wait=true&waitindex=7'
|
客户端可以指定版本号watch,然而服务器端只保留了最新的1000个变更记录。也就是说,如果客户端指定的版本号,是1000个变更记录之前的,则会watch不到。
etcd v2的watch是基于HTTP的long poll实现的,其请求本质上是一个HTTP1.1的长连接。因此一个watch请求需要维持一个TCP连接。这就导致了服务端需要耗费很多资源用于维持TCP长连接。
watch只能watch某一个key以及其子节点(通过参数recursive设置),一个watch请求不能同时watch多个不同的key。
由于watch的历史记录最多只有1000条,因此很难通过watch机制来实现完整的数据同步(有丢失变更的风险),所以当前的大多数使用方式是通过watch来得知变更,然后通过GET来重新获取数据,并不是完全依赖于watch的变更event。
etcd v3 的watch机制在etcd v2的基础上做了很多改进,一个显著的优化是减小了每个watch所带来的资源消耗,从而能够支持更大规模的watch。首先etcd v3的API采用了gRPC,而gRPC又利用了HTTP/2的TCP链接多路复用(multiple stream per tcp connection),这样同一个Client的不同watch可以共享同一个TCP连接。
etcd会保存每个客户端发来的watch请求,watch请求可以关注一个key (单key),或者一个key前缀(区间),所以watchGroup包含两种Watcher:一种是key Watchers,数据结构是每个key对应一组Watcher,另外一种是range Watchers,数据结构是一个线段树,可以方便地通过区间查找到对应的Watcher。
etcd会有一个线程持续不断地遍历所有的watch请求,每个watch对象都会负责维护其监控的key事件,看其推送到了哪个revision。etcd会根据这个revision.main ID去BoltDB中继续向后遍历,实际上BoltDB类似于leveldb,是一个按key有序排列的Key-Value(K-V)引擎,而BoltDB中的key是由revision.main+revision.sub组成的,所以遍历就会依次经过历史上发生过的所有事务(tx)的记录。
对于遍历经过的每个K-V, etcd会反序列化其中的value,也就是实际etcd存储的Key Value,然后判断其中的key是否为watch请求关注的key,如果是就发送给客户端。
然而每次都对单个的watch对象进行扫描效率太差了,实际上etcd在实现的时候会将watch对象分组,然后根据组内的最小revision去检查,这样一次性可以处理多个watcher,减少扫描次数。
请求定义
Watch API提供了一组基于事件的接口,用于异步获取key的变化后的通知。etcd会把key的每一次变化都通知给观察者,而不像zookeeper那样只通知最近一次的变化。
Event代表了key的一次update,包括update的类型和变化前后的数据,定义如下:
1
2
3
4
5
6
7
8
9
|
message Event {
enum EventType {
PUT = 0;
DELETE = 1;
}
EventType type = 1;
KeyValue kv = 2;
KeyValue prev_kv = 3;
}
|
- Type - event type,PUT则下面会给出新增加的value,DELETE则指出key已被删除;
- KV - KeyValue是event相关的value,如果type是PUT则KV是当前更新后的kv对,如果kv.Version值为1则说明kv是新创建的。如果type是* DELETE,则KV的revision就是delete动作发生时的revision;
- Prev_KV - event动作发生前的kv值,为了节省带宽,如果请求中没有特别指明,这个值内容为空。
Watch是一个长久运行的请求,基于gRPC的stream进行stream数据的传输。
Watch对event作出了如下三项保证:
- 有序 - event依据revision进行排序,一个event投递一次后就不会再次被投递;
- 可靠 - 一系列event通知如a/b/c的发生时间a < b < c,则只会依次收到a、b和c,不会只收到a和c;
- 原子 - 一次操作产生一个revision,server发出一个event,事务的结果也仅仅产生一次event通知。
基于一次gRPC stream连接,可以发出如下watch创建请求:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
message WatchCreateRequest {
bytes key = 1;
bytes range_end = 2;
int64 start_revision = 3;
bool progress_notify = 4;
enum FilterType {
NOPUT = 0;
NODELETE = 1;
}
repeated FilterType filters = 5;
bool prev_kv = 6;
}
|
- Key, Range_End - 被观察的key的range[key, range_end),如果 range_end 没有设置,则只有参数key被观察,如果 range_end 等同于’\0’, 则大于等于参数 key 的所有 key 都将被观察;
- Start_Revision - 观察的起始的revision,如果不设置则是最新的revision;
- Progress_Notify - 如果设置为true,则etcd将定期发送不带任何事件的空WatchResponse。当一个watch连接断开后,客户端进行重连时候会指定开始的revision,server会根据当前系统的负载决定把发送watch event的频率;
- Filters - event过滤器,server给watch客户端发送通知的时候,会先把相关事件过滤掉;
- Prev_kv - 如果设置为true,则被创建的观察者在事件发生前获取上一次的kv,如果上一次的kv在etcd compaction的时候被删除掉,则不会返回任何值。
watch的响应内容定义如下:
1
2
3
4
5
6
7
8
9
|
message WatchResponse {
ResponseHeader header = 1;
int64 watch_id = 2;
bool created = 3;
bool canceled = 4;
int64 compact_revision = 5;
repeated mvccpb.Event events = 11;
}
|
- Watch_ID - 和watch相关的watcher ID;
- Created - 如果请求是WatchCreateRequest,则这个值为true,所有发送给同一个watch的event都带有同样的watch_id;
- Canceled - 如果请求是WatchCancelRequest,则这个值为true,这个Response之后watcher不会再收到任何response;
- Compact_Revision - 如果watcher试图观察一个旧的不存在的revision时候,server会返回当前存在的最小的有效revision。如果watcher根不是server发出的watch通知的时候,server会发出这个通知并断开watch连接。
- Events - 针对同一个watch ID返回的一批有序的event集合。
如果一个watcher想停止watch,则可以发出如下请求:
1
2
3
|
message WatchCancelRequest {
int64 watch_id = 1;
}
|
- Watch_ID - 要取消的watcher的ID,server后面就不会再更多的event。
举例
Watch 用于监测一个键空间中发生的变化。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
package main
import (
"fmt"
"log"
"time"
"golang.org/x/net/context"
"github.com/coreos/etcd/clientv3"
)
var (
dialTimeout = 5 * time.Second
requestTimeout = 2 * time.Second
endpoints = []string{"localhost:2379"}
)
func main() {
cli, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: dialTimeout,
})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
key1, value1 := "key1", `value1`
go func() {
rch := cli.Watch(context.Background(), "", clientv3.WithPrefix())
for wresp := range rch {
for _, ev := range wresp.Events {
fmt.Printf("Watch: %s %q: %q \n", ev.Type, ev.Kv.Key, ev.Kv.Value)
}
}
}()
if resp, err := cli.Put(context.TODO(), key1, value1); err != nil {
log.Fatal(err)
} else {
log.Println(resp)
}
}
|
获取到的输出如下:
1
2
|
Watch: PUT "key1": "value1"
2018/05/10 22:38:35 &{cluster_id:14841639068965178418 member_id:10276657743932975437 revision:125 raft_term:2 <nil>}
|
注意:你本地的代码可能获取不到 Watch 的输出,这可能是因为主协程已经退出。
假设你在使用 etcd 存储一些配置,就可以使用 Watch 功能在配置发生变更的时候得到通知。
计数器
etcd v3 并没有直接的方式来提供自增操作,但我们可以使用 key 的 Version 字段来间接实现一个自增计数器。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
|
package main
import (
"fmt"
"log"
"sync"
"time"
"golang.org/x/net/context"
"github.com/coreos/etcd/clientv3"
)
var (
dialTimeout = 5 * time.Second
requestTimeout = 2 * time.Second
endpoints = []string{"localhost:2379"}
)
func main() {
cli, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: dialTimeout,
})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
// incr, 可以使用key的 Version 字段
var w2 sync.WaitGroup
w2.Add(100)
for i := 0; i < 100; i++ {
go func() {
defer w2.Done()
resp, err := cli.Put(context.TODO(), "keyincr", "", clientv3.WithPrevKV())
if err != nil {
fmt.Println(err)
} else {
if resp.PrevKv != nil {
// fmt.Println(resp.PrevKv.Version)
}
}
}()
}
w2.Wait()
if resp, err := cli.Get(context.TODO(), "keyincr"); err != nil {
fmt.Println(err)
} else {
fmt.Println(resp.Kvs[0].Version)
}
}
|
得到的输出如下:
但是这种方式只能得到一个每次自增1的计数器,无法控制自增的步长,也不能进行自减操作。如果需要更多的操作,可以使用事务来实现,但是这种方式的并发自增的时候可能经常导致失败。
Range
Get和Put
请求定义
Get
etcd允许一次以range形式操作多个key。etcd对数据的组织不像zookeeper那样以目录层次结构的方式进行,而只有一个层级,range的形式是[a, b),即[key, keyend)。如果keyend为空则请求只有key;如果range是[key, key+0x1)则是请求以key为前缀的所有key;如果key_end是’\0’,则请求所有大于等于key的所有key。
Range请求定义如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
message RangeRequest {
enum SortOrder {
NONE = 0; // default, no sorting
ASCEND = 1; // lowest target value first
DESCEND = 2; // highest target value first
}
enum SortTarget {
KEY = 0;
VERSION = 1;
CREATE = 2;
MOD = 3;
VALUE = 4;
}
bytes key = 1;
bytes range_end = 2;
int64 limit = 3;
int64 revision = 4;
SortOrder sort_order = 5;
SortTarget sort_target = 6;
bool serializable = 7;
bool keys_only = 8;
bool count_only = 9;
int64 min_mod_revision = 10;
int64 max_mod_revision = 11;
int64 min_create_revision = 12;
int64 max_create_revision = 13;
}
|
各个字段含义如下:
- Key, Range_End - key range;
- Limit - 返回key的数目的最大值,如果为0则说明没有限制;
- Revision - key修改的时间点(point-in-time),如果其值为0则是获取最新的kv,如果指定的revision已经被compact掉则etcd返回* ErrCompacted错误;
- Sort_Order - 请求的排序方式;
- Sort_Target - kv的排序方式;
- Serializable - sets the range request to use serializable member-local reads. By default, Range is linearizable; * it reflects the current consensus of the cluster. For better performance and availability, in exchange for * possible stale reads, a serializable range request is served locally without needing to reach consensus with * other nodes in the cluster.
- Keys_Only - 只返回key,无需返回Value;
- Count_Only - 只返回range内key的数目;
- Min_Mod_Revision - 最低mod revision值,Mod_Revision低于这个值的kv会被过滤掉;
- Max_Mod_Revision - 最大mod revision值,Mod_Revision高于这个值的kv会被过滤掉;
- Min_Create_Revision - 最低create revision值,Mod_Revision低于这个值的kv会被过滤掉;
- Max_Create_Revision - 最高create revision值,Mod_Revision高于这个值的kv会被过滤掉。
Range请求的响应定义如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
message ResponseHeader {
uint64 cluster_id = 1;
uint64 member_id = 2;
int64 revision = 3;
uint64 raft_term = 4;
}
message RangeResponse {
ResponseHeader header = 1;
repeated mvccpb.KeyValue kvs = 2;
bool more = 3;
int64 count = 4;
}
|
各个字段含义如下:
- Cluster_ID - etcd cluster ID;
- Member_ID - 返回响应的cluster member的ID;
- Revision - 获取当前系统最新的kv Revision;
- Raft_Term - 这个字段可用于检测当前集群是否已经选举出一个新的leader;
- Kvs - 请求返回结果,如果Count_Only为true,则这个结果为空;
- More - 是否有更多值,如果limit为true;
- Count - Count_Only为true时候的结果。
我们通过一个特别的Get选项,获取/test目录下的所有孩子:
1
|
rangeResp, err := kv.Get(context.TODO(), "/test/", clientv3.WithPrefix())
|
WithPrefix()是指查找以/test/为前缀的所有key,因此可以模拟出查找子目录的效果。
我们知道etcd是一个有序的k-v存储,因此/test/为前缀的key总是顺序排列在一起。
withPrefix实际上会转化为范围查询,它根据前缀/test/生成了一个key range,[“/test/”, “/test0”),为什么呢?因为比/大的字符是’0’,所以以/test0作为范围的末尾,就可以扫描到所有的/test/打头的key了。
在之前,我Put了一个/testxxx干扰项,因为不符合/test/前缀(注意末尾的/),所以就不会被这次Get获取到。但是,如果我查询的前缀是/test,那么/testxxx也会被扫描到,这就是etcd k-v模型导致的,编程时一定要特别注意。
Put
PutReqeust定义如下:
1
2
3
4
5
6
7
8
|
message PutRequest {
bytes key = 1;
bytes value = 2;
int64 lease = 3;
bool prev_kv = 4;
bool ignore_value = 5;
bool ignore_lease = 6;
}
|
各个字段含义如下:
- Key - KV对的key;
- Value - KV对的value;
- Lease - KV对的超时lease ID,默认值为0;
- Prev_kv - 如果为true,则response会返回update前的kv值;
- Ignore_Value - 不更新当前key的value,当key不存在的时候返回一个error;
- Ignore_Lease - 不更新key的lease,当key不存在的时候返回一个error。
响应定义如下:
1
2
3
4
|
message PutResponse {
ResponseHeader header = 1;
mvccpb.KeyValue prev_kv = 2;
}
|
prev_kv:Reqeuest中的 prev_kv 被设置为true的时候,这个结果就是update前的kv值;
举例
一个简单的示例如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
|
package main
import (
"fmt"
"log"
"time"
"golang.org/x/net/context"
"github.com/coreos/etcd/clientv3"
)
var (
dialTimeout = 5 * time.Second
requestTimeout = 2 * time.Second
endpoints = []string{"localhost:2379"}
)
func main() {
cli, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: dialTimeout,
})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
go func() {
rch := cli.Watch(context.Background(), "", clientv3.WithPrefix())
for wresp := range rch {
for _, ev := range wresp.Events {
fmt.Printf("Watch: %s %q: %q \n", ev.Type, ev.Kv.Key, ev.Kv.Value)
}
}
}()
kvs := map[string]string{
"key1": "value1",
"key10": "value10",
"key5": "value5",
"keyk": "valuek",
"key2": "value2",
}
for k, v := range kvs {
cli.Put(context.TODO(), k, v)
}
if resp, err := cli.Get(context.TODO(), "key1", clientv3.WithRange("keyk")); err != nil {
log.Fatal(err)
} else {
log.Println(resp)
}
}
|
获取到的输出如下:
1
2
3
4
5
6
|
Watch: PUT "keyk": "valuek"
Watch: PUT "key2": "value2"
Watch: PUT "key1": "value1"
Watch: PUT "key10": "value10"
Watch: PUT "key5": "value5"
2018/05/11 11:34:02 &{cluster_id:14841639068965178418 member_id:10276657743932975437 revision:21 raft_term:2 [key:"key1" create_revision:2 mod_revision:19 version:4 value:"value1" key:"key10" create_revision:3 mod_revision:20 version:4 value:"value10" key:"key2" create_revision:6 mod_revision:18 version:4 value:"value2" key:"key5" create_revision:4 mod_revision:21 version:4 value:"value5" ] false 4}
|
可以看到通过指定 startKey 为 key1 以及 endKey 为 keyl,我们查询除了在 [startKey, endKey) 范围内的数据。
Delete
请求定义
删除则可以删除一定范围内的kv对,请求定义如下:
1
2
3
4
5
|
message DeleteRangeRequest {
bytes key = 1;
bytes range_end = 2;
bool prev_kv = 3;
}
|
- Key, Range_End - key range;
- Prev_kv - 如果设置为true,则返回删除前的kv结果;
响应定义如下:
1
2
3
4
5
|
message DeleteRangeResponse {
ResponseHeader header = 1;
int64 deleted = 2;
repeated mvccpb.KeyValue prev_kvs = 3;
}
|
- Deleted - 被删除的kv数目;
- Prev_kv - 如果请求中的prev_kv被设为true,则响应中就返回被删除的kv值数组;
举例
Delete 操作也可以通过指定一个 endKey 来删除多个 key:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
|
package main
import (
"fmt"
"log"
"time"
"golang.org/x/net/context"
"github.com/coreos/etcd/clientv3"
)
var (
dialTimeout = 5 * time.Second
requestTimeout = 2 * time.Second
endpoints = []string{"localhost:2379"}
)
func main() {
cli, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: dialTimeout,
})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
ch := make(chan struct{})
go func() {
rch := cli.Watch(context.Background(), "", clientv3.WithPrefix())
for wresp := range rch {
for _, ev := range wresp.Events {
fmt.Printf("Watch: %s %q: %q \n", ev.Type, ev.Kv.Key, ev.Kv.Value)
}
}
ch <- struct{}{}
}()
kvs := map[string]string{
"key1": "value1",
"key10": "value10",
"key5": "value5",
"keyk": "valuek",
"key2": "value2",
}
for k, v := range kvs {
cli.Put(context.TODO(), k, v)
}
if resp, err := cli.Delete(context.TODO(), "key1", clientv3.WithRange("keyk")); err != nil {
log.Fatal(err)
} else {
log.Println(resp)
}
select {
case <-ch:
case <-time.After(5 * time.Second):
}
}
|
获取的输出如下:
1
2
3
4
5
6
7
8
9
10
|
Watch: PUT "key1": "value1"
Watch: PUT "key10": "value10"
Watch: PUT "key5": "value5"
Watch: PUT "keyk": "valuek"
Watch: PUT "key2": "value2"
Watch: DELETE "key1": ""
2018/05/11 11:39:24 &{cluster_id:14841639068965178418 member_id:10276657743932975437 revision:43 raft_term:2 4 []}
Watch: DELETE "key10": ""
Watch: DELETE "key2": ""
Watch: DELETE "key5": ""
|
从 Watch 的输出可以看到,在 [“key1”, “keyk”) 范围内的 4 个 key 都被删除了。
etcd v3 中可以通过 range 来获取多个 key 的数据并同时指定排序,而且可以用 limit 来限制返回的数量。这非常像经常看到的分页功能,但是 etcd v3 并没有提供 offset 操作。
一个简单的示例程序如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
|
package main
import (
"fmt"
"log"
"time"
"golang.org/x/net/context"
"github.com/coreos/etcd/clientv3"
)
var (
dialTimeout = 5 * time.Second
requestTimeout = 2 * time.Second
endpoints = []string{"localhost:2379"}
)
func main() {
cli, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: dialTimeout,
})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
ch := make(chan struct{})
go func() {
rch := cli.Watch(context.Background(), "", clientv3.WithPrefix())
for wresp := range rch {
for _, ev := range wresp.Events {
fmt.Printf("Watch: %s %q: %q \n", ev.Type, ev.Kv.Key, ev.Kv.Value)
}
}
ch <- struct{}{}
}()
kvs := map[string]string{
"key1": "value1",
"key10": "value10",
"key5": "value5",
"keyk": "valuek",
"key2": "value2",
}
for k, v := range kvs {
cli.Put(context.TODO(), k, v)
}
startKey := "key1"
endKey := "keyk"
for {
if resp, err := cli.Get(context.TODO(), startKey, clientv3.WithRange(endKey), clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend), clientv3.WithLimit(1)); err != nil {
log.Fatal(err)
break
} else {
items := len(resp.Kvs)
if items == 0 {
break
}
startKey = string(resp.Kvs[items-1].Key) + "\x00"
fmt.Println(resp)
}
}
select {
case <-ch:
case <-time.After(5 * time.Second):
}
}
|
获取的输出如下:
1
2
3
4
5
6
7
8
9
|
Watch: PUT "key1": "value1"
Watch: PUT "key10": "value10"
Watch: PUT "key5": "value5"
Watch: PUT "keyk": "valuek"
Watch: PUT "key2": "value2"
&{cluster_id:14841639068965178418 member_id:10276657743932975437 revision:58 raft_term:2 [key:"key1" create_revision:47 mod_revision:54 version:3 value:"value1" ] true 4}
&{cluster_id:14841639068965178418 member_id:10276657743932975437 revision:58 raft_term:2 [key:"key10" create_revision:48 mod_revision:55 version:3 value:"value10" ] true 3}
&{cluster_id:14841639068965178418 member_id:10276657743932975437 revision:58 raft_term:2 [key:"key2" create_revision:46 mod_revision:58 version:3 value:"value2" ] true 2}
&{cluster_id:14841639068965178418 member_id:10276657743932975437 revision:58 raft_term:2 [key:"key5" create_revision:44 mod_revision:56 version:3 value:"value5" ] false 1}
|
从输出可以看到,我们分为 4 次获取了 range 内的数据,相当于进行了分页。
注意:这种实现方式有一个缺点,即当 sort 的字段不是 key 时,该分页方式无效。
Lease
请求定义
Lease提供了对租约的支持。cluster保证了lease时间内kv的有效性,当lease到期而客户端没有对lease进行续约时,lease就超时了。每个kv只能绑定到一个lease之上,当lease超时后,相关的所有kv都会被删除,每个key的每个watcher都会收到delete event。
创建一个lease请求体如下:
1
2
3
4
|
message LeaseGrantRequest {
int64 TTL = 1;
int64 ID = 2;
}
|
- TTL - 一个以秒为单位的超时时间;
- ID - Lease ID,如果值为0,则etcd会进行赋值。
server创建lease成功后,会返回如下的响应:
1
2
3
4
5
|
message LeaseGrantResponse {
ResponseHeader header = 1;
int64 ID = 2;
int64 TTL = 3;
}
|
- ID - etcd为lease分配的ID;
- TTL - 以秒为单位的lease时间;
撤销租约请求如下:
1
2
3
|
message LeaseRevokeRequest {
int64 ID = 1;
}
|
- ID - 将要撤销的lease ID,请求成功后,所有ID相关的key都会被删除。
如果客户端想要对一个lease进行续约,可以发出如下请求:
1
2
3
|
message LeaseKeepAliveRequest {
int64 ID = 1;
}
|
应答消息体定义如下:
1
2
3
4
5
|
message LeaseKeepAliveResponse {
ResponseHeader header = 1;
int64 ID = 2;
int64 TTL = 3;
}
|
-
ID - 续约的ID;
-
TTL - 剩余的TTL,以秒为单位。
github.com/coreos/etcd/clientv3/lease.go:Lease 接口提供了以下一些功能函数:
-
Grante: 创建一个 lease 对象;
-
Revoke: 释放一个 lease 对象;
-
TimeToLive: 获取 lease 剩余的 TTL 时间;
-
Leases: 列举 etcd 中的所有 lease;
-
KeepAlive: 自动定时对 lease 续约;
-
KeepAliveOnce: 为 lease 续约一次,代码注释中说大部分情况下都应该使用 KeepAlive;
-
Close: 关闭当前客户端建立的所有 lease;
Put 函数和 KeepAlive 函数都有一个 Lease 对象,如果在进行 Put 或者 KeepAlive 之前 Lease 已经过期,则 etcd 会返回 error。
创建租约
在 etcd v3 中,你可以先创建一个 Lease(租约), 一个 Lease 代表一个 ttl,然后将一个 key 关联到该 Lease。当 Lease 超时之后所关联的 key 会被自动删除。 一个简单的示例如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
|
package main
import (
"log"
"time"
"golang.org/x/net/context"
"github.com/coreos/etcd/clientv3"
)
var (
dialTimeout = 5 * time.Second
requestTimeout = 2 * time.Second
endpoints = []string{"localhost:2379"}
)
func main() {
cli, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: dialTimeout,
})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
ch := make(chan struct{})
go func() {
rch := cli.Watch(context.Background(), "", clientv3.WithPrefix())
for wresp := range rch {
for _, ev := range wresp.Events {
log.Printf("Watch: %s %q: %q \n", ev.Type, ev.Kv.Key, ev.Kv.Value)
}
}
ch <- struct{}{}
}()
resp, _ := cli.Grant(context.TODO(), 2)
kvs := map[string]string{
"key1": "value1",
"key2": "value2",
"key3": "value3",
"key4": "value4",
}
for k, v := range kvs {
cli.Put(context.TODO(), k, v, clientv3.WithLease(resp.ID))
}
cli.Put(context.TODO(), "key2", "value2new")
cli.Put(context.TODO(), "key3", "value3new", clientv3.WithIgnoreLease())
resp, _ = cli.Grant(context.TODO(), 4)
cli.Put(context.TODO(), "key4", "value4new", clientv3.WithLease(resp.ID))
select {
case <-ch:
case <-time.After(5 * time.Second):
}
}
|
获取的输出如下:
1
2
3
4
5
6
7
8
9
10
|
2018/05/11 13:37:48 Watch: PUT "key4": "value4"
2018/05/11 13:37:48 Watch: PUT "key1": "value1"
2018/05/11 13:37:48 Watch: PUT "key2": "value2"
2018/05/11 13:37:48 Watch: PUT "key3": "value3"
2018/05/11 13:37:48 Watch: PUT "key2": "value2new"
2018/05/11 13:37:48 Watch: PUT "key3": "value3new"
2018/05/11 13:37:48 Watch: PUT "key4": "value4new"
2018/05/11 13:37:50 Watch: DELETE "key1": ""
2018/05/11 13:37:50 Watch: DELETE "key3": ""
2018/05/11 13:37:52 Watch: DELETE "key4": ""
|
在上例中,我们通过 Grant 函数来创建一个 Lease,并在 Put 操作中将 key 关联到 LeaseID。
租约续约
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
var (
keepRespChan <-chan *clientv3.LeaseKeepAliveResponse
keepResp *clientv3.LeaseKeepAliveResponse
)
//租约续租
if keepRespChan, err = lease.KeepAlive(context.TODO(), leaseId); err != nil {
fmt.Println(err)
return
}
//消费keepRespChan
go func() {
for {
select {
case keepResp = <-keepRespChan:
if keepRespChan == nil {
fmt.Println("租约已经失效了")
goto END
} else { // 每秒会续租一次, 所以就会受到一次应答
fmt.Println("收到自动续租应答:", keepResp.ID)
}
}
}
END:
}()
|
打印结果:

撤销租约
撤销租约会使当前租约的所关联的key-value失效
1
2
3
4
5
6
7
|
//撤销租约
_, err = lease.Revoke(context.TODO(), leaseID)
if err != nil {
fmt.Printf("撤销租约失败:%s\n",err.Error())
os.Exit(RevokeErr)
}
fmt.Printf("撤销租约成功")
|
op操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
var (
config clientv3.Config
client *clientv3.Client
err error
kv clientv3.KV
op clientv3.Op
opResp clientv3.OpResponse
)
// 客户端配置
config = clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 5 * time.Second,
}
// 建立连接
if client, err = clientv3.New(config); err != nil {
fmt.Println(err)
return
}
// KV
kv = clientv3.NewKV(client)
op = clientv3.OpPut("/cron/jobs/job4", "opPut")
if opResp, err = kv.Do(context.TODO(), op); err != nil {
log.Printf("clientv3.OpPut error:", err)
}
fmt.Println("写入Revision:", opResp.Put().Header.Revision)
op = clientv3.OpGet("/cron/jobs/job4")
if opResp, err = kv.Do(context.TODO(), op); err != nil {
log.Printf("clientv3.OpGet error:", err)
}
fmt.Println("读取Revision:", opResp.Get().Kvs[0].ModRevision)
fmt.Println("读取Value:", string(opResp.Get().Kvs[0].Value))
|
打印结果:
参考:
https://enpsl.top/2019/01/05/2019-01-05-golang-etcd/
https://zhuanlan.zhihu.com/p/36700832
https://zhuanlan.zhihu.com/p/36719209
https://www.beikejiedeliulangmao.top/etcd/
https://alexstocks.github.io/html/etcd.html