1 序言
近期遇到一个问题:
- 有一个对外提供查询的Web接口,其内部使用本地缓存(基于Google Guava Cache及其refreshAfterWrite(seconds, TimeUnit.SECONDS))缓存了InfluxDB中物联网数据表的字段元数据
为什么会缓存 Influxdb的字段信息呢?因为字段信息非常多,多到每次查询时需要花费1.5-2分钟(80秒以上)
- 但总会偶尔几次查询超时失败————原因是:Guava的RefreshAfterWrite并不是最初预期【主动刷新(自动刷新)】,而是【懒式刷新(需等待新请求来时,才清理过期数据,并重新从源头load数据到缓存中)】
那么,怎么办呢?
1、我想了个方案:
共享缓存
(基于Redis;放弃原来的本地缓存)分布式锁
(基于RedisLock;防止同一工程的多个Pod实例反复执行,浪费Influxdb的集群资源、及影响Influxdb的正常运行)定时调度
(基于Java自带的ScheduledExecutorService[默认实现类:ScheduledThreadPoolExecutor])
看到这里应该知道,本文的主角登场了————分布式锁
。博主的实现思路如下:
- Redis(利用好
set {key} {value} NX PX {expirationMilliseconds}
的原子特性)
版本:
redis version:5.0.14
- RedisTemplate(
spring-data-redis
框架封装的 Redis Java 客户端)
版本:
spring-data-redis:2.3.9.RELEASE
关于分布式锁的概念、基础部分,参见: [系统设计] 分布式系统 (1) 分布式锁 - 博客园/千千寰宇
2 源码实现
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.util.ObjectUtils;
import java.util.concurrent.TimeUnit;
/**
 * Redis-based distributed lock.
 *
 * Relies on the atomicity of the Redis command
 * {@code SET key value NX PX milliseconds}, which Spring Data Redis exposes as
 * {@code ValueOperations#setIfAbsent(key, value, timeout, unit)}.
 *
 * NOTE: this class is NOT thread-safe itself (the {@code locked} flag is plain
 * mutable state); each lock attempt should use its own instance.
 *
 * @author johnny-zen
 * @version v1.0
 * @reference-doc
 *  [1] RedisTemplate分布式锁 - python100 - https://www.python100.com/html/96639.html
 *  [2] 美团面试:分布式锁实现方案,你选哪种? - weixin - https://mp.weixin.qq.com/s/2XSXEoDppnZApFyspXTXag
 * @gpt-prompt
 */
public class RedisLock {
    private static final Logger logger = LoggerFactory.getLogger(RedisLock.class);

    private final RedisTemplate redisTemplate;
    /** Redis key identifying the lock. */
    private final String lockKey;
    /** Value stored under the lock key; identifies this lock's owner. */
    private final String lockValue;
    /** Id of the thread that created this instance (diagnostic only; not used in lock logic). */
    private final String threadId;
    /** Lock expiration time | unit: milliseconds. */
    private final long keepMills;
    /** Whether this instance currently believes it holds the lock. */
    private boolean locked = false;

    /**
     * @param redisTemplate Spring Data Redis client
     * @param lockKey       redis key of the lock
     * @param lockValue     owner-identifying value stored under the key
     * @param keepMills     lock expiration, in milliseconds
     */
    public RedisLock(RedisTemplate redisTemplate, String lockKey, String lockValue, long keepMills) {
        this.redisTemplate = redisTemplate;
        this.lockKey = lockKey;
        this.lockValue = lockValue;
        this.keepMills = keepMills;
        this.threadId = String.valueOf(Thread.currentThread().getId());
    }

    /**
     * Acquire the lock.
     *
     * @param enableLockAutoExpire whether the lock auto-expires (expiry == unlock)
     * @return true if the lock was acquired
     */
    public boolean lock(boolean enableLockAutoExpire) {
        return enableLockAutoExpire ? lockSupportLockAutoExpire() : lockNotSupportLockAutoExpire();
    }

    /** Acquire the lock with auto-expiry enabled (the recommended mode). */
    public boolean lock() {
        return lock(true);
    }

    /**
     * Acquire the lock WITHOUT an expiration time.
     *
     * WARNING: if the holder crashes before calling {@link #unlock()}, the key
     * lives forever and every other client deadlocks. Prefer
     * {@link #lockSupportLockAutoExpire()}.
     *
     * @return true if the lock was acquired
     */
    public boolean lockNotSupportLockAutoExpire() {
        // setIfAbsent(key, value) => SETNX {key} {value}
        // setIfAbsent may return null (e.g. when used inside a transaction/pipeline),
        // so compare null-safely instead of auto-unboxing the Boolean.
        if (Boolean.TRUE.equals(redisTemplate.opsForValue().setIfAbsent(lockKey, lockValue))) {
            locked = true;
            return true;
        }
        return false;
    }

    /**
     * Acquire the lock with automatic expiration.
     *
     * Key principle: must be based on the single atomic command
     * {@code SET {key} {value} NX PX {milliseconds}}
     * ({@code NX}: set only if the key does not exist; {@code PX}: expiry in ms),
     * so that value and TTL are set in one step.
     *
     * @return true if the lock was acquired
     */
    public boolean lockSupportLockAutoExpire() {
        // setIfAbsent(key, value, timeout, unit) => SET {key} {value} NX PX {timeout}
        // BUGFIX: the unit must be MILLISECONDS -- keepMills is in milliseconds;
        // the previous TimeUnit.MICROSECONDS made the lock expire ~1000x too early.
        Boolean acquired = redisTemplate.opsForValue()
                .setIfAbsent(lockKey, lockValue, keepMills, TimeUnit.MILLISECONDS);
        // Null-safe check: setIfAbsent may return null in transaction/pipeline mode.
        if (Boolean.TRUE.equals(acquired)) {
            locked = true;
            return true;
        }
        // [Anti-pattern, do NOT do this] set the value first, then set the TTL:
        // two separate steps are not atomic -- a crash between them leaves a key
        // with no expiry, i.e. a permanent deadlock.
        return false;
    }

    /**
     * Release the lock.
     *
     * Only deletes the key if it still holds this instance's value; after the
     * lock has expired the key may already belong to another client, and
     * deleting it unconditionally would release someone else's lock.
     *
     * NOTE(review): GET-compare-DEL is not atomic; for strict correctness use a
     * Lua compare-and-delete script via redisTemplate.execute(...).
     */
    public void unlock() {
        if (!locked) {
            return;
        }
        Object currentValue = redisTemplate.opsForValue().get(lockKey);
        if (ObjectUtils.nullSafeEquals(lockValue, currentValue)) {
            redisTemplate.delete(lockKey);
        }
        locked = false;
    }
}
3 UseDemo
/**
 * Refreshes the cached dataset of every registered cache service.
 *
 * For each service key: acquire a per-service distributed lock (so only one pod
 * instance performs the refresh per interval), load fresh data from the source,
 * and overwrite the redis cache entry. The lock is never released explicitly —
 * it expires on its own after {@code keepMills}.
 */
public void refreshCacheDataset() {
    // Only the map key (the CacheServiceKey) is needed, so iterate keySet directly.
    cacheServicesCollection.keySet().forEach(serviceDatasetCacheKeyId -> {
        // step3.1 acquire the distributed lock guarding this scheduled refresh
        String lockKey = String.format(serviceDatasetCacheTaskLockKeyTemplate, serviceDatasetCacheKeyId);
        String lockValue = String.valueOf(System.currentTimeMillis());
        long keepMills = refreshAfterWrite * 1000L; // seconds -> milliseconds
        RedisLock redisLock = new RedisLock(redisTemplate, lockKey, lockValue, keepMills);
        // Acquire with auto-expiry; we deliberately never unlock — the TTL releases it.
        boolean locked = redisLock.lock(true);
        if (!locked) {
            // Another pod holds the lock => skip this service for this cycle.
            logger.debug("Fail and cancel current dataset's service cache task because that fail to get the task lock!serviceDatasetCacheKeyId:{}", serviceDatasetCacheKeyId);
            return;
        }
        logger.debug("Success to get task lock!serviceDatasetCacheKeyId:{},keepMills:{}", serviceDatasetCacheKeyId, keepMills);
        // step3.2 load fresh data from the source
        Map<String, String> serviceIdAndServiceVersion = parseCacheServiceKey(serviceDatasetCacheKeyId);
        List serviceCacheDataset = loadDatasourceDataset(
                serviceIdAndServiceVersion.get(SERVICE_ID_PARAM),
                serviceIdAndServiceVersion.get(SERVICE_VERSION_PARAM)
        );
        // -1 marks an empty/absent dataset in the logs.
        int datasetSize = ObjectUtils.isEmpty(serviceCacheDataset) ? -1 : serviceCacheDataset.size();
        logger.debug("Success to load dataset from source!serviceCacheKey:{},datasetSize:{}", serviceDatasetCacheKeyId, datasetSize);
        // step3.3 insert/update the cached data in redis.
        // The cache entry itself has no TTL — it is only replaced by a refresh
        // that completed step3.2/step3.3 successfully.
        String cacheKey = String.format(serviceDatasetCacheKeyTemplate, serviceDatasetCacheKeyId);
        redisTemplate.opsForValue().set(cacheKey, serviceCacheDataset);
        logger.info("Success to refresh cache dataset now!serviceDatasetCacheKeyId:{},datasetSize:{}", serviceDatasetCacheKeyId, datasetSize);
    });
}