当前位置: 首页 > news >正文

微服务消息队列之RabbitMQ,深入了解



在之前的基础篇中,我们探讨了如何在项目中使用消息队列(MQ)进行消息传递。那么,在传递过程中,消息是否会丢失,从而导致后续服务无法正常执行?答案是肯定的。由于消息传递本质上依赖网络通信,网络的不确定性天然存在消息丢失的风险。因此,保证消息的可靠性至关重要。

要解决这个问题,首先需要分析哪些环节可能导致消息丢失?

1.MQ丢失的情况分析

我们知道消息队列在项目中执行的过程是这样的:
在这里插入图片描述

消息从生产者到消费者的每一步都可能导致消息丢失,可以从三种情况分析:

  1. 生产者发送阶段
    • 生产者连接MQ失败
    • 消息到达MQ后未找到Exchange
    • 消息到达Exchange后未匹配Queue
    • MQ进程异常
  2. MQ存储阶段(Queue持久化)
    • 消息到达MQ,保存到队列后,尚未消费就突然宕机
  3. 消费者处理消息阶段
    • 消息接收后尚未处理突然宕机
    • 消息接收后处理过程中抛出异常

从上面情况分析,我们可以发现主要是三个阶段会发生消息丢失,现在我们具有问题具体分析:

2.如何保证发送者消息的可靠?

2.1 生产者连接MQ失败——生产者重试机制

问题本质:当生产者因网络抖动、MQ集群短暂不可用等原因连接失败时

解决方法:生产者重试机制

2.1.1 生产者重试机制
1.修改生产者模块的配置文件application.yaml,添加重试机制
spring:rabbitmq:host: 127.0.0.1 # 你的虚拟机IPport: 5672 # 端口virtual-host: /hmall # 虚拟主机username: hmall # 用户名password: 123456 # 密码connection-timeout: 1s # 设置MQ的连接超时时间template:retry:enabled: true # 开启超时重试机制initial-interval: 1000ms # 失败后的初始等待时间multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multipliermax-attempts: 3 # 最大重试次数
2.关闭虚拟机,构造生成者连接不到MQ情况
docker stop mq
3.启动测试类,观察是否重试

在这里插入图片描述

注意:当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过SpringAMQP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的。

如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。

2.2 其余三种情况——生产者确认机制

在网络繁忙场景下,重试机制能解决大部分生产者到MQ的连接级消息丢失,但仍存在消息成功抵达MQ服务端后丢失的少数情况,例如:

  • 消息到达MQ后未找到Exchange
  • 消息到达Exchange后未匹配Queue
  • MQ进程处理消息时异常崩溃

针对这些场景,RabbitMQ提供了生产者消息确认机制,通过两种独立但互补的机制保障可靠性:

  1. Publisher Confirm:确认消息是否被MQ成功接收(持久化到磁盘/写入内存队列)
  2. Publisher Return:捕获因路由失败(无Exchange/无匹配Queue)被MQ丢弃的消息

当生产者启用这两种机制后,MQ会通过异步回调向生产者返回明确的操作结果(成功确认或失败原因)。

具体如图所示:

在这里插入图片描述

流程总结如下:

  • 当消息投递到MQ,但是路由失败时,通过Publisher Return返回异常信息,同时返回ack,代表投递成功。为什么失败还返回投递成功?这是因为这是人为导致的,不是网络链路导致的
  • 临时消息投递到了MQ,并且入队成功,返回ack,告知投递成功!
  • 持久消息投递到了MQ,并且入队完成持久化,返回ack,告知投递成功
  • 其他情况都会返回nack,告知投递失败

其中acknack属于Publisher Confirm机制,ack是投递成功;nack是投递失败。而return则属于Publisher Return机制。

默认两种机制都是关闭状态,需要通过配置文件来开启。

2.2.1 实现生产者确认
1.开启生产者确认

在publisher模块的application.yaml中添加配置:

spring:rabbitmq:publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型publisher-returns: true # 开启publisher return机制

这里publisher-confirm-type有三种模式可选:

  • none:关闭confirm机制
  • simple:同步阻塞等待MQ的回执
  • correlated:MQ异步回调返回回执
2.定义ReturnCallback

每个RabbitTemplate只能配置一个ReturnCallback,因此我们可以在配置类中统一设置。我们在publisher模块定义一个配置类:

在这里插入图片描述

package com.itheima.publisher.config;import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Configuration;import javax.annotation.PostConstruct;/*** @program: mq-demo* @description:* @author: WangXin* @create: 2025-08-01 21:21**/
@Slf4j
@AllArgsConstructor
@Configuration
public class MqConfig {private final RabbitTemplate rabbitTemplate;@PostConstructpublic void init(){rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {log.error("触发return callback,");log.debug("exchange: {}", returned.getExchange());log.debug("routingKey: {}", returned.getRoutingKey());log.debug("message: {}", returned.getMessage());log.debug("replyCode: {}", returned.getReplyCode());log.debug("replyText: {}", returned.getReplyText());}});}
}
3.定义ConfirmCallback

由于每个消息发送时的处理逻辑不一定相同,因此ConfirmCallback需要在每次发消息时定义。具体来说,是在调用RabbitTemplate中的convertAndSend方法时,多传递一个参数:

package com.itheima.publisher.amqp;import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.util.concurrent.ListenableFutureCallback;import javax.annotation.Resource;/*** @program: mq-demo* @description:* @author: WangXin* @create: 2025-08-01 21:24**/
@SpringBootTest
@Slf4j
public class test {@Resourceprivate RabbitTemplate rabbitTemplate;@Testvoid testPublisherConfirm() {// 1.创建CorrelationDataCorrelationData correlationData = new CorrelationData();// 2.给Future添加ConfirmCallbackcorrelationData.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {@Overridepublic void onSuccess(CorrelationData.Confirm result) {if(result.isAck()){log.debug("发送消息成功,收到 ack!");}else{log.error("发送消息失败,收到 nack, reason : {}", result.getReason());}}@Overridepublic void onFailure(Throwable ex) {log.error("send message fail", ex);}});// 3.发送消息rabbitTemplate.convertAndSend("hmall.direct", "blue", "hello", correlationData);}}
5.测试结果

先把日志信息改成debug

在这里插入图片描述

先启动消费者,创建消息通道**(这里是采用注解新建Direct类型的交换机,基础篇定义过)**,再启动测试类,正常结果:

在这里插入图片描述

当我填错router key的结果

 rabbitTemplate.convertAndSend("hmall.direct", "q", "hello", correlationData);

在这里插入图片描述

3. 如何保证MQ的可靠性?

3.1 数据持久化:防止消息丢失的核心保障

问题背景

消息到达MQ后,若未及时保存,仍可能因服务重启或崩溃导致丢失。RabbitMQ默认将数据存储在内存中(临时数据),需手动配置持久化以实现可靠性。

持久化三要素
  1. 交换机持久化
    • 控制台操作:Exchanges页面创建交换机时,设置DurabilityDurable
    • 效果:MQ重启后交换机配置保留
  2. 队列持久化
    • 控制台操作:Queues页面创建队列时,设置DurabilityDurable
    • 效果:MQ重启后队列配置及元数据保留
  3. 消息持久化
    • 控制台操作:发送消息时添加properties参数,设置delivery_mode=2
    • 效果:消息内容写入磁盘,MQ重启后仍存在
生产者确认与持久化的协同
  • 开启生产者确认(Publisher Confirm)时,MQ在消息完成持久化后发送ACK回执
  • 性能优化:MQ批量持久化消息(约100ms间隔),减少IO操作
  • 建议:生产者确认采用异步处理,避免阻塞业务线程

3.2 LazyQueue模式

传统队列的问题
  1. 内存瓶颈
    • 消费者故障/网络问题 → 消息积压
    • 消息量激增 → 内存占用飙升
  2. PageOut阻塞
    • 达到内存预警 → 消息刷盘(PageOut)
    • PageOut期间队列完全阻塞
LazyQueue解决方案(3.6.0+)
特性传统队列LazyQueue
存储位置内存直接存入磁盘
消息加载即时加载消费时加载(懒加载)
内存占用极低
抗堆积能力差(易触发PageOut)强(支持百万消息)

版本提示:3.12+版本默认所有队列为Lazy模式

配置方式
  1. 控制台配置
    • 创建队列时添加参数:x-queue-mode=lazy
  2. 代码配置(Spring AMQP)
// 方式1:QueueBuilder
@Bean
public Queue lazyQueue() {return QueueBuilder.durable("lazy.queue").lazy()  // 关键配置.build();
}// 方式2:注解声明
@RabbitListener(queuesToDeclare = @Queue(name = "lazy.queue",durable = "true",arguments = @Argument(name = "x-queue-mode", value = "lazy")
))
public void listenLazyQueue(String msg) {// 消息处理逻辑
}
  1. 更新现有队列为Lazy模式
# 命令行配置Policy
rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues
  • Lazy:策略名称(自定义)
  • "^lazy-queue$":正则匹配队列名
  • '{"queue-mode":"lazy"}':设置队列模式
  • --apply-to queues:策略作用对象
控制台Policy配置
  1. 进入Admin → Policies页面
  2. 添加新Policy:
    • Pattern^lazy-queue$(队列名正则)
    • Definitionqueue-mode=lazy
    • Apply to:Queues

4.消费者如何保证可靠性?

1. 消费者确认机制(ACK机制)

核心目的:让RabbitMQ知晓消费者处理状态,决定消息是否重新投递

三种确认模式

模式特点适用场景
none自动ACK,消息立即删除(高风险)测试环境,非关键业务
manual需手动调用API发送ack/nack(灵活但代码侵入性强)需精细控制确认时机的复杂业务
autoSpring自动处理(推荐)
业务异常→nack
消息异常→reject
大多数业务场景

关键异常类型(触发reject):

  • MessageConversionException(消息转换失败)
  • MethodArgumentNotValidException(参数校验失败)
  • ClassCastException(类型转换错误)
  • 其他不可恢复异常

最佳实践:生产环境使用auto模式,配合异常分类处理

2. 失败重试机制

配置示例

spring:rabbitmq:listener:simple:retry:enabled: true           # 开启重试initial-interval: 1000  # 初始间隔(ms)multiplier: 2           # 间隔倍数max-attempts: 3         # 最大重试次数stateless: true         # 无状态重试

重试行为

  • 本地重试(非无限requeue)
  • 达到最大重试次数后抛出AmqpRejectAndDontRequeueException
  • 最终返回reject,消息被丢弃
3. 失败处理策略

三级处理策略

重试耗尽
默认丢弃
重新入队
转发死信队列

推荐方案:RepublishMessageRecoverer

@Bean
public MessageRecoverer republishRecoverer(RabbitTemplate rabbitTemplate) {// 将失败消息路由到error.direct交换机的error队列return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error"); 
}

死信队列配置

@Bean
public DirectExchange errorExchange() {return new DirectExchange("error.direct");
}@Bean
public Queue errorQueue() {return new Queue("error.queue", true);
}@Bean
public Binding errorBinding() {return BindingBuilder.bind(errorQueue()).to(errorExchange()).with("error");
}
4. 业务幂等性保障

双重防护策略

① 唯一消息ID方案

@Bean
public MessageConverter messageConverter() {Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();converter.setCreateMessageIds(true); // 启用消息ID生成return converter;
}
  • 消费者记录已处理消息ID
  • 重复消息直接跳过

② 业务状态判断(推荐)

public void markOrderPaySuccess(Long orderId) {// 原子操作:仅当状态为未支付时才更新lambdaUpdate().set(Order::getStatus, 2).set(Order::getPayTime, now()).eq(Order::getId, orderId).eq(Order::getStatus, 1) // 关键幂等条件.update();
}

SQL等效UPDATE order SET status=2 WHERE id=? AND status=1

5. 兜底方案(最终一致性保障)

主动查询补偿机制

交易服务支付服务订单数据库定时任务(每20秒)返回支付状态更新为已支付alt[支付成功但状态未更新]交易服务支付服务订单数据库

关键设计

  • 定时任务扫描超时未支付订单
  • 主动查询第三方支付状态
  • 基于乐观锁更新订单状态
  • 报警机制监控补偿执行情况

全链路可靠性保障体系

确认机制
持久化+LazyQueue
ACK机制
本地重试
死信队列
幂等校验
定时任务
生产者
RabbitMQ
磁盘存储
消费者
人工处理
业务数据库
支付状态补偿

核心要点

  1. 三阶段防护

    • 生产者:重试机制 + 确认机制
    • MQ:持久化 + LazyQueue
    • 消费者:ACK + 重试 + 死信队列
  2. 幂等性设计

    • 更新操作必须带状态校验
    • 关键业务使用原子操作
  3. 最终一致性

    • 死信队列人工干预
    • 定时任务主动补偿
    • 全链路监控报警

通过以上措施,可确保消息系统达到99.99%的可靠性,即使在极端故障情况下也能保证业务数据的最终一致性。

http://www.dtcms.com/a/309067.html

相关文章:

  • 逻辑斯蒂回归的模型优化
  • IO流-文件实例
  • MySQL--组从复制的详解及功能演练
  • 数据赋能(371)——数据挖掘——概述
  • java的冒泡排序算法
  • 从O(n²)到O(n log n):深度剖析快速排序的内存优化与cache-friendly实现
  • Java Map和Set
  • Vue 3.5 defineModel:让组件开发效率提升 10 倍
  • 自行实现log2对数运算
  • Pydantic模块学习
  • TDengine 中 TDgp 中添加机器学习模型
  • AT6668B芯片说明书
  • unity学习——视觉小说开发(一)
  • 51单片机入门:模块化编程
  • 用 TensorFlow 1.x 快速找出两幅图的差异 —— 完整实战与逐行解析 -Python程序图片找不同
  • forceStop流程会把对应进程的pendingIntent给cancel掉
  • ceph 14.2.22 nautilus Balancer 数据平衡
  • 通过CISSP考试,共答到第127题
  • 雷达微多普勒特征代表运动中“事物”的运动部件。
  • 机械手弧焊电源气体流量优化方法
  • 算法:分治-快速排序
  • IO流File类的基本使用
  • 前端开发(HTML,CSS,VUE,JS)从入门到精通!第二天(CSS)
  • 《n8n基础教学》第三节:模拟一个自动化场景
  • CSS的2D转换
  • 【Shell脚本自动化编写——报警邮件,检查磁盘,web服务检测】
  • 了解Reddit自动化 社区营销更精准
  • CSS组件化样式新篇章:@scope
  • vi/vim跳转到指定行命令
  • 机器学习第二课之逻辑回归(二)LogisticRegression