首页 > 编程语言 >老生常谈——分布式限流:部分Sentinal源码解读

老生常谈——分布式限流:部分Sentinal源码解读

时间:2024-12-21 14:19:31浏览次数:11  
标签:count node int public resourceWrapper 限流 Sentinal context 源码

  1. 基础知识

HTTP CODE = 429 “请求过多”

A. 限流的类型

  • 服务端

  • 客户端

限流的标的

  • IP

  • 用户

  • ...

基本要求

  • 准确限制过量的请求。

  • 低延时。限流器不能拖慢HTTP响应时间。

  • 尽量占用较少的内存。

  • 这是一个分布式限流器,可以在多个服务器或者进程之间共享。

  • 需要处理异常。当用户的请求被拦截时,给用户展示明确的异常信息。

  • 高容错性。如果限流器出现任何问题(比如某个缓存服务器宕机),不能影响整个系统。

  1. 限流算法

A. 漏桶算法(Leaking Bucket)

基本原理

当一个请求到达时,系统先检查桶是否已满。如果没有,就将请求添加到队列中。否则,丢弃请求。定期从队列中取出请求并进行处理。(控制消费的速率)

B. 代币桶算法(Token Bucket)

基本原理

代币桶是一个有预定义容量的容器。代币按照预定的速率被放入桶中。一旦桶被装满,就不再往里面添加代币。如如果桶满了,多出来的代币就会溢出。(控制进入的速率)

Sentinal实现
// 这个类及其相关的类是近一年才被加入到Sentinal中的,在主流程中未看到直接的调用

class TokenBucket {

    private final long maxTokens;

    private final long intervalMillis;

    private volatile long nextUpdate;

    private AtomicLong tokens;

    public TokenBucket(long maxTokens, long intervalMillis) {
        if (maxTokens <= 0) {
            throw new IllegalArgumentException("maxTokens should > 0, but given: " + maxTokens);
        }
        if (intervalMillis < 1000) {
            throw new IllegalArgumentException("intervalMillis should be at least 1000, but given: " + intervalMillis);
        }
        this.maxTokens = maxTokens;
        this.intervalMillis = intervalMillis;
        this.nextUpdate = System.currentTimeMillis() / 1000 * 1000 + intervalMillis;
        //第一次全量注入token
        this.tokens = new AtomicLong(maxTokens);
    }

    public boolean accept(long now) {
        long currTokens;
        // 到期自动全量注入token
        if (now > nextUpdate) {
            currTokens = tokens.get();
            if (tokens.compareAndSet(currTokens, maxTokens)) {
                nextUpdate = System.currentTimeMillis() / 1000 * 1000 + intervalMillis;
            }
        }

        // 尝试获取token
        do {
            currTokens = tokens.get();
        } while (currTokens > 0 && !tokens.compareAndSet(currTokens, currTokens - 1));

        return currTokens > 0;
    }
}
优点

算法容易实现。

内存的使用效率高。允许在很短时间内出现突发流量。

只要还有代币,请求就可以通过。

C. 固定窗口计数器算法(Fixed Window Counter)

顾名思义,问题在于,请求如果集中在某个窗口的两侧则可能会出现溢出拒绝。例如每分钟限流5个请求,窗口边界从00->59,则若第01s进入5个请求,第58s进入5个请求,后五个请求将被拒绝。

D. 滑动窗口(LeapArray - Sentinal)

LeapArray提供窗口操作的核心API,结合不同的Bucket(提供统计能力)泛型实现可以演变出不同的子类

// com.alibaba.csp.sentinel.slots.statistic.base.LeapArray -> 提供了窗口相关的核心api


public abstract class LeapArray<T> {

    protected int windowLengthInMs;
    protected int sampleCount;
    protected int intervalInMs;
    private double intervalInSecond;

    protected final AtomicReferenceArray<WindowWrap<T>> array;

    /**
     * The conditional (predicate) update lock is used only when current bucket is deprecated.
     */
    private final ReentrantLock updateLock = new ReentrantLock();

    /**
     * The total bucket count is: {@code sampleCount = intervalInMs / windowLengthInMs}.
     *
     * @param sampleCount  bucket count of the sliding window
     * @param intervalInMs the total time interval of this {@link LeapArray} in milliseconds
     */
    public LeapArray(int sampleCount, int intervalInMs) {
        AssertUtil.isTrue(sampleCount > 0, "bucket count is invalid: " + sampleCount);
        AssertUtil.isTrue(intervalInMs > 0, "total time interval of the sliding window should be positive");
        AssertUtil.isTrue(intervalInMs % sampleCount == 0, "time span needs to be evenly divided");

        this.windowLengthInMs = intervalInMs / sampleCount;
        this.intervalInMs = intervalInMs;
        this.intervalInSecond = intervalInMs / 1000.0;
        this.sampleCount = sampleCount;

        this.array = new AtomicReferenceArray<>(sampleCount);
    }

    /**
     * Get the bucket at current timestamp.
     *
     * @return the bucket at current timestamp
     */
    public WindowWrap<T> currentWindow() {
        return currentWindow(TimeUtil.currentTimeMillis());
    }

    /**
     * Create a new statistic value for bucket.
     *
     * @param timeMillis current time in milliseconds
     * @return the new empty bucket
     */
    public abstract T newEmptyBucket(long timeMillis);

    /**
     * Reset given bucket to provided start time and reset the value.
     *
     * @param startTime  the start time of the bucket in milliseconds
     * @param windowWrap current bucket
     * @return new clean bucket at given start time
     */
    protected abstract WindowWrap<T> resetWindowTo(WindowWrap<T> windowWrap, long startTime);

    private int calculateTimeIdx(/*@Valid*/ long timeMillis) {
        long timeId = timeMillis / windowLengthInMs;
        // Calculate current index so we can map the timestamp to the leap array.
        return (int)(timeId % array.length());
    }

    protected long calculateWindowStart(/*@Valid*/ long timeMillis) {
        return timeMillis - timeMillis % windowLengthInMs;
    }

    /**
     * Get bucket item at provided timestamp.
     *
     * @param timeMillis a valid timestamp in milliseconds
     * @return current bucket item at provided timestamp if the time is valid; null if time is invalid
     */
    public WindowWrap<T> currentWindow(long timeMillis) {
        if (timeMillis < 0) {
            return null;
        }

        int idx = calculateTimeIdx(timeMillis);
        // Calculate current bucket start time.
        long windowStart = calculateWindowStart(timeMillis);

        /*
         * Get bucket item at given time from the array.
         *
         * (1) Bucket is absent, then just create a new bucket and CAS update to circular array.
         * (2) Bucket is up-to-date, then just return the bucket.
         * (3) Bucket is deprecated, then reset current bucket.
         */
        while (true) {
            WindowWrap<T> old = array.get(idx);
            if (old == null) {
                /*
                 *     B0       B1      B2    NULL      B4
                 * ||_______|_______|_______|_______|_______||___
                 * 200     400     600     800     1000    1200  timestamp
                 *                             ^
                 *                          time=888
                 *            bucket is empty, so create new and update
                 *
                 * If the old bucket is absent, then we create a new bucket at {@code windowStart},
                 * then try to update circular array via a CAS operation. Only one thread can
                 * succeed to update, while other threads yield its time slice.
                 */
                WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
                if (array.compareAndSet(idx, null, window)) {
                    // Successfully updated, return the created bucket.
                    return window;
                } else {
                    // Contention failed, the thread will yield its time slice to wait for bucket available.
                    Thread.yield();
                }
            } else if (windowStart == old.windowStart()) {
                /*
                 *     B0       B1      B2     B3      B4
                 * ||_______|_______|_______|_______|_______||___
                 * 200     400     600     800     1000    1200  timestamp
                 *                             ^
                 *                          time=888
                 *            startTime of Bucket 3: 800, so it's up-to-date
                 *
                 * If current {@code windowStart} is equal to the start timestamp of old bucket,
                 * that means the time is within the bucket, so directly return the bucket.
                 */
                return old;
            } else if (windowStart > old.windowStart()) {
                /*
                 *   (old)
                 *             B0       B1      B2    NULL      B4
                 * |_______||_______|_______|_______|_______|_______||___
                 * ...    1200     1400    1600    1800    2000    2200  timestamp
                 *                              ^
                 *                           time=1676
                 *          startTime of Bucket 2: 400, deprecated, should be reset
                 *
                 * If the start timestamp of old bucket is behind provided time, that means
                 * the bucket is deprecated. We have to reset the bucket to current {@code windowStart}.
                 * Note that the reset and clean-up operations are hard to be atomic,
                 * so we need a update lock to guarantee the correctness of bucket update.
                 *
                 * The update lock is conditional (tiny scope) and will take effect only when
                 * bucket is deprecated, so in most cases it won't lead to performance loss.
                 */
                if (updateLock.tryLock()) {
                    try {
                        // Successfully get the update lock, now we reset the bucket.
                        return resetWindowTo(old, windowStart);
                    } finally {
                        updateLock.unlock();
                    }
                } else {
                    // Contention failed, the thread will yield its time slice to wait for bucket available.
                    Thread.yield();
                }
            } else if (windowStart < old.windowStart()) {
                // Should not go through here, as the provided time is already behind.
                return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
            }
        }
    }
}
  1. 限流计数与规则

限流计数

  • 单机:本地LoadingCache等支持过期时间的表

  • 分布式:redis

    • 参见:Better Rate Limiting With Redis Sorted Sets,利用ZSET来避免竞态条件下,系统性能被锁瓶颈影响。(简单来说就是把单变量的get -> add 1 的操作变成 add时间戳,通过count时间戳的数量来确认是否限流)

限流策略

  • 拒绝

  • 放入消息队列,后续消费

  1. Sentinal源码分析

核心抽象

  • 资源

  • 规则 -> FlowRule.class

  • 流控、降级、热点、授权

请求是如何被统计的?

调用链
com.alibaba.csp.sentinel.CtSph#entry(java.lang.String)
    // 业务侧的起点,会初始化出调用链,并且Context类的资源是保存在ThreadLocal中的,
    // 也就是再一次调用中复用的同一个 Context,因此可以通过Context的来自动构建Node树
    
com.alibaba.csp.sentinel.slotchain.DefaultProcessorSlotChain#entry
    // ProcessorSlot的调用链,顺序依照
    public static final int ORDER_NODE_SELECTOR_SLOT = -10000;
    public static final int ORDER_CLUSTER_BUILDER_SLOT = -9000;
    public static final int ORDER_LOG_SLOT = -8000;
    public static final int ORDER_STATISTIC_SLOT = -7000;
    public static final int ORDER_AUTHORITY_SLOT = -6000;
    public static final int ORDER_SYSTEM_SLOT = -5000;
    public static final int ORDER_FLOW_SLOT = -2000;
    public static final int ORDER_DEFAULT_CIRCUIT_BREAKER_SLOT = -1500;
    
com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot#entry
    // 会尝试初始化这次调用的Node类,Node类用于统计请求指标
    public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
        throws Throwable {
        /*
         * It's interesting that we use context name rather resource name as the map key.
         *
         * Remember that same resource({@link ResourceWrapper#equals(Object)}) will share
         * the same {@link ProcessorSlotChain} globally, no matter in which context. So if
         * code goes into {@link #entry(Context, ResourceWrapper, DefaultNode, int, Object...)},
         * the resource name must be same but context name may not.
         *
         * If we use {@link com.alibaba.csp.sentinel.SphU#entry(String resource)} to
         * enter same resource in different context, using context name as map key can
         * distinguish the same resource. In this case, multiple {@link DefaultNode}s will be created
         * of the same resource name, for every distinct context (different context name) each.
         *
         * Consider another question. One resource may have multiple {@link DefaultNode},
         * so what is the fastest way to get total statistics of the same resource?
         * The answer is all {@link DefaultNode}s with same resource name share one
         * {@link ClusterNode}. See {@link ClusterBuilderSlot} for detail.
         */
        DefaultNode node = map.get(context.getName());
        if (node == null) {
            synchronized (this) {
                node = map.get(context.getName());
                if (node == null) {
                    node = new DefaultNode(resourceWrapper, null);
                    // DefaultNode具有统计功能
                    HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
                    cacheMap.putAll(map);
                    cacheMap.put(context.getName(), node);
                    map = cacheMap;
                    // Build invocation tree
                    // 最上层的调用发起处在此处将会添加到entranceNode的child字段中,
                    // 非最上层的调用发起处则会添加到curNode的child字段中
                    ((DefaultNode) context.getLastNode()).addChild(node);
                }
    
            }
        }
    
        context.setCurNode(node);
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }
    
com.alibaba.csp.sentinel.slots.statistic.StatisticSlot#entry
    // 而后会通过这个slow进行统计, 这也就是为什么最先执行NodeSelectorSlot,然后再是StatisticSlot
    // 最后再是一些业务规则
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        try {
            // Do some checking.
            fireEntry(context, resourceWrapper, node, count, prioritized, args);
    
            // Request passed, add thread count and pass count.
            node.increaseThreadNum();
            node.addPassRequest(count);
            if (context.getCurEntry().getOriginNode() != null) {
                // Add count for origin node.
                context.getCurEntry().getOriginNode().increaseThreadNum();
                context.getCurEntry().getOriginNode().addPassRequest(count);
            }
    
            if (resourceWrapper.getEntryType() == EntryType.IN) {
                // Add count for global inbound entry node for global statistics.
                Constants.ENTRY_NODE.increaseThreadNum();
                Constants.ENTRY_NODE.addPassRequest(count);
            }
    
            // Handle pass event with registered entry callback handlers.
            for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                handler.onPass(context, resourceWrapper, node, count, args);
            }
        } catch (PriorityWaitException ex) {
            node.increaseThreadNum();
            if (context.getCurEntry().getOriginNode() != null) {
                // Add count for origin node.
                context.getCurEntry().getOriginNode().increaseThreadNum();
            }
    
            if (resourceWrapper.getEntryType() == EntryType.IN) {
                // Add count for global inbound entry node for global statistics.
                Constants.ENTRY_NODE.increaseThreadNum();
            }
            // Handle pass event with registered entry callback handlers.
            for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                handler.onPass(context, resourceWrapper, node, count, args);
            }
        } catch (BlockException e) {
            // Blocked, set block exception to current entry.
            context.getCurEntry().setBlockError(e);
    
            // Add block count.
            node.increaseBlockQps(count);
            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().increaseBlockQps(count);
            }
    
            if (resourceWrapper.getEntryType() == EntryType.IN) {
                // Add count for global inbound entry node for global statistics.
                Constants.ENTRY_NODE.increaseBlockQps(count);
            }
    
            // Handle block event with registered entry callback handlers.
            for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                handler.onBlocked(e, context, resourceWrapper, node, count, args);
            }
    
            throw e;
        } catch (Throwable e) {
            // Unexpected internal error, set error to current entry.
            context.getCurEntry().setError(e);
    
            throw e;
        }
    }
    

限流规则是如何被apply的?

核心责任链处理器接口
public interface ProcessorSlot<T> {

    /**
     * Entrance of this slot.
     *
     * @param context         current {@link Context}
     * @param resourceWrapper current resource
     * @param param           generics parameter, usually is a {@link com.alibaba.csp.sentinel.node.Node}
     * @param count           tokens needed
     * @param prioritized     whether the entry is prioritized
     * @param args            parameters of the original call
     * @throws Throwable blocked exception or unexpected error
     */
    void entry(Context context, ResourceWrapper resourceWrapper, T param, int count, boolean prioritized,
               Object... args) throws Throwable;

    /**
     * Means finish of {@link #entry(Context, ResourceWrapper, Object, int, boolean, Object...)}.
     *
     * @param context         current {@link Context}
     * @param resourceWrapper current resource
     * @param obj             relevant object (e.g. Node)
     * @param count           tokens needed
     * @param prioritized     whether the entry is prioritized
     * @param args            parameters of the original call
     * @throws Throwable blocked exception or unexpected error
     */
    void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized,
                   Object... args) throws Throwable;

    /**
     * Exit of this slot.
     *
     * @param context         current {@link Context}
     * @param resourceWrapper current resource
     * @param count           tokens needed
     * @param args            parameters of the original call
     */
    void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args);

    /**
     * Means finish of {@link #exit(Context, ResourceWrapper, int, Object...)}.
     *
     * @param context         current {@link Context}
     * @param resourceWrapper current resource
     * @param count           tokens needed
     * @param args            parameters of the original call
     */
    void fireExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args);
}

// 责任链的处理顺序 授权规则 -> 系统规则 -> 热点规则 ->  流量控制规则 -> 降级规则 
/**
 * Order of default processor slots
 */
public static final int ORDER_NODE_SELECTOR_SLOT = -10000;
public static final int ORDER_CLUSTER_BUILDER_SLOT = -9000;
public static final int ORDER_LOG_SLOT = -8000;
public static final int ORDER_STATISTIC_SLOT = -7000;
public static final int ORDER_AUTHORITY_SLOT = -6000;
public static final int ORDER_SYSTEM_SLOT = -5000;
public static final int ORDER_FLOW_SLOT = -2000;
public static final int ORDER_DEFAULT_CIRCUIT_BREAKER_SLOT = -1500;
public static final int ORDER_DEGRADE_SLOT = -1000;

整体设计上是通过实现不同的XXXSlot类(提供一些模板性质的方法),并为每个类关联不同的XXXRule来实现级联的规则校验。例如在热点控制这个Slot中,可以为某个特定的Api资源配置多个不同的热点参数值,这些不同的热点参数就会具象为不同的ParamFlowRule

FlowSlot
@Spi(order = Constants.ORDER_FLOW_SLOT)
public class FlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    private final FlowRuleChecker checker;

    public FlowSlot() {
        this(new FlowRuleChecker());
    }

    /**
     * Package-private for test.
     *
     * @param checker flow rule checker
     * @since 1.6.1
     */
    FlowSlot(FlowRuleChecker checker) {
        AssertUtil.notNull(checker, "flow checker should not be null");
        this.checker = checker;
    }

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        checkFlow(resourceWrapper, context, node, count, prioritized);

        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

    void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized)
        throws BlockException {
        checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);
    }

    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        fireExit(context, resourceWrapper, count, args);
    }

    private final Function<String, Collection<FlowRule>> ruleProvider = new Function<String, Collection<FlowRule>>() {
        @Override
        public Collection<FlowRule> apply(String resource) {
            return FlowRuleManager.getFlowRules(resource);
        }
    };
}

可以看到核心的规则逻辑是体现在FlowRule中的,通过FlowRuleManager获取到。校验的时候提供了两种机制,集群和本地

// FlowRuleChecker.class

public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                                boolean prioritized) {
    String limitApp = rule.getLimitApp();
    if (limitApp == null) {
        return true;
    }

    if (rule.isClusterMode()) {
        return passClusterCheck(rule, context, node, acquireCount, prioritized);
    }

    return passLocalCheck(rule, context, node, acquireCount, prioritized);
}

rater主要有以上四种实现,底层基本是依赖滑动窗口算法

Why?

  • 滑动窗口可以更方便的列出一些统计信息,从而进行额外的限流功能的扩展

  • 令牌桶算法还需要维护token的注入速度,并且只能for限流使用。而在其他场景,还是需要滑动窗口相关的数据结构来统计一些系统指标

DefualtController
// 依赖defaultNode中统计出的qps数,本质是滑动窗口统计的结果
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
    int curCount = avgUsedTokens(node);
    if (curCount + acquireCount > count) {
        if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
            long currentTime;
            long waitInMs;
            currentTime = TimeUtil.currentTimeMillis();
            waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
            if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
                node.addWaitingRequest(currentTime + waitInMs, acquireCount);
                node.addOccupiedPass(acquireCount);
                sleep(waitInMs);

                // PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.
                throw new PriorityWaitException(waitInMs);
            }
        }
        return false;
    }
    return true;
}
WarmUpLimiter
 @Override
    public boolean canPass(Node node, int acquireCount, boolean prioritized) {
        // 1. 获取前一个统计周期的 QPS
        long previousQps = (long) node.previousPassQps();
        // 同步令牌,基于之前的 QPS 更新当前存储的令牌数
        syncToken(previousQps);

        long currentTime = TimeUtil.currentTimeMillis();
        long restToken = storedTokens.get();  // 获取当前存储的令牌数

        long costTime = 0;  // 计算本次请求需要的时间
        long expectedTime = 0;  // 预期执行时间

        // 2. 令牌数超过警戒值,说明系统处于预热阶段
        if (restToken >= warningToken) {
            // 计算超出警戒值的令牌数
            long aboveToken = restToken - warningToken;
            
            // 根据斜率计算预热期间的 QPS
            // warmingQps 会随着 restToken 的减少而增加
            double warmingQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
            // 计算按照预热 QPS 处理请求需要的时间
            costTime = Math.round(1.0 * (acquireCount) / warmingQps * 1000);
        } else {
            // 3. 令牌数低于警戒值,按照目标 QPS 处理
            costTime = Math.round(1.0 * (acquireCount) / count * 1000);
        }
        
        // 4. 计算预期完成时间
        expectedTime = costTime + latestPassedTime.get();

        // 5. 如果预期完成时间小于当前时间,说明可以立即处理
        if (expectedTime <= currentTime) {
            latestPassedTime.set(currentTime);
            return true;
        } else {
            // 6. 需要等待的情况
            long waitTime = costTime + latestPassedTime.get() - currentTime;
            // 等待时间超过最大超时时间,直接拒绝
            if (waitTime > timeoutInMs) {
                return false;
            } else {
                // 7. 更新最新通过时间并等待
                long oldTime = latestPassedTime.addAndGet(costTime);
                try {
                    waitTime = oldTime - TimeUtil.currentTimeMillis();
                    if (waitTime > timeoutInMs) {
                        latestPassedTime.addAndGet(-costTime);
                        return false;
                    }
                    if (waitTime > 0) {
                        Thread.sleep(waitTime);
                    }
                    return true;
                } catch (InterruptedException e) {
                }
            }
        }
        return false;
    }
} 
ParamFlowSlot

主要的作用是可以根据一些请求的参数值,进行限流,比如促销时一些热点的品类

// com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowSlot#checkFlow

void checkFlow(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException {
    if (args == null) {
        return;
    }
    if (!ParamFlowRuleManager.hasRules(resourceWrapper.getName())) {
        return;
    }
    List<ParamFlowRule> rules = ParamFlowRuleManager.getRulesOfResource(resourceWrapper.getName());

    for (ParamFlowRule rule : rules) {
        applyRealParamIdx(rule, args.length);

        // Initialize the parameter metrics.
        ParameterMetricStorage.initParamMetricsFor(resourceWrapper, rule);

        if (!ParamFlowChecker.passCheck(resourceWrapper, rule, count, args)) {
            String triggeredParam = "";
            if (args.length > rule.getParamIdx()) {
                Object value = args[rule.getParamIdx()];
                // Assign actual value with the result of paramFlowKey method
                if (value instanceof ParamFlowArgument) {
                    value = ((ParamFlowArgument) value).paramFlowKey();
                }
                triggeredParam = String.valueOf(value);
            }
            throw new ParamFlowException(resourceWrapper.getName(), triggeredParam, rule);
        }
    }
}

//集群模式,使用本地模式作为兜底策略。集群模式依赖中心化的限流服务,SDK中也提供了嵌入的服务端服务
private static boolean passClusterCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                        boolean prioritized) {
    try {
        TokenService clusterService = pickClusterService();
        if (clusterService == null) {
            return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
        }
        long flowId = rule.getClusterConfig().getFlowId();
        TokenResult result = clusterService.requestToken(flowId, acquireCount, prioritized);
        return applyTokenResult(result, rule, context, node, acquireCount, prioritized);
        // If client is absent, then fallback to local mode.
    } catch (Throwable ex) {
        RecordLog.warn("[FlowRuleChecker] Request cluster token unexpected failed", ex);
    }
    // Fallback to local flow control when token client or server for this rule is not available.
    // If fallback is not enabled, then directly pass.
    return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
}


// 本地模式则直接使用SDK中的类进行校验,核心逻辑在Rater字段上,rater是一系列实现了com.alibaba.csp.sentinel.slots.block.flow.TrafficShapingController
// 接口的类
private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                      boolean prioritized) {
    Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
    if (selectedNode == null) {
        return true;
    }

    return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
}
SystemSlot

主要是check一些系统层面的宏观统计指标,如全局的qps,cpu利用率,load,线程数等等

@Spi(order = Constants.ORDER_SYSTEM_SLOT)
public class SystemSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        SystemRuleManager.checkSystem(resourceWrapper, count);
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        fireExit(context, resourceWrapper, count, args);
    }

}

// SystemRuleManager.checkSystem 中的一部端核心逻辑
public static void checkSystem(ResourceWrapper resourceWrapper, int count) throws BlockException {
    if (resourceWrapper == null) {
        return;
    }
    // Ensure the checking switch is on.
    if (!checkSystemStatus.get()) {
        return;
    }

    // for inbound traffic only
    if (resourceWrapper.getEntryType() != EntryType.IN) {
        return;
    }

    // total qps
    double currentQps = Constants.ENTRY_NODE.passQps();
    if (currentQps + count > qps) {
        throw new SystemBlockException(resourceWrapper.getName(), "qps");
    }

    // total thread
    int currentThread = Constants.ENTRY_NODE.curThreadNum();
    if (currentThread > maxThread) {
        throw new SystemBlockException(resourceWrapper.getName(), "thread");
    }

    double rt = Constants.ENTRY_NODE.avgRt();
    if (rt > maxRt) {
        throw new SystemBlockException(resourceWrapper.getName(), "rt");
    }

    // load. BBR algorithm.
    if (highestSystemLoadIsSet && getCurrentSystemAvgLoad() > highestSystemLoad) {
        if (!checkBbr(currentThread)) {
            throw new SystemBlockException(resourceWrapper.getName(), "load");
        }
    }

    // cpu usage
    if (highestCpuUsageIsSet && getCurrentCpuUsage() > highestCpuUsage) {
        throw new SystemBlockException(resourceWrapper.getName(), "cpu");
    }
}
AuthoritySlot

主要是一些白名单和黑名单的逻辑

// 逻辑也非常简单,直接获取具体的auth规则去校验即可
@Spi(order = Constants.ORDER_AUTHORITY_SLOT)
public class AuthoritySlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args)
        throws Throwable {
        checkBlackWhiteAuthority(resourceWrapper, context);
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        fireExit(context, resourceWrapper, count, args);
    }

    void checkBlackWhiteAuthority(ResourceWrapper resource, Context context) throws AuthorityException {

        List<AuthorityRule> rules = AuthorityRuleManager.getRules(resource.getName());
        if (rules == null) {
            return;
        }

        for (AuthorityRule rule : rules) {
            if (!AuthorityRuleChecker.passCheck(rule, context)) {
                throw new AuthorityException(context.getOrigin(), rule);
            }
        }
    }
}
CircuitBreakerSlot

和熔断降级相关的职责,围绕状态机OPEN, HALF_OPEN和CLOSE展开

@Spi(order = Constants.ORDER_DEFAULT_CIRCUIT_BREAKER_SLOT)
public class DefaultCircuitBreakerSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        performChecking(context, resourceWrapper);

        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

    private void performChecking(Context context, ResourceWrapper r) throws BlockException {
        // If user has set a degrade rule for the resource, the default rule will not be activated
        if (DegradeRuleManager.hasConfig(r.getName())) {
            return;
        }

        List<CircuitBreaker> circuitBreakers = DefaultCircuitBreakerRuleManager.getDefaultCircuitBreakers(r.getName());

        if (circuitBreakers == null || circuitBreakers.isEmpty()) {
            return;
        }

        for (CircuitBreaker cb : circuitBreakers) {
            if (!cb.tryPass(context)) {
                throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule());
            }
        }
    }

    @Override
    public void exit(Context context, ResourceWrapper r, int count, Object... args) {
        Entry curEntry = context.getCurEntry();
        if (curEntry.getBlockError() != null) {
            fireExit(context, r, count, args);
            return;
        }

        if (DegradeRuleManager.hasConfig(r.getName())) {
            fireExit(context, r, count, args);
            return;
        }

        List<CircuitBreaker> circuitBreakers = DefaultCircuitBreakerRuleManager.getDefaultCircuitBreakers(r.getName());

        if (circuitBreakers == null || circuitBreakers.isEmpty()) {
            fireExit(context, r, count, args);
            return;
        }

        if (curEntry.getBlockError() == null) {
            // passed request
            for (CircuitBreaker circuitBreaker : circuitBreakers) {
                circuitBreaker.onRequestComplete(context);
            }
        }

        fireExit(context, r, count, args);
    }
}


// OPEN 代表启用熔断, HALF_OPEN时如果抛出异常则重新回到OPEN,否则变为CLOSE,CLOSE代表不启用熔断规则

参考引用

  1. 搞定系统设计:面试敲开大厂的门

  2. Sentinal源码

标签:count,node,int,public,resourceWrapper,限流,Sentinal,context,源码
From: https://www.cnblogs.com/xy1997/p/18620730

相关文章

  • 基于GAN和densnet组合的调制信号分类网络(源码)
    (需要源码请私信或评论)生成对抗网络(GAN)原理生成对抗网络(GAN)是一种革命性的深度学习模型,在无监督学习领域取得了显著进展。其核心思想基于二人零和博弈,通过生成模型和判别模型的相互竞争实现高质量的数据合成。GAN由IanGoodfellow等人于2014年首次提出,随后在图像生成......
  • flask框架简易云在线人事管理系统毕设源码+论文
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表开题报告内容一、选题背景关于人事管理系统的研究,现有研究主要以传统本地部署的人事管理系统为主,专门针对简易云在线人事管理系统的研究较少。在国内外的企业管......
  • flask框架绿色农场管理系统毕设源码+论文
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表开题报告内容一、选题背景关于农场管理系统的研究,现有研究多侧重于传统农场管理模式的改进或单一功能模块的优化,专门针对绿色农场整体管理系统的研究较少。在国......
  • 【2025最新计算机毕业设计】新型吃住玩一体化旅游管理系统【提供源码+答辩PPT+文档+项
      作者简介:✌CSDN新星计划导师、Java领域优质创作者、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和学生毕业项目实战,高校老师/讲师/同行前辈交流。✌ 主要内容:......
  • 【2025最新计算机毕业设计】基于SpringBoot+Vue的旅游特产销售系统【提供源码+答辩PPT
     作者简介:✌CSDN新星计划导师、Java领域优质创作者、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和学生毕业项目实战,高校老师/讲师/同行前辈交流。✌ 主要内容:......
  • 计算机毕业设计 | SpringBoot+vue中山社区医疗综合服务平台(附源码+论文)
    1,绪论1.1课题背景信息数据从传统到当代,是一直在变革当中,突如其来的互联网让传统的信息管理看到了革命性的曙光,因为传统信息管理从时效性,还是安全性,还是可操作性等各个方面来讲,遇到了互联网时代才发现能补上自古以来的短板,有效的提升管理的效率和业务水平。传统的管理模式......
  • 浅入浅出docker run命令源码3-containerd续篇
    1.前情回顾上一篇我们已经知道如何找到对应的gRPC请求接口的逻辑代码了,但是还没有看具体的代码。在最初的《浅入浅出dockerrun命令源码》中已知,启动容器还需要启动shim进程以及runc进程。但是具体是如何启动的,还不清楚。这篇文章中,主要解决问题是containerd是如何启动......
  • 免费送源码:Java+B/S+MySQL springboot电影推荐系统 计算机毕业设计原创定制
             摘 要随着互联网与移动互联网迅速普及,网络上的电影娱乐信息数量相当庞大,人们对获取感兴趣的电影娱乐信息的需求越来越大,个性化的电影推荐系统成为一个热门。然而电影信息的表示相当复杂,己有的相似度计算方法与推荐算法都各有优势,导致单一的相似度计算方......
  • ssm毕设青少年编程课程教学评价源码+程序+论文
    本系统(程序+源码)带文档lw万字以上 文末可获取一份本项目的java源码和数据库参考。系统程序文件列表开题报告内容选题背景随着信息技术的迅猛发展,编程教育在青少年群体中的普及率日益提高,成为培养其创新思维和解决问题能力的重要途径。关于青少年编程课程的教学评价,现有研......
  • ssm毕设青少年公共卫生教育平台源码+程序+论文
    本系统(程序+源码)带文档lw万字以上 文末可获取一份本项目的java源码和数据库参考。系统程序文件列表开题报告内容选题背景青少年公共卫生教育作为提升全民健康素养的关键环节,近年来在国内外受到了广泛关注。现有研究主要集中在公共卫生教育的内容设计、教学方法以及效果评......