C++ - 仿 RabbitMQ 实现消息队列(2)(Protobuf 和 Muduo 初识)
C++ - 仿 RabbitMQ 实现消息队列(2)(Protobuf 和 Muduo 初识)
- Protobuf
- 1. 序列化/反序列化方法(最核心)
- `_InternalSerialize()`
- `_InternalParse()`
- 2. 内存管理方法
- `SharedCtor()`/`SharedDtor()`
- `InternalSwap()`
- 3. 字段访问方法
- 生成的 getter/setter
- 4. 工具方法
- `ByteSizeLong()`
- `IsInitialized()`
- 关键设计特点:
- 实际使用中最常用的方法:
- muduo
- Reactor模式
- 1. Reactor 是什么?
- 2. Reactor 工作流程
- 3. 与传统阻塞模型的对比
- 4. Reactor 在 Muduo 中的体现
- 5. 为什么用 Reactor?
- 6. 通俗理解案例
- 📌 编译命令建议(请确保 muduo 已正确安装):
- ✅ 测试方法:
我们之前把仿 RabbitMQ 实现消息队列的环境搭建好了,我们今天主要是来使用一下protobuf和Muduo库做一个简单的应用,为之后的项目做准备。
如果之前环境还没有装好的小伙伴可以点击这里:
https://blog.csdn.net/qq_67693066/article/details/147733266
Protobuf
首先,我们查看一下我们有没有装protobuf:
// 定义protobuf的版本
syntax = "proto3";
package chat;//定义消息体
message ChatMessage{string sender = 1; //发送方string text = 2; //文本内容int64 timestamp = 3; //时间戳
}//消息请求
message ChatRequest{enum Type{SEND = 0;QUERY = 1;}Type type = 1;ChatMessage message = 2;
}//消息回应
message ChatResponse{bool success = 1;string info = 2; //回应信息repeated ChatMessage messages = 3;
}
然后执行protoc --cpp_out=. chat.proto,然后就会多出两个文件:
这段代码是使用 Protocol Buffers (protobuf) 定义的一组消息格式,具体来说:
-
syntax = "proto3";
:指定使用 proto3 版本的语法。Proto3 相比于 Proto2 简化了很多特性,并且支持更多语言。 -
package chat;
:定义了一个包声明chat
,这有助于防止不同项目之间的命名冲突。
接下来是三个消息类型定义:
-
message ChatMessage
:定义了一个名为ChatMessage
的消息类型,表示一条聊天消息。它包含了以下字段:string sender = 1;
:发送者的名称。string text = 2;
:消息内容。int64 timestamp = 3;
:消息的时间戳,使用 Unix 时间格式(从 1970 年 1 月 1 日开始计算的秒数)。
-
message ChatRequest
:定义了一个名为ChatRequest
的消息类型,用于请求聊天操作。它包括:enum Type
:一个枚举类型Type
,包含两种可能的值SEND = 0;
和QUERY = 1;
,分别代表发送消息和查询消息的请求类型。Type type = 1;
:请求的类型,使用上述Type
枚举中的一个值。ChatMessage message = 2;
:如果这是一个发送消息的请求,这里会包含一条ChatMessage
。
-
message ChatResponse
:定义了一个名为ChatResponse
的消息类型,用于响应聊天请求。它包括:bool success = 1;
:表示请求是否成功处理。string info = 2;
:提供额外的信息或错误描述。repeated ChatMessage messages = 3;
:一个ChatMessage
类型的列表,用于返回多条消息(例如查询请求的结果)。
这些定义可以被编译成多种编程语言(如 C++, Java, Python等),以便在不同的系统或服务之间传输结构化的数据。
我们看看里面的核心方法有哪些:
从您提供的 Protobuf 生成的 C++ 代码中,最核心的方法可以分为以下几类:
1. 序列化/反序列化方法(最核心)
这些方法是 Protobuf 自动生成的核心功能,用于消息的二进制编码和解码:
_InternalSerialize()
uint8_t* ChatMessage::_InternalSerialize(uint8_t* target, ::PROTOBUF_NAMESPACE_ID::io::EpsCopyOutputStream* stream) const
- 作用:将消息对象序列化为二进制格式
- 调用链:被
SerializeToArray()
/SerializeToString()
调用 - 重要性:所有网络传输和持久化存储的基础
_InternalParse()
const char* ChatMessage::_InternalParse(const char* ptr, ::_pbi::ParseContext* ctx)
- 作用:从二进制数据反序列化为消息对象
- 调用链:被
ParseFromArray()
/ParseFromString()
调用 - 重要性:接收数据时的关键解析方法
2. 内存管理方法
SharedCtor()
/SharedDtor()
void ChatMessage::SharedCtor(::_pb::Arena* arena, bool is_message_owned)
void ChatMessage::SharedDtor()
- 作用:构造/析构时的内存管理
- 特点:处理字符串字段的内存分配和释放
InternalSwap()
void ChatMessage::InternalSwap(ChatMessage* other)
- 作用:高效交换两个对象的内容
- 使用场景:被赋值运算符和移动语义操作调用
3. 字段访问方法
生成的 getter/setter
如:
// string sender = 1;
inline const std::string& sender() const;
inline void set_sender(const std::string& value);
- 作用:提供类型安全的字段访问接口
4. 工具方法
ByteSizeLong()
size_t ChatMessage::ByteSizeLong() const
- 作用:计算消息序列化后的字节大小
- 用途:预先分配缓冲区大小
IsInitialized()
bool ChatMessage::IsInitialized() const
- 作用:检查所有 required 字段是否已设置(在 proto3 中主要用作占位符)
关键设计特点:
- 零拷贝优化:通过
ArenaStringPtr
管理字符串字段,减少内存拷贝 - 延迟解析:Parse 方法支持按需解析字段
- 类型安全:强类型的 getter/setter 方法
- 二进制紧凑:采用 Tag-Length-Value 编码格式
实际使用中最常用的方法:
// 序列化
message.SerializeToString(&output_string);// 反序列化
message.ParseFromString(input_string);// 字段访问
message.set_sender("Alice");
const std::string& sender = message.sender();
这些生成的代码构成了 Protobuf 高效序列化能力的核心,而 Muduo 网络库则会利用这些方法来实现网络消息的收发和处理。
muduo
Muduo 由陈硕大佬开发,是一个基于非阻塞 IO 和事件驱动的 C++高并发 TCP 网络编
程库。 它是一款基于主从 Reactor 模型的网络库,其使用的线程模型是 one loop per
thread, 所谓 one loop per thread 指的是:
- 一个线程只能有一个事件循环(EventLoop), 用于响应计时器和 IO 事件
- 一个文件描述符只能由一个线程进行读写,换句话说就是一个 TCP 连接必须归属
于某个 EventLoop 管理
Reactor模式
Reactor 模型是事件驱动的网络编程模型,它的核心思想是**“当事件发生时,才进行处理”**,而不是阻塞等待。Muduo 网络库正是基于 Reactor 模式设计的。我用最通俗的方式解释它的原理和工作流程:
1. Reactor 是什么?
-
类比:就像餐厅的服务员
- 传统阻塞模型:一个服务员全程服务一桌客人(上菜、倒水、结账都卡住)
- Reactor 模型:一个服务员监听所有桌子的需求(有需求才过去处理)
-
核心三要素:
1. Event Loop(事件循环):不断检查是否有事件发生 2. Demultiplexer(事件分发器):监听哪些socket有事件(如 select/epoll) 3. Event Handler(事件处理器):处理具体事件(如 onMessage)
2. Reactor 工作流程
以 Muduo 的 TCP 服务器为例:
┌───────────────────────────────────────────────────┐│ Event Loop ││ ┌──────────────────────────────┐ ││ │ epoll_wait (监听所有socket) │◀──新连接/数据──┤│ └──────────────┬───────────────┘ ││ 检测到事件│ ││ ┌──────────────▼───────────────┐ ││ │ 根据事件类型分发到对应处理器 │ ││ └──────────────┬───────────────┘ ││ │ ││ ┌──────▼──────┐ ┌─────────────┐ ┌─────────┐ ││ │ 新连接处理器 │ │ 数据处理器 │ │ 错误处理器│ ││ │ (onConnection)│ │ (onMessage) │ │ (onError) │ ││ └─────────────┘ └─────────────┘ └─────────┘ │└───────────────────────────────────────────────────┘
3. 与传统阻塞模型的对比
特性 | Reactor 模型 | 阻塞模型 |
---|---|---|
线程使用 | 单线程处理多连接(IO多路复用) | 每个连接一个线程 |
资源消耗 | 低(1个线程管理成千上万个连接) | 高(线程数=连接数) |
响应方式 | 事件触发(有数据才唤醒) | 阻塞等待(没数据也占着线程) |
典型实现 | Muduo、Nginx、Redis | 早期Java BIO |
4. Reactor 在 Muduo 中的体现
// 1. 创建事件循环(Reactor核心)
EventLoop loop;// 2. 创建服务器并注册事件处理器
TcpServer server(&loop, listenAddr, "EchoServer");
server.setConnectionCallback(onConnection); // 新连接事件处理器
server.setMessageCallback(onMessage); // 数据到达事件处理器// 3. 启动事件循环(开始监听事件)
loop.loop(); // 内部调用 epoll_wait
- 当事件发生时:
epoll_wait
返回活跃的 socket- EventLoop 根据事件类型(新连接/数据到达)调用对应的
Callback
- 你的
onMessage
等函数被触发执行
5. 为什么用 Reactor?
- 高性能:单线程可处理数万并发连接(如 Nginx)
- 低延迟:没有线程切换开销
- 适合场景:
- 短连接服务(HTTP)
- 高并发低延迟(即时通讯)
- 需要精细控制IO的场合(代理服务器)
6. 通俗理解案例
假设你经营一家奶茶店:
- 传统模式:每个顾客配一个店员(线程),店员必须等顾客做完决定才能服务下一个
- Reactor模式:
- 1个前台(EventLoop)监听所有顾客(socket)
- 顾客举手(事件发生)时,前台派空闲店员(Callback)去处理
- 没顾客举手时,前台可以喝茶休息(
epoll_wait
休眠)
我们来看一个简单的例子:
#include <muduo/net/TcpServer.h> // TcpServer 类,用于创建 TCP 服务器
#include <muduo/net/EventLoop.h> // EventLoop 事件循环类,负责 I/O 多路复用和事件处理
#include <muduo/base/Logging.h> // 日志系统,用于输出调试、信息、错误等日志using namespace muduo;
using namespace muduo::net;// 定义消息回调函数:当客户端发送数据时,该函数会被调用
// 参数说明:
// - conn: 表示当前连接对象的智能指针
// - buf: 接收到的数据缓冲区(muduo 的 Buffer 类型)
// - time: 接收数据的时间戳(本例中未使用)
void onMessage(const TcpConnectionPtr& conn, Buffer* buf, Timestamp)
{// 将接收的数据转换为字符串std::string msg = buf->retrieveAllAsString(); // 提取全部数据并清空缓冲区// 在终端打印接收到的消息内容printf("[Server] Received: %s\n", msg.c_str());// 将接收到的数据原样返回给客户端conn->send(buf); // send 函数接受的是 Buffer* 类型,所以可以直接传入 buf
}int main()
{// 设置日志输出级别为 DEBUG,这样会显示更多的调试信息// 默认情况下,muduo 可能只输出 INFO 和以上级别的日志Logger::setLogLevel(Logger::DEBUG);// 创建一个 EventLoop 对象,它是整个 muduo 网络模型的核心// 所有的网络事件(如 accept、read、write)都会在这个 loop 中处理EventLoop loop; // 核心事件循环// 创建 TcpServer 实例// 参数说明:// - loop: 使用哪个 EventLoop 来驱动服务器// - InetAddress(8888): 监听本地所有 IP 地址(0.0.0.0)上的 8888 端口// - "SimpleServer": 服务器名称,用于日志输出中的标识TcpServer server(&loop, InetAddress(8888), "SimpleServer");// 设置消息回调函数// 当服务器从客户端读取到数据时,就会调用这个 onMessage 函数server.setMessageCallback(onMessage);// 启动服务器// 这个函数不会立即开始监听端口,而是在下一次进入 EventLoop 后执行初始化操作server.start();// 启动事件循环// 进入主事件循环后,程序将一直运行,等待客户端连接和数据的到来loop.loop();return 0;
}
📌 编译命令建议(请确保 muduo 已正确安装):
g++ -std=c++11 echo_server.cpp -lmuduo_net -lmuduo_base -lpthread -o echo_server
✅ 测试方法:
-
启动服务端:
./echo_server
-
打开另一个终端,使用 telnet 或 nc 测试:
telnet 127.0.0.1 8888
或者:
nc localhost 8888
-
在客户端输入任意文本,例如:
Hello Muduo!
-
你会看到服务端打印类似信息:
[Server] Received: Hello Muduo!
-
同时客户端也会收到同样的回显消息。
这样我们可以利用muduo库编写一个简单的程序,客户端连接到服务端之后,服务端回显客户端发送的数据:
客户端代码:
client.h
#ifndef CHAT_CLIENT_H
#define CHAT_CLIENT_H#include <muduo/net/TcpClient.h>
#include <functional>class ChatClient {
public:using ConnectionCallback = std::function<void()>;ChatClient(muduo::net::EventLoop* loop, const muduo::net::InetAddress& serverAddr);void connect();void sendMessage(const std::string& text);bool isConnected() const;private:void onConnection(const muduo::net::TcpConnectionPtr& conn);void onMessage(const muduo::net::TcpConnectionPtr& conn,muduo::net::Buffer* buf,muduo::Timestamp time);muduo::net::TcpClient client_;muduo::net::TcpConnectionPtr conn_;
};#endif
client.cpp
#include "client.h"
#include <muduo/base/Logging.h>
#include <iostream>using namespace muduo;
using namespace muduo::net;ChatClient::ChatClient(EventLoop* loop, const InetAddress& serverAddr): client_(loop, serverAddr, "ChatClient") {client_.setConnectionCallback(std::bind(&ChatClient::onConnection, this, _1));client_.setMessageCallback(std::bind(&ChatClient::onMessage, this, _1, _2, _3));
}void ChatClient::connect() {client_.connect();
}bool ChatClient::isConnected() const {return conn_ && conn_->connected();
}void ChatClient::sendMessage(const std::string& text) {if (!isConnected()) {std::cout << "Not connected to server" << std::endl;return;}conn_->send(text);
}void ChatClient::onConnection(const TcpConnectionPtr& conn) {if (conn->connected()) {conn_ = conn;LOG_INFO << "Connected to server";} else {conn_.reset();LOG_INFO << "Disconnected from server";}
}void ChatClient::onMessage(const TcpConnectionPtr& conn,Buffer* buf,Timestamp time) {std::string msg(buf->retrieveAllAsString());std::cout << "Server reply: " << msg << std::endl;
}
主函数:
client_main.cpp
#include "../client/client.h"
#include <muduo/net/EventLoop.h>
#include <iostream>
#include <thread>
#include <atomic>std::atomic<bool> g_running(true);void handleInput(ChatClient& client) {std::string line;while (g_running.load() && std::getline(std::cin, line)) {if (line == "quit") {g_running.store(false);break;}client.sendMessage(line);}
}int main(int argc, char* argv[]) {if (argc < 3) {std::cerr << "Usage: " << argv[0] << " <server_ip> <server_port>" << std::endl;return 1;}muduo::net::EventLoop loop;muduo::net::InetAddress serverAddr(argv[1], std::stoi(argv[2]));ChatClient client(&loop, serverAddr);std::thread inputThread(handleInput, std::ref(client));client.connect();loop.loop();g_running.store(false);if (inputThread.joinable()) {inputThread.join();}return 0;
}
服务端代码:
server.h
#ifndef CHAT_SERVER_H
#define CHAT_SERVER_H#include <muduo/net/TcpServer.h>
#include <muduo/net/EventLoop.h>
#include <muduo/base/Logging.h>
#include <unordered_set>
#include "chat.pb.h"class ChatServer {
public:ChatServer(muduo::net::EventLoop* loop,const muduo::net::InetAddress& listenAddr);void start();private:void onConnection(const muduo::net::TcpConnectionPtr& conn);void onMessage(const muduo::net::TcpConnectionPtr& conn,muduo::net::Buffer* buf,muduo::Timestamp time);void handleRequest(const muduo::net::TcpConnectionPtr& conn,const chat::ChatRequest& request);muduo::net::TcpServer server_;std::unordered_set<muduo::net::TcpConnectionPtr> connections_;std::vector<chat::ChatMessage> chatHistory_;
};#endif // CHAT_SERVER_H
server.cpp
#include "server.h"
#include "chat.pb.h"
#include <muduo/base/Logging.h>using namespace muduo;
using namespace muduo::net;ChatServer::ChatServer(EventLoop* loop, const InetAddress& listenAddr): server_(loop, listenAddr, "ChatServer") {server_.setMessageCallback(std::bind(&ChatServer::onMessage, this, _1, _2, _3));
}void ChatServer::start() {server_.start();
}void ChatServer::onMessage(const TcpConnectionPtr& conn,Buffer* buf,Timestamp time) {// 获取并打印完整消息内容std::string msg = buf->retrieveAllAsString();printf("收到客户端消息[长度:%lu]: %s\n", msg.size(), msg.c_str());// 简单回复确认(包含原始消息)std::string reply = "Server收到消息: " + msg;conn->send(reply);
}
主函数:
server_main.cpp
#include "../server/server.h"
#include <muduo/net/EventLoop.h>int main() {muduo::net::EventLoop loop;muduo::net::InetAddress listenAddr(8888);ChatServer server(&loop, listenAddr);server.start();loop.loop();return 0;
}
效果如下: