当前位置: 首页 > news >正文

Java研学-RabbitMQ(三)

一 消息通信协议

1 AMQP

  AMQP 是一个开放的、跨语言、跨平台的消息协议标准,用于在分布式系统中传递业务消息。它定义了消息队列的二进制协议格式和交互模型(如交换机、队列、绑定等),确保不同语言(Java、Python、C#等)和平台(RabbitMQ、Qpid等)的实现能够互操作。

2 Spring AMQP

  Spring AMQP 是 Spring 框架对 AMQP 协议(如 RabbitMQ)的抽象与封装,提供了一套简洁的 Java API 来简化消息的生产和消费。
  它通过AmqpTemplate模板类实现消息的快速发送(如自动序列化、路由键设置),结合@RabbitListener注解实现声明式的消费者监听,同时支持动态声明交换机、队列和绑定(通过 RabbitAdmin),并深度集成 Spring 生态(如依赖注入、事务管理)。
  其核心优势在于隐藏底层协议细节,让开发者仅需关注业务逻辑,无需手动处理连接、通道或消息确认等复杂操作,从而高效构建异步、解耦的分布式系统。

二 入门demo

1 项目路径图

在这里插入图片描述

2 交换机与队列准备

  创建虚拟主机/midhuang,使用其中的交换机amq.fanout,与队列xiaohuang.queue1进行绑定。

3 父工程导入依赖

  创建spring项目,为父工程导入依赖,两个子工程会会自动继承这些依赖

        <!-- Lombok:简化Java代码(如自动生成Getter/Setter--><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><!-- Spring AMQP + RabbitMQ 集成 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!-- Spring Boot 单元测试 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency>

4 各个微服务中配置文件

  ① consumer(消费者),5672是用来发消息的端口,15672是Web控制台

spring:rabbitmq:host: 192.168.44.128port: 5672 virtual-host: /midhuangusername: dahuangpassword: "dahuang66"
logging:level:cn.tj.consumer.listeners: DEBUG # 设置为 DEBUG 以查看详细日志

  ② publisher(生产者)

spring:rabbitmq:host: 192.168.44.128port: 5672virtual-host: /midhuangusername: dahuangpassword: "dahuang66"
logging:level:cn.tj.consumer.listeners: DEBUG # 设置为 DEBUG 以查看详细日志

5 测试类发送消息

  利用RabbitTemplate发送消息

@SpringBootTest
class PublisherApplicationTests {// 注入 RabbitTemplate@Autowiredprivate RabbitTemplate rabbitTemplate;// 直接发送消息到队列@Testpublic void testHuangQueue() {// 1. 定义队列名称String queueName = ".queue";// 2. 定义消息内容String message = "hello, spring amqp!";// 3. 发送消息(默认路由到队列)rabbitTemplate.convertAndSend(queueName, message);System.out.println("消息发送成功: " + message);}
}

6 控制台检查刚刚发送消息

在这里插入图片描述

7 接收消息

  利用@RabbitListener注解声明要监听的队列,监听消息,此时消费者服务需启动保持监听状态

@Slf4j
@Component
public class MqListener {@RabbitListener(queues = "xiaohuang.queue1")public void listenHuangQueue(String msg) {log.info("收到消息: {}", msg);// 调试:打印消息长度和字节(如果 msg 为 null,会输出 "null")log.debug("消息长度: {}, 内容: {}", msg == null ? "null" : msg.length(), msg);}
}

三 Workqueues

  Workqueues(工作队列)是一种常见的任务分发模型,将多个消费者绑定到一个队列,共同消费队列中的消息,通过共享队列实现负载均衡,从而解决消息堆积问题。

1 创建队列 work.queue

在这里插入图片描述

2 添加两个消费者 – MqListener

@Slf4j
@Component
public class MqListener {@RabbitListener(queues = "work.queue")public void listenWorkQueue1(String msg) {log.info("1号消费者收到消息: {}", msg);}@RabbitListener(queues = "work.queue")public void listenWorkQueue2(String msg) {log.info("2号消费者收到消息: {}", msg);}
}

3 生产者连续发送消息 – PublisherApplicationTests

@SpringBootTest
class PublisherApplicationTests {// 注入 RabbitTemplate@Autowiredprivate RabbitTemplate rabbitTemplate;@Testvoid testWorkQueue() throws InterruptedException {String queueName = "work.queue";// 发送50条测试消息for (int i = 1; i <= 40; i++) {String msg = "hello" + i;rabbitTemplate.convertAndSend(queueName, msg);System.out.println(" [x] Sent '" + msg + "'");Thread.sleep(30); // 模拟延迟,避免消息爆发}}
}

4 控制台输出

  此时并未考虑到消费者的处理能力,RabbitMQ 默认使用 轮询(Round-Robin) 策略分发消息,这可能导致某些消费者积压大量未处理消息,而其他消费者空闲。
在这里插入图片描述

5 编写配置文件 – consumer

  prefetchCount:1像外卖小哥一次只接一单,平台会一次性给小哥派5 个订单(即使他还没处理完之前的订单)。如果小哥手慢(比如堵车),这 5 个订单都会卡在他手里,其他空闲的小哥却没订单可接。
  启用prefetchCount:1后:平台每次只给小哥派 1 个订单。小哥必须完成这个订单(送到顾客手里),平台才会派下一个订单。如果小哥 A 处理快,他很快能接下一个订单;小哥 B 处理慢,他手头的订单不会影响别人。

spring:rabbitmq:host: 192.168.44.128port: 5672virtual-host: /midhuangusername: dahuangpassword: "dahuang66"listener:simple:prefetch: 1
logging:level:cn.tj.consumer.listeners: DEBUG # 设置为 DEBUG 以查看详细日志

6 设置消费者处理速度 – MqListener

@Slf4j
@Component
public class MqListener {@RabbitListener(queues = "work.queue")public void listenWorkQueue1(String msg) throws InterruptedException {log.info("1号消费者收到消息: {}", msg);Thread.sleep(20);}@RabbitListener(queues = "work.queue")public void listenWorkQueue2(String msg) throws InterruptedException {log.debug("2号消费者收到消息: {}", msg);Thread.sleep(200);}
}

7 控制台输出

在这里插入图片描述

http://www.dtcms.com/a/297133.html

相关文章:

  • Centos7安装rabbitmq
  • RabbitMQ—HAProxy负载均衡
  • React性能优化终极指南:memo、useCallback、useMemo全解析
  • Ubuntu22 上,用C++ gSoap 创建一个简单的webservice
  • NineData 数据库 DevOps 全面支持 GaussDB,国产化管理再升级!
  • Spring Boot 自动装配底层源码实现详解
  • 国产DevOps平台Gitee:如何重塑中国企业研发效能新格局
  • Java 单元测试详解:从入门到实战,彻底掌握 JUnit 5 + Mockito + Spring Boot 测试技巧
  • react中 多个层级 组件数据同用 组件之间传值 usecontext useReducer
  • Gitee如何成为国内企业DevOps转型的首选平台?
  • 璞致 PZSDR-P101:ZYNQ7100+AD9361 架构软件无线电平台,重塑宽频信号处理范式
  • ERNIE-4.5-0.3B 实战指南:文心一言 4.5 开源模型的轻量化部署与效能跃升
  • 规则分配脚本
  • 初识JVM--从Java文件到机器指令
  • 中国开源Qwen3 Coder与Kimi K2哪个最适合编程
  • “磁”力全开:钕铁硼重塑现代科技生活
  • Linux 网络与 Vim 编辑器操作
  • 3D实景的概念、特点及应用场景
  • 从“人工眼”到‘智能眼’:EZ-Vision视觉系统如何重构生产线视觉检测精度?
  • AI与区块链融合:2025年的技术革命与投资机遇
  • C++与Hive、Spark、libhdfs、ACID交互技巧
  • Vue2下
  • VR 技术在污水处理领域的创新性应用探索​
  • C++ string:准 STL Container
  • 【03】C#入门到精通——C# 输出格式、内容拼接、if判断 、bool 表达式、函数封装调用
  • 【深度学习优化算法】09:Adadelta算法
  • MyBatis-Plus中使用BaseMapper实现基本CRUD
  • MinIO:云原生对象存储的终极指南
  • Qt 与 SQLite 嵌入式数据库开发
  • 云原生可观测-日志观测(Loki)最佳实践