SphU调用流程
流程图
代码跟踪
- 开始调用
在总结资源生成的时候,我们已经列举了不同的资源生成,会调用不同的方法,下面我们使用较为常用的方法SphU.Entry
来作为入口分析
/**
* 记录统计数据并对给定资源执行规则检查
* Record statistics and perform rule checking for the given resource.
*
* @param name the unique name of the protected resource
* @return the {@link Entry} of this invocation (used for mark the invocation complete and get context data)
* @throws BlockException if the block criteria is met (e.g. metric exceeded the threshold of any rules)
*/
public static Entry entry(String name) throws BlockException {
// 重载了各种方法 通过参数来确定不同的资源
return Env.sph.entry(name, EntryType.OUT, 1, OBJECTS0);
}
------------------------------------------
@Override
public Entry entry(String name, EntryType type, int count, Object... args) throws BlockException {
// 这里会使用string资源作为我们初始化资源的入参,这里是使用的ResourceWrapper的一个string子类
StringResourceWrapper resource = new StringResourceWrapper(name, type);
return entry(resource, count, args);
}
- 调用途中
沿着调用栈逐渐深入,可以发现无论是从哪个入口,最终都是在com.alibaba.csp.sentinel.CtSph#entryWithPriority(com.alibaba.csp.sentinel.slotchain.ResourceWrapper, int, boolean, java.lang.Object...)
中进行处理
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
throws BlockException {
// 构建context
Context context = ContextUtil.getContext();
if (context instanceof NullContext) {
// The {@link NullContext} indicates that the amount of context has exceeded the threshold,
// so here init the entry only. No rule checking will be done.
// 超过阈值后 才会执行到这里,是一个空context
// 所以只初始化, 不做其他规则校验
return new CtEntry(resourceWrapper, null, context);
}
if (context == null) {
// Using default context.
// 兜底
context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
}
// Global switch is close, no rule checking will do.
// 全局的开关,如果没有打开,不去做规则校验
if (!Constants.ON) {
return new CtEntry(resourceWrapper, null, context);
}
// 开始获取processSlot
ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
/*
* 为空说明 链路超过了常量 6000 条, 就不处理了
* Means amount of resources (slot chain) exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE},
* so no rule checking will be done.
*/
if (chain == null) {
return new CtEntry(resourceWrapper, null, context);
}
// 新建一个当前执行的entry, 每次对资源的操作都会生成,
// 然后将上一次生成的entry 作为parent
Entry e = new CtEntry(resourceWrapper, chain, context);
try {
chain.entry(context, resourceWrapper, null, count, prioritized, args);
} catch (BlockException e1) {
e.exit(count, args);
throw e1;
} catch (Throwable e1) {
// This should not happen, unless there are errors existing in Sentinel internal.
RecordLog.info("Sentinel unexpected exception", e1);
}
// 最终返回的entry 操作实体
return e;
}
- 获取对应ProcessSlot
ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
// 从chainMap 根据当前资源获取slot, 这里也说明了 一个resource 只能对应一个porcessSlot
ProcessorSlotChain chain = chainMap.get(resourceWrapper);
if (chain == null) {
synchronized (LOCK) {
// double check
chain = chainMap.get(resourceWrapper);
if (chain == null) {
// Entry size limit.
// 超过了上限, 就不弄这个资源的限制了
if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
return null;
}
// 新弄一个chain
chain = SlotChainProvider.newSlotChain();
Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
chainMap.size() + 1);
newMap.putAll(chainMap);
newMap.put(resourceWrapper, chain);
chainMap = newMap;
}
}
}
return chain;
}
- 责任链的形式一直去执行chain 里面的entry方法, 我们会在
NodeSelectorSlot
之中更新当前的currNode,并绑定defaultNode
public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
throws Throwable {
/*
有趣的是,我们使用上下文名称而不是资源名称作为映射键。记住,无论在哪个上下文中,相同的资源({@link ResourceWrapperequals(Object)})将全局共享相同的{@link ProcessorSlotChain}。因此,如果代码进入{@link条目(Context, ResourceWrapper, DefaultNode, int, Object…)},资源名必须相同,但上下文名可以不同。如果我们使用{@link com.alibaba.csp.sentinel。SphUentry(String resource)}在不同的上下文中输入相同的资源,使用上下文名称作为映射键可以区分相同的资源。在这种情况下,将为每个不同的上下文(不同的上下文名称)创建多个具有相同资源名称的{@link DefaultNode}。考虑另一个问题。一个资源可能有多个{@link DefaultNode},那么获得同一资源的统计数据的最快方法是什么?答案是所有具有相同资源名的{@link DefaultNode}共享一个{@link ClusterNode}。详情请参见{@link ClusterBuilderSlot}。
*/
// 在这里根据上下文去生DefaultNode
DefaultNode node = map.get(context.getName());
if (node == null) {
synchronized (this) {
node = map.get(context.getName());
if (node == null) {
// 一个context 生成的 一个defaultNode 而且是根据context的name 来确认唯一性的
node = new DefaultNode(resourceWrapper, null);
HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
cacheMap.putAll(map);
cacheMap.put(context.getName(), node);
map = cacheMap;
// Build invocation tree
// 调用树,添加子节点
((DefaultNode) context.getLastNode()).addChild(node);
}
}
}
// 设置为当前Node
context.setCurNode(node);
// 执行其他的slotchain
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
- 在flowSlot 根据规则拦截,然后在StatisticSlot 统计当前次数数据
// StatisticSlot 中的数据统计,,
// 1 当前entry的次数和成功次数+1
// 2. 整个服务(culsterNode) 的次数和成功次数+1
// 3. 成功统计之后的回调处理.
// 如失败,则当前请求次数+1,成功次数不做增减
try {
// Do some checking.
fireEntry(context, resourceWrapper, node, count, prioritized, args);
// Request passed, add thread count and pass count.
node.increaseThreadNum();
node.addPassRequest(count);
if (context.getCurEntry().getOriginNode() != null) {
// Add count for origin node.
context.getCurEntry().getOriginNode().increaseThreadNum();
context.getCurEntry().getOriginNode().addPassRequest(count);
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseThreadNum();
Constants.ENTRY_NODE.addPassRequest(count);
}
// Handle pass event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (PriorityWaitException ex) {
node.increaseThreadNum();
if (context.getCurEntry().getOriginNode() != null) {
// Add count for origin node.
context.getCurEntry().getOriginNode().increaseThreadNum();
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseThreadNum();
}
// Handle pass event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (BlockException e) {
// 阻止访问了的话 不做访问次数+1
// Blocked, set block exception to current entry.
context.getCurEntry().setBlockError(e);
// Add block count.
node.increaseBlockQps(count);
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().increaseBlockQps(count);
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseBlockQps(count);
}
// Handle block event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onBlocked(e, context, resourceWrapper, node, count, args);
}
throw e;
} catch (Throwable e) {
// 未知异常的跑抛出
// Unexpected internal error, set error to current entry.
context.getCurEntry().setError(e);
throw e;
}
标签:count,node,调用,Sphu,resourceWrapper,context,entry,null
From: https://www.cnblogs.com/wzqshb/p/16585817.html