1 为什么需要限流
限流,也称流量控制。是指系统在面临高并发,或者大流量请求的情况下,限制新的请求对系统的访问,从而保证系统的稳定性。限流会导致部分用户请求处理不及时或者被拒,这就影响了用户体验。所以一般需要在系统稳定和用户体验之间平衡一下。
举个生活的例子:
比如我们的交通拥堵,交管部门为了缓解交通拥堵,通常会采用限号(单双号)或者限行(高峰时间段不允许大货车或者外地车牌通行),如果不采取这些手段,势必会造成交通更加拥堵,最终导致我们的交通道路瘫痪。
2 常见的限流算法
2.1 固定窗口限流算法
首先维护一个计数器,将单位时间段当做一个窗口,计数器记录这个窗口接收请求的次数。当次数少于限流阀值,就允许访问,并且计数器+1,当次数大于限流阀值,就拒绝访问。当前的时间窗口过去之后,计数器清零。
假设单位时间是1秒,限流阀值为3。在单位时间1秒内,每来一个请求,计数器就加1,如果计数器累加的次数超过限流阀值3,后续的请求全部拒绝。等到1s结束后,计数器清0,重新开始计数。如下图:
但是,这种算法有一个很明显的临界问题:假设限流阀值为5个请求,单位时间窗口是1s,如果我们在单位时间内的前0.8-1s和1-1.2s,分别并发5个请求。虽然都没有超过阀值,但是如果算0.8-1.2s,则并发数高达10,已经超过单位时间1s不超过5阀值的定义啦。
代码实现:
import java.util.Date;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 限流实现-固定时间窗口
* @author xfenggeng
* @date 2024-08-07 15:16
*/
public class FixedWindowRateLimiter {
// 阈值(每个时间窗口内,限制数量)
private int limit;
// 固定时间窗口(毫秒)
private long interval;
// 开始时间
private long startTime;
// 计数器
private AtomicInteger count;
public FixedWindowRateLimiter(int limit, long interval){
this.limit = limit;
this.interval = interval;
this.count = new AtomicInteger();
this.startTime = System.currentTimeMillis();
}
/**
* 是否限流
* @return
*/
public synchronized boolean tryAcquire() {
long nowTime = System.currentTimeMillis();
if ((nowTime - startTime) > interval) {
startTime = nowTime;
count.set(0);
}
return count.incrementAndGet() <= limit;
}
/**
* 测试
* @param args
* @throws InterruptedException
*/
public static void main(String[] args) throws InterruptedException {
FixedWindowRateLimiter rateLimiter = new FixedWindowRateLimiter(5, 1000);
for (int i = 0; i < 10; i++) {
Thread.sleep(100);
Date now = new Date();
if (rateLimiter.tryAcquire()) {
// TODO 处理业务
System.out.println(now + " 处理业务...");
} else {
System.out.println(now + " 系统繁忙,请稍后重试...");
}
}
}
}
2.2 滑动窗口限流算法
滑动窗口限流解决固定窗口临界值的问题。它将单位时间周期分为n个小周期,分别记录每个小周期内接口的访问次数,并且根据时间滑动删除过期的小周期。
一张图解释滑动窗口的算法,如下:
假设单位时间还是1s,滑动窗口算法把它划分为5个小周期,也就是滑动窗口(单位时间)被划分为5个小格子。每格表示0.2s。每过0.2s,时间窗口就会往右滑动一格。然后呢,每个小周期,都有自己独立的计数器,如果请求是0.83s到达的,0.8~1.0s对应的计数器就会加1。
我们来看下滑动窗口是如何解决临界问题的?
假设我们1s内的限流阀值还是5个请求,0.8~1.0内(比如0.9s的时候)来了5个请求,落在黄色格子里。时间过了1.0s之后,又来5个请求,落在紫色格子里。如果是固定窗口算法,是不会限流的,但是滑动窗口的话,每过一个小周期,它会右移一个小格。过了1.0s后,会右移一小格,当前的单位时间段是0.2~1.2s,这个区域的请求已经超过限定的5了,已触发限流啦,实际上,紫色格子的请求都被拒绝啦。
TIPS: 当滑动窗口的格子周期划分得越多,那么滑动窗口的滚动就越平滑,限流的统计就会越精确。
代码实现:
import lombok.Data;
import java.util.Date;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 限流实现-滑动时间窗口
* @author xfenggeng
* @date 2024-08-07 15:16
*/
public class SlidingWindowRateLimiter {
/**
* 阈值(每个时间窗口内,限制数量)
*/
private int limit;
/**
* 固定时间窗口(毫秒)
*/
private long interval;
/**
* 多少个子窗口
*/
private int windowCount = 10;
/**
* 窗口列表
*/
private WindowInfo[] windowArray = new WindowInfo[windowCount];
public SlidingWindowRateLimiter(int qps, long timeWindows) {
this.limit = qps;
this.interval = timeWindows;
long currentTimeMillis = System.currentTimeMillis();
for (int i = 0; i < windowArray.length; i++) {
windowArray[i] = new WindowInfo(currentTimeMillis, new AtomicInteger(0));
}
}
/**
* 1. 计算当前时间窗口
* 2. 更新当前窗口计数 & 重置过期窗口计数
* 3. 当前 QPS 是否超过限制
*
* @return
*/
public synchronized boolean tryAcquire() {
long currentTimeMillis = System.currentTimeMillis();
// 1. 计算当前时间窗口
int currentIndex = (int)(currentTimeMillis % interval / (interval / windowCount));
// 2. 更新当前窗口计数 & 重置过期窗口计数
int sum = 0;
for (int i = 0; i < windowArray.length; i++) {
WindowInfo windowInfo = windowArray[i];
if ((currentTimeMillis - windowInfo.getTime()) > interval) {
windowInfo.getNumber().set(0);
windowInfo.setTime(currentTimeMillis);
}
if (currentIndex == i && windowInfo.getNumber().get() < limit) {
windowInfo.getNumber().incrementAndGet();
}
sum = sum + windowInfo.getNumber().get();
}
// 3. 当前 limit 是否超过限制
return sum <= limit;
}
public static void main(String[] args) throws InterruptedException {
int limit = 5, count = 10, sleep = 100, success = count * sleep / 1000 * limit;
System.out.println(String.format("当前limit限制为:%d,当前测试次数:%d,间隔:%dms,预计成功次数:%d", limit, count, sleep, success));
success = 0;
SlidingWindowRateLimiter rateLimiter = new SlidingWindowRateLimiter(limit, 1000);
for (int i = 0; i < count; i++) {
Thread.sleep(sleep);
Date now = new Date();
if (rateLimiter.tryAcquire()) {
success++;
if (success % limit == 0) {
System.out.println(now + ": success, ");
} else {
System.out.println(now + ": success, ");
}
} else {
System.out.println(now + ": fail");
}
}
System.out.println("实际测试成功次数:" + success);
}
@Data
class WindowInfo {
// 窗口开始时间
private Long time;
// 计数器
private AtomicInteger number;
public WindowInfo(long time, AtomicInteger number) {
this.time = time;
this.number = number;
}
}
}
2.3 漏桶算法
漏桶算法的原理很简单,可以认为就是注水漏水的过程。往漏桶中以任意速率流入水,以固定的速率流出水。当水超过桶的容量时,会被溢出,也就是被丢弃。因为桶的容量是不变的,保证了整体的速率。
代码实现:
import lombok.Data;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
/**
* 限流实现-漏桶
* @author xfenggeng
* @date 2024-08-10 12:46
*/
@Data
public class LeakBucketLimiter {
//桶的大小
private long capacity;
//流出速率,每秒流出数
private long rate;
//开始时间
private long startTime;
//桶中剩余的水
private AtomicLong water;
public LeakBucketLimiter(long capacity, long rate){
this.capacity = capacity;
this.rate = rate;
this.startTime = System.currentTimeMillis();
this.water = new AtomicLong();
}
/**
* true 代表放行,请求可已通过
* false 代表限制,不让请求通过
*/
public synchronized boolean tryAcquire() {
//如果桶的余量问0,直接放行
if (water.get() == 0) {
startTime = System.currentTimeMillis();
water.set(1);
return true;
}
//计算从当前时间到开始时间流出的水,和现在桶中剩余的水
//桶中剩余的水
water.set(water.get() - (System.currentTimeMillis() - startTime) / 1000 * rate);
//防止出现<0的情况
water.set(Math.max(0, water.get()));
//设置新的开始时间
startTime += (System.currentTimeMillis() - startTime) / 1000 * 1000;
//如果当前水小于容量,表示可以放行
if (water.get() < capacity) {
water.incrementAndGet();
return true;
} else {
return false;
}
}
// 测试
public static void main(String[] args) throws InterruptedException {
// 请求数
final int threads = 10;
// 初始化漏桶
LeakBucketLimiter leakBucketLimiter = new LeakBucketLimiter(5,1);
// 被限制的次数
AtomicInteger limited = new AtomicInteger(0);
long start = System.currentTimeMillis();
for (int i = 0; i < threads; i++) {
if (leakBucketLimiter.tryAcquire()) {
// TODO 处理业务
System.out.println("处理业务...");
} else {
// 被限制的次数累积
limited.getAndIncrement();
System.out.println("系统繁忙,请稍后重试...");
}
Thread.sleep(200);
}
float time = (System.currentTimeMillis() - start) / 1000F;
//输出统计结果
System.out.println("限制的次数为:" + limited.get() + ",通过的次数为:" + (threads-limited.get()));
System.out.println("限制的比例为:" + (float) limited.get() / (float)threads);
System.out.println("运行的时长为:" + time + "s");
}
}
2.4 令牌桶算法
令牌桶算法的核心思想是以固定速率向桶中添加令牌,请求在消费令牌时进行控制。当桶中令牌充足时,请求可以被执行;当令牌不足时,请求会被限制。令牌桶算法的优势在于允许突发流量,同时限制了流量的最大速率。
代码实现:
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
/**
* 限流实现-令牌桶
* @author xfenggeng
* @date 2024-08-10 12:46
*/
public class TokenBucketLimiter {
//桶的容量
private long capacity;
//放入令牌的速率,单位秒
private long rate;
//上次放置令牌的时间
private long lastTime;
//桶中令牌的余量
private AtomicLong tokenNum;
public TokenBucketLimiter(long capacity, long rate){
this.capacity = capacity;
this.rate = rate;
this.lastTime = System.currentTimeMillis();
this.tokenNum = new AtomicLong();
}
/**
* true 代表放行,请求可已通过
* false 代表限制,不让请求通过
*/
public synchronized boolean tryAcquire() {
//更新桶中剩余令牌的数量
long now = System.currentTimeMillis();
tokenNum.addAndGet((now - lastTime) / 1000 * rate);
tokenNum.set(Math.min(capacity, tokenNum.get()));
//更新时间
lastTime += (now - lastTime) / 1000 * 1000;
//桶中还有令牌就放行
if (tokenNum.get() > 0) {
tokenNum.decrementAndGet();
return true;
} else {
return false;
}
}
//测试
public static void main(String[] args) throws InterruptedException {
// 线程数
int threads = 10;
// 初始化令牌桶
TokenBucketLimiter tokenBucketLimiter = new TokenBucketLimiter(10,1);
// 被限制的次数
AtomicInteger limited = new AtomicInteger(0);
long start = System.currentTimeMillis();
for (int i = 0; i < threads; i++) {
Thread.sleep(200);
if (tokenBucketLimiter.tryAcquire()) {
// TODO 处理业务
System.out.println("处理业务...");
} else {
// 被限制的次数累计
limited.getAndIncrement();
System.out.println("系统繁忙,请稍后重试...");
}
}
float time = (System.currentTimeMillis() - start) / 1000F;
//输出统计结果
System.out.println("限制的次数为:" + limited.get() + ",通过的次数为:" + (threads - limited.get()));
System.out.println("限制的比例为:" + (float) limited.get() / (float) threads);
System.out.println("运行的时长为:" + time + "s");
}
}
腾讯微服务框架(TSF),使用的限流算法就是令牌桶方式,下面是TSF具体的流控中心设计:
3 其他限流工具
比如 Guava 的限流工具包,不过它毕竟是单机的,开源社区中也有很多分布式限流工具,如阿里开源的 Sentinel 就是不错的工具,Sentinel 以流量为切入点,从流量控制、熔断降级、系统负载保护等多个维度保护服务的稳定性。以及redis+lua脚本实现的分布式限流。
使用Guava的RateLimiter实现令牌桶算法限流:
Sentinel集群限流:
https://sentinelguard.io/zh-cn/docs/cluster-flow-control.html
基于Redis+Lua实现分布式限流组件:
创作不易,记得关注+点赞+收藏 ^_^
标签:令牌,窗口,TSF,long,currentTimeMillis,限流,private,import From: https://blog.csdn.net/drea_mer/article/details/140988845