【项目设计】基于AMQP协议实现的简单消息队列
项目介绍
项目背景
在单机系统中,我们通常可以使用阻塞队列来实现生产者消费者模型,它能够非常好地解决线程间通信的三大痛点:
✅ 解耦合 ✅ 并发协调 ✅ 流量削峰
但当我们的系统走向分布式,跨主机的网络通信就会面临新的挑战:
🔥 网络不可靠(丢包/宕机)
🔥 动态扩缩容(节点随时增减)
🔥 协议不统一(多语言交互)
于是消息中间件应运而生,成为分布式系统的“中枢神经”,本项目参考了RabbitMQ的设计,实现了一个轻量级消息队列核心模块。
开发环境
- 操作系统:Linux(Ubuntu 20.04)
- 编辑器:VSCode/vim
- 编译器:g++ 9.4.0
- 调试器:gdb 9.2
- 构建系统:GNU Make
技术选型
类别 | 组件 | 图标 | 技术描述 |
---|---|---|---|
开发语言 | C++11 | 🚀 | 高性能系统开发首选,结合RAII机制保障资源安全 |
序列化框架 | Protocol Buffers v3.21 | 📦 | 二进制高效序列化,支持跨语言数据交换(C++/Python/Java) |
网络通信 | Muduo v2.3 + 自定义协议 | 🌐 | Reactor模式实现高并发 |
持久化存储 | SQLite3 3.39 | 💾 | 嵌入式数据库实现零配置管理 |
测试框架 | GoogleTest v1.12 | ✅ | 行为驱动测试(BDD)支持 |
项目地址:仿RabbitMQ实现消息队列: 用C++仿造RabbitMQ实现的消息队列
项目相关库学习
接下来我将先带大家熟悉下本项目使用到的一些主要的第三方库,并在学习的时候做一些简单的小demo,相信这样能够帮大家更好地学习到知识!
Protobuf
什么是Protobuf?
Protobuf(全称Protocol Buffer),是Google开元的一种跨语言、跨平台、可扩展的序列化框架,通过.proto文件定义数据结构,自动生成高效的编解码器。相较于JSON/XML等文本协议,具有显著的性能优势:
特性 | Protobuf | JSON | XML |
---|---|---|---|
序列化体积 | 🔻 小(二进制) | 🔺 大(文本) | 🔺 极大(标签) |
解析速度 | ⚡ 快(无反射) | 🐢 慢(动态解析) | 🐢 慢(DOM树) |
跨语言支持 | ✅ 20+语言 | ✅ 广泛 | ✅ 广泛 |
数据校验 | 🔒 强类型约束 | ❌ 弱类型 | ⚠️ 需额外Schema |
快速上手Protobuf
快速上手protobuf?
我们以⼀个简单通讯录的实现来驱动对Protobuf的学习。在通讯录demo中,我们将实现:
- 对联系人的信息进行序列化,并将序列化后的结果打印出来
- 对序列化后的内容进行反序列化,并将解析得到的信息打印出来
- 联系⼈包含以下信息: 姓名、年龄
通过通讯录demo,我们能快速的了解ProtoBuf的使⽤流程。
创建.ptoto文件
.proto文件规范
- ⽂件命名应该使⽤全⼩写字⺟命名,多个字⺟之间⽤ _ 连接,例如: hello_world.proto
-
书写 .proto ⽂件代码时,应使⽤ 2 个空格的缩进
- Protocol Buffers 语⾔版本3,简称 proto3,是 .proto⽂件最新的语法版本。proto3简化了 Protocol Buffers 语⾔,既易于使⽤,⼜可以在更⼴泛的编程语⾔中使⽤。它允许你使⽤ Java,C++,Python 等多种语⾔⽣成 protocol buffer 代码。在 .proto ⽂件中,要使⽤ syntax = "proto3"; 来指定 ⽂件语法为 proto3,并且必须写在除去注释内容的第⼀⾏。 如果没有指定,编译器会使⽤proto2语法。
指定contact.ptoto文件的语法:
syntax = "proto3";
package 声明符
- package 是⼀个可选的声明符,能表⽰ .proto ⽂件的命名空间,在项⽬中要有唯⼀性。它的作⽤是为了避免我们定义的消息出现冲突。
声明contact.proto文件的命名空间
syntax = "proto3";
package contacts;
-
消息(message): 要定义的结构化对象,我们可以给这个结构化对象中定义其对应的属性内容。在⽹络传输中,我们需要为传输双⽅定制协议。定制协议说⽩了就是定义结构体或者结构化数据,⽐如,tcp,udp 报⽂就是结构化的。再⽐如将数据持久化存储到数据库时,会将⼀系列元数据统⼀⽤对象组织起来,再进⾏存储。ProtoBuf 就是以 message 的⽅式来⽀持我们定制协议字段,后期帮助我们形成类和⽅法来使⽤。
syntax = "proto3";
package contacts;
// 定义联系⼈消息
message PeopleInfo {
}
-
在 message 中我们可以定义其属性字段,字段定义格式为:字段类型 字段名 = 字段唯⼀编号;
-
字段名称命名规范:全⼩写字⺟,多个字⺟之间⽤ _ 连接。
-
字段类型分为:标量数据类型 和 特殊类型(包括枚举、其他消息类型等)。
-
字段唯⼀编号:⽤来标识字段,⼀旦开始使⽤就不能够再改变。
-
Protobuf 类型 | C++ 生成类型 | 描述 | 示例 |
---|---|---|---|
标量类型 | |||
int32 | int32_t | 32位有符号整数(变长编码,适合小数值) | int32 id = 1; |
sint32 | int32_t | 有符号整数,ZigZag编码(优化负值存储) | sint32 delta = 2; |
uint64 | uint64_t | 64位无符号整数(变长编码) | uint64 timestamp = 3; |
double | double | 双精度浮点数(固定8字节) | double latitude = 4; |
bool | bool | 布尔值(存储为1字节) | bool is_valid = 5; |
string | std::string | UTF-8字符串(最大2GB) | string name = 6; |
bytes | std::string | 二进制数据(无编码限制) | bytes encrypted_data = 7; |
复合类型 | |||
enum | 枚举类 | 强类型枚举(默认第一个值为0) | enum Status { PENDING=0; DONE=1; } |
repeated T | std::vector<T> | 动态数组(支持嵌套Message) | repeated int32 scores = 8; |
map<K, V> | std::unordered_map<K, V> | 键值对集合(K可为整型/string,V为任意类型) | map<string, int32> player_scores = 9; |
oneof | 联合访问器 | 互斥字段(同一时间只有一个字段有效) | oneof data { string text; bytes raw; } |
嵌套类型 | |||
message | 嵌套类 | 可嵌套定义结构体 | message User { string name=1; } |
特殊类型 | |||
Any | google::protobuf::Any | 任意类型容器(需导入google/protobuf/any.proto ) | Any metadata = 10; |
Timestamp | google::protobuf::Timestamp | 时间戳(精确到纳秒,需导入google/protobuf/timestamp.proto ) | Timestamp create_time = 11; |
为PeopleInfo添加姓名年龄字段:
syntax = "proto3";
package contacts;
message PeopleInfo {
string name = 1;
int32 age = 2;
}
// 消息中定义了如下编号,代码会告警:
// Field numbers 19,000 through 19,999 are reserved for the protobuf
implementation
string name = 19000;
protoc [--proto_path=IMPORT_PATH] --cpp_out=DST_DIR path/to/file.proto
protoc 是 Protocol Buffer 提供的命令⾏编译⼯具。
--proto_path : 指定被编译的.proto⽂件所在⽬录,可多次指定,可简写成 -I
IMPORT_PATH : 如不指定该参数,则在当前⽬录进⾏搜索。当某个.proto ⽂件
import其他.proto ⽂件时,或需要编译的.proto ⽂件不在当前⽬录下,
这时就要⽤-I来指定搜索⽬录。
--cpp_out= : 编译后的⽂件为 C++ ⽂件。
OUT_DIR : 编译后⽣成⽂件的⽬标路径。
path/to/file.proto : 要编译的.proto⽂件
protoc --cpp_out=. contacts.proto
编译contact.proto文件后,会生成所选择语言的代码,我们选择的是生成C++代码,所以生成了contact.pb.h和contact.pb.cc:
#include <iostream>
#include "contact.pb.h"
int main()
{
// 序列化过程
contacts::PeopleInfo info;
// 设置消息字段
info.set_name("小白");
info.set_age(17);
// 将message结构体序列化得到字符串
std::string info_str = info.SerializeAsString();
// 反序列化过程
contacts::PeopleInfo stu;
bool ret = stu.ParseFromString(info_str);
if(ret == false)
{
std::cout << "反序列化失败" << std::endl;
return 1;
}
std::cout << stu.name() <<std::endl;
std::cout << stu.age() <<std::endl;
return 0;
}
编写这段代码后,我们进行编译,需要注意的是,由于引入了protobuf库,所以编译时要链接protobuf库,由于使用了protobuf为我们生成的contact.pb.h,所以编译时contact.pb.cc也要编译:
g++ -o test info.cpp contact.pb.cc -lprotobuf
运行顺利得到反序列化结果:
Muduo库
快速上手Muduo库
什么是Muduo库?
Muduo的核心设计理念?
组件 | 职责 | 线程归属 |
---|---|---|
Main Reactor | 监听新连接(accept ) | 主线程(单线程) |
Sub Reactor | 处理已连接套接字的 I/O 事件(read/write ) | 线程池(多线程) |
优势:
- 主Reactor可以快速响应连接,能够避免I/O操作阻塞
- 从Reactor分散负载到多线程,提升吞吐量
2. One Loop Per Thread模型
- 核心规则:
- 每个线程都有自己独立的EventLoop(事件循环)
- 每个TCP连接都固定归属于某个EventLoop线程
- 代码体现:
EventLoop loop; // 每个线程仅创建一个EventLoop
TcpServer server(&loop, InetAddress(8888), "EchoServer");
快速上手Muduo库?
我们以⼀个简单TCP翻译服务器的实现来驱动对Muduo库的学习,在这个demo中我们将实现:
- TCP翻译服务端:
- 接受客户端单词请求,返回中文翻译
- 基于Muduo库实现高并发网络通信
- 内存单词查询:
- 使用哈希表存储单词-翻译键值对
- 简单的查询逻辑:存在则返回翻译,否则提示未找到
通过这个demo,我们能够迅速熟悉Muduo库的使用流程。
核心类功能速览
类名 | 核心职责 | 类比现实 |
---|---|---|
EventLoop | 事件循环中枢,驱动IO事件 | 交通调度中心 |
TcpServer | 服务端入口,负责监听端口与连接 | 银行总服务台 |
TcpClient | 客户端入口,负责管理与服务端的连接 | ATM终端机 |
TcpConnection | 封装TCP连接状态与数据流 | 银行服务窗口 |
Buffer | 非阻塞IO的读写缓冲区管理 | 数据传输快递箱 |
服务端核心接口详解
1. 服务初始化与启动
代码示例:
// 创建EventLoop与TcpServer
EventLoop loop;
TcpServer server(&loop, InetAddress(8088), "TranslateServer");
// 设置回调函数
server.setConnectionCallback(onConnection);
server.setMessageCallback(onMessage);
// 启动服务
server.start();
loop.loop();
2. 关键回调接口
回调类型 | 触发时机 | 典型应用场景 |
---|---|---|
ConnectionCallback | 连接建立/断开时触发 | 资源初始化、日志记录 |
MessageCallback | 收到客户端数据时触发 | 业务逻辑处理、协议解析 |
EventLoopThread loopThread; // 独立IO线程,防止因事件阻塞
TcpClient client(loopThread.startLoop(),
InetAddress("127.0.0.1", 8088), "TranslateClient");
client.connect(); // 发起连接
client.connection()->send("hello"); // 发送数据
2. 线程同步工具 CountDownLatch
代码示例:
// 创建同步器(初始计数1)
CountDownLatch latch(1);
// 连接成功后减少计数
void onConnection(const TcpConnectionPtr& conn) {
if (conn->connected()) latch.countDown();
}
// 主线程等待连接建立
latch.wait(); // 阻塞直到计数为0
为什么需要线程同步工具?
这其实是因为Muduo库的TcpClient::connect()是异步操作,仅负责发起连接请求,请求完成后会立即返回,实际连接过程由IO异步线程完成,因此如果调用connect之后就立即向连接发送数据,一旦conn还没初始化完成,就会导致崩溃。而同步工具的作用实际上就是:阻塞等待连接建立,防止主线程在连接建立前发送数据。
接下来是服务器和客户端的实现代码:
TranslateServer.cpp:
#include <string>
#include <iostream>
#include <unordered_map>
#include <functional>
using namespace std;
#include "include/muduo/net/TcpServer.h"
#include "include/muduo/net/EventLoop.h"
#include "include/muduo/net/TcpConnection.h"
// 词典服务器类
// 启动服务器后进行事件监控->建立连接/关闭连接或收到消息时,调用设置的回调函数进行响应
class TranslateServer
{
public:
TranslateServer(int port): _server(&_baseloop,
muduo::net::InetAddress("0.0.0.0", port),
"TranslateServer", muduo::net::TcpServer::Option::kReusePort)
{
// 将成员函数设置作为服务器的回调
_server.setConnectionCallback(std::bind(&TranslateServer::onConnectionCallBack, this, std::placeholders::_1));
_server.setMessageCallback(std::bind(&TranslateServer::onMessageCallBack, this, std::placeholders::_1,
std::placeholders::_2, std::placeholders::_3));
}
// 启动服务器
void Start()
{
_server.start(); // 开始事件监听
_baseloop.loop(); // 开始事件监控,死循环
}
private:
// 建立连接成功/关闭连接时的回调
void onConnectionCallBack(const muduo::net::TcpConnectionPtr& conn)
{
if(conn->connected() == true)
{
std::cout << "建立新连接成功" << std::endl;
}
else
{
std::cout << "关闭连接成功" << std::endl;
}
}
string translate(const string& str)
{
static unordered_map<string, string> map = {
{"hello", "你好"},
{"string", "字符串"},
{"wangshu", "大帅b"}
};
if(map.find(str) == map.end())
{
return "找不到捏";
}
return map[str];
}
// 通信连接收到请求时的回调
void onMessageCallBack(const muduo::net::TcpConnectionPtr& conn, muduo::net::Buffer* buffer, muduo::Timestamp)
{
// 从buffer中取出请求数据
string req = buffer->retrieveAllAsString();
// 业务处理逻辑
string resp = translate(req);
// 将响应返回到连接
conn->send(resp);
}
private:
muduo::net::EventLoop _baseloop;
muduo::net::TcpServer _server;
};
int main()
{
TranslateServer server(8088);
server.Start();
return 0;
}
TranslateClient.cpp:
#include "include/muduo/net/TcpClient.h"
#include "include/muduo/net/EventLoopThread.h"
#include "include/muduo/net/TcpConnection.h"
#include "include/muduo/base/CountDownLatch.h"
#include <iostream>
#include <functional>
class TranslateClient {
public:
TranslateClient(const std::string &sip, int sport):_latch(1),
_client(_loopthread.startLoop(), muduo::net::InetAddress(sip, sport), "TranslateClient"){
_client.setConnectionCallback(std::bind(&TranslateClient::onConnection, this, std::placeholders::_1));
_client.setMessageCallback(std::bind(&TranslateClient::onMessage, this, std::placeholders::_1,
std::placeholders::_2, std::placeholders::_3));
}
//连接服务器---需要阻塞等待连接建立成功之后再返回
void connect() {
_client.connect();
_latch.wait();//阻塞等待,直到连接建立成功
}
bool send(const std::string &msg) {
if (_conn->connected()) {//连接状态正常,再发送,否则就返回false
_conn->send(msg);
return true;
}
return false;
}
private:
//连接建立成功时候的回调函数,连接建立成功后,唤醒上边的阻塞
void onConnection(const muduo::net::TcpConnectionPtr&conn){
if (conn->connected()) {
_latch.countDown();//唤醒主线程中的阻塞
_conn = conn;
}else {
//连接关闭时的操作
_conn.reset();
}
}
//收到消息时候的回调函数
void onMessage(const muduo::net::TcpConnectionPtr& conn, muduo::net::Buffer* buf, muduo::Timestamp) {
std::cout << "翻译结果:" << buf->retrieveAllAsString() << std::endl;
}
private:
muduo::CountDownLatch _latch;
muduo::net::EventLoopThread _loopthread;
muduo::net::TcpClient _client;
muduo::net::TcpConnectionPtr _conn;
};
int main()
{
TranslateClient client("127.0.0.1", 8088);
client.connect();
while(1) {
std::string buf;
std::cin >> buf;
client.send(buf);
}
return 0;
}
Makefile:
all: server client
server: TranslateServer.cpp
g++ $^ -o $@ -std=c++11 -I./include -L./lib -lmuduo_net -lmuduo_base -lpthread
client: TranslatClient.cpp
g++ $^ -o $@ -std=c++11 -I./include -L./lib -lmuduo_net -lmuduo_base -lpthread
实现基于Protobuf的通信
在完成了上一个demo后,我们对使用Muduo库编写服务器和客户端有了基本的了解,但是那段代码有个问题,那就是没有进行序列化与反序列化,这会导致:
- 无消息边界,无法区分连续发送的多个请求
- 扩展麻烦,在添加新服务时需要使用条件判断语句
- 类型安全性缺失,字段错误(如数字传为字母)只能在运行时发现
所以我们接下来要引入Protobuf这个序列化协议,并对之前的代码进行一些改进,编写一个支持翻译和计算的服务器+客户端demo,我们将对原来的demo进行一下改进:
改进维度 | 原始版本 | 改进版本 | 核心优势 |
---|---|---|---|
协议设计 | 纯文本传输(send("hello") ) | 使用Protobuf定义结构化协议(TranslateRequest 等) | 支持复杂数据类型,协议可扩展性强 |
消息边界处理 | 直接读取缓冲区(存在粘包问题) | 通过ProtobufCodec 自动处理4字节长度头 | 彻底解决TCP粘包问题,保证消息完整性 |
多业务支持 | 仅支持翻译服务 | 新增加法服务,通过ProtobufDispatcher 路由不同消息类型 | 业务扩展只需添加proto定义和回调函数 |
接口设计 | 手动拼接字符串发送(conn->send("hello") ) | 类型安全接口(如Add(13,14) 、translate("wangshu") ) | 减少编码错误,提升开发效率 |
错误处理 | 无未知消息处理 | OnUnknownMessage 自动拦截非法消息并关闭连接 | 增强系统健壮性,防止协议攻击 |
通过这个改进后的demo,我们能进一步理解Muduo库的工作。
应用层通信协议设计
syntax = "proto3";
package wangshu;
message TranslateRequest
{
string msg = 1;
};
message TranslateResponse
{
string msg = 1;
};
message AddRequest
{
int32 num1 = 1;
int32 num2 = 2;
};
message AddResponse
{
int32 result = 1;
};
服务端实现解析
1. 核心组件与职责
组件 | 作用 |
---|---|
TcpServer | 管理监听端口和客户端连接,处理新连接建立事件 |
ProtobufDispatcher | 根据 Protobuf 消息类型分发到对应的处理函数 |
ProtobufCodec | 封装 Protobuf 的编解码逻辑,处理消息边界(长度前缀)和粘包问题 |
EventLoop | 事件循环驱动,监听网络事件并触发回调函数 |
2. 接口要点
TcpServer(Muduo数据库)
- 事件循环(EventLoop)驱动
- 设置连接回调(ConnectionCallback)与消息回调(MessageCallback),在连接/消息就绪时进行处理
ProtobufDispatcher
- 关键机制:
- registerMessageCallback<T>():注册具体消息类型的回调
- 借助Protobuf为我们维护的Descriptor结构体指针,可以获取对应类的类型名称和各项成员名称,建立起消息类型和对应处理函数的映射,从而可以实现根据不同的消息类型使用不同的业务处理函数进行处理
ProtobufCodec
- 关键方法:
- send:序列化消息并添加长度头信息发送
- onMessage:解析消息头长度,反序列化Protobuf对象
ProtobufCodec通过长度前缀解决粘包问题:
EventLoop
- 核心职责:在Muduo库中,EventLoop是事件驱动架构的核心,负责驱动整个网络通信流程,其主要职责包括:
- 监听IO事件:通过epoll监听socket的读就绪/写就绪事件
- 分发事件回调:触发绑定的ConnectionCallback(连接建立/关闭)和MessageCallback(消息到达)等
- 线程任务队列:处理跨线程提交的任务(通过
runInLoop
实现线程安全)
3. 服务端核心流程
- 注册Dispatcher注册加法和翻译的请求回调处理函数(在Server构造时进行)
// 注册TranslateRequest处理器
_dispatcher.registerMessageCallback<wangshu::TranslateRequest>(
std::bind(&Server::OnTranslate, this,
std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
// 注册AddRequest处理器
_dispatcher.registerMessageCallback<wangshu::AddRequest>(
std::bind(&Server::OnAdd, this,
std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
- 启动TcpServer监听端口
void Start() {
_server.start(); // 启动TcpServer,开始监听端口
_baseloop.loop(); // 进入事件循环,开始处理连接和消息
}
- 连接建立时记录日志
void Onconnection(const ::muduo::net::TcpConnectionPtr &conn) {
if (conn->connected()) {
LOG_INFO << "新连接建立成功"; // 连接成功日志
} else {
LOG_INFO << "连接即将关闭"; // 连接关闭日志
}
}
- 收到消息时进行业务处理,并通过ProtobufCodec返回响应的消息处理全流程:
1. 消息解码(通过codec)
// 在Server构造函数中设置消息回调
_server.setMessageCallback(std::bind(&ProtobufCodec::onMessage,
&_codec, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
作用:消息到来时,ProtobufCodec::onMessage
将自动解析消息头和反序列化 Protobuf 对象。
2. 消息路由(通过dispatcher)
// ProtobufDispatcher初始化时绑定到Codec
_codec(std::bind(&ProtobufDispatcher::onProtobufMessage, &_dispatcher,
std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))
作用:codec反序列化完成得到了消息后,就能通过分发器来根据不同的消息类型决定回调哪个消息处理函数。
3. 业务处理(翻译和加法)
// 翻译处理
void OnTranslate(const TcpConnectionPtr &conn, const TransRequestPtr &message, Timestamp) {
string resp_msg = translate(message->msg()); // 查询内存字典
wangshu::TranslateResponse resp;
resp.set_msg(resp_msg);
_codec.send(conn, resp); // 通过Codec发送响应
}
// 加法处理
void OnAdd(const TcpConnectionPtr &conn, const AddRequestPtr &message, Timestamp) {
int result = message->num1() + message->num2();
wangshu::AddResponse resp;
resp.set_result(result);
_codec.send(conn, resp); // 通过Codec发送响应
}
作用:进行业务处理,如果codec收到的是翻译请求,就调用OnTranslate;如果是加法请求,就调用OnAdd。
4. 响应发送
_codec.send(conn, resp); // 在OnTranslate/OnAdd中调用
作用:
- 序列化Protobuf响应对象
- 添加消息长度字段
- 使用TCP连接发送响应
protobuf_server完整代码:
#include "muduo/proto/codec.h"
#include "muduo/proto/dispatcher.h"
#include "request.pb.h"
#include "muduo/base/Logging.h"
#include "muduo/base/Mutex.h"
#include "muduo/net/EventLoop.h"
#include "muduo/net/TcpServer.h"
#include <iostream>
#include <unordered_map>
#include <functional>
class Server
{
public:
typedef std::shared_ptr<google::protobuf::Message> MessagePtr;
typedef std::shared_ptr<wangshu::TranslateRequest> TransRequestPtr;
typedef std::shared_ptr<wangshu::AddRequest> AddRequestPtr;
typedef std::shared_ptr<wangshu::AddResponse> AddResponsePtr;
Server(int port) : _server(&_baseloop,
muduo::net::InetAddress("0.0.0.0", port),
"Server", muduo::net::TcpServer::Option::kReusePort),
_dispatcher(std::bind(&Server::OnUnknownMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)),
_codec(std::bind(&ProtobufDispatcher::onProtobufMessage, &_dispatcher, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))
{
// 注册业务请求处理函数
_dispatcher.registerMessageCallback<wangshu::TranslateRequest>(std::bind(&Server::OnTranslate,
this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
_dispatcher.registerMessageCallback<wangshu::AddRequest>(std::bind(&Server::OnAdd,
this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
// 设置消息回调处理函数
_server.setMessageCallback(std::bind(&ProtobufCodec::onMessage,
&_codec, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
// 设置连接回调处理函数
_server.setConnectionCallback(std::bind(&Server::Onconnection, this, std::placeholders::_1));
}
void Start()
{
_server.start();
_baseloop.loop();
}
private:
std::string translate(const std::string &str)
{
static std::unordered_map<std::string, std::string> map = {
{"hello", "你好"},
{"string", "字符串"},
{"wangshu", "大帅b"}};
if (map.find(str) == map.end())
{
return "找不到捏";
}
return map[str];
}
void OnTranslate(const muduo::net::TcpConnectionPtr &conn, const TransRequestPtr &message, muduo::Timestamp)
{
// 提取有效信息
std::string msg = message->msg();
// 业务处理,进行翻译
std::string resp_msg = translate(msg);
// 组织protobuf响应
wangshu::TranslateResponse resp;
resp.set_msg(resp_msg);
// 发送响应
_codec.send(conn, resp);
}
void OnAdd(const muduo::net::TcpConnectionPtr &conn, const AddRequestPtr &message, muduo::Timestamp)
{
int num1 = message->num1();
int num2 = message->num2();
int result = num1 + num2;
wangshu::AddResponse resp;
resp.set_result(result);
_codec.send(conn, resp);
}
void Onconnection(const ::muduo::net::TcpConnectionPtr &conn)
{
if (conn->connected())
{
LOG_INFO << "新连接建立成功";
}
else
{
LOG_INFO << "连接即将关闭";
}
}
void OnUnknownMessage(const muduo::net::TcpConnectionPtr &conn, const MessagePtr &message, muduo::Timestamp)
{
LOG_INFO << "On Unknown Message" << message->GetTypeName();
conn->shutdown();
}
private:
muduo::net::EventLoop _baseloop;
muduo::net::TcpServer _server; // 服务器对象
ProtobufDispatcher _dispatcher; // 请求分发器对象 ---注册请求处理函数
ProtobufCodec _codec; // protobuf协议处理对象 ---针对收到的数据进行protobuf协议处理
};
int main()
{
Server svr(8088);
svr.Start();
return 0;
}
客户端实现解析
1. 核心组件和职责
组件 | 作用 |
---|---|
TcpClient | 管理与服务端的连接,处理连接建立、断开及数据收发 |
ProtobufDispatcher | 根据服务端返回的响应类型(如 TranslateResponse /AddResponse )分发到对应处理函数 |
ProtobufCodec | 序列化请求、反序列化响应,处理消息边界(长度前缀) |
EventLoopThread | 在独立线程中运行事件循环,实现客户端异步通信 |
CountDownLatch | 同步连接建立过程,确保连接成功后再发送请求 |
2. 接口要点
TcpClient(Muduo库)
- 事件循环(EventLoop)驱动
- 设置连接回调(ConnectionCallback)与消息回调(MessageCallback),在连接/消息就绪时进行处理
ProtobufDispatcher
- 响应路由机制:
- 通过
T::descriptor()
建立响应类型与处理函数的映射 - 自动将反序列化后的响应对象路由到对应的消息处理回调函数
- 通过
// 注册 TranslateResponse 处理器
_dispatcher.registerMessageCallback<wangshu::TranslateResponse>(
std::bind(&Client::OnTranslate, this, _1, _2, _3));
// 注册 AddResponse 处理器
_dispatcher.registerMessageCallback<wangshu::AddResponse>(
std::bind(&Client::OnAdd, this, _1, _2, _3));
ProtobufCodec
- 关键方法:
- send:序列化消息并添加长度头信息发送
- onMessage:解析消息头长度,反序列化Protobuf对象
EventLoopThread
- 异步通信的核心:网络IO操作(连接,数据收发)都在后台线程进行
muduo::net::EventLoopThread _loopthread; // 内部启动独立线程运行 EventLoop
_client(_loopthread.startLoop(), ...); // TcpClient 绑定到该 EventLoop
CountDownLatch
- 实现连接同步控制:客户端要阻塞直到连接建立完成才能发送数据
void connect() {
_client.connect(); // 异步连接
_latch.wait(); // 阻塞至连接成功
}
// 连接成功回调中解除阻塞
void OnConnection(...) {
_latch.countDown();
}
3. 客户端核心流程
-
初始化 Dispatcher 注册响应回调:
Client::Client(...) {
// 绑定 TranslateResponse 处理器
_dispatcher.registerMessageCallback<wangshu::TranslateResponse>(
std::bind(&Client::OnTranslate, this, _1, _2, _3));
// 绑定 AddResponse 处理器
_dispatcher.registerMessageCallback<wangshu::AddResponse>(
std::bind(&Client::OnAdd, this, _1, _2, _3));
}
作用:建立响应类型与处理函数的映射关系
- 启动异步连接:
void connect() {
_client.connect(); // 触发异步连接操作
_latch.wait(); // 阻塞等待连接成功
}
流程:TcpClient在后台线程发起TCP连接,连接建立成功时,触发OnConnection回调并解除阻塞
- 处理收到的响应的全流程:
1. 消息解码
// 设置消息回调:原始数据 → ProtobufCodec
_client.setMessageCallback(std::bind(&ProtobufCodec::onMessage,
&_codec, _1, _2, _3));
作用:将接收到的字节流交给 ProtobufCodec
解析。
2. 消息路由
// Codec 反序列化后提交给 Dispatcher
_codec(std::bind(&ProtobufDispatcher::onProtobufMessage,
&_dispatcher, _1, _2, _3));
作用:根据响应类型调用对应的 OnTranslate
或 OnAdd
。
3. 业务处理
// 处理翻译响应
void OnTranslate(...) {
std::cout << "翻译结果:" << message->msg() << std::endl;
}
// 处理加法响应
void OnAdd(...) {
std::cout << "加法结果:" << message->result() << std::endl;
}
特点:直接操作反序列化的 Protobuf 对象,无手动解析开销。
protobuf_client完整代码:
#include "muduo/proto/codec.h"
#include "muduo/proto/dispatcher.h"
#include "request.pb.h"
#include "muduo/base/Logging.h"
#include "muduo/base/Mutex.h"
#include "muduo/net/EventLoopThread.h"
#include "muduo/net/TcpClient.h"
#include "muduo/base/CountDownLatch.h"
#include <functional>
class Client
{
public:
typedef std::shared_ptr<google::protobuf::Message> MessagePtr;
typedef std::shared_ptr<wangshu::TranslateResponse> TransResponsePtr;
typedef std::shared_ptr<wangshu::AddResponse> AddResponsePtr;
Client(const std::string& serverip, int serverport): _latch(1), _client(_loopthread.startLoop(),
muduo::net::InetAddress(serverip, serverport), "Client"),
_dispatcher(std::bind(&Client::OnUnknownMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)),
_codec(std::bind(&ProtobufDispatcher::onProtobufMessage, &_dispatcher, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))
{
// 注册业务请求处理函数
_dispatcher.registerMessageCallback<wangshu::TranslateResponse>(std::bind(&Client::OnTranslate,
this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
_dispatcher.registerMessageCallback<wangshu::AddResponse>(std::bind(&Client::OnAdd,
this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
// 设置消息回调处理函数
_client.setMessageCallback(std::bind(&ProtobufCodec::onMessage,
&_codec, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
// 设置连接回调处理函数
_client.setConnectionCallback(std::bind(&Client::Onconnection, this, std::placeholders::_1));
}
void Add(int num1, int num2)
{
wangshu::AddRequest req;
req.set_num1(num1);
req.set_num2(num2);
send(&req);
}
void translate(const std::string& msg)
{
wangshu::TranslateRequest req;
req.set_msg(msg);
send(&req);
}
void connect()
{
_client.connect();
_latch.wait();
}
private:
void OnUnknownMessage(const muduo::net::TcpConnectionPtr &conn, const MessagePtr &message, muduo::Timestamp)
{
LOG_INFO << "onUnknownMessage: " << message->GetTypeName();
}
void OnTranslate(const muduo::net::TcpConnectionPtr &conn, const TransResponsePtr &message, muduo::Timestamp)
{
std::cout << "翻译结果:" << message->msg() << std::endl;
}
void OnAdd(const muduo::net::TcpConnectionPtr &conn, const AddResponsePtr &message, muduo::Timestamp)
{
std::cout << "加法结果:" << message->result() << std::endl;
}
void Onconnection(const ::muduo::net::TcpConnectionPtr &conn)
{
if(conn->connected())
{
_latch.countDown();
_conn = conn;
}
else
{
_conn.reset();
}
}
bool send(const google::protobuf::Message* msg) {
if (_conn->connected()) {//连接状态正常,再发送,否则就返回false
_codec.send(_conn, *msg);
return true;
}
return false;
}
private:
muduo::CountDownLatch _latch;
muduo::net::EventLoopThread _loopthread;
muduo::net::TcpClient _client;
ProtobufCodec _codec;
ProtobufDispatcher _dispatcher;
muduo::net::TcpConnectionPtr _conn;
};
int main()
{
Client client("127.0.0.1", 8088);
client.connect();
client.Add(13, 14);
client.translate("wangshu");
sleep(1);
return 0;
}
对demo简单总结
尽管本demo处理的是非常简单的业务,但由于解耦合做得好,所以只要我们想,可以非常轻易的换成别的业务,所以说咱们花这么大篇幅谈这个demo是有原因的,之后项目实现中的网络通信模块可以非常低成本地复用这个demo。
简单总结下这个demo的优点?
- 基于Protobuf的高效协议设计:通过Protobuf定义请求/响应消息,并使用Muduo库中实现的ProtobufCodec进行编解码,实现二进制高效序列化。
- 灵活的消息分发机制:通过ProtobufDispatcher实现类型安全的消息路由,支持动态注册处理函数。
- 事件驱动网络模型:基于Muduo的EventLoop实现非阻塞IO+多路复用,单线程即可处理高并发连接,客户端和服务端均具备异步事件响应能力。
SQLite3
什么是SQLite?
SQLite是一个进程内的轻量级数据库,实现了一个自给自足、无服务器、零配置、事务性的SQL数据库引擎。SQLite不是一个独立的进程,可以按应用程序需求进行静态或动态链接,SQLite直接访问其存储的文件。
为什么本项目使用SQLite?
首先明确需求,我们要做的项目实际上是基于AMQP协议的轻量级消息队列,需要实现消息持久化、元数据存储和事务支持,而SQLite正好可以满足我们的需求:
消息队列需求 | SQLite 解决方案 |
---|---|
消息持久化 | 消息内容及元数据存储于本地文件,重启后不丢失 |
元数据管理 | 使用表结构管理队列(Queues)、交换机(Exchanges)、绑定(Bindings)等 AMQP 实体 |
事务一致性 | 通过 ACID 事务确保消息入队、确认(ACK)、状态更新的原子性 |
轻量级部署 | 无需独立数据库服务,作为进程内组件嵌入消息队列,降低依赖复杂度 |
SQLite的API封装
为了方便在项目中对消息队列的元数据进行持久化存储,我们先把SQLite提供的C++的API封装成一个SQLiteHelper类,后续通过这个类来与SQLite进行交互。
1. 类结构设计:核心成员与职责
class SqliteHelper {
private:
std::string _dbfile; // 数据库文件路径
sqlite3* _handler; // 数据库连接句柄(核心操作对象)
public:
typedef int(*SqliteCallback)(void*, int, char**, char**); // 回调函数类型
SqliteHelper(const std::string &dbfile); // 构造函数
bool open(int safeLevel = SQLITE_OPEN_FULLMUTEX); // 连接数据库
bool exec(const std::string& sql, SqliteCallback cb, void* arg); // 执行SQL
void close(); // 关闭连接
};
核心设计思想:通过 RAII(资源获取即初始化)模式管理数据库生命周期,_handler
在 open()
时初始化,close()
释放资源。
2. 关键方法解析
2.1 构造函数:路径绑定
SqliteHelper(const std::string &dbfile)
: _dbfile(dbfile), _handler(nullptr) {}
作用:存储数据库文件的路径,初始化连接句柄,避免野指针问题
注意:此时未真正打开数据库文件,在open后才真正打开
2.2 打开数据库:安全与并发控制
bool open(int safeLevel = SQLITE_OPEN_FULLMUTEX) {
int ret = sqlite3_open_v2(
_dbfile.c_str(),
&_handler,
SQLITE_OPEN_CREATE | SQLITE_OPEN_READWRITE | safeLevel,
NULL
);
// 错误处理...
}
参数解析:
SQLITE_OPEN_CREATE
:不存在时自动创建SQLITE_OPEN_READWRITE
:读写模式safeLevel
:线程安全级别(默认全互斥锁)
2.3 执行SQL:灵活性与回调机制
bool exec(const std::string& sql, SqliteCallback cb, void* arg) {
int ret = sqlite3_exec(
_handler,
sql.c_str(),
cb, // 回调函数指针
arg, // 传递给回调的额外参数
nullptr // 错误信息存储(未使用)
);
// 错误处理...
}
- 回调参数设计
typedef int(*SqliteCallback)(void* arg, int colNum, char** colValues, char** colNames);
参数:
arg
:用户自定义数据(如类实例指针)colNum
:结果列数colValues
:当前行各列的值colNames
:列名数组
返回值:非0值则终止查询
2.4 关闭连接:资源释放
void close() {
if(_handler)
sqlite3_close_v2(_handler); // 使用v2版本确保完全关闭
}
SQLiteHelper.hpp:
/*
封装实现一个SqliteHelper类,提供简单的SQLite数据库操作接口,完成数据的增删查改操作
1. 创建/打开数据库文件
2. 针对打开的数据库执行擦欧总
1. 对表的操作
2. 对数据的操作
3. 关闭数据库
*/
#include <sqlite3.h>
#include <string>
#include <iostream>
class SqliteHelper
{
public:
/* void*: 设置的在回调时传入的arg参数
int: 数据的列数
char**: 存储一行数据的字符指针数组
char**: 每一列的字段名称
回调函数的返回值在成功处理时返回0,返回非0则退出程序 */
typedef int(*SqliteCallback)(void*, int, char**, char**);
SqliteHelper(const std::string &dbfile) : _dbfile(dbfile), _handler(nullptr) // 数据库句柄是在打开后得到的
{}
bool open(int safeLevel = SQLITE_OPEN_FULLMUTEX)
{
// int sqlite3_open_v2(const char *filename, sqlite3 **ppDb, int flags, constchar *zVfs ); // ppDb是输入输出型参数
int ret = sqlite3_open_v2(_dbfile.c_str(), &_handler, SQLITE_OPEN_CREATE | SQLITE_OPEN_READWRITE | safeLevel, NULL);
if(ret != SQLITE_OK)
{
std::cout << "创建/打开SQLite数据库失败:";
std::cout << sqlite3_errmsg(_handler) << std::endl;
return false;
}
return true;
}
bool exec(const std::string& sql, SqliteCallback cb, void* arg)
{
// int sqlite3_exec(sqlite3*, char *sql, int (*callback)(void*,int,char**,char**),void* arg, char **err)
int ret = sqlite3_exec(_handler, sql.c_str(), cb, arg, nullptr);
if(ret != SQLITE_OK)
{
std::cout << "执行语句失败:";
std::cout << sqlite3_errmsg(_handler) << std::endl;
return false;
}
return true;
}
void close()
{
if(_handler) sqlite3_close_v2(_handler);
}
private:
std::string _dbfile;
sqlite3* _handler;
};
GTest
什么是GTest?
GTest(Google Test)是Google开发的一块开源C++单元测试框架,为编写高质量、可维护的单元测试设计。支持跨平台(如Windows、Linux、macOS),提供丰富的功能来简化测试编写、执行和结果分析,是C++开发者广泛使用的测试工具。
为什么选择GTest?
优点 | 说明 |
---|---|
效率提升 | 通过 TEST /TEST_F 宏、参数化测试(TEST_P )减少重复代码,快速生成高覆盖率的测试用例。 |
维护性增强 | 利用 测试夹具(Fixture) 复用初始化逻辑,参数化测试适应多场景输入,代码变更时测试易同步更新。 |
问题定位快 | 提供 30+ 断言宏(如 EXPECT_EQ 、ASSERT_THROW )及详细错误日志,精准定位失败原因。 |
团队协作易 | 标准化测试结构,统一断言风格和报告格式,降低新人上手成本,促进团队协作。 |
快速上手GTest
1. TEST与TEST_F宏
作用:定义测试用例,组织测试的逻辑
区别:
- TEST:定义独立测试,无需共享初始化/清理代码
- TEST_F:基于测试夹具,复用了SetUp和TearDown方法
什么是测试夹具?
夹具是 GTest 中用于 管理测试用例共享资源 的核心机制。它通过继承 testing::Test
类,允许在多个测试用例中复用初始化和清理代码,确保每个用例在独立且一致的环境中运行。
- 核心作用:
- 复用代码:避免在每个测试用例中重复编写初始化(如创建对象、打开文件)和清理(如释放内存、关闭连接)逻辑。
- 隔离测试:确保每个测试用例使用独立的资源实例,防止测试间的数据污染。
代码:
#include <gtest/gtest.h>
#include <unordered_map>
// 使用TEST宏进行简单测试
TEST(MathTest, Add)
{
EXPECT_EQ(1 + 1, 2); // 验证加法
}
// 定义测试夹具
class HashTestEnv : public testing::Test
{
public:
static void SetUpTestCase()
{
std::cout << "第一个用例执行前被调用,进行总体环境配置\n";
}
static void TearDownTestCase()
{
std::cout << "最后一个用例执行后被调用,进行总体环境清理\n";
}
virtual void SetUp() override
{
_myMap.insert(std::make_pair("hello", "你好"));
_myMap.insert(std::make_pair("world", "世界"));
}
virtual void TearDown() override
{
_myMap.clear();
}
public:
std::unordered_map<std::string, std::string> _myMap;
};
TEST_F(HashTestEnv, insert_test)
{
_myMap.insert(std::make_pair("wangshu", "帅哥"));
ASSERT_EQ(_myMap.size(), 3);
}
TEST_F(HashTestEnv, erase_test)
{
EXPECT_EQ(_myMap.size(), 2);
_myMap.erase("hello");
ASSERT_EQ(_myMap.size(), 1);
}
int main(int argc, char* argv[])
{
testing::InitGoogleTest(&argc, argv); // 解析参数(如 --gtest_filter)
return RUN_ALL_TESTS(); // 运行测试并返回失败数(0 表示全部通过)
}
运行结果:
2. 断言宏
作用:验证代码的行为和预期是否相符
分类:
- ASSERTT_系列:失败时立即终止当前测试函数
- EXPECT_系列:失败时继续执行后续代码
常用断言示例:
TEST(AssertTest, Examples) {
int a = 5, b = 10;
// 布尔检查
EXPECT_TRUE(a > 0); // a 必须 > 0
ASSERT_FALSE(b == 0); // b 不能为 0(失败则终止)
// 数值比较
EXPECT_EQ(a + b, 15); // 验证和
ASSERT_NE(a, b); // 验证不相等
// 字符串比较
std::string s1 = "hello", s2 = "HELLO";
EXPECT_STREQ(s1.c_str(), "hello"); // 严格相等
EXPECT_STRCASEEQ(s1.c_str(), s2.c_str()); // 忽略大小写比较
}
3. 事件机制
作用:管理测试的初始化与清理逻辑,支持不同粒度的控制
- 测试用例级别事件:
- SetUp:在每个TEST_F测试用例前自动调用
- TearDown:在每个TEST_F测试用力后自动调用
- 测试套件级别事件:
- SetUpTestSuite:整个测试套件(一组
TEST_F
)前调用一次。 - TearDownSuite:整个测试套件后调用一次。
- SetUpTestSuite:整个测试套件(一组
- 全局事件:
- 继承
testing::Environment
类,实现全局初始化和清理。
- 继承
class GlobalEnv : public testing::Environment {
public:
void SetUp() override {
std::cout << "全局初始化(如启动日志服务)" << std::endl;
}
void TearDown() override {
std::cout << "全局清理(如关闭网络连接)" << std::endl;
}
};
int main(int argc, char** argv) {
testing::AddGlobalTestEnvironment(new GlobalEnv); // 注册全局事件
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
C++11异步操作
异步操作简介
C++11引入了强大的异步编程支持,通过future头文件中的工具(如std::future
、std::async
、std::packaged_task
、std::promise
),开发者可以简化多线程编程,高效管理异步任务,异步操作的核心组件如下:
- std::future:表示异步操作的结果,提供阻塞等待结果(get)和查询状态(valid、wait)的方法
- std::async:创建异步任务并返回std::future,支持延迟执行或立即启动线程
- std::packaged_task:将函数封装为可调用对象,并与std::future绑定,便于手动控制任务执行
- std::promise:手动设置异步操作的结果,可以与std::future配合,实现灵活的任务控制
应用场景
场景 | 说明 |
---|---|
耗时操作 | 交由后台线程执行文件I/O、网络请求、复杂计算,避免阻塞主线程 |
并发控制 | 多任务并行执行,通过 std::future 等待结果,实现线程间同步。 |
结果延迟获取 | 先提交任务,在需要时通过 future.get() 阻塞获取结果。 |
核心用法与示例
1. 使用std::async
创建异步任务:
#include <iostream>
#include <thread>
#include <future>
#include <chrono>
// async接口的学习使用
int Add(int num1, int num2)
{
std::cout << "加法!\n";
return num1 + num2;
}
int main()
{
std::cout << "-------1---------\n";
// std::future<int> fut = std::async(std::launch::async, Add, 11, 22); // 选择异步策略时,异步线程会立即执行函数
std::future<int> fut = std::async(std::launch::deferred, Add, 11, 22); // 选择延迟策略时,异步线程会等到调用get时才执行函数
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "-------2---------\n";
std::cout << "获取结果!\n";
int ret = fut.get();
std::cout << "-------3---------\n";
std::cout << ret << std::endl;
return 0;
}
当异步任务设置为立即执行时,结果为:
当异步任务策略设置为延迟执行时,结果为:
这是因为设置为延迟执行时,会在用户调用get获取结果时执行任务
2. 使用 std::packaged_task
封装任务
std::packaged_task是C++11提供的模板类,用于将函数或可调用对象封装为一个异步任务,其核心价值在于:
- 绑定任务与结果:通过
get_future()
方法获取std::future
对象,用于异步获取任务结果。 - 延迟执行:任务不会立即执行,而是由开发者决定何时、何地执行。
简单的例子:
int Add(int a, int b) {
return a + b;
}
int main() {
// 封装 Add 函数为一个 packaged_task
std::packaged_task<int(int, int)> task(Add);
// 获取 future 对象
std::future<int> fut = task.get_future();
// 执行任务(同步)
task(10, 20); // 调用 Add(10, 20)
// 获取结果
std::cout << "结果: " << fut.get() << std::endl; // 输出 30
return 0;
}
但是这样使用是没有意义的,直接调用会阻塞当前线程,失去了异步的意义,所以我们的想法是:将packaged_task封装的异步任务交给线程执行,让我们进行尝试:
2.1 直接传递的尝试:
std::packaged_task<int(int, int)> task(Add);
std::future<int> fut = task.get_future();
// 尝试将 task 传递给线程
std::thread thr(task, 10, 20); // 编译错误!
编译的时候报错了:
报错原因:std::packaged_task
不可拷贝(禁用了拷贝构造函数),而 std::thread
的参数默认以值方式传递,导致编译失败。
为什么packaged_task禁止拷贝?
- 设计原因:
packaged_task
内部管理任务状态(如是否已执行、结果存储等),拷贝可能导致状态混乱。 - 解决方法:使用移动语义或指针传递。
2.2 智能指针+Lambda捕获
使用智能指针管理任务:通过 std::shared_ptr
管理 packaged_task
,确保其生命周期覆盖线程执行期间:
auto ptask = std::make_shared<std::packaged_task<int(int, int)>>(Add);
通过 Lambda 捕获指针并执行:
std::thread thr([ptask]() {
(*ptask)(10, 20); // 解引用并执行任务
});
- Lambda 捕获:以值方式捕获
ptask
,增加引用计数,避免悬垂指针。 - 解引用执行:
(*ptask)
调用operator()
,执行Add(10, 20)
。
3. 使用std::promise手动设置结果
std::future
确实可以用于获取异步任务的结果,但它通常需要与 std::async
或 std::packaged_task
结合使用,而 std::promise
提供了更底层、更灵活的控制机制:
特性 | std::future + std::async | std::promise + std::future |
---|---|---|
结果来源 | 函数返回值 | 可在任意位置手动设置 |
灵活性 | 依赖函数返回值,简单但受限 | 完全手动控制,适用复杂场景 |
异常传递 | 自动传递函数中抛出的异常 | 可手动设置自定义异常 |
多线程协同 | 单一线程任务 | 多线程协作(如流水线、分阶段任务) |
接下来我们来看一个能体现promise价值的例子:
场景:主线程启动一个后台任务,但结果需要在某个事件(如用户输入)后手动设置
#include <iostream>
#include <future>
#include <thread>
void backgroundTask(std::promise<int> prom) {
std::cout << "后台任务启动,等待事件..." << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(2)); // 模拟等待事件
prom.set_value(100); // 事件触发后手动设置结果
}
int main() {
std::promise<int> prom;
std::future<int> fut = prom.get_future();
std::thread t(backgroundTask, std::move(prom));
// 主线程继续处理其他逻辑
std::cout << "主线程正在工作..." << std::endl;
// 等待用户输入(模拟事件触发)
std::cout << "按 Enter 键获取结果..." << std::endl;
std::cin.get();
// 获取结果(阻塞直到后台任务设置值)
std::cout << "结果: " << fut.get() << std::endl;
t.join();
return 0;
}
异步线程池设计
接下来,我将带大家使用C++11的这些异步操作来实现一个异步线程池demo,一方面帮助大家更好地理解异步操作;另一方面,本项目之后也会用到这个异步线程池。
1. 核心组件与职责
组件 | 职责 |
---|---|
线程池(ThreadPool) | 管理线程生命周期,接收并分发任务,确保多线程环境下的任务安全执行。 |
工作线程(worker threads) | 执行任务队列中的任务,循环等待新任务或终止信号。 |
任务队列(_tasks) | 存储用户提交的待执行任务,使用互斥锁保证线程安全。 |
同步机制 | 包括互斥锁(_mutex )和条件变量(_cv ),用于控制线程的休眠与唤醒。 |
停止标志(_stop) | 原子布尔变量,控制线程池的终止流程 |
2. 核心流程
2.1 初始化
- 构造函数启动指定数量的工作线程,每个线程运行
entry
函数,进入任务循环。
2.2 任务提交
- 用户调用Push方法:
- 将用户函数封装为
std::packaged_task
,关联std::future
。 - 任务加入队列,并通过条件变量
notify_one()
唤醒一个线程。
- 将用户函数封装为
2.3 任务执行
- 工作线程循环:
- 等待任务:条件变量
wait()
阻塞,直到任务到来或终止信号。 - 取出任务:加锁后从队列中取任务。
- 执行任务:调用任务函数,通过
future
返回结果。
- 等待任务:条件变量
2.4 终止流程
- 调用
Stop()
设置_stop = true
,唤醒所有线程。 - 线程退出循环,主线程通过
join()
等待所有工作线程结束。
3. 具体实现
#ifndef __M_THREADPOOL_H__
#define __M_THREADPOOL_H__
#include <iostream>
#include <thread>
#include <memory>
#include <functional>
#include <future>
#include <vector>
class ThreadPool
{
public:
using ptr = std::shared_ptr<ThreadPool>;
using Functor = std::function<void(void)>;
ThreadPool(int threadCount = 1) : _stop(false)
{
for (int i = 0; i < threadCount; i++)
{
_threads.emplace_back(&ThreadPool::entry, this);
}
}
~ThreadPool()
{
Stop();
}
void Stop()
{
if (_stop == true)
return;
_stop = true;
_cv.notify_all();
for (auto &thread : _threads)
{
thread.join();
}
}
// 将用户要处理的任务函数封装为异步任务(packaged_task),等待工作线程的处理
// 异步任务处理完成后,通过packaged_task获取future,返回结果给用户
template <typename F, typename... Args>
auto Push(F &&func, Args &&...args) -> std::future<decltype(func(args...))>
{
// 1. 将传入的函数封装成一个packaged_task任务
using return_type = decltype(func(args...));
auto tmp_func = std::bind(std::forward<F>(func), std::forward<Args>(args)...);
auto task = std::make_shared<std::packaged_task<return_type()>>(tmp_func);
std::future<return_type> fu = task->get_future();
// 2. 构造一个lambda匿名函数(捕获任务对象),函数内执行任务对象
{
std::unique_lock<std::mutex> lock(_mutex);
// 3. 将构造出来的匿名函数对象,抛入到任务池中
_tasks.push_back([task]()
{ (*task)(); });
_cv.notify_one();
}
return fu;
}
private:
// 线程入口函数:不断从任务池获取任务进行执行
void entry()
{
while (!_stop)
{
std::vector<Functor> tmp_taskpool;
{
// 加锁
std::unique_lock<std::mutex> lock(_mutex);
// 当任务池不为空/线程池终止时,线程恢复执行
_cv.wait(lock, [this]()
{ return _stop || !_threads.empty(); });
// 取出任务并执行
tmp_taskpool.swap(_tasks);
}
for (auto &task : tmp_taskpool)
{
task();
}
}
}
std::atomic<bool> _stop;
// 互斥锁和条件变量要在线程前定义
std::mutex _mutex;
std::condition_variable _cv;
std::vector<std::thread> _threads;
std::vector<Functor> _tasks;
};
#endif
需求分析
首先,明确一下,我们的项目中应该包含的核心概念有:
- 生产者(Producer)
- 消费者(Consumer)
- 中间人(Broker)
- 发布(Publish)
- 订阅(Subscribe)
单生产者单消费者:
多生产者多消费者:
其中的Broker Server就是我们的项目中要实现的最核心的部分,它的职责是存储和转发数据。正如文章标题,我们的项目是基于AMQP协议实现的简单消息队列,那么问题来了:
AMQP协议是什么?
AMQP(Advanced Message Queue Protocol,高级消息队列协议),是面向消息的中间件通信协议,旨在为分布式系统提供标准化、可靠的消息传递机制,允许生产者和消费者进行解耦的异步通信。
所以为了实现一个简单但是符合标准的消息队列,我们就需要先了解AMQP协议的核心概念。
AMQP的核心概念
虚拟主机 (VirtualHost)
作用:类似于数据库中的namespace,用于在同一个Broker中实现逻辑隔离。
特点:
- 每个VirtualHost拥有独立的交换机、队列、绑定关系、权限等资源。
- 类似MySQL的
database
,不同业务系统可通过VirtualHost隔离(例如:电商订单系统、物流系统各占一个VirtualHost)。 - 权限控制的最小单元,用户需绑定到VirtualHost才能操作其资源。
交换机(Exchange)
角色:消息的路由中心,生产者将消息发送到交换机,而非直接发送到队列。
类型(决定路由规则):
- 直连交换机 (Direct):通过精确匹配
Routing Key
路由消息(例如:订单创建事件路由到订单队列)。 - 扇出交换机 (Fanout):广播模式,忽略
Routing Key
,将消息发送到所有绑定的队列(适用于日志广播)。 - 主题交换机 (Topic):基于通配符(
*
匹配单个词,#
匹配多级)的灵活路由(例如:news.sports.#
匹配所有体育新闻)。 - 头交换机 (Headers):通过消息头(Headers)而非
Routing Key
进行路由,支持复杂匹配条件。
队列(Queue)
角色:消息的存储容器,消费者从中拉取/订阅消息。
关键属性:
- 持久化:队列可配置为持久化(Broker重启后保留)或临时存储。
- 独占队列:仅允许一个消费者连接,常用于临时任务。
- 死信队列 (DLX):处理无法投递的消息(如超时、被拒绝的消息)。
绑定(Binding)
定义:连接交换机与队列的规则,决定了消息如何从交换机路由到队列
绑定键 (Binding Key):
- 在Direct/Topic交换机中,绑定键用于匹配生产者的
Routing Key
。 - 例如:队列绑定到Topic交换机时使用
news.#
,则所有以news.
开头的消息都会被路由。
消息(message)
结构:
- Header:元数据(如优先级、持久化标志)。
- Properties:可选属性(如消息ID、时间戳)。
- Body:二进制负载(通常为JSON、Protobuf等序列化数据)。
生命周期:
- 生产者 → 交换机 → 路由到队列 → 消费者确认(ACK)后删除。
- 若未被确认(NACK),消息可能重新入队或进入死信队列。
AMQP核心流程
- 生产者发布消息→ 发送到指定交换机的Routing Key。
- 交换机根据类型和绑定规则→将消息路由到匹配的队列。
- 消费者订阅队列→通过推(Push)或拉(Pull)模式获取消息。
- 消息确认机制→消费者处理成功后发送ACK,否则触发重传或死信机制。
我们如何实现消息队列?
市面上的消息队列非常多:RabbitMQ、Kafka、RocketMQ、ActiveMQ,而在这之中,RabbitMQ是一个非常知名且功能强大、使用广泛的消息队列,我们接下来仿造RabbitMQ的实现思想,完成一个简化版的消息队列。
模块图设计
以虚拟机为单元管理队列、交换机、绑定信息:
发布者向交换机发布消息,消费者向队列订阅消息:
核心API
对于 Broker 来说, 要实现以下核⼼ API,通过这些 API 来实现消息队列的基本功能:
- 创建交换机(exchangeDeclare)
- 销毁交换机(exchangeDelete)
- 创建队列(queueDeclare)
- 销毁队列(queueDelete)
- 创建绑定(queueBind)
- 解除绑定(queueUnBind)
- 发布消息(basicPublish)
- 确认消息(basicAck)
- 订阅消息(basicConsume)
- 取消订阅(basicCancel)
并且,我们的消息队列项目作为消息中间件,应该能够跨网络为发布者客户端和订阅者客户端提供服务,所以Producer和Consumer能通过网络调用这些API,实现生产者消费者模型。
交换机类型
交换机类型 | 匹配方式 | 典型场景 | 类比 |
---|---|---|---|
Direct | 精确匹配routingKey | 单业务事件路由(如支付成功通知) | 多队列可共享同一暗号 |
Fanout | 广播到所有队列 | 日志广播、多系统同步 | 魔法红包,全员复制 |
Topic | 通配符模式匹配 | 多级分类消息(如新闻、IoT设备数据) | 按模式匹配的“智能红包” |
Header | 消息头键值对筛选 | 复杂条件路由(多属性组合) | 需填写属性表单的定制红包 |
其中Header比较复杂和少见,所以我们的项目只实现前面三种:Direct、Fanout、Topic
持久化
我们需要确保以下组件在程序或主机重启后数据不丢失:
- 交换机(Exchange):名称、类型、参数等元数据。
- 队列(Queue):名称、参数、绑定关系及消息内容。
- 绑定(Binding):交换机与队列的绑定规则(如路由键)。
- 消息(Message):消息内容、元数据(如优先级、过期时间)及状态(是否被消费)。
对于前面三者,我们使用SQLite3进行持久化;对于消息数据,由于消息内容可能较长,我们选择将其存储在磁盘文件中。
网络通信
⽣产者和消费者都是客⼾端程序, Broker 则是作为服务器,通过⽹络进⾏通信。 在⽹络通信的过程中, 客⼾端部分要提供对应的 api, 来实现对服务器的操作,但我们在网络通信时,不直接使用连接,AMQP引入了信道机制,在复用单个TCP连接的基础上,实现逻辑隔离和轻量级并发,从而优化资源利用和通信效率。
什么是信道?
信道(Channel)是TCP连接上的逻辑通道,多个信道共享同一个物理连接,每个信道有自己的独立ID,就像一条独立的车道,消息发布、订阅等操作通过不同的信道隔离执行,互不干扰。
- 创建Connection
- 关闭Connection
- 创建Channel
- 关闭Channel
- 创建交换机(exchangeDeclare)
- 销毁交换机(exchangeDelete)
- 创建队列(queueDeclare)
- 销毁队列(queueDelete)
- 创建绑定(queueBind)
- 解除绑定(queueUnBind)
- 发布消息(basicPublish)
- 确认消息(basicAck)
- 订阅消息(basicConsume)
- 取消订阅(basicCancel)
消息应答
- ⾃动应答: 消费者只要消费了消息, 就算应答完毕了,Broker 直接删除这个消息
- ⼿动应答: 消费者⼿动调⽤应答接⼝, Broker 收到应答请求之后, 才真正删除这个消息
⼿动应答的⽬的是为了保证消息确实被消费者处理成功了. 在⼀些对于数据可靠性要求⾼的场景, ⽐较常⻅。
模块划分
服务器模块
数据管理模块
- 交换机管理:
- 管理信息:名称,类型,是否持久化标志,是否(⽆⼈使⽤时)⾃动删除标志,其他参数
- 管理操作:恢复历史信息,声明,删除,获取,判断是否存在
- 队列管理:
- 管理信息:名称,是否持久化标志,是否独有标志,是否(⽆⼈使⽤时)⾃动删除标志,其他参数
- 管理操作:恢复历史信息,声明,删除,获取,判断是否存在
- 绑定管理:
- 管理信息:交换机名称,队列名称,绑定主题
- 管理操作:恢复历史信息,绑定,解绑,解除交换机关联绑定信息,解除队列关联绑定信息,获取交换机关联绑定信息
- 消息管理
- 管理消息:
- 属性:消息ID, 路由主题,持久化模式标志
- 消息内容
- 有效标志(持久化需要)
- 持久化位置(内存中)
- 持久化消息⻓度(内存中)
- 管理操作:恢复历史信息,向指定队列新增消息,获取指定队列队⾸消息,确认移除消息
- 管理消息:
以上的核心概念数据都需要在硬盘中存储,并且:
- 以内存存储为主,保证快速查找信息进行处理
- 以硬盘存储为辅助,保证服务器重启后,能够恢复历史信息
虚拟机管理模块
什么是虚拟机?
在AMQP协议中,虚拟机是Broker内部的逻辑单元,类似于C++中的命名空间,核心作用是将队列、交换机、绑定信息等资源按逻辑分组。
当需要对队列、交换机、绑定信息、消息进行操作时,通过虚拟机来完成操作,所以虚拟机管理模块是对以上数据模块的整合模块。
- 虚拟机管理信息:
- 交换机数据管理模块句柄
- 队列数据管理模块句柄
- 绑定数据管理模块句柄
- 消息数据管理模块句柄
- 虚拟机对外提供的方法:
- 提供虚拟机内交换机声明,交换机删除操作。
- 提供虚拟机内队列声明,队列删除操作。
- 提供虚拟机内交换机-队列绑定,解除绑定操作。
- 获取交换机相关绑定信息
- 虚拟机管理操作:
- 创建虚拟机
- 查询虚拟机
- 删除虚拟机
交换路由模块
通过前面设计的模块图,我们可以知道,消息队列的使用流程是:发布者向交换机发送要发布的消息→交换机通过某种规则将消息放入与交换机绑定的队列中→队列告知订阅该队列的消费者数据就绪。所以交换路由模块的作用就是:决定将一条消息发送到与交换机绑定的哪些队列中。
交换路由模块如何实现?
- ⼴播:将消息⼊队到该交换机的所有绑定队列中
- 直接:将消息⼊队到绑定信息中binding_key与消息routing_key⼀致的队列中
- 主题:将消息⼊队到绑定信息中binding_key与routing_key是匹配成功的队列中
什么是binding_key?
binding_key是队列(Queue)与交换机(Exchange)绑定时指定的路由规则,用于决定交换机如何将消息路由到队列。其作用类似于“过滤条件”,只有消息的 routingKey
满足 bindingKey
的匹配规则时,消息才会被投递到对应的队列。
binding_key的格式?
Binding Key 的格式取决于交换机类型,主要分为以下两类:
- Direct交换机:
- 格式要求:
bindingKey
必须是明确的字符串,不支持通配符。 - 匹配规则:消息的
routingKey
必须与bindingKey
完全一致(区分大小写)。
- 格式要求:
bindingKey = "payment.success"
匹配的 routingKey:仅限 "payment.success"
-
Topic 交换机:
- 格式要求:
bindingKey
是由.
分隔的单词组成,支持两种通配符:*
:匹配一个单词(占位符)。#
:匹配零或多个单词(通配符)。
- 格式要求:
bindingKey = "*.error"
匹配的 routingKey:
- "app.error"(匹配)
- "network.error"(匹配)
- "app.error.network"(不匹配,`*`仅占一个单词)
bindingKey = "logs.#"
匹配的 routingKey:
- "logs"(匹配,#可匹配零单词)
- "logs.app"(匹配)
- "logs.app.error"(匹配)
什么是routing_key?
Routing Key 是生产者发送消息时指定的路由标识符,用于指示交换机(Exchange)如何将消息路由到绑定(Binding)的队列(Queue)。其作用类似于“地址标签”,交换机根据 routingKey
和队列的 bindingKey
匹配规则,决定消息应投递到哪些队列。
routing_key的格式?
消费者管理模块
由于消费者客户端获取数据的方式是通过订阅一个队列,接收队列推送的消息,所以我们可以以队列为单位对消费者进行管理。
因此操作流程通常是:从队列关联的消息管理中取出消息,从队列关联的消费者中取出⼀个消费者, 然后将消息推送给消费者(这就是发布订阅中负载均衡的⽤法)。
- 消费者信息:
- 消费者标识
- 订阅队列名称
- ⾃动应答标志(决定了⼀条消息推送给消费者后,是否需要等待收到确认后再删除消息)
- 消息处理回调函数指针(⼀个消息发布后调⽤回调,选择消费者进⾏推送....)
- void(const std::string& tag, const BasicProperties& p, const std::string& body)
- 消费者管理:添加,删除,轮询获取指定队列的消费者,移除队列所有消费者等操作
信道管理模块
在概念上,我们提到过,信道是TCP连接上的逻辑通道,多个信道共享同一个物理连接,每个信道有自己的独立ID,消息发布、订阅等操作通过不同的信道隔离执行,互不干扰,这就意味着,信道是一个面向用户的模块,将上述的模块进行整合,并为用户提供服务。
维度 | 单连接模型 | AMQP Channel模型 |
---|---|---|
TCP连接数 | 每个操作需独立连接,资源占用高 | 多Channel复用单连接,资源高效 |
并发能力 | 多连接管理复杂,线程竞争严重 | 单连接多Channel,天然支持并发 |
错误影响域 | 连接异常导致所有操作中断 | Channel独立,故障局部化 |
协议扩展性 | 新增功能需修改底层协议 | Channel作为抽象层,协议易扩展 |
- 管理信息:
- 信道ID
- 信道关联的消费者
- 信道关联的连接
- 信道关联的虚拟机
- ⼯作线程池(⼀条消息被发布到队列后,需要将消息推送给订阅了对应队列的消费者,过程由线程池完成)
- 管理操作:
- 提供声明&删除交换机操作(删除交换机的同时删除交换机关联的绑定信息)
- 提供声明&删除队列操作(删除队列的同时,删除队列关联的绑定信息,消息,消费者信息)
- 提供绑定&解绑队列操作
- 提供订阅&取消订阅队列消息操作
- 提供发布&确认消息操作
连接管理模块
尽管我们使用信道为单位向外提供服务,但实现网络通信时还是需要用TCP连接来实现,为了聚焦于实现消息队列的主要业务和加快开发速度,我们使用前面学习过的muduo库来完成底层通信。这里的连接管理模块,实际上是对muduo库提供的Connection进行二次封装管理,提供我们项目所需的额外操作。
- 管理信息:
- 连接关联的信道
- 连接关联的muduo库Connection
- 管理操作:
- 新增连接
- 删除连接
- 获取连接
- 打开信道
- 关闭信道
Broker服务器模块
我们的Broker模块将整合以上的所有模块,并搭建网络通信服务器,实现与客户端的网络通信,能够识别客户端的请求,并提供相对应的处理服务。
- 管理信息:
- 虚拟机管理模块句柄
- 消费者管理模块句柄
- 连接管理模块句柄
- ⼯作线程池句柄
- muduo库通信所需元素
客户端模块
消费者管理模块
在客户端设计中,消费者(Consumer)的感知被有意简化,以信道(Channel)为交互核心。用户通过信道完成所有操作(如发布、订阅),而消费者仅作为信道的附属逻辑隐式存在。这种设计降低了用户负担,同时契合"一个信道绑定一个队列"的轻量化实现。
- 消费者信息:
- 标识
- 订阅队列名称
- ⾃动应答标志(决定了⼀条消息推送给消费者后,是否需要等待收到确认后再删除消息)
- 消息处理回调函数指针(在消费者接收到消息时触发回调,用于处理消息内容及确认状态)
- 消费者管理:添加,删除,轮询获取指定队列的消费者,移除队列所有消费者等操作
信道管理模块
- 信道管理信息:
- 信道ID
- 信道关联的连接
- 信道关联的消费者
- 请求对应的响应信息队列(这⾥队列使⽤hash表,以便于查找指定的响应)
- 互斥锁&条件变量(⼤部分的请求都是阻塞操作,发送请求后需要等到响应才能继续,但是muduo库的通信是异步的,因此需要我们⾃⼰在收到响应后,通过判断是否是等待的指定响应来进⾏同步)
- 信道管理操作:
- 提供创建信道操作
- 提供删除信道操作
- 提供声明交换机操作(强断⾔-有则OK,没有则创建)
- 提供删除交换机
- 提供创建队列操作(强断⾔-有则OK,没有则创建)
- 提供删除队列操作
- 提供交换机-队列绑定/解绑操作
- 提供添加/取消订阅操作
- 提供发布/确认消息操作
连接管理模块
- 管理信息:
- 连接关联的实际⽤于通信的muduo::net::Connection连接
- 连接关联的信管理句柄(实现信道的增删查)
- 连接关联的EventLoop异步循环⼯作线程
- 异步⼯作线程池(⽤于对收到服务器推送过来的消息进⾏处理的线程池)
- 管理操作:
- 提供创建Channel信道的操作
- 提供删除Channel信道的操作
项目模块关系图
为了保证项目的逻辑清晰,我们先并且规划开发⽬录:
rabbit-like-mq/
|-- mqdemo
|-- mqclient
|-- mqcommon
|-- mqserver
|-- mqtest
|-- mqthird
- mqdemo:编写⼀些功能⽤例时所在的⽬录
- mqcommon: 公共模块代码(线程池、数据库访问、⽂件访问、⽇志打印、通信协议等等)
- mqclient: 客户端模块代码
- mqserver: 服务器模块代码
- mqtest:单元测试代码
- mqthird: 存放使用的第三方库文件
项目开发
通用模块
日志打印工具
为了方便开发过程中出bug时,能够快速定位错误位置,我们封装一个简单的日志打印类。
#ifndef __M_LOG_H__
#define __M_LOG_H__
#include <iostream>
#include <ctime>
#define DBG_LEVEL 0
#define INF_LEVEL 1
#define ERR_LEVEL 2
#define DEFAULT_LEVEL DBG_LEVEL
#define LOG(lev_str, level, format, ...) {\
if (level >= DEFAULT_LEVEL) {\
time_t t = time(nullptr);\
struct tm* ptm = localtime(&t);\
char time_str[32];\
strftime(time_str, 31, "%H:%M:%S", ptm);\
printf("[%s][%s][%s:%d]\t" format "\n", lev_str, time_str, __FILE__, __LINE__, ##__VA_ARGS__);\
}\
}
#define DLOG(format, ...) LOG("DBG", DBG_LEVEL, format, ##__VA_ARGS__)
#define ILOG(format, ...) LOG("INF", INF_LEVEL, format, ##__VA_ARGS__)
#define ELOG(format, ...) LOG("ERR", ERR_LEVEL, format, ##__VA_ARGS__)
#endif
日志打印类提供Debug、Info、Error三种日志等级,调用日志函数时,调用方式与printf类似,能够打印出:日志等级、打印时间、所在文件、代码行号,这对于我们后续调试是非常有利的。
实用工具模块
实用⼯具模块中要完成的是项⽬中需要的⼀些辅助功能代码实现,其中包括:⽂件的基础操作,字符串的分割操作、数据库操作、UUID生成。
文件基础操作
由于我们的项目支持队列、交换机、绑定信息等元数据的持久化,所以势必会涉及到对文件的操作,为了便利后续开发工作,我们将常用的文件基础操作封装成FileHelper类。这个类需要对外提供的操作:
-
⽂件是否存在判断
-
⽂件⼤⼩获取
-
⽂件读/写
-
文件重命名
-
⽂件创建/删除
-
⽬录创建/删除
class FileHelper {
public:
FileHelper(const std::string &filename):_filename(filename){}
bool exists() {
struct stat st;
return (stat(_filename.c_str(), &st) == 0);
}
size_t size() {
struct stat st;
int ret = stat(_filename.c_str(), &st);
if (ret < 0) {
return 0;
}
return st.st_size;
}
bool read(char *body, size_t offset, size_t len) {
//1. 打开文件
std::ifstream ifs(_filename, std::ios::binary | std::ios::in);
if (ifs.is_open() == false) {
ELOG("%s 文件打开失败!", _filename.c_str());
return false;
}
//2. 跳转文件读写位置
ifs.seekg(offset, std::ios::beg);
//3. 读取文件数据
ifs.read(body, len);
if (ifs.good() == false) {
ELOG("%s 文件读取数据失败!!", _filename.c_str());
ifs.close();
return false;
}
//4. 关闭文件
ifs.close();
return true;
}
bool read(std::string &body) {
//获取文件大小,根据文件大小调整body的空间
size_t fsize = this->size();
body.resize(fsize);
return read(&body[0], 0, fsize);
}
bool write(const char *body, size_t offset, size_t len) {
//1. 打开文件
std::fstream fs(_filename, std::ios::binary | std::ios::in | std::ios::out);
if (fs.is_open() == false) {
ELOG("%s 文件打开失败!", _filename.c_str());
return false;
}
//2. 跳转到文件指定位置
fs.seekp(offset, std::ios::beg);
//3. 写入数据
fs.write(body, len);
if (fs.good() == false) {
ELOG("%s 文件写入数据失败!!", _filename.c_str());
fs.close();
return false;
}
//4. 关闭文件
fs.close();
return true;
}
bool write(const std::string &body) {
return write(body.c_str(), 0, body.size());
}
bool rename(const std::string &nname) {
return (::rename(_filename.c_str(), nname.c_str()) == 0);
}
static std::string parentDirectory(const std::string &filename) {
// /aaa/bb/ccc/ddd/test.txt
size_t pos = filename.find_last_of("/");
if (pos == std::string::npos) {
// test.txt
return "./";
}
std::string path = filename.substr(0, pos);
return path;
}
static bool createFile(const std::string &filename) {
std::fstream ofs(filename, std::ios::binary | std::ios::out);
if (ofs.is_open() == false) {
ELOG("%s 文件打开失败!", filename.c_str());
return false;
}
ofs.close();
return true;
}
static bool removeFile(const std::string &filename) {
return (::remove(filename.c_str()) == 0);
}
static bool createDirectory(const std::string &path) {
// aaa/bbb/ccc cccc
// 在多级路径创建中,我们需要从第一个父级目录开始创建
size_t pos, idx = 0;
while(idx < path.size()) {
pos = path.find("/", idx);
if (pos == std::string::npos) {
return (mkdir(path.c_str(), 0775) == 0);
}
std::string subpath = path.substr(0, pos);
int ret = mkdir(subpath.c_str(), 0775);
if (ret != 0 && errno != EEXIST) {
ELOG("创建目录 %s 失败: %s", subpath.c_str(), strerror(errno));
return false;
}
idx = pos + 1;
}
return true;
}
static bool removeDirectory(const std::string &path) {
// rm -rf path
// system()
std::string cmd = "rm -rf " + path;
return (system(cmd.c_str()) != -1);
}
private:
std::string _filename;
};
SQLite基础操作类
我们的SQLite基础操作类需要能够:打开数据库、执行语句、关闭数据库,而实际上,我们在学习项目相关库时,就已经完成了封装:
class SqliteHelper {
public:
typedef int(*SqliteCallback)(void*,int,char**,char**);
SqliteHelper(const std::string &dbfile) : _dbfile(dbfile), _handler(nullptr){}
bool open(int safe_leve = SQLITE_OPEN_FULLMUTEX) {
//int sqlite3_open_v2(const char *filename, sqlite3 **ppDb, int flags, const char *zVfs );
int ret = sqlite3_open_v2(_dbfile.c_str(), &_handler, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | safe_leve, nullptr);
if (ret != SQLITE_OK) {
ELOG("创建/打开sqlite数据库失败: %s", sqlite3_errmsg(_handler));
return false;
}
return true;
}
bool exec(const std::string &sql, SqliteCallback cb, void *arg) {
//int sqlite3_exec(sqlite3*, char *sql, int (*callback)(void*,int,char**,char**), void* arg, char **err)
int ret = sqlite3_exec(_handler, sql.c_str(), cb, arg, nullptr);
if (ret != SQLITE_OK) {
ELOG("%s \n语句执行失败: %s", sql.c_str(), sqlite3_errmsg(_handler));
return false;
}
return true;
}
void close() {
if (_handler) sqlite3_close_v2(_handler);
}
private:
std::string _dbfile;
sqlite3 *_handler;
};
class StrHelper{
public:
static size_t split(const std::string &str, const std::string &sep, std::vector<std::string> &result) {
size_t pos, idx = 0;
while(idx < str.size()) {
pos = str.find(sep, idx);
if (pos == std::string::npos) {
result.push_back(str.substr(idx));
return result.size();
}
if (pos == idx) {
idx = pos + sep.size();
continue;
}
result.push_back(str.substr(idx, pos - idx));
idx = pos + sep.size();
}
return result.size();
}
};
字符串操作类
我们后续在进行路由匹配时,需要根据routing_key和binding_key决定交换机把消息给哪些队列,所以会涉及到字符串操作,因此也封装一个类,其实主要提供的操作就是根据分隔符将字符串分割后,放入字符串数组:
class StrHelper{
public:
static size_t split(const std::string &str, const std::string &sep, std::vector<std::string> &result) {
size_t pos, idx = 0;
while(idx < str.size()) {
pos = str.find(sep, idx);
if (pos == std::string::npos) {
result.push_back(str.substr(idx));
return result.size();
}
if (pos == idx) {
idx = pos + sep.size();
continue;
}
result.push_back(str.substr(idx, pos - idx));
idx = pos + sep.size();
}
return result.size();
}
};
UUID生成类
什么是UUID?
UUID(Universally Unique Identifier,通用唯一识别码)是一种用于标识信息的128位(16字节)标准化格式,旨在保证跨空间和时间的全局唯一性。
如何生成UUID?
class UUIDHelper {
public:
static std::string uuid() {
std::random_device rd; // 硬件熵源
std::mt19937_64 generator(rd()); // 64位梅森旋转算法引擎
std::uniform_int_distribution<int> distribution(0, 255);
std::stringstream ss;
for (int i = 0; i < 8; i++) {
ss << std::setw(2) << std::setfill('0') << std::hex << distribution(gernator) ;
if (i == 3 || i == 5 || i == 7) {
ss << "-";
}
}
static std::atomic<size_t> seq(1);
size_t num = seq.fetch_add(1);
for (int i = 7; i >= 0; i--) {
ss << std::setw(2) << std::setfill('0') << std::hex << ((num>>(i*8)) & 0xff);
if (i == 6) ss << "-";
}
return ss.str();
}
};
消息与交换机类型定义
在正式编写项目代码之前,我们需要定义消息类型,并且考虑到进行持久化时,需要将消息数据写入到磁盘文件中,恢复历史数据时,要从磁盘读取消息数据到内存,这涉及数据的序列化和反序列化,因此消息的类型定义我们使⽤protobuf来进⾏⽣成。
也就是说,定义消息类型实际上就是定义⼀个消息类型的proto⽂件,并⽣成相关代码。
消息所需要素:
- 消息本身要素:
- 消息属性:消息ID、消息投递模式(持久化/非持久化)、routing_key
- 消息的有效载荷
- 消息持久化所需额外要素:
- 消息存储位置
- 消息长度
- 消息是否有效
- 交换机类型:
- DIRECT
- FANOUT
- TOPIC
- 消息投递模式:
- UNDURABLE:在RabbitMQ中,此模式的值为1
- DURABLE :值为2
mq_msg.proto:
-
message BasicProperties{ string id = 1; DeliveryMode delivery_mode = 2; string routing_key = 3; }; message Message { message Payload { BasicProperties properties = 1; string body = 2; string valid = 3; }; Payload payload = 1; uint32 offset = 2; uint32 length = 3; };
数据管理模块
关于分层架构
其实一开始在设计数据管理模块时,我没有考虑进行分层,而是试图把所有功能塞到一个万能类中,于是遇到了三个非常困扰我的问题:
痛点 | 具体表现 |
---|---|
修改代码困难 | 修改交换机路由算法时,意外触发协议解析错误 |
性能卡顿 | 同步写入磁盘导致每秒只能处理50个请求 |
扩展性差 | 由于代码之间耦合度高,导致扩展时需要修改很多东西 |
交换机数据管理
定义交换机数据类:描述交换机的核心元数据,相当于给每个交换机发一张"身份证"
- 交换机名称
- 交换机类型
- 是否持久化标志
- 是否⾃动删除标志
- 其他参数(以便于后续扩展)
// 1. 定义交换机类
struct Exchange
{
using ptr = std::shared_ptr<Exchange>;
// 1. 交换机名称
std::string name;
// 2. 交换机类型
ExchangeType type;
// 3. 交换机持久化标志
bool durable;
// 4. 是否自动删除标志
bool auto_delete;
// 5. 其他参数
google::protobuf::Map<std::string, std::string> args;
Exchange() {}
Exchange(const std::string &ename,
ExchangeType etype,
bool edurable,
bool eauto_delete,
const google::protobuf::Map<std::string, std::string>& eargs) : name(ename), type(etype), durable(edurable),
auto_delete(eauto_delete), args(eargs) {}
// args存储键值对,在存储数据库的时候,会组织一个格式字符串进行存储 key=val&key=val....
// 内部解析str_args字符串,将内容存储到成员中
void setArgs(const std::string &str_args)
{
// key=val&key=val&
std::vector<std::string> sub_args;
StrHelper::split(str_args, "&", sub_args);
for (auto &str : sub_args)
{
size_t pos = str.find("=");
std::string key = str.substr(0, pos);
std::string val = str.substr(pos + 1);
args[key] = val;
}
}
// 将args中的内容进行序列化后,返回一个字符串
std::string getArgs()
{
std::string result;
for (auto start = args.begin(); start != args.end(); ++start)
{
result += start->first + "=" + start->second + "&";
}
return result;
}
};
- 创建/删除交换机数据表
- 新增交换机数据
- 移除交换机数据
- 查询所有交换机数据
- 查询指定交换机数据(根据名称)
using ExchangeMap = std::unordered_map<std::string, Exchange::ptr>;
// 2. 定义交换机数据持久化管理类--数据存储在sqlite数据库中
class ExchangeMapper
{
public:
ExchangeMapper(const std::string &dbfile) : _sql_helper(dbfile)
{
std::string path = FileHelper::parentDirectory(dbfile);
FileHelper::createDirectory(path);
assert(_sql_helper.open());
createTable();
}
void createTable()
{
#define CREATE_TABLE "create table if not exists exchange_table(\
name varchar(32) primary key, \
type int, \
durable int, \
auto_delete int, \
args varchar(128));"
bool ret = _sql_helper.exec(CREATE_TABLE, nullptr, nullptr);
if (ret == false)
{
DLOG("创建交换机数据库表失败!!");
abort(); // 直接异常退出程序
}
}
void removeTable()
{
#define DROP_TABLE "drop table if exists exchange_table;"
bool ret = _sql_helper.exec(DROP_TABLE, nullptr, nullptr);
if (ret == false) {
DLOG("删除交换机数据库表失败!!");
abort();//直接异常退出程序
}
}
bool insert(Exchange::ptr &exp)
{
std::stringstream ss;
ss << "insert into exchange_table values(";
ss << "'" << exp->name << "', ";
ss << exp->type << ", ";
ss << exp->durable << ", ";
ss << exp->auto_delete << ", ";
ss << "'" << exp->getArgs() << "');";
return _sql_helper.exec(ss.str(), nullptr, nullptr);
}
void remove(const std::string &name)
{
std::stringstream ss;
ss << "delete from exchange_table where name=";
ss << "'" << name << "';";
_sql_helper.exec(ss.str(), nullptr, nullptr);
}
ExchangeMap recovery()
{
ExchangeMap result;
std::string sql = "select name, type, durable, auto_delete, args from exchange_table;";
_sql_helper.exec(sql, selectCallback, &result);
return result;
}
private:
static int selectCallback(void *arg, int colnum, char **row, char **fields)
{
ExchangeMap *result = (ExchangeMap *)arg;
auto exp = std::make_shared<Exchange>();
exp->name = row[0];
exp->type = (wsmq::ExchangeType)std::stoi(row[1]);
exp->durable = (bool)std::stoi(row[2]);
exp->auto_delete = (bool)std::stoi(row[3]);
if (row[4])
exp->setArgs(row[4]);
result->insert(std::make_pair(exp->name, exp));
return 0;
}
private:
SqliteHelper _sql_helper;
};
- 声明交换机,并添加管理(存在则OK,不存在则创建)
- 删除交换机
- 获取指定交换机
- 销毁所有交换机数据
// 3. 定义交换机数据内存管理类
class ExchangeManager
{
public:
using ptr = std::shared_ptr<ExchangeManager>;
ExchangeManager(const std::string &dbfile) : _mapper(dbfile)
{
_exchanges = _mapper.recovery();
}
// 声明交换机
bool declareExchange(const std::string &name,
ExchangeType type, bool durable, bool auto_delete,
const google::protobuf::Map<std::string, std::string>& args)
{
std::unique_lock<std::mutex> lock(_mutex);
auto it = _exchanges.find(name);
if (it != _exchanges.end())
{
// 如果交换机已经存在,那就直接返回,不需要重复新增。
return true;
}
auto exp = std::make_shared<Exchange>(name, type, durable, auto_delete, args);
if (durable == true)
{
bool ret = _mapper.insert(exp);
if (ret == false)
return false;
}
_exchanges.insert(std::make_pair(name, exp));
return true;
}
// 删除交换机
void deleteExchange(const std::string &name)
{
std::unique_lock<std::mutex> lock(_mutex);
auto it = _exchanges.find(name);
if (it == _exchanges.end())
{
return;
}
if (it->second->durable == true)
_mapper.remove(name);
_exchanges.erase(name);
}
// 获取指定交换机对象
Exchange::ptr selectExchange(const std::string &name)
{
std::unique_lock<std::mutex> lock(_mutex);
auto it = _exchanges.find(name);
if (it == _exchanges.end())
{
return Exchange::ptr();
}
return it->second;
}
// 判断交换机是否存在
bool exists(const std::string &name)
{
std::unique_lock<std::mutex> lock(_mutex);
auto it = _exchanges.find(name);
if (it == _exchanges.end())
{
return false;
}
return true;
}
size_t size()
{
std::unique_lock<std::mutex> lock(_mutex);
return _exchanges.size();
}
// 清理所有交换机数据
void clear()
{
std::unique_lock<std::mutex> lock(_mutex);
_mapper.removeTable();
_exchanges.clear();
}
private:
std::mutex _mutex;
ExchangeMapper _mapper;
ExchangeMap _exchanges;
};
分层架构的价值:
场景 | 协议层 | 业务层 | 持久化层 |
---|---|---|---|
新增交换机类型 | 无需改动 | 修改路由算法 | 无需改动 |
更换数据库(如MySQL) | 无需改动 | 无需改动 | 重写SQL逻辑 |
支持HTTP协议 | 新增协议解析逻辑 | 复用现有接口 | 无需改动 |
队列数据管理
- 队列名称
- 是否持久化标志
- 是否独占标志
- 是否⾃动删除标志
- 其他参数
struct MsgQueue
{
using ptr = std::shared_ptr<MsgQueue>;
std::string _name;
bool _durable;
bool _exclusive; // 是否独占标识
bool _auto_delete;
// std::unordered_map<std::string, std::string> _args;
google::protobuf::Map<std::string, std::string> _args;
MsgQueue() {}
MsgQueue(std::string name,
bool durable, bool exclusive, bool auto_delete,
const google::protobuf::Map<std::string, std::string>& args) : _name(name),
_durable(durable), _exclusive(exclusive),
_auto_delete(auto_delete),
_args(args)
{
}
void setArgs(const std::string &str_args)
{
// key=val&key=val&
std::vector<std::string> sub_args;
StrHelper::split(str_args, "&", sub_args);
for (auto &str : sub_args)
{
size_t pos = str.find("=");
std::string key = str.substr(0, pos);
std::string val = str.substr(pos + 1);
_args[key] = val;
}
}
// 将args中的内容进行序列化后,返回一个字符串
std::string getArgs()
{
std::string result;
for (auto start = _args.begin(); start != _args.end(); ++start)
{
result += start->first + "=" + start->second + "&";
}
return result;
}
};
- 创建/删除队列数据表
- 新增队列数据
- 移除队列数据
- 查询所有队列数据
using MsgQueueMap = std::unordered_map<std::string, MsgQueue::ptr>;
class MsgQueueMapper
{
public:
using ptr = std::shared_ptr<MsgQueueMapper>;
MsgQueueMapper(const std::string &dbfile) : _sql_helper(dbfile)
{
// 持久化类初始化时,根据数据库文件路径名创建目录,并打开数据库、建queue_table
std::string path = FileHelper::parentDirectory(dbfile);
FileHelper::createDirectory(path);
assert(_sql_helper.open());
createTable();
}
void createTable()
{
std::stringstream sql;
sql << "create table if not exists queue_table(";
sql << "name varchar(32) primary key, ";
sql << "durable int, ";
sql << "exclusive int, ";
sql << "auto_delete int, ";
sql << "args varchar(128));";
assert(_sql_helper.exec(sql.str(), nullptr, nullptr));
}
void removeTable()
{
std::string sql = "drop table if exists queue_table;";
_sql_helper.exec(sql, nullptr, nullptr);
}
bool insert(MsgQueue::ptr &queue)
{
std::stringstream sql;
sql << "insert into queue_table values(";
sql << "'" << queue->_name << "', ";
sql << queue->_durable << ", ";
sql << queue->_exclusive << ", ";
sql << queue->_auto_delete << ", ";
sql << "'" << queue->getArgs() << "');";
return _sql_helper.exec(sql.str(), nullptr, nullptr);
}
void remove(const std::string &name)
{
std::stringstream sql;
sql << "delete from queue_table where name=";
sql << "'" << name << "';";
_sql_helper.exec(sql.str(), nullptr, nullptr);
}
MsgQueueMap recovery()
{
MsgQueueMap result;
std::string sql = "select name, durable, exclusive, auto_delete, args from queue_table;";
_sql_helper.exec(sql, selectCallBack, &result);
return result;
}
private:
static int selectCallBack(void *args, int numcol, char **row, char **feilds)
{
MsgQueueMap *result = (MsgQueueMap *)args;
auto msp = std::make_shared<MsgQueue>();
msp->_name = row[0];
msp->_durable = (bool)std::stoi(row[1]);
msp->_exclusive = (bool)std::stoi(row[2]);
msp->_auto_delete = (bool)std::stoi(row[3]);
if (row[4])
msp->setArgs(row[4]);
result->insert(std::make_pair(msp->_name, msp));
return 0; // 这个回调函数必须返回0,返回非0则认为错误
}
private:
SqliteHelper _sql_helper;
};
- 创建队列,并添加管理(存在则OK,不存在则创建)
- 删除队列
- 获取指定队列
- 获取所有队列
- 判断指定队列是否存在
- 获取队列数量
- 销毁所有队列数据
class MsgQueueManager
{
public:
using ptr = std::shared_ptr<MsgQueueManager>;
MsgQueueManager(const std::string &dbfile) : _mapper(dbfile)
{
_queues = _mapper.recovery(); // 将持久化的数据恢复到内存中(如果有的话)
}
bool declareQueue(const std::string &name,
bool durable, bool exclusive, bool auto_delete,
const google::protobuf::Map<std::string, std::string>& args)
{
std::unique_lock<std::mutex> lock(_mutex);
auto it = _queues.find(name);
if (it != _queues.end()) // 如果存在,就不用新增了
{
return true;
}
MsgQueue::ptr mqp = std::make_shared<MsgQueue>();
mqp->_name = name;
mqp->_durable = durable;
mqp->_exclusive = exclusive;
mqp->_auto_delete = auto_delete;
mqp->_args = args;
if (durable == true) {
bool ret = _mapper.insert(mqp);
if (ret == false) return false;
}
_queues.insert(std::make_pair(name, mqp));
return true;
}
void removeQueue(const std::string &name)
{
std::unique_lock<std::mutex> lock(_mutex);
auto it = _queues.find(name);
if (it == _queues.end())
{
return;
}
if (it->second->_durable == true)
{
_mapper.remove(name);
}
_queues.erase(name);
}
bool exists(const std::string &name)
{
std::unique_lock<std::mutex> lock(_mutex);
auto it = _queues.find(name);
if (it == _queues.end())
{
return false;
}
return true;
}
MsgQueueMap allQueues()
{
std::unique_lock<std::mutex> lock(_mutex);
return _queues;
}
MsgQueue::ptr selectMsgQueue(const std::string &name)
{
std::unique_lock<std::mutex> lock(_mutex);
auto it = _queues.find(name);
if (it == _queues.end())
{
return MsgQueue::ptr();
}
return it->second;
}
size_t size()
{
std::unique_lock<std::mutex> lock(_mutex);
return _queues.size();
}
void clear()
{
_mapper.removeTable();
_queues.clear();
}
private:
std::mutex _mutex;
MsgQueueMapper _mapper;
MsgQueueMap _queues;
};
绑定信息管理
- 交换机名称
- 队列名称
- binding_key(分发匹配规则-决定了哪些数据能被交换机放⼊队列)
struct Binding
{
using ptr = std::shared_ptr<Binding>;
std::string _binding_key;
std::string _exchange_name;
std::string _queue_name;
Binding(){}
Binding(const std::string& exchange_name, const std::string& queue_name, const std::string& binding_key):
_binding_key(binding_key), _exchange_name(exchange_name),
_queue_name(queue_name)
{}
};
- 创建/删除绑定信息数据表
- 新增绑定信息数据
- 移除指定绑定信息数据
- 移除指定交换机相关绑定信息数据:移除交换机的时候会被调⽤
- 移除指定队列相关绑定信息数据:移除队列的时候会被调⽤
- 查询所有绑定信息数据:⽤于重启服务器时进⾏历史数据恢复
using MsgQueueBindingMap = std::unordered_map<std::string, Binding::ptr>; // 队列名与绑定信息的映射
// 交换机名与队列绑定信息的映射,包含了所有绑定信息,并以交换机为单位进行区分
using BindingMap = std::unordered_map<std::string, MsgQueueBindingMap>;
class BindingMapper
{
public:
using ptr = std::shared_ptr<BindingMapper>;
BindingMapper(const std::string& dbfile): _sql_helper(dbfile)
{
std::string path = FileHelper::parentDirectory(dbfile);
FileHelper::createDirectory(path);
assert(_sql_helper.open());
createTable();
}
void createTable()
{
std::stringstream sql;
sql << "create table if not exists binding_table(";
sql << "exchange_name varchar(32), ";
sql << "msgqueue_name varchar(32), ";
sql << "binding_key varchar(128));";
assert(_sql_helper.exec(sql.str(), nullptr, nullptr));
}
void removeTable()
{
std::string sql = "drop table if exists binding_table;";
_sql_helper.exec(sql, nullptr, nullptr);
}
bool insert(Binding::ptr& binding)
{
std::stringstream sql;
sql << "insert into binding_table values(";
sql << "'" << binding->_exchange_name << "', ";
sql << "'" << binding->_queue_name << "', ";
sql << "'" << binding->_binding_key << "');";
return _sql_helper.exec(sql.str(), nullptr, nullptr);
}
void remove(const std::string& ename, const std::string& qname)
{
std::stringstream sql;
sql << "delete from binding_table where ";
sql << "exchange_name='" << ename << "' and ";
sql << "msgqueue_name='" << qname << "';";
_sql_helper.exec(sql.str(), nullptr, nullptr);
}
void removeExchangeBinding(const std::string& ename)
{
std::stringstream sql;
sql << "delete from binding_table where ";
sql << "exchange_name='" << ename << "';";
_sql_helper.exec(sql.str(), nullptr, nullptr);
}
void removeQueueBinding(const std::string& qname)
{
std::stringstream sql;
sql << "delete from binding_table where ";
sql << "msgqueue_name='" << qname << "';";
_sql_helper.exec(sql.str(), nullptr, nullptr);
}
BindingMap recovery()
{
BindingMap result;
// select exchange_name, msgqueue_name, binding_key from binding_table;
std::string sql = "select exchange_name, msgqueue_name, binding_key from binding_table;";
_sql_helper.exec(sql, selectCallback, &result);
return result;
}
private:
static int selectCallback(void* arg, int colnum, char** row, char** fields)
{
BindingMap* result = (BindingMap*)arg;
Binding::ptr bp = std::make_shared<Binding>(row[0], row[1], row[2]);
MsgQueueBindingMap& qmap = (*result)[bp->_exchange_name];
qmap.insert(std::make_pair(bp->_queue_name, bp));
return 0;
}
SqliteHelper _sql_helper;
};
- 创建绑定信息,并添加管理(存在则OK,不存在则创建)
- 解除指定的绑定信息
- 删除指定队列的所有绑定信息
- 删除交换机相关的所有绑定信息
- 获取交换机相关的所有绑定信息:交换机收到消息后,需要分发给⾃⼰关联的队列
- 判断指定绑定信息是否存在
- 获取当前绑定信息数量
- 销毁所有绑定信息数据
class BindingManager
{
public:
using ptr = std::shared_ptr<BindingManager>;
BindingManager(const std::string& dbfile): _mapper(dbfile)
{
_bindings = _mapper.recovery();
}
bool bind(const std::string& ename, const std::string& qname,
const std::string& key, bool durable) // 由外界决定是否持久化绑定信息
// 当队列和交换机都要持久化时,在持久化
{
std::unique_lock<std::mutex> lock(_mutex);
auto it = _bindings.find(ename);
if(it != _bindings.end() && it->second.find(qname) != it->second.end())
{
return true;
}
Binding::ptr bp = std::make_shared<Binding>(ename, qname, key);
if(durable == true)
{
bool ret = _mapper.insert(bp);
if(ret == false) return false;
}
MsgQueueBindingMap& qmap = _bindings[ename];
qmap.insert(std::make_pair(qname, bp));
return true;
}
void unbind(const std::string& ename, const std::string& qname)
{
std::unique_lock<std::mutex> lock(_mutex);
auto eit = _bindings.find(ename);
if(eit == _bindings.end()) return ;
auto qit = eit->second.find(qname);
if(qit == eit->second.end()) return ;
_mapper.remove(ename, qname);
_bindings[ename].erase(qname);
}
void removeExchangeBind(const std::string& ename)
{
std::unique_lock<std::mutex> lock(_mutex);
_mapper.removeExchangeBinding(ename);
_bindings.erase(ename);
}
void removeQueueBind(const std::string& qname)
{
std::unique_lock<std::mutex> lock(_mutex);
_mapper.removeQueueBinding(qname);
for(auto start = _bindings.begin(); start != _bindings.end(); start++)
{
start->second.erase(qname);
}
}
MsgQueueBindingMap getExchangeBindings(const std::string& ename)
{
std::unique_lock<std::mutex> lock(_mutex);
auto eit = _bindings.find(ename);
if(eit == _bindings.end()) return MsgQueueBindingMap();
return eit->second;
}
Binding::ptr getBinding(const std::string& ename, const std::string& qname)
{
std::unique_lock<std::mutex> lock(_mutex);
auto eit = _bindings.find(ename);
if(eit == _bindings.end()) return Binding::ptr();
auto qit = eit->second.find(qname);
if(qit == eit->second.end()) return Binding::ptr();
return qit->second;
}
bool exists(const std::string& ename, const std::string& qname)
{
std::unique_lock<std::mutex> lock(_mutex);
auto eit = _bindings.find(ename);
if(eit == _bindings.end()) return false;
auto qit = eit->second.find(qname);
if(qit == eit->second.end()) return false;
return true;
}
size_t size()
{
size_t total_size = 0;
std::unique_lock<std::mutex> lock(_mutex);
for(auto start = _bindings.begin(); start != _bindings.end(); start++)
{
total_size += start->second.size();
}
return total_size;
}
void clear()
{
std::unique_lock<std::mutex> lock(_mutex);
_mapper.removeTable();
_bindings.clear();
}
private:
std::mutex _mutex;
BindingMapper _mapper;
BindingMap _bindings;
};
场景 | 协议层 | 业务层 | 持久化层 |
---|---|---|---|
新增队列属性(如TTL) | 无需改动 | 修改Queue 类定义 | 修改数据库表结构 |
更换存储引擎(如Redis) | 无需改动 | 无需改动 | 重写持久化类逻辑 |
实现集群模式 | 新增路由协议 | 扩展内存表为分布式缓存 | 适配分布式数据库 |
消息数据管理
因为消息数据需要在⽹络中进⾏传输,因此消息的类型定义使⽤protobuf进⾏,因为protobuf中⾃带了序列化和反序列化功能,因此操作起来会简便⼀些。需要特别说明的是,消息的存储并没有使⽤数据库,而是使用文件存储,内存中管理的消息只需要记录好⾃⼰在⽂件中的所在位置和⻓度即可。为了便于管理,消息管理以队列为单元进⾏管理,因此每个队列都会有⾃⼰独⽴的数据存储⽂件。
为什么使用文件持久化消息?
对比项 | 数据库存储 | 文件存储 |
---|---|---|
大消息支持 | BLOB字段性能差 | 顺序追加写入,吞吐量高 |
扩展性 | 需预定义表结构 | 天然支持任意长度数据 |
崩溃恢复 | 事务回滚复杂 | 通过长度头快速定位消息边界 |
适用场景 | 元数据(交换机/队列) | 消息内容(高频写入) |
为什么以队列为单元在内存管理数据?
对比维度 | 以队列为单元(独立文件) | 集中式存储(单文件/数据库) |
---|---|---|
性能优化 | 每个队列独立文件,顺序追加写入(如order.msg 只存订单消息),磁盘IO效率高 | 所有队列混存同一文件,需频繁寻址和随机写入,IO性能下降5-10倍 |
资源隔离 | 单队列文件损坏或过载不影响其他队列(如支付队列异常,订单队列仍正常) | 单点故障或大消息写入导致全局阻塞 |
维护便利性 | 按队列清理过期消息(如直接删除logs.msg 文件) | 需在混杂数据中扫描删除,垃圾回收耗时增长10倍 |
扩展性 | 天然支持分布式存储(不同队列文件可放在不同磁盘/服务器) | 集中式存储扩容复杂,易成性能瓶颈 |
崩溃恢复 | 通过文件偏移量快速定位队列消息(如从payment.msg 的1MB处恢复) | 需遍历全局数据重建状态,恢复时间不可控 |
- 属性:消息ID, 路由主题,持久化模式标志
- 消息内容
- 有效标志(持久化需要)
- 持久化位置(内存中)
- 持久化消息⻓度(内存中)
enum DeliveryMode {
UNKNOWMODE = 0;
UNDURABLE = 1;
DURABLE = 2;
};
message BasicProperties{
string id = 1;
DeliveryMode delivery_mode = 2;
string routing_key = 3;
};
message Message {
message Payload {
BasicProperties properties = 1;
string body = 2;
string valid = 3;
};
Payload payload = 1;
uint32 offset = 2;
uint32 length = 3;
};
- 管理数据
- 队列消息⽂件存储的路径
- 队列消息的存储⽂件名
- 队列消息的临时交换⽂件名
- 管理操作
- ⽇志消息存储在⽂件中(4B⻓度+(属性+内容+有效位)序列化消息,连续存储即可)
- 提供队列消息⽂件创建/删除功能
- 提供队列消息的新增持久化/删除持久化
- 提供持久化内容的垃圾回收(其实就是重新加载出所有有效消息返回,并重新⽣成新的消息存储⽂件)
#define DATAFILE_SUFFIX ".mqd"
#define TMPFILE_SUFFIX ".mqd.tmp"
using MessagePtr = std::shared_ptr<wsmq::Message>;
class MessageMapper
{
public:
MessageMapper(std::string& basedir, const std::string& qname): _qname(qname)
{
if(basedir.back() != '/') basedir.push_back('/');
_datafile = basedir + _qname + DATAFILE_SUFFIX;
_tmpfile = basedir + _qname + TMPFILE_SUFFIX;
if(FileHelper(basedir).exists() == false)
{
assert(FileHelper::createDirectory(basedir));
}
createMsgFile();
}
bool createMsgFile()
{
if(FileHelper(_datafile).exists() == true) // 导致[ERR][15:54:10][../mqserver/../mqcommon/mq_helper.hpp:120] ./data/message/queue1.mqd 文件读取数据失败!!
{
return true;
}
bool ret = FileHelper::createFile(_datafile);
if(ret == false)
{
DLOG("创建队列文件 %s 失败", _datafile.c_str());
return false;
}
return true;
}
void removeMsgFile()
{
FileHelper::removeFile(_datafile);
FileHelper::removeFile(_tmpfile);
}
bool insert(MessagePtr& msg)
{
return insert(_datafile, msg);
}
bool remove(MessagePtr& msg)
{
// 将msg的有效标志设置为'0'
msg->mutable_payload()->set_valid("0");
// 将msg序列化为字符串
std::string body = msg->payload().SerializeAsString();
if(body.size() != msg->length())
{
DLOG("不能修改文件中数据,因为新生成的数据和原数据长度不一致");
return false;
}
// 将序列化后消息写入数据在文件中位置(覆盖原有数据)
FileHelper helper(_datafile);
bool ret = helper.write(body.c_str(), msg->offset(), body.size());
if(ret == false)
{
DLOG("向队列文件写入数据失败");
return false;
}
return true;
}
std::list<MessagePtr> gc()
{
bool ret;
std::list<MessagePtr> result;
ret = load(result);
if(ret == false)
{
DLOG("加载有效数据失败");
return result;
}
DLOG("垃圾回收,得到有效消息数量:%ld", result.size());
// 有效数据存储到临时文件
FileHelper::createFile(_tmpfile);
for(auto& msg : result)
{
DLOG("向临时文件写入有效数据 %s", msg->payload().body().c_str());
ret = insert(_tmpfile, msg);
if(ret == false)
{
DLOG("向临时文件写入有效数据失败");
return result;
}
}
DLOG("垃圾回收后,临时文件大小:%ld", FileHelper(_tmpfile).size());
// 删除原文件
ret = FileHelper::removeFile(_datafile);
if(ret == false)
{
DLOG("删除原文件失败");
return result;
}
// 修改临时文件名为原文件名
ret = FileHelper(_tmpfile).rename(_datafile);
if(ret == false)
{
DLOG("修改临时文件名为原文件名失败");
return result;
}
// 返回全部有效数据
return result;
}
private:
bool insert(const std::string& filename, MessagePtr& msg)
{
// 新增数据添加到文件末尾
// 进行消息序列化,得到格式化字符串消息
std::string body = msg->payload().SerializeAsString();
// 获取文件长度
FileHelper helper(filename);
size_t fsize = helper.size(); // 文件偏移量
size_t msg_size = body.size(); // 消息长度
bool ret = helper.write((char*)&msg_size, fsize, sizeof(size_t));
if(ret == false)
{
DLOG("向队列文件写入消息长度失败");
return false;
}
// 将数据写入文件对应位置
ret = helper.write(body.c_str(), fsize + sizeof(size_t), body.size());
if(ret == false)
{
DLOG("向队列文件写入消息数据失败");
return false;
}
// 更新msg中实际存储信息
msg->set_offset(fsize + sizeof(size_t));
msg->set_length(body.size());
return true;
}
bool load(std::list<MessagePtr>& result)
{
// 加载出文件中有效数据:按照数据格式:4字节消息长度|数据 读取数据,根据valid字段判断数据是否有效
FileHelper helper(_datafile);
size_t msg_size, offset = 0;
size_t fsize = helper.size();
DLOG("准备加载持久化数据,当前文件大小为:%ld", helper.size());
bool ret;
while(offset < fsize)
{
// ret = helper.read((char*)&msg_size, fsize, sizeof(size_t));--------------错了!!!!!!!
ret = helper.read((char*)&msg_size, offset, sizeof(size_t));
if(ret == false)
{
DLOG("获取消息长度失败");
return false;
}
offset += sizeof(size_t);
std::string msg_body(msg_size, '\0');
ret = helper.read(&msg_body[0], offset, msg_size);
if(ret == false)
{
DLOG("读取消息数据失败");
return false;
}
offset += msg_size;
MessagePtr pmsg = std::make_shared<Message>();
pmsg->mutable_payload()->ParseFromString(msg_body);
if(pmsg->payload().valid() == "0")
{
DLOG("加载到无效消息: %s", pmsg->payload().body().c_str());
continue;
}
result.push_back(pmsg);
}
return true;
}
private:
std::string _qname;
std::string _datafile;
std::string _tmpfile;
};
- 队列消息管理数据
- 队列名称
- 待推送消息链表
- 持久化消息hash
- 待确认消息hash
- 有效消息数量
- 已经持久化消息总量
- 持久化管理句柄
- 队列管理操作
- 新增消息
- 获取队⾸消息(获取的同时将消息加⼊待确认队列)
- 移除指定待确认消息
- 获取队列待消费&待确认消息数量
- 恢复队列历史消息。
- 销毁队列所有消息
- 判断队列消息是否为空
- 消息的总体对外管理
- 初始化新建队列的消息管理结构,并创建消息存储⽂件
- 删除队列的消息管理结构,以及消息存储⽂件
- 向指定队列新增消息
- 获取指定队列队⾸消息
- 确认指定队列待确认消息(删除)
- 判断指定队列消息是否为空
class QueueMessage
{
public:
using ptr = std::shared_ptr<QueueMessage>;
QueueMessage(std::string& basedir, const std::string& qname): _mapper(basedir, qname),
_qname(qname), _valid_count(0), _total_count(0)
{}
bool recovery()
{
std::unique_lock<std::mutex> lock(_mutex);
_msgs = _mapper.gc(); // 恢复历史消息数据
for(auto& msg : _msgs)
{
_durable_msgs.insert(std::make_pair(msg->payload().properties().id(), msg));
}
_total_count = _valid_count = _msgs.size();
return true;
}
bool insert(const BasicProperties* bp, const std::string& body, bool queue_is_durable)
{
// 构造消息对象
MessagePtr msg = std::make_shared<Message>();
msg->mutable_payload()->set_body(body);
if(bp != nullptr) // 传入的消息属性不为空
{
DeliveryMode mode = queue_is_durable ? bp->delivery_mode() : DeliveryMode::UNDURABLE;
msg->mutable_payload()->mutable_properties()->set_id(bp->id());
msg->mutable_payload()->mutable_properties()->set_delivery_mode(mode);
msg->mutable_payload()->mutable_properties()->set_routing_key(bp->routing_key());
}
else
{
DeliveryMode mode = queue_is_durable ? DeliveryMode::DURABLE : DeliveryMode::UNDURABLE;
msg->mutable_payload()->mutable_properties()->set_id(UUIDHelper::uuid());
msg->mutable_payload()->mutable_properties()->set_delivery_mode(mode);
msg->mutable_payload()->mutable_properties()->set_routing_key("");
}
std::unique_lock<std::mutex> lock(_mutex);
// 判断消息是否需要持久化
if(msg->payload().properties().delivery_mode() == wsmq::DeliveryMode::DURABLE)
{
// 进行持久化
msg->mutable_payload()->set_valid("1");
bool ret = _mapper.insert(msg);
if(ret == false)
{
DLOG("持久化消息失败");
return false;
}
_valid_count += 1;
_total_count += 1;
_durable_msgs.insert(std::make_pair(msg->payload().properties().id(), msg));
}
// 在内存中的管理
_msgs.push_back(msg);
return true;
}
MessagePtr front()
{
std::unique_lock<std::mutex> lock(_mutex);
if (_msgs.size() == 0) {
return MessagePtr();
}
// 获取队首消息:从_msgs中取出数据
MessagePtr msg = _msgs.front();
_msgs.pop_front();
// 将消息添加到待确认消息中,一旦确认,就可以删除
_waitack_msgs.insert(std::make_pair(msg->payload().properties().id(), msg));
return msg;
}
bool remove(const std::string& msg_id) // 每次删除消息后,根据情况判断是否需要垃圾回收
{
std::unique_lock<std::mutex> lock(_mutex);
// 从待确认消息中查找到要删除的消息
auto it = _waitack_msgs.find(msg_id);
if(it == _waitack_msgs.end())
{
return true;
}
// 根据消息的持久化模式判断是否进行过持久化
if(it->second->payload().properties().delivery_mode() == DeliveryMode::DURABLE)
{
// 删除持久化消息
_mapper.remove(it->second);
_durable_msgs.erase(msg_id);
_valid_count -= 1;
gc();
}
// 删除内存中数据
_waitack_msgs.erase(msg_id);
return true;
}
size_t getable_count() // 待推送消息数量
{
std::unique_lock<std::mutex> lock(_mutex);
return _msgs.size();
}
size_t total_count() // 持久化消息历史总数
{
std::unique_lock<std::mutex> lock(_mutex);
return _total_count;
}
size_t durable_count() // 当前有效持久化消息数量
{
std::unique_lock<std::mutex> lock(_mutex);
return _durable_msgs.size();
}
size_t waitack_count() // 待确认消息数量
{
std::unique_lock<std::mutex> lock(_mutex);
return _waitack_msgs.size();
}
void clear()
{
std::unique_lock<std::mutex> lock(_mutex);
_valid_count = _total_count = 0;
_mapper.removeMsgFile();
_msgs.clear();
_durable_msgs.clear();
_waitack_msgs.clear();
}
private:
bool GCCheck()
{
// 消息总数且有效消息比例小于50%则进行垃圾回收
if(_total_count > 2000 && _valid_count * 1.0 / _total_count <= 0.5)
{
return true;
}
return false;
}
void gc()
{
// 进行垃圾回收,获取垃圾回收后的有效消息链表
if(GCCheck() == false) return ;
std::list<MessagePtr> msgs = _mapper.gc();
for(auto& msg : msgs)
{
auto it = _durable_msgs.find(msg->payload().properties().id());
if(it == _durable_msgs.end())
{
DLOG("垃圾回收后,发现未管理的有效持久化消息");
_msgs.push_back(it->second);
_durable_msgs.insert(std::make_pair(msg->payload().properties().id(), it->second));
continue;
}
// 更新每条消息的实际存储位置
it->second->set_offset(msg->offset());
it->second->set_length(msg->length());
}
// 更新当前有效数据条数 && 总消息条数
_total_count = _valid_count = msgs.size();
}
private:
std::mutex _mutex;
std::string _qname;
size_t _valid_count;
size_t _total_count;
MessageMapper _mapper;
std::list<MessagePtr> _msgs; // 待推送消息
std::unordered_map<std::string, MessagePtr> _durable_msgs;
std::unordered_map<std::string, MessagePtr> _waitack_msgs;
};
class MessageManager
{
public:
using ptr = std::shared_ptr<MessageManager>;
MessageManager(const std::string& basedir): _basedir(basedir)
{}
void clear()
{
for(auto msg : _queue_msgs)
{
msg.second->clear();
}
}
void initQueueMessage(const std::string& qname)
{
QueueMessage::ptr qmp;
{
std::unique_lock<std::mutex> lock(_mutex);
auto it = _queue_msgs.find(qname);
if (it != _queue_msgs.end())
{
return ;
}
qmp = std::make_shared<QueueMessage>(_basedir, qname);
_queue_msgs.insert(std::make_pair(qname, qmp));
}
qmp->recovery(); // 此操作本身是线程安全的(已经加锁保护了),不需要放在临界区
}
void destroyQueueMessage(const std::string& qname)
{
QueueMessage::ptr qmp;
{
std::unique_lock<std::mutex> lock(_mutex);
auto it = _queue_msgs.find(qname);
if (it == _queue_msgs.end())
{
return ;
}
qmp = it->second;
_queue_msgs.erase(it);
}
qmp->clear(); // 本身就是被加锁保护的,不需要外部再加锁了,否则降低效率
}
MessagePtr front(const std::string& qname)
{
QueueMessage::ptr qmp = getQueueMessageHandle(qname);
if(qmp == nullptr)
{
DLOG("获取队首消息失败");
return nullptr;
}
return qmp->front();
}
void ack(const std::string& qname, const std::string& msg_id)
{
QueueMessage::ptr qmp = getQueueMessageHandle(qname);
if(qmp == nullptr)
{
DLOG("确认队列消息 %s 失败", msg_id.c_str());
return ;
}
qmp->remove(msg_id);
return ;
}
bool insert(const std::string& qname, const BasicProperties* bp,
const std::string& body, bool queue_is_durable)
{
QueueMessage::ptr qmp = getQueueMessageHandle(qname);
if(qmp == nullptr)
{
DLOG("向队列 %s 插入消息失败", qname.c_str());
return false;
}
return qmp->insert(bp, body, queue_is_durable);
}
size_t getable_count(const std::string& qname)
{
QueueMessage::ptr qmp = getQueueMessageHandle(qname);
if(qmp == nullptr)
{
DLOG("获取队列 %s 待推送消息数量失败", qname.c_str());
return 0;
}
return qmp->getable_count();
}
size_t total_count(const std::string& qname)
{
QueueMessage::ptr qmp = getQueueMessageHandle(qname);
if(qmp == nullptr)
{
DLOG("获取队列 %s 总体消息数量失败", qname.c_str());
return 0;
}
return qmp->total_count();
}
size_t durable_count(const std::string& qname)
{
QueueMessage::ptr qmp = getQueueMessageHandle(qname);
if(qmp == nullptr)
{
DLOG("获取队列 %s 持久化消息数量失败", qname.c_str());
return 0;
}
return qmp->durable_count();
}
size_t waitack_count(const std::string& qname)
{
QueueMessage::ptr qmp = getQueueMessageHandle(qname);
if(qmp == nullptr)
{
DLOG("获取队列 %s 待确认消息数量失败", qname.c_str());
return 0;
}
return qmp->waitack_count();
}
private:
// 由于需要频繁查找队列名对应的操作句柄,所以封装成函数
QueueMessage::ptr getQueueMessageHandle(const std::string& qname)
{
std::unique_lock<std::mutex> lock(_mutex);
auto it = _queue_msgs.find(qname);
if(it == _queue_msgs.end())
{
DLOG("未找到队列 %s 操作句柄", qname.c_str());
return QueueMessage::ptr();
}
return it->second;
}
private:
std::mutex _mutex;
std::string _basedir;
std::unordered_map<std::string, QueueMessage::ptr> _queue_msgs;
};
设计总结:
场景 | 推荐方案 |
---|---|
小规模元数据 | 用SQLite管理交换机、队列、绑定的定义信息(如名称、路由键) |
消息内容存储 | 每个队列独立文件存储,内存中维护消息位置索引(如std::map<offset, Message> ) |
高频写入队列 | 使用内存缓存批量提交(如积累10条消息后批量写入文件) |
崩溃恢复 | 启动时按队列逐个扫描文件,重建内存索引 |
虚拟机管理模块
持续更新中......