当前位置: 首页 > news >正文

Spring Cloud Alibaba Sentinel 基本工作原理源码阅读

SpiLoader

SPI 是一种服务发现机制,它允许第三方为某个接口提供实现,而这些实现可以在运行时被发现和加载。简单来说,就是定义一个接口,然后允许别人提供这个接口的具体实现,而主程序不需要在编译时就知道这些实现类的具体细节,只在运行时去发现和加载它们。 Sentinel 并没有直接使用 JDK 内置的 ServiceLoader,而是自己实现了一套 SpiLoader。这主要是因为 Sentinel 的 SpiLoader 提供了更灵活的控制,例如:

排序 (@Spi 注解的 order 属性): 可以控制加载的实现类的优先级,从而决定它们在 Slot Chain 中的执行顺序。

单例 (@Spi 注解的 isSingleton 属性): 可以指定某个 SPI 实现是否是单例模式。

按需加载: 更好地控制加载时机。

控制台通讯

服务端与控制台初始化都是通过InitFunc来实现,InitFunc是一个接口,服务加载是通过SpiLoader来加载。默认懒加载在第一次调用时加载Env类静态块执行,如果配置了非懒加载在自动装配类SentinelWebAutoConfiguration的init方法也会启动时加载

public class Env {public static final Sph sph = new CtSph();static {// If init fails, the process will exit.InitExecutor.doInit();}}

这里doInit()方法会获取所有配置的InitFunc服务进行初始化。

            List<InitFunc> initFuncs = SpiLoader.of(InitFunc.class).loadInstanceListSorted();List<OrderWrapper> initList = new ArrayList<OrderWrapper>();for (InitFunc initFunc : initFuncs) {RecordLog.info("[InitExecutor] Found init func: {}", initFunc.getClass().getCanonicalName());insertSorted(initList, initFunc);}for (OrderWrapper w : initList) {w.func.init();//调用init方法初始化RecordLog.info("[InitExecutor] Executing {} with order {}",w.func.getClass().getCanonicalName(), w.order);}

在sentinel-transport-common-1.8.5.jar包的META-INF/services下就定义了两个和控制台通讯相关的InitFunc:CommandCenterInitFunc和HeartbeatSenderInitFunc。

心跳检测

HeartbeatSenderInitFunc主要建立应用控制台和应用程序直接的心跳检测。将客户端的消息通讯地址发送给控制台。这个在控制台的机器列表可以看到对应信息。

来看HeartbeatSenderInitFunc.init()方法:

    public void init() {//这里还是通过SpiLoader获取HeartbeatSender的服务实例HeartbeatSender的服务实例 sender = HeartbeatSenderProvider.getHeartbeatSender();if (sender == null) {RecordLog.warn("[HeartbeatSenderInitFunc] WARN: No HeartbeatSender loaded");return;}//初始化定时线程池ScheduledThreadPoolExecutorinitSchedulerIfNeeded();long interval = retrieveInterval(sender);setIntervalIfNotExists(interval);//启动定时发送心跳scheduleHeartbeatTask(sender, interval);}

启动定时方法:

    private void scheduleHeartbeatTask(/*@NonNull*/ final HeartbeatSender sender, /*@Valid*/ long interval) {pool.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {sender.sendHeartbeat();} catch (Throwable e) {RecordLog.warn("[HeartbeatSender] Send heartbeat error", e);}}}, 5000, interval, TimeUnit.MILLISECONDS);RecordLog.info("[HeartbeatSenderInit] HeartbeatSender started: "+ sender.getClass().getCanonicalName());}

这里看到每隔5秒发送一次心跳包。

HeartbeatSender的spi在sentinel-transport-simple-http-1.8.5.jar包中定义实现为SimpleHttpHeartbeatSender。来看下具体心跳包发送内容。

SimpleHttpHeartbeatSender.sendHeartbeat()主要代码如下

    public boolean sendHeartbeat() throws Exception {//获取控制台地址Endpoint addrInfo = getAvailableAddress();//构造请求对象SimpleHttpRequest request = new SimpleHttpRequest(addrInfo, TransportConfig.getHeartbeatApiPath());//设置心跳报文request.setParams(heartBeat.generateCurrentMessage());//发送心跳请求SimpleHttpResponse response = httpClient.post(request);     return false;}

这里heartBeat变量是HeartbeatMessage类型

HeartbeatMessage#generateCurrentMessage()

    public HeartbeatMessage() {message.put("hostname", HostNameUtil.getHostName());message.put("ip", TransportConfig.getHeartbeatClientIp());message.put("app", AppNameUtil.getAppName());// Put application type (since 1.6.0).message.put("app_type", String.valueOf(SentinelConfig.getAppType()));message.put("port", String.valueOf(TransportConfig.getPort()));}    
public Map<String, String> generateCurrentMessage() {// Version of Sentinel.message.put("v", Constants.SENTINEL_VERSION);// Actually timestamp.message.put("version", String.valueOf(TimeUtil.currentTimeMillis()));message.put("port", String.valueOf(TransportConfig.getPort()));return message;}

这里将本服务的ip和端口信息发送给console,这样控制台就可以与应用程序通过该端口进行通讯。

数据通讯

应用端通讯服务初始化是通过CommandCenterInitFunc。

CommandCenterInitFunc.init()方法调用CommandCenterProvider.getCommandCenter()获取CommandCenter。最后通过SpiLoader.of(CommandCenter.class).loadHighestPriorityInstance();还是SPI机制。这个默认配置在 sentinel-transport-simple-http-1.8.5.jar包中

默认的CommandCenter是SimpleHttpCommandCenter。

SimpleHttpCommandCenter的start()方法会启动一个ServerSocket来和sentinel的console来进行通讯。默认端口是8719,可通过csp.sentinel.api.port指定。这样console会不断从应用拉取流量控制数据,并且在console端配置新的流量规则可以推送到应用端。

有请求指令到来时会交给HttpEventTask来执行具体指令,不同的指令由不同的CommandHandler实现类来处理,像修改流量控制会使用ModifyRulesCommandHandler来处理。ModifyRulesCommandHandler.handler()方法根据不同的流控规则来更新内存控制规则。

ModifyRulesCommandHandler.handler()

    public CommandResponse<String> handle(CommandRequest request) {String type = request.getParam("type");// rule data in get parameterString data = request.getParam("data");...String result = "success";if (FLOW_RULE_TYPE.equalsIgnoreCase(type)) {List<FlowRule> flowRules = JSONArray.parseArray(data, FlowRule.class);//流量控制规则刷新FlowRuleManager.loadRules(flowRules);if (!writeToDataSource(getFlowDataSource(), flowRules)) {result = WRITE_DS_FAILURE_MSG;}return CommandResponse.ofSuccess(result);} else if (AUTHORITY_RULE_TYPE.equalsIgnoreCase(type)) {List<AuthorityRule> rules = JSONArray.parseArray(data, AuthorityRule.class);//黑白名单刷新AuthorityRuleManager.loadRules(rules);if (!writeToDataSource(getAuthorityDataSource(), rules)) {result = WRITE_DS_FAILURE_MSG;}return CommandResponse.ofSuccess(result);} else if (DEGRADE_RULE_TYPE.equalsIgnoreCase(type)) {List<DegradeRule> rules = JSONArray.parseArray(data, DegradeRule.class);//降级规则刷新DegradeRuleManager.loadRules(rules);if (!writeToDataSource(getDegradeDataSource(), rules)) {result = WRITE_DS_FAILURE_MSG;}return CommandResponse.ofSuccess(result);} else if (SYSTEM_RULE_TYPE.equalsIgnoreCase(type)) {List<SystemRule> rules = JSONArray.parseArray(data, SystemRule.class);//系统规则刷新SystemRuleManager.loadRules(rules);if (!writeToDataSource(getSystemSource(), rules)) {result = WRITE_DS_FAILURE_MSG;}return CommandResponse.ofSuccess(result);}return CommandResponse.ofFailure(new IllegalArgumentException("invalid type"));}

流量控制工作原理

在 Sentinel 里面,所有的资源都对应一个资源名称(resourceName),每次资源调用都会创建一个 Entry 对象。Entry 可以通过对主流框架的适配自动创建,也可以通过注解的方式或调用 SphU API 显式创建。Entry 创建的时候,同时也会创建一系列功能插槽(slot chain)链表。

springboot自动装配通过SentinelWebAutoConfiguration类自动装配SentinelWebInterceptor。SentinelWebInterceptor实现了HandlerInterceptor接口,是一个拦截器,在请求到达controller之前通过preHandle()可以执行自定义操作。

AbstractSentinelInterceptor#preHandle()

    @Overridepublic boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler)throws Exception {try {//请求资源名称String resourceName = getResourceName(request);if (StringUtil.isEmpty(resourceName)) {return true;}if (increaseReferece(request, this.baseWebMvcConfig.getRequestRefName(), 1) != 1) {return true;}// Parse the request origin using registered origin parser.String origin = parseOrigin(request);String contextName = getContextName(request);ContextUtil.enter(contextName, origin);//最重要的方法,内部调用不同的功能slot进行流量控制Entry entry = SphU.entry(resourceName, ResourceTypeConstants.COMMON_WEB, EntryType.IN);request.setAttribute(baseWebMvcConfig.getRequestAttributeName(), entry);return true;} catch (BlockException e) {//如果发生BlockException,则触发流量控制规则try {handleBlockException(request, response, e);} finally {ContextUtil.exit();}return false;}}

SphU.entry()

SphU的默认实例是CtSph,最后会进入其entryWithPriority()方法

    private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)throws BlockException {Context context = ContextUtil.getContext();if (context instanceof NullContext) {// The {@link NullContext} indicates that the amount of context has exceeded the threshold,// so here init the entry only. No rule checking will be done.return new CtEntry(resourceWrapper, null, context);}if (context == null) {// Using default context.context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);}// Global switch is close, no rule checking will do.if (!Constants.ON) {return new CtEntry(resourceWrapper, null, context);}//获取执行链ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);/** Means amount of resources (slot chain) exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE},* so no rule checking will be done.*/if (chain == null) {return new CtEntry(resourceWrapper, null, context);}Entry e = new CtEntry(resourceWrapper, chain, context);try {chain.entry(context, resourceWrapper, null, count, prioritized, args);} catch (BlockException e1) {e.exit(count, args);throw e1;} catch (Throwable e1) {// This should not happen, unless there are errors existing in Sentinel internal.RecordLog.info("Sentinel unexpected exception", e1);}return e;}

lookProcessChain()方法通过SlotChainProvider.newSlotChain()实例slot链,最后通过

SpiLoader.of(SlotChainBuilder.class).loadFirstInstanceOrDefault().build()

使用SPI机制从classpath中加载默认的slot链。

在sentinel-core-1.8.5.jar包中有对应的默认配置文件

META-INF/services/com.alibaba.csp.sentinel.slotchain.ProcessorSlot

默认配置内容

# Sentinel default ProcessorSlots
com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot
com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot
com.alibaba.csp.sentinel.slots.logger.LogSlot
com.alibaba.csp.sentinel.slots.statistic.StatisticSlot
com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot
com.alibaba.csp.sentinel.slots.system.SystemSlot
com.alibaba.csp.sentinel.slots.block.flow.FlowSlot
com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot

每一个slot都有不同的作用,作为功能链表依次进行调用。

  • NodeSelectorSlot 负责收集资源的路径,并将这些资源的调用路径,以树状结构存储起来,用于根据调用路径来限流降级;

  • ClusterBuilderSlot 则用于存储资源的统计信息以及调用者信息,例如该资源的 RT, QPS, thread count 等等,这些信息将用作为多维度限流,降级的依据;

  • StatisticSlot 则用于记录、统计不同纬度的 runtime 指标监控信息;

  • FlowSlot 则用于根据预设的限流规则以及前面 slot 统计的状态,来进行流量控制;

  • AuthoritySlot 则根据配置的黑白名单和调用来源信息,来做黑白名单控制;

  • DegradeSlot 则通过统计信息以及预设的规则,来做熔断降级;

  • SystemSlot 则通过系统的状态,例如 load1 等,来控制总的入口流量;


文章转载自:
http://beneficially .wsgyq.cn
http://dogmatic .wsgyq.cn
http://unhonored .wsgyq.cn
http://auditor .wsgyq.cn
http://archangelic .wsgyq.cn
http://ashes .wsgyq.cn
http://egis .wsgyq.cn
http://duvetine .wsgyq.cn
http://orans .wsgyq.cn
http://unimplemented .wsgyq.cn
http://amends .wsgyq.cn
http://irrelated .wsgyq.cn
http://meclizine .wsgyq.cn
http://chrysalid .wsgyq.cn
http://tort .wsgyq.cn
http://emporium .wsgyq.cn
http://inanimation .wsgyq.cn
http://auc .wsgyq.cn
http://minah .wsgyq.cn
http://pricker .wsgyq.cn
http://etu .wsgyq.cn
http://zoantharia .wsgyq.cn
http://pretender .wsgyq.cn
http://pimp .wsgyq.cn
http://countryseat .wsgyq.cn
http://shopboy .wsgyq.cn
http://entanglemant .wsgyq.cn
http://cracker .wsgyq.cn
http://culminate .wsgyq.cn
http://seek .wsgyq.cn
http://www.dtcms.com/a/293755.html

相关文章:

  • MACOS安装配置Gradle
  • 国产数据库转向 “融合” 赛道:电科金仓的下一代形态定义之路
  • 基于Matlab传统图像处理技术的车辆车型识别与分类方法研究
  • 资本押注会成长的玩具,AI潮玩赛道开始升温
  • 华为云ELB(弹性负载均衡)持续报异常
  • 永磁同步电机控制算法--弱磁控制(负载能力最大化的定交轴)
  • 【C++】C++ 的入门语法知识1
  • 在easyui中如何设置自带的弹窗,有输入框
  • 解决Spring事务中RPC调用无法回滚的问题
  • 零基础学编程,编程从入门到精通系列教程,附:编程工具箱之时间计算构件的用法#零基础自学编程 学习计划#新手学编程 高效学习方法
  • HF83311_VB1/HF83311Q_VB1:高性能USB HiFi音频解码器固件技术解析
  • Leetcode-15. 三数之和
  • 《计算机“十万个为什么”》之 [特殊字符] 深浅拷贝 引用拷贝:内存世界的复制魔法 ✨
  • 1.1 Deep learning?pytorch ?深度学习训练出来的模型通常有效但无法解释合理性? 如何 解释?
  • 英语词汇积累Day1-10(summary)
  • Django实战:Python代码规范指南
  • 【Java】Reflection反射(代理模式)
  • 算法竞赛备赛——【图论】最小生成树
  • 《元素周期表》超高清PDF
  • IDEA如何管理多个Java版本。
  • STM32 基础知识 定时器【概念】
  • 基于PyTorch的多视角二维流场切片三维流场预测模型
  • 【NLP舆情分析】基于python微博舆情分析可视化系统(flask+pandas+echarts) 视频教程 - 主页-微博点赞量Top6实现
  • 19.动态路由协议基础
  • 备受关注的“Facebook Email Scraper”如何操作?
  • 开源 Arkts 鸿蒙应用 开发(十)通讯--Http
  • 腾讯云推出CodeBuddy:革新AI全栈开发体验
  • 第六章 W55MH32 UDP Multicast示例
  • 神经架构搜索革命:从动态搜索到高性能LLM的蜕变之路
  • AI 搜索引擎:让信息“长脑子”而不是“堆数据”