当前位置: 首页 > news >正文

每秒扛住10万请求?RedissonRateLimiter 分布式限流器详解

目录

  • Java源码分析
  • Lua脚本分析
    • 创建限流器
    • 尝试获取令牌
  • 案例实战
    • 第一次请求进入
    • 第二次请求进入


种一棵树最好的时间是10年前,其次就是现在,加油!
                                                                                   --by蜡笔小柯南

RedissonRateLimiter作为方便好用的限流工具,在某些场景下,极简了我们的开发,通过简单几行代码,就能搞定限流。那么,如何好用的限流器,底层是如何实现的呢?接下来,让我们一起去探索!

Java源码分析

这是 RedissonRateLimiter 类下的 tryAcquireAsync 方法,限流的主要逻辑在这里面

private <T> RFuture<T> tryAcquireAsync(RedisCommand<T> command, Long value) {byte[] random = getServiceManager().generateIdArray();return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,"local rate = redis.call('hget', KEYS[1], 'rate');"+ "local interval = redis.call('hget', KEYS[1], 'interval');"+ "local type = redis.call('hget', KEYS[1], 'type');"+ "assert(rate ~= false and interval ~= false and type ~= false, 'RateLimiter is not initialized')"+ "local valueName = KEYS[2];"+ "local permitsName = KEYS[4];"+ "if type == '1' then "+ "valueName = KEYS[3];"+ "permitsName = KEYS[5];"+ "end;"+ "assert(tonumber(rate) >= tonumber(ARGV[1]), 'Requested permits amount could not exceed defined rate'); "+ "local currentValue = redis.call('get', valueName); "+ "local res;"+ "if currentValue ~= false then "+ "local expiredValues = redis.call('zrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval); "+ "local released = 0; "+ "for i, v in ipairs(expiredValues) do "+ "local random, permits = struct.unpack('Bc0I', v);"+ "released = released + permits;"+ "end; "+ "if released > 0 then "+ "redis.call('zremrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval); "+ "if tonumber(currentValue) + released > tonumber(rate) then "+ "currentValue = tonumber(rate) - redis.call('zcard', permitsName); "+ "else "+ "currentValue = tonumber(currentValue) + released; "+ "end; "+ "redis.call('set', valueName, currentValue);"+ "end;"+ "if tonumber(currentValue) < tonumber(ARGV[1]) then "+ "local firstValue = redis.call('zrange', permitsName, 0, 0, 'withscores'); "+ "res = 3 + interval - (tonumber(ARGV[2]) - tonumber(firstValue[2]));"+ "else "+ "redis.call('zadd', permitsName, ARGV[2], struct.pack('Bc0I', string.len(ARGV[3]), ARGV[3], ARGV[1])); "+ "redis.call('decrby', valueName, ARGV[1]); "+ "res = nil; "+ "end; "+ "else "+ "redis.call('set', valueName, rate); "+ "redis.call('zadd', permitsName, ARGV[2], struct.pack('Bc0I', string.len(ARGV[3]), ARGV[3], ARGV[1])); "+ "redis.call('decrby', valueName, ARGV[1]); "+ "res = nil; "+ "end;"+ "local ttl = redis.call('pttl', KEYS[1]); "+ "if ttl > 0 then "+ "redis.call('pexpire', valueName, ttl); "+ "redis.call('pexpire', permitsName, ttl); "+ "end; "+ "return res;",Arrays.asList(getRawName(), getValueName(), getClientValueName(), getPermitsName(), getClientPermitsName()),value, System.currentTimeMillis(), random);}

可以看到,使用了大量的 lua 脚本,其中,KEYS[1]、ARGV[1]分别表示取的key的第1个值,以及参数的第1个值,中括号里面的数字,表示取的是第几个值。

我们看一下 evalWriteAsync 方法的定义,如下:

<T, R> RFuture<R> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params);

最后两个参数,一个是 List<Object> keys,一个是 Object... params。KEYS[1]就是从 keys 中取第1个值,ARGV[1]就是从 params 中取第1个值。

Lua脚本分析

创建限流器

使用 Hash 结构来保存

保存和获取的命令分别为:

  • hset key field value
  • hget key field
redis.call('hsetnx', KEYS[1], 'rate', ARGV[1]);
redis.call('hsetnx', KEYS[1], 'interval', ARGV[2]);
return redis.call('hsetnx', KEYS[1], 'type', ARGV[3]);

创建限流器,向redis中保存了 rateintervaltype 信息

尝试获取令牌

通过 Lua 脚本实现,我把 Lua 脚本拷贝出来,写了注释

-- 速率,即规定的时间内,能够通过多少个
local rate = redis.call('hget', KEYS[1], 'rate');
-- 时间间隔,即规定的时间,如60秒
local interval = redis.call('hget', KEYS[1], 'interval');
-- 类型,用于区分单机限流还是集群限流,默认type=0,所以这里不用关注
local type = redis.call('hget', KEYS[1], 'type');
-- 校验,rate!=false && interval!=false && type!=false,其中任何一个不满足则抛出异常:RateLimiter is not initialized
assert(rate ~= false and interval ~= false and type ~= false, 'RateLimiter is not initialized')-- {keyName}:value,大括号内是key的名称,后面+冒号+value共同组成valueName
local valueName = KEYS[2];-- {keyName}:permits,大括号内是key的名称,后面+冒号+ permits共同组成permitsName
local permitsName = KEYS[4];-- type即为第6行获取到的type的值,当type=1时才会走下面的逻辑,由于type默认为0,所以这里不用关注
if type == '1' then valueName = KEYS[3];permitsName = KEYS[5];
end;-- ARGV[1]是请求的令牌数量,rate必须要比请求的大
-- 如:60秒允许1个,rate=1,如果请求的令牌数是2,即rate<ARGV[1],则抛出异常,异常信息为:Requested permits amount could not exceed defined rate
assert(tonumber(rate) >= tonumber(ARGV[1]), 'Requested permits amount could not exceed defined rate'); -- get valueName --> get {keyName}:value,第一次执行redis中什么都还没有,所以currentValue为null
-- 第二次执行,currentValue有值
local currentValue = redis.call('get', valueName); 
local res;-- 第一次:currentValue为null,进入else分支
-- 第二次,currentValue不为null,进入下面的if分支
if currentValue ~= false then -- 从zset中取数据,{keyName}:permits为key,取的区间是 0 ~ (当前时间戳 - 时间间隔)-- 如:第一次请求的时间是8:00:00,第二次请求的时间是8:00:30,当前时间戳就是8点30秒,时间间隔是60秒-- 8:00:30减去60秒,7:59:30,则取的数据就是0 ~ 7:59:30 这区间的数据-- 由于第一次的请求时间是8:00:00,所以取出的expiredValues值为空,也就是过期的数据local expiredValues = redis.call('zrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval); local released = 0; -- 遍历expiredValues,如果expiredValues有值,released=之前所有请求的令牌数之和for i, v in ipairs(expiredValues) do local random, permits = struct.unpack('Bc0I', v);-- 表示释放或者回收已过期的令牌,供下次请求使用released = released + permits;end; -- released大于0,说明有过期的数据,如果没有,不会进入此if分支if released > 0 then -- {keyName}:permits为key,区间是0 ~ (当前时间戳 - 时间间隔),将所有过期的数据从zset中移除redis.call('zremrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval); -- 如果当前令牌数 + 释放(回收)的令牌数 > rateif tonumber(currentValue) + released > tonumber(rate) then -- 当前的令牌数 = rate - zset中的元素个数currentValue = tonumber(rate) - redis.call('zcard', permitsName); else -- 当前的令牌数 = 当前的令牌数 + 释放(回收)的令牌数currentValue = tonumber(currentValue) + released; end; -- 将更新后的当前令牌数的值,重新保存到{keyName}:value中,表示当前可用的令牌总数redis.call('set', valueName, currentValue);end;-- 如果当前的令牌数小于请求的令牌数if tonumber(currentValue) < tonumber(ARGV[1]) then -- 获取zset中最早的一次请求,返回的firstValue中包含此次请求的令牌数和请求时间local firstValue = redis.call('zrange', permitsName, 0, 0, 'withscores'); -- ARGV[2]:当前时间戳 firstValue[2]:最早请求的时间戳,这里不同的版本源码有区别-- 最终表示的是,距离下次令牌产生还需要多长时间res = 3 + interval - (tonumber(ARGV[2]) - tonumber(firstValue[2]));-- 表示当前的令牌数>=请求的令牌数else -- 更新zset,当前请求的令牌数以及当前时间戳redis.call('zadd', permitsName, ARGV[2], struct.pack('Bc0I', string.len(ARGV[3]), ARGV[3], ARGV[1])); -- 对{keyName}:value做自减,总的令牌数 - 请求的令牌数redis.call('decrby', valueName, ARGV[1]); res = nil; end; else -- set一个key-value数据,记录当前限流器的令牌数量-- set {keyName}:value rateredis.call('set', valueName, rate); -- 保存一个zset数据,{keyName}:permits为key,score是当前时间戳,member是经过格式化的,在redis可视化工具中-- 显示的是编码。实际上记录的就是这次请求的令牌数。所以,这个zset的数据,保存的就是请求的时间戳和请求的令牌数量redis.call('zadd', permitsName, ARGV[2], struct.pack('Bc0I', string.len(ARGV[3]), ARGV[3], ARGV[1])); -- 对{keyName}:value做自减,总的令牌数 - 请求的令牌数redis.call('decrby', valueName, ARGV[1]); res = nil; 
end;-- 获取key的过期时间,如果设置了过期时间,则进行续期
local ttl = redis.call('pttl', KEYS[1]); 
if ttl > 0 then redis.call('pexpire', valueName, ttl); redis.call('pexpire', permitsName, ttl); 
end; 
return res;

redisson使用了zset 数据结构来保存请求的信息,这样可以通过比较 score 分数,也就是请求的时间戳,来判断令牌需不需要生产。如果当前请求距离上一次请求超过了一个令牌生产周期,则说明令牌需要生产,之前用掉了多少个,就生产多少个,而之前用掉了多少个令牌的信息也在 zset 中保存了;如果没有超过一个令牌周期,则判断剩余可用的令牌数是否满足请求的令牌数,如果满足,则通过,如果不满足,则拒绝。

案例实战

比如:我们设置,60秒内只允许1个请求通过,也就是说,每60秒,才会生产1个令牌,每次只允许请求1个令牌

这里我们使用发短信场景,60秒内,只允许发送1条短信

key为:telephone:limit:13612345678
rate为:1
interval为:60秒

创建完成后,redis中保存的数据结构是这样的,我们传入的是60秒,在实际保存时会转换为毫秒,所以interval的值是60000

在这里插入图片描述

第一次请求进入

分析源码可得,Lua 脚本中,KEYS[1]的值就是key的值,即KEYS[1] = telephone:limit:13612345678

local rate = redis.call('hget', KEYS[1], 'rate');
local interval = redis.call('hget', KEYS[1], 'interval');
local type = redis.call('hget', KEYS[1], 'type');

Lua 脚本中,这3条语句,转换为redis的实际命令就是:

  • hget telephone:limit:13612345678 rate
  • hget telephone:limit:13612345678 interval
  • hget telephone:limit:13612345678 type

获取到的值就是redis中hash结构保存的信息,得到:

  • rate:1
  • interval:60000
  • type:0

下面,继续执行这条语句:

assert(rate ~= false and interval ~= false and type ~= false, 'RateLimiter is not initialized')

这条语句对上面获取到的3个字段做了校验,rate、interval、type 必须同时不为空,否则,抛出异常,异常信息为:RateLimiter is not initialized

下面,继续执行以下语句:

local valueName = KEYS[2];
local permitsName = KEYS[4];
if type == '1' thenvalueName = KEYS[3];permitsName = KEYS[5];
end;

经源码分析可得
KEYS[2] = {telephone:limit:13612345678}:value
KEYS[4] = {telephone:limit:13612345678}:permits

即:
valueName = {telephone:limit:13612345678}:value
permitsName = {telephone:limit:13612345678}:permits

由于 type = 0 ,所以这里的if分支不进入。继续执行后面语句

assert(tonumber(rate) >= tonumber(ARGV[1]), 'Requested permits amount could not exceed defined rate'

这条语句是校验:请求的令牌数不能超过设置的 rate 数,如:我们 rate 设为了1,请求的令牌数是2,则抛出异常,异常信息为:Requested permits amount could not exceed defined rate

继续执行后续语句,

local currentValue = redis.call('get', valueName); 
local res;
if currentValue ~= false then

currentValue 实际执行的redis命令是:

  • get {telephone:limit:13612345678}:value

由于是第一次请求,此时redis中除了保存的 hash 结构的数据外,没有其他任何数据,所以 currentValue 获取到的就是null,再判断 currentValue,由于它为null,if 分支进不去,直接到else分支

else redis.call('set', valueName, rate); redis.call('zadd', permitsName, ARGV[2], struct.pack('Bc0I', string.len(ARGV[3]), ARGV[3], ARGV[1])); redis.call('decrby', valueName, ARGV[1]); res = nil; 
end;

set valueName rate 转换为redis命令就是:

set {telephone:limit:13612345678}:value 1

此时,currentValue才保存到redis中

再向redis中保存一个 zset ,key是permitsName,即{telephone:limit:13612345678}:permits,redis中 zadd 的命令格式是:
zadd key socre member ,最终,score就是当前的时间戳,member就是当前请求的令牌数

在这里插入图片描述

最后,使用 decrby 命令对 valueName 进行自减
decrby {telephone:limit:13612345678}:value 1

自减1后,此时,redis中的值为0

在这里插入图片描述

继续执行后续语句:

local ttl = redis.call('pttl', KEYS[1]);
if ttl > 0 thenredis.call('pexpire', valueName, ttl);redis.call('pexpire', permitsName, ttl);
end
return res;

获取过期时间,默认我们没有手动设置过期时间,所以获取到的 ttl = -1 ,不满足 if 条件,if 分支不执行。后续我们直接略过此段代码。

至此,第一次请求完成,向redis中新增的key有:

  • {telephone:limit:13612345678}:value
    • String 结构,值为0
  • {telephone:limit:13612345678}:permits
    • hash 结构,member为请求的令牌数,score为这次请求的时间戳

第二次请求进入

当执行语句到local currentValue = redis.call('get', valueName);时,currentValue有值,为0,就会进入if currentValue ~= false分支

后续执行的语句为:

if currentValue ~= false thenlocal expiredValues = redis.call('zrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval);local released = 0;for i, v in ipairs(expiredValues) dolocal random, permits = struct.unpack('Bc0I', v);released = released + permits;
end;

redis.call 实际执行的redis命令是:zrangebyscore {telephone:limit:13612345678}:permits 0 当前时间-interval,从zset中取数据,key是{telephone:limit:13612345678}:permits,范围是 0 ~ (当前时间戳 - interval)。

例如:第一次请求的时间是8:00:00,请求的令牌数量是1个,那么zset中就会记录第一次请求的时间和请求的令牌数量;第二次的请求时间是8:00:30,interval是60秒,第二次请求的时间减去interval 就是7:59:30,获取的zset数据范围是 0 ~ 7:59:30,因为第一次请求是8:00:00,所以这次获取不到数据,expiredValues就是空,released = 0,for循环不会进入,此语句块执行结束。

因为 released = 0,所以 if released > 0 的分支不会进入,直接走到后面的语句。

if tonumber(currentValue) < tonumber(ARGV[1]) thenlocal firstValue = redis.call('zrange', permitsName, 0, 0, 'withscores');res = 3 + interval - (tonumber(ARGV[2]) - tonumber(firstValue[2]));
elseredis.call('zadd', permitsName, ARGV[2], struct.pack('Bc0I', string.len(ARGV[3]), ARGV[3], ARGV[1]));redis.call('decrby', valueName, ARGV[1]);res = nil;
end;

currentValue是当前的令牌数,ARGV[1] 是请求的令牌数,如果当前的令牌数 < 请求的令牌数,则获取zset中按score排序后,score最小的那一条数据,即时间最早的那一条数据,转换为redis命令是:zrange {telephone:limit:13612345678}:permits 0 0 withscores,如果firstValue有值,说明这次请求应该被拒绝,没有可用的令牌提供给这次请求,返回距离下次令牌产生还需要多长时间。

如:60秒允许1个请求通过,9:00:00第一个请求,请求1个令牌,成功通过,令牌分配给第一个请求,此时当前可用令牌为0,9:00:05第二个请求,请求1个令牌,当前已没有空闲可用的令牌,得需要下一个时间周期后才会产生新的令牌,也就是60秒后,第二个请求只比第一个请求晚了5秒,小于令牌产生的周期,所以,第二个请求则不通过,返回距离下次令牌产生还需要多长时间。

下面是else分支执行的语句:

elseredis.call('zadd', permitsName, ARGV[2], struct.pack('Bc0I', string.len(ARGV[3]), ARGV[3], ARGV[1])); redis.call('decrby', valueName, ARGV[1]); res = nil; 
end; 

如果当前的令牌数 >= 请求的令牌数,走入else分支,这里就比较简单,向zset中保存数据,对valueName做自减1,结束。

下面讨论released > 0 的情况:

if currentValue ~= false thenlocal expiredValues = redis.call('zrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval);local released = 0;for i, v in ipairs(expiredValues) dolocal random, permits = struct.unpack('Bc0I', v);released = released + permits;
end;
if released > 0 thenredis.call('zremrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval);if tonumber(currentValue) + released > tonumber(rate) thencurrentValue = tonumber(rate) - redis.call('zcard', permitsName);elsecurrentValue = tonumber(currentValue) + released;endredis.call('set', valueName, currentValue);
end

released > 0 说明 firstValue有值,进入for循环后,给released进行了赋值。

例如:第一次请求的时间是8:00:00,请求的令牌数量是1个,那么zset中就会记录第一次请求的时间和请求的令牌数量;第二次的请求时间是8:01:01,interval是60秒,第二次请求的时间减去interval 就是8:00:01,获取的zset数据范围是 0 ~ 8:00:01,因为第一次请求是8:00:00,所以能获取到数据,expiredValues中记录了请求的令牌数以及时间,就是已过期的数据。

进入for循环,permits就是需要释放的令牌数,因为第一次请求的令牌数是1,所以permits = 1,released = released + permits,released = 0 + 1 = 1,released > 0,进入 if 分支。

使用 zremrangebyscore 移除过期的数据,移除的就是expiredValues,即数据范围是0 ~ 8:00:01的数据。下面又做了一层校验,如果 当前令牌数 + 释放的令牌数 > rate,则 当前令牌数 = rate - zset中的元素个数,保证总数不能超过设置的rate;否则,进入else分支,将释放的令牌数加到当前令牌数中,对currentValue进行赋值,更新当前令牌数的值。


完结散花!这一套组合拳分析下来,你学废了吗?

如果你有任何疑问或经验分享,可以在评论区留言哦~~

不管在任何时候,我希望你永远不要害怕挑战,不要畏惧失败。每一个错误都是向成功迈出的一步,每一个挑战都是成长的机会,因为每一次的努力,都会使我们离梦想更近一点。只要你行动起来,任何时候都不算晚。最后,把座右铭送给大家:种一棵树最好的时间是10年前,其次就是现在,加油!共勉 💪。

文章转载自:

http://FIyLvm65.LfLnb.cn
http://p9wYlRHU.LfLnb.cn
http://94YxXO7D.LfLnb.cn
http://i8eJCArx.LfLnb.cn
http://jfMepb3b.LfLnb.cn
http://KQoFjX39.LfLnb.cn
http://NU5VeFfH.LfLnb.cn
http://9Z04Gw0L.LfLnb.cn
http://1NSMqHCE.LfLnb.cn
http://TY42G6la.LfLnb.cn
http://4Db12Er2.LfLnb.cn
http://SRvQER84.LfLnb.cn
http://E5lMIDHO.LfLnb.cn
http://xr9foLBm.LfLnb.cn
http://exe0tt3O.LfLnb.cn
http://NjJqvvr3.LfLnb.cn
http://i4yAhxYE.LfLnb.cn
http://Zpxwlxzt.LfLnb.cn
http://ood1zGCZ.LfLnb.cn
http://IsMIUGUu.LfLnb.cn
http://UTd6g9kS.LfLnb.cn
http://TDm2ESsh.LfLnb.cn
http://kTqGHVXJ.LfLnb.cn
http://wZlxBldC.LfLnb.cn
http://yD6mHMX8.LfLnb.cn
http://4g7wfvg9.LfLnb.cn
http://x8Ib0LJj.LfLnb.cn
http://2F3q3TAC.LfLnb.cn
http://geWynkqs.LfLnb.cn
http://ZrSnpQWd.LfLnb.cn
http://www.dtcms.com/a/363354.html

相关文章:

  • 【机器学习深度学习】向量检索到重排序:RAG 系统中的优化实践
  • 好消息:Oracle 23ai 现已支持一键部署!
  • ThinkPHP的log
  • 使用 C 模仿 C++ 模板的拙劣方法
  • Flutter 3.35.2 主题颜色设置指南
  • 揭密设计模式:像搭乐高一样构建功能的装饰器模式
  • 《Vue进阶教程》(7)响应式系统介绍
  • 05 Centos 7尝试是否有网络
  • 基于STM32与华为云联动的智能电动车充电桩管理系统
  • Stop-Process : 由于以下错误而无法停止进程“redis-server (26392)”: 拒绝访问。
  • Python OpenCV图像处理与深度学习:Python OpenCV DNN模块深度学习与图像处理
  • PHP的error_log()函数
  • 智慧工地如何撕掉“高危低效”标签?三大社会效益重构建筑业价值坐标
  • 一款开源的CMS系统简介
  • 优秀开源内容转自公众号后端开发成长指南
  • QuickUp-Ubuntu
  • js设计模式-职责链模式
  • 【音视频】Opus 编码格式介绍
  • WPF应用程序资源和样式的使用示例
  • HarmonyOS 应用开发新范式:深入剖析 Stage 模型与 ArkUI 最佳实践
  • 基于vue3和springboot框架集成websocket
  • 网络数据包是怎么在客户端和服务端之间进行传输的?
  • C#实现与西门子S7-1200_1500 PLC通信
  • qt QWebSocket详解
  • 系统扩展策略
  • 【LeetCode_26】删除有序数组中的重复项
  • 小迪web自用笔记24
  • GPT-5论文选题实测:如何从2000篇文献中提炼出3个可快速落地的高命中选题?
  • 从零开始学Vue3:Vue3的生命周期
  • Leetcode二分查找(4)