Spring Cloud Alibaba Sentinel 基本工作原理源码阅读
SpiLoader
SPI 是一种服务发现机制,它允许第三方为某个接口提供实现,而这些实现可以在运行时被发现和加载。简单来说,就是定义一个接口,然后允许别人提供这个接口的具体实现,而主程序不需要在编译时就知道这些实现类的具体细节,只在运行时去发现和加载它们。 Sentinel 并没有直接使用 JDK 内置的 ServiceLoader
,而是自己实现了一套 SpiLoader
。这主要是因为 Sentinel 的 SpiLoader
提供了更灵活的控制,例如:
排序 (@Spi 注解的 order 属性): 可以控制加载的实现类的优先级,从而决定它们在 Slot Chain 中的执行顺序。
单例 (@Spi 注解的 isSingleton 属性): 可以指定某个 SPI 实现是否是单例模式。
按需加载: 更好地控制加载时机。
控制台通讯
服务端与控制台初始化都是通过InitFunc来实现,InitFunc是一个接口,服务加载是通过SpiLoader来加载。默认懒加载在第一次调用时加载Env类静态块执行,如果配置了非懒加载在自动装配类SentinelWebAutoConfiguration的init方法也会启动时加载
public class Env {public static final Sph sph = new CtSph();static {// If init fails, the process will exit.InitExecutor.doInit();}}
这里doInit()方法会获取所有配置的InitFunc服务进行初始化。
List<InitFunc> initFuncs = SpiLoader.of(InitFunc.class).loadInstanceListSorted();List<OrderWrapper> initList = new ArrayList<OrderWrapper>();for (InitFunc initFunc : initFuncs) {RecordLog.info("[InitExecutor] Found init func: {}", initFunc.getClass().getCanonicalName());insertSorted(initList, initFunc);}for (OrderWrapper w : initList) {w.func.init();//调用init方法初始化RecordLog.info("[InitExecutor] Executing {} with order {}",w.func.getClass().getCanonicalName(), w.order);}
在sentinel-transport-common-1.8.5.jar包的META-INF/services下就定义了两个和控制台通讯相关的InitFunc:CommandCenterInitFunc和HeartbeatSenderInitFunc。
心跳检测
HeartbeatSenderInitFunc主要建立应用控制台和应用程序直接的心跳检测。将客户端的消息通讯地址发送给控制台。这个在控制台的机器列表可以看到对应信息。
来看HeartbeatSenderInitFunc.init()方法:
public void init() {//这里还是通过SpiLoader获取HeartbeatSender的服务实例HeartbeatSender的服务实例 sender = HeartbeatSenderProvider.getHeartbeatSender();if (sender == null) {RecordLog.warn("[HeartbeatSenderInitFunc] WARN: No HeartbeatSender loaded");return;}//初始化定时线程池ScheduledThreadPoolExecutorinitSchedulerIfNeeded();long interval = retrieveInterval(sender);setIntervalIfNotExists(interval);//启动定时发送心跳scheduleHeartbeatTask(sender, interval);}
启动定时方法:
private void scheduleHeartbeatTask(/*@NonNull*/ final HeartbeatSender sender, /*@Valid*/ long interval) {pool.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {sender.sendHeartbeat();} catch (Throwable e) {RecordLog.warn("[HeartbeatSender] Send heartbeat error", e);}}}, 5000, interval, TimeUnit.MILLISECONDS);RecordLog.info("[HeartbeatSenderInit] HeartbeatSender started: "+ sender.getClass().getCanonicalName());}
这里看到每隔5秒发送一次心跳包。
HeartbeatSender的spi在sentinel-transport-simple-http-1.8.5.jar包中定义实现为SimpleHttpHeartbeatSender。来看下具体心跳包发送内容。
SimpleHttpHeartbeatSender.sendHeartbeat()主要代码如下
public boolean sendHeartbeat() throws Exception {//获取控制台地址Endpoint addrInfo = getAvailableAddress();//构造请求对象SimpleHttpRequest request = new SimpleHttpRequest(addrInfo, TransportConfig.getHeartbeatApiPath());//设置心跳报文request.setParams(heartBeat.generateCurrentMessage());//发送心跳请求SimpleHttpResponse response = httpClient.post(request); return false;}
这里heartBeat变量是HeartbeatMessage类型
HeartbeatMessage#generateCurrentMessage()
public HeartbeatMessage() {message.put("hostname", HostNameUtil.getHostName());message.put("ip", TransportConfig.getHeartbeatClientIp());message.put("app", AppNameUtil.getAppName());// Put application type (since 1.6.0).message.put("app_type", String.valueOf(SentinelConfig.getAppType()));message.put("port", String.valueOf(TransportConfig.getPort()));}
public Map<String, String> generateCurrentMessage() {// Version of Sentinel.message.put("v", Constants.SENTINEL_VERSION);// Actually timestamp.message.put("version", String.valueOf(TimeUtil.currentTimeMillis()));message.put("port", String.valueOf(TransportConfig.getPort()));return message;}
这里将本服务的ip和端口信息发送给console,这样控制台就可以与应用程序通过该端口进行通讯。
数据通讯
应用端通讯服务初始化是通过CommandCenterInitFunc。
CommandCenterInitFunc.init()方法调用CommandCenterProvider.getCommandCenter()获取CommandCenter。最后通过SpiLoader.of(CommandCenter.class).loadHighestPriorityInstance();还是SPI机制。这个默认配置在 sentinel-transport-simple-http-1.8.5.jar包中
默认的CommandCenter是SimpleHttpCommandCenter。
SimpleHttpCommandCenter的start()方法会启动一个ServerSocket来和sentinel的console来进行通讯。默认端口是8719,可通过csp.sentinel.api.port指定。这样console会不断从应用拉取流量控制数据,并且在console端配置新的流量规则可以推送到应用端。
有请求指令到来时会交给HttpEventTask来执行具体指令,不同的指令由不同的CommandHandler实现类来处理,像修改流量控制会使用ModifyRulesCommandHandler来处理。ModifyRulesCommandHandler.handler()方法根据不同的流控规则来更新内存控制规则。
ModifyRulesCommandHandler.handler()
public CommandResponse<String> handle(CommandRequest request) {String type = request.getParam("type");// rule data in get parameterString data = request.getParam("data");...String result = "success";if (FLOW_RULE_TYPE.equalsIgnoreCase(type)) {List<FlowRule> flowRules = JSONArray.parseArray(data, FlowRule.class);//流量控制规则刷新FlowRuleManager.loadRules(flowRules);if (!writeToDataSource(getFlowDataSource(), flowRules)) {result = WRITE_DS_FAILURE_MSG;}return CommandResponse.ofSuccess(result);} else if (AUTHORITY_RULE_TYPE.equalsIgnoreCase(type)) {List<AuthorityRule> rules = JSONArray.parseArray(data, AuthorityRule.class);//黑白名单刷新AuthorityRuleManager.loadRules(rules);if (!writeToDataSource(getAuthorityDataSource(), rules)) {result = WRITE_DS_FAILURE_MSG;}return CommandResponse.ofSuccess(result);} else if (DEGRADE_RULE_TYPE.equalsIgnoreCase(type)) {List<DegradeRule> rules = JSONArray.parseArray(data, DegradeRule.class);//降级规则刷新DegradeRuleManager.loadRules(rules);if (!writeToDataSource(getDegradeDataSource(), rules)) {result = WRITE_DS_FAILURE_MSG;}return CommandResponse.ofSuccess(result);} else if (SYSTEM_RULE_TYPE.equalsIgnoreCase(type)) {List<SystemRule> rules = JSONArray.parseArray(data, SystemRule.class);//系统规则刷新SystemRuleManager.loadRules(rules);if (!writeToDataSource(getSystemSource(), rules)) {result = WRITE_DS_FAILURE_MSG;}return CommandResponse.ofSuccess(result);}return CommandResponse.ofFailure(new IllegalArgumentException("invalid type"));}
流量控制工作原理
在 Sentinel 里面,所有的资源都对应一个资源名称(resourceName
),每次资源调用都会创建一个 Entry
对象。Entry 可以通过对主流框架的适配自动创建,也可以通过注解的方式或调用 SphU
API 显式创建。Entry 创建的时候,同时也会创建一系列功能插槽(slot chain)链表。
springboot自动装配通过SentinelWebAutoConfiguration类自动装配SentinelWebInterceptor。SentinelWebInterceptor实现了HandlerInterceptor接口,是一个拦截器,在请求到达controller之前通过preHandle()可以执行自定义操作。
AbstractSentinelInterceptor#preHandle()
@Overridepublic boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler)throws Exception {try {//请求资源名称String resourceName = getResourceName(request);if (StringUtil.isEmpty(resourceName)) {return true;}if (increaseReferece(request, this.baseWebMvcConfig.getRequestRefName(), 1) != 1) {return true;}// Parse the request origin using registered origin parser.String origin = parseOrigin(request);String contextName = getContextName(request);ContextUtil.enter(contextName, origin);//最重要的方法,内部调用不同的功能slot进行流量控制Entry entry = SphU.entry(resourceName, ResourceTypeConstants.COMMON_WEB, EntryType.IN);request.setAttribute(baseWebMvcConfig.getRequestAttributeName(), entry);return true;} catch (BlockException e) {//如果发生BlockException,则触发流量控制规则try {handleBlockException(request, response, e);} finally {ContextUtil.exit();}return false;}}
SphU.entry()
SphU的默认实例是CtSph,最后会进入其entryWithPriority()方法
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)throws BlockException {Context context = ContextUtil.getContext();if (context instanceof NullContext) {// The {@link NullContext} indicates that the amount of context has exceeded the threshold,// so here init the entry only. No rule checking will be done.return new CtEntry(resourceWrapper, null, context);}if (context == null) {// Using default context.context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);}// Global switch is close, no rule checking will do.if (!Constants.ON) {return new CtEntry(resourceWrapper, null, context);}//获取执行链ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);/** Means amount of resources (slot chain) exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE},* so no rule checking will be done.*/if (chain == null) {return new CtEntry(resourceWrapper, null, context);}Entry e = new CtEntry(resourceWrapper, chain, context);try {chain.entry(context, resourceWrapper, null, count, prioritized, args);} catch (BlockException e1) {e.exit(count, args);throw e1;} catch (Throwable e1) {// This should not happen, unless there are errors existing in Sentinel internal.RecordLog.info("Sentinel unexpected exception", e1);}return e;}
lookProcessChain()方法通过SlotChainProvider.newSlotChain()实例slot链,最后通过
SpiLoader.of(SlotChainBuilder.class).loadFirstInstanceOrDefault().build()
使用SPI机制从classpath中加载默认的slot链。
在sentinel-core-1.8.5.jar包中有对应的默认配置文件
META-INF/services/com.alibaba.csp.sentinel.slotchain.ProcessorSlot
默认配置内容
# Sentinel default ProcessorSlots
com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot
com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot
com.alibaba.csp.sentinel.slots.logger.LogSlot
com.alibaba.csp.sentinel.slots.statistic.StatisticSlot
com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot
com.alibaba.csp.sentinel.slots.system.SystemSlot
com.alibaba.csp.sentinel.slots.block.flow.FlowSlot
com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot
每一个slot都有不同的作用,作为功能链表依次进行调用。
-
NodeSelectorSlot
负责收集资源的路径,并将这些资源的调用路径,以树状结构存储起来,用于根据调用路径来限流降级; -
ClusterBuilderSlot
则用于存储资源的统计信息以及调用者信息,例如该资源的 RT, QPS, thread count 等等,这些信息将用作为多维度限流,降级的依据; -
StatisticSlot
则用于记录、统计不同纬度的 runtime 指标监控信息; -
FlowSlot
则用于根据预设的限流规则以及前面 slot 统计的状态,来进行流量控制; -
AuthoritySlot
则根据配置的黑白名单和调用来源信息,来做黑白名单控制; -
DegradeSlot
则通过统计信息以及预设的规则,来做熔断降级; -
SystemSlot
则通过系统的状态,例如 load1 等,来控制总的入口流量;