RabbitMQ消息中间件
基本架构
RabbitMQ是一个消息中间件,顾名思义,就像邮局一样,它负责接受消息,并发消息传递出去。
RabbitMQ有哪些部分? 生产者
和消费者
是 开发端
(Java程序,生产者是一个进程,发送消息,配置指定交换机和路由规则。消费者是另一个进程,配置指定队列,内部有监听线程,收到消息后回调方法被触发。两者可以属于同一个程序,也可以属于不同的微服务),交换机
和队列
是 RabbitMQ服务器
的部分,首先消息会由生产者发送到服务器的交换机,然后交换机会根据消息绑定的队列发送到具体队列,在消费者程序里,RabbitMQ 客户端库会起线程来监听和处理消息,如果有就取出来消费。
核心作用可以一句话概括:👉 它是一个消息中间件,用来解耦系统、削峰填谷、实现异步处理、保证消息可靠传递。
🔹具体作用
-
系统解耦
生产者只负责“发消息”,不关心谁来处理。
消费者只负责“处理消息”,不关心谁发的。
双方只通过 消息队列 交互,降低耦合。
✅ 举例:订单系统发“下单消息”,支付系统监听消息 → 二者独立开发部署。 -
异步处理
不需要等任务完成再返回,先把任务丢到队列里。
✅ 举例:用户下单 → 立即返回“下单成功” → 订单系统异步写日志/发短信。 -
削峰填谷
当高并发请求涌入时,消息先进队列,消费者按能力慢慢处理,避免系统被打垮。
✅ 举例:秒杀活动 → 10万请求 → 队列缓存 → 消费者每秒处理几千条。 -
可靠传递
RabbitMQ 支持消息持久化、ACK 确认机制、死信队列,确保消息“不丢、不重”。
✅ 举例:支付消息必须保证“至少一次送达”,防止丢单。 -
灵活的路由
支持 Direct、Topic、Fanout、Headers 等模式,能实现单播、广播、按规则路由等多种通信方式。
✅ 举例:日志系统 → Fanout 广播给“写文件日志队列”和“写数据库日志队列”。
系统解耦、异步处理、削峰填谷只是RabbitMQ作为邮局完成消息传递附带出作用,同时作为邮局,必须保证消息的正确传递,首先引出的就是 **可靠传递** 。
可靠传递
保证消息不丢失:生产者确认 + 持久化 + 消费者确认
生产者—》交换机–》队列–》消费者,在这条路上,任何两个之间都可能出现消息丢失。为了保证消息不丢失,引出了ACK确认机制,即当你收到信息后你就给对方回个信告诉他你收到了。
🔹1. 生产者确认:当“邮局-Broker”收到信息后,如果消息被成功路由并写入队列,Broker 就会回 ACK 给生产者;如果路由失败或存储失败,Broker 回 NACK。
🔹2. 消费者确认:Broker 把消息推送给消费者,如果消费者处理完业务逻辑后,调用 basicAck() → 告诉 Broker:“我处理完了”。Broker 收到这个 ACK 后,会把消息从队列里删除;如果消费者调用 basicNack() / basicReject(),Broker 会决定是否重投、丢弃或转发到死信队列。
📌两个ACK之间是独立的,举个例子:
生产者发消息 → Broker 收到,立即回 ACK 给生产者 ✅(说明消息安全进队列了)。
消费者可能一小时后才消费 → 消费完再 ACK → Broker 删除消息。
持久化
RabbitMQ的数据默认是存储在内存的,如果Broker突然挂掉,但是在它收到生产者消息的时候已经返回了ACK,这怎么办呢?那就需要对交换机、队列和消息都存到磁盘。
-
交换机持久化:保证交换机配置不丢。
-
队列持久化:保证队列配置不丢。
-
消息持久化:保证消息数据不丢。返回ACK和刷盘到磁盘是异步进行的,因此仍小概率丢失,可以通过事务或根据配置强制要求先刷盘在ACK,看业务要求了。
消息重复
发送的消息会丢失,返回的ACK同样也会丢失。如果没收到ACK,那就会重新发送消息,这样就会造成消息重复,只要在消费者那里解决就行。
方案:消息可能多次投递 → 数据库里建唯一约束
或用 Redis 做分布式锁(Setnx)
。
insert into payment(order_id, status) values(1001, 'paid')
on conflict(order_id) do nothing;
死信队列
无法消费的消息(拒绝 / 超时 / 队列满)会进入 DLQ。
案例:库存不足
消费者拿到“下单消息” → 扣减库存失败 → basicReject(requeue=false) → 消息进入死信队列 → 后台监控 & 补偿。
死信交换机和死信队列通常由消费者(或其背后的应用程序)来声明和创建
@Configuration
public class RabbitMQConfig {// 1. 定义业务交换机(通常由生产者使用,但声明可由消费方完成)@Beanpublic DirectExchange businessExchange() {return new DirectExchange("business_exchange");}// 2. 定义死信交换机 (DLX)@Beanpublic DirectExchange deadLetterExchange() {return new DirectExchange("dlx.exchange");}// 3. 定义死信队列 (DLQ)@Beanpublic Queue deadLetterQueue() {return new Queue("dlx.queue");}// 4. 将死信交换机和死信队列绑定@Beanpublic Binding bindDLQ() {return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with("#"); // 用通配符路由键,接收所有死信}// 5. 定义业务队列,并设置其死信参数(这是最关键的一步!)@Beanpublic Queue businessQueue() {Map<String, Object> args = new HashMap<>();// 当消息变成死信后,将其发送到名为 "dlx.exchange" 的交换机args.put("x-dead-letter-exchange", "dlx.exchange");// 可选:指定死信的新路由键,不设置则使用原消息的路由键args.put("x-dead-letter-routing-key", "dead.letter");// 其他队列参数,如消息TTL、队列长度限制等return new Queue("business.queue", true, false, false, args);}// 6. 将业务队列绑定到业务交换机@Beanpublic Binding bindBusinessQueue() {return BindingBuilder.bind(businessQueue()).to(businessExchange()).with("order.create");}
}
延迟队列
延迟队列就是一种“会在指定时间之后才被消费”的消息队列。你可以把它想象成一个 “定时消息” 或 “未来消息” 的容器。消息发出去后,不会立即被消费者拿到,而是会在队列里“潜伏”一段时间,等到设定的延迟时间过后,才会变得可供消费。
- 订单系统:自动关闭未付款订单
- 场景:用户下单后30分钟内未付款,订单自动关闭。
- 实现:订单创建时,发一条延迟30分钟的消息。消费者在30分钟后收到这条消息,检查订单状态是否为未支付,如果是则执行关单逻辑。
- 智能家居
- 场景:出门后,延迟10分钟再关闭空调。
- 实现:手机APP发送一条延迟10分钟的“关空调”消息。消息到时后,被家里的智能网关消费并执行命令。
方式一:利用 死信队列(DLX) 实现:在消费者绑定死信队列,并根据消息进行逻辑处理。
方法二:abbitMQ 提供了一个官方插件 rabbitmq_delayed_message_exchange。启用后,可以定义一个特殊类型的交换机 x-delayed-message。
1、生产者将消息发送到延迟交换机,并在消息头(Header)中指定延迟时间
(如 x-delay: 300000表示延迟5分钟)。2、交换机收到消息后,不会立即将其路由到队列,而是将其保存在一个内部的“数据库”(Mnesia)中。3、交换机等待指定的延迟时间过后,才将消息推送到目标队列,从而被消费者消费。
使用插件感觉就不用写判断是垃圾消息还是故意延迟的逻辑了
消息堆积怎么办?
消息堆积了,无非就是消费者消费的太慢了。
- 立即增加消费组的实例数量。在云平台上,通常可以通过快速复制容器或虚拟机实例来实现。
- 在消费者内开启线程池。
- 在配置时将队列设置为惰性队列,该队列会基于磁盘存储消息,容量大但效率低。
集群模式
-
普通集群:分摊压力,但不保证高可用(像连锁店,但商品不备份,你到A商店,A商店没有,A从B取来给你)。
-
镜像队列集群:既分摊压力,又保证高可用(像连锁店,且每家店都有核心商品的备份),但依旧小概率还没有来得及备份就宕机。
-
仲裁队列:与镜像队列集群差不多,只不过基于Raft协议,保证主从强一致。