开启掘金成长之旅!这是我参与「掘金日新计划 · 2 月更文挑战」的第 天,点击查看活动详情
前言:
日常开发中,我们经常会使用到锁,以保证某一段逻辑是线程安全的,同步的。 但是当今一般都是同一个服务部署到多台机器上,在这种情况下,如果用java中的锁,将只能保证在某台机器上的线程安全,而不能保证真正意义上的线程安全,那么此时分布式锁就上场了。
下边我们将以层层递进的方式,看看怎么用redis实现 相对完美的 分布式锁!
- 多说一句:一把合格的分布式锁,至少应该保证以下特性:
1:互斥性 (保证锁内代码逻辑同步)
2:超时自动释放(防止死锁,占用资源)
3:安全性(不能删除非自己加的锁)
4:原子(放值和取值时保证原子)
5:高可用,高性能
6:支持可重入
7:支持自动续期
复制代码
1. setnx + expire 实现一个简陋的锁
setnx 命令解释:如果数据库不存在给定的key, 则设置成功返回true,如果存在则设置不成功,返回false,
复制代码
值的注意的是在一些极端情况下(比如锁中代码执行时间过长,或者没有成功删除锁对应的key,此时将会一直占用资源(也就是死锁),其他线程获取不到锁,会出现很严重的问题)?所以我们给这个key加个过期时间以便给这个key加一个”约束”,使得资源及时释放,伪代码如下:
if ( setnx( key1, value1)==1 ){
expire(key1);
try {
//TODO 执行业务代码...
} finally {
del(key);
}
}else{
//未获取到锁
}
复制代码
1.1 setnx与expire非原子问题以及解决
but这样存在一个问题(因为setnx和expire不是原子操作),也就代表,在某一时刻,可能setx执行了,但是expire没执行。此时就又有可能出现死锁问题。在业界用的最多的一种就是使用lua脚本解决该问题,即通过lua将setnx和expire两个命令变成原子操作。如下:
if (redis.call('setnx',KEYS[1],ARGV[1]) < 1) then return 0; end; redis.call('expire',KEYS[1],tonumber(ARGV[2])); return 1;" 1 key value 100
复制代码
从上脚本可以看出,调用setnx如果设置成功则紧接着调用expire命令设置过期时间。
1.2 B锁 被A删情况以及解决
Ok,上边的lua脚本是保证了原子,但是删除时候又出现问题了。
- 考虑如下场景:
线程A 抢到锁,执行业务逻辑,但是超过了锁的过期时间(比如:10秒)也没执行完,此时由于到了过期时间,所以redis中会删除该key,此时线程B恰好过来抢到了锁,在执行过程中,线程A 执行完了,此时就会进入finally中执行del(key),于是线程A把线程B加的锁给删除了。。。
实时上,A删B锁这种场景不仅是加锁解锁逻辑的混乱,还会导致并发,即线程A删除B加的锁后,设想此时线程C过来并且抢到锁,那么线程c也会执行锁中代码快(假设此时线程B还没结束),也就是说有线程B 线程C并发执行锁中代码,此时锁已经不起作用,形同虚设。。。(士可忍孰不可忍)
为了避免这种严重问题,我们的解决方式是:在某个线程去设置锁的时候,value给一个uuid,当删除key时候,我们先get出来然后和设置时候比对一下,如果等于那就说明是当前线程设置的锁,执行del,否则就说明不是,则不进行del,从而避免A锁B删这种现象发生。
but此时又有个问题,就是解锁时候get操作和del操作不是原子的,那么我们还可以使用lua来解决这个问题,如下:
if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end
复制代码
ok到这里是不是感觉没问题了?答案是:no!
在实际场景中,我们可能对分布式锁有其他的要求,比如一些特殊场景下需要支持:
-
可重入:
-
自动续期:
等特性
,我们这里不再班门弄斧,直接来看下业内大拿:Redisson是怎么做的。
2. redisson锁实现
作为redis客户端框架 redisson不止提供了开箱即用的分布式锁api,还包括其他很多特性这里不再展开,更多请关注:redisson官网 ,我们只是关注redisson是如何实现的分布式锁的。
首先我们要知道的是 redisson实现的分布式锁api,基本上是一把合格的锁,他保证了我们开篇说的那几个特性,如下:
1:互斥性 (保证锁内代码逻辑同步)
2:超时自动释放(防止死锁,占用资源)
3:安全性(不能删除非自己加的锁)
4:原子(放值和取值时保证原子)
5:高可用,高性能
6:支持可重入
7:支持自动续期
复制代码
要想分析redisson如何实现,那必然离不了源码(注意:这里我们只是简单过一遍加锁
,解锁
,重入性
,自动续期
等实现逻辑,其他略过)
2.1 加锁
废话不多说,我们直接来到加锁脚本这,如下:
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " + //先检查下给定的key 存不存在,不存在进入下边分支逻辑
//如果不存在,那么往redis写入一个hash结构的数据(hincrby命令如果检测到没有key时,会写入),
//其中key是调用者传入的key,value是map类型,其中key是客户端id(看源码的话知道他是一个uuid每一个连接都保持唯一):threid ,
//value就是某个线程的获取锁的次数(这个操作是实现可重入的保障)
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +//给刚刚设置的key 设置过期时间
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + //如果给定的key存在,
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +//将该key中对应的value值的value值 +1 操作,以记录某个key下 某个线程的重入次数
"redis.call('pexpire', KEYS[1], ARGV[1]); " +//重入后,更新过期时间为给定值
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);", //没获取到锁,则返回锁的剩余时间
Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}
protected String getLockName(long threadId) {
return id + ":" + threadId;//这里的id是CommandAsyncExecutor实现类中的id,本质上和某一条连接一一对应,值是uuid
}
复制代码
读完上边脚本代码和注释,我们知道redisson
- 通过lua脚本来保证多命令原子性,
- 通过hash结构来存储锁信息
锁 结构如下:
{ 调用方给定的key : {redisson客户端id(是个uuid,每个连接唯一)+获取到锁的线程id : 该线程获取到的锁次数 } }
复制代码
- 通过判断当前线程是否持有锁,持有的话 value值+1,来记录某个线程获取锁的次数,从而间接实现了
重入
的特性
一张图直观看下加锁流程:
2.2 解锁
废话不多说直接上代码
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +//不存在key 则直接返回,不做操作
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +//否则根据key和 当前连接id+当前线程id 将value值减一
"if (counter > 0) then " +//如果减一后value大于0 说明该线程重入过,
"redis.call('pexpire', KEYS[1], ARGV[2]); " +//将该线程持有的锁的过期时间更新
"return 0; " +
"else " +//counter不大于0,则删除key 释放锁
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +//发布事件,广播给其他订阅该事件的线程,通知他们,”可以尝试抢锁啦!“
"return 1; " +//返回释放锁成功
"end; " +
"return nil;",
Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
复制代码
从上边可以看到:
- 解锁时,如果重入过n次,那么就需要释放n次,直到value值为0 才真正删除 key。
- 另外 在删除时候,我们可以看到先判断存在与否,而这里的判断是根据给定key和连接id+线程id来的,也就是说,不会发生B锁A删的情况出现。
- 同时在删除锁后,发布事件,告知其他等待线程可以抢锁啦!
2.3 自动续期
我们来到 tryAcquireAsync 方法,如下:
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
RFuture<Long> ttlRemainingFuture;
if (leaseTime != -1) {
ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
}
//加锁成功
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
}
// lock acquired
if (ttlRemaining == null) {
if (leaseTime != -1) {
internalLockLeaseTime = unit.toMillis(leaseTime);
} else {
//可以看到如果leaseTime == -1 (也就是没有传leaseTime参数)那么就会自动续期
scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
续期代码:
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +//该线程持有锁,
"redis.call('pexpire', KEYS[1], ARGV[1]); " +// 重置过期时间为 internalLockLeaseTime 即 30秒(redisson代码中默认的,但可以配置)
"return 1; " +
"end; " +
"return 0;",
Collections.singletonList(getRawName()),
internalLockLeaseTime, getLockName(threadId));
}
复制代码
自动续期内部源码不在展开,我们简单介绍下主要流程:
只要线程加锁成功
并且 没传leaseTime值(leaseTime我喜欢叫它租约时长,这个参数很重要!!!,在使用时候一定要注意,如果调用tryLock时,指定了租约时长,那么redisson看门狗机制将不起作用)
此时该值会默认为 -1 ,就会启动一个timer线程(也有人称为watch dog看门狗机制),它是一个定时任务(基于netty的Timer类实现的),会每隔10(internalLockLeaseTime(默认为30)
/3 =10 )秒检查一下,如果线程A还持有锁,那么就会不断的延长锁key的生存时间。从而实现了 自动续期。
注意:一旦你开启了自动续期(watch dog机制),那么一定要合理控制锁中逻辑的执行时间,避免执行时间过长(事实上,大多数的逻辑我们都应该尽可能的快速执行完毕)。
ok到这里我们用一张图,来直观看下redisson整体的一个流程:
2.4 集群情况下存在的问题以及RedLock
到这redisson我们就说完了,但是有人会说,使用redisson这个分布式锁肯定就能保证完美无瑕吗?
- 单机版 redisson 已经 yyds ,很完美了至少我个人觉得
- 集群 redisson 还是有问题,锁并不一定安全,不能真正保证互斥 考虑如下场景:
如果线程A在Redis的master节点上拿到了锁,但是加锁的key还没同步到slave节点。恰好这时,
master节点发生故障,一个slave节点就会升级为master节点。线程B就可以获取同个key的锁啦,但线程A
也已经拿到锁了,此时两个不同线程都拿到锁,并发执行锁中代码逻辑,锁的安全性,互斥性也就荡然无存了!
复制代码
基于上述集群存在的问题,Redis作者 antirez
大佬提出一种高级的分布式锁算法:Redlock。核心思想如下:
不能只在一个redis实例上创建锁,应该是在多个redis实例上创建锁(
意味着那就要部署多个master
),并且必须在 (n/2)+1 个master节点上都成功创建锁,才能算这个整体的RedLock加锁成功,避免说仅仅在一个redis实例上加锁,然后同步到slave时带来的问题。
而redisson是实现了RedLock了的,其实现也很简单,即遍历所有的Redis客户端,然后依次加锁(也得看redis集群是什么模式的 哨兵应该是不支持RedLock的,因为slave不能由客户端直接写),最后统计成功的次数来判断是否加锁成功。
public class RedissonMultiLock implements RLock
复制代码
该类RedissonMultiLock
即是RedLock的实现。具体如何使用以及有哪些坑我们不去展开,(ps:个人感觉RedLock的思想有点投入与产出不成正比,如果业务一定保证稳定性,互斥,安全,且并发不算太高时候,也可以用zk实现的分布式锁,没必要这么费时费力,还得搭建多个master)
到此redis分布式锁就写完了,我们一般工作中也不造轮子,直接用redisson就好了,但是一定要去了解里边的实现,否则很容易踩坑呀!
标签:return,KEYS,Redis,redis,线程,key,call,搞懂,分布式 From: https://blog.51cto.com/u_15992236/6101322