当前位置: 首页 > news >正文

限流系列:sentinel

目录

滑动窗口算法

Sentinel

数据模型

示例

大致流程

​​​​​​​entry

​​​​​​​entryWithPriority

​​​​​​​FlowSlot.entry

​​​​​​​checkFlow

​​​​​​​canPass

​​​​​​​avgUsedTokens

​​​​​​​passQps

​​​​​​​pass

​​​​​​​currentWindow

calculateTimeIdx

​​​​​​​calculateWindowStart

​​​​​​​values


滑动窗口算法

       滑动窗口算法是将时间周期分为n个小周期,分别记录每个小周期内的访问次数,并且根据时间滑动删除过期的小周期。

       如下图,假设时间周期为1min,将1min再分割成2个小周期,统计每个小周期的访问数量,则可以看到,第一个时间周期内访问数量为75,第二个时间周期内访问数量为100,超过100的数量被限流掉了。


       由此可见,当滑动窗口格子划分得越多,那么滑动窗口的滚动就越平滑,限流的统计就越精确。可以很好的解决固定窗口的流动问题。

Sentinel

       滑动窗口算法也是Sentinel的默认算法。

​​​​​​​数据模型

英文名称

中文名称

备注

array

窗口数组

windowLengthInMs

单个窗口时间长度

sampleCount

总窗口数量

intervalInMs

时间窗口总长度

sampleCount * windowLengthInMs

示例

public static void main(String[] args) throws Exception {
    //加载流控规则
    initFlowRules();
    for (int i = 0; i < 5; i++) {
        Thread.sleep(200);
        Entry entry = null;
        try {
            entry = SphU.entry("sayHello");
            //被保护的逻辑
            log.info("访问sayHello资源");
        } catch (Exception ex) {
            log.info("被流量控制了,可以进行降级处理");
        } finally {
            if (entry != null) {
                entry.exit();
            }
        }
    }
}
private static void initFlowRules() {
    List<FlowRule> rules = new ArrayList<>();
    // 创建一个流控规则
    FlowRule rule = new FlowRule();
    // 对sayHello这个资源限流
    rule.setResource("sayHello");
    // 基于qps限流
    rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
    // qps最大为2,超过2就要被限流
    rule.setCount(2);
    rules.add(rule);
    // 设置规则
    FlowRuleManager.loadRules(rules);
}

大致流程

    点击示例里的entry()方法,如下所示:

​​​​​​​entry

public Entry entry(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException {
    return entryWithPriority(resourceWrapper, count, false, args);
}

        点击entryWithPriority()方法,如下所示:

​​​​​​​entryWithPriority

private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args) throws BlockException {
    .. ...
    Entry e = new CtEntry(resourceWrapper, chain, context);
    try {
        chain.entry(context, resourceWrapper, null, count, prioritized, args);
    } catch (BlockException e1) {
        e.exit(count, args);
        throw e1;
    } catch (Throwable e1) {
        // This should not happen, unless there are errors existing in Sentinel internal.
        RecordLog.info("Sentinel unexpected exception", e1);
    }
    return e;
}

       点击chain.entry()方法,因为我们这次探究的是限流算法,所以选择FlowSlot类,如下所示:

​​​​​​​FlowSlot.entry

public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
    checkFlow(resourceWrapper, context, node, count, prioritized);
    fireEntry(context, resourceWrapper, node, count, prioritized, args);
}

        点击checkFlow()方法,如下所示:

​​​​​​​checkFlow

public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {

        if (ruleProvider == null || resource == null) {

            return;

        }

        Collection<FlowRule> rules = ruleProvider.apply(resource.getName());

        if (rules != null) {

            for (FlowRule rule : rules) {

                if (!canPassCheck(rule, context, node, count, prioritized)) {

                    throw new FlowException(rule.getLimitApp(), rule);

                }

            }

        }

}

​​​​​​​canPass

public boolean canPass(Node node, int acquireCount, boolean prioritized) {
    int curCount = avgUsedTokens(node);
    if (curCount + acquireCount > count) {
        ... ...
        return false;
    }
    return true;
}

       在这里,获取已经使用的token数量,加上待申请的数量,如果超过流控规则里设置的最大值,则返回false。

       点击avgUsedTokens()方法,如下所示:

​​​​​​​avgUsedTokens

private int avgUsedTokens(Node node) {

        if (node == null) {

            return DEFAULT_AVG_USED_TOKENS;

        }

        return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps());

}

       在这里,通过调用node.passQps()获取已经使用的token数量。

       点击passQps(),如下所示:

​​​​​​​passQps

public double passQps() {
    return rollingCounterInSecond.pass() / rollingCounterInSecond.getWindowIntervalInSec();
}

       在这里,rollingCounterInSecond对象保存了时间窗口对象的数组。pass()方法可以获取每个有效的时间窗口对象里已经使用的令牌数量,getWindowIntervalInSec()方法是时间窗口总长度,以秒为单位。两者相除就可以已使用的QPS。

       点击pass()方法,如下所示:

​​​​​​​pass

public long pass() {

        data.currentWindow();

        long pass = 0;

        List<MetricBucket> list = data.values();

        for (MetricBucket window : list) {

            pass += window.pass();

        }

        return pass;

    }

       在这里,会调用currentWindow()刷新当前窗口信息,然后累加每个窗口的计数值作为当前计数周期的计数值。

       点击currentWindow()方法,如下所示:

​​​​​​​currentWindow

public WindowWrap<T> currentWindow(long timeMillis) {

    int idx = calculateTimeIdx(timeMillis);

    long windowStart = calculateWindowStart(timeMillis);

    while (true) {

        WindowWrap<T> old = array.get(idx);

        if (old == null) {

            // 创建新窗口

            WindowWrap<T> window = new WindowWrap<>(windowLengthInMs, windowStart, newEmptyBucket());

            if (array.compareAndSet(idx, null, window)) {

                return window;

            }

        } else if (windowStart == old.windowStart()) {

            // 命中当前有效窗口

            return old;

        } else if (windowStart > old.windowStart()) {

            // 窗口已过期,重置并复用

            if (updateLock.tryLock()) {

                try {

                    return resetWindowTo(old, windowStart);

                } finally {

                    updateLock.unlock();

                }

            }

        }

    }

}

在这里:

  1. 获取当前时间窗口的索引
  2. 获取当前窗口的起始时间
  3. 根据当前时间窗口的索引,获取时间窗口对象,如果时间窗口对象为空,则创建一个新时间窗口对象;如果已经存在时间窗口对象,则返回该对象;如果时间窗口对象已过期,则重置并复用。

calculateTimeIdx

public int calculateTimeIdx(long timeMillis) {

    long timeId = timeMillis / windowLengthInMs;

    return (int)(timeId % array.length());

}

       在这里,根据当前时间戳计算对应窗口索引。

​​​​​​​calculateWindowStart

protected long calculateWindowStart(long timeMillis) {

    return timeMillis - timeMillis % windowLengthInMs;

}

       在这里,计算当前窗口的起始时间(对齐到窗口边界)。

       点击步骤pass的values()方法,如下所示:

​​​​​​​values

public List<T> values() {
    return values(TimeUtil.currentTimeMillis());
}
public List<T> values(long timeMillis) {
    if (timeMillis < 0) {
        return new ArrayList<T>();
    }
    int size = array.length();
    List<T> result = new ArrayList<T>(size);
    for (int i = 0; i < size; i++) {
        WindowWrap<T> windowWrap = array.get(i);
        if (windowWrap == null || isWindowDeprecated(timeMillis, windowWrap)) {
            continue;
        }
        result.add(windowWrap.value());
    }
    return result;
}

        在这里,循环时间窗口数组,忽略已经失效的时间窗口对象,将有效的时间窗口对象保存在一个列表对象里,并作为方法返回值进行返回。

​​​​​​​isWindowDeprecated

public boolean isWindowDeprecated(long time, WindowWrap<T> windowWrap) {
    return time - windowWrap.windowStart() > intervalInMs;
}

        在这里,将当前时间减去指定时间窗口对象的起始时间,如果结果大于计数周期时长,则表明指定的时间窗口对象已经失效。

相关文章:

  • 边缘AI:在物联网设备上实现智能处理
  • Webpack和Vite构建工具有什么区别?各自的优缺点是什么
  • 【论文解读】STaR:不用人类思维链指导,模型可以自我进化!
  • ChatGPT与认知科学:人机协同的未来图景
  • 云原生微服务devops项目管理英文表述详解
  • 论文阅读笔记:YOLO-World: Real-Time Open-Vocabulary Object Detection
  • 【科研绘图系列】R语言绘制气泡图(bubble plot)
  • 项目 react+taro 编写的微信 小程序,什么命令,可以减少console的显示
  • Django orm详解--工作流程
  • Mac安装配置InfluxDB,InfluxDB快速入门,Java集成InfluxDB
  • 答题pk小程序题库题型更新啦
  • Kafka Kraft模式集群 + ssl
  • 基于开源链动2+1模式AI智能名片S2B2C商城小程序的产品驱动型增长策略研究
  • vs2022 调试时 控制台界面不出来
  • kafka实践与C++操作kafka
  • AI智能混剪核心技术解析(一):字幕与标题生成的三大支柱-字幕与标题生成-优雅草卓伊凡
  • 李宏毅NLP-7-CTC/RNN-T文本对齐
  • Jupyter Notebook 完全指南:从入门到高效使用
  • VS Code新手基础教程
  • MERIT:用于可靠且可解释的肝纤维化分期的多视图证据学习|文献速递-深度学习医疗AI最新文献
  • 可在哪些网站做链接/百度服务中心
  • 淘宝联盟个人网站怎么做/seo技术快速网站排名
  • 论坛门户网站建设运营费用/网页设计主要做什么
  • vs做网站/做引流推广的平台
  • 做拼图字的网站/百度关键词搜索排名多少钱
  • 网站建设入门到精通/选择一个产品做营销方案