从 OneThreadOneLoop 线程池到进程池:高性能 Reactor 服务器的演进
文章目录
- 引言
- 一、为什么选择进程池?
- 二、核心文件改动说明
- 1. EventLoopProcess.hpp:进程池核心实现
- 1.1 进程间通信与文件描述符传递
- 1.2 单个事件循环进程封装
- 1.3 进程池管理
- 2. ReactorServer.hpp:服务器核心调整
- 2.1 成员变量调整
- 2.2 初始化流程调整
- 2.3 新连接处理流程重构
- 2.4 子进程中的连接处理
- 三、关键技术点解析
- 总结
引言
在网络编程中,OneThreadOneLoop 模型是实现高性能 IO 的经典模式。上一篇我们介绍了基于线程池的 OneThreadOneLoop 实现,本文将聚焦于如何将其改造为进程池版本,以更好地利用多核 CPU 资源并避免线程安全带来的复杂问题。
一、为什么选择进程池?
线程池版本的 OneThreadOneLoop 虽然能有效利用多核,但线程间共享地址空间带来了潜在的线程安全问题,需要大量同步机制保证正确性。而进程池具有以下优势:
- 天然的内存隔离,避免了大部分同步问题
- 每个进程独立占用 CPU 核心,无线程切换开销
- 可避免某些语言(如 Python)的 GIL 限制
- 单个进程崩溃不会导致整个服务不可用
当然进程池也引入了新的挑战:进程间通信(IPC)和文件描述符传递。接下来我们看看具体实现。
二、核心文件改动说明
本次改造的核心是将线程池实现 EventLoopThreadPool.hpp 替换为进程池实现 EventLoopProcess.hpp,并调整 ReactorServer.hpp 以适配进程池模型。
1. EventLoopProcess.hpp:进程池核心实现
这个文件是本次改造的核心,负责创建和管理多个进程,每个进程运行一个独立的 EventLoop。
1.1 进程间通信与文件描述符传递
进程间通信采用 socketpair 创建双向管道,关键是实现跨进程的文件描述符传递(依赖 Unix 域套接字的 SCM_RIGHTS 特性):
// 用于进程间传递文件描述符的辅助函数
ssize_t send_fd(int socket, int fd_to_send) {struct msghdr socket_message;struct iovec io_vector[1];struct cmsghdr *control_message = nullptr;char message_buffer[1];char ancillary_element_buffer[CMSG_SPACE(sizeof(int))];// 至少发送一个字节的消息体message_buffer[0] = 'F';io_vector[0].iov_base = message_buffer;io_vector[0].iov_len = 1;// 初始化消息结构memset(&socket_message, 0, sizeof(struct msghdr));socket_message.msg_iov = io_vector;socket_message.msg_iovlen = 1;// 准备辅助数据空间(用于传递文件描述符)socket_message.msg_control = ancillary_element_buffer;socket_message.msg_controllen = CMSG_SPACE(sizeof(int));// 初始化辅助数据(传递文件描述符)control_message = CMSG_FIRSTHDR(&socket_message);control_message->cmsg_level = SOL_SOCKET;control_message->cmsg_type = SCM_RIGHTS; // 标识传递的是文件描述符control_message->cmsg_len = CMSG_LEN(sizeof(int));*((int *)CMSG_DATA(control_message)) = fd_to_send;return sendmsg(socket, &socket_message, 0);
}
1.2 单个事件循环进程封装
EventLoopProcess 类封装了单个进程的生命周期和事件循环:
class EventLoopProcess {
public:EventLoopProcess() : _loop(nullptr), _running(false), _pid(-1) {// 创建进程间通信管道if (socketpair(AF_UNIX, SOCK_STREAM, 0, _pipefd) == -1) {perror("socketpair");exit(1);}// 设置管道为非阻塞模式int flags = fcntl(_pipefd[0], F_GETFL, 0);fcntl(_pipefd[0], F_SETFL, flags | O_NONBLOCK);flags = fcntl(_pipefd[1], F_GETFL, 0);fcntl(_pipefd[1], F_SETFL, flags | O_NONBLOCK);}// 启动进程并返回通信管道写端(供父进程使用)int startLoop() {_running = true;_pid = fork(); // 创建子进程if (_pid < 0) {perror("fork");return -1;} else if (_pid == 0) { // 子进程逻辑close(_pipefd[1]); // 关闭写端childProcessFunc(); // 运行事件循环exit(0);} else { // 父进程逻辑close(_pipefd[0]); // 关闭读端return _pipefd[1]; // 返回写端给父进程}}private:// 子进程函数:运行事件循环并处理新连接void childProcessFunc() {EventLoop loop;_loop = &loop;// 注册管道读事件(接收父进程传递的客户端fd)Channel pipeChannel(_pipefd[0]);pipeChannel.setReadCallback([this, &loop, &pipeChannel]() {// 接收文件描述符的逻辑(省略细节,与send_fd对应)// ...// 触发新连接回调extern std::function<void(int, EventLoop&)> newConnectionCallback;if (newConnectionCallback) {newConnectionCallback(fd, loop);}});pipeChannel.enableReading();loop.updateChannel(&pipeChannel);loop.start(); // 启动事件循环_loop = nullptr;}private:EventLoop* _loop; // 进程内的事件循环bool _running; // 运行状态pid_t _pid; // 进程IDint _pipefd[2]; // 进程间通信管道
};
1.3 进程池管理
EventLoopProcessPool 负责管理多个进程,实现连接的负载均衡(轮询策略):
class EventLoopProcessPool {
public:EventLoopProcessPool(size_t numProcesses) : _num_processes(numProcesses), _next_process(0) {}// 启动所有进程void start() {for (size_t i = 0; i < _num_processes; ++i) {auto process = std::make_unique<EventLoopProcess>();int writeFd = process->startLoop();if (writeFd != -1) {_write_fds.push_back(writeFd);_processes.push_back(std::move(process));}}}// 轮询获取下一个进程的通信管道写端int getNextWriteFd() {if (_write_fds.empty()) {return -1;}int fd = _write_fds[_next_process];_next_process = (_next_process + 1) % _write_fds.size();return fd;}private:size_t _num_processes; // 进程数量std::vector<int> _write_fds; // 通信管道写端列表std::vector<std::unique_ptr<EventLoopProcess>> _processes; // 进程列表size_t _next_process; // 下一个要使用的进程索引
};
2. ReactorServer.hpp:服务器核心调整
服务器主类需要适配进程池模型,主要改动集中在连接处理流程和资源管理。
2.1 成员变量调整
将线程池替换为进程池,并添加全局回调(用于子进程处理新连接):
class ReactorServer {
public:ReactorServer(uint16_t port, int backlog = 1024, size_t process_num = 4) : _port(port), _backlog(backlog), _main_loop(),_process_pool(process_num), // 替换为进程池_is_running(false),_listen_socket(std::make_unique<TcpSocket>()) {// 设置全局回调(子进程中处理新连接)newConnectionCallback = [this](int fd, EventLoop& loop) {this->handleNewConnectionInChild(fd, loop);};}private:// ... 其他成员EventLoopProcessPool _process_pool; // 事件循环进程池(替换原线程池)// ...
};
2.2 初始化流程调整
在初始化时启动进程池,而非线程池:
bool Init() {try {// 创建并配置监听Socket(省略细节)// ...// 初始化主事件循环的AcceptorinitAcceptor();// 启动事件循环进程池(替换原线程池启动)_process_pool.start();return true;} catch (...) {std::cerr << "服务器初始化失败" << std::endl;return false;}
}
2.3 新连接处理流程重构
主进程接收连接后,通过进程池将客户端 fd 传递给子进程:
// 处理新连接(主进程中)
void onNewConnection(int client_fd, const std::string& client_ip) {if (client_fd < 0) return;std::cout << "新连接:IP=" << client_ip << ", fd=" << client_fd << std::endl;// 创建客户端Socket并设置为非阻塞TcpSocket client_socket(client_fd);client_socket.SetNonBlocking();// 选择一个子进程(轮询策略)int writeFd = _process_pool.getNextWriteFd();if (writeFd == -1) {close(client_fd);return;}// 发送文件描述符到子进程if (send_fd(writeFd, client_fd) == -1) {perror("send_fd");close(client_fd);}
}
2.4 子进程中的连接处理
子进程接收 fd 后,创建客户端处理器并绑定到自身的事件循环:
// 在子进程中处理新连接
void handleNewConnectionInChild(int client_fd, EventLoop& loop) {// 创建客户端处理器并绑定到当前子进程的事件循环auto client_handler = std::make_shared<ClientHandler>(client_fd, loop,[this](int fd, const std::string& data) {onMessage(fd, "", data); // 子进程中无法直接获取client_ip},[this](int fd) {onClose(fd);});// 在子进程中保存客户端连接(每个进程有独立的_clients)std::lock_guard<std::mutex> lock(_clients_mutex);_clients[client_fd] = client_handler;
}
三、关键技术点解析
- 文件描述符跨进程传递:通过 Unix 域套接字的
SCM_RIGHTS特性,实现客户端连接 fd 从主进程到子进程的安全传递。 - 进程间通信:使用
socketpair创建的管道作为进程间通信通道,主进程发送连接 fd,子进程接收并处理。 - 负载均衡:采用轮询策略将新连接分配到不同子进程,简单高效且能保证负载均匀。
- 资源隔离:每个子进程维护独立的客户端连接列表(
_clients),避免了进程间的资源竞争,无需复杂同步。
总结
从线程池到进程池的改造,核心是解决了进程间通信和文件描述符传递的问题。通过EventLoopProcessPool的封装,我们保留了 OneThreadOneLoop 模型的高性能特性,同时获得了进程隔离带来的稳定性提升。
这种模型特别适合 CPU 密集型的服务场景,或需要严格隔离不同客户端连接的场景。当然,进程池也有其局限性(如内存占用较高、IPC 开销略大),实际应用中需根据业务场景选择合适的模型。
