当前位置: 首页 > news >正文

基于 Spring AMQP 的 RabbitMQ 分布式消息系统实战

在分布式系统中,服务间的解耦与异步通信是关键挑战。RabbitMQ 作为一款成熟的消息中间件,凭借其灵活的交换器模型(Direct/Fanout/Topic)、可靠的消息传递机制(持久化、确认机制)和丰富的客户端支持(Spring AMQP),成为解决分布式通信问题的首选方案。


一、核心概念:RabbitMQ 消息模式与关键组件

1.1 三种核心交换器模式

RabbitMQ 的消息路由能力由其支持的交换器类型决定,本文将重点实践以下三种模式:

模式特性适用场景
Direct精确路由:消息仅发送到路由键与绑定键完全匹配的队列一对一通信(如订单支付结果通知特定用户)
Fanout广播路由:消息发送到所有绑定的队列(忽略路由键)一对多广播(如系统维护通知所有服务节点)
Topic模式匹配路由:通过通配符(*匹配一个单词,#匹配零或多个单词)匹配路由键多维度分类通信(如商品价格变更通知关注该商品的用户)

1.2 关键组件解析

  • 队列(Queue):消息的存储容器,支持持久化(RabbitMQ 重启后保留消息)。
  • 交换器(Exchange):消息的路由枢纽,负责将消息根据规则分发到队列。
  • 绑定(Binding):队列与交换器之间的关联规则(路由键匹配策略)。
  • 消息确认(Publisher Confirm):生产者确认消息是否成功到达交换器。
  • 返回机制(Publisher Return):处理无法路由到任何队列的消息(如路由键错误)。

二、Spring AMQP 实战:从配置到消息流转

2.1 环境准备与依赖配置

2.1.1 Maven 依赖

pom.xml中添加 Spring AMQP 与 RabbitMQ 客户端依赖:

        <!-- Spring Boot Starter Web --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- Spring Boot Starter AMQP (RabbitMQ) --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency><dependency><groupId>com.dwl</groupId><artifactId>by_du_common</artifactId><version>city-luoYang</version></dependency>
2.1.2 RabbitMQ 连接配置

application.yml中配置 RabbitMQ 服务器连接信息:

spring:application:name: rabbit_mq_case# -------------------------------# RabbitMQ 服务器连接配置(核心)# -------------------------------rabbitmq:# RabbitMQ 服务器地址(必填)# - 本地开发:localhost 或 127.0.0.1# - 生产环境:域名(如 rabbitmq.prod.example.com)或 IP 地址# - 集群场景:可配置多个地址(通过逗号分隔,Spring 会自动负载均衡)host: localhost# AMQP 协议默认端口(非 SSL 场景)# - 标准端口:5672(AMQP 0.9.1)# - SSL 加密端口:5671(需启用 ssl.enabled=true)port: 5672# 认证信息(必填,除非 RabbitMQ 启用了匿名访问)# - 生产环境:建议使用专用账号(非 guest),并分配最小权限(如仅读写特定队列)username: guestpassword: guest# 虚拟主机(逻辑隔离资源,非必填,默认 "/")# - 作用:通过虚拟主机隔离不同业务/环境的队列、交换器等资源(如 "/order" 隔离订单业务)# - 注意:虚拟主机需在 RabbitMQ 服务器提前创建(通过管理界面或命令行)virtual-host: /# -------------------------------# 消费者监听器配置(simple 模式,最常用)# -------------------------------listener:simple:# 每次从队列预取的消息数量(关键性能调优参数)# - 含义:消费者启动时,一次性从队列获取 N 条消息(未确认前不会获取新消息)# - 默认值:1(保守策略,避免内存溢出)# - 生产建议:5-10(根据消息处理耗时调整,耗时越长,预取数可适当增大)prefetch: 1# 消息确认模式(决定消费者是否自动确认消息)# - auto:自动确认(消息被消费者接收后立即确认,RabbitMQ 标记为已消费)#   - 优点:简单高效,适合低可靠性要求的场景(如日志收集)#   - 缺点:若消费者处理消息时崩溃,消息会丢失(未手动确认)# - manual:手动确认(消费者处理完成后主动调用 channel.basicAck() 确认)#   - 优点:可靠,适合高可靠性场景(如订单支付、用户通知)#   - 缺点:需编写额外确认逻辑,可能增加延迟acknowledge-mode: auto

2.2 消息基础设施配置(RabbitMQConfig)

通过 @Configuration类定义队列、交换器和绑定关系,确保消息路由规则的正确性。

2.2.1 点对点模式(Direct)
    /*** 创建点对点模式(Direct Exchange)的持久化队列* 作用:定义一个持久化的队列,用于接收点对点模式下交换器路由的消息,确保 RabbitMQ 重启后队列不丢失** @return 队列实例(durable=true:RabbitMQ 重启后队列保留)*/@Bean  // 声明为 Spring Bean,自动注册到容器中,供其他组件注入使用public Queue directQueue() {/** QueueBuilder:Spring AMQP 提供的队列构建工具类,用于灵活定义队列属性(持久化、排他、自动删除等)* durable(true):设置队列的持久化属性为 true(关键参数)*   - durable:布尔值,true 表示队列持久化(RabbitMQ 服务器重启后队列仍存在);false 表示临时队列(服务器重启后自动删除)*   - 持久化队列适用于需要长期保留消息的场景(如订单记录、用户通知),避免因服务器故障导致消息丢失** DIRECT_QUEUE_NAME:静态常量(类中定义),表示队列的唯一名称(如 "direct_queue")*   - 队列名称需全局唯一,建议通过常量统一管理,提高代码可维护性*/return QueueBuilder.durable(DIRECT_QUEUE_NAME).build();}/*** 创建直接模式(Direct Exchange)的持久化交换器* 作用:定义一个直接交换器,用于根据路由键(Routing Key)精确路由消息到绑定的队列,确保 RabbitMQ 重启后交换器保留** @return 直接交换器实例(durable=true:交换机持久化;autoDelete=false:无绑定队列时不自动删除)*/@Bean  // 声明为 Spring Bean,自动注册到容器中,供其他组件注入使用public DirectExchange directExchange() {/** DirectExchange 构造函数参数说明:* 1. 名称(name):交换器的全局唯一标识符(如 "direct_exchange")*    - 需与绑定队列时指定的交换器名称一致(通过 @RabbitListener 或 bind 方法配置)*    - 建议通过静态常量统一管理(如类中定义的 DIRECT_EXCHANGE_NAME),提高可维护性** 2. durable(持久化):布尔值,表示交换器是否持久化(关键参数)*    - true:交换器持久化(RabbitMQ 服务器重启后,交换器元数据(名称、绑定关系)保留)*    - false:交换器临时(服务器重启后自动删除,适用于短期任务)*    - 作用:确保关键交换器在服务器故障后不丢失,保障消息路由的连续性** 3. autoDelete(自动删除):布尔值,表示交换器是否自动删除(关键参数)*    - true:当交换器无绑定的队列时,自动删除(避免残留无用交换器)*    - false:交换器不会自动删除(即使无绑定队列也保留,需显式删除)*    - 作用:生产环境中通常设置为 false,避免因临时断开队列导致交换器被误删** DIRECT_EXCHANGE_NAME:静态常量(类中定义),表示交换器的唯一名称(如 "direct_exchange")*/return new DirectExchange(DIRECT_EXCHANGE_NAME, true, false);}/*** 绑定点对点队列到直接交换器(Direct Exchange),实现精确路由* 作用:将指定队列与直接交换器绑定,仅当消息的路由键与绑定的路由键完全一致时,消息才会被路由到该队列* 特性:基于路由键的精确匹配(一对一路由),适用于需要精准投递消息的场景(如订单通知特定用户)** @return 绑定关系实例(Spring 管理的 Binding 对象,描述队列与直接交换器的精确路由规则)*/@Bean  // 声明为 Spring Bean,自动注册到容器中,供其他组件注入使用public Binding directBinding() {/** BindingBuilder:Spring AMQP 提供的绑定关系构建工具类,用于定义队列与交换器的精确绑定规则* bind(directQueue()):指定要绑定的队列(目标队列,接收精确路由的消息)*   - directQueue():需提前通过 @Bean 声明的点对点队列(如类中的 directQueue() 方法返回的队列)* to(directExchange()):指定目标交换器(消息来源的直接交换器)*   - directExchange():需提前通过 @Bean 声明的直接交换器(如类中的 directExchange() 方法返回的交换器)* with(DIRECT_ROUTING_KEY):指定绑定的路由键(关键参数)*   - 路由键(Routing Key):消息的“地址标签”,直接交换器仅将路由键与此值完全匹配的消息路由到该队列*   - 示例:若 DIRECT_ROUTING_KEY = "order.payment.success",则只有路由键为该值的消息会被路由到绑定的队列*/return BindingBuilder.bind(directQueue())  // 指定要绑定的点对点队列(接收精确路由的消息).to(directExchange())   // 指定目标直接交换器(消息来源的交换器).with(DIRECT_ROUTING_KEY);  // 指定精确匹配的路由键(仅匹配此键的消息会被路由)}
2.2.2 广播模式(Fanout)
   /*** 创建发布/订阅模式(Fanout Exchange)的持久化队列1* 作用:定义一个持久化的队列,用于接收 Fanout 交换器广播的消息(与点对点队列独立,专为广播场景设计)** @return 队列实例(durable=true:RabbitMQ 重启后队列保留;默认非排他、非自动删除)*/@Bean  // 声明为 Spring Bean,自动注册到容器中,供其他组件注入使用public Queue fanoutQueue1() {/** QueueBuilder:Spring AMQP 提供的队列构建工具类,用于灵活定义队列属性(持久化、排他、自动删除等)* durable(FANOUT_QUEUE1_NAME):设置队列的持久化属性为 true,并指定队列名称(关键参数)*   - durable:布尔值,true 表示队列持久化(RabbitMQ 服务器重启后,队列元数据和消息保留)*   - FANOUT_QUEUE1_NAME:静态常量(类中定义),表示队列的唯一名称(如 "fanout_queue1")** 队列的其他默认属性(未显式设置):* - exclusive:false(非排他队列,多个连接可同时访问)* - autoDelete:false(非自动删除队列,无消费者时仍保留)** 作用:确保队列在 RabbitMQ 重启后不丢失,且允许多个消费者并行接收广播消息*/return QueueBuilder.durable(FANOUT_QUEUE1_NAME).build();}/*** 创建发布/订阅模式(Fanout Exchange)的持久化队列2* 作用:定义一个独立于队列1的持久化队列,用于接收 Fanout 交换器广播的消息(与队列1接收相同广播内容)** @return 队列实例(durable=true:RabbitMQ 重启后队列保留;默认非排他、非自动删除)*/@Bean  // 声明为 Spring Bean,自动注册到容器中,供其他组件注入使用public Queue fanoutQueue2() {/** QueueBuilder:Spring AMQP 提供的队列构建工具类,用于灵活定义队列属性(持久化、排他、自动删除等)* durable(FANOUT_QUEUE2_NAME):设置队列的持久化属性为 true,并指定队列名称(关键参数)*   - durable:布尔值,true 表示队列持久化(RabbitMQ 服务器重启后,队列元数据和消息保留)*   - FANOUT_QUEUE2_NAME:静态常量(类中定义),表示队列的唯一名称(如 "fanout_queue2")** 队列的其他默认属性(未显式设置):* - exclusive:false(非排他队列,多个连接可同时访问)* - autoDelete:false(非自动删除队列,无消费者时仍保留)** 作用:确保队列在 RabbitMQ 重启后不丢失,且允许多个消费者并行接收广播消息*/return QueueBuilder.durable(FANOUT_QUEUE2_NAME).build();}/*** 创建扇出模式(Fanout)的持久化交换器* 作用:定义一个扇出交换器,用于将消息广播到所有绑定的队列(忽略路由键),确保 RabbitMQ 重启后交换器保留** @return 扇出交换器实例(durable=true:RabbitMQ 重启后交换器保留;autoDelete=false:无绑定队列时不自动删除)*/@Bean  // 声明为 Spring Bean,自动注册到容器中,供其他组件注入使用public FanoutExchange fanoutExchange() {/** FanoutExchange 构造函数参数说明:* 1. 名称(name):交换器的全局唯一标识符(如 "fanout_exchange")*    - 需与绑定队列时指定的交换器名称一致(通过 @RabbitListener 或 bind 方法配置)*    - 建议通过静态常量统一管理(如类中定义的 FANOUT_EXCHANGE_NAME),提高可维护性** 2. durable(持久化):布尔值,表示交换器是否持久化(关键参数)*    - true:交换器持久化(RabbitMQ 服务器重启后,交换器元数据(名称、绑定关系)保留)*    - false:交换器临时(服务器重启后自动删除,适用于短期任务)*    - 作用:确保关键交换器在服务器故障后不丢失,保障消息分发的连续性** 3. autoDelete(自动删除):布尔值,表示交换器是否自动删除(关键参数)*    - true:当交换器无绑定的队列时,自动删除(避免残留无用交换器)*    - false:交换器不会自动删除(即使无绑定队列也保留,需显式删除)*    - 作用:生产环境中通常设置为 false,避免因临时断开队列导致交换器被误删** FANOUT_EXCHANGE_NAME:静态常量(类中定义),表示交换器的唯一名称(如 "fanout_exchange")*/return new FanoutExchange(FANOUT_EXCHANGE_NAME, true, false);}/*** 绑定队列1到扇出交换器(Fanout Exchange),形成广播消息路由关系* 作用:将指定队列与扇出交换器绑定,使交换器接收的消息能广播到该队列(忽略路由键)** @return 绑定关系实例(Spring 管理的 Binding 对象,描述队列与交换器的绑定规则)*/@Bean  // 声明为 Spring Bean,自动注册到容器中,供其他组件注入使用public Binding fanoutBinding1() {/** BindingBuilder:Spring AMQP 提供的绑定关系构建工具类,用于定义队列与交换器的绑定规则* bind(Queue queue):指定要绑定的队列(目标队列,接收广播消息的队列)* to(Exchange exchange):指定目标交换器(消息来源的交换器)** 扇出交换器(Fanout Exchange)的绑定特性:* - 无需指定路由键(Routing Key):扇出交换器的核心机制是“广播”,会将接收到的消息发送到所有绑定的队列,与路由键无关* - 绑定后,交换器接收的任何消息都会被复制到该队列(无论消息的路由键是什么)** fanoutQueue1():被绑定的队列实例(需提前通过 @Bean 声明,如类中的 directQueue() 方法)* fanoutExchange():目标扇出交换器实例(需提前通过 @Bean 声明,如类中的 fanoutExchange() 方法)*/return BindingBuilder.bind(fanoutQueue1())  // 指定要绑定的队列(队列1).to(fanoutExchange());   // 指定目标扇出交换器(fanout_exchange)}/*** 绑定队列2到扇出交换器(Fanout Exchange),形成独立的广播消息路由关系* 作用:将指定队列(队列2)与扇出交换器绑定,使交换器接收的消息能广播到该队列(忽略路由键)* 特性:与队列1的绑定独立,两者均可接收同一交换器的广播消息,适用于多消费者并行处理场景** @return 绑定关系实例(Spring 管理的 Binding 对象,描述队列2与扇出交换器的绑定规则)*/@Bean  // 声明为 Spring Bean,自动注册到容器中,供其他组件注入使用public Binding fanoutBinding2() {/** BindingBuilder:Spring AMQP 提供的绑定关系构建工具类,用于定义队列与交换器的绑定规则* bind(fanoutQueue2()):指定要绑定的队列(目标队列,接收广播消息的队列2)* to(fanoutExchange()):指定目标交换器(消息来源的扇出交换器)** 扇出交换器(Fanout Exchange)的绑定特性:* - 无需指定路由键(Routing Key):扇出交换器的核心机制是“广播”,会将接收到的消息发送到所有绑定的队列,与路由键无关* - 绑定后,交换器接收的任何消息都会被复制到该队列(无论消息的路由键是什么)** fanoutQueue2():被绑定的队列实例(需提前通过 @Bean 声明,如类中的 fanoutQueue2() 方法)* fanoutExchange():目标扇出交换器实例(需提前通过 @Bean 声明,如类中的 fanoutExchange() 方法)*/return BindingBuilder.bind(fanoutQueue2())  // 指定要绑定的队列(队列2).to(fanoutExchange());   // 指定目标扇出交换器(fanout_exchange)}
2.2.3 主题模式(Topic)
/*** 创建主题模式(Topic Exchange)的持久化队列1* 作用:定义一个持久化的队列,用于接收 Topic 交换器广播的、路由键匹配特定主题模式的消息(如 "topic.key1")** @return 队列实例(durable=true:RabbitMQ 重启后队列保留;默认非排他、非自动删除)*/@Bean  // 声明为 Spring Bean,自动注册到容器中,供其他组件注入使用public Queue topicQueue1() {/** QueueBuilder:Spring AMQP 提供的队列构建工具类,用于灵活定义队列属性(持久化、排他、自动删除等)* durable(TOPIC_QUEUE1_NAME):设置队列的持久化属性为 true,并指定队列名称(关键参数)*   - durable:布尔值,true 表示队列持久化(RabbitMQ 服务器重启后,队列元数据和消息保留)*   - TOPIC_QUEUE1_NAME:静态常量(类中定义),表示队列的唯一名称(如 "topic_queue1")** 队列的其他默认属性(未显式设置):* - exclusive:false(非排他队列,多个连接可同时访问)* - autoDelete:false(非自动删除队列,无消费者时仍保留)** 作用:确保队列在 RabbitMQ 重启后不丢失,且能长期接收匹配主题模式的广播消息*/return QueueBuilder.durable(TOPIC_QUEUE1_NAME).build();}/*** 创建主题模式(Topic Exchange)的持久化队列2* 作用:定义一个独立于队列1的持久化队列,用于接收 Topic 交换器广播的、路由键匹配 `topic.key2.#` 模式的消息** @return 队列实例(durable=true:RabbitMQ 重启后队列保留;默认非排他、非自动删除)*/@Bean  // 声明为 Spring Bean,自动注册到容器中,供其他组件注入使用public Queue topicQueue2() {/** QueueBuilder:Spring AMQP 提供的队列构建工具类,用于灵活定义队列属性(持久化、排他、自动删除等)* durable(TOPIC_QUEUE2_NAME):设置队列的持久化属性为 true,并指定队列名称(关键参数)*   - durable:布尔值,true 表示队列持久化(RabbitMQ 服务器重启后,队列元数据和消息保留)*   - TOPIC_QUEUE2_NAME:静态常量(类中定义),表示队列的唯一名称(如 "topic_queue2")** 队列的其他默认属性(未显式设置):* - exclusive:false(非排他队列,多个连接可同时访问)* - autoDelete:false(非自动删除队列,无消费者时仍保留)** 作用:确保队列在 RabbitMQ 重启后不丢失,且能长期接收匹配 `topic.key2.#` 模式的广播消息*/return QueueBuilder.durable(TOPIC_QUEUE2_NAME).build();}/*** 创建主题模式(Topic Exchange)的持久化交换器* 作用:定义一个主题交换器,用于根据路由键的通配符模式(*、#)广播消息到绑定的队列,确保 RabbitMQ 重启后交换器保留** @return 主题交换器实例(durable=true:交换机持久化;autoDelete=false:无绑定队列时不自动删除)*/@Bean  // 声明为 Spring Bean,自动注册到容器中,供其他组件注入使用public TopicExchange topicExchange() {/** TopicExchange 构造函数参数说明:* 1. 名称(name):交换器的全局唯一标识符(如 "topic_exchange")*    - 需与绑定队列时指定的交换器名称一致(通过 @RabbitListener 或 bind 方法配置)*    - 建议通过静态常量统一管理(如类中定义的 TOPIC_EXCHANGE_NAME),提高可维护性** 2. durable(持久化):布尔值,表示交换器是否持久化(关键参数)*    - true:交换器持久化(RabbitMQ 服务器重启后,交换器元数据(名称、绑定关系)保留)*    - false:交换器临时(服务器重启后自动删除,适用于短期任务)*    - 作用:确保关键交换器在服务器故障后不丢失,保障消息路由的连续性** 3. autoDelete(自动删除):布尔值,表示交换器是否自动删除(关键参数)*    - true:当交换器无绑定的队列时,自动删除(避免残留无用交换器)*    - false:交换器不会自动删除(即使无绑定队列也保留,需显式删除)*    - 作用:生产环境中通常设置为 false,避免因临时断开队列导致交换器被误删** TOPIC_EXCHANGE_NAME:静态常量(类中定义),表示交换器的唯一名称(如 "topic_exchange")*/return new TopicExchange(TOPIC_EXCHANGE_NAME, true, false);}/*** 绑定队列1到主题交换机(Topic Exchange),实现精确路由键匹配* 作用:将指定队列与主题交换器绑定,仅当消息的路由键与绑定的路由键**完全一致**时,消息才会被路由到该队列* 特性:基于路由键的精确匹配(一对一路由),适用于需要精准投递消息到特定处理逻辑的场景** @return 绑定关系实例(Spring 管理的 Binding 对象,描述队列与主题交换器的精确路由规则)*/@Bean  // 声明为 Spring Bean,自动注册到容器中,供其他组件注入使用public Binding topicBinding1() {/** BindingBuilder:Spring AMQP 提供的绑定关系构建工具类,用于定义队列与交换器的精确绑定规则* bind(topicQueue1()):指定要绑定的队列(目标队列,接收精确路由的消息)*   - topicQueue1():需提前通过 @Bean 声明的主题队列(如类中的 topicQueue1() 方法返回的队列)* to(topicExchange()):指定目标交换器(消息来源的主题交换器)*   - topicExchange():需提前通过 @Bean 声明的主题交换器(如类中的 topicExchange() 方法返回的交换器)* with(TOPIC_ROUTING_KEY1):指定绑定的路由键(关键参数)*   - 路由键(Routing Key):消息的“精准地址标签”,主题交换器仅将路由键与此值**完全一致**的消息路由到该队列*   - 示例:若 TOPIC_ROUTING_KEY1 = "user.login.success",则只有路由键为该值的消息会被路由到绑定的队列*/return BindingBuilder.bind(topicQueue1())  // 指定要绑定的主题队列(接收精确路由的消息).to(topicExchange())   // 指定目标主题交换器(消息来源的交换器).with(TOPIC_ROUTING_KEY1);  // 指定精确匹配的路由键(仅匹配此键的消息会被路由)}/*** 绑定队列2到主题交换机(Topic Exchange),实现多级模式匹配路由* 作用:将指定队列与主题交换器绑定,接收路由键以 `topic.key2.` 开头的所有消息(支持任意后续内容)* 特性:基于通配符 `#` 的多级匹配(模糊路由),适用于需要批量处理某一主题下所有子事件的场景** @return 绑定关系实例(Spring 管理的 Binding 对象,描述队列与主题交换器的多级路由规则)*/@Bean  // 声明为 Spring Bean,自动注册到容器中,供其他组件注入使用public Binding topicBinding2() {/** BindingBuilder:Spring AMQP 提供的绑定关系构建工具类,用于定义队列与交换器的多级绑定规则* bind(topicQueue2()):指定要绑定的队列(目标队列,接收多级匹配的消息)*   - topicQueue2():需提前通过 @Bean 声明的主题队列(如类中的 topicQueue2() 方法返回的队列)* to(topicExchange()):指定目标交换器(消息来源的主题交换器)*   - topicExchange():需提前通过 @Bean 声明的主题交换器(如类中的 topicExchange() 方法返回的交换器)* with(TOPIC_ROUTING_KEY2):指定绑定的路由键模式(关键参数)*   - 路由键模式:`topic.key2.#`(`#` 为通配符,匹配零个或多个单词)*   - 匹配规则:所有路由键以 `topic.key2.` 开头的消息(如 `topic.key2.x`、`topic.key2.x.y`、`topic.key2.123`)均会被路由到该队列*/return BindingBuilder.bind(topicQueue2())  // 指定要绑定的主题队列(接收多级匹配的消息).to(topicExchange())   // 指定目标主题交换器(消息来源的交换器).with(TOPIC_ROUTING_KEY2);  // 指定多级匹配的路由键模式(`topic.key2.#`)}

2.3 消息序列化与转换(MessageConverterConfig)

Spring AMQP 默认使用 SimpleMessageConverter(支持 String、byte[] 等基础类型),但生产环境更推荐使用 JSON 格式传递复杂对象。通过 Jackson2JsonMessageConverter实现 Java 对象与 JSON 的互转:

 /*** 配置 Jackson2JsonMessageConverter(JSON 消息转换器,推荐生产环境使用)* 作用:定义消息的序列化(Java 对象 → JSON 字符串)和反序列化(JSON 字符串 → Java 对象)规则,基于 Jackson 库实现* 核心特性:* - 支持复杂对象(嵌套对象、集合、泛型等)的序列化与反序列化* - 跨语言兼容(JSON 为通用数据格式,支持多语言系统间消息传递)* - 灵活配置(可通过 Jackson 的 ObjectMapper 自定义序列化规则)* 创建并配置 Jackson2JsonMessageConverter 实例* 注意:需确保项目中包含 Jackson 依赖(如 jackson-databind),否则会抛出 ClassNotFoundException** @return Jackson2JsonMessageConverter 实例*/@Beanpublic Jackson2JsonMessageConverter jackson2JsonMessageConverter() {// 直接初始化 Jackson2JsonMessageConverter(默认使用 Jackson 的 ObjectMapper)return new Jackson2JsonMessageConverter();}/*** (可选扩展)自定义 Jackson 的 ObjectMapper 以优化序列化规则* 说明:通过自定义 ObjectMapper 可配置日期格式、空值处理、枚举序列化方式等** @return 自定义配置的 Jackson2JsonMessageConverter 实例*/@Beanpublic MessageConverter customJacksonMessageConverter() {// 自定义 ObjectMapper(可选)ObjectMapper objectMapper = new ObjectMapper();// 配置日期格式(如 "yyyy-MM-dd HH:mm:ss")objectMapper.setDateFormat(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));// 配置空值不序列化(默认序列化为空对象)objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);// 配置枚举使用名称而非序号(如 ENUM_VALUE 而非 0)objectMapper.configure(SerializationFeature.WRITE_ENUMS_USING_TO_STRING, true);// 使用自定义 ObjectMapper 创建转换器return new Jackson2JsonMessageConverter(objectMapper);}

2.4 消息发送与确认(RabbitTemplateConfig)

通过 RabbitTemplate封装消息发送逻辑,并配置发布者确认返回机制,确保消息“不丢失”。

package com.dwl.foundation.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.nio.charset.StandardCharsets;/*** @ClassName RabbitTemplateConfig* @Description RabbitTemplate 配置类(核心消息发送配置)* 作用:定义消息发送的行为(如交换器路由、消息转换、确认机制等),并注入到 Spring 容器中* 核心功能:* - 配置发布者确认(Publisher Confirm):追踪消息是否成功到达交换器* - 配置发布者返回(Publisher Return):处理无法路由到队列的消息* - 可选扩展:消息转换器、重试策略等(根据业务需求配置)* @Version 1.0.0* @Date 2025* @Author By Dwl*/
@Configuration
@Slf4j
public class RabbitTemplateConfig {/*** 配置 RabbitTemplate(Spring AMQP 消息发送核心组件)* 作用:定义消息发送的行为(如交换器路由、消息转换、确认机制等),并注入到 Spring 容器中** @param connectionFactory 连接工厂(由 Spring 自动创建,用于管理与 RabbitMQ 服务器的连接)* @return 配置完成的 RabbitTemplate 实例*/@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {// 初始化 RabbitTemplate,传入连接工厂(必须参数,用于建立与 RabbitMQ 的连接)RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);/*---------------------------------------------------------------------------| 1. 配置发布者确认回调(Confirm Callback)| 作用:监听消息是否成功到达 RabbitMQ 交换器(Exchange)| 触发条件:当 RabbitMQ 处理完消息(无论成功或失败)后,会向生产者发送确认(ack/nack)| 核心价值:追踪消息发送状态(成功/失败),确保消息“不丢失”*--------------------------------------------------------------------------*/rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {// correlationData:消息关联标识(通过 send 方法传入的 CorrelationData 对象,用于唯一标识本次发送的消息)// ack:确认结果(true=消息成功到达交换器;false=消息未到达交换器)// cause:失败时的具体原因(如交换器不存在、网络异常、路由键错误等)String messageId = (correlationData != null) ? correlationData.getId() : "未知";if (ack) {// 消息成功到达交换器的日志记录(关键操作审计,用于追踪消息发送状态)log.info("✅ 消息发送到交换机成功,消息ID: {}", messageId);} else {// 消息发送到交换器失败的日志记录(记录失败原因和消息 ID,便于问题排查)log.error("❌ 消息发送到交换机失败,原因: {},消息ID: {}", cause, messageId);/** 可选扩展:触发告警或重试逻辑(根据业务需求)* 示例 1:调用重试服务重新发送消息(需实现重试逻辑,如网络临时故障时重试)* retryService.retrySend(correlationData);** 示例 2:发送告警通知(如邮件、钉钉机器人),通知运维人员及时处理* alertService.sendAlert("消息发送失败,消息ID: " + messageId, cause);*/}});/*---------------------------------------------------------------------------| 2. 配置发布者返回回调(Return Callback)| 作用:监听消息无法被路由到任何队列的情况(交换器接收了消息,但无匹配的队列绑定)| 触发条件:当消息通过交换器路由时,若没有任何队列与该交换器的绑定键匹配,RabbitMQ 会将消息返回给生产者| 核心价值:处理“未被路由”的消息(如路由键错误、交换器未绑定队列),避免消息丢失*--------------------------------------------------------------------------*/rabbitTemplate.setReturnsCallback(returned -> {// returned:返回的消息元数据(包含消息内容、交换器、路由键、响应码等信息)// 提取消息体(需将字节数组转换为字符串,假设使用 UTF-8 编码)String messageBody = new String(returned.getMessage().getBody(), StandardCharsets.UTF_8);// 记录详细的返回日志(包含消息内容、响应码、交换器、路由键等关键信息,便于定位问题)log.error("""❌ 消息发送到队列失败(未被路由)!消息内容: {}          // 消息核心业务数据(如 JSON 字符串)响应码: {}(含义:{})  // RabbitMQ 返回的状态码(如 312=NO_ROUTE)响应文本: {}          // 状态码的文字描述(如 "NO_ROUTE")交换器: {}            // 目标交换器名称(消息原本要发送到的交换器)路由键: {}            // 消息使用的路由键(用于匹配队列绑定)""",messageBody,returned.getReplyCode(),getReplyCodeMeaning(returned.getReplyCode()),  // 自定义方法解析响应码含义returned.getReplyText(),returned.getExchange(),returned.getRoutingKey());/** 可选扩展:处理未被路由的消息(根据业务需求)* 示例 1:将消息存入死信队列(Dead Letter Queue),后续人工核查或自动重试* deadLetterService.saveToDeadLetterQueue(returned.getMessage());** 示例 2:记录到数据库待重试表,定时任务重试发送(需结合重试策略)* retryRecordService.save(returned.getMessage());*/});/*---------------------------------------------------------------------------| 3. 可选扩展配置(根据业务需求添加)* 以下配置为可选,可根据实际业务场景启用或调整*--------------------------------------------------------------------------*/// 配置消息转换器(默认使用 SimpleMessageConverter,可自定义为 JSON 转换器)// 作用:定义消息的序列化/反序列化规则(如将 Java 对象转换为 JSON 字符串)// 示例:使用 Jackson2JsonMessageConverter 替代默认的 SimpleMessageConverter// rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());// 配置消息发送重试策略(需配合 spring.rabbitmq.listener.simple.retry 启用)// 作用:消息发送失败时自动重试(如网络临时故障导致的发送失败)// 示例:设置重试次数为 3 次,每次间隔 1 秒// rabbitTemplate.setRetryTemplate(new RetryTemplate());// retryTemplate.setBackOffPolicy(new FixedBackOffPolicy(1000));  // 间隔 1 秒// retryTemplate.setRetryPolicy(new SimpleRetryPolicy(3));       // 最多重试 3 次return rabbitTemplate;}/*** 辅助方法:解析 RabbitMQ 响应码的含义(示例)* 作用:将数字响应码转换为可读的文字描述,帮助开发者快速定位问题** @param replyCode 响应码(如 312=NO_ROUTE,313=NO_CONSUMERS 等)* @return 响应码的文字描述(如 "NO_ROUTE(消息无匹配的队列)")*/private String getReplyCodeMeaning(int replyCode) {// 使用 switch 表达式匹配常见响应码(可根据需要扩展更多码)return switch (replyCode) {case 312 -> "NO_ROUTE(消息无匹配的队列,交换器未绑定对应路由键的队列)";case 313 -> "NO_CONSUMERS(队列存在但无活跃消费者,消息无法被消费)";case 404 -> "NOT_FOUND(交换器或队列不存在,可能是名称拼写错误或未声明)";case 320 -> "PRECONDITION_FAILED(队列/交换器属性不匹配,如持久化设置冲突)";default -> "未知错误码(请查阅 RabbitMQ 官方文档)";};}
}

2.5 消息消费者(MessageConsumer)

通过 @RabbitListener注解监听队列,实现消息的异步处理。支持点对点、广播、主题三种模式。

package com.dwl.foundation.consumer;import com.dwl.foundation.config.RabbitMQConfig;
import com.dwl.foundation.model.BusinessMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** @ClassName MessageConsumer* @Description 消息消费者类(核心消息处理组件)* 作用:监听 RabbitMQ 队列中的消息,将消息反序列化为业务对象(BusinessMessage),并执行具体的业务逻辑处理* 特性:支持多种消息模式(点对点、广播、主题),通过 @RabbitListener 注解灵活配置监听队列* @Version 1.0.0* @Date 2025* @Author By Dwl*/
@Component
@Slf4j
public class MessageConsumer {/*---------------------------------------------------------------------------| 方法 1:监听点对点(Direct)交换器的队列| 作用:接收 Direct 交换器路由的消息,执行点对点业务处理(如订单通知特定用户)*--------------------------------------------------------------------------*//*** 监听并消费点对点(Direct)交换器路由的消息* 核心逻辑:* 1. 通过 @RabbitListener 注解声明监听的目标队列(来自 RabbitMQConfig.DIRECT_QUEUE_NAME)* 2. 自动接收队列中的消息并反序列化为 BusinessMessage 对象(依赖消息转换器)* 3. 记录日志并执行业务逻辑处理(如更新数据库、调用外部接口)* 4. 支持消息确认模式(AUTO/MANUAL),默认 AUTO(异常时自动拒绝消息)** @param message 接收到的消息体(已反序列化的 BusinessMessage 对象)*                - 若消息无法反序列化(如格式错误),会触发异常并进入错误处理流程*                - 若使用 MANUAL 确认模式,需手动调用 channel.basicAck() 确认消息*/@RabbitListener(queues = RabbitMQConfig.DIRECT_QUEUE_NAME,  // 监听的队列名称(来自配置类,如 "direct_queue")ackMode = "AUTO"  // 消息确认模式(AUTO:自动确认;MANUAL:手动确认,默认 AUTO)// 可选配置示例:// concurrency = "3"       // 并发消费者数量(3 个消费者并行处理消息)// prefetch = "10"         // 预取消息数(消费者最多预取 10 条消息))public void receiveDirectMessage(BusinessMessage message) {// 步骤 1:记录接收到的消息日志(生产环境建议记录关键信息,如消息 ID、时间戳)log.info("接收到点对点消息: {}", message);  // 输出消息内容(含 ID、内容、发送时间)// 步骤 2:执行业务逻辑处理(根据实际需求实现)try {// 示例:调用业务服务处理消息(如更新订单状态)log.info("调用业务服务处理点对点消息。");/** 若使用 MANUAL 确认模式(ackMode = "MANUAL"),需在处理成功后手动确认消息:* channel.basicAck(deliveryTag, false);  // deliveryTag 由 Spring 自动注入(需通过参数获取)* 若处理失败,调用 channel.basicNack(deliveryTag, false, requeue = true);  // 消息重新入队*/} catch (Exception e) {// 异常处理:记录错误日志,可根据需要触发消息重试或人工干预log.error("点对点消息处理失败,消息内容:{}", message, e);  // 记录完整异常堆栈/** 手动确认模式下的异常处理示例:* throw new RuntimeException("处理失败", e);  // 抛出异常触发消息回滚(重新入队)*/}}/*---------------------------------------------------------------------------| 方法 2:监听广播消息(Fanout 交换器队列 1)| 作用:监听 Fanout 交换器广播到队列 1 的消息,执行广播场景下的业务逻辑(如库存同步)*--------------------------------------------------------------------------*//*** 消费广播消息(队列 1 监听器)* 作用:监听并消费 Fanout 类型交换器广播到队列 1 的消息(Fanout 交换器会广播到所有绑定队列)* 特性:同一消息会被所有绑定的队列接收(如队列 1 和队列 2),适用于多模块并行处理同一事件** @param message 接收到的广播消息(已反序列化的 BusinessMessage 对象)*/@RabbitListener(queues = RabbitMQConfig.FANOUT_QUEUE1_NAME)  // 监听 Fanout 交换器绑定的队列 1(如 "fanout_queue1")public void receiveFanoutMessage1(BusinessMessage message) {// 记录日志:输出队列 1 接收到的消息内容log.info("队列1接收到广播消息: {}", message);  // 输出消息 ID、内容、发送时间/** 处理消息的业务逻辑(示例)* 实际场景中可能包含:* 1. 消息校验(如检查消息完整性、签名)* 2. 数据转换(如将消息对象转换为数据库实体)* 3. 业务操作(如更新数据库、调用外部接口)* 4. 异常处理(如捕获业务异常并记录,或触发重试)*/try {// 示例:假设 BusinessMessage 包含业务 ID 和内容,此处进行业务处理String businessId = message.getId();       // 消息唯一 ID(用于追踪)String content = message.getContent();     // 消息核心内容(如 JSON 格式的业务数据)log.debug("开始处理队列1的业务消息,业务ID: {}, 内容: {}", businessId, content);// ... 具体业务逻辑代码(如调用库存服务扣减库存)...log.info("队列1的业务消息处理完成,业务ID: {}", businessId);  // 记录处理成功日志} catch (Exception e) {// 异常处理:记录错误日志,必要时触发补偿机制log.error("队列1处理广播消息失败,消息内容: {}, 异常信息: {}", message, e.getMessage(), e);// 若使用手动确认模式,可在此抛出异常触发消息回滚(重新入队)// throw new RuntimeException("队列1处理失败", e);}}/*---------------------------------------------------------------------------| 方法 3:监听广播消息(Fanout 交换器队列 2)| 作用:监听 Fanout 交换器广播到队列 2 的消息,执行另一维度的广播业务逻辑(如物流通知)*--------------------------------------------------------------------------*//*** 消费广播消息(队列 2 监听器)* 作用:监听并消费 Fanout 类型交换器广播到队列 2 的消息(与队列 1 独立,可执行不同逻辑)* 特性:广播消息会被所有绑定队列接收,因此队列 1 和队列 2 可并行处理同一消息** @param message 接收到的广播消息(已反序列化的 BusinessMessage 对象)*/@RabbitListener(queues = RabbitMQConfig.FANOUT_QUEUE2_NAME)  // 监听 Fanout 交换器绑定的队列 2(如 "fanout_queue2")public void receiveFanoutMessage2(BusinessMessage message) {// 记录日志:输出队列 2 接收到的消息内容log.info("队列2接收到广播消息: {}", message);  // 输出消息 ID、内容、发送时间/** 处理消息的业务逻辑(示例)* 注意:广播消息会被所有绑定的队列接收,因此队列 1 和队列 2 可能执行不同的业务逻辑* 例如:* - 队列 1 用于订单通知,队列 2 用于库存同步* - 队列 1 记录操作日志,队列 2 触发数据分析*/try {// 示例:假设 BusinessMessage 包含操作类型和数据,此处根据类型分发处理String messageType = message.getContent();  // 假设内容为操作类型(如 "ORDER_CREATED")log.debug("开始处理队列2的广播消息,消息类型: {}, 内容: {}", messageType, message.getContent());// 根据消息类型执行不同逻辑if ("ORDER_CREATED".equals(messageType)) {// 处理订单创建后的库存扣减逻辑log.info("处理订单创建后的库存扣减逻辑,订单ID: {}", message.getId());} else if ("ORDER_CANCELLED".equals(messageType)) {// 处理订单取消后的库存恢复逻辑log.info("处理订单取消后的库存恢复逻辑,订单ID: {}", message.getId());}log.info("队列2的广播消息处理完成,消息类型: {}", messageType);  // 记录处理成功日志} catch (Exception e) {// 异常处理:记录错误日志,必要时触发补偿机制log.error("队列2处理广播消息失败,消息内容: {}, 异常信息: {}", message, e.getMessage(), e);// 若需要重试,可在此抛出异常(需配合手动确认模式和重试配置)// throw new RuntimeException("队列2处理失败", e);}}/*---------------------------------------------------------------------------| 方法 4:监听主题消息(Topic 交换器队列 1)| 作用:监听 Topic 交换器广播到队列 1 的消息(路由键匹配 "topic.key1")*--------------------------------------------------------------------------*//*** 消费主题消息1* 作用:监听并消费 Topic 交换器广播到队列 1 的消息(路由键匹配 "topic.key1")* 特性:Topic 交换器通过路由键模式匹配分发消息(如 "topic.key1" 精确匹配)** @param message 接收到的主题消息(已反序列化的 BusinessMessage 对象)*/@RabbitListener(queues = RabbitMQConfig.TOPIC_QUEUE1_NAME)  // 监听 Topic 交换器绑定的队列 1(如 "topic_queue1")public void receiveTopicMessage1(BusinessMessage message) {log.info("主题队列1接收到消息: {}", message);  // 记录消息内容// 处理消息的业务逻辑(示例:如用户登录日志记录)}/*---------------------------------------------------------------------------| 方法 5:监听主题消息(Topic 交换器队列 2)| 作用:监听并消费 Topic 交换器广播到队列 2 的消息(路由键匹配 "topic.key2.#")*--------------------------------------------------------------------------*//*** 消费主题消息2* 作用:监听并消费 Topic 交换器广播到队列 2 的消息(路由键匹配 "topic.key2.#",如 "topic.key2.x")* 特性:Topic 交换器支持通配符匹配(* 匹配一个单词,# 匹配零或多个单词)** @param message 接收到的主题消息(已反序列化的 BusinessMessage 对象)*/@RabbitListener(queues = RabbitMQConfig.TOPIC_QUEUE2_NAME)  // 监听 Topic 交换器绑定的队列 2(如 "topic_queue2")public void receiveTopicMessage2(BusinessMessage message) {log.info("主题队列2接收到消息: {}", message);  // 记录消息内容// 处理消息的业务逻辑(示例:如用户操作数据分析)}
}

2.6 消息生产者(MessageProducer)

封装消息发送逻辑,支持点对点、广播、主题三种模式,确保消息正确路由。

package com.dwl.foundation.producer;import com.dwl.foundation.config.RabbitMQConfig;
import com.dwl.foundation.model.BusinessMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.time.LocalDateTime;
import java.util.UUID;/*** @ClassName MessageProducer* @Description 消息生产者* @Version 1.0.0* @Date 2025* @Author By Dwl*/
@Component
@Slf4j
public class MessageProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 发送点对点(Direct)类型的消息到 RabbitMQ* 核心逻辑:* 1. 构造业务消息对象(包含唯一ID、内容、发送时间)* 2. 生成消息关联标识(CorrelationData)用于追踪* 3. 通过 RabbitTemplate 发送到指定交换器和路由键* 4. 记录发送结果日志* 特性:Direct 交换器根据路由键精确路由到绑定的队列(一对一通信),适用于需要精准投递的场景(如订单通知特定用户)** @param content 消息内容(业务自定义数据,如文本、JSON 等)*/public void sendDirectMessage(String content) {/*---------------------------------------------------------------------------| 步骤 1:构造业务消息对象(封装消息元数据与核心内容)| 作用:定义消息的业务属性(如唯一 ID、发送时间)和核心内容,确保消息可追踪、可识别| 关键字段说明:| - id:全局唯一消息 ID(UUID 生成),用于全链路追踪消息状态(如发送成功/失败、消费情况)| - content:消息核心业务数据(如 JSON 字符串、对象序列化后的内容)| - sendTime:消息发送时间戳(毫秒级),用于监控消息延迟或日志时间轴分析*--------------------------------------------------------------------------*/BusinessMessage message = new BusinessMessage();message.setId(UUID.randomUUID().toString());  // 生成全局唯一消息 ID(避免消息重复或混淆)message.setContent(content);                  // 设置消息核心内容(业务自定义数据,如订单变更信息)message.setSendTime(LocalDateTime.now());     // 记录消息发送时间(精确到毫秒,用于后续监控或问题排查)/*---------------------------------------------------------------------------| 步骤 2:构建消息关联标识(用于发布者确认/返回机制)| 作用:绑定消息与 RabbitMQ 的确认回调逻辑,通过唯一标识追踪消息的发送状态(成功/失败)| 核心对象:CorrelationData(Spring AMQP 提供的消息关联类)| 关键设计:使用消息 ID 作为关联标识(correlationData.setId(message.getId())),确保回调时能唯一对应本次发送的消息*--------------------------------------------------------------------------*/CorrelationData correlationData = new CorrelationData(message.getId());/*---------------------------------------------------------------------------| 步骤 3:通过 RabbitTemplate 发送消息到 Direct 交换器(核心操作)| 作用:将消息发送到 Direct 交换器(点对点交换器),由交换器根据路由键精确路由到绑定的队列| 核心参数说明(rabbitTemplate.convertAndSend()):| 1. exchange: 目标交换器名称(来自 RabbitMQConfig.DIRECT_EXCHANGE_NAME,需提前声明为 Direct 类型)|    - Direct 交换器特性:根据路由键(Routing Key)精确匹配绑定的队列(仅路由键完全一致的队列会接收消息)| 2. routingKey: 路由键(来自 RabbitMQConfig.DIRECT_ROUTING_KEY,需与队列绑定的路由键一致)|    - 示例:若队列 "direct_queue" 绑定路由键 "direct_key",则只有路由键为 "direct_key" 的消息会被路由到该队列| 3. message: 待发送的业务消息对象(Spring 会通过消息转换器自动序列化为字节数组,如 JSON 格式)| 4. correlationData: 消息关联标识(可选,用于发送确认或返回回调,追踪消息是否成功到达交换器)*--------------------------------------------------------------------------*/try {rabbitTemplate.convertAndSend(RabbitMQConfig.DIRECT_EXCHANGE_NAME,  // Direct 交换器名称(如 "direct_exchange")RabbitMQConfig.DIRECT_ROUTING_KEY,    // 路由键(如 "direct_routing_key",需与队列绑定键一致)message,                              // 待发送的业务消息对象(自动序列化为 JSON 或其他格式)correlationData                       // 消息关联标识(绑定消息 ID,用于回调));/*---------------------------------------------------------------------------| 步骤 4:记录发送成功日志(生产环境建议记录关键信息)| 作用:记录消息发送的核心信息(消息 ID、内容),便于后续问题排查和业务追溯| 日志内容:包含消息 ID(唯一标识)、内容摘要(避免敏感信息泄露),确保审计的完整性和安全性*--------------------------------------------------------------------------*/log.info("点对点消息发送成功,消息ID:{},内容摘要:{}", message.getId(), content);} catch (Exception e) {/*---------------------------------------------------------------------------| 异常处理(关键!避免消息丢失无感知)| 作用:捕获消息发送过程中的异常(如网络连接失败、交换器不存在、序列化失败等)| 常见异常类型:| - AmqpConnectException:RabbitMQ 连接失败(如服务器宕机、网络中断)| - SerializationException:消息序列化失败(如 BusinessMessage 无法转换为 JSON)| - IllegalArgumentException:交换器/路由键错误(如配置错误导致交换器未声明)| - NoSuchExchangeException:目标交换器不存在(未提前声明或名称错误)*--------------------------------------------------------------------------*/log.error("点对点消息发送失败,消息ID:{},内容:{},异常信息:{}",message.getId(), content, e.getMessage(), e);  // 记录完整异常堆栈(e 包含堆栈信息)/*---------------------------------------------------------------------------| 可选扩展:触发消息重试或告警(根据业务需求)| 示例1:使用 Spring Retry 实现自动重试(需配合 @Retryable 注解)* @Retryable(value = {AmqpConnectException.class, NoSuchExchangeException.class},*            maxAttempts = 3, backoff = @Backoff(delay = 1000))* public void sendDirectMessage(String content) { ... }*| 示例2:调用自定义重试服务(手动控制重试逻辑)* retryService.retry(() -> rabbitTemplate.convertAndSend(...), 3, 1000);*| 示例3:发送告警通知(如邮件、钉钉机器人)* alertService.sendAlert("点对点消息发送失败,消息 ID: " + message.getId(), e);*--------------------------------------------------------------------------*/}}/*** 发送广播消息(基于 Fanout 交换器)* 作用:将业务消息通过 Fanout 交换器广播到所有绑定的队列,实现多消费者并行消费* 特性:Fanout 交换器会忽略路由键,将消息广播到所有绑定的队列(“一对多”分发),适用于多模块数据同步、系统通知广播等场景*/public void sendFanoutMessage(String content) {/*---------------------------------------------------------------------------| 步骤1:构造业务消息对象(封装消息元数据与核心内容)| 作用:定义消息的业务属性(如唯一 ID、发送时间)和核心内容,确保消息可追踪、可识别| 关键字段说明:| - id:全局唯一消息 ID(UUID 生成),用于全链路追踪消息状态(如发送成功/失败、消费情况)| - content:消息核心业务数据(如 JSON 字符串、对象序列化后的内容)| - sendTime:消息发送时间戳(毫秒级),用于监控消息延迟或日志时间轴分析*--------------------------------------------------------------------------*/BusinessMessage message = new BusinessMessage();message.setId(UUID.randomUUID().toString());  // 生成全局唯一消息 ID(避免消息重复或混淆)message.setContent(content);                  // 设置消息核心内容(业务自定义数据,如订单变更信息)message.setSendTime(LocalDateTime.now());     // 记录消息发送时间(精确到毫秒,用于后续监控或问题排查)/*---------------------------------------------------------------------------| 步骤2:创建消息关联标识(用于发布者确认/返回机制)| 作用:绑定消息与 RabbitMQ 的确认回调逻辑,通过唯一标识追踪消息的发送状态(成功/失败)| 核心对象:CorrelationData(Spring AMQP 提供的消息关联类)| 关键设计:使用消息 ID 作为关联标识(correlationData.setId(message.getId())),确保回调时能唯一对应本次发送的消息*--------------------------------------------------------------------------*/CorrelationData correlationData = new CorrelationData(message.getId());/*---------------------------------------------------------------------------| 步骤3:通过 RabbitTemplate 发送消息到 Fanout 交换器| 作用:将消息发送到 Fanout 交换器(扇出交换器),由交换器完成广播分发| 核心参数说明(rabbitTemplate.convertAndSend()):| 1. exchange: 目标交换器名称(来自 RabbitMQConfig.FANOUT_EXCHANGE_NAME,需提前声明为 Fanout 类型)| 2. routingKey: 路由键(Fanout 交换器**忽略路由键**,因此传空字符串即可)| 3. message: 待发送的业务消息对象(Spring 会通过消息转换器自动序列化为字节数组,如 JSON 格式)| 4. correlationData: 消息关联标识(用于发布者确认/返回的回调逻辑,追踪消息是否成功到达交换器)*--------------------------------------------------------------------------*/try {rabbitTemplate.convertAndSend(RabbitMQConfig.FANOUT_EXCHANGE_NAME,  // Fanout 交换器名称(如 "fanout_exchange")"",                                  // 路由键(Fanout 交换器不依赖路由键,留空)message,                             // 待发送的业务消息对象(自动序列化为 JSON 或其他格式)correlationData                      // 消息关联标识(绑定消息 ID,用于回调));/*---------------------------------------------------------------------------| 步骤4:记录消息发送日志(关键操作审计)| 作用:记录消息发送的核心信息(消息 ID、内容),便于后续问题排查和业务追溯| 日志内容:包含消息 ID(唯一标识)、内容摘要(避免敏感信息泄露),确保审计的完整性和安全性*--------------------------------------------------------------------------*/log.info("广播消息发送成功,消息 ID: {}, 内容摘要: {}", message.getId(), content);} catch (Exception e) {/*---------------------------------------------------------------------------| 异常处理:捕获消息发送过程中的异常(如网络连接失败、交换器不存在、序列化失败等)| 作用:避免程序崩溃,记录详细错误信息(包含消息 ID、内容、异常堆栈),便于快速定位问题| 异常类型示例:| - AmqpConnectException:RabbitMQ 连接失败(如服务器宕机、网络中断)| - SerializationException:消息序列化失败(如 BusinessMessage 无法转换为 JSON)| - IllegalArgumentException:交换器名称错误(如配置错误导致交换器未声明)*--------------------------------------------------------------------------*/log.error("广播消息发送失败,消息 ID: {}, 内容: {}, 异常信息: {}",message.getId(), content, e.getMessage(), e);  // 记录完整异常堆栈(e 包含堆栈信息)/*---------------------------------------------------------------------------| 可选扩展:触发消息重试或告警(根据业务需求)| 示例1:使用 Spring Retry 实现自动重试(需配合 @Retryable 注解)* @Retryable(value = {AmqpConnectException.class}, maxAttempts = 3, backoff = @Backoff(delay = 1000))* public void sendFanoutMessage(String content) { ... }*| 示例2:调用自定义重试服务(手动控制重试逻辑)* retryService.retry(() -> rabbitTemplate.convertAndSend(...), 3, 1000);*| 示例3:发送告警通知(如邮件、钉钉机器人)* alertService.sendAlert("广播消息发送失败,消息 ID: " + message.getId(), e);*--------------------------------------------------------------------------*/}}/*** 发送主题模式消息(基于 Topic Exchange)* 作用:将业务消息通过主题交换机(Topic Exchange)路由到匹配路由键模式的队列,实现多维度消息分发* 特性:支持通配符路由(* 匹配一个单词,# 匹配零或多个单词),适用于按业务维度分类的消息广播(如日志分级、多服务协作)*/public void sendTopicMessage(String routingKey, String content) {/*---------------------------------------------------------------------------| 步骤1:构造业务消息对象(封装消息元数据与内容)| 作用:定义消息的业务属性(如唯一 ID、发送时间)和核心内容,确保消息可追踪和业务可识别*--------------------------------------------------------------------------*/BusinessMessage message = new BusinessMessage();message.setId(UUID.randomUUID().toString());  // 生成全局唯一消息 ID(用于全链路追踪)message.setContent(content);                  // 设置消息核心内容(业务自定义数据,如 JSON 字符串)message.setSendTime(LocalDateTime.now());     // 记录消息发送时间戳(用于监控延迟或日志分析)/*---------------------------------------------------------------------------| 步骤2:创建消息关联标识(用于发布者确认/返回机制)| 作用:绑定消息与 RabbitMQ 的确认回调,通过唯一标识追踪消息的发送状态(成功/失败)| 关键属性:使用消息 ID 作为关联标识,确保回调时能唯一对应本次发送的消息*--------------------------------------------------------------------------*/CorrelationData correlationData = new CorrelationData(message.getId());/*---------------------------------------------------------------------------| 步骤3:通过 RabbitTemplate 发送消息到主题交换机| 作用:将消息发送到主题交换机(Topic Exchange),由交换机根据路由键模式匹配目标队列| 核心参数说明:| - exchange: 主题交换机名称(来自 RabbitMQConfig.TOPIC_EXCHANGE_NAME)| - routingKey: 路由键(决定消息被路由到哪些队列,支持 * 和 # 通配符)| - message: 待发送的业务消息对象(Spring 会通过消息转换器序列化为字节数组)| - correlationData: 消息关联标识(用于发布者确认/返回的回调)*--------------------------------------------------------------------------*/try {rabbitTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXCHANGE_NAME,  // 主题交换机名称(已声明为持久化)routingKey,                          // 路由键(需匹配队列绑定的模式,如 "topic.key1" 或 "topic.key2.x")message,                             // 待发送的业务消息对象(自动序列化为 JSON 或其他格式)correlationData                      // 消息关联标识(用于追踪发送状态));/*---------------------------------------------------------------------------| 步骤4:记录消息发送日志(关键操作审计)| 作用:记录消息发送的关键信息(路由键、消息 ID、内容),便于问题排查和业务追溯*--------------------------------------------------------------------------*/log.info("主题消息发送成功,路由键: {}, 消息 ID: {}, 内容: {}", routingKey, message.getId(), content);} catch (Exception e) {/*---------------------------------------------------------------------------| 异常处理:捕获消息发送过程中的异常(如连接失败、序列化失败、交换器不存在等)| 作用:避免程序崩溃,记录详细错误信息(包含消息 ID、路由键、异常堆栈),便于故障定位*--------------------------------------------------------------------------*/log.error("主题消息发送失败,路由键: {}, 消息 ID: {}, 内容: {}, 异常信息: {}",routingKey, message.getId(), content, e.getMessage(), e);/*---------------------------------------------------------------------------| 可选扩展:触发消息重试或告警(根据业务需求)| 示例:调用重试服务重新发送消息,或发送告警通知运维人员* retryService.retrySend(routingKey, content, e);* alertService.sendAlert("主题消息发送失败", e);*--------------------------------------------------------------------------*/}}}

2.7 控制器(测试入口)

通过 REST 接口触发消息发送,验证消息流转逻辑。

package com.dwl.foundation.controller;import com.dwl.foundation.producer.MessageProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;/*** @ClassName MessageController* @Description 测试控制器,用于发送消息* @Version 1.0.0* @Date 2025* @Author By Dwl*/
@RestController
public class MessageController {/*** 普通消息生产者(用于发送点对点、广播、主题消息)* 作用:封装 RabbitMQ 基础消息发送逻辑(如交换器、路由键配置、消息序列化)*/@Autowiredprivate MessageProducer messageProducer;/*** 发送点对点消息(Direct Exchange)* 接口说明:通过 GET 请求发送点对点消息,适用于一对一消息传递场景(如用户通知、订单状态更新)** @param content 消息内容(文本格式,如 "用户ID:123 订单已支付")* @return 发送结果提示("点对点消息发送成功")* @RabbitMQ 机制关联:* - 交换器类型:Direct Exchange(直接交换器)* - 路由规则:消息路由键与队列绑定键完全匹配时,消息被路由到目标队列* - 典型场景:用户登录成功通知(仅通知特定用户)、订单支付结果通知(仅通知订单相关服务)*/@GetMapping("/sendDirect")public String sendDirectMessage(@RequestParam("content") String content) {// 调用普通消息生产者发送点对点消息messageProducer.sendDirectMessage(content);return "点对点消息发送成功";}/*** 发送广播消息(Fanout Exchange)* 接口说明:通过 GET 请求发送广播消息,适用于一对多消息传递场景(如系统公告、全局事件通知)** @param content 消息内容(文本格式,如 "系统将于20:00进行维护")* @return 发送结果提示("广播消息发送成功")* @RabbitMQ 机制关联:* - 交换器类型:Fanout Exchange(扇出交换器)* - 路由规则:忽略路由键,将消息广播到所有绑定的队列(无论绑定键是什么)* - 典型场景:全局促销活动通知(所有用户)、日志收集(所有服务节点)*/@GetMapping("/sendFanout")public String sendFanoutMessage(@RequestParam("content") String content) {// 调用普通消息生产者发送广播消息messageProducer.sendFanoutMessage(content);return "广播消息发送成功";}/*** 发送主题消息(Topic Exchange)* 接口说明:通过 GET 请求发送主题消息,适用于按主题分类的一对多消息传递场景(如商品分类通知、区域化消息)** @param routingKey 消息路由键(支持通配符:* 匹配一个单词,# 匹配零或多个单词,如 "order.*"、"#.payment")* @param content    消息内容(文本格式,如 "商品ID:456 库存不足")* @return 发送结果提示("主题消息发送成功")* @RabbitMQ 机制关联:* - 交换器类型:Topic Exchange(主题交换器)* - 路由规则:消息路由键与队列绑定键通过通配符匹配(如绑定键 "order.payment" 匹配路由键 "order.payment.success")* - 典型场景:商品价格变更通知(仅通知关注该商品的用户)、区域天气提醒(仅通知特定区域用户)*/@GetMapping("/sendTopic")public String sendTopicMessage(@RequestParam("routingKey") String routingKey,@RequestParam("content") String content) {// 调用普通消息生产者发送主题消息(传递路由键和内容)messageProducer.sendTopicMessage(routingKey, content);return "主题消息发送成功";}}

三、生产案例:电商系统中的消息应用

3.1 场景描述

假设我们有一个电商系统,包含以下核心服务:

  • 订单服务:处理订单支付、取消等操作。
  • 库存服务:管理商品库存,扣减/恢复库存。
  • 用户服务:发送用户通知(如短信、APP 推送)。
  • 日志服务:收集系统操作日志。

3.2 消息模式选择与实现

3.2.1 订单支付成功通知(点对点模式)

需求:订单支付成功后,仅通知对应的用户(如短信通知)。

方案:使用 Direct 交换器,订单服务发送消息到 direct_exchange,路由键为 order.payment.success.{userId},用户服务监听对应的队列 direct_queue_{userId}

// 订单服务发送支付成功消息
messageProducer.sendDirectMessage("订单支付成功,用户ID: 123");// 用户服务监听专属队列
@RabbitListener(queues = "direct_queue_123")
public void handleUserPaymentNotification(BusinessMessage message) {log.info("用户 123 收到支付通知: {}", message.getContent());// 调用短信服务发送通知
}
3.2.2 系统维护通知(广播模式)

需求:系统凌晨维护时,通知所有服务节点(订单、库存、用户服务)暂停部分操作。

方案:使用 Fanout 交换器,维护服务发送广播消息到 fanout_exchange,所有服务监听各自的队列(fanout_queue_orderfanout_queue_inventory)。

// 维护服务发送广播消息
messageProducer.sendFanoutMessage("系统将于 00:00-02:00 维护,请暂停写操作");// 订单服务监听广播队列
@RabbitListener(queues = "fanout_queue_order")
public void handleMaintenanceNotification(BusinessMessage message) {log.info("订单服务收到维护通知: {}", message.getContent());// 暂停订单创建/修改操作
}
3.2.3 商品价格变更通知(主题模式)

需求:商品价格变更时,通知关注该商品的用户(如收藏用户)和促销服务。

方案:使用 Topic 交换器,商品服务发送消息到 topic_exchange,路由键为 product.price.change.{productId},用户服务监听 topic_queue_user_{productId},促销服务监听 topic_queue_promotion(匹配 product.price.change.#)。

// 商品服务发送价格变更消息
messageProducer.sendTopicMessage("product.price.change.456", "商品 456 价格变更为 99 元");// 用户服务监听特定商品变更
@RabbitListener(queues = "topic_queue_user_456")
public void handleUserPriceChange(BusinessMessage message) {log.info("用户收到商品 456 价格变更通知: {}", message.getContent());// 触发收藏用户的推送
}// 促销服务监听所有价格变更
@RabbitListener(queues = "topic_queue_promotion")
public void handlePromotionPriceChange(BusinessMessage message) {log.info("促销服务收到价格变更通知: {}", message.getContent());// 更新促销活动规则
}

四、生产环境优化与扩展

4.1 消息可靠性保障

4.1.1 持久化配置
  • 队列持久化:通过 QueueBuilder.durable()声明持久化队列(RabbitMQ 重启后保留队列)。
  • 消息持久化:在 rabbitTemplate.convertAndSend()中设置 MessageProperties.PERSISTENT_TEXT_PLAIN(默认已开启)。
  • 交换器持久化:通过 new DirectExchange(name, true, false)声明持久化交换器。
4.1.2 确认与重试机制
  • 发布者确认:通过 RabbitTemplate.ConfirmCallback监听消息是否到达交换器(如网络故障导致未到达)。
  • 返回机制:通过 RabbitTemplate.ReturnsCallback处理未被路由的消息(如路由键错误)。
  • 手动重试:结合 Spring Retry实现消息发送失败后的自动重试(如网络临时中断)。
// 配置重试策略(在 RabbitTemplateConfig 中添加)
@Bean
public RetryTemplate retryTemplate() {RetryTemplate retryTemplate = new RetryTemplate();// 重试 3 次,每次间隔 1 秒FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();backOffPolicy.setBackOffPeriod(1000);SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();retryPolicy.setMaxAttempts(3);retryTemplate.setBackOffPolicy(backOffPolicy);retryTemplate.setRetryPolicy(retryPolicy);return retryTemplate;
}// 在消息发送时使用重试
rabbitTemplate.setRetryTemplate(retryTemplate);

4.2 性能优化

4.2.1 并发消费者

通过 @RabbitListenerconcurrency参数设置消费者并发数(如 concurrency = "3"),提升消息处理能力。

@RabbitListener(queues = "direct_queue", concurrency = "3",  // 3 个消费者并行处理prefetch = "10"     // 每个消费者预取 10 条消息
)
public void handleDirectMessage(BusinessMessage message) {// 处理逻辑
}
4.2.2 消息批量处理

对于高吞吐量场景,可使用 BatchMessageListenerContainer批量接收消息,减少网络开销。

4.3 安全性增强

4.3.1 访问控制
  • 虚拟主机隔离:为不同业务线创建独立虚拟主机(如 /order/inventory),避免资源干扰。
  • 权限控制:通过 RabbitMQ 管理界面限制用户权限(如仅允许订单服务读写 direct_exchange)。
4.3.2 消息加密

对敏感消息(如用户手机号)进行加密传输,使用 SSL/TLS 加密 RabbitMQ 连接(配置 ssl.enabled=true和证书路径)。

4.4 监控与运维

4.4.1 日志监控

通过 logback-spring.xml配置 RabbitMQ 相关日志级别(如 DEBUG记录消息详情),结合 ELK(Elasticsearch+Logstash+Kibana)集中分析日志。

4.4.2 指标监控

集成 Prometheus + Grafana 监控 RabbitMQ 指标(如队列长度、消息速率、消费者数量),设置告警规则(如队列积压超过 1000 条时触发告警)。


五、总结与展望

本文详细讲解了如何基于 Spring AMQP 实现 RabbitMQ 的核心功能,包括三种消息模式的配置、消息序列化、发送确认与返回机制,以及消费者端的消息处理。在生产环境中,还需关注消息可靠性、性能优化、安全性和监控运维,通过持久化、重试策略、并发消费者等机制保障系统的稳定性。

http://www.dtcms.com/a/354998.html

相关文章:

  • imx6ull-驱动开发篇47——Linux SPI 驱动实验
  • Java全栈工程师的实战面试:从基础到微服务的全面解析
  • 磁力计校准矩阵求解方法解析
  • go grpc使用场景和使用示例
  • python02
  • Codeforces Round 1043 (Div. 3) F. Rada and the Chamomile Valley
  • 02Shell的变量运算以及数据比较
  • 卷积神经网络(一):卷积神经网络基础
  • 基于卷积神经网络 (CNN) 的 MNIST 手写数字识别模型
  • 如果给我们直接创建的类加上索引?和len方法?
  • 深度学习篇---模型参数保存
  • 卷积神经网络实现mnist手写数字集识别案例
  • Apollo-PETRv1演示DEMO操作指南
  • 【Qt】QCryptographicHash 设置密钥(Key)
  • Deeplizard 深度学习课程(四)—— 模型构建
  • jwt原理及Java中实现
  • 海盗王64位dx9客户端修改篇之二
  • 学习Java29天(tcp多发多收)但是无解决客户端启动多个问题
  • ProfiNet 转 Ethernet/IP 柔性产线构建方案:网关技术保护新能源企业现有设备投资
  • LeetCode Hot 100 第7天
  • 第三十天:世界杯队伍团结力问题
  • EF Core 编译模型 / 模型裁剪:冷启动与查询优化
  • QT之双缓冲 (QMutex/QWaitCondition)——读写分离
  • 企业如何管理跨多个系统的主数据?
  • MaxCompute MaxFrame | 分布式Python计算服务MaxFrame(完整操作版)
  • 【Lua】题目小练12
  • 如何实现HTML动态爱心表白效果?
  • 多版本并发控制MVCC
  • 黑马点评|项目日记(day02)
  • C#和Lua相互访问