当前位置: 首页 > news >正文

利用 Google Guava 的令牌桶限流实现数据处理限流控制

目录

一、令牌桶限流机制原理

二、场景设计与目标

三、核心实现代码(Java)

1. 完整代码实现

四、运行效果分析

五、应用建议


在高吞吐数据处理场景中,如何限制数据处理速率、保护系统资源、防止下游服务过载是系统设计中重要的环节。本文将介绍一种简单实用的限流方式 —— 基于 Google Guava 的令牌桶限流机制(Token Bucket),并通过实际代码演示如何将其应用于数据处理任务中。

一、令牌桶限流机制原理

令牌桶算法(Token Bucket)是一种常见的流量控制算法。其基本原理如下:

  • 系统按照固定速率(如每秒 5 个)往桶中放入令牌;

  • 每次处理数据前,需要从桶中取出一个令牌;

  • 若桶中有令牌,则允许处理数据;

  • 若桶中无令牌,当前请求会被阻塞或丢弃(根据实现策略);

  • 桶容量可设定为最大突发请求数,支持短时间突发。

在 Google 的 Guava 库中,RateLimiter 类就实现了一个基于令牌桶的限流器,它适用于:

  • 接口请求限速;

  • 后台批处理速率控制;

  • 防止后端系统过载等场景。

二、场景设计与目标

我们构建一个数据处理系统,包含以下三个核心组件:

  1. 数据生成器线程:以较高频率(每秒 2 条)不断将数据放入队列。

  2. 数据处理器线程:通过 RateLimiter 每秒只允许处理 1 条数据。

  3. 监控线程:每隔 2 秒打印队列中积压的数据数量,观测限流效果。

三、核心实现代码(Java)

使用 Guava 31+,Maven 依赖如下:

<dependency><groupId>com.google.guava</groupId><artifactId>guava</artifactId><version>31.1-jre</version>
</dependency>

1. 完整代码实现

package google;
​
import com.google.common.util.concurrent.RateLimiter;
​
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
​
public class TokenBucketDataProcessor {
​// 每秒最多处理5个数据private static final RateLimiter rateLimiter = RateLimiter.create(1);
​// 模拟数据管道private static final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
​public static void main(String[] args) {
​// 线程1:数据生成线程,每秒生成2条数据Thread producer = new Thread(() -> {int i = 0;while (true) {try {Thread.sleep(500); // 每100ms生成1条数据 -> 每秒10条String data = "data-" + (i++);queue.put(data);System.out.println("[Producer] Generated: " + data);} catch (InterruptedException e) {Thread.currentThread().interrupt();}}});
​// 线程2:数据处理线程,受RateLimiter限流影响Thread consumer = new Thread(() -> {while (true) {try {rateLimiter.acquire(1); // 阻塞直到拿到令牌String data = queue.take(); // 取数据System.out.println("[Consumer] Processed: " + data);} catch (InterruptedException e) {Thread.currentThread().interrupt();}}});
​// 线程3:监控线程,每2秒输出积压情况Thread monitor = new Thread(() -> {while (true) {try {Thread.sleep(2000);System.out.println("[Monitor] Queue size: " + queue.size());} catch (InterruptedException e) {Thread.currentThread().interrupt();}}});
​producer.start();consumer.start();monitor.setDaemon(true); // 守护线程monitor.start();}
}

四、运行效果分析

预期输出如下:

[Producer] Generated: data-0
[Consumer] Processed: data-0
[Producer] Generated: data-1
[Consumer] Processed: data-1
[Producer] Generated: data-2
[Consumer] Processed: data-2
[Monitor] Queue size: 0
[Producer] Generated: data-3
[Producer] Generated: data-4
[Consumer] Processed: data-3
[Producer] Generated: data-5
[Producer] Generated: data-6
[Consumer] Processed: data-4
[Monitor] Queue size: 2
[Producer] Generated: data-7
[Producer] Generated: data-8
[Consumer] Processed: data-5
[Producer] Generated: data-9
[Producer] Generated: data-10
[Consumer] Processed: data-6
[Monitor] Queue size: 4
[Producer] Generated: data-11
[Producer] Generated: data-12
[Consumer] Processed: data-7
[Producer] Generated: data-13
[Producer] Generated: data-14
[Consumer] Processed: data-8
[Monitor] Queue size: 6
... ...

从输出中可以看到:

  • 数据生产速度 > 数据处理速度;

  • queue.size() 会逐步增长,表明数据被限流处理;

  • RateLimiter.acquire() 自动阻塞了处理线程,使得处理速度不超过 2 qps;

  • 限流处理过程对系统其他线程无影响,灵活、安全、无锁。

五、应用建议

在需要处理数据流或消息流的系统中,控制处理速率是一项必要手段:

  • Guava 的 RateLimiter 实现了高效、线程安全、非阻塞或阻塞可控的令牌桶限流机制;

  • 利用其 acquire() 阻塞等待机制,我们可以方便地将限流逻辑嵌入处理线程中;

  • 搭配 BlockingQueue 和监控线程,我们可以直观观察限流效果,并对系统做动态调优;

  • 适用场景包括但不限于:

    • Kafka 消费处理限流

    • 多线程数据清洗任务控制速率

    • 日志写入、数据库插入频率控制

http://www.dtcms.com/a/298494.html

相关文章:

  • linux修改用户名和主目录及权限-linux029
  • 商品的create
  • 求职招聘小程序源码招聘小程序开发定制
  • 矩阵的极分解
  • [Dify] -进阶13- 使用“知识库 + 工作流”打造智能推荐系统
  • 网络安全基础知识【1】
  • PHP插件开发中的一个错误:JSON直接输出导致网站首页异常
  • 零碳园区如何破局?安科瑞EMS3.0以智慧能源管理重构低碳未来
  • 焊接机器人节能先锋
  • Seaborn可视化
  • MYOJ_8516:CSP初赛题单8:计算机语言和信息编码
  • 工作学习笔记(深圳xxx公司软件工程师助理)
  • Map学习笔记
  • 扫描对方是否开启局域网远程桌面
  • Windows安装压缩包形式的PostgreSQL
  • Python 列表排序:快速上手指南
  • Palindrome Reorder
  • 腾讯研究院:AI Coding引发编程范式革命
  • 微信小程序动态切换窗口主题色
  • 多智能体强化学习入门:从基础到 IPPO 算法—强化学习(20)
  • 2507C++,C++协程的发送者
  • 浅谈生成式AI语言模型的现状与展望
  • haproxy七层代理(原理)
  • SawtoothSoftware 模板注入漏洞复现(CVE-2025-34300)
  • 8.异常处理:优雅地处理错误
  • ISIS高级特性GR
  • Springboot+activiti启动时报错XMLException: Error reading XML
  • 优思学院|QC七大手法之一的检查表应如何有效使用?
  • 【unitrix】 6.15 “非零非负一“的整数类型(NonZeroNonMinusOne)特质(non_zero_non_minus_one.rs)
  • 亚马逊广告策略:如何平衡大词和长尾词的效果?