使用 Consul 实现分布式锁

最近公司项目在使用 Consul 做分布式服务,在开发过程中有业务需要分布式锁来做控制逻辑和集群选举,因此使用 Consul 的 Session 机制和 K/V 存储实现了一个分布式锁。

Consul 实现分布式锁

实现方法

使用 Consul 实现分布式锁在 Consul 的官方文档中有所描述。

Consul KV 支持acquirerelease操作。acquire/release操作实现了一种类似 Check-And-Set操作,这两个操作使用 Consul Session 进行操作:

  • acquire操作只有当 Key 的锁不存在持有者(Session)时才会返回 true,同时执行操作的 Session 会持有对该 Key 的锁;否则就返回false;
  • release操作则是使用指定的 Session 来释放某个Key的锁,如果指定的 Session无效,那么会返回 false,否则就会set设置Value值,并返回 true。

由于同一时间只有一个 Session 可以占有一个 Key 的锁,因此可以将一个 Key 当做一把锁,在访问临界资源时调用acquire操作实现 Lock 操作;在访问结束后调用release操作实现 Unlock 操作。

Consul API 解析

Session

Consul Session的 createdestroy详细细节可以参考官方 API 文档。其中,在创建 Session 时通过指定不同的检查方法可以实现不同的功能。

在本文中,使用 TTL 作为 Session 的健康检查。如果超过 TTL 后 Session 没有更新(调用renew方法续约),则认为会话已过期并触发锁失效。这种类型的故障检测器也称为心跳故障检测器。这种故障检测器的可扩展性低,同时也增加了服务器的负担,但在 TTL 的代表了一种下限。即在分布式锁的应用场景中,使用 TTL 我们可以确保在 TTL 过后被 Session 占有的 Key 的锁一定会释放。当使用 TTL 时,需要注意时钟偏差问题。

KV

KV 的 acquirerelease 详情也可见官方 API 文档。其中,如上节所述,在取得一个 Key 的锁并为其设置 Session后,如果Session 过期了,Key 锁也就被释放了。特别的,即使已经有 Key 的锁被某 Session 占有,但是使用不带有acquire的更新操作,并不会受已有锁的限制。这点很像linux的文件锁flock,是劝告式的,这表示一个进程可以简单地忽略另一个进程在文件上放置的锁。要使得劝告式加锁模型能够正常工作,所有访问文件的进程都必须要配合,即在执行文件IO之前先放置一把锁。

python-consul API

由于公司使用 Python 开发,因此使用的是python-consul提供的接口,其对 Session 和 KV 接口的使用方式与 Java 有些区别。具体文档可以参考 python-consul 的接口文档:

  • Session create,注意lock_delay,是指在锁释放后,对其再加锁需要等待的时间。该参数的目的是尽可能防止仍存活的 Leader 节点获取锁之后做出一些可能导致不一致场景出现的操作。
  • KV put

代码实现

代码实现较简单,没有做 Session TTL 到期后续期的需求和实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import socket
import time
from consul import Consul


class Lock(object):
def __init__(self, timeout=10):
self.consul_client = Consul()

def lock():
try:
self.session_id = self.consul_client.session.create(
'distributed_lock_%d' % time.time(),
socket.gethostname(), lock_delay=0,
behavior='delete', ttl=timeout)
ret = self.consul_client.kv.put(
'distributed_lock', session_id, acquire=session_id)
except Exception as e:
print e.message
return False
return ret

def unlock():
try:
ret = self.consul_client.kv.put(
'distributed_lock', None, release=session_id)
except Exception as e:
print e.message
return False
return ret