Redisson分布式限流
当我们在项目中接入AI大模型时,随着平台用户的增长,我们需要实现多级的流量保护机制,防止恶意攻击和资源滥用。AI对话接口作为最核心也是成本最高的功能,更需要重点保护。
一. Redisson介绍
1. 概念
Redisson 是一个基于 Redis 的 Java 分布式框架,它不仅提供了对 Redis 各种数据结构的封装,还实现了许多分布式相关的服务,如分布式锁、限流器、信号量等,非常适合用于分布式系统中的流量控制、资源保护等场景。
其核心特点包括:
1. 丰富的分布式对象:提供了分布式集合(如 Map、List、Set)、分布式锁(RLock)、限流器(RRateLimiter)、信号量(RSemaphore)等,简化分布式开发。
2. 高性能:基于 Netty 框架实现异步非阻塞通信,性能优异,能高效处理高并发请求。
3. 易用性:API 设计贴近 Java 原生集合和工具类,学习成本低,集成简单。
4. 高可靠性:支持 Redis 集群、哨兵、主从等多种部署模式,确保服务稳定。
在流量保护场景中,Redisson 的限流器(RRateLimiter)尤为实用,它基于 Redis 实现了分布式限流功能,可精确控制接口的调用频率,防止恶意请求或突发流量耗尽系统资源,非常适合保护 AI 对话这类核心高成本接口。
2. 限流功能的实现
Redisson 实现了基于令牌桶算法的 RRateLimiter:
Objects - Redisson Reference Guide,而令牌桶算法是经典的网络流量速率限制算法。
- 令牌桶:可以想象成一个固定容量的桶,按照固定的速率往桶里放入令牌 。
- 请求处理:当有请求到来时,尝试从桶中获取令牌,如果桶中有足够的令牌,请求就被允许通过并消耗一定的令牌;如果桶中没有足够的令牌,请求可能会被拒绝或者等待,直到桶中有可用令牌。
优势:能精确控制平均请求速率、可应对突发流量,且实现简单易集成,同时适配分布式环境,能为系统(如AI对话接口)提供可靠的流量保护。
二. 开发实现
1. 引入Redisson依赖
<!-- Redisson -->
<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.50.0</version>
</dependency>
2. 配置文件中写入redis连接配置
spring:data:redis:host: localhostport: 6379database: 0password:ttl: 3600 # 缓存过期时间(秒)
3. 编写Redisson客户端配置,读取Redis相关配置并初始化Redisson客户端的Bean
@Configuration
public class RedissonConfig {@Value("${spring.data.redis.host}")private String redisHost;@Value("${spring.data.redis.port}")private Integer redisPort;@Value("${spring.data.redis.password}")private String redisPassword;@Value("${spring.data.redis.database}")private Integer redisDatabase;@Beanpublic RedissonClient redissonClient() {Config config = new Config();String address = "redis://" + redisHost + ":" + redisPort;SingleServerConfig singleServerConfig = config.useSingleServer().setAddress(address).setDatabase(redisDatabase).setConnectionMinimumIdleSize(1).setConnectionPoolSize(10).setIdleConnectionTimeout(30000).setConnectTimeout(5000).setTimeout(3000).setRetryAttempts(3).setRetryInterval(1500);// 如果有密码则设置密码if (redisPassword != null && !redisPassword.isEmpty()) {singleServerConfig.setPassword(redisPassword);}return Redisson.create(config);}
}
配置Redisson
:在redissonClient
方法中,创建一个Config
对象用于配置 Redisson。设置 Redis 的连接地址,通过useSingleServer
指定使用单机模式,并配置了一系列连接参数,比如最小空闲连接数(setConnectionMinimumIdleSize
)、连接池大小(setConnectionPoolSize
)、空闲连接超时时间(setIdleConnectionTimeout
)等,来优化与 Redis 的连接和交互性能。
4. 创建限流类型枚举,支持接口,用户,IP等多个维度的限流
public enum RateLimitType {/*** 接口级别限流*/API,/*** 用户级别限流*/USER,/*** IP级别限流*/IP
}
5. 创建限流注解,提供灵活的配置选项
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface RateLimit {/*** 限流key前缀*/String key() default "";/*** 每个时间窗口允许的请求数*/int rate() default 10;/*** 时间窗口(秒)*/int rateInterval() default 1;/*** 限流类型*/RateLimitType limitType() default RateLimitType.USER;/*** 限流提示信息*/String message() default "请求过于频繁,请稍后再试";
}
6. 通过AOP实现限流逻辑
6.1 定义切面和注入依赖
@Aspect
@Component
@Slf4j
public class RateLimitAspect {@Resourceprivate RedissonClient redissonClient;@Resourceprivate UserService userService;
}
6.2 编写限流逻辑
@Before("@annotation(rateLimit)")
public void doBefore(JoinPoint point, RateLimit rateLimit) {String key = generateRateLimitKey(point, rateLimit);// 使用Redisson的分布式限流器RRateLimiter rateLimiter = redissonClient.getRateLimiter(key);rateLimiter.expire(Duration.ofHours(1)); // 1 小时后过期// 设置限流器参数:每个时间窗口允许的请求数和时间窗口rateLimiter.trySetRate(RateType.OVERALL, rateLimit.rate(), rateLimit.rateInterval(), RateIntervalUnit.SECONDS);// 尝试获取令牌,如果获取失败则限流if (!rateLimiter.tryAcquire(1)) {throw new BusinessException(ErrorCode.TOO_MANY_REQUEST, rateLimit.message());}
}
这是限流的核心入口方法,在标注了 @RateLimit 注解的方法执行前进行拦截。生成限流 key,获取 Redisson 限流器,设置限流规则。然后使用令牌桶算法进行限流判断,超限时抛出业务异常。
注意:一定要为限流器设置过期时间,否则Redis中的key永远不会过期,长时间运行后内存占用会越来越高。
分析:
1. 生成限流 key
通过 generateRateLimitKey(point, rateLimit)
方法生成一个唯一的限流标识 key
,用于区分不同的限流场景(比如不同的接口、不同的用户操作等)。
2. 获取 Redisson 限流器
利用 redissonClient.getRateLimiter(key)
获取(或创建)一个基于该 key
的分布式限流器 RRateLimiter
。因为 Redisson 是分布式的,所以这个限流器可以在多服务实例间共享限流状态。
3. 设置限流器过期时间
通过 rateLimiter.expire(Duration.ofHours(1))
设置限流器在 1 小时后过期,避免无用的限流器长期占用 Redis 资源。
4. 配置限流规则
调用 rateLimiter.trySetRate
方法配置限流规则:
RateType.OVERALL
表示整体限流(也可根据需求选择针对客户端等的限流类型)。-
rateLimit.rate()
是每个时间窗口(由rateLimit.rateInterval()
和RateIntervalUnit.SECONDS
确定,这里是秒级时间窗口)允许的请求数量。
5. 尝试获取令牌进行限流判断
使用 rateLimiter.tryAcquire(1)
尝试从限流器中获取 1 个令牌:
- 如果获取成功,说明当前请求可以通过,继续执行被
@RateLimit
标注的方法。 - 如果获取失败,说明请求频率超过了限流规则,抛出
BusinessException
(业务异常),提示 “请求过多” 等信息(由rateLimit.message()
定义),从而阻止方法执行,实现限流。
6.3 编写生成限流Key的方法
private String generateRateLimitKey(JoinPoint point, RateLimit rateLimit) {StringBuilder keyBuilder = new StringBuilder();keyBuilder.append("rate_limit:");// 添加自定义前缀if (!rateLimit.key().isEmpty()) {keyBuilder.append(rateLimit.key()).append(":");}// 根据限流类型生成不同的keyswitch (rateLimit.limitType()) {case API:// 接口级别:方法名MethodSignature signature = (MethodSignature) point.getSignature();Method method = signature.getMethod();keyBuilder.append("api:").append(method.getDeclaringClass().getSimpleName()).append(".").append(method.getName());break;case USER:// 用户级别:用户IDtry {ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();if (attributes != null) {HttpServletRequest request = attributes.getRequest();User loginUser = userService.getLoginUser(request);keyBuilder.append("user:").append(loginUser.getId());} else {// 无法获取请求上下文,使用IP限流keyBuilder.append("ip:").append(getClientIP());}} catch (BusinessException e) {// 未登录用户使用IP限流keyBuilder.append("ip:").append(getClientIP());}break;case IP:// IP级别:客户端IPkeyBuilder.append("ip:").append(getClientIP());break;default:throw new BusinessException(ErrorCode.SYSTEM_ERROR, "不支持的限流类型");}return keyBuilder.toString();
}
作用:根据不同的限流策略生成唯一的 Redis key,API 级别按方法名、用户级别按用户 ID、IP 级别按客户端 IP,从而支持三种限流维度。 这里还有个降级逻辑的小设计,用户级别限流获取用户信息失败时自动降级为 IP 限流。
分析:
-
基础前缀:以 "rate_limit:" 作为所有限流 key 的统一前缀,便于 Redis 中相关键的识别和管理。
-
自定义前缀:如果
@RateLimit
注解中指定了key
属性(非空),则将其添加到 key 中,用于进一步区分不同业务场景。 -
按限流类型生成具体 key:
- API 级别:通过
JoinPoint
获取当前调用的方法信息,生成包含「类名。方法名」的 key(如rate_limit:api:UserController.getUser
),实现对特定接口的限流。 - USER 级别:优先获取当前登录用户的 ID 作为 key 一部分(如
rate_limit:user:1001
);若无法获取用户上下文(如未登录),则降级为使用客户端 IP 作为标识。 - IP 级别:直接使用客户端的 IP 地址生成 key(如
rate_limit:ip:192.168.1.1
),限制来自特定 IP 的请求频率。
- API 级别:通过
-
异常处理:对于不支持的限流类型,抛出系统异常提示。
6.4编写获取客户端IP的工具方法
private String getClientIP() {ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();if (attributes == null) {return "unknown";}HttpServletRequest request = attributes.getRequest();String ip = request.getHeader("X-Forwarded-For");if (ip == null || ip.isEmpty() || "unknown".equalsIgnoreCase(ip)) {ip = request.getHeader("X-Real-IP");}if (ip == null || ip.isEmpty() || "unknown".equalsIgnoreCase(ip)) {ip = request.getRemoteAddr();}// 处理多级代理的情况if (ip != null && ip.contains(",")) {ip = ip.split(",")[0].trim();}return ip != null ? ip : "unknown";
}
6.5 在要进行限流的接口上使用限流注解
这里以AI对话接口为例,在限流注解中自定义限流的属性。
@GetMapping(value = "/chat/gen/code", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@RateLimit(limitType = RateLimitType.USER, rate = 5, rateInterval = 60, message = "AI 对话请求过于频繁,请稍后再试")
public Flux<ServerSentEvent<String>> chatToGenCode(@RequestParam Long appId,@RequestParam String message,HttpServletRequest request) {// 方法实现...
}
这样,每个用户一分钟只能发起5次AI对话请求,超过5次就会返回友好的提示。