当前位置: 首页 > news >正文

深入学习RabbitMQ队列的知识

目录

1、AMQP协议

1.1、介绍

1.2、AMQP的特点

1.3、工作流程

1.4、消息模型

1.5、消息结构

1.6、AMQP 的交换器类型

2、RabbitMQ结构介绍

2.1、核心组件

2.2、最大特点

2.3、工作原理

3、消息可靠性保障

3.1、生产端可靠性

1、生产者确认机制

2、持久化消息

3.2、Broker 端可靠性

1、镜像队列

2、RabbitMQ 集群

3.3. 消费端可靠性

1、手动确认

2、死信队列

4、处理重复消费问题

4.1. 消息幂等性

4.2. 事务机制

4.3. 消息去重中间件


前言

        RabbitMQ 是一个开源的 消息队列中间件,基于 AMQP 1.0 协议 实现,支持多种语言客户端(如 Java、Python、Go、Node.js 等)。

        它通过异步通信机制解耦系统组件,常用于分布式系统、微服务架构、任务队列等场景。

更多队列知识,可参考:MQ消息队列的深入研究-CSDN博客


1、AMQP协议

1.1、介绍

        AMQPAdvanced Message Queuing Protocol)是一种高级消息队列协议,用于在分布式系统中实现异步通信消息传递

        它定义了消息的格式、传输规则和路由机制,允许不同系统、语言或平台之间通过统一的标准进行消息交互。

1.2、AMQP的特点

如下图所示:

1、跨平台互操作性

        不同的消息中间件(如 RabbitMQ、ActiveMQ、Qpid)可以基于 AMQP 协议互相通信。

2、可靠的消息传递

        支持消息持久化、确认机制(ACK)、重试等,确保消息不丢失。

3、灵活的路由机制

        通过交换器(Exchange)和绑定键(Binding Key)实现复杂的消息路由规则。

1.3、工作流程

工作流程如下所示:

Producer → [消息] → Exchange → [根据绑定规则] → Queue → [消息] → Consumer

  1. 生产者 将消息发送到 交换器
  2. 交换器 根据绑定规则将消息路由到对应的 队列
  3. 消费者 从 队列 中拉取消息进行处理。

1.4、消息模型

AMQP 使用 发布-订阅模型 和 点对点模型 的混合方式。

         RabbitMQ 的消息传递机制支持复杂的路由逻辑。用户可以根据需要定义交换机(Exchange)和队列(Queue),然后制定路由规则,实现灵活的消息传递。

如下图所示:

核心组件包括:

  • 生产者(Producer):发送消息的客户端。
  • 消费者(Consumer):接收并处理消息的客户端。
  • 消息代理(Broker):中间件(如 RabbitMQ),负责存储、路由和转发消息。
  • 交换器(Exchange):接收生产者的消息,并根据规则将其路由到队列。
  • 队列(Queue):存储消息的容器,供消费者消费。
  • 绑定(Binding):定义交换器与队列之间的关联规则(如路由键)。

1.5、消息结构

一条 AMQP 消息包含以下部分:

  • 消息头(Header):元数据(如内容类型、编码、优先级)。
  • 属性(Properties):可选的附加信息(如消息ID、时间戳)。
  • 消息体(Body):实际的数据内容(如 JSON、XML、二进制)。

1.6、AMQP 的交换器类型

如下图所示:

总结

        AMQP 是一种标准化的分布式消息协议,解决了不同系统间消息传递的兼容性和可靠性问题。适用于微服务、物联网、任务队列等需要异步通信的场景。

与其他协议相比较:


2、RabbitMQ结构介绍

如下图所示:

2.1、核心组件

1.Producer(生产者):

        生成并发送消息到 RabbitMQ 的组件。生产者发布消息到交换机,而不是直接发到队列。

2.Exchange(交换机):

        负责接收生产者发送的消息,并根据一定的路由规则将消息路由到一个或多个队列中。交换机类型决定了消息路由策略(如 Direct、Fanout、Topic、Headers)。

3.Queue(队列):

        消息在 RabbitMQ 中存储的地方。消息被路由到一个或多个队列后,消费者可以从队列中获取消息进行处理。

4.Consumer(消费者):

        从队列中接收并处理消息的组件。消费者可以对消息进行确认(ack),表明消息已被处理。

5.Bindings(绑定):

        定义交换机和队列之间的关系,通过绑定键来定义消息的路由规则。

6.Virtual Hosts(虚拟主机):

        提供逻辑隔离,使得多个应用可以安全地使用相同的 RabbitMQ 实例。

2.2、最大特点

        灵活的消息路由:支持多种消息传递模式,如直连主题发布-订阅等,具有强大的消息路由能力。

2.3、工作原理

工作原理如下所示:

1.消息发布:

        生产者将消息发送到指定的交换机,提供一个路由键以指出如何路由消息。

2.消息路由:

        交换机根据绑定键,将消息分发到一个或多个与其绑定的队列中。如果没有找到匹配的队列,消息可能被丢弃。

3.消息消费:

        消费者从队列中拉取消息进行处理。消费者可以通过自动或手动消息确认机制来确保消息已成功处理。

4.消息确认和重发:

        当消费者确认接收到消息后,RabbitMQ 从队列中删除消息。未确认的消息可以重发给其他消费者。


3、消息可靠性保障

消息从生产到消费的过程如下图所示:

防止消息丢失,那么rabbitmq如何保证消息不丢失?

3.1、生产端可靠性

1、生产者确认机制

Publisher Confirm

作用:确保消息成功发送到 RabbitMQ 服务器。

步骤:生产者发送消息后,RabbitMQ 返回 ack 确认。如果未收到 ack,生产者可重发消息。

示例:

channel.confirmSelect(); // 开启确认模式
channel.addConfirmListener((sequenceNumber, multiple) -> {System.out.println("消息确认成功");
}, (sequenceNumber, multiple) -> {System.out.println("消息确认失败");
});

2、持久化消息

Persistent Messages

作用:防止 RabbitMQ 重启后消息丢失。

配置

        消息持久化:将消息标记为 persistent

        队列持久化:声明队列时设置 durable = true

// 声明持久化队列
channel.queueDeclare("my_queue", true, false, false, null);// 发送持久化消息
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().deliveryMode(2) // 2 表示持久化.build();
channel.basicPublish("", "my_queue", props, message.getBytes());

3.2、Broker 端可靠性

1、镜像队列

Mirrored Queues

  • 作用:通过集群复制队列数据,防止节点故障导致消息丢失。
  • 配置
rabbitmqctl set_policy ha-all "^my_queue$" '{"ha-mode":"all"}'

2、RabbitMQ 集群

  • 高可用架构:多节点部署,自动故障转移。
  • 数据同步:磁盘节点持久化元数据,内存节点提升性能。

3.3. 消费端可靠性

1、手动确认

Manual Acknowledgement

作用:确保消息被正确处理后再确认,避免消息丢失。

流程:消费者拉取消息(basicConsume)。处理消息后,手动发送 ack

示例:

DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");try {// 处理消息processMessage(message);channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); // 发送 ACK} catch (Exception e) {// 处理异常,可选择拒绝消息(nack)并重试channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);}
};
channel.basicConsume("my_queue", false, deliverCallback, consumerTag -> {});

2、死信队列

1、条件

如下图所示:

Dead Letter Queue, DLQ:是 RabbitMQ 中用于处理无法正常消费的消息的机制。

        当消息被拒绝(Reject/Nack)、过期(TTL)、或队列达到最大长度时,这些消息会被自动转移到死信队列中,供后续分析或重试处理。

作用:处理多次重试仍失败的消息。

配置

示例:

Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "dlx_exchange"); // 指定死信交换器
channel.queueDeclare("my_queue", true, false, false, args);

完整的配置如下:

// 声明主队列,并指定死信交换器和路由键
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "dlx_exchange");
args.put("x-dead-letter-routing-key", "dlx_key"); // 可选:指定死信消息的路由键
channel.queueDeclare("my_queue", true, false, false, args);// 声明死信交换器(通常为 Fanout 或 Direct 类型)
channel.exchangeDeclare("dlx_exchange", "direct");// 声明死信队列,并绑定到死信交换器
channel.queueDeclare("dead_letter_queue", true, false, false, null);
channel.queueBind("dead_letter_queue", "dlx_exchange", "dlx_key");

2、死信消息的流转流程

  1. 主队列(my_queue)

    • 消息正常入队。
    • 如果触发死信条件(如被拒绝),消息会被标记为死信。
  2. 死信交换器(dlx_exchange)

    • 接收死信消息。
    • 根据绑定规则将消息路由到死信队列(dead_letter_queue)。
  3. 死信队列(dead_letter_queue)

    • 存储死信消息。
    • 可由专门的消费者处理(如日志记录、重试、人工干预)。

4、处理重复消费问题

4.1. 消息幂等性

  • 核心思想:确保同一条消息被多次消费时结果一致。
  • 实现方式
    • 唯一标识符(UUID):为每条消息生成唯一 ID。
    • 数据库去重表:记录已处理的消息 ID。
CREATE TABLE message_log (message_id VARCHAR(36) PRIMARY KEY,processed_time TIMESTAMP
);
public void processMessage(String messageId, String data) {if (isAlreadyProcessed(messageId)) {return; // 已处理过,直接返回}// 处理业务逻辑saveToDatabase(messageId);
}

4.2. 事务机制

作用:将消息处理与数据库操作绑定为原子操作。

限制:性能较低,适合对一致性要求极高的场景。

channel.txSelect(); // 开启事务
try {channel.basicPublish(...);saveToDatabase(...);channel.txCommit(); // 提交事务
} catch (Exception e) {channel.txRollback(); // 回滚
}

4.3. 消息去重中间件

Redis 缓存:利用 Redis 的原子操作(如 SETNX)判断消息是否已处理。

String key = "msg:" + messageId;
if (jedis.setnx(key, "1") == 1) {jedis.expire(key, 60); // 设置过期时间// 处理消息
} else {return; // 已处理过
}

整体消息消费处理流程如下:

Producer → [持久化消息 + 确认机制] → RabbitMQ Broker 
           ↓
Consumer ← [手动 ACK + 幂等性校验] ← 队列
 

小结:


总结

        RabbitMQ 通过 生产端确认、消息持久化、手动 ACK、镜像队列 等机制保障消息不丢失,通过 幂等性设计、唯一 ID 校验、Redis 缓存 等手段防止重复消费。

        在实际应用中,需根据业务需求权衡性能与可靠性,合理配置 RabbitMQ 和消费逻辑。


参考文章:

1、MQ消息队列的深入研究-CSDN博客

相关文章:

  • 第11期_网站搭建_极简云 单码网络验证修复版本 虚拟主机搭建笔记
  • CLIP多模态大模型的优势及其在边缘计算中的应用
  • Day13
  • 热门消息中间件汇总
  • 八、Python模块、包
  • 第四十五天打卡
  • PublishSubject、ReplaySubject、BehaviorSubject、AsyncSubject的区别
  • React 第五十四节 Router中useRevalidator的使用详解及案例分析
  • AI智能推荐实战之RunnableParallel并行链
  • Haproxy的基础配置
  • AI问答-vue3+ts+vite:http://www.abc.com:3022/m-abc-pc/#/snow 这样的项目 在服务器怎么部署
  • vue+element-ui一个页面有多个子组件组成。子组件里面有各种表单,实现点击enter实现跳转到下一个表单元素的功能。
  • Excel-vlookup -多条件匹配,返回指定列处的值
  • sanitizer工具
  • 三表查询SQL怎么写?----小白初学+案例引入
  • Compose Multiplatform 实现自定义的系统托盘,解决托盘乱码问题
  • [Java 基础]数组
  • 世事无常,比较复杂,人可以简单一点
  • 钢轨滚动疲劳试验机
  • Spring框架知识体系全面总结
  • 荔湾区做网站公司/广州外包网络推广公司
  • 西安网站建设价格/营销型网站更受用户欢迎的原因是
  • 设计师去哪个网站找工作/百度公司高管排名
  • 多语言企业网站开发/深圳网站优化软件
  • 专门做asmr的网站/2345网址导航用户中心
  • 描述建设一个网站的具体步骤/东莞网站推广排名