当前位置: 首页 > news >正文

Spring Boot 整合 RabbitMQ

Spring Boot 整合 RabbitMQ

一、概述:RabbitMQ 是什么?

你可以把 RabbitMQ 想象成一个「快递中转站」。
比如你在网上买了一本书,卖家(生产者)把包裹(消息)交给快递站(RabbitMQ),快递站根据包裹上的地址(规则)把包裹分给不同的快递员(消费者),最后送到你家(业务系统)。

RabbitMQ 是一个专门用来「传递消息」的软件(专业叫「消息中间件」),它能让不同的程序、不同的电脑之间高效地「传小纸条」。


二、RabbitMQ 的「快递分类方式」(交换机类型)

快递站分包裹时,可能按「地址」「重量」「紧急程度」分类。RabbitMQ 也有类似的「分类规则」,叫 交换机(Exchange)。常用的有 4 种:

1. 直连交换机(Direct Exchange)

规则:包裹上必须写「精确地址」(路由键 Routing Key),只有地址完全匹配的快递员才能收到。
例子:卖家给「北京-朝阳区」的包裹,只有负责朝阳区的快递员能接。

2. 扇形交换机(Fanout Exchange)

规则:不管地址,「所有快递员」都能收到包裹(广播模式)。
例子:卖家发「双11大促通知」,所有快递员都要知道,一起准备加班。

3. 主题交换机(Topic Exchange)

规则:地址可以用「通配符」(比如 * 代表一个词,# 代表多个词)。
例子:卖家发「北京.*」的包裹,所有地址以「北京」开头的快递员(如北京-朝阳、北京-海淀)都能收到。

4. 头交换机(Headers Exchange)

规则:不看地址,看包裹上的「标签」(Headers 头信息,比如「优先级=高」)。
例子:卖家标「紧急」的包裹,只有关注「紧急」标签的快递员能接。


三、RabbitMQ 的使用场景(为什么需要它?)

1. 异步处理:省时间!

比如你在淘宝下单,系统需要「扣库存+发短信+更新积分」。如果一步步做,可能要等 5 秒;用 RabbitMQ 可以把「发短信」和「更新积分」的任务丢给 RabbitMQ,主流程只需要 1 秒完成下单,剩下的由其他程序慢慢处理。

2. 流量削峰:防崩溃!

双11时,订单像洪水一样涌来,系统直接处理可能被冲垮。RabbitMQ 像「水库」,把订单暂时存起来,系统按自己的速度慢慢处理(比如每秒处理 1000 单),避免被瞬间的高流量冲垮。

3. 系统解耦:不互相拖累!

比如电商系统有「订单模块」「库存模块」「短信模块」。如果订单模块直接调用库存和短信模块,一旦短信模块崩溃,订单也会失败。用 RabbitMQ 后,订单模块只需要把消息发给 RabbitMQ,其他模块自己来取,互不影响。

四、整合Springboot

1. 配置 RabbitMQ 连接

1.Maven

<dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit</artifactId>
</dependency>

2.配置文件,yml和properties选择一个

spring:rabbitmq:host: 117.185.165.187port: 5672username: rabbitmqpassword: j8iG3KYs7Wmxxx
# RabbitMQ 服务器地址(默认 localhost:5672)
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
# 登录账号密码(默认 guest/guest,注意:远程连接需要改密码!)
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

2、定义「快递规则」:交换机和队列

RabbitMQ 的消息需要通过「交换机(Exchange)」和「队列(Queue)」传递。我们需要先告诉 Spring Boot 要创建哪些交换机和队列。

新建 RabbitMQConfig.java,用 @Bean 声明交换机、队列和绑定关系。

做一个「电商下单后发通知」的功能,需要:

  • 一个直连交换机(order_exchange)。
  • 一个队列(sms_queue),专门存「需要发短信的订单」。
  • 把队列和交换机绑定,路由键是 send_sms(只有路由键匹配的消息才会进这个队列)。
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {// 1. 声明直连交换机(名字叫 order_exchange)@Beanpublic DirectExchange orderExchange() {return new DirectExchange("order_exchange");}// 2. 声明队列(名字叫 sms_queue,存需要发短信的订单)@Beanpublic Queue smsQueue() {return new Queue("sms_queue");}// 3. 把队列和交换机绑定,路由键是 send_sms(只有路由键匹配的消息才会进这个队列)@Beanpublic Binding smsBinding(Queue smsQueue, DirectExchange orderExchange) {return BindingBuilder.bind(smsQueue).to(orderExchange).with("send_sms");  // 路由键必须和生产者发送时一致}
}

如果说是多个队列按照下面的

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {// 1. 声明直连交换机(名字叫 order_exchange)@Beanpublic DirectExchange orderExchange() {return new DirectExchange("order_exchange");}// 2. 声明 3 个队列(短信、积分、日志)@Beanpublic Queue smsQueue() {return new Queue("sms_queue");  // 存需要发短信的订单}@Beanpublic Queue scoreQueue() {return new Queue("score_queue");  // 存需要更新积分的订单}@Beanpublic Queue logQueue() {return new Queue("log_queue");  // 存需要记录日志的订单}// 3. 绑定 sms_queue(路由键 send_sms)@Beanpublic Binding smsBinding(Queue smsQueue, DirectExchange orderExchange) {return BindingBuilder.bind(smsQueue).to(orderExchange).with("send_sms");  // 路由键:只有 send_sms 的消息会进 sms_queue}// 4. 绑定 score_queue(路由键 update_score)@Beanpublic Binding scoreBinding(Queue scoreQueue, DirectExchange orderExchange) {return BindingBuilder.bind(scoreQueue).to(orderExchange).with("update_score");  // 路由键:只有 update_score 的消息会进 score_queue}// 5. 绑定 logQueue(路由键 log_order)@Beanpublic Binding logBinding(Queue logQueue, DirectExchange orderExchange) {return BindingBuilder.bind(logQueue).to(orderExchange).with("log_order");  // 路由键:只有 log_order 的消息会进 log_queue}}

3、生产者:发送消息(卖家发包裹)

RabbitTemplate(Spring 提供的发消息工具)发送消息到交换机。

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;@Service
public class OrderService {// 注入 RabbitTemplate(Spring 自动帮我们创建好的发消息工具)@Autowiredprivate RabbitTemplate rabbitTemplate;// 用户下单后,发送消息到 RabbitMQpublic void createOrder(String orderInfo) {// 1. 主流程:扣库存、保存订单(这里简化,直接打印)System.out.println("主流程:订单已保存,开始扣库存...");// 2. 异步任务:发送短信通知(把消息发给 RabbitMQ)rabbitTemplate.convertAndSend("order_exchange",  // 交换机名字"send_sms",        // 路由键(和队列绑定的路由键一致)orderInfo          // 消息内容(比如订单详情));System.out.println("已发送短信通知任务到 RabbitMQ");}
}

4、消费者:接收消息(快递员收包裹)

@RabbitListener 注解监听队列,自动接收并处理消息。

新建消费者服务类

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class SmsConsumer {// 监听 sms_queue 队列,有消息就自动触发这个方法@RabbitListener(queues = "sms_queue")public void sendSms(String orderInfo) {System.out.println("收到短信任务,正在发送...");// 这里调用短信接口(比如阿里云短信),实际代码需要替换System.out.println("已给用户发送短信:" + orderInfo);}
}

如果说是多线程处理就多添加一个配置concurrency = "5"

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class SmsConsumer {// 监听 sms_queue 队列,有消息就自动触发这个方法@RabbitListener(queues = "sms_queue",concurrency = "5")public void sendSms(String orderInfo) {System.out.println("收到短信任务,正在发送...");// 这里调用短信接口(比如阿里云短信),实际代码需要替换System.out.println("已给用户发送短信:" + orderInfo);}
}
1、如何避免消息被重复处理?

如果你的场景是「多个消费者抢着处理同一条消息」(比如并行加速),需要确保 一条消息只被一个消费者处理。RabbitMQ 默认已经帮你实现了这一点!

2、原理:消息确认机制(ACK)
  • 当消费者收到消息后,RabbitMQ 会等待消费者「确认」(ACK)。
  • 如果消费者正常处理完消息并返回 ACK,RabbitMQ 会删除这条消息,不会再发给其他消费者。
  • 如果消费者处理失败(比如崩溃),RabbitMQ 会重新将消息分发给其他消费者。
3、注意事项
1. 消息幂等性(防重复处理)

如果消费者处理消息时,因为网络问题导致 ACK 未成功返回,RabbitMQ 会重新发送消息,可能导致重复处理。
解决方法

  • 消息里加唯一标识(如订单号)。
  • 处理前检查是否已处理过(比如查数据库)。
2. 消费者数量别太多!

concurrency 不是越大越好!如果消费者数量超过服务器 CPU 核心数,反而会因为线程切换浪费资源。
建议:根据业务耗时调整,比如处理耗时 1 秒的任务,消费者数量 = CPU 核心数 × 2 比较合理。

3. 手动确认消息(高级场景)

默认是自动 ACK(auto_ack=true),但如果处理消息可能失败(比如调用外部接口超时),建议用手动 ACK。

@RabbitListener(queues = "order_queue", ackMode = "MANUAL")  // 手动确认
public void processOrder(String orderInfo, Channel channel, Message message) {try {// 处理消息...channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);  // 手动确认成功} catch (Exception e) {// 处理失败,重新入队(或发送到死信队列)channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);}
}

五、常见问题 & 注意事项

1. 消息丢失怎么办?

  • 开启「消息持久化」:在声明队列和交换机时,设置 durable=true(默认是 true,重启 RabbitMQ 后消息不丢失)。
  • 生产者确认:配置 spring.rabbitmq.publisher-confirm-type=correlated,确保消息成功发到交换机。
  • 消费者确认:默认是 auto_ack=true(自动确认),如果需要手动确认(比如处理消息时可能失败),可以设置 @RabbitListener(ackMode = "MANUAL"),处理完再调用 channel.basicAck()

2. 重复消费怎么办?

  • 消息里加唯一标识(如订单号),消费者处理前检查是否已处理过(比如查数据库)。

3. RabbitMQ 连不上?

  • 检查 application.properties 里的 hostportusernamepassword 是否正确。
  • 远程连接时,RabbitMQ 默认禁止 guest 用户,需要新建用户并授权(管理界面操作)。

六、总结

用 Spring Boot 整合 RabbitMQ 超简单!核心步骤就 4 步:

  1. 配连接:在 application.properties 里填 RabbitMQ 地址。
  2. 定义规则:用 @Bean 声明交换机、队列和绑定关系。
  3. 发消息:用 RabbitTemplate.convertAndSend() 发送。
  4. 收消息:用 @RabbitListener 监听队列。

适合用 Spring Boot + RabbitMQ 的场景

  • 电商、物流等需要「异步任务」的系统。
  • 高并发场景(如双11订单洪峰)。
  • 多个模块需要「松耦合」协作的系统(如订单、短信、积分模块)。
http://www.dtcms.com/a/272177.html

相关文章:

  • 【前端】接口日志追踪
  • 06.消息传递网络
  • 「日拱一码」023 机器学习——超参数优化
  • 判断当前是否为钉钉环境
  • 【Pandas】pandas DataFrame from_dict
  • 1.2.3_1 OSI参考模型
  • STM32F103C8T6单片机内部执行原理及启动流程详解
  • vue3实现pdf文件预览 - vue-pdf-embed
  • 力扣热门算法题 127.单词接龙,128.最长连续序列,130.被围绕的区域
  • MySQL数据库基础教程:从安装到数据操作
  • 快速合并多个CAD图形为单一PDF文档的方法
  • 常见 Docker 错误及解决方法
  • (vue)前端区分接口返回两种格式,一种Blob二进制字节流,一种常规JSON,且将blob响应转为json
  • 基于Catboost算法的茶叶数据分析及价格预测系统的设计与实现
  • 多元函数的切平面与线性近似:几何直观与计算方法
  • 高数附录(1)—常用平面图形
  • 《O-PAS™标准的安全方法》白皮书:为工业自动化系统筑起安全防线
  • msf复现永恒之蓝
  • 每日一SQL 【各赛事的用户注册率】
  • Datawhale AI 夏令营:基于带货视频评论的用户洞察挑战赛 Notebook(下篇)
  • 流媒体服务
  • SIMATIC S7-1200的以太网通信能力:协议与资源详细解析
  • x86架构CPU市场格局
  • WIFI协议全解析05:WiFi的安全机制:IoT设备如何实现安全连接?
  • PHP安全编程实践系列(三):安全会话管理与防护策略
  • 【运维】串口、网络一些基本信息
  • 【超详细】CentOS系统Docker安装与配置一键脚本(附镜像加速配置)
  • Pinia 笔记:Vue3 状态管理库
  • 双模秒切,体验跃迁!飞利浦EVNIA双模游戏显示器27M2N6801M王者降临!
  • UnrealEngine5游戏引擎实践(C++)