(目录)
什么是限流?为什么要限流?
限流,这个词其实并不陌生,在我们生活中也随处可见。
做核酸时,工作人员会在核酸检测点的空地上摆放着弯弯曲曲的围栏,人们排着队左拐右拐的往前移动,其实这么做的目的就是限流!因为核酸检测的窗口是有限的,一下子进那么多人,没那么多空间让人们站下,就会造成拥挤,甚至会造成事故。所以需要限流!
任何系统它处理请求的能力都是有限的,一旦请求多到超出系统的处理极限,系统就会崩溃
。
对于生产环境,崩溃是一个很大的生产事故,保不准就会给公司造成很大的损失,轻则赔款,重则判刑都是有可能的。所以今天来聊一下如何实现限流。
限流场景
限流的需求出现在许多常见的场景中:
秒杀活动,有人使用软件恶意刷单抢货
,需要限流防止机器参与活动某api被各式各样系统广泛调用
,严重消耗网络、内存等资源,需要合理限流- 淘宝获取ip所在城市接口、微信公众号识别微信用户等开发接口,
免费提供给用户时需要限流
,更具有实时性和准确性的接口需要付费
重试、限流、熔断、降级被称为分布式系统高可用的四板斧。
不可避免地,
第一,我们一定要设置超时;
第二,要在一些场景里面去考虑重试的逻辑;
第三,考虑熔断的逻辑,不要被下游拖死;
第四,一定要有限流的逻辑,不要被上游打死。
当今社会,互联网公司的流量巨大,系统上线前需要对系统进行全面的流量峰值评估,以判断系统所能承载的最大瞬时请求数,尤其是像各种秒杀促销活动,为了保证系统不被巨大的流量压垮,会事先评估系统最大请数,并设置限流逻辑,以便在系统流量到达设定的阈值时,拒绝掉这部分流量,从而确保系统不会崩溃。
限流会导致用户在短时间内(这个时间段是毫秒级的)系统不可用,假设系统设置的每秒流量阈值是100,理论上一秒内第101个及之后的请求都会被限流,相当于拒绝服务,下一秒进来的请求能正常被响应,这也就是为什么我们抢购时,一会儿能进页面一会儿显示"请稍后"之类的提示语。相比于系统的短暂不可用,要比系统崩溃要好太多了。
对于限流有很多方式,最经典的几种就是:计数器法、滑动窗口、漏桶法、令牌桶
等,文章采用Redis + Lua脚本实现高性能的分布式限流。
分布式限流
所谓的分布式限流,其实道理很简单。分布式区别于单机限流的场景,它把整个分布式集群环境中所有服务器当做一个整体来考量。比如说针对IP限流,我们限制了1个IP每秒最多10个访问,不管来自这个IP地址的请求落在了哪台机器上,只要是访问了集群中的服务节点,那么都会受到限制规则的制约。
从上面的例子不难看出,我们必须将限流信息保存在一个"中心化"的组件上,这样它就可以获取到集群中所有机器的访问状态
。
目前有两个比较主流的限流方案
:
网关层限流
。将限流规则应用在所有流量的入口处中间件限流
。将限流信息存储在分布式环境中某个中间件里(比如redis),每个组件都可以从这里获取到当前时间的流量统计,从而决定是否放行还是拒绝。
限流方案常用算法
1. 令牌桶算法
Token Bucket令牌桶算法
是目前应用最为广泛的限流算法,顾名思义,它有以下两个关键角色
:
令牌
获取到令牌的Request才会被处理,其他Requests要么排队要么被直接丢弃桶
用来装令牌的地方,所有Request都从这个桶里面获取令牌
生成令牌
这个流程涉及到令牌生成器和令牌桶,前面我们提到过令牌桶是一个装令牌的地方,既然是个桶那么必然有一个容量,也就是说令牌桶所能容纳的令牌数量是一个固定的数值
。
对于令牌生成器来说,它会根据一个预定的速率向桶中添加令牌
,比如我们可以配置让它以每秒100个请求的速率发放令牌,或者每分钟50个。注意这里的发放速度是匀速,也就是说这50个令牌并非是在每个时间窗口刚开始的时候一次性发放,而是会在这个时间窗口内匀速发放。
在令牌发放器就是一个水龙头,假如在下面接水的桶子满了,那么自然这个水(令牌)就流到了外面。在令牌发放过程中也一样,令牌桶的容量是有限的,如果当前已经放满了额定容量的令牌,那么新来的令牌就会被丢弃掉
。
获取令牌
每个访问请求到来后,必须获取到一个令牌才能执行后面的逻辑
。假如令牌的数量少,而访问请求较多的情况下,一部分请求自然无法获取到令牌,那么这个时候我们可以设置一个“缓冲队列”来暂存这些多余的请求
。
缓冲队列其实是一个可选的选项
,并不是所有应用了令牌桶算法的程序都会实现队列。当有缓存队列存在的情况下,那些暂时没有获取到令牌的请求将被放到这个队列中排队,直到新的令牌产生后,再从队列头部拿出一个请求来匹配令牌。
当队列已满的情况下,这部分访问请求将被丢弃
。在实际应用中我们还可以给这个队列加一系列的特效,比如设置队列中请求的存活时间,或者将队列改造为PriorityQueue,根据某种优先级排序,而不是先进先出。算法是死的,人是活的,先进的生产力来自于不断的创造,在技术领域尤其如此。
2)漏桶算法
Leaky Bucket
漏桶算法的前半段和令牌桶类似,但是操作的对象不同
。
令牌桶是将令牌放入桶里,而漏桶是将访问请求的数据包放到桶里
。同样的是,如果桶满了,那么后面新来的数据包将被丢弃。
漏桶算法的后半程
是有鲜明特色的,它永远只会以一个恒定的速率将数据包从桶内流出
。打个比方,如果我设置了漏桶可以存放100个数据包,然后流出速度是1s一个,那么不管数据包以什么速率流入桶里,也不管桶里有多少数据包,漏桶能保证这些数据包永远以1s一个的恒定速度被处理。
漏桶 vs 令牌桶的区别
根据它们各自的特点不难看出来,这两种算法都有一个“恒定”的速率和“不定”的速率。
令牌桶是以恒定速率创建令牌
,但是访问请求获取令牌的速率“不定”,反正有多少令牌发多少,令牌没了就干等。
漏桶是以“恒定”的速率处理请求
,但是这些请求流入桶的速率是“不定”的。
从这两个特点来说,漏桶的天然特性决定了它不会发生突发流量
,就算每秒1000个请求到来,那么它对后台服务输出的访问速率永远恒定。
令牌桶则不同,其特性可以“预存”一定量的令牌
,因此在应对突发流量的时候可以在短时间消耗所有令牌,其突发流量处理效率会比漏桶高,但是导向后台系统的压力也会相应增多
。
限流的主流方案
1. Guava RateLimiter客户端限流
RateLimiter
是guava提供的基于令牌桶算法的实现类
,可以非常简单的完成限流特技,并且根据系统的实际情况来调整生成token的速率。通常可应用于抢购限流防止冲垮系统;限制某接口、服务单位时间内的访问量,譬如一些第三方服务会对用户访问量进行限制;限制网速,单位时间内只允许上传下载多少字节等。
引入依赖
1.引入maven
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>18.0</version>
</dependency>
编写代码
2.编写Controller
通过create方法创建限流器,参数是每秒允许通过的次数,等同于QPS
RateLimiter.create(permitsPerSecond)
请求进来时,先执行tryAcquire方法获取令牌
,获得到了返回true,则进行业务逻辑,否则该请求就被限流拒绝了。
@RestController
@Slf4j
public class Controller{
//每秒钟可以创建两个令牌
RateLimiter limiter = RateLimiter.create(2.0);
//非阻塞限流
@GetMapping("/tryAcquire")
public String tryAcquire(Integer count){
//count 每次消耗的令牌
if(limiter.tryAcquire(count)){
log.info("成功,允许通过,速率为{}",limiter.getRate());
return "success";
}else{
log.info("错误,不允许通过,速率为{}",limiter.getRate());
return "fail";
}
}
//限定时间的非阻塞限流
@GetMapping("/tryAcquireWithTimeout")
public String tryAcquireWithTimeout(Integer count, Integer timeout){
//count 每次消耗的令牌 timeout 超时等待的时间
if(limiter.tryAcquire(count,timeout,TimeUnit.SECONDS)){
log.info("成功,允许通过,速率为{}",limiter.getRate());
return "success";
}else{
log.info("错误,不允许通过,速率为{}",limiter.getRate());
return "fail";
}
}
//同步阻塞限流
@GetMapping("/acquire")
public String acquire(Integer count){
limiter.acquire(count);
log.info("成功,允许通过,速率为{}",limiter.getRate());
return "success";
}
}
rateLimiter.tryAcquire()
这里为了能在一个接口内,针对不同业务方做分开限流,目的是为了不让一个业务方的调用量突然增大,导致其他的业务方也被限流了。
用一个map集合去管理,key是业务方唯一标识,value是ratelimiter类。这样也可以根据不同业务方的QPS去设置不同的permitsPerSecond。
2. 基于 Redis + Lua 限流 (计数器)
Lua脚本
Lua是一个很小巧精致的语言,它的诞生(1993年)甚至比JDK 1.0还要早。Lua是由标准的C语言编写的,它的源码部分不过2万多行C代码,甚至一个完整的Lua解释器也就200k的大小。
Lua往大了说是一个新的编程语言,往小了说就是一个脚本语言。对于有编程经验的同学,拿到一个Lua脚本大体上就能把业务逻辑猜的八九不离十了。
Redis内置了Lua解释器,执行过程保证原子性
Lua脚本的优点:
- 减少网络开销:使用Lua脚本,无需向Redis 发送多次请求,执行一次即可,减少网络传输
- 原子操作:Redis 将整个Lua脚本作为一个命令执行,原子,无需担心并发
- 复用:Lua脚本一旦执行,会永久保存 Redis 中,,其他客户端可复用
编写模拟限流的 Lua
-- 模拟限流
-- 用作限流的key
local key = 'my key'
-- 限流的最大阈值
local limit = 2
-- 当前限流大小
local currentLimit = 1
-- local currentLimit = 1
-- 是否超过限流标准
if currentLimit + 1 > limit then
print 'reject'
return false
else
print 'accept'
return true
end
优化上述的 Lua
-- 获取调用脚本时传入的第一个key值(用作限流的 key)
local key = KEYS[1]
-- 获取调用脚本时传入的第一个参数值(限流大小)
local limit = tonumber(ARGV[1])
-- 获取调用脚本时传入的第二个参数值(限流时长)
local time = tonumber(ARGV[2])
-- 获取当前流量大小
local curentLimit = tonumber(redis.call('get', key) or "0")
-- 是否超出限流
if curentLimit + 1 > limit then
-- 返回(拒绝)
return 0
else
-- 没有超出 value + 1
redis.call("INCRBY", key, 1)
-- 设置过期时间
redis.call("EXPIRE", key, time)
-- 返回(放行)
return 1
end
参数说明:
- 通过KEYS[1] 获取传入的key参数
- 通过ARGV[1]获取传入的limit参数
- redis.call方法,从缓存中get和key相关的值,如果为null那么就返回0
- 接着判断缓存中记录的数值是否会大于限制大小,如果超出表示该被限流,返回0
- 如果未超过,那么该key的缓存值+1,并设置过期时间为1秒钟以后,并返回缓存值+1
整合到项目中
引入依赖
首先创建一个springboot项目,在pom.xml中引入依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>21.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.9</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
配置RedisTemplate
首先application.properties配置Redis连接信息
spring:
redis:
database: 0
port: 6379
password:
host: 127.0.0.1
jedis:
pool:
max-idle: 8
通过**@Bean配置RedisTemplate**
/**
* redis配置类
*/
@Configuration
public class RedisConfig extends CachingConfigurerSupport {
/**
* retemplate相关配置
* @param factory
* @return
*/
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
RedisTemplate<String, Object> template = new RedisTemplate<>();
// 配置连接工厂
template.setConnectionFactory(factory);
//使用Jackson2JsonRedisSerializer来序列化和反序列化redis的value值(默认使用JDK的序列化方式)
Jackson2JsonRedisSerializer jacksonSeial = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper om = new ObjectMapper();
// 指定要序列化的域,field,get和set,以及修饰符范围,ANY是都有包括private和public
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
// 指定序列化输入的类型,类必须是非final修饰的,final修饰的类,比如String,Integer等会跑出异常
// om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
om.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL, JsonTypeInfo.As.PROPERTY);
jacksonSeial.setObjectMapper(om);
// 值采用json序列化
template.setValueSerializer(jacksonSeial);
//使用StringRedisSerializer来序列化和反序列化redis的key值
template.setKeySerializer(new StringRedisSerializer());
// 设置hash key 和value序列化模式
template.setHashKeySerializer(new StringRedisSerializer());
template.setHashValueSerializer(jacksonSeial);
template.afterPropertiesSet();
return template;
}
}
创建自定义注解
然后我们创建自定义注解:
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface MyRedisLimiter {
/**
* Key的前缀
*/
String prefix() default "limiter:";
/**
* 给定的时间范围 单位(秒)
* 默认1秒 即1秒内超过count次的请求将会被限流
*/
int period() default 1;
/**
* 一定时间内最多访问的次数
*/
int count() default 2;
}
创建切面类RedisLimitAspect
大致逻辑是获取方法上的注解MyRedisLimiter,从注解上获取配置信息,组装keys和参数,然后调用RedisTemplate的execute方法获取当前时间内请求数,小于等于limitCount则不限流,否则限流降级处理。
通过
prefix:ip:api路径的作为key
,访问次数为value的方式对某一用户的某一请求进行唯一标识每次访问的时候判断 key 是否存在,是否 count 超过了限制的访问次数
若访问超出限制,则应 response 返回 msg:请求过于频繁 给前端予以展示
@Component
@Aspect
@Slf4j
public class RedisLimitAspect {
// lua 脚本路径
private static final String LIMIT_LUA_PATH = "limit.lua";
// lua 脚本对象 <返回类型>
private DefaultRedisScript<Long> redisScript;
@Autowired
RedisTemplate<String, Object> redisTemplate;
// 只有在 controller 包下 包含 MyRedisLimiter 才拦截
@Pointcut("execution(public * com.maxuan.controller..*(..)) && @annotation(com.maxuan.service.MyRedisLimiter) ")
public void pointCut() {
log.info("进入切点cut.....");
}
@Around("pointCut()")
public Object limit(ProceedingJoinPoint joinPoint) throws Throwable {
// 1. 获取方案签名,作为methodkey
MethodSignature signature = (MethodSignature) joinPoint.getSignature();
// 2. 获取方法对象
Method method = signature.getMethod();
// 3. 获取注解对象
MyRedisLimiter annotation = method.getAnnotation(MyRedisLimiter.class);
// 4. 获取注解上的信息
// Redis前缀
String prefix = annotation.prefix();
// 限流时间
int period = annotation.period();
// 限流次数
int count = annotation.count();
// 5. 获取当前Request请求对象
// 获取 ip 地址 与 远程地址
ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
HttpServletRequest request = attributes.getRequest();
String remoteAddr = request.getRemoteAddr();
String requestURI = request.getRequestURI();
// 6. key ---> prefix:ip:URI
String key = prefix+remoteAddr+":"+requestURI;
System.out.println(key);
Long result = redisTemplate.execute(
redisScript, // 脚本
Collections.singletonList(key), // key
count, // 限流次数
period // 限流时长
);
System.out.println(result);
// 如果超出限流
if(result!=null && result.equals(0L)){
// 降级 通过Sevlet写入远端
System.out.println("限流了");
demote(attributes.getResponse());
return null;
}
return joinPoint.proceed();
}
/**
* 降级策略
* @param response
*/
private void demote(HttpServletResponse response) {
log.info("try to access fail, this request will be demoted");
throw new RuntimeException("限流了");
// response.setHeader("Content-Type", "text/html;charset=UTF8");
// PrintWriter writer = null;
// try {
// writer = response.getWriter();
// writer.println("访问失败,请稍后再试...");
// writer.flush();
// } catch (Exception e) {
// e.printStackTrace();
// } finally {
// if (writer != null) {
// writer.close();
// }
// }
}
}
到这里,最关键的Lua是如何使用的还没讲到。我们可以看到上述代码调用limitRedisTemplate.execute参数的第一个是redisScript,这便是Redis用于执行Lua脚本的重要支持。
加载Lua脚本
在切面类中,我们可以通过初始化加载Lua脚本
,如下new ClassPathResource(LIMIT_LUA_PATH)
// lua 脚本路径
private static final String LIMIT_LUA_PATH = "limit.lua";
// lua 脚本对象 <返回类型>
private DefaultRedisScript<Long> redisScript;
@Autowired
RedisTemplate<String, Object> redisTemplate;
// 构造对象
@PostConstruct
public void init() {
redisScript = new DefaultRedisScript<>();
// 指定返回类型
redisScript.setResultType(Long.class);
try {
// 指定 lua 脚本
//探测资源是否存在
ClassPathResource classPathResource = new ClassPathResource(LIMIT_LUA_PATH);
classPathResource.getInputStream();
redisScript.setScriptSource(new ResourceScriptSource(classPathResource));
} catch (IOException e) {
log.error("未找到文件:{}", LIMIT_LUA_PATH);
}
}
我们传入常量limit.lua,这是classpath下创建的脚本文件
,Lua脚本如下,也很简单,就不在赘述。通常应该在limit.lua文件中放置脚本文件,这样如果需要修改脚本,仅需要修改文件重启即可。
-- 获取调用脚本时传入的第一个key值(用作限流的 key)
local key = KEYS[1]
-- 获取调用脚本时传入的第一个参数值(限流大小)
local limit = tonumber(ARGV[1])
-- 获取调用脚本时传入的第二个参数值(限流时长)
local time = tonumber(ARGV[2])
-- 获取当前流量大小
local curentLimit = tonumber(redis.call('get', key) or "0")
-- 是否超出限流
if curentLimit + 1 > limit then
-- 返回(拒绝)
return 0
else
-- 没有超出 value + 1
redis.call("INCRBY", key, 1)
-- 设置过期时间
redis.call("EXPIRE", key, time)
-- 返回(放行)
return 1
end
降级
然后在降级方法中写我们的降级逻辑
,通过抛异常或往HttpServletResponse写入返回信息都可以
。
/**
* 降级策略
* @param response
*/
private void demote(HttpServletResponse response) {
log.info("try to access fail, this request will be demoted");
throw new RuntimeException("限流了");
// response.setHeader("Content-Type", "text/html;charset=UTF8");
// PrintWriter writer = null;
// try {
// writer = response.getWriter();
// writer.println("访问失败,请稍后再试...");
// writer.flush();
// } catch (Exception e) {
// e.printStackTrace();
// } finally {
// if (writer != null) {
// writer.close();
// }
// }
}
测试
好了,准备工作都ok了,下面我们在controller接口上加上注解,测试一下。
@RestController
@Slf4j
@RequestMapping("access")
public class AccessController {
/**
* 接口 + 拦截器 实现
* @return
*/
@ResponseBody
@GetMapping("accessLimit")
@AccessLimit(seconds = 60,maxCount = 10)
public String accessLimit(){
return "it is ok!";
}
/**
* 基于 Guava RateLimiter客户端限流
*/
//每秒钟可以创建两个令牌
RateLimiter limiter = RateLimiter.create(2.0);
//非阻塞限流
@GetMapping("/tryAcquire")
public String tryAcquire(Integer count){
//count 每次消耗的令牌
System.out.println(limiter.getRate());
if(limiter.tryAcquire(count)){
log.info("成功,允许通过,速率为{}",limiter.getRate());
return "success";
}else{
log.info("错误,不允许通过,速率为{}",limiter.getRate());
return "fail";
}
}
//限定时间的非阻塞限流
@GetMapping("/tryAcquireWithTimeout")
public String tryAcquireWithTimeout(Integer count, Integer timeout){
//count 每次消耗的令牌 timeout 超时等待的时间
if(limiter.tryAcquire(count,timeout, TimeUnit.SECONDS)){
log.info("成功,允许通过,速率为{}",limiter.getRate());
return "success";
}else{
log.info("错误,不允许通过,速率为{}",limiter.getRate());
return "fail";
}
}
//同步阻塞限流
@GetMapping("/acquire")
public String acquire(Integer count){
limiter.acquire(count);
log.info("成功,允许通过,速率为{}",limiter.getRate());
return "success";
}
/**
* AOP + Redis + Lua 实现限流
*/
@MyRedisLimiter(count = 1)
@GetMapping("/redisLimit")
public String limits(){
return "success";
}
@MyRedisLimiter(count = 2,period = 100)
@GetMapping("/redisLimit2")
public String limits2(){
return "success";
}
}
接口限制每秒2个请求,我们使用jmeter1秒发10个请求
结果只有前两个成功了
(上述降级采用的直接抛异常,方便在这里看到限流时下面时红色的)
总结
springboot + aop + Lua 限流实现是比较简单的,旨在让大家认识下什么是限流?
如何做一个简单的限流功能,面试要知道这是个什么东西。上面虽然说了几种实现限流的方案,但选哪种还要结合具体的业务场景,不能为了用而用。在真正的场景里,不止设置一种限流规则,而是会设置多个限流规则共同作用,如连接数、访问频率、黑白名单、传输速率等。
标签:count,令牌,return,RateLimiter,--,Redis,限流,key From: https://blog.51cto.com/panyujie/6096570