模拟实现消息队列项目
项目介绍:
在实际的后端开发中, 尤其是分布式系统里, 跨主机之间使用生产者消费者模型, 也是非常普遍的需求。因此, 我们通常会把阻塞队列封装成一个独立的服务器程序, 并且赋予其更丰富的功能。 这样的服务程序称为消息队列 (Message Queue, MQ)。
消息队列有很多:RabbitMQ ,Kafka ,RocketMQ , ActiveMQ等;其中RabbitMQ比较知名,所以仿照 RabbitMQ 模拟实现一个简单的消息队列。
模拟实现的消息队列相当于是消息的发布-订阅的功能。
环境搭建:
protobuf:实现序列化和反序列化的框架,进行序列化和反序列化。
Muduo:用于底层通信的框架。
SQLite3:轻量化数据库。
第三方库的介绍:
Protobuf:(全称Protocol Buffer)是数据结构序列化和反序列化框架。
它具有以下特点:
• 语言无关、平台无关:即 ProtoBuf 支持 Java、C++、Python 等多种语言,支持 多个平台
• 高效:即比 XML 更小、更快、更为简单
• 扩展性、兼容性好:你可以更新数据结构,而不影响和破坏原有的旧程序
Muduo:是一个基于非阻塞IO和事件驱动的C++高并发TCP网络编程库。是一款基于主从Reactor模型的网络库。
reactor模型:基于事件触发的模型(基于epoll进行io时间监控)。
主从reactor:将io事件监控进行进一步的层次划分。
主reactor:只对新建连接进行监控(保证不受io阻塞影响,实现高效的新建连接获取)
从reactor:针对新建的链接进行io时间监控(进行io操作和业务处理)
主从reactor必然是一个多执行流的并发模式。
Muduo库常见接口:
TcpServer 类:搭建服务器
EventLoop类:事件监控,业务处理
SQLite:是一个进程内的轻量级数据库,它实现了自给自足的、无服务器的、零配置的、 事务性的 SQL 数据库引擎。
GTest:是一个跨平台的 C++单元测试框架,它提供了丰富的断言、致命和非致命判断、参数化等。
单元测试框架可以很清楚的看到几个测试案例通过了,几个没有通过。
C++11 异步操作:
std::future是C++11标准库中的一个模板类,它表示一个异步操作的结果。std::future可以帮助我们在需要的时候获取任务的执行 结果。std::future的一个重要特性是能够阻塞当前线程,直到异步操作完成,从而确保 我们在获取结果时不会遇到未完成的操作。
//std::launch::deffered在执行get获取异步结果的时候,才会执行add这个异步任务,表明该函数会被延迟调用,直到在future上调用get()或者wait()才会开始执行任务
//std::launch::async内部会创建线程,异步的完成任务(表明函数会在自己创建的线程上运行)
C++11实现线程池:
线程池的工作思想: a. 用户传入要执行的函数,以及需要处理的数据(函数的参数),由线程池中的 工作线程来执行函数完成任务
管理的成员 1. 任务池:用vector维护的一个函数任务池子 2.互斥锁 & 条件变量: 实现同步互斥 3. 一定数量的工作线程:用于不断从任务池取出任务执行任务 4.结束运行标志:以便于控制线程池的结束。
管理的操作: 1.入队任务:入队一个函数和参数 2.停止运行:终止线程池
项目需求分析:
具体的流程是:发布消息之后,消息发送到交换机上,交换机根据某个规则和消息队列进行bind绑定,将消息发送到对应的队列中,最后当消费者订阅了某个队列中的消息,就会将消息推送出去。
消息队列服务器:最核心的部分, 负责消息的存储和转发
虚拟机 (VirtualHost): 类似于 MySQL 的 "database", 是一个逻辑上的集合。一个 消息队列服务器上可以存在多个 VirtualHost。
交换机 (Exchange): 生产者把消息先发送到 Broker 的 Exchange 上,再根据不同 的规则, 把消息转发给不同的 Queue
队列 (Queue): 真正用来存储消息的部分, 每个消费者决定自己从哪个 Queue 上 读取消息
绑定 (Binding): Exchange 和 Queue 之间的关联关系,Exchange 和 Queue 可以 理解成 "多对多" 关系,使用一个关联表就可以把这两个概念联系起来。
上述数据结构, 既需要在内存中存储, 也需要在硬盘中存储 ;
内存存储: 方便使用
硬盘存储: 重启数据不丢失
要实现的内容:
1.broker服务器:消息队列服务器。
2.消息发布客户端:向服务器发布消息。
3.消息订阅客户端:从服务器订阅消息。
AMQP协议:交换机、队列、绑定三者组合在一起叫做虚拟机。
模块划分:
数据管理模块---交换机数据管理:
数据管理模块---队列数据管理:
数据管理模块---绑定数据管理:
数据管理模块---消息数据管理:
虚拟机数据管理模块:
路由匹配模块:
消费者管理模块:
信道管理:
连接管理:
服务器模块:
整体流程图:
项目框架目录:
队列消息管理:
交换机路由管理:
匹配算法讲解视频:第84个
队列消费者/订阅者管理:
信道管理设计之前需要进行网络通信协议的设计;因为Channel是针对Connection连接的一个通信信道,
客户端这边存在两个异步工作线程:
• 一个是muduo库中客户端连接的异步循环线程EventLoopThread。(io事件监控)
• 一个是当收到消息后进行异步处理的工作线程池。(收到的消息异步处理)
项目总结:
仿RabbitMQ实现一个简化版的消息队列组件,其内部 实现了消息队列服务器以及客户端的搭建,并支持不同主机间消息的发布与订阅及消 息推送功能。
项目中所用到的技术:基于muduo库实现底层网络通信服务器和客户端的搭建, 在应用层基于protobuf协议设计应用层协议接口,在数据管理上使用了轻量数据库 sqlite 来进行数据的持久化管理,以及基于AMQP模型的理解(broker服务器包括队列,交换机,绑定等整合到一起),实现整个消息队列项目技术的整合,并在项目的实现过程中使用gtest框架进行单元测试,完成项目的最终实现。
代码出现的问题:
mq_helper写的时候出现很多细节问题。
protoc --cpp_out=./mq_msg.proto中的./后面要有空格。
写函数时返回值类型写错了
标点符号。
需要无参构造,但是类里面没有无参构造,所以需要定义一个无参构造函数。
变量名避免和数据库重名
在删除交换机代码时,数据库写错了命令。
应用层协议采用lv协议解决粘包问题
代码链接:
https://gitee.com/feng-peijie/queue.git