dubbo限流
单机限流
限流过滤器
package com.doudou.filter;import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.extension.Activate;
import org.apache.dubbo.rpc.*;import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;/*** @title CustomLimitFilter* @description 自定义限流过滤器* author zzw* version 1.0.0* create 2025/5/6 22:29**/
@Activate(group = CommonConstants.PROVIDER)
public class CustomLimitFilter implements Filter {/*** 存储计数资源的Map数据结构,预分配容量64,避免无谓的扩容消耗*/private static final ConcurrentMap<String, AtomicInteger> COUNT_MAP = new ConcurrentHashMap<>(64);/*** QOS流量启动是否开启。 {@code true}:标识开启流量监测;@{@code 其它值}:标识不开启流量监测*/public static final String KEY_QPS_ENABLE = "qps.enable";/*** 每个方法开启的限流检测值*/public static final String KEY_QPS_VALUE = "qps.value";/*** 默认的限流检测值,默认为 30*/public static final long DEFAULT_QPS_VALUE = 30L;@Overridepublic Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {// 获取限流资源的结果// true:获取到计数资源// false:计数已满,无法获取计数资源// null:不需要限流Boolean acquired = null;try {// 获取限流技术资源acquired = tryAcquire(invoker.getUrl(), invocation);if (null != acquired && !acquired) {throw new RuntimeException("Failed to acquire service "+ String.join(".", invoker.getInterface().getName(), invocation.getMethodName())+ " because of overload.");}// 不限流或或者获取到限流资源,执行下一步调用return invoker.invoke(invocation);} finally {// 释放获取到的计数资源release(acquired, invoker.getUrl(), invocation);}}private void release(Boolean acquired, URL url, Invocation invocation) {// 未限流或者获取计数资源失败时,无需释放资源if (null == acquired || !acquired) {return;}String serviceKey = String.join("_", url.getServiceKey(), invocation.getMethodName());COUNT_MAP.get(serviceKey).decrementAndGet();}private Boolean tryAcquire(URL url, Invocation invocation) {// 判断是否开启限流// 获取全局配置 结果是8// url.getParameter(KEY_QPS_ENABLE);// 获取方法级别的配置 结果是5String qpsEnableFlag = url.getMethodParameter(invocation.getMethodName(), KEY_QPS_ENABLE);if (!Boolean.TRUE.toString().equals(qpsEnableFlag)) {// 限流开关的值不是true时,不开启限流return null;}// 获取限流监测值,未配置时默认30long qpsValue = url.getMethodParameter(invocation.getMethodName(), KEY_QPS_VALUE, DEFAULT_QPS_VALUE);// 构建限流的keyString serviceKey = String.join("_", url.getServiceKey(), invocation.getMethodName());// 获取对应的计数器AtomicInteger currentCount = COUNT_MAP.get(serviceKey);if (null == currentCount) {// 第一次访问时没有计数对象值,进行计数容器的初始化。currentCount = COUNT_MAP.putIfAbsent(serviceKey, new AtomicInteger(0));}// 如果当前的计数器值大于等于配置的限流值时,返回false,表示未获取到计数资源if (currentCount.get() >= qpsValue) {return Boolean.FALSE;}currentCount.incrementAndGet();return Boolean.TRUE;}
}
META-INF/dubbo/org.apache.dubbo.rpc.Filter
consumerLimit=com.doudou.filter.CustomLimitFilter
限流服务配置
import com.doudou.demo.api.RoleQueryFacade;
import org.apache.dubbo.config.annotation.DubboService;
import org.apache.dubbo.config.annotation.Method;import java.util.concurrent.TimeUnit;/*** @title RoleQueryFacadeImpl* @description <TODO description class purpose>* author zzw* version 1.0.0* create 2025/5/6 23:27**/
@DubboService(methods = {@Method(name = "queryRoleList", parameters = {"qps.enable", "true", "qps.value", "5"})},parameters = {"qps2.enable", "true", "qps2.value", "8"})
public class RoleQueryFacadeImpl implements RoleQueryFacade {@Overridepublic String queryRoleList(String userId) {try {// 睡眠 1 秒,模拟一下查询数据库需要耗费时间TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new RuntimeException(e);}String result = String.format(System.currentTimeMillis() + ": Hello %s, 已查询该用户【角色列表信息】", userId);System.out.println(result);return result;}
}
限流现象
java.lang.RuntimeException: Failed to acquire service com.doudou.demo.api.RoleQueryFacade.queryRoleList because of overload.at com.doudou.filter.CustomLimitFilter.invoke(CustomLimitFilter.java:50)at org.apache.dubbo.rpc.cluster.filter.FilterChainBuilder$CopyOfFilterChainNode.invoke(FilterChainBuilder.java:349)at org.apache.dubbo.rpc.filter.AccessLogFilter.invoke(AccessLogFilter.java:120)at org.apache.dubbo.rpc.cluster.filter.FilterChainBuilder$CopyOfFilterChainNode.invoke(FilterChainBuilder.java:349)at org.apache.dubbo.rpc.filter.GenericFilter.invoke(GenericFilter.java:222)at org.apache.dubbo.rpc.cluster.filter.FilterChainBuilder$CopyOfFilterChainNode.invoke(FilterChainBuilder.java:349)at org.apache.dubbo.rpc.protocol.tri.h12.HttpContextFilter.invoke(HttpContextFilter.java:38)at org.apache.dubbo.rpc.cluster.filter.FilterChainBuilder$CopyOfFilterChainNode.invoke(FilterChainBuilder.java:349)at org.apache.dubbo.rpc.filter.ClassLoaderFilter.invoke(ClassLoaderFilter.java:54)at org.apache.dubbo.rpc.cluster.filter.FilterChainBuilder$CopyOfFilterChainNode.invoke(FilterChainBuilder.java:349)at org.apache.dubbo.rpc.filter.EchoFilter.invoke(EchoFilter.java:41)at org.apache.dubbo.rpc.cluster.filter.FilterChainBuilder$CopyOfFilterChainNode.invoke(FilterChainBuilder.java:349)at org.apache.dubbo.metrics.filter.MetricsFilter.invoke(MetricsFilter.java:86)at org.apache.dubbo.metrics.filter.MetricsProviderFilter.invoke(MetricsProviderFilter.java:37)at org.apache.dubbo.rpc.cluster.filter.FilterChainBuilder$CopyOfFilterChainNode.invoke(FilterChainBuilder.java:349)at org.apache.dubbo.rpc.filter.ProfilerServerFilter.invoke(ProfilerServerFilter.java:66)at org.apache.dubbo.rpc.cluster.filter.FilterChainBuilder$CopyOfFilterChainNode.invoke(FilterChainBuilder.java:349)at org.apache.dubbo.rpc.filter.ContextFilter.invoke(ContextFilter.java:191)at org.apache.dubbo.rpc.cluster.filter.FilterChainBuilder$CopyOfFilterChainNode.invoke(FilterChainBuilder.java:349)at org.apache.dubbo.rpc.cluster.filter.FilterChainBuilder$CallbackRegistrationInvoker.invoke(FilterChainBuilder.java:197)at org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol$1.reply(DubboProtocol.java:167)at org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeHandler.handleRequest(HeaderExchangeHandler.java:110)at org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeHandler.received(HeaderExchangeHandler.java:205)at org.apache.dubbo.remoting.transport.DecodeHandler.received(DecodeHandler.java:52)at org.apache.dubbo.remoting.transport.dispatcher.ChannelEventRunnable.run(ChannelEventRunnable.java:64)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)at org.apache.dubbo.common.threadlocal.InternalRunnable.run(InternalRunnable.java:39)at java.lang.Thread.run(Thread.java:748)
, dubbo version: 3.3.0, current host: 169.254.80.162, error code: 2-16. This may be caused by failed to retry do invoke, go to https://dubbo.apache.org/faq/2/16 to find instructions.