Redisson中的RateLimiter令牌桶限流简单使用
部分内容来源:JavaGuide
初步了解
使用流程
RRateLimiter 的使用方式非常简单。我们首先需要获取一个RRateLimiter对象,直接通过 Redisson 客户端获取即可。然后,设置限流规则就好
redissonClient.getRateLimiter()
trySetRate()
// 创建一个 Redisson 客户端实例
RedissonClient redissonClient = Redisson.create();
// 获取一个名为 "javaguide.limiter" 的限流器对象
RRateLimiter rateLimiter = redissonClient.getRateLimiter("javaguide.limiter");
// 尝试设置限流器的速率为每小时 100 次
// RateType 有两种,OVERALL是全局限流,ER_CLIENT是单Client限流(可以认为就是单机限流)
rateLimiter.trySetRate(RateType.OVERALL, 100, 1, RateIntervalUnit.HOURS);
接下来我们调用 acquire()方法 或 tryAcquire() 方法即可获取许可
// 获取一个许可,如果超过限流器的速率则会等待
// acquire()是同步方法,对应的异步方法:acquireAsync()
rateLimiter.acquire(1);
// 尝试在 5 秒内获取一个许可,如果成功则返回 true,否则返回 false
// tryAcquire()是同步方法,对应的异步方法:tryAcquireAsync()
boolean res = rateLimiter.tryAcquire(1, 5, TimeUnit.SECONDS);
这行代码调用了 RRateLimiter
的 acquire
方法,传入参数 1
,表示尝试获取 1
个令牌。如果当前令牌桶中有足够的令牌,请求会立即获取到令牌并继续执行后续操作;如果令牌桶中没有足够的令牌,该方法会进入阻塞状态,等待直到有足够的令牌可以获取。
acquire()
是同步方法,意味着调用该方法时,程序会暂停执行,直到获取到令牌或者发生异常。对应的异步方法是 acquireAsync()
,使用异步方法时,程序不会阻塞,而是继续执行后续代码,当获取到令牌时会触发相应的回调
RRateLimiter
的 tryAcquire
方法,传入三个参数:1
表示尝试获取 1
个令牌;5
表示等待的时间;TimeUnit.SECONDS
表示时间单位为秒。该方法会尝试在指定的 5
秒时间内获取 1
个令牌,如果在 5
秒内成功获取到令牌,方法返回 true
;如果在 5
秒内没有获取到令牌,方法返回 false
。
同样,tryAcquire()
是同步方法,调用时程序会等待指定的时间。对应的异步方法是 tryAcquireAsync()
,使用异步方法时程序不会阻塞,而是在获取结果后触发回调
acquire 方法(一直阻塞)
acquire
是一个阻塞方法。当调用 acquire(n)
(n
表示要获取的令牌数量)时,如果当前令牌桶中没有足够的令牌,线程会进入阻塞状态,一直等待,直到有足够的令牌可供获取
tryAcquire 方法(限时阻塞)
tryAcquire
方法是非阻塞的,它有两种使用形式。一种是 tryAcquire(n)
,这种形式会立即尝试获取 n
个令牌,如果有足够的令牌,会获取并返回 true
;如果没有足够的令牌,会直接返回 false
,不会等待。
另一种形式是 tryAcquire(n, timeout, unit)
,它会在指定的 timeout
时间内尝试获取 n
个令牌,如果在该时间内获取到了令牌,返回 true
;如果超时还未获取到令牌,返回 false
acquireAsync() 方法
acquireAsync()
用于异步地从限流器中获取指定数量的令牌。与同步的 acquire()
方法不同,调用 acquireAsync()
时,线程不会被阻塞,方法会立即返回一个 CompletableFuture
对象,你可以通过该对象在令牌获取操作完成后执行相应的回调逻辑
tryAcquireAsync() 方法
tryAcquireAsync()
用于异步地尝试在指定时间内获取指定数量的令牌。它有两种重载形式:tryAcquireAsync(n)
和 tryAcquireAsync(n, timeout, unit)
。前者会立即尝试获取 n
个令牌,后者会在指定的 timeout
时间内尝试获取 n
个令牌。与同步的 tryAcquire()
方法类似,该方法会返回一个 CompletableFuture<Boolean>
对象,true
表示成功获取到令牌,false
表示未获取到令牌
Demo
初始化RedissonClient
引入依赖
<!-- https://mvnrepository.com/artifact/org.redisson/redisson-spring-boot-starter -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.45.1</version>
</dependency>
配置类注册为Bean
package com.example.transational.RateLimiter;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RedissonConfig {
@Bean
public RedissonClient redissonClient() {
Config config = new Config();
// 也可以将 redis 配置信息保存到配置文件
config.useSingleServer().setAddress("redis://192.168.88.130:6379");
return Redisson.create(config);
}
}
简单的Main函数模拟使用
package com.example.transational.RateLimiter;
import lombok.extern.slf4j.Slf4j;
import org.redisson.Redisson;
import org.redisson.api.RRateLimiter;
import org.redisson.api.RateIntervalUnit;
import org.redisson.api.RateType;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Slf4j
public class RedissonRateLimiterExample {
private final RedissonClient redisson;
public RedissonRateLimiterExample() {
Config config = new Config();
config.useSingleServer()
.setAddress("redis://192.168.88.130:6379");
this.redisson = Redisson.create(config);
}
/**
* 令牌桶限流器
* @param key 限流键(如 "api:login:user123")
* @param rate 每秒允许的请求数
* @param burstCapacity 突发容量
* @return 是否允许访问
*/
public boolean tryAcquire(String key, long rate, long burstCapacity) {
RRateLimiter rateLimiter = redisson.getRateLimiter(key);
// 初始化限流规则(分布式环境下只会初始化一次)
rateLimiter.trySetRate(RateType.OVERALL, burstCapacity, rate, RateIntervalUnit.SECONDS);
return rateLimiter.tryAcquire(1);
}
public static void main(String[] args) throws InterruptedException {
// 创建限流器实例(初始化 Redisson 客户端连接)
RedissonRateLimiterExample limiter = new RedissonRateLimiterExample();
// 测试场景配置:
// - 限流键为 "api:payment"(支付接口)
// - 令牌桶参数:每秒生成5个令牌(持续处理能力),突发容量10个令牌(应对流量高峰),是令牌总数最高10个
// - 模拟连续发起15次请求(超过限流器容量)
for (int i = 0; i < 15; i++) {
// 尝试获取访问许可:
// 参数1: 业务标识键(可区分不同接口/用户)
// 参数2: 持续速率(每秒5个)
// 参数3: 突发容量(瞬时最多10个)
boolean allowed = limiter.tryAcquire("api:payment", 5, 10);
// 输出结果:
// 格式:请求序号 + 是否允许 + 剩余令牌数(示例中没有返回剩余数)
System.out.println("Request " + (i+1) + ": " + (allowed ? "Allowed" : "Denied"));
}
}
}
单元测试-简单使用基本函数
tryAcquire():
是抢到为ture,没抢到为false
acquire():
是阻塞当前线程,直到我们抢到为止
acquireAsync() 方法:
acquireAsync()
用于异步地从限流器中获取指定数量的令牌。与同步的 acquire()
方法不同,调用 acquireAsync()
时,线程不会被阻塞,方法会立即返回一个 CompletableFuture
对象,你可以通过该对象在令牌获取操作完成后执行相应的回调逻辑(我们可以等待异步执行完,然后对是否得到令牌进行判断)
tryAcquireAsync() 方法:
tryAcquireAsync()
用于异步地尝试在指定时间内获取指定数量的令牌。它有两种重载形式:tryAcquireAsync(n)
和 tryAcquireAsync(n, timeout, unit)
。前者会立即尝试获取 n
个令牌,后者会在指定的 timeout
时间内尝试获取 n
个令牌。与同步的 tryAcquire()
方法类似,该方法会返回一个 CompletableFuture<Boolean>
对象,true
表示成功获取到令牌,false
表示未获取到令牌
package com.example.transational;
import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.redisson.api.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@SpringBootTest
public class TestRatelimiter {
@Resource
RedissonClient redissonClient;
@Test
void test() throws ExecutionException, InterruptedException, TimeoutException {
// 获取一个名为 "javaguide.limiter" 的限流器对象
RRateLimiter rateLimiter = redissonClient.getRateLimiter("Kira-Test-RateLimiter");
// 初始化限流规则(分布式环境下只会初始化一次)
//burstCapacity 桶容量,我们这里初始化成10
//rate 每秒最多令牌数
//RateType的OverAll是全局模式,PER_CLIENT是单机模式
rateLimiter.trySetRate(RateType.OVERALL, 10, 5, RateIntervalUnit.SECONDS);
rateLimiter.acquire(1);//我们一直阻塞,直到抢到令牌
//抢到令牌后我们往下执行业务逻辑
boolean allowed1 = rateLimiter.tryAcquire(1);//如果我们超出限流限制,我们就返回false
if(allowed1){
//往下执行业务逻辑
}
rateLimiter.tryAcquire(10,1, TimeUnit.SECONDS);//尝试在1秒内获取10个令牌
// 异步获取令牌
RFuture<Boolean> future = rateLimiter.tryAcquireAsync();
// 异步回调处理
future.onComplete((allowed, exception) -> {
if (exception != null) {
System.err.println("限流请求异常: " + exception.getMessage());
return;
}
if (allowed) {
System.out.println("[" + Thread.currentThread().getName() + "] 请求通过");
// 执行业务逻辑
} else {
System.out.println("[" + Thread.currentThread().getName() + "] 请求被限流");
// 执行降级逻辑
}
});
// get(),得到返回值,为true就是拿到令牌,不为ture就是没拿到令牌阻塞等待结果(仅用于演示,实际生产环境建议用回调)
Boolean result = future.get(1, TimeUnit.SECONDS);
// 异步获取1个令牌(无限等待)
rateLimiter.acquireAsync().onComplete((res, ex) -> {
if (ex != null) {
System.err.println("获取令牌失败: " + ex.getMessage());
} else {
System.out.println("令牌获取成功,执行业务...");
}
});
// 超时等待,最多等待500ms
rateLimiter.acquireAsync(500)
.onComplete((success, ex) -> {
});
}
}