使用reactor-rabbitmq库监听Rabbitmq
文章目录
- Reactor RabbitMQ 简介
- Reactor RabbitMQ核心特性
- 使用方法
- 添加依赖
- 创建连接
- 发送消息
- 接收消息
- 高级配置
- 消息确认模式
- 错误处理
- 集群监听(自动ACK)
- 集群监听手动ACK
- 性能优化建议
- 适用场景
Reactor RabbitMQ 简介
Reactor RabbitMQ 是 Reactor 项目的一部分,旨在提供基于 Project Reactor 的 RabbitMQ 响应式编程支持。它将 RabbitMQ 的消息队列功能与 Reactor 的非阻塞、背压友好特性结合,适用于高吞吐量的异步消息处理场景。
注意: Reactor RabbitMQ 是对原生amqp-client 的封装,同样性能强大简单易用。reactor-rabbitmq是spring-boot-starter-amqp 之外的另外一种选择。
维度 | AMQP-Client | Reactor RabbitMQ | Spring Boot Starter AMQP |
---|---|---|---|
编程模型 | 命令式、手动管理 | 响应式、非阻塞 | 声明式、自动配置 |
框架依赖 | 无 | Reactor | Spring Boot |
适用场景 | 轻量级/非 Spring 项目 | 响应式微服务 | Spring Boot 企业应用 |
资源管理 | 手动 | 自动 | 自动 |
功能丰富度 | 基础协议操作 | 背压、高并发优化 | 事务、确认、死信队列等 |
学习曲线 | 中等(需理解 AMQP) | 高(需掌握 Reactor) | 低(Spring 生态友好) |
Reactor RabbitMQ核心特性
- 响应式流支持:基于 Reactor 的
Flux
和Mono
实现消息的发布与订阅。 - 背压管理:自动处理消费者与生产者之间的速率匹配。
- 非阻塞 API:避免传统 RabbitMQ 客户端的线程阻塞问题。
- 声明式配置:支持通过代码或配置文件定义队列、交换机和绑定。
使用方法
添加依赖
在 Maven 项目中添加以下依赖:
<dependency> <groupId>io.projectreactor.rabbitmq</groupId> <artifactId>reactor-rabbitmq</artifactId> <version>1.5.6</version>
</dependency>
创建连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
Sender sender = RabbitFlux.createSender( Mono.fromCallable(() -> connectionFactory.newConnection())
);
Receiver receiver = RabbitFlux.createReceiver( Mono.fromCallable(() -> connectionFactory.newConnection())
);
发送消息
sender.send( Flux.just(new OutboundMessage( "exchange-name", "routing-key", "Hello RabbitMQ".getBytes() ))
).subscribe();
接收消息
receiver.consumeAutoAck("queue-name") .map(delivery -> new String(delivery.getBody())) .subscribe(System.out::println);
高级配置
消息确认模式
支持自动确认(autoAck
)和手动确认(manualAck
):
receiver.consumeManualAck("queue-name") .delayUntil(delivery -> delivery.ack() .thenReturn(delivery.getBody()) ) .subscribe();
错误处理
通过 Reactor 的 onError
机制处理异常:
sender.send(messages) .doOnError(e -> System.err.println("Send failed: " + e)) .retry(3) .subscribe();
集群监听(自动ACK)
// 1. 配置集群连接ReceiverOptions receiverOptions = new ReceiverOptions().connectionFactory(new ConnectionFactory() {{setUsername("guest");setPassword("guest");}}).connectionSupplier(cf -> cf.newConnection(new Address[]{new Address("localhost", 5672),new Address("localhost", 5673),new Address("localhost", 5674)},"reactive-cluster"));// 2. 创建 ReceiverReceiver receiver = RabbitFlux.createReceiver(receiverOptions);// 监听队列(自动负载均衡)receiver.consumeAutoAck("queue1") // 队列名(需在集群中预先创建).subscribe(delivery -> {String message = new String(delivery.getBody());System.out.println("收到消息: " + message);},error -> System.err.println("监听错误: " + error));// 保持程序运行Mono.never().block();
集群监听手动ACK
// 1. 配置集群连接ReceiverOptions receiverOptions = new ReceiverOptions().connectionFactory(new ConnectionFactory() {{setUsername("guest");setPassword("guest");}}).connectionSupplier(cf -> cf.newConnection(new Address[]{new Address("localhost", 5672),new Address("localhost", 5673),new Address("localhost", 5674)},"reactive-cluster"));// 2. 创建 ReceiverReceiver receiver = RabbitFlux.createReceiver(receiverOptions);// 消费消息并手动ACKreceiver.consumeManualAck("queue1").flatMap(delivery -> {try {String message = new String(delivery.getBody());log.info("received message:" + message);// 业务逻辑处理...boolean success = false;int i = RandomUtil.randomInt();if (i % 2 == 0) {success = true;}if (success) {log.info("ack success");// 处理成功,手动ACKreturn Mono.fromRunnable(() -> delivery.ack()).thenReturn("ACK");} else {log.info("ack fail");// 处理失败,手动NACK(可选择重试或丢弃)return Mono.fromRunnable(() -> delivery.nack(true)) // false表示不重新入队.thenReturn("NACK");}} catch (Exception e) {// 异常情况,NACK并可选择重试delivery.nack(true); // true表示重新入队return Mono.error(e);}}).subscribe(result -> log.info("Message processed:" + result),error -> log.info("Error:" + error));// 保持程序运行Mono.never().block();
性能优化建议
- 连接复用:避免频繁创建/关闭连接,使用
Mono
缓存连接。 - 批量发送:通过
Flux.buffer()
合并多条消息后一次性发送。 - 线程池调优:自定义
Scheduler
以匹配业务场景的并发需求。
适用场景
- 微服务间的异步通信。
- 事件驱动的数据处理流水线。
- 需要高吞吐量和低延迟的消息系统。
如需进一步功能(如事务、RPC 模式),可参考官方文档或源码示例。