首页 > 数据库 >Redission源码浅析一:RedissonLock.lock与RedissonLock.tryLock

Redission源码浅析一:RedissonLock.lock与RedissonLock.tryLock

时间:2024-08-25 23:25:09浏览次数:11  
标签:long 获取 源码 RedissonLock threadId 续期 null 浅析 unit

RedissonLock.lock

入口

在这里插入图片描述

流程图

在这里插入图片描述

源码分析 redisssionLock.lock(long leaseTime, TimeUnit unit, boolean interruptibly)

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
    // 获取当前线程的ID
    long threadId = Thread.currentThread().getId();
    
    // 尝试获取锁,如果成功则返回 null,否则返回锁的剩余有效期(TTL,毫秒)
    Long ttl = this.tryAcquire(-1L, leaseTime, unit, threadId);
    
    // 如果锁没有被成功获取
    if (ttl != null) {
        // 订阅锁状态的更新(例如锁的释放)
        RFuture<RedissonLockEntry> future = this.subscribe(threadId);
        
        // 根据是否可中断执行同步订阅操作
        if (interruptibly) {
            this.commandExecutor.syncSubscriptionInterrupted(future);
        } else {
            this.commandExecutor.syncSubscription(future);
        }

        try {
            // 无限循环尝试获取锁
            while (true) {
                // 再次尝试获取锁,如果成功则返回 null
                ttl = this.tryAcquire(-1L, leaseTime, unit, threadId);
                
                // 锁已成功获取,直接返回
                if (ttl == null) {
                    return;
                }

                // 如果锁的 TTL(有效期)大于等于 0,表示当前锁仍然存在并且有剩余时间
                if (ttl >= 0L) {
                    try {
                        // 尝试在指定的 TTL 时间内获取信号量(表示锁释放信号)
                        ((RedissonLockEntry)future.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException var13) {
                        InterruptedException e = var13;
                        // 如果是可中断的模式,抛出中断异常
                        if (interruptibly) {
                            throw e;
                        }

                        // 否则继续尝试在指定的 TTL 时间内获取信号量
                        ((RedissonLockEntry)future.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    }
                } else if (interruptibly) {
                    // 如果 TTL 小于 0 且模式为可中断,则阻塞直到获取到信号量或被中断
                    ((RedissonLockEntry)future.getNow()).getLatch().acquire();
                } else {
                    // 如果模式为不可中断,则阻塞直到获取到信号量
                    ((RedissonLockEntry)future.getNow()).getLatch().acquireUninterruptibly();
                }
            }
        } finally {
            // 解除订阅(取消对锁状态的监听)
            this.unsubscribe(future, threadId);
        }
    }
}

redisssionLock.tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId)

在这里插入图片描述
redissionLock.tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId)
在这里插入图片描述
这里一共做了两件事:

  1. tryLockInnerAsync:执行加锁LUA脚本,返回null说明加锁成功,反之失败,如果传入leaseTime不为-1,就用传入的,不然就使用默认的internalLockLeaseTime
    lu在这里插入图片描述
    lua脚本:
if (redis.call('exists', KEYS[1]) == 0) then
    -- 如果键不存在,则将哈希字段计数器增加1,并设置过期时间
    redis.call('hincrby', KEYS[1], ARGV[2], 1);
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return nil;
end;

if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
    -- 如果哈希字段存在,则将哈希字段计数器增加1,并设置过期时间
    redis.call('hincrby', KEYS[1], ARGV[2], 1);
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return nil;
end;

-- 返回键的剩余生存时间
return redis.call('pttl', KEYS[1]);

RedissionBaseLock.evalWriteAsync:
在这里插入图片描述
CommandBatchService.evalWriteAsync:
在这里插入图片描述
CommandAsyncService.getNodeSource(key)
在这里插入图片描述
ClusterConnectionManager.calcSlot(key)
在这里插入图片描述
总结一下:
-1. 选择slot槽
-2. 执行lua脚本,检查锁的key是否存在,如果存在,判断是否是同一线程再次过来对同一个key进行加锁,也就是当前key是否被当前线程持有(可重入性),如果上述两个条件任意一个成立,则对当前key执行自增和设置过期时间操作,并返回null表示加锁成功。反之,返回当前锁的过期时间,表示加锁失败

  1. scheduleExpirationRenewal:如果加锁成功(null),且设置了过期时间,将设置过期时间赋值给internallockLeaseTime,如果没设置,则执行scheduleExpirationRenewal方法(看门狗)
    在这里插入图片描述
    RedissionBaseLock.scheduleExpirationRenewal:
    在这里插入图片描述
    EXPIRATION_RENEWAL_MAP存放续期任务,get有值说明当前锁需要续期,为null则不需要再续期了。接下来,就是renewExpiration执行续期任务

RedissionBaseLock.renewExpiration:
在这里插入图片描述
-1: 首先会从EXPIRATION_RENEWAL_MAP中获取一个值,如果为null,就不续期了,说明这个锁可能已经被释放或过期了
-2: 使用netty的时间轮来完成一个定时任务,设置internalLockLeaseTime / 3的时长进行一次锁续期,也就是每10s进行一次续期,

  • 这里也会从EXPIRATION_RENEWAL_MAP里获取一个值,检查锁是否被释放或者过期了
  • 如果不为null,则获取第一个等待该锁的线程,如果没有等待也就说明此时没有竞争,也同样不需要续期了
  • 如果有等待的线程,说明需要续期,它会异步调用renewExpirationAsync(threadId)方法来实现续期。
  • 当异步续期操作完成,会调用whenComplete方法来处理结果,如果有异常,则将该锁从EXPIRATION_RENEWAL_MAP中移除。如果续期成功,则会重新调用renewExpiration()方法进行下一次续期,如果续期失败,则调用cancelExpirationRenewal()方法取消续期。

RedissonBaseLock.renewExpirationAsync:
在这里插入图片描述
如果当前key存在,说明当前锁还被该线程持有,那么就重置过期时间为30s,并返回true,表示续期成功,反之返回false

RedissonBaseLock.cancelExpirationRenewal:
在这里插入图片描述

  • 还是从这个map里获取键值对,如果为null,说明续期任务不存在,也没必要进行下去了,直接返回。
  • 如果threadId不为null,直接将这个续期任务从task里移除。
  • 如果threadId为null或者task中不再有任何线程在等待续期,此时就调用cancel方法来取消定时任务,然后在从EXPIRATION_RENEWAL_MAP中移除该续期任务。

总结

1. 什么时候会进行锁续期

加锁时,如果没有指定过期时间,则默认过期时间为30s且每隔10s进行锁续期操作

2. 什么情况会停止续期
  • 锁被释放。
  • 没有其它线程竞争当前锁资源。
  • 续期时发生异常。
  • 执行锁续期LUA脚本失败。
  • Redission的续期时Netty时间轮(TimerTask、TimeOut、Timer)的,并且操作都是基于JVM,所以当应用宕机、下线或重启后,续期任务也没有了。

RedissonLock.tryLock

入口在这里插入图片描述

在这里插入图片描述
源码分析 RedissonLock.tryLock(long waitTime, long leaseTime, TimeUnit unit)

 @Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
    // 将等待时间转换为毫秒
    long time = unit.toMillis(waitTime);
    long current = System.currentTimeMillis();
    // 获取当前线程的ID
    long threadId = Thread.currentThread().getId();
    
    // 尝试获取锁,如果成功获取锁则返回 null,否则返回锁的剩余有效期(TTL,毫秒)
    Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
    
    // 如果锁成功获取
    if (ttl == null) {
        return true;
    }

    // 更新剩余的等待时间
    time -= System.currentTimeMillis() - current;
    if (time <= 0) {
        // 获取锁失败处理
        acquireFailed(waitTime, unit, threadId);
        return false;
    }

    // 记录当前时间
    current = System.currentTimeMillis();
    // 订阅锁的释放消息,以便在锁被其他线程释放时通知当前线程
    RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
    
    // 等待订阅的完成或超时
    if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
        // 如果订阅超时,尝试取消订阅
        if (!subscribeFuture.cancel(false)) {
            subscribeFuture.onComplete((res, e) -> {
                if (e == null) {
                    unsubscribe(subscribeFuture, threadId);
                }
            });
        }
        // 获取锁失败处理
        acquireFailed(waitTime, unit, threadId);
        return false;
    }

    try {
        // 更新剩余的等待时间
        time -= System.currentTimeMillis() - current;
        if (time <= 0) {
            // 获取锁失败处理
            acquireFailed(waitTime, unit, threadId);
            return false;
        }

        // 循环尝试获取锁,直到成功或超时
        while (true) {
            long currentTime = System.currentTimeMillis();
            // 再次尝试获取锁
            ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
            // 如果锁成功获取
            if (ttl == null) {
                return true;
            }

            // 更新剩余的等待时间
            time -= System.currentTimeMillis() - currentTime;
            if (time <= 0) {
                // 获取锁失败处理
                acquireFailed(waitTime, unit, threadId);
                return false;
            }

            // 等待释放消息
            currentTime = System.currentTimeMillis();
            if (ttl >= 0 && ttl < time) {
                subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
            } else {
                subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
            }

            // 更新剩余的等待时间
            time -= System.currentTimeMillis() - currentTime;
            if (time <= 0) {
                // 获取锁失败处理
                acquireFailed(waitTime, unit, threadId);
                return false;
            }
        }
    } finally {
        // 最后,取消订阅锁的释放消息
        unsubscribe(subscribeFuture, threadId);
    }
}

这个方法 tryLock 实现了一个带有超时功能的分布式锁获取逻辑。它会在指定的等待时间内反复尝试获取锁,并在锁释放后通过信号量机制及时响应。

标签:long,获取,源码,RedissonLock,threadId,续期,null,浅析,unit
From: https://blog.csdn.net/LittleStar_Cao/article/details/141533868

相关文章