【仿RabbitMQ的发布订阅式消息队列】--- 模块设计与划分
Welcome to 9ilk's Code World

(๑•́ ₃ •̀๑) 个人主页: 9ilk
(๑•́ ₃ •̀๑) 文章专栏: 项目
从之前对消息队列工作模型的分析,我们的项目总体分为三个部分实现:
1. 服务端
2. 发布客户端
3. 订阅客户端
主要的模块划分图如下:

服务端模块
数据管理模块
数据管理模块主要完成的工作是管理交换机、队列、队列绑定、消息数据的管理,分别对他们实现数据的管理(增删查)以及持久化的存储。因此数据管理模块主要是以下四个小模块的整合:
- a. 交换机数据管理模块
- b. 队列数据管理模块
- c. 绑定数据管理模块
- d. 消息数据管理模块
交换机数据管理
管理的数据:
-
交换机名称 : 作为交换机的唯一标识。
-
交换机类型 : 决定了消息的转发方式。每个队列绑定中有个
binding_key每条消息中有个routing_key。
直接交换 ---
binding_key与routing_key相同,则将消息放入队列。广播交换 ---
将消息放入交换机绑定的所有队列。主题交换 ---
routing_key与多个绑定队列的binding_key有多个匹配规则,匹配成功则放入
-
持久化标志:决定了当前交换机信息是否需要持久化存储,也就决定该交换机重启后数据是否还存在。
-
自动删除标志:指的是关联了当前交换机的所有客户端退出了,是否要自动删除交换机。(扩展用)
-
其他参数:可以用来后续扩展。
对交换机的管理操作:
-
创建交换机:本质上需要的是声明 ---- 强断言的思想,有就OK,没有则创建。
-
删除交换机:注意每个交换机都会绑定一个或多个队列(意味着会有一个或多个绑定信息) ,删除交换机需要删除需要删除相关的绑定信息。
-
获取指定名称交换机。
-
获取当前交换机数量。
队列数据管理
管理的数据:
-
队列名称 : 作为队列的唯一标识。
-
持久化存储标志 : 决定了是否将队列信息持久化存储起来 , 决定了重启后 , 这个队列是否还存在 , 也决定了队列中的消息是否需要持久化。
注意:一个队列如果持久化标志为false,意味着重启后队列就没了,没有客户端能订阅该队列的消息,此时这个队列的消息如果持久化是没有意义的,队列没了,也没人能把消息拿走,因此通常一个队列持久化标志为false,那么它的消息也不需要持久化。
-
是否独占标志:独占指的是只有当前客户端自己能够去订阅队列消息,不允许其他客户端订阅该队列消息。
-
自动删除标志:当订阅了当前队列的所有客户端退出后,是否删除队列(扩展)。
-
其他参数
对队列的管理操作:
-
创建队列
-
删除队列
-
获取指定队列信息
-
获取队列数量
-
获取所有队列名称:当系统重启后,需要重新加载数据,加载历史消息(消息以队列为单元存储在文件中,当系统重启后,我们需要知道有哪些队列,才知道加载哪些消息)
绑定数据管理
该模块主要描述的是哪个队列和哪个交换机产生关联。
管理的数据:
-
交换机名称
-
队列名称
-
binding_key:绑定密钥 --- 描述了在交换机的主题交换&直接交换的消息发布匹配规则。由数字,字符,_,#,.,*组成。比如:news.music.#
对绑定的管理操作:
-
添加绑定
-
解除绑定
-
获取交换机相关的所有绑定信息(删除交换机的时候,要删除相关绑定信息;当消息发布到交换机,交换机得通过这些信息来将消息发布到指定队列)
-
获取队列相关的所有绑定信息(删除队列的时候,也要删除相关的绑定信息)
-
获取绑定信息数量
消息数据管理
1. 消息信息
(1) 消息属性:
- 消息ID --- 消息唯一标识
- 持久化标志:表示是否对消息进行持久化,实际上还取决于队列的持久化标志
-
routing_key:决定了当前消息要发布的队列(消息发布到交换机后,根据绑定队列的binding_key决定是否发布到指定队列)
(2)消息主体:消息内容
(3)服务端用于管理需添加的信息:
- 消息偏移量:消息以队列为单元存储在文件中,这个偏移量指的是当前消息相对于文件起始位置的偏移量。
- 消息长度:从偏移量位置取出指定长度的消息(解决粘包问题)
- 是否有效标志:标识当前消息是否已经被删除。
注意:删除一条消息并不会每次直接将后边的数据直接拷贝到前面,而只是重置标志。我们这里规定,当一个文件中,有效消息占据总消息比例不到50%且数据量超过2000,则进行垃圾回收,重新整理文件数据存储。当系统重启,也只需要重新加载有效消息即可,相当于进行了一次垃圾回收。
2. 消息的管理
管理方式:以队列为单元进行管理(因为消息的所有操作都是以队列为单元)。
管理的数据:
- 待推送消息链表 ---- 保存所有的待推送消息
- 待确认消息
hash:消息推送给客户端,会等待客户端进行消息确认,收到确认后,才会真正删除消息。 -
持久化消息
hash:假设消息都会进行持久化存储 , 操作过程中会存在垃圾回收操作(其实就是将有效消息读取出来,然后重新截断文件,将消息连续写入文件中,操作完文件中都是有效消息),但是垃圾回收会改变消息的存储位置,但是内存中的消息也会存储消息的实际存储位置,垃圾回收后就不一致了,因此每次垃圾回收后要用新的位置去更新持久化消息的信息。 -
持久化的有效消息数量
-
持久化的总消息数量:和持久化的有效消息数决定了什么时候进行垃圾回收。
3. 对每个队列提供的消息管理操作
- 向队列新增消息
-
获取队首消息:获取消息后就会将消息从待推送消息链表删除 ,因为它不再是待发送消息而是待确认消息,应该加入到待确认消息中hash。
-
确认消息:从待确认消息hash中移除消息,并进行持久化数据的删除。
-
恢复队列历史消息:主要是在构造函数中进行,因为只有在重启的时候才会进行。
-
垃圾回收:在消息持久化子模块完成。持久化文件中有效消息比例小于50%,且总消息数量超过2000进行垃圾回收。
-
删除队列相关消息文件:当一个队列被删除那它的消息也就没有存在的意义了。
4. 对外提供的队列消息管理操作
-
初始化队列消息结构
-
移除队列消息结构:在一个队列创建/删除的时候调用。
-
向队列新增消息。
-
对队列消息进行确认。
-
恢复队列历史消息
虚拟机数据管理
虚拟机数据管理模块其实是对前面几个数据管理模块(交换机数据管理,队列数据管理,绑定数据管理,消息数据管理)的整合。
1. 要管理的数据
-
交换机数据管理句柄
-
队列数据管理句柄
-
绑定信息管理句柄
-
消息数据管理句柄
2. 要管理的操作
-
声明/删除交换机:注意在删除交换机的时候要删除相关的绑定信息
-
声明/删除队列:注意在删除队列的时候,要删除相关的绑定信息以及消息数据。
-
队列的绑定/解除绑定:注意绑定的时候必须交换机和队列是存在的。
-
获取指定队列的消息
-
对指定队列的指定消息进行确认
-
获取交换机相关的所有绑定信息:一条消息要发布给指定交换机的时候,交换机获取所有绑定信息,来确定消息要发布到哪些队列。
路由匹配模块
该模块决定了一条消息是否能够发布到指定队列。在每个队列跟交换机的绑定信息中,都有一搜个binding_key,表示队列发布的匹配规则;同时在每条要发布的消息中都有一个routing_key,表示消息的发布规则。
交换机我们这里设置三种类型:
- 广播:直接将消息发布给该交换机的所有绑定队列。
- 直接:
routing_key与binding_key完全一致匹配成功。 - 主题:
binding_key中是匹配规则,routing_key是消息规则,匹配成功才能发布。
路由匹配模块本质上没有需要管理的数据,但是它需要对外提供路由匹配操作:
- 提供一个判断
routing_key与binding_key是否能够匹配成功的接口。 - 判断
routing_key是否符合规定:格式约定为只能由数字,字母,_.构成。 -
判断
binding_key是否符合规定:只能由数字,字母,_#.*构成。
消费者管理模块
客户端有两种,一个是消息发布客户端,一个是订阅消息客户端,对于订阅指定队列消息的客户端才是消费者。消费者数据的存在是必要的,因为当指定队列有了消息,就需要将消息推送给消费者客户端(比如推送的时候就需要找到这个客户端相关的信道)
消费者信息:
- 消费者标识
- 订阅队列名称:当当前队列有消息就会推送给这个客户端以及当客户端收到消息,需要对指定队列的消息进行确认。
- 自动确认标志:自动确认 --- 推送消息后直接删除消息不需要额外确认 ;手动确认 --- 推送消息后需要等待收到确认回复再去删除消息。
- 消费处理回调函数指针:队列有一条消息后通过哪个函数进行处理(函数内部其实逻辑固定,即向指定客户端推送消息)
消费者管理:管理思想是以队列为单元进行管理,因为每个消费者订阅的都是指定队列的消息,并且消费者对消息进行确认也是以队列进行确认。关键的是,当指定队列中有消息,必然是需要获取订阅了这个队列的消费者信息来进行消息推送。
队列消费者管理结构的数据信息:
- 消费者链表 : 保存当前队列的所有消费者信息
不同消息队列会采取不同的消费模式:
(1)队列模式(比如RabbitMQ)是一条消息只能被一个消费者消费,以RR轮转的方式;
(2)发布/订阅者模式(Kafka):一条消息可以被多个消费者同时消费,每个订阅者都会收到消息的副本。
应用场景:
(1)任务处理:只需要一个消费者执行。
(2)事件通知:需要多个消费者接收。
队列消费者管理操作:
1. 新增消费者
2. RR轮转获取一个消费者
3.删除消费者
4.队列消费者数量
5. 队列消费者是否为空
整体的队列消费者管理操作:
- 初始化队列消费者结构
- 删除队列消费者结构
- 向指定队列添加消费者
- 获取指定队列消费者
- 删除指定队列消费者
信道管理模块
-
信道是网络通信中的一个概念,叫做通信通道。网络通信的时候必然是通过网络通信连接完成的,为了能够更加充分的利用资源,因此对通信连接又进行更进一步的细化,细化出了通信通道。
-
对于用户来说,一个通信信道,就是进行网络通信的载体,而一个真正的通信连接可以创建出多个通信通道,每一个信道之间,在用户的眼中是相互独立的,而本质它们底层使用同一个通信连接进行网络通信。因此因为信道是用户眼中的一个通信通道,所以所有的网络通信服务都是由信道提供的。
信道提供的操作(服务):
- 声明 / 删除交换机
- 声明 / 删除队列
- 绑定 / 解绑队列与交换机
- 发布消息 / 订阅队列消息 / 队列消息确认 / 取消队列订阅
信道要管理的数据:
- 信道关联的虚拟机句柄
- 信道关联的消费者句柄:当信道关闭的时候,所有关联的消费者订阅都要取消,相当于删除所有的相关消费者。
- 工作线程池句柄:信道将消息发布到指定队列操作之后,从指定队列获取一个消费者,对这条消息进行消费,这个工作不能在当前线程,否则会阻塞。(也就是将这条消息推送给一个客户端的操作交给线程池执行,并非每个信道都有一个线程池,而是整个服务器有一个线程池,大家所有的信道都是通过同一个线程池进行异步操作而已)
- 信道ID
信道整体管理操作:
- 创建一个信道
- 关闭一个信道
- 获取指定信道句柄
连接管理模块
在网络通信模块中,我们使用muduo库实现底层通信,而muduo库中本身就有一个Connection连接的概念和对象类。但是我们的连接中还有一个上层通信信道的概念,这个概念在muduo库中是没有的。因此我们需要在用户的层面对muduo库中的connection连接进行二次封装,形成我们自己所需的连接管理。
管理的数据:
- muduo库的通信连接
- 当前连接关联的信道管理句柄
提供的操作:
- 创建信道
- 关闭信道
连接整体管理操作:
- 新增连接
- 关闭连接
- 获取指定连接信息。
Broker服务器模块
该模块主要是整合前面所有的模块,同时搭建网络通信服务器,实现与客户端网络通信,能够识别客户端的请求,并提供客户端请求的处理服务。本质上这个模块并不提供实质的功能性操作,这个模块最重要的是资源的整合,是一个资源的载体。
管理信息:
- 虚拟机管理模块句柄
- 消费者管理模块句柄
- 连接管理模块句柄
- 工作线程池句柄
- muduo库通信所需元素
服务器与资源的关系:
- 一个服务器有一个工作线程池,其他所有信道操作的都是这同一个线程池。
- 一个服务器有一个虚拟机,其他所有交换机,队列,绑定,消息的操作都是针对这个虚拟机进行的。
- 一个服务器有一个消费者管理
- 通信相关连接管理,协议处理模块句柄,也是一整个服务器都有一套。
客户端
消费者管理
消费者信息:
- 消费者标识
- 订阅队列名称
- 自动确认标志
- 消费处理回调函数指针
当前消费者订阅了某一个队列的消息,这个队列有了消息后,就会将消息推送给这个客户端,这时候收到了消息则使用回调函数进行处理,处理完毕后就会根据确认标志决定是否进行消息确认。
管理的操作:对消费者的增删查。
信道管理
其实跟服务器的一样,因为客户端要给用户提供什么服务,服务器就要给客户端提供什么服务。
管理信息:
(1)消费者管理句柄:每个信道都有自己相关的消费者。
(2)线程池句柄:对推送过来的消息进行回调处理,处理过程通过工作线程来进行。
(3)信道ID
(4)信道关联的连接
信道提供的服务:
(1)声明/删除交换机
(2)声明/删除队列
(3)绑定/解绑队列与交换机
(4)发布消息/确认消息
(5)订阅队列消息/取消订阅队列消息
(6)创建/关闭信道
信道的管理:信道的增删查
连接管理模块
客户端连接的管理本质上是对客户端TcpClient的二次封装和管理。面对用户,不需要有客户端的概念,连接对于用户来说就是客户端,通过连接创建信道,通过信道完成自己所需服务。因此当前客户端这边的连接对于用户来说就是一个资源的载体。
管理操作:
- 连接服务器
- 创建信道
- 关闭信道
- 关闭连接
管理资源:工作线程池,连接关联的信道管理句柄
异步工作池模块
(1)TcpClient模块需要一个EventLoopThread模块进行IO事件监控
(2)收到推送消息后,需要对推送过来的消息进行处理,因此需要一个线程池来帮助我们完成消息处理的过程
注意:将异步工作线程模块独立出来,原因是多个连接用一个
EventLoopThread进行I/O事件监控就够了, 以及所有的推送消息处理也只需要有一个线程池就够了,并不需要每个连接都要有一个EventLoop,也不需要每个信道的消息处理都有自己的线程池。
模块关系图

