当前位置: 首页 > wzjs >正文

别人网站的字体网站推广岗位的要求

别人网站的字体,网站推广岗位的要求,西安搬家公司电话大全,旅游网站的建设依据和背景文章目录 Reactor RabbitMQ 简介Reactor RabbitMQ核心特性使用方法添加依赖创建连接发送消息接收消息 高级配置消息确认模式错误处理集群监听(自动ACK)集群监听手动ACK 性能优化建议适用场景 Reactor RabbitMQ 简介 Reactor RabbitMQ 是 Reactor 项目的…

文章目录

      • Reactor RabbitMQ 简介
      • Reactor RabbitMQ核心特性
      • 使用方法
        • 添加依赖
        • 创建连接
        • 发送消息
        • 接收消息
      • 高级配置
        • 消息确认模式
        • 错误处理
        • 集群监听(自动ACK)
        • 集群监听手动ACK
      • 性能优化建议
      • 适用场景

Reactor RabbitMQ 简介

Reactor RabbitMQ 是 Reactor 项目的一部分,旨在提供基于 Project Reactor 的 RabbitMQ 响应式编程支持。它将 RabbitMQ 的消息队列功能与 Reactor 的非阻塞、背压友好特性结合,适用于高吞吐量的异步消息处理场景。
注意: Reactor RabbitMQ 是对原生amqp-client 的封装,同样性能强大简单易用。reactor-rabbitmq是spring-boot-starter-amqp 之外的另外一种选择。

维度AMQP-ClientReactor RabbitMQSpring Boot Starter AMQP
编程模型命令式、手动管理响应式、非阻塞声明式、自动配置
框架依赖ReactorSpring Boot
适用场景轻量级/非 Spring 项目响应式微服务Spring Boot 企业应用
资源管理手动自动自动
功能丰富度基础协议操作背压、高并发优化事务、确认、死信队列等
学习曲线中等(需理解 AMQP)高(需掌握 Reactor)低(Spring 生态友好)

Reactor RabbitMQ核心特性

  • 响应式流支持:基于 Reactor 的 FluxMono 实现消息的发布与订阅。
  • 背压管理:自动处理消费者与生产者之间的速率匹配。
  • 非阻塞 API:避免传统 RabbitMQ 客户端的线程阻塞问题。
  • 声明式配置:支持通过代码或配置文件定义队列、交换机和绑定。

使用方法

添加依赖

在 Maven 项目中添加以下依赖:

<dependency>  <groupId>io.projectreactor.rabbitmq</groupId>  <artifactId>reactor-rabbitmq</artifactId>  <version>1.5.6</version>  
</dependency>  
创建连接
ConnectionFactory connectionFactory = new ConnectionFactory();  
connectionFactory.setHost("localhost");  
Sender sender = RabbitFlux.createSender(  Mono.fromCallable(() -> connectionFactory.newConnection())  
);  
Receiver receiver = RabbitFlux.createReceiver(  Mono.fromCallable(() -> connectionFactory.newConnection())  
);  
发送消息
sender.send(  Flux.just(new OutboundMessage(  "exchange-name",  "routing-key",  "Hello RabbitMQ".getBytes()  ))  
).subscribe();  
接收消息
receiver.consumeAutoAck("queue-name")  .map(delivery -> new String(delivery.getBody()))  .subscribe(System.out::println);  

高级配置

消息确认模式

支持自动确认(autoAck)和手动确认(manualAck):

receiver.consumeManualAck("queue-name")  .delayUntil(delivery ->  delivery.ack()  .thenReturn(delivery.getBody())  )  .subscribe();  
错误处理

通过 Reactor 的 onError 机制处理异常:

sender.send(messages)  .doOnError(e -> System.err.println("Send failed: " + e))  .retry(3)  .subscribe();  

集群监听(自动ACK)
 // 1. 配置集群连接ReceiverOptions receiverOptions = new ReceiverOptions().connectionFactory(new ConnectionFactory() {{setUsername("guest");setPassword("guest");}}).connectionSupplier(cf -> cf.newConnection(new Address[]{new Address("localhost", 5672),new Address("localhost", 5673),new Address("localhost", 5674)},"reactive-cluster"));// 2. 创建 ReceiverReceiver receiver = RabbitFlux.createReceiver(receiverOptions);// 监听队列(自动负载均衡)receiver.consumeAutoAck("queue1") // 队列名(需在集群中预先创建).subscribe(delivery -> {String message = new String(delivery.getBody());System.out.println("收到消息: " + message);},error -> System.err.println("监听错误: " + error));// 保持程序运行Mono.never().block();
集群监听手动ACK
// 1. 配置集群连接ReceiverOptions receiverOptions = new ReceiverOptions().connectionFactory(new ConnectionFactory() {{setUsername("guest");setPassword("guest");}}).connectionSupplier(cf -> cf.newConnection(new Address[]{new Address("localhost", 5672),new Address("localhost", 5673),new Address("localhost", 5674)},"reactive-cluster"));// 2. 创建 ReceiverReceiver receiver = RabbitFlux.createReceiver(receiverOptions);// 消费消息并手动ACKreceiver.consumeManualAck("queue1").flatMap(delivery -> {try {String message = new String(delivery.getBody());log.info("received message:" + message);// 业务逻辑处理...boolean success = false;int i = RandomUtil.randomInt();if (i % 2 == 0) {success = true;}if (success) {log.info("ack success");// 处理成功,手动ACKreturn Mono.fromRunnable(() -> delivery.ack()).thenReturn("ACK");} else {log.info("ack fail");// 处理失败,手动NACK(可选择重试或丢弃)return Mono.fromRunnable(() -> delivery.nack(true)) // false表示不重新入队.thenReturn("NACK");}} catch (Exception e) {// 异常情况,NACK并可选择重试delivery.nack(true); // true表示重新入队return Mono.error(e);}}).subscribe(result -> log.info("Message processed:" + result),error -> log.info("Error:" + error));// 保持程序运行Mono.never().block();

性能优化建议

  • 连接复用:避免频繁创建/关闭连接,使用 Mono 缓存连接。
  • 批量发送:通过 Flux.buffer() 合并多条消息后一次性发送。
  • 线程池调优:自定义 Scheduler 以匹配业务场景的并发需求。

适用场景

  • 微服务间的异步通信。
  • 事件驱动的数据处理流水线。
  • 需要高吞吐量和低延迟的消息系统。

如需进一步功能(如事务、RPC 模式),可参考官方文档或源码示例。

http://www.dtcms.com/wzjs/585379.html

相关文章:

  • 文明网站建设情况手工活外包加工官方网
  • 网站建设风格要求中国建设银行官网站e路护航
  • 个人印章在线制作网站河北智能网站建设平台
  • 建门户网站要多少钱装修公司展厅布置方案
  • 南部县建设局网站成都高速公路网站建设招标
  • 建一个网站需要做什么的呼伦贝尔建设网站
  • 网站开发技术有网页设计网站设计欣赏
  • 网站设计技巧苏州做网站费用明细
  • 图片瀑布流网站模板网站备案 排名影响
  • h5免费制作网站有哪些中国十大采购平台app
  • 成都哪家做网站最好网站开发的意义和作用
  • wordpress手机菜单导航代码外贸seo是什么意思啊
  • 外贸网站源码免费蓝色 宽屏 网站 模板
  • 二级学院网站建设自评报告在线手机网站建设
  • 营销网站建设推广网页 网站 站点的区别
  • 网站建设 项目书 框架网站运营费用预算
  • 免费做网站排名wordpress例行维护
  • 一流的南京网站建设宝塔装wordpress
  • 网站开发维护合同书江苏省住房和建设厅网站首页
  • 制作一号店网站公司装修怎么做账
  • 公司网站建设外包流程图南宁手机网站建设
  • 网站优化文档网站建设部门管理制度
  • 做词云的网站昆山建设工程交易网站
  • 网站优化建设公司宣讲网站建设
  • 鄂尔多斯做网站开封市住房和城乡建设局
  • 设计企业网站主页图片wordpress 仿站步骤
  • wordpress 目录排序模板网站可以优化吗
  • 温州网站开发培训玩具外贸网站
  • 网站建设知名企业石家庄百度seo
  • 代做毕业设计网站现成双流网站建设