在redisson之前,很多人可能已经自己实现过基于redis的分布式锁,本身原理也比较简单,redis自身就是一个单线程处理器,具备互斥的特性,通过setNx,exist等命令就可以完成简单的分布式锁,处理好超时释放锁的逻辑即可。
redisson在此基础上,加上了更多的逻辑控制和功能,譬如公平锁等。这一篇我们就来看看redisson是如何完成分布式锁的。
先是使用锁的地方,在上一篇里已经用了。先获取RLock对象,调用lock、tryLock方法来完成加锁的功能。之后执行完毕需要被加锁的逻辑后,释放锁。
lock方法是直接加锁,如果锁已被占用,则直接线程阻塞,进行等待,直到锁被占用方释放。
tryLock方法则是设定了waitTime(等待时间),在这个等待时间没到前,也是线程阻塞并反复去获取锁,直到取到锁或等待时间超时,则返回false。
这里就以tryLock的源码为例来看看。pom里依赖的redisson版本是
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
long threadId = Thread.currentThread().getId();
//尝试获取锁,如果没取到锁,则获取锁的剩余超时时间
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
//如果waitTime已经超时了,就返回false
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(threadId);
return false;
}
current = System.currentTimeMillis();
RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
if (!subscribeFuture.cancel(false)) {
subscribeFuture.onComplete((res, e) -> {
if (e == null) {
unsubscribe(subscribeFuture, threadId);
}
});
}
acquireFailed(threadId);
return false;
}
try {
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(threadId);
return false;
}
//进入死循环,反复去调用tryAcquire尝试获取锁,ttl为null时就是别的线程已经unlock了
while (true) {
long currentTime = System.currentTimeMillis();
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(threadId);
return false;
}
// waiting for message
currentTime = System.currentTimeMillis();
if (ttl >= 0 && ttl < time) {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(threadId);
return false;
}
}
} finally {
unsubscribe(subscribeFuture, threadId);
}
// return get(tryLockAsync(waitTime, leaseTime, unit));
}
可以看到,其中主要的逻辑就是尝试加锁,成功了就返回true,失败了就进入死循环反复去尝试加锁。中途还有一些超时的判断。逻辑还是比较简单的。
再看看tryAcquire方法
这个方法的调用栈也是比较多,之后会进入下面这个方法
该方法就是与redis通信的地方,通过exists key的方法来判断是否已经上锁,如果没锁,则会返回null,锁了则返回超时时间。
回到那个死循环的地方:
这里有一个针对waitTime和redis锁住的key的超时时间大小的比较,取到二者中比较小的那个值,然后用java的Semaphore信号量的tryAcquire方法来阻塞线程。
那么Semaphore信号量又是由谁控制呢,何时才能release呢。这里又需要回到上面来看。
注意这个subscribe方法,它会进入到PublishSubscribe.java中
在这里可以看到,将当前的threadId添加到一个AsyncSemaphore中,并且设置一个redis的监听器,这个监听器是通过redis的发布、订阅功能实现的。如果不了解redis发布订阅的可以去百度查查。一旦监听器收到redis发来的消息,就从中获取与当前thread相关的,如果是锁被释放的消息,就立马通过操作Semaphore来让刚才阻塞的地方释放。
释放后,线程继续执行,仍旧是判断是否已经超时。如果还没超时,就进入下一次循环。
再次去获取锁,取到了就返回true,取不到,继续刚才的步骤。
所以,总体流程应该是这样的。
最后还有个知识点,就是可重入加锁机制:
假如客户端1已经持有这个锁了,再次加锁时会怎样
这时我们来分析一下上面那段lua脚本。
第一个if判断肯定不成立,“exists myLock”会显示锁key已经存在了。
第二个if判断会成立,因为myLock的hash数据结构中包含的那个ID,就是客户端1的那个ID,也就是“8743c9c0-0795-4907-87fd-6c719a6b4586:1”
此时就会执行可重入加锁的逻辑,他会用:
incrby myLock
8743c9c0-0795-4907-87fd-6c71a6b4586:1 1
通过这个命令,对客户端1的加锁次数,累加1。
此时myLock数据结构变为下面这样:
大家看到了吧,那个myLock的hash数据结构中的那个客户端ID,就对应着加锁的次数。相应的,释放锁时,如果执行lock.unlock(),就可以释放分布式锁,此时的业务逻辑也是非常简单的。
其实说白了,就是每次都对myLock数据结构中的那个加锁次数减1。
如果发现加锁次数是0了,说明这个客户端已经不再持有锁了,此时就会用:
“del myLock”命令,从redis里删除这个key。
然后呢,另外的客户端2就可以尝试完成加锁了。
这就是所谓的分布式锁的开源Redisson框架的实现机制。
一般我们在生产系统中,可以用Redisson框架提供的这个类库来基于redis进行分布式锁的加锁与释放锁。