分布式限流算法与组件
固定时间窗口算法(计数法)
当 Redis 版本过低(不支持 CL.THROTTLE 命令)时,可以基于 Redis 的 INCR (原子递增)和 EXPIRE (设置过期时间)命令,自己实现一个简单的固定窗口限流算法。下面结合 ASP.NET Core 框架,以 Test 接口的 GET 请求为例,完整实现:
核心思路(固定窗口算法)
- 时间窗口划分:比如 60 秒一个窗口(如 00:00:00-00:00:59 为一个窗口)。
- 计数器:用 Redis 键存储当前窗口的请求数(键名包含窗口标识,如 rate_limit:api:test:202407231200 ,最后几位是分钟)。
- 原子操作:
- 每次请求进来,先计算当前窗口的 Redis 键。
- 用 INCR 命令递增计数器(原子操作,避免并发问题)。
- 若计数器是第一次创建,用 EXPIRE 设置过期时间(略大于窗口时长,如 61 秒,避免窗口切换时键未清理)。
- 若计数器值 ≤ 阈值,允许请求;否则限流。
具体实现(ASP.NET Core 接口)
步骤1:注入 Redis 服务(同上,确保已安装 StackExchange.Redis )
步骤2:编写限流工具类(封装核心逻辑)
using StackExchange.Redis;
using System;public class RedisRateLimiter
{private readonly IDatabase _redisDb;public RedisRateLimiter(ConnectionMultiplexer redis){_redisDb = redis.GetDatabase();}/// <summary>/// 检查是否允许请求/// </summary>/// <param name="resource">限流资源名(如接口标识)</param>/// <param name="maxRequests">窗口内最大请求数</param>/// <param name="windowSeconds">窗口时长(秒)</param>/// <returns>true=允许,false=限流</returns>public async Task<bool> AllowRequestAsync(string resource, int maxRequests, int windowSeconds){// 1. 生成当前窗口的 Redis 键(包含时间戳,确保每个窗口唯一)// 例如:rate_limit:api:test:1721721600(时间戳=当前时间/窗口秒数,取整数)long timestamp = DateTimeOffset.UtcNow.ToUnixTimeSeconds() / windowSeconds;string redisKey = $"rate_limit:{resource}:{timestamp}";// 2. 原子递增计数器(+1)long currentCount = await _redisDb.StringIncrementAsync(redisKey, 1);// 3. 若第一次创建键,设置过期时间(窗口时长+1秒,避免窗口切换时键残留)if (currentCount == 1){await _redisDb.KeyExpireAsync(redisKey, TimeSpan.FromSeconds(windowSeconds + 1));}// 4. 判断是否超过阈值return currentCount <= maxRequests;}
}
步骤3:在控制器中使用限流工具类
using Microsoft.AspNetCore.Mvc;
using StackExchange.Redis;[ApiController]
[Route("api/[controller]")]
public class DemoController : ControllerBase
{private readonly RedisRateLimiter _rateLimiter;// 构造函数注入限流工具类public DemoController(ConnectionMultiplexer redis){_rateLimiter = new RedisRateLimiter(redis);}[HttpGet("Test")]public async Task<IActionResult> Test(){// 限流规则:60秒内最多允许100个请求(无突发容忍,如需可调整阈值)bool isAllowed = await _rateLimiter.AllowRequestAsync(resource: "api:test", // 资源标识(对应接口)maxRequests: 100, // 窗口内最大请求数windowSeconds: 60 // 窗口时长(秒));if (isAllowed){// 允许请求:执行业务逻辑return Ok("请求成功,返回数据");}else{// 限流:返回429状态码return StatusCode(429, "请求过于频繁,请稍后再试");}}
}
步骤4:在 Program.cs 中注册服务
var builder = WebApplication.CreateBuilder(args);
// 注册 Redis 连接(单例)
builder.Services.AddSingleton(
ConnectionMultiplexer.Connect(builder.Configuration[“Redis:ConnectionString”])
);
// 其他服务注册…
var app = builder.Build();
// …启动应用
关键说明
- 窗口键的生成:
用 当前时间戳 / 窗口秒数 得到窗口标识(如 60 秒窗口,时间戳 1721721600 和 1721721659 会被分到同一个窗口),确保同一窗口内的请求共用一个计数器。
计算 redisKey 时用的是「当前时间戳除以窗口时长」,结果会把同一窗口内的不同时间映射到同一个键。举个具体例子就清楚了:
举例说明(以 60 秒窗口为例)
假设窗口时长是 60 秒,当前时间戳(Unix 秒数)如下:
请求 1 时间:1721721600 秒(比如 2024-07-23 12:00:00)
请求 2 时间:1721721630 秒(同一分钟内的 12:00:30)
请求 3 时间:1721721659 秒(同一分钟内的 12:00:59)
请求 4 时间:1721721660 秒(下一分钟的 12:01:00)
计算 timestamp = 时间戳 / 60:
请求 1:1721721600 / 60 = 28695360
请求 2:1721721630 / 60 = 28695360(和请求 1 相同)
请求 3:1721721659 / 60 = 28695360(和请求 1 相同)
请求 4:1721721660 / 60 = 28695361(进入下一个窗口,键不同)
因此:
请求 1、2、3 会生成同一个 redisKey(如 rate_limit:api:test:28695360),共用一个计数器,确保 60 秒内总请求数被限制。
请求 4 进入下一个 60 秒窗口,生成新的 redisKey,计数器重新开始计算。
- 原子性保证:
StringIncrementAsync 是 Redis 原子命令,即使多个服务器同时请求,也能保证计数器准确递增,不会出现重复计数问题。 - 过期时间设置:
只在计数器第一次创建时设置过期时间( currentCount == 1 ),避免重复设置;过期时间比窗口时长多 1 秒,确保窗口结束后键被自动清理。 - 突发请求处理:
若需要容忍突发请求,可临时提高 maxRequests (如允许 10 个突发,就设为 110),但固定窗口算法在窗口切换时可能出现“临界问题”(如窗口末尾和开头各发 100 次,实际 2 秒内 200 次),如需更精准可改用滑动窗口(实现稍复杂)。
总结
这种基于 Redis INCR + EXPIRE 的实现,无需依赖高版本 Redis,兼容所有版本,且能满足大部分分布式限流场景(中小型系统足够用)。缺点是不精确,如果追求更精准的限流(如滑动窗口、令牌桶),可基于此思路扩展,或使用成熟库(如 Polly 的限流策略结合 Redis)。
滑动时间窗口算法
using StackExchange.Redis;
using System;
using System.Threading.Tasks;public class SlidingWindowRateLimiter
{private readonly IDatabase _redisDb;private readonly string _prefix = "sliding_rate_limit:";public SlidingWindowRateLimiter(IConnectionMultiplexer redisConnection){_redisDb = redisConnection.GetDatabase();}/// <summary>/// 判断请求是否允许通过滑动窗口限流/// </summary>/// <param name="resource">限流的资源标识(如API路径)</param>/// <param name="maxRequests">窗口内最大允许请求数</param>/// <param name="windowSeconds">窗口时长(秒)</param>/// <returns>是否允许请求通过</returns>public async Task<bool> AllowRequestAsync(string resource, int maxRequests, int windowSeconds){// 生成Redis键string redisKey = $"{_prefix}{resource}";// 获取当前Unix时间戳(秒)long now = DateTimeOffset.UtcNow.ToUnixTimeSeconds();// 计算窗口的起始时间(当前时间 - 窗口时长)long windowStartTime = now - windowSeconds;// 生成唯一标识,用于SortedSet的成员string requestId = Guid.NewGuid().ToString();// 使用Lua脚本确保操作的原子性string luaScript = @"-- 1. 移除窗口外的请求记录redis.call('ZREMRANGEBYSCORE', KEYS[1], 0, ARGV[1])-- 2. 统计当前窗口内的请求数local currentCount = redis.call('ZCARD', KEYS[1])-- 3. 如果未超过阈值,添加当前请求if currentCount < tonumber(ARGV[2]) thenredis.call('ZADD', KEYS[1], ARGV[3], ARGV[4])-- 设置过期时间,避免内存泄漏redis.call('EXPIRE', KEYS[1], ARGV[5])return 1endreturn 0";// 执行Lua脚本var result = await _redisDb.ScriptEvaluateAsync(luaScript,new RedisKey[] { redisKey },new RedisValue[] { windowStartTime, // ARGV[1]: 窗口起始时间maxRequests, // ARGV[2]: 最大请求数now, // ARGV[3]: 当前时间戳(作为score)requestId, // ARGV[4]: 请求唯一标识(作为member)windowSeconds + 1 // ARGV[5]: 过期时间(窗口时长+1秒,确保窗口外的键被清理)});// 结果为1表示允许请求,0表示限流return (long)result == 1;}
}
这段代码是基于 Redis 实现的滑动窗口限流算法,核心通过 Lua 脚本保证操作的原子性,避免并发场景下的计数误差。我们来逐部分解析:
一、核心数据结构:Redis 有序集合(Sorted Set,ZSet)
整个限流逻辑依赖 Redis 的有序集合(ZSet) 实现,原因是 ZSet 有两个关键特性:
每个成员(member)对应一个分数(score),可按分数范围高效操作
支持按分数范围删除成员、统计成员数量等操作,适合时间窗口内的计数场景
二、Lua 脚本详解(核心限流逻辑)
Lua 脚本的作用是将多个 Redis 操作打包成一个原子操作(Redis 会单线程执行整个脚本,中间不会被其他请求打断),确保限流逻辑的准确性。
脚本变量说明
KEYS[1]:Redis 中的键名(唯一),对应一个 ZSet,用于存储当前窗口内的请求记录
ARGV:参数数组,包含 5 个具体值(下文会对应到 C# 代码中的参数)
脚本逻辑分步解析
lua
– 1. 移除窗口外的请求记录 不存在按照空集合处理,不会报错
redis.call(‘ZREMRANGEBYSCORE’, KEYS[1], 0, ARGV[1])
作用:清理 “当前统计窗口” 之外的历史请求(避免旧数据干扰计数)
命令解释:ZREMRANGEBYSCORE 是 ZSet 的命令,用于删除 “分数在 [0, ARGV [1]] 范围内” 的所有成员
变量对应:ARGV[1] 是 “窗口起始时间戳”(C# 代码中的windowStartTime),即只保留 “时间戳> 窗口起始时间” 的请求(这些是当前窗口内的请求)
lua
– 2. 统计当前窗口内的请求数 不存在按照空集合处理,不会报错
local currentCount = redis.call(‘ZCARD’, KEYS[1])
作用:计算当前窗口内还剩多少请求(经过第一步清理后)
命令解释:ZCARD 是 ZSet 的命令,用于获取集合的成员总数
结果:currentCount 就是当前窗口内的请求数量
lua
– 3. 如果未超过阈值,添加当前请求
if currentCount < tonumber(ARGV[2]) then
– 第一个请求时,会自动创建一个zset数据结构
redis.call(‘ZADD’, KEYS[1], ARGV[3], ARGV[4])
– 设置过期时间,避免内存泄漏 每一次调用会刷新过期时间
redis.call(‘EXPIRE’, KEYS[1], ARGV[5])
return 1
end
return 0
条件判断:ARGV[2] 是 “最大允许请求数”(C# 代码中的maxRequests),如果当前请求数小于阈值,则允许本次请求
添加当前请求:ZADD 是 ZSet 的命令,用于添加成员。这里:
ARGV[3] 是 “当前请求的时间戳”(C# 代码中的now),作为该成员的score(用于后续窗口判断)
ARGV[4] 是 “请求唯一标识”(C# 代码中的requestId),作为该成员的member(确保每个请求唯一)
设置过期时间:EXPIRE 用于给 ZSet 设置过期时间,ARGV[5] 是 “窗口时长 + 1 秒”(C# 代码中的windowSeconds + 1),避免键永久占用内存
返回值:1 表示允许请求,0 表示触发限流
三、C# 代码参数对应关系
C# 代码中通过ScriptEvaluateAsync执行 Lua 脚本,参数对应关系如下:
Lua 中的变量 C# 代码中的参数 含义
KEYS[1] redisKey Redis 中 ZSet 的键名(唯一标识一个限流窗口,如 “rate_limit:user123”)
ARGV[1] windowStartTime 窗口起始时间戳(如当前时间 - 窗口时长,单位通常是毫秒)
ARGV[2] maxRequests 窗口内允许的最大请求数(限流阈值,如 100 次 / 分钟)
ARGV[3] now 当前请求的时间戳(作为 ZSet 成员的 score)
ARGV[4] requestId 当前请求的唯一标识(作为 ZSet 成员的 member,避免重复计数)
ARGV[5] windowSeconds + 1 ZSet 的过期时间(窗口时长 + 1 秒,确保窗口外的键被自动清理)
四、整体逻辑总结
清理旧数据:先删除窗口外的请求(时间戳 < 窗口起始时间),确保只统计当前窗口内的请求
统计当前请求数:计算当前窗口内剩余的请求数量
判断是否限流:如果当前数量 < 阈值,则添加本次请求并允许通过;否则拒绝请求
通过 ZSet 的分数(时间戳)管理请求的时间范围,结合 Lua 的原子性,实现了高效、准确的滑动窗口限流。
在上述滑动时间窗口的 C# 实现中,并没有显式地将时间窗口分割成固定数量的小格子,而是采用了更精细的 “基于每个请求时间戳” 的实时计算方式,核心逻辑是:直接以 “当前时间往前推 windowSeconds 秒” 作为动态窗口,通过 Redis 的 SortedSet 实时过滤并统计这个动态窗口内的所有请求。
与 “固定小格子拆分” 的区别
如果用 “固定小格子拆分” 的思路(例如将 120 秒窗口拆分为 6 个 20 秒格子),需要预先定义格子数量和每个格子的时长。但上述实现采用了更灵活的方式:
不依赖预设的格子数量,而是通过ZREMRANGEBYSCORE命令,实时删除所有 “时间戳 < 当前时间 - windowSeconds” 的请求(即超出当前窗口的请求)。
剩余的请求均属于 “当前时间往前推 windowSeconds 秒” 的动态窗口内,直接通过ZCARD统计总数,无需拆分格子。
为什么不拆分格子?
这种实现的优势在于:
更高精度:每个请求的时间戳直接参与计算,无需依赖格子粒度,避免了 “格子拆分过粗导致的精度不足” 问题。
简化逻辑:无需维护格子的编号和对应关系,通过 Redis 的 SortedSet 天然支持按时间戳范围过滤和统计。
动态适配:无论 windowSeconds 是 120 秒还是其他值,逻辑无需调整,通用性更强。
漏桶限流算法
using StackExchange.Redis;
using System;
using System.Threading.Tasks;public class DistributedLeakyBucketRateLimiter
{private readonly IConnectionMultiplexer _redis;public DistributedLeakyBucketRateLimiter(IConnectionMultiplexer redis){_redis = redis;}public async Task<bool> AllowRequestAsync(string resource, int maxRequests, int windowSeconds){var db = _redis.GetDatabase();var now = DateTime.UtcNow;// 资源键名,避免冲突var key = $"leaky_bucket:{resource}";// 使用 Lua 脚本保证原子操作var luaScript = @"-- 获取当前时间戳(毫秒)local now = tonumber(ARGV[1])-- 解析参数local maxRequests = tonumber(ARGV[2])local windowSeconds = tonumber(ARGV[3])-- 获取当前桶状态local state = redis.call('HMGET', KEYS[1], 'volume', 'last_update')local volume = tonumber(state[1])local lastUpdate = tonumber(state[2])-- 初始化桶状态(如果不存在)if not volume thenvolume = 0lastUpdate = nowend-- 计算漏出量local elapsed = (now - lastUpdate) / 1000 -- 转换为秒local leakRate = maxRequests / windowSecondslocal leakedVolume = elapsed * leakRate-- 更新桶状态volume = math.max(0, volume - leakedVolume)lastUpdate = now-- 检查是否可以处理请求if volume < maxRequests thenvolume = volume + 1-- 更新 Redisredis.call('HMSET', KEYS[1], 'volume', volume,'last_update', lastUpdate)-- 设置过期时间(窗口的2倍)redis.call('EXPIRE', KEYS[1], windowSeconds * 2)return 1 -- 允许请求else-- 更新最后更新时间(但不增加水量)redis.call('HSET', KEYS[1], 'last_update', lastUpdate)redis.call('EXPIRE', KEYS[1], windowSeconds * 2)return 0 -- 拒绝请求end";// 执行 Lua 脚本(原子操作)var result = (int)await db.ScriptEvaluateAsync(luaScript,new RedisKey[] { key },new RedisValue[] {now.Ticks / TimeSpan.TicksPerMillisecond, // 当前时间戳(毫秒)maxRequests,windowSeconds});return result == 1;}
}
例如,假设资源名为"api1",则Redis中会有一个键为leaky_bucket:api1
的Hash。其内容可能如下:
HGETALL leaky_bucket:api1
- “volume”
- “3”
- “last_update”
- “1650000000000”
令牌桶算法
using StackExchange.Redis;
using System;
using System.Threading.Tasks;public class DistributedTokenBucketRateLimiter
{private readonly IConnectionMultiplexer _redis;public DistributedTokenBucketRateLimiter(IConnectionMultiplexer redis){_redis = redis;}public async Task<bool> AllowRequestAsync(string resource, int maxRequests, int windowSeconds){var db = _redis.GetDatabase();var now = DateTime.UtcNow;// 资源键名var key = $"token_bucket:{resource}";// 计算令牌生成速率(每秒)double refillRate = (double)maxRequests / windowSeconds;// Lua 脚本保证原子操作var luaScript = @"-- 获取当前时间戳(毫秒)local now = tonumber(ARGV[1])-- 解析参数local capacity = tonumber(ARGV[2])local refillRate = tonumber(ARGV[3])local requested = 1 -- 每次请求消耗1个令牌-- 获取当前桶状态local state = redis.call('HMGET', KEYS[1], 'tokens', 'last_refill')local tokens = state[1]local lastRefill = state[2]-- 初始化桶状态(如果不存在)if not tokens thentokens = capacitylastRefill = nowelsetokens = tonumber(tokens)lastRefill = tonumber(lastRefill)end-- 计算需要补充的令牌数local elapsed = (now - lastRefill) / 1000 -- 转换为秒local refillAmount = elapsed * refillRate-- 补充令牌(不超过桶容量)if refillAmount > 0 thentokens = math.min(capacity, tokens + refillAmount)lastRefill = nowend-- 检查是否有足够令牌local allowed = falseif tokens >= requested thentokens = tokens - requestedallowed = trueend-- 更新 Redisredis.call('HMSET', KEYS[1],'tokens', tokens,'last_refill', lastRefill)-- 设置过期时间(窗口的2倍)redis.call('EXPIRE', KEYS[1], ARGV[4])return allowed and 1 or 0";// 执行 Lua 脚本var result = (int)await db.ScriptEvaluateAsync(luaScript,new RedisKey[] { key },new RedisValue[] {now.Ticks / TimeSpan.TicksPerMillisecond, // 当前时间戳(毫秒)maxRequests, // 桶容量refillRate, // 令牌补充速率windowSeconds * 2 // 过期时间});return result == 1;}
}
令牌桶 vs 漏桶 关键区别
特性 令牌桶 漏桶
数据结构 存储可用令牌数 (tokens) 存储当前水量 (volume)
请求处理 消耗令牌 (令牌数-1) 增加水量 (水量+1)
速率控制 固定速率生成令牌 固定速率漏水
突发流量 允许突发(消耗积攒的令牌) 严格平滑(固定出水速率)
典型应用场景 API限流、网络流量控制 流量整形、严格平滑控制
Redis更新逻辑 先补充令牌再消耗 先漏水再加水
限流组件
Sentinel(阿里巴巴开源)
核心功能:流量控制、熔断降级、系统自适应保护、热点参数限流。
特点:
无缝集成 Spring Boot 、.net Asp core等web框架,Spring Cloud、Dubbo 等微服务框架,支持动态规则配置49。
提供实时监控面板(Dashboard),可动态调整限流策略9。
适用于 API 网关、微服务接口限流等场景。
典型使用:通过 @SentinelResource 注解定义资源并配置 QPS 阈值。
Nginx 的限流能力
- 限流对象灵活
全局限流:对整个域名(如 server_name)的所有接口生效79。
接口级限流:通过 location 匹配特定 URL 路径,仅对该路径下的请求限流17。
客户端级限流:按 IP($binary_remote_addr)限制单个客户端的请求速率或并发连接数310。
- 支持的限流类型
请求频率限流(ngx_http_limit_req_module)
基于漏桶算法,限制每秒/每分钟请求数(QPS),适用于防刷接口16。
nginx
http {
limit_req_zone $binary_remote_addr zone=api_limit:10m rate=10r/s; # 定义限流区域
server {
location /api/user { # 针对特定接口
limit_req zone=api_limit burst=20 nodelay; # 限流规则
}
}
}
并发连接数限流(ngx_http_limit_conn_module)
限制同一时刻的并发连接数,防止后端资源耗尽310。
nginx
http {
limit_conn_zone $binary_remote_addr zone=conn_limit:10m;
server {
location /api/order { # 针对订单接口
limit_conn conn_limit 5; # 单IP并发连接≤5
}
}
}
🎯 二、如何针对单个接口限流?
通过 location 路径匹配 实现接口级限流。以下是典型配置示例:
nginx
http {
# 定义限流规则:10MB 存储空间,每秒处理10个请求
limit_req_zone $binary_remote_addr zone=per_api:10m rate=10r/s;
server {location /api/payment { # 支付接口limit_req zone=per_api burst=5 nodelay; # 突发流量缓冲5个请求proxy_pass http://backend_service; # 转发到后端}location /api/info { # 非关键信息接口limit_req zone=per_api burst=10; # 无突发延迟}
}
}
关键参数:
burst:允许的突发请求队列大小(缓冲桶容量)26。
nodelay:立即处理突发请求(否则按速率延迟处理)27。
效果:
超过 rate + burst 的请求直接返回 503 错误(可自定义状态码)14。
⚙️ 三、高级场景配置
- 黑白名单过滤
结合 deny 或 allow 拦截恶意 IP:
nginx
location /api/sensitive {
deny 192.168.1.100; # 封禁特定IP
allow 10.0.0.0/8; # 允许内网IP
deny all; # 其他全部拒绝
limit_req …; # 限流规则
}
:cite[7]
2. 后端服务保护(ngx_http_upstream_module)
限制转发到后端服务器的并发连接数:
nginx
upstream backend {
server 192.168.1.2:8080 max_conns=50; # 单台后端最大并发50
}
server {
location / {
proxy_pass http://backend;
}
}
:cite[9]