当前位置: 首页 > news >正文

从 OneThreadOneLoop 线程池到进程池:高性能 Reactor 服务器的演进

文章目录

  • 引言
  • 一、为什么选择进程池?
  • 二、核心文件改动说明
    • 1. EventLoopProcess.hpp:进程池核心实现
      • 1.1 进程间通信与文件描述符传递
      • 1.2 单个事件循环进程封装
      • 1.3 进程池管理
    • 2. ReactorServer.hpp:服务器核心调整
      • 2.1 成员变量调整
      • 2.2 初始化流程调整
      • 2.3 新连接处理流程重构
      • 2.4 子进程中的连接处理
  • 三、关键技术点解析
  • 总结

引言

在网络编程中,OneThreadOneLoop 模型是实现高性能 IO 的经典模式。上一篇我们介绍了基于线程池的 OneThreadOneLoop 实现,本文将聚焦于如何将其改造为进程池版本,以更好地利用多核 CPU 资源并避免线程安全带来的复杂问题。

一、为什么选择进程池?

线程池版本的 OneThreadOneLoop 虽然能有效利用多核,但线程间共享地址空间带来了潜在的线程安全问题,需要大量同步机制保证正确性。而进程池具有以下优势:

  • 天然的内存隔离,避免了大部分同步问题
  • 每个进程独立占用 CPU 核心,无线程切换开销
  • 可避免某些语言(如 Python)的 GIL 限制
  • 单个进程崩溃不会导致整个服务不可用

当然进程池也引入了新的挑战:进程间通信(IPC)和文件描述符传递。接下来我们看看具体实现。

二、核心文件改动说明

本次改造的核心是将线程池实现 EventLoopThreadPool.hpp 替换为进程池实现 EventLoopProcess.hpp,并调整 ReactorServer.hpp 以适配进程池模型。

1. EventLoopProcess.hpp:进程池核心实现

这个文件是本次改造的核心,负责创建和管理多个进程,每个进程运行一个独立的 EventLoop。

1.1 进程间通信与文件描述符传递

进程间通信采用 socketpair 创建双向管道,关键是实现跨进程的文件描述符传递(依赖 Unix 域套接字的 SCM_RIGHTS 特性):

// 用于进程间传递文件描述符的辅助函数
ssize_t send_fd(int socket, int fd_to_send) {struct msghdr socket_message;struct iovec io_vector[1];struct cmsghdr *control_message = nullptr;char message_buffer[1];char ancillary_element_buffer[CMSG_SPACE(sizeof(int))];// 至少发送一个字节的消息体message_buffer[0] = 'F';io_vector[0].iov_base = message_buffer;io_vector[0].iov_len = 1;// 初始化消息结构memset(&socket_message, 0, sizeof(struct msghdr));socket_message.msg_iov = io_vector;socket_message.msg_iovlen = 1;// 准备辅助数据空间(用于传递文件描述符)socket_message.msg_control = ancillary_element_buffer;socket_message.msg_controllen = CMSG_SPACE(sizeof(int));// 初始化辅助数据(传递文件描述符)control_message = CMSG_FIRSTHDR(&socket_message);control_message->cmsg_level = SOL_SOCKET;control_message->cmsg_type = SCM_RIGHTS;  // 标识传递的是文件描述符control_message->cmsg_len = CMSG_LEN(sizeof(int));*((int *)CMSG_DATA(control_message)) = fd_to_send;return sendmsg(socket, &socket_message, 0);
}

1.2 单个事件循环进程封装

EventLoopProcess 类封装了单个进程的生命周期和事件循环:

class EventLoopProcess {
public:EventLoopProcess() : _loop(nullptr), _running(false), _pid(-1) {// 创建进程间通信管道if (socketpair(AF_UNIX, SOCK_STREAM, 0, _pipefd) == -1) {perror("socketpair");exit(1);}// 设置管道为非阻塞模式int flags = fcntl(_pipefd[0], F_GETFL, 0);fcntl(_pipefd[0], F_SETFL, flags | O_NONBLOCK);flags = fcntl(_pipefd[1], F_GETFL, 0);fcntl(_pipefd[1], F_SETFL, flags | O_NONBLOCK);}// 启动进程并返回通信管道写端(供父进程使用)int startLoop() {_running = true;_pid = fork();  // 创建子进程if (_pid < 0) {perror("fork");return -1;} else if (_pid == 0) {  // 子进程逻辑close(_pipefd[1]);  // 关闭写端childProcessFunc();  // 运行事件循环exit(0);} else {  // 父进程逻辑close(_pipefd[0]);  // 关闭读端return _pipefd[1];  // 返回写端给父进程}}private:// 子进程函数:运行事件循环并处理新连接void childProcessFunc() {EventLoop loop;_loop = &loop;// 注册管道读事件(接收父进程传递的客户端fd)Channel pipeChannel(_pipefd[0]);pipeChannel.setReadCallback([this, &loop, &pipeChannel]() {// 接收文件描述符的逻辑(省略细节,与send_fd对应)// ...// 触发新连接回调extern std::function<void(int, EventLoop&)> newConnectionCallback;if (newConnectionCallback) {newConnectionCallback(fd, loop);}});pipeChannel.enableReading();loop.updateChannel(&pipeChannel);loop.start();  // 启动事件循环_loop = nullptr;}private:EventLoop* _loop;         // 进程内的事件循环bool _running;            // 运行状态pid_t _pid;               // 进程IDint _pipefd[2];           // 进程间通信管道
};

1.3 进程池管理

EventLoopProcessPool 负责管理多个进程,实现连接的负载均衡(轮询策略):

class EventLoopProcessPool {
public:EventLoopProcessPool(size_t numProcesses) : _num_processes(numProcesses), _next_process(0) {}// 启动所有进程void start() {for (size_t i = 0; i < _num_processes; ++i) {auto process = std::make_unique<EventLoopProcess>();int writeFd = process->startLoop();if (writeFd != -1) {_write_fds.push_back(writeFd);_processes.push_back(std::move(process));}}}// 轮询获取下一个进程的通信管道写端int getNextWriteFd() {if (_write_fds.empty()) {return -1;}int fd = _write_fds[_next_process];_next_process = (_next_process + 1) % _write_fds.size();return fd;}private:size_t _num_processes;                          // 进程数量std::vector<int> _write_fds;                    // 通信管道写端列表std::vector<std::unique_ptr<EventLoopProcess>> _processes;  // 进程列表size_t _next_process;                           // 下一个要使用的进程索引
};

2. ReactorServer.hpp:服务器核心调整

服务器主类需要适配进程池模型,主要改动集中在连接处理流程和资源管理。

2.1 成员变量调整

将线程池替换为进程池,并添加全局回调(用于子进程处理新连接):

class ReactorServer {
public:ReactorServer(uint16_t port, int backlog = 1024, size_t process_num = 4) : _port(port), _backlog(backlog), _main_loop(),_process_pool(process_num),  // 替换为进程池_is_running(false),_listen_socket(std::make_unique<TcpSocket>()) {// 设置全局回调(子进程中处理新连接)newConnectionCallback = [this](int fd, EventLoop& loop) {this->handleNewConnectionInChild(fd, loop);};}private:// ... 其他成员EventLoopProcessPool _process_pool;  // 事件循环进程池(替换原线程池)// ...
};

2.2 初始化流程调整

在初始化时启动进程池,而非线程池:

bool Init() {try {// 创建并配置监听Socket(省略细节)// ...// 初始化主事件循环的AcceptorinitAcceptor();// 启动事件循环进程池(替换原线程池启动)_process_pool.start();return true;} catch (...) {std::cerr << "服务器初始化失败" << std::endl;return false;}
}

2.3 新连接处理流程重构

主进程接收连接后,通过进程池将客户端 fd 传递给子进程:

// 处理新连接(主进程中)
void onNewConnection(int client_fd, const std::string& client_ip) {if (client_fd < 0) return;std::cout << "新连接:IP=" << client_ip << ", fd=" << client_fd << std::endl;// 创建客户端Socket并设置为非阻塞TcpSocket client_socket(client_fd);client_socket.SetNonBlocking();// 选择一个子进程(轮询策略)int writeFd = _process_pool.getNextWriteFd();if (writeFd == -1) {close(client_fd);return;}// 发送文件描述符到子进程if (send_fd(writeFd, client_fd) == -1) {perror("send_fd");close(client_fd);}
}

2.4 子进程中的连接处理

子进程接收 fd 后,创建客户端处理器并绑定到自身的事件循环:

// 在子进程中处理新连接
void handleNewConnectionInChild(int client_fd, EventLoop& loop) {// 创建客户端处理器并绑定到当前子进程的事件循环auto client_handler = std::make_shared<ClientHandler>(client_fd, loop,[this](int fd, const std::string& data) {onMessage(fd, "", data);  // 子进程中无法直接获取client_ip},[this](int fd) {onClose(fd);});// 在子进程中保存客户端连接(每个进程有独立的_clients)std::lock_guard<std::mutex> lock(_clients_mutex);_clients[client_fd] = client_handler;
}

三、关键技术点解析

  1. 文件描述符跨进程传递:通过 Unix 域套接字的 SCM_RIGHTS 特性,实现客户端连接 fd 从主进程到子进程的安全传递。
  2. 进程间通信:使用 socketpair 创建的管道作为进程间通信通道,主进程发送连接 fd,子进程接收并处理。
  3. 负载均衡:采用轮询策略将新连接分配到不同子进程,简单高效且能保证负载均匀。
  4. 资源隔离:每个子进程维护独立的客户端连接列表(_clients),避免了进程间的资源竞争,无需复杂同步。

总结

从线程池到进程池的改造,核心是解决了进程间通信和文件描述符传递的问题。通过EventLoopProcessPool的封装,我们保留了 OneThreadOneLoop 模型的高性能特性,同时获得了进程隔离带来的稳定性提升。
这种模型特别适合 CPU 密集型的服务场景,或需要严格隔离不同客户端连接的场景。当然,进程池也有其局限性(如内存占用较高、IPC 开销略大),实际应用中需根据业务场景选择合适的模型。

http://www.dtcms.com/a/604884.html

相关文章:

  • C语言在线编译环境 | 提高编程效率与学习体验
  • 矩阵起源成功登陆深圳“专精特新”专板,加速 AI 数据智能新进程!
  • MPC模型预测控制原理全解析:从状态预测、矩阵推导到QP求解与滚动优化(含完整手推过程)
  • android recyclerview缓存2_四级缓存机制
  • [特殊字符] 在 macOS 上设置 SQLite
  • android recyclerview缓存1_概念和常见问题
  • SQLite 速成学习
  • [特殊字符] 在 Windows 上设置 SQLite
  • 做资源网站违法吗深圳大梅沙
  • 【环境数据处理】-基于R批量对环境数据克里金插值提高数据精度
  • 广州wap网站建设百度seo优化技术
  • linux centos常用命令整理
  • Java设计模式之建造者模式(Builder)详解
  • [智能体设计模式] 第6章:规划
  • 学习react第三天
  • 营销软文网站西安网站建设网络公司熊掌号
  • 二分查找算法介绍及使用
  • [element-plus] el-tree 动态增加节点,删除节点
  • SQL:从数据基石到安全前线的双重审视
  • 数据结构:双向链表(1)
  • 【C++】深入拆解二叉搜索树:从递归与非递归双视角,彻底掌握STL容器的基石
  • 深圳趣网站建设网络外包服务公司
  • Axios 全面详解
  • ios-AVIF
  • 360网站建设公司哪家好石家庄有哪些互联网公司
  • 单机并发简介
  • 自相关实操流程
  • java基础-集合接口(Collection)
  • 基于中国深圳无桩共享单车数据的出行目的推断与时空活动模式挖掘
  • 【Rust】通过系统编程语言获取当前系统内存、CPU等运行情况,以及简单实现图片采集并设置系统壁纸