分布式锁是分布式系统中用于协调多个节点对共享资源进行互斥访问的关键机制。

核心目标:在任意时刻,只有一个节点能操作某项共享资源,从而避免数据不一致、重复处理、资源竞争等问题。

以下是分布式锁的典型使用场景:

  • 防止重复提交 / 幂等性控制:防止用户多次点击“支付”按钮,或网络重试(防止请求重复)

  • 秒杀 / 抢购系统:高并发下抢购限量商品(防止出现商品超卖问题)

  • 定时任务防重:多个服务实例部署了相同的定时任务(防止任务被多个节点同时执行,导致数据重复处理或资源浪费)

  • 分布式 ID 生成:Snowflake 算法中,机器 ID 需全局唯一(防止多节点启动时可能分配到相同机器 ID)

  • 缓存更新:缓存失效瞬间,大量请求穿透到数据库(预防数据库压力骤增,甚至宕机)

  • 状态机流转控制:订单状态从“待支付” → “已支付” → “已发货”,需严格顺序(并发请求可能导致状态错乱,如重复发货)

  • 分布式配置变更协调:多个服务实例需同时应用新配置(如灰度发布),实例间配置不一致导致业务异常

  • 文件/资源互斥写入:多个节点需写入同一个共享文件(如日志聚合、报表生成),导致文件内容错乱或覆盖

为了确保分布式锁可用,至少要确保锁实现同时满足以下四个条件

  1. 互斥性。在任意时刻,只有一个客户端能持有锁。

  2. 不发生死锁。即使有客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。

  3. 容错性。只要大部分的Redis节点正常运行,客户端就可以加锁和解锁。

  4. 隔离性。加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了。

常见的实现方案有以下几种

  1. 基于数据库(MySQL)实现:利用数据库的唯一约束(如唯一索引)或行锁(如 SELECT ... FOR UPDATE)实现互斥。

  2. 基于 Redis 实现:利用 Redis 的原子操作(如 SET key value NX EX)实现锁。

  3. 基于 ZooKeeper 实现:利用 ZooKeeper 的临时顺序节点(ephemeral sequential node)和 watch 机制。

  4. 基于 Etcd 实现:类似 ZooKeeper,利用 Etcd 的 Lease(租约)和 Watch 机制。

  5. 其他方案

    • Consul:基于 Raft 协议,提供 KV 存储和 Session 机制实现分布式锁。
    • 自研方案:结合多种存储(如 Redis + MySQL 双写校验),但复杂度高。
方案 一致性 性能 可靠性 自动过期 适用场景
数据库 依赖 DB 需手动 低并发、简单场景
Redis 最终 单点风险 支持 高并发、性能敏感场景
ZooKeeper 支持 强一致性要求场景
Etcd 支持 云原生、K8s 生态

基于数据库实现

利用数据库的唯一约束(如唯一索引)或行锁(如 SELECT ... FOR UPDATE)实现互斥。

  • 唯一索引法:尝试插入一条带唯一键的记录,插入成功即获得锁,失败则表示锁已被占用。

  • 乐观锁:通过版本号(version)控制并发更新。

  • 悲观锁:使用 SELECT ... FOR UPDATE 获取行级排他锁。

优点:简单易懂,无需引入额外组件。

缺点

  • 性能较差(尤其高并发下数据库压力大);

  • 依赖数据库可靠性;

  • 不支持自动过期(需额外处理死锁);

实现案例

通过数据库的排他锁来实现分布式锁。 基于MySQL的InnoDB引擎,可以使用以下方法来实现加锁操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 阻塞锁? for update语句会在执行成功后立即返回,在执行失败时一直处于阻塞状态,直到成功。
// 锁定之后服务宕机,无法释放?使用这种方式,服务宕机之后数据库会自己把锁释放掉。
// 排他锁长时间不提交,就会占用数据库连接。一旦类似的连接变得多了,就可能把数据库连接池撑爆
public boolean lock(){
connection.setAutoCommit(false)
while(true){
try{
result = "select * from methodLock where method_name=xxx for update";
if(result==null){
return true;
}
}catch(Exception e){

}
sleep(1000);
}
return false;
}

// 解锁
public void unlock(){
connection.commit();
}

在查询语句后面增加for update (nowait),数据库会在查询过程中给数据库表增加排他锁( InnoDB引擎在加锁的时候,只有通过唯一索引进行检索的时候才会使用行级锁,否则会使用表级锁)。当某条记录被加上排他锁之后,其他线程无法再该行记录上增加排他锁。

共享锁:由读表操作加的锁,加锁后其他用户只能获取该表或行的共享锁,不能获取排它锁,也就是说只能读不能写

排它锁:由写表操作加的锁,加锁后其他用户不能获取该表或行的任何锁,典型是mysql事务中

基于 Redis 实现

利用 Redis 的原子操作(如 SET key value NX EX)实现锁。

1
SET lock_key unique_value NX EX 30
  • NX:仅当 key 不存在时设置(保证原子性);

  • EX 30:设置过期时间(防止死锁);

  • unique_value:用于解锁时校验(避免误删他人锁)。

优点

  • 高性能、低延迟;

  • 支持自动过期;

  • 社区方案成熟(如 Redisson)。

缺点

  • 单点故障:主从架构下主节点宕机可能导致锁丢失(需用 Redlock 算法缓解);

  • 网络分区时可能违反互斥性(CAP 理论限制)。

简单实现

加锁的过程很简单,就是通过SET指令来设置值,成功则返回;否则就循环等待,在timeout时间内仍未获取到锁,则获取失败

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
public class RedisLock {
private String lock_key = "redis_lock"; //锁键
protected long internalLockLeaseTime = 30000;//锁过期时间
private long timeout = 999999; //获取锁的超时时间

//SET命令的参数
SetParams params = SetParams.setParams().nx().px(internalLockLeaseTime);

@Autowired
JedisPool jedisPool;

// 加锁
public boolean lock(String id){
Jedis jedis = jedisPool.getResource();
Long start = System.currentTimeMillis();
try{
for(;;){
//SET命令返回OK ,则证明获取锁成功
// NX,意思是SET IF NOT EXIST,即当key不存在时,我们进行set操作;若key已经存在,则不做任何操作;
// PX,意思是我们要给这个key加一个过期的设置
String lock = jedis.set(lock_key, id, "NX", "PX", internalLockLeaseTime);
if("OK".equals(lock)){
return true;
}
//否则循环等待,在timeout时间内仍未获取到锁,则获取失败
long l = System.currentTimeMillis() - start;
if (l>=timeout) {
return false;
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}finally {
jedis.close();
}
}

// 解锁
public boolean unlock(String id){
Jedis jedis = jedisPool.getResource();
String script =
"if redis.call('get',KEYS[1]) == ARGV[1] then" +
" return redis.call('del',KEYS[1]) " +
"else" +
" return 0 " +
"end";
try {
Object result = jedis.eval(script, Collections.singletonList(lock_key),
Collections.singletonList(id));
if("1".equals(result.toString())){
return true;
}
return false;
}finally {
jedis.close();
}
}


int count = 0;
// 测试
public String index() throws InterruptedException {

int clientcount =1000;
CountDownLatch countDownLatch = new CountDownLatch(clientcount);

ExecutorService executorService = Executors.newFixedThreadPool(clientcount);
long start = System.currentTimeMillis();
for (int i = 0;i<clientcount;i++){
executorService.execute(() -> {

//通过Snowflake算法获取唯一的ID字符串
String id = IdUtil.getId();
try {
redisLock.lock(id);
count++;
}finally {
redisLock.unlock(id);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
long end = System.currentTimeMillis();
logger.info("执行线程数:{},总耗时:{},count数为:{}",clientcount,end-start,count);
return "Hello";
}

存在的问题:

  • 锁不具有可重入性

  • ​ 业务未处理而主动释放锁,此时锁已到期。

获取锁-错误案例1: 过期时间设置分两步,不具有原子性,易发生死锁

1
2
3
4
5
Long result = jedis.setnx(lockKey, requestId);
if (result == 1) {
// 若在这里程序突然崩溃,则无法设置过期时间,将发生死锁,锁永远不会过期
jedis.expire(lockKey, expireTime);
}

获取锁-错误案例2:过期时间分步判断且各客户端时间必须强一致,不具有原子性,在时间判断过程中到期,则可能被其他客户端加锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public static boolean wrongGetLock2(Jedis jedis, String lockKey, int expireTime) {
long expires = System.currentTimeMillis() + expireTime;
String expiresStr = String.valueOf(expires);

// 如果当前锁不存在,返回加锁成功
if (jedis.setnx(lockKey, expiresStr) == 1) {
return true;
}

// 如果锁存在,获取锁的过期时间
String currentValueStr = jedis.get(lockKey);
if (currentValueStr != null && Long.parseLong(currentValueStr) < System.currentTimeMillis()) {
// 锁已过期,获取上一个锁的过期时间,并设置现在锁的过期时间
String oldValueStr = jedis.getSet(lockKey, expiresStr);
if (oldValueStr != null && oldValueStr.equals(currentValueStr)) {
// 考虑多线程并发的情况,只有一个线程的设置值和当前值相同,它才有权利加锁
return true;
}
}
// 其他情况,一律返回加锁失败
return false;
}

Redisson 实现

当已经被加锁,存在该key且未过期时,其他线程将 while (true) {} 一直尝试获取锁,而不是队列等待

实现原理

img

加锁机制

如果该客户端面对的是一个Redis Cluster集群,它首先会根据hash节点选择一台机器,再发送一段lua脚本到redis,

保存Hash 数据结构:myLock:{threadId:1}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + // 可重入加锁判断
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " + // 累加1
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);", // 返回剩余时间
Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}

锁互斥机制

根据LUA脚本尝试加锁,若不成功,不断尝试,直到信号量释放获取到锁或被中断

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return;
}

RFuture<RedissonLockEntry> future = subscribe(threadId);
if (interruptibly) {
commandExecutor.syncSubscriptionInterrupted(future);
} else {
commandExecutor.syncSubscription(future);
}

try {
// 不断尝试获取锁
while (true) {
ttl = tryAcquire(-1, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
break;
}

// 锁的剩余时间
if (ttl >= 0) {
try {
// 如果在给定的等待时间内变为可用并且当前线程未被中断,则从此信号量获取许可。
// 如果没有可用的许可,则当前线程将出于线程调度目的而被禁用并处于休眠状态,直到发生以下三种情况之一:
// 1.其他线程调用该信号量的release方法,当前线程接下来将被分配一个许可;
// 2.其他线程中断当前线程; 3. 指定的等待时间已过。
future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
if (interruptibly) {
throw e;
}
future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else {
if (interruptibly) {
// 从这个信号量中获取一个许可,阻塞直到一个信号量可用,或者线程被中断。
//获得许可证(如果有)并立即返回,将可用许可证的数量减少一个。
future.getNow().getLatch().acquire();
} else {
//从此信号量获取许可,阻塞直到一个可用。获得许可证(如果有)并立即返回,将可用许可证的数量减少一个。
//如果没有可用的许可,则当前线程将出于线程调度目的而被禁用并处于休眠状态,直到某个其他线程调用此信号量的release方法。
future.getNow().getLatch().acquireUninterruptibly();
}
}
}
} finally {
unsubscribe(future, threadId);
}
}

释放锁(LUA脚本)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " + // 判断可重入次数(加锁次数)
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " + //删除key
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"return nil;",
Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}

自动延期机制

客户端A加锁的锁key默认生存时间只有30秒,如果超过了30秒,客户端A还想一直持有这把锁,怎么办

只要客户端A一旦加锁成功,就会启动一个watch dog看门狗,它是一个后台线程,会每隔10秒检查一下,如果客户端A还持有锁key,那么就会不断的延长锁key的生存时间

存在的问题

master-slave主从异步复制导致redis分布式锁的缺陷,原锁丢失/多个客户端加锁。

在master实例宕机的时候,可能导致多个客户端同时完成加锁

为了Redis的高可用,一般都会给Redis的节点挂一个slave,然后采用哨兵模式进行主备切换。但由于Redis的主从复制(replication)是异步的,这可能会出现在数据同步过程中,master宕机,slave来不及同步数据就被选为master,从而数据丢失

这就会导致客户端B来尝试加锁的时候,在新的master上完成了加锁,而客户端以为自己成功加了锁,就会导致多个客户端对一个分布式锁完成了加锁。这时就会导致各种脏数据的产生。

Redis集群锁(Redlock算法

在Redis的分布式环境中,我们假设有N个Redis master。这些节点完全互相独立,不存在主从复制或者其他集群协调机制

我们确保将在N个实例上使用与在Redis单实例下相同方法获取和释放锁。现在我们假设有5个Redis master节点,同时我们需要在5台服务器上面运行这些Redis实例,这样保证他们不会同时都宕掉。

img

为了取到锁,客户端应该执行以下操作:

  1. 获取当前Unix时间,以毫秒为单位。

  2. 依次尝试从N个实例,使用相同的key和随机值获取锁。在步骤2,当向Redis设置锁时,客户端应该设置一个网络连接和响应超时时间,这个超时时间应该小于锁的失效时间。例如你的锁自动失效时间为10秒,则超时时间应该在5-50毫秒之间。这样可以避免服务器端Redis已经挂掉的情况下,客户端还在死死地等待响应结果。如果服务器端没有在规定时间内响应,客户端应该尽快尝试另外一个Redis实例。

  3. 客户端使用当前时间减去开始获取锁时间(步骤1记录的时间)就得到获取锁使用的时间。当且仅当从大多数(这里是3个节点)的Redis节点都取到锁,并且使用的时间小于锁失效时间时,锁才算获取成功。

  4. 如果取到了锁,key的真正有效时间等于有效时间减去获取锁所使用的时间(步骤3计算的结果)。

  5. 如果因为某些原因,获取锁失败(没有在至少N/2+1个Redis实例取到锁或者取锁时间已经超过了有效时间),客户端应该在所有的Redis实例上进行解锁(即便某些Redis实例根本就没有加锁成功)。

看门狗(延长锁时效)

假设锁超时时间是 30 秒,此时程序需要每隔一段时间去扫描一下该锁是否还存在,扫描时间需要小于超时时间,通常可以设置为超时时间的 1/3,在这里也就是 10 秒扫描一次。如果锁还存在,则重置其超时时间恢复到 30 秒。

通过这种方案,只要业务还没有处理完成,锁就会一直有效;而当业务一旦处理完成,程序也会马上删除该锁。

1.watchDog 只有在未显示指定加锁时间时才会生效。(这点很重要)

2.lockWatchdogTimeout 设定的时间不要太小 ,比如我之前设置的是 100毫秒,由于网络直接导致加锁完后,watchdog去延期时,这个key在redis中已经被删除了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime
, TimeUnit unit, long threadId) {
//如果指定了加锁时间,会直接去加锁
if (leaseTime != -1) {
return tryLockInnerAsync(waitTime, leaseTime, unit,
threadId, RedisCommands.EVAL_LONG);
}
//没有指定加锁时间 会先进行加锁,并且默认时间就是 LockWatchdogTimeout的时间
//这个是异步操作 返回RFuture 类似netty中的future
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime,
commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
//这里也是类似netty Future 的addListener,在future内容执行完成后执行
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
}

if (ttlRemaining == null) {
//这里是定时执行 当前锁自动延期的动作
scheduleExpirationRenewal(threadId);
}
});
return ttlRemainingFuture;
}

lua脚本判断 锁是否在,如果存在就进行 pexpire 延期。

1
2
3
4
5
6
7
8
9
10
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.singletonList(getName()),
internalLockLeaseTime, getLockName(threadId));
}

基于 ZooKeeper 实现

Zookeeper 的分布式实现依赖以下关键特性:

1. 临时节点和顺序节点: 临时节点在客户端会话结束时自动删除,顺序节点则会根据创建顺序生成唯一编号。这些特性被用于实现分布式锁和任务队列。

2. Watcher 机制: 客户端可以对节点的变化(如创建、删除、数据变更)设置监听器。当节点状态发生变化时,Zookeeper 会通知相关客户端,从而实现事件驱动的分布式协调。

实现步骤

  1. 所有客户端在 /locks 路径下创建临时顺序节点(如 /locks/lock_0000000001);

  2. 客户端检查自己创建的节点是否为序号最小的节点:

    • 是 → 获得锁;
    • 否 → 监听前一个序号的节点(watch),等待其释放(节点删除)后重试。

优点

  • 强一致性(ZAB 协议保证);

  • 临时节点自动释放(客户端宕机时锁自动释放);

  • 无单点问题(ZooKeeper 集群高可用)。

缺点

  • 性能低于 Redis(写操作需多数派确认);

  • 运维复杂度高;

  • 网络抖动可能导致频繁重连。

实现案例

Zookeeper 的分布式锁通过以下步骤实现:

  1. 创建节点: 客户端在指定路径下创建临时顺序节点。如果节点是路径下最小的节点,则获取锁;否则,监听前一个节点的删除事件。

  2. 监听机制: 未获取锁的客户端会阻塞并监听前一个节点的变化。当前一个节点被删除时,触发监听器,重新尝试获取锁。

  3. 释放锁: 获取锁的客户端完成任务后,删除自身节点,通知下一个等待的客户端。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void zkLock() {
currentNode = zk.create("/locks/seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
List<String> childrenNodes = zk.getChildren("/locks", false);
Collections.sort(childrenNodes);
String thisNode = currentNode.substring("/locks/".length());
int index = childrenNodes.indexOf(thisNode);
if (index == 0) {
return; // 当前节点是最小节点,获取锁
} else {
waitPath = "/locks/" + childrenNodes.get(index - 1);
zk.getData(waitPath, true, new Stat());
waitDownLatch.await(); // 等待前一个节点释放锁
}
}

使用 Curator 框架:Curator 是对 Zookeeper 的封装,提供了更高效的分布式锁实现。通过 InterProcessMutex 类,可以轻松实现分布式锁:

1
2
3
4
InterProcessLock lock = new InterProcessMutex(curatorFramework, "/locks");
lock.acquire(); // 获取锁
// 执行业务逻辑
lock.release(); // 释放锁

基于 Etcd 实现

类似 ZooKeeper,利用 Etcd 的 Lease(租约)和 Watch 机制。

  • 通过 Put 操作尝试创建带租约的 key;

  • 若 key 不存在则创建成功(获得锁),否则监听 key 的删除事件;

  • 租约到期自动删除 key(防止死锁)。

实现原理

  1. 租约机制:etcd 使用租约(Lease)为键值对设置 TTL(生存时间)。当租约到期时,键值对会自动删除,从而释放锁。

  2. 事务机制:通过事务操作确保只有一个客户端能够成功创建特定的键(即加锁)。

  3. 公平性:etcd 基于键的创建版本号(Revision)实现公平性,确保按顺序获取锁。

优点

  • 高可用、强一致性(Raft 协议);

  • 支持 TTL 自动过期;

  • 云原生生态友好(Kubernetes 底层依赖)。

缺点

  • 性能略低于 Redis;

  • 学习成本较高。

自定义实现

加锁逻辑(支持阻塞等待),不保证公平性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
long start = System.currentTimeMillis();
long timeoutMs = unit.toMillis(timeout);

while (System.currentTimeMillis() - start < timeoutMs) {
try {
// 1. 创建租约
LeaseGrantResponse leaseGrant = leaseClient.grant(leaseTTL).get();
this.leaseId = leaseGrant.getID();

// 2. 尝试获取锁(put 操作)
PutResponse putResponse = kvClient.put(
ByteSequence.from(lockKey, StandardCharsets.UTF_8),
ByteSequence.from("locked", StandardCharsets.UTF_8),
PutOption.newBuilder().withLeaseId(leaseId).build()
).get();

this.revision = putResponse.getHeader().getRevision();

// 3. 检查是否是最小 revision(即是否获得锁)
// 实际上,etcd 锁的标准做法是:尝试 put 后,检查是否有比自己 revision 更小的 key
// 但更高效的方式是:直接尝试 put,然后通过 range 查询判断自己是否最小
// 简化版:我们假设只要 put 成功且没有更小的 revision,就获得锁
// 但标准做法应使用 etcd 的锁 API(见下文推荐)

// 简化实现:直接认为 put 成功即获得锁(仅适用于单客户端竞争)
// 严格公平锁需检查 revision 队列(见下方“完整公平锁”说明)

return true;

} catch (ExecutionException e) {
// 租约或 put 失败,清理并重试
if (leaseId != -1) {
leaseClient.revoke(leaseId);
leaseId = -1;
}
Thread.sleep(100); // 短暂等待后重试
}
}
return false;
}

jetcd 实现

使用 jetcd 内置的 Lock 客户端。jetcd 提供了封装好的分布式锁 API

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public static void main(String[] args) throws Exception {
Client client = Client.builder()
.endpoints("http://127.0.0.1:2379")
.build();

Lock lockClient = client.getLockClient();
ByteSequence lockName = ByteSequence.from("/my-lock", StandardCharsets.UTF_8);

// 获取锁(阻塞直到获得)
LockResponse lockResponse = lockClient.lock(lockName).get(10, TimeUnit.SECONDS);
System.out.println("Got lock, lease ID: " + lockResponse.getLease());

try {
// 执行业务逻辑
System.out.println("Working...");
Thread.sleep(5000);
} finally {
// 释放锁
lockClient.unlock(lockResponse.getKey()).get();
System.out.println("Lock released.");
}

client.close();
}

注意事项

  1. 租约续期
    jetcd 的 Lock 客户端不会自动续租!若业务执行时间 > leaseTTL,锁会自动释放。
    解决方案

    • 设置足够长的 TTL;
    • 或启动后台线程定期 leaseClient.keepAlive(leaseId)(需自行管理)。
  2. 异常处理:网络中断、etcd 宕机时,锁可能提前释放,需业务层保证幂等性。

  3. 锁命名:建议使用 /locks/service_name/resource_id 格式避免冲突。

  4. 性能
    etcd 锁的吞吐量低于 Redis,适用于强一致性、低频竞争场景。