单实例的正确实现方式
获取锁的正确操作为: SET resource_name my_random_value NX PX 30000
,它限定了只有当锁空闲且持有锁的时间为30000ms,并且锁资源对应的 value 为一个随机值。设置随机值是为了在释放锁时,确保当前线程能够释放该锁,避免出现操作超时的线程释放了其它线程的锁。
释放锁时查询对比 value 和具体删除锁操作必须是一个原子操作,可以结合 LUA脚本实现,对应代码如下:
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end
设置的 value 可以是任意随机值,最简单的就是 UNIX timestamp with microsecond precision
。
代码实例
参考:https://juejin.cn/post/7020803797211414558
// 锁接口,提供释放锁、尝试加锁、重复尝试加锁
public interface RedisLock {
long TIMEOUT_MILLIS = 30000;
int RETRY_MILLIS = 30000;
long SLEEP_MILLIS = 10;
boolean tryLock(String key);
boolean lock(String key);
boolean lock(String key, long expire);
boolean lock(String key, long expire, long retryTimes);
boolean unlock(String key);
}
// 简单抽象类
public abstract class AbstractRedisLock implements RedisLock{
@Override
public boolean lock(String key) {
return lock(key, TIMEOUT_MILLIS);
}
@Override
public boolean lock(String key, long expire) {
return lock(key, TIMEOUT_MILLIS, RETRY_MILLIS);
}
}
// 具体实现类
@Component
public class RedisLockImpl extends AbstractRedisLock {
private Logger logger = LoggerFactory.getLogger(RedisLockImpl.class);
@Autowired
private RedisTemplate<String, String> redisTemplate;
private ThreadLocal<String> threadLocal = new ThreadLocal<String>();
private static final String UNLOCK_LUA; // Lua脚本保证释放锁操作原子性
static {
StringBuilder builder = new StringBuilder();
builder.append("if redis.call(\"get\",KEYS[1]) == ARGV[1]");
builder.append("then ");
builder.append(" return redis.call(\"del\",KEYS[1]) ");
builder.append("else ");
builder.append(" return 0 ");
builder.append("end ");
UNLOCK_LUA = builder.toString();
}
@Override
public boolean tryLock(String key) {
return tryLock(key, TIMEOUT_MILLIS);
}
// unblocking lock
public boolean tryLock(String key, long expire) {
try {
return !StringUtils.isEmpty(redisTemplate.execute((RedisCallback<String>) connection-> {
JedisCommands commands = (JedisCommands) connection.getNativeConnection();
String uuid = UUID.randomUUID().toString();
threadLocal.set(uuid);
return commands.set(key, uuid, SetParams.setParams().nx().px(expire));
}));
} catch (Exception e) {
logger.error("set redis occured an exception", e);
}
return false;
}
// blocking lock
@Override
public boolean lock(String key, long expire, long retryTimes) {
boolean result = tryLock(key, expire);
while (!result && retryTimes-- > 0) {
try {
logger.debug("lock failed, retrying {}......", retryTimes);
Thread.sleep(SLEEP_MILLIS);
} catch (InterruptedException e) {
e.printStackTrace();
}
result = tryLock(key, expire);
}
return result;
}
@Override
public boolean unlock(String key) {
try {
List<String> keys = Collections.singletonList(key);
List<String> args = Collections.singletonList(threadLocal.get());
Long result = redisTemplate.execute((RedisCallback<Long>) connection -> {
Object nativeConnection = connection.getNativeConnection();
if (nativeConnection instanceof JedisCluster) {
return (Long) ((JedisCluster) nativeConnection).eval(UNLOCK_LUA, keys, args);
}
if (nativeConnection instanceof Jedis) {
return (Long) ((Jedis) nativeConnection).eval(UNLOCK_LUA, keys, args);
}
return 0L;
});
return result != null && result > 0;
} catch (Exception e) {
logger.error("unlock occurred an exception", e);
}
return false;
}
}
项目依赖的资源文件 pom.xml
,注意 redis.version
需要和 spring-data-redis
中的版本保持一致:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.15</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.demo</groupId>
<artifactId>distributed_lock</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>distributed_lock</name>
<description>distributed_lock</description>
<properties>
<java.version>1.8</java.version>
<redis.version>3.8.0</redis.version>
<spring-test.version>5.0.7</spring-test.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>${redis.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
测试:
@SpringBootTest
@RunWith(SpringRunner.class)
class DistributedLockApplicationTests {
private Logger logger = LoggerFactory.getLogger(DistributedLockApplicationTests.class);
@Autowired
private RedisLock redisLock;
@Autowired
private StringRedisTemplate redisTemplate;
private ExecutorService executors = Executors.newScheduledThreadPool(8);
@Test
void contextLoads() {
}
@Test
public void lock() {
redisTemplate.opsForValue().set("goods-seckill", "10");
List<Future> futureList = new ArrayList<>();
// 100 threads
for (int i = 0; i < 100; i++) {
futureList.add(executors.submit(this::seckill));
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// waiting for result blocked
futureList.forEach(action -> {
try {
action.get();
} catch (Exception e) {
e.printStackTrace();
}
});
}
public int seckill() {
String key = "goods";
try {
redisLock.lock(key);
int num = Integer.valueOf(Objects.requireNonNull(redisTemplate.opsForValue().get("goods-seckill")));
if (num > 0) {
redisTemplate.opsForValue().set("goods-seckill", String.valueOf(String.valueOf(--num)));
logger.info("秒杀成功,剩余库存{}", num);
} else {
logger.error("秒杀失败,剩余库存{}", num);
}
return num;
} catch (NumberFormatException e) {
logger.error("seckill exception", e);
} finally {
redisLock.unlock(key);
}
return 0;
}
}
单实例实现起来方便,但是不具备实际可用性,实际使用往往是 redis 集群,所以考虑 Redlock
算法。
官方文档解释
为什么基于故障转移的实现是不够的?
实现主从的 Redis架构,但是仍不能保证在主从复制过程中锁资源的互斥性,存在以下情况:
- Client A acquires the lock in the master.
- The master crashes before the write to the key is transmitted to the replica.
- The replica gets promoted to master.
- Client B acquires the lock to the same resource A already holds a lock for. SAFETY VIOLATION!
Redlock 算法
在分布式版本中,我们假设我们有N个redis 的matser,各自独立,没有任何关系(不在一个cluster下),可以部署在不同的服务器或是虚拟机上,假设N设置成5。
某个客户端获取锁的操作:
- 客户端 A 获取当前时间戳;
- 客户端 A 尝试在所有的 N 个 master 中有序地获取锁,使用相同的 key、value;获取锁的过程由于需要遍历多个 redis 服务,可能导致阻塞,需要设置超时时间,假设锁自动释放的时间是 10s,那么这个超时时间可以设置在 5~50ms 的范围内,防止客户端在获取锁期间由于 redis 节点的崩溃导致获取锁的操作超时,如果服务器端没有在规定时间内响应,客户端应该尽快尝试去另外一个Redis实例请求获取锁;
- 客户端 A 获取到锁,再获取当前的时间戳,和步骤1 的时间相减,得到获取锁消耗的时间。在锁有效期内,当且仅当客户端A拿到大部分(
N/2 + 1
,这里至少3)的锁时,分布式锁才可以被正式获取; - 每次当锁被获取到时(从每个master获取),有效时间可以被设置成初始有效时间减去获取锁消耗的时间;
- 假设步骤2中获取锁失败,没有拿到大部分锁,那么它需要把自己在少部分 redis 上拿到的锁释放掉。
失败重试
当客户端获取不到锁,它应该在之后一个随机时间点重试,这为了避免多个客户端尝试同时获取同一个资源(类似脑裂的情况,大概意思就是竞争了一堆,却发现,没人拿到锁),客户端拿到锁越快(早),脑裂的情况越小(或者重试的需要越小),所以理想情况下,客户端可以同时(多路复用)发送set命令给各个master。
释放锁
锁释放步骤很简单,就是把所有master实例上的锁释放,并不需要关心客户端在该实例上有没有成功得到锁。
Redisson 分析和使用
使用(一)、搭建多个 Redis Master
利用 docker 创建三个容器,使用 redis:5.0.7
镜像搭建多个 Redis-server,再利用端口映射实现各个 Redis 服务端 6379端口分别映射到主服务器 6379、6380、6381端口。
详细搭建步骤如下,这里以 6379的 redis服务搭建为例:
# 创建文件挂载目录
mkdir -p /opt/docker/redis/data
mkdir -p /opt/docker/redis/conf
# 配置文件也需要映射(注意修改配置文件配置才能进行远程连接)
cp redis.conf /opt/docker/redis/conf/redis.conf
# 创建容器启动 redis01-server
docker run -p 6379:6379 --name redis -v /opt/docker/redis/conf/redis.conf:/etc/redis/redis.conf -v /opt/docker/redis/data:/data -d redis:5.0.7 redis-server /etc/redis/redis.conf --appendonly yes
# 同理创建 redis02-server
docker run -p 6380:6379 --name redis02 -v /opt/docker/redis02/conf/redis.conf:/etc/redis/redis.conf -v /opt/docker/redis02/data:/data -d redis:5.0.7 redis-server /etc/redis/redis.conf --appendonly yes
# 创建 redis03-server
docker run -p 6381:6379 --name redis03 -v /opt/docker/redis03/conf/redis.conf:/etc/redis/redis.conf -v /opt/docker/redis03/data:/data -d redis:5.0.7 redis-server /etc/redis/redis.conf --appendonly yes
使用(二)、redisson使用
- 引入pom.xml
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.3.2</version>
</dependency>
- 获取多个服务端连接
@Test
public void testSentinel() {
// 创建到多个 master redis 的客户端连接
Config config1 = new Config();
config1.useSingleServer().setAddress("xxxxxxxx:6379").setPassword("xjx123456").setDatabase(0);
RedissonClient redissonClient1 = Redisson.create(config1);
Config config2 = new Config();
config2.useSingleServer().setAddress("xxxxxxxx:6380").setPassword("xjx123456").setDatabase(0);
RedissonClient redissonClient2 = Redisson.create(config2);
Config config3 = new Config();
config3.useSingleServer().setAddress("xxxxxxxx:6381").setPassword("xjx123456").setDatabase(0);
RedissonClient redissonClient3 = Redisson.create(config3);
String resourceName = "REDLOCK_KEY";
RLock lock1 = redissonClient1.getLock(resourceName);
RLock lock2 = redissonClient2.getLock(resourceName);
RLock lock3 = redissonClient3.getLock(resourceName);
// 向三个实例拿锁
RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);
boolean isLock;
// 500ms 拿锁,拿到锁有效期10s
try {
isLock = redLock.tryLock(500, 10000, TimeUnit.MILLISECONDS);
System.out.println("isLock = " + isLock);
if (isLock) {
System.out.println("成功获取到 RedLock 锁");
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
redLock.unlock();
}
}
Redisson源码分析(一)、非公平锁获取锁
上述代码中 tryLock()
方法实际调用了 RedissonMultiLock
的 tryLock()
方法,其中 waitTime(获取锁的最长时间)、leaseTime(锁生效的最长时间)、unit(时间单位)
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long newLeaseTime = -1;
if (leaseTime != -1) {
newLeaseTime = waitTime*2;
}
long time = System.currentTimeMillis();
long remainTime = -1;
if (waitTime != -1) {
remainTime = unit.toMillis(waitTime);
}
int failedLocksLimit = failedLocksLimit();
List<RLock> lockedLocks = new ArrayList<RLock>(locks.size());
// 循环遍历每把锁的实例集合,获取 RLock 对象
for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {
RLock lock = iterator.next();
boolean lockAcquired;
try {
// 对每一把锁都尝试获取锁,如果设置了获取锁过程的超时时间,则超时等待获取锁
if (waitTime == -1 && leaseTime == -1) {
lockAcquired = lock.tryLock();
} else {
long awaitTime = unit.convert(remainTime, TimeUnit.MILLISECONDS);
lockAcquired = lock.tryLock(awaitTime, newLeaseTime, unit);
}
} catch (Exception e) {
lockAcquired = false;
}
// 获取成功,添加到 lockedLocks 集合中
if (lockAcquired) {
lockedLocks.add(lock);
} else {
// 如果不能保证获取至少 N/2 + 1 把锁,就结束
if (locks.size() - lockedLocks.size() == failedLocksLimit()) {
break;
}
// 获取锁失败超过了一定次数
if (failedLocksLimit == 0) {
unlockInner(lockedLocks);
if (waitTime == -1 && leaseTime == -1) {
return false;
}
failedLocksLimit = failedLocksLimit();
lockedLocks.clear();
// reset iterator
while (iterator.hasPrevious()) {
iterator.previous();
}
} else {
// 否则 failedLocksLimit 减一,切换获取下一把锁
failedLocksLimit--;
}
}
if (remainTime != -1) {
// 获取完每一把锁,用总超时时间减去该把锁获取的时间间隔,判断是否有效
remainTime -= (System.currentTimeMillis() - time);
time = System.currentTimeMillis();
// 获取超时,释放所有锁资源
if (remainTime <= 0) {
unlockInner(lockedLocks);
return false;
}
}
}
// 分布式锁有设置超时时间,针对每一把锁设置超时时间
if (leaseTime != -1) {
List<RFuture<Boolean>> futures = new ArrayList<RFuture<Boolean>>(lockedLocks.size());
for (RLock rLock : lockedLocks) {
RFuture<Boolean> future = rLock.expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
futures.add(future);
}
for (RFuture<Boolean> rFuture : futures) {
rFuture.syncUninterruptibly();
}
}
return true;
}
REDLOCK
加锁逻辑如下:
REDLOCK
加锁实际上对应了RedissonMultiLock
的加锁方法;- 首先获取到当前系统时间,以毫秒为单位;
- 依次尝试从
N
个Master
实例中获取相同 hash-Key(比如LOCK_KEY
)和 inner-Key(UUID+ThreadId
)的锁资源。在针对每个Redis-Master
实例加锁时,设置一个超时时间并且这个超时时间需要小于锁的失效时间(比如总的获取锁时间为10s,每个Master获取锁的失效时间为5~50ms内)。这样可以避免服务器端获取锁资源时超时而导致整个过程阻塞; - 客户端使用当前时间减去开始获取锁时间(步骤1记录的时间)就得到获取锁使用的时间。当且仅当从大多数的Redis节点都取到锁,并且使用的时间小于锁失效时间时,锁才算获取成功。
- 如果取到了锁,key的真正有效时间等于有效时间减去获取锁所使用的时间(上一步)。
- 如果因为某些原因,获取锁失败(没有在至少N/2+1个Redis实例取到锁或者取锁时间已经超过了有效时间),客户端应该在所有的Redis实例上进行解锁(即便某些Redis实例根本就没有加锁成功)。
上面分析了 REDLOCK
加锁算法的步骤,其中 RedissonLock
的 tryLock()
方法是实际单把锁的加锁逻辑:
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
final long threadId = Thread.currentThread().getId();
// 通过 LUA 脚本的加锁过程,若成功获取锁则返回空;否则返回锁剩余的有效期时间
Long ttl = tryAcquire(leaseTime, unit, threadId);
// 成功获取到锁,返回
if (ttl == null) {
return true;
}
// 计算当前线程等待获取锁剩余有效时间
time -= (System.currentTimeMillis() - current);
// 小于有效时间,快速返回失败
if (time <= 0) {
acquireFailed(threadId);
return false;
}
current = System.currentTimeMillis();
// 订阅解锁 channel,返回一个 Future 对象
final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
// 超时阻塞地监听 Future 对象,如果有效期时间内没有收到,就取消订阅消息,获取锁失败
if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
if (!subscribeFuture.cancel(false)) {
subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() {
@Override
public void operationComplete(Future<RedissonLockEntry> future) throws Exception {
if (subscribeFuture.isSuccess()) {
unsubscribe(subscribeFuture, threadId);
}
}
});
}
acquireFailed(threadId);
return false;
}
// 直到获取锁成功或者获取锁超时失败,执行以下逻辑
try {
time -= (System.currentTimeMillis() - current);
if (time <= 0) {
acquireFailed(threadId);
return false;
}
// 自旋尝试获取锁资源
while (true) {
long currentTime = System.currentTimeMillis();
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
time -= (System.currentTimeMillis() - currentTime);
if (time <= 0) {
acquireFailed(threadId);
return false;
}
// 使用信号量机制,保证一个客户端中多个线程竞争分布式锁场景下,只会有一个线程被唤醒去和集群争夺锁,这样避免了大量的无效请求。
currentTime = System.currentTimeMillis();
if (ttl >= 0 && ttl < time) {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
time -= (System.currentTimeMillis() - currentTime);
if (time <= 0) {
acquireFailed(threadId);
return false;
}
}
} finally {
unsubscribe(subscribeFuture, threadId);
}
// return get(tryLockAsync(waitTime, leaseTime, unit));
}
// 接上部分
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(leaseTime, unit, threadId));
}
// 接上部分
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
if (leaseTime != -1) {
// 锁生效时间不为 -1 时,详细的加锁逻辑
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(LOCK_EXPIRATION_INTERVAL_SECONDS, TimeUnit.SECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.addListener(new FutureListener<Long>() {
@Override
public void operationComplete(Future<Long> future) throws Exception {
if (!future.isSuccess()) {
return;
}
Long ttlRemaining = future.getNow();
// lock acquired
if (ttlRemaining == null) {
scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
获取锁 redLock.tryLock()
的核心源码如下,其中默认的租约时间(leaseTime
)是 LOCK_EXPIRATION_INTERVAL_SECONDS
,即 30s。获取锁时会执行如下 LUA 指令:
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
// 分布式key不存在,不存在就 hset创建(hset REDLOCK_KEY uuid+threadId 1),并且 pexpire 设置锁资源失效时间
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
// 分布式Key已经存在,value也能对应上,可重入,hincrby 添加计数,重置锁失效时间
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
// 已经有线程持有分布式锁,返回锁对应Key失效的毫秒数
"return redis.call('pttl', KEYS[1]);",
// 三个参数分别对应:
// REDLOCK_KEY ——> KEYS[1]
// leaseTime ——> ARGV[1]
// uuid+threaId ——> ARGV[2]
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
总的来说,获取锁的逻辑为:
- 总共奇数个
Redis_Master
实例,每个实例对应一个RedissonClient
,通过getLock()
方法获取到对应的RedissonLock
对象,所有对象存储在一个 ArrayList中; - 遍历每个
RedissonLock
对象,通过tryLock()
方法尝试获取锁,该方法底层通过LUA
指令实现。锁在 redis 中以hash
结构存储,哈希表名为REDLOCK_KEY
字符串、key
为uuid+threadId
、value
为整数值,记录锁重入的次数。 - 根据
tryLock()
返回的结果判断是否成功获取锁:- 若成功返回结果为空;
- 若失败返回锁剩余有效时间,并订阅释放锁的 channel,然后
await()
阻塞避免无效的空自旋。- 如果此时有其它线程释放锁,会向释放锁的 channel 发送消息,所有阻塞等待的客户端线程会被唤醒。争夺过程是基于 Semaphore 信号量机制,注意这里只会唤醒一个线程去和集群中其它节点争夺锁资源,因为在一个客户端进程中多个线程竞争分布式锁的场景下,没有必要唤醒所有线程去争夺锁,锁在同一时刻只会由一个线程持有。
- 如果解锁的消息没有成功发送到 channel 中(PubSub 是不可靠的,可能发布订阅失效了),阻塞等待超时后进入死循环不断获取锁。
Redisson源码分析(二)、非公平锁释放锁
释放锁参考 RedissonMultiLock
类的 unlock()
方法,该方法返回 RFuture
对象,然后异步的方式获取结果,如下:
public void unlock() {
List<RFuture<Void>> futures = new ArrayList<RFuture<Void>>(locks.size());
for (RLock lock : locks) {
futures.add(lock.unlockAsync());
}
for (RFuture<Void> future : futures) {
future.syncUninterruptibly();
}
}
方法底层调用了 unlockAsnyc()
方法,如下:
public RFuture<Void> unlockAsync() {
long threadId = Thread.currentThread().getId();
return unlockAsync(threadId);
}
public RFuture<Void> unlockAsync(final long threadId) {
final RPromise<Void> result = newPromise();
RFuture<Boolean> future = unlockInnerAsync(threadId);
future.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
if (!future.isSuccess()) {
result.tryFailure(future.cause());
return;
}
Boolean opStatus = future.getNow();
if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + threadId);
result.tryFailure(cause);
return;
}
if (opStatus) {
cancelExpirationRenewal();
}
result.trySuccess(null);
}
});
return result;
}
底层释放锁资源的 LUA 指令:
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// 如果 Key 不存在,向 channel 发布一条消息
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end;" +
// 判断锁是否当前线程持有,不为自己持有则不允许解锁
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
// 当前线程就是持有分布式锁的线程,重入计数减1
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
// 减去后租约值仍然大于0,重新设置锁有效时间
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
// 否则删除Key,发布解锁消息
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
// 四个参数对应:
// KEYS[1]——>REDLOCK_KEY、KEYS[2]——>通道名称
// ARGV[1]——>解锁消息
// ARGV[2]——>锁生效时间
// ARGV[3]——>uuid+threadId
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
}
总的来说释放锁步骤如下:
- 遍历每一个
RedissonLock
对象,异步方式调用unlockAsync()
方法,该方法底层依赖于unlockInnerAsync()
的一系列LUA
指令; LUA
指令先判断 key 是否存在,不存在就向 channel 发布消息;- 若分布式锁存在,但是 value 值不匹配,说明锁已经被占用,返回;
- 若当前线程持有的就是分布式锁,重入计数减1;若减去后值仍然大于0,重新设置锁生效时间,否则删除对应的 key,发布解锁消息。
Redisson源码分析(三)、看门狗
如果业务执行时间比较长,可能会出现业务未执行完而分布式锁提前释放。针对这个问题,可以通过设置一个守护线程,定时检查锁的状态,如果锁快要过期了,就自动给锁续期。
private RFuture<Boolean> tryAcquireOnceAsync(long leaseTime, TimeUnit unit, final long threadId) {
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
}
// 执行 LUA 脚本抢锁
RFuture<Boolean> ttlRemainingFuture = tryLockInnerAsync(LOCK_EXPIRATION_INTERVAL_SECONDS, TimeUnit.SECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
// 执行完后注册监听器
ttlRemainingFuture.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
if (!future.isSuccess()) {
return;
}
Boolean ttlRemaining = future.getNow();
// 如果加锁成功,开启守护线程定时给锁续期
if (ttlRemaining) {
scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
// 给锁续期方法
private void scheduleExpirationRenewal(final long threadId) {
if (expirationRenewalMap.containsKey(getEntryName())) {
return;
}
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
future.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
expirationRenewalMap.remove(getEntryName());
if (!future.isSuccess()) {
log.error("Can't update lock " + getName() + " expiration", future.cause());
return;
}
if (future.getNow()) {
// reschedule itself
scheduleExpirationRenewal(threadId);
}
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
// 默认每 10 秒进行一次续期
if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {
task.cancel();
}
}
在执行完抢锁 LUA 脚本后,会返回一个 Future
对象,并给该对象注册监听器。监听器会检查是否成功获取锁,若成功获取锁,开启看门狗线程定时给锁续期。
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
redis.call('pexpire', KEYS[1], ARGV[1]);
return 1;
end;
return 0;
Redisson源码分析(三)、公平锁获取锁
Redisson源码分析(四)、公平锁释放锁
参考
https://juejin.cn/post/7168802584684134413
https://juejin.cn/post/6961380552519712798
https://www.zhihu.com/question/456788102
标签:lock,return,Redis,redis,long,获取,threadId,分布式 From: https://www.cnblogs.com/istitches/p/18118042