当前位置: 首页 > wzjs >正文

soho需要建网站吗优化设计六年级下册语文答案

soho需要建网站吗,优化设计六年级下册语文答案,网站建设bbs,房屋装修案例文章目录 Reactor RabbitMQ 简介Reactor RabbitMQ核心特性使用方法添加依赖创建连接发送消息接收消息 高级配置消息确认模式错误处理集群监听(自动ACK)集群监听手动ACK 性能优化建议适用场景 Reactor RabbitMQ 简介 Reactor RabbitMQ 是 Reactor 项目的…

文章目录

      • Reactor RabbitMQ 简介
      • Reactor RabbitMQ核心特性
      • 使用方法
        • 添加依赖
        • 创建连接
        • 发送消息
        • 接收消息
      • 高级配置
        • 消息确认模式
        • 错误处理
        • 集群监听(自动ACK)
        • 集群监听手动ACK
      • 性能优化建议
      • 适用场景

Reactor RabbitMQ 简介

Reactor RabbitMQ 是 Reactor 项目的一部分,旨在提供基于 Project Reactor 的 RabbitMQ 响应式编程支持。它将 RabbitMQ 的消息队列功能与 Reactor 的非阻塞、背压友好特性结合,适用于高吞吐量的异步消息处理场景。
注意: Reactor RabbitMQ 是对原生amqp-client 的封装,同样性能强大简单易用。reactor-rabbitmq是spring-boot-starter-amqp 之外的另外一种选择。

维度AMQP-ClientReactor RabbitMQSpring Boot Starter AMQP
编程模型命令式、手动管理响应式、非阻塞声明式、自动配置
框架依赖ReactorSpring Boot
适用场景轻量级/非 Spring 项目响应式微服务Spring Boot 企业应用
资源管理手动自动自动
功能丰富度基础协议操作背压、高并发优化事务、确认、死信队列等
学习曲线中等(需理解 AMQP)高(需掌握 Reactor)低(Spring 生态友好)

Reactor RabbitMQ核心特性

  • 响应式流支持:基于 Reactor 的 FluxMono 实现消息的发布与订阅。
  • 背压管理:自动处理消费者与生产者之间的速率匹配。
  • 非阻塞 API:避免传统 RabbitMQ 客户端的线程阻塞问题。
  • 声明式配置:支持通过代码或配置文件定义队列、交换机和绑定。

使用方法

添加依赖

在 Maven 项目中添加以下依赖:

<dependency>  <groupId>io.projectreactor.rabbitmq</groupId>  <artifactId>reactor-rabbitmq</artifactId>  <version>1.5.6</version>  
</dependency>  
创建连接
ConnectionFactory connectionFactory = new ConnectionFactory();  
connectionFactory.setHost("localhost");  
Sender sender = RabbitFlux.createSender(  Mono.fromCallable(() -> connectionFactory.newConnection())  
);  
Receiver receiver = RabbitFlux.createReceiver(  Mono.fromCallable(() -> connectionFactory.newConnection())  
);  
发送消息
sender.send(  Flux.just(new OutboundMessage(  "exchange-name",  "routing-key",  "Hello RabbitMQ".getBytes()  ))  
).subscribe();  
接收消息
receiver.consumeAutoAck("queue-name")  .map(delivery -> new String(delivery.getBody()))  .subscribe(System.out::println);  

高级配置

消息确认模式

支持自动确认(autoAck)和手动确认(manualAck):

receiver.consumeManualAck("queue-name")  .delayUntil(delivery ->  delivery.ack()  .thenReturn(delivery.getBody())  )  .subscribe();  
错误处理

通过 Reactor 的 onError 机制处理异常:

sender.send(messages)  .doOnError(e -> System.err.println("Send failed: " + e))  .retry(3)  .subscribe();  

集群监听(自动ACK)
 // 1. 配置集群连接ReceiverOptions receiverOptions = new ReceiverOptions().connectionFactory(new ConnectionFactory() {{setUsername("guest");setPassword("guest");}}).connectionSupplier(cf -> cf.newConnection(new Address[]{new Address("localhost", 5672),new Address("localhost", 5673),new Address("localhost", 5674)},"reactive-cluster"));// 2. 创建 ReceiverReceiver receiver = RabbitFlux.createReceiver(receiverOptions);// 监听队列(自动负载均衡)receiver.consumeAutoAck("queue1") // 队列名(需在集群中预先创建).subscribe(delivery -> {String message = new String(delivery.getBody());System.out.println("收到消息: " + message);},error -> System.err.println("监听错误: " + error));// 保持程序运行Mono.never().block();
集群监听手动ACK
// 1. 配置集群连接ReceiverOptions receiverOptions = new ReceiverOptions().connectionFactory(new ConnectionFactory() {{setUsername("guest");setPassword("guest");}}).connectionSupplier(cf -> cf.newConnection(new Address[]{new Address("localhost", 5672),new Address("localhost", 5673),new Address("localhost", 5674)},"reactive-cluster"));// 2. 创建 ReceiverReceiver receiver = RabbitFlux.createReceiver(receiverOptions);// 消费消息并手动ACKreceiver.consumeManualAck("queue1").flatMap(delivery -> {try {String message = new String(delivery.getBody());log.info("received message:" + message);// 业务逻辑处理...boolean success = false;int i = RandomUtil.randomInt();if (i % 2 == 0) {success = true;}if (success) {log.info("ack success");// 处理成功,手动ACKreturn Mono.fromRunnable(() -> delivery.ack()).thenReturn("ACK");} else {log.info("ack fail");// 处理失败,手动NACK(可选择重试或丢弃)return Mono.fromRunnable(() -> delivery.nack(true)) // false表示不重新入队.thenReturn("NACK");}} catch (Exception e) {// 异常情况,NACK并可选择重试delivery.nack(true); // true表示重新入队return Mono.error(e);}}).subscribe(result -> log.info("Message processed:" + result),error -> log.info("Error:" + error));// 保持程序运行Mono.never().block();

性能优化建议

  • 连接复用:避免频繁创建/关闭连接,使用 Mono 缓存连接。
  • 批量发送:通过 Flux.buffer() 合并多条消息后一次性发送。
  • 线程池调优:自定义 Scheduler 以匹配业务场景的并发需求。

适用场景

  • 微服务间的异步通信。
  • 事件驱动的数据处理流水线。
  • 需要高吞吐量和低延迟的消息系统。

如需进一步功能(如事务、RPC 模式),可参考官方文档或源码示例。

http://www.dtcms.com/wzjs/489211.html

相关文章:

  • 义务教育标准化建设网站网站分析工具
  • 淮安新网站制作东莞优化排名推广
  • 手机商城网站设计南城网站优化公司
  • 济南网站建设询问企优互联价低淘宝seo排名优化软件
  • 学做ps的网站有哪些百度服务
  • 做游戏网站用什么系统做网络营销方案如何写
  • 网站建设 图片优化网站搜索
  • 暴雪战网官方网站入口app下载推广平台
  • 怎么查找网站的根目录百度下载app下载
  • 如何做ps4游戏视频网站做网页怎么做
  • 做网站找哪家又便宜又好中国搜索引擎市场份额
  • 网站制作与建设与网页制作网络推广团队哪家好
  • 给别人做网站多少钱百度q3财报2022
  • 免费网络空间搜索引擎百度爱采购关键词优化
  • 网站建设的渠道策略优秀营销软文100篇
  • 余姚网站推广策划案网站查询ip地址查询
  • 淘宝支持做微交易网站吗重庆森林为什么不能看
  • 提交网站引擎优化seo是什么
  • 同城购物网站怎么做长尾关键词排名工具
  • WordPress程序APP制作seo文章关键词怎么优化
  • 用手机制作网站的软件杭州网站seo优化
  • 广东营销式网站百色seo关键词优化公司
  • 杭州杭州网站建设如何免费注册一个网站
  • 漳州手机网站建设免费外链网站seo发布
  • 经营性网站备案时间手机百度app下载安装
  • 重庆网站建设排名聊城seo整站优化报价
  • 武汉做网站 古凡中国万网
  • 质监站网址怎么开自己的网站
  • 淘宝联盟的购物网站怎么做推广网络营销案例
  • 南昌网站做提高工作效率的句子