文章目录
前言
前序文章介绍了常见的限流算法,包括滑动窗口,并且简单地进行了实现。对于更复杂的场景,我们需要更加深度的去实现滑动窗口,本文将以sentinel的滑动窗口为例,一起探索滑动窗口的强大之处
sentinel的调用链路图
- NodeSelectorSlot:这个 slot 主要负责收集资源的路径,并将这些资源的调用路径,以树状结构存储起来,用于根据调用路径来限流降级。
- ClusterBuilderSlot:此插槽用于构建资源的 ClusterNode 以及调用来源节点。ClusterNode 保持资源运行统计信息(响应时间、QPS、block 数目、线程数、异常数等)以及原始调用者统计信息列表。
- StatisticSlot:Sentinel 的核心功能插槽之一,用于统计实时的调用数据。
真正进行统计的是StatisticSlot
一、StatisticSlot
Sentinel 的核心功能插槽之一,用于统计实时的调用数据。Sentinel 底层采用高性能的滑动窗口数据结构 LeapArray 来统计实时的秒级指标数据,可以很好地支撑写多于读的高并发场景。
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args) throws Throwable {
try {
// 触发下一个Slot的entry方法
fireEntry(context, resourceWrapper, node, count, args);
// 如果能通过SlotChain中后面的Slot的entry方法,说明没有被限流或降级
// 统计信息
node.increaseThreadNum();
node.addPassRequest();
// 省略...
} catch (BlockException e) {
context.getCurEntry().setError(e);
// Add block count.
node.increaseBlockedQps();
// 省略...
throw e;
} catch (Throwable e) {
context.getCurEntry().setError(e);
// Should not happen
node.increaseExceptionQps();
// 省略...
throw e;
}
}
简单的来说,StatisticSlot中就是做了三件事:
- 通过node中的当前的实时统计指标信息进行规则校验
- 如果通过了校验,则重新更新node中的实时指标数据
- 如果被block或出现了异常了,则重新更新node中block的指标或异常指标
可以看出所有的实时指标的统计围绕着node,跟进代码发现node为StatisticNode,接下来探究StatisticNode是一个什么样的存在
1、StatisticNode
StatisticNode使用了滑动窗口算法统计指标,在DefaultController中被调用,作为默认的一种限流方式。(com.alibaba.csp.sentinel.slots.block.flow.TrafficShapingController是Sentinel的流量控制算法类,DefaultController是它的一个实现)
public class StatisticNode implements Node {
// 分钟级滑动窗口,SampleCountProperty.SAMPLE_COUNT = 2,IntervalProperty.INTERVAL = 1000
private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
IntervalProperty.INTERVAL);
// 秒级滑动窗口:保存最近 60 秒的统计信息
private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);
}
不管是分钟级,还是秒级的滑动窗口,都是创建了一个 ArrayMetric 对象,该对象为 Sentinel 滑动窗口核心实现类
和ArrayMetric 关联的对象为
- Metric:定义指标收集API,顶层结构
- ArrayMetric:滑动窗口核心实现类
- LeapArray:滑动窗口低层数据结构
- MetricBucket:指标桶
2、Metric
public interface Metric extends DebugSupport {
// 获取 成功个数
long success();
// 获取 最大成功个数
long maxSuccess();
// 获取 异常个数
long exception();
// 获取 阻塞个数
long block();
// 获取 通过个数,不包括 occupiedPass
long pass();
// 获取 总响应时间
long rt();
// 获取 最小响应时间
long minRt();
// 获取所有资源的 MetricNode List
List<MetricNode> details();
// 获取满足timePredicate条件的资源 MetricNode List
List<MetricNode> detailsOnCondition(Predicate<Long> timePredicate);
// 获取 窗口数组
MetricBucket[] windows();
// add
void addException(int n);
void addBlock(int n);
void addSuccess(int n);
void addPass(int n);
void addRT(long rt);
// 以秒为单位获取滑动窗口长度
double getWindowIntervalInSec();
// 获取滑动窗口样本个数
int getSampleCount();
// 获取 timeMillis 时间内,有效窗口的通过个数
long getWindowPass(long timeMillis);
// 添加占用通行证,表示借用后一个窗口令牌的通行证请求。
void addOccupiedPass(int acquireCount);
// 添加funtureTime 时间窗口的占用通行证,表示借用funtureTime 窗口令牌的通行证请求。
void addWaiting(long futureTime, int acquireCount);
// 获取 总占用通行证个数
long waiting();
// 获取占用的通行证计数
long occupiedPass();
// 获取上一个窗口阻塞个数
long previousWindowBlock();
// 获取上一个窗口通过个数
long previousWindowPass();
}
3、ArrayMetric
ArrayMetric用来统计滑动窗口中的各种数据,比如已经通过的请求数,被限流的请求数、向当前窗口中增加通过的请求数等等
public class ArrayMetric implements Metric {
// LeapArray:滑动窗口顶层数据结构,包含一个一个的窗口数据。
// MetricBucket:指标桶
private final LeapArray<MetricBucket> data;
/**
* @param sampleCount 在一个采集间隔中抽样的个数
* @param intervalInMs 采集的时间间隔(毫秒)
*/
public ArrayMetric(int sampleCount, int intervalInMs) {
// 创建一个可占用的LeapArray
this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
}
/**
* @param sampleCount 在一个采集间隔中抽样的个数
* @param intervalInMs 采集的时间间隔(毫秒)
* @param enableOccupy 是否允许抢占,即当前时间戳已经达到限制后,是否可以占用下一个时间窗口的容量
*/
public ArrayMetric(int sampleCount, int intervalInMs, boolean enableOccupy) {
if (enableOccupy) {
// 创建一个可占用的LeapArray
this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
} else {
// 创建一个普通的LeapArray
this.data = new BucketLeapArray(sampleCount, intervalInMs);
}
}
/**
* For unit test.
*/
public ArrayMetric(LeapArray<MetricBucket> array) {
this.data = array;
}
@Override
public long pass() {
// 更新最新的时间窗口
data.currentWindow();
long pass = 0;
// 得到所有的统计窗口
List<MetricBucket> list = data.values();
// 每个窗口中的统计量累加起来
for (MetricBucket window : list) {
pass += window.pass();
}
return pass;
}
@Override
public void addPass(int count) {
// 向当前时间窗口中增加一个请求数量 这个方法会在StatisticSlot 统计qps的时候使用到
// 时间窗口每个bucket都被WindowWrap包装了下,而且一个MetricBucket 里面可以统计好多维度的数据,使用MetricEvent区分的。
WindowWrap<MetricBucket> wrap = data.currentWindow();
wrap.value().addPass(count);
}
}
4、LeapArray
构造方法都创建一个LeapArray data,可以知道它的核心依赖于LeapArray,
public abstract class LeapArray<T> {
// 滑动时间窗口每个bucket的时间长度
protected int windowLengthInMs;
// 滑动时间窗口 一共有多少个bucket
protected int sampleCount;
// 滑动时间窗口 总的时间窗口 单位 毫秒
protected int intervalInMs;
// 滑动时间窗口 总的时间窗口 单位 秒
private double intervalInSecond;
// 每个时间窗口bucket 的存储实例WindowWrap array 就相当于是整个滑动时间窗口
protected final AtomicReferenceArray<WindowWrap<T>> array;
/**
* The conditional (predicate) update lock is used only when current bucket is deprecated.
* 可重入锁,更新滑动时间窗口的时候使用
*/
private final ReentrantLock updateLock = new ReentrantLock();
/**
* The total bucket count is: {@code sampleCount = intervalInMs / windowLengthInMs}.
*
* @param sampleCount bucket count of the sliding window
* @param intervalInMs the total time interval of this {@link LeapArray} in milliseconds
*/
public LeapArray(int sampleCount, int intervalInMs) {
AssertUtil.isTrue(sampleCount > 0, "bucket count is invalid: " + sampleCount);
AssertUtil.isTrue(intervalInMs > 0, "total time interval of the sliding window should be positive");
AssertUtil.isTrue(intervalInMs % sampleCount == 0, "time span needs to be evenly divided");
this.windowLengthInMs = intervalInMs / sampleCount;
this.intervalInMs = intervalInMs;
this.intervalInSecond = intervalInMs / 1000.0;
this.sampleCount = sampleCount;
this.array = new AtomicReferenceArray<>(sampleCount);
}
/**
* Get the bucket at current timestamp.
* 计算当前时间的bucket实例 每次请求过来都会计算处于那个bucket位置
* @return the bucket at current timestamp
*/
public WindowWrap<T> currentWindow() {
return currentWindow(TimeUtil.currentTimeMillis());
}
/**
* Create a new statistic value for bucket.
* 创建一个新的MetricBucket
*
* @param timeMillis current time in milliseconds
* @return the new empty bucket
*
*/
public abstract T newEmptyBucket(long timeMillis);
/**
* Reset given bucket to provided start time and reset the value.
* 重置Window
*
* @param startTime the start time of the bucket in milliseconds
* @param windowWrap current bucket
* @return new clean bucket at given start time
*/
protected abstract WindowWrap<T> resetWindowTo(WindowWrap<T> windowWrap, long startTime);
// 计算当前时间处于滑动时间窗口数组中的索引位置
private int calculateTimeIdx(/*@Valid*/ long timeMillis) {
long timeId = timeMillis / windowLengthInMs;
// Calculate current index so we can map the timestamp to the leap array.
return (int)(timeId % array.length());
}
// 计算时间窗口bucket的起始时间
protected long calculateWindowStart(/*@Valid*/ long timeMillis) {
return timeMillis - timeMillis % windowLengthInMs;
}
/**
* Get bucket item at provided timestamp.
* 获取 / 创建 timeMillis 时间下的 WindowWrap 对象
*
* @param timeMillis a valid timestamp in milliseconds
* @return current bucket item at provided timestamp if the time is valid; null if time is invalid
*/
public WindowWrap<T> currentWindow(long timeMillis) {
if (timeMillis < 0) {
return null;
}
// 计算当前时间 在滑动时间窗口array中的索引位置
int idx = calculateTimeIdx(timeMillis);
// Calculate current bucket start time.
// 计算当前时间在时间窗口bucket中的开始时间
long windowStart = calculateWindowStart(timeMillis);
/*
* Get bucket item at given time from the array.
*
* (1) Bucket is absent, then just create a new bucket and CAS update to circular array.
* (2) Bucket is up-to-date, then just return the bucket.
* (3) Bucket is deprecated, then reset current bucket and clean all deprecated buckets.
*/
while (true) {
// 根据当前时间计算的bucket 索引值 在array的数据
WindowWrap<T> old = array.get(idx);
if (old == null) {
/*
* B0 B1 B2 NULL B4
* ||_______|_______|_______|_______|_______||___
* 200 400 600 800 1000 1200 timestamp
* ^
* time=888
* bucket is empty, so create new and update
*
* If the old bucket is absent, then we create a new bucket at {@code windowStart},
* then try to update circular array via a CAS operation. Only one thread can
* succeed to update, while other threads yield its time slice.
* 这个图解就很好理解了,比如当前时间计算的bucket 所在的位置在上面的800~1000之间的时候,array是空的,
* 就新建一个时间窗口bucket,WindowWrap 通过cas更新到array中 如果cas失败了就让出时间片
*/
WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
if (array.compareAndSet(idx, null, window)) {
// Successfully updated, return the created bucket.
return window;
} else {
// 释放时间片
// Contention failed, the thread will yield its time slice to wait for bucket available.
Thread.yield();
}
} else if (windowStart == old.windowStart()) {
/*
* B0 B1 B2 B3 B4
* ||_______|_______|_______|_______|_______||___
* 200 400 600 800 1000 1200 timestamp
* ^
* time=888
* startTime of Bucket 3: 800, so it's up-to-date
*
* If current {@code windowStart} is equal to the start timestamp of old bucket,
* that means the time is within the bucket, so directly return the bucket.
* 如果当前时间计算出来的索引位置已经有了WindowWrap bucket 而且存在的bucket的开始时间和当前计算的开始相等,
* 就返回已经存在的这个WindowWrap,在StatisticSlot增加请求数量的时候就会使用这个bucket 中的请求数量进行累加
*/
return old;
} else if (windowStart > old.windowStart()) {
/*
* (old)
* B0 B1 B2 NULL B4
* |_______||_______|_______|_______|_______|_______||___
* ... 1200 1400 1600 1800 2000 2200 timestamp
* ^
* time=1676
* startTime of Bucket 2: 400, deprecated, should be reset
*
* If the start timestamp of old bucket is behind provided time, that means
* the bucket is deprecated. We have to reset the bucket to current {@code windowStart}.
* Note that the reset and clean-up operations are hard to be atomic,
* so we need a update lock to guarantee the correctness of bucket update.
*
* The update lock is conditional (tiny scope) and will take effect only when
* bucket is deprecated, so in most cases it won't lead to performance loss.
* 这个时候就表明 时间窗口要向前滑动了 就是把存在的时间窗口内容进行重置
* 重置包括开始时间更新、窗口内的计数清零, 使用加锁操作
*/
if (updateLock.tryLock()) {
try {
// Successfully get the update lock, now we reset the bucket.
// 具体逻辑在的子类 OccuiableBucketLeapArray中
return resetWindowTo(old, windowStart);
} finally {
updateLock.unlock();
}
} else {
// 释放时间片
// Contention failed, the thread will yield its time slice to wait for bucket available.
Thread.yield();
}
} else if (windowStart < old.windowStart()) {
// 理论上不会出现 窗口创建后的时间会小于窗口创建前的时间
// Should not go through here, as the provided time is already behind.
return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
}
}
}
public List<T> values() {
return values(TimeUtil.currentTimeMillis());
}
// 把当前时间窗口中的bucket WindowWrap 都返回出去 用来统计时间窗口总的请求数量
public List<T> values(long timeMillis) {
if (timeMillis < 0) {
return new ArrayList<T>();
}
int size = array.length();
List<T> result = new ArrayList<T>(size);
for (int i = 0; i < size; i++) {
WindowWrap<T> windowWrap = array.get(i);
if (windowWrap == null || isWindowDeprecated(timeMillis, windowWrap)) {
continue;
}
result.add(windowWrap.value());
}
return result;
}
从结构上看,这个滑动窗口是一个环形数组,计算到新的一轮窗口时间的时候,我们就回去重置或者新建一个窗口
统计数量的时候,只需要统计环形数组里面的所有数量即可
// 计算当前时间 在滑动时间窗口array中的索引位置
int idx = calculateTimeIdx(timeMillis);
// Calculate current bucket start time.
// 计算当前时间在时间窗口bucket中的开始时间
long windowStart = calculateWindowStart(timeMillis);
5、WindowWrap
每个WindowWrap对象都代表了一个时间窗口,它记录了该窗口的起始时间、窗口长度以及窗口内的统计数据
public class WindowWrap<T> {
// 窗口时间长度
private final long windowLengthInMs;
// 窗口开始时间
private long windowStart;
private T value;
public WindowWrap(long windowLengthInMs, long windowStart, T value) {
this.windowLengthInMs = windowLengthInMs;
this.windowStart = windowStart;
this.value = value;
}
/**
* Reset start timestamp of current bucket to provided time.
*把当前窗口的开始时间设置成传入的时间参数
*/
public WindowWrap<T> resetTo(long startTime) {
this.windowStart = startTime;
return this;
}
/**
* Check whether given timestamp is in current bucket.
*检查当前的时刻是否还属于当前的窗口
*/
public boolean isTimeInWindow(long timeMillis) {
return windowStart <= timeMillis && timeMillis < windowStart + windowLengthInMs;
}
//省略...
}
6、MetricBucket
MetricBucket是Sentinel中用于存储单个时间窗口内各项统计数据的结构。它通常包含多个计数器,用于记录不同类型的请求事件,如MetricEvent.PASS表示成功通过的请求数,MetricEvent.BLOCK表示被阻塞的请求数等
public class MetricBucket {
// 可以存储自己想统计的数据维度 LongAdder是比Atomic类性能更高的类
private final LongAdder[] counters;
private volatile long minRt;
public MetricBucket() {
// 用一个枚举值来记录想统计的数据维度
MetricEvent[] events = MetricEvent.values();
this.counters = new LongAdder[events.length];
for (MetricEvent event : events) {
// 数据中不同的位置表示不同的数据维度
counters[event.ordinal()] = new LongAdder();
}
initMinRt();
}
public MetricBucket reset(MetricBucket bucket) {
for (MetricEvent event : MetricEvent.values()) {
counters[event.ordinal()].reset();
counters[event.ordinal()].add(bucket.get(event));
}
initMinRt();
return this;
}
private void initMinRt() {
this.minRt = SentinelConfig.statisticMaxRt();
}
/**
* Reset the adders.
*
* @return new metric bucket in initial state
*/
public MetricBucket reset() {
for (MetricEvent event : MetricEvent.values()) {
counters[event.ordinal()].reset();
}
initMinRt();
return this;
}
public long get(MetricEvent event) {
return counters[event.ordinal()].sum();
}
public long pass() {
return get(MetricEvent.PASS);
}
public void addPass(int n) {
// 时间窗口bucket 中也是有一个 LongAdder[] counter来存储计数统计的
// 因为可以统计不同维度的数据,比如这里就是统计PASS的数量
// LongAdder,它的效率比Atomic类的性能更好些
add(MetricEvent.PASS, n);
}
public MetricBucket add(MetricEvent event, long n) {
// event.ordinal() 就是获取枚举值在枚举类中的位置,就是索引值 这个操作是cas的 性能更好些。
counters[event.ordinal()].add(n);
return this;
}
// 省略部分方法
}
LongAdder它的效率比Atomic类的性能更好些,它将value值分散到一个数组中,不同线程会命中到数组的不同槽中,各个线程只对自己槽中的那个值进行CAS操作,这样数据冲突就减少了。如果要获取真正的long值,只要将各个槽中的变量值累加返回。空间换时间的一个很好例子。
二、FlowSlot
用于根据预设的限流规则以及前面 slot 统计的状态,来进行流量控制,通过或者拒绝请求
public class FlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
private final FlowRuleChecker checker;
public FlowSlot() {
this(new FlowRuleChecker());
}
/**
* Package-private for test.
*
* @param checker flow rule checker
* @since 1.6.1
*/
FlowSlot(FlowRuleChecker checker) {
AssertUtil.notNull(checker, "flow checker should not be null");
this.checker = checker;
}
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
checkFlow(resourceWrapper, context, node, count, prioritized);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
// 流量检查
void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized)
throws BlockException {
checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);
}
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
fireExit(context, resourceWrapper, count, args);
}
private final Function<String, Collection<FlowRule>> ruleProvider = new Function<String, Collection<FlowRule>>() {
@Override
public Collection<FlowRule> apply(String resource) {
// Flow rule map should not be null.
Map<String, List<FlowRule>> flowRules = FlowRuleManager.getFlowRuleMap();
return flowRules.get(resource);
}
};
}
1、FlowRuleChecker
- 规则检查:FlowRuleChecker 会根据配置的 FlowRule(流控规则)对进入系统的请求进行实时检查,判断请求是否超过了预设的阈值。
- 流量控制:如果请求超过了流控规则设定的阈值,FlowRuleChecker 会拒绝该请求,并可能触发熔断降级等后续操作,以保护系统不被过量的请求压垮。
- 支持多种流控模式:FlowRuleChecker 支持直接流控、关联流控、链路流控等多种流控模式,可以根据不同的业务场景和需求进行灵活配置。
public class FlowRuleChecker {
public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
if (ruleProvider == null || resource == null) {
return;
}
Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
if (rules != null) {
for (FlowRule rule : rules) {
// 如果被限流 抛出异常
if (!canPassCheck(rule, context, node, count, prioritized)) {
throw new FlowException(rule.getLimitApp(), rule);
}
}
}
}
public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node,
int acquireCount) {
return canPassCheck(rule, context, node, acquireCount, false);
}
// canPassCheck 方法返回 true 说明允许请求通过,反之则不允许通过,
public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount,
boolean prioritized) {
String limitApp = rule.getLimitApp();
if (limitApp == null) {
return true;
}
// 其中check分为集群模式和单体模式
if (rule.isClusterMode()) {
return passClusterCheck(rule, context, node, acquireCount, prioritized);
}
return passLocalCheck(rule, context, node, acquireCount, prioritized);
}
private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
boolean prioritized) {
// 根据流控策略找到对应的实时统计信息(Node)
Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
if (selectedNode == null) {
return true;
}
// rule.getRater() 获得 TrafficShapingController 的具体实现类
return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
}
/**
* 限流规则中的 limitApp 字段用于根据调用方进行流量控制。该字段的值有以下三种选项,分别对应不同的场景:
*
*{some_origin_name}:表示针对特定的调用者,只有来自这个调用者的请求才会进行流量控制。例如 NodeA 配置了一条针对调用者caller1的规则,
* 那么当且仅当来自 caller1 对 NodeA 的请求才会触发流量控制。
* default:表示不区分调用者,来自任何调用者的请求都将进行限流统计。如果这个资源名的调用总和超过了这条规则定义的阈值,则触发限流。
*
* other:表示针对除 {some_origin_name} 以外的其余调用方的流量进行流量控制。例如,资源NodeA配置了一条针对调用者 caller1 的限流规则,
* 同时又配置了一条调用者为 other 的规则,那么任意来自非 caller1 对 NodeA 的调用,都不能超过 other 这条规则定义的阈值。
*
* 同一个资源名可以配置多条规则,规则的生效顺序为:{some_origin_name} > other > default
*/
static Node selectNodeByRequesterAndStrategy(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node) {
String limitApp = rule.getLimitApp();
int strategy = rule.getStrategy();
// 调用来源, ContextUtil.enter(res,origin) 时传入
String origin = context.getOrigin();
if (limitApp.equals(origin) && filterOrigin(origin)) {
if (strategy == RuleConstant.STRATEGY_DIRECT) {
// Matches limit origin, return origin statistic node.
return context.getOriginNode();
}
return selectReferenceNode(rule, context, node);
} else if (RuleConstant.LIMIT_APP_DEFAULT.equals(limitApp)) {
if (strategy == RuleConstant.STRATEGY_DIRECT) {
// 直接返回 cluster node.
return node.getClusterNode();
}
return selectReferenceNode(rule, context, node);
} else if (RuleConstant.LIMIT_APP_OTHER.equals(limitApp)
&& FlowRuleManager.isOtherOrigin(origin, rule.getResource())) {
if (strategy == RuleConstant.STRATEGY_DIRECT) {
return context.getOriginNode();
}
return selectReferenceNode(rule, context, node);
}
return null;
}
static Node selectReferenceNode(FlowRule rule, Context context, DefaultNode node) {
String refResource = rule.getRefResource();
int strategy = rule.getStrategy();
if (StringUtil.isEmpty(refResource)) {
return null;
}
if (strategy == RuleConstant.STRATEGY_RELATE) {
return ClusterBuilderSlot.getClusterNode(refResource);
}
if (strategy == RuleConstant.STRATEGY_CHAIN) {
if (!refResource.equals(context.getName())) {
return null;
}
return node;
}
// No node.
return null;
}
private static boolean filterOrigin(String origin) {
// origin 不能为 default 和 other
return !RuleConstant.LIMIT_APP_DEFAULT.equals(origin) && !RuleConstant.LIMIT_APP_OTHER.equals(origin);
}
}
2、DefaultController
在DefaultController中,首先获取当前的线程数或者QPS数,如果当前的线程数或者QPS+申请的数量>配置的总数,则不通过,如果当前线程数或者QPS+申请的数量<=配置的总数,则直接通过
public class DefaultController implements TrafficShapingController {
private static final int DEFAULT_AVG_USED_TOKENS = 0;
private double count;
private int grade;
public DefaultController(double count, int grade) {
this.count = count;
this.grade = grade;
}
@Override
public boolean canPass(Node node, int acquireCount) {
return canPass(node, acquireCount, false);
}
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
//获取当前node节点的线程数或者请求的qps总数
int curCount = avgUsedTokens(node);
//当前请求数+申请总数是否>该资源配置的总数
if (curCount + acquireCount > count) {
if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
long currentTime;
long waitInMs;
currentTime = TimeUtil.currentTimeMillis();
waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
node.addWaitingRequest(currentTime + waitInMs, acquireCount);
node.addOccupiedPass(acquireCount);
sleep(waitInMs);
throw new PriorityWaitException(waitInMs);
}
}
return false;
}
return true;
}
//获取当前node节点的线程数或者请求的qps总数
private int avgUsedTokens(Node node) {
if (node == null) {
return DEFAULT_AVG_USED_TOKENS;
}
return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps());
}
至此,滑动窗口算法实现限流控制的全流程就展示出来了
总结
Sentinel的滑动窗口算法广泛应用于各种需要流量控制的场景,如微服务架构中的服务调用限流、API接口的访问频率限制等。通过合理配置滑动窗口的大小和分割粒度,可以实现对不同场景下的流量进行精细控制和管理。
标签:node,return,int,bucket,public,算法,Sentinel,滑动,窗口 From: https://blog.csdn.net/weixin_44550507/article/details/140783406