当前位置: 首页 > news >正文

手撕基于AMQP协议的简易消息队列-4(项目需求分析)

需求分析

核心概念需求

  1. ⽣产者 (Producer)
  2. 消费者 (Consumer)
  3. 中间⼈ (Broker)
  4. 发布 (Publish)
  5. 订阅 (Subscribe)

整体模型需求

  • 一个生产者一个消费者
    在这里插入图片描述

  • N个⽣产者, N个消费者

    在这里插入图片描述

  • 不难看出Broker Server是最核⼼的部分, 负责消息的存储和转发

Broker Server模块需求

Broker Server模块概览
  • 为了实现AMQP(Advanced Message Queuing Protocol)-⾼级消息队列协议模型,对于消息中间件服务器Broker中,又需要一下模块:

    • 虚拟机 (VirtualHost): 类似于 MySQL 的 “database”, 是⼀个逻辑上的集合。⼀个 BrokerServer 上可以存在多个 VirtualHost
    • 交换机 (Exchange): ⽣产者把消息先发送到 Broker 的 Exchange 上,再根据不同的规则, 把消息转发给不同的 Queue队列 (Queue): 真正⽤来存储消息的部分, 每个消费者决定⾃⼰从哪个 Queue 上读取消息
    • 绑定 (Binding): Exchange 和 Queue 之间的关联关系,Exchange 和 Queue 可以理解成 "多对多"关系,使⽤⼀个关联表就可以把这两个概念联系起来
    • 消息 (Message): 传递的内容
  • BrokerServer结构图

    • 大体上是BrokerServer中包含多个虚拟机,但本项目为了简化代码,仅支持定义一个虚拟机,同志们可以在这做拓展

    在这里插入图片描述

  • VirtualHost结构图

    • 对于生产客户端来说,其生产的消息应该交给交换机进行处理,由交换机根据交换机类型决定该消息应该推送到哪些队列当中去,而不是由生产客户端直接推送到队列当中
    • 对于消费客户端来说,其要消费得消息应该直接从队列里面进行拿取。

    在这里插入图片描述

Broker Server模块核心API
  • Broker Server需要提供相应的接口来给客户端进行操作
  1. 创建交换机 (exchangeDeclare)
  2. 销毁交换机 (exchangeDelete)
  3. 创建队列 (queueDeclare)
  4. 销毁队列 (queueDelete)
  5. 创建绑定 (queueBind)
  6. 解除绑定 (queueUnbind)
  7. 发布消息 (basicPublish)
  8. 订阅消息 (basicConsume)
  9. 确认消息 (basicAck)
  10. 取消订阅 (basicCancel)

交换机类型需求

  1. Direct: ⽣产者发送消息时, 直接指定被该交换机绑定的队列名

  2. Fanout: ⽣产者发送的消息会被 到该交换机的所有队列中

  3. Topic: 绑定队列到交换机上时, 指定⼀个字符串为 bindingKey。发送消息指定⼀个字符串为routingKey。当 routingKey 和 bindingKey 满⾜⼀定的匹配条件的时候, 则把消息投递到指定队列

持久化需求

  • 需要持久化的数据:
    1. Exchange:交换机数据
    2. Queue:队列数据
    3. Binding:绑定数据,消费者与队列与交换机之间的关系
    4. Message:未被消费得数据

网络通信需求

  • ⽣产者和消费者都是客⼾端程序, Broker 则是作为服务器,通过⽹络进⾏通信。
  • 在⽹络通信的过程中, 客⼾端部分要提供对应的 api, 来实现对服务器的操作
  • 客户端需要提供的api:
    1. 创建 Connection
    2. 关闭 Connection
    3. 创建 Channel
    4. 关闭 Channel
    5. 创建队列 (queueDeclare)
    6. 销毁队列 (queueDelete)
    7. 创建交换机 (exchangeDeclare)
    8. 销毁交换机 (exchangeDelete)
    9. 创建绑定 (queueBind)解除绑定 (queueUnbind)
    10. 发布消息 (basicPublish)
    11. 订阅消息 (basicConsume)
    12. 确认消息 (basicAck)
    13. 取消订阅(basicCancel)

消息应答种类需求

  • ⾃动应答: 消费者只要消费了消息, 就算应答完毕了,Broker 直接删除这个消息

  • ⼿动应答: 消费者⼿动调⽤应答接⼝, Broker 收到应答请求之后, 才真正删除这个消息

模块划分

三层处理架构

在这里插入图片描述

项目模块关系图

在这里插入图片描述

服务端模块

服务器的整体结构图

在这里插入图片描述

  • _server:Muduo库提供的⼀个通⽤TCP服务器, 我们可以封装这个服务器进⾏TCP通信
  • _baseloop:主事件循环器, ⽤于响应IO事件和定时器事件,主loop主要是为了响应监听描述符的IO事件
  • _codec: ⼀个protobuf编解码器, 我们在TCP服务器上设计了⼀层应⽤层协议, 这个编解码器主要就是负责实现应⽤层协议的解析和封装, 下边具体讲解
  • _dispatcher:⼀个消息分发器, 当Socket接收到⼀个报⽂消息后, 我们需要按照消息的类型, 即上⾯提到的typeName进⾏消息分发, 会将不同类型的消息分发相对应的的处理函数中,下边具体讲解
  • _consumer: 服务器中的消费者信息管理句柄。
  • _threadpool: 异步⼯作线程池,主要⽤于队列消息的推送⼯作。
  • _connections: 连接管理句柄,管理当前服务器上的所有已经建⽴的通信连接。
  • _virtual_host:服务器持有的虚拟主机。 队列、交换机 、绑定、消息等数据都是通过虚拟主机管理
关键组件协作

在这里插入图片描述

持久化数据管理中⼼模块
  • 在数据管理模块中管理交换机,队列,队列绑定,消息等部分数据数据。
  1. 交换机管理:

    • 管理信息:名称,类型,是否持久化标志,是否(⽆⼈使⽤时)⾃动删除标志,其他参数

    • 管理操作:恢复历史信息,声明,删除,获取,判断是否存在

  2. 队列管理:

    • 管理信息:名称,是否持久化标志,是否独有标志,是否(⽆⼈使⽤时)⾃动删除标志,其他参数…

    • 管理操作:恢复历史信息,声明,删除,获取,判断是否存在

  3. 绑定管理:

  • 管理信息:交换机名称,队列名称,绑定主题

  • 管理操作:恢复历史信息,绑定,解绑,解除交换机关联绑定信息,解除队列关联绑定信息,获取交换机关联绑定信息

  1. 消息管理:

    • 管理信息:

      1. 属性消息ID, 路由主题,持久化模式标志
      2. 消息内容
      3. 持久化有效标志
      4. 持久化位置
      5. 持久化消息长度
    • 管理操作:恢复历史消息、向指定队列新增消息,获取指定队列队首消息,确认移除消息。以上消息都应该在内存和硬盘中存储。

      • 以内存存储为主,主要是保证快速查找信息进⾏处理

      • 以硬盘存储为辅,主要是保证服务器重启之后,之前的信息都可以正常保持

虚拟机管理模块
  • 因为交换机/队列/绑定都是以虚拟机为单元整体进⾏操作的,因此虚拟机是对以上数据管理模块整合模块

  • 虚拟机管理信息:

    • 交换机数据管理模块句柄

    • 队列数据管理模块句柄

    • 绑定数据管理模块句柄

    • 消息数据管理模块句柄

  • 虚拟机对外的接口

    • 提供虚拟机内交换机声明/删除操作

    • 提供虚拟机内队列声明/删除操作

    • 提供虚拟机内交换机-队列绑定/解绑操作

    • 获取交换机的相关绑定信息

  • 对虚拟机的管理操作

    • 创建虚拟机

    • 查询虚拟机

    • 删除虚拟机

交换路由模块
  • 当客⼾端发布⼀条消息到交换机后,这条消息,应该被⼊队到该交换机绑定的哪些队列中?答案是由交换路由模块来决定

  • 在绑定信息中有⼀个binding_key,⽽每条发布的消息中有⼀个routing_key,能否⼊队取决于两个要素:交换机类型和key

  • 不同交换机类型对应的路由操作:

    1. ⼴播:将消息⼊队到该交换机的所有绑定队列中
    2. 直接:将消息⼊队到绑定信息中binding_key与消息routing_key⼀致的队列中
    3. 主题:将消息⼊队到绑定信息中binding_key与routing_key是匹配成功的队列中
  • binding_key:

    • 是由数字字⺟下划线构成的, 并且使⽤ . 分成若⼲部分,比如:news.music.#
    • ⽀持 * 和 # 两种通配符, 但是 * # 只能作为 . 切分出来的独⽴部分, 不能和其他数字字⺟混⽤,比如:
      • ⽐如 a.*.b 是合法的, a.*a.b 是不合法的
      • * 可以匹配任意⼀个单词(注意是单词不是字⺟)
      • # 可以匹配任意零个或者多个单词(注意是单词不是字⺟)
  • routing_key:

    • 是由数据、字⺟和下划线构成, 并且使⽤ . 划分成若⼲部分,如:news.music.pop
  • 路由匹配算法详解

    这个路由匹配算法用于消息队列系统中,根据交换机类型和路由键来决定消息如何路由到队列。算法主要处理三种交换机类型:DIRECT(直接)、FANOUT(扇出)和TOPIC(主题)。

  • 算法概述

    • 该算法实现了AMQP协议中的路由匹配规则,特别是对TOPIC交换机的复杂模式匹配。算法采用动态规划方法解决主题交换机中的通配符匹配问题。

    • 三种交换机类型的处理

      1. DIRECT(直接交换机)
    if (type == ExchangeType::DIRECT) {return (routing_key == binding_key);
    }
    

    ​ 特点:

    • 精确匹配路由键和绑定键

    • 只有当routing_key完全等于binding_key时才匹配成功

    • 时间复杂度:O(1)

    示例:

    • routing_key: “order.create”

    • 匹配的binding_key: “order.create”

    • 不匹配的binding_key: “order.delete”

    1. FANOUT(扇出交换机)
    else if (type == ExchangeType::FANOUT) {return true;
    }
    

    特点:

    • 无条件匹配所有绑定

    • 消息会被路由到所有绑定的队列

    • 不考虑路由键和绑定键

    • 时间复杂度:O(1)

    示例:

    • 无论routing_key是什么,都会匹配所有binding_key
    1. TOPIC(主题交换机)
    else if (type == ExchangeType::TOPIC) {// 详细匹配逻辑...
    }
    

    特点:

    • 支持通配符匹配

    • 使用动态规划算法处理复杂模式

    • 时间复杂度:O(m*n),其中m和n分别是binding_key和routing_key的分段数

    TOPIC交换机匹配算法详解

    1. 键分割
    std::vector<std::string> bkeys, rkeys;
    int count_binding_key = StrHelper::split(binding_key, ".", bkeys);
    int count_routing_key = StrHelper::split(routing_key, ".", rkeys);
    
    • 将binding_key和routing_key按"."分割成多个部分

    • 例如:“news.music.pop” → [“news”, “music”, “pop”]

    1. 动态规划表初始化
    std::vector<std::vector<bool>> dp(count_binding_key + 1, std::vector<bool>(count_routing_key + 1, false));
    dp[0][0] = true;
    
    • 创建(m+1)×(n+1)的二维布尔数组dp

    • dp[i][j]表示binding_key前i部分能否匹配routing_key前j部分

    • 初始状态:空键匹配空键(dp[0][0] = true)

    1. 处理"#"开头的binding_key
    for (int i = 1; i <= count_binding_key; i++) {if (bkeys[i - 1] == "#") {dp[i][0] = true;continue;}break;
    }
    
    • "#"表示匹配零个或多个单词

    • 如果binding_key以"#"开头,则能匹配空routing_key

    • 例如binding_key: "#.news"可以匹配routing_key: “news”

    1. 动态规划填充
    for (int i = 1; i <= count_binding_key; i++) {for (int j = 1; j <= count_routing_key; j++) {// 情况1:当前单词相同或binding_key为"*"if (bkeys[i - 1] == rkeys[j - 1] || bkeys[i - 1] == "*") {dp[i][j] = dp[i - 1][j - 1];}// 情况2:当前binding_key为"#"else if (bkeys[i - 1] == "#") {dp[i][j] = dp[i - 1][j - 1] | dp[i][j - 1] | dp[i - 1][j];}}
    }
    

    匹配规则:

    1. 精确匹配或"*"通配符:

      • "*"匹配单个单词

      • 如果当前binding_key部分等于routing_key部分,或binding_key部分为"*"

      • 继承左上角的结果(dp[i][j] = dp[i-1][j-1])

    2. "#"通配符:

      • "#"匹配零个或多个单词

      • 可以从三个方向继承结果:

        • 左上(dp[i-1][j-1]):"#"匹配当前单词

        • 左(dp[i][j-1]):"#"匹配更多单词

        • 上(dp[i-1][j]):"#"匹配零个单词

    3. 返回结果

    return dp[count_binding_key][count_routing_key];
    
    • 返回动态规划表右下角的值

    • 表示整个binding_key能否匹配整个routing_key

    示例分析

    示例1:

    • binding_key: “news.*.pop”

    • routing_key: “news.music.pop”

    匹配过程:

    1. 分割:

      • bkeys: [“news”, “*”, “pop”]

      • rkeys: [“news”, “music”, “pop”]

    2. 动态规划表:

      • dp[1][1] = true (“news” == “news”)

      • dp[2][2] = true ("*“匹配"music”)

      • dp[3][3] = true (“pop” == “pop”)

    3. 结果:true

    示例2:

    • binding_key: “news.#”

    • routing_key: “news.music.pop.jazz”

    匹配过程:

    1. 分割:

      • bkeys: [“news”, “#”]

      • rkeys: [“news”, “music”, “pop”, “jazz”]

    2. 动态规划表:

      • dp[1][1] = true (“news” == “news”)

      • dp[2][2] = true ("#“匹配"music”)

      • dp[2][3] = true ("#“继续匹配"pop”)

      • dp[2][4] = true ("#“继续匹配"jazz”)

    3. 结果:true

    示例3:

    • binding_key: “news.music.#.jazz”

    • routing_key: “news.music.pop.jazz”

    匹配过程:

    1. 分割:

      • bkeys: [“news”, “music”, “#”, “jazz”]

      • rkeys: [“news”, “music”, “pop”, “jazz”]

    2. 动态规划表:

      • dp[1][1] = true (“news” == “news”)

      • dp[2][2] = true (“music” == “music”)

      • dp[3][3] = true ("#“匹配"pop”)

      • dp[4][4] = true (“jazz” == “jazz”)

    3. 结果:true

消费者管理模块
  • 消费者管理是以队列为单元的,因为每个消费者都会在开始的时候订阅⼀个队列的消息,当队列中有消息后,会将队列消息轮询推送给订阅了该队列的消费者。
  • 因此操作流程通常是,从队列关联的消息管理中取出消息,从队列关联的消费者中取出⼀个消费者,然后将消息推送给消费者(这就是发布订阅中负载均衡的⽤法)
  • 需要管理的消费者信息:
    • 标识
    • 订阅队列名称
    • ⾃动应答标志(决定了⼀条消息推送给消费者后,是否需要等待收到确认后再删除消息)
    • 消息处理回调函数指针(⼀个消息发布后调⽤回调,选择消费者进⾏推送)
  • 对消费者管理的操作:
    • 添加
    • 删除
    • 轮询获取指定队列的消费者
    • 移除队列所有消费者
信道管理模块
  • 在AMQP模型中,除了通信连接Connection概念外,还有⼀个Channel的概念,Channel是针对Connection连接的⼀个更细粒度的通信信道,多个Channel可以使⽤同⼀个通信连接Connection,但同一个Connection的Channel之间相互独⽴

  • 需要管理的信道信息

    • 信道ID
    • 信道关联的消费者
    • 信道关联的连接
    • 信道关联的虚拟机
    • ⼯作线程池(⼀条消息被发布到队列后,需要将消息推送给订阅了对应队列的消费者,过程由线程池完成)
  • 需要管理的操作:

    • 提供声明&删除交换机操作(删除交换机的同时删除交换机关联的绑定信息)

    • 提供声明&删除队列操作(删除队列的同时,删除队列关联的绑定信息,消息,消费者信息)

    • 提供绑定&解绑队列操作

    • 提供订阅&取消订阅队列消息操作

    • 提供发布&确认消息操作

链接管理模块
  • 链接模块的典型工作流程

    在这里插入图片描述

  • 管理结构图

    在这里插入图片描述

  • 虽然Muduo库里面提供了链接模块,但不能完全满足我们的要求,所以需要进行二次封装。

  • 除了基础的增删查链接外,还需要能够操作信道

  • 需要管理的链接信息:

    • 链接关联的信道
    • 链接关联的Muduo库Connection
  • 需要管理的操作:

    • 新增链接
    • 删除链接
    • 获取链接
    • 打开信道
    • 关闭信道
Broker服务器模块
  • 整合以上所有模块,并搭建⽹络通信服务器,实现与客⼾端⽹络通信,能够识别客⼾端请求,并提供客⼾端请求的处理服务
  • 需要管理的服务器信息:
    • 虚拟机管理模块句柄
    • 消费者管理模块句柄
    • 连接管理模块句柄
    • ⼯作线程池句柄
    • muduo库通信所需元素

客户端模块

消费者管理
  • 消费者在客⼾端的存在感⽐较低,因为在⽤⼾的使⽤⻆度中,只要创建⼀个信道后,就可以通过信道完成所有的操作,因此对于消费者的感官更多是在订阅的时候传⼊了⼀个消费者标识,且当前的简单实现也仅仅是⼀个信道只能创建订阅⼀个队列,也就是只能创建⼀个消费者,它们⼀⼀对应,因此更是弱化了消费者的存在

  • 需要管理的消费者信息

    • 标识

      b. 订阅队列名称

      c. ⾃动应答标志(决定了⼀条消息推送给消费者后,是否需要等待收到确认后再删除消息)

      d. 消息处理回调函数指针(⼀个消息发布后调⽤回调,选择消费者进⾏推送)

  • 需要管理的消费者操作:

    • 添加
    • 删除
    • 轮询获取指定队列的消费者
    • 移除队列所有消费者等操作
信道请求模块
  • 与服务端的信道类似,客⼾端这边在AMQP模型中,也是除了通信连接Connection概念外,还有⼀个Channel的概念,Channel是针对Connection连接的⼀个更细粒度的通信信道,多个Channel可以使⽤同⼀个通信连接Connection进⾏通信,但是同⼀个Connection的Channel之间相互独⽴

  • 需要管理的信道信息:

    • 信道ID

    • 信道关联的通信连接

    • 信道关联的消费者

    • 请求对应的响应信息队列(这⾥队列使⽤hash表,以便于查找指定的响应)

    • 互斥锁&条件变量(⼤部分的请求都是阻塞操作,发送请求后需要等到响应才能继续,但是muduo库的通信是异步的,因此需要我们⾃⼰在收到响应后,通过判断是否是等待的指定响应

      来进⾏同步)

  • 需要管理的信道操作:

    • 提供创建信道操作
    • 提供删除信道操作
    • 提供声明交换机操作(强断⾔-有则OK,没有则创建)
    • 提供删除交换机
    • 提供创建队列操作(强断⾔-有则OK,没有则创建)
    • 提供删除队列操作
    • 提供交换机-队列绑定操作

相关文章:

  • 如何查看某个文件中的特殊符号
  • [原创](现代Delphi 12指南):[macOS 64bit App开发]: 如何获取自身程序的所在的目录?
  • 【前端基础】8、CSS的选择器
  • Jquery ajax 提交序列化或JSON数据到后台
  • LeetCode算法题(Go语言实现)_61
  • 基于大数据分析的Facebook隐私保护策略
  • 全球电商新势力崛起:拆解Coupang的“韩国速度“与未来棋局
  • ESP32开发之freeRTOS的互斥量
  • C++:扫雷游戏
  • MCP vs Function Call:AI交互的USB-C革命
  • Python实现文件批量改名功能
  • MySQL中隔离级别那点事
  • rom定制系列------红米note12 5G版miui14修改型号root版 原生安卓14批量线刷固件 原生安卓15等
  • 【MySQL】存储引擎 - CSV详解
  • @AutoConfigureBefore功能简介-笔记
  • Windows系统下使用Kafka和Zookeeper,Python运行kafka(一)
  • Java 基础知识点——数组相关
  • [java八股文][Java并发编程面试篇]场景
  • 自研MCU芯片闪存驱动的实现:OpenOCD详细过程记录与操作指南
  • 关于vue-office在vue3工程中的引用报错问题
  • 一周文化讲座|城市移民与数字时代的新工作
  • 早期投资人蜂巧资本清仓泡泡玛特套现超22亿港元,称基金即将到期
  • 长三角地区中华老字号品牌景气指数发布,哪些牌子是你熟悉的?
  • 高盛上调A股未来12个月目标点位,沪深300指数潜在回报15%
  • 优化网络营商环境,上海严厉打击涉企网络谣言、黑灰产等违法犯罪
  • 世界人形机器人运动会将在北京“双奥场馆”举行