首页 > 数据库 >Redisson分布式锁如何实现可重入

Redisson分布式锁如何实现可重入

时间:2024-12-09 11:58:30浏览次数:6  
标签:重入 Redisson return KEYS redis ARGV 线程 call 分布式

Redisson分布式锁如何实现可重入


本篇将从源码的角度去讲解Redisson分布式锁如何实现可重入的

我们都知道Redisson分布式锁比起我们自己用Redis实现的分布式锁有许多优点:

  • 可重入
  • 可重试
  • 超时续约

当我们使用Redisson去获取一个分布式锁的时候,大致的代码如下:

@Autowired
private RedissonClient redissonClient; // 注入Redisson客户端

public Result myService() {
    // 前置业务
    // ......
    //创建锁对象
    RLock lock = redissonClient.getLock("name");
    // 获取锁
    boolean isLock = lock.tryLock();
    if (!isLock){ // 没有成功获取锁
        // 返回错误
    }
    try {
        // 执行业务
    }finally {
        lock.unlock();// 释放锁
    }
}

可重入

在实现分布式锁时,我们通常会是利用 Redis 的 String 数据结构和 SETNX 命令,我们将锁存储为一个键(如 lock),值(value)为当前线程的唯一标识符:threadId,如果 SETNX 成功,就说明获取到了锁;否则,说明锁已被占用

Key:lock value: threadId

但是,这种方法有一个限制:不支持可重入
可重入锁的意思是,如果同一线程多次尝试获取同一把锁,应该是允许的,如在A方法中获取了锁,此时A方法调用了B方法,而在B方法中也需要获取锁,但是锁已经被A方法获取,由于是A调用了B,A方法并没有结束,也就无法释放锁,但是A,B方法同处于一个线程中,B尝试获取锁应该是被允许的

与我们自己实现分布式锁不同,Redisson采用的数据结构不是String而是Hash,获取锁时,不仅仅是将线程的唯一标识存入,而是多存入一个重入次数:count,可以来帮助我们记录在当前线程中获取过多少次锁,比对线程标识符,处于同一线程的其他方法也要获取锁时,重入次数就会+1,同理释放锁时,也会进行-1操作,如果-1后重入次数为0,才真正的释放锁,即删除key

Key:lock Value: count Field: threadId

tryLock()

查看Redisson中尝试获取锁的方法:tryLock()的源码,我们使用的是无参的方法,没有设定等待时间和过期时间:

public boolean tryLock() {
        return (Boolean)this.get(this.tryLockAsync());
}

无参的tryLock()方法又调用的无参的tryLockAsync()方法

public RFuture<Boolean> tryLockAsync() {
        return this.tryLockAsync(Thread.currentThread().getId());
}

而无参的tryLockAsync()方法又调用了带参数的tryLockAsync()方法,通过获取当前线程ID传入了线程唯一标识符:threadId

public RFuture<Boolean> tryLockAsync(long threadId) {
        return this.tryAcquireOnceAsync(-1L, -1L, (TimeUnit)null, threadId);
}

带参数的tryLockAsync()方法调用tryAcquireOnceAsync()方法,并传入四个参数:

  • waitTime:锁的最大等待时间,如果在这个时间内没有获得锁,操作会失败。

  • leaseTime:锁的最大持有时间,锁会在这个时间后自动过期,防止死锁。

  • unitleaseTime 时间的单位。

  • threadId:当前线程的唯一标识符,用于标识获取锁的线程。

private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
        if (leaseTime != -1L) { //调用tryLockInnerAsync()
            return this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
        } else {
            // 调用tryLockInnerAsync()
            RFuture<Boolean> ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
            ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
                if (e == null) {
                    if (ttlRemaining) {
                        this.scheduleExpirationRenewal(threadId);
                    }

                }
            });
            return ttlRemainingFuture;
        }
}

此时tryAcquireOnceAsync()方法会对传入的leaseTime进行判断,如果我们没有指定过期时间,传入的值为-1,表示使用默认过期时间,无论是否指定了过期时间,该方法都会调用tryLockInnerAsync()方法并传入全部参数,而真正实现可重入的关键逻辑也在该方法中:

<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        this.internalLockLeaseTime = unit.toMillis(leaseTime);
        return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, 
                "if (redis.call('exists', KEYS[1]) == 0) then " +
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                "end; " +
                "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                "end; " +
                "return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getName()), this.internalLockLeaseTime, this.getLockName(threadId));
}

该方法中,通过Lua脚本来执行获取锁的操作,通过执行 Lua 脚本来保证加锁操作的原子性,Lua 脚本的具体执行逻辑可以分为以下几步:

1. 检查锁是否存在

首先,Lua 脚本检查锁是否已经存在:

if (redis.call('exists', KEYS[1]) == 0) then
2. 锁不存在时,设置锁和重入次数

如果锁不存在,Lua 脚本执行以下操作:

redis.call('hincrby', KEYS[1], ARGV[2], 1); 
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
  • hincrby:将 threadId 对应的 count 增加 1,即初始化当前线程的重入次数。

  • pexpire:设置该锁的过期时间,防止因线程异常结束而导致锁无法自动释放。过期时间通过 ARGV[1] 传入。

这里的 ARGV[2]threadIdARGV[1] 是过期时间

3. 锁已存在,检查是否是当前线程重入

如果锁已经存在,脚本会检查锁是否被当前线程(即 threadId)持有:

if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
  • hexists:检查 Redis 哈希表中是否存在 threadId 对应的字段。如果该字段存在,说明当前线程已经持有锁,可以进行重入。
4.当前线程重入,增加重入计数

如果是当前线程重入,脚本执行以下操作:

redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
  • hincrby:再次增加当前线程的重入计数。
  • pexpire:更新锁的过期时间,防止锁过期。
5. 锁被其他线程持有时,返回失败

如果锁已经被其他线程持有,即 threadId 不匹配,Lua 脚本会返回一个值,表示加锁失败:

return redis.call('pttl', KEYS[1]);
  • pttl:获取锁的剩余过期时间。如果锁被其他线程持有,这个命令返回当前锁的过期时间,以便调用者知道锁何时会自动释放。
6. 返回结果
  • 如果加锁成功,返回值为 nil
  • 如果加锁失败,返回值为锁的剩余有效时间(即 pttl 返回的值),表示当前锁被其他线程持有。

unlock()

再查看释放锁unlock()方法的源码:

由于本篇只关注可重入的实现原理,所以不在赘述前面的调用过程,直接查看异步释放锁的方法源码:

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, 
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                "return nil;" +
            "end; " +
            "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
            "if (counter > 0) then " +
                "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                "return 0; " +
            "else " +
                "redis.call('del', KEYS[1]); " +
                "redis.call('publish', KEYS[2], ARGV[1]); " +
            "return 1; " +
            "end; " +
            "return nil;", Arrays.asList(this.getName(), this.getChannelName()), LockPubSub.UNLOCK_MESSAGE, this.internalLockLeaseTime, this.getLockName(threadId));
    }

unlockInnerAsync 方法是 Redisson 实现分布式可重入锁的解锁方法。它的主要作用是在释放锁时,减少锁的重入次数,并且在重入次数归零时,删除锁并进行相关的清理操作。整个操作是异步的,并且通过 Lua 脚本实现,以确保操作的原子性和一致性。

1. 检查当前线程是否持有锁
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
    return nil;
end;
  • hexists:检查当前线程的 threadId 是否存在于 Redis 锁的哈希表中。如果不存在,表示当前线程并未持有锁,方法返回 nil,表示无操作。
  • KEYS[1] 是锁的键(即 lock),ARGV[3] 是当前线程的唯一标识符 threadId
2. 减少重入次数
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
  • hincrby:减少当前线程的重入计数。-1 表示将当前线程的重入次数减 1。如果重入次数大于零,说明当前线程还需要持有锁,解锁操作还没有完成。
3. 如果重入次数大于零,更新过期时间
if (counter > 0) then
    redis.call('pexpire', KEYS[1], ARGV[2]);
    return 0;
  • pexpire:如果锁的重入次数大于零,表示该线程仍然需要继续持有锁,因此重设锁的过期时间,防止锁过期。
  • return 0:表示解锁操作完成,但锁没有被完全释放(因为重入次数没有归零)。
4. 如果重入次数归零,删除锁并清理
else
    redis.call('del', KEYS[1]);
    redis.call('publish', KEYS[2], ARGV[1]);
    return 1;
end;
  • del:如果重入次数减到零,表示锁已经完全释放,此时删除 Redis 中的锁。
  • publish:通过 Redis 发布消息,通知其他订阅该锁的客户端锁已经释放。
  • return 1:表示锁已经完全释放。
5. 默认返回 nil
return nil;

如果没有满足任何条件,方法最终会返回 nil,表示没有发生任何操作。

通过这种方式,Redisson 的分布式锁能够支持可重入性

标签:重入,Redisson,return,KEYS,redis,ARGV,线程,call,分布式
From: https://blog.csdn.net/Gaomengsuanjia_/article/details/144329991

相关文章

  • 分布式系统架构1:共识算法Paxos
    1.背景今天开始更新分布式的文章,工作几年后还没系统的学习分布式的内容,趁着还有时间学习沉淀的时候多输出些文章2.为什么需要分布式共识算法思考:现在你有一份随时变动的数据,需要确保它正确存储在网络的几台不同机器上,并且要保证数据是随时可用的,应该怎么做?在分布式环境下,可以......
  • 【笔记】VMware vCenter(VSCA)分布式交换机的使用
    先说咋回事,我有俩服务器,都有万兆电口,但是我交换机是千兆的,只有四个万兆光口而且我只有第一个服务器有光口,第二个只有俩万兆电口,我做虚拟机迁移的时候如果走交换机就把我业务口的带宽占完了,而且还是千兆所以想用vsca的分布式交换机实现俩服务器之间网络直连不走交换机1、首先在V......
  • xxl-job分布式任务调度
    XXL-JOB是一个轻量级分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用。GitHub:https://github.com/xuxueli/xxl-job码云:https://gitee.com/xuxueli0323/xxl-jobxxl-job:一个分布式任务调度平台,其......
  • EMQ分布式MQTT消息服务器部署指南
    1.EMQ简介EMQ(ErlangMQTTBroker)是基于Erlang/OTP平台开发的开源MQTT消息服务器,支持百万级连接和分布式集群。主要特点:完整支持MQTTv3.1/v3.1.1/v5.0协议支持WebSocket协议支持分布式集群支持SSL/TLS加密传输提供Web管理控制台2.EMQ默认端口说明EMQ默认开......
  • 吉林大学2024年并行与分布式计算期末考题
    因为课刚开三年,老师还是比较心慈手软.jpg大概回忆版,希望帮助到大家第一部分:8个简答1.列出三种非冯计算模型第一章ppt里量子计算那一块应该2.Flynn分类法SISD,SIMD,MISD,MIMD3.云计算中的三种存储模型对象存储,块存储,文件存储4.cache的一致性问题解释共享数据进入Cac......
  • 【亲测可用】Doris3.x分布式集群安装部署
    Doris作为⼀款开源的MPP架构的OLAP数据库,能够运⾏在绝⼤多数主流的商⽤服务器上。为了能够充分运⽤MPP架构的并发优势,以及Doris的⾼可⽤特性,我们建议Doris分布式集群的部署遵循以下要求。2.1软硬件环境检查2.1.1硬件检查1.CPU当安装Doris时,建议选择配备支持......
  • HarmonyOS Next 分布式加密协作应用案例剖析
    本文旨在深入探讨华为鸿蒙HarmonyOSNext系统(截止目前API12)在分布式加密协作应用中的应用,基于实际开发实践进行总结。主要作为技术分享与交流载体,难免错漏,欢迎各位同仁提出宝贵意见和问题,以便共同进步。本文为原创内容,任何形式的转载必须注明出处及原作者。第一章:应用场景与架......
  • 13.SpringCloudSeata处理分布式事务
    分布式事务(引入)面试题你简历上写用微服务boot/cloud做过项目,你不可能只有一个数据库吧?请你谈谈多个数据库之间,你如何处理分布式事务?举例:在订单支付成功后,交易中心会调用订单中心的服务把订单状态更新,并调用物流中心的服务通知商品发货,同时还要调用积分中心的服务为用户增加相......
  • 电商项目--分布式文件存储FastDFS搭建
    一、FastDFS环境搭建我们使用Docker搭建FastDFS的开发环境(1)拉取镜像dockerpullmorunchang/fastdfs (2)运行trackerdockerrun-d--nametracker--net=hostmorunchang/fastdfsshtracker.sh(3)运行storagedockerrun-d--namestorage--net=host-eTRACKER_......
  • 基于vpk180边缘场景下分布式神经网络训练模型部署
    本项目目标在于针对边缘场景实现P2P的分布式训练过程,设计方案为将神经网络训练过程对应的裸机程序部署在了PS端的ARMCortex-A72核上,传输方案采用开发板板载的GTM收发器硬件资源通过外部QSFP-DD光模块光传输至对端,最终完成了设计目标。整个项目的实现细节可分为如下几个重......