Spring Boot 整合 RabbitMQ
Spring Boot 整合 RabbitMQ
一、概述:RabbitMQ 是什么?
你可以把 RabbitMQ 想象成一个「快递中转站」。
比如你在网上买了一本书,卖家(生产者)把包裹(消息)交给快递站(RabbitMQ),快递站根据包裹上的地址(规则)把包裹分给不同的快递员(消费者),最后送到你家(业务系统)。
RabbitMQ 是一个专门用来「传递消息」的软件(专业叫「消息中间件」),它能让不同的程序、不同的电脑之间高效地「传小纸条」。
二、RabbitMQ 的「快递分类方式」(交换机类型)
快递站分包裹时,可能按「地址」「重量」「紧急程度」分类。RabbitMQ 也有类似的「分类规则」,叫 交换机(Exchange)。常用的有 4 种:
1. 直连交换机(Direct Exchange)
规则:包裹上必须写「精确地址」(路由键 Routing Key),只有地址完全匹配的快递员才能收到。
例子:卖家给「北京-朝阳区」的包裹,只有负责朝阳区的快递员能接。
2. 扇形交换机(Fanout Exchange)
规则:不管地址,「所有快递员」都能收到包裹(广播模式)。
例子:卖家发「双11大促通知」,所有快递员都要知道,一起准备加班。
3. 主题交换机(Topic Exchange)
规则:地址可以用「通配符」(比如 *
代表一个词,#
代表多个词)。
例子:卖家发「北京.*」的包裹,所有地址以「北京」开头的快递员(如北京-朝阳、北京-海淀)都能收到。
4. 头交换机(Headers Exchange)
规则:不看地址,看包裹上的「标签」(Headers 头信息,比如「优先级=高」)。
例子:卖家标「紧急」的包裹,只有关注「紧急」标签的快递员能接。
三、RabbitMQ 的使用场景(为什么需要它?)
1. 异步处理:省时间!
比如你在淘宝下单,系统需要「扣库存+发短信+更新积分」。如果一步步做,可能要等 5 秒;用 RabbitMQ 可以把「发短信」和「更新积分」的任务丢给 RabbitMQ,主流程只需要 1 秒完成下单,剩下的由其他程序慢慢处理。
2. 流量削峰:防崩溃!
双11时,订单像洪水一样涌来,系统直接处理可能被冲垮。RabbitMQ 像「水库」,把订单暂时存起来,系统按自己的速度慢慢处理(比如每秒处理 1000 单),避免被瞬间的高流量冲垮。
3. 系统解耦:不互相拖累!
比如电商系统有「订单模块」「库存模块」「短信模块」。如果订单模块直接调用库存和短信模块,一旦短信模块崩溃,订单也会失败。用 RabbitMQ 后,订单模块只需要把消息发给 RabbitMQ,其他模块自己来取,互不影响。
四、整合Springboot
1. 配置 RabbitMQ 连接
1.Maven
<dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit</artifactId>
</dependency>
2.配置文件,yml和properties选择一个
spring:rabbitmq:host: 117.185.165.187port: 5672username: rabbitmqpassword: j8iG3KYs7Wmxxx
# RabbitMQ 服务器地址(默认 localhost:5672)
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
# 登录账号密码(默认 guest/guest,注意:远程连接需要改密码!)
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
2、定义「快递规则」:交换机和队列
RabbitMQ 的消息需要通过「交换机(Exchange)」和「队列(Queue)」传递。我们需要先告诉 Spring Boot 要创建哪些交换机和队列。
新建 RabbitMQConfig.java
,用 @Bean
声明交换机、队列和绑定关系。
做一个「电商下单后发通知」的功能,需要:
- 一个直连交换机(
order_exchange
)。 - 一个队列(
sms_queue
),专门存「需要发短信的订单」。 - 把队列和交换机绑定,路由键是
send_sms
(只有路由键匹配的消息才会进这个队列)。
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {// 1. 声明直连交换机(名字叫 order_exchange)@Beanpublic DirectExchange orderExchange() {return new DirectExchange("order_exchange");}// 2. 声明队列(名字叫 sms_queue,存需要发短信的订单)@Beanpublic Queue smsQueue() {return new Queue("sms_queue");}// 3. 把队列和交换机绑定,路由键是 send_sms(只有路由键匹配的消息才会进这个队列)@Beanpublic Binding smsBinding(Queue smsQueue, DirectExchange orderExchange) {return BindingBuilder.bind(smsQueue).to(orderExchange).with("send_sms"); // 路由键必须和生产者发送时一致}
}
如果说是多个队列按照下面的
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {// 1. 声明直连交换机(名字叫 order_exchange)@Beanpublic DirectExchange orderExchange() {return new DirectExchange("order_exchange");}// 2. 声明 3 个队列(短信、积分、日志)@Beanpublic Queue smsQueue() {return new Queue("sms_queue"); // 存需要发短信的订单}@Beanpublic Queue scoreQueue() {return new Queue("score_queue"); // 存需要更新积分的订单}@Beanpublic Queue logQueue() {return new Queue("log_queue"); // 存需要记录日志的订单}// 3. 绑定 sms_queue(路由键 send_sms)@Beanpublic Binding smsBinding(Queue smsQueue, DirectExchange orderExchange) {return BindingBuilder.bind(smsQueue).to(orderExchange).with("send_sms"); // 路由键:只有 send_sms 的消息会进 sms_queue}// 4. 绑定 score_queue(路由键 update_score)@Beanpublic Binding scoreBinding(Queue scoreQueue, DirectExchange orderExchange) {return BindingBuilder.bind(scoreQueue).to(orderExchange).with("update_score"); // 路由键:只有 update_score 的消息会进 score_queue}// 5. 绑定 logQueue(路由键 log_order)@Beanpublic Binding logBinding(Queue logQueue, DirectExchange orderExchange) {return BindingBuilder.bind(logQueue).to(orderExchange).with("log_order"); // 路由键:只有 log_order 的消息会进 log_queue}}
3、生产者:发送消息(卖家发包裹)
用 RabbitTemplate
(Spring 提供的发消息工具)发送消息到交换机。
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;@Service
public class OrderService {// 注入 RabbitTemplate(Spring 自动帮我们创建好的发消息工具)@Autowiredprivate RabbitTemplate rabbitTemplate;// 用户下单后,发送消息到 RabbitMQpublic void createOrder(String orderInfo) {// 1. 主流程:扣库存、保存订单(这里简化,直接打印)System.out.println("主流程:订单已保存,开始扣库存...");// 2. 异步任务:发送短信通知(把消息发给 RabbitMQ)rabbitTemplate.convertAndSend("order_exchange", // 交换机名字"send_sms", // 路由键(和队列绑定的路由键一致)orderInfo // 消息内容(比如订单详情));System.out.println("已发送短信通知任务到 RabbitMQ");}
}
4、消费者:接收消息(快递员收包裹)
用 @RabbitListener
注解监听队列,自动接收并处理消息。
新建消费者服务类
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class SmsConsumer {// 监听 sms_queue 队列,有消息就自动触发这个方法@RabbitListener(queues = "sms_queue")public void sendSms(String orderInfo) {System.out.println("收到短信任务,正在发送...");// 这里调用短信接口(比如阿里云短信),实际代码需要替换System.out.println("已给用户发送短信:" + orderInfo);}
}
如果说是多线程处理就多添加一个配置concurrency = "5"
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class SmsConsumer {// 监听 sms_queue 队列,有消息就自动触发这个方法@RabbitListener(queues = "sms_queue",concurrency = "5")public void sendSms(String orderInfo) {System.out.println("收到短信任务,正在发送...");// 这里调用短信接口(比如阿里云短信),实际代码需要替换System.out.println("已给用户发送短信:" + orderInfo);}
}
1、如何避免消息被重复处理?
如果你的场景是「多个消费者抢着处理同一条消息」(比如并行加速),需要确保 一条消息只被一个消费者处理。RabbitMQ 默认已经帮你实现了这一点!
2、原理:消息确认机制(ACK)
- 当消费者收到消息后,RabbitMQ 会等待消费者「确认」(ACK)。
- 如果消费者正常处理完消息并返回 ACK,RabbitMQ 会删除这条消息,不会再发给其他消费者。
- 如果消费者处理失败(比如崩溃),RabbitMQ 会重新将消息分发给其他消费者。
3、注意事项
1. 消息幂等性(防重复处理)
如果消费者处理消息时,因为网络问题导致 ACK 未成功返回,RabbitMQ 会重新发送消息,可能导致重复处理。
解决方法:
- 消息里加唯一标识(如订单号)。
- 处理前检查是否已处理过(比如查数据库)。
2. 消费者数量别太多!
concurrency
不是越大越好!如果消费者数量超过服务器 CPU 核心数,反而会因为线程切换浪费资源。
建议:根据业务耗时调整,比如处理耗时 1 秒的任务,消费者数量 = CPU 核心数 × 2 比较合理。
3. 手动确认消息(高级场景)
默认是自动 ACK(auto_ack=true
),但如果处理消息可能失败(比如调用外部接口超时),建议用手动 ACK。
@RabbitListener(queues = "order_queue", ackMode = "MANUAL") // 手动确认
public void processOrder(String orderInfo, Channel channel, Message message) {try {// 处理消息...channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // 手动确认成功} catch (Exception e) {// 处理失败,重新入队(或发送到死信队列)channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);}
}
五、常见问题 & 注意事项
1. 消息丢失怎么办?
- 开启「消息持久化」:在声明队列和交换机时,设置
durable=true
(默认是true
,重启 RabbitMQ 后消息不丢失)。 - 生产者确认:配置
spring.rabbitmq.publisher-confirm-type=correlated
,确保消息成功发到交换机。 - 消费者确认:默认是
auto_ack=true
(自动确认),如果需要手动确认(比如处理消息时可能失败),可以设置@RabbitListener(ackMode = "MANUAL")
,处理完再调用channel.basicAck()
。
2. 重复消费怎么办?
- 消息里加唯一标识(如订单号),消费者处理前检查是否已处理过(比如查数据库)。
3. RabbitMQ 连不上?
- 检查
application.properties
里的host
、port
、username
、password
是否正确。 - 远程连接时,RabbitMQ 默认禁止
guest
用户,需要新建用户并授权(管理界面操作)。
六、总结
用 Spring Boot 整合 RabbitMQ 超简单!核心步骤就 4 步:
- 配连接:在
application.properties
里填 RabbitMQ 地址。 - 定义规则:用
@Bean
声明交换机、队列和绑定关系。 - 发消息:用
RabbitTemplate.convertAndSend()
发送。 - 收消息:用
@RabbitListener
监听队列。
适合用 Spring Boot + RabbitMQ 的场景:
- 电商、物流等需要「异步任务」的系统。
- 高并发场景(如双11订单洪峰)。
- 多个模块需要「松耦合」协作的系统(如订单、短信、积分模块)。