为何需要分布式锁

一般我们使用分布式锁有两个场景:

  • 效率:使用分布式锁可以避免不同节点重复相同的工作,这些工作会浪费资源。比如用户付了钱之后有可能不同节点会发出多封短信。
  • 正确性:加分布式锁同样可以避免破坏正确性的发生,如果两个节点在同一条数据上面操作,比如多个节点机器对同一个订单操作不同的流程有可能会导致该笔订单最后状态出现错误,造成损失。

分布式锁的特点

当我们确定了在不同节点上需要分布式锁,那么我们需要了解分布式锁到底应该有哪些特点:

  • 互斥性:和我们本地锁一样互斥性是最基本,但是分布式锁需要保证在不同节点的不同线程的互斥。
  • 可重入性:同一个节点上的同一个线程如果获取了锁之后那么也可以再次获取这个锁。
  • 锁超时:和本地锁一样支持锁超时,防止死锁。
  • 高效,高可用:加锁和解锁需要高效,同时也需要保证高可用防止分布式锁失效,可以增加降级。
  • 支持阻塞和非阻塞:和ReentrantLock一样支持lock和trylock以及tryLock(long timeOut)。
  • 支持公平锁和非公平锁(可选):公平锁的意思是按照请求加锁的顺序获得锁,非公平锁就相反是无序的。这个一般来说实现的比较少。

Mysql实现分布式锁

数据库实现分布式锁一般有三种方式

  • 基于Mysql表做乐观锁,用于分布式锁。(version)
  • 基于Mysql表做悲观锁(InnoDB,for update)
  • 基于Mysql表数据记录做唯一约束(表中记录方法名称)

基于Mysql表数据记录做唯一约束(表中记录方法名称)

要实现分布式锁,最简单的方式可能就是直接创建一张锁表,然后通过操作该表中的数据来实现了。

当我们要锁住某个方法或资源时,我们就在该表中增加一条记录,想要释放锁的时候就删除这条记录。

创建这样一张数据库表:

当我们想要锁住某个方法时,执行以下SQL:

因为我们对method_name做了唯一性约束,这里如果有多个请求同时提交到数据库的话,数据库会保证只有一个操作可以成功,那么我们就可以认为操作成功的那个线程获得了该方法的锁,可以执行方法体内容。

当方法执行完毕之后,想要释放锁的话,需要执行以下Sql:

上面这种简单的实现有以下几个问题:

  1. 这把锁强依赖数据库的可用性,数据库是一个单点,一旦数据库挂掉,会导致业务系统不可用。
  2. 这把锁没有失效时间,一旦解锁操作失败,就会导致锁记录一直在数据库中,其他线程无法再获得到锁。
  3. 这把锁只能是非阻塞的,因为数据的insert操作,一旦插入失败就会直接报错。没有获得锁的线程并不会进入排队队列,要想再次获得锁就要再次触发获得锁操作。
  4. 这把锁是非重入的,同一个线程在没有释放锁之前无法再次获得该锁。因为数据中数据已经存在了。

当然,我们也可以有其他方式解决上面的问题。

  1. 数据库是单点?搞两个数据库,数据之前双向同步。一旦挂掉快速切换到备库上。
  2. 没有失效时间?只要做一个定时任务,每隔一定时间把数据库中的超时数据清理一遍。
  3. 非阻塞的?搞一个while循环,直到insert成功再返回成功。
  4. 非重入的?在数据库表中加个字段,记录当前获得锁的机器的主机信息和线程信息,那么下次再获取锁的时候先查询数据库,如果当前机器的主机信息和线程信息在数据库可以查到的话,直接把锁分配给他就可以了。

基于Mysql表做悲观锁(InnoDB引擎,for update语句)

除了可以通过增删操作数据表中的记录以外,其实还可以借助数据中自带的锁来实现分布式的锁。

我们还用刚刚创建的那张数据库表。可以通过数据库的排他锁来实现分布式锁。 基于MySql的InnoDB引擎,可以使用以下方法来实现加锁操作:

在查询语句后面增加for update,数据库会在查询过程中给数据库表增加排他锁(这里再多提一句,InnoDB引擎在加锁的时候,只有通过索引进行检索的时候才会使用行级锁,否则会使用表级锁。这里我们希望使用行级锁,就要给method_name添加索引,值得注意的是,这个索引一定要创建成唯一索引,否则会出现多个重载方法之间无法同时被访问的问题。重载方法的话建议把参数类型也加上。)。

当某条记录被加上排他锁之后,其他线程无法再在该行记录上增加排他锁。

我们可以认为获得排它锁的线程即可获得分布式锁,当获取到锁之后,可以执行方法的业务逻辑,执行完方法之后,再通过以下方法解锁:

1 获取锁

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
public void lock(){
    connection.setAutoCommit(false)
    int count = 0;
    while(count < 4){
        try{
            select * from lock where lock_name=xxx for update;
            if(结果不为空){
                //代表获取到锁
                return;
            }
        }catch(Exception e){

        }
        //为空或者抛异常的话都表示没有获取到锁
        sleep(1000);
        count++;
    }
    throw new LockException();
}

2 释放锁

1
2
3
public void release(){
    connection.commit();
}

这种方法可以有效的解决上面提到的无法释放锁和阻塞锁的问题。

  • 阻塞锁? for update语句会在执行成功后立即返回,在执行失败时一直处于阻塞状态,直到成功。
  • 锁定之后服务宕机,无法释放?使用这种方式,服务宕机之后数据库会自己把锁释放掉

但是还是无法直接解决数据库单点和可重入问题。

这里还可能存在另外一个问题,虽然我们对method_name 使用了唯一索引,并且显示使用for update来使用行级锁。但是,MySql会对查询进行优化,即便在条件中使用了索引字段,但是否使用索引来检索数据是由 MySQL 通过判断不同执行计划的代价来决定的,如果 MySQL 认为全表扫效率更高,比如对一些很小的表,它就不会使用索引,这种情况下 InnoDB 将使用表锁,而不是行锁。(需要使用USE INDEX 强制索引)

还有一个问题,就是我们要使用排他锁来进行分布式锁的lock,那么一个排他锁长时间不提交,就会占用数据库连接。一旦类似的连接变得多了,就可能把数据库连接池撑爆.

基于Mysql资源表做乐观锁,用于分布式锁

大多数是基于数据版本(VERSION)的记录机制实现的。何谓数据版本号?即为数据增加一个版本标识,在基于Mysql表的版本解决方案中,一般是通过为数据库表添加一个 “VERSION”字段来实现读取出数据时,将此版本号一同读出,之后更新时,对此版本号加1。

在更新过程中,会对版本号进行比较,如果是一致的,没有发生改变,则会成功执行本次操作;如果版本号不一致,则会更新失败。

对乐观锁的含义有了一定的了解后,结合具体的例子,我们来推演下我们应该怎么处理:

假设我们有一张资源表,如下图所示: T_RESOURCE , 其中有6个字段ID, RESOOURCE, STATE, ADD_TIME, UPDATE_TIME, VERSION,分别表示表主键、资源、分配状态(1未分配 2已分配)、资源创建时间、资源更新时间、资源数据版本号。

  1. 假设我们现在我们对ID=5780这条数据进行分配,那么非分布式场景的情况下,我们一般先查询出来STATE=1(未分配)的数据,然后从其中选取一条数据可以通过以下语句进行,如果可以更新成功,那么就说明已经占用了这个资源

    1
    
    UPDATE T_RESOURCE SET STATE=2 WHERE STATE=1 AND ID=5780
    
  2. 如果在分布式场景中,由于数据库的UPDATE操作是原子是原子的,其实上边这条语句理论上也没有问题,但是这条语句如果在典型的“ABA”情况下,我们是无法感知的。有人可能会问什么是“ABA”问题呢?大家可以网上搜索一下,这里我说简单一点就是,如果在你第一次SELECT和第二次UPDATE过程中,由于两次操作是非原子的,所以这过程中,如果有一个线程,先是占用了资源(STATE=2),然后又释放了资源(STATE=1),实际上最后你执行UPDATE操作的时候,是无法知道这个资源发生过变化的。也许你会说这个在你说的场景中应该也还好吧,但是在实际的使用过程中,比如银行账户存款或者扣款的过程中,这种情况是比较恐怖的。

那么如果使用乐观锁我们如何解决上边的问题呢?

  1. 先执行SELECT操作查询当前数据的数据版本号,比如当前数据版本号是26:

    1
    
    SELECT ID, RESOURCE, STATE,VERSION FROM T_RESOURCE  WHERE STATE=1 AND ID=5780;
    
  2. 执行更新操作:

    1
    
    UPDATE T_RESOURE SET STATE=2, VERSION=27, UPDATE_TIME=NOW() WHERE RESOURCE=XXXXXX AND STATE=1 AND VERSION=26
    
  3. 如果上述UPDATE语句真正更新影响到了一行数据,那就说明占位成功。如果没有更新影响到一行数据,则说明这个资源已经被别人占位了。

基于Mysql表做乐观锁的一些缺点:

  1. 这种操作方式,使原本一次的UPDATE操作,必须变为2次操作: SELECT版本号一次;UPDATE一次。增加了数据库操作的次数。
  2. 如果业务场景中的一次业务流程中,多个资源都需要用保证数据一致性,那么如果全部使用基于Mysql资源表的乐观锁,就要让每个资源都有一张资源表,这个在实际使用场景中肯定是无法满足的。而且这些都基于Mysql操作,在高并发的要求下,对数据库连接的开销一定是无法忍受的。
  3. 乐观锁机制往往基于系统中的数据存储逻辑,因此可能会造成脏数据被更新到数据库中。在系统设计阶段,我们应该充分考虑到这些情况出现的可能性,并进行相应调整,如将乐观锁策略在数据库存储过程中实现,对外只开放基于此存储过程的数据更新途径,而不是将数据库表直接对外公开。

总结

总结一下使用数据库来实现分布式锁的方式,这两种方式都是依赖数据库的一张表,一种是通过表中的记录的存在情况确定当前是否有锁存在,另外一种是通过数据库的排他锁来实现分布式锁。

优点:

  1. 直接借助数据库,容易理解。

缺点

  1. 会有各种各样的问题,在解决问题的过程中会使整个方案变得越来越复杂。
  2. 操作数据库需要一定的开销,性能问题需要考虑。
  3. 使用数据库的行级锁并不一定靠谱,尤其是当我们的锁表并不大的时候。

Redis实现分布式锁

单节点

基于 setnx()、expire() 方法做分布式锁

  • setnx() setnx 的含义就是 SET if Not Exists,其主要有两个参数 setnx(key, value)。该方法是原子的,如果 key 不存在,则设置当前 key 成功,返回 1;如果当前 key 已经存在,则设置当前 key 失败,返回 0。
  • expire() expire 设置过期时间,要注意的是 setnx 命令不能设置 key 的超时时间,只能通过 expire() 来对 key 设置。

使用步骤:

  1. setnx(lockkey, 1) 如果返回 0,则说明占位失败;如果返回 1,则说明占位成功
  2. expire() 命令对 lockkey 设置超时时间,为的是避免死锁问题。
  3. 执行完业务代码后,可以通过 delete 命令删除 key。
1
2
3
4
5
6
7
8
9
public static void wrongGetLock1(Jedis jedis, String lockKey, String requestId, int expireTime) {

    Long result = jedis.setnx(lockKey, requestId);
    if (result == 1) {
        // 若在这里程序突然崩溃,则无法设置过期时间,将发生死锁
        jedis.expire(lockKey, expireTime);
    }

}

由于这是两条Redis命令,不具有原子性,如果程序在执行完setnx()之后突然崩溃,导致锁没有设置过期时间。那么将会发生死锁。

一种改善方案就是使用Lua脚本(包含SETNX和EXPIRE两条命令),但是如果Redis仅执行了一条命令后crash或者发生主从切换,依然会出现锁没有过期时间,最终导致无法释放。

基于 setnx()、get()、getset()方法做分布式锁

这个方案的背景主要是在 setnx() 和 expire() 的方案上针对可能存在的死锁问题,做了一些优化。

  • getset() 这个命令主要有两个参数 getset(key,newValue)。该方法是原子的,对 key 设置 newValue 这个值,并且返回 key 原来的旧值。假设 key 原来是不存在的,那么多次执行这个命令,会出现下边的效果:

    1
    2
    3
    
    getset(key, "value1") //返回 null 此时 key 的值会被设置为 value1
    getset(key, "value2") //返回 value1 此时 key 的值会被设置为 value2
    //依次类推!
    

使用步骤:

  1. setnx(lockkey, 当前时间+过期超时时间),如果返回 1,则获取锁成功;如果返回 0 则没有获取到锁,转向 2。
  2. get(lockkey) 获取值 oldExpireTime ,并将这个 value 值与当前的系统时间进行比较,如果小于当前系统时间,则认为这个锁已经超时,可以允许别的请求重新获取,转向 3。
  3. 计算 newExpireTime = 当前时间+过期超时时间,然后 getset(lockkey, newExpireTime) 会返回当前 lockkey 的值currentExpireTime。
  4. 判断 currentExpireTime 与 oldExpireTime 是否相等,如果相等,说明当前 getset 设置成功,获取到了锁。如果不相等,说明这个锁又被别的请求获取走了,那么当前请求可以直接返回失败,或者继续重试。
  5. 在获取到锁之后,当前线程可以开始自己的业务处理,当处理完毕后,比较自己的处理时间和对于锁设置的超时时间,如果小于锁设置的超时时间,则直接执行 delete 释放锁;如果大于锁设置的超时时间,则不需要再锁进行处理。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public static boolean wrongGetLock2(Jedis jedis, String lockKey, int expireTime) {

    long expires = System.currentTimeMillis() + expireTime;
    String expiresStr = String.valueOf(expires);

    // 如果当前锁不存在,返回加锁成功
    if (jedis.setnx(lockKey, expiresStr) == 1) {
        return true;
    }

    // 如果锁存在,获取锁的过期时间
    String currentValueStr = jedis.get(lockKey);
    if (currentValueStr != null && Long.parseLong(currentValueStr) < System.currentTimeMillis()) {
        // 锁已过期,获取上一个锁的过期时间,并设置现在锁的过期时间
        String oldValueStr = jedis.getSet(lockKey, expiresStr);
        if (oldValueStr != null && oldValueStr.equals(currentValueStr)) {
            // 考虑多线程并发的情况,只有一个线程的设置值和当前值相同,它才有权利加锁
            return true;
        }
    }

    // 其他情况,一律返回加锁失败
    return false;

}

那么这段代码问题在哪里?

  1. 由于是客户端自己生成过期时间,所以需要强制要求分布式下每个客户端的时间必须同步。
  2. 当锁过期的时候,如果多个客户端同时执行getSet()方法,那么虽然最终只有一个客户端可以加锁,但是这个客户端的锁的过期时间可能被其他客户端覆盖。 设想一种场景C1,C2竞争锁,C1获取到了锁,C2锁执行了GETSET操作修改了C1锁的过期时间,如果C1锁的过期时间被延长,其它Client需要等待更久的时间
  3. 锁不具备拥有者标识,即任何客户端都可以解锁。

基于新版 setnx() 方法做分布式锁

1
2
3
4
5
6
tryLock(){
   SETNX Key 1 Seconds
}
release(){
 DELETE Key
}

Redis 2.6.12版本后SETNX增加过期时间参数,这样就解决了两条命令无法保证原子性的问题。但是设想下面一个场景:

  1. C1成功获取到了锁,之后C1因为GC进入等待或者未知原因导致任务执行过长,在锁失效前C1没有主动释放锁
  2. C2在C1的锁超时后获取到锁,并且开始执行,这个时候C1和C2都同时在执行,会因重复执行造成数据不一致等未知情况
  3. C1如果先执行完毕,则会释放C2的锁,此时可能导致另外一个C3进程获取到了锁

大致的流程图

存在问题:

  1. 由于C1的停顿导致C1 和C2同都获得了锁并且同时在执行,在业务实现间接要求必须保证幂等性
  2. C1释放了不属于C1的锁

优化:记录锁的归属

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
tryLock(){
   SET Key UnixTimestamp Seconds
}
release(){
   EVAL(
     //LuaScript
     if redis.call("get",KEYS[1]) == ARGV[1] then
         return redis.call("del",KEYS[1])
     else
         return 0
     end
   )
}

Redis 2.6.12后SET提供了一个NX参数,等同于SETNX命令,官方文档上提醒后面的版本有可能去掉SETNX, SETEX, PSETEX,并用SET命令代替.

这个方案通过指定Value为时间戳,并在释放锁的时候检查锁的Value是否为获取锁的Value,避免了上一版本版本中提到的C1释放了C2持有的锁的问题;另外在释放锁的时候因为涉及到多个Redis操作,并且考虑到Check And Set 模型的并发问题,所以使用Lua脚本来避免并发问题。

那么这段Lua代码的功能是什么呢?

首先获取锁对应的value值,检查是否与requestId相等,如果相等则删除锁(解锁)。那么为什么要使用Lua语言来实现呢?因为要确保上述操作是原子性的。

存在问题:

如果在并发极高的场景下,比如抢红包场景,可能存在UnixTimestamp重复问题,另外由于不能保证分布式环境下的物理时钟一致性,也可能存在UnixTimestamp重复问题,只不过极少情况下会遇到。

优化:Timestamp替换为UniqId

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
tryLock(){
   SET Key UniqId Seconds
}
release(){
   EVAL(
     //LuaScript
     if redis.call("get",KEYS[1]) == ARGV[1] then
         return redis.call("del",KEYS[1])
     else
         return 0
     end
   )
}

这段Lua脚本在执行的时候要把前面的UniqId作为ARGV[1]的值传进去,把Key作为KEYS[1]的值传进去。

使用一个自增的唯一UniqId代替时间戳来规避上一版本提到的时钟问题。

优化:共享资源校验锁

即使我们拥有一个完美实现的分布式锁(带自动过期功能),在没有共享资源参与进来提供某种fencing机制的前提下,我们仍然不可能获得足够的安全性。

在上面的时序图中,假设锁服务本身是没有问题的,它总是能保证任一时刻最多只有一个客户端获得锁。上图中出现的lease这个词可以暂且认为就等同于一个带有自动过期功能的锁。客户端1在获得锁之后发生了很长时间的GC pause,在此期间,它获得的锁过期了,而客户端2获得了锁。当客户端1从GC pause中恢复过来的时候,它不知道自己持有的锁已经过期了,它依然向共享资源(上图中是一个存储服务)发起了写数据请求,而这时锁实际上被客户端2持有,因此两个客户端的写请求就有可能冲突(锁的互斥作用失效了)。

初看上去,有人可能会说,既然客户端1从GC pause中恢复过来以后不知道自己持有的锁已经过期了,那么它可以在访问共享资源之前先判断一下锁是否过期。但仔细想想,这丝毫也没有帮助。因为GC pause可能发生在任意时刻,也许恰好在判断完之后。

也有人会说,如果客户端使用没有GC的语言来实现,是不是就没有这个问题呢?Martin指出,系统环境太复杂,仍然有很多原因导致进程的pause,比如虚存造成的缺页故障(page fault),再比如CPU资源的竞争。即使不考虑进程pause的情况,网络延迟也仍然会造成类似的结果。

总结起来就是说,即使锁服务本身是没有问题的,而仅仅是客户端有长时间的pause或网络延迟,仍然会造成两个客户端同时访问共享资源的冲突情况发生。而这种情况其实就是我们在前面已经提出来的“客户端长期阻塞导致锁过期”的那个疑问。

那怎么解决这个问题呢?Martin给出了一种方法,称为fencing token。fencing token是一个单调递增的数字,当客户端成功获取锁的时候它随同锁一起返回给客户端。而客户端访问共享资源的时候带着这个fencing token,这样提供共享资源的服务就能根据它进行检查,拒绝掉延迟到来的访问请求(避免了冲突)。如下图:

在上图中,客户端1先获取到的锁,因此有一个较小的fencing token,等于33,而客户端2后获取到的锁,有一个较大的fencing token,等于34。客户端1从GC pause中恢复过来之后,依然是向存储服务发送访问请求,但是带了fencing token = 33。存储服务发现它之前已经处理过34的请求,所以会拒绝掉这次33的请求。这样就避免了冲突。

fencing token机制本质上是要求客户端在每次访问一个共享资源的时候,在执行任何操作之前,先对资源进行某种形式的“标记”(mark)操作,这个“标记”能保证持有旧的锁的客户端请求(如果延迟到达了)无法操作资源。这种标记操作可以是很多形式,fencing token是其中比较典型的一个。

可以用递增的epoch number(相当于fencing token)来保护共享资源。而对于分布式的资源,为了方便讨论,假设分布式资源是一个小型的多备份的数据存储,执行写操作的时候需要向所有节点上写数据。最简单的做标记的方式,就是在对资源进行任何操作之前,先把epoch number标记到各个资源节点上去。这样,各个节点就保证了旧的(也就是小的)epoch number无法操作数据。

当然,这里再展开讨论下去可能就涉及到了这个数据存储服务的实现细节了。比如在实际系统中,可能为了容错,只要上面讲的标记和写入操作在多数节点上完成就算成功完成了。在这里我们能看到的,最重要的,是这种标记操作如何起作用的方式。这有点类似于Paxos协议(Paxos协议要求每个proposal对应一个递增的数字,执行accept请求之前先执行prepare请求)。random token的方式显然不符合“标记”操作的定义,因为它无法区分新的token和旧的token。只有递增的数字才能确保最终收敛到最新的操作结果上。

在这个分布式数据存储服务(共享资源)的例子中,客户端在标记完成之后执行写入操作的时候,存储服务的节点需要判断epoch number是不是最新,然后确定能不能执行写入操作。如果按照上一节我们的分析思路,这里的epoch判断和接下来的写入操作,是不是在一个原子操作里呢?根据Flavio Junqueira的相关描述,我们相信,应该是原子的。那么既然资源本身可以提供原子互斥操作了,那么分布式锁还有存在的意义吗?应该说有。客户端可以利用分布式锁有效地避免冲突,等待写入机会,这对于包含多个节点的分布式资源尤其有用(当然,是出于效率的原因)。

这个方案是目前最优的分布式锁方案,但是如果在Redis集群环境下依然存在问题:

由于Redis集群数据同步为异步,假设在Master节点获取到锁后未完成数据同步情况下Master节点crash,此时在新的Master节点依然可以获取锁,所以多个Client同时获取到了锁

使用

一个Go的实现:https://github.com/bsm/redislock (基于go-redis)

使用方法:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import (
  "fmt"
  "time"

  "github.com/bsm/redislock"
  "github.com/go-redis/redis"
)

func main() {
	// Connect to redis.
    client := redis.NewClient(&redis.Options{
		Network:	"tcp",
		Addr:		"127.0.0.1:6379",
	})
	defer client.Close()

	// Create a new lock client.
	locker := redislock.New(client)

	// Try to obtain lock.
	lock, err := locker.Obtain("my-key", 100*time.Millisecond, nil)
	if err == redislock.ErrNotObtained {
		fmt.Println("Could not obtain lock!")
	} else if err != nil {
		log.Fatalln(err)
	}

	// Don't forget to defer Release.
	defer lock.Release()
	fmt.Println("I have a lock!")

	// Sleep and check the remaining TTL.
	time.Sleep(50 * time.Millisecond)
	if ttl, err := lock.TTL(); err != nil {
		log.Fatalln(err)
	} else if ttl > 0 {
		fmt.Println("Yay, I still have my lock!")
	}

	// Extend my lock.
	if err := lock.Refresh(100*time.Millisecond, nil); err != nil {
		log.Fatalln(err)
	}

	// Sleep a little longer, then check.
	time.Sleep(100 * time.Millisecond)
	if ttl, err := lock.TTL(); err != nil {
		log.Fatalln(err)
	} else if ttl == 0 {
		fmt.Println("Now, my lock has expired!")
	}

}

总结

至此,基于单Redis节点的分布式锁的算法就描述完了。这里面有好几个问题需要重点分析一下。

首先第一个问题,这个锁必须要设置一个过期时间。否则的话,当一个客户端获取锁成功之后,假如它崩溃了,或者由于发生了网络分割(network partition)导致它再也无法和Redis节点通信了,那么它就会一直持有这个锁,而其它客户端永远无法获得锁了。antirez在后面的分析中也特别强调了这一点,而且把这个过期时间称为锁的有效时间(lock validity time)。获得锁的客户端必须在这个时间之内完成对共享资源的访问。

第二个问题,第一步获取锁的操作,网上不少文章把它实现成了两个Redis命令:

1
2
SETNX resource_name my_random_value
EXPIRE resource_name 30

虽然这两个命令和前面算法描述中的一个SET命令执行效果相同,但却不是原子的。如果客户端在执行完SETNX后崩溃了,那么就没有机会执行EXPIRE了,导致它一直持有这个锁。

第三个问题,也是antirez指出的,设置一个随机字符串my_random_value是很有必要的,它保证了一个客户端释放的锁必须是自己持有的那个锁。假如获取锁时SET的不是一个随机字符串,而是一个固定值,那么可能会发生下面的执行序列:

  1. 客户端1获取锁成功。
  2. 客户端1在某个操作上阻塞了很长时间。
  3. 过期时间到了,锁自动释放了。
  4. 客户端2获取到了对应同一个资源的锁。
  5. 客户端1从阻塞中恢复过来,释放掉了客户端2持有的锁。

之后,客户端2在访问共享资源的时候,就没有锁为它提供保护了。

第四个问题,释放锁的操作必须使用Lua脚本来实现。释放锁其实包含三步操作:’GET’、判断和’DEL’,用Lua脚本来实现能保证这三步的原子性。否则,如果把这三步操作放到客户端逻辑中去执行的话,就有可能发生与前面第三个问题类似的执行序列:

  1. 客户端1获取锁成功。
  2. 客户端1访问共享资源。
  3. 客户端1为了释放锁,先执行’GET’操作获取随机字符串的值。
  4. 客户端1判断随机字符串的值,与预期的值相等。
  5. 客户端1由于某个原因阻塞住了很长时间。
  6. 过期时间到了,锁自动释放了。
  7. 客户端2获取到了对应同一个资源的锁。
  8. 客户端1从阻塞中恢复过来,执行DEL操纵,释放掉了客户端2持有的锁。

实际上,在上述第三个问题和第四个问题的分析中,如果不是客户端阻塞住了,而是出现了大的网络延迟,也有可能导致类似的执行序列发生。

前面的四个问题,只要实现分布式锁的时候加以注意,就都能够被正确处理。但除此之外,antirez还指出了一个问题,是由failover引起的,却是基于单Redis节点的分布式锁无法解决的。正是这个问题催生了Redlock的出现。

这个问题是这样的。假如Redis节点宕机了,那么所有客户端就都无法获得锁了,服务变得不可用。为了提高可用性,我们可以给这个Redis节点挂一个Slave,当Master节点不可用的时候,系统自动切到Slave上(failover)。但由于Redis的主从复制(replication)是异步的,这可能导致在failover过程中丧失锁的安全性。考虑下面的执行序列:

  1. 客户端1从Master获取了锁。
  2. Master宕机了,存储锁的key还没有来得及同步到Slave上。
  3. Slave升级为Master。

客户端2从新的Master获取到了对应同一个资源的锁。

于是,客户端1和客户端2同时持有了同一个资源的锁。锁的安全性被打破。针对这个问题,antirez设计了Redlock算法,我们接下来会讨论。

多节点

基于 Redlock 做分布式锁

由于前面介绍的基于单Redis节点的分布式锁在failover的时候会产生解决不了的安全性问题,因此antirez提出了新的分布式锁的算法Redlock,它基于N个完全独立的Redis节点(通常情况下N可以设置成5)。

运行Redlock算法的客户端依次执行下面各个步骤,来完成获取锁的操作:

  1. 获取当前时间(毫秒数)。
  2. 按顺序依次向N个Redis节点执行获取锁的操作。这个获取操作跟前面基于单Redis节点的获取锁的过程相同,包含随机字符串my_random_value,也包含过期时间(比如PX 30000,即锁的有效时间)。为了保证在某个Redis节点不可用的时候算法能够继续运行,这个获取锁的操作还有一个超时时间(time out),它要远小于锁的有效时间(几十毫秒量级)。客户端在向某个Redis节点获取锁失败以后,应该立即尝试下一个Redis节点。这里的失败,应该包含任何类型的失败,比如该Redis节点不可用,或者该Redis节点上的锁已经被其它客户端持有(注:Redlock原文中这里只提到了Redis节点不可用的情况,但也应该包含其它的失败情况)。
  3. 计算整个获取锁的过程总共消耗了多长时间,计算方法是用当前时间减去第1步记录的时间。如果客户端从大多数Redis节点(>= N/2+1)成功获取到了锁,并且获取锁总共消耗的时间没有超过锁的有效时间(lock validity time),那么这时客户端才认为最终获取锁成功;否则,认为最终获取锁失败。
  4. 如果最终获取锁成功了,那么这个锁的有效时间应该重新计算,它等于最初的锁的有效时间减去第3步计算出来的获取锁消耗的时间。
  5. 如果最终获取锁失败了(可能由于获取到锁的Redis节点个数少于N/2+1,或者整个获取锁的过程消耗的时间超过了锁的最初有效时间),那么客户端应该立即向所有Redis节点发起释放锁的操作(即前面介绍的Redis Lua脚本)。

当然,上面描述的只是获取锁的过程,而释放锁的过程比较简单:客户端向所有Redis节点发起释放锁的操作,不管这些节点当时在获取锁的时候成功与否。

由于N个Redis节点中的大多数能正常工作就能保证Redlock正常工作,因此理论上它的可用性更高。我们前面讨论的单Redis节点的分布式锁在failover的时候锁失效的问题,在Redlock中不存在了,但如果有节点发生崩溃重启,还是会对锁的安全性有影响的。具体的影响程度跟Redis对数据的持久化程度有关。

假设一共有5个Redis节点:A, B, C, D, E。设想发生了如下的事件序列:

  1. 客户端1成功锁住了A, B, C,获取锁成功(但D和E没有锁住)。
  2. 节点C崩溃重启了,但客户端1在C上加的锁没有持久化下来,丢失了。
  3. 节点C重启后,客户端2锁住了C, D, E,获取锁成功。

这样,客户端1和客户端2同时获得了锁(针对同一资源)。

在默认情况下,Redis的AOF持久化方式是每秒写一次磁盘(即执行fsync),因此最坏情况下可能丢失1秒的数据。为了尽可能不丢数据,Redis允许设置成每次修改数据都进行fsync,但这会降低性能。当然,即使执行了fsync也仍然有可能丢失数据(这取决于系统而不是Redis的实现)。所以,上面分析的由于节点重启引发的锁失效问题,总是有可能出现的。为了应对这一问题,antirez又提出了延迟重启(delayed restarts)的概念。也就是说,一个节点崩溃后,先不立即重启它,而是等待一段时间再重启,这段时间应该大于锁的有效时间(lock validity time)。这样的话,这个节点在重启前所参与的锁都会过期,它在重启后就不会对现有的锁造成影响。

关于Redlock还有一点细节值得拿出来分析一下:在最后释放锁的时候,antirez在算法描述中特别强调,客户端应该向所有Redis节点发起释放锁的操作。也就是说,即使当时向某个节点获取锁没有成功,在释放锁的时候也不应该漏掉这个节点。这是为什么呢?设想这样一种情况,客户端发给某个Redis节点的获取锁的请求成功到达了该Redis节点,这个节点也成功执行了SET操作,但是它返回给客户端的响应包却丢失了。这在客户端看来,获取锁的请求由于超时而失败了,但在Redis这边看来,加锁已经成功了。因此,释放锁的时候,客户端也应该对当时获取锁失败的那些Redis节点同样发起请求。实际上,这种情况在异步通信模型中是有可能发生的:客户端向服务器通信是正常的,但反方向却是有问题的。

Redlock 的质疑

Martin Kleppmann在2016-02-08这一天发表了一篇blog,名字叫" How to do distributed locking "

由于Redlock本质上是建立在一个同步模型之上,对系统的记时假设(timing assumption)有很强的要求,因此本身的安全性是不够的。

为了说明Redlock对系统记时(timing)的过分依赖,首先给出了下面的一个例子(还是假设有5个Redis节点A, B, C, D, E):

  1. 客户端1从Redis节点A, B, C成功获取了锁(多数节点)。由于网络问题,与D和E通信失败。
  2. 节点C上的时钟发生了向前跳跃,导致它上面维护的锁快速过期。
  3. 客户端2从Redis节点C, D, E成功获取了同一个资源的锁(多数节点)。
  4. 客户端1和客户端2现在都认为自己持有了锁。

上面这种情况之所以有可能发生,本质上是因为Redlock的安全性(safety property)对系统的时钟有比较强的依赖,一旦系统的时钟变得不准确,算法的安全性也就保证不了了。Martin在这里其实是要指出分布式算法研究中的一些基础性问题,或者说一些常识问题,即好的分布式算法应该基于异步模型(asynchronous model),算法的安全性不应该依赖于任何记时假设(timing assumption)。在异步模型中:进程可能pause任意长的时间,消息可能在网络中延迟任意长的时间,甚至丢失,系统时钟也可能以任意方式出错。一个好的分布式算法,这些因素不应该影响它的安全性(safety property),只可能影响到它的活性(liveness property),也就是说,即使在非常极端的情况下(比如系统时钟严重错误),算法顶多是不能在有限的时间内给出结果而已,而不应该给出错误的结果。这样的算法在现实中是存在的,像比较著名的Paxos,或Raft。但显然按这个标准的话,Redlock的安全性级别是达不到的。

随后,Martin觉得前面这个时钟跳跃的例子还不够,又给出了一个由客户端GC pause引发Redlock失效的例子。如下:

  1. 客户端1向Redis节点A, B, C, D, E发起锁请求。
  2. 各个Redis节点已经把请求结果返回给了客户端1,但客户端1在收到请求结果之前进入了长时间的GC pause。
  3. 在所有的Redis节点上,锁过期了。
  4. 客户端2在A, B, C, D, E上获取到了锁。
  5. 客户端1从GC pause从恢复,收到了前面第2步来自各个Redis节点的请求结果。客户端1认为自己成功获取到了锁。
  6. 客户端1和客户端2现在都认为自己持有了锁。

Martin给出的这个例子其实有点小问题。在Redlock算法中,客户端在完成向各个Redis节点的获取锁的请求之后,会计算这个过程消耗的时间,然后检查是不是超过了锁的有效时间(lock validity time)。也就是上面的例子中第5步,客户端1从GC pause中恢复过来以后,它会通过这个检查发现锁已经过期了,不会再认为自己成功获取到锁了。随后antirez在他的反驳文章中就指出来了这个问题,但Martin认为这个细节对Redlock整体的安全性没有本质的影响。

抛开这个细节,我们可以分析一下Martin举这个例子的意图在哪。初看起来,这个例子跟文章前半部分分析通用的分布式锁时给出的GC pause的时序图是基本一样的,只不过那里的GC pause发生在客户端1获得了锁之后,而这里的GC pause发生在客户端1获得锁之前。但两个例子的侧重点不太一样。Martin构造这里的这个例子,是为了强调在一个分布式的异步环境下,长时间的GC pause或消息延迟(上面这个例子中,把GC pause换成Redis节点和客户端1之间的消息延迟,逻辑不变),会让客户端获得一个已经过期的锁。从客户端1的角度看,Redlock的安全性被打破了,因为客户端1收到锁的时候,这个锁已经失效了,而Redlock同时还把这个锁分配给了客户端2。换句话说,Redis服务器在把锁分发给客户端的途中,锁就过期了,但又没有有效的机制让客户端明确知道这个问题。而在之前的那个例子中,客户端1收到锁的时候锁还是有效的,锁服务本身的安全性可以认为没有被打破,后面虽然也出了问题,但问题是出在客户端1和共享资源服务器之间的交互上。

在Martin的这篇文章中,还有一个很有见地的观点,就是对锁的用途的区分。他把锁的用途分为两种:

最后,Martin得出了如下的结论:

如果是为了效率(efficiency)而使用分布式锁,允许锁的偶尔失效,那么使用单Redis节点的锁方案就足够了,简单而且效率高。Redlock则是个过重的实现(heavyweight)。

如果是为了正确性(correctness)在很严肃的场合使用分布式锁,那么不要使用Redlock。它不是建立在异步模型上的一个足够强的算法,它对于系统模型的假设中包含很多危险的成分(对于timing)。而且,它没有一个机制能够提供fencing token。那应该使用什么技术呢?Martin认为,应该考虑类似ETCD的方案,或者支持事务的数据库。

就在Martin的文章发表出来的第二天,antirez就在他的博客上贴出了他对于此事的反驳文章,名字叫”Is Redlock safe?”

关于算法在记时(timing)方面的模型假设。在我们前面分析Martin的文章时也提到过,Martin认为Redlock会失效的情况主要有三种:

  1. 时钟发生跳跃。
  2. 长时间的GC pause。
  3. 长时间的网络延迟。

antirez肯定意识到了这三种情况对Redlock最致命的其实是第一点:时钟发生跳跃。这种情况一旦发生,Redlock是没法正常工作的。而对于后两种情况来说,Redlock在当初设计的时候已经考虑到了,对它们引起的后果有一定的免疫力。所以,antirez接下来集中精力来说明通过恰当的运维,完全可以避免时钟发生大的跳动,而Redlock对于时钟的要求在现实系统中是完全可以满足的。

Martin在提到时钟跳跃的时候,举了两个可能造成时钟跳跃的具体例子:

  1. 系统管理员手动修改了时钟。
  2. 从NTP服务收到了一个大的时钟更新事件。

antirez反驳说:

  1. 手动修改时钟这种人为原因,不要那么做就是了。否则的话,如果有人手动修改Raft协议的持久化日志,那么就算是Raft协议它也没法正常工作了。
  2. 使用一个不会进行“跳跃”式调整系统时钟的ntpd程序(可能是通过恰当的配置),对于时钟的修改通过多次微小的调整来完成。

而Redlock对时钟的要求,并不需要完全精确,它只需要时钟差不多精确就可以了。比如,要记时5秒,但可能实际记了4.5秒,然后又记了5.5秒,有一定的误差。不过只要误差不超过一定范围,这对Redlock不会产生影响。antirez认为呢,像这样对时钟精度并不是很高的要求,在实际环境中是完全合理的。

好了,到此为止,如果你相信antirez这里关于时钟的论断,那么接下来antirez的分析就基本上顺理成章了。

关于Martin提到的能使Redlock失效的后两种情况,Martin在分析的时候恰好犯了一个错误(在本文上半部分已经提到过)。在Martin给出的那个由客户端GC pause引发Redlock失效的例子中,这个GC pause引发的后果相当于在锁服务器和客户端之间发生了长时间的消息延迟。Redlock对于这个情况是能处理的。回想一下Redlock算法的具体过程,它使用起来的过程大体可以分成5步:

  1. 获取当前时间。
  2. 完成获取锁的整个过程(与N个Redis节点交互)。
  3. 再次获取当前时间。
  4. 把两个时间相减,计算获取锁的过程是否消耗了太长时间,导致锁已经过期了。如果没过期,
  5. 客户端持有锁去访问共享资源。

在Martin举的例子中,GC pause或网络延迟,实际发生在上述第1步和第3步之间。而不管在第1步和第3步之间由于什么原因(进程停顿或网络延迟等)导致了大的延迟出现,在第4步都能被检查出来,不会让客户端拿到一个它认为有效而实际却已经过期的锁。当然,这个检查依赖系统时钟没有大的跳跃。这也就是为什么antirez在前面要对时钟条件进行辩护的原因。

有人会说,在第3步之后,仍然可能会发生延迟啊。没错,antirez承认这一点,他对此有一段很有意思的论证,原话如下:

延迟只能发生在第3步之后,这导致锁被认为是有效的而实际上已经过期了,也就是说,我们回到了这个问题上:客户端没能够在锁的有效性过期之前完成与共享资源的交互。让我再次申明一下,这个问题对于所有的分布式锁的实现是普遍存在的,而且基于token的这种解决方案是不切实际的,但也能和Redlock一起用。

之前我们提到过,任何一种带自动过期功能的分布式锁在没有提供fencing机制的前提下都有可能失效。换句话说,对于大延迟给Redlock带来的影响,不单单针对Redlock。Redlock的实现已经保证了它是和其它任何分布式锁的安全性是一样的。当然,与其它“更完美”的分布式锁相比,Redlock似乎提供不了那种递增的token,但antirez在前面已经分析过了,关于token的这种论证方式本身就是“不切实际”的,或者退一步讲,Redlock能提供的unique token也能够提供完全一样的效果。

对于Redlock在第4步所做的锁有效性的检查,Martin是予以肯定的。但他认为客户端和资源服务器之间的延迟还是会带来问题的。Martin在这里说的有点模糊。就像antirez前面分析的,客户端和资源服务器之间的延迟,对所有的分布式锁的实现都会带来影响,这不单单是Redlock的问题了。

以上就是antirez在blog中所说的主要内容。有一些点值得我们注意一下:

antirez是同意大的系统时钟跳跃会造成Redlock失效的。在这一点上,他与Martin的观点的不同在于,他认为在实际系统中是可以避免大的时钟跳跃的。当然,这取决于基础设施和运维方式。

antirez在设计Redlock的时候,是充分考虑了网络延迟和程序停顿所带来的影响的。但是,对于客户端和资源服务器之间的延迟(即发生在算法第3步之后的延迟),antirez是承认所有的分布式锁的实现,包括Redlock,是没有什么好办法来应对的。

讨论进行到这,Martin和antirez之间谁对谁错其实并不是那么重要了。只要我们能够对Redlock(或者其它分布式锁)所能提供的安全性的程度有充分的了解,那么我们就能做出自己的选择了。

在Martin的这篇文章中,还有一个很有见地的观点,就是对锁的用途的区分。他把锁的用途分为两种:

  • 为了效率(efficiency),协调各个客户端避免做重复的工作。即使锁偶尔失效了,只是可能把某些操作多做一遍而已,不会产生其它的不良后果。比如重复发送了一封同样的email。
  • 为了正确性(correctness)。在任何情况下都不允许锁失效的情况发生,因为一旦发生,就可能意味着数据不一致(inconsistency),数据丢失,文件损坏,或者其它严重的问题。

最后,Martin得出了如下的结论:

  • 如果是为了效率(efficiency)而使用分布式锁,允许锁的偶尔失效,那么使用单Redis节点的锁方案就足够了,简单而且效率高。Redlock则是个过重的实现(heavyweight)。
  • 如果是为了正确性(correctness)在很严肃的场合使用分布式锁,那么不要使用Redlock。它不是建立在异步模型上的一个足够强的算法,它对于系统模型的假设中包含很多危险的成分(对于timing)。而且,它没有一个机制能够提供fencing token。那应该使用什么技术呢?Martin认为,应该考虑类似etcd的方案,或者支持事务的数据库。

使用

一个Go的实现:https://github.com/go-redsync/redsync (基于redigo))

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package main

import (
	goredislib "github.com/go-redis/redis"
	"github.com/go-redsync/redsync/v3"
	"github.com/go-redsync/redsync/v3/redis"
	"github.com/go-redsync/redsync/v3/redis/goredis"
	"github.com/stvp/tempredis"
)

func main() {

	server, err := tempredis.Start(tempredis.Config{})
	if err != nil {
		panic(err)
	}
	defer server.Term()

	client := goredislib.NewClient(&goredislib.Options{
		Network: "unix",
		Addr:    server.Socket(),
	})

	pool := goredis.NewGoredisPool(client)

	rs := redsync.New([]redis.Pool{pool})

	mutex := rs.NewMutex("test-redsync")
	err = mutex.Lock()

	if err != nil {
		panic(err)
	}

	mutex.Unlock()
}

ETCD实现分布式锁

很多人(也包括Martin在内)都认为,如果你想构建一个更安全的分布式锁,那么应该使用ETCD,而不是Redis:

  1. 客户端尝试创建一个节点node,比如/lock。那么第一个客户端就创建成功了,相当于拿到了锁;而其它的客户端会创建失败(节点node已存在),获取锁失败。
  2. 持有锁的客户端访问共享资源完成后,将znode删掉,这样其它客户端接下来就能来获取锁了。
  3. node与session的lease绑定。它保证如果创建node的那个客户端崩溃了,那么相应的node会被自动删除。这保证了锁一定会被释放。

这是ETCD客户端的分布式锁实现。

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package concurrency

import (
	"context"
	"fmt"
	"sync"

	v3 "github.com/coreos/etcd/clientv3"
	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
)

// Mutex implements the sync Locker interface with etcd
type Mutex struct {
	s *Session

	pfx   string
	myKey string
	myRev int64
	hdr   *pb.ResponseHeader
}

func NewMutex(s *Session, pfx string) *Mutex {
	return &Mutex{s, pfx + "/", "", -1, nil}
}

// Lock locks the mutex with a cancelable context. If the context is canceled
// while trying to acquire the lock, the mutex tries to clean its stale lock entry.
func (m *Mutex) Lock(ctx context.Context) error {
	s := m.s
    client := m.s.Client()

    //m.pfx是前缀,比如"service/lock/"
    //s.Lease()是一个64位的整数值,etcd v3引入了lease(租约)的概念,concurrency包基于lease封装了session,每一个客户端都有自己的lease,也就是说每个客户端都有一个唯一的64位整形值,
    //m.myKey类似于"service/lock/12345"
    m.myKey = fmt.Sprintf("%s%x", m.pfx, s.Lease())

    //etcdv3新引入的多键条件事务,替代了v2中Compare-And-put操作。etcdv3的多键条件事务的语意是先做一个比较(compare)操作,如果比较成立则执行一系列操作,如果比较不成立则执行另外一系列操作。有类似于C语言中的条件表达式。
    //接下来的这部分实现了如果不存在这个key,则将这个key写入到etcd,如果存在则读取这个key的值这样的功能。
    //下面这一句,是构建了一个compare的条件,比较的是key的createRevision,如果revision是0,则存入一个key,如果revision不为0,则读取这个key。
    //revision是etcd一个全局的序列号,每一个对etcd存储进行改动都会分配一个这个序号,在v2中叫index,createRevision是表示这个key创建时被分配的这个序号。当key不存在时,createRivision是0。

	////比较key的revision为0(0表示没有key)
	cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0)
	// put self in lock waiters via myKey; oldest waiter holds lock
	//则put key,并设置租约
	put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))
	// reuse key in case this session already holds the lock
	//否则 获取这个key,重用租约中的锁(这里主要目的是在于重入)
    //通过第二次获取锁,判断锁是否存在来支持重入
    //所以只要租约一致,那么是可以重入的.
	get := v3.OpGet(m.myKey)
	// fetch current holder to complete uncontended path with only one RPC
	//通过前缀获取最先创建的key
	getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...)
	resp, err := client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit()
	if err != nil {
		return err
	}
	m.myRev = resp.Header.Revision
	//如果已存在key,获取CreateRevision
	//获取到自身的revision(注意,此处CreateRevision和Revision不一定相等)
	if !resp.Succeeded {
		m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
	}
	// if no key on prefix / the minimum rev is key, already hold the lock
	ownerKey := resp.Responses[1].GetResponseRange().Kvs
	// 比较如果当前没有人获得锁或者锁的owner的createrevision等于当前的kv的revision,则表示已获得锁,就可以退出了
	// 通过对比自身的revision和最先创建的key的revision得出谁获得了锁
    // 例如 自身revision:5,最先创建的key createRevision:3  那么不获得锁,进入waitDeletes
    //    自身revision:5,最先创建的key createRevision:5  那么获得锁

	if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
		m.hdr = resp.Header
		return nil
	}
	//如果上面的code操作成功了,则myRev是当前客户端创建的key的revision值。

	// 走到这里代表没有获得锁,需要等待之前的锁被释放,即等待revision小于当前revision的kv被删除

	//waitDeletes等待  匹配m.pfx ("service/lock/")这个前缀并且createRivision小于等于m.myRev-1的 keys 被删除

	//如果没有比当前客户端创建的key的revision小的key,则当前客户端者获得锁

	// wait for deletion revisions prior to myKey
	hdr, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1)
	// release lock key if wait failed
	if werr != nil {
		m.Unlock(client.Ctx())
	} else {
		m.hdr = hdr
	}
	return werr
}

func (m *Mutex) Unlock(ctx context.Context) error {
	client := m.s.Client()
	if _, err := client.Delete(ctx, m.myKey); err != nil {
		return err
	}
	m.myKey = "\x00"
	m.myRev = -1
	return nil
}

func (m *Mutex) IsOwner() v3.Cmp {
	return v3.Compare(v3.CreateRevision(m.myKey), "=", m.myRev)
}

func (m *Mutex) Key() string { return m.myKey }

// Header is the response header received from etcd on acquiring the lock.
func (m *Mutex) Header() *pb.ResponseHeader { return m.hdr }

type lockerMutex struct{ *Mutex }

func (lm *lockerMutex) Lock() {
	client := lm.s.Client()
	if err := lm.Mutex.Lock(client.Ctx()); err != nil {
		panic(err)
	}
}
func (lm *lockerMutex) Unlock() {
	client := lm.s.Client()
	if err := lm.Mutex.Unlock(client.Ctx()); err != nil {
		panic(err)
	}
}

// NewLocker creates a sync.Locker backed by an etcd mutex.
func NewLocker(s *Session, pfx string) sync.Locker {
	return &lockerMutex{NewMutex(s, pfx)}
}

// 等待持有锁的key删除
// 内部实现为等其他所有比当前createRevision小的key,监听删除事件
// waitDeletes efficiently waits until all keys matching the prefix and no greater
// than the create revision.
func waitDeletes(ctx context.Context, client *v3.Client, pfx string, maxCreateRev int64) (*pb.ResponseHeader, error) {
    //WithLastCreate 按照CreateRevision排序,降序 例如 5 4 3 2 1
    //WithMaxCreateRev 获取比maxCreateRev小的key
	getOpts := append(v3.WithLastCreate(), v3.WithMaxCreateRev(maxCreateRev))
	for {
		resp, err := client.Get(ctx, pfx, getOpts...)
		if err != nil {
			return nil, err
		}
		if len(resp.Kvs) == 0 {
			return resp.Header, nil
		}
		lastKey := string(resp.Kvs[0].Key)
        //监听删除事件
		if err = waitDelete(ctx, client, lastKey, resp.Header.Revision); err != nil {
			return nil, err
		}
	}
}
//从revision开始监听删除事件,因为revision存在,所以也避免了ABA问题
func waitDelete(ctx context.Context, client *v3.Client, key string, rev int64) error {
	cctx, cancel := context.WithCancel(ctx)
	defer cancel()

	var wr v3.WatchResponse
	wch := client.Watch(cctx, key, v3.WithRev(rev))
	for wr = range wch {
		for _, ev := range wr.Events {
            //遇到删除事件才返回
			if ev.Type == mvccpb.DELETE {
				return nil
			}
		}
	}
	if err := wr.Err(); err != nil {
		return err
	}
	if err := ctx.Err(); err != nil {
		return err
	}
	return fmt.Errorf("lost watcher waiting for delete")
}

总结一下,上面的锁的实现,所有的客户端都在service/lock下创建一个自己的key,createrevision最小的那个客户端获得锁,也就是最早建立key的客户端获得锁,之后按照创建的时间先后依次获得锁。

CreateRevision: 创建时的revision

Header.Revision: etcd server现在的revision

刚才说了那么多revision,这里我们需要详细的了解一下etcd中的mvcc多版本机制

那么我们先来明确几个概念:

main ID:在etcd中每个事务的唯一id,全局递增不重复.

sub ID: 在事务中的连续多个修改操作会从0开始编号,这个编号就是sub ID

revision: 由(mainID,subID)组成的唯一标识

所以现在我们的revision就是这么来的啦…

使用方式:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package concurrency_test

import (
	"context"
	"fmt"
	"log"

	"github.com/coreos/etcd/clientv3"
	"github.com/coreos/etcd/clientv3/concurrency"
)

func ExampleMutex_Lock() {
	cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	// create two separate sessions for lock competition
	s1, err := concurrency.NewSession(cli)
	if err != nil {
		log.Fatal(err)
	}
	defer s1.Close()
	m1 := concurrency.NewMutex(s1, "/my-lock/")

	s2, err := concurrency.NewSession(cli)
	if err != nil {
		log.Fatal(err)
	}
	defer s2.Close()
	m2 := concurrency.NewMutex(s2, "/my-lock/")

	// acquire lock for s1
	if err := m1.Lock(context.TODO()); err != nil {
		log.Fatal(err)
	}
	fmt.Println("acquired lock for s1")

	m2Locked := make(chan struct{})
	go func() {
		defer close(m2Locked)
		// wait until s1 is locks /my-lock/
		if err := m2.Lock(context.TODO()); err != nil {
			log.Fatal(err)
		}
	}()

	if err := m1.Unlock(context.TODO()); err != nil {
		log.Fatal(err)
	}
	fmt.Println("released lock for s1")

	<-m2Locked
	fmt.Println("acquired lock for s2")

	// Output:
	// acquired lock for s1
	// released lock for s1
	// acquired lock for s2
}

ETCD版本的分布式锁问题相对比较来说少。

  • 在正常情况下,客户端可以持有锁任意长的时间,这可以确保它做完所有需要的资源访问操作之后再释放锁。这避免了基于Redis的锁对于有效时间(lock validity time)到底设置多长的两难问题。实际上,基于ETCD的锁是依靠Session(心跳)来维持锁的持有状态的,而Redis不支持Sesion。
  • 基于ETCD的锁支持在获取锁失败之后等待锁重新释放的事件。这让客户端对锁的使用更加灵活。ETCD的watch机制。这个机制可以这样来使用,比如当客户端试图创建/lock的时候,发现它已经存在了,这时候创建失败,但客户端不一定就此对外宣告获取锁失败。客户端可以进入一种等待状态,等待当/lock节点被删除的时候,ETCD通过watch机制通知它,这样它就可以继续完成创建操作(获取锁)。这可以让分布式锁在客户端用起来就像一个本地的锁一样:加锁失败就阻塞住,直到获取到锁为止。这样的特性Redlock就无法实现。
  • 是否单点故障:redis本身有很多中玩法,如客户端一致性hash,服务器端sentinel方案或者cluster方案,很难做到一种分布式锁方式能应对所有这些方案。而ETCD只有一种玩法,多台机器的节点数据是一致的,没有redis的那么多的麻烦因素要考虑。

总体上来说ETCD实现分布式锁更加的简单,可靠性更高。

看起来这个锁相当完美,但仔细考察的话,并不尽然。

ETCD是怎么检测出某个客户端已经崩溃了呢?实际上,每个客户端都与ETCD的某台服务器维护着一个Session,这个Session依赖定期的心跳(heartbeat)来维持。如果ETCD长时间收不到客户端的心跳(这个时间称为Sesion的过期时间),那么它就认为Session过期了,通过这个Session所创建的节点都会被自动删除。

设想如下的执行序列:

  1. 客户端1创建了node节点/lock,获得了锁。
  2. 客户端1进入了长时间的GC pause。
  3. 客户端1连接到ETCD的Session过期了。znode节点/lock被自动删除。
  4. 客户端2创建了znode节点/lock,从而获得了锁。
  5. 客户端1从GC pause中恢复过来,它仍然认为自己持有锁。

最后,客户端1和客户端2都认为自己持有了锁,冲突了。这与之前Martin在文章中描述的由于GC pause导致的分布式锁失效的情况类似。

看起来,用ETCD实现的分布式锁也不一定就是安全的。该有的问题它还是有。但是,ETCD作为一个专门为分布式应用提供方案的框架,它提供了一些非常好的特性,是Redis之类的方案所没有的。

ETCD 实现的分布式锁其实存在一个缺点,那就是性能上可能并没有缓存服务那么高。因为每次在创建锁和释放锁的过程中,都要动态创建、销毁瞬时节点来实现锁功能。ETCD 中创建和删除节点只能通过 Leader 服务器来执行,然后将数据同不到所有的 Follower 机器上。

对锁数据的可靠性要求极高的话,那只能使用 etcd 或者 zk 这种通过一致性协议保证数据可靠性的锁方案。但可靠的背面往往都是较低的吞吐量和较高的延迟。需要根据业务的量级对其进行压力测试,以确保分布式锁所使用的 etcd/zk 集群可以承受得住实际的业务请求压力。需要注意的是,etcd 集群是没有办法通过增加节点来提高其性能的。要对其进行横向扩展,只能增加搭建多个集群来支持更多的请求。这会进一步提高对运维和监控的要求。多个集群可能需要引入 proxy,没有 proxy 那就需要业务去根据某个业务 id 来做 sharding。如果业务已经上线的情况下做扩展,还要考虑数据的动态迁移。这些都不是容易的事情。

易错点

现象

线上程序一直报错,错误信息:lock failed: context deadline exceeded, retry

排查过程

异常对应代码位置

很明显的是获取锁超时了,由于用的etcd的分布式锁,就怀疑是etcd出问题了,此时看到大量etcd日志,rejected connection from “ip:port” (error “tls: first record does not look like a TLS handshake”, ServerName “”),怀疑是不是这个问题导致的,经过查询报错的IP,均为线上容器IP,登陆容器内看发现都是管理员平台的代码,里面也用到了py03的etcd,但是不是导致超时的原因。在排除各种可能之后,最后去etcd查看锁对应的key的情况,发现有两个key

1
2
3
/notifier/locker/{leaseid}

/notifier/locker/rwl/{leaseid}

其中第一个key是notifier自己添加的,第二个key在代码中搜不到,但是看起来像是redis whitelist的简写,先把第一个key删了,然后看notifier日志,仍然获取不到锁,所以怀疑是第二个key已经获得了锁,虽然key不一样。于是删除了第二个key,再看notifier日志,终于获得了锁,开始正常工作,于是得出猜想,etcd的分布式锁,在子目录下加了锁之后,父目录会加锁失败。然后用etcdctl lock来验证了下,确实如此,/a/b下加了锁,/a再加锁就会失败,但是/a下加了锁,/a/b再加锁会成功。基本上可以验证上面的猜想,剩下的就是从etcd源码中找到对应处理的代码了。

etcd源码部分

在查询源码之前,第一反应就是这肯定是在服务端实现的,于是开始了从etcd服务端找相关源码的过程,从etcdctl命令开始追溯到所涉及的服务端,一直没有发现问题。又在网上搜了相关etcd服务端源码实现的文章,结合本地代码均没有想找的代码,于是反过来从client找起。

首先从etcdctl lock命令开始,挑主要函数展示

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
// 代码位置go.etcd.io/etcd/etcdctl/ctlv3/command/lock_command.go
func lockUntilSignal(c *clientv3.Client, lockname string, cmdArgs []string) error {
   ...

   if err := m.Lock(ctx); err != nil {
      return err
   }

   ...
}

接下来进入到Lock函数,这是个关键函数,etcd的分布式锁就是在这里实现的

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
func (m *Mutex) Lock(ctx context.Context) error {
   s := m.s
   client := m.s.Client()

   // 这里的pfx就是prefix,就是传进来的前缀,后面的s.Lease()会返回一个租约,是一个int64的整数,和session有关
   m.myKey = fmt.Sprintf("%s%x", m.pfx, s.Lease())
   // 这里比较上面prefix/lease的createrevision是否为0,为0表示目前不存在该key,需要执行Put操作,下面可以看到
   // 部位0表示已经有对应的key了,只需要执行Get就行
   // createrevision是自增的
   cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0)
   // put self in lock waiters via myKey; oldest waiter holds lock
   put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))
   // reuse key in case this session already holds the lock
   get := v3.OpGet(m.myKey)
   // 获取所得持有者
   getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...)
   resp, err := client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit()
   if err != nil {
      return err
   }
   m.myRev = resp.Header.Revision
   if !resp.Succeeded {
      m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
   }
   // if no key on prefix / the minimum rev is key, already hold the lock
   ownerKey := resp.Responses[1].GetResponseRange().Kvs
   // 比较如果当前没有人获得锁或者锁的owner的createrevision等于当前的kv的revision,则表示已获得锁,就可以退出了
   if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
      m.hdr = resp.Header
      return nil
   }

   // 为了验证自己加的打印信息
   //fmt.Printf("ownerKey: %s\n", ownerKey)
   // 走到这里代表没有获得锁,需要等待之前的锁被释放,即revision小于当前revision的kv被删除
   hdr, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1)
   // release lock key if wait failed
   if werr != nil {
      m.Unlock(client.Ctx())
   } else {
      m.hdr = hdr
   }
   return werr
}

// waitDeletes 等待所有当前比当前key的revision小的key被删除后,锁释放后才返回
func waitDeletes(ctx context.Context, client *v3.Client, pfx string, maxCreateRev int64) (*pb.ResponseHeader, error) {
   getOpts := append(v3.WithLastCreate(), v3.WithMaxCreateRev(maxCreateRev))
   for {
      resp, err := client.Get(ctx, pfx, getOpts...)
      if err != nil {
         return nil, err
      }
      if len(resp.Kvs) == 0 {
         return resp.Header, nil
      }
      lastKey := string(resp.Kvs[0].Key)
      // 为了调试自己加的这句
      fmt.Printf("wait for %s to delete\n", lastKey)
      if err = waitDelete(ctx, client, lastKey, resp.Header.Revision); err != nil {
         return nil, err
      }
   }
}

func waitDelete(ctx context.Context, client *v3.Client, key string, rev int64) error {
   cctx, cancel := context.WithCancel(ctx)
   defer cancel()

   var wr v3.WatchResponse
   // wch是个channel,key被删除后会往这个chan发数据
   wch := client.Watch(cctx, key, v3.WithRev(rev))
   for wr = range wch {
      for _, ev := range wr.Events {
         if ev.Type == mvccpb.DELETE {
            return nil
         }
      }
   }
   if err := wr.Err(); err != nil {
      return err
   }
   if err := ctx.Err(); err != nil {
      return err
   }
   return fmt.Errorf("lost watcher waiting for delete")
}

看完上面的代码基本知道了etcd分布式锁的实现机制了,但是还没看到哪里和前缀Prefix相关了。其实答案就藏在getOwner里,看上述代码,不管是执行Put还是Get,最终都有个getOwner的过程,看一下这个getOwner,options模式里有个v3.WithFirstCreate函数调用,看下这个函数

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
// WithFirstCreate gets the key with the oldest creation revision in the request range.
func WithFirstCreate() []OpOption { return withTop(SortByCreateRevision, SortAscend) }

// withTop gets the first key over the get's prefix given a sort order
func withTop(target SortTarget, order SortOrder) []OpOption {
   return []OpOption{WithPrefix(), WithSort(target, order), WithLimit(1)}
}

// WithPrefix enables 'Get', 'Delete', or 'Watch' requests to operate
// on the keys with matching prefix. For example, 'Get(foo, WithPrefix())'
// can return 'foo1', 'foo2', and so on.
func WithPrefix() OpOption {
   return func(op *Op) {
      if len(op.key) == 0 {
         op.key, op.end = []byte{0}, []byte{0}
         return
      }
      op.end = getPrefix(op.key)
   }
}

看到上面的是三个函数后,大致就找到了对应的源码的感觉,因为看到了WithPrefix函数,和上面的猜测正好匹配。所以getOwner的具体执行效果是会把虽有以lockkey开头的kv都拿到,且按照createrevision升序排列,取第一个值,这个意思就很明白了,就是要拿到当前以lockkey为prefix的且createrevision最小的那个key,就是目前已经拿到锁的key。

看了上面的源码就可以明白为什么/a/b加了锁之后,/a加锁会超时了,因为在getOwner时,拿到了/a/b,且createrevision小于/a的revision,于是/a就会等待/a/b被删除后,watch chanel有数据后才能获得锁。

看到这里还有个需要确认的问题,那就是如果/ab加锁了,那么再对/a加锁会怎么样?/a肯定是/ab的prefix啊,是不是也会加锁失败呢?

结论是会加锁成功,看下源码

1
2
3
 func NewMutex(s *Session, pfx string) *Mutex {
   return &Mutex{s, pfx + "/", "", -1, nil}
}

可以看到在NewMutex时并不是直接拿传进来的pfx作为prefix的,而且在后面加了个”/“,所以上线的/ab加了锁,/a加锁还是可以成功的。一般查找prefix或suffix时都会加上固定的分隔符,要不然就会出现误判。

总结

通过分析问题,看源码,可以了解到etcd锁的实现原理,以及可能存在的小坑。etcd居然把锁的实现放在了client端,也是出乎我的意料,这样的话,可以直接修改client端代码来修改其锁的实现,就可能出现虽然共用一个服务端,但是etcd行为却不一致的问题,不知道为何要这么设计,个人感觉还是要放到服务端更好些。

Chubby与fencing

提到分布式锁,就不能不提Google的Chubby。

Chubby是Google内部使用的分布式锁服务,有点类似于ZooKeeper,但也存在很多差异。

Chubby自然也考虑到了延迟造成的锁失效的问题。论文里有一段描述如下:

一个进程持有锁L,发起了请求R,但是请求失败了。另一个进程获得了锁L并在请求R到达目的方之前执行了一些动作。如果后来请求R到达了,它就有可能在没有锁L保护的情况下进行操作,带来数据不一致的潜在风险。

这跟Martin的分析大同小异。

Chubby给出的用于解决(缓解)这一问题的机制称为sequencer,类似于fencing token机制。锁的持有者可以随时请求一个sequencer,这是一个字节串,它由三部分组成:

  • 锁的名字。
  • 锁的获取模式(排他锁还是共享锁)。
  • lock generation number(一个64bit的单调递增数字)。作用相当于fencing token或epoch number。

客户端拿到sequencer之后,在操作资源的时候把它传给资源服务器。然后,资源服务器负责对sequencer的有效性进行检查。检查可以有两种方式:

  1. 调用Chubby提供的API,CheckSequencer(),将整个sequencer传进去进行检查。这个检查是为了保证客户端持有的锁在进行资源访问的时候仍然有效。
  2. 将客户端传来的sequencer与资源服务器当前观察到的最新的sequencer进行对比检查。可以理解为与Martin描述的对于fencing token的检查类似。

当然,如果由于兼容的原因,资源服务本身不容易修改,那么Chubby还提供了一种机制:

lock-delay。Chubby允许客户端为持有的锁指定一个lock-delay的时间值(默认是1分钟)。当Chubby发现客户端被动失去联系的时候,并不会立即释放锁,而是会在lock-delay指定的时间内阻止其它客户端获得这个锁。这是为了在把锁分配给新的客户端之前,让之前持有锁的客户端有充分的时间把请求队列排空(draining the queue),尽量防止出现延迟到达的未处理请求。

可见,为了应对锁失效问题,Chubby提供的三种处理方式:CheckSequencer()检查、与上次最新的sequencer对比、lock-delay,它们对于安全性的保证是从强到弱的。而且,这些处理方式本身都没有保证提供绝对的正确性(correctness)。但是,Chubby确实提供了单调递增的lock generation number,这就允许资源服务器在需要的时候,利用它提供更强的安全性保障。

总结

业务还在单机就可以搞定的量级时,那么按照需求使用任意的单机锁方案就可以。

如果发展到了分布式服务阶段,但业务规模不大,比如 qps < 1000,使用哪种锁方案都差不多。如果 公司内已有可以使用的 ZooKeeper/etcd/Redis 集群,那么就尽量在不引入新的技术栈的情况下满足业 务需求。 业务发展到一定量级的话,就需要从多方面来考虑了。首先是你的锁是否在任何恶劣的条件下都不允 许数据丢失,如果不允许,那么就不要使用 Redis 的 setnx 的简单锁。

如果要使用 redlock,那么要考虑你们公司 Redis 的集群方案,是否可以直接把对应的 Redis 的实例的 ip+port 暴露给开发人员。如果不可以,那也没法用。

对锁数据的可靠性要求极高的话,那只能使用 etcd 或者 zk 这种通过一致性协议保证数据可靠性的锁 方案。但可靠的背面往往都是较低的吞吐量和较高的延迟。需要根据业务的量级对其进行压力测试, 以确保分布式锁所使用的 etcd/zk 集群可以承受得住实际的业务请求压力。需要注意的是,etcd 和 zk 集群是没有办法通过增加节点来提高其性能的。要对其进行横向扩展,只能增加搭建多个集群来支持 更多的请求。这会进一步提高对运维和监控的要求。多个集群可能需要引入 proxy,没有 proxy 那就需 要业务去根据某个业务 id 来做 sharding。如果业务已经上线的情况下做扩展,还要考虑数据的动态迁 移。这些都不是容易的事情。

在选择具体的方案时,还是需要多加思考,对⻛险早做预估。

参考

https://segmentfault.com/a/1190000017241446 https://juejin.im/post/5bbb0d8df265da0abd3533a5 https://cloud.tencent.com/developer/article/1116306 http://zhangtielei.com/posts/blog-redlock-reasoning.html http://zhangtielei.com/posts/blog-redlock-reasoning-part2.html

http://likakuli.com/post/2019/01/31/etcd_lock/