当前位置: 首页 > news >正文

C++ - 仿 RabbitMQ 实现消息队列(2)(Protobuf 和 Muduo 初识)

C++ - 仿 RabbitMQ 实现消息队列(2)(Protobuf 和 Muduo 初识)

  • Protobuf
      • 1. 序列化/反序列化方法(最核心)
        • `_InternalSerialize()`
        • `_InternalParse()`
      • 2. 内存管理方法
        • `SharedCtor()`/`SharedDtor()`
        • `InternalSwap()`
      • 3. 字段访问方法
        • 生成的 getter/setter
      • 4. 工具方法
        • `ByteSizeLong()`
        • `IsInitialized()`
      • 关键设计特点:
      • 实际使用中最常用的方法:
  • muduo
    • Reactor模式
      • 1. Reactor 是什么?
      • 2. Reactor 工作流程
      • 3. 与传统阻塞模型的对比
      • 4. Reactor 在 Muduo 中的体现
      • 5. 为什么用 Reactor?
      • 6. 通俗理解案例
      • 📌 编译命令建议(请确保 muduo 已正确安装):
      • ✅ 测试方法:

我们之前把仿 RabbitMQ 实现消息队列的环境搭建好了,我们今天主要是来使用一下protobuf和Muduo库做一个简单的应用,为之后的项目做准备。

如果之前环境还没有装好的小伙伴可以点击这里:

https://blog.csdn.net/qq_67693066/article/details/147733266

Protobuf

首先,我们查看一下我们有没有装protobuf:
在这里插入图片描述

// 定义protobuf的版本
syntax = "proto3";
package chat;//定义消息体
message ChatMessage{string sender = 1; //发送方string text = 2; //文本内容int64 timestamp = 3; //时间戳
}//消息请求
message ChatRequest{enum Type{SEND = 0;QUERY = 1;}Type type = 1;ChatMessage message = 2;
}//消息回应
message ChatResponse{bool success = 1;string info = 2; //回应信息repeated ChatMessage messages = 3;
}

然后执行protoc --cpp_out=. chat.proto,然后就会多出两个文件:
在这里插入图片描述

这段代码是使用 Protocol Buffers (protobuf) 定义的一组消息格式,具体来说:

  • syntax = "proto3";:指定使用 proto3 版本的语法。Proto3 相比于 Proto2 简化了很多特性,并且支持更多语言。

  • package chat;:定义了一个包声明 chat,这有助于防止不同项目之间的命名冲突。

接下来是三个消息类型定义:

  1. message ChatMessage:定义了一个名为 ChatMessage 的消息类型,表示一条聊天消息。它包含了以下字段:

    • string sender = 1;:发送者的名称。
    • string text = 2;:消息内容。
    • int64 timestamp = 3;:消息的时间戳,使用 Unix 时间格式(从 1970 年 1 月 1 日开始计算的秒数)。
  2. message ChatRequest:定义了一个名为 ChatRequest 的消息类型,用于请求聊天操作。它包括:

    • enum Type:一个枚举类型 Type,包含两种可能的值 SEND = 0;QUERY = 1;,分别代表发送消息和查询消息的请求类型。
    • Type type = 1;:请求的类型,使用上述 Type 枚举中的一个值。
    • ChatMessage message = 2;:如果这是一个发送消息的请求,这里会包含一条 ChatMessage
  3. message ChatResponse:定义了一个名为 ChatResponse 的消息类型,用于响应聊天请求。它包括:

    • bool success = 1;:表示请求是否成功处理。
    • string info = 2;:提供额外的信息或错误描述。
    • repeated ChatMessage messages = 3;:一个 ChatMessage 类型的列表,用于返回多条消息(例如查询请求的结果)。

这些定义可以被编译成多种编程语言(如 C++, Java, Python等),以便在不同的系统或服务之间传输结构化的数据。

我们看看里面的核心方法有哪些:
从您提供的 Protobuf 生成的 C++ 代码中,最核心的方法可以分为以下几类:


1. 序列化/反序列化方法(最核心)

这些方法是 Protobuf 自动生成的核心功能,用于消息的二进制编码和解码:

_InternalSerialize()
uint8_t* ChatMessage::_InternalSerialize(uint8_t* target, ::PROTOBUF_NAMESPACE_ID::io::EpsCopyOutputStream* stream) const
  • 作用:将消息对象序列化为二进制格式
  • 调用链:被 SerializeToArray()/SerializeToString() 调用
  • 重要性:所有网络传输和持久化存储的基础
_InternalParse()
const char* ChatMessage::_InternalParse(const char* ptr, ::_pbi::ParseContext* ctx)
  • 作用:从二进制数据反序列化为消息对象
  • 调用链:被 ParseFromArray()/ParseFromString() 调用
  • 重要性:接收数据时的关键解析方法

2. 内存管理方法

SharedCtor()/SharedDtor()
void ChatMessage::SharedCtor(::_pb::Arena* arena, bool is_message_owned)
void ChatMessage::SharedDtor()
  • 作用:构造/析构时的内存管理
  • 特点:处理字符串字段的内存分配和释放
InternalSwap()
void ChatMessage::InternalSwap(ChatMessage* other)
  • 作用:高效交换两个对象的内容
  • 使用场景:被赋值运算符和移动语义操作调用

3. 字段访问方法

生成的 getter/setter

如:

// string sender = 1;
inline const std::string& sender() const;
inline void set_sender(const std::string& value);
  • 作用:提供类型安全的字段访问接口

4. 工具方法

ByteSizeLong()
size_t ChatMessage::ByteSizeLong() const
  • 作用:计算消息序列化后的字节大小
  • 用途:预先分配缓冲区大小
IsInitialized()
bool ChatMessage::IsInitialized() const
  • 作用:检查所有 required 字段是否已设置(在 proto3 中主要用作占位符)

关键设计特点:

  1. 零拷贝优化:通过 ArenaStringPtr 管理字符串字段,减少内存拷贝
  2. 延迟解析:Parse 方法支持按需解析字段
  3. 类型安全:强类型的 getter/setter 方法
  4. 二进制紧凑:采用 Tag-Length-Value 编码格式

实际使用中最常用的方法:

// 序列化
message.SerializeToString(&output_string);// 反序列化
message.ParseFromString(input_string);// 字段访问
message.set_sender("Alice");
const std::string& sender = message.sender();

这些生成的代码构成了 Protobuf 高效序列化能力的核心,而 Muduo 网络库则会利用这些方法来实现网络消息的收发和处理。

muduo

Muduo 由陈硕大佬开发,是一个基于非阻塞 IO 和事件驱动的 C++高并发 TCP 网络编
程库。 它是一款基于主从 Reactor 模型的网络库,其使用的线程模型是 one loop per
thread, 所谓 one loop per thread 指的是:

  • 一个线程只能有一个事件循环(EventLoop), 用于响应计时器和 IO 事件
  • 一个文件描述符只能由一个线程进行读写,换句话说就是一个 TCP 连接必须归属
    于某个 EventLoop 管理

在这里插入图片描述

Reactor模式

Reactor 模型是事件驱动的网络编程模型,它的核心思想是**“当事件发生时,才进行处理”**,而不是阻塞等待。Muduo 网络库正是基于 Reactor 模式设计的。我用最通俗的方式解释它的原理和工作流程:


1. Reactor 是什么?

  • 类比:就像餐厅的服务员

    • 传统阻塞模型:一个服务员全程服务一桌客人(上菜、倒水、结账都卡住)
    • Reactor 模型:一个服务员监听所有桌子的需求(有需求才过去处理)
  • 核心三要素

    1. Event Loop(事件循环):不断检查是否有事件发生
    2. Demultiplexer(事件分发器):监听哪些socket有事件(如 select/epoll)
    3. Event Handler(事件处理器):处理具体事件(如 onMessage)
    

2. Reactor 工作流程

以 Muduo 的 TCP 服务器为例:

  ┌───────────────────────────────────────────────────┐│  Event Loop                                      ││   ┌──────────────────────────────┐               ││   │  epoll_wait (监听所有socket) │◀──新连接/数据──┤│   └──────────────┬───────────────┘               ││         检测到事件│                              ││   ┌──────────────▼───────────────┐               ││   │  根据事件类型分发到对应处理器  │               ││   └──────────────┬───────────────┘               ││          │                                       ││   ┌──────▼──────┐  ┌─────────────┐  ┌─────────┐ ││   │ 新连接处理器 │  │ 数据处理器  │  │ 错误处理器│ ││   │ (onConnection)│  │ (onMessage) │  │ (onError) │ ││   └─────────────┘  └─────────────┘  └─────────┘ │└───────────────────────────────────────────────────┘

3. 与传统阻塞模型的对比

特性Reactor 模型阻塞模型
线程使用单线程处理多连接(IO多路复用)每个连接一个线程
资源消耗低(1个线程管理成千上万个连接)高(线程数=连接数)
响应方式事件触发(有数据才唤醒)阻塞等待(没数据也占着线程)
典型实现Muduo、Nginx、Redis早期Java BIO

4. Reactor 在 Muduo 中的体现

// 1. 创建事件循环(Reactor核心)
EventLoop loop;// 2. 创建服务器并注册事件处理器
TcpServer server(&loop, listenAddr, "EchoServer");
server.setConnectionCallback(onConnection);  // 新连接事件处理器
server.setMessageCallback(onMessage);        // 数据到达事件处理器// 3. 启动事件循环(开始监听事件)
loop.loop();  // 内部调用 epoll_wait
  • 当事件发生时
    1. epoll_wait 返回活跃的 socket
    2. EventLoop 根据事件类型(新连接/数据到达)调用对应的 Callback
    3. 你的 onMessage 等函数被触发执行

5. 为什么用 Reactor?

  • 高性能:单线程可处理数万并发连接(如 Nginx)
  • 低延迟:没有线程切换开销
  • 适合场景
    • 短连接服务(HTTP)
    • 高并发低延迟(即时通讯)
    • 需要精细控制IO的场合(代理服务器)

6. 通俗理解案例

假设你经营一家奶茶店:

  • 传统模式:每个顾客配一个店员(线程),店员必须等顾客做完决定才能服务下一个
  • Reactor模式
    • 1个前台(EventLoop)监听所有顾客(socket)
    • 顾客举手(事件发生)时,前台派空闲店员(Callback)去处理
    • 没顾客举手时,前台可以喝茶休息(epoll_wait 休眠)

我们来看一个简单的例子:

#include <muduo/net/TcpServer.h>    // TcpServer 类,用于创建 TCP 服务器
#include <muduo/net/EventLoop.h>     // EventLoop 事件循环类,负责 I/O 多路复用和事件处理
#include <muduo/base/Logging.h>      // 日志系统,用于输出调试、信息、错误等日志using namespace muduo;
using namespace muduo::net;// 定义消息回调函数:当客户端发送数据时,该函数会被调用
// 参数说明:
// - conn: 表示当前连接对象的智能指针
// - buf: 接收到的数据缓冲区(muduo 的 Buffer 类型)
// - time: 接收数据的时间戳(本例中未使用)
void onMessage(const TcpConnectionPtr& conn, Buffer* buf, Timestamp)
{// 将接收的数据转换为字符串std::string msg = buf->retrieveAllAsString();  // 提取全部数据并清空缓冲区// 在终端打印接收到的消息内容printf("[Server] Received: %s\n", msg.c_str());// 将接收到的数据原样返回给客户端conn->send(buf);  // send 函数接受的是 Buffer* 类型,所以可以直接传入 buf
}int main()
{// 设置日志输出级别为 DEBUG,这样会显示更多的调试信息// 默认情况下,muduo 可能只输出 INFO 和以上级别的日志Logger::setLogLevel(Logger::DEBUG);// 创建一个 EventLoop 对象,它是整个 muduo 网络模型的核心// 所有的网络事件(如 accept、read、write)都会在这个 loop 中处理EventLoop loop;  // 核心事件循环// 创建 TcpServer 实例// 参数说明:// - loop: 使用哪个 EventLoop 来驱动服务器// - InetAddress(8888): 监听本地所有 IP 地址(0.0.0.0)上的 8888 端口// - "SimpleServer": 服务器名称,用于日志输出中的标识TcpServer server(&loop, InetAddress(8888), "SimpleServer");// 设置消息回调函数// 当服务器从客户端读取到数据时,就会调用这个 onMessage 函数server.setMessageCallback(onMessage);// 启动服务器// 这个函数不会立即开始监听端口,而是在下一次进入 EventLoop 后执行初始化操作server.start();// 启动事件循环// 进入主事件循环后,程序将一直运行,等待客户端连接和数据的到来loop.loop();return 0;
}

📌 编译命令建议(请确保 muduo 已正确安装):

g++ -std=c++11 echo_server.cpp -lmuduo_net -lmuduo_base -lpthread -o echo_server

✅ 测试方法:

  1. 启动服务端:

    ./echo_server
    
  2. 打开另一个终端,使用 telnet 或 nc 测试:

    telnet 127.0.0.1 8888
    

    或者:

    nc localhost 8888
    
  3. 在客户端输入任意文本,例如:

    Hello Muduo!
    
  4. 你会看到服务端打印类似信息:

    [Server] Received: Hello Muduo!
    
  5. 同时客户端也会收到同样的回显消息。


这样我们可以利用muduo库编写一个简单的程序,客户端连接到服务端之后,服务端回显客户端发送的数据:

客户端代码:

client.h
#ifndef CHAT_CLIENT_H
#define CHAT_CLIENT_H#include <muduo/net/TcpClient.h>
#include <functional>class ChatClient {
public:using ConnectionCallback = std::function<void()>;ChatClient(muduo::net::EventLoop* loop, const muduo::net::InetAddress& serverAddr);void connect();void sendMessage(const std::string& text);bool isConnected() const;private:void onConnection(const muduo::net::TcpConnectionPtr& conn);void onMessage(const muduo::net::TcpConnectionPtr& conn,muduo::net::Buffer* buf,muduo::Timestamp time);muduo::net::TcpClient client_;muduo::net::TcpConnectionPtr conn_;
};#endif
client.cpp
#include "client.h"
#include <muduo/base/Logging.h>
#include <iostream>using namespace muduo;
using namespace muduo::net;ChatClient::ChatClient(EventLoop* loop, const InetAddress& serverAddr): client_(loop, serverAddr, "ChatClient") {client_.setConnectionCallback(std::bind(&ChatClient::onConnection, this, _1));client_.setMessageCallback(std::bind(&ChatClient::onMessage, this, _1, _2, _3));
}void ChatClient::connect() {client_.connect();
}bool ChatClient::isConnected() const {return conn_ && conn_->connected();
}void ChatClient::sendMessage(const std::string& text) {if (!isConnected()) {std::cout << "Not connected to server" << std::endl;return;}conn_->send(text);
}void ChatClient::onConnection(const TcpConnectionPtr& conn) {if (conn->connected()) {conn_ = conn;LOG_INFO << "Connected to server";} else {conn_.reset();LOG_INFO << "Disconnected from server";}
}void ChatClient::onMessage(const TcpConnectionPtr& conn,Buffer* buf,Timestamp time) {std::string msg(buf->retrieveAllAsString());std::cout << "Server reply: " << msg << std::endl;
}

主函数:

client_main.cpp
#include "../client/client.h"
#include <muduo/net/EventLoop.h>
#include <iostream>
#include <thread>
#include <atomic>std::atomic<bool> g_running(true);void handleInput(ChatClient& client) {std::string line;while (g_running.load() && std::getline(std::cin, line)) {if (line == "quit") {g_running.store(false);break;}client.sendMessage(line);}
}int main(int argc, char* argv[]) {if (argc < 3) {std::cerr << "Usage: " << argv[0] << " <server_ip> <server_port>" << std::endl;return 1;}muduo::net::EventLoop loop;muduo::net::InetAddress serverAddr(argv[1], std::stoi(argv[2]));ChatClient client(&loop, serverAddr);std::thread inputThread(handleInput, std::ref(client));client.connect();loop.loop();g_running.store(false);if (inputThread.joinable()) {inputThread.join();}return 0;
}

服务端代码:

server.h
#ifndef CHAT_SERVER_H
#define CHAT_SERVER_H#include <muduo/net/TcpServer.h>
#include <muduo/net/EventLoop.h>
#include <muduo/base/Logging.h> 
#include <unordered_set>
#include "chat.pb.h"class ChatServer {
public:ChatServer(muduo::net::EventLoop* loop,const muduo::net::InetAddress& listenAddr);void start();private:void onConnection(const muduo::net::TcpConnectionPtr& conn);void onMessage(const muduo::net::TcpConnectionPtr& conn,muduo::net::Buffer* buf,muduo::Timestamp time);void handleRequest(const muduo::net::TcpConnectionPtr& conn,const chat::ChatRequest& request);muduo::net::TcpServer server_;std::unordered_set<muduo::net::TcpConnectionPtr> connections_;std::vector<chat::ChatMessage> chatHistory_;
};#endif // CHAT_SERVER_H
server.cpp
#include "server.h"
#include "chat.pb.h"
#include <muduo/base/Logging.h>using namespace muduo;
using namespace muduo::net;ChatServer::ChatServer(EventLoop* loop, const InetAddress& listenAddr): server_(loop, listenAddr, "ChatServer") {server_.setMessageCallback(std::bind(&ChatServer::onMessage, this, _1, _2, _3));
}void ChatServer::start() {server_.start();
}void ChatServer::onMessage(const TcpConnectionPtr& conn,Buffer* buf,Timestamp time) {// 获取并打印完整消息内容std::string msg = buf->retrieveAllAsString();printf("收到客户端消息[长度:%lu]: %s\n", msg.size(), msg.c_str());// 简单回复确认(包含原始消息)std::string reply = "Server收到消息: " + msg;conn->send(reply);
}

主函数:

server_main.cpp
#include "../server/server.h"
#include <muduo/net/EventLoop.h>int main() {muduo::net::EventLoop loop;muduo::net::InetAddress listenAddr(8888);ChatServer server(&loop, listenAddr);server.start();loop.loop();return 0;
}

效果如下:
在这里插入图片描述
在这里插入图片描述

相关文章:

  • [逆向工程]C++实现DLL注入:原理、实现与防御全解析(二十五)
  • 【Linux】进程间通信(一):认识管道
  • 微软家各种copilot的AI产品:Github copilot、Microsoft copilot
  • Selenium无法定位元素的几种解决方案详解
  • Spring MVC 如何处理文件上传? 需要哪些配置和依赖?如何在 Controller 中接收上传的文件 (MultipartFile)?
  • uniapp自动构建pages.json的vite插件
  • 前脚收购 Windsurf 后,OpenAI 深夜发布 Codex。
  • 26、思维链Chain-of-Thought(CoT)论文笔记
  • 9.DMA
  • (9)python开发经验
  • 【机器学习】第二章模型的评估与选择
  • 学习笔记(C++篇)—— Day 6
  • 2025 年九江市第二十三届中职学校技能大赛 (网络安全)赛项竞赛样题
  • 数据结构第七章(四)-B树和B+树
  • 从代码学习深度学习 - 词嵌入(word2vec)PyTorch版
  • 兰亭妙微:用系统化思维重构智能座舱 UI 体验
  • HarmonyOS:重构万物互联时代的操作系统范式
  • 【论文#目标检测】End-to-End Object Detection with Transformers
  • WPS PPT设置默认文本框
  • pytorch小记(二十一):PyTorch 中的 torch.randn 全面指南
  • 以色列媒体:哈马斯愿意释放部分人员换取两个月停火
  • 菲律宾选举委员会公布中期选举结果,马科斯阵营选情未达预期
  • 新城市志|GDP万亿城市,一季度如何挑大梁
  • 用贝多芬八首钢琴三重奏纪念风雨并肩20年
  • 贞丰古城:新垣旧梦间的商脉与烟火
  • 国家统计局公布2024年城镇单位就业人员年平均工资情况