手撕基于AMQP协议的简易消息队列-4(项目需求分析)
需求分析
核心概念需求
- ⽣产者 (Producer)
 - 消费者 (Consumer)
 - 中间⼈ (Broker)
 - 发布 (Publish)
 - 订阅 (Subscribe)
 
整体模型需求
-  
一个生产者一个消费者

 -  
N个⽣产者, N个消费者

 -  
不难看出Broker Server是最核⼼的部分, 负责消息的存储和转发
 
Broker Server模块需求
Broker Server模块概览
-  
为了实现AMQP(Advanced Message Queuing Protocol)-⾼级消息队列协议模型,对于消息中间件服务器Broker中,又需要一下模块:
- 虚拟机 (VirtualHost): 类似于 MySQL 的 “database”, 是⼀个逻辑上的集合。⼀个 BrokerServer 上可以存在多个 VirtualHost
 - 交换机 (Exchange): ⽣产者把消息先发送到 Broker 的 Exchange 上,再根据不同的规则, 把消息转发给不同的 Queue队列 (Queue): 真正⽤来存储消息的部分, 每个消费者决定⾃⼰从哪个 Queue 上读取消息
 - 绑定 (Binding): Exchange 和 Queue 之间的关联关系,Exchange 和 Queue 可以理解成 "多对多"关系,使⽤⼀个关联表就可以把这两个概念联系起来
 - 消息 (Message): 传递的内容
 
 -  
BrokerServer结构图
- 大体上是BrokerServer中包含多个虚拟机,但本项目为了简化代码,仅支持定义一个虚拟机,同志们可以在这做拓展
 

 -  
VirtualHost结构图
- 对于生产客户端来说,其生产的消息应该交给交换机进行处理,由交换机根据交换机类型决定该消息应该推送到哪些队列当中去,而不是由生产客户端直接推送到队列当中
 - 对于消费客户端来说,其要消费得消息应该直接从队列里面进行拿取。
 

 
Broker Server模块核心API
- Broker Server需要提供相应的接口来给客户端进行操作
 
- 创建交换机 (exchangeDeclare)
 - 销毁交换机 (exchangeDelete)
 - 创建队列 (queueDeclare)
 - 销毁队列 (queueDelete)
 - 创建绑定 (queueBind)
 - 解除绑定 (queueUnbind)
 - 发布消息 (basicPublish)
 - 订阅消息 (basicConsume)
 - 确认消息 (basicAck)
 - 取消订阅 (basicCancel)
 
交换机类型需求
-  
Direct: ⽣产者发送消息时, 直接指定被该交换机绑定的队列名
 -  
Fanout: ⽣产者发送的消息会被 到该交换机的所有队列中
 -  
Topic: 绑定队列到交换机上时, 指定⼀个字符串为 bindingKey。发送消息指定⼀个字符串为routingKey。当 routingKey 和 bindingKey 满⾜⼀定的匹配条件的时候, 则把消息投递到指定队列
 
持久化需求
- 需要持久化的数据: 
- Exchange:交换机数据
 - Queue:队列数据
 - Binding:绑定数据,消费者与队列与交换机之间的关系
 - Message:未被消费得数据
 
 
网络通信需求
- ⽣产者和消费者都是客⼾端程序, Broker 则是作为服务器,通过⽹络进⾏通信。
 - 在⽹络通信的过程中, 客⼾端部分要提供对应的 api, 来实现对服务器的操作
 - 客户端需要提供的api: 
- 创建 Connection
 - 关闭 Connection
 - 创建 Channel
 - 关闭 Channel
 - 创建队列 (queueDeclare)
 - 销毁队列 (queueDelete)
 - 创建交换机 (exchangeDeclare)
 - 销毁交换机 (exchangeDelete)
 - 创建绑定 (queueBind)解除绑定 (queueUnbind)
 - 发布消息 (basicPublish)
 - 订阅消息 (basicConsume)
 - 确认消息 (basicAck)
 - 取消订阅(basicCancel)
 
 
消息应答种类需求
-  
⾃动应答: 消费者只要消费了消息, 就算应答完毕了,Broker 直接删除这个消息
 -  
⼿动应答: 消费者⼿动调⽤应答接⼝, Broker 收到应答请求之后, 才真正删除这个消息
 
模块划分
三层处理架构

项目模块关系图

服务端模块
服务器的整体结构图

- _server:Muduo库提供的⼀个通⽤TCP服务器, 我们可以封装这个服务器进⾏TCP通信
 - _baseloop:主事件循环器, ⽤于响应IO事件和定时器事件,主loop主要是为了响应监听描述符的IO事件
 - _codec: ⼀个protobuf编解码器, 我们在TCP服务器上设计了⼀层应⽤层协议, 这个编解码器主要就是负责实现应⽤层协议的解析和封装, 下边具体讲解
 - _dispatcher:⼀个消息分发器, 当Socket接收到⼀个报⽂消息后, 我们需要按照消息的类型, 即上⾯提到的typeName进⾏消息分发, 会将不同类型的消息分发相对应的的处理函数中,下边具体讲解
 - _consumer: 服务器中的消费者信息管理句柄。
 - _threadpool: 异步⼯作线程池,主要⽤于队列消息的推送⼯作。
 - _connections: 连接管理句柄,管理当前服务器上的所有已经建⽴的通信连接。
 - _virtual_host:服务器持有的虚拟主机。 队列、交换机 、绑定、消息等数据都是通过虚拟主机管理
 
关键组件协作

持久化数据管理中⼼模块
- 在数据管理模块中管理交换机,队列,队列绑定,消息等部分数据数据。
 
-  
交换机管理:
-  
管理信息:名称,类型,是否持久化标志,是否(⽆⼈使⽤时)⾃动删除标志,其他参数
 -  
管理操作:恢复历史信息,声明,删除,获取,判断是否存在
 
 -  
 -  
队列管理:
-  
管理信息:名称,是否持久化标志,是否独有标志,是否(⽆⼈使⽤时)⾃动删除标志,其他参数…
 -  
管理操作:恢复历史信息,声明,删除,获取,判断是否存在
 
 -  
 -  
绑定管理:
 
-  
管理信息:交换机名称,队列名称,绑定主题
 -  
管理操作:恢复历史信息,绑定,解绑,解除交换机关联绑定信息,解除队列关联绑定信息,获取交换机关联绑定信息
 
-  
消息管理:
-  
管理信息:
- 属性消息ID, 路由主题,持久化模式标志
 - 消息内容
 - 持久化有效标志
 - 持久化位置
 - 持久化消息长度
 
 -  
管理操作:恢复历史消息、向指定队列新增消息,获取指定队列队首消息,确认移除消息。以上消息都应该在内存和硬盘中存储。
-  
以内存存储为主,主要是保证快速查找信息进⾏处理
 -  
以硬盘存储为辅,主要是保证服务器重启之后,之前的信息都可以正常保持
 
 -  
 
 -  
 
虚拟机管理模块
-  
因为交换机/队列/绑定都是以虚拟机为单元整体进⾏操作的,因此虚拟机是对以上数据管理模块的整合模块
 -  
虚拟机管理信息:
-  
交换机数据管理模块句柄
 -  
队列数据管理模块句柄
 -  
绑定数据管理模块句柄
 -  
消息数据管理模块句柄
 
 -  
 -  
虚拟机对外的接口
-  
提供虚拟机内交换机声明/删除操作
 -  
提供虚拟机内队列声明/删除操作
 -  
提供虚拟机内交换机-队列绑定/解绑操作
 -  
获取交换机的相关绑定信息
 
 -  
 -  
对虚拟机的管理操作
-  
创建虚拟机
 -  
查询虚拟机
 -  
删除虚拟机
 
 -  
 
交换路由模块
-  
当客⼾端发布⼀条消息到交换机后,这条消息,应该被⼊队到该交换机绑定的哪些队列中?答案是由交换路由模块来决定
 -  
在绑定信息中有⼀个binding_key,⽽每条发布的消息中有⼀个routing_key,能否⼊队取决于两个要素:交换机类型和key
 -  
不同交换机类型对应的路由操作:
- ⼴播:将消息⼊队到该交换机的所有绑定队列中
 - 直接:将消息⼊队到绑定信息中binding_key与消息routing_key⼀致的队列中
 - 主题:将消息⼊队到绑定信息中binding_key与routing_key是匹配成功的队列中
 
 -  
binding_key:
- 是由数字字⺟下划线构成的, 并且使⽤ . 分成若⼲部分,比如:news.music.#
 - ⽀持 * 和 # 两种通配符, 但是 * # 只能作为 . 切分出来的独⽴部分, 不能和其他数字字⺟混⽤,比如: 
- ⽐如 a.*.b 是合法的, a.*a.b 是不合法的
 - * 可以匹配任意⼀个单词(注意是单词不是字⺟)
 - # 可以匹配任意零个或者多个单词(注意是单词不是字⺟)
 
 
 -  
routing_key:
- 是由数据、字⺟和下划线构成, 并且使⽤ . 划分成若⼲部分,如:news.music.pop
 
 -  
路由匹配算法详解
这个路由匹配算法用于消息队列系统中,根据交换机类型和路由键来决定消息如何路由到队列。算法主要处理三种交换机类型:DIRECT(直接)、FANOUT(扇出)和TOPIC(主题)。
 -  
算法概述
-  
该算法实现了AMQP协议中的路由匹配规则,特别是对TOPIC交换机的复杂模式匹配。算法采用动态规划方法解决主题交换机中的通配符匹配问题。
 -  
三种交换机类型的处理
- DIRECT(直接交换机)
 
 
if (type == ExchangeType::DIRECT) {return (routing_key == binding_key); } 特点:
-  
精确匹配路由键和绑定键
 -  
只有当
routing_key完全等于binding_key时才匹配成功 -  
时间复杂度:O(1)
 
示例:
-  
routing_key: “order.create”
 -  
匹配的binding_key: “order.create”
 -  
不匹配的binding_key: “order.delete”
 
- FANOUT(扇出交换机)
 
else if (type == ExchangeType::FANOUT) {return true; }特点:
-  
无条件匹配所有绑定
 -  
消息会被路由到所有绑定的队列
 -  
不考虑路由键和绑定键
 -  
时间复杂度:O(1)
 
示例:
- 无论routing_key是什么,都会匹配所有binding_key
 
- TOPIC(主题交换机)
 
else if (type == ExchangeType::TOPIC) {// 详细匹配逻辑... }特点:
-  
支持通配符匹配
 -  
使用动态规划算法处理复杂模式
 -  
时间复杂度:O(m*n),其中m和n分别是binding_key和routing_key的分段数
 
TOPIC交换机匹配算法详解
- 键分割
 
std::vector<std::string> bkeys, rkeys; int count_binding_key = StrHelper::split(binding_key, ".", bkeys); int count_routing_key = StrHelper::split(routing_key, ".", rkeys);-  
将binding_key和routing_key按"."分割成多个部分
 -  
例如:“news.music.pop” → [“news”, “music”, “pop”]
 
- 动态规划表初始化
 
std::vector<std::vector<bool>> dp(count_binding_key + 1, std::vector<bool>(count_routing_key + 1, false)); dp[0][0] = true;-  
创建(m+1)×(n+1)的二维布尔数组dp
 -  
dp[i][j]表示binding_key前i部分能否匹配routing_key前j部分
 -  
初始状态:空键匹配空键(dp[0][0] = true)
 
- 处理"#"开头的binding_key
 
for (int i = 1; i <= count_binding_key; i++) {if (bkeys[i - 1] == "#") {dp[i][0] = true;continue;}break; }-  
"#"表示匹配零个或多个单词
 -  
如果binding_key以"#"开头,则能匹配空routing_key
 -  
例如binding_key: "#.news"可以匹配routing_key: “news”
 
- 动态规划填充
 
for (int i = 1; i <= count_binding_key; i++) {for (int j = 1; j <= count_routing_key; j++) {// 情况1:当前单词相同或binding_key为"*"if (bkeys[i - 1] == rkeys[j - 1] || bkeys[i - 1] == "*") {dp[i][j] = dp[i - 1][j - 1];}// 情况2:当前binding_key为"#"else if (bkeys[i - 1] == "#") {dp[i][j] = dp[i - 1][j - 1] | dp[i][j - 1] | dp[i - 1][j];}} }匹配规则:
-  
精确匹配或"*"通配符:
-  
"*"匹配单个单词
 -  
如果当前binding_key部分等于routing_key部分,或binding_key部分为"*"
 -  
继承左上角的结果(dp[i][j] = dp[i-1][j-1])
 
 -  
 -  
"#"通配符:
-  
"#"匹配零个或多个单词
 -  
可以从三个方向继承结果:
-  
左上(dp[i-1][j-1]):"#"匹配当前单词
 -  
左(dp[i][j-1]):"#"匹配更多单词
 -  
上(dp[i-1][j]):"#"匹配零个单词
 
 -  
 
 -  
 -  
返回结果
 
return dp[count_binding_key][count_routing_key];-  
返回动态规划表右下角的值
 -  
表示整个binding_key能否匹配整个routing_key
 
示例分析
示例1:
-  
binding_key: “news.*.pop”
 -  
routing_key: “news.music.pop”
 
匹配过程:
-  
分割:
-  
bkeys: [“news”, “*”, “pop”]
 -  
rkeys: [“news”, “music”, “pop”]
 
 -  
 -  
动态规划表:
-  
dp[1][1] = true (“news” == “news”)
 -  
dp[2][2] = true ("*“匹配"music”)
 -  
dp[3][3] = true (“pop” == “pop”)
 
 -  
 -  
结果:true
 
示例2:
-  
binding_key: “news.#”
 -  
routing_key: “news.music.pop.jazz”
 
匹配过程:
-  
分割:
-  
bkeys: [“news”, “#”]
 -  
rkeys: [“news”, “music”, “pop”, “jazz”]
 
 -  
 -  
动态规划表:
-  
dp[1][1] = true (“news” == “news”)
 -  
dp[2][2] = true ("#“匹配"music”)
 -  
dp[2][3] = true ("#“继续匹配"pop”)
 -  
dp[2][4] = true ("#“继续匹配"jazz”)
 
 -  
 -  
结果:true
 
示例3:
-  
binding_key: “news.music.#.jazz”
 -  
routing_key: “news.music.pop.jazz”
 
匹配过程:
-  
分割:
-  
bkeys: [“news”, “music”, “#”, “jazz”]
 -  
rkeys: [“news”, “music”, “pop”, “jazz”]
 
 -  
 -  
动态规划表:
-  
dp[1][1] = true (“news” == “news”)
 -  
dp[2][2] = true (“music” == “music”)
 -  
dp[3][3] = true ("#“匹配"pop”)
 -  
dp[4][4] = true (“jazz” == “jazz”)
 
 -  
 -  
结果:true
 
 -  
 
消费者管理模块
- 消费者管理是以队列为单元的,因为每个消费者都会在开始的时候订阅⼀个队列的消息,当队列中有消息后,会将队列消息轮询推送给订阅了该队列的消费者。
 - 因此操作流程通常是,从队列关联的消息管理中取出消息,从队列关联的消费者中取出⼀个消费者,然后将消息推送给消费者(这就是发布订阅中负载均衡的⽤法)
 - 需要管理的消费者信息: 
- 标识
 - 订阅队列名称
 - ⾃动应答标志(决定了⼀条消息推送给消费者后,是否需要等待收到确认后再删除消息)
 - 消息处理回调函数指针(⼀个消息发布后调⽤回调,选择消费者进⾏推送)
 
 - 对消费者管理的操作: 
- 添加
 - 删除
 - 轮询获取指定队列的消费者
 - 移除队列所有消费者
 
 
信道管理模块
-  
在AMQP模型中,除了通信连接Connection概念外,还有⼀个Channel的概念,Channel是针对Connection连接的⼀个更细粒度的通信信道,多个Channel可以使⽤同⼀个通信连接Connection,但同一个Connection的Channel之间相互独⽴
 -  
需要管理的信道信息
- 信道ID
 - 信道关联的消费者
 - 信道关联的连接
 - 信道关联的虚拟机
 - ⼯作线程池(⼀条消息被发布到队列后,需要将消息推送给订阅了对应队列的消费者,过程由线程池完成)
 
 -  
需要管理的操作:
-  
提供声明&删除交换机操作(删除交换机的同时删除交换机关联的绑定信息)
 -  
提供声明&删除队列操作(删除队列的同时,删除队列关联的绑定信息,消息,消费者信息)
 -  
提供绑定&解绑队列操作
 -  
提供订阅&取消订阅队列消息操作
 -  
提供发布&确认消息操作
 
 -  
 
链接管理模块
-  
链接模块的典型工作流程

 -  
管理结构图

 -  
虽然Muduo库里面提供了链接模块,但不能完全满足我们的要求,所以需要进行二次封装。
 -  
除了基础的增删查链接外,还需要能够操作信道
 -  
需要管理的链接信息:
- 链接关联的信道
 - 链接关联的Muduo库Connection
 
 -  
需要管理的操作:
- 新增链接
 - 删除链接
 - 获取链接
 - 打开信道
 - 关闭信道
 
 
Broker服务器模块
- 整合以上所有模块,并搭建⽹络通信服务器,实现与客⼾端⽹络通信,能够识别客⼾端请求,并提供客⼾端请求的处理服务
 - 需要管理的服务器信息: 
- 虚拟机管理模块句柄
 - 消费者管理模块句柄
 - 连接管理模块句柄
 - ⼯作线程池句柄
 - muduo库通信所需元素
 
 
客户端模块
消费者管理
-  
消费者在客⼾端的存在感⽐较低,因为在⽤⼾的使⽤⻆度中,只要创建⼀个信道后,就可以通过信道完成所有的操作,因此对于消费者的感官更多是在订阅的时候传⼊了⼀个消费者标识,且当前的简单实现也仅仅是⼀个信道只能创建订阅⼀个队列,也就是只能创建⼀个消费者,它们⼀⼀对应,因此更是弱化了消费者的存在
 -  
需要管理的消费者信息
-  
标识
b. 订阅队列名称
c. ⾃动应答标志(决定了⼀条消息推送给消费者后,是否需要等待收到确认后再删除消息)
d. 消息处理回调函数指针(⼀个消息发布后调⽤回调,选择消费者进⾏推送)
 
 -  
 -  
需要管理的消费者操作:
- 添加
 - 删除
 - 轮询获取指定队列的消费者
 - 移除队列所有消费者等操作
 
 
信道请求模块
-  
与服务端的信道类似,客⼾端这边在AMQP模型中,也是除了通信连接Connection概念外,还有⼀个Channel的概念,Channel是针对Connection连接的⼀个更细粒度的通信信道,多个Channel可以使⽤同⼀个通信连接Connection进⾏通信,但是同⼀个Connection的Channel之间相互独⽴
 -  
需要管理的信道信息:
-  
信道ID
 -  
信道关联的通信连接
 -  
信道关联的消费者
 -  
请求对应的响应信息队列(这⾥队列使⽤hash表,以便于查找指定的响应)
 -  
互斥锁&条件变量(⼤部分的请求都是阻塞操作,发送请求后需要等到响应才能继续,但是muduo库的通信是异步的,因此需要我们⾃⼰在收到响应后,通过判断是否是等待的指定响应
来进⾏同步)
 
 -  
 -  
需要管理的信道操作:
- 提供创建信道操作
 - 提供删除信道操作
 - 提供声明交换机操作(强断⾔-有则OK,没有则创建)
 - 提供删除交换机
 - 提供创建队列操作(强断⾔-有则OK,没有则创建)
 - 提供删除队列操作
 - 提供交换机-队列绑定操作
 
 
