首页 > 数据库 >redisson分布式锁源码和原理浅析

redisson分布式锁源码和原理浅析

时间:2023-02-01 13:05:10浏览次数:46  
标签:redisson ttl 加锁 threadId redis 源码 time 超时 浅析


在redisson之前,很多人可能已经自己实现过基于redis的分布式锁,本身原理也比较简单,redis自身就是一个单线程处理器,具备互斥的特性,通过setNx,exist等命令就可以完成简单的分布式锁,处理好超时释放锁的逻辑即可。

redisson在此基础上,加上了更多的逻辑控制和功能,譬如公平锁等。这一篇我们就来看看redisson是如何完成分布式锁的。

redisson分布式锁源码和原理浅析_redisson分布式锁

先是使用锁的地方,在上一篇里已经用了。先获取RLock对象,调用lock、tryLock方法来完成加锁的功能。之后执行完毕需要被加锁的逻辑后,释放锁。

redisson分布式锁源码和原理浅析_加锁_02

lock方法是直接加锁,如果锁已被占用,则直接线程阻塞,进行等待,直到锁被占用方释放。

tryLock方法则是设定了waitTime(等待时间),在这个等待时间没到前,也是线程阻塞并反复去获取锁,直到取到锁或等待时间超时,则返回false。

这里就以tryLock的源码为例来看看。pom里依赖的redisson版本是

redisson分布式锁源码和原理浅析_System_03

public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
long threadId = Thread.currentThread().getId();
//尝试获取锁,如果没取到锁,则获取锁的剩余超时时间
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
//如果waitTime已经超时了,就返回false
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(threadId);
return false;
}

current = System.currentTimeMillis();
RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
if (!subscribeFuture.cancel(false)) {
subscribeFuture.onComplete((res, e) -> {
if (e == null) {
unsubscribe(subscribeFuture, threadId);
}
});
}
acquireFailed(threadId);
return false;
}

try {
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(threadId);
return false;
}
//进入死循环,反复去调用tryAcquire尝试获取锁,ttl为null时就是别的线程已经unlock了
while (true) {
long currentTime = System.currentTimeMillis();
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}

time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(threadId);
return false;
}

// waiting for message
currentTime = System.currentTimeMillis();
if (ttl >= 0 && ttl < time) {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}

time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(threadId);
return false;
}
}
} finally {
unsubscribe(subscribeFuture, threadId);
}
// return get(tryLockAsync(waitTime, leaseTime, unit));
}

可以看到,其中主要的逻辑就是尝试加锁,成功了就返回true,失败了就进入死循环反复去尝试加锁。中途还有一些超时的判断。逻辑还是比较简单的。

再看看tryAcquire方法

redisson分布式锁源码和原理浅析_加锁_04

这个方法的调用栈也是比较多,之后会进入下面这个方法

redisson分布式锁源码和原理浅析_System_05

该方法就是与redis通信的地方,通过exists key的方法来判断是否已经上锁,如果没锁,则会返回null,锁了则返回超时时间。

回到那个死循环的地方:

redisson分布式锁源码和原理浅析_redis_06

这里有一个针对waitTime和redis锁住的key的超时时间大小的比较,取到二者中比较小的那个值,然后用java的Semaphore信号量的tryAcquire方法来阻塞线程。

那么Semaphore信号量又是由谁控制呢,何时才能release呢。这里又需要回到上面来看。

redisson分布式锁源码和原理浅析_redisson分布式锁原理_07

注意这个subscribe方法,它会进入到PublishSubscribe.java中

redisson分布式锁源码和原理浅析_System_08

在这里可以看到,将当前的threadId添加到一个AsyncSemaphore中,并且设置一个redis的监听器,这个监听器是通过redis的发布、订阅功能实现的。如果不了解redis发布订阅的可以去百度查查。一旦监听器收到redis发来的消息,就从中获取与当前thread相关的,如果是锁被释放的消息,就立马通过操作Semaphore来让刚才阻塞的地方释放。

redisson分布式锁源码和原理浅析_redisson分布式锁_09

释放后,线程继续执行,仍旧是判断是否已经超时。如果还没超时,就进入下一次循环。

redisson分布式锁源码和原理浅析_redisson分布式锁原理_10

再次去获取锁,取到了就返回true,取不到,继续刚才的步骤。

所以,总体流程应该是这样的。

redisson分布式锁源码和原理浅析_System_11

最后还有个知识点,就是可重入加锁机制:

假如客户端1已经持有这个锁了,再次加锁时会怎样

redisson分布式锁源码和原理浅析_redisson分布式锁_12

这时我们来分析一下上面那段lua脚本。

第一个if判断肯定不成立,“exists myLock”会显示锁key已经存在了。

第二个if判断会成立,因为myLock的hash数据结构中包含的那个ID,就是客户端1的那个ID,也就是“8743c9c0-0795-4907-87fd-6c719a6b4586:1”

此时就会执行可重入加锁的逻辑,他会用:

incrby myLock 

 8743c9c0-0795-4907-87fd-6c71a6b4586:1 1

通过这个命令,对客户端1的加锁次数,累加1。

此时myLock数据结构变为下面这样:

大家看到了吧,那个myLock的hash数据结构中的那个客户端ID,就对应着加锁的次数。相应的,释放锁时,如果执行lock.unlock(),就可以释放分布式锁,此时的业务逻辑也是非常简单的。

其实说白了,就是每次都对myLock数据结构中的那个加锁次数减1。

如果发现加锁次数是0了,说明这个客户端已经不再持有锁了,此时就会用:

“del myLock”命令,从redis里删除这个key。

然后呢,另外的客户端2就可以尝试完成加锁了。

这就是所谓的分布式锁的开源Redisson框架的实现机制。

一般我们在生产系统中,可以用Redisson框架提供的这个类库来基于redis进行分布式锁的加锁与释放锁。

 


标签:redisson,ttl,加锁,threadId,redis,源码,time,超时,浅析
From: https://blog.51cto.com/u_13706148/6031390

相关文章