Redis 实现分布式锁
JVM 层面的加锁 (synchronized, ReentraLock) 单机版的锁
分布式微服务架构中,为了避免各个微服务之间发生冲突和数据故障从而引入一种锁 -- 分布式锁
愿景: 保证在分布式部署的应用集群中,同一个方法在同一时间只能被一台机器上的一个线程执行
基本命令
-
SETNX
- 当且仅当 Key 不存在时,则可以设置,否则不做任何动作。
- 当且仅当 key 不存在,将 key 的值设为 value ,并返回1;若给定的 key 已经存在,则 SETNX 不做任何动作,并返回0
-
SETEX
- 基于SETNX功能外,还可以设置超时时间,防止死锁。(若Key 已经存在会覆盖掉之前的过期时间和value)
127.0.0.1:6379> setnx k1 v1
(integer) 1
127.0.0.1:6379> keys *
1) "k1"
127.0.0.1:6379> setnx k1 v1
(integer) 0
127.0.0.1:6379> keys *
1) "k1"
127.0.0.1:6379> setex k1 60 v1
OK
127.0.0.1:6379> keys *
1) "k1"
127.0.0.1:6379> ttl k1
(integer) 49
基于 SETNX 和 EXPIRE 实现
-
setnx(lockkey, 1) 如果返回 0,则说明占位失败;如果返回 1,则说明占位成功
-
expire() 命令对 lockkey 设置超时时间,为的是避免死锁问题。
-
执行完业务代码后,可以通过 delete 命令删除 key。
@Autowired
private RedisTemplate<String,String> redisTemplate;
private final String REDIS_LOCK = "key";
@GetMapping("/buy_goods")
public void buyGoods() {
String value = UUID.randomUUID().toString() + Thread.currentThread().getName();
Boolean flag = redisTemplate.opsForValue().setIfAbsent(KEY, value); // setnx
// 设置过期时间
redisTemplate.boundValueOps(KEY).expire(60, TimeUnit.SECONDS);
if (!flag) {
throw new RuntimeException("加锁失败");
}
/*
业务代码
*/
// 解锁
redisTemplate.delete(KEY);
}
这便是一个简陋的分布式锁方案,但是还有很多地方需要完善,比如说加锁成功,设置过期时间失败了就有可能导致死锁等。
基于 SETEX 实现
@Autowired
private RedisTemplate<String,String> redisTemplate;
private final String REDIS_LOCK = "key";
@GetMapping("/buy_goods")
public void buyGoods() {
// 值唯一
String value = UUID.randomUUID().toString() + Thread.currentThread().getName();
try {
// 保障加锁与设置过期时间的操作是一个原子操作 setex
Boolean flag = redisTemplate.opsForValue().setIfAbsent(REDIS_LOCK,value,60,TimeUnit.SECONDS);
if (!flag) {
throw new RuntimeException("加锁失败");
}
/*
业务代码
......
*/
} finally {
// 解锁, 在finally块中,保障一定解锁
/*
解锁前先判断该线程的锁是否存在
避免锁不存在或者 删除错锁
*/
// if (value.equalsIgnoreCase(redisTemplate.opsForValue().get(REDIS_LOCK))) {
// redisTemplate.delete(REDIS_LOCK);
// }
// 以上解锁操作中的 判断锁是否存在与删除操作不是原子操作, 使用lua脚本解决(官网建议)
String script = "if redis.call(\"get\",KEYS[1]) == ARGV[1]\n" +
"then\n" +
" return redis.call(\"del\",KEYS[1])\n" +
"else\n" +
" return 0\n" +
"end";
try {
Object o = jedis.eval(script, Collections.singletonList(REDIS_LOCK),
Collections.singletonList(value));
if("1".equals(o.toString())) {
System.out.println("---del redis lock ok.");
}else {
System.out.println("---del redis lock error.");
}
}finally {
if(jedis != null)
jedis.close();
}
}
}
存在业务超时情况,业务未执行完,锁过期了(业务超时,锁没有续期)
基于 Redisson 实现
确保RedisLock过期时间大于业务执行时间,自动续期(官方推荐使用)
了解更多 Redisson
依赖:
<dependency>
<groupId>org.redisso</groupId>
<artifactId>redisson</artifactId>
<version>3.9.1</version>
</dependency>
Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期。默认情况下,看门狗的检查锁的超时时间是30秒钟,也可以通过修改Config.lockWatchdogTimeout来另行指定
了解更多Redisson实现分布式锁
配置:
@Configuration
public class RedisConfig {
@Bean
public Redisson redisson() {
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379").setDatabase(0);
// 设置看门狗检查锁的时间
// config.setLockWatchdogTimeout(30);
return (Redisson)Redisson.create(config);
}
}
代码实现:
最常见的使用方法
@Autowired
private Redisson redisson;
private final String REDIS_LOCK = "key";
@GetMapping("/buy_goods")
public void buyGoods() {
// 值唯一
String value = UUID.randomUUID().toString() + Thread.currentThread().getName();
// 加锁
RLock lock = redisson.getLock(REDIS_LOCK);
/*
阻塞式等待 默认加锁30s
锁的自动续期,如果业务超长,运行期间自动给锁续上新的30s,不用担心业务时间长,锁过期被删掉
*/
lock.lock();
// 尝试加锁(非阻塞式),最多等待100秒,上锁以后10秒自动解锁
// boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
// lock.lock(10,TimeUnit.SECONDS); // 加锁后10s自动解锁,自动解锁时间一定要大于业务时间,不然会报错
// 问题: 10s 后业务未执行完不会自动续期
// lock() 方法:
// 1. 如果传递了锁的超时时间,就发送给redis执行脚本占锁,默认超时就是我们指定的时间
// 2. 如果未指定锁的超时时间,就使用看门狗的默认时间,只要占锁成功,就会开启一个定时任务,重新给锁设置过期时间,新的过期时间就是看门狗的默认时间
// 默认情况每隔 10 s续期一次,也就是【看门狗时间 / 3】秒。每次续期都续满到30s
try {
/*
业务代码
......
*/
} finally {
/*
避免异常
IllegalMonitorStateException: attempt to unlock lock,
not loked by current thread by node id:da6385f-81a5-4e6c-b8c0
*/
if (lock.isLocked() && lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
}
基于 RedLock 的实现
Redlock 是 Redis 的作者给出的集群模式的 Redis 分布式锁,它基于 N 个完全独立的 Redis 节点(通常情况下 N 可以设置成 5)
有效防止单点故障
RedLock 算法的基本步骤:
- 客户端获取当前时间,以毫秒为单位
- 客户端尝试获取 N 个节点的锁,N 个节点以相同的 key 和 value 获取锁。客户端需要设置接口访问超时,接口超时时间需要远远小于锁超时时间,比如:TTL为5s,设置获取锁最多用1s,所以如果一秒内无法获取锁,就放弃获取这个锁,从而尝试获取下个锁
- 客户端计算在获得锁的时候花费了多少时间,用当前时间减去在步骤一获取的时间,这个时间差要小于TTL时间并且至少有3个redis实例成功获取锁,才算真正的获取锁成功
- 客户端获取到锁的有效时间为设置的锁超时时间减去步骤三计算出的获取锁花费时间
- 如果客户端获取锁失败了,客户端会依次删除所有的锁。 使用 Redlock 算法,可以保证在挂掉最多 2 个节点的时候,分布式锁服务仍然能工作
RedLock释放锁
由于释放锁时会判断这个锁的value是不是自己设置的,如果是才删除;所以在释放锁时非常简单,只要向所有实例都发出释放锁的命令,不用考虑能否成功释放锁
RedLock注意点(Safety arguments)
1.对于以N/2+ 1(也就是一半以 上)的方式判断获取锁成功,是因为如果小于一半判断为成功的话,有可能出现多个client都成功获取锁的情况, 从而使锁失效
3.一个client锁定大多数事例耗费的时间大于或接近锁的过期时间,就认为锁无效,并且解锁这个redis实例(不执行业务) ;只要在TTL时间内成功获取一半以上的锁便是有效锁;否则无效
缺点
失效时间设置多长时间为好?如何设置的失效时间太短,方法没等执行完,锁就自动释放了,那么就会产生并发问题。如果设置的时间太长,其他获取锁的线程就可能要平白的多等一段时间。
标签:加锁,实现,lock,Redis,value,获取,时间,超时,分布式 From: https://www.cnblogs.com/allure-xiaoxin/p/16796136.html