带你读懂Redisson分布式锁原理
本篇带大家详细了解Redisson分布式锁原理,通过仔细阅读源码,逐步分析获取锁和释放锁的具体流程,并且为大家讲解每一步的执行过程,结尾会附有整个过程的流程图
文章目录
Redisson
是一个在 Java 中用于操作
Redis
的框架,它提供了分布式锁的功能。分布式锁主要用于在分布式系统中,控制多个节点对共享资源的访问,避免数据不一致等问题,其提供了非常不错的
可重试机制和
自动续期功能:
- 可重试机制:
在分布式系统中,多个线程或进程可能同时竞争获取分布式锁,当一个线程尝试获取 Redisson 分布式锁失败时,可重试机制允许该线程在一定条件下再次尝试获取锁,而不是直接放弃,这增加了在高并发场景下获取锁成功的概率 - 自动续期:
当一个线程成功获取 Redisson 分布式锁后,在执行业务逻辑过程中,自动续期机制会自动延长锁的有效期。这样可以防止因为业务逻辑执行时间过长,导致锁提前过期,进而引发其他线程获取锁并访问共享资源,造成数据不一致等问题
我们在使用Redisson分布式锁时,大致的使用方式如下:
@Autowired
private RedissonClient redissonClient; // 注入Redisson客户端
public Result myService() {
// 前置业务
// ......
//创建锁对象
RLock lock = redissonClient.getLock("name");
// 获取锁
boolean isLock = lock.tryLock(1L,TimeUnit.SECONDS);
if (!isLock){ // 没有成功获取锁
// 返回错误
}
try {
// 执行业务
}finally {
lock.unlock();// 释放锁
}
}
当调用tryLock
方法时,他究竟会执行什么样的业务逻辑,让我们来查看他的源码:
重试原理
tryLock
方法:
public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {
return this.tryLock(waitTime, -1L, unit);
}
在tryLock
方法中,调用重载方法设置默认释放时间-1,由于我们没有设置锁的过期时间,他会默认设置为-1,表示没有传参
tryLock
重载方法:
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
long threadId = Thread.currentThread().getId();
Long ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
if (ttl == null) {
return true;
} else {
time -= System.currentTimeMillis() - current;
if (time <= 0L) {
this.acquireFailed(waitTime, unit, threadId);
return false;
} else {
current = System.currentTimeMillis();
RFuture<RedissonLockEntry> subscribeFuture = this.subscribe(threadId);
if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
if (!subscribeFuture.cancel(false)) {
subscribeFuture.onComplete((res, e) -> {
if (e == null) {
this.unsubscribe(subscribeFuture, threadId);
}
});
}
this.acquireFailed(waitTime, unit, threadId);
return false;
} else {
boolean var16;
try {
time -= System.currentTimeMillis() - current;
if (time <= 0L) {
this.acquireFailed(waitTime, unit, threadId);
boolean var20 = false;
return var20;
}
do {
long currentTime = System.currentTimeMillis();
ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
if (ttl == null) {
var16 = true;
return var16;
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0L) {
this.acquireFailed(waitTime, unit, threadId);
var16 = false;
return var16;
}
currentTime = System.currentTimeMillis();
if (ttl >= 0L && ttl < time) {
((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
time -= System.currentTimeMillis() - currentTime;
} while(time > 0L);
this.acquireFailed(waitTime, unit, threadId);
var16 = false;
} finally {
this.unsubscribe(subscribeFuture, threadId);
}
return var16;
}
}
}
}
- long time = unit.toMillis(waitTime):将等待时间转换为毫秒值
- long current = System.currentTimeMillis():获取当前时间的毫秒值
- long threadId = Thread.currentThread().getId():获取线程ID
- Long ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId):调用
tryAcquire
方法获取返回值是一个Long类型的ttl
在执行完tryAcquire
方法后,执行一大段逻辑代码,我们先省略这些内容,先查看tryAcquire
方法中都做了什么
记住这里记录了一次当前时间current
tryAcquire
方法:
private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
return (Long)this.get(this.tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}
此方法中没有做其他多余的事情,只是调用了tryAcquireAsync
方法
tryAcquireAsync
方法:
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
if (leaseTime != -1L) {
return this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
RFuture<Long> ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e == null) {
if (ttlRemaining == null) {
this.scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
}
首先会判断传入的锁过期时间,由于我们没有设置过期时间,在上面的tryLock
重载方法中,将过期时间设置为了-1,因此这里做条件判断进入了else中的逻辑,将默认过期时间设置为了getLockWatchdogTimeout()
的值,这个值是30s,然后调用了tryLockInnerAsync
方法
这里我们又跳过了tryLockInnerAsync
方法执行完毕后的逻辑部分,下面我们再进行分析
tryLockInnerAsync
方法:
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
this.internalLockLeaseTime = unit.toMillis(leaseTime);
return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getName()), this.internalLockLeaseTime, this.getLockName(threadId));
}
这里第一步是把一个成员变量internalLockLeaseTime
设置为默认时间30s
请记住这个internalLockLeaseTime
为30s!
接着调用了redis的命令执行方法,执行了一段Lua脚本,如果当前线程的锁没有被获取,则获取锁成功返回一个null,获取锁失败则会返回此锁的剩余过期时间ttl,返回值一路返回,返回到tryLock
重载方法
此时我们又回到了刚刚跳过一大段代码的tryLock
重载方法
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
long threadId = Thread.currentThread().getId();
Long ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
if (ttl == null) {
return true;
} else {
time -= System.currentTimeMillis() - current;
if (time <= 0L) {
this.acquireFailed(waitTime, unit, threadId);
return false;
} else {
current = System.currentTimeMillis();
RFuture<RedissonLockEntry> subscribeFuture = this.subscribe(threadId);
if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
if (!subscribeFuture.cancel(false)) {
subscribeFuture.onComplete((res, e) -> {
if (e == null) {
this.unsubscribe(subscribeFuture, threadId);
}
});
}
this.acquireFailed(waitTime, unit, threadId);
return false;
} else {
boolean var16;
try {
time -= System.currentTimeMillis() - current;
if (time <= 0L) {
this.acquireFailed(waitTime, unit, threadId);
boolean var20 = false;
return var20;
}
do {
long currentTime = System.currentTimeMillis();
ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
if (ttl == null) {
var16 = true;
return var16;
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0L) {
this.acquireFailed(waitTime, unit, threadId);
var16 = false;
return var16;
}
currentTime = System.currentTimeMillis();
if (ttl >= 0L && ttl < time) {
((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
time -= System.currentTimeMillis() - currentTime;
} while(time > 0L);
this.acquireFailed(waitTime, unit, threadId);
var16 = false;
} finally {
this.unsubscribe(subscribeFuture, threadId);
}
return var16;
}
}
}
}
根据获取到的返回值进行if条件判断
如果ttl == null证明获取锁成功,返回true,如果不等于null则证明获取锁失败,获取锁失败
是否还记得方法一开始获取过当时的时间毫秒值
此时再次获取当前时间相减,得出本次执行获取锁消耗时间,用等待时间减去消耗时间得出剩余时间,判断是否还有时间继续执行,没有时间就返回false,若还有时间,则再次尝试,执行do中的逻辑
会再次获取一次当前时间的毫秒值,执行下面的逻辑:
RFuture<RedissonLockEntry> subscribeFuture = this.subscribe(threadId);
这条语句的作用是订阅通知,订阅等待有其余线程释放锁的信号,在释放锁unlock
方法的代码中,最终同样会执行一段Lua脚本,对比线程ID,然后对锁进行释放,释放时会发布通知,通知该线程的锁已被释放,而这里就是在等待接受这个通知
if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS))
await方法是等待收到信号,等待时间就为剩余时间time,如果等待超过剩余时间就会返回false并且取消订阅
如果超过剩余时间之前收到释放锁的消息,就会往下执行try中的代码:
if (time <= 0L) {
this.acquireFailed(waitTime, unit, threadId);
boolean var20 = false;
return var20;
}
再次计算剩余时间,没有剩余时间就返回false
还有剩余时间就再次尝试获取锁:调用tryAcquire方法并获取ttl,
do {
long currentTime = System.currentTimeMillis();
ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
if (ttl == null) {
var16 = true;
return var16;
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0L) {
this.acquireFailed(waitTime, unit, threadId);
var16 = false;
return var16;
}
currentTime = System.currentTimeMillis();
if (ttl >= 0L && ttl < time) {
((RedissonLockEntry)subscribeFuture.getNow())
.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
((RedissonLockEntry)subscribeFuture.getNow())
.getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
time -= System.currentTimeMillis() - currentTime;
}while(time > 0L);
再次判断返回的ttl,是否获取锁成功,则返回true,不成功则再次判断是否还有剩余时间,没有就返回flase,如果依然有剩余时间,则会执行一次比较判断:
- 如果此时返回的ttl大于零且小于剩余时间,就再次订阅消息等待再次尝试获取锁,最大等待时间是ttl
- 如果剩余时间小于ttl,也会再次订阅消息等待再次尝试获取锁,但最大等待时间是剩余时间
循环往复直到获取成功或者没有剩余时间
这就是可重试的原理
续约原理
是否还记得刚刚未分析完的代码tryAcquireAsync
方法:
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
if (leaseTime != -1L) {
return this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
RFuture<Long> ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e == null) {
if (ttlRemaining == null) {
this.scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
}
当执行Lua脚本的方法返回ttl后:
RFuture<Long> ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e == null) {
if (ttlRemaining == null) {
this.scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
当这个回调函数成功后,返回的e为方法中出现的异常,如果e没有异常为null,并且返回的剩余超时时间也为null,表示获取锁成功,执行一个关键方法:scheduleExpirationRenewal
任务调度从方法,要更新过期时间,也就是续期,查看该方法源码:
scheduleExpirationRenewal
方法:
private void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
ExpirationEntry oldEntry = (ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
this.renewExpiration();
}
}
这里先创建了一个ExpirationEntry
对象,用了一个静态Map:EXPIRATION_RENEWAL_MAP
把这个ExpirationEntry对象put进去,这里只需要知道ExpirationEntry
对象也是一个用来存放信息的容器即可
他的key是一个拼接字符串:
this.id = commandExecutor.getConnectionManager().getId(); // 当前连接ID
this.entryName = this.id + ":" + name;
protected String getEntryName() {
return this.entryName;
}
这里的name就是我们锁的名称,也就是说这个静态Map可以被RedissonLock
类的任何一个实例对象操作,每个实例创建出来的不同的锁都会在这个Map中留下他们的名字,即一个锁对应一个ExpirationEntry
对象
在这个Map调用Put方法时,调用的是putIfAbsent
,表示不存在则put并返回null,如果这个锁的名称已经有ExpirationEntry
对象了,则会返回他的ExpirationEntry
对象赋值给oldEntry
也就是说不管这把锁被重入了几次,他的锁永远只对应一个不变的ExpirationEntry
对象
此时进行判断:
-
如果
oldEntry
不为null,证明已经不止一次重入了,把当前线程ID添加进ExpirationEntry -
如果
oldEntry
为null,是第一次重入,先添加线程ID,接着就要执行renewExpiration()
续期方法
这里为什么只在第一次重入时执行续期方法,会在后面进行解释
renewExpiration()
续期方法:
private void renewExpiration() {
ExpirationEntry ee = (ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
if (ee != null) {
Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = (ExpirationEntry)RedissonLock.EXPIRATION_RENEWAL_MAP.get(RedissonLock.this.getEntryName());
if (ent != null) {
Long threadId = ent.getFirstThreadId();
if (threadId != null) {
RFuture<Boolean> future = RedissonLock.this.renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", e);
} else {
if (res) {
RedissonLock.this.renewExpiration();
}
}
});
}
}
}
}, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
}
该方法进来先拿到这个ExpirationEntry
对象,如果不为null,则执行一个newTimeout
方法获取一个超时任务,该方法传入三个参数,第一个参数是任务本身,其中第二个参数delay是延时,表示这个任务在delay到期之后才开始执行,所以是一个延时任务
这里传入的延时时间是this.internalLockLeaseTime / 3L
把internalLockLeaseTime
除以三
你是否还记得在之前的方法中,由于我们没有指定超时时间,系统默认给我们设置了一个watchDag时间为30s,在执行tryLockInnerAsync
方法的时候,又把这个时间赋给了internalLockLeaseTime
就是这里的成员变量,所以值为30s,也就是10s后这个任务才开始执行
TimerTask
任务:
new TimerTask() {
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = (ExpirationEntry)RedissonLock.EXPIRATION_RENEWAL_MAP.get(RedissonLock.this.getEntryName());
if (ent != null) {
Long threadId = ent.getFirstThreadId();
if (threadId != null) {
RFuture<Boolean> future = RedissonLock.this.renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", e);
} else {
if (res) {
RedissonLock.this.renewExpiration();
}
}
});
}
}
}
}
先获取ExpirationEntry
对象,取出线程ID,ID不为null则调用了一个renewExpirationAsync
方法刷新有效期
renewExpirationAsync
方法:
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;", Collections.singletonList(this.getName()), this.internalLockLeaseTime, this.getLockName(threadId));
}
这个方法执行了一个lua脚本,先对比获取锁的是否为当前线程,然后再用pexpire
命令更新有效期,重置为原本时间30s,这里就完成了第一次的续期
回到我们的延时任务中:
new TimerTask() {
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = (ExpirationEntry)RedissonLock.EXPIRATION_RENEWAL_MAP.get(RedissonLock.this.getEntryName());
if (ent != null) {
Long threadId = ent.getFirstThreadId();
if (threadId != null) {
RFuture<Boolean> future = RedissonLock.this.renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", e);
} else {
if (res) {
RedissonLock.this.renewExpiration();
}
}
});
}
}
}
}
执行完毕之后用onComplete
方法又再次调用了renewExpiration
方法,也就是本方法,实现了一个递归,再次执行这个延时方法
也就是说每过10s就会执行该任务,更新有效期为30s
最终把这个任务封装到ExpirationEntry
中,所以ExpirationEntry
对象封装了两个东西,一个是线程ID,一个是执行完的延时任务,为什么要把这个任务放入ExpirationEntry
对象中呢,且听待会分析
这就是为什么在scheduleExpirationRenewal
方法中如果oldEntry
不为null就不执行renewExpiration
方法了,因为他里面已经有该方法了,并且一直递归执行中,每过10s执行一次更新续约
这个自动续期的机制又被称为看门狗机制
取消续期
那什么时候任务才取消呢?
在释放锁的源码中
unlockAsync
方法:
public RFuture<Void> unlockAsync(long threadId) {
RPromise<Void> result = new RedissonPromise();
RFuture<Boolean> future = this.unlockInnerAsync(threadId);
future.onComplete((opStatus, e) -> {
this.cancelExpirationRenewal(threadId);
if (e != null) {
result.tryFailure(e);
} else if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + this.id + " thread-id: " + threadId);
result.tryFailure(cause);
} else {
result.trySuccess((Object)null);
}
});
return result;
}
刚刚说过,释放锁方法的最后,也就是这里调用的unlockInnerAsync
方法也会执行一段Lua脚本,锁释放执行成功之后返回了RFuture<Boolean>
然后立马执行cancelExpirationRenewal
方法,该方法就是取消更新任务
cancelExpirationRenewal
方法:
void cancelExpirationRenewal(Long threadId) {
ExpirationEntry task = (ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
if (task != null) {
if (threadId != null) {
task.removeThreadId(threadId);
}
if (threadId == null || task.hasNoThreads()) {
Timeout timeout = task.getTimeout();
if (timeout != null) {
timeout.cancel();
}
EXPIRATION_RENEWAL_MAP.remove(this.getEntryName());
}
}
}
这里从Map中拿到当前锁对应的ExpirationEntry
对象,删除掉线程ID,再取出这个Timeout
任务调用timeout.cancel()
方法取消任务,最后再把ExpirationEntry
对象删除
前面在执行延时任务之后,之所以要把方法放入ExpirationEntry
对象中,就是为了这里释放锁时,可以拿到这个任务并把它取消,不再更新过期时间
这就是Redisson内部实现简单分布式锁的原理
这里附上执行的流程图: