当前位置: 首页 > news >正文

【仿RabbitMQ的发布订阅式消息队列】--- 模块设计与划分

 Welcome to 9ilk's Code World

       

(๑•́ ₃ •̀๑) 个人主页:        9ilk

(๑•́ ₃ •̀๑) 文章专栏:     项目


从之前对消息队列工作模型的分析,我们的项目总体分为三个部分实现:

1. 服务端

2. 发布客户端

3. 订阅客户端

主要的模块划分图如下:

服务端模块

数据管理模块

   数据管理模块主要完成的工作是管理交换机、队列、队列绑定、消息数据的管理,分别对他们实现数据的管理(增删查)以及持久化的存储。因此数据管理模块主要是以下四个小模块的整合:

  • a. 交换机数据管理模块
  • b. 队列数据管理模块
  • c. 绑定数据管理模块
  • d. 消息数据管理模块

交换机数据管理

管理的数据:

  • 交换机名称 : 作为交换机的唯一标识。

  • 交换机类型 : 决定了消息的转发方式。每个队列绑定中有个binding_key每条消息中有个routing_key。

直接交换 --- binding_key与routing_key相同,则将消息放入队列。

广播交换 --- 将消息放入交换机绑定的所有队列。

主题交换 --- routing_key与多个绑定队列的binding_key有多个匹配规则,匹配成功则放入

  • 持久化标志:决定了当前交换机信息是否需要持久化存储,也就决定该交换机重启后数据是否还存在。

  • 自动删除标志:指的是关联了当前交换机的所有客户端退出了,是否要自动删除交换机。(扩展用)

  • 其他参数:可以用来后续扩展。

对交换机的管理操作:

  • 创建交换机:本质上需要的是声明 ---- 强断言的思想,有就OK,没有则创建。

  • 删除交换机:注意每个交换机都会绑定一个或多个队列(意味着会有一个或多个绑定信息) ,删除交换机需要删除需要删除相关的绑定信息

  • 获取指定名称交换机。

  • 获取当前交换机数量。

队列数据管理

管理的数据:

  • 队列名称 : 作为队列的唯一标识。

  • 持久化存储标志 : 决定了是否将队列信息持久化存储起来 , 决定了重启后 , 这个队列是否还存在 , 也决定了队列中的消息是否需要持久化

注意:一个队列如果持久化标志为false,意味着重启后队列就没了,没有客户端能订阅该队列的消息,此时这个队列的消息如果持久化是没有意义的,队列没了,也没人能把消息拿走,因此通常一个队列持久化标志为false,那么它的消息也不需要持久化

  • 是否独占标志:独占指的是只有当前客户端自己能够去订阅队列消息,不允许其他客户端订阅该队列消息。

  • 自动删除标志:当订阅了当前队列的所有客户端退出后,是否删除队列(扩展)。

  • 其他参数

对队列的管理操作:

  • 创建队列

  • 删除队列

  • 获取指定队列信息

  • 获取队列数量

  • 获取所有队列名称:当系统重启后,需要重新加载数据,加载历史消息(消息以队列为单元存储在文件中当系统重启后,我们需要知道有哪些队列,才知道加载哪些消息

绑定数据管理

   该模块主要描述的是哪个队列和哪个交换机产生关联。

管理的数据:

  • 交换机名称

  • 队列名称

  • binding_key:绑定密钥 --- 描述了在交换机的主题交换&直接交换的消息发布匹配规则。由数字,字符,_, # , . , *组成。比如:news.music.#

对绑定的管理操作:

  • 添加绑定

  • 解除绑定

  • 获取交换机相关的所有绑定信息(删除交换机的时候,要删除相关绑定信息;当消息发布到交换机,交换机得通过这些信息来将消息发布到指定队列)

  • 获取队列相关的所有绑定信息(删除队列的时候,也要删除相关的绑定信息)

  • 获取绑定信息数量

消息数据管理

1. 消息信息

(1) 消息属性:

  • 消息ID --- 消息唯一标识
  • 持久化标志:表示是否对消息进行持久化,实际上还取决于队列的持久化标志
  • routing_key:决定了当前消息要发布的队列(消息发布到交换机后,根据绑定队列的 binding_key决定是否发布到指定队列)

(2)消息主体:消息内容

(3)服务端用于管理需添加的信息:

  • 消息偏移量:消息以队列为单元存储在文件中,这个偏移量指的是当前消息相对于文件起始位置的偏移量。
  • 消息长度:从偏移量位置取出指定长度的消息(解决粘包问题)
  • 是否有效标志:标识当前消息是否已经被删除。

注意:删除一条消息并不会每次直接将后边的数据直接拷贝到前面,而只是重置标志。我们这里规定,当一个文件中,有效消息占据总消息比例不到50%且数据量超过2000,则进行垃圾回收,重新整理文件数据存储。当系统重启,也只需要重新加载有效消息即可,相当于进行了一次垃圾回收。

2. 消息的管理

管理方式:以队列为单元进行管理(因为消息的所有操作都是以队列为单元)。

管理的数据:

  • 待推送消息链表  ----  保存所有的待推送消息
  • 待确认消息hash:消息推送给客户端,会等待客户端进行消息确认,收到确认后,才会真正删除消息。
  • 持久化消息hash:假设消息都会进行持久化存储 , 操作过程中会存在垃圾回收操作(其实就是将有效消息读取出来,然后重新截断文件,将消息连续写入文件中,操作完文件中都是有效消息),但是垃圾回收会改变消息的存储位置,但是内存中的消息也会存储消息的实际存储位置,垃圾回收后就不一致了,因此每次垃圾回收后要用新的位置去更新持久化消息的信息。

  • 持久化的有效消息数量

  • 持久化的总消息数量:和持久化的有效消息数决定了什么时候进行垃圾回收。

3. 对每个队列提供的消息管理操作

  • 向队列新增消息
  • 获取队首消息:获取消息后就会将消息从待推送消息链表删除 ,因为它不再是待发送消息而是待确认消息,应该加入到待确认消息中hash。

  • 确认消息:从待确认消息hash中移除消息,并进行持久化数据的删除。

  • 恢复队列历史消息:主要是在构造函数中进行,因为只有在重启的时候才会进行。

  • 垃圾回收:在消息持久化子模块完成。持久化文件中有效消息比例小于50%,且总消息数量超过2000进行垃圾回收。

  • 删除队列相关消息文件:当一个队列被删除那它的消息也就没有存在的意义了。

4. 对外提供的队列消息管理操作

  • 初始化队列消息结构

  • 移除队列消息结构:在一个队列创建/删除的时候调用。

  • 向队列新增消息。

  • 对队列消息进行确认。

  • 恢复队列历史消息

虚拟机数据管理

   虚拟机数据管理模块其实是对前面几个数据管理模块(交换机数据管理,队列数据管理,绑定数据管理,消息数据管理)的整合。

1. 要管理的数据

  • 交换机数据管理句柄

  • 队列数据管理句柄

  • 绑定信息管理句柄

  • 消息数据管理句柄

2. 要管理的操作

  • 声明/删除交换机:注意在删除交换机的时候要删除相关的绑定信息

  • 声明/删除队列:注意在删除队列的时候,要删除相关的绑定信息以及消息数据。

  • 队列的绑定/解除绑定:注意绑定的时候必须交换机和队列是存在的。

  • 获取指定队列的消息

  • 对指定队列的指定消息进行确认

  • 获取交换机相关的所有绑定信息:一条消息要发布给指定交换机的时候,交换机获取所有绑定信息,来确定消息要发布到哪些队列。

路由匹配模块

    该模块决定了一条消息是否能够发布到指定队列。在每个队列跟交换机的绑定信息中,都有一搜个binding_key,表示队列发布的匹配规则;同时在每条要发布的消息中都有一个routing_key,表示消息的发布规则。

    交换机我们这里设置三种类型:

  • 广播:直接将消息发布给该交换机的所有绑定队列。
  • 直接:routing_keybinding_key完全一致匹配成功。
  • 主题:binding_key中是匹配规则,routing_key是消息规则,匹配成功才能发布。

路由匹配模块本质上没有需要管理的数据,但是它需要对外提供路由匹配操作:

  • 提供一个判断routing_keybinding_key是否能够匹配成功的接口。
  • 判断routing_key是否符合规定:格式约定为只能由数字,字母,.构成。
  • 判断binding_key是否符合规定:只能由数字,字母,_  #  *构成。

消费者管理模块

     客户端有两种,一个是消息发布客户端,一个是订阅消息客户端,对于订阅指定队列消息的客户端才是消费者。消费者数据的存在是必要的,因为当指定队列有了消息,就需要将消息推送给消费者客户端(比如推送的时候就需要找到这个客户端相关的信道)

消费者信息:

  1. 消费者标识
  2. 订阅队列名称:当当前队列有消息就会推送给这个客户端以及当客户端收到消息,需要对指定队列的消息进行确认。
  3. 自动确认标志:自动确认 --- 推送消息后直接删除消息不需要额外确认 ;手动确认 --- 推送消息后需要等待收到确认回复再去删除消息。
  4. 消费处理回调函数指针:队列有一条消息后通过哪个函数进行处理(函数内部其实逻辑固定,即向指定客户端推送消息)

  消费者管理:管理思想是以队列为单元进行管理,因为每个消费者订阅的都是指定队列的消息,并且消费者对消息进行确认也是以队列进行确认。关键的是,当指定队列中有消息,必然是需要获取订阅了这个队列的消费者信息来进行消息推送。

队列消费者管理结构的数据信息:

  • 消费者链表 :  保存当前队列的所有消费者信息

不同消息队列会采取不同的消费模式:

(1)队列模式(比如RabbitMQ)是一条消息只能被一个消费者消费,以RR轮转的方式;

(2)发布/订阅者模式(Kafka):一条消息可以被多个消费者同时消费,每个订阅者都会收到消息的副本。

应用场景:

(1)任务处理:只需要一个消费者执行。

(2)事件通知:需要多个消费者接收。

队列消费者管理操作:

1. 新增消费者

2. RR轮转获取一个消费者

3.删除消费者

4.队列消费者数量

5. 队列消费者是否为空

整体的队列消费者管理操作:

  1. 初始化队列消费者结构
  2. 删除队列消费者结构
  3. 向指定队列添加消费者
  4. 获取指定队列消费者
  5. 删除指定队列消费者

信道管理模块

  • 信道是网络通信中的一个概念,叫做通信通道。网络通信的时候必然是通过网络通信连接完成的,为了能够更加充分的利用资源,因此对通信连接又进行更进一步的细化,细化出了通信通道。

  • 对于用户来说,一个通信信道,就是进行网络通信的载体,而一个真正的通信连接可以创建出多个通信通道,每一个信道之间,在用户的眼中是相互独立的,而本质它们底层使用同一个通信连接进行网络通信。因此因为信道是用户眼中的一个通信通道,所以所有的网络通信服务都是由信道提供的。

信道提供的操作(服务):

  1. 声明 / 删除交换机
  2. 声明 / 删除队列
  3. 绑定 / 解绑队列与交换机
  4. 发布消息 / 订阅队列消息 / 队列消息确认 / 取消队列订阅

    信道要管理的数据:

    1. 信道关联的虚拟机句柄
    2. 信道关联的消费者句柄:当信道关闭的时候,所有关联的消费者订阅都要取消,相当于删除所有的相关消费者。
    3. 工作线程池句柄:信道将消息发布到指定队列操作之后,从指定队列获取一个消费者,对这条消息进行消费,这个工作不能在当前线程,否则会阻塞。(也就是将这条消息推送给一个客户端的操作交给线程池执行,并非每个信道都有一个线程池,而是整个服务器有一个线程池,大家所有的信道都是通过同一个线程池进行异步操作而已)
    4. 信道ID

    信道整体管理操作:

    1. 创建一个信道
    2. 关闭一个信道
    3. 获取指定信道句柄

    连接管理模块

           在网络通信模块中,我们使用muduo库实现底层通信,而muduo库中本身就有一个Connection连接的概念和对象类。但是我们的连接中还有一个上层通信信道的概念,这个概念在muduo库中是没有的。因此我们需要在用户的层面对muduo库中的connection连接进行二次封装,形成我们自己所需的连接管理。

    管理的数据:

    1. muduo库的通信连接
    2. 当前连接关联的信道管理句柄

    提供的操作:

    1. 创建信道
    2. 关闭信道

    连接整体管理操作:

    1. 新增连接
    2. 关闭连接
    3. 获取指定连接信息。

    Broker服务器模块

        该模块主要是整合前面所有的模块,同时搭建网络通信服务器,实现与客户端网络通信,能够识别客户端的请求,并提供客户端请求的处理服务。本质上这个模块并不提供实质的功能性操作,这个模块最重要的是资源的整合,是一个资源的载体。

    管理信息:

    1. 虚拟机管理模块句柄
    2. 消费者管理模块句柄
    3. 连接管理模块句柄
    4. 工作线程池句柄
    5. muduo库通信所需元素

    服务器与资源的关系:

    1. 一个服务器有一个工作线程池,其他所有信道操作的都是这同一个线程池。
    2. 一个服务器有一个虚拟机,其他所有交换机,队列,绑定,消息的操作都是针对这个虚拟机进行的。
    3. 一个服务器有一个消费者管理
    4. 通信相关连接管理,协议处理模块句柄,也是一整个服务器都有一套。

    客户端

    消费者管理

    消费者信息:

    1. 消费者标识
    2. 订阅队列名称
    3. 自动确认标志
    4. 消费处理回调函数指针

            当前消费者订阅了某一个队列的消息,这个队列有了消息后,就会将消息推送给这个客户端,这时候收到了消息则使用回调函数进行处理,处理完毕后就会根据确认标志决定是否进行消息确认。

    管理的操作:对消费者的增删查。

    信道管理

    其实跟服务器的一样,因为客户端要给用户提供什么服务,服务器就要给客户端提供什么服务。

    管理信息:

    (1)消费者管理句柄:每个信道都有自己相关的消费者。

    (2)线程池句柄:对推送过来的消息进行回调处理,处理过程通过工作线程来进行。

    (3)信道ID

    (4)信道关联的连接

    信道提供的服务:

    (1)声明/删除交换机

    (2)声明/删除队列

    (3)绑定/解绑队列与交换机

    (4)发布消息/确认消息

    (5)订阅队列消息/取消订阅队列消息

    (6)创建/关闭信道

    信道的管理:信道的增删查

    连接管理模块

            客户端连接的管理本质上是对客户端TcpClient的二次封装和管理。面对用户,不需要有客户端的概念,连接对于用户来说就是客户端通过连接创建信道,通过信道完成自己所需服务。因此当前客户端这边的连接对于用户来说就是一个资源的载体

    管理操作:

    1. 连接服务器
    2. 创建信道
    3. 关闭信道
    4. 关闭连接

    管理资源:工作线程池,连接关联的信道管理句柄

    异步工作池模块

    (1)TcpClient模块需要一个EventLoopThread模块进行IO事件监控

    (2)收到推送消息后,需要对推送过来的消息进行处理,因此需要一个线程池来帮助我们完成消息处理的过程

    注意:将异步工作线程模块独立出来,原因是多个连接用一个EventLoopThread进行I/O事件监控就够了, 以及所有的推送消息处理也只需要有一个线程池就够了,并不需要每个连接都要有一个EventLoop,也不需要每个信道的消息处理都有自己的线程池。

    模块关系图

    http://www.dtcms.com/a/542601.html

    相关文章:

  • 102.二叉树的层序遍历
  • 互粉的网站是怎么做的浏览器网站入口
  • 企业网站建设参考资料WordPress首页站内搜索
  • 宁波做网站优化设计制作一个 个人主页网站
  • 合肥做公司网站一般多少钱智能建站加盟电话
  • app外包网站上海有什么公司名称
  • 做网站需要几天温岭app开发公司
  • 【java】【springboot】队列涉及订单关闭数据返还异常
  • 建立主题网站的一般步骤哈什么网一个网站做ppt
  • 红色企业网站攸县网站制作公司
  • 夜莺监控设计思考(三)时序库、agent 的一些设计考量
  • 深圳个性化建网站服务商西安网站建设工作室
  • 大连住建部官方网站网站建设经典案例
  • 如何网上建设网站绵阳网站建设策划内容
  • 【LeetCode】91. 解码方法
  • 网站设计的规范安装wordpress插件目录下
  • 网站开发如何使用微信登录wordpress修改网址导航
  • 投资网站源码十堰响应式网站
  • 保定市做网站的电话做网站需要准备哪些
  • 那个网站做网站托管网站源码 预览
  • 基于物联网的智能楼宇门禁系统
  • 给公司做宣传网站的好处电气网站开发
  • 华为做网站吗怀化电视台网站
  • 网站优化哪家好中山精品网站建设讯息
  • 东城网站设计东莞海天网站建设
  • 织梦网站环境网页设计初学者公司网页设计模板
  • 营销网站优化seo二级域名ip查询
  • 企业官网型网站模板棋牌推广如何精准引流
  • 什么是住宅IP,住宅IP应用场景有哪些
  • 深圳龙岗区网站建设哪些网站可以免费申请