Redisson分布式锁如何实现可重入
本篇将从源码的角度去讲解Redisson分布式锁如何实现可重入的
我们都知道Redisson
的分布式锁比起我们自己用Redis
实现的分布式锁有许多优点:
- 可重入
- 可重试
- 超时续约
当我们使用Redisson去获取一个分布式锁的时候,大致的代码如下:
@Autowired
private RedissonClient redissonClient; // 注入Redisson客户端
public Result myService() {
// 前置业务
// ......
//创建锁对象
RLock lock = redissonClient.getLock("name");
// 获取锁
boolean isLock = lock.tryLock();
if (!isLock){ // 没有成功获取锁
// 返回错误
}
try {
// 执行业务
}finally {
lock.unlock();// 释放锁
}
}
可重入
在实现分布式锁时,我们通常会是利用 Redis 的 String 数据结构和 SETNX
命令,我们将锁存储为一个键(如 lock
),值(value
)为当前线程的唯一标识符:threadId
,如果 SETNX
成功,就说明获取到了锁;否则,说明锁已被占用
但是,这种方法有一个限制:不支持可重入。
可重入锁的意思是,如果同一线程多次尝试获取同一把锁,应该是允许的,如在A方法中获取了锁,此时A方法调用了B方法,而在B方法中也需要获取锁,但是锁已经被A方法获取,由于是A调用了B,A方法并没有结束,也就无法释放锁,但是A,B方法同处于一个线程中,B尝试获取锁应该是被允许的
与我们自己实现分布式锁不同,Redisson采用的数据结构不是String而是Hash,获取锁时,不仅仅是将线程的唯一标识存入,而是多存入一个重入次数:count,可以来帮助我们记录在当前线程中获取过多少次锁,比对线程标识符,处于同一线程的其他方法也要获取锁时,重入次数就会+1,同理释放锁时,也会进行-1操作,如果-1后重入次数为0,才真正的释放锁,即删除key
tryLock()
查看Redisson中尝试获取锁的方法:tryLock()
的源码,我们使用的是无参的方法,没有设定等待时间和过期时间:
public boolean tryLock() {
return (Boolean)this.get(this.tryLockAsync());
}
无参的tryLock()
方法又调用的无参的tryLockAsync()
方法
public RFuture<Boolean> tryLockAsync() {
return this.tryLockAsync(Thread.currentThread().getId());
}
而无参的tryLockAsync()
方法又调用了带参数的tryLockAsync()
方法,通过获取当前线程ID传入了线程唯一标识符:threadId
public RFuture<Boolean> tryLockAsync(long threadId) {
return this.tryAcquireOnceAsync(-1L, -1L, (TimeUnit)null, threadId);
}
带参数的tryLockAsync()
方法调用tryAcquireOnceAsync()
方法,并传入四个参数:
-
waitTime
:锁的最大等待时间,如果在这个时间内没有获得锁,操作会失败。 -
leaseTime
:锁的最大持有时间,锁会在这个时间后自动过期,防止死锁。 -
unit
:leaseTime
时间的单位。 -
threadId
:当前线程的唯一标识符,用于标识获取锁的线程。
private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
if (leaseTime != -1L) { //调用tryLockInnerAsync()
return this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
} else {
// 调用tryLockInnerAsync()
RFuture<Boolean> ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e == null) {
if (ttlRemaining) {
this.scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
}
此时tryAcquireOnceAsync()
方法会对传入的leaseTime
进行判断,如果我们没有指定过期时间,传入的值为-1,表示使用默认过期时间,无论是否指定了过期时间,该方法都会调用tryLockInnerAsync()
方法并传入全部参数,而真正实现可重入的关键逻辑也在该方法中:
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
this.internalLockLeaseTime = unit.toMillis(leaseTime);
return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getName()), this.internalLockLeaseTime, this.getLockName(threadId));
}
该方法中,通过Lua脚本来执行获取锁的操作,通过执行 Lua 脚本来保证加锁操作的原子性,Lua 脚本的具体执行逻辑可以分为以下几步:
1. 检查锁是否存在
首先,Lua 脚本检查锁是否已经存在:
if (redis.call('exists', KEYS[1]) == 0) then
2. 锁不存在时,设置锁和重入次数
如果锁不存在,Lua 脚本执行以下操作:
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
-
hincrby
:将threadId
对应的count
增加 1,即初始化当前线程的重入次数。 -
pexpire
:设置该锁的过期时间,防止因线程异常结束而导致锁无法自动释放。过期时间通过ARGV[1]
传入。
这里的 ARGV[2]
是 threadId
,ARGV[1]
是过期时间
3. 锁已存在,检查是否是当前线程重入
如果锁已经存在,脚本会检查锁是否被当前线程(即 threadId
)持有:
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
hexists
:检查 Redis 哈希表中是否存在threadId
对应的字段。如果该字段存在,说明当前线程已经持有锁,可以进行重入。
4.当前线程重入,增加重入计数
如果是当前线程重入,脚本执行以下操作:
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
hincrby
:再次增加当前线程的重入计数。pexpire
:更新锁的过期时间,防止锁过期。
5. 锁被其他线程持有时,返回失败
如果锁已经被其他线程持有,即 threadId
不匹配,Lua 脚本会返回一个值,表示加锁失败:
return redis.call('pttl', KEYS[1]);
pttl
:获取锁的剩余过期时间。如果锁被其他线程持有,这个命令返回当前锁的过期时间,以便调用者知道锁何时会自动释放。
6. 返回结果
- 如果加锁成功,返回值为
nil
。 - 如果加锁失败,返回值为锁的剩余有效时间(即
pttl
返回的值),表示当前锁被其他线程持有。
unlock()
再查看释放锁unlock()
方法的源码:
由于本篇只关注可重入的实现原理,所以不在赘述前面的调用过程,直接查看异步释放锁的方法源码:
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"return nil;", Arrays.asList(this.getName(), this.getChannelName()), LockPubSub.UNLOCK_MESSAGE, this.internalLockLeaseTime, this.getLockName(threadId));
}
unlockInnerAsync
方法是 Redisson 实现分布式可重入锁的解锁方法。它的主要作用是在释放锁时,减少锁的重入次数,并且在重入次数归零时,删除锁并进行相关的清理操作。整个操作是异步的,并且通过 Lua 脚本实现,以确保操作的原子性和一致性。
1. 检查当前线程是否持有锁
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
return nil;
end;
hexists
:检查当前线程的threadId
是否存在于 Redis 锁的哈希表中。如果不存在,表示当前线程并未持有锁,方法返回nil
,表示无操作。KEYS[1]
是锁的键(即lock
),ARGV[3]
是当前线程的唯一标识符threadId
。
2. 减少重入次数
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
hincrby
:减少当前线程的重入计数。-1
表示将当前线程的重入次数减 1。如果重入次数大于零,说明当前线程还需要持有锁,解锁操作还没有完成。
3. 如果重入次数大于零,更新过期时间
if (counter > 0) then
redis.call('pexpire', KEYS[1], ARGV[2]);
return 0;
pexpire
:如果锁的重入次数大于零,表示该线程仍然需要继续持有锁,因此重设锁的过期时间,防止锁过期。- return
0
:表示解锁操作完成,但锁没有被完全释放(因为重入次数没有归零)。
4. 如果重入次数归零,删除锁并清理
else
redis.call('del', KEYS[1]);
redis.call('publish', KEYS[2], ARGV[1]);
return 1;
end;
del
:如果重入次数减到零,表示锁已经完全释放,此时删除 Redis 中的锁。publish
:通过 Redis 发布消息,通知其他订阅该锁的客户端锁已经释放。- return
1
:表示锁已经完全释放。
5. 默认返回 nil
return nil;
如果没有满足任何条件,方法最终会返回 nil
,表示没有发生任何操作。
通过这种方式,Redisson 的分布式锁能够支持可重入性
标签:重入,Redisson,return,KEYS,redis,ARGV,线程,call,分布式 From: https://blog.csdn.net/Gaomengsuanjia_/article/details/144329991