当前位置: 首页 > news >正文

springboot rabbitmq 消息队列入门与实战

Spring Boot3 RabbitMq 项目地址

https://gitee.com/supervol/loong-springboot-study

(记得给个start,感谢)

RabbitMq 概述

        RabbitMQ 是基于 AMQP(Advanced Message Queuing Protocol)的开源消息中间件,核心优势在于解耦、削峰、异步通信;而 Spring Boot 3 作为主流的 Java 开发框架,通过 spring-boot-starter-amqp starter 简化了 RabbitMQ 的整合流程。本文将从基础概念、环境搭建、核心功能、高级特性到最佳实践,全面讲解 Spring Boot 3 与 RabbitMQ 的整合方案。

RabbitMq 核心

1. RabbitMQ 组件

组件作用说明
BrokerRabbitMQ 服务器实例,负责接收、存储、转发消息
Exchange交换机,接收生产者发送的消息,根据路由规则将消息路由到绑定的队列
Queue消息队列,存储待消费的消息,支持持久化、限流、死信等特性
Binding交换机与队列的绑定关系,包含 “路由键(Routing Key)” 用于匹配消息
Routing Key消息的 “地址标识”,交换机通过 Routing Key 决定消息路由到哪个队列
Virtual Host虚拟主机,实现多租户隔离(不同应用使用不同 Virtual Host,避免资源冲突)
Connection客户端与 Broker 的 TCP 连接,重量级资源,一般复用
Channel基于 Connection 的轻量级通信通道, RabbitMQ 推荐通过 Channel 操作消息(减少 TCP 连接开销)

        交换机(Exchange)的 4 种核心类型,交换机是 RabbitMQ 消息路由的核心,不同类型对应不同的路由策略:

  • Direct Exchange(直连交换机):精确匹配 Routing Key(消息的 Routing Key 与 Binding 的 Routing Key 完全一致才路由),适用于点对点通信(如订单支付通知)。
  • Topic Exchange(主题交换机):模糊匹配 Routing Key(支持 * 匹配单个单词、# 匹配多个单词,单词间用 . 分隔),适用于发布订阅 + 多条件过滤(如日志按 “服务名。级别” 路由)。
  • Fanout Exchange(扇出交换机):忽略 Routing Key,将消息广播到所有绑定的队列,适用于广播通信(如系统通知、缓存清理)。
  • Headers Exchange(头交换机):不依赖 Routing Key,通过匹配消息头(Headers)的键值对路由,适用于复杂属性匹配(较少用,灵活但性能略低)。

2. Spring AMQP 核心组件

        Spring Boot 3 整合 RabbitMQ 依赖 Spring AMQP(版本与 Spring Boot 3 强绑定,如 Spring Boot 3.2 对应 Spring AMQP 3.2+),核心组件如下:

  • RabbitTemplate:封装了 RabbitMQ 的消息发送逻辑,支持同步 / 异步发送、消息回调、消息转换器等。
  • AmqpAdmin:用于声明交换机、队列、绑定关系(支持编程式声明,也可通过注解声明)。
  • @RabbitListener:注解式消费者,标注在方法上即可监听指定队列,支持批量消费、手动确认等。
  • MessageListenerContainer:消费者容器,负责管理消费者生命周期(如并发消费、消息重试、异常处理),Spring Boot 会自动配置默认容器。

RabbitMq 示例

1. 前提条件

        Spring Boot 3 对依赖版本有严格要求,避免版本冲突:

组件最低版本要求推荐版本
JDKJDK 17+JDK 17/21
RabbitMQ3.9+3.12+
Spring Boot3.0+3.2.x(稳定版)
Spring AMQP3.0+(随 Spring Boot 自动引入)3.2.x

2. 代码位置

        请参考项目地址中 springboot-mq/springboot-rabbitmq 模块代码。

RabbitMq 高级

        基础整合仅满足简单通信,实际项目需解决消息丢失、重复消费、延迟消息等问题,本节讲解核心高级特性。

1. 消息可靠性保障

        RabbitMQ 消息丢失可能发生在三个环节:生产者→BrokerBroker 存储Broker→消费者,需针对性防护。

环节防护措施
生产者→Broker开启生产者确认(publisher-confirm-type: correlated)+ 回调重试
Broker 存储交换机 / 队列持久化(durable=true)+ 消息持久化(deliveryMode=PERSISTENT
Broker→消费者手动确认(acknowledge-mode: manual)+ 消费失败转发死信队列
(1)消息持久化配置

        在声明交换机和队列时,需设置 durable=true;发送消息时,需设置 deliveryMode=PERSISTENT

// 1. 声明持久化交换机
DirectExchange durableExchange = new DirectExchange("durable-exchange", true, false);// 2. 声明持久化队列
Queue durableQueue = new Queue("durable-queue", true, false, false);// 3. 发送持久化消息(通过 RabbitTemplate 设置消息属性)
rabbitTemplate.convertAndSend("durable-exchange","durable-routing-key","持久化消息",message -> {// 设置消息持久化(DeliveryMode.PERSISTENT)message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);return message;},new CorrelationData(UUID.randomUUID().toString())
);

2. 死信队列

        死信是指无法被正常消费的消息(如消费失败、消息过期、队列满),死信队列用于存储这些消息,避免丢失或阻塞正常队列。

(1)死信产生条件
  • 消息被消费者拒绝(basicReject/basicNack,且 requeue=false)。
  • 消息过期(队列设置 x-message-ttl 或消息单独设置 expiration)。
  • 队列达到最大长度(x-max-length),无法存储新消息。
(2)死信队列配置示例
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class DeadLetterQueueConfig {// 1. 死信交换机(普通 Direct 交换机)@Beanpublic DirectExchange deadLetterExchange() {return new DirectExchange("dlx-exchange", true, false);}// 2. 死信队列(存储死信消息)@Beanpublic Queue deadLetterQueue() {return new Queue("dlx-queue", true, false, false);}// 3. 绑定死信交换机与死信队列@Beanpublic Binding deadLetterBinding() {return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with("dlx-routing-key"); // 死信路由键}// 4. 普通队列(设置死信属性,将死信转发到死信交换机)@Beanpublic Queue normalQueue() {return QueueBuilder.durable("normal-queue").withArgument("x-dead-letter-exchange", "dlx-exchange") // 死信交换机.withArgument("x-dead-letter-routing-key", "dlx-routing-key") // 死信路由键.withArgument("x-message-ttl", 10000) // 消息过期时间(10秒).build();}// 5. 绑定普通队列与普通交换机@Beanpublic Binding normalBinding(DirectExchange normalExchange, Queue normalQueue) {return BindingBuilder.bind(normalQueue).to(normalExchange).with("normal-routing-key");}// 6. 普通交换机@Beanpublic DirectExchange normalExchange() {return new DirectExchange("normal-exchange", true, false);}
}

        测试:发送消息到 normal-queue,若 10 秒内未被消费,消息会自动转为死信,进入 dlx-queue

3. 延迟队列

        延迟队列用于 “消息延迟指定时间后再消费”(如订单超时未支付自动取消、定时任务),RabbitMQ 无原生延迟队列,需通过以下两种方式实现:

(1)基于死信队列 + TTL

        利用 “消息过期后转为死信” 的特性,设置队列的 x-message-ttl,死信队列即为延迟队列。
缺陷:队列中所有消息的延迟时间固定,无法灵活设置不同延迟时间。

(2)基于 RabbitMQ 延迟插件

        RabbitMQ 提供 rabbitmq_delayed_message_exchange 插件,支持自定义消息延迟时间,灵活性更高。

步骤 1:安装延迟插件

  1. 下载rabbitmq_delayed_message_exchange插件,并放到指定位置
  2. 安装插件:rabbitmq-plugins enable rabbitmq_delayed_message_exchange
  3. 验证:访问管理界面,在 Exchanges 的 Type 下拉框中可看到 x-delayed-message
  4. 注意,本文不讨论和涉及rabbitmq及其插件安装和配置,请自行搜索。

步骤 2:配置延迟交换机与队列

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class DelayedQueueConfig {// 1. 声明延迟交换机(类型为 x-delayed-message)@Beanpublic CustomExchange delayedExchange() {// 参数:名称、类型、持久化、自动删除、附加参数(指定延迟交换机的路由类型)return new CustomExchange("delayed-exchange","x-delayed-message",true,false,Map.of("x-delayed-type", "direct") // 延迟交换机的底层路由类型(如 direct));}// 2. 声明延迟队列@Beanpublic Queue delayedQueue() {return new Queue("delayed-queue", true, false, false);}// 3. 绑定延迟交换机与队列@Beanpublic Binding delayedBinding() {return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with("delayed-routing-key").noargs();}
}

步骤 3:发送延迟消息

// 发送延迟消息(设置延迟时间,单位:毫秒)
public void sendDelayedMessage(Object message, long delayMs) {rabbitTemplate.convertAndSend("delayed-exchange","delayed-routing-key",message,msg -> {// 设置延迟时间msg.getMessageProperties().setDelay((int) delayMs);// 消息持久化msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);return msg;},new CorrelationData(UUID.randomUUID().toString()));
}// 调用:延迟 5 秒后消费
sendDelayedMessage("延迟 5 秒的消息", 5000);

4. 消息幂等性

        重复消费:同一消息被消费者多次处理(如消费者确认前宕机,Broker 重新投递)。需保证 “重复消费不影响业务正确性”(即幂等)。

(1) 解决方案:唯一 ID + 去重存储

  1. 生成唯一消息 ID:生产者发送消息时,设置 messageId(如 UUID)。
  2. 消费前检查去重:消费者接收消息后,先查询存储(Redis / 数据库)中是否存在该 messageId,若存在则跳过,若不存在则处理业务并记录 messageId
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;@Service
public class IdempotentConsumerService {@Resourceprivate RedisTemplate<String, String> redisTemplate;// 幂等消费逻辑public void consumeIdempotentMessage(Object message, Message amqpMessage, Channel channel) throws IOException {String messageId = amqpMessage.getMessageProperties().getMessageId();String redisKey = "rabbitmq:message:id:" + messageId;try {// 1. Redis 分布式锁:避免并发重复处理(setIfAbsent 原子操作)Boolean isFirstConsume = redisTemplate.opsForValue().setIfAbsent(redisKey,"CONSUMED",24, // 过期时间(根据业务调整,避免 Redis 堆积)TimeUnit.HOURS);if (Boolean.FALSE.equals(isFirstConsume)) {// 2. 非首次消费:直接确认消息System.out.printf("消息已重复消费,ID=%s%n", messageId);channel.basicAck(amqpMessage.getMessageProperties().getDeliveryTag(), false);return;}// 3. 首次消费:处理业务逻辑System.out.printf("幂等消费消息:ID=%s,内容=%s%n", messageId, message);// 4. 处理完成:确认消息channel.basicAck(amqpMessage.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {// 5. 消费失败:拒绝消息(不重回队列)channel.basicNack(amqpMessage.getMessageProperties().getDeliveryTag(), false, false);System.err.printf("幂等消费失败:ID=%s,原因=%s%n", messageId, e.getMessage());}}
}

5. 监控与运维

1. RabbitMQ Management UI

        RabbitMQ 管理界面是最基础的监控工具,关键监控指标:

  • Exchanges:交换机是否正常,绑定数、消息入站 / 出站速率。
  • Queues:队列长度(Ready 数,若持续增长需扩容消费者)、消息消费速率(Consumers 数、Acknowledged 数)。
  • Connections/Channels:连接数、信道数是否超出阈值(避免资源耗尽)。
  • Admin:用户权限、虚拟主机配置是否正确。

2. Spring Boot Actuator 监控

        通过 Spring Boot Actuator 暴露 RabbitMQ metrics,结合 Prometheus + Grafana 可实现可视化监控。

<!-- Spring Boot Actuator -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId>
</dependency><!-- 可选:Prometheus 适配(用于对接 Grafana) -->
<dependency><groupId>io.micrometer</groupId><artifactId>micrometer-registry-prometheus</artifactId>
</dependency>

配置暴露监控端点

management:endpoints:web:exposure:include: health,info,metrics,prometheus # 暴露的端点metrics:export:prometheus:enabled: true # 启用 Prometheus 导出endpoint:health:show-details: always # 显示健康详情

查看监控数据

  • 访问 http://localhost:8080/actuator/health:查看 RabbitMQ 连接健康状态(rabbitmq 节点为 UP 表示正常)。
  • 访问 http://localhost:8080/actuator/metrics/rabbitmq.messages.sent:查看消息发送总数。
  • 访问 http://localhost:8080/actuator/prometheus:获取 Prometheus 格式的 metrics,用于 Grafana 可视化。

RabbitMq 总结

  1. 组件设计规范

    • 交换机 / 队列命名:按 “业务模块 - 类型 - 用途” 命名(如 order-direct-exchangeorder-pay-queue)。
    • 虚拟主机隔离:不同环境(开发 / 测试 / 生产)或不同应用使用独立 Virtual Host。
  2. 性能优化

    • 连接池配置:使用 CachingConnectionFactory 缓存信道(默认缓存 25 个),避免频繁创建信道。
    • 消息体大小:单个消息不超过 1MB(大消息建议存储到 MinIO/OSS,消息中携带文件地址)。
    • 并发控制:消费者并发数(concurrency)根据 CPU 核心数调整(如 2-4 倍核心数),避免过度并发导致资源竞争。
  3. 可靠性优先

    • 必开特性:生产者确认、手动确认、消息持久化、死信队列。
    • 避免滥用自动确认:仅在 “消费逻辑无副作用” 场景使用 acknowledge-mode: auto
  4. 问题排查

    • 日志配置:开启 RabbitMQ DEBUG 日志(logging.level.org.springframework.amqp=DEBUG),便于追踪消息流转。
    • 死信监控:定期检查死信队列,分析死信原因(如消费异常、消息过期)。

文章转载自:

http://NGOHqfTi.fgwzL.cn
http://VnVfdXaZ.fgwzL.cn
http://RDlkBdGa.fgwzL.cn
http://c0mOe5Az.fgwzL.cn
http://rUOK5jD2.fgwzL.cn
http://ob1xxLSW.fgwzL.cn
http://hg9HmjP3.fgwzL.cn
http://2AnbIH8g.fgwzL.cn
http://Yydiu0AF.fgwzL.cn
http://B8T0KFsZ.fgwzL.cn
http://s9KbfMLB.fgwzL.cn
http://vCIphLiT.fgwzL.cn
http://tDUeioiz.fgwzL.cn
http://aw8a7iZh.fgwzL.cn
http://5IaEkivH.fgwzL.cn
http://vW19QMIL.fgwzL.cn
http://BGprHND2.fgwzL.cn
http://YxrzCRdt.fgwzL.cn
http://p7NNRvbi.fgwzL.cn
http://jkRfYx2u.fgwzL.cn
http://Di9s69Mn.fgwzL.cn
http://t6N3xS92.fgwzL.cn
http://kBVsexgk.fgwzL.cn
http://cLI0BeJv.fgwzL.cn
http://XdcRsvC6.fgwzL.cn
http://CA22a4BG.fgwzL.cn
http://EW2xWI1r.fgwzL.cn
http://eWbZ8rWc.fgwzL.cn
http://b8Ev1y43.fgwzL.cn
http://7U2ylwjX.fgwzL.cn
http://www.dtcms.com/a/379869.html

相关文章:

  • 使用vllm部署neo4j的text2cypher-gemma-2-9b-it-finetuned-2024v1模型
  • 栈-844.比较含退格的字符串-力扣(LeetCode)
  • [Dify] HTTP 请求节点详解:如何在 Dify 中配置与调用第三方 API
  • SQL优化简单思路
  • 构建AI智能体:三十一、AI医疗场景实践:医学知识精准问答+临床智能辅助决策CDSS
  • HTTP的Web服务测试在Python中的实现
  • 华为HCIE-云计算培训课程有哪些?
  • 绕过 FlashAttention-2 限制:在 Turing 架构上使用 PyTorch 实现 FlashAttention
  • 美食分享|基于Springboot和vue的地方美食分享网站系统设计与实现(源码+数据库+文档)
  • 华为HICE云计算的含金量高吗?
  • 【算法--链表】146.LRU缓存--通俗讲解
  • 5 绑定表
  • 记录一次利用arthas和skywalking做接口性能优化的全过程
  • 缓存三大劫攻防战:穿透、击穿、雪崩的Java实战防御体系(一)
  • 单轴导纳控制 (Single-Axis Admittance Control) 算法介绍
  • 软考~系统规划与管理师考试——真题篇——章节——第1章 信息系统与信息技术发展——纯享题目版
  • 霸王餐返利app的分布式架构设计:基于事件驱动的订单处理系统
  • Android SystemServer 启动 service源码分析
  • CentOS搭建本地源
  • Python的pip镜像源配置
  • ES6 面试题及详细答案 80题 (55-61)-- 类与继承
  • 云手机在办公领域中自动化的应用
  • Flink面试题及详细答案100道(21-40)- 基础概念与架构
  • 用Python打造专业级老照片修复工具:让时光倒流的数字魔法
  • 第八章:移动端着色器的优化-Mobile Shader Adjustment《Unity Shaders and Effets Cookbook》
  • 前端性能优化:Webpack Tree Shaking 的实践与踩坑前端性能优化:Webpack Tree Shaking 的实践与踩坑
  • 国产凝思debian系Linux离线安装rabbitmq教程步骤
  • how to setup k3s on an offline ubuntu
  • RabbitMQ对接MQTT消息发布指南
  • ⸢ 肆-Ⅰ⸥ ⤳ 默认安全建设方案:d.存量风险治理