当前位置: 首页 > news >正文

Java研学-RabbitMQ(八)

一 消费者可靠性

  RabbitMQ 的消费者可靠性通过消费者确认机制、消费失败处理策略及业务幂等性设计形成完整闭环:
消费者处理完成后发送 ACK/NACK 显式确认,成功则删除消息,失败则触发重试(自动或手动模式支持),重试超限后消息进入死信队列或异常通道避免无限循环;
同时,业务层需通过唯一请求ID、分布式锁或状态机等实现幂等性,确保重复消费时数据状态一致,最终实现从消息投递到业务落地的全链路可靠性保障。

二 消费者确认机制

1 概述

  RabbitMQ 的消费者确认机制通过显式回执保障消息可靠处理:消费者完成消息处理后,需向 RabbitMQ 发送 ack(确认删除消息)、nack(处理失败并允许重试)或 reject(处理失败且拒绝重试)三种回执,
其中 nack 可触发消息重新入队或路由至死信队列(需配置),而reject 通常用于明确无效消息的丢弃;结合 Spring AMQP 等框架的自动/手动确认模式,该机制实现了消息处理状态的精准追踪,为后续重试、幂等性等可靠性策略提供了基础支撑。
此时消费者需要在监听并处理消息的同时,给MQ发送回执。

2 application.yaml – consumer

  acknowledge-mode: 新增的消费者配置

模式确认/删除时机消息删除条件异常处理
none投递后立即 ACK → 立即删除无条件删除(无论业务是否成功)无处理(消息丢失无感知)
manual手动调用:
- ACK → 删除
- NACK/REJECT → 不删除(可重试/进死信队列)
ACK 时删除需手动实现成功/失败逻辑(灵活但侵入性强)
auto自动触发:
- 成功 → ACK
- 异常 → NACK
以下情况删除
1. 成功执行 → ACK 后删除
2. 业务异常 → NACK + 不重试 → 进死信队列(若配置)
3. 系统异常 → 默认 NACK + 重试(失败后进死信队列)
通过异常类型区分:
- 业务异常:直接丢弃
- 系统异常:可配置重试或丢弃
spring:rabbitmq:host: 192.168.44.128port: 5672virtual-host: /midhuangusername: dahuangpassword: "dahuang66"# 消费者配置listener:simple:prefetch: 1                     # 预取消息数量(平衡吞吐量与公平性)acknowledge-mode: none          # 自动确认模式(推荐生产使用)logging:level:cn.tj.consumer.listeners: DEBUG # 设置为 DEBUG 以查看详细日志

3 监听系统异常消息 – MqListener

@Slf4j
@Component
public class MqListener {@RabbitListener(queues = "huang.queue")public void listenHuangQueue(String msg) {System.out.println("消费者收到消息:【" + msg + "】");throw new RuntimeException("抛出系统异常"); // 抛出系统异常(非业务异常)}
}

4 发送消息 – PublisherApplicationTests

@Slf4j
@SpringBootTest
class PublisherApplicationTests {// 注入 RabbitTemplate@Autowiredprivate RabbitTemplate rabbitTemplate;@Testvoid testSendMessageQueue() {String queueName = "huang.queue";String msg = "hello, Huang!";rabbitTemplate.convertAndSend(queueName, msg);System.out.println("Sent message: " + msg);}
}

5 为系统异常打断点运行

  消息发送至队列
在这里插入图片描述
断点运行启动,消费者开始监听,此时消息还没有处理完
在这里插入图片描述
此刻会发现消息未被处理完毕,队列已经将信息删除,此时抛出异常,消息丢失
在这里插入图片描述

6 application.yaml – consumer

  acknowledge-mode: 设置为auto

spring:rabbitmq:host: 192.168.44.128port: 5672virtual-host: /midhuangusername: dahuangpassword: "dahuang66"# 消费者配置listener:simple:prefetch: 1                     # 预取消息数量(平衡吞吐量与公平性)acknowledge-mode: auto          # 自动确认模式(推荐生产使用)logging:level:cn.tj.consumer.listeners: DEBUG # 设置为 DEBUG 以查看详细日志

7 再为系统异常打断点运行

  重复上述步骤,发现消息状态发生变化Unacked尚未获得回执
在这里插入图片描述
此时断点放行,会返回Nack,队列重新进行投递消息,直到成功为止
在这里插入图片描述
若服务宕机(关闭consumer服务),消息会被收回,状态恢复,等待下次投递
在这里插入图片描述

8 监听业务异常 – MqListener

@Slf4j
@Component
public class MqListener {@RabbitListener(queues = "huang.queue")public void listenHuangQueue(String msg) {System.out.println("消费者收到消息:【" + msg + "】");throw new MessageConversionException("抛出业务异常"); // 抛出业务异常}
}

9 业务异常结果

  产生业务异常,消息会被直接拒绝,然后移除
在这里插入图片描述
在这里插入图片描述

三 消费失败重试机制

1 介绍

  Spring AMQP 提供了本地重试机制(基于 Spring Retry),可以避免消息因消费者异常而无限次 requeue 到 RabbitMQ 队列,从而减少不必要的消息堆积和系统压力。

2 application.yaml – consumer

  配置重试机制retry:

spring:rabbitmq:host: 192.168.44.128port: 5672virtual-host: /midhuangusername: dahuangpassword: "dahuang66"# 消费者配置listener:simple:prefetch: 1                     # 预取消息数量(平衡吞吐量与公平性)acknowledge-mode: auto          # 自动确认模式(推荐生产使用)retry:enabled: true                # 开启重试机制initial-interval: 1000ms     # 初始重试间隔(1秒)multiplier: 1.0              # 下次重试间隔的倍数(1.0表示固定间隔)max-attempts: 3              # 最大重试次数(含初次消费)stateless: true              # true(无状态,默认);false(有状态,适用于事务)logging:level:cn.tj.consumer: DEBUG # 设置为 DEBUG 以查看详细日志

3 PublisherApplicationTests

  发送消息到队列

@Slf4j
@SpringBootTest
class PublisherApplicationTests {// 注入 RabbitTemplate@Autowiredprivate RabbitTemplate rabbitTemplate;@Testvoid testSendMessageQueue() {String queueName = "huang.queue";String msg = "hello, Huang!";rabbitTemplate.convertAndSend(queueName, msg);System.out.println("Sent message: " + msg);}
}

4 MqListener

  启动消费者

@Slf4j
@Component
public class MqListener {@RabbitListener(queues = "huang.queue")public void listenHuangQueue(String msg) {System.out.println("消费者收到消息:【" + msg + "】");throw new RuntimeException("抛出系统异常"); // 抛出系统异常(非业务异常)}
}

5 测试结果

  此时消费者每收到一次消息,就抛一次异常,重试3次以后(次数耗尽),消息被丢弃
在这里插入图片描述
在这里插入图片描述

四 失败消息处理策略

1

五 业务幂等性

http://www.dtcms.com/a/331189.html

相关文章:

  • Rabbitmq+STS+discovery_k8s +localpv部署排坑详解
  • 队列的使用以及泛型思考[二叉树的层序遍历]
  • 【P27 4-8】OpenCV Python——Mat类、深拷贝(clone、copyTo、copy)、浅拷贝,原理讲解与示例代码
  • Horse3D游戏引擎研发笔记(五):在QtOpenGL环境下,仿three.js的BufferGeometry管理VAO和EBO绘制四边形
  • 算法训练营day51 图论② 岛屿数量深搜、广搜、最大面积
  • 图论(5)最小生成树算法
  • Claude Code 国内直接使用,原生支持 Windows 免WSL安装教程
  • Day56--图论--108. 冗余的边(卡码网),109. 冗余的边II(卡码网)
  • Day58--图论--117. 软件构建(卡码网),47. 参加科学大会(卡码网)
  • MySQL窗口函数与PyMySQL以及SQL注入
  • MySQLl中OFFSET 的使用方法
  • 中国AI生态加速迭代,AI硬件引领人机互动新范式
  • LeetCode 分类刷题:2302. 统计得分小于 K 的子数组数目
  • Gradle(四)Maven 项目迁移 Gradle 项目实践
  • 文件服务器:samba
  • Java 并发新范式:用 Structured Concurrency 优雅收拾多线程烂摊子
  • 编排之神-Kubernetes微服务专题--ingress-nginx及金丝雀Canary的演练
  • 电动自行车:中国式制霸
  • 支付域——账户系统设计
  • 2025年Java大厂面试场景题全解析:高频考点与实战攻略
  • 优德普SAP一体化平台有哪些功能?
  • 力扣(盛最多水的容器)
  • Java基础 8.14
  • 力扣-5.最长回文子串
  • MySQL的索引(索引的创建和设计原则):
  • 初识c语言————缓冲区字符滞留
  • 天马 TM150XDHG01-04 宽温高亮液晶模组技术档案
  • **标题:发散创新,探索编程中的平衡设计****摘要**:本文将探讨如何在编程中运用平衡设计思想,通过实例分析与
  • STM32F103 basic定时器的介绍和应用
  • 2021-2025全国监测国控断面地表水水质数据