当前位置: 首页 > news >正文

模拟实现消息队列项目

项目介绍:

在实际的后端开发中, 尤其是分布式系统里, 跨主机之间使用生产者消费者模型, 也是非常普遍的需求。因此, 我们通常会把阻塞队列封装成一个独立的服务器程序, 并且赋予其更丰富的功能。 这样的服务程序称为消息队列 (Message Queue, MQ)。

消息队列有很多:RabbitMQ ,Kafka ,RocketMQ , ActiveMQ等;其中RabbitMQ比较知名,所以仿照 RabbitMQ 模拟实现一个简单的消息队列。

模拟实现的消息队列相当于是消息的发布-订阅的功能。

环境搭建:

protobuf:实现序列化和反序列化的框架,进行序列化和反序列化。

Muduo:用于底层通信的框架。

SQLite3:轻量化数据库。

第三方库的介绍:

Protobuf:(全称Protocol Buffer)是数据结构序列化和反序列化框架。

它具有以下特点:

• 语言无关、平台无关:即 ProtoBuf 支持 Java、C++、Python 等多种语言,支持 多个平台

• 高效:即比 XML 更小、更快、更为简单

• 扩展性、兼容性好:你可以更新数据结构,而不影响和破坏原有的旧程序

Muduo:是一个基于非阻塞IO和事件驱动的C++高并发TCP网络编程库。是一款基于主从Reactor模型的网络库。

reactor模型:基于事件触发的模型(基于epoll进行io时间监控)。

主从reactor:将io事件监控进行进一步的层次划分。

主reactor:只对新建连接进行监控(保证不受io阻塞影响,实现高效的新建连接获取)

从reactor:针对新建的链接进行io时间监控(进行io操作和业务处理)

主从reactor必然是一个多执行流的并发模式。

Muduo库常见接口:

TcpServer 类:搭建服务器

EventLoop类:事件监控,业务处理

SQLite:是一个进程内的轻量级数据库,它实现了自给自足的、无服务器的、零配置的、 事务性的 SQL 数据库引擎。

GTest:是一个跨平台的 C++单元测试框架,它提供了丰富的断言、致命和非致命判断、参数化等。

单元测试框架可以很清楚的看到几个测试案例通过了,几个没有通过。

C++11 异步操作:

std::future是C++11标准库中的一个模板类,它表示一个异步操作的结果。std::future可以帮助我们在需要的时候获取任务的执行 结果。std::future的一个重要特性是能够阻塞当前线程,直到异步操作完成,从而确保 我们在获取结果时不会遇到未完成的操作。

//std::launch::deffered在执行get获取异步结果的时候,才会执行add这个异步任务,表明该函数会被延迟调用,直到在future上调用get()或者wait()才会开始执行任务 

//std::launch::async内部会创建线程,异步的完成任务(表明函数会在自己创建的线程上运行)

C++11实现线程池:

线程池的工作思想: a. 用户传入要执行的函数,以及需要处理的数据(函数的参数),由线程池中的 工作线程来执行函数完成任务

管理的成员 1. 任务池:用vector维护的一个函数任务池子 2.互斥锁 & 条件变量: 实现同步互斥 3. 一定数量的工作线程:用于不断从任务池取出任务执行任务 4.结束运行标志:以便于控制线程池的结束。 

管理的操作: 1.入队任务:入队一个函数和参数 2.停止运行:终止线程池

项目需求分析:

具体的流程是:发布消息之后,消息发送到交换机上,交换机根据某个规则和消息队列进行bind绑定,将消息发送到对应的队列中,最后当消费者订阅了某个队列中的消息,就会将消息推送出去。

消息队列服务器:最核心的部分, 负责消息的存储和转发

虚拟机 (VirtualHost): 类似于 MySQL 的 "database", 是一个逻辑上的集合。一个 消息队列服务器上可以存在多个 VirtualHost。

交换机 (Exchange): 生产者把消息先发送到 Broker 的 Exchange 上,再根据不同 的规则, 把消息转发给不同的 Queue 

队列 (Queue): 真正用来存储消息的部分, 每个消费者决定自己从哪个 Queue 上 读取消息

绑定 (Binding): Exchange 和 Queue 之间的关联关系,Exchange 和 Queue 可以 理解成 "多对多" 关系,使用一个关联表就可以把这两个概念联系起来。

上述数据结构, 既需要在内存中存储, 也需要在硬盘中存储 ;

内存存储: 方便使用 

硬盘存储: 重启数据不丢失

要实现的内容:
1.broker服务器:消息队列服务器。
2.消息发布客户端:向服务器发布消息。
3.消息订阅客户端:从服务器订阅消息。
AMQP协议:交换机、队列、绑定三者组合在一起叫做虚拟机。

模块划分:

 数据管理模块---交换机数据管理:

 数据管理模块---队列数据管理:

 数据管理模块---绑定数据管理: 

 数据管理模块---消息数据管理: 

 虚拟机数据管理模块:

 路由匹配模块:

 消费者管理模块:

 信道管理:

 连接管理:

 服务器模块:

整体流程图:

 项目框架目录:

队列消息管理:

交换机路由管理:

匹配算法讲解视频:第84个

队列消费者/订阅者管理:

信道管理设计之前需要进行网络通信协议的设计;因为Channel是针对Connection连接的一个通信信道,

客户端这边存在两个异步工作线程:

• 一个是muduo库中客户端连接的异步循环线程EventLoopThread。(io事件监控)

• 一个是当收到消息后进行异步处理的工作线程池。(收到的消息异步处理)

项目总结:

仿RabbitMQ实现一个简化版的消息队列组件,其内部 实现了消息队列服务器以及客户端的搭建,并支持不同主机间消息的发布与订阅及消 息推送功能。

项目中所用到的技术:基于muduo库实现底层网络通信服务器和客户端的搭建, 在应用层基于protobuf协议设计应用层协议接口,在数据管理上使用了轻量数据库 sqlite 来进行数据的持久化管理,以及基于AMQP模型的理解(broker服务器包括队列,交换机,绑定等整合到一起),实现整个消息队列项目技术的整合,并在项目的实现过程中使用gtest框架进行单元测试,完成项目的最终实现。

代码出现的问题:

mq_helper写的时候出现很多细节问题。

protoc --cpp_out=./mq_msg.proto中的./后面要有空格。

写函数时返回值类型写错了

标点符号。


需要无参构造,但是类里面没有无参构造,所以需要定义一个无参构造函数。


变量名避免和数据库重名


在删除交换机代码时,数据库写错了命令。


应用层协议采用lv协议解决粘包问题

代码链接:

https://gitee.com/feng-peijie/queue.git

http://www.dtcms.com/a/293509.html

相关文章:

  • 使用PEghost恢复系统(笔记版)
  • OpenEuler系统架构下编译redis的RPM包
  • [Mediatek] MTK openwrt-21.02 wifi 没启动问题
  • Android Multidex 完全解析:解决64K方法数限制
  • Java 虚拟线程在高并发微服务中的实战经验分享
  • 从0开始学习R语言--Day55--弹性网络
  • TDengine 的 HISTOGRAM() 函数用户手册
  • LabVIEW激光雷达障碍物识别
  • #C语言——学习攻略:操作符的探索(二)
  • 架构师--基于常见组件的微服务场景实战
  • VI Server 操控 LabVIEW 工程
  • DeepSeek Janus Pro本地部署与调用
  • 基于Trae IDE与MCP实现网页自动化测试的最佳实践
  • CI/CD与DevOps集成方法
  • 希尔排序cc
  • 无人机减震模块技术解析
  • Java冒泡排序的不同实现
  • 无人机吊舱减震球模块运行分析
  • 如何在Pico等Android头显中实现无人机低延迟RTMP全景巡检画面播放
  • Cursor(vscode)一些设置
  • 【基于OpenCV的图像处理】图像预处理之图像色彩空间转换以及图像灰度化处理
  • 高亮匹配关键词样式highLightMatchString、replaceHTMLChar
  • 图论的题目整合(Dijkstra)
  • 货车手机远程启动功能的详细使用步骤及注意事项
  • Elasticsearch 字段值过长导致索引报错问题排查与解决经验总结
  • git初始流程
  • [2025CVPR-小目标检测方向]基于特征信息驱动位置高斯分布估计微小目标检测模型
  • 什么是GCN?GCN与GNN有哪些区别?
  • SpringBoot与Vue实战:高效开发秘籍
  • 快手视觉算法面试30问全景精解