手撕基于AMQP协议的简易消息队列-4(项目需求分析)
需求分析
核心概念需求
- ⽣产者 (Producer)
- 消费者 (Consumer)
- 中间⼈ (Broker)
- 发布 (Publish)
- 订阅 (Subscribe)
整体模型需求
-
一个生产者一个消费者
-
N个⽣产者, N个消费者
-
不难看出Broker Server是最核⼼的部分, 负责消息的存储和转发
Broker Server模块需求
Broker Server模块概览
-
为了实现AMQP(Advanced Message Queuing Protocol)-⾼级消息队列协议模型,对于消息中间件服务器Broker中,又需要一下模块:
- 虚拟机 (VirtualHost): 类似于 MySQL 的 “database”, 是⼀个逻辑上的集合。⼀个 BrokerServer 上可以存在多个 VirtualHost
- 交换机 (Exchange): ⽣产者把消息先发送到 Broker 的 Exchange 上,再根据不同的规则, 把消息转发给不同的 Queue队列 (Queue): 真正⽤来存储消息的部分, 每个消费者决定⾃⼰从哪个 Queue 上读取消息
- 绑定 (Binding): Exchange 和 Queue 之间的关联关系,Exchange 和 Queue 可以理解成 "多对多"关系,使⽤⼀个关联表就可以把这两个概念联系起来
- 消息 (Message): 传递的内容
-
BrokerServer结构图
- 大体上是BrokerServer中包含多个虚拟机,但本项目为了简化代码,仅支持定义一个虚拟机,同志们可以在这做拓展
-
VirtualHost结构图
- 对于生产客户端来说,其生产的消息应该交给交换机进行处理,由交换机根据交换机类型决定该消息应该推送到哪些队列当中去,而不是由生产客户端直接推送到队列当中
- 对于消费客户端来说,其要消费得消息应该直接从队列里面进行拿取。
Broker Server模块核心API
- Broker Server需要提供相应的接口来给客户端进行操作
- 创建交换机 (exchangeDeclare)
- 销毁交换机 (exchangeDelete)
- 创建队列 (queueDeclare)
- 销毁队列 (queueDelete)
- 创建绑定 (queueBind)
- 解除绑定 (queueUnbind)
- 发布消息 (basicPublish)
- 订阅消息 (basicConsume)
- 确认消息 (basicAck)
- 取消订阅 (basicCancel)
交换机类型需求
-
Direct: ⽣产者发送消息时, 直接指定被该交换机绑定的队列名
-
Fanout: ⽣产者发送的消息会被 到该交换机的所有队列中
-
Topic: 绑定队列到交换机上时, 指定⼀个字符串为 bindingKey。发送消息指定⼀个字符串为routingKey。当 routingKey 和 bindingKey 满⾜⼀定的匹配条件的时候, 则把消息投递到指定队列
持久化需求
- 需要持久化的数据:
- Exchange:交换机数据
- Queue:队列数据
- Binding:绑定数据,消费者与队列与交换机之间的关系
- Message:未被消费得数据
网络通信需求
- ⽣产者和消费者都是客⼾端程序, Broker 则是作为服务器,通过⽹络进⾏通信。
- 在⽹络通信的过程中, 客⼾端部分要提供对应的 api, 来实现对服务器的操作
- 客户端需要提供的api:
- 创建 Connection
- 关闭 Connection
- 创建 Channel
- 关闭 Channel
- 创建队列 (queueDeclare)
- 销毁队列 (queueDelete)
- 创建交换机 (exchangeDeclare)
- 销毁交换机 (exchangeDelete)
- 创建绑定 (queueBind)解除绑定 (queueUnbind)
- 发布消息 (basicPublish)
- 订阅消息 (basicConsume)
- 确认消息 (basicAck)
- 取消订阅(basicCancel)
消息应答种类需求
-
⾃动应答: 消费者只要消费了消息, 就算应答完毕了,Broker 直接删除这个消息
-
⼿动应答: 消费者⼿动调⽤应答接⼝, Broker 收到应答请求之后, 才真正删除这个消息
模块划分
三层处理架构
项目模块关系图
服务端模块
服务器的整体结构图
- _server:Muduo库提供的⼀个通⽤TCP服务器, 我们可以封装这个服务器进⾏TCP通信
- _baseloop:主事件循环器, ⽤于响应IO事件和定时器事件,主loop主要是为了响应监听描述符的IO事件
- _codec: ⼀个protobuf编解码器, 我们在TCP服务器上设计了⼀层应⽤层协议, 这个编解码器主要就是负责实现应⽤层协议的解析和封装, 下边具体讲解
- _dispatcher:⼀个消息分发器, 当Socket接收到⼀个报⽂消息后, 我们需要按照消息的类型, 即上⾯提到的typeName进⾏消息分发, 会将不同类型的消息分发相对应的的处理函数中,下边具体讲解
- _consumer: 服务器中的消费者信息管理句柄。
- _threadpool: 异步⼯作线程池,主要⽤于队列消息的推送⼯作。
- _connections: 连接管理句柄,管理当前服务器上的所有已经建⽴的通信连接。
- _virtual_host:服务器持有的虚拟主机。 队列、交换机 、绑定、消息等数据都是通过虚拟主机管理
关键组件协作
持久化数据管理中⼼模块
- 在数据管理模块中管理交换机,队列,队列绑定,消息等部分数据数据。
-
交换机管理:
-
管理信息:名称,类型,是否持久化标志,是否(⽆⼈使⽤时)⾃动删除标志,其他参数
-
管理操作:恢复历史信息,声明,删除,获取,判断是否存在
-
-
队列管理:
-
管理信息:名称,是否持久化标志,是否独有标志,是否(⽆⼈使⽤时)⾃动删除标志,其他参数…
-
管理操作:恢复历史信息,声明,删除,获取,判断是否存在
-
-
绑定管理:
-
管理信息:交换机名称,队列名称,绑定主题
-
管理操作:恢复历史信息,绑定,解绑,解除交换机关联绑定信息,解除队列关联绑定信息,获取交换机关联绑定信息
-
消息管理:
-
管理信息:
- 属性消息ID, 路由主题,持久化模式标志
- 消息内容
- 持久化有效标志
- 持久化位置
- 持久化消息长度
-
管理操作:恢复历史消息、向指定队列新增消息,获取指定队列队首消息,确认移除消息。以上消息都应该在内存和硬盘中存储。
-
以内存存储为主,主要是保证快速查找信息进⾏处理
-
以硬盘存储为辅,主要是保证服务器重启之后,之前的信息都可以正常保持
-
-
虚拟机管理模块
-
因为交换机/队列/绑定都是以虚拟机为单元整体进⾏操作的,因此虚拟机是对以上数据管理模块的整合模块
-
虚拟机管理信息:
-
交换机数据管理模块句柄
-
队列数据管理模块句柄
-
绑定数据管理模块句柄
-
消息数据管理模块句柄
-
-
虚拟机对外的接口
-
提供虚拟机内交换机声明/删除操作
-
提供虚拟机内队列声明/删除操作
-
提供虚拟机内交换机-队列绑定/解绑操作
-
获取交换机的相关绑定信息
-
-
对虚拟机的管理操作
-
创建虚拟机
-
查询虚拟机
-
删除虚拟机
-
交换路由模块
-
当客⼾端发布⼀条消息到交换机后,这条消息,应该被⼊队到该交换机绑定的哪些队列中?答案是由交换路由模块来决定
-
在绑定信息中有⼀个binding_key,⽽每条发布的消息中有⼀个routing_key,能否⼊队取决于两个要素:交换机类型和key
-
不同交换机类型对应的路由操作:
- ⼴播:将消息⼊队到该交换机的所有绑定队列中
- 直接:将消息⼊队到绑定信息中binding_key与消息routing_key⼀致的队列中
- 主题:将消息⼊队到绑定信息中binding_key与routing_key是匹配成功的队列中
-
binding_key:
- 是由数字字⺟下划线构成的, 并且使⽤ . 分成若⼲部分,比如:news.music.#
- ⽀持 * 和 # 两种通配符, 但是 * # 只能作为 . 切分出来的独⽴部分, 不能和其他数字字⺟混⽤,比如:
- ⽐如 a.*.b 是合法的, a.*a.b 是不合法的
- * 可以匹配任意⼀个单词(注意是单词不是字⺟)
- # 可以匹配任意零个或者多个单词(注意是单词不是字⺟)
-
routing_key:
- 是由数据、字⺟和下划线构成, 并且使⽤ . 划分成若⼲部分,如:news.music.pop
-
路由匹配算法详解
这个路由匹配算法用于消息队列系统中,根据交换机类型和路由键来决定消息如何路由到队列。算法主要处理三种交换机类型:DIRECT(直接)、FANOUT(扇出)和TOPIC(主题)。
-
算法概述
-
该算法实现了AMQP协议中的路由匹配规则,特别是对TOPIC交换机的复杂模式匹配。算法采用动态规划方法解决主题交换机中的通配符匹配问题。
-
三种交换机类型的处理
- DIRECT(直接交换机)
if (type == ExchangeType::DIRECT) {return (routing_key == binding_key); }
特点:
-
精确匹配路由键和绑定键
-
只有当
routing_key
完全等于binding_key
时才匹配成功 -
时间复杂度:O(1)
示例:
-
routing_key: “order.create”
-
匹配的binding_key: “order.create”
-
不匹配的binding_key: “order.delete”
- FANOUT(扇出交换机)
else if (type == ExchangeType::FANOUT) {return true; }
特点:
-
无条件匹配所有绑定
-
消息会被路由到所有绑定的队列
-
不考虑路由键和绑定键
-
时间复杂度:O(1)
示例:
- 无论routing_key是什么,都会匹配所有binding_key
- TOPIC(主题交换机)
else if (type == ExchangeType::TOPIC) {// 详细匹配逻辑... }
特点:
-
支持通配符匹配
-
使用动态规划算法处理复杂模式
-
时间复杂度:O(m*n),其中m和n分别是binding_key和routing_key的分段数
TOPIC交换机匹配算法详解
- 键分割
std::vector<std::string> bkeys, rkeys; int count_binding_key = StrHelper::split(binding_key, ".", bkeys); int count_routing_key = StrHelper::split(routing_key, ".", rkeys);
-
将binding_key和routing_key按"."分割成多个部分
-
例如:“news.music.pop” → [“news”, “music”, “pop”]
- 动态规划表初始化
std::vector<std::vector<bool>> dp(count_binding_key + 1, std::vector<bool>(count_routing_key + 1, false)); dp[0][0] = true;
-
创建(m+1)×(n+1)的二维布尔数组dp
-
dp[i][j]表示binding_key前i部分能否匹配routing_key前j部分
-
初始状态:空键匹配空键(dp[0][0] = true)
- 处理"#"开头的binding_key
for (int i = 1; i <= count_binding_key; i++) {if (bkeys[i - 1] == "#") {dp[i][0] = true;continue;}break; }
-
"#"表示匹配零个或多个单词
-
如果binding_key以"#"开头,则能匹配空routing_key
-
例如binding_key: "#.news"可以匹配routing_key: “news”
- 动态规划填充
for (int i = 1; i <= count_binding_key; i++) {for (int j = 1; j <= count_routing_key; j++) {// 情况1:当前单词相同或binding_key为"*"if (bkeys[i - 1] == rkeys[j - 1] || bkeys[i - 1] == "*") {dp[i][j] = dp[i - 1][j - 1];}// 情况2:当前binding_key为"#"else if (bkeys[i - 1] == "#") {dp[i][j] = dp[i - 1][j - 1] | dp[i][j - 1] | dp[i - 1][j];}} }
匹配规则:
-
精确匹配或"*"通配符:
-
"*"匹配单个单词
-
如果当前binding_key部分等于routing_key部分,或binding_key部分为"*"
-
继承左上角的结果(dp[i][j] = dp[i-1][j-1])
-
-
"#"通配符:
-
"#"匹配零个或多个单词
-
可以从三个方向继承结果:
-
左上(dp[i-1][j-1]):"#"匹配当前单词
-
左(dp[i][j-1]):"#"匹配更多单词
-
上(dp[i-1][j]):"#"匹配零个单词
-
-
-
返回结果
return dp[count_binding_key][count_routing_key];
-
返回动态规划表右下角的值
-
表示整个binding_key能否匹配整个routing_key
示例分析
示例1:
-
binding_key: “news.*.pop”
-
routing_key: “news.music.pop”
匹配过程:
-
分割:
-
bkeys: [“news”, “*”, “pop”]
-
rkeys: [“news”, “music”, “pop”]
-
-
动态规划表:
-
dp[1][1] = true (“news” == “news”)
-
dp[2][2] = true ("*“匹配"music”)
-
dp[3][3] = true (“pop” == “pop”)
-
-
结果:true
示例2:
-
binding_key: “news.#”
-
routing_key: “news.music.pop.jazz”
匹配过程:
-
分割:
-
bkeys: [“news”, “#”]
-
rkeys: [“news”, “music”, “pop”, “jazz”]
-
-
动态规划表:
-
dp[1][1] = true (“news” == “news”)
-
dp[2][2] = true ("#“匹配"music”)
-
dp[2][3] = true ("#“继续匹配"pop”)
-
dp[2][4] = true ("#“继续匹配"jazz”)
-
-
结果:true
示例3:
-
binding_key: “news.music.#.jazz”
-
routing_key: “news.music.pop.jazz”
匹配过程:
-
分割:
-
bkeys: [“news”, “music”, “#”, “jazz”]
-
rkeys: [“news”, “music”, “pop”, “jazz”]
-
-
动态规划表:
-
dp[1][1] = true (“news” == “news”)
-
dp[2][2] = true (“music” == “music”)
-
dp[3][3] = true ("#“匹配"pop”)
-
dp[4][4] = true (“jazz” == “jazz”)
-
-
结果:true
-
消费者管理模块
- 消费者管理是以队列为单元的,因为每个消费者都会在开始的时候订阅⼀个队列的消息,当队列中有消息后,会将队列消息轮询推送给订阅了该队列的消费者。
- 因此操作流程通常是,从队列关联的消息管理中取出消息,从队列关联的消费者中取出⼀个消费者,然后将消息推送给消费者(这就是发布订阅中负载均衡的⽤法)
- 需要管理的消费者信息:
- 标识
- 订阅队列名称
- ⾃动应答标志(决定了⼀条消息推送给消费者后,是否需要等待收到确认后再删除消息)
- 消息处理回调函数指针(⼀个消息发布后调⽤回调,选择消费者进⾏推送)
- 对消费者管理的操作:
- 添加
- 删除
- 轮询获取指定队列的消费者
- 移除队列所有消费者
信道管理模块
-
在AMQP模型中,除了通信连接Connection概念外,还有⼀个Channel的概念,Channel是针对Connection连接的⼀个更细粒度的通信信道,多个Channel可以使⽤同⼀个通信连接Connection,但同一个Connection的Channel之间相互独⽴
-
需要管理的信道信息
- 信道ID
- 信道关联的消费者
- 信道关联的连接
- 信道关联的虚拟机
- ⼯作线程池(⼀条消息被发布到队列后,需要将消息推送给订阅了对应队列的消费者,过程由线程池完成)
-
需要管理的操作:
-
提供声明&删除交换机操作(删除交换机的同时删除交换机关联的绑定信息)
-
提供声明&删除队列操作(删除队列的同时,删除队列关联的绑定信息,消息,消费者信息)
-
提供绑定&解绑队列操作
-
提供订阅&取消订阅队列消息操作
-
提供发布&确认消息操作
-
链接管理模块
-
链接模块的典型工作流程
-
管理结构图
-
虽然Muduo库里面提供了链接模块,但不能完全满足我们的要求,所以需要进行二次封装。
-
除了基础的增删查链接外,还需要能够操作信道
-
需要管理的链接信息:
- 链接关联的信道
- 链接关联的Muduo库Connection
-
需要管理的操作:
- 新增链接
- 删除链接
- 获取链接
- 打开信道
- 关闭信道
Broker服务器模块
- 整合以上所有模块,并搭建⽹络通信服务器,实现与客⼾端⽹络通信,能够识别客⼾端请求,并提供客⼾端请求的处理服务
- 需要管理的服务器信息:
- 虚拟机管理模块句柄
- 消费者管理模块句柄
- 连接管理模块句柄
- ⼯作线程池句柄
- muduo库通信所需元素
客户端模块
消费者管理
-
消费者在客⼾端的存在感⽐较低,因为在⽤⼾的使⽤⻆度中,只要创建⼀个信道后,就可以通过信道完成所有的操作,因此对于消费者的感官更多是在订阅的时候传⼊了⼀个消费者标识,且当前的简单实现也仅仅是⼀个信道只能创建订阅⼀个队列,也就是只能创建⼀个消费者,它们⼀⼀对应,因此更是弱化了消费者的存在
-
需要管理的消费者信息
-
标识
b. 订阅队列名称
c. ⾃动应答标志(决定了⼀条消息推送给消费者后,是否需要等待收到确认后再删除消息)
d. 消息处理回调函数指针(⼀个消息发布后调⽤回调,选择消费者进⾏推送)
-
-
需要管理的消费者操作:
- 添加
- 删除
- 轮询获取指定队列的消费者
- 移除队列所有消费者等操作
信道请求模块
-
与服务端的信道类似,客⼾端这边在AMQP模型中,也是除了通信连接Connection概念外,还有⼀个Channel的概念,Channel是针对Connection连接的⼀个更细粒度的通信信道,多个Channel可以使⽤同⼀个通信连接Connection进⾏通信,但是同⼀个Connection的Channel之间相互独⽴
-
需要管理的信道信息:
-
信道ID
-
信道关联的通信连接
-
信道关联的消费者
-
请求对应的响应信息队列(这⾥队列使⽤hash表,以便于查找指定的响应)
-
互斥锁&条件变量(⼤部分的请求都是阻塞操作,发送请求后需要等到响应才能继续,但是muduo库的通信是异步的,因此需要我们⾃⼰在收到响应后,通过判断是否是等待的指定响应
来进⾏同步)
-
-
需要管理的信道操作:
- 提供创建信道操作
- 提供删除信道操作
- 提供声明交换机操作(强断⾔-有则OK,没有则创建)
- 提供删除交换机
- 提供创建队列操作(强断⾔-有则OK,没有则创建)
- 提供删除队列操作
- 提供交换机-队列绑定操作