当前位置: 首页 > news >正文

RabbitMQ--批量处理

一、RabbitMQ 原生批量处理(Java 客户端)

1.1 原生 RabbitMQ 不支持真正意义上的“批量监听消费”,但支持“批量确认”

// 原生消费:注册一个消费者
channel.basicConsume("queue_name", false, (consumerTag, message) -> {// 处理单条消息System.out.println("收到消息: " + new String(message.getBody()));// 手动单条确认channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
}, consumerTag -> {});

1.2 批量确认两种方式

// 逐条确认(循环中)basicAck第二个参数是false
for (Message msg : messages) {channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
}// 批量确认(参数要是为false只确认最后一条)
//basicAck第二个参数是true
//basicAck第二个参数为true时,确认最后一条也就确认了lastTag 前的所有
long lastTag = 0;
for (Message msg : messages) {lastTag = msg.getMessageProperties().getDeliveryTag();
}
channel.basicAck(lastTag, true);
Ack方式调用位置含义
basicAck(tag, false)循环内每条都调单条确认
basicAck(lastTag, true)循环外调用一次批量确认所有未确认消息

二、Spring Boot 批量消费配置

2.1 方式一:yml形式

(1)  使用 YAML 自动配置方式
spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestlistener:simple:acknowledge-mode: auto          # 自动确认(成功执行方法即确认)prefetch: 100                   # 消费端每次最多接收 100 条未确认消息(限流)consumer-batch-enabled: true    # ✅ 开启批量消费,方法参数可为 List<T>batch-size: 100                 # 每次最多批量拉取 100 条消息

✅ 注意:

  • consumer-batch-enabled: true 是开启批量消费的关键,没有它就不能使用 List<T> 参数!

  • 仅设置 prefetch 是 RabbitMQ 的限流控制,不等于开启批量消费。

  • batch-size 控制 Spring 每次最多拉多少条消息。


(2) 配置后批量消费的消费者示例
@Component
public class BatchConsumer {@RabbitListener(queues = "test_batch_queue")public void receiveBatch(List<Message> messages, Channel channel) throws IOException {System.out.println("批量接收消息,数量:" + messages.size());long lastTag = 0;for (Message msg : messages) {String body = new String(msg.getBody());System.out.println("消费消息:" + body);lastTag = msg.getMessageProperties().getDeliveryTag();}// 批量确认channel.basicAck(lastTag, true); // 确认所有消息}
}

❗ 如果改为 channel.basicAck(tag, false),必须放在 for 循环中每条都确认。


2.2 方式二:自定义监听容器工厂方式

(1) Java配置方式(替代 YAML 配置)
@Configuration
public class RabbitListenerConfig {@Beanpublic SimpleRabbitListenerContainerFactory batchFactory(ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setBatchListener(true);                      // 开启批量消费factory.setBatchSize(100);                           // 每批最大数量factory.setPrefetchCount(100);                       // 限流数量factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 手动确认return factory;}
}
(2) 批量消费者绑定工厂
@Component
public class BatchConsumer {@RabbitListener(queues = "test_batch_queue", containerFactory = "batchFactory")public void receiveBatch(List<Message> messages, Channel channel) throws IOException {System.out.println("批量接收消息,数量:" + messages.size());long lastTag = 0;for (Message msg : messages) {lastTag = msg.getMessageProperties().getDeliveryTag();System.out.println("消费:" + new String(msg.getBody()));}channel.basicAck(lastTag, true);}
}

四、关键点

 4.1 只使用 prefetch: 100,是不是不能使 @RabbitListener 方法参数支持 List<T>

✅ 回答:

是的,仅设置 prefetch: 100 不会启用批量消费模式,也就不能让 @RabbitListener 方法参数变成 List<T>


✅ 原因详解:
配置项功能说明
prefetch: 100RabbitMQ 允许消费者最多缓存 100 条未确认消息(限流)
控制的是消费速度,不影响监听方法接收消息的格式
consumer-batch-enabled: true开启 Spring 批量消费功能,允许 @RabbitListener 接收 List<T> 参数

🧠 总结:
场景是否可用 List<T> 接收消息是否批量拉取是否批量确认
只配置 prefetch: 100❌ 只能逐条消费✅ 是预取多个,但逐条进入监听方法❌ 每条单独确认(由 Spring 控制)
配置 consumer-batch-enabled: truebatch-size > 1✅ 支持 List<T> 参数✅ 批量拉取✅ 批量确认或手动确认

✅ 示例对比:
❌【只设置 prefetch: 100,不能用 List】
spring:rabbitmq:listener:simple:prefetch: 100       # ✅ 控制未确认的消息上限# ⚠️ 没有 consumer-batch-enabled

监听方法必须是单条消费:

@RabbitListener(queues = "test_queue")
public void receive(String message) {System.out.println("收到单条消息: " + message);
}

✅【设置 batch 消费,才能用 List】
spring:rabbitmq:listener:simple:prefetch: 100consumer-batch-enabled: truebatch-size: 100

监听方法支持批量消费:

@RabbitListener(queues = "test_batch_queue")
public void receiveBatch(List<String> messages) {System.out.println("批量消息数量:" + messages.size());
}

✅ 结论:

只有配置了 consumer-batch-enabled: true 才能让监听方法支持 List<T> 批量消费形式。

        光设置 prefetch: 100 不行,它只是 RabbitMQ 的限流控制,不影响消息的接收方式。

4.2  三种组合差异详解(prefetch 、consumer-batch-enabled、batch-size)

🔍 各配置项含义
配置项含义
prefetch限流参数:RabbitMQ 向消费者最多投递多少未确认的消息(即使你没处理完)
consumer-batch-enabled是否启用批量消费(也就是说方法能不能用 List<T> 参数)
batch-sizeSpring 每次最多从 RabbitMQ 拉取多少条消息之后才调用一次方法
🧠 关键区别详解
对比项prefetch + consumer-batch-enabledprefetch + consumer-batch-enabled + batch-size
是否批量消费✅ 是(支持 List<T>✅ 是
批次大小默认值默认为 1(即最多 1 条触发一次方法)明确设置为 100
方法执行频率多次执行(每接收到 1 条消息即调用一次)每接收到 100 条才执行一次方法
拉取行为每条都拉但每条都立刻触发方法拉够再触发
效率❌ 不高(虽是 List,但每次只有 1 条)✅ 高(真正意义的批量处理)

✅ 举例说明
配置一(没写 batch-size):
prefetch: 100
consumer-batch-enabled: true

RabbitMQ 会发最多 100 条消息,但 Spring 每收到 1 条就调用一次方法,每次方法里的 List<Message> 只有 1 条数据 → 不是你想象中的“批量处理”。


配置二(加了 batch-size):
consumer-batch-enabled: true
prefetch: 100
batch-size: 100

RabbitMQ 会最多发 100 条,Spring 每次拉够 100 条后才调用一次方法,List<Message> 是 100 条 → 真正的“批量消费”。

http://www.dtcms.com/a/291806.html

相关文章:

  • 【Zephyr开发实践系列】09_LittleFs文件系统操作
  • 在easyui中如何自定义表格里面的内容
  • 目标检测系列(六)labelstudio实现自动化标注
  • vue2 webpack 部署二级目录、根目录nginx配置及打包配置调整
  • 容器化部署 Tomcat + MySQL 实战指南:从入门到进阶
  • MongoDB数据库详解-针对大型分布式项目采用的原因以及基础原理和发展-卓伊凡|贝贝|莉莉
  • 架构演进核心路线:从离线仓库到实时湖仓一体
  • LLM评测框架Ragas Agents or Tool Use Cases指标(解决了Ollama推理框架不支持的问题)
  • 微软徽标认证是什么?如何快速获取驱动签名?
  • Linux操作系统从入门到实战(十二)Linux操作系统第一个程序(进度条)
  • 【用户管理】usermod设置主组和附加组(三)
  • es搜索实现既能模糊查询又能分词查询
  • [Dify] -进阶10- Dify 的用户输入结构:变量、参数、文件上传全解析
  • stm32 智能小车
  • 【多线程篇22】:ConcurrentHashMap的并发安全原理剖析
  • 低成本、高泛化能力的无人机自主飞行!VLM-Nav:基于单目视觉与视觉语言模型的无地图无人机导航
  • C++类和对象(3)
  • 从零搭建 OpenCV 项目(新手向)--第一天初识OpenCV与图像基础
  • MCP:Cline+DeepSeek在VSCode上配置
  • TDengine 计算百分位函数使用手册
  • .net web 中如何优雅地使用 redis?
  • MFC类Qt的自动布局框架
  • 景区负氧离子监测设备:守护清新,赋能旅游
  • 【kubernetes】-2 K8S的资源管理
  • 学习笔记-关于中华心法问答系统的环境配置和源代码理解
  • 基于Vue+ElementUI的借还款利息计算器
  • Java 动态导出 Word 登记表:多人员、分页、动态表格的最佳实践
  • SpringBoot集成PDFBox实现PDF导出(表格导出、分页页码、电子签章与数字签名)
  • RISC-V基金会Datacenter SIG月会圆满举办,探讨RAS、PMU性能分析实践和经验
  • Rust实战:决策树与随机森林实现