当前位置: 首页 > news >正文

RabbitMQ——消息发送的双重保障机制

在分布式系统中,确保消息的可靠传输是至关重要的。无论是处理金融交易、用户互动还是后台数据同步,丢失或重复的消息都可能导致严重的问题。为了增强系统的可靠性,我们可以依赖于生产者的重试机制和生产者确认机制。本文将探讨这两种机制如何共同作用来提高发送者的可靠性。

目录

生产者重试机制

生产者确认机制

实现生产者确认

对应的模块添加配置

定义ReturnCallback

定义ConfirmCallback

生产者确认实现效果


生产者重试机制

问题:网络故障,导致与MQ的连接中断。

解决方案:SpringAMQP提供的消息发送时的重试机制。

即:当RabbitTemplate与MQ连接超时后,多次重试。

方案实现如下:

修改publisher模块的application.yml文件,添加如下配置:

spring:
  rabbitmq:
    connection-timeout: 1s # 设置MQ的连接超时时间
    template:
      retry:
        enabled: true # 开启超时重试机制
        initial-interval: 1000ms # 失败后的初始等待时间
        multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier
        max-attempts: 3 # 最大重试次数

测试:(故意停掉RabbitMQ服务,验证方案可行性)

效果如下:

解决方案验证成功


生产者确认机制

问题:消息发送到MQ之后丢失的现象。

描述:

 MQ内部处理消息的进程发生了异常

 生产者发送消息到达MQ后未找到Exchange(往往是程序员编码出问题导致)

生产者发送消息到达MQ的Exchange后,未找到合适的Queue,因此无法路由(往往是程序员编码出问题导致)

解决方案:RabbitMQ提供了生产者消息确认机制。

即:Publisher ConfirmPublisher Return两种。在开启确认机制的情况下,当生产者发送消息给MQ后,MQ会根据消息处理的情况返回不同的回执

如图所示:

图像解读:

publisher --- exchange2:当消息投递到MQ,但是路由失败时,返回异常信息,同时返回ACK(投递成功)

publisher --- exchange:临时消息投递到了MQ,并且入队成功,返回ACK(投递成功)

publisher --- exchange:持久消息投递到了MQ,并且入队完成持久化,返回ACK (投递成功)

其它情况都会返回NACK(投递失败)


实现生产者确认

对应的模块添加配置

在publisher模块的application.yml中添加配置:

spring:
  rabbitmq:
    publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型
    publisher-returns: true # 开启publisher return机制

publisher-confirm-type有三种模式,推荐使用correlated(回调机制

定义ReturnCallback

每个RabbitTemplate只能配置一个ReturnCallback。

我们在publisher模块定义MqConfig配置类,代码如下:

package com.itheima.publisher.config;

import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;

@Slf4j
@AllArgsConstructor
@Configuration
public class MqConfig {
    private final RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init(){
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returned) {
                log.error("触发return callback,");
                log.debug("exchange: {}", returned.getExchange());
                log.debug("routingKey: {}", returned.getRoutingKey());
                log.debug("message: {}", returned.getMessage());
                log.debug("replyCode: {}", returned.getReplyCode());
                log.debug("replyText: {}", returned.getReplyText());
            }
        });
    }
}

定义ConfirmCallback

由于每个消息发送时的处理逻辑不一定相同,因此ConfirmCallback需要在每次发消息时定义。具体来说,是在调用RabbitTemplate中的convertAndSend方法时,多传递一个参数,代码如下:

@Test
void testPublisherConfirm() {
    // 1.创建CorrelationData
    CorrelationData cd = new CorrelationData();
    // 2.给Future添加ConfirmCallback
    cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
        @Override
        public void onFailure(Throwable ex) {
            // 2.1.Future发生异常时的处理逻辑,基本不会触发
            log.error("send message fail", ex);
        }
        @Override
        public void onSuccess(CorrelationData.Confirm result) {
            // 2.2.Future接收到回执的处理逻辑,参数中的result就是回执内容
            if(result.isAck()){ // result.isAck(),boolean类型,true代表ack回执,false 代表 nack回执
                log.debug("发送消息成功,收到 ack!");
            }else{ // result.getReason(),String类型,返回nack时的异常描述
                log.error("发送消息失败,收到 nack, reason : {}", result.getReason());
            }
        }
    });
    // 3.发送消息
    rabbitTemplate.convertAndSend("user.direct", "q", "hello", cd);
}

生产者确认实现效果

idea控制台展示:

效果解读:

可以看到,由于传递的RoutingKey是错误的,路由失败后,触发了return callback,同时也收到了ack。

当我们修改为正确的RoutingKey以后,就不会触发return callback了,只收到ack。

相关文章:

  • Java【多线程】(2)线程属性与线程安全
  • vue2+ele-ui实践
  • Python的循环和条件判断 笔记250303
  • Spring Security简介与使用
  • 大模型辅助火狐浏览器插件开发:网页保存至本地及 GitHub 仓库
  • UNION 和 UNION ALL 的区别:深入解析 SQL 中的合并操作
  • 电源测试系统有哪些可以利用AI工具的科技??
  • Leetcode 54: 螺旋矩阵
  • smolagents学习笔记系列(番外二)Agent+Ollama分析本地图像与文件
  • Linux系列:如何用 C#调用 C方法造成内存泄露
  • 基于Qt的登陆界面设计及记住密码,简易计算器设计
  • Linux网络_应用层自定义协议与序列化_守护进程
  • 二、QT和驱动模块实现智能家居-----5、通过QT控制LED
  • Python 课堂点名桌面小程序
  • spark 虚拟机基本命令(2)
  • 深入解析Java虚拟机(JVM)的核心组成
  • 2025-03-03 学习记录--C/C++-PTA 7-38 数列求和-加强版
  • 探索Elasticsearch:索引的CRUD
  • DeepSeek系列 清华大学-AIGC发展研究3.0版 pdf完整版(附下载)
  • Qt:day1
  • 马上评|家长抱婴儿值护学岗,如何避免“被自愿”?
  • 跨越三十年友情,61岁余隆和60岁齐默尔曼在上海再度合作
  • 普京调整俄陆军高层人事任命
  • 娃哈哈:自4月起已终止与今麦郎的委托代工关系,未来将坚持自有生产模式
  • 小耳朵等来了春天:公益义诊筛查专家走进安徽安庆
  • 2025年中国网络文明大会将于6月10日在安徽合肥举办