Redis+Lua的分布式限流器
核心原理
在分布式系统中,限流器需满足 原子性 和 跨进程一致性。Redis 提供高性能存储,Lua 脚本保证多命令原子执行,二者结合实现分布式限流。
常用限流算法
1、令牌桶算法 (Token Bucket)
-
以恒定速率向桶内添加令牌
-
请求消耗令牌,桶空时拒绝请求
-
支持突发流量(桶内令牌可累积)
2、漏桶算法 (Leaky Bucket)
-
请求以恒定速率流出
-
超出桶容量时拒绝请求
Redis数据结构:
使用一个Hash结构来存储令牌桶的相关信息,包括:
tokens
: 当前桶中的令牌数量。last_time
: 上次更新令牌桶的时间戳(单位:秒或毫秒)
Lua脚本设计:
脚本的主要逻辑如下:
- 获取当前时间(单位:秒,可以使用Redis的
TIME
命令获取)。 - 从Redis中读取令牌桶的当前状态(当前令牌数
tokens
和上次更新时间last_time
)。 - 计算从上一次更新到现在这段时间内应该添加的令牌数(基于速率和经过的时间)。
- 更新当前令牌数(不能超过桶的容量)。
- 判断当前令牌数是否足够(至少需要1个令牌):
- 如果足够,则令牌数减1,并更新状态,返回允许(例如返回1)。
- 如果不足,则返回拒绝(例如返回0)。
注意:由于令牌桶的更新需要原子性,所以整个计算和更新过程必须在一个Lua脚本中完成。
Lua脚本示例:
local key = KEYS[1] -- 限流器Key
local capacity = tonumber(ARGV[1]) -- 桶容量
local rate = tonumber(ARGV[2]) -- 令牌生成速率 (个/秒)
local now = tonumber(ARGV[3]) -- 当前时间戳(秒)
local requested = tonumber(ARGV[4]) -- 请求令牌数-- 获取桶当前状态
local data = redis.call('HMGET', key, 'tokens', 'last_time')
local tokens
local last_time-- 处理数据初始化及类型转换
if data[1] == false or data[1] == nil thentokens = capacity
elsetokens = tonumber(data[1]) or capacity -- 非数字时使用默认容量
endif data[2] == false or data[2] == nil thenlast_time = now
elselast_time = tonumber(data[2]) or now -- 非数字时使用当前时间
end-- 处理时间回拨问题(服务器时间调整)
if now < last_time thenlast_time = now -- 重置最后时间为当前时间
end-- 计算新增令牌(仅当时间正常流逝时)
local new_tokens = 0
if now > last_time thennew_tokens = (now - last_time) * rate
end-- 更新令牌数量(不超过桶容量)
tokens = math.min(capacity, tokens + new_tokens)-- 判断是否允许请求
local allowed = false
if tokens >= requested thentokens = tokens - requestedallowed = true-- 更新桶状态redis.call('HMSET', key, 'tokens', tokens, 'last_time', now)-- 设置Key过期时间(避免冷数据堆积)local ttl = math.ceil(capacity / rate) * 2redis.call('EXPIRE', key, ttl)
endreturn allowed and 1 or 0
RedisRateLimiter 类
package com.wang.seckill.service.impl;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.stereotype.Component;import java.util.Collections;
import java.util.concurrent.TimeUnit;/*** @author xiaoman*/
@Component
public class RedisRateLimiter {@Autowiredprivate StringRedisTemplate stringRedisTemplate;/*** lua脚本*/private static final String RATE_LIMITER_LUA ="local key = KEYS[1] \n" +"local capacity = tonumber(ARGV[1]) \n" +"local rate = tonumber(ARGV[2]) \n" +"local now = tonumber(ARGV[3]) \n" +"local requested = tonumber(ARGV[4]) \n" +"local data = redis.call('HMGET', key, 'tokens', 'last_time') \n" +"local tokens \n" +"local last_time \n" +"if data[1] == false or data[1] == nil then \n" +" tokens = capacity \n" +"else \n" +" tokens = tonumber(data[1]) or capacity \n" +"end \n" +"if data[2] == false or data[2] == nil then \n" +" last_time = now \n" +"else \n" +" last_time = tonumber(data[2]) or now \n" +"end \n" +"if now < last_time then \n" +" last_time = now \n" +"end \n" +"local new_tokens = 0 \n" +"if now > last_time then \n" +" new_tokens = (now - last_time) * rate \n" +"end \n" +"tokens = math.min(capacity, tokens + new_tokens) \n" +"local allowed = false \n" +"if tokens >= requested then \n" +" tokens = tokens - requested \n" +" allowed = true \n" +" redis.call('HMSET', key, 'tokens', tokens, 'last_time', now) \n" +" local ttl = math.ceil(capacity / rate) * 2 \n" +" redis.call('EXPIRE', key, ttl) \n" +"end \n" +"return allowed and 1 or 0";// 预编译的 Redis 脚本(提升性能)private final RedisScript<Long> rateLimiterScript;public RedisRateLimiter() {// 在构造函数中预编译脚本DefaultRedisScript<Long> script = new DefaultRedisScript<>();script.setScriptText(RATE_LIMITER_LUA);script.setResultType(Long.class);this.rateLimiterScript = script;}/*** 尝试获取令牌* @param key 限流key(如:接口名+用户ID)* @param capacity 桶容量* @param rate 每秒补充的令牌数* @param requested 请求的令牌数(通常为1)* @return 是否允许*/public boolean tryAcquire(String key, int capacity, int rate, int requested) {try {long now = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());Long result = stringRedisTemplate.execute(rateLimiterScript,Collections.singletonList(key),String.valueOf(capacity),String.valueOf(rate),String.valueOf(now),String.valueOf(requested));// 处理可能的空结果return result != null && result == 1;} catch (Exception e) {// 根据业务需求选择降级策略// true = 允许通过(宽松模式) false = 拒绝请求(严格模式)return handleException(e);}}/*** 异常处理策略(可重写)*/protected boolean handleException(Exception e) {// 默认降级策略:拒绝请求return false;// 生产环境可根据需要实现:// 1. 记录日志// 2. 根据异常类型选择策略// 3. 动态配置降级方案}
}
工作流程
初始化参数
-
capacity: 桶容量(突发最大请求数)
-
rate: 令牌生成速率(如 10 表示每秒10个)
-
requested: 本次请求消耗令牌数(通常为1)
调用和效果
@PostMapping("/execute")public Result<String> secKill(@RequestBody SecKillParamVO paramVO) {if (paramVO == null) {return Result.error("入参为空");}paramVO.check();LOG.info("secKill:param:{}", paramVO);// 1. 用户维度限流(每个用户每秒最多5次请求)String userLimitKey = "seckill_limit:user:" + paramVO.getUserId();if (!limiter.tryAcquire(userLimitKey, 5, 5, 1)) {LOG.error("用户{}请求过于频繁,请稍后再试", paramVO.getUserId());return Result.error("请求过于频繁,请稍后再试");}// 2. 商品维度限流(每个商品每秒最多1000次请求)String itemLimitKey = "seckill_limit:item:" + paramVO.getItemId();if (!limiter.tryAcquire(itemLimitKey, 1000, 1000, 1)) {LOG.error("当前商品:{} 过于火爆,请稍后再试", paramVO.getItemId());return Result.error("当前商品过于火爆,请稍后再试");}// 秒杀业务String result = secKillService.executeSecKill(paramVO.getUserId(), paramVO.getItemId());return Result.success(result);}