首页 > 数据库 >Redis实战(黑马点评--分布式锁)

Redis实战(黑马点评--分布式锁)

时间:2023-07-06 21:56:16浏览次数:43  
标签:return -- Redis redis 线程 threadId call 分布式

基本原理和不同的实现方式

分布式锁的核心思想就是让大家共用同一把锁,那么我们就能锁住线程,不让线程进行,让程序串行执行,这就是分布式锁的核心思路

 

  1. 可见性:多个线程都能看到相同的结果。

    注意:这里说的可见性并不是并发编程中指的内存可见性,只是说多个进程之间都能感知到变化的意思

  2. 互斥:互斥是分布式锁的最基本条件,使得程序串行执行
  3. 高可用:程序不易崩溃,时时刻刻都保证较高的可用性
  4. 高性能:由于加锁本身就让性能降低,所以对于分布式锁需要他较高的加锁性能和释放锁性能
  5. 安全性:安全也是程序中必不可少的一环
  • 常见的分布式锁有三种

    1. MySQL:MySQL本身就带有锁机制,但是由于MySQL的性能一般,所以采用分布式锁的情况下,使用MySQL作为分布式锁比较少见
    2. Redis:Redis作为分布式锁是非常常见的一种使用方式,现在企业级开发中基本都是用Redis或者Zookeeper作为分布式锁,利用SETNX这个方法,如果插入Key成功,则表示获得到了锁,如果有人插入成功,那么其他人就回插入失败,无法获取到锁,利用这套逻辑完成互斥,从而实现分布式锁
    3. Zookeeper:Zookeeper也是企业级开发中较好的一种实现分布式锁的方案
MySQLRedisZooKeeper
互斥 利用mysql本身的互斥锁机制 利用setnx这样的互斥命令 利用节点的唯一性和有序性实现互斥
高可用
高性能 一般 一般
安全性 断开连接,自动释放锁 利用锁超时时间,到期释放 临时节点,断开连接自动释放

实现分布式锁

锁的接口

public interface ILock {
    /**
     * 尝试获取锁
     *
     * @param timeoutSec 锁持有的超时时间,过期自动释放
     * @return true表示获取锁成功,false表示获取锁失败
     */
    boolean tryLock(long timeoutSec);

    /**
     * 释放锁
     */
    void unlock();
}

然后创建一个SimpleRedisLock类实现接口

public class SimpleRedisLock implements ILock {
    //锁的前缀
    private static final String KEY_PREFIX = "lock:";
    //具体业务名称,将前缀和业务名拼接之后当做Key
    private String name;
    //这里不是@Autowired注入,采用的是构造器注入,在创建SimpleRedisLock时,将RedisTemplate作为参数传入
    private StringRedisTemplate stringRedisTemplate;

    public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
        this.name = name;
        this.stringRedisTemplate = stringRedisTemplate;
    }

    @Override
    public boolean tryLock(long timeoutSec) {
        //获取线程标识
        long threadId = Thread.currentThread().getId();
        //获取锁,使用SETNX方法进行加锁,同时设置过期时间,防止死锁
        Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId + "", timeoutSec, TimeUnit.SECONDS);
        //自动拆箱可能会出现null,这样写更稳妥
        return Boolean.TRUE.equals(success);
    }

    @Override
    public void unlock() {
        //通过DEL来删除锁
        stringRedisTemplate.delete(KEY_PREFIX + name);
    }
}

修改业务代码

@Override
public Result seckillVoucher(Long voucherId) {
    LambdaQueryWrapper<SeckillVoucher> queryWrapper = new LambdaQueryWrapper<>();
    //1. 查询优惠券
    queryWrapper.eq(SeckillVoucher::getVoucherId, voucherId);
    SeckillVoucher seckillVoucher = seckillVoucherService.getOne(queryWrapper);
    //2. 判断秒杀时间是否开始
    if (LocalDateTime.now().isBefore(seckillVoucher.getBeginTime())) {
        return Result.fail("秒杀还未开始,请耐心等待");
    }
    //3. 判断秒杀时间是否结束
    if (LocalDateTime.now().isAfter(seckillVoucher.getEndTime())) {
        return Result.fail("秒杀已经结束!");
    }
    //4. 判断库存是否充足
    if (seckillVoucher.getStock() < 1) {
        return Result.fail("优惠券已被抢光了哦,下次记得手速快点");
    }
    Long userId = UserHolder.getUser().getId();
    // 创建锁对象
    SimpleRedisLock redisLock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);
    // 获取锁对象
    boolean isLock = redisLock.tryLock(120);
    // 加锁失败,说明当前用户开了多个线程抢优惠券,但是由于key是SETNX的,所以不能创建key,得等key的TTL到期或释放锁(删除key)
    if (!isLock) {
        return Result.fail("不允许抢多张优惠券");
    }
    try {
        // 获取代理对象
        IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
        return proxy.createVoucherOrder(voucherId);
    } finally {
        // 释放锁
        redisLock.unlock();
    }
}

Redis分布式锁误删情况说明

  • 逻辑说明
    • 持有锁的线程1在锁的内部出现了阻塞,导致他的锁TTL到期,自动释放
    • 此时线程2也来尝试获取锁,由于线程1已经释放了锁,所以线程2可以拿到
    • 但是现在线程1阻塞完了,继续往下执行,要开始释放锁了
    • 那么此时就会将属于线程2的锁释放,这就是误删别人锁的情况
  • 解决方案
    • 解决方案就是在每个线程释放锁的时候,都判断一下这个锁是不是自己的,如果不属于自己,则不进行删除操作。
    • 假设还是上面的情况,线程1阻塞,锁自动释放,线程2进入到锁的内部执行逻辑,此时线程1阻塞完了,继续往下执行,开始删除锁,但是线程1发现这把锁不是自己的,所以不进行删除锁的逻辑,当线程2执行到删除锁的逻辑时,如果TTL还未到期,则判断当前这把锁是自己的,于是删除这把锁

 

  • 需求:修改之前的分布式锁实现
  • 满足:在获取锁的时候存入线程标识(用UUID标识,在一个JVM中,ThreadId一般不会重复,但是我们现在是集群模式,有多个JVM,多个JVM之间可能会出现ThreadId重复的情况),在释放锁的时候先获取锁的线程标识,判断是否与当前线程标识一致。UUID区分不同JVM,然后线程号区分线程
    • 如果一致则释放锁
    • 如果不一致则不释放锁
  • 核心逻辑:在存入锁的时候,放入自己的线程标识,在删除锁的时候,判断当前这把锁是不是自己存入的
    • 如果是,则进行删除
    • 如果不是,则不进行删除
  • 具体实现代码如下
private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";
@Override
public boolean tryLock(long timeoutSec) {
    // 获取线程标识
    String threadId = ID_PREFIX + Thread.currentThread().getId();
    // 获取锁
    Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
    return Boolean.TRUE.equals(success);
}

@Override
public void unlock() {
    // 获取当前线程的标识
    String threadId = ID_PREFIX + Thread.currentThread().getId();
    // 获取锁中的标识
    String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
    // 判断标识是否一致
    if (threadId.equals(id)) {
        // 释放锁
        stringRedisTemplate.delete(KEY_PREFIX + name);
    }
}

 

分布式锁的原子性问题

  • 更为极端的误删逻辑说明
  • 假设线程1已经获取了锁,在判断标识一致之后,准备释放锁的时候,又出现了阻塞(例如JVM垃圾回收机制)
  • 于是锁的TTL到期了,自动释放了
  • 那么现在线程2趁虚而入,拿到了一把锁
  • 但是线程1的逻辑还没执行完,那么线程1就会执行删除锁的逻辑
  • 但是在阻塞前线程1已经判断了标识一致,所以现在线程1把线程2的锁给删了
  • 那么就相当于判断标识那行代码没有起到作用
  • 这就是删锁时的原子性问题
  • 因为线程1的拿锁,判断标识,删锁,不是原子操作,所以我们要防止刚刚的情况

 

Lua脚本解决多条命令原子性问题

  • Redis提供了Lua脚本功能,在一个脚本中编写多条Redis命令,确保多条命令执行时的原子性。
  • Lua是一种编程语言,它的基本语法可以上菜鸟教程看看,链接:https://www.runoob.com/lua/lua-tutorial.html
  • 这里重点介绍Redis提供的调用函数,我们可以使用Lua去操作Redis,而且还能保证它的原子性,这样就可以实现拿锁判断标识删锁是一个原子性动作了
  • Redis提供的调用函数语法如下
redis.call('命令名称','key','其他参数', ...)

例如我们要执行set name Kyle,则脚本是这样

redis.call('set', 'name', 'Kyle')

例如我我们要执行set name David,在执行get name,则脚本如下

# 先执行set name David
redis.call('set', 'name', 'David')
# 再执行get name
local name = redis.call('get', 'name')
# 返回
return name

写好脚本以后,需要用Redis命令来调用脚本,调用脚本的常见命令如下

EVAL script numkeys key [key ...] arg [arg ...]

例如,我们要调用redis.call('set', 'name', 'Kyle') 0这个脚本,语法如下

EVAL "return redis.call('set', 'name', 'Kyle')" 0

如果脚本中的key和value不想写死,可以作为参数传递,key类型参数会放入KEYS数组,其他参数会放入ARGV数组,在脚本中可以从KEYS和ARGV数组中获取这些参数

注意:在Lua中,数组下标从1开始

EVAL "return redis.call('set', KEYS[1], ARGV[1])" 1 name Lucy

脚本1

-- 线程标识
local threadId = "UUID-31"
-- 锁的key
local key = "lock:order:userId"
-- 获取锁中线程标识
local id = redis.call('get', key)
-- 比较线程标识与锁的标识是否一致
if (threadId == id) then
    -- 一致则释放锁 del key
    return redis.call('del', key)
end
return 0

脚本2

-- 这里的KEYS[1]就是传入锁的key
-- 这里的ARGV[1]就是线程标识
-- 比较锁中的线程标识与线程标识是否一致
if (redis.call('get', KEYS[1]) == ARGV[1]) then
    -- 一致则释放锁
    return redis.call('del', KEYS[1])
end
return 0

利用Java代码调用Lua脚本改造分布式锁

在RedisTemplate中,可以利用execute方法去执行lua脚本

public <T> T execute(RedisScript<T> script, List<K> keys, Object... args) {
    return this.scriptExecutor.execute(script, keys, args);
}

对应的Java代码如下

private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;

static {
    UNLOCK_SCRIPT = new DefaultRedisScript();
    UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
    UNLOCK_SCRIPT.setResultType(Long.class);
}

@Override
public void unlock() {
    stringRedisTemplate.execute(UNLOCK_SCRIPT,
            Collections.singletonList(KEY_PREFIX + name),
            ID_PREFIX + Thread.currentThread().getId());
}
  • 但是现在的分布式锁还存在一个问题:锁不住
    • 那什么是锁不住呢?
      • 如果锁的TTL快到期的时候,我们可以给它续期一下,比如续个30s,就好像是网吧上网,快没网费了的时候,让网管再给你续50块钱的,然后该玩玩,程序也继续往下执行
      • 那么续期问题怎么解决呢,可以依赖于我们接下来要学习redission了

分布式锁-Redisson

  • 基于SETNX实现的分布式锁存在以下问题
    1. 重入问题
      • 重入问题是指获取锁的线程,可以再次进入到相同的锁的代码块中,可重入锁的意义在于防止死锁,例如在HashTable这样的代码中,它的方法都是使用synchronized修饰的,加入它在一个方法内调用另一个方法,如果此时是不可重入的,那就死锁了。所以可重入锁的主要意义是防止死锁,我们的synchronized和Lock锁都是可重入的
    2. 不可重试
      • 我们编写的分布式锁只能尝试一次,失败了就返回false,没有重试机制。但合理的情况应该是:当线程获取锁失败后,他应该能再次尝试获取锁
    3. 超时释放
      • 我们在加锁的时候增加了TTL,这样我们可以防止死锁,但是如果卡顿(阻塞)时间太长,也会导致锁的释放。虽然我们采用Lua脚本来防止删锁的时候,误删别人的锁,但现在的新问题是没锁住,也有安全隐患
    4. 主从一致性
      • 如果Redis提供了主从集群,那么当我们向集群写数据时,主机需要异步的将数据同步给从机,万一在同步之前,主机宕机了(主从同步存在延迟,虽然时间很短,但还是发生了),那么又会出现死锁问题
  • 那么什么是Redisson呢
    • Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现
  • Redis提供了分布式锁的多种多样功能
    1. 可重入锁(Reentrant Lock)
    2. 公平锁(Fair Lock)
    3. 联锁(MultiLock)
    4. 红锁(RedLock)
    5. 读写锁(ReadWriteLock)
    6. 信号量(Semaphore)
    7. 可过期性信号量(PermitExpirableSemaphore)
    8. 闭锁(CountDownLatch)

Redisson入门

导入依赖

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.13.6</version>
</dependency>

配置Redisson客户端,在config包下新建RedissonConfig

import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RedissonConfig {
    @Bean
    public RedissonClient redissonClient() {
        Config config = new Config();
        config.useSingleServer()
            .setAddress("redis://101.XXX.XXX.160:6379")
            .setPassword("root");
        return Redisson.create(config);
    }
}

使用Redisson的分布式锁

@Resource
private RedissonClient redissonClient;

@Test
void testRedisson() throws InterruptedException {
    //获取可重入锁
    RLock lock = redissonClient.getLock("anyLock");
    //尝试获取锁,三个参数分别是:获取锁的最大等待时间(期间会重试),锁的自动释放时间,时间单位
    boolean success = lock.tryLock(1,10, TimeUnit.SECONDS);
    //判断获取锁成功
    if (success) {
        try {
            System.out.println("执行业务");
        } finally {
            //释放锁
            lock.unlock();
        }
    }
}

替换我们之前自己写的分布式锁

@Resource
private RedissonClient redissonClient;
@Override
public Result seckillVoucher(Long voucherId) {
    LambdaQueryWrapper<SeckillVoucher> queryWrapper = new LambdaQueryWrapper<>();
    //1. 查询优惠券
    queryWrapper.eq(SeckillVoucher::getVoucherId, voucherId);
    SeckillVoucher seckillVoucher = seckillVoucherService.getOne(queryWrapper);
    //2. 判断秒杀时间是否开始
    if (LocalDateTime.now().isBefore(seckillVoucher.getBeginTime())) {
        return Result.fail("秒杀还未开始,请耐心等待");
    }
    //3. 判断秒杀时间是否结束
    if (LocalDateTime.now().isAfter(seckillVoucher.getEndTime())) {
        return Result.fail("秒杀已经结束!");
    }
    //4. 判断库存是否充足
    if (seckillVoucher.getStock() < 1) {
        return Result.fail("优惠券已被抢光了哦,下次记得手速快点");
    }
    Long userId = UserHolder.getUser().getId();
    RLock redisLock = redissonClient.getLock("order:" + userId);
    boolean isLock = redisLock.tryLock();
    if (!isLock) {
        return Result.fail("不允许抢多张优惠券");
    }
    try {
        IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
        return proxy.createVoucherOrder(voucherId);
    } finally {
        redisLock.unlock();
    }
}

Redisson的可重入锁原理

  • 在Lock锁中,他是借助于等曾的一个voaltile的一个state变量来记录重入的状态的
    • 如果当前没有人持有这把锁,那么state = 0
    • 如果人持有这把锁,那么state = 1
      • 如果持有者把锁的人再次持有这把锁,那么state会+1
    • 如果对于synchronize而言,他在c语言代码中会有一个count
    • 原理与state类似,也是重入一次就+1,释放一次就-1,直至减到0,表示这把锁没有被人持有
  • 在redisson中,我们也支持可重入锁

    • 在分布式锁中,它采用hash结构来存储锁,其中外层key表示这把锁是否存在,内层key则记录当前这把锁被哪个线程持有
  • method1在方法内部调用method2,method1和method2出于同一个线程,那么method1已经拿到一把锁了,想进入method2中拿另外一把锁,必然是拿不到的,于是就出现了死锁

 为了保证原子性,所以流程图中的业务逻辑也是需要我们用Lua来实现的

 获取锁的逻辑

local key = KEYS[1]; -- 锁的key
local threadId = ARGV[1]; -- 线程唯一标识
local releaseTime = ARGV[2]; -- 锁的自动释放时间
-- 锁不存在
if (redis.call('exists', key) == 0) then
    -- 获取锁并添加线程标识,state设为1
    redis.call('hset', key, threadId, '1');
    -- 设置锁有效期
    redis.call('expire', key, releaseTime);
    return 1; -- 返回结果
end;
-- 锁存在,判断threadId是否为自己
if (redis.call('hexists', key, threadId) == 1) then
    -- 锁存在,重入次数 +1,这里用的是hash结构的incrby增长
    redis.call('hincrby', key, thread, 1);
    -- 设置锁的有效期
    redis.call('expire', key, releaseTime);
    return 1; -- 返回结果
end;
return 0; -- 代码走到这里,说明获取锁的不是自己,获取锁失败

释放锁的逻辑

local key = KEYS[1];
local threadId = ARGV[1];
local releaseTime = ARGV[2];
-- 如果锁不是自己的
if (redis.call('HEXISTS', key, threadId) == 0) then
    return nil; -- 直接返回
end;
-- 锁是自己的,锁计数-1,还是用hincrby,不过自增长的值为-1
local count = redis.call('hincrby', key, threadId, -1);
-- 判断重入次数为多少
if (count > 0) then
    -- 大于0,重置有效期
    redis.call('expire', key, releaseTime);
    return nil;
else
    -- 否则直接释放锁
    redis.call('del', key);
    return nil;
end;

获取锁源码
查看源码,跟我们的实现方式几乎一致

<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    this.internalLockLeaseTime = unit.toMillis(leaseTime);
    return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getName()), this.internalLockLeaseTime, this.getLockName(threadId));
}

释放锁源码

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;", Arrays.asList(this.getName(), this.getChannelName()), LockPubSub.UNLOCK_MESSAGE, this.internalLockLeaseTime, this.getLockName(threadId));
}

Redisson锁重试和WatchDog机制

前面我们分析的是空参的tryLock方法,现在我们来分析一下这个带参数的

<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    this.internalLockLeaseTime = unit.toMillis(leaseTime);
    return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getName()), this.internalLockLeaseTime, this.getLockName(threadId));
}
  • 源码分析
  • tryAcquireAsync

private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    
    if (leaseTime != -1L) {
        return this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    } else {
        // 如果没有指定释放时间时间,则指定默认释放时间为getLockWatchdogTimeout,底层源码显示是30*1000ms,也就是30秒
        RFuture<Long> ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
            if (e == null) {
                if (ttlRemaining == null) {
                    this.scheduleExpirationRenewal(threadId);
                }

            }
        });
        return ttlRemainingFuture;
    }
}

tryLock

public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
        long time = unit.toMillis(waitTime);
        long current = System.currentTimeMillis();
        long threadId = Thread.currentThread().getId();
        Long ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
        //判断ttl是否为null
        if (ttl == null) {
            return true;
        } else {
            //计算当前时间与获取锁时间的差值,让等待时间减去这个值
            time -= System.currentTimeMillis() - current;
            //如果消耗时间太长了,直接返回false,获取锁失败
            if (time <= 0L) {
                this.acquireFailed(waitTime, unit, threadId);
                return false;
            } else {
                //等待时间还有剩余,再次获取当前时间
                current = System.currentTimeMillis();
                //订阅别人释放锁的信号
                RFuture<RedissonLockEntry> subscribeFuture = this.subscribe(threadId);
                //在剩余时间内,等待这个信号
                if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
                    if (!subscribeFuture.cancel(false)) {
                        subscribeFuture.onComplete((res, e) -> {
                            if (e == null) {
                                //取消订阅
                                this.unsubscribe(subscribeFuture, threadId);
                            }

                        });
                    }
                    //剩余时间内没等到,返回false
                    this.acquireFailed(waitTime, unit, threadId);
                    return false;
                } else {
                    try {
                        //如果剩余时间内等到了别人释放锁的信号,再次计算当前剩余最大等待时间
                        time -= System.currentTimeMillis() - current;
                        if (time <= 0L) {
                            //如果剩余时间为负数,则直接返回false
                            this.acquireFailed(waitTime, unit, threadId);
                            boolean var20 = false;
                            return var20;
                        } else {
                            boolean var16;
                            do {
                                //如果剩余时间等到了,dowhile循环重试获取锁
                                long currentTime = System.currentTimeMillis();
                                ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
                                if (ttl == null) {
                                    var16 = true;
                                    return var16;
                                }

                                time -= System.currentTimeMillis() - currentTime;
                                if (time <= 0L) {
                                    this.acquireFailed(waitTime, unit, threadId);
                                    var16 = false;
                                    return var16;
                                }

                                currentTime = System.currentTimeMillis();
                                if (ttl >= 0L && ttl < time) {
                                    ((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                                } else {
                                    ((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                                }

                                time -= System.currentTimeMillis() - currentTime;
                            } while(time > 0L);

                            this.acquireFailed(waitTime, unit, threadId);
                            var16 = false;
                            return var16;
                        }
                    } finally {
                        this.unsubscribe(subscribeFuture, threadId);
                    }
                }
            }
        }
    }

scheduleExpirationRenewal

private void scheduleExpirationRenewal(long threadId) {
    ExpirationEntry entry = new ExpirationEntry();  
    //不存在,才put,表明是第一次进入,不是重入
    ExpirationEntry oldEntry = (ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry);
    if (oldEntry != null) {
        oldEntry.addThreadId(threadId);
    } else {
        //如果是第一次进入,则跟新有效期
        entry.addThreadId(threadId);
        this.renewExpiration();
    }
}

renewExpiration

private void renewExpiration() {
    ExpirationEntry ee = (ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
    if (ee != null) {
        //Timeout是一个定时任务
        Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            public void run(Timeout timeout) throws Exception {
                ExpirationEntry ent = (ExpirationEntry)RedissonLock.EXPIRATION_RENEWAL_MAP.get(RedissonLock.this.getEntryName());
                if (ent != null) {
                    Long threadId = ent.getFirstThreadId();
                    if (threadId != null) {
                        //重置有效期
                        RFuture<Boolean> future = RedissonLock.this.renewExpirationAsync(threadId);
                        future.onComplete((res, e) -> {
                            if (e != null) {
                                RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", e);
                            } else {
                                if (res) {
                                    //然后调用自己,递归重置有效期
                                    RedissonLock.this.renewExpiration();
                                }

                            }
                        });
                    }
                }
            }
            //internalLockLeaseTime是之前WatchDog默认有效期30秒,那这里就是 30 / 3 = 10秒之后,才会执行
        }, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);
        ee.setTimeout(task);
    }
}

renewExpirationAsync
重点看lua脚本,先判断锁是不是自己的,然后更新有效时间

protected RFuture<Boolean> renewExpirationAsync(long threadId) {
    return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;", Collections.singletonList(this.getName()), this.internalLockLeaseTime, this.getLockName(threadId));
}
  • 那么之前的重置有效期的行为该怎么终止呢?当然是释放锁的时候会终止
  • cancelExpirationRenewal
void cancelExpirationRenewal(Long threadId) {
    //将之前的线程终止掉
    ExpirationEntry task = (ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
    if (task != null) {
        if (threadId != null) {
            task.removeThreadId(threadId);
        }

        if (threadId == null || task.hasNoThreads()) {
            //获取之前的定时任务
            Timeout timeout = task.getTimeout();
            if (timeout != null) {
                //取消
                timeout.cancel();
            }

            EXPIRATION_RENEWAL_MAP.remove(this.getEntryName());
        }

    }
}

 

Redisson锁的MutiLock原理

  • 为了提高Redis的可用性,我们会搭建集群或者主从,现在以主从为例
  • 此时我们去写命令,写在主机上,主机会将数据同步给从机,但是假设主机还没来得及把数据写入到从机去的时候,主机宕机了
  • 哨兵会发现主机宕机了,于是选举一个slave(从机)变成master(主机),而此时新的master(主机)上并没有锁的信息,那么其他线程就可以获取锁,又会引发安全问题
  • 为了解决这个问题。Redisson提出来了MutiLock锁,使用这把锁的话,那我们就不用主从了,每个节点的地位都是一样的,都可以当做是主机,那我们就需要将加锁的逻辑写入到每一个主从节点上,只有所有的服务器都写入成功,此时才是加锁成功,假设现在某个节点挂了,那么他去获取锁的时候,只要有一个节点拿不到,都不能算是加锁成功,就保证了加锁的可靠性

  • 我们先使用虚拟机额外搭建两个Redis节点

@Configuration
public class RedissonConfig {
    @Bean
    public RedissonClient redissonClient() {
        Config config = new Config();
        config.useSingleServer().setAddress("redis://192.168.137.130:6379")
                .setPassword("root");
        return Redisson.create(config);
    }

    @Bean
    public RedissonClient redissonClient2() {
        Config config = new Config();
        config.useSingleServer().setAddress("redis://92.168.137.131:6379")
                .setPassword("root");
        return Redisson.create(config);
    }

    @Bean
    public RedissonClient redissonClient3() {
        Config config = new Config();
        config.useSingleServer().setAddress("redis://92.168.137.132:6379")
                .setPassword("root");
        return Redisson.create(config);
    }
}

使用联锁,我们首先要注入三个RedissonClient对象

@Resource
private RedissonClient redissonClient;
@Resource
private RedissonClient redissonClient2;
@Resource
private RedissonClient redissonClient3;

private RLock lock;

@BeforeEach
void setUp() {
    RLock lock1 = redissonClient.getLock("lock");
    RLock lock2 = redissonClient2.getLock("lock");
    RLock lock3 = redissonClient3.getLock("lock");
    lock = redissonClient.getMultiLock(lock1, lock2, lock3);
}

@Test
void method1() {
    boolean success = lock.tryLock();
    redissonClient.getMultiLock();
    if (!success) {
        log.error("获取锁失败,1");
        return;
    }
    try {
        log.info("获取锁成功");
        method2();
    } finally {
        log.info("释放锁,1");
        lock.unlock();
    }
}

void method2() {
    RLock lock = redissonClient.getLock("lock");
    boolean success = lock.tryLock();
    if (!success) {
        log.error("获取锁失败,2");
        return;
    }
    try {
        log.info("获取锁成功,2");
    } finally {
        log.info("释放锁,2");
        lock.unlock();
    }
}
  • 源码分析

  • 当我们没有传入锁对象来创建联锁的时候,则会抛出一个异常,反之则将我们传入的可变参数锁对象封装成一个集合
public RedissonMultiLock(RLock... locks) {
    if (locks.length == 0) {
        throw new IllegalArgumentException("Lock objects are not defined");
    } else {
        this.locks.addAll(Arrays.asList(locks));
    }
}

联锁的tryLock

public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
    long newLeaseTime = -1L;
    //如果传入了释放时间
    if (leaseTime != -1L) {
        //再判断一下是否有等待时间
        if (waitTime == -1L) {
            //如果没传等待时间,不重试,则只获得一次
            newLeaseTime = unit.toMillis(leaseTime);
        } else {
            //想要重试,耗时较久,万一释放时间小于等待时间,则会有问题,所以这里将等待时间乘以二
            newLeaseTime = unit.toMillis(waitTime) * 2L;
        }
    }
    //获取当前时间
    long time = System.currentTimeMillis();
    //剩余等待时间
    long remainTime = -1L;
    if (waitTime != -1L) {
        remainTime = unit.toMillis(waitTime);
    }
    //锁等待时间,与剩余等待时间一样    
    long lockWaitTime = this.calcLockWaitTime(remainTime);
    //锁失败的限制,源码返回是的0
    int failedLocksLimit = this.failedLocksLimit();
    //已经获取成功的锁
    List<RLock> acquiredLocks = new ArrayList(this.locks.size());
    //迭代器,用于遍历
    ListIterator<RLock> iterator = this.locks.listIterator();

    while(iterator.hasNext()) {
        RLock lock = (RLock)iterator.next();

        boolean lockAcquired;
        try {
            //没有等待时间和释放时间,调用空参的tryLock
            if (waitTime == -1L && leaseTime == -1L) {
                lockAcquired = lock.tryLock();
            } else {
                //否则调用带参的tryLock
                long awaitTime = Math.min(lockWaitTime, remainTime);
                lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
            }
        } catch (RedisResponseTimeoutException var21) {
            this.unlockInner(Arrays.asList(lock));
            lockAcquired = false;
        } catch (Exception var22) {
            lockAcquired = false;
        }
        //判断获取锁是否成功
        if (lockAcquired) {
            //成功则将锁放入成功锁的集合
            acquiredLocks.add(lock);
        } else {
            //如果获取锁失败
            //判断当前锁的数量,减去成功获取锁的数量,如果为0,则所有锁都成功获取,跳出循环
            if (this.locks.size() - acquiredLocks.size() == this.failedLocksLimit()) {
                break;
            }
            //否则将拿到的锁都释放掉
            if (failedLocksLimit == 0) {
                this.unlockInner(acquiredLocks);
                //如果等待时间为-1,则不想重试,直接返回false
                if (waitTime == -1L) {
                    return false;
                }

                failedLocksLimit = this.failedLocksLimit();
                //将已经拿到的锁都清空
                acquiredLocks.clear();
                //将迭代器往前迭代,相当于重置指针,放到第一个然后重试获取锁
                while(iterator.hasPrevious()) {
                    iterator.previous();
                }
            } else {
                --failedLocksLimit;
            }
        }
        //如果剩余时间不为-1,很充足
        if (remainTime != -1L) {
            //计算现在剩余时间
            remainTime -= System.currentTimeMillis() - time;
            time = System.currentTimeMillis();
            //如果剩余时间为负数,则获取锁超时了
            if (remainTime <= 0L) {
                //将之前已经获取到的锁释放掉,并返回false
                this.unlockInner(acquiredLocks);
                //联锁成功的条件是:每一把锁都必须成功获取,一把锁失败,则都失败
                return false;
            }
        }
    }
    //如果设置了锁的有效期
    if (leaseTime != -1L) {
        List<RFuture<Boolean>> futures = new ArrayList(acquiredLocks.size());
        //迭代器用于遍历已经获取成功的锁
        Iterator var24 = acquiredLocks.iterator();

        while(var24.hasNext()) {
            RLock rLock = (RLock)var24.next();
            //设置每一把锁的有效期
            RFuture<Boolean> future = ((RedissonLock)rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
            futures.add(future);
        }

        var24 = futures.iterator();

        while(var24.hasNext()) {
            RFuture<Boolean> rFuture = (RFuture)var24.next();
            rFuture.syncUninterruptibly();
        }
    }
    //但如果没设置有效期,则会触发WatchDog机制,自动帮我们设置有效期,所以大多数情况下,我们不需要自己设置有效期
    return true;
}

 

标签:return,--,Redis,redis,线程,threadId,call,分布式
From: https://www.cnblogs.com/szhNJUPT/p/17533436.html

相关文章

  • 上线
    昨日回顾#1搜索接口 -后期扩展 -1做全文检索---》分布式全文检索引擎-2不仅仅搜索实战课:搜其它课程,老师,文章有这些关键字都能搜出来-咱们的只是搜实战课#2搜索前端,搜索结果页面#3支付宝支付-支付宝:元-微信:分-银联-流程:在咱......
  • vim 常用命令
    vim进入vimi:command->editesc:edit->command:wqa.cpp保存->退出(文件名:a.cpp):syntaxon:高亮:setnumber显示行号:q!不保存->退出vim~/.vimrc命令模式下:w跳过下一个单词h左j上k下l右b往回跳一个单词ctrl+f往下翻页ctrl+b往上翻页......
  • 119子类依旧使用父类的属性和方法
    classPhone:IMEI=2020001producer="apple"defcall_by_4g(self):print("4g通话")classMyPhone2(Phone):IMEI=2023001producer="banana"defcall_by_4g(self):old_return_value=super......
  • QOJ 5500. Bars / NOIP 模拟赛 20230706 B 进阶版--zhengjun
    本题转化为梯形面积就已经不是很好想了(赛时切掉,开心!)进阶为静态区间查询。使用不删除莫队+凸包合并凸包合并就是把散块和整块的凸包合并注意这里两个凸包的横坐标值域是无交的于是可以使用二分套二分解决此问题代码咕着,感觉非常难写......
  • 关于JS定时器的整理
    在JS中定时器有非常大的作用,例如:执行延迟操作:使用setTimeout可以在一定的延迟后执行特定的代码。这对于需要在一定时间后执行某些操作的情况非常有用,例如延迟显示提示信息、执行动画效果等。定期刷新数据:使用setInterval可以定期执行某段代码,例如定时从服务器获取最新数据并......
  • this version of the Java Runtime only recognizes class file versions up to 55.0
    问题:  运行SpringBootdemo时报错: thisversionoftheJavaRuntimeonlyrecognizesclassfileversionsupto55.0at原因:   编译版本和运行版本不一致,具体原因是编译版本高于运行版本,SpringBootdemo中使用的是jdk17,我本地的jdk是11 解决:  调整idea中的jd......
  • 2023年度计划
    2023年度计划暑假计划语文:所有初中语文必背内容及注释 会背会默坚持练习《万唯》阅读题尝试练习行楷字体阅读《红星照耀中国》《昆虫记》《雾都孤儿》数学:初中数学内容学完(借助《一本涂书》《万唯中考试题研究》),北京卷120分,河南卷115分以上高中数学《必修一》学习部......
  • pandas打开加密的excel
    pandas打开加密的excelimportpandasaspdimportosimportioimportdatetimefile_temp=io.BytesIO()withopen(io,"rb")asf:file=msoffcrypto.OfficeFile(f)file.load_key(password)file.decrypt(file_temp)#file.decrypt(open(p......
  • 自己动手实现rpc框架(二) 实现集群间rpc通信
    自己动手实现rpc框架(二)实现集群间rpc通信1.集群间rpc通信上一篇博客中MyRpc框架实现了基本的点对点rpc通信功能。而在这篇博客中我们需要实现MyRpc的集群间rpc通信功能。自己动手实现rpc框架(一)实现点对点的rpc通信上篇博客的点对点rpc通信实现中,客户端和服务端的ip......
  • autofac的使用
    安装autofac安装autofac.mvc   注册代码 varbuilder=newContainerBuilder();builder.RegisterControllers(typeof(MvcApplication).Assembly).PropertiesAutowired();//注册所有实现了IDependency接口的类型TypebaseT......