首页 > 数据库 >基于Redis的分布式锁实现方案

基于Redis的分布式锁实现方案

时间:2023-07-07 12:13:40浏览次数:52  
标签:基于 return String Redis class CacheLock public redisTemplate 分布式

  本文介绍基于Redis LUA脚本实现分布式锁的具体方案。为了便于在微服务架构的项目中使用,方案以注解切面的方式实现,可单独提炼项目打成jar包。

一、注解

  核心注解有两个CacheLock和CacheParam。

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface CacheLock {

    /**
     * redis 锁key的前缀
     *
     * @return redis 锁key的前缀
     */
    String prefix() default "";

    /**
     * 过期秒数,默认为5秒
     *
     * @return
     */
    int expire() default 5;

    /**
     * 轮询锁的秒数,默认为10
     *
     * @return
     */
    int timeout() default 10;

    /**
     * <p>Key的分隔符(默认 :)</p>
     * <p>生成的Key:N:SO1008:500</p>
     *
     * @return String
     */
    String delimiter() default ":";

    /**
     * 是否释放锁,默认为true
     *
     * @return 秒
     */
    boolean releaseLock() default true;
}
@Target({ElementType.PARAMETER, ElementType.METHOD, ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface CacheParam {

    /**
     * 字段名称
     *
     * @return String
     */
    String name() default "";
}

二、分布式锁名称生成器

public interface CacheKeyGenerator {

    /**
     * 获取AOP参数,生成指定缓存Key
     *
     * @param pjp PJP
     * @return 缓存KEY
     */
    String getLockKey(ProceedingJoinPoint pjp);
}
public class LockKeyGenerator implements CacheKeyGenerator {

    @Override
    public String getLockKey(ProceedingJoinPoint pjp) {
        MethodSignature signature = (MethodSignature) pjp.getSignature();
        Method method = signature.getMethod();
        CacheLock lockAnnotation = method.getAnnotation(CacheLock.class);
        final Object[] args = pjp.getArgs();
        final Parameter[] parameters = method.getParameters();
        StringBuilder builder = new StringBuilder();
        //默认解析方法里面带 CacheParam 注解的属性,如果没有尝试着解析实体对象中的
        for (int i = 0; i < parameters.length; i++) {
            final CacheParam annotation = parameters[i].getAnnotation(CacheParam.class);
            if (annotation == null) {
                continue;
            }
            builder.append(lockAnnotation.delimiter()).append(args[i]);
        }
        if (StringUtils.isEmpty(builder.toString())) {
            final Annotation[][] parameterAnnotations = method.getParameterAnnotations();
            for (int i = 0; i < parameterAnnotations.length; i++) {
                final Object object = args[i];
                if (object == null) {
                    continue;
                }
                final Field[] fields = object.getClass().getDeclaredFields();
                for (Field field : fields) {
                    final CacheParam annotation = field.getAnnotation(CacheParam.class);
                    if (annotation == null) {
                        continue;
                    }
                    field.setAccessible(true);
                    builder.append(lockAnnotation.delimiter()).append(ReflectionUtils.getField(field, object));
                }
            }
        }
        return "method_lock:" + lockAnnotation.prefix() + builder.toString();
    }
}

  可以看到,最后生成的分布式锁的Redis缓存名称的为:method_lock:{CacheLock#prefix}:{CacheParam#name}。分隔符‘:’,可以通过CacheLock的delimiter重置。

三、切面

@Aspect
public class LockMethodInterceptor {

    private CacheKeyGenerator cacheKeyGenerator;

    public LockMethodInterceptor(CacheKeyGenerator cacheKeyGenerator) {
        this.cacheKeyGenerator = cacheKeyGenerator;
    }

    @Around("execution(public * *(..)) && @annotation(com.vcolco.hxdframe.redis.cachelock.CacheLock)")
    public Object interceptor(ProceedingJoinPoint pjp) throws Throwable {
        MethodSignature signature = (MethodSignature) pjp.getSignature();
        Method method = signature.getMethod();
        CacheLock lock = method.getAnnotation(CacheLock.class);
        if (StringUtils.isEmpty(lock.prefix())) {
            throw new RuntimeException("lock key can't be null...");
        }
        String lockKey = cacheKeyGenerator.getLockKey(pjp);
        boolean es = RedisLockUtil.getLockByLua(lockKey, lockKey, lock.expire(), lock.timeout());
        try {
            return es ? pjp.proceed() : CommonResult.fail("并发锁拦截,请勿重复操作");
        } finally {
            if (es && lock.releaseLock()) {
                try {
                    RedisLockUtil.releaseLockByLua(lockKey, lockKey);
                } catch (Throwable ignored) {
                }
            }
        }
    }
}

  注意,切面切点为所有标准了CacheLock注解的的public方法。

  获取锁失败后的返回结果,最好是你的项目使用的统一返回结构,便于处理。

  这里还需要重点注意锁的释放逻辑。加CacheLock注解的方法逻辑执行结束后,进入这里的finally,这里有一个判断条件,决定是否需要进行锁的释放。这里的逻辑是:获取到了锁,且CacheLock#releaseLock属性设置为true时,才释放锁。releaseLock默认为true。

  为什么这么设计?主要是为了满足CacheLock使用在定时任务@Scheduled的方法上的情况。在微服务架构下,如果我的服务本身是多实例运行,但定时方法只想执行一遍,就可以结合CacheLock,设置为不释放锁,让锁自然过期,达到服务多实例,定时任务也只执行一遍的效果。

四、LUA操作工具类

public class RedisLockUtil {

    /**
     * 成功获取锁返回值
     */
    private static final Long LOCK_SUCCESS = 1L;

    /**
     * 成功释放锁返回值
     */
    private static final Long UNLOCK_SUCCESS = 1L;


    private static RedisTemplate<String, Object> redisTemplate;

    /**
     * 释放锁的LUA脚本:如果value的值与参数相等,则删除,否则返回0
     */
    private static final String UNLOCK_SCRIPT_LUA = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";

    /**
     * 使用LUA脚本释放锁, 原子操作
     *
     * @param lockKey
     * @param value
     * @return
     */
    public static boolean releaseLockByLua(String lockKey, String value) {
        RedisScript<Long> redisScript = new DefaultRedisScript<>(UNLOCK_SCRIPT_LUA, Long.class);
        return UNLOCK_SUCCESS.equals(redisTemplate.execute(redisScript, Collections.singletonList(lockKey), value));
    }

    /**
     * 获取锁的LUA脚本:用setNx命令设置值,并设置过期时间
     */
    private static final String LOCK_SCRIPT_LUA = "if redis.call('setNx',KEYS[1],ARGV[1]) == 1 then return redis.call('expire',KEYS[1],ARGV[2]) else return 0 end";

    /**
     * 使用LUA脚本获取锁, 原子操作。过期时间单位为秒
     *
     * @param lockKey
     * @param value
     * @param expireTime
     * @param timeoutTime
     * @return
     * @throws InterruptedException
     */
    public static boolean getLockByLua(String lockKey, String value, int expireTime, int timeoutTime) throws InterruptedException {
        RedisScript<Long> redisScript = new DefaultRedisScript<>(LOCK_SCRIPT_LUA, Long.class);

        long timeoutMillisecond = timeoutTime * 1000L;
        // 请求锁时间
        long requestTime = System.currentTimeMillis();
        // 如果等待锁超时,加锁失败
        while (System.currentTimeMillis() - requestTime < timeoutMillisecond) {
            if (LOCK_SUCCESS.equals(redisTemplate.execute(redisScript, Collections.singletonList(lockKey), value, expireTime))) {
                return true;
            }
            // 获取锁失败,等待100毫秒继续请求
            TimeUnit.MILLISECONDS.sleep(100);
        }
        return false;
    }

    /**
     * 工具注入
     *
     * @param redisTemplate 模板工具
     */
    public static void setRedisTemplate(RedisTemplate<String, Object> redisTemplate) {
        RedisLockUtil.redisTemplate = redisTemplate;
    }
}

  获取锁和释放锁都使用Redis的LUA脚本进行原子操作,实现分布式锁功能。

  在getLockByLua方法中,实现了锁的竞争逻辑。锁的过期时间通过CacheLock#expire指定;竞争锁的总时间通过CacheLock#timeout指定,每次尝试获取锁的时间间隔为100毫秒。

五、配置类

@Configuration
@EnableConfigurationProperties(MyRedisProperties.class)
@ConditionalOnProperty(prefix = "spring.redis", value = "enabled", matchIfMissing = true) public class RedisAutoConfigure { /** * 缓存模板升级 * * @param factory * @return */ @Bean @Autowired(required = false) @ConditionalOnMissingBean(RedisTemplate.class) public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) { RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>(); redisTemplate.setConnectionFactory(factory); FastJsonRedisSerializer<Object> fastJsonRedisSerializer = new FastJsonRedisSerializer<>(Object.class); ParserConfig.getGlobalInstance().setAutoTypeSupport(false); // 设置值(value)的序列化采用FastJsonRedisSerializer。 redisTemplate.setValueSerializer(fastJsonRedisSerializer); redisTemplate.setHashValueSerializer(fastJsonRedisSerializer); // 设置键(key)的序列化采用StringRedisSerializer。 redisTemplate.setKeySerializer(new StringRedisSerializer()); redisTemplate.setHashKeySerializer(new StringRedisSerializer()); redisTemplate.afterPropertiesSet(); return redisTemplate; } /** * 初始化上下文工具 * (打工具包需要) * @return */ @Bean @ConditionalOnMissingBean(RedisContextAware.class) public RedisContextAware redisContext() { return new RedisContextAware(); } /** * 分布式锁方法拦截器 * * @return */ @Bean @ConditionalOnMissingBean(LockMethodInterceptor.class) public LockMethodInterceptor lockInterceptor() { return new LockMethodInterceptor(new LockKeyGenerator()); } /** * 装配locker类,并将实例注入到RedisLockUtil中 * * @return */ @Bean @ConditionalOnMissingBean(RedisLockUtil.class) public RedisLockUtil distributedLocker(RedisTemplate<String, Object> redisTemplate) { RedisLockUtil locker = new RedisLockUtil(); RedisLockUtil.setRedisTemplate(redisTemplate); return locker; } }
@ConfigurationProperties(prefix = "spring.redis")
public class MyRedisProperties {
    private String host;

    private String port;

    private String password;

    private int database;

    private String config;

    public String getHost() {
        return host;
    }

    public void setHost(String host) {
        this.host = host;
    }

    public String getPort() {
        return port;
    }

    public void setPort(String port) {
        this.port = port;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public int getDatabase() {
        return database;
    }

    public void setDatabase(int database) {
        this.database = database;
    }

    public String getConfig() {
        return config;
    }

    public void setConfig(String config) {
        this.config = config;
    }
}

六、上下文

public class RedisContextAware implements ApplicationContextAware {

    /**
     * 实现该接口用来初始化应用程序上下文
     * 该接口会在执行完毕@PostConstruct的方法后被执行
     * 接着,会进行Mapper地址扫描并加载,就是RequestMapping中指定的那个路径
     *
     * @param applicationContext 应用程序上下文
     * @throws BeansException beans异常
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        RedisContextUtils.applicationContext = applicationContext;
    }
}
public class RedisContextUtils {

    public static ApplicationContext applicationContext;

    /**
     * 通过名称获取bean
     */
    public static Object get(String name) {
        return applicationContext.getBean(name);
    }


    /**
     * 通过类型获取bean
     */
    public static Object get(Class<?> clazz) {
        return applicationContext.getBean(clazz);
    }

    /**
     * 判断某个bean是不是存在
     */
    public static boolean has(String name) {
        return applicationContext.containsBean(name);
    }
}

七、效果测试

  第一种情况,不使用@CacheParam参数。

  测试代码如下:

@Slf4j
@RestController
public class TestController {

    @Autowired
    private TestService testService;

    private static final ExecutorService executorService = Executors.newFixedThreadPool(5);


    @GetMapping("/redis")
    public void testRedis(){
        for (int i = 0; i < 5; i++) {
            executorService.submit(new TestCallable("线程" + i));
        }
    }

    /**
     * 资质报警违章分析内部类
     */
    class TestCallable implements Callable<Long> {
        private String name;

        public TestCallable(String name) {
            this.name = name;
        }
        @Override
        public Long call() {
            try {
                testService.testRedis(new TestParam(name));
            }catch (Exception e){
                log.error("遇到错误!", e);
            }
            return 0L;
        }
    }
}
@Service
public class TestService {

    @CacheLock(prefix = "test")
    public void testRedis(TestParam testParam){
        System.out.println(testParam.getName() + "开始执行任务, 时间点:" + DateFormatUtils.format(new Date(), "HH:mm:ss.SSS"));
    }
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public class TestParam {

//    @CacheParam(name = "name")
    private String name;
}

  测试效果如下:
  

  每个线程执行的时间间隔都在100毫秒左右,达到预期。

  第二种情况,如果把TestParam加上@CacheParam注解,会有什么效果?
  执行效果如下:

  

   也达到预期,因为加上CacheParam注解后,锁的名称就变成了method_lock:test:{线程名称}。所以锁没有发生同名竞争。

  测试第三种情况,不释放锁。修改CacheLock属性,设置过期时间expire为1s,releaseLock为false,代码如下:

  

  结果如下:

  

   可以看到,每个线程执行的时间都间隔1秒左右。说明锁是在1s之后自动过期失效,其他线程才得以执行。

标签:基于,return,String,Redis,class,CacheLock,public,redisTemplate,分布式
From: https://www.cnblogs.com/sunshine-ground-poems/p/17287015.html

相关文章

  • 2021年电赛题目基于互联网的摄像测量系统(D题)
    基于互联网的摄像测量系统设计报告2021年全国大学生电子设计竞赛试题<10cmO拍摄方向θ1mx1m拍摄方向BA边长为1米的正方形测试区域l互联网参赛注意事项(1)11月4日8:00竞赛正式开始。本科组参赛队只能在【本科组】题目中任选一题;高职高专组参赛队在【高职高专组】题目中任......
  • python下使用redis分布式锁
    python下使用redis分布式锁1.什么场景需要分布式锁?我们在写业务逻辑的时候,如果多个线程同时访问某个共享变量,一般是对变量进行上锁或者使用queue.Queue()实现,以做到线程安全保证数据不被污染。在单机部署的情况下这样做完全没问题,但是随着业务规模的发展,某些单机部署的系统......
  • C/C++基于哈夫曼树的文件压缩软件[2023-07-06]
    C/C++基于哈夫曼树的文件压缩软件[2023-07-06]案例2基于哈夫曼树的文件压缩软件2.1简介数据压缩分为两大类:无损压缩和有损压缩。无损压缩是指压缩后还能还原回原来的数据的压缩方法,有损压缩一般是针对图像、视频或音频进行的压缩,在压缩后图像、视频或音频的质量会有所下降......
  • 【慢慢买嗅探神器】基于scrapy+pyqt的电商数据爬虫系统
    项目预览项目演示代码部分爬虫模块GUI......
  • Redis学习笔记(上)
    Redisremotedictionaryserver远程字典服务是一个开源的使用ANSIC语言编写、支持网络、可基于内存亦可持久化的日志型、key-value数据库,并提供多种语言的API。和memcached一样,为了保证效率,数据都是缓存在内存中的。区别的是redis会周期性的把更新的数据写入磁盘或者修......
  • redis的订阅发布功能中,前端如何监听到消息,并修改前端页面。
    ......
  • 基于Aidlux的自动驾驶之智能预警部署
    YOLOP能同时处理目标检测、可行驶区域分割、车道线检测三个视觉感知任务,并速度优异、保持较好精度进行工作,代码开源。它是华中科技大学---王兴刚团队,在全景驾驶感知方面提出的模型。这是实操视频:https://www.bilibili.com/video/BV1LX4y1i7mi/?vd_source=4b36e62d12ccafa4305abaa......
  • docker安装Redis 6.0.8
    1、前置说明参考:尚硅谷(周阳)老师笔记写的。注意:删除容器后,数据也会从(宿主机)加载的。2、安装2.1、拉取Redis6.0.8dockerpullredis:6.0.82.2、宿主机:新建redis目录mkdir-p/app/redis2.2.1、创建redis配置文件#一、进入目录cd/app/redis#二、创建redis.conf......
  • Redis实战(黑马点评--分布式锁)
    基本原理和不同的实现方式分布式锁的核心思想就是让大家共用同一把锁,那么我们就能锁住线程,不让线程进行,让程序串行执行,这就是分布式锁的核心思路 可见性:多个线程都能看到相同的结果。注意:这里说的可见性并不是并发编程中指的内存可见性,只是说多个进程之间都能感知到变化的......
  • 基于Jenkins+Gitee实现SpringBoot项目自动化部署(Docker版)
    前言:上一篇笔记:基于Jenkins+Gitee实现SpringBoot项目自动化部署(非Docker版)。本篇笔记介绍一下Docker版本的Jenkins如何实现项目自动化部署。本案例基于Linux CentOS7服务器,防火墙开放8080端口(Jenkins使用),80端口(项目使用),云服务器直接在控制台配置安全规则即可。1......