当前位置: 首页 > news >正文

基于 Spring Boot 与 RabbitMQ 的分布式消息通信机制设计与实现

1. rabbit MQ 消息分发模式

模式Exchange 类型特点常见应用场景
1. 简单队列模式(Simple Queue)默认(无 Exchange)一个生产者 → 一个队列 → 一个消费者测试或点对点任务
2. 工作队列模式(Work Queues)direct多个消费者共同竞争同一队列中的消息,实现负载均衡异步任务处理、后台批处理
3. 发布/订阅模式(Publish/Subscribe)fanout生产者将消息广播到多个队列,所有消费者都会收到日志广播、系统通知、推送消息
4. 路由模式(Routing)direct消息根据 routing key 精确匹配路由到队列不同类型日志分发(info、error、warn)
5. 通配符模式(Topics)topic支持模糊匹配,如 log.*log.#复杂的主题订阅系统
6. RPC 模式临时 direct Exchange模拟远程调用机制,通过请求队列和回调队列分布式任务同步请求场景

1.1 工作队列模式

1.2 发布订阅模式

1.3 路由模式

1.4 主题模式

2. 消息可靠性投递

阶段可能失败原因对应机制解决方案
生产者 → Exchange网络异常、Exchange 不存在Publisher Confirm回调确认+重发
Exchange → Queue路由键错误、队列不存在Return Callback记录日志/死信转发
Queue 存储非持久化、Broker 崩溃Message Persistence持久化配置
Queue → Consumer消费异常、程序崩溃ACK/NACK 手动确认重试、DLX
全流程异常处理各阶段累积问题死信队列 DLX后续分析/人工干预

2.1 消息没有发送到消息队列

解决思路A:在生产者端进行确认,具体操作中我们会分别针对交换机和队列来确认,如果没有成功发送到消息队列服务器上,那就可以尝试重新发送

解决思路B:为目标交换机指定备份交换机,但目标交换机投递失败时,把消息投递至备份交换机

2.2 消息队列服务器宕机导致内存中消息丢失

解决思路:消息持久化到硬盘,哪怕服务器重启也不会导致消息丢失。默认配置是支持持久化

2.3 消费端宕机或抛异常导致消息没有成功被消费

消费端消费消息成功,给服务器返回ACK信息,然后消息队列删除该消息

消费端消费消息失败,给服务器端返回NACK消息,同时把消息恢复为带消费端状态,这样就可以再次取回消息,重试一次。

3 死信

  • 概念:当一个消息无法被消费,它就变成了死信。
  • 死信产生的原因大致有下面三种
    • 拒绝:消费者拒接消息,使用basicNack()/basicReject(),并且不把消息重新放入原目标队列,即requeue=false
    • 溢出:队列中消息数量到达限制。比如队列最大只能存储10条消息,且现在已经存储了10条,此时如果再发送一条消息进来,根据先进先出原则,队列中最早的消息会变成死信。
    • 超时:消息到达超时时间未被消费。
  • 死信的处理方式大致有下面三种
    • 丢弃:对不重要的消息直接丢弃,不做处理。
    • 入库:把死信写入数据库,日后处理。
    • 监听:消息变成死信后进入死信队列,专门设置消费端监听死信队列,做后续处理(通常采用)。

3.1 死信的绑定

正常链路

生产者 → 正常交换机(Normal Exchange)→ 正常队列(Normal Queue)→ 消费者

死信链路

Normal Queue(死信触发)→ 死信交换机(Dead Exchange)→ 死信队列(Dead Queue)

3.1.1 创建两套完整的路由体系

需要先创建:

  • 一个普通交换机(如 tcp.data.exchange);

  • 一个普通队列(如 tcp.data.queue);

  • 一个死信交换机(如 dead.exchange);

  • 一个死信队列(如 dead.queue)。

3.1.2 在普通队列上绑定死信交换机

args.put("x-dead-letter-exchange", "dead.exchange");   // 指定死信交换机
args.put("x-dead-letter-routing-key", "dead.key");     // 指定死信路由键

这样,当普通队列中的消息变成“死信”时,RabbitMQ 会自动把它重新路由到 dead.exchange,再由死信交换机转发到 dead.queue

package com.wulang.tcp.config;import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration // ioc容器初始化时加载这个配置类进行对象创建
@Slf4j
public class RabbitConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {public static final String NORMAL_EXCHANGE = "tcp.data.exchange";public static final String NORMAL_QUEUE = "tcp.data.queue";public static final String ROUTING_KEY = "tcp.data.key";public static final String DEAD_EXCHANGE = "dead.exchange";public static final String DEAD_QUEUE = "dead.queue";@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 对rabbitTemplate的功能进行增强,用在发送端rabbit配置中*/@PostConstruct // 对象创建之后立即执行的注解public void initRabbitTemplate() {rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnsCallback(this);}/*** 声明回调函数来接收RabbitMQ服务器返回的确认信息* 确认消息是否发送到交换机*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {// 消息发送到交换机成功或者失败时调用这个方法log.debug("发送交换机确认 confirm() 回调函数 CorrelationData: {}", correlationData);log.debug("发送交换机确认 confirm() 回调函数 ack: {}", ack);log.debug("发送交换机确认 confirm() 回调函数 cause: {}", cause);}/*** 声明回调函数来接收RabbitMQ服务器返回的确认信息* 确认消息是否发送到队列*/@Overridepublic void returnedMessage(@NotNull ReturnedMessage returnedMessage) {// 发送到队列失败时才调用这个方法log.debug("发送队列失败 消息主体:{}", new String(returnedMessage.getMessage().getBody()));log.debug("发送队列失败 应答码:{}", returnedMessage.getReplyCode());log.debug("发送队列失败 描述:{}", returnedMessage.getReplyText());log.debug("发送队列失败 消息使用的交换机 exchange:{}", returnedMessage.getExchange());log.debug("发送队列失败 消息使用的路由键 routing key:{}", returnedMessage.getRoutingKey());}// ========================= 以下为新增部分:设置超时、容量限制、死信队列 ========================= //@Beanpublic RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);rabbitAdmin.setAutoStartup(true); // ✅ 应用启动时自动声明交换机、队列return rabbitAdmin;}/*** 声明普通交换机*/@Beanpublic DirectExchange normalExchange() {return new DirectExchange(NORMAL_EXCHANGE);}/*** 声明死信交换机*/@Beanpublic DirectExchange deadExchange() {return new DirectExchange(DEAD_EXCHANGE);}/*** 声明普通队列,设置:* 1. 消息TTL超时时间(例如10秒)* 2. 最大队列长度(超过自动丢弃最早的消息或转入死信)* 3. 死信交换机绑定,用于接收过期或失败消息*/@Beanpublic Queue normalQueue() {Map<String, Object> args = new HashMap<>();args.put("x-message-ttl", 10000); // 消息10秒未被消费则过期args.put("x-max-length", 100); // 队列最多容纳100条消息args.put("x-dead-letter-exchange", DEAD_EXCHANGE); // 设置死信交换机args.put("x-dead-letter-routing-key", "dead.key"); // 设置死信路由键return new Queue(NORMAL_QUEUE, true, false, false, args);}/*** 声明死信队列,用于接收过期、被拒绝或队列满的消息*/@Beanpublic Queue deadQueue() {return new Queue(DEAD_QUEUE);}/*** 普通队列绑定普通交换机*/@Beanpublic Binding normalBinding() {return BindingBuilder.bind(normalQueue()).to(normalExchange()).with(ROUTING_KEY);}/*** 死信队列绑定死信交换机*/@Beanpublic Binding deadBinding() {return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("dead.key");}
}

3.2 消费端拒绝消息

3.2.1 配置手动ACK

spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual  # 开启手动确认模式

也可以直接在方法上进行注解

@RabbitListener(queues = {QUEUE_NAME}, ackMode = "MANUAL")

3.2.2 手动拒绝

操作场景处理方式RabbitMQ行为
业务正常完成basicAck()消息被确认并移除
业务异常但希望重试basicReject(deliveryTag, true)消息重新入队等待下一次消费
业务异常且不希望重试basicReject(deliveryTag, false)消息拒绝并转入死信队列
批量拒绝basicNack(deliveryTag, true, false)多条消息批量拒绝并进入死信队列

在手动模式下,开发者可根据业务处理结果显式调用:

  • channel.basicAck():确认消息已成功消费;

  • channel.basicReject()channel.basicNack():拒绝消费该消息。

方法名是否支持批量是否能重新入队常见用途
basicReject(long deliveryTag, boolean requeue)❌ 否✅ 可控制单条消息拒绝处理
basicNack(long deliveryTag, boolean multiple, boolean requeue)✅ 可✅ 可控制批量拒绝消息
package com.wulang.device.rabbitmq.listener;import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;@Component
@Slf4j
public class TCPMessageListener {public static final String QUEUE_NAME = "tcp.data.queue";@RabbitListener(queues = {QUEUE_NAME}, ackMode = "MANUAL")public void processMessage(String dataString, Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {log.debug("消费端接收到消息: {}", dataString);// ======== 模拟业务处理 ========
//            Integer a = 1/0;// 业务处理成功 -> 手动确认channel.basicAck(deliveryTag, false); //告诉队列可以删除 true 确认当前 deliveryTag 之前的所有未确认消息 false:只确认当前这一条消息(最常用)log.debug("消息处理成功,已ACK");} catch (Exception e) {log.error("消息处理异常:{}", e.getMessage(), e);// 拒绝并不重新入队(false),进入死信队列channel.basicReject(deliveryTag, false); //true:重新入队,false:丢弃或进入死信队列log.warn("消息已拒绝并进入死信队列");}}
}

http://www.dtcms.com/a/593015.html

相关文章:

  • 个人网站搭建详细步骤郑州网站建设流程
  • Java 之详解字符串拼接(十四)
  • Redis集群详解
  • 6 ElasticsearchRestTemplate
  • 第3章:矢量与栅格数据模型
  • java 面试问题
  • Elasticsearch-3--什么是Lucene?
  • 01-SQL 语句的关键字顺序
  • 树莓派Raspberry Pi 5的汉化
  • 小红书推荐系统(牛客)
  • 做网站的猫腻网站的链接结构怎么做
  • 【强化学习】DQN 算法
  • 大模型-详解 Vision Transformer (ViT) (2
  • 学习react第一天
  • 2025年电子会计档案管理软件深度介绍及厂商推荐
  • io_uring 避坑指南
  • (附源码)基于Spring boot的校园志愿服务管理系统的设计与实现
  • deepseek回答 如何用deepseek训练出一个我的思路
  • 3ds Max材质高清参数设置:10分钟提升渲染真实感
  • MyBatis 插件
  • 甘肃省城乡住房建设厅网站首页微商软件自助商城
  • 一文掌握,kanass安装与配置
  • C# ASP.NET MVC 数据验证实战:View 层双保险(Html.ValidationMessageFor + jQuery Validate)
  • 工信部 网站 邮箱内容管理系统做网站
  • arcgis用累计值进行分级
  • 生理学实验系统 生理学实验系统软件 集成化生物信号采集与处理系统生物信号采集处理系统 生理机能实验处理系统
  • 环境变量与程序地址空间
  • Node.js的主要应用场景和简单例子
  • 做视频解析网站是犯法的么360优化大师
  • 大网站cn域名淘宝店铺装修模板免费下载