当前位置: 首页 > news >正文

Spring Cloud Alibaba Sentinel 源码阅读之流量控制算法

流量统计数据结构

几个重要的类

每一个资源的访问统计都被封装成一个StatisticNode用来记录各项指标

class StatisticNode{Metric rollingCounterInSecond = new ArrayMetric(2,1000);
}

每个StatisticNode中有一个ArrayMetric负责具体的数据统计

class ArrayMetric{LeapArray<MetricBucket> data = new OccupiableBucketLeapArray(2, 1000);
}

LeapArray 负责维护一个循环数组,每个元素代表一个时间窗口的数据

class LeapArray{protected int windowLengthInMs;//单个窗口的长度(毫秒)protected int sampleCount;// 窗口数量protected int intervalInMs;//总的统计时长 = windowLengthInMs * sampleCountprivate double intervalInSecond;protected final AtomicReferenceArray<WindowWrap<T>> array;public LeapArray(int sampleCount, int intervalInMs) {this.windowLengthInMs = intervalInMs / sampleCount;this.intervalInMs = intervalInMs;this.intervalInSecond = intervalInMs / 1000.0;this.sampleCount = sampleCount;this.array = new AtomicReferenceArray<>(sampleCount);}
}

WindowWrap: 是 LeapArray 数组中的一个元素,它包装了一个 MetricBucket 以及该窗口的开始时间戳。

public class WindowWrap<T> {private final long windowStart; // 窗口的开始时间private final int windowLength; // 窗口的长度private volatile T value;       // 实际存储的 MetricBucket
}

MetricBucket: 存储在一个时间窗口内(例如 200ms)的各项指标,如通过请求数、拒绝请求数、异常数等。它内部使用 LongAdder 来保证并发安全地更新计数。

StatisticSlot流量统计

看完了数据结构,继续来看计数流程。开始位置还要冲slot的entry方法

StatisticSlot#entry()

    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,boolean prioritized, Object... args) throws Throwable {try {//调用slot链fireEntry(context, resourceWrapper, node, count, prioritized, args);// 请求通过,增加线程数和请求通过数node.increaseThreadNum();node.addPassRequest(count);...} catch (PriorityWaitException ex) {node.increaseThreadNum();...} catch (BlockException e) {// Blocked, set block exception to current entry.context.getCurEntry().setBlockError(e);// 添加阻塞数node.increaseBlockQps(count);...throw e;} catch (Throwable e) {...}}

这里不看其他的,就看node.addPassRequest(count);这一步请求通过计数。

这里node继承自StatisticNode会掉到其addPassRequest方法

StatisticNode#addPassRequest()

public void addPassRequest(int count) {rollingCounterInSecond.addPass(count);rollingCounterInMinute.addPass(count);}

根据上面的数据结构,rollingCounterInSecond是ArrayMetric实例,

ArrayMetric#addPass

public void addPass(int count) {WindowWrap<MetricBucket> wrap = data.currentWindow();wrap.value().addPass(count);
}

这里data实例是LeapArray类型,currentWindow()方法获取当前实际窗口数据对象

LeapArray#currentWindow()

public WindowWrap<T> currentWindow() {return currentWindow(TimeUtil.currentTimeMillis());
}
public WindowWrap<T> currentWindow(long timeMillis) {if (timeMillis < 0) {return null;}//根据时间戳获取当前时间对应的窗口下标int idx = calculateTimeIdx(timeMillis);// 根据时间戳获取当前时间对应的窗口开始时间long windowStart = calculateWindowStart(timeMillis);/*从数组中获取指定时间点的桶(bucket)项。1、如果桶不存在,则创建一个新的桶,并使用 CAS(比较并交换)更新到环形数组中。2、如果桶匹配当前开始时间,直接返回该桶。3、如果桶已过期,则重置当前桶,并清理所有过期的桶。*/while (true) {WindowWrap<T> old = array.get(idx);if (old == null) {WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));if (array.compareAndSet(idx, null, window)) {// Successfully updated, return the created bucket.return window;} else {// Contention failed, the thread will yield its time slice to wait for bucket available.Thread.yield();}} else if (windowStart == old.windowStart()) {return old;} else if (windowStart > old.windowStart()) {if (updateLock.tryLock()) {try {// Successfully get the update lock, now we reset the bucket.return resetWindowTo(old, windowStart);} finally {updateLock.unlock();}} else {Thread.yield();}} else if (windowStart < old.windowStart()) {return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));}}}

这里有两个重要的方法calculateTimeIdx和calculateWindowStart

    private int calculateTimeIdx(/*@Valid*/ long timeMillis) {//当前时间除以一个时间窗口的长度long timeId = timeMillis / windowLengthInMs;// Calculate current index so we can map the timestamp to the leap array.return (int)(timeId % array.length());}

这样时间每增加一个时间窗口长度,数值下标会往前推进1。

计数窗口开始时间,这个也好理解,其实就是求上个时间窗口的结束时间,

protected long calculateWindowStart(/*@Valid*/ long timeMillis) {return timeMillis - timeMillis % windowLengthInMs;
}

拿到时间窗口buck后WindowWrap<MetricBucket> 调用MetricBucket.addPass(count)增加计数。

实际情况分析:

sentinel默认的滑动窗口周期是1000毫秒,样本数是2。这样一个窗口的时间长度是 1000/2 = 500毫秒,LeapArray数组长度为2,存储两个样本窗口数据。

所属窗口编号 = (当前时间/500)%2

窗口开始时间= (当前时间 - 当前时间%500)

不同时间请求对应窗口信息:

请求编号请求时间所属窗口编号窗口开始时间
1000
220000
330000
46001500
58001500
6110001000
7160011500

这里看到LeapArray数组中两个元素会被循环使用,过去的窗口数据会被清空覆盖掉。

FlowSlot流量控制

数据统计好了,下一步就是根据流量数据统计进行控制

FlowSlot#entry()

    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,boolean prioritized, Object... args) throws Throwable {// 校验限流规则checkFlow(resourceWrapper, context, node, count, prioritized);//调用下一个slotfireEntry(context, resourceWrapper, node, count, prioritized, args);}

限流规则校验主要在FlowRuleChecker中完成。checkFlow()校验首先根据当前资源从FlowRuleManager获取适配当前资源的限流规则,然后逐一进行规则校验。

FlowRuleChecker#checkFlow

    public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {if (ruleProvider == null || resource == null) {return;}//当前资源上配置的限流规则Collection<FlowRule> rules = ruleProvider.apply(resource.getName());if (rules != null) {//逐一校验for (FlowRule rule : rules) {if (!canPassCheck(rule, context, node, count, prioritized)) {throw new FlowException(rule.getLimitApp(), rule);}}}}

每一个FlowRule有一个对应的TrafficShapingController来执行判断。具体判断在canPass()方法中实现。

默认的策略是直接拒绝,DefaultController。其他的还有RateLimiterController, WarmUpController

DefaultController#canPass()

    public boolean canPass(Node node, int acquireCount, boolean prioritized) {//获取当前节点的请求数int curCount = avgUsedTokens(node);//count是阈值,如果大于阈值直接返回falseif (curCount + acquireCount > count) {if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {....}return false;}return true;}
private int avgUsedTokens(Node node) {if (node == null) {return DEFAULT_AVG_USED_TOKENS;}//计数所有有效的pass数量return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps());}

流程算法总结

当一个请求进入 Sentinel 的流量统计 StatisticSlot 时:

  1. 获取当前时间。
  2. 通过 LeapArray.currentWindow() 方法获取当前时间对应的 WindowWrap<MetricBucket>
  3. 如果当前窗口是新窗口或过期窗口,会被重置。
  4. 在获取到的 MetricBucket 上,调用 pass.increment() 等方法,增加对应的指标计数。
  5. 在进入到FlowSlot进行限流判断时,会通过 LeapArray.values(currentTime) 获取所有有效(未过期)的 MetricBucket,然后遍历这些 MetricBucket,累加它们的 pass 计数,从而得到在滑动窗口内的总 QPS。
http://www.dtcms.com/a/296674.html

相关文章:

  • C++编程基础四大件
  • Bright Data 实战指南:从竞品数据抓取到电商策略优化全流程
  • 探秘 VSAR软件:CAN报文转DBC信号的便捷工具
  • 力扣189:轮转数组
  • 5 个适合创意创作的网站,灵感不设限
  • 基于markdown封装的前端文档编辑工具,markdown.js的解析与应用
  • 蚁群优化算法(ACO)求解旅行商问题(TSP)
  • 碳油 PCB 技术解析:高精度制造与多场景应用实践
  • Python爬虫案例:Scrapy+XPath解析当当网网页结构
  • Spring Boot 3整合Spring AI实战:9轮面试对话解析AI应用开发
  • FreeRTOS—计数型信号量
  • 亚马逊Prime Day变革:精细化运营时代的号角
  • 基础05-Java控制流程:掌握if-else、switch和循环语句
  • 使用adb 发送广播 动态改变app内的值
  • 【PyTorch】图像二分类项目-部署
  • 【数字IC验证学习------- SOC 验证 和 IP验证和形式验证的区别】
  • NOTEPAD!NPCommand函数分析之comdlg32!GetSaveFileNameW--windows记事本源代码分析
  • 暑假集训篇之并发处理①练习题
  • prometheus监控k8s的metric详解-01-apiserver部分-05-其他
  • 局域网TCP通过组播放地址rtp推流和拉流实现实时喊话
  • 猎板碳油 PCB和普通PCB的区别
  • 【OpenCV实现多图像拼接】
  • kafka消费者组消费进度(Lag)深入理解
  • Redis--哨兵机制详解
  • Linux C:预处理命令
  • 225. 用队列实现栈
  • markdown学习笔记(个人向) Part.2
  • Redis高可用架构演进面试笔记
  • C#解析JSON数据全攻略
  • SpringBoot框架,不同环境中实体类对应不同的表