首页 > 数据库 >[系统设计] 分布式系统 (1) 分布式锁(1)基于Redis(setnx)实现分布式锁组件

[系统设计] 分布式系统 (1) 分布式锁(1)基于Redis(setnx)实现分布式锁组件

时间:2023-08-07 20:44:26浏览次数:42  
标签:String serviceDatasetCacheKeyId Redis lockKey 分布式系统 true keepMills redisTemplate

1 序言

近期遇到一个问题:

  • 外部查询缓存了InfluxDB中物联网数据表的字段信息元数据的本地缓存(基于Google Guava Cache、及其RefreshAfterWrite(seconds, TimeUnit.SECOND))的Web接口

为什么会缓存 Influxdb的字段信息呢?因为字段信息非常多,多到每次查询时需要花费1.5-2分钟(80秒以上)

  • 但总会偶尔几次查询超时失败————原因是:Guava的RefreshAfterWrite并不是最初预期【主动刷新(自动刷新)】,而是【懒式刷新(需等待新请求来时,才清理过期数据,并重新从源头load数据到缓存中)】

那么,怎么办呢?
1、我想了个方案:

  • 共享缓存(基于Redis;放弃原来的本地缓存)
  • 分布式锁(基于RedisLock;防止同一工程的多个Pod实例反复执行,浪费Influxdb的集群资源、及影响Influxdb的正常运行)
  • 定时调度(基于Java自带的ScheduledExecutorService[默认实现类:ScheduledThreadPoolExecutor])

看到这里应该知道,本文的主角登场了————分布锁。博主的实现思路如下:

  • Redis(利用好setnx {key} {value} {expirationMilliseconds}的原子特性)

版本: redis version:5.0.14

  • RedisTemplate(spring-data-redis框架封装的 Redis Java 客户端)

版本: spring-data-redis:2.3.9-RELEASE

关于分布式锁的概念、基础部分,参见: [系统设计] 分布式系统 (1) 分布式锁 - 博客园/千千寰宇

2 源码实现


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.util.ObjectUtils;

import java.util.concurrent.TimeUnit;

/**
 * @author johnny-zen
 * @version v1.0
 * @description Redis 分布式锁
 * @refrence-doc
 *  [1] RedisTemplate分布式锁 - python100 - https://www.python100.com/html/96639.html
 *  [2] 美团面试:分布式锁实现方案,你选哪种? - weixin - https://mp.weixin.qq.com/s/2XSXEoDppnZApFyspXTXag
 * @gpt-promt
 */
public class RedisLock {
    private static final Logger logger = LoggerFactory.getLogger(RedisLock.class);
    private RedisTemplate redisTemplate;
    private String lockKey;
    /**
     * key锁对应的值
     */
    private String lockValue;
    private String threadId;
    /**
     * 锁的过期时间 | 单位:毫秒
     */
    private long keepMills;
    private boolean locked = false;

    public RedisLock(RedisTemplate redisTemplate, String lockKey,String lockValue, long keepMills) {
        this.redisTemplate = redisTemplate;
        this.lockKey = lockKey;
        this.lockValue = lockValue;
        this.keepMills = keepMills;
        this.threadId = String.valueOf(Thread.currentThread().getId());
    }

    /**
     * 加锁操作
     * @param enableLockAutoExpire 是否启用锁自动过期删除(删除 即 解锁)
     * @return
     */
    public boolean lock(boolean enableLockAutoExpire) {
        return enableLockAutoExpire?lockSupportLockAutoExpire():lockNotSupportLockAutoExpire();
    }

    public boolean lock(){
        return lock(true);
    }

    /**
     * 没有过期时间的加锁操作
     * @return
     */
    public boolean lockNotSupportLockAutoExpire() {
        // 尝试加锁;若获取锁成功,则设置locked为true,返回true
        // 注: setIfAbsent(key, value, keepMills, TimeUnit.MICROSECONDS) => setnx {key=lockKey}
        if( redisTemplate.opsForValue().setIfAbsent(lockKey, lockValue) ) {
            locked = true;
            return true;
        }
        return false;
    }

    /**
     * 加锁操作支持锁自动过期
     * @description
     *  关键原理:
     *      1) 必须基于 `set {key} {value} NX PX {milliseconds}`
     *          `NX` : 只有 key 不存在时才设置 K-V
     * @return
     */
    public boolean lockSupportLockAutoExpire() {
        //String expiredStr = String.valueOf((Long)(System.currentTimeMillis() + keepMills + 1));
        //lockValue = expiredStr;

        // 尝试加锁;若获取锁成功,则设置locked为true,返回true
        // 注: setIfAbsent(key, value, keepMills, TimeUnit.MICROSECONDS) => setnx {key=lockKey} {value=expiredStr} {expiration=keepMills}
        if( redisTemplate.opsForValue().setIfAbsent(lockKey, lockValue,  keepMills, TimeUnit.MICROSECONDS) ) {
            locked = true;
            return true;
        }

        //[错误示范]不能先设置值,再设置过期时间 | 原因: 分为2步骤,非不具备原子性,有可能在设置过期时间之前宕机,会造成死锁(key永久存在)
        //if (redisTemplate.opsForValue().setIfAbsent(lockKey, expiredStr)) {//
        //    locked = true;
        //    if(enableLockAutoExpire){
        //        redisTemplate.expire(lockKey, keepMills, TimeUnit.MICROSECONDS);
        //    }
        //    return true;
        //}
        return false;
    }

    /**
     * 解锁操作
     */
    public void unlock() {
        if (locked) {
            redisTemplate.delete(lockKey);
            locked = false;
        }
    }
}

3 UseDemo

/**
 * 刷新缓存数据
 */
public void refreshCacheDataset(){
	cacheServicesCollection.entrySet().stream().forEach( serviceConfigEntry -> {
		String serviceDatasetCacheKeyId = serviceConfigEntry.getKey();// 即 CacheServiceKey
		//step3.1 定时刷新时,加锁 | 定时刷新缓存信息的分布式锁 : redisLock
		String lockKey = String.format(serviceDatasetCacheTaskLockKeyTemplate, serviceDatasetCacheKeyId);
		String lockValue = String.valueOf(System.currentTimeMillis());
		Long keepMills = refreshAfterWrite*1000L;
		RedisLock redisLock = new RedisLock(redisTemplate, lockKey, lockValue, keepMills);
		boolean locked = redisLock.lock(true);//加锁 (程序不主动解锁,通过过期时间被动/自动解锁)
		if(!locked){//未获得锁 => 取消本次任务
			logger.debug("Fail and cancel current dataset's service cache task because that fail to get the task lock!serviceDatasetCacheKeyId:{}", serviceDatasetCacheKeyId);
		} else {
			logger.debug("Success to get task lock!serviceDatasetCacheKeyId:{},keepMills:{}", serviceDatasetCacheKeyId, keepMills);
			//step3.2 从源头查询数据
			Map<String, String> serviceIdAndServiceVersion = parseCacheServiceKey(serviceDatasetCacheKeyId);
			List serviceCacheDataset = loadDatasourceDataset(
					serviceIdAndServiceVersion.get(SERVICE_ID_PARAM),
					serviceIdAndServiceVersion.get(SERVICE_VERSION_PARAM)
			);
			Integer datasetSize = ObjectUtils.isEmpty(serviceCacheDataset)?-1:serviceCacheDataset.size();
			logger.debug("Success to load dataset from source!serviceCacheKey:{},datasetSize:{}", serviceDatasetCacheKeyId, datasetSize);
			//step3.3 插入/更新缓存数据至redis
			String cacheKey = String.format(serviceDatasetCacheKeyTemplate, serviceDatasetCacheKeyId);
			//注: 缓存数据本身不设置过期时间,除非由应用程序step3.2、step3.3执行成功后,主动写入/更新其值
			redisTemplate.opsForValue().set(cacheKey, serviceCacheDataset);
			logger.info("Success to refresh cache dataset now!serviceDatasetCacheKeyId:{},datasetSize:{}", serviceDatasetCacheKeyId, datasetSize);
		}

	});
}

X 参考文献

标签:String,serviceDatasetCacheKeyId,Redis,lockKey,分布式系统,true,keepMills,redisTemplate
From: https://www.cnblogs.com/johnnyzen/p/17612681.html

相关文章

  • springboot中redis作为缓存使用
    springboot中redis作为缓存使用springboot中的redis作为缓存使用application.yamlserver:port:8089#servlet:#context-path:/demoRedis1spring:redis:host:127.0.0.1port:6379password:pom文件<!--添加的依赖--><!--Redis......
  • Airflow 2.2.6 + MySQL 8.0.27 + Redis 7.0.12 部署Airflow任务调度平台
    本docker-compose文件在centos7.9系统,docker版本为24.0.2上测试的如果你的docker版本低于24.xxx就需要手动安装docker-compose,高于24就不需要安装了,docker已经自带了官方文档,关于docker部署1.先执行mkdir-p./dags./logs./plugins./config./......
  • [Redis]Redis (2) 扩展数据结构: Bitmap
    redisbitmapjavaspringboot1Redis数据结构之bitmap#设置bitmap字符串指定位置的值|SETBITkeyoffsetvaluesetbitsingleSquare:recommend:userId:39991>>0#查看bitmap字符串的长度|占用字节数:=(max_offset/8)+1strlensingleSquare:recommend:us......
  • Redis从入门到放弃(9):集群模式
    前面文章我们介绍了Redis的主从模式是一种在Redis中实现高可用性的方式,但也存在一些缺点。1、主从模式缺点写入单点故障:在主从模式中,写入操作只能在主节点进行,如果主节点宕机,写入将无法执行。虽然可以通过升级从节点为主节点来解决,但这会增加故障切换的复杂性。写入压力分......
  • Redis精通系列——LFU算法详述(Least Frequently Used - 最不经常使用)
    转:Redis精通系列——LFU算法详述(LeastFrequentlyUsed-最不经常使用)  ......
  • Scrapy: scrapy_redis
    1#安装2pip3installscrapy_redis3#源码4https://github.com/rmax/scrapy-redis.git5#文档6https://github.com/rmax/scrapy-redis78#配置说明:https://github.com/rmax/scrapy-redis/wiki/Usage9REDIS_HOST='localhost'10REDIS_PORT=6......
  • 解决报错:Redis ERR unknown command ‘FLUSHDB‘
    RedisERRunknowncommand‘FLUSHDB’报错信息:ERRunknowncommand`flushdb`ERRunknowncommand`flushall`解决方案:我的redis版本是5.0.7修改配置文件打开/etc/redis/redis.conf文件,将下面两行代码注释掉rename-commandFLUSHALL37_dba_FLUSHALLrename-commandFLUSHDB......
  • Redis 2.8主从集群及故障自动切换(转载)
    Redis2.8主从集群及故障自动切换Redis官网:https://redis.io/一、架构操作系统:Debian7Master:127.0.0.1,端口:6379Slave1::127.0.0.1,端口:6378Slave2::127.0.0.1,端口:6377Sentinel1:127.0.0.1,端口:26379Sentinel2:127.0.0.1,端口:26378Sentinel3:127.0.0.1,端口:26377二、主从配置1、下载redis压......
  • 两台服务器redis 3.0.5分布式集群安装部署
    两台服务器redis3.0.5分布式集群安装部署目前redis支持的cluster特性:1):节点自动发现2):slave->master选举,集群容错3):Hotresharding:在线分片4):集群管理:clusterxxx5):基于配置(nodes-port.conf)的集群管理6):ASK转向/MOVED转向机制.一、redis集群安装两台电脑:   192.......
  • 深度学习框架 —— 分布式训练
    现在深度学习的模型结构越来越大,参数动不动都是上亿甚至上千亿,这也对训练模型的资源量有很高的要求,显然单个机器上要训练这么大的网络是不现实的,因此学术界和工业界自然开始研究用分布式训练。也就是将一个机器学习模型任务拆分成多个子任务,并将子任务分发给多个计算节点,解决资源......