首页 > 数据库 >Redisson的看门狗机制底层实现

Redisson的看门狗机制底层实现

时间:2024-05-06 16:44:37浏览次数:24  
标签:Redisson 看门狗 线程 ExpirationEntry threadId null 延迟 底层

1. 看门狗机制概述
看门狗机制是Redission提供的一种自动延期机制,这个机制使得Redission提供的分布式锁是可以自动续期的。

private long lockWatchdogTimeout = 30 * 1000;
1
看门狗机制提供的默认超时时间是30*1000毫秒,也就是30秒

如果一个线程获取锁后,运行程序到释放锁所花费的时间大于锁自动释放时间(也就是看门狗机制提供的超时时间30s),那么Redission会自动给redis中的目标锁延长超时时间。

在Redission中想要启动看门狗机制,那么我们就不用获取锁的时候自己定义leaseTime(锁自动释放时间)。

如果自己定义了锁自动释放时间的话,无论是通过lock还是tryLock方法,都无法启用看门狗机制。

但是,如果传入的leaseTime为-1,也是会开启看门狗机制的。

分布式锁是不能设置永不过期的,这是为了避免在分布式的情况下,一个节点获取锁之后宕机从而出现死锁的情况,所以需要个分布式锁设置一个过期时间。但是这样会导致一个线程拿到锁后,在锁的过期时间到达的时候程序还没运行完,导致锁超时释放了,那么其他线程就能获取锁进来,从而出现问题。

所以,看门狗机制的自动续期,就很好地解决了这一个问题。

看门狗机制的相关代码主要在tryAcquire方法上,在这个方法里主要看到方法是tryAcquireAsync(waitTime, leaseTime, unit, threadId)

private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}
1
2
3
由于在tryLock方法中没传leaseTime,所以leaseTime为默认值-1

调用tryLockInnerAsync,如果获取锁失败,返回的结果是这个key的剩余有效期,如果获取锁成功,则返回null。

获取锁成功后,如果检测不存在异常并且获取锁成功`(ttlRemaining == null)。

那么则执行this.scheduleExpirationRenewal(threadId);来启动看门狗机制。

private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
if (leaseTime != -1L) {
return this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
//如果获取锁失败,返回的结果是这个key的剩余有效期
RFuture<Long> ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
//上面获取锁回调成功之后,执行这代码块的内容
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
//不存在异常
if (e == null) {
//剩余有效期为null
if (ttlRemaining == null) {
//这个函数是解决最长等待有效期的问题
this.scheduleExpirationRenewal(threadId);
}

}
});
return ttlRemainingFuture;
}
}
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);

return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
// 锁不存在,则往redis中设置锁信息
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
// 锁存在
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
一个锁就对应自己的一个ExpirationEntry类,

EXPIRATION_RENEWAL_MAP存放的是所有的所信息。

根据锁的名称从EXPIRATION_RENEWAL_MAP里面获取锁,如果存在这把锁则冲入,如果不存在,则将这个新锁放置进EXPIRATION_RENEWAL_MAP,并且开启看门狗机制。

private static final ConcurrentMap<String, ExpirationEntry> EXPIRATION_RENEWAL_MAP = new ConcurrentHashMap<>();
private void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
//这里EntryName是指锁的名称
ExpirationEntry oldEntry = (ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry);
if (oldEntry != null) {
//重入
//将线程ID加入
oldEntry.addThreadId(threadId);
} else {
//将线程ID加入
entry.addThreadId(threadId);
//续约
this.renewExpiration();
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
首先,从EXPIRATION_RENEWAL_MAP中获取这个锁,接下来定义一个延迟任务task,这个任务的步骤如下

新创建了一个子线程去反复调用
从EXPIRATION_RENEWAL_MAP中获取这把锁,如果这把锁不存在了,说明被删除了,不在需要续期了。
从锁中获取获得这把锁的线程IDthreadId
调用renewExpirationAsync方法刷新最长等待时间
如果刷新成功,则进来递归调用这个函数renewExpiration()
这个任务task设置为 this.internalLockLeaseTime / 3L,也是锁自动释放时间,因为没传,也就是10s。

也就是说,这个延迟任务延迟十秒执行一次。

最后,为这把锁ee设置延迟任务task即可

private void renewExpiration() {
//先从map里得到这个ExpirationEntry
ExpirationEntry ee = (ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
if (ee != null) {
//这个是一个延迟任务
Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
//延迟任务内容
public void run(Timeout timeout) throws Exception {
//拿出ExpirationEntry
ExpirationEntry ent = (ExpirationEntry)RedissonLock.EXPIRATION_RENEWAL_MAP.get(RedissonLock.this.getEntryName());
if (ent != null) {
//从ExpirationEntry拿出线程ID
Long threadId = ent.getFirstThreadId();
if (threadId != null) {
//调用renewExpirationAsync方法刷新最长等待时间
RFuture<Boolean> future = RedissonLock.this.renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", e);
} else {
if (res) {
//renewExpirationAsync方法执行成功之后,进行递归调用,调用自己本身函数
//那么就可以实现这样的效果
//首先第一次进行这个函数,设置了一个延迟任务,在10s后执行
//10s后,执行延迟任务的内容,刷新有效期成功,那么就会再新建一个延迟任务,刷新最长等待有效期
//这样这个最长等待时间就会一直续费
RedissonLock.this.renewExpiration();
}

}
});
}
}
}
},
//这是锁自动释放时间,因为没传,所以是看门狗时间=30*1000
//也就是10s
this.internalLockLeaseTime / 3L,
//时间单位
TimeUnit.MILLISECONDS);
//给当前ExpirationEntry设置延迟任务
ee.setTimeout(task);
}
}

 

// 刷新等待时间
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.singletonList(getName()),
internalLockLeaseTime, getLockName(threadId));
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
最后,在释放锁的时候,就会关闭所有的延迟任务,核心代码如下

public RFuture<Void> unlockAsync(long threadId) {
RPromise<Void> result = new RedissonPromise();
RFuture<Boolean> future = this.unlockInnerAsync(threadId);
future.onComplete((opStatus, e) -> {
//取消锁更新任务
this.cancelExpirationRenewal(threadId);
if (e != null) {
result.tryFailure(e);
} else if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + this.id + " thread-id: " + threadId);
result.tryFailure(cause);
} else {
result.trySuccess((Object)null);
}
});
return result;
}

void cancelExpirationRenewal(Long threadId) {
//获得当前这把锁的任务
ExpirationEntry task = (ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
if (task != null) {
//当前锁的延迟任务不为空,且线程id不为空
if (threadId != null) {
//先把线程ID去掉
task.removeThreadId(threadId);
}

if (threadId == null || task.hasNoThreads()) {
//然后取出延迟任务
Timeout timeout = task.getTimeout();
if (timeout != null) {
//把延迟任务取消掉
timeout.cancel();
}
//再把ExpirationEntry移除出map
EXPIRATION_RENEWAL_MAP.remove(this.getEntryName());
}

}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
3. 总结
在使用Redis实现分布式锁的时候,会存在很多问题。

比如说业务逻辑处理时间>自己设置的锁自动释放时间的话,Redis就会按超时情况把锁释放掉,而其他线程就会趁虚而入抢夺锁从而出现问题,因此需要有一个续期的操作。

并且,如果释放锁的操作在finally完成,需要判断一下当前锁是否是属于自己的锁,防止释放掉其他线程的锁,这样释放锁的操作就不是原子性了,而这个问题很好解决,使用lua脚本即可。

Redisson的出现,其中的看门狗机制很好解决续期的问题,它的主要步骤如下:

在获取锁的时候,不能指定leaseTime或者只能将leaseTime设置为-1,这样才能开启看门狗机制。
在tryLockInnerAsync方法里尝试获取锁,如果获取锁成功调用scheduleExpirationRenewal执行看门狗机制
在scheduleExpirationRenewal中比较重要的方法就是renewExpiration,当线程第一次获取到锁(也就是不是重入的情况),那么就会调用renewExpiration方法开启看门狗机制。
在renewExpiration会为当前锁添加一个延迟任务task,这个延迟任务会在10s后执行,执行的任务就是将锁的有效期刷新为30s(这是看门狗机制的默认锁释放时间)
并且在任务最后还会继续递归调用renewExpiration。
也就是总的流程就是,首先获取到锁(这个锁30s后自动释放),然后对锁设置一个延迟任务(10s后执行),延迟任务给锁的释放时间刷新为30s,并且还为锁再设置一个相同的延迟任务(10s后执行),这样就达到了如果一直不释放锁(程序没有执行完)的话,看门狗机制会每10s将锁的自动释放时间刷新为30s。

而当程序出现异常,那么看门狗机制就不会继续递归调用renewExpiration,这样锁会在30s后自动释放。

或者,在程序主动释放锁后,流程如下:

将锁对应的线程ID移除
接着从锁中获取出延迟任务,将延迟任务取消
在将这把锁从EXPIRATION_RENEWAL_MAP中移除。

标签:Redisson,看门狗,线程,ExpirationEntry,threadId,null,延迟,底层
From: https://www.cnblogs.com/wjzohou/p/18175331

相关文章

  • MyBatis学习总结 + 【手写MyBatis底层机制核心】
    MyBatis笔记MyBatis介绍MyBatis是一个持久层框架前身是ibatis,在ibatis3.x时,更名为MyBatisMyBatis在java和sql之间提供更灵活的映射方案mybatis可以将对数据表的操作(sql,方法)等等直接剥离,写到xml配置文件,实现和java代码的解耦mybatis通过SQL操作DB,建库建表......
  • 搞IT的为什么不建议搞底层(编译器、编程语言、)——当你搬进你的新家之后,你会在意这个楼
    文字表达引自:https://www.youtube.com/watch?v=KITqGv1qYg8当你搬进你的新家之后,你会在意这个楼是谁打的地基吗?你猜猜那些打地基的工人赚多少钱,卖你沙发电视机微波炉的人赚多少钱,当你在你温馨的小家里舒适的生活的时候,你会想这地基打的真好吗,只有一种情况下你会想到地基的事,就......
  • redisson分布式锁原理
    参考:图灵课堂 https://github.com/redisson/redisson/wiki/%E7%9B%AE%E5%BD%95https://blog.csdn.net/asd051377305/article/details/108384490分布式锁的引入当在单机单线程情况下,是不用考虑任何并发问题的,一切都是那么的美好,那么的顺其自然。在单机多线程情况下,就要考虑......
  • 底层架构
    一:角色分类(1)Coordinator协调器:协调器服务监视数据服务器上的历史服务。他们负责将区段分配给特定服务器,并确保区段在历史服务器之间保持良好的平衡。(2)Overlord:控制数据摄入任务的分配,Overlord服务监视数据服务器上的MiddleManager服务,并且是将数据摄取到Druid。他们负......
  • 蚂蚁面试:Springcloud核心组件的底层原理,你知道多少?
    文章很长,且持续更新,建议收藏起来,慢慢读!疯狂创客圈总目录博客园版为您奉上珍贵的学习资源:免费赠送:《尼恩Java面试宝典》持续更新+史上最全+面试必备2000页+面试必备+大厂必备+涨薪必备免费赠送:《尼恩技术圣经+高并发系列PDF》,帮你实现技术自由,完成职业升级,薪......
  • Mysql锁机制与优化实践以及MVCC底层原理剖析
    学习来源-图灵课堂https://vip.tulingxueyuan.cn锁学习参考:https://juejin.cn/post/7307889500545253395  锁机制为了保证数据的一致性,当访问共享变量的时候我们可以针对共享数据加锁,但是加锁要时要注意加锁的成本,还有加锁的粒度,还有就是是否会发生死锁,还有就是发生了死锁......
  • 大厂50万节点监控系统架构设计&Prometheus底层源码级剖析
    大厂50万节点监控系统架构设计&Prometheus底层源码级剖析 设计和实现一个大规模监控系统需要深入考虑架构设计、可伸缩性、性能优化等方面。下面是一个关于大规模监控系统架构设计的简要指南,以及有关Prometheus底层源码的剖析:大规模监控系统架构设计:1.架构设计原......
  • python将图片添加到视频底层中(提高处理单个视频的效率)
    代码: importcv2importnumpyasnpimportosimportrandomfromconcurrent.futuresimportThreadPoolExecutor#图片文件夹路径image_folder_path=r'F:\jingguan\tu'#视频文件所在的文件夹路径video_folder_path=r'F:\jingguan\yuan'#输出视频文件夹路径ou......
  • python将图片添加到视频底层中
    代码:importcv2importnumpyasnpimportosimportrandom#图片文件夹路径image_folder_path='path_to_your_images_folder'#视频文件所在的文件夹路径video_folder_path='path_to_your_videos_folder'#输出视频文件夹路径(如果不存在则创建)output_folder_pat......
  • MySQL(1)-索引底层为什么用B+树
    最近在看面经,发现有很多跟B+树相关的问题,为此需要单独总结一下让自己形成一个体系。核心内容是为什么MySQL采用B+树作为索引?|小林coding所以可以直接看小林code的讲解,很到位。进入正题前,首先要对B树、B+树、二分查找树、自平衡二叉树、索引这些概念了初步解再分析具体问题......