简单的滑动窗口限流接口
1.需求
我们公司的流程部分使用了好几个版本的流程服务,当前修改为activiti5.5
,那么原有的流程部分则进行了停止,但是历史流程部分还是需要提供查询,当前功能只需要流程历史三个月前数据查询使用即可,所以部分代码写死了只处理流程三个月历史信息查询。
比如近三个数据,这部分数据则需要先查询老流程wfms数据,还要查询新生成的activiti数据,其中wfms历史数据量非常大,两部分查询数据还需要进行整合导致接口查询效率过低,那么现在需要一个限流机制,想要同一时间段内最多5人查询,多出的人查询则提示当前查询人数过多,请稍后再查,等查询的5人当中有人查询结束,则其他人可以再次进行查询,这个功能就是一个简单的时间窗口限流功能。
2.具体实现过程
此功能主要实现的方式是通过redis
的ZSet
功能实现,具体使用的是 ZRANGEBYSCORE
。
他的主要语法为 ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count]
,意思是返回指定分数的成员。分数在 min max 之间,返回的成员按照 分数 从小到大排列,比如获取key对应score分数在0~100之内的数据,简单例子如下:
ZRANGEBYSCORE key 0 100
我们的时间窗口正是使用此功能, 我们使用功能的url
作为redis
的key
,使用时间戳作为score
分数, 那么判定一段时间内还有多少个该功能请求未完成可以使用 redisTemplate.opsForZSet().rangeByScore(key, currentTime - timeWindow, currentTime);
获取,再获取返回值的size即可。具体实现过程如下:
- 获取请求信息,判定是否添加时间窗口限流注解,没有则正常执行,添加进行限流校验
- 切面拦截请求信息,判定是否首次添加,首次添加则redis注解生成Zset,key为url,value为uuid,score为当前时间戳,如果不是首次添加,则判定
当前时间-timeWindow时间内Zset值的size是否大于初始设定5
,如果大于则提示查询人数过多提示,否则再添加Zset值,key为url,value为uuid,score为当前时间戳。
3.代码部分
代码部分主要有四部分,具体内容如下:
-
注解部分
-
切面部分
-
公共类部分
-
调用部分
-
注解代码部分实现过程
package cn.git.common.limit; import java.lang.annotation.*; /** * @description: 滑动窗口限流接口 * @program: bank-credit-sy * @author: lixuchun * @create: 2023-03-09 04:24:06 */ @Documented @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD) public @interface ApiLimitSlidingWindow { /** * 限流枚举 默认流程三个月前跟踪列表 * @return */ SlidingWindowEnum slidingWindowEnum() default SlidingWindowEnum.WORKFLOW_TRACE_HIS_WINDOW; }
-
切面部分
package cn.git.common.limit; import cn.git.common.exception.ServiceException; import cn.git.common.log.LogOperateProperties; import cn.git.redis.RedisUtil; import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import io.swagger.annotations.ApiOperation; import lombok.extern.slf4j.Slf4j; import org.aspectj.lang.JoinPoint; import org.aspectj.lang.annotation.AfterReturning; import org.aspectj.lang.annotation.AfterThrowing; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.annotation.Before; import org.aspectj.lang.reflect.MethodSignature; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.web.context.request.RequestContextHolder; import org.springframework.web.context.request.ServletRequestAttributes; import javax.servlet.http.HttpServletRequest; import java.lang.reflect.Method; /** * @description: * @program: bank-credit-sy * @author: lixuchun * @create: 2023-03-09 05:54:11 */ @Aspect @Slf4j @Component public class LimitSlidingWindowAspect { /** * SCORE_INFO */ private static final ThreadLocal<ZsetScore> SCORE_INFO = new ThreadLocal<>(); /** * 设置分数信息 * @param zsetScore score信息 */ private void setZsetScoreInfo(ZsetScore zsetScore) { if (ObjectUtil.isNotNull(zsetScore)) { SCORE_INFO.set(zsetScore); } } /** * 获取分数信息 * @return 加锁值信息 */ private ZsetScore getZsetScoreInfo() { return SCORE_INFO.get(); } /** * 删除分数信息 */ private void removeZsetScoreInfo() { SCORE_INFO.remove(); } @Autowired private RedisUtil redisUtil; /** * post请求标识 */ private static final String POST_FLAG = "POST"; /** * get请求标识 */ private static final String GET_FLAG = "GET"; /** * 跟踪列表数据 */ private static final String WORKFLOW_TRACE_URI = "/findTraceList"; /** * 限流名称前缀 */ private static final String LIMIT_SLIDING_WINDOW_PREFIX = "DICS_CUS_SLIDING_WINDOW:"; /** * 0 信贷系统 1 风险系统 2 储备系统 */ private static final String SYSTEM_FLAG = "0"; /** * 是否三个月前标识 * 0 否 1 是 */ private static final String IF_THREE_MONTH_AGO_FLAG = "1"; /** * 是否替换maxCount标识 0否 1是 */ private static final String IF_SPECIAL_FLAG = "1"; /** * 切面信息前置切面 * @param joinPoint 切点 * @param apiLimitSlidingWindow 枚举类型 */ @Before("@annotation(apiLimitSlidingWindow)") public void beforePointCut(JoinPoint joinPoint, ApiLimitSlidingWindow apiLimitSlidingWindow) { // 参数为空设置默认值 SlidingWindowEnum slidingWindowEnum = apiLimitSlidingWindow.slidingWindowEnum(); if (ObjectUtil.isNull(slidingWindowEnum)) { slidingWindowEnum = SlidingWindowEnum.WORKFLOW_TRACE_HIS_WINDOW; } // 接收到请求,记录请求内容 ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes(); HttpServletRequest request = attributes.getRequest(); // 获取服务接口名称 MethodSignature signature = (MethodSignature) joinPoint.getSignature(); Method method = signature.getMethod(); // GET请求 if (GET_FLAG.equals(request.getMethod())) { // 获取参数信息 Object[] args = joinPoint.getArgs(); // 当前如果流程跟踪列表,则需要根据参数判断是否进行限流处理 if (request.getRequestURI().contains(WORKFLOW_TRACE_URI)) { if (ObjectUtil.isNotNull(args[0])) { JSONObject paramJSON = JSON.parseObject(JSONObject.toJSONString(args[0])); String systemFlag = paramJSON.getString("systemFlag"); String ifThreeMonthsAgo = paramJSON.getString("ifThreeMonthsAgo"); // 如果是信贷系统查询三个月以前信息,则需要加入限流处理 boolean ifLoanThreeMonthAgo = IF_THREE_MONTH_AGO_FLAG.equals(ifThreeMonthsAgo) && (StrUtil.isBlank(systemFlag) || SYSTEM_FLAG.equals(systemFlag)); if (ifLoanThreeMonthAgo) { setAndCheckLimitSlidingWindow(slidingWindowEnum, request.getRequestURI(), method, null); } else if (ObjectUtil.isNotNull(slidingWindowEnum.getSpecialMaxCount())) { setAndCheckLimitSlidingWindow(slidingWindowEnum, request.getRequestURI(), method, IF_SPECIAL_FLAG); } } } } else { // POST请求 setAndCheckLimitSlidingWindow(slidingWindowEnum, request.getRequestURI(), method, IF_SPECIAL_FLAG); } } /** * 后置切面,释放查询信息 * @param joinPoint 切面 * @param apiLimitSlidingWindow 时间窗口 */ @AfterReturning("@annotation(apiLimitSlidingWindow)") public void afterReturning(JoinPoint joinPoint, ApiLimitSlidingWindow apiLimitSlidingWindow) { // 获取排序分数信息 ZsetScore zsetScore = SCORE_INFO.get(); SCORE_INFO.remove(); if (ObjectUtil.isNotNull(zsetScore)) { redisUtil.removeZset(zsetScore.getKey(), zsetScore.getValue()); } } /** * 后置异常切面 捕获异常信息 * @param joinPoint joinPoint * @param e 异常信息 */ @AfterThrowing(value = "@annotation(apiLimitSlidingWindow)" , throwing = LogOperateProperties.LOG_EXCEPTION) public void afterThrowing(JoinPoint joinPoint, ApiLimitSlidingWindow apiLimitSlidingWindow, Exception e) { // 获取排序分数信息 ZsetScore zsetScore = SCORE_INFO.get(); SCORE_INFO.remove(); if (ObjectUtil.isNotNull(zsetScore)) { redisUtil.removeZset(zsetScore.getKey(), zsetScore.getValue()); } } /** * 设置限流时间窗口信息 * @param slidingWindowEnum 窗口信息 * @param uri 请求路径信息 * @param method 方法描述 * @param ifSpecial 是否maxCount替换标识 0 否 1 是 */ private void setAndCheckLimitSlidingWindow(SlidingWindowEnum slidingWindowEnum, String uri, Method method, String ifSpecial) { // 获取时间窗口内最大count数量,时间窗口 Integer maxCount = slidingWindowEnum.getMaxCount(); if (IF_SPECIAL_FLAG.equals(ifSpecial) && ObjectUtil.isNotNull(slidingWindowEnum.getSpecialMaxCount())) { maxCount = slidingWindowEnum.getSpecialMaxCount(); } Integer timeWindow = slidingWindowEnum.getTimeWindow(); // 判断是否已经存在限流信息 String slidingWindowKey = LIMIT_SLIDING_WINDOW_PREFIX.concat(uri); if (redisUtil.hasKey(slidingWindowKey)) { // 判断是否已经大于最大限流 if (redisUtil.rangeByScore(slidingWindowKey, timeWindow) > maxCount) { ApiOperation apiOperation = method.getAnnotation(ApiOperation.class); String methodDesc = null; if (ObjectUtil.isNotNull(apiOperation)) { methodDesc = apiOperation.value(); } log.error("服务请求地址[{}]请求人数过多!", uri); if (StrUtil.isNotBlank(methodDesc)) { throw new ServiceException(StrUtil.format("[{}]请求人数过多,请稍后再试!", methodDesc)); } else { throw new ServiceException("当前功能请求人数过多,请稍后再试!"); } } } // 新增限流基数 String scoreValue = redisUtil.addZset(slidingWindowKey); // 设置线程传递值 ZsetScore zsetScore = new ZsetScore(); zsetScore.setKey(slidingWindowKey); zsetScore.setValue(scoreValue); SCORE_INFO.set(zsetScore); } }
-
公共类部分
公共类部分包含枚举部分以及ZsetScore对象类package cn.git.common.limit; import lombok.Getter; /** * @description: 滑动窗口枚举类 * @program: bank-credit-sy * @author: lixuchun * @create: 2023-03-09 04:25:57 */ @Getter public enum SlidingWindowEnum { /** * 限流枚举实例 */ WORKFLOW_TRACE_HIS_WINDOW("老流程三个月前老历史任务查询限流", 60000, 5, 30), WORKFLOW_NEW_TRACE_HIS_WINDOW("新流程三个月前老历史任务查询限流", 60000, 5) ; /** * 限流描述信息 */ private String desc; /** * 时间窗口(毫秒) */ private Integer timeWindow; /** * 时间窗口内默认最大限流 */ private Integer maxCount; /** * 正常业务,特定maxCount信息 */ private Integer specialMaxCount; /** * 自定义限流器构造方法 * @param desc 描述信息 * @param timeWindow 时间窗口 * @param maxCount 时间窗口内最大限流 */ SlidingWindowEnum(String desc, Integer timeWindow, Integer maxCount) { this.desc = desc; this.timeWindow = timeWindow; this.maxCount = maxCount; } /** * 自定义限流器构造方法 * @param desc 描述信息 * @param timeWindow 时间窗口 * @param maxCount 时间窗口内最大限流 * @param specialMaxCount 特定maxCount信息,依情况判定,代替maxCount */ SlidingWindowEnum(String desc, Integer timeWindow, Integer maxCount, Integer specialMaxCount) { this.desc = desc; this.timeWindow = timeWindow; this.maxCount = maxCount; this.specialMaxCount = specialMaxCount; } }
实体对象部分
package cn.git.common.limit; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; /** * @description: zsetScore * @program: bank-credit-sy * @author: lixuchun * @create: 2023-03-13 02:35:58 */ @Data @AllArgsConstructor @NoArgsConstructor public class ZsetScore { /** * 列key */ private String key; /** * value值 */ private String value; /** * 分数 */ private long score; }
-
调用部分
调用部分很简单,直接使用窗口注解设定窗口枚举对象即可
4. 测试部分
使用jemeter进行简单测,20个线程同时请求数据