RedissonLock.lock
入口
流程图
源码分析 redisssionLock.lock(long leaseTime, TimeUnit unit, boolean interruptibly)
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
// 获取当前线程的ID
long threadId = Thread.currentThread().getId();
// 尝试获取锁,如果成功则返回 null,否则返回锁的剩余有效期(TTL,毫秒)
Long ttl = this.tryAcquire(-1L, leaseTime, unit, threadId);
// 如果锁没有被成功获取
if (ttl != null) {
// 订阅锁状态的更新(例如锁的释放)
RFuture<RedissonLockEntry> future = this.subscribe(threadId);
// 根据是否可中断执行同步订阅操作
if (interruptibly) {
this.commandExecutor.syncSubscriptionInterrupted(future);
} else {
this.commandExecutor.syncSubscription(future);
}
try {
// 无限循环尝试获取锁
while (true) {
// 再次尝试获取锁,如果成功则返回 null
ttl = this.tryAcquire(-1L, leaseTime, unit, threadId);
// 锁已成功获取,直接返回
if (ttl == null) {
return;
}
// 如果锁的 TTL(有效期)大于等于 0,表示当前锁仍然存在并且有剩余时间
if (ttl >= 0L) {
try {
// 尝试在指定的 TTL 时间内获取信号量(表示锁释放信号)
((RedissonLockEntry)future.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException var13) {
InterruptedException e = var13;
// 如果是可中断的模式,抛出中断异常
if (interruptibly) {
throw e;
}
// 否则继续尝试在指定的 TTL 时间内获取信号量
((RedissonLockEntry)future.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else if (interruptibly) {
// 如果 TTL 小于 0 且模式为可中断,则阻塞直到获取到信号量或被中断
((RedissonLockEntry)future.getNow()).getLatch().acquire();
} else {
// 如果模式为不可中断,则阻塞直到获取到信号量
((RedissonLockEntry)future.getNow()).getLatch().acquireUninterruptibly();
}
}
} finally {
// 解除订阅(取消对锁状态的监听)
this.unsubscribe(future, threadId);
}
}
}
redisssionLock.tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId)
redissionLock.tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId)
这里一共做了两件事:
- tryLockInnerAsync:执行加锁LUA脚本,返回null说明加锁成功,反之失败,如果传入leaseTime不为-1,就用传入的,不然就使用默认的internalLockLeaseTime
lu
lua脚本:
if (redis.call('exists', KEYS[1]) == 0) then
-- 如果键不存在,则将哈希字段计数器增加1,并设置过期时间
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
-- 如果哈希字段存在,则将哈希字段计数器增加1,并设置过期时间
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
-- 返回键的剩余生存时间
return redis.call('pttl', KEYS[1]);
RedissionBaseLock.evalWriteAsync:
CommandBatchService.evalWriteAsync:
CommandAsyncService.getNodeSource(key)
ClusterConnectionManager.calcSlot(key)
总结一下:
-1. 选择slot槽
-2. 执行lua脚本,检查锁的key是否存在,如果存在,判断是否是同一线程再次过来对同一个key进行加锁,也就是当前key是否被当前线程持有(可重入性),如果上述两个条件任意一个成立,则对当前key执行自增和设置过期时间操作,并返回null表示加锁成功。反之,返回当前锁的过期时间,表示加锁失败
- scheduleExpirationRenewal:如果加锁成功(null),且设置了过期时间,将设置过期时间赋值给internallockLeaseTime,如果没设置,则执行scheduleExpirationRenewal方法(看门狗)
RedissionBaseLock.scheduleExpirationRenewal:
EXPIRATION_RENEWAL_MAP存放续期任务,get有值说明当前锁需要续期,为null则不需要再续期了。接下来,就是renewExpiration执行续期任务
RedissionBaseLock.renewExpiration:
-1: 首先会从EXPIRATION_RENEWAL_MAP中获取一个值,如果为null,就不续期了,说明这个锁可能已经被释放或过期了
-2: 使用netty的时间轮来完成一个定时任务,设置internalLockLeaseTime / 3的时长进行一次锁续期,也就是每10s进行一次续期,
- 这里也会从EXPIRATION_RENEWAL_MAP里获取一个值,检查锁是否被释放或者过期了
- 如果不为null,则获取第一个等待该锁的线程,如果没有等待也就说明此时没有竞争,也同样不需要续期了
- 如果有等待的线程,说明需要续期,它会异步调用renewExpirationAsync(threadId)方法来实现续期。
- 当异步续期操作完成,会调用whenComplete方法来处理结果,如果有异常,则将该锁从EXPIRATION_RENEWAL_MAP中移除。如果续期成功,则会重新调用renewExpiration()方法进行下一次续期,如果续期失败,则调用cancelExpirationRenewal()方法取消续期。
RedissonBaseLock.renewExpirationAsync:
如果当前key存在,说明当前锁还被该线程持有,那么就重置过期时间为30s,并返回true,表示续期成功,反之返回false
RedissonBaseLock.cancelExpirationRenewal:
- 还是从这个map里获取键值对,如果为null,说明续期任务不存在,也没必要进行下去了,直接返回。
- 如果threadId不为null,直接将这个续期任务从task里移除。
- 如果threadId为null或者task中不再有任何线程在等待续期,此时就调用cancel方法来取消定时任务,然后在从EXPIRATION_RENEWAL_MAP中移除该续期任务。
总结
1. 什么时候会进行锁续期
加锁时,如果没有指定过期时间,则默认过期时间为30s且每隔10s进行锁续期操作
2. 什么情况会停止续期
- 锁被释放。
- 没有其它线程竞争当前锁资源。
- 续期时发生异常。
- 执行锁续期LUA脚本失败。
- Redission的续期时Netty时间轮(TimerTask、TimeOut、Timer)的,并且操作都是基于JVM,所以当应用宕机、下线或重启后,续期任务也没有了。
RedissonLock.tryLock
入口
源码分析 RedissonLock.tryLock(long waitTime, long leaseTime, TimeUnit unit)
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
// 将等待时间转换为毫秒
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
// 获取当前线程的ID
long threadId = Thread.currentThread().getId();
// 尝试获取锁,如果成功获取锁则返回 null,否则返回锁的剩余有效期(TTL,毫秒)
Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// 如果锁成功获取
if (ttl == null) {
return true;
}
// 更新剩余的等待时间
time -= System.currentTimeMillis() - current;
if (time <= 0) {
// 获取锁失败处理
acquireFailed(waitTime, unit, threadId);
return false;
}
// 记录当前时间
current = System.currentTimeMillis();
// 订阅锁的释放消息,以便在锁被其他线程释放时通知当前线程
RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
// 等待订阅的完成或超时
if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
// 如果订阅超时,尝试取消订阅
if (!subscribeFuture.cancel(false)) {
subscribeFuture.onComplete((res, e) -> {
if (e == null) {
unsubscribe(subscribeFuture, threadId);
}
});
}
// 获取锁失败处理
acquireFailed(waitTime, unit, threadId);
return false;
}
try {
// 更新剩余的等待时间
time -= System.currentTimeMillis() - current;
if (time <= 0) {
// 获取锁失败处理
acquireFailed(waitTime, unit, threadId);
return false;
}
// 循环尝试获取锁,直到成功或超时
while (true) {
long currentTime = System.currentTimeMillis();
// 再次尝试获取锁
ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// 如果锁成功获取
if (ttl == null) {
return true;
}
// 更新剩余的等待时间
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
// 获取锁失败处理
acquireFailed(waitTime, unit, threadId);
return false;
}
// 等待释放消息
currentTime = System.currentTimeMillis();
if (ttl >= 0 && ttl < time) {
subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
// 更新剩余的等待时间
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
// 获取锁失败处理
acquireFailed(waitTime, unit, threadId);
return false;
}
}
} finally {
// 最后,取消订阅锁的释放消息
unsubscribe(subscribeFuture, threadId);
}
}
这个方法 tryLock 实现了一个带有超时功能的分布式锁获取逻辑。它会在指定的等待时间内反复尝试获取锁,并在锁释放后通过信号量机制及时响应。
标签:long,获取,源码,RedissonLock,threadId,续期,null,浅析,unit From: https://blog.csdn.net/LittleStar_Cao/article/details/141533868