分布式锁是分布式系统中用于协调多个节点对共享资源进行互斥访问的关键机制。
核心目标:在任意时刻,只有一个节点能操作某项共享资源,从而避免数据不一致、重复处理、资源竞争等问题。
以下是分布式锁的典型使用场景:
-
防止重复提交 / 幂等性控制:防止用户多次点击“支付”按钮,或网络重试(防止请求重复)
-
秒杀 / 抢购系统:高并发下抢购限量商品(防止出现商品超卖问题)
-
定时任务防重:多个服务实例部署了相同的定时任务(防止任务被多个节点同时执行,导致数据重复处理或资源浪费)
-
分布式 ID 生成:Snowflake 算法中,机器 ID 需全局唯一(防止多节点启动时可能分配到相同机器 ID)
-
缓存更新:缓存失效瞬间,大量请求穿透到数据库(预防数据库压力骤增,甚至宕机)
-
状态机流转控制:订单状态从“待支付” → “已支付” → “已发货”,需严格顺序(并发请求可能导致状态错乱,如重复发货)
-
分布式配置变更协调:多个服务实例需同时应用新配置(如灰度发布),实例间配置不一致导致业务异常
-
文件/资源互斥写入:多个节点需写入同一个共享文件(如日志聚合、报表生成),导致文件内容错乱或覆盖
为了确保分布式锁可用,至少要确保锁实现同时满足以下四个条件:
-
互斥性。在任意时刻,只有一个客户端能持有锁。
-
不发生死锁。即使有客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。
-
容错性。只要大部分的Redis节点正常运行,客户端就可以加锁和解锁。
-
隔离性。加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了。
常见的实现方案有以下几种:
-
基于数据库(MySQL)实现:利用数据库的唯一约束(如唯一索引)或行锁(如 SELECT ... FOR UPDATE)实现互斥。
-
基于 Redis 实现:利用 Redis 的原子操作(如 SET key value NX EX)实现锁。
-
基于 ZooKeeper 实现:利用 ZooKeeper 的临时顺序节点(ephemeral sequential node)和 watch 机制。
-
基于 Etcd 实现:类似 ZooKeeper,利用 Etcd 的 Lease(租约)和 Watch 机制。
-
其他方案:
- Consul:基于 Raft 协议,提供 KV 存储和 Session 机制实现分布式锁。
- 自研方案:结合多种存储(如 Redis + MySQL 双写校验),但复杂度高。
| 方案 |
一致性 |
性能 |
可靠性 |
自动过期 |
适用场景 |
| 数据库 |
弱 |
低 |
依赖 DB |
需手动 |
低并发、简单场景 |
| Redis |
最终 |
高 |
单点风险 |
支持 |
高并发、性能敏感场景 |
| ZooKeeper |
强 |
中 |
高 |
支持 |
强一致性要求场景 |
| Etcd |
强 |
中 |
高 |
支持 |
云原生、K8s 生态 |
基于数据库实现
利用数据库的唯一约束(如唯一索引)或行锁(如 SELECT ... FOR UPDATE)实现互斥。
-
唯一索引法:尝试插入一条带唯一键的记录,插入成功即获得锁,失败则表示锁已被占用。
-
乐观锁:通过版本号(version)控制并发更新。
-
悲观锁:使用 SELECT ... FOR UPDATE 获取行级排他锁。
优点:简单易懂,无需引入额外组件。
缺点:
-
性能较差(尤其高并发下数据库压力大);
-
依赖数据库可靠性;
-
不支持自动过期(需额外处理死锁);
实现案例:
通过数据库的排他锁来实现分布式锁。 基于MySQL的InnoDB引擎,可以使用以下方法来实现加锁操作:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
|
public boolean lock(){ connection.setAutoCommit(false) while(true){ try{ result = "select * from methodLock where method_name=xxx for update"; if(result==null){ return true; } }catch(Exception e){
} sleep(1000); } return false; }
public void unlock(){ connection.commit(); }
|
在查询语句后面增加for update (nowait),数据库会在查询过程中给数据库表增加排他锁( InnoDB引擎在加锁的时候,只有通过唯一索引进行检索的时候才会使用行级锁,否则会使用表级锁)。当某条记录被加上排他锁之后,其他线程无法再该行记录上增加排他锁。
共享锁:由读表操作加的锁,加锁后其他用户只能获取该表或行的共享锁,不能获取排它锁,也就是说只能读不能写
排它锁:由写表操作加的锁,加锁后其他用户不能获取该表或行的任何锁,典型是mysql事务中
基于 Redis 实现
利用 Redis 的原子操作(如 SET key value NX EX)实现锁。
1
| SET lock_key unique_value NX EX 30
|
优点:
-
高性能、低延迟;
-
支持自动过期;
-
社区方案成熟(如 Redisson)。
缺点:
简单实现
加锁的过程很简单,就是通过SET指令来设置值,成功则返回;否则就循环等待,在timeout时间内仍未获取到锁,则获取失败
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
| public class RedisLock { private String lock_key = "redis_lock"; protected long internalLockLeaseTime = 30000; private long timeout = 999999;
SetParams params = SetParams.setParams().nx().px(internalLockLeaseTime);
@Autowired JedisPool jedisPool;
public boolean lock(String id){ Jedis jedis = jedisPool.getResource(); Long start = System.currentTimeMillis(); try{ for(;;){ String lock = jedis.set(lock_key, id, "NX", "PX", internalLockLeaseTime); if("OK".equals(lock)){ return true; } long l = System.currentTimeMillis() - start; if (l>=timeout) { return false; } try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } }finally { jedis.close(); } }
public boolean unlock(String id){ Jedis jedis = jedisPool.getResource(); String script = "if redis.call('get',KEYS[1]) == ARGV[1] then" + " return redis.call('del',KEYS[1]) " + "else" + " return 0 " + "end"; try { Object result = jedis.eval(script, Collections.singletonList(lock_key), Collections.singletonList(id)); if("1".equals(result.toString())){ return true; } return false; }finally { jedis.close(); } }
int count = 0;
public String index() throws InterruptedException {
int clientcount =1000; CountDownLatch countDownLatch = new CountDownLatch(clientcount);
ExecutorService executorService = Executors.newFixedThreadPool(clientcount); long start = System.currentTimeMillis(); for (int i = 0;i<clientcount;i++){ executorService.execute(() -> {
String id = IdUtil.getId(); try { redisLock.lock(id); count++; }finally { redisLock.unlock(id); } countDownLatch.countDown(); }); } countDownLatch.await(); long end = System.currentTimeMillis(); logger.info("执行线程数:{},总耗时:{},count数为:{}",clientcount,end-start,count); return "Hello"; }
|
存在的问题:
-
锁不具有可重入性。
-
业务未处理而主动释放锁,此时锁已到期。
获取锁-错误案例1: 过期时间设置分两步,不具有原子性,易发生死锁
1 2 3 4 5
| Long result = jedis.setnx(lockKey, requestId); if (result == 1) { jedis.expire(lockKey, expireTime); }
|
获取锁-错误案例2:过期时间分步判断且各客户端时间必须强一致,不具有原子性,在时间判断过程中到期,则可能被其他客户端加锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| public static boolean wrongGetLock2(Jedis jedis, String lockKey, int expireTime) { long expires = System.currentTimeMillis() + expireTime; String expiresStr = String.valueOf(expires);
if (jedis.setnx(lockKey, expiresStr) == 1) { return true; }
String currentValueStr = jedis.get(lockKey); if (currentValueStr != null && Long.parseLong(currentValueStr) < System.currentTimeMillis()) { String oldValueStr = jedis.getSet(lockKey, expiresStr); if (oldValueStr != null && oldValueStr.equals(currentValueStr)) { return true; } } return false; }
|
Redisson 实现
当已经被加锁,存在该key且未过期时,其他线程将 while (true) {} 一直尝试获取锁,而不是队列等待
实现原理
加锁机制
如果该客户端面对的是一个Redis Cluster集群,它首先会根据hash节点选择一台机器,再发送一段lua脚本到redis,
保存Hash 数据结构:myLock:{threadId:1}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId)); }
|
锁互斥机制
根据LUA脚本尝试加锁,若不成功,不断尝试,直到信号量释放获取到锁或被中断
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
| private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException { long threadId = Thread.currentThread().getId(); Long ttl = tryAcquire(-1, leaseTime, unit, threadId); if (ttl == null) { return; }
RFuture<RedissonLockEntry> future = subscribe(threadId); if (interruptibly) { commandExecutor.syncSubscriptionInterrupted(future); } else { commandExecutor.syncSubscription(future); }
try { while (true) { ttl = tryAcquire(-1, leaseTime, unit, threadId); if (ttl == null) { break; }
if (ttl >= 0) { try { future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { if (interruptibly) { throw e; } future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } } else { if (interruptibly) { future.getNow().getLatch().acquire(); } else {
future.getNow().getLatch().acquireUninterruptibly(); } } } } finally { unsubscribe(future, threadId); } }
|
释放锁(LUA脚本)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| protected RFuture<Boolean> unlockInnerAsync(long threadId) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end; " + "return nil;", Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId)); }
|
自动延期机制
客户端A加锁的锁key默认生存时间只有30秒,如果超过了30秒,客户端A还想一直持有这把锁,怎么办
只要客户端A一旦加锁成功,就会启动一个watch dog看门狗,它是一个后台线程,会每隔10秒检查一下,如果客户端A还持有锁key,那么就会不断的延长锁key的生存时间
存在的问题
master-slave主从异步复制导致redis分布式锁的缺陷,原锁丢失/多个客户端加锁。
在master实例宕机的时候,可能导致多个客户端同时完成加锁。
为了Redis的高可用,一般都会给Redis的节点挂一个slave,然后采用哨兵模式进行主备切换。但由于Redis的主从复制(replication)是异步的,这可能会出现在数据同步过程中,master宕机,slave来不及同步数据就被选为master,从而数据丢失。
这就会导致客户端B来尝试加锁的时候,在新的master上完成了加锁,而客户端以为自己成功加了锁,就会导致多个客户端对一个分布式锁完成了加锁。这时就会导致各种脏数据的产生。
Redis集群锁(Redlock算法)
在Redis的分布式环境中,我们假设有N个Redis master。这些节点完全互相独立,不存在主从复制或者其他集群协调机制。
我们确保将在N个实例上使用与在Redis单实例下相同方法获取和释放锁。现在我们假设有5个Redis master节点,同时我们需要在5台服务器上面运行这些Redis实例,这样保证他们不会同时都宕掉。
为了取到锁,客户端应该执行以下操作:
-
获取当前Unix时间,以毫秒为单位。
-
依次尝试从N个实例,使用相同的key和随机值获取锁。在步骤2,当向Redis设置锁时,客户端应该设置一个网络连接和响应超时时间,这个超时时间应该小于锁的失效时间。例如你的锁自动失效时间为10秒,则超时时间应该在5-50毫秒之间。这样可以避免服务器端Redis已经挂掉的情况下,客户端还在死死地等待响应结果。如果服务器端没有在规定时间内响应,客户端应该尽快尝试另外一个Redis实例。
-
客户端使用当前时间减去开始获取锁时间(步骤1记录的时间)就得到获取锁使用的时间。当且仅当从大多数(这里是3个节点)的Redis节点都取到锁,并且使用的时间小于锁失效时间时,锁才算获取成功。
-
如果取到了锁,key的真正有效时间等于有效时间减去获取锁所使用的时间(步骤3计算的结果)。
-
如果因为某些原因,获取锁失败(没有在至少N/2+1个Redis实例取到锁或者取锁时间已经超过了有效时间),客户端应该在所有的Redis实例上进行解锁(即便某些Redis实例根本就没有加锁成功)。
看门狗(延长锁时效)
假设锁超时时间是 30 秒,此时程序需要每隔一段时间去扫描一下该锁是否还存在,扫描时间需要小于超时时间,通常可以设置为超时时间的 1/3,在这里也就是 10 秒扫描一次。如果锁还存在,则重置其超时时间恢复到 30 秒。
通过这种方案,只要业务还没有处理完成,锁就会一直有效;而当业务一旦处理完成,程序也会马上删除该锁。
1.watchDog 只有在未显示指定加锁时间时才会生效。(这点很重要)
2.lockWatchdogTimeout 设定的时间不要太小 ,比如我之前设置的是 100毫秒,由于网络直接导致加锁完后,watchdog去延期时,这个key在redis中已经被删除了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime , TimeUnit unit, long threadId) { if (leaseTime != -1) { return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime, commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); ttlRemainingFuture.onComplete((ttlRemaining, e) -> { if (e != null) { return; }
if (ttlRemaining == null) { scheduleExpirationRenewal(threadId); } }); return ttlRemainingFuture; }
|
lua脚本判断 锁是否在,如果存在就进行 pexpire 延期。
1 2 3 4 5 6 7 8 9 10
| protected RFuture<Boolean> renewExpirationAsync(long threadId) { return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return 1; " + "end; " + "return 0;", Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); }
|
基于 ZooKeeper 实现
Zookeeper 的分布式实现依赖以下关键特性:
1. 临时节点和顺序节点: 临时节点在客户端会话结束时自动删除,顺序节点则会根据创建顺序生成唯一编号。这些特性被用于实现分布式锁和任务队列。
2. Watcher 机制: 客户端可以对节点的变化(如创建、删除、数据变更)设置监听器。当节点状态发生变化时,Zookeeper 会通知相关客户端,从而实现事件驱动的分布式协调。
实现步骤:
-
所有客户端在 /locks 路径下创建临时顺序节点(如 /locks/lock_0000000001);
-
客户端检查自己创建的节点是否为序号最小的节点:
- 是 → 获得锁;
- 否 → 监听前一个序号的节点(watch),等待其释放(节点删除)后重试。
优点:
-
强一致性(ZAB 协议保证);
-
临时节点自动释放(客户端宕机时锁自动释放);
-
无单点问题(ZooKeeper 集群高可用)。
缺点:
-
性能低于 Redis(写操作需多数派确认);
-
运维复杂度高;
-
网络抖动可能导致频繁重连。
实现案例:
Zookeeper 的分布式锁通过以下步骤实现:
-
创建节点: 客户端在指定路径下创建临时顺序节点。如果节点是路径下最小的节点,则获取锁;否则,监听前一个节点的删除事件。
-
监听机制: 未获取锁的客户端会阻塞并监听前一个节点的变化。当前一个节点被删除时,触发监听器,重新尝试获取锁。
-
释放锁: 获取锁的客户端完成任务后,删除自身节点,通知下一个等待的客户端。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| public void zkLock() { currentNode = zk.create("/locks/seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); List<String> childrenNodes = zk.getChildren("/locks", false); Collections.sort(childrenNodes); String thisNode = currentNode.substring("/locks/".length()); int index = childrenNodes.indexOf(thisNode); if (index == 0) { return; } else { waitPath = "/locks/" + childrenNodes.get(index - 1); zk.getData(waitPath, true, new Stat()); waitDownLatch.await(); } }
|
使用 Curator 框架:Curator 是对 Zookeeper 的封装,提供了更高效的分布式锁实现。通过 InterProcessMutex 类,可以轻松实现分布式锁:
1 2 3 4
| InterProcessLock lock = new InterProcessMutex(curatorFramework, "/locks"); lock.acquire();
lock.release();
|
基于 Etcd 实现
类似 ZooKeeper,利用 Etcd 的 Lease(租约)和 Watch 机制。
实现原理
-
租约机制:etcd 使用租约(Lease)为键值对设置 TTL(生存时间)。当租约到期时,键值对会自动删除,从而释放锁。
-
事务机制:通过事务操作确保只有一个客户端能够成功创建特定的键(即加锁)。
-
公平性:etcd 基于键的创建版本号(Revision)实现公平性,确保按顺序获取锁。
优点:
缺点:
自定义实现
加锁逻辑(支持阻塞等待),不保证公平性
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { long start = System.currentTimeMillis(); long timeoutMs = unit.toMillis(timeout);
while (System.currentTimeMillis() - start < timeoutMs) { try { LeaseGrantResponse leaseGrant = leaseClient.grant(leaseTTL).get(); this.leaseId = leaseGrant.getID();
PutResponse putResponse = kvClient.put( ByteSequence.from(lockKey, StandardCharsets.UTF_8), ByteSequence.from("locked", StandardCharsets.UTF_8), PutOption.newBuilder().withLeaseId(leaseId).build() ).get();
this.revision = putResponse.getHeader().getRevision();
return true;
} catch (ExecutionException e) { if (leaseId != -1) { leaseClient.revoke(leaseId); leaseId = -1; } Thread.sleep(100); } } return false; }
|
jetcd 实现
使用 jetcd 内置的 Lock 客户端。jetcd 提供了封装好的分布式锁 API
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| public static void main(String[] args) throws Exception { Client client = Client.builder() .endpoints("http://127.0.0.1:2379") .build();
Lock lockClient = client.getLockClient(); ByteSequence lockName = ByteSequence.from("/my-lock", StandardCharsets.UTF_8);
LockResponse lockResponse = lockClient.lock(lockName).get(10, TimeUnit.SECONDS); System.out.println("Got lock, lease ID: " + lockResponse.getLease());
try { System.out.println("Working..."); Thread.sleep(5000); } finally { lockClient.unlock(lockResponse.getKey()).get(); System.out.println("Lock released."); }
client.close(); }
|
注意事项
-
租约续期:
jetcd 的 Lock 客户端不会自动续租!若业务执行时间 > leaseTTL,锁会自动释放。
解决方案:
- 设置足够长的 TTL;
- 或启动后台线程定期
leaseClient.keepAlive(leaseId)(需自行管理)。
-
异常处理:网络中断、etcd 宕机时,锁可能提前释放,需业务层保证幂等性。
-
锁命名:建议使用 /locks/service_name/resource_id 格式避免冲突。
-
性能:
etcd 锁的吞吐量低于 Redis,适用于强一致性、低频竞争场景。