基于 Spring Boot 与 RabbitMQ 的分布式消息通信机制设计与实现
1. rabbit MQ 消息分发模式
| 模式 | Exchange 类型 | 特点 | 常见应用场景 |
|---|---|---|---|
| 1. 简单队列模式(Simple Queue) | 默认(无 Exchange) | 一个生产者 → 一个队列 → 一个消费者 | 测试或点对点任务 |
| 2. 工作队列模式(Work Queues) | direct | 多个消费者共同竞争同一队列中的消息,实现负载均衡 | 异步任务处理、后台批处理 |
| 3. 发布/订阅模式(Publish/Subscribe) | fanout | 生产者将消息广播到多个队列,所有消费者都会收到 | 日志广播、系统通知、推送消息 |
| 4. 路由模式(Routing) | direct | 消息根据 routing key 精确匹配路由到队列 | 不同类型日志分发(info、error、warn) |
| 5. 通配符模式(Topics) | topic | 支持模糊匹配,如 log.* 或 log.# | 复杂的主题订阅系统 |
| 6. RPC 模式 | 临时 direct Exchange | 模拟远程调用机制,通过请求队列和回调队列 | 分布式任务同步请求场景 |
1.1 工作队列模式

1.2 发布订阅模式

1.3 路由模式

1.4 主题模式

2. 消息可靠性投递
| 阶段 | 可能失败原因 | 对应机制 | 解决方案 |
|---|---|---|---|
| 生产者 → Exchange | 网络异常、Exchange 不存在 | Publisher Confirm | 回调确认+重发 |
| Exchange → Queue | 路由键错误、队列不存在 | Return Callback | 记录日志/死信转发 |
| Queue 存储 | 非持久化、Broker 崩溃 | Message Persistence | 持久化配置 |
| Queue → Consumer | 消费异常、程序崩溃 | ACK/NACK 手动确认 | 重试、DLX |
| 全流程异常处理 | 各阶段累积问题 | 死信队列 DLX | 后续分析/人工干预 |

2.1 消息没有发送到消息队列
解决思路A:在生产者端进行确认,具体操作中我们会分别针对交换机和队列来确认,如果没有成功发送到消息队列服务器上,那就可以尝试重新发送
解决思路B:为目标交换机指定备份交换机,但目标交换机投递失败时,把消息投递至备份交换机
2.2 消息队列服务器宕机导致内存中消息丢失
解决思路:消息持久化到硬盘,哪怕服务器重启也不会导致消息丢失。默认配置是支持持久化
2.3 消费端宕机或抛异常导致消息没有成功被消费
消费端消费消息成功,给服务器返回ACK信息,然后消息队列删除该消息
消费端消费消息失败,给服务器端返回NACK消息,同时把消息恢复为带消费端状态,这样就可以再次取回消息,重试一次。
3 死信
- 概念:当一个消息无法被消费,它就变成了死信。
- 死信产生的原因大致有下面三种:
- 拒绝:消费者拒接消息,使用
basicNack()/basicReject(),并且不把消息重新放入原目标队列,即requeue=false。 - 溢出:队列中消息数量到达限制。比如队列最大只能存储10条消息,且现在已经存储了10条,此时如果再发送一条消息进来,根据先进先出原则,队列中最早的消息会变成死信。
- 超时:消息到达超时时间未被消费。
- 拒绝:消费者拒接消息,使用
- 死信的处理方式大致有下面三种:
- 丢弃:对不重要的消息直接丢弃,不做处理。
- 入库:把死信写入数据库,日后处理。
- 监听:消息变成死信后进入死信队列,专门设置消费端监听死信队列,做后续处理(通常采用)。
3.1 死信的绑定
正常链路
生产者 → 正常交换机(Normal Exchange)→ 正常队列(Normal Queue)→ 消费者
死信链路
Normal Queue(死信触发)→ 死信交换机(Dead Exchange)→ 死信队列(Dead Queue)
3.1.1 创建两套完整的路由体系
需要先创建:
一个普通交换机(如
tcp.data.exchange);一个普通队列(如
tcp.data.queue);一个死信交换机(如
dead.exchange);一个死信队列(如
dead.queue)。
3.1.2 在普通队列上绑定死信交换机
args.put("x-dead-letter-exchange", "dead.exchange"); // 指定死信交换机
args.put("x-dead-letter-routing-key", "dead.key"); // 指定死信路由键
这样,当普通队列中的消息变成“死信”时,RabbitMQ 会自动把它重新路由到 dead.exchange,再由死信交换机转发到 dead.queue。
package com.wulang.tcp.config;import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration // ioc容器初始化时加载这个配置类进行对象创建
@Slf4j
public class RabbitConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {public static final String NORMAL_EXCHANGE = "tcp.data.exchange";public static final String NORMAL_QUEUE = "tcp.data.queue";public static final String ROUTING_KEY = "tcp.data.key";public static final String DEAD_EXCHANGE = "dead.exchange";public static final String DEAD_QUEUE = "dead.queue";@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 对rabbitTemplate的功能进行增强,用在发送端rabbit配置中*/@PostConstruct // 对象创建之后立即执行的注解public void initRabbitTemplate() {rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnsCallback(this);}/*** 声明回调函数来接收RabbitMQ服务器返回的确认信息* 确认消息是否发送到交换机*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {// 消息发送到交换机成功或者失败时调用这个方法log.debug("发送交换机确认 confirm() 回调函数 CorrelationData: {}", correlationData);log.debug("发送交换机确认 confirm() 回调函数 ack: {}", ack);log.debug("发送交换机确认 confirm() 回调函数 cause: {}", cause);}/*** 声明回调函数来接收RabbitMQ服务器返回的确认信息* 确认消息是否发送到队列*/@Overridepublic void returnedMessage(@NotNull ReturnedMessage returnedMessage) {// 发送到队列失败时才调用这个方法log.debug("发送队列失败 消息主体:{}", new String(returnedMessage.getMessage().getBody()));log.debug("发送队列失败 应答码:{}", returnedMessage.getReplyCode());log.debug("发送队列失败 描述:{}", returnedMessage.getReplyText());log.debug("发送队列失败 消息使用的交换机 exchange:{}", returnedMessage.getExchange());log.debug("发送队列失败 消息使用的路由键 routing key:{}", returnedMessage.getRoutingKey());}// ========================= 以下为新增部分:设置超时、容量限制、死信队列 ========================= //@Beanpublic RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);rabbitAdmin.setAutoStartup(true); // ✅ 应用启动时自动声明交换机、队列return rabbitAdmin;}/*** 声明普通交换机*/@Beanpublic DirectExchange normalExchange() {return new DirectExchange(NORMAL_EXCHANGE);}/*** 声明死信交换机*/@Beanpublic DirectExchange deadExchange() {return new DirectExchange(DEAD_EXCHANGE);}/*** 声明普通队列,设置:* 1. 消息TTL超时时间(例如10秒)* 2. 最大队列长度(超过自动丢弃最早的消息或转入死信)* 3. 死信交换机绑定,用于接收过期或失败消息*/@Beanpublic Queue normalQueue() {Map<String, Object> args = new HashMap<>();args.put("x-message-ttl", 10000); // 消息10秒未被消费则过期args.put("x-max-length", 100); // 队列最多容纳100条消息args.put("x-dead-letter-exchange", DEAD_EXCHANGE); // 设置死信交换机args.put("x-dead-letter-routing-key", "dead.key"); // 设置死信路由键return new Queue(NORMAL_QUEUE, true, false, false, args);}/*** 声明死信队列,用于接收过期、被拒绝或队列满的消息*/@Beanpublic Queue deadQueue() {return new Queue(DEAD_QUEUE);}/*** 普通队列绑定普通交换机*/@Beanpublic Binding normalBinding() {return BindingBuilder.bind(normalQueue()).to(normalExchange()).with(ROUTING_KEY);}/*** 死信队列绑定死信交换机*/@Beanpublic Binding deadBinding() {return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("dead.key");}
}3.2 消费端拒绝消息
3.2.1 配置手动ACK
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual # 开启手动确认模式
也可以直接在方法上进行注解
@RabbitListener(queues = {QUEUE_NAME}, ackMode = "MANUAL")
3.2.2 手动拒绝
| 操作场景 | 处理方式 | RabbitMQ行为 |
|---|---|---|
| 业务正常完成 | basicAck() | 消息被确认并移除 |
| 业务异常但希望重试 | basicReject(deliveryTag, true) | 消息重新入队等待下一次消费 |
| 业务异常且不希望重试 | basicReject(deliveryTag, false) | 消息拒绝并转入死信队列 |
| 批量拒绝 | basicNack(deliveryTag, true, false) | 多条消息批量拒绝并进入死信队列 |
在手动模式下,开发者可根据业务处理结果显式调用:
channel.basicAck():确认消息已成功消费;channel.basicReject()或channel.basicNack():拒绝消费该消息。
| 方法名 | 是否支持批量 | 是否能重新入队 | 常见用途 |
|---|---|---|---|
basicReject(long deliveryTag, boolean requeue) | ❌ 否 | ✅ 可控制 | 单条消息拒绝处理 |
basicNack(long deliveryTag, boolean multiple, boolean requeue) | ✅ 可 | ✅ 可控制 | 批量拒绝消息 |
package com.wulang.device.rabbitmq.listener;import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;@Component
@Slf4j
public class TCPMessageListener {public static final String QUEUE_NAME = "tcp.data.queue";@RabbitListener(queues = {QUEUE_NAME}, ackMode = "MANUAL")public void processMessage(String dataString, Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {log.debug("消费端接收到消息: {}", dataString);// ======== 模拟业务处理 ========
// Integer a = 1/0;// 业务处理成功 -> 手动确认channel.basicAck(deliveryTag, false); //告诉队列可以删除 true 确认当前 deliveryTag 之前的所有未确认消息 false:只确认当前这一条消息(最常用)log.debug("消息处理成功,已ACK");} catch (Exception e) {log.error("消息处理异常:{}", e.getMessage(), e);// 拒绝并不重新入队(false),进入死信队列channel.basicReject(deliveryTag, false); //true:重新入队,false:丢弃或进入死信队列log.warn("消息已拒绝并进入死信队列");}}
}
