Sentinel源码—3.ProcessorSlot的执行过程一
大纲
1.NodeSelectorSlot构建资源调用树
2.LogSlot和StatisticSlot采集资源的数据
3.Sentinel监听器模式的规则对象与规则管理
4.AuthoritySlot控制黑白名单权限
5.SystemSlot根据系统保护规则进行流控
1.NodeSelectorSlot构建资源调用树
(1)Entry的处理链的执行入口
(2)NodeSelectorSlot的源码
(3)Context对象中存储的资源调用树总结
(1)Entry的处理链的执行入口
每当一个线程处理包含某些资源的接口请求时,会调用SphU的entry()方法去创建并管控该接口中涉及的Entry资源访问对象。
在创建Entry资源访问对象的期间,会创建一个ResourceWrapper对象、一个Context对象、以及根据ResourceWrapper对象创建或获取一个ProcessorSlotChain对象,也就是把ProcessorSlotChain对象、Context对象与ResourceWrapper对象绑定到Entry对象中。
public class SphU {private static final Object[] OBJECTS0 = new Object[0];...public static Entry entry(String name) throws BlockException {//调用CtSph.entry()方法创建一个Entry资源访问对象,默认的请求类型为OUTreturn Env.sph.entry(name, EntryType.OUT, 1, OBJECTS0);}
}public class Env {//创建一个CtSph对象public static final Sph sph = new CtSph();static {InitExecutor.doInit();}
}public class CtSph implements Sph {//Same resource will share the same ProcessorSlotChain}, no matter in which Context.//Same resource is that ResourceWrapper#equals(Object).private static volatile Map<ResourceWrapper, ProcessorSlotChain> chainMap = new HashMap<ResourceWrapper, ProcessorSlotChain>();...@Overridepublic Entry entry(String name, EntryType type, int count, Object... args) throws BlockException {//StringResourceWrapper是ResourceWrapper的子类,且StringResourceWrapper的构造方法默认了资源类型为COMMONStringResourceWrapper resource = new StringResourceWrapper(name, type);return entry(resource, count, args);}//Do all {@link Rule}s checking about the resource.public Entry entry(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException {//调用CtSph.entryWithPriority()方法,执行如下处理://初始化Context -> 将Context与线程绑定 -> 初始化Entry -> 将Context和ResourceWrapper放入Entry中return entryWithPriority(resourceWrapper, count, false, args);}private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args) throws BlockException {//从当前线程中获取ContextContext context = ContextUtil.getContext();if (context instanceof NullContext) {return new CtEntry(resourceWrapper, null, context);}//如果没获取到Contextif (context == null) {//Using default context.//创建一个名为sentinel_default_context的Context,并且与当前线程绑定context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);}//Global switch is close, no rule checking will do.if (!Constants.ON) {return new CtEntry(resourceWrapper, null, context);}//调用CtSph.lookProcessChain()方法初始化处理链(处理器插槽链条)ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);if (chain == null) {return new CtEntry(resourceWrapper, null, context);}//创建出一个Entry对象,将处理链(处理器插槽链条)、Context与Entry绑定//其中会将Entry的三个基础属性(封装在resourceWrapper里)以及当前Entry所属的Context作为参数传入CtEntry的构造方法Entry e = new CtEntry(resourceWrapper, chain, context);try {//处理链(处理器插槽链条)入口,负责采集数据,规则验证//调用DefaultProcessorSlotChain.entry()方法执行处理链每个节点的逻辑(数据采集+规则验证)chain.entry(context, resourceWrapper, null, count, prioritized, args);} catch (BlockException e1) {//规则验证失败,比如:被流控、被熔断降级、触发黑白名单等e.exit(count, args);throw e1;} catch (Throwable e1) {RecordLog.info("Sentinel unexpected exception", e1);}return e;}...private final static class InternalContextUtil extends ContextUtil {static Context internalEnter(String name) {//调用ContextUtil.trueEnter()方法创建一个Context对象return trueEnter(name, "");}static Context internalEnter(String name, String origin) {return trueEnter(name, origin);}}//Get ProcessorSlotChain of the resource. //new ProcessorSlotChain will be created if the resource doesn't relate one.//Same resource will share the same ProcessorSlotChain globally, no matter in which Context.//Same resource is that ResourceWrapper#equals(Object).ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {ProcessorSlotChain chain = chainMap.get(resourceWrapper);if (chain == null) {synchronized (LOCK) {chain = chainMap.get(resourceWrapper);if (chain == null) {//Entry size limit.if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {return null;}//调用SlotChainProvider.newSlotChain()方法初始化处理链(处理器插槽链条)chain = SlotChainProvider.newSlotChain();//写时复制Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(chainMap.size() + 1);newMap.putAll(chainMap);newMap.put(resourceWrapper, chain);chainMap = newMap;}}}return chain;}
}public class StringResourceWrapper extends ResourceWrapper {public StringResourceWrapper(String name, EntryType e) {//调用父类构造方法,且默认资源类型为COMMONsuper(name, e, ResourceTypeConstants.COMMON);}...
}//Utility class to get or create Context in current thread.
//Each SphU.entry() should be in a Context.
//If we don't invoke ContextUtil.enter() explicitly, DEFAULT context will be used.
public class ContextUtil {//Store the context in ThreadLocal for easy access.//存放线程与Context的绑定关系//每个请求对应一个线程,每个线程绑定一个Context,所以每个请求对应一个Contextprivate static ThreadLocal<Context> contextHolder = new ThreadLocal<>();//Holds all EntranceNode. Each EntranceNode is associated with a distinct context name.//以Context的name作为key,EntranceNode作为value缓存到HashMap中private static volatile Map<String, DefaultNode> contextNameNodeMap = new HashMap<>();private static final ReentrantLock LOCK = new ReentrantLock();private static final Context NULL_CONTEXT = new NullContext();...//ContextUtil.trueEnter()方法会尝试从ThreadLocal获取一个Context对象//如果获取不到,再创建一个Context对象然后放入ThreadLocal中//入参name其实一般就是默认的Constants.CONTEXT_DEFAULT_NAME=sentinel_default_context//由于当前线程可能会涉及创建多个Entry资源访问对象,所以trueEnter()方法需要注意并发问题protected static Context trueEnter(String name, String origin) {//从ThreadLocal中获取当前线程绑定的Context对象Context context = contextHolder.get();//如果当前线程还没绑定Context对象,则初始化Context对象并且与当前线程进行绑定if (context == null) {//首先要获取或创建Context对象所需要的EntranceNode对象,EntranceNode会负责统计名字相同的Context下的指标数据//将全局缓存contextNameNodeMap赋值给一个临时变量localCacheNameMap//因为后续会对contextNameNodeMap的内容进行修改,所以这里需要将原来的contextNameNodeMap复制一份出来//从而避免后续对contextNameNodeMap的内容进行修改时,可能造成对接下来读取contextNameNodeMap内容的影响Map<String, DefaultNode> localCacheNameMap = contextNameNodeMap;//从缓存副本localCacheNameMap中获取EntranceNode//这个name其实一般就是默认的sentinel_default_contextDefaultNode node = localCacheNameMap.get(name);//如果获取的EntranceNode为空if (node == null) {//为了防止缓存无限制地增长,导致内存占用过高,需要设置一个上限,只要超过上限,就直接返回NULL_CONTEXTif (localCacheNameMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {setNullContext();return NULL_CONTEXT;} else {//如果Context还没创建,缓存里也没有当前Context名称对应的EntranceNode,并且缓存数量尚未达到2000//那么就创建一个EntranceNode,创建EntranceNode时需要加锁,否则会有线程不安全问题//毕竟需要修改HashMap类型的contextNameNodeMap//通过加锁 + 缓存 + 写时复制更新缓存,避免并发情况下创建出多个EntranceNode对象//一个线程对应一个Context对象,多个线程对应多个Context对象//这些Context对象会使用ThreadLocal进行隔离,但它们的name默认都是sentinel_default_context//根据下面的代码逻辑://多个线程(对应多个Context的name默认都是sentinel_default_context)会共用同一个EntranceNode//于是可知,多个Context对象会共用一个EntranceNode对象LOCK.lock();try {//从缓存中获取EntranceNodenode = contextNameNodeMap.get(name);//对node进行Double Check//如果没获取到EntranceNodeif (node == null) {if (contextNameNodeMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {setNullContext();return NULL_CONTEXT;} else {//创建EntranceNode,缓存到contextNameNodeMap当中node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null);//Add entrance node.//将新创建的EntranceNode添加到ROOT中,ROOT就是每个Node的根结点Constants.ROOT.addChild(node);//写时复制,将新创建的EntranceNode添加到缓存中Map<String, DefaultNode> newMap = new HashMap<>(contextNameNodeMap.size() + 1);newMap.putAll(contextNameNodeMap);newMap.put(name, node);contextNameNodeMap = newMap;}}} finally {//解锁LOCK.unlock();}}}//此处可能会有多个线程同时执行到此处,并发创建多个Context对象//但这是允许的,因为一个请求对应一个Context,一个请求对应一个线程,所以一个线程本来就需要创建一个Context对象//初始化Context,将刚获取到或刚创建的EntranceNode放到Context的entranceNode属性中context = new Context(node, name);context.setOrigin(origin);//将创建出来的Context对象放入ThreadLocal变量contextHolder中,实现Context对象与当前线程的绑定contextHolder.set(context);}return context;}...
}public final class SlotChainProvider {private static volatile SlotChainBuilder slotChainBuilder = null;//The load and pick process is not thread-safe, //but it's okay since the method should be only invoked via CtSph.lookProcessChain() under lock.public static ProcessorSlotChain newSlotChain() {//如果存在,则直接返回if (slotChainBuilder != null) {return slotChainBuilder.build();}//Resolve the slot chain builder SPI.//通过SPI机制初始化SlotChainBuilderslotChainBuilder = SpiLoader.of(SlotChainBuilder.class).loadFirstInstanceOrDefault();if (slotChainBuilder == null) {//Should not go through here.RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default");slotChainBuilder = new DefaultSlotChainBuilder();} else {RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: {}", slotChainBuilder.getClass().getCanonicalName());}return slotChainBuilder.build();}private SlotChainProvider() {}
}@Spi(isDefault = true)
public class DefaultSlotChainBuilder implements SlotChainBuilder {@Overridepublic ProcessorSlotChain build() {//创建一个DefaultProcessorSlotChain对象实例ProcessorSlotChain chain = new DefaultProcessorSlotChain();//通过SPI机制加载责任链的节点ProcessorSlot实现类//然后按照@Spi注解的order属性进行排序并进行实例化//最后将ProcessorSlot实例放到sortedSlotList中List<ProcessorSlot> sortedSlotList = SpiLoader.of(ProcessorSlot.class).loadInstanceListSorted();//遍历已排好序的ProcessorSlot集合for (ProcessorSlot slot : sortedSlotList) {//安全检查,防止业务系统也写了一个SPI文件,但没按规定继承AbstractLinkedProcessorSlotif (!(slot instanceof AbstractLinkedProcessorSlot)) {RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain");continue;}//调用DefaultProcessorSlotChain.addLast()方法构建单向链表//将责任链的节点ProcessorSlot实例放入DefaultProcessorSlotChain中chain.addLast((AbstractLinkedProcessorSlot<?>) slot);}//返回单向链表return chain;}
}
在DefaultSlotChainBuilder的build()方法中,从其初始化ProcessorSlotChain的逻辑可知,Entry的处理链的执行入口就是DefaultProcessorSlotChain的entry()方法。
当一个线程调用SphU的entry()方法创建完与接口相关的Entry对象后,就会调用DefaultProcessorSlotChain的entry()方法执行处理链节点的逻辑。因为NodeSelectorSlot是Entry的处理链ProcessorSlotChain的第一个节点,所以接着会调用NodeSelectorSlot的entry()方法。由于处理链中紧接着NodeSelectorSlot的下一个节点是ClusterBuilderSlot,所以执行完NodeSelectorSlot的entry()方法后,会接着执行ClusterBuilderSlot的entry()方法。
public class DefaultProcessorSlotChain extends ProcessorSlotChain {...@Overridepublic void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args) throws Throwable {//默认情况下会调用处理链的第一个节点NodeSelectorSlot的transformEntry()方法first.transformEntry(context, resourceWrapper, t, count, prioritized, args);}...
}public abstract class AbstractLinkedProcessorSlot<T> implements ProcessorSlot<T> {...void transformEntry(Context context, ResourceWrapper resourceWrapper, Object o, int count, boolean prioritized, Object... args) throws Throwable {T t = (T)o;entry(context, resourceWrapper, t, count, prioritized, args);}...
}
(2)NodeSelectorSlot的源码
NodeSelectorSlot和ClusterBuilderSlot会一起构建Context的资源调用树,资源调用树的作用其实就是用来统计资源的调用数据。
在一个Context对象实例的资源调用树上主要会有如下三类节点:DefaultNode、EntranceNode、ClusterNode,分别对应于:单机里的资源维度、接口维度、集群中的资源维度。
其中DefaultNode会统计名字相同的Context下的某个资源的调用数据,EntranceNode会统计名字相同的Context下的全部资源的调用数据,ClusterNode会统计某个资源在全部Context下的调用数据。
在执行NodeSelectorSlot的entry()方法时,首先会从缓存(NodeSelectorSlot.map属性)中获取一个DefaultNode对象。如果获取不到,再通过DCL机制创建一个DefaultNode对象并更新缓存。其中缓存的key是Context的name,value是DefaultNode对象。由于默认情况下多个线程对应的Context的name都相同,所以多个线程访问同一资源时使用的DefaultNode对象也一样。
在执行ClusterBuilderSlot的entry()方法时,首先会判断缓存是否为null,若是则创建一个ClusterNode对象,然后再将ClusterNode对象设置到DefaultNode对象的clusterNode属性中。
由DefaultNode、EntranceNode、ClusterNode构成的资源调用树:因为DefaultNode是和资源ResourceWrapper以及Context挂钩的,所以DefaultNode应该添加到EntranceNode中。因为ClusterNode和资源挂钩,而不和Context挂钩,所以ClusterNode应该添加到DefaultNode中。
具体的资源调用树构建源码如下:
//This class will try to build the calling traces via:
//adding a new DefaultNode if needed as the last child in the context.
//the context's last node is the current node or the parent node of the context.
//setting itself to the context current node.//It works as follow:
// ContextUtil.enter("entrance1", "appA");
// Entry nodeA = SphU.entry("nodeA");
// if (nodeA != null) {
// nodeA.exit();
// }
// ContextUtil.exit();//Above code will generate the following invocation structure in memory:
// machine-root
// /
// /
// EntranceNode1
// /
// /
// DefaultNode(nodeA)- - - - - -> ClusterNode(nodeA);
//Here the EntranceNode represents "entrance1" given by ContextUtil.enter("entrance1", "appA").
//Both DefaultNode(nodeA) and ClusterNode(nodeA) holds statistics of "nodeA", which is given by SphU.entry("nodeA").
//The ClusterNode is uniquely identified by the ResourceId;
//The DefaultNode is identified by both the resource id and {@link Context}.
//In other words, one resource id will generate multiple DefaultNode for each distinct context,
//but only one ClusterNode.//the following code shows one resource id in two different context:
// ContextUtil.enter("entrance1", "appA");
// Entry nodeA = SphU.entry("nodeA");
// if (nodeA != null) {
// nodeA.exit();
// }
// ContextUtil.exit();
// ContextUtil.enter("entrance2", "appA");
// nodeA = SphU.entry("nodeA");
// if (nodeA != null) {
// nodeA.exit();
// }
// ContextUtil.exit();//Above code will generate the following invocation structure in memory:
// machine-root
// / \
// / \
// EntranceNode1 EntranceNode2
// / \
// / \
// DefaultNode(nodeA) DefaultNode(nodeA)
// | |
// +- - - - - - - - - - +- - - - - - -> ClusterNode(nodeA);
//As we can see, two DefaultNode are created for "nodeA" in two context,
//but only one ClusterNode is created.
//We can also check this structure by calling: http://localhost:8719/tree?type=root
@Spi(isSingleton = false, order = Constants.ORDER_NODE_SELECTOR_SLOT)
public class NodeSelectorSlot extends AbstractLinkedProcessorSlot<Object> {//DefaultNodes of the same resource in different context.//缓存map以Context的name为key,DefaultNode为value//由于默认情况下多个线程对应的Context的name都相同,所以多个线程访问资源时使用的DefaultNode也一样private volatile Map<String, DefaultNode> map = new HashMap<String, DefaultNode>(10);@Overridepublic void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args) throws Throwable {//It's interesting that we use context name rather resource name as the map key.//Remember that same resource will share the same ProcessorSlotChain globally, no matter in which context. //Same resource is that ResourceWrapper#equals(Object).//So if code goes into entry(Context, ResourceWrapper, DefaultNode, int, Object...),//the resource name must be same but context name may not.//If we use SphU.entry(String resource)} to enter same resource in different context, //using context name as map key can distinguish the same resource.//In this case, multiple DefaultNodes will be created of the same resource name, //for every distinct context (different context name) each.//Consider another question. One resource may have multiple DefaultNode,//so what is the fastest way to get total statistics of the same resource?//The answer is all DefaultNodes with same resource name share one ClusterNode.//See ClusterBuilderSlot for detail.//先从缓存中获取DefaultNode node = map.get(context.getName());if (node == null) {//使用DCL机制,即Double Check + Lock机制synchronized (this) {node = map.get(context.getName());if (node == null) {//每个线程访问Entry时,都会调用CtSph.entry()方法创建一个ResourceWrapper对象//下面根据ResourceWrapper创建一个DefaultNode对象node = new DefaultNode(resourceWrapper, null);//写时复制更新缓存mapHashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());cacheMap.putAll(map);cacheMap.put(context.getName(), node);map = cacheMap;//Build invocation tree//首先会调用context.getLastNode()方法,获取到的是Context.entranceNode属性即一个EntranceNode对象//EntranceNode对象是在执行ContextUtil.trueEnter()方法创建Context对象实例时添加到Context对象中的//然后会将刚创建的DefaultNode对象添加到EntranceNode对象的childList列表中((DefaultNode) context.getLastNode()).addChild(node);}}}//设置Context的curNode属性为当前获取到或新创建的DefaultNode对象context.setCurNode(node);//触发执行下一个ProcessorSlot,即ClusterBuilderSlotfireEntry(context, resourceWrapper, node, count, prioritized, args);}@Overridepublic void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {fireExit(context, resourceWrapper, count, args);}
}//This slot maintains resource running statistics (response time, qps, thread count, exception),
//and a list of callers as well which is marked by ContextUtil.enter(String origin).
//One resource has only one cluster node, while one resource can have multiple default nodes.
@Spi(isSingleton = false, order = Constants.ORDER_CLUSTER_BUILDER_SLOT)
public class ClusterBuilderSlot extends AbstractLinkedProcessorSlot<DefaultNode> {//Remember that same resource will share the same ProcessorSlotChain globally, no matter in which context.//Same resource is that ResourceWrapper#equals(Object).//So if code goes into entry(Context, ResourceWrapper, DefaultNode, int, boolean, Object...),//the resource name must be same but context name may not.//To get total statistics of the same resource in different context, //same resource shares the same ClusterNode} globally.//All ClusterNodes are cached in this map.//The longer the application runs, the more stable this mapping will become. //so we don't concurrent map but a lock. //as this lock only happens at the very beginning while concurrent map will hold the lock all the time.private static volatile Map<ResourceWrapper, ClusterNode> clusterNodeMap = new HashMap<>();private static final Object lock = new Object();private volatile ClusterNode clusterNode = null;@Overridepublic void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {if (clusterNode == null) {//使用DCL机制,即Double Check + Lock机制synchronized (lock) {if (clusterNode == null) {//Create the cluster node.//创建ClusterNode对象clusterNode = new ClusterNode(resourceWrapper.getName(), resourceWrapper.getResourceType());HashMap<ResourceWrapper, ClusterNode> newMap = new HashMap<>(Math.max(clusterNodeMap.size(), 16));newMap.putAll(clusterNodeMap);newMap.put(node.getId(), clusterNode);clusterNodeMap = newMap;}}}//设置DefaultNode的clusterNode属性为获取到的ClusterNode对象node.setClusterNode(clusterNode);//if context origin is set, we should get or create a new {@link Node} of the specific origin.if (!"".equals(context.getOrigin())) {Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin());context.getCurEntry().setOriginNode(originNode);}//执行下一个ProcessorSlotfireEntry(context, resourceWrapper, node, count, prioritized, args);}@Overridepublic void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {fireExit(context, resourceWrapper, count, args);}...
}
(3)Context对象中存储的资源调用树总结
其实Context对象的属性entranceNode就代表了一棵资源调用树。
首先,在调用ContextUtil的trueEnter()方法创建Context对象实例时,便会创建一个EntranceNode对象并赋值给Context的entranceNode属性,以及调用Constants.ROOT的addChild()方法,将这个EntranceNode对象放入Constants.ROOT的childList列表中。
然后,执行NodeSelectorSlot的entry()方法时,便会创建一个DefaultNode对象。该DefaultNode对象会被添加到Context.entranceNode的childList列表中,也就是前面创建的EntranceNode对象的childList列表中。
接着,执行ClusterBuilderSlot的entry()方法时,便会创建一个ClusterNode对象,该ClusterNode对象会赋值给前面DefaultNode对象中的clusterNode属性。
至此,便构建完Context下的资源调用树了。Constants.ROOT的childList里会存放多个EntranceNode对象,每个EntranceNode对象的childList里会存放多个DefaultNode对象,而每个DefaultNode对象会指向一个ClusterNode对象。
//This class holds metadata of current invocation:
//the EntranceNode: the root of the current invocation tree.
//the current Entry: the current invocation point.
//the current Node: the statistics related to the Entry.
//the origin: The origin is useful when we want to control different invoker/consumer separately.
//Usually the origin could be the Service Consumer's app name or origin IP.//Each SphU.entry() or SphO.entry() should be in a Context,
//if we don't invoke ContextUtil.enter() explicitly, DEFAULT context will be used.
//A invocation tree will be created if we invoke SphU.entry() multi times in the same context.
//Same resource in different context will count separately, see NodeSelectorSlot.
public class Context {//Context name.private final String name;//The entrance node of current invocation tree.private DefaultNode entranceNode;//Current processing entry.private Entry curEntry;//The origin of this context (usually indicate different invokers, e.g. service consumer name or origin IP).private String origin = "";...public Context(DefaultNode entranceNode, String name) {this(name, entranceNode, false);}public Context(String name, DefaultNode entranceNode, boolean async) {this.name = name;this.entranceNode = entranceNode;this.async = async;}//Get the parent Node of the current.public Node getLastNode() {if (curEntry != null && curEntry.getLastNode() != null) {return curEntry.getLastNode();} else {return entranceNode;}}...
}public class ContextUtil {//以Context的name作为key,EntranceNode作为value缓存所有的EntranceNode到HashMap中private static volatile Map<String, DefaultNode> contextNameNodeMap = new HashMap<>();...protected static Context trueEnter(String name, String origin) {...//从缓存中获取EntranceNodeDefaultNode node = contextNameNodeMap.get(name);...//创建EntranceNode,缓存到contextNameNodeMap当中node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null);//将新创建的EntranceNode添加到ROOT中,ROOT就是每个Node的根结点Constants.ROOT.addChild(node);...//初始化Context,将刚获取到或刚创建的EntranceNode放到Context的entranceNode属性中context = new Context(node, name);...}...
}public final class Constants {...//Global ROOT statistic node that represents the universal parent node.public final static DefaultNode ROOT = new EntranceNode(new StringResourceWrapper(ROOT_ID, EntryType.IN),new ClusterNode(ROOT_ID, ResourceTypeConstants.COMMON));...
}//A Node used to hold statistics for specific resource name in the specific context.
//Each distinct resource in each distinct Context will corresponding to a DefaultNode.
//This class may have a list of sub DefaultNodes.
//Child nodes will be created when calling SphU.entry() or SphO.entry() multiple times in the same Context.
public class DefaultNode extends StatisticNode {//The resource associated with the node.private ResourceWrapper id;//The list of all child nodes.private volatile Set<Node> childList = new HashSet<>();//Associated cluster node.private ClusterNode clusterNode;...//Add child node to current node.public void addChild(Node node) {if (node == null) {RecordLog.warn("Trying to add null child to node <{}>, ignored", id.getName());return;}if (!childList.contains(node)) {synchronized (this) {if (!childList.contains(node)) {Set<Node> newSet = new HashSet<>(childList.size() + 1);newSet.addAll(childList);newSet.add(node);childList = newSet;}}RecordLog.info("Add child <{}> to node <{}>", ((DefaultNode)node).id.getName(), id.getName());}}//Reset the child node list.public void removeChildList() {this.childList = new HashSet<>();}...
}@Spi(isSingleton = false, order = Constants.ORDER_NODE_SELECTOR_SLOT)
public class NodeSelectorSlot extends AbstractLinkedProcessorSlot<Object> {//DefaultNodes of the same resource in different context.//缓存map以Context的name为key,DefaultNode为value//由于默认情况下多个线程对应的Context的name都相同,所以多个线程访问资源时使用的DefaultNode也一样private volatile Map<String, DefaultNode> map = new HashMap<String, DefaultNode>(10);...@Overridepublic void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args) throws Throwable {...//先从缓存中获取DefaultNode node = map.get(context.getName());...//下面根据ResourceWrapper创建一个DefaultNode对象node = new DefaultNode(resourceWrapper, null);...//Build invocation tree//首先会调用context.getLastNode()方法,获取到的是Context.entranceNode属性即一个EntranceNode对象//EntranceNode对象是在执行ContextUtil.trueEnter()方法创建Context对象实例时添加到Context对象中的//然后会将刚创建的DefaultNode对象添加到EntranceNode对象的childList列表中((DefaultNode) context.getLastNode()).addChild(node);...//执行下一个ProcessorSlotfireEntry(context, resourceWrapper, node, count, prioritized, args);}...
}@Spi(isSingleton = false, order = Constants.ORDER_CLUSTER_BUILDER_SLOT)
public class ClusterBuilderSlot extends AbstractLinkedProcessorSlot<DefaultNode> {...private volatile ClusterNode clusterNode = null;@Overridepublic void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {...//创建ClusterNode对象clusterNode = new ClusterNode(resourceWrapper.getName(), resourceWrapper.getResourceType()); ...//设置DefaultNode的clusterNode属性为获取到的ClusterNode对象node.setClusterNode(clusterNode);...//执行下一个ProcessorSlotfireEntry(context, resourceWrapper, node, count, prioritized, args);}...
}//资源调用树的示例如下所示:
// machine-root
// / \
// / \
// EntranceNode1 EntranceNode2
// / \
// / \
// DefaultNode(nodeA) DefaultNode(nodeA)
// | |
// +- - - - - - - - - - +- - - - - - -> ClusterNode(nodeA);
//其中,machine-root中的childList里会有很多个EntranceNode对象
//EntranceNode对象中的childList里又会有很多个DefaultNode对象
//每个DefaultNode对象下都会指向一个ClusterNode对象
一些对应关系的梳理总结:
一个线程对应一个ResourceWrapper对象实例,一个线程对应一个Context对象实例。如果ResourceWrapper对象相同,则会共用一个ProcessorSlotChain实例。如果ResourceWrapper对象相同,则也会共用一个ClusterNode实例。如果Context对象的名字相同,则会共用一个EntranceNode对象实例。如果Context对象的名字相同,则也会共用一个DefaultNode对象实例。
//每个请求对应一个线程,每个线程绑定一个Context,所以每个请求对应一个Context
private static ThreadLocal<Context> contextHolder = new ThreadLocal<>();//以Context的name作为key,EntranceNode作为value缓存所有的EntranceNode到HashMap中
private static volatile Map<String, EntranceNode> contextNameNodeMap = new HashMap<>();//Same resource will share the same ProcessorSlotChain}, no matter in which Context.
//Same resource is that ResourceWrapper#equals(Object).
private static volatile Map<ResourceWrapper, ProcessorSlotChain> chainMap = new HashMap<ResourceWrapper, ProcessorSlotChain>();//DefaultNodes of the same resource in different context.
//以Context的name作为key,DefaultNode作为value
//由于默认情况下多个线程对应的Context的name都相同,所以多个线程访问资源时使用的DefaultNode也一样
private volatile Map<String, DefaultNode> map = new HashMap<String, DefaultNode>(10);//To get total statistics of the same resource in different context,
//same resource shares the same ClusterNode globally.
//All ClusterNodes are cached in this map.
private static volatile Map<ResourceWrapper, ClusterNode> clusterNodeMap = new HashMap<>();
2.LogSlot和StatisticSlot采集资源的数据
(1)LogSlot的源码
(2)StatisticSlot的源码
(3)记录资源在不同维度下的调用数据
(1)LogSlot的源码
LogSlot用于记录异常请求日志,以便于故障排查。也就是当出现BlockException异常时,调用EagleEyeLogUtil的log()方法将日志写到sentinel-block.log文件中。
//A ProcessorSlot that is response for logging block exceptions to provide concrete logs for troubleshooting.
@Spi(order = Constants.ORDER_LOG_SLOT)
public class LogSlot extends AbstractLinkedProcessorSlot<DefaultNode> {@Overridepublic void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode obj, int count, boolean prioritized, Object... args) throws Throwable {try {//调用下一个ProcessorSlotfireEntry(context, resourceWrapper, obj, count, prioritized, args);} catch (BlockException e) {//被流控或者熔断降级后打印log日志EagleEyeLogUtil.log(resourceWrapper.getName(), e.getClass().getSimpleName(), e.getRuleLimitApp(), context.getOrigin(), e.getRule().getId(), count);throw e;} catch (Throwable e) {RecordLog.warn("Unexpected entry exception", e);}}@Overridepublic void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {try {//调用下一个ProcessorSlotfireExit(context, resourceWrapper, count, args);} catch (Throwable e) {RecordLog.warn("Unexpected entry exit exception", e);}}
}public class EagleEyeLogUtil {public static final String FILE_NAME = "sentinel-block.log";private static StatLogger statLogger;static {String path = LogBase.getLogBaseDir() + FILE_NAME;statLogger = EagleEye.statLoggerBuilder("sentinel-block-log").intervalSeconds(1).entryDelimiter('|').keyDelimiter(',').valueDelimiter(',').maxEntryCount(6000).configLogFilePath(path).maxFileSizeMB(300).maxBackupIndex(3).buildSingleton();}public static void log(String resource, String exceptionName, String ruleLimitApp, String origin, Long ruleId, int count) {String ruleIdString = StringUtil.EMPTY;if (ruleId != null) {ruleIdString = String.valueOf(ruleId);}statLogger.stat(resource, exceptionName, ruleLimitApp, origin, ruleIdString).count(count);}
}
(2)StatisticSlot的源码
StatisticSlot用于统计资源的调用数据,如请求成功数、请求失败数、响应时间等。
注意:开始对请求进行规则验证时,需要调用SphU的entry()方法。完成对请求的规则验证后,也需要调用Entry的exit()方法。
//A processor slot that dedicates to real time statistics.
//When entering this slot, we need to separately count the following information:
//ClusterNode: total statistics of a cluster node of the resource ID.
//Origin node: statistics of a cluster node from different callers/origins.
//DefaultNode: statistics for specific resource name in the specific context.
//Finally, the sum statistics of all entrances.
@Spi(order = Constants.ORDER_STATISTIC_SLOT)
public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> {@Overridepublic void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {try {//Do some checking.//执行下一个ProcessorSlot,先进行规则验证等fireEntry(context, resourceWrapper, node, count, prioritized, args);//Request passed, add thread count and pass count.//如果通过了后面ProcessorSlot的验证//则将处理当前资源resourceWrapper的线程数 + 1 以及 将对当前资源resourceWrapper的成功请求数 + 1node.increaseThreadNum();node.addPassRequest(count);if (context.getCurEntry().getOriginNode() != null) {//Add count for origin node.context.getCurEntry().getOriginNode().increaseThreadNum();context.getCurEntry().getOriginNode().addPassRequest(count);}if (resourceWrapper.getEntryType() == EntryType.IN) {//Add count for global inbound entry node for global statistics.Constants.ENTRY_NODE.increaseThreadNum();Constants.ENTRY_NODE.addPassRequest(count);}//Handle pass event with registered entry callback handlers.for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {handler.onPass(context, resourceWrapper, node, count, args);}} catch (PriorityWaitException ex) {node.increaseThreadNum();if (context.getCurEntry().getOriginNode() != null) {//Add count for origin node.context.getCurEntry().getOriginNode().increaseThreadNum();}if (resourceWrapper.getEntryType() == EntryType.IN) {//Add count for global inbound entry node for global statistics.Constants.ENTRY_NODE.increaseThreadNum();}//Handle pass event with registered entry callback handlers.for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {handler.onPass(context, resourceWrapper, node, count, args);}} catch (BlockException e) {//捕获BlockException//Blocked, set block exception to current entry.context.getCurEntry().setBlockError(e);//Add block count.//如果规则验证失败,则将BlockQps+1node.increaseBlockQps(count);if (context.getCurEntry().getOriginNode() != null) {context.getCurEntry().getOriginNode().increaseBlockQps(count);}if (resourceWrapper.getEntryType() == EntryType.IN) {//Add count for global inbound entry node for global statistics.Constants.ENTRY_NODE.increaseBlockQps(count);}//Handle block event with registered entry callback handlers.for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {handler.onBlocked(e, context, resourceWrapper, node, count, args);}throw e;} catch (Throwable e) {//Unexpected internal error, set error to current entry.context.getCurEntry().setError(e);throw e;}}//开始对请求进行规则验证时,需要调用SphU.entry()方法//完成对请求的规则验证后,也需要调用Entry.exit()方法@Overridepublic void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {Node node = context.getCurNode();if (context.getCurEntry().getBlockError() == null) {//Calculate response time (use completeStatTime as the time of completion).//获取系统当前时间long completeStatTime = TimeUtil.currentTimeMillis();context.getCurEntry().setCompleteTimestamp(completeStatTime);//计算响应时间 = 系统当前事件 - 根据资源resourceWrapper创建Entry资源访问对象时的时间long rt = completeStatTime - context.getCurEntry().getCreateTimestamp();Throwable error = context.getCurEntry().getError();//Record response time and success count.//记录响应时间等信息recordCompleteFor(node, count, rt, error);recordCompleteFor(context.getCurEntry().getOriginNode(), count, rt, error);if (resourceWrapper.getEntryType() == EntryType.IN) {recordCompleteFor(Constants.ENTRY_NODE, count, rt, error);}}//Handle exit event with registered exit callback handlers.Collection<ProcessorSlotExitCallback> exitCallbacks = StatisticSlotCallbackRegistry.getExitCallbacks();for (ProcessorSlotExitCallback handler : exitCallbacks) {handler.onExit(context, resourceWrapper, count, args);}fireExit(context, resourceWrapper, count, args);}private void recordCompleteFor(Node node, int batchCount, long rt, Throwable error) {if (node == null) {return;}node.addRtAndSuccess(rt, batchCount);node.decreaseThreadNum();if (error != null && !(error instanceof BlockException)) {node.increaseExceptionQps(batchCount);}}
}
(3)记录资源在不同维度下的调用数据
一.如何统计单机里某个资源的调用数据
二.如何统计所有资源的调用数据即接口调用数据
三.如何统计集群中某个资源的调用数据
一.如何统计单机里某个资源的调用数据
由于DefaultNode会统计名字相同的Context下的某个资源的调用数据,它是按照单机里的资源维度进行调用数据统计的,所以在StatisticSlot的entry()方法中,会调用DefaultNode的方法来进行统计。
//A Node used to hold statistics for specific resource name in the specific context.
//Each distinct resource in each distinct Context will corresponding to a DefaultNode.
//This class may have a list of sub DefaultNodes.
//Child nodes will be created when calling SphU.entry() or SphO.entry() multiple times in the same Context.
public class DefaultNode extends StatisticNode {//The resource associated with the node.private ResourceWrapper id;//Associated cluster node.private ClusterNode clusterNode;...@Overridepublic void increaseThreadNum() {super.increaseThreadNum();this.clusterNode.increaseThreadNum();}@Overridepublic void addPassRequest(int count) {super.addPassRequest(count);this.clusterNode.addPassRequest(count);}@Overridepublic void increaseBlockQps(int count) {super.increaseBlockQps(count);this.clusterNode.increaseBlockQps(count);}@Overridepublic void addRtAndSuccess(long rt, int successCount) {super.addRtAndSuccess(rt, successCount);this.clusterNode.addRtAndSuccess(rt, successCount);}@Overridepublic void decreaseThreadNum() {super.decreaseThreadNum();this.clusterNode.decreaseThreadNum();}...
}public class StatisticNode implements Node {//The counter for thread count.private LongAdder curThreadNum = new LongAdder();//Holds statistics of the recent INTERVAL milliseconds. //The INTERVAL is divided into time spans by given sampleCount.private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT, IntervalProperty.INTERVAL);//Holds statistics of the recent 60 seconds. //The windowLengthInMs is deliberately set to 1000 milliseconds,//meaning each bucket per second, in this way we can get accurate statistics of each second.private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);...@Overridepublic void increaseThreadNum() {curThreadNum.increment();}@Overridepublic void addPassRequest(int count) {rollingCounterInSecond.addPass(count);rollingCounterInMinute.addPass(count);}@Overridepublic void increaseBlockQps(int count) {rollingCounterInSecond.addBlock(count);rollingCounterInMinute.addBlock(count);}@Overridepublic void addRtAndSuccess(long rt, int successCount) {rollingCounterInSecond.addSuccess(successCount);rollingCounterInSecond.addRT(rt);rollingCounterInMinute.addSuccess(successCount);rollingCounterInMinute.addRT(rt);}@Overridepublic void decreaseThreadNum() {curThreadNum.decrement();}...
}
二.如何统计所有资源的调用数据即接口调用数据
由于EntranceNode会统计名字相同的Context下的全部资源的调用数据,它是按接口维度来统计调用数据的,即统计接口下所有资源的调用情况,所以可以通过遍历EntranceNode的childList来统计接口的调用数据。
//A Node represents the entrance of the invocation tree.
//One Context will related to a EntranceNode,
//which represents the entrance of the invocation tree.
//New EntranceNode will be created if current context does't have one.
//Note that same context name will share same EntranceNode globally.
public class EntranceNode extends DefaultNode {public EntranceNode(ResourceWrapper id, ClusterNode clusterNode) {super(id, clusterNode);}@Overridepublic double avgRt() {double total = 0;double totalQps = 0;for (Node node : getChildList()) {total += node.avgRt() * node.passQps();totalQps += node.passQps();}return total / (totalQps == 0 ? 1 : totalQps);}@Overridepublic double blockQps() {double blockQps = 0;for (Node node : getChildList()) {blockQps += node.blockQps();}return blockQps;}@Overridepublic long blockRequest() {long r = 0;for (Node node : getChildList()) {r += node.blockRequest();}return r;}@Overridepublic int curThreadNum() {int r = 0;for (Node node : getChildList()) {r += node.curThreadNum();}return r;}@Overridepublic double totalQps() {double r = 0;for (Node node : getChildList()) {r += node.totalQps();}return r;}@Overridepublic double successQps() {double r = 0;for (Node node : getChildList()) {r += node.successQps();}return r;}@Overridepublic double passQps() {double r = 0;for (Node node : getChildList()) {r += node.passQps();}return r;}@Overridepublic long totalRequest() {long r = 0;for (Node node : getChildList()) {r += node.totalRequest();}return r;}@Overridepublic long totalPass() {long r = 0;for (Node node : getChildList()) {r += node.totalPass();}return r;}
}
三.如何统计集群中某个资源的调用数据
由于ClusterNode会统计某个资源在全部Context下的调用数据,它是按照集群中的资源维度进行调用数据统计的,而StatisticSlot的entry()调用DefaultNode的方法统计单机下的资源时,会顺便调用ClusterNode的方法来统计集群下的资源调用,所以通过ClusterNode就可以获取集群中某个资源的调用数据。
//A Node used to hold statistics for specific resource name in the specific context.
//Each distinct resource in each distinct Context will corresponding to a DefaultNode.
//This class may have a list of sub DefaultNodes.
//Child nodes will be created when calling SphU.entry() or SphO.entry() multiple times in the same Context.
public class DefaultNode extends StatisticNode {//The resource associated with the node.private ResourceWrapper id;//Associated cluster node.private ClusterNode clusterNode;...@Overridepublic void increaseThreadNum() {super.increaseThreadNum();this.clusterNode.increaseThreadNum();}@Overridepublic void addPassRequest(int count) {super.addPassRequest(count);this.clusterNode.addPassRequest(count);}@Overridepublic void increaseBlockQps(int count) {super.increaseBlockQps(count);this.clusterNode.increaseBlockQps(count);}@Overridepublic void addRtAndSuccess(long rt, int successCount) {super.addRtAndSuccess(rt, successCount);this.clusterNode.addRtAndSuccess(rt, successCount);}@Overridepublic void decreaseThreadNum() {super.decreaseThreadNum();this.clusterNode.decreaseThreadNum();}...
}