基本原理和不同的实现方式
分布式锁的核心思想就是让大家共用同一把锁,那么我们就能锁住线程,不让线程进行,让程序串行执行,这就是分布式锁的核心思路
- 可见性:多个线程都能看到相同的结果。
注意:这里说的可见性并不是并发编程中指的内存可见性,只是说多个进程之间都能感知到变化的意思
- 互斥:互斥是分布式锁的最基本条件,使得程序串行执行
- 高可用:程序不易崩溃,时时刻刻都保证较高的可用性
- 高性能:由于加锁本身就让性能降低,所以对于分布式锁需要他较高的加锁性能和释放锁性能
- 安全性:安全也是程序中必不可少的一环
-
常见的分布式锁有三种
- MySQL:MySQL本身就带有锁机制,但是由于MySQL的性能一般,所以采用分布式锁的情况下,使用MySQL作为分布式锁比较少见
- Redis:Redis作为分布式锁是非常常见的一种使用方式,现在企业级开发中基本都是用Redis或者Zookeeper作为分布式锁,利用
SETNX
这个方法,如果插入Key成功,则表示获得到了锁,如果有人插入成功,那么其他人就回插入失败,无法获取到锁,利用这套逻辑完成互斥
,从而实现分布式锁 - Zookeeper:Zookeeper也是企业级开发中较好的一种实现分布式锁的方案
MySQL | Redis | ZooKeeper | |
---|---|---|---|
互斥 | 利用mysql本身的互斥锁机制 | 利用setnx这样的互斥命令 | 利用节点的唯一性和有序性实现互斥 |
高可用 | 好 | 好 | 好 |
高性能 | 一般 | 好 | 一般 |
安全性 | 断开连接,自动释放锁 | 利用锁超时时间,到期释放 | 临时节点,断开连接自动释放 |
实现分布式锁
锁的接口
public interface ILock { /** * 尝试获取锁 * * @param timeoutSec 锁持有的超时时间,过期自动释放 * @return true表示获取锁成功,false表示获取锁失败 */ boolean tryLock(long timeoutSec); /** * 释放锁 */ void unlock(); }
然后创建一个SimpleRedisLock类实现接口
public class SimpleRedisLock implements ILock { //锁的前缀 private static final String KEY_PREFIX = "lock:"; //具体业务名称,将前缀和业务名拼接之后当做Key private String name; //这里不是@Autowired注入,采用的是构造器注入,在创建SimpleRedisLock时,将RedisTemplate作为参数传入 private StringRedisTemplate stringRedisTemplate; public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) { this.name = name; this.stringRedisTemplate = stringRedisTemplate; } @Override public boolean tryLock(long timeoutSec) { //获取线程标识 long threadId = Thread.currentThread().getId(); //获取锁,使用SETNX方法进行加锁,同时设置过期时间,防止死锁 Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId + "", timeoutSec, TimeUnit.SECONDS); //自动拆箱可能会出现null,这样写更稳妥 return Boolean.TRUE.equals(success); } @Override public void unlock() { //通过DEL来删除锁 stringRedisTemplate.delete(KEY_PREFIX + name); } }
修改业务代码
@Override public Result seckillVoucher(Long voucherId) { LambdaQueryWrapper<SeckillVoucher> queryWrapper = new LambdaQueryWrapper<>(); //1. 查询优惠券 queryWrapper.eq(SeckillVoucher::getVoucherId, voucherId); SeckillVoucher seckillVoucher = seckillVoucherService.getOne(queryWrapper); //2. 判断秒杀时间是否开始 if (LocalDateTime.now().isBefore(seckillVoucher.getBeginTime())) { return Result.fail("秒杀还未开始,请耐心等待"); } //3. 判断秒杀时间是否结束 if (LocalDateTime.now().isAfter(seckillVoucher.getEndTime())) { return Result.fail("秒杀已经结束!"); } //4. 判断库存是否充足 if (seckillVoucher.getStock() < 1) { return Result.fail("优惠券已被抢光了哦,下次记得手速快点"); } Long userId = UserHolder.getUser().getId(); // 创建锁对象 SimpleRedisLock redisLock = new SimpleRedisLock("order:" + userId, stringRedisTemplate); // 获取锁对象 boolean isLock = redisLock.tryLock(120); // 加锁失败,说明当前用户开了多个线程抢优惠券,但是由于key是SETNX的,所以不能创建key,得等key的TTL到期或释放锁(删除key) if (!isLock) { return Result.fail("不允许抢多张优惠券"); } try { // 获取代理对象 IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy(); return proxy.createVoucherOrder(voucherId); } finally { // 释放锁 redisLock.unlock(); } }
Redis分布式锁误删情况说明
逻辑说明
- 持有锁的线程1在锁的内部出现了阻塞,导致他的锁TTL到期,自动释放
- 此时线程2也来尝试获取锁,由于线程1已经释放了锁,所以线程2可以拿到
- 但是现在线程1阻塞完了,继续往下执行,要开始释放锁了
- 那么此时就会将属于线程2的锁释放,这就是误删别人锁的情况
解决方案
- 解决方案就是在每个线程释放锁的时候,都判断一下这个锁是不是自己的,如果不属于自己,则不进行删除操作。
- 假设还是上面的情况,线程1阻塞,锁自动释放,线程2进入到锁的内部执行逻辑,此时线程1阻塞完了,继续往下执行,开始删除锁,但是线程1发现这把锁不是自己的,所以不进行删除锁的逻辑,当线程2执行到删除锁的逻辑时,如果TTL还未到期,则判断当前这把锁是自己的,于是删除这把锁
- 需求:修改之前的分布式锁实现
- 满足:在获取锁的时候存入线程标识(用UUID标识,在一个JVM中,ThreadId一般不会重复,但是我们现在是集群模式,有多个JVM,多个JVM之间可能会出现ThreadId重复的情况),在释放锁的时候先获取锁的线程标识,判断是否与当前线程标识一致。UUID区分不同JVM,然后线程号区分线程
- 如果一致则释放锁
- 如果不一致则不释放锁
- 核心逻辑:在存入锁的时候,放入自己的线程标识,在删除锁的时候,判断当前这把锁是不是自己存入的
- 如果是,则进行删除
- 如果不是,则不进行删除
- 具体实现代码如下
private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-"; @Override public boolean tryLock(long timeoutSec) { // 获取线程标识 String threadId = ID_PREFIX + Thread.currentThread().getId(); // 获取锁 Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS); return Boolean.TRUE.equals(success); } @Override public void unlock() { // 获取当前线程的标识 String threadId = ID_PREFIX + Thread.currentThread().getId(); // 获取锁中的标识 String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name); // 判断标识是否一致 if (threadId.equals(id)) { // 释放锁 stringRedisTemplate.delete(KEY_PREFIX + name); } }
分布式锁的原子性问题
- 更为极端的误删逻辑说明
- 假设线程1已经获取了锁,在判断标识一致之后,准备释放锁的时候,又出现了阻塞(例如JVM垃圾回收机制)
- 于是锁的TTL到期了,自动释放了
- 那么现在线程2趁虚而入,拿到了一把锁
- 但是线程1的逻辑还没执行完,那么线程1就会执行删除锁的逻辑
- 但是在阻塞前线程1已经判断了标识一致,所以现在线程1把线程2的锁给删了
- 那么就相当于判断标识那行代码没有起到作用
- 这就是删锁时的原子性问题
- 因为线程1的拿锁,判断标识,删锁,不是原子操作,所以我们要防止刚刚的情况
Lua脚本解决多条命令原子性问题
- Redis提供了Lua脚本功能,在一个脚本中编写多条Redis命令,确保多条命令执行时的原子性。
- Lua是一种编程语言,它的基本语法可以上菜鸟教程看看,链接:https://www.runoob.com/lua/lua-tutorial.html
- 这里重点介绍Redis提供的调用函数,我们可以使用Lua去操作Redis,而且还能保证它的原子性,这样就可以实现
拿锁
,判断标识
,删锁
是一个原子性动作了 - Redis提供的调用函数语法如下
redis.call('命令名称','key','其他参数', ...)
例如我们要执行set name Kyle
,则脚本是这样
redis.call('set', 'name', 'Kyle')
例如我我们要执行set name David
,在执行get name
,则脚本如下
# 先执行set name David redis.call('set', 'name', 'David') # 再执行get name local name = redis.call('get', 'name') # 返回 return name
写好脚本以后,需要用Redis命令来调用脚本,调用脚本的常见命令如下
EVAL script numkeys key [key ...] arg [arg ...]
例如,我们要调用redis.call('set', 'name', 'Kyle') 0
这个脚本,语法如下
EVAL "return redis.call('set', 'name', 'Kyle')" 0
如果脚本中的key和value不想写死,可以作为参数传递,key类型参数会放入KEYS数组,其他参数会放入ARGV数组,在脚本中可以从KEYS和ARGV数组中获取这些参数
注意:在Lua中,数组下标从1开始
EVAL "return redis.call('set', KEYS[1], ARGV[1])" 1 name Lucy
脚本1
-- 线程标识 local threadId = "UUID-31" -- 锁的key local key = "lock:order:userId" -- 获取锁中线程标识 local id = redis.call('get', key) -- 比较线程标识与锁的标识是否一致 if (threadId == id) then -- 一致则释放锁 del key return redis.call('del', key) end return 0
脚本2
-- 这里的KEYS[1]就是传入锁的key -- 这里的ARGV[1]就是线程标识 -- 比较锁中的线程标识与线程标识是否一致 if (redis.call('get', KEYS[1]) == ARGV[1]) then -- 一致则释放锁 return redis.call('del', KEYS[1]) end return 0
利用Java代码调用Lua脚本改造分布式锁
在RedisTemplate中,可以利用execute方法去执行lua脚本
public <T> T execute(RedisScript<T> script, List<K> keys, Object... args) { return this.scriptExecutor.execute(script, keys, args); }
对应的Java代码如下
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT; static { UNLOCK_SCRIPT = new DefaultRedisScript(); UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua")); UNLOCK_SCRIPT.setResultType(Long.class); } @Override public void unlock() { stringRedisTemplate.execute(UNLOCK_SCRIPT, Collections.singletonList(KEY_PREFIX + name), ID_PREFIX + Thread.currentThread().getId()); }
- 但是现在的分布式锁还存在一个问题:锁不住
- 那什么是锁不住呢?
- 如果锁的TTL快到期的时候,我们可以给它续期一下,比如续个30s,就好像是网吧上网,快没网费了的时候,让网管再给你续50块钱的,然后该玩玩,程序也继续往下执行
- 那么续期问题怎么解决呢,可以依赖于我们接下来要学习redission了
- 那什么是锁不住呢?
分布式锁-Redisson
- 基于SETNX实现的分布式锁存在以下问题
- 重入问题
- 重入问题是指获取锁的线程,可以再次进入到相同的锁的代码块中,可重入锁的意义在于防止死锁,例如在HashTable这样的代码中,它的方法都是使用synchronized修饰的,加入它在一个方法内调用另一个方法,如果此时是不可重入的,那就死锁了。所以可重入锁的主要意义是防止死锁,我们的synchronized和Lock锁都是可重入的
- 不可重试
- 我们编写的分布式锁只能尝试一次,失败了就返回false,没有重试机制。但合理的情况应该是:当线程获取锁失败后,他应该能再次尝试获取锁
- 超时释放
- 我们在加锁的时候增加了TTL,这样我们可以防止死锁,但是如果卡顿(阻塞)时间太长,也会导致锁的释放。虽然我们采用Lua脚本来防止删锁的时候,误删别人的锁,但现在的新问题是没锁住,也有安全隐患
- 主从一致性
- 如果Redis提供了主从集群,那么当我们向集群写数据时,主机需要异步的将数据同步给从机,万一在同步之前,主机宕机了(主从同步存在延迟,虽然时间很短,但还是发生了),那么又会出现死锁问题
- 重入问题
- 那么什么是Redisson呢
- Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现
- Redis提供了分布式锁的多种多样功能
- 可重入锁(Reentrant Lock)
- 公平锁(Fair Lock)
- 联锁(MultiLock)
- 红锁(RedLock)
- 读写锁(ReadWriteLock)
- 信号量(Semaphore)
- 可过期性信号量(PermitExpirableSemaphore)
- 闭锁(CountDownLatch)
Redisson入门
导入依赖
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.13.6</version> </dependency>
配置Redisson客户端,在config包下新建RedissonConfig
类
import org.redisson.Redisson; import org.redisson.api.RedissonClient; import org.redisson.config.Config; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RedissonConfig { @Bean public RedissonClient redissonClient() { Config config = new Config(); config.useSingleServer() .setAddress("redis://101.XXX.XXX.160:6379") .setPassword("root"); return Redisson.create(config); } }
使用Redisson的分布式锁
@Resource private RedissonClient redissonClient; @Test void testRedisson() throws InterruptedException { //获取可重入锁 RLock lock = redissonClient.getLock("anyLock"); //尝试获取锁,三个参数分别是:获取锁的最大等待时间(期间会重试),锁的自动释放时间,时间单位 boolean success = lock.tryLock(1,10, TimeUnit.SECONDS); //判断获取锁成功 if (success) { try { System.out.println("执行业务"); } finally { //释放锁 lock.unlock(); } } }
替换我们之前自己写的分布式锁
@Resource private RedissonClient redissonClient; @Override public Result seckillVoucher(Long voucherId) { LambdaQueryWrapper<SeckillVoucher> queryWrapper = new LambdaQueryWrapper<>(); //1. 查询优惠券 queryWrapper.eq(SeckillVoucher::getVoucherId, voucherId); SeckillVoucher seckillVoucher = seckillVoucherService.getOne(queryWrapper); //2. 判断秒杀时间是否开始 if (LocalDateTime.now().isBefore(seckillVoucher.getBeginTime())) { return Result.fail("秒杀还未开始,请耐心等待"); } //3. 判断秒杀时间是否结束 if (LocalDateTime.now().isAfter(seckillVoucher.getEndTime())) { return Result.fail("秒杀已经结束!"); } //4. 判断库存是否充足 if (seckillVoucher.getStock() < 1) { return Result.fail("优惠券已被抢光了哦,下次记得手速快点"); } Long userId = UserHolder.getUser().getId(); RLock redisLock = redissonClient.getLock("order:" + userId); boolean isLock = redisLock.tryLock(); if (!isLock) { return Result.fail("不允许抢多张优惠券"); } try { IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy(); return proxy.createVoucherOrder(voucherId); } finally { redisLock.unlock(); } }
Redisson的可重入锁原理
- 在Lock锁中,他是借助于等曾的一个voaltile的一个state变量来记录重入的状态的
- 如果当前
没有
人持有这把锁,那么state = 0
- 如果
有
人持有这把锁,那么state = 1
- 如果持有者把锁的人再次持有这把锁,那么state会
+1
- 如果持有者把锁的人再次持有这把锁,那么state会
- 如果对于
synchronize
而言,他在c语言代码中会有一个count - 原理与
state
类似,也是重入一次就+1
,释放一次就-1
,直至减到0,表示这把锁没有被人持有
- 如果当前
-
在redisson中,我们也支持可重入锁
- 在分布式锁中,它采用hash结构来存储锁,其中外层key表示这把锁是否存在,内层key则记录当前这把锁被哪个线程持有
-
method1在方法内部调用method2,method1和method2出于同一个线程,那么method1已经拿到一把锁了,想进入method2中拿另外一把锁,必然是拿不到的,于是就出现了死锁
为了保证原子性,所以流程图中的业务逻辑也是需要我们用Lua来实现的
获取锁的逻辑
local key = KEYS[1]; -- 锁的key local threadId = ARGV[1]; -- 线程唯一标识 local releaseTime = ARGV[2]; -- 锁的自动释放时间 -- 锁不存在 if (redis.call('exists', key) == 0) then -- 获取锁并添加线程标识,state设为1 redis.call('hset', key, threadId, '1'); -- 设置锁有效期 redis.call('expire', key, releaseTime); return 1; -- 返回结果 end; -- 锁存在,判断threadId是否为自己 if (redis.call('hexists', key, threadId) == 1) then -- 锁存在,重入次数 +1,这里用的是hash结构的incrby增长 redis.call('hincrby', key, thread, 1); -- 设置锁的有效期 redis.call('expire', key, releaseTime); return 1; -- 返回结果 end; return 0; -- 代码走到这里,说明获取锁的不是自己,获取锁失败
释放锁的逻辑
local key = KEYS[1]; local threadId = ARGV[1]; local releaseTime = ARGV[2]; -- 如果锁不是自己的 if (redis.call('HEXISTS', key, threadId) == 0) then return nil; -- 直接返回 end; -- 锁是自己的,锁计数-1,还是用hincrby,不过自增长的值为-1 local count = redis.call('hincrby', key, threadId, -1); -- 判断重入次数为多少 if (count > 0) then -- 大于0,重置有效期 redis.call('expire', key, releaseTime); return nil; else -- 否则直接释放锁 redis.call('del', key); return nil; end;
获取锁源码
查看源码,跟我们的实现方式几乎一致
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { this.internalLockLeaseTime = unit.toMillis(leaseTime); return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getName()), this.internalLockLeaseTime, this.getLockName(threadId)); }
释放锁源码
protected RFuture<Boolean> unlockInnerAsync(long threadId) { return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;", Arrays.asList(this.getName(), this.getChannelName()), LockPubSub.UNLOCK_MESSAGE, this.internalLockLeaseTime, this.getLockName(threadId)); }
Redisson锁重试和WatchDog机制
前面我们分析的是空参的tryLock方法,现在我们来分析一下这个带参数的
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { this.internalLockLeaseTime = unit.toMillis(leaseTime); return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getName()), this.internalLockLeaseTime, this.getLockName(threadId)); }
- 源码分析
-
tryAcquireAsync
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) { if (leaseTime != -1L) { return this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } else { // 如果没有指定释放时间时间,则指定默认释放时间为getLockWatchdogTimeout,底层源码显示是30*1000ms,也就是30秒 RFuture<Long> ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); ttlRemainingFuture.onComplete((ttlRemaining, e) -> { if (e == null) { if (ttlRemaining == null) { this.scheduleExpirationRenewal(threadId); } } }); return ttlRemainingFuture; } }
tryLock
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { long time = unit.toMillis(waitTime); long current = System.currentTimeMillis(); long threadId = Thread.currentThread().getId(); Long ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId); //判断ttl是否为null if (ttl == null) { return true; } else { //计算当前时间与获取锁时间的差值,让等待时间减去这个值 time -= System.currentTimeMillis() - current; //如果消耗时间太长了,直接返回false,获取锁失败 if (time <= 0L) { this.acquireFailed(waitTime, unit, threadId); return false; } else { //等待时间还有剩余,再次获取当前时间 current = System.currentTimeMillis(); //订阅别人释放锁的信号 RFuture<RedissonLockEntry> subscribeFuture = this.subscribe(threadId); //在剩余时间内,等待这个信号 if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) { if (!subscribeFuture.cancel(false)) { subscribeFuture.onComplete((res, e) -> { if (e == null) { //取消订阅 this.unsubscribe(subscribeFuture, threadId); } }); } //剩余时间内没等到,返回false this.acquireFailed(waitTime, unit, threadId); return false; } else { try { //如果剩余时间内等到了别人释放锁的信号,再次计算当前剩余最大等待时间 time -= System.currentTimeMillis() - current; if (time <= 0L) { //如果剩余时间为负数,则直接返回false this.acquireFailed(waitTime, unit, threadId); boolean var20 = false; return var20; } else { boolean var16; do { //如果剩余时间等到了,dowhile循环重试获取锁 long currentTime = System.currentTimeMillis(); ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId); if (ttl == null) { var16 = true; return var16; } time -= System.currentTimeMillis() - currentTime; if (time <= 0L) { this.acquireFailed(waitTime, unit, threadId); var16 = false; return var16; } currentTime = System.currentTimeMillis(); if (ttl >= 0L && ttl < time) { ((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { ((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS); } time -= System.currentTimeMillis() - currentTime; } while(time > 0L); this.acquireFailed(waitTime, unit, threadId); var16 = false; return var16; } } finally { this.unsubscribe(subscribeFuture, threadId); } } } } }
scheduleExpirationRenewal
private void scheduleExpirationRenewal(long threadId) { ExpirationEntry entry = new ExpirationEntry(); //不存在,才put,表明是第一次进入,不是重入 ExpirationEntry oldEntry = (ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry); if (oldEntry != null) { oldEntry.addThreadId(threadId); } else { //如果是第一次进入,则跟新有效期 entry.addThreadId(threadId); this.renewExpiration(); } }
renewExpiration
private void renewExpiration() { ExpirationEntry ee = (ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName()); if (ee != null) { //Timeout是一个定时任务 Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() { public void run(Timeout timeout) throws Exception { ExpirationEntry ent = (ExpirationEntry)RedissonLock.EXPIRATION_RENEWAL_MAP.get(RedissonLock.this.getEntryName()); if (ent != null) { Long threadId = ent.getFirstThreadId(); if (threadId != null) { //重置有效期 RFuture<Boolean> future = RedissonLock.this.renewExpirationAsync(threadId); future.onComplete((res, e) -> { if (e != null) { RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", e); } else { if (res) { //然后调用自己,递归重置有效期 RedissonLock.this.renewExpiration(); } } }); } } } //internalLockLeaseTime是之前WatchDog默认有效期30秒,那这里就是 30 / 3 = 10秒之后,才会执行 }, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS); ee.setTimeout(task); } }
renewExpirationAsync
重点看lua脚本,先判断锁是不是自己的,然后更新有效时间
protected RFuture<Boolean> renewExpirationAsync(long threadId) { return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;", Collections.singletonList(this.getName()), this.internalLockLeaseTime, this.getLockName(threadId)); }
- 那么之前的重置有效期的行为该怎么终止呢?当然是释放锁的时候会终止
- cancelExpirationRenewal
void cancelExpirationRenewal(Long threadId) { //将之前的线程终止掉 ExpirationEntry task = (ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName()); if (task != null) { if (threadId != null) { task.removeThreadId(threadId); } if (threadId == null || task.hasNoThreads()) { //获取之前的定时任务 Timeout timeout = task.getTimeout(); if (timeout != null) { //取消 timeout.cancel(); } EXPIRATION_RENEWAL_MAP.remove(this.getEntryName()); } } }
Redisson锁的MutiLock原理
- 为了提高Redis的可用性,我们会搭建集群或者主从,现在以主从为例
- 此时我们去写命令,写在主机上,主机会将数据同步给从机,但是假设主机还没来得及把数据写入到从机去的时候,主机宕机了
- 哨兵会发现主机宕机了,于是选举一个slave(从机)变成master(主机),而此时新的master(主机)上并没有锁的信息,那么其他线程就可以获取锁,又会引发安全问题
-
为了解决这个问题。Redisson提出来了MutiLock锁,使用这把锁的话,那我们就不用主从了,每个节点的地位都是一样的,都可以当做是主机,那我们就需要将加锁的逻辑写入到每一个主从节点上,只有所有的服务器都写入成功,此时才是加锁成功,假设现在某个节点挂了,那么他去获取锁的时候,只要有一个节点拿不到,都不能算是加锁成功,就保证了加锁的可靠性
-
我们先使用虚拟机额外搭建两个Redis节点
@Configuration public class RedissonConfig { @Bean public RedissonClient redissonClient() { Config config = new Config(); config.useSingleServer().setAddress("redis://192.168.137.130:6379") .setPassword("root"); return Redisson.create(config); } @Bean public RedissonClient redissonClient2() { Config config = new Config(); config.useSingleServer().setAddress("redis://92.168.137.131:6379") .setPassword("root"); return Redisson.create(config); } @Bean public RedissonClient redissonClient3() { Config config = new Config(); config.useSingleServer().setAddress("redis://92.168.137.132:6379") .setPassword("root"); return Redisson.create(config); } }
使用联锁,我们首先要注入三个RedissonClient对象
@Resource private RedissonClient redissonClient; @Resource private RedissonClient redissonClient2; @Resource private RedissonClient redissonClient3; private RLock lock; @BeforeEach void setUp() { RLock lock1 = redissonClient.getLock("lock"); RLock lock2 = redissonClient2.getLock("lock"); RLock lock3 = redissonClient3.getLock("lock"); lock = redissonClient.getMultiLock(lock1, lock2, lock3); } @Test void method1() { boolean success = lock.tryLock(); redissonClient.getMultiLock(); if (!success) { log.error("获取锁失败,1"); return; } try { log.info("获取锁成功"); method2(); } finally { log.info("释放锁,1"); lock.unlock(); } } void method2() { RLock lock = redissonClient.getLock("lock"); boolean success = lock.tryLock(); if (!success) { log.error("获取锁失败,2"); return; } try { log.info("获取锁成功,2"); } finally { log.info("释放锁,2"); lock.unlock(); } }
-
源码分析
- 当我们没有传入锁对象来创建联锁的时候,则会抛出一个异常,反之则将我们传入的可变参数锁对象封装成一个集合
public RedissonMultiLock(RLock... locks) { if (locks.length == 0) { throw new IllegalArgumentException("Lock objects are not defined"); } else { this.locks.addAll(Arrays.asList(locks)); } }
联锁的tryLock
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { long newLeaseTime = -1L; //如果传入了释放时间 if (leaseTime != -1L) { //再判断一下是否有等待时间 if (waitTime == -1L) { //如果没传等待时间,不重试,则只获得一次 newLeaseTime = unit.toMillis(leaseTime); } else { //想要重试,耗时较久,万一释放时间小于等待时间,则会有问题,所以这里将等待时间乘以二 newLeaseTime = unit.toMillis(waitTime) * 2L; } } //获取当前时间 long time = System.currentTimeMillis(); //剩余等待时间 long remainTime = -1L; if (waitTime != -1L) { remainTime = unit.toMillis(waitTime); } //锁等待时间,与剩余等待时间一样 long lockWaitTime = this.calcLockWaitTime(remainTime); //锁失败的限制,源码返回是的0 int failedLocksLimit = this.failedLocksLimit(); //已经获取成功的锁 List<RLock> acquiredLocks = new ArrayList(this.locks.size()); //迭代器,用于遍历 ListIterator<RLock> iterator = this.locks.listIterator(); while(iterator.hasNext()) { RLock lock = (RLock)iterator.next(); boolean lockAcquired; try { //没有等待时间和释放时间,调用空参的tryLock if (waitTime == -1L && leaseTime == -1L) { lockAcquired = lock.tryLock(); } else { //否则调用带参的tryLock long awaitTime = Math.min(lockWaitTime, remainTime); lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS); } } catch (RedisResponseTimeoutException var21) { this.unlockInner(Arrays.asList(lock)); lockAcquired = false; } catch (Exception var22) { lockAcquired = false; } //判断获取锁是否成功 if (lockAcquired) { //成功则将锁放入成功锁的集合 acquiredLocks.add(lock); } else { //如果获取锁失败 //判断当前锁的数量,减去成功获取锁的数量,如果为0,则所有锁都成功获取,跳出循环 if (this.locks.size() - acquiredLocks.size() == this.failedLocksLimit()) { break; } //否则将拿到的锁都释放掉 if (failedLocksLimit == 0) { this.unlockInner(acquiredLocks); //如果等待时间为-1,则不想重试,直接返回false if (waitTime == -1L) { return false; } failedLocksLimit = this.failedLocksLimit(); //将已经拿到的锁都清空 acquiredLocks.clear(); //将迭代器往前迭代,相当于重置指针,放到第一个然后重试获取锁 while(iterator.hasPrevious()) { iterator.previous(); } } else { --failedLocksLimit; } } //如果剩余时间不为-1,很充足 if (remainTime != -1L) { //计算现在剩余时间 remainTime -= System.currentTimeMillis() - time; time = System.currentTimeMillis(); //如果剩余时间为负数,则获取锁超时了 if (remainTime <= 0L) { //将之前已经获取到的锁释放掉,并返回false this.unlockInner(acquiredLocks); //联锁成功的条件是:每一把锁都必须成功获取,一把锁失败,则都失败 return false; } } } //如果设置了锁的有效期 if (leaseTime != -1L) { List<RFuture<Boolean>> futures = new ArrayList(acquiredLocks.size()); //迭代器用于遍历已经获取成功的锁 Iterator var24 = acquiredLocks.iterator(); while(var24.hasNext()) { RLock rLock = (RLock)var24.next(); //设置每一把锁的有效期 RFuture<Boolean> future = ((RedissonLock)rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS); futures.add(future); } var24 = futures.iterator(); while(var24.hasNext()) { RFuture<Boolean> rFuture = (RFuture)var24.next(); rFuture.syncUninterruptibly(); } } //但如果没设置有效期,则会触发WatchDog机制,自动帮我们设置有效期,所以大多数情况下,我们不需要自己设置有效期 return true; }
标签:return,--,Redis,redis,线程,threadId,call,分布式 From: https://www.cnblogs.com/szhNJUPT/p/17533436.html