【仿RabbitMQ的发布订阅式消息队列】--- 概念理解
Welcome to 9ilk's Code World

(๑•́ ₃ •̀๑) 个人主页: 9ilk
(๑•́ ₃ •̀๑) 文章专栏: 项目
本篇博客主要是对消息队列模型的一个需求分析, 引出相关的模型概念。
核心概念
- 生产者(Producer)
- 消费者(Consumer)
- 中间人(Broker)
- 发布(Publish)
- 订阅(Subscribe)
- 一个生产者,一个消费者

- N个生产者,N个消费者

其中, Broker Server 是最核心的部分, 负责消息的存储和转发。而在 AMQP(Advanced Message Queuing Protocol-高级消息队列协议,一个提供统一 消息服务的应用层标准高级消息队列协议,是现代消息中间件的核心协议,广泛用于微服务架构和企业应用中,它使得遵从该规范的客户端应用和消息中间件服务器的全功能互操作成为可能)模型中,也就是消息中间件服务器 Broker 中,又存在以下概念:
-
虚拟机 (
VirtualHost): 类似于 MySQL 的 "database", 是一个逻辑上的集合。一个 BrokerServer 上可以存在多个 VirtualHost -
交换机 (
Exchange): 生产者把消息先发送到 Broker 的 Exchange 上,再根据不同 的规则, 把消息转发给不同的Queue -
队列 (
Queue) : 真正用来存储消息的部分,每个消费者决定自己从哪个Queue上 读取消息。 -
绑定 (
Binding) :Exchange和Queue之间的关联关系,Exchange和Queue可以 理解成 "多对多" 关系,使用一个关联表就可以把这两个概念联系起来。 -
消息 (
Message) : 传递的内容。
因此内部的主要工作流程:
1. 接收生产者消息
2. 查找交换机绑定关系
3. 应用路由规则
4. 分发到目标消息队列
5. 处理消息确认

上述数据结构,既需要在内存中存储,也需要在硬盘中存储
• 内存存储:方便使用
• 硬盘存储:重启数据不丢失
Exchange, Queue, Binding, Message 等数据都有持久化需求当程序重启 / 主机重启, 保证上述内容不丢失。
需要注意的是,Exchange和Queue可以理解为"多对多"的关系,即一个Exchange可以绑定多个 Queue (可以向多个 Queue 中转发消息) ,一个 Queue 也可以被多个 Exchange 绑定 (一个 Queue 中的消息可以来自于多个 Exchange)

核心接口
对于 Broker 来说, 要实现以下核心 API,通过这些 API 来实现消息队列的基本功能:
- 创建交换机 (exchangeDeclare)
- 销毁交换机 (exchangeDelete)
- 创建队列 (queueDeclare)
- 销毁队列 (queueDelete)
- 创建绑定 (queueBind)
- 解除绑定 (queueUnbind)
- 发布消息 (basicPublish)
- 订阅消息 (basicConsume)
- 确认消息 (basicAck)
- 取消订阅 (basicCancel)
另一方面,
Producer和Consumer则通过网络的方式, 远程调用这些 API, 实现生产者
消费者模型。
交换机类型
对于 RabbitMQ 来说, 主要支持四种交换机类型:
• Direct:生产者发送消息时, 直接指定被该交换机绑定的队列名。
• Fanout:生产者发送的消息会被复制到该交换机绑定的所有队列中。
• Topic:绑定队列到交换机上时, 指定一个字符串为 binding Key。发送消息指定一个字符串为 routing Key。当 routing Key和 binding Key满足一定的匹配条件的时候, 则把 消息投递到指定队列
• Header:比较复杂, 比较少见,因此本项目主要用的是前三种。
类比理解:QQ群发红包的例子
• Direct 是发一个专属红包, 只有指定的人能领,像对号入座,必须完全匹配。
• Fanout 是使用了魔法, 发一个 10 块钱红包, 群里的每个人都能领 10 块钱,像大喇叭广播,所有人都听。
•Topic 是发一个画图红包, 发 10 块钱红包, 同时出个题, 得画的像的人, 才能领。也是每个领到的人都能领 10 块钱,像智能标签,按规则分类。
网络通信
生产者和消费者都是客户端程序,Broker作为服务器,客户端和服务端之间通过网络通信。在网络通信的过程中, 客户端部分要提供对应的 api, 来实现对服务器的操作:
- 创建 Connection
- 关闭 Connection
- 创建 Channel
- 关闭 Channel
- 创建队列 (queueDeclare)
- 销毁队列 (queueDelete)
- 创建交换机 (exchangeDeclare)
- 销毁交换机 (exchangeDelete)
- 创建绑定 (queueBind)
- 解除绑定 (queueUnbind)
- 发布消息 (basicPublish)
- 订阅消息 (basicConsume)
- 确认消息 (basicAck)
- 取消订阅(basicCancel)
在Broker基础上,客户端还要增加Connection 操作和 Channel 操作
• Connection 对应一个 TCP 连接
• Channel 则是 Connection 中的逻辑通道
类比理解:Connection是"路",而Channel是路上的"车道",一条路上可以有多条车道并行通车。
为什么使用Channel:
- 提高性能:避免频繁建立/断开TCP连接
- 资源节约:多个操作共享同一个连接资源
- 并发安全:不同线程 使用不同Channel,避免竞争
消息应答
RabbitMQ一旦向消费者传递了一条消息,便立即将该消息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。以及后续发送给该消费这的消息,因为它无法接收到。 为了保证消息在发送过程中不丢失,RabbitMQ引入消息应答机制,消息应答就是 : 消费者在接收到消息并且处理该消息之后,告诉 RabbitMQ它已经处理了,RabbitMQ可以把该消息删除了。
消费者收到的每一条消息都必须进行确认。消息确认后,RabbitMQ才会从队列删除这条消息,RabbitMQ不会为未确认的消息设置超时时间,它判断此消息是否需要重新投递给消费者的唯一依据是消费该消息的消费者连接是否已经断开。这么设计的原因是RabbitMQ允许消费者消费一条消息的时间可以很久很久。
RabbitMQ中消息的应答,共有两种方式:
-
自动应答 : 消费者只要消费了消息 , 就算应答完毕了,
Broker直接删除这个消息。 -
手动应答 : 消费者手动调用应答接口,
Broker收到应答请求之后, 才真正删除这个消息。
