当前位置: 首页 > news >正文

MQ 消息持久化方案

MQ 消息持久化方案

1. RabbitMQ 发送与消费消息的模型

2. 消息丢失的几种情况?

  1. 生产者发送消息未到达交换机
  2. 消息到达交换机,没有正确路由到队列
  3. MQ 宕机,队列中的消息不见了
  4. 消费者收到消息,还没消费,消费者宕机

3. 如何保证消息不丢失?

3.1 生产者确认机制

  1. publisher-confirm

    1. 消息成功投递到交换机,返回 ack

    2. 消息未成功投递到交换机,返回 nack

      记录消息以及交换机等相关信息到数据库,后期可以编写任务去补偿发送

  2. publisher-return

    1. 未正确到达队列,返回 ack 及失败原因

      记录消息以及交换机等相关信息到数据库,后期可以编写任务去补偿发送

图示


实现

根据实际情况修改!!!

  1. 配置文件

    spring:
      rabbitmq:
        host: 192.168.200.130 # 虚拟机 IP
        port: 5672 # 端口
        virtual-host: / # MQ 的虚拟主机
        username: username
        password: password
        publisher-confirm-type: correlated
        publisher-returns: true # 开启 publisher-returns
        template:
        	mandatory: true
    

    参数说明:

    • publish-confirm-type:开启 publisher-confirm
      • none:关闭 confirm 机制
      • simple:同步阻塞等待 MQ 的回执(回调方法)
      • correlated:MQ 异步回调返回回执
    • template.mandatory:定义消息路由失败时的策略。
      • true:调用 ReturnCallback
      • false:则直接丢弃消息
  2. 定义 ConfirmCallback

    ConfirmCallback 可以在发送消息时指定,因为每个业务处理 confirm 成功或失败的逻辑不一定相同。

    public void testSendMessage2SimpleQueue() throws InterruptedException {
      // 1 消息体
      String message = "hello, spring amqp!";
      // 2 全局唯一的消息 ID,需要封装到 CorrelationData中
      CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
      // 3 添加 callback
      correlationData.getFuture().addCallback(
        result -> {
          if(result.isAck()) {
            log.debug("消息发送成功, ID:{}", correlationData.getId());
          } else {
            log.error("消息发送失败, ID:{}, 原因{}",correlationData.getId(), result.getReason());
          }
        },
        ex -> log.error("消息发送异常, ID:{}, 原因{}", correlationData.getId(), ex.getMessage())
      );
      // 4 发送消息
      rabbitTemplate.convertAndSend("", "simple.queue", message, correlationData);
    
      // 休眠一会儿,等待 ack 回执
      Thread.sleep(2000);
    }
    
  3. 定义 Return 回调

    每个 RabbitTemplate 只能配置一个 ReturnCallback,因此需要在项目加载时配置。

    @Slf4j
    @Configuration
    public class CommonConfig implements ApplicationContextAware {
      @Override
      public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // 获取 RabbitTemplate
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        // 设置 ReturnCallback
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
          // 投递失败,记录日志
          log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}",
    replyCode, replyText, exchange, routingKey, message.toString());
          // 如果有业务需要,可以重发消息
        });
      }
    }
    

3.2 持久化机制

  1. 交换机持久化:

    默认就是持久化,durable 默认就是 true

  2. 队列持久化

    默认就是持久化,durable 默认就是true

  3. 消息持久化

    默认就是持久化。在发送消息时,使用 Message 对象,并设置 delivery-mode 为持久化

3.3 消费者 ack 机制

ack 取值情况:

  1. none:只要消息到达消费者,消费者直接返回 ack 给 MQ

    MQ 收到 ack,会把队列中的消息删除,消息可能会丢失

    • 消费者配置

      spring:
        rabbitmq:
          listener:
            simple:
              acknowledge-mode: none # 关闭 ack
      
  2. manual:手动 ack

    1. 消费成功,调用 API 给 MQ 返回 ack
    2. 消费失败,调用 API 给 MQ 返回 nack,并且让消息重回队列

    消费者配置

    spring:
      rabbitmq:
        listener:
          simple:
            acknowledge-mode: manual  # 手动 ack
    

    测试代码:

    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueue(String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) Long deliveryTag) {
      try {
        // 从 redis 获取一个 retry_count >= 3 直接记录日志,不重回队列,中断操作 return
        log.warn("消费者接收到 simple.queue 的消息:{}", msg);
        int i = 1 / 0;
        log.info("消息成功消费了 ---> SUCCESS");
        // 手动 ack
        // 可以使用 org.springframework.amqp.core.Messagee 拿到 deLiveryTag
        channel.basicAck(deliveryTag, false);
      } catch (Exception e) {
        e.printStackTrace();
        try {
          // 返回 nack,并且让消息重回队列
          channel.basicNack(deliveryTag, false, true);
          Thread.sleep(1000);
          log.error("消息消费失败,重回队列-->");
          // 向 redis 中设置值
          // redisTemplate.opsForValue().incr(retry_count)
        } catch (Exception ex) {
          ex.printStackTrace();
        }
      }
    }
    
  3. auto:自动 ack。消费消息不出异常,返回 ack 给 MQ。消费消息出异常了,返回 nack,把消息重回队列

    1. 本地重试

      spring:
        rabbitmq:
          listener:
            simple:
              retry:
                enabled: true # 开启消费者失败重试
                initial-interval: 1000 # 初始的失败等待时长为1秒
                multiplier: 2 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
                max-attempts: 3 # 最大重试次数
                stateless: true # true 无状态;false 有状态。如果业务中包含事务,这里改为 false
      

      达到重试次数后,还是失败,则返回 ack,不 requeue。MQ 会删除队列消息

    2. 失败策略

      1. RejectAndDontRequeueRecoverer:重试耗尽后,直接 reject,丢弃消息。默认方式
      2. ImmediateRequeueMessageRecoverer:重试耗尽后,返回 nack,消息重新入队
      3. RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机
    3. 使用 RepublishMessageRecoverer

      需求:把消息投递到失败的交换机,路由队列。记录日志,将来人工干预

      实现

      1. 定义错误交换机、队列、绑定关系。定义 RepublishMessageRecoverer
      2. 监听错误队列

4. 总结

  1. 创建交换机、队列、消息进行持久化

    1. 交换机、队列默认就是持久化的

    2. 消息持久化

  2. 生产者开启确认机制

    1. 开启消息发送失败的重试策略
      1. 设置重试次数和重试间隔比例
      2. 耗尽重试次数后,依旧失败,记录失败消息到数据库失败消息表,用于后期执行补偿错误。如使用定时任务去扫描这个表,重新发送消息
    2. 开启 confirm 机制:保证消息正确到达交换机
      • 返回 ack,正确到达
      • 返回 nack,没有到达交换机,写入数据库,后期重试
    3. 开启 return 机制
      • 保证消息正确到达队列
      • 没有到达队列,会调用ReturnCallback,写入数据库,后期重试
  3. 消费者确认机制

    1. 开机自动确认机制

    2. 开启重试策略

      重试次数耗尽后,定义RepublishMessageRecoverer策略来让消息路由到错误队列,落库

相关文章:

  • Android 静态壁纸设置实现方案
  • 应用服务接口第二次请求一直pending问题
  • 网络故障排查
  • C++学习之路,从0到精通的征途:string类
  • 23种设计模式中的策略模式
  • 深入解析 Spring 启动过程
  • 借助可视化,快速洞察数据背后的商机
  • 地理信息系统(GIS)在智慧城市中的40个应用场景案例
  • JUC并发编程
  • 基于PySide6与pycatia的CATIA绘图文本批量处理工具开发实践
  • Windows下安装常用软件--MySQL篇
  • 第一课:Stable Diffusion | Web UI初了解
  • 网络运维学习笔记(DeepSeek优化版) 022 HCIP-Datacom路由概念、BFD协议详解与OSPF第一课
  • FreeCAD教程-dwg格式文件的打开-ODA Connverter的使用方法
  • Vulhub-jangow-01-1.0.1通关攻略
  • 塔能科技:智慧物联节能专利成就裴然
  • Matlab2024a免费版下载教程
  • mysql传统主从模式下,主从中断接续
  • 热门面试题第13天|Leetcode 110.平衡二叉树 257. 二叉树的所有路径 404.左叶子之和 222.完全二叉树的节点个数
  • 贪心:一道简单题的细节问题
  • 日本广岛大学一处拆迁工地发现疑似未爆弹
  • 德国将不再公布对乌克兰军事支持的细节
  • 中美会谈前都发生了什么?美方为何坐不住了?
  • 刘元春在《光明日报》撰文:以法治护航民营经济高质量发展
  • 呼和浩特推进新一轮国企重组整合:杜绝一项目一公司、一业务一公司
  • 新修订的《婚姻登记条例》明起施行,领证不用户口本了