当前位置: 首页 > news >正文

Spring Boot 与 RabbitMQ 集成示例

文章目录

      • 一、核心前提(环境与依赖)
        • 1. 环境准备
        • 2. 引入核心依赖
      • 二、核心配置(连接与基础设置)
      • 三、核心组件声明(交换机、队列、绑定)
        • 1. 注解式声明(简洁高效)
        • 2. 配置文件声明(适用于简单场景)
      • 四、消息收发实现
        • 1. 生产者(发送消息)
        • 2. 消费者(接收消息)
      • 五、关键特性与进阶配置
        • 1. 消息持久化
        • 2. 死信队列(处理失败 / 过期消息)
        • 3. 消息限流(避免消费者过载)
      • 六、测试验证
        • 1. 编写测试类
        • 2. 启动验证
      • 七、常见问题排查

Spring Boot 集成 RabbitMQ 的核心是通过 spring-boot-starter-amqp实现自动配置,快速完成消息收发,适用于解耦、异步通信等场景。

一、核心前提(环境与依赖)

1. 环境准备
  • 安装 RabbitMQ 服务(本地 / 服务器),启动后默认端口 5672,管理界面端口 15672。

  • 确保 Spring Boot 版本与 AMQP 启动器兼容(推荐 Spring Boot 2.x/3.x)。

2. 引入核心依赖

Maven 项目在pom.xml中添加依赖:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

Gradle 项目对应添加:implementation 'org.springframework.boot:spring-boot-starter-amqp'


二、核心配置(连接与基础设置)

application.yml(或application.properties)中配置 RabbitMQ 连接信息:

spring:rabbitmq:host: 127.0.0.1  # MQ服务地址(远程填服务器IP)port: 5672       # 默认通信端口username: guest  # 默认账号(生产环境需自定义并授权)password: guest  # 默认密码virtual-host: /  # 虚拟主机(用于环境隔离)# 可选配置:消息确认、重试机制publisher-confirm-type: correlated  # 生产者确认机制publisher-returns: true             # 消息路由失败回调listener:simple:acknowledge-mode: auto          # 消费者确认模式(auto/manual/none)retry:enabled: true                 # 开启消费重试max-attempts: 3               # 最大重试次数

三、核心组件声明(交换机、队列、绑定)

需明确交换机(路由消息)、队列(存储消息)、绑定(关联两者),支持两种声明方式:

1. 注解式声明(简洁高效)

通过@Queue@Exchange@Binding注解直接绑定:

import org.springframework.amqp.core.\*;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class RabbitMqConfig {// 队列名称public static final String QUEUE_NAME = "demo_queue";// 交换机名称public static final String EXCHANGE_NAME = "demo_exchange";// 路由键public static final String ROUTING_KEY = "demo.routing.key";// 声明队列(durable=true:持久化,重启MQ不丢失)@Beanpublic Queue demoQueue() {return QueueBuilder.durable(QUEUE_NAME).build();}// 声明交换机(Direct类型:精确路由)@Beanpublic DirectExchange demoExchange() {return ExchangeBuilder.directExchange(EXCHANGE_NAME).durable(true).build();}// 绑定队列与交换机(指定路由键)@Beanpublic Binding demoBinding(Queue demoQueue, DirectExchange demoExchange) {return BindingBuilder.bind(demoQueue).to(demoExchange).with(ROUTING_KEY);}}
  • 交换机类型:Direct(精确路由)、Topic(模糊路由)、Fanout(广播)、Headers(头匹配)。
2. 配置文件声明(适用于简单场景)

直接在application.yml中声明基础队列(无需代码):

spring:rabbitmq:# 其他配置...template:exchange: demo_exchangerouting-key: demo_routing_keylistener:simple:queues: demo_queue

四、消息收发实现

1. 生产者(发送消息)

使用RabbitTemplate发送消息,支持字符串、对象(自动序列化):

import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.stereotype.Component;import javax.annotation.Resource;@Componentpublic class MessageProducer {@Resourceprivate RabbitTemplate rabbitTemplate;// 发送字符串消息public void sendStringMessage(String content) {rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_NAME,  // 交换机名称RabbitMqConfig.ROUTING_KEY,    // 路由键content                         // 消息内容);System.out.println("生产者发送消息:" + content);}// 发送对象消息(需确保对象可序列化)public void sendObjectMessage(User user) {rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_NAME,RabbitMqConfig.ROUTING_KEY,user);System.out.println("生产者发送对象消息:" + user);}// 静态内部类示例(可独立定义)public static class User implements java.io.Serializable {private String id;private String name;// getter/setter/toString}}
2. 消费者(接收消息)

使用@RabbitListener注解监听队列,自动消费消息:

import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;@Componentpublic class MessageConsumer {// 监听指定队列@RabbitListener(queues = RabbitMqConfig.QUEUE_NAME)public void receiveStringMessage(String content) {System.out.println("消费者接收字符串消息:" + content);// 业务处理逻辑...}// 监听队列并接收对象消息@RabbitListener(queues = RabbitMqConfig.QUEUE_NAME)public void receiveObjectMessage(MessageProducer.User user) {System.out.println("消费者接收对象消息:" + user);// 业务处理逻辑...}}
  • 若需手动确认消息(acknowledge-mode: manual),可通过Channel对象手动 ack:
import com.rabbitmq.client.Channel;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;@Componentpublic class ManualAckConsumer {@RabbitListener(queues = RabbitMqConfig.QUEUE_NAME)public void receiveMessage(String content, Channel channel, Message message) throws Exception {try {System.out.println("手动确认模式 - 接收消息:" + content);// 业务处理成功后,手动确认消息(第二个参数false:不批量确认)channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {// 业务处理失败,拒绝消息并重回队列(或死信队列)channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);}}}

五、关键特性与进阶配置

1. 消息持久化
  • 队列持久化:QueueBuilder.durable(true)(默认 true)。

  • 交换机持久化:ExchangeBuilder.durable(true)(默认 true)。

  • 消息持久化:发送时指定MessageProperties.PERSISTENT_TEXT_PLAIN

rabbitTemplate.convertAndSend(exchange, routingKey, content, message -> {message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);return message;});
2. 死信队列(处理失败 / 过期消息)

通过队列参数配置死信交换机和路由键,失败消息自动路由到死信队列:

@Beanpublic Queue deadLetterQueue() {return QueueBuilder.durable("dead_letter_queue").build();}@Beanpublic DirectExchange deadLetterExchange() {return ExchangeBuilder.directExchange("dead_letter_exchange").durable(true).build();}@Beanpublic Binding deadLetterBinding() {return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with("dead_letter_routing_key");}// 普通队列绑定死信配置@Beanpublic Queue demoQueue() {return QueueBuilder.durable(QUEUE_NAME).withArgument("x-dead-letter-exchange", "dead_letter_exchange")  // 死信交换机.withArgument("x-dead-letter-routing-key", "dead_letter_routing_key")  // 死信路由键.withArgument("x-message-ttl", 60000)  // 消息过期时间(60秒).build();}
3. 消息限流(避免消费者过载)

application.yml中配置消费者每次拉取消息数量:

spring:rabbitmq:listener:simple:prefetch: 10  # 每次最多拉取10条消息,处理完再拉取

六、测试验证

1. 编写测试类
import org.junit.jupiter.api.Test;import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;@SpringBootTestpublic class RabbitMqTest {@Resourceprivate MessageProducer messageProducer;@Testpublic void testSendMessage() {// 测试发送字符串消息messageProducer.sendStringMessage("Hello Spring Boot + RabbitMQ!");// 测试发送对象消息MessageProducer.User user = new MessageProducer.User();user.setId("1001");user.setName("测试用户");messageProducer.sendObjectMessage(user);}}
2. 启动验证
  • 启动 RabbitMQ 服务,访问http://localhost:15672(管理界面),可查看队列、交换机、消息数量。

  • 启动 Spring Boot 应用,执行测试方法,控制台会打印生产者发送日志和消费者接收日志。


七、常见问题排查

  1. 连接失败:检查 RabbitMQ 服务是否启动、地址 / 端口 / 账号密码是否正确、防火墙是否开放 5672 端口。

  2. 消息发送成功但消费者未接收:检查交换机与队列是否绑定、路由键是否匹配、消费者是否被 @Component 扫描到。

  3. 对象消息序列化失败:确保对象实现Serializable接口,或配置 JSON 序列化(需引入jackson-databind依赖)。

http://www.dtcms.com/a/600703.html

相关文章:

  • 家纺 网站模版想自己做网站流程
  • 将 CentOS 风格的命令行提示符(如 [root@slave1 ~]#)修改为 Ubuntu 风格
  • k8s各种场景下排错思路以及命令 k8s常见问题故障处理思路
  • win32k源代码分析之win32k!IsSAS函数中的全局变量win32k!gfsSASModifiers = 3是什么时候被赋值的
  • 序列和可迭代
  • 16.udp_socket(二)
  • 如何在不使用iTunes的情况下在电脑上访问iPhone文件
  • python+websockets,报错RuntimeError: no running event loop
  • 自己做网站流程龙口市最新公告
  • 自助建站系统介绍wordpress 百度推广
  • 基于Springboot的汽车推荐系统设计与实现7f7h74np(程序、源码、数据库、调试部署方案及开发环境)系统界面展示及获取方式置于文档末尾,可供参考。
  • DBLoss: Decomposition-based Loss Function for Time Series Forecasting 论文阅读
  • STM32F103学习笔记-16-RCC(第4节)-使用 HSI 配置系统时钟并用 MCO 监控系统时钟
  • Git 中新建学习分支 + 暂存修改 + VSCode 可视化查看改动(超详细教程)
  • Linux高效编程与实战:自动化构建工具“make/Makefile”和第一个系统程序——进度条
  • Docker 相关使用收录
  • 【详细步骤解析】爬虫小练习——爬取豆瓣Top250电影,最后以csv文件保存,附源码
  • Docker-存储
  • wap手机网站模板上饶网站建设3ao cc专业a
  • 【Nginx】Nginx 多协议负载均衡实战:StarRocks 与 MinIO 代理配置全解析
  • 域名注册和网站设计服务如何做贴吧类网站多钱
  • python+uniapp基于微信小程序的垃圾分类信息系统
  • C语言编译器安卓版 | 强大功能助力编程学习与实践
  • STM32使用金属探测传感器自制金属探测仪
  • vmware嵌套安装esxi7.0.3扩容vmfs
  • 使用 BR 备份 TiDB 到 AWS S3 存储
  • 【OpenCV + VS】OpenCV 绘图:绘制矩形、圆形、椭圆形、线条等
  • 易语言反编译工具 - 高效破解易语言程序的利器
  • 11年始终专注营销型网站提供网站建设小程序制作
  • AOSP Android13 Launcher3——TransformParams 类