首页 > 其他分享 >Spring Cloud Alibaba——Sentinel Slot

Spring Cloud Alibaba——Sentinel Slot

时间:2023-01-17 18:06:14浏览次数:60  
标签:Slot count Object node Spring args resourceWrapper context Sentinel

slot概述

在 Sentinel 里面,所有的资源都对应一个资源名称(resourceName),每次资源调用都会创建一个 Entry 对象。Entry 可以通过对主流框架的适配自动创建,也可以通过注解的方式或调用 SphU API 显式创建。Entry 创建的时候,同时也会创建一系列功能插槽(slot chain),这些插槽有不同的职责,例如:

  • NodeSelectorSlot:负责收集资源的路径,并将这些资源的调用路径,以树状结构存储起来,用于根据调用路径来限流降级。

  • ClusterBuilderSlot:则用于存储资源的统计信息以及调用者信息,例如该资源的 RT、QPS、thread count 等等,这些信息将用作为多维度限流,降级的依据。

  • LogSlot:则用于记录用于记录块异常,为故障排除提供具体的日志。

  • StatisticSlot:则用于记录、统计不同纬度的 runtime 指标监控信息。

  • AuthoritySlot:则根据配置的黑白名单和调用来源信息,来做黑白名单控制。

  • SystemSlot:则通过系统的状态,例如 load1 等,来控制总的入口流量。

  • FlowSlot:则用于根据预设的限流规则以及前面 slot 统计的状态,来进行流量控制。

  • DegradeSlot:则通过统计信息以及预设的规则,来做熔断降级。

下面是关系结构图:

solt的基本逻辑及代码演示

每个Slot执行完业务逻辑处理后,会调用fireEntry()方法,该方法将会触发下一个节点的entry方法,下一个节点又会调用他的fireEntry,以此类推直到最后一个Slot,由此就形成了sentinel的责任链。

 

工作流概述:

根据slot 的基本实现processorSlot的实现类讲一下slot 的基本结构

先看看顶层接口ProcessorSlot

public interface ProcessorSlot<T> {

	//开始入口
    void entry(Context context, ResourceWrapper resourceWrapper, T param, int count, boolean prioritized,
               Object... args) throws Throwable;

	//finish意味着结束
    void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized,
                   Object... args) throws Throwable;

	//退出插槽
    void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args);

	//退出插槽结束
    void fireExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args);
}

这个接口有4个方法,entry,fireEntry,exit,fireExit

ProcessorSlot 的抽象实现 AbstractLinkedProcessorSlot

public abstract class AbstractLinkedProcessorSlot<T> implements ProcessorSlot<T> {

    private AbstractLinkedProcessorSlot<?> next = null;

    @Override
    public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
        throws Throwable {
		//当业务执行完毕后,如果还有下一个slot
        if (next != null) {
            next.transformEntry(context, resourceWrapper, obj, count, prioritized, args);
        }
    }

    @SuppressWarnings("unchecked")
	//指向下一个slot的entry,每一个slot根据自己的职责不同,有自己的实现
    void transformEntry(Context context, ResourceWrapper resourceWrapper, Object o, int count, boolean prioritized, Object... args)
        throws Throwable {
        T t = (T)o;
        entry(context, resourceWrapper, t, count, prioritized, args);
    }

    @Override
    public void fireExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
		//当一个slot的exit执行完毕后,如果还有下一个未关闭slot
        if (next != null) {
			 //指向下一个slot的exit
            next.exit(context, resourceWrapper, count, args);
        }
    }

    public AbstractLinkedProcessorSlot<?> getNext() {
        return next;
    }

    public void setNext(AbstractLinkedProcessorSlot<?> next) {
        this.next = next;
    }

}

DefaultProcessorSlotChain实现了上述的chain(setNext和getNext)

public class DefaultProcessorSlotChain extends ProcessorSlotChain {

	//直接实现了AbstractLinkedProcessorSlot的实例并作为first,可以理解为当前slot
    AbstractLinkedProcessorSlot<?> first = new AbstractLinkedProcessorSlot<Object>() {
        public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args) throws Throwable {
            super.fireEntry(context, resourceWrapper, t, count, prioritized, args);
        }

        public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
            super.fireExit(context, resourceWrapper, count, args);
        }
    };
	
	//默认的end(可以理解为当前的后一个slot)
    AbstractLinkedProcessorSlot<?> end;

    public DefaultProcessorSlotChain() {
        this.end = this.first;
    }

    public void addFirst(AbstractLinkedProcessorSlot<?> protocolProcessor) {
        protocolProcessor.setNext(this.first.getNext());
        this.first.setNext(protocolProcessor);
		//如果当前为最后一个
        if (this.end == this.first) {
            this.end = protocolProcessor;
        }

    }

    public void addLast(AbstractLinkedProcessorSlot<?> protocolProcessor) {
		//将后一个slot放进当前slot的next
        this.end.setNext(protocolProcessor);
		//将end指向后一个slot
        this.end = protocolProcessor;
    }

    public void setNext(AbstractLinkedProcessorSlot<?> next) {
        this.addLast(next);
    }

    public AbstractLinkedProcessorSlot<?> getNext() {
        return this.first.getNext();
    }

    public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args) throws Throwable {
        this.first.transformEntry(context, resourceWrapper, t, count, prioritized, args);
    }

    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        this.first.exit(context, resourceWrapper, count, args);
    }
}

NodeSelectorSlot

@Spi(isSingleton = false, order = Constants.ORDER_NODE_SELECTOR_SLOT)
public class NodeSelectorSlot extends AbstractLinkedProcessorSlot<Object> {

    /**
     * 相同的资源但是Context不同,分别新建 DefaultNode,并以ContextName为key
     */
    private volatile Map<String, DefaultNode> map = new HashMap<String, DefaultNode>(10);
	
    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
        throws Throwable {
		// 根据ContextName尝试获取DefaultNode
        DefaultNode node = map.get(context.getName());
        if (node == null) {
            synchronized (this) {
                node = map.get(context.getName());
                if (node == null) {
					// 初始化resource对应的Node 类型为DefaultNode,每个Context对应一个
                    node = new DefaultNode(resourceWrapper, null);
					//保存Node  一个resource对应一个Node
                    HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
                    cacheMap.putAll(map);
                    cacheMap.put(context.getName(), node);
                    map = cacheMap;
					// 构建 Node tree
					// 添加到Context的Node节点,这里构造了一棵节点树
                    ((DefaultNode) context.getLastNode()).addChild(node);
                }

            }
        }
		//设置当前Context的当前节点为node
        context.setCurNode(node);
		// 唤醒执行下一个插槽,调用下一个Slot
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        fireExit(context, resourceWrapper, count, args);
    }
}

NodeSelectorSlot顾名思义是用来构建Node的。

 

我们可以看到NodeSelectorSlot对于不同的上下文都会生成一个DefaultNode。这里还有一个要注意的点:相同的资源({@link ResourceWrapper#equals(Object)})将全局共享相同的{@link ProcessorSlotChain},无论在哪个上下文中,因此不同的上下文可以进入到同一个对象的NodeSelectorSlot.entry方法中,那么这里要怎么区分不同的上下文所创建的资源Node呢?显然可以使用上下文名称作为映射键以区分相同的资源Node。

 

然后这里要考虑另一个问题。一个资源有可能创建多个DefaultNode(有多个上下文时),那么我们应该如何快速的获取总的统计数据呢?

 

答案就在下一个Slot(ClusterBuilderSlot)中被解决了。

ClusterBuilderSlot

上面有提到一个问题,我们要如何统计不同上下文相同资源的总量数据。ClusterBuilderSlot给了很好的解决方案:具有相同资源名称的共享一个ClusterNode。

@Spi(isSingleton = false, order = Constants.ORDER_CLUSTER_BUILDER_SLOT)
public class ClusterBuilderSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

	// 相同的资源共享一个 ClusterNode
    private static volatile Map<ResourceWrapper, ClusterNode> clusterNodeMap = new HashMap<>();

    private static final Object lock = new Object();

    private volatile ClusterNode clusterNode = null;
	
    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args)
        throws Throwable {
		// 判断本资源是否已经初始化过clusterNode
        if (clusterNode == null) {
            synchronized (lock) {
                if (clusterNode == null) {
                    // 没有初始化则初始化clusterNode
                    clusterNode = new ClusterNode(resourceWrapper.getName(), resourceWrapper.getResourceType());
                    HashMap<ResourceWrapper, ClusterNode> newMap = new HashMap<>(Math.max(clusterNodeMap.size(), 16));
                    newMap.putAll(clusterNodeMap);
                    newMap.put(node.getId(), clusterNode);

                    clusterNodeMap = newMap;
                }
            }
        }
		// 给相同资源的DefaultNode设置一样的ClusterNode
        node.setClusterNode(clusterNode);

        /*
         *  如果有来源则新建一个来源Node
         */
        if (!"".equals(context.getOrigin())) {
            Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin());
            context.getCurEntry().setOriginNode(originNode);
        }

        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }
}	

上面的代码其实就是做了一件事情,为资源创建CluserNode,相同的资源({@link ResourceWrapper#equals(Object)})将全局共享相同的{@link ProcessorSlotChain},无论在哪个上下文中。也就是说,能进入到同一个ClusterBuilderSlot对象的entry方法的请求都是来自同一个资源的,所以这些相同资源需要初始化一个统一的CluserNode用来做流量的汇总统计。

LogSlot

代码比较简单,逻辑就是打印异常日志,就不分析了。

StatisticSlot

StatisticSlot 是 Sentinel 的核心功能插槽之一,用于统计实时的调用数据。

@Spi(order = Constants.ORDER_STATISTIC_SLOT)
public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        try {
            // Do some checking.
			//next(下一个)节点调用Entry方法
			//先进行后续的check,包括规则的check,黑白名单check
            fireEntry(context, resourceWrapper, node, count, prioritized, args);
			// 如果能通过SlotChain中后面的Slot的entry方法,说明没有被限流或降级

            // 统计默认qps 线程数, 当前线程数加1
            node.increaseThreadNum();
			//通过的请求加上count
            node.addPassRequest(count);

			// 元节点通过请求数和当前线程(LongAdder curThreadNum)计数器加1
            if (context.getCurEntry().getOriginNode() != null) {
                // 根据来源统计qps 线程数
                context.getCurEntry().getOriginNode().increaseThreadNum();
                context.getCurEntry().getOriginNode().addPassRequest(count);
            }
			
			// 入口节点通过请求数和当前线程(LongAdder curThreadNum)计数器加1
            if (resourceWrapper.getEntryType() == EntryType.IN) {
                // 统计入口 qps 线程数
                Constants.ENTRY_NODE.increaseThreadNum();
                Constants.ENTRY_NODE.addPassRequest(count);
            }

            // Handle pass event with registered entry callback handlers.
			// 注册的扩展点的数据统计
            for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                handler.onPass(context, resourceWrapper, node, count, args);
            }
        } catch (PriorityWaitException ex) {
            node.increaseThreadNum();
            if (context.getCurEntry().getOriginNode() != null) {
                // Add count for origin node.
                context.getCurEntry().getOriginNode().increaseThreadNum();
            }

            if (resourceWrapper.getEntryType() == EntryType.IN) {
                // Add count for global inbound entry node for global statistics.
                Constants.ENTRY_NODE.increaseThreadNum();
            }
            // Handle pass event with registered entry callback handlers.
            for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                handler.onPass(context, resourceWrapper, node, count, args);
            }
        } catch (BlockException e) {
            // Blocked, set block exception to current entry.
            context.getCurEntry().setBlockError(e);

            // Add block count.
            node.increaseBlockQps(count);
            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().increaseBlockQps(count);
            }

            if (resourceWrapper.getEntryType() == EntryType.IN) {
                // Add count for global inbound entry node for global statistics.
                Constants.ENTRY_NODE.increaseBlockQps(count);
            }

            // Handle block event with registered entry callback handlers.
            for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                handler.onBlocked(e, context, resourceWrapper, node, count, args);
            }

            throw e;
        } catch (Throwable e) {
            // Unexpected internal error, set error to current entry.
            context.getCurEntry().setError(e);

            throw e;
        }
    }
}

StatisticSlot主要做了4种不同维度的流量统计

  • 1、资源在上下文维度(DefaultNode)的统计。
  • 2、clusterNode 维度的统计。
  • 3、Origin 来源维度的统计。
  • 4、入口全局流量的统计。

SystemSlot

SystemSlot比较简单,其实就是根据StatisticSlot所统计的全局入口流量进行限流。

AuthoritySlot

@Spi(order = Constants.ORDER_AUTHORITY_SLOT)
public class AuthoritySlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args)
        throws Throwable {
        checkBlackWhiteAuthority(resourceWrapper, context);
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }
	
    void checkBlackWhiteAuthority(ResourceWrapper resource, Context context) throws AuthorityException {
        Map<String, Set<AuthorityRule>> authorityRules = AuthorityRuleManager.getAuthorityRules();

        if (authorityRules == null) {
            return;
        }
		
		// 根据资源获取黑白名单规则
        Set<AuthorityRule> rules = authorityRules.get(resource.getName());
        if (rules == null) {
            return;
        }

		// 对规则进行校验,只要有一条不通过 就抛异常
        for (AuthorityRule rule : rules) {
            if (!AuthorityRuleChecker.passCheck(rule, context)) {
                throw new AuthorityException(context.getOrigin(), rule);
            }
        }
    }
}	

AuthoritySlot会对资源的黑白名单做检查,并且只要有一条不通过就抛异常。

FlowSlot

@Spi(order = Constants.ORDER_FLOW_SLOT)
public class FlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    private final FlowRuleChecker checker;
	
    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        checkFlow(resourceWrapper, context, node, count, prioritized);

        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

    void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized)
        throws BlockException {
		// 检查限流规则
        checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);
    }
}	

这个slot主要根据预设的资源的统计信息,按照固定的次序,依次生效。如果一个资源对应两条或者多条流控规则,则会根据如下次序依次检验,直到全部通过或者有一个规则生效为止,并且同样也会根据三种不同的维度来进行限流:

  • 1、资源在上下文维度(DefaultNode)的统计。
  • 2、clusterNode 维度的统计。
  • 3、Origin 来源维度的统计。

DegradeSlot

这个slot主要针对资源的平均响应时间(RT)以及异常比率,来决定资源是否在接下来的时间被自动熔断掉。

总结

  • 1、相同的资源({@link ResourceWrapper#equals(Object)})将全局共享相同的{@link ProcessorSlotChain},无论在哪个上下文中。

  • 2、流控有多个维度,分别包括:

    • 1、不同上下文中的资源。
    • 2、相同资源。
    • 3、入口流量。
    • 4、相同的来源。

Sentinel中预设的SlotChain执行的完整流程:

参考: https://www.cnblogs.com/taromilk/p/11751000.html

https://juejin.cn/post/6870671845582962702

https://blog.csdn.net/wk52525/article/details/104439404

标签:Slot,count,Object,node,Spring,args,resourceWrapper,context,Sentinel
From: https://blog.51cto.com/u_14014612/6017533

相关文章