当前位置: 首页 > news >正文

使用reactor-rabbitmq库监听Rabbitmq

文章目录

      • Reactor RabbitMQ 简介
      • Reactor RabbitMQ核心特性
      • 使用方法
        • 添加依赖
        • 创建连接
        • 发送消息
        • 接收消息
      • 高级配置
        • 消息确认模式
        • 错误处理
        • 集群监听(自动ACK)
        • 集群监听手动ACK
      • 性能优化建议
      • 适用场景

Reactor RabbitMQ 简介

Reactor RabbitMQ 是 Reactor 项目的一部分,旨在提供基于 Project Reactor 的 RabbitMQ 响应式编程支持。它将 RabbitMQ 的消息队列功能与 Reactor 的非阻塞、背压友好特性结合,适用于高吞吐量的异步消息处理场景。
注意: Reactor RabbitMQ 是对原生amqp-client 的封装,同样性能强大简单易用。reactor-rabbitmq是spring-boot-starter-amqp 之外的另外一种选择。

维度AMQP-ClientReactor RabbitMQSpring Boot Starter AMQP
编程模型命令式、手动管理响应式、非阻塞声明式、自动配置
框架依赖ReactorSpring Boot
适用场景轻量级/非 Spring 项目响应式微服务Spring Boot 企业应用
资源管理手动自动自动
功能丰富度基础协议操作背压、高并发优化事务、确认、死信队列等
学习曲线中等(需理解 AMQP)高(需掌握 Reactor)低(Spring 生态友好)

Reactor RabbitMQ核心特性

  • 响应式流支持:基于 Reactor 的 FluxMono 实现消息的发布与订阅。
  • 背压管理:自动处理消费者与生产者之间的速率匹配。
  • 非阻塞 API:避免传统 RabbitMQ 客户端的线程阻塞问题。
  • 声明式配置:支持通过代码或配置文件定义队列、交换机和绑定。

使用方法

添加依赖

在 Maven 项目中添加以下依赖:

<dependency>  <groupId>io.projectreactor.rabbitmq</groupId>  <artifactId>reactor-rabbitmq</artifactId>  <version>1.5.6</version>  
</dependency>  
创建连接
ConnectionFactory connectionFactory = new ConnectionFactory();  
connectionFactory.setHost("localhost");  
Sender sender = RabbitFlux.createSender(  Mono.fromCallable(() -> connectionFactory.newConnection())  
);  
Receiver receiver = RabbitFlux.createReceiver(  Mono.fromCallable(() -> connectionFactory.newConnection())  
);  
发送消息
sender.send(  Flux.just(new OutboundMessage(  "exchange-name",  "routing-key",  "Hello RabbitMQ".getBytes()  ))  
).subscribe();  
接收消息
receiver.consumeAutoAck("queue-name")  .map(delivery -> new String(delivery.getBody()))  .subscribe(System.out::println);  

高级配置

消息确认模式

支持自动确认(autoAck)和手动确认(manualAck):

receiver.consumeManualAck("queue-name")  .delayUntil(delivery ->  delivery.ack()  .thenReturn(delivery.getBody())  )  .subscribe();  
错误处理

通过 Reactor 的 onError 机制处理异常:

sender.send(messages)  .doOnError(e -> System.err.println("Send failed: " + e))  .retry(3)  .subscribe();  

集群监听(自动ACK)
 // 1. 配置集群连接ReceiverOptions receiverOptions = new ReceiverOptions().connectionFactory(new ConnectionFactory() {{setUsername("guest");setPassword("guest");}}).connectionSupplier(cf -> cf.newConnection(new Address[]{new Address("localhost", 5672),new Address("localhost", 5673),new Address("localhost", 5674)},"reactive-cluster"));// 2. 创建 ReceiverReceiver receiver = RabbitFlux.createReceiver(receiverOptions);// 监听队列(自动负载均衡)receiver.consumeAutoAck("queue1") // 队列名(需在集群中预先创建).subscribe(delivery -> {String message = new String(delivery.getBody());System.out.println("收到消息: " + message);},error -> System.err.println("监听错误: " + error));// 保持程序运行Mono.never().block();
集群监听手动ACK
// 1. 配置集群连接ReceiverOptions receiverOptions = new ReceiverOptions().connectionFactory(new ConnectionFactory() {{setUsername("guest");setPassword("guest");}}).connectionSupplier(cf -> cf.newConnection(new Address[]{new Address("localhost", 5672),new Address("localhost", 5673),new Address("localhost", 5674)},"reactive-cluster"));// 2. 创建 ReceiverReceiver receiver = RabbitFlux.createReceiver(receiverOptions);// 消费消息并手动ACKreceiver.consumeManualAck("queue1").flatMap(delivery -> {try {String message = new String(delivery.getBody());log.info("received message:" + message);// 业务逻辑处理...boolean success = false;int i = RandomUtil.randomInt();if (i % 2 == 0) {success = true;}if (success) {log.info("ack success");// 处理成功,手动ACKreturn Mono.fromRunnable(() -> delivery.ack()).thenReturn("ACK");} else {log.info("ack fail");// 处理失败,手动NACK(可选择重试或丢弃)return Mono.fromRunnable(() -> delivery.nack(true)) // false表示不重新入队.thenReturn("NACK");}} catch (Exception e) {// 异常情况,NACK并可选择重试delivery.nack(true); // true表示重新入队return Mono.error(e);}}).subscribe(result -> log.info("Message processed:" + result),error -> log.info("Error:" + error));// 保持程序运行Mono.never().block();

性能优化建议

  • 连接复用:避免频繁创建/关闭连接,使用 Mono 缓存连接。
  • 批量发送:通过 Flux.buffer() 合并多条消息后一次性发送。
  • 线程池调优:自定义 Scheduler 以匹配业务场景的并发需求。

适用场景

  • 微服务间的异步通信。
  • 事件驱动的数据处理流水线。
  • 需要高吞吐量和低延迟的消息系统。

如需进一步功能(如事务、RPC 模式),可参考官方文档或源码示例。

http://www.dtcms.com/a/266589.html

相关文章:

  • Go中使用Google Authenticator
  • 东软8位MCU低功耗调试总结
  • 如何使用python识别出文件夹中全是图片合成的的PDF,并将其移动到指定文件夹
  • 【ASP.NET Core】REST与RESTful详解,从理论到实现
  • 当前主流AI智能代理框架对比分析报告
  • 分布式光伏监控系统防孤岛保护装置光功率预测
  • 【论文阅读】VARGPT-v1.1
  • Webpack构建工具
  • node.js下载教程
  • 机器学习数学基础与Python实现
  • 机器学习在智能建筑中的应用:能源管理与环境优化
  • 每日问题总结记录
  • 一、如何用MATLAB画一个三角形 代码
  • 基于AR和SLAM技术的商场智能导视系统技术原理详解
  • 京东小程序JS API仓颉改造实践
  • 深圳安锐科技发布国内首款4G 索力仪!让斜拉桥索力自动化监测更精准高效
  • 【centos8服务如何给服务器开发3306端口】
  • Python 中线程和进程在实际项目使用中的区别和联系
  • 解决HttpServletRequest无法获取@RequestBody修饰的参数
  • Java并发性能优化|读写锁与互斥锁解析
  • Python 中的可迭代对象与迭代器:原理与项目实战
  • 【Verilog】parameter、localparam和 `define的区别
  • Android View的绘制原理详解
  • 基于虚拟化技术的网闸安全交换:物理隔离时代的智能数据流通引擎
  • 最快实现的前端灰度方案
  • python打卡day58@浙大疏锦行
  • 算法19天|回溯算法:理论基础、组合、组合总和Ⅲ、电话号码的字母组合
  • 用原生 JS + Vue 实现一套可复用的前端错误监控系统
  • Python 机器学习核心入门与实战进阶 Day 2 - KNN(K-近邻算法)分类实战与调参
  • 【MATLAB代码】AOA与TDOA混合定位例程,适用于三维环境、4个锚点的情况,订阅专栏后可以获得完整代码