rabbitMQ续谈
前章内容在“苍穹外卖优化-续”笔记中
交换机
种类
RabbitMQ 官方提供了 4 种原生交换机类型,外加 1 种“兜底”的备用类型,一共 5 种:
类型常量(AMQP 协议名) | Spring 枚举类 | 中文习惯叫法 | 路由规则一句话 | 典型场景 |
---|---|---|---|---|
direct | DirectExchange | 直连交换机 | 路由键 完全匹配 才转发 | 单点精准投递(命令、通知) |
topic | TopicExchange | 主题交换机 | 路由键 通配符匹配(* 单层,# 多层) | 多类消息分组订阅(日志、监控) |
fanout | FanoutExchange | 广播交换机 | 忽略路由键,直接复制到所有绑定队列 | 群发公告、配置刷新 |
headers | HeadersExchange | 头交换机 | 忽略 routingKey,按 消息头中的 KV 匹配 | 复杂多条件匹配(较少用) |
system 预留值 | 无 | 系统交换机 | 客户端不能声明,仅做内部回调 | 几乎见不到 |
1. direct(直连)
队列绑定到交换机时必须指定一个 精确的 routingKey。
消息只被转发到 routingKey 完全相同 的队列。
最简单、最快速,默认交换机(名字为空字符串
""
)就是 direct 型,路由键=队列名。
2. topic(主题)
绑定队列时用 模式字符串 做 routingKey,支持 2 个通配符:
–*
匹配 一个单词(用.
分隔)
–#
匹配 零个或多个单词可以实现“发布-订阅”的 多级分类过滤,例如
order.pay.success
、log.system.error
。
3. fanout(广播)
不处理 routingKey,只要队列绑定了这个交换机,就 每人一份。
场景:配置刷新、价格推送、广告广播。
4. headers(头)
绑定队列时给一组
x-match=any/all
的 KV 条件;发消息时把条件写在 消息头 里。路由键完全忽略,适合 多属性组合过滤;但性能比前三种差,生产环境用得最少。
5. system(系统)
AMQP 协议保留值,客户端无法声明,可忽略。
速记口诀
direct 点对点,topic 带通配,fanout 全广播,headers 看头信息。
在 Spring AMQP 里分别对应:
DirectExchange
、TopicExchange
、FanoutExchange
、HeadersExchange
,用 BindingBuilder
链式绑定即可。
// new DirectExchange(交换机名, 是否持久化, 是否自动删除)return new DirectExchange(RabbitConstant.EXCHANGE_ORDER,true,false);
配置选项
durable
场景 | durable=true | durable=false |
---|---|---|
RabbitMQ 正常跑着 | 交换机一直在,无差别 | 交换机一直在,无差别 |
RabbitMQ 服务重启 | 交换机会 自动重建,绑定关系丢失(需应用重新声明绑定) | 交换机 消失,必须等应用重新声明交换机 + 绑定,否则生产者投递会报错 NOT_FOUND |
控制台手动删除 | 重启后仍恢复(除非手动删) | 重启后不存在 |
autoDelete
1、当“没人用”时,是否自动把自己从 RabbitMQ 里删掉。
2、autoDelete
高于 durable
;就算 durable=true
,一旦触发“无人使用”条件,照样被删。
场景 | autoDelete=true | autoDelete=false |
---|---|---|
交换机 | 最后一个队列解绑 → 交换机立即删除 | 即使无队列绑定,仍保留 |
队列 | 最后一个消费者断开连接 → 队列立即删除 | 即使无消费者,仍保留 |
生产环境:一律写 false
,防止重启后“找不到交换机/队列”而报错
临时/测试:可设 true
,跑完自动清理,避免残留。
消息队列
创建
Spring AMQP 两种 Builder
方式 | 示例 | 特点 |
---|---|---|
QueueBuilder(推荐) | QueueBuilder.durable("q").ttl(60_000).build() | 链式、可读性高、支持所有参数 |
直接 new | new Queue("q", true, false, false, args) | 需手动拼 Map,底层相同 |
// new 写法
Queue q = new Queue("risk.calc.queue", // nametrue, // durablefalse, // exclusivefalse, // autoDeletenull); // arguments// Builder 写法(推荐)
Queue q = QueueBuilder.durable("risk.calc.queue") // 等价 durable=true.build();
必背参数
参数 | 类型 | 作用 | 常用值 |
---|---|---|---|
name | String | 队列名 | 业务语义,如 order.pay.queue |
durable | boolean | 重启后队列定义是否保留 | 生产 true |
exclusive | boolean | 仅当前连接可见,连接断即删 | 几乎永远 false (除 RPC Reply-To) |
autoDelete | boolean | 最后一个消费者断开后自动删 | 生产 false |
arguments | Map<String,Object> | 高级特性 | 见第 4 节 |
常用 x-* 扩展参数
参数 | 示例值 | 效果 |
---|---|---|
x-message-ttl | 60_000 | 消息入队后 60 s 未消费 → 自动丢弃或进死信 |
x-max-length | 1_000 | 队列最多 1 000 条,超限队头被丢弃或进死信 |
x-max-length-bytes | 10_485_760 | 队列总字节上限(10 MB) |
x-overflow | "reject-publish" / "drop-head" | 达到上限后 拒绝新消息 或 删除老消息 |
x-dead-letter-exchange | "dlx" | 死信交换机(DLX) |
x-dead-letter-routing-key | "dlq.routing" | 死信消息的路由键 |
x-single-active-consumer | true | 仅 一个消费者 能消费,故障自动切换 |
x-queue-type | "quorum" / "classic" | 默认 classic ;quorum 为仲裁队列(高可用) |
x-max-priority | 10 | 开启 优先级队列,消息 0-10 级 |
x-queue-mode | "lazy" | 消息直接落盘,百万级堆积场景 CPU 更平稳 |
x-queue-master-locator | "min-masters" | 集群下选择 最少主节点 的机子建队 |
绑定
是什么
"桥":把 Exchange 和 Queue 按 路由规则 连接起来;
消息能否到达队列,取决于有没有桥;
无绑定 → 消息直接丢弃,零报错。
参数
片段 | 含义 |
---|---|
.bind(queue) | 要投送的 队列 实例 |
.to(exchange) | 源 交换机 实例 |
.with(key) | binding key(Direct/Topic) |
.noargs() | Fanout 专用,无路由键 |
.where(...).match() | Headers 专用,KV 匹配 |
核心 API
new BindingBuilder.bind(Queue/Exchange) // 目标(队列或交换机).to(Exchange) // 源交换机.with("routing.key") // 路由键/模式/条件
4 种交换机对应写法
交换机类型 | 路由规则 | 代码示例 |
---|---|---|
Direct | 完全匹配 | .with("email.sent") |
Topic | 通配符 | .with("notify.*.push") 或 .with("order.#") |
Fanout | 忽略键 | .noargs() (可省) |
Headers | KV 头 | .where("sys", "push").match() |
RabbitMQ 消费端“监听容器”
作用
MessageListenerContainer 是 Spring AMQP 的核心调度器
负责:
① 建立/保持 TCP 连接(Connection)
② 创建 Channel
③ 向 Broker 发送basic.consume
④ 收到消息后反序列化 → 反射调注解方法
⑤ 处理 ACK、事务、并发、异常、重试
开启方式(二选一)
① 注解式(最常用)
@RabbitListener(queues = "order.queue", concurrency = "3-5")
public void handle(MessageEntity messageEntity) { ... }
一注解 ⇒ Spring 自动注册 SimpleMessageListenerContainer
concurrency
:核心-最大线程数(单队列并发)
@RabbitListener 生效的前提
1. 必须成为 Spring Bean
@RabbitListener
由RabbitListenerAnnotationBeanPostProcessor
扫描并注册监听容器仅对 Spring 容器管理的 Bean 生效
→ 所在类必须加:
@Component / @Service / @Configuration / @RestController ...
② 显式 Bean(高级定制)
@Bean
public SimpleMessageListenerContainer container(ConnectionFactory cf,MessageListenerAdapter listener) {SimpleMessageListenerContainer c = new SimpleMessageListenerContainer(cf);c.setQueueNames("order.queue");c.setConcurrentConsumers(3);c.setMaxConcurrentConsumers(5);c.setDefaultRequeueRejected(false); // 拒收不再重排队c.setAdviceChain(RetryInterceptorBuilder.stateless().maxAttempts(3).backOffOptions(1000, 2, 5000).build());return c;
}
RabbitMQ 生产者
作用
负责 创建/发送 消息到 Exchange;
只关心 Exchange + RoutingKey,不直接面对队列;
发送过程 = 序列化 → 设置属性 → 通道 basicPublish → Broker 路由 → 队列。
三种 API 风格
风格 | 示例 | 场景 |
---|---|---|
AmqpTemplate | template.convertAndSend("ex", "rk", obj) | 最早、最简 |
RabbitTemplate | 同上,功能最全 | 推荐(底层就是 AmqpTemplate 实现) |
StreamBridge (Spring Cloud Stream) | bridge.send("output", obj) | 屏蔽 MQ 差异,云原生 |
日常开发:RabbitTemplate 即可,Boot 自动配置单例。
自动配置速用
yaml
spring:rabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guestvirtual-host: /
注入
@Autowired
private RabbitTemplate rabbitTemplate;
零配置直接注入即可发消息。或者自定义RabbitTemplate
最简发送(JSON 默认)
rabbitTemplate.convertAndSend("order.exchange", // exchange"order.created", // routingKeynew MyMessage myMessage(...) // 任何 POJO
);
一张图流程
业务Service↓ convertAndSend
RabbitTemplate↓ Jackson2JsonMessageConverter
JSON 字节 + MessageProperties↓ Channel#basicPublish
Broker Exchange↓ 路由
Queue → 消费者
springboot中使用是yml文件中的配置相关内容
acknowledge-mode | manual 手动签收 | 业务可靠必开 |
acknowledge-mode: manual # 关键
手动 ACK 三种操作
API | 语义 | 场景 |
---|---|---|
basicAck(deliveryTag, false) | 签收当前消息 | 正常处理完 |
basicNack(deliveryTag, false, true) | 拒绝并重新入队 | 瞬时异常可重试 |
basicNack(deliveryTag, false, false) | 拒绝不再入队 | 重试耗尽 → 死信 |
单条手动 ACK
@RabbitListener(queues = "order.queue", ackMode = "MANUAL")
public void handle(OrderEvent event, Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) long tag) {try {businessService.process(event);channel.basicAck(tag, false); // 成功签收} catch (BizException e) {// 业务规则失败,不再重试channel.basicNack(tag, false, false); // 直接进死信} catch (Exception e) {// 系统异常 → 本地重试(Spring Retry 3 次)仍失败channel.basicNack(tag, false, false); // 进死信}
}
2. 批量 ACK(高性能)
@RabbitListener(queues = "batch.queue", ackMode = "MANUAL")
public void handleBatch(List<Message> messages, Channel channel) {long lastTag = messages.get(messages.size() - 1).getMessageProperties().getDeliveryTag();try {businessService.processBatch(messages);channel.basicAck(lastTag, true); // 批量确认到 lastTag} catch (Exception e) {channel.basicNack(lastTag, true, false); // 整批进死信}
}
✅ acknowledge-mode: manual
✅ 消费方法一定写 basicAck
/ basicNack
✅ default-requeue-rejected: false
(防死循环)
✅ 开启 死信交换机 承接 Nack 消息
✅ prefetch=1
或按并发调整
✅ 记录 deliveryTag 原值传回
✅ 监控 Unacked 指标告警
“manual = 自己签快递
不 ack 就永远占地方,重试耗尽进死信,
监控 Unacked > 0 立刻排查!”