当前位置: 首页 > news >正文

rabbitMQ续谈

前章内容在“苍穹外卖优化-续”笔记中

交换机

种类

RabbitMQ 官方提供了 4 种原生交换机类型,外加 1 种“兜底”的备用类型,一共 5 种:

类型常量(AMQP 协议名)Spring 枚举类中文习惯叫法路由规则一句话典型场景
directDirectExchange直连交换机路由键 完全匹配 才转发单点精准投递(命令、通知)
topicTopicExchange主题交换机路由键 通配符匹配* 单层,# 多层)多类消息分组订阅(日志、监控)
fanoutFanoutExchange广播交换机忽略路由键,直接复制到所有绑定队列群发公告、配置刷新
headersHeadersExchange头交换机忽略 routingKey,按 消息头中的 KV 匹配复杂多条件匹配(较少用)
system 预留值系统交换机客户端不能声明,仅做内部回调几乎见不到

1. direct(直连)

  • 队列绑定到交换机时必须指定一个 精确的 routingKey

  • 消息只被转发到 routingKey 完全相同 的队列。

  • 最简单、最快速,默认交换机(名字为空字符串 "")就是 direct 型,路由键=队列名。

2. topic(主题)

  • 绑定队列时用 模式字符串 做 routingKey,支持 2 个通配符:
    * 匹配 一个单词(用 . 分隔)
    # 匹配 零个或多个单词

  • 可以实现“发布-订阅”的 多级分类过滤,例如 order.pay.successlog.system.error

3. fanout(广播)

  • 不处理 routingKey,只要队列绑定了这个交换机,就 每人一份

  • 场景:配置刷新、价格推送、广告广播。

4. headers(头)

  • 绑定队列时给一组 x-match=any/all 的 KV 条件;发消息时把条件写在 消息头 里。

  • 路由键完全忽略,适合 多属性组合过滤;但性能比前三种差,生产环境用得最少。

5. system(系统)

  • AMQP 协议保留值,客户端无法声明,可忽略


速记口诀

direct 点对点,topic 带通配,fanout 全广播,headers 看头信息。

在 Spring AMQP 里分别对应:
DirectExchangeTopicExchangeFanoutExchangeHeadersExchange,用 BindingBuilder 链式绑定即可。

        // new DirectExchange(交换机名, 是否持久化, 是否自动删除)return new DirectExchange(RabbitConstant.EXCHANGE_ORDER,true,false);

配置选项

durable

场景durable=truedurable=false
RabbitMQ 正常跑着交换机一直在,无差别交换机一直在,无差别
RabbitMQ 服务重启交换机会 自动重建绑定关系丢失(需应用重新声明绑定)交换机 消失,必须等应用重新声明交换机 + 绑定,否则生产者投递会报错 NOT_FOUND
控制台手动删除重启后仍恢复(除非手动删)重启后不存在

autoDelete 

1、当“没人用”时,是否自动把自己从 RabbitMQ 里删掉。

2、autoDelete 高于 durable;就算 durable=true,一旦触发“无人使用”条件,照样被删。

场景autoDelete=trueautoDelete=false
交换机最后一个队列解绑 → 交换机立即删除即使无队列绑定,仍保留
队列最后一个消费者断开连接 → 队列立即删除即使无消费者,仍保留

生产环境:一律写 false,防止重启后“找不到交换机/队列”而报错

临时/测试:可设 true,跑完自动清理,避免残留。

消息队列

创建

 Spring AMQP 两种 Builder

方式示例特点
QueueBuilder(推荐)QueueBuilder.durable("q").ttl(60_000).build()链式、可读性高、支持所有参数
直接 newnew Queue("q", true, false, false, args)需手动拼 Map,底层相同
// new 写法
Queue q = new Queue("risk.calc.queue",   // nametrue,                // durablefalse,               // exclusivefalse,               // autoDeletenull);               // arguments// Builder 写法(推荐)
Queue q = QueueBuilder.durable("risk.calc.queue")      // 等价 durable=true.build();

必背参数

参数类型作用常用值
nameString队列名业务语义,如 order.pay.queue
durableboolean重启后队列定义是否保留生产 true
exclusiveboolean仅当前连接可见,连接断即删几乎永远 false(除 RPC Reply-To)
autoDeleteboolean最后一个消费者断开后自动删生产 false
argumentsMap<String,Object>高级特性见第 4 节

常用 x-* 扩展参数

参数示例值效果
x-message-ttl60_000消息入队后 60 s 未消费 → 自动丢弃或进死信
x-max-length1_000队列最多 1 000 条,超限队头被丢弃或进死信
x-max-length-bytes10_485_760队列总字节上限(10 MB)
x-overflow"reject-publish" / "drop-head"达到上限后 拒绝新消息 或 删除老消息
x-dead-letter-exchange"dlx"死信交换机(DLX)
x-dead-letter-routing-key"dlq.routing"死信消息的路由键
x-single-active-consumertrue仅 一个消费者 能消费,故障自动切换
x-queue-type"quorum" / "classic"默认 classicquorum 为仲裁队列(高可用)
x-max-priority10开启 优先级队列,消息 0-10 级
x-queue-mode"lazy"消息直接落盘,百万级堆积场景 CPU 更平稳
x-queue-master-locator"min-masters"集群下选择 最少主节点 的机子建队

绑定

 是什么

  • "桥":把 ExchangeQueue路由规则 连接起来;

  • 消息能否到达队列,取决于有没有桥

  • 无绑定 → 消息直接丢弃零报错

参数

片段含义
.bind(queue)要投送的 队列 实例
.to(exchange)源 交换机 实例
.with(key)binding key(Direct/Topic)
.noargs()Fanout 专用,无路由键
.where(...).match()Headers 专用,KV 匹配

 核心 API

new BindingBuilder.bind(Queue/Exchange)   // 目标(队列或交换机).to(Exchange)           // 源交换机.with("routing.key")    // 路由键/模式/条件

4 种交换机对应写法

交换机类型路由规则代码示例
Direct完全匹配.with("email.sent")
Topic通配符.with("notify.*.push") 或 .with("order.#")
Fanout忽略键.noargs()(可省)
HeadersKV 头.where("sys", "push").match()

RabbitMQ 消费端“监听容器”

作用

  • MessageListenerContainer 是 Spring AMQP 的核心调度器

  • 负责:
    ① 建立/保持 TCP 连接(Connection)
    ② 创建 Channel
    ③ 向 Broker 发送 basic.consume
    ④ 收到消息后反序列化反射调注解方法
    ⑤ 处理 ACK、事务、并发、异常、重试

开启方式(二选一)

① 注解式(最常用)

@RabbitListener(queues = "order.queue", concurrency = "3-5")
public void handle(MessageEntity  messageEntity) { ... }
  • 一注解 ⇒ Spring 自动注册 SimpleMessageListenerContainer

  • concurrency:核心-最大线程数(单队列并发)

@RabbitListener 生效的前提
1. 必须成为 Spring Bean
  • @RabbitListenerRabbitListenerAnnotationBeanPostProcessor 扫描并注册监听容器

  • 仅对 Spring 容器管理的 Bean 生效
    → 所在类必须加:

@Component / @Service / @Configuration / @RestController ...

② 显式 Bean(高级定制)

@Bean
public SimpleMessageListenerContainer container(ConnectionFactory cf,MessageListenerAdapter listener) {SimpleMessageListenerContainer c = new SimpleMessageListenerContainer(cf);c.setQueueNames("order.queue");c.setConcurrentConsumers(3);c.setMaxConcurrentConsumers(5);c.setDefaultRequeueRejected(false);   // 拒收不再重排队c.setAdviceChain(RetryInterceptorBuilder.stateless().maxAttempts(3).backOffOptions(1000, 2, 5000).build());return c;
}

RabbitMQ 生产者

作用

  • 负责 创建/发送 消息到 Exchange

  • 只关心 Exchange + RoutingKey不直接面对队列

  • 发送过程 = 序列化 → 设置属性 → 通道 basicPublish → Broker 路由 → 队列

三种 API 风格

风格示例场景
AmqpTemplatetemplate.convertAndSend("ex", "rk", obj)最早、最简
RabbitTemplate同上,功能最全推荐(底层就是 AmqpTemplate 实现)
StreamBridge (Spring Cloud Stream)bridge.send("output", obj)屏蔽 MQ 差异,云原生

日常开发:RabbitTemplate 即可,Boot 自动配置单例。

自动配置速用

yaml

spring:rabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guestvirtual-host: /

注入

@Autowired
private RabbitTemplate rabbitTemplate;

零配置直接注入即可发消息。或者自定义RabbitTemplate

最简发送(JSON 默认)

rabbitTemplate.convertAndSend("order.exchange",          // exchange"order.created",           // routingKeynew MyMessage myMessage(...) // 任何 POJO
);

一张图流程

业务Service↓ convertAndSend
RabbitTemplate↓ Jackson2JsonMessageConverter
JSON 字节 + MessageProperties↓ Channel#basicPublish
Broker Exchange↓ 路由
Queue → 消费者

springboot中使用是yml文件中的配置相关内容

acknowledge-modemanual 手动签收业务可靠必开
acknowledge-mode: manual   # 关键

手动 ACK 三种操作

API语义场景
basicAck(deliveryTag, false)签收当前消息正常处理完
basicNack(deliveryTag, false, true)拒绝并重新入队瞬时异常可重试
basicNack(deliveryTag, false, false)拒绝不再入队重试耗尽 → 死信

 单条手动 ACK

@RabbitListener(queues = "order.queue", ackMode = "MANUAL")
public void handle(OrderEvent event, Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) long tag) {try {businessService.process(event);channel.basicAck(tag, false);          // 成功签收} catch (BizException e) {// 业务规则失败,不再重试channel.basicNack(tag, false, false);  // 直接进死信} catch (Exception e) {// 系统异常 → 本地重试(Spring Retry 3 次)仍失败channel.basicNack(tag, false, false);  // 进死信}
}

2. 批量 ACK(高性能)

@RabbitListener(queues = "batch.queue", ackMode = "MANUAL")
public void handleBatch(List<Message> messages, Channel channel) {long lastTag = messages.get(messages.size() - 1).getMessageProperties().getDeliveryTag();try {businessService.processBatch(messages);channel.basicAck(lastTag, true);   // 批量确认到 lastTag} catch (Exception e) {channel.basicNack(lastTag, true, false); // 整批进死信}
}

acknowledge-mode: manual
✅ 消费方法一定写 basicAck / basicNack
default-requeue-rejected: false(防死循环)
✅ 开启 死信交换机 承接 Nack 消息
prefetch=1 或按并发调整
✅ 记录 deliveryTag 原值传回
✅ 监控 Unacked 指标告警

“manual = 自己签快递
不 ack 就永远占地方,重试耗尽进死信,
监控 Unacked > 0 立刻排查!”

http://www.dtcms.com/a/445746.html

相关文章:

  • RabbitMQ概念 与 工作原理
  • 力扣每日一题(一)双指针 + 状态转移dp 矩阵快速幂
  • [ Redis ] 数据结构储存系统
  • 广东网站开发推荐山东住房城乡建设厅网站首页
  • [人工智能-综述-21]:学习人工智能的路径
  • 黄冈手机网站建设网站支付宝网上支付功能怎么做
  • Oracle OCP认证考试题目详解082系列第49题
  • HarmonyOS ArkTS深度解析:从语法特性到UI开发实践
  • Oracle OCP认证考试题目详解082系列第53题
  • 第十四篇:Python异步IO编程(asyncio)核心原理解析
  • RabbitMQ的核心组件有哪些?
  • Go语言:给AI开发装上高性能引擎
  • 中国五大网站建设公司外贸网站建设模板
  • 【Qt】多线程
  • 如何把qt + opencv的库按需要拷贝到开发板
  • 网络安全设备 防火墙
  • Java学习之旅第二季-6:static关键字与this关键字
  • 高校健康驿站建设指引妖精直播
  • 违规通知功能修改说明
  • SOFA 架构--01--简介
  • 家具网站首页模板郑州销售网站
  • 如何将Spring Boot 2接口改造为MCP服务,供大模型调用!
  • DC-DC电源芯片解读:RK860
  • 从零开始的C++学习生活 3:类和对象(中)
  • 做网站的技术员包装设计概念
  • 【深度学习02】TensorBoard 基础与 torchvision 图像变换工具详解(附代码演示)
  • k8s中Pod和Node的故事(1):过滤、打分、亲和性和拓扑分布
  • springboot自助甜品网站的设计与实现(代码+数据库+LW)
  • 网站建设业动态wordpress出现404
  • Vue3组件通信8大方式详解