SpringBoot集成Redisson实战案例
maven依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.6</version>
</dependency>
分布式锁接口
public interface DistributedLocker {
/**
* 锁名称前缀
*/
String LOCK_KEY_PREFIX_KEY = "distributed:keys:";
/**
* 可重入锁
*
* @param key
* 锁名称
* @return
* 锁对象
*/
RLock lock(String key);
/**
* 公平锁
*
* @param key
* 锁名称
* @return
* 锁对象
*/
RLock fairLock(String key);
/**
* 读写锁
*
* @param key
* 锁名称
* @return
* 锁对象
*/
RReadWriteLock readWriteLock(String key);
/**
* 在锁内执行业务
*
* @param key
* 锁名称
* @param supplier
* 具体业务逻辑
* @param <T>
* @return
* 操作结果
*/
<T> T lock(String key, Supplier<T> supplier);
/**
* 可重入锁(设置锁的持有时间)
*
* @param key
* 锁名称
* @param leaseTime
* 上锁后, leaseTime 后自动解锁
* @param unit
* 单位
* @param supplier
* 业务执行结果
* @return
* 锁对象
* @param <T>
*/
<T> T lock(String key, int leaseTime, TimeUnit unit, Supplier<T> supplier);
/**
* 尝试获取锁
*
* @param key
* 锁名称
* @param supplier
* 具体业务逻辑
* @return
* 业务执行结果
* @param <T>
*/
<T> T tryLock(String key, Supplier<T> supplier);
/**
* 尝试获取锁(为加锁等待 waitTime unit时间)
*
* @param key
* 锁名称
* @param waitTime
* 获取锁最长等待时间
* @param unit
* 单位
* @param supplier
* 具体业务逻辑
* @return
* 业务执行结果
* @param <T>
* @throws InterruptedException ex
*/
<T> T tryLock(String key, long waitTime, TimeUnit unit, Supplier<T> supplier)
throws InterruptedException;
/**
* 尝试获取锁(为加锁等待 waitTime unit时间,并在加锁成功 leaseTime unit 后自动解开)
*
* @param key
* 锁名称
* @param waitTime
* 获取锁最长等待时间
* @param leaseTime
* 上锁后, leaseTime 后自动解锁
* @param unit
* 单位
* @param supplier
* 具体业务逻辑
* @return
* 业务执行结果
* @param <T>
* @throws InterruptedException ex
*/
<T> T trysLock(String key, long waitTime, int leaseTime, TimeUnit unit, Supplier<T> supplier)
throws InterruptedException;
/**
* 解锁
*
* @param lock
* 锁对象
*/
default void unlock(RLock lock) {
if (lock != null && lock.isLocked()) {
lock.unlock();
}
}
}
分布式锁实现
@Component
public class RedissonLocker implements DistributedLocker {
@Autowired
private RedissonClient redissonClient;
@Override
public RLock lock(String key) {
return redissonClient.getLock(key);
}
@Override
public RLock fairLock(String key) {
return redissonClient.getFairLock(key);
}
@Override
public RReadWriteLock readWriteLock(String key) {
return redissonClient.getReadWriteLock(key);
}
@Override
public <T> T lock(String key, Supplier<T> supplier) {
RLock lock = lock(key);
try {
lock.lock();
return supplier.get();
} finally {
if (lock != null && lock.isLocked()) {
lock.unlock();
}
}
}
@Override
public <T> T lock(String key, int leaseTime, TimeUnit unit, Supplier<T> supplier) {
RLock lock = this.lock(key);
try {
lock.lock(leaseTime, unit);
return supplier.get();
} finally {
unlock(lock);
}
}
@Override
public <T> T tryLock(String key, Supplier<T> supplier) {
RLock lock = this.lock(key);
boolean tryLock = false;
try {
tryLock = lock.tryLock();
if (tryLock) {
return supplier.get();
}
} finally {
if (tryLock) {
unlock(lock);
}
}
return null;
}
@Override
public <T> T tryLock(String key, long waitTime, TimeUnit unit, Supplier<T> supplier)
throws InterruptedException {
RLock lock = this.lock(key);
boolean tryLock = false;
try {
tryLock = lock.tryLock(waitTime, unit);
if (tryLock) {
return supplier.get();
}
} finally {
if (tryLock) {
unlock(lock);
}
}
return null;
}
@Override
public <T> T trysLock(String key, long waitTime, int leaseTime, TimeUnit unit, Supplier<T> supplier)
throws InterruptedException {
RLock lock = this.lock(key);
boolean tryLock = false;
try {
tryLock = lock.tryLock(waitTime, leaseTime, unit);
if (tryLock) {
return supplier.get();
}
} finally {
if (tryLock) {
unlock(lock);
}
}
return null;
}
}
Redisson 客户端配置
@Configuration
public class RedissonConfig {
@Bean(destroyMethod = "shutdown")
public RedissonClient redissonClient(RedisProperties redisProperties) {
Config config = new Config();
// 监控锁的看门狗超时(默认30秒),单位:毫秒
config.setLockWatchdogTimeout(60000);
SingleServerConfig singleServerConfig =
config
// 单机环境, 集群环境(useClusterServers())
.useSingleServer()
.setAddress(
String.format("redis://%s:%d", redisProperties.getHost(), redisProperties.getPort())
)
.setDatabase(redisProperties.getDatabase());
if (StringUtils.isNotEmpty(redisProperties.getPassword())) {
singleServerConfig.setPassword(redisProperties.getPassword());
}
return Redisson.create(config);
}
}
接口验证
@RestController
@Slf4j
public class AppController {
@Autowired
private DistributedLocker distributedLocker;
@GetMapping("/distributed")
public Response distributed() throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 20; i++) {
final int index = i;
executorService.submit(() -> {
RLock lock = distributedLocker.lock("task");
lock.lock();
try {
TimeUnit.SECONDS.sleep(2);
log.info("成功执行第{}个任务", index);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if (lock != null && lock.isLocked()) {
lock.unlock();
}
}
});
}
// 20秒后如果获取锁则返回业务逻辑执行结果, 否则返回null
Boolean task = distributedLocker.tryLock("task", 20, TimeUnit.SECONDS, ()-> {
// 如果10秒内没获取到锁,不会执行这里面的逻辑(以下日志不会打印)
log.info("获取到锁并执行业务逻辑");
return true;
});
return Response.success(task);
}
}
日志输出
2020-11-20 18:41:58.246 获取到锁并执行业务逻辑
2020-11-20 18:42:00.266 成功执行第0个任务
2020-11-20 18:42:02.283 成功执行第6个任务
2020-11-20 18:42:04.289 成功执行第18个任务
2020-11-20 18:42:06.301 成功执行第2个任务
2020-11-20 18:42:08.315 成功执行第14个任务
2020-11-20 18:42:10.327 成功执行第17个任务
2020-11-20 18:42:12.340 成功执行第9个任务
2020-11-20 18:42:14.351 成功执行第10个任务
2020-11-20 18:42:16.361 成功执行第16个任务
2020-11-20 18:42:18.373 成功执行第4个任务
2020-11-20 18:42:20.376 成功执行第19个任务
2020-11-20 18:42:22.388 成功执行第7个任务
2020-11-20 18:42:24.400 成功执行第15个任务
2020-11-20 18:42:26.412 成功执行第12个任务
2020-11-20 18:42:28.423 成功执行第8个任务
2020-11-20 18:42:30.434 成功执行第13个任务
2020-11-20 18:42:32.442 成功执行第5个任务
2020-11-20 18:42:34.452 成功执行第3个任务
2020-11-20 18:42:36.455 成功执行第1个任务
2020-11-20 18:42:38.461 成功执行第11个任务
Redisson 客户端配置方式
从json文件中加载
@Bean(destroyMethod="shutdown")
RedissonClient redisson(@Value("classpath:/redisson.json") Resource configFile) throws IOException {
Config config = Config.fromJSON(configFile.getInputStream());
return Redisson.create(config);
}
从yaml文件中加载
@Bean(destroyMethod="shutdown")
RedissonClient redisson(@Value("classpath:/redisson.yaml") Resource configFile) throws IOException {
Config config = Config.fromYAML(configFile.getInputStream());
return Redisson.create(config);
}
配置文件具体格式要求或模板详见: redisson-wiki-2.-配置方法
问题描述
Redis锁的过期时间小于业务的执行时间该如何续期?
即:分布式锁续命问题
- 默认情况下,加锁的时间是30秒(可通过config.setLockWatchdogTimeout修改)。如果加锁的业务没有执行完,那么到
30*2/3 = 20秒
的时候,就会进行一次续期,把锁重置成30秒。那这个时候可能又有同学问了,那业务的机器万一宕机了呢?宕机了定时任务跑不了,就续不了期,那自然30秒之后锁就解开了呗。 因此也不能将锁的过期时间设置的太长,否则迭机情况下,其他节点服务将长时间获取不到锁。
分布式锁续命问题
: 就是每隔一段时间判断当前线程的锁是否存在, 如果存在则重置过期时间(给锁续了一条命!)
/** 具体代码详见: Redisson 客户端配置 */
Config config = new Config();
// 监控锁的看门狗超时,单位:毫秒, 此时到 60 * 2/3 = 40秒时会进行一次续期
config.setLockWatchdogTimeout(60 * 1000);
官方文档