本文介绍基于Redis LUA脚本实现分布式锁的具体方案。为了便于在微服务架构的项目中使用,方案以注解切面的方式实现,可单独提炼项目打成jar包。
一、注解
核心注解有两个CacheLock和CacheParam。
@Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) @Documented @Inherited public @interface CacheLock { /** * redis 锁key的前缀 * * @return redis 锁key的前缀 */ String prefix() default ""; /** * 过期秒数,默认为5秒 * * @return */ int expire() default 5; /** * 轮询锁的秒数,默认为10 * * @return */ int timeout() default 10; /** * <p>Key的分隔符(默认 :)</p> * <p>生成的Key:N:SO1008:500</p> * * @return String */ String delimiter() default ":"; /** * 是否释放锁,默认为true * * @return 秒 */ boolean releaseLock() default true; }
@Target({ElementType.PARAMETER, ElementType.METHOD, ElementType.FIELD}) @Retention(RetentionPolicy.RUNTIME) @Documented @Inherited public @interface CacheParam { /** * 字段名称 * * @return String */ String name() default ""; }
二、分布式锁名称生成器
public interface CacheKeyGenerator { /** * 获取AOP参数,生成指定缓存Key * * @param pjp PJP * @return 缓存KEY */ String getLockKey(ProceedingJoinPoint pjp); }
public class LockKeyGenerator implements CacheKeyGenerator { @Override public String getLockKey(ProceedingJoinPoint pjp) { MethodSignature signature = (MethodSignature) pjp.getSignature(); Method method = signature.getMethod(); CacheLock lockAnnotation = method.getAnnotation(CacheLock.class); final Object[] args = pjp.getArgs(); final Parameter[] parameters = method.getParameters(); StringBuilder builder = new StringBuilder(); //默认解析方法里面带 CacheParam 注解的属性,如果没有尝试着解析实体对象中的 for (int i = 0; i < parameters.length; i++) { final CacheParam annotation = parameters[i].getAnnotation(CacheParam.class); if (annotation == null) { continue; } builder.append(lockAnnotation.delimiter()).append(args[i]); } if (StringUtils.isEmpty(builder.toString())) { final Annotation[][] parameterAnnotations = method.getParameterAnnotations(); for (int i = 0; i < parameterAnnotations.length; i++) { final Object object = args[i]; if (object == null) { continue; } final Field[] fields = object.getClass().getDeclaredFields(); for (Field field : fields) { final CacheParam annotation = field.getAnnotation(CacheParam.class); if (annotation == null) { continue; } field.setAccessible(true); builder.append(lockAnnotation.delimiter()).append(ReflectionUtils.getField(field, object)); } } } return "method_lock:" + lockAnnotation.prefix() + builder.toString(); } }
可以看到,最后生成的分布式锁的Redis缓存名称的为:method_lock:{CacheLock#prefix}:{CacheParam#name}。分隔符‘:’,可以通过CacheLock的delimiter重置。
三、切面
@Aspect public class LockMethodInterceptor { private CacheKeyGenerator cacheKeyGenerator; public LockMethodInterceptor(CacheKeyGenerator cacheKeyGenerator) { this.cacheKeyGenerator = cacheKeyGenerator; } @Around("execution(public * *(..)) && @annotation(com.vcolco.hxdframe.redis.cachelock.CacheLock)") public Object interceptor(ProceedingJoinPoint pjp) throws Throwable { MethodSignature signature = (MethodSignature) pjp.getSignature(); Method method = signature.getMethod(); CacheLock lock = method.getAnnotation(CacheLock.class); if (StringUtils.isEmpty(lock.prefix())) { throw new RuntimeException("lock key can't be null..."); } String lockKey = cacheKeyGenerator.getLockKey(pjp); boolean es = RedisLockUtil.getLockByLua(lockKey, lockKey, lock.expire(), lock.timeout()); try { return es ? pjp.proceed() : CommonResult.fail("并发锁拦截,请勿重复操作"); } finally { if (es && lock.releaseLock()) { try { RedisLockUtil.releaseLockByLua(lockKey, lockKey); } catch (Throwable ignored) { } } } } }
注意,切面切点为所有标准了CacheLock注解的的public方法。
获取锁失败后的返回结果,最好是你的项目使用的统一返回结构,便于处理。
这里还需要重点注意锁的释放逻辑。加CacheLock注解的方法逻辑执行结束后,进入这里的finally,这里有一个判断条件,决定是否需要进行锁的释放。这里的逻辑是:获取到了锁,且CacheLock#releaseLock属性设置为true时,才释放锁。releaseLock默认为true。
为什么这么设计?主要是为了满足CacheLock使用在定时任务@Scheduled的方法上的情况。在微服务架构下,如果我的服务本身是多实例运行,但定时方法只想执行一遍,就可以结合CacheLock,设置为不释放锁,让锁自然过期,达到服务多实例,定时任务也只执行一遍的效果。
四、LUA操作工具类
public class RedisLockUtil { /** * 成功获取锁返回值 */ private static final Long LOCK_SUCCESS = 1L; /** * 成功释放锁返回值 */ private static final Long UNLOCK_SUCCESS = 1L; private static RedisTemplate<String, Object> redisTemplate; /** * 释放锁的LUA脚本:如果value的值与参数相等,则删除,否则返回0 */ private static final String UNLOCK_SCRIPT_LUA = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; /** * 使用LUA脚本释放锁, 原子操作 * * @param lockKey * @param value * @return */ public static boolean releaseLockByLua(String lockKey, String value) { RedisScript<Long> redisScript = new DefaultRedisScript<>(UNLOCK_SCRIPT_LUA, Long.class); return UNLOCK_SUCCESS.equals(redisTemplate.execute(redisScript, Collections.singletonList(lockKey), value)); } /** * 获取锁的LUA脚本:用setNx命令设置值,并设置过期时间 */ private static final String LOCK_SCRIPT_LUA = "if redis.call('setNx',KEYS[1],ARGV[1]) == 1 then return redis.call('expire',KEYS[1],ARGV[2]) else return 0 end"; /** * 使用LUA脚本获取锁, 原子操作。过期时间单位为秒 * * @param lockKey * @param value * @param expireTime * @param timeoutTime * @return * @throws InterruptedException */ public static boolean getLockByLua(String lockKey, String value, int expireTime, int timeoutTime) throws InterruptedException { RedisScript<Long> redisScript = new DefaultRedisScript<>(LOCK_SCRIPT_LUA, Long.class); long timeoutMillisecond = timeoutTime * 1000L; // 请求锁时间 long requestTime = System.currentTimeMillis(); // 如果等待锁超时,加锁失败 while (System.currentTimeMillis() - requestTime < timeoutMillisecond) { if (LOCK_SUCCESS.equals(redisTemplate.execute(redisScript, Collections.singletonList(lockKey), value, expireTime))) { return true; } // 获取锁失败,等待100毫秒继续请求 TimeUnit.MILLISECONDS.sleep(100); } return false; } /** * 工具注入 * * @param redisTemplate 模板工具 */ public static void setRedisTemplate(RedisTemplate<String, Object> redisTemplate) { RedisLockUtil.redisTemplate = redisTemplate; } }
获取锁和释放锁都使用Redis的LUA脚本进行原子操作,实现分布式锁功能。
在getLockByLua方法中,实现了锁的竞争逻辑。锁的过期时间通过CacheLock#expire指定;竞争锁的总时间通过CacheLock#timeout指定,每次尝试获取锁的时间间隔为100毫秒。
五、配置类
@Configuration
@EnableConfigurationProperties(MyRedisProperties.class)
@ConditionalOnProperty(prefix = "spring.redis", value = "enabled", matchIfMissing = true) public class RedisAutoConfigure { /** * 缓存模板升级 * * @param factory * @return */ @Bean @Autowired(required = false) @ConditionalOnMissingBean(RedisTemplate.class) public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) { RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>(); redisTemplate.setConnectionFactory(factory); FastJsonRedisSerializer<Object> fastJsonRedisSerializer = new FastJsonRedisSerializer<>(Object.class); ParserConfig.getGlobalInstance().setAutoTypeSupport(false); // 设置值(value)的序列化采用FastJsonRedisSerializer。 redisTemplate.setValueSerializer(fastJsonRedisSerializer); redisTemplate.setHashValueSerializer(fastJsonRedisSerializer); // 设置键(key)的序列化采用StringRedisSerializer。 redisTemplate.setKeySerializer(new StringRedisSerializer()); redisTemplate.setHashKeySerializer(new StringRedisSerializer()); redisTemplate.afterPropertiesSet(); return redisTemplate; } /** * 初始化上下文工具 * (打工具包需要) * @return */ @Bean @ConditionalOnMissingBean(RedisContextAware.class) public RedisContextAware redisContext() { return new RedisContextAware(); } /** * 分布式锁方法拦截器 * * @return */ @Bean @ConditionalOnMissingBean(LockMethodInterceptor.class) public LockMethodInterceptor lockInterceptor() { return new LockMethodInterceptor(new LockKeyGenerator()); } /** * 装配locker类,并将实例注入到RedisLockUtil中 * * @return */ @Bean @ConditionalOnMissingBean(RedisLockUtil.class) public RedisLockUtil distributedLocker(RedisTemplate<String, Object> redisTemplate) { RedisLockUtil locker = new RedisLockUtil(); RedisLockUtil.setRedisTemplate(redisTemplate); return locker; } }
@ConfigurationProperties(prefix = "spring.redis") public class MyRedisProperties { private String host; private String port; private String password; private int database; private String config; public String getHost() { return host; } public void setHost(String host) { this.host = host; } public String getPort() { return port; } public void setPort(String port) { this.port = port; } public String getPassword() { return password; } public void setPassword(String password) { this.password = password; } public int getDatabase() { return database; } public void setDatabase(int database) { this.database = database; } public String getConfig() { return config; } public void setConfig(String config) { this.config = config; } }
六、上下文
public class RedisContextAware implements ApplicationContextAware { /** * 实现该接口用来初始化应用程序上下文 * 该接口会在执行完毕@PostConstruct的方法后被执行 * 接着,会进行Mapper地址扫描并加载,就是RequestMapping中指定的那个路径 * * @param applicationContext 应用程序上下文 * @throws BeansException beans异常 */ @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { RedisContextUtils.applicationContext = applicationContext; } }
public class RedisContextUtils { public static ApplicationContext applicationContext; /** * 通过名称获取bean */ public static Object get(String name) { return applicationContext.getBean(name); } /** * 通过类型获取bean */ public static Object get(Class<?> clazz) { return applicationContext.getBean(clazz); } /** * 判断某个bean是不是存在 */ public static boolean has(String name) { return applicationContext.containsBean(name); } }
七、效果测试
第一种情况,不使用@CacheParam参数。
测试代码如下:
@Slf4j @RestController public class TestController { @Autowired private TestService testService; private static final ExecutorService executorService = Executors.newFixedThreadPool(5); @GetMapping("/redis") public void testRedis(){ for (int i = 0; i < 5; i++) { executorService.submit(new TestCallable("线程" + i)); } } /** * 资质报警违章分析内部类 */ class TestCallable implements Callable<Long> { private String name; public TestCallable(String name) { this.name = name; } @Override public Long call() { try { testService.testRedis(new TestParam(name)); }catch (Exception e){ log.error("遇到错误!", e); } return 0L; } } }
@Service public class TestService { @CacheLock(prefix = "test") public void testRedis(TestParam testParam){ System.out.println(testParam.getName() + "开始执行任务, 时间点:" + DateFormatUtils.format(new Date(), "HH:mm:ss.SSS")); } }
@Data @AllArgsConstructor @NoArgsConstructor public class TestParam { // @CacheParam(name = "name") private String name; }
测试效果如下:
每个线程执行的时间间隔都在100毫秒左右,达到预期。
第二种情况,如果把TestParam加上@CacheParam注解,会有什么效果?
执行效果如下:
也达到预期,因为加上CacheParam注解后,锁的名称就变成了method_lock:test:{线程名称}。所以锁没有发生同名竞争。
测试第三种情况,不释放锁。修改CacheLock属性,设置过期时间expire为1s,releaseLock为false,代码如下:
结果如下:
可以看到,每个线程执行的时间都间隔1秒左右。说明锁是在1s之后自动过期失效,其他线程才得以执行。
标签:基于,return,String,Redis,class,CacheLock,public,redisTemplate,分布式 From: https://www.cnblogs.com/sunshine-ground-poems/p/17287015.html