当前位置: 首页 > news >正文

dubbo限流

单机限流

限流过滤器

package com.doudou.filter;import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.extension.Activate;
import org.apache.dubbo.rpc.*;import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;/*** @title CustomLimitFilter* @description 自定义限流过滤器* author zzw* version 1.0.0* create 2025/5/6 22:29**/
@Activate(group = CommonConstants.PROVIDER)
public class CustomLimitFilter implements Filter {/*** 存储计数资源的Map数据结构,预分配容量64,避免无谓的扩容消耗*/private static final ConcurrentMap<String, AtomicInteger> COUNT_MAP = new ConcurrentHashMap<>(64);/*** QOS流量启动是否开启。 {@code true}:标识开启流量监测;@{@code 其它值}:标识不开启流量监测*/public static final String KEY_QPS_ENABLE = "qps.enable";/*** 每个方法开启的限流检测值*/public static final String KEY_QPS_VALUE = "qps.value";/*** 默认的限流检测值,默认为 30*/public static final long DEFAULT_QPS_VALUE = 30L;@Overridepublic Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {// 获取限流资源的结果// true:获取到计数资源// false:计数已满,无法获取计数资源// null:不需要限流Boolean acquired = null;try {// 获取限流技术资源acquired = tryAcquire(invoker.getUrl(), invocation);if (null != acquired && !acquired) {throw new RuntimeException("Failed to acquire service "+ String.join(".", invoker.getInterface().getName(), invocation.getMethodName())+ " because of overload.");}// 不限流或或者获取到限流资源,执行下一步调用return invoker.invoke(invocation);} finally {// 释放获取到的计数资源release(acquired, invoker.getUrl(), invocation);}}private void release(Boolean acquired, URL url, Invocation invocation) {// 未限流或者获取计数资源失败时,无需释放资源if (null == acquired || !acquired) {return;}String serviceKey = String.join("_", url.getServiceKey(), invocation.getMethodName());COUNT_MAP.get(serviceKey).decrementAndGet();}private Boolean tryAcquire(URL url, Invocation invocation) {// 判断是否开启限流// 获取全局配置 结果是8// url.getParameter(KEY_QPS_ENABLE);// 获取方法级别的配置 结果是5String qpsEnableFlag = url.getMethodParameter(invocation.getMethodName(), KEY_QPS_ENABLE);if (!Boolean.TRUE.toString().equals(qpsEnableFlag)) {// 限流开关的值不是true时,不开启限流return null;}// 获取限流监测值,未配置时默认30long qpsValue = url.getMethodParameter(invocation.getMethodName(), KEY_QPS_VALUE, DEFAULT_QPS_VALUE);// 构建限流的keyString serviceKey = String.join("_", url.getServiceKey(), invocation.getMethodName());// 获取对应的计数器AtomicInteger currentCount = COUNT_MAP.get(serviceKey);if (null == currentCount) {// 第一次访问时没有计数对象值,进行计数容器的初始化。currentCount = COUNT_MAP.putIfAbsent(serviceKey, new AtomicInteger(0));}// 如果当前的计数器值大于等于配置的限流值时,返回false,表示未获取到计数资源if (currentCount.get() >= qpsValue) {return Boolean.FALSE;}currentCount.incrementAndGet();return Boolean.TRUE;}
}

META-INF/dubbo/org.apache.dubbo.rpc.Filter

consumerLimit=com.doudou.filter.CustomLimitFilter

限流服务配置

import com.doudou.demo.api.RoleQueryFacade;
import org.apache.dubbo.config.annotation.DubboService;
import org.apache.dubbo.config.annotation.Method;import java.util.concurrent.TimeUnit;/*** @title RoleQueryFacadeImpl* @description <TODO description class purpose>* author zzw* version 1.0.0* create 2025/5/6 23:27**/
@DubboService(methods = {@Method(name = "queryRoleList", parameters = {"qps.enable", "true", "qps.value", "5"})},parameters = {"qps2.enable", "true", "qps2.value", "8"})
public class RoleQueryFacadeImpl implements RoleQueryFacade {@Overridepublic String queryRoleList(String userId) {try {// 睡眠 1 秒,模拟一下查询数据库需要耗费时间TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new RuntimeException(e);}String result = String.format(System.currentTimeMillis() + ": Hello %s, 已查询该用户【角色列表信息】", userId);System.out.println(result);return result;}
}

限流现象

java.lang.RuntimeException: Failed to acquire service com.doudou.demo.api.RoleQueryFacade.queryRoleList because of overload.at com.doudou.filter.CustomLimitFilter.invoke(CustomLimitFilter.java:50)at org.apache.dubbo.rpc.cluster.filter.FilterChainBuilder$CopyOfFilterChainNode.invoke(FilterChainBuilder.java:349)at org.apache.dubbo.rpc.filter.AccessLogFilter.invoke(AccessLogFilter.java:120)at org.apache.dubbo.rpc.cluster.filter.FilterChainBuilder$CopyOfFilterChainNode.invoke(FilterChainBuilder.java:349)at org.apache.dubbo.rpc.filter.GenericFilter.invoke(GenericFilter.java:222)at org.apache.dubbo.rpc.cluster.filter.FilterChainBuilder$CopyOfFilterChainNode.invoke(FilterChainBuilder.java:349)at org.apache.dubbo.rpc.protocol.tri.h12.HttpContextFilter.invoke(HttpContextFilter.java:38)at org.apache.dubbo.rpc.cluster.filter.FilterChainBuilder$CopyOfFilterChainNode.invoke(FilterChainBuilder.java:349)at org.apache.dubbo.rpc.filter.ClassLoaderFilter.invoke(ClassLoaderFilter.java:54)at org.apache.dubbo.rpc.cluster.filter.FilterChainBuilder$CopyOfFilterChainNode.invoke(FilterChainBuilder.java:349)at org.apache.dubbo.rpc.filter.EchoFilter.invoke(EchoFilter.java:41)at org.apache.dubbo.rpc.cluster.filter.FilterChainBuilder$CopyOfFilterChainNode.invoke(FilterChainBuilder.java:349)at org.apache.dubbo.metrics.filter.MetricsFilter.invoke(MetricsFilter.java:86)at org.apache.dubbo.metrics.filter.MetricsProviderFilter.invoke(MetricsProviderFilter.java:37)at org.apache.dubbo.rpc.cluster.filter.FilterChainBuilder$CopyOfFilterChainNode.invoke(FilterChainBuilder.java:349)at org.apache.dubbo.rpc.filter.ProfilerServerFilter.invoke(ProfilerServerFilter.java:66)at org.apache.dubbo.rpc.cluster.filter.FilterChainBuilder$CopyOfFilterChainNode.invoke(FilterChainBuilder.java:349)at org.apache.dubbo.rpc.filter.ContextFilter.invoke(ContextFilter.java:191)at org.apache.dubbo.rpc.cluster.filter.FilterChainBuilder$CopyOfFilterChainNode.invoke(FilterChainBuilder.java:349)at org.apache.dubbo.rpc.cluster.filter.FilterChainBuilder$CallbackRegistrationInvoker.invoke(FilterChainBuilder.java:197)at org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol$1.reply(DubboProtocol.java:167)at org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeHandler.handleRequest(HeaderExchangeHandler.java:110)at org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeHandler.received(HeaderExchangeHandler.java:205)at org.apache.dubbo.remoting.transport.DecodeHandler.received(DecodeHandler.java:52)at org.apache.dubbo.remoting.transport.dispatcher.ChannelEventRunnable.run(ChannelEventRunnable.java:64)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)at org.apache.dubbo.common.threadlocal.InternalRunnable.run(InternalRunnable.java:39)at java.lang.Thread.run(Thread.java:748)
, dubbo version: 3.3.0, current host: 169.254.80.162, error code: 2-16. This may be caused by failed to retry do invoke, go to https://dubbo.apache.org/faq/2/16 to find instructions. 

分布式限流

相关文章:

  • VMware Fusion安装win11 arm;使用Mac远程连接到Win
  • 使用 OpenSSL 吊销 Kubernetes(k8s)的 kubeconfig 里的用户证书
  • 论文速读《Embodied-R: 基于强化学习激活预训练模型具身空间推理能力》
  • 如何添加或删除极狐GitLab 项目成员?
  • Codeforces Round 1023 (Div. 2)
  • 代码随想录训练营第十八天| 150.逆波兰表达式求值 239.滑动窗口最大值 347.前k个高频元素
  • 什么是gitlab自动部署,怎么配置gitlab自动部署
  • QGIS分割平行四边形
  • 分布式、高并发-Day04
  • 白平衡色温坐标系下自适应计算白点权重的方法
  • 部署Superset BI(三)连接HANA数据库
  • yolo训练用的数据集的数据结构
  • RTPSParticipant构建流程
  • SpringBoot整合Kafka、Flink实现流式处理
  • ResNet50应用于农业保险现场照片作物种类核验
  • 【回眸】QAC使用指南——导出 Dashboard Report个性化定制Report
  • ==和equals的区别 hashCode和equals的联系
  • JAVA设计模式——(十二)原型模式(Prototype Pattern)
  • c#OdbcDataReader的数据读取
  • LeetCode LCR 033. 字母异位词分组
  • 夜读丨最美的风景,在亲人的目光里
  • 柳向春:关于美国国会图书馆所藏《全芳备祖》的一些故事
  • 江西浮梁县县长张汉坤被查,此前已有4个月无公开活动
  • 韩国总统选举民调:共同民主党前党首李在明支持率超46%
  • 下达专项资金、党政主官田间调研……全国多地力保夏粮稳收
  • 五一当天1372对新人在沪喜结连理,涉外婚姻登记全市铺开