【IO多路转接】高并发服务器实战:Reactor 框架与 Epoll 机制的封装与设计逻辑

文章目录
- 前言
- 一. Epoll的工作模式
- 二. Reactor 服务器
- 2.1 对网络套接字进行封装
- 2.2 对Epoll接口进行封装
- 2.3 设计一个管理连接的类
- 2.4 设计 Reactor服务器 类
- 2.5 将文件描述符设置为非阻塞
- 2.6 所有文件描述符的处理方法
- 2.6.1 普通文件描述符的处理方法
- 2.6.2 套接字的处理方法
- 2.7 初始化服务器
- 2.8 进行任务派发
- 三. 补充
- 3.1 实现在线计算器
- 3.2 引入线程池
前言
在高并发成为系统标配的今天,网络编程、中间件开发、分布式通信等场景中,“如何高效处理海量 IO 请求” 始终是开发者绕不开的核心命题。传统 “一连接一线程” 的同步阻塞模型,早已因线程资源耗尽、CPU 上下文切换频繁、内存占用过高等问题,难以应对万级甚至十万级的并发连接;即便引入线程池优化,也无法从根本上解决 “等待 IO 时线程闲置” 的资源浪费困境。
正是在这样的需求下,基于 “事件驱动” 与 “IO 多路转接” 的 Reactor 模式应运而生 —— 它以 “少量线程监听多 IO、事件触发业务处理” 的核心逻辑,成为解决高并发 IO 的经典架构:小到 Netty 的网络通信内核、Redis 的事件循环,大到 Nginx 的请求处理框架、Kafka 的消息接收模块,其底层都能看到 Reactor 模式的影子。可以说,理解 Reactor 模式的实现逻辑,是掌握高并发系统设计的 “关键钥匙”。
本文正是围绕 “Reactor 模式实现” 展开:不局限于抽象原理,而是从底层技术依赖(IO 多路转接调用)切入,一步步拆解事件循环的构建、组件间的协作逻辑,手把手帮助你构建出一个基于Reactor 模式的服务器。
一. Epoll的工作模式
Epoll有两种工作模式:水平触发(Level Triggered,简称 LT) 和 边缘触发(Edge Triggered,简称 ET)。这两种模式的核心差异在于 “何时通知应用程序某个文件描述符(fd)就绪”,直接影响高并发 IO 处理的效率和编程复杂度。
- 水平触发(LT):默认模式,“状态持续” 触发:
当一个文件描述符(如 socket)处于就绪状态(例如:有数据可读、可写,或发生异常)时,epoll 会持续通知应用程序,直到该就绪状态被 “消除”(例如:数据被完全读取、缓冲区被写满)。
- 边缘触发(ET):“状态变化” 触发,高效但复杂:
epoll
仅在文件描述符的就绪状态发生 “变化瞬间” 通知一次,之后无论该状态是否持续,都不再通知。,即只有在读写资源从没就绪多就绪的时候才会进行通知。
我们通常会认为ET模式的效率更高:
- ET当资源就绪的时候只会通知一次,并不需要反复通知。并且如果上层没有将数据读取完毕,也不会再进行通知了;
- 因为ET模式只会进行通知一次,因此其==会倒逼着上层在进行读取时要将数据一次全部取完,这样就可以空出一个更大的接收缓冲区,对方也可以发送更多的。
二. Reactor 服务器
一下我们开始进行基于 Reactor 模式设计的高性能网络服务器,通过 “事件驱动” 和 “IO 多路转接” 技术,高效处理海量并发连接。
2.1 对网络套接字进行封装
关于网络套接字可以查看,我之前写的关于TCP的文章,改内容并不是本文的重点,所以此处直接贴实现代码了:
const std::string defaultip_ = "0.0.0.0";
enum SockErr
{SOCKET_Err, BIND_Err,
};class Sock
{
public:Sock(uint16_t port): port_(port),listensockfd_(-1){}void Socket(){listensockfd_ = socket(AF_INET, SOCK_STREAM, 0);if (listensockfd_ < 0){Log(Fatal) << "socket fail";exit(SOCKET_Err);}Log(Info) << "socket sucess";}void Bind(){struct sockaddr_in server;server.sin_family = AF_INET;server.sin_port = htons(port_);inet_pton(AF_INET, defaultip_.c_str(), &server.sin_addr);if (bind(listensockfd_, (struct sockaddr *)&server, sizeof(server)) < 0){Log(Fatal) << "bind fail";exit(BIND_Err);}Log(Info) << "bind sucess";}void Listen(){if (listen(listensockfd_, 10) < 0){Log(Warning) << "listen fail";}Log(Info) << "listen sucess";}int Accept(){struct sockaddr_in client;socklen_t len = sizeof(client);int fd = accept(listensockfd_ , (sockaddr*)&client , &len);return fd;}int Accept(std::string& ip , uint16_t& port){struct sockaddr_in client;socklen_t len = sizeof(client);int fd = accept(listensockfd_ , (sockaddr*)&client , &len);port = ntohs(client.sin_port);char bufferip[64];inet_ntop(AF_INET , &client.sin_addr , bufferip , sizeof(bufferip) - 1);ip = bufferip;return fd;}int Get_fd(){return listensockfd_;}~Sock(){close(listensockfd_);}private:uint16_t port_;int listensockfd_;
};
2.2 对Epoll接口进行封装
关于Epoll具体的细节,可以查看之前关于关于Epoll的文章,此处我们直接对封装的接口进行使用:
enum EpollErr
{CREAR_Err,
};class Epoll
{
public:Epoll(){// 创建epoll模型_epfd = epoll_create(1);if (_epfd < 0){Log(Fatal) << "epoll_create fail";exit(CREAR_Err);}Log(Info) << "epoll create sucess ";}void Add_fd(int fd, uint32_t event){// 添加文件描述符到红黑树中struct epoll_event epevt;epevt.events = event;epevt.data.fd = fd;if (epoll_ctl(_epfd, EPOLL_CTL_ADD , fd, &epevt) < 0){Log(Warning) << "epoll add error : " << strerror(errno);}Log(Info) << "epoll add sucess , fd : " << fd ;}void Del_fd(int fd){// 删除要进行等待的文件描述符if (epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, nullptr) < 0){Log(Warning) << "epoll del error : " << strerror(errno);}Log(Info) << "epoll del sucess , fd : " << fd;}void Mod_fd(int fd, uint32_t event){// 对文件描述符的事件进行修改struct epoll_event epevt;epevt.events = event;epevt.data.fd = fd;if (epoll_ctl(_epfd, EPOLL_CTL_MOD, fd, &epevt) < 0){Log(Warning) << "epoll mod error : " << strerror(errno);}}int Wait(struct epoll_event *ep_array, int max_size, int timeout){// 进行等待return epoll_wait(_epfd, ep_array, max_size, timeout);}private:int _epfd;
};
2.3 设计一个管理连接的类
因为TCP通信传递的是字节流,因此我们无法确定每次获取到的数据是一个有效报文,因此我们需要将所有获取到的数据都先存储起来:
- 我们需要一个整形,存储连接对应的文件描述符;
- 需要两个缓冲区:输入缓冲区和输出缓冲区;
- 当然为将代码的耦合性尽量降低一些,此处我们将不同文件描述符处理读写以及异常事件的方法也放到
Connection
类中。
这些方法的参数统一都设置为:std::shared_ptr<Connection>
来保证当跳转到外界去进行代码的执行时,依旧可以拿到文件描述符的相关资源。
class Connection;
using func_t = std::function<void(std::shared_ptr<Connection>)>;class Connection
{
public:Connection(int fd , func_t recv , func_t sender , func_t exception):_fd(fd) , _Recv(recv) , _Sender(sender) , _Exception(exception){}
private:int _fd; // 对应的文件描述符std::string _inbuffer ; // 输入缓冲区std::string _outbuffer; // 输出缓冲区
public:func_t _Recv; // 处理接收的逻辑func_t _Sender; // 处理发送的逻辑func_t _Exception; // 处理出现异常时的逻辑
};
在该类中,毫无疑问我们在后续需要先缓冲区中进行读写操作:
std::string& Get_Inbuffer(){return _inbuffer;}std::string& Get_Outbuffer(){return _outbuffer;}void Add_In(const std::string& mes){_inbuffer += mes;}void Add_Out(const std::string& mes){_outbuffer += mes;}int Get_fd(){return _fd;}
可能后续还需要使用一些操作,在后面再进行补充。
2.4 设计 Reactor服务器 类
- 需要一个Sock对象来从网路中获取客户端的连接;
- 需要一个Epoll对象来使用
epoll
多路转接的接口; - 使用一个哈希表来存储每一个文件描述符与之对应的
Connection
资源,方便我们后面获取一个文件描述符的输入缓冲区和输出缓冲区; - 还需要一个缓冲区,负责接收epoll模型等待结束后返回的就绪队列中的文件描述符信息。
class Rserver
{static const int array_num_max = 1024;
public:Rserver(uint16_t port):_sock_ptr(new Sock(port)) , _epoll_ptr(new Epoll){}private:std::shared_ptr<Sock> _sock_ptr;std::shared_ptr<Epoll> _epoll_ptr;std::unordered_map<int , std::shared_ptr<Connection> > _connections;struct epoll_event _epl_array[array_num_max];
};
2.5 将文件描述符设置为非阻塞
在ET模式下,我们要保证一次将所有的资源都获取上来,因此我们需要while
式的对资源进行读取,这就使得如果没有资源了我们也不能让其堵塞住,因此要将所有文件描述符设置为非阻塞状态。
此时使用int fcntl(int fd, int op , ... )
接口进行设置:
int SetNoBlock(int fd)
{int fl = fcntl(fd, F_GETFL);fl |= O_NONBLOCK;int n = fcntl(fd, F_SETFL, fl); return n;
}
2.6 所有文件描述符的处理方法
2.6.1 普通文件描述符的处理方法
首先就是普通文件的接收方法:
- 将缓冲区中的数据全部读取到connection中;
- 调用外界函数判断是否含有一个完成的报文;
- 含有完整报文就进行处理。
对于第二步,我们可以先外界开放一个接口,让外界将数据进行处理,将处理好的数据再给我,由服务器进行发送,因此我们在服务端的类中添加一个成员,负责回调:
using callback_func = std::function<std::string(std::shared_ptr<Connection>)>;class Rserver
{static const int array_num_max = 1024;
public:Rserver(uint16_t port , callback_func Onmessage):_sock_ptr(new Sock(port)) , _epoll_ptr(new Epoll) , _Onmessage(Onmessage){}private:std::shared_ptr<Sock> _sock_ptr;std::shared_ptr<Epoll> _epoll_ptr;std::unordered_map<int , std::shared_ptr<Connection> > _connections;struct epoll_event _epl_array[array_num_max];callback_func _Onmessage; // 负责回调
};
关于普通文件描述符的接受问题,需要注意的就是read
的不同返回值进行不同的处理:
void Recv(std::shared_ptr<Connection> con_ptr){// 1. 将缓冲区中的数据全部读取到Connection中// 2. 调用外界函数判断是否含有一个完成的报文// 3. 先客户端返回结果char inbuffer[1024];while(1){int n = read(con_ptr->Get_fd() , inbuffer , sizeof(inbuffer) - 1);if(n > 0){// 有数据inbuffer[n] = 0;con_ptr->Add_In(inbuffer);}else if(n == 0){// 对方关闭了文件 , 断开连接了 // 1. 将文件描述符从epoll模型中移除// 2. 将文件描述符从哈希表中移除// 3. 将文件描述符关闭int fd = con_ptr->Get_fd();_epoll_ptr->Del_fd(fd);_connections.erase(fd);close(fd);return;}else{// 此次有两种情况: 1. 数据读取完了 2. 读取出错了if(errno == EAGAIN) // 读取完了{break; } else // 出错了{// 此处调用文件对应的异常处理con_ptr->_Exception(con_ptr);return;}}}std::string ret = _Onmessage(con_ptr);con_ptr->Add_Out(ret);}
接下来就是编写发送的接口:
思考:对于发送接口是否需要判断,写事件是否就绪???
在大多数时候,写事件都是就绪的;因此如果将其加入到判断中epoll_wait
就会频繁的进行返回,会影响效率;所以一般不对写事件加入到等待中,除非写缓冲区满了,此时才将写加入到等待中。
- 在代码中表现为:在调用
write
接口的时候,实际写入的大小比我字符串要小。
void Sender(std::shared_ptr<Connection> con_ptr){// 进行数据的发送// 直接进行发送std::string& outbuffer = con_ptr->Get_Outbuffer();int fd = con_ptr->Get_fd();// 循环式的进行发送while(1){int n = write(fd , outbuffer.c_str() , outbuffer.size());if(n > 0){// 1. 将已经发送的数据从字符串中移除outbuffer.erase(0 , n);if(outbuffer.empty()) break; // 已经写完了}else if(n == 0){break;}else{if(errno == EAGAIN) // 已经写完了break;else // 出错了{// 此处调用文件对应的异常处理con_ptr->_Exception(con_ptr);return;}}}// 判断发送缓冲区中是否还有数据if(!outbuffer.empty()){// 发送缓冲区满了_epoll_ptr->Mod_fd(fd , EPOLLIN | EPOLLOUT | EPOLLET);}else{// 缓冲区没满 , 不需要对写事件进行检测_epoll_ptr->Mod_fd(fd , EPOLLIN | EPOLLET);}}
最后一步就是对异常情况的处理了:
- 打印日志信息;
- 将文件描述符从
epoll
模型从移除; - 将文件描述符从哈希表中移除;
- 关闭文件描述符。
void Exception(std::shared_ptr<Connection> con_ptr){int fd = con_ptr->Get_fd();_epoll_ptr->Del_fd(fd);_connections.erase(fd);close(fd);}
2.6.2 套接字的处理方法
对于套接字来说,只需要负责将建立好的链接拿上来就行了,不需要进行写入和异常处理。
在创建为新的文件描述符创建Connection
对象的是时候,我们需要传入可执行对象,但是我们在进行统一接口的时候参数都是std::shared_ptr<Connection>
,并且上述的Recv,Sender,Expection
都是类成员函数,都有一个隐含的参数this
指针,所以对于可调用对象在进行传参的是否要使用bind进行绑定。
void Accept(std::shared_ptr<Connection> con_ptr){// 1. 获取文件描述符while (1){int newfd = _sock_ptr->Accept();if (newfd >= 0){// 有新连接// 2. 将文件描述符设置为非阻塞// 3. 将文件加入到epoll模型中// 4. 将文件描述符加入到哈希表中if(SetNoBlock(newfd) < 0){Log(Warning) << "set no block fail";continue;}_epoll_ptr->Add_fd(newfd , EPOLLIN | EPOLLET);std::shared_ptr<Connection> con_ptr(new Connection(newfd ,std::bind(&Rserver::Recv , this , std::placeholders::_1), std::bind(&Rserver::Sender , this , std::placeholders::_1), std::bind(&Rserver::Exception , this , std::placeholders::_1)));_connections.emplace(newfd , con_ptr);}else{if(errno == EAGAIN) break;else{// 出错了Log(Warning) << "accept fail";}}}}
2.7 初始化服务器
- 创建套接字;
- 进行绑定;
- 设置监听模式;
- 将网络套接字加入到epoll模型中,并创建connection加入到_connections中进行管理;
- 在创建
Connection
对象的时候,我们还需要设计一个套接字的Recv
方法.
关于建立好的
void Init(){// 1. 创建套接字// 2. 进行绑定// 3. 设置监听模式// 4. 将网络套接字加入到epoll模型中,并创建Connection加入到_connections中进行管理_sock_ptr->Socket();_sock_ptr->Bind();_sock_ptr->Listen();int listensock = _sock_ptr->Get_fd();SetNoBlock(listensock);_epoll_ptr->Add_fd(listensock , EPOLLIN | EPOLLET);std::shared_ptr<Connection> conptr(new Connection(listensock, std::bind(&Rserver::Accept , this , std::placeholders::_1), nullptr, nullptr));_connections.emplace(listensock, conptr);// 将IP和端口号设置为可复用的int opt = 1;setsockopt(listensock , SOL_SOCKET , SO_REUSEADDR | SO_REUSEPORT , &opt , sizeof(opt));}
2.8 进行任务派发
因为我们之前已经将每个文件描述符对应的处理方法加入到了Connection
对象中了,因此直接进行调用即可。
在进行任务派发的时候有一个细节:可以将异常处理嫁接到读写事件中的异常处理,这样就不需要再单独对异常进行处理了。
void Dispatcher(int n){for (int i = 0; i < n; i++){int fd = _epl_array[i].data.fd;short events = _epl_array[i].events;auto &con_ptr = _connections[fd];// 将异常处理, 转化为读写处理if (events & EPOLLERR){events |= (EPOLLIN | EPOLLOUT);}if (_connections.count(fd) && con_ptr->_Recv){con_ptr->_Recv(con_ptr);}if (_connections.count(fd) && con_ptr->_Sender){con_ptr->_Sender(con_ptr);}}}```## 服务器的主循环服务器的主循环就比较简单了,直接进行`epoll_wait`即可,将操作系统中的就绪队列拿到:```cppvoid Run(){while (1){int n = _epoll_ptr->Wait(_epl_array, array_num_max, -1);if (n > 0){Dispatcher(n);}else if (n == 0){Log(Info) << "no message";}else{Log(Warning) << "epoll wait fail";}}}
以上就是整个服务器的实现过程了,下面我们对服务器接入一下事件,让服务器能够处理一些业务。
三. 补充
3.1 实现在线计算器
此处我们引入之前:手动私下序列化和换序列化的代码,来实现一个手动计算器:
std::string Onmessge(std::shared_ptr<Connection> con_ptr)
{static Calculator cal;std::string& inbuffer = con_ptr->Get_Inbuffer(); std::string ret = cal(inbuffer); // 对请求进行处理 , 返回一个序列化后的字符串return ret;
}
3.2 引入线程池
对于引入线程池,此代码就需要进行重构了,在Connection
对象中我们需要存储一个Server
的回指指针,但是此处不能直接使用shared_ptr<>
否则会出现循环引用,因此要采用weak_ptr
来实现。
但是注意:我们是在类的成员函数中使用其this
指针来构建一个sharead_ptr
,从而初始化weak_ptr
;
如果在类的成员函数中,直接通过 this
指针创建新的 shared_ptr
,会导致两个独立的 shared_ptr
管理同一个对象,但它们的引用计数是分开的:
- 原有的
shared_ptr
(创建服务器时候的)计数减到 0 时,会释放对象; - 新创建的
shared_ptr
(this指针创建的)计数减到 0 时,会再次尝试释放已被销毁的对象,导致双重释放(double free) 或未定义行为。
此处我们需要使用enable_shared_from_this<T>
继承来进行解决:
- 当类
T
继承enable_shared_from_this<T>
后,该类会隐式包含一个weak_ptr<T>
成员(内部维护)。当T
的对象被shared_ptr
管理时,这个weak_ptr
会与管理该对象的shared_ptr
共享控制块(记录引用计数的结构)。
此时,通过调用 shared_from_this()
方法,可返回一个指向自身的 shared_ptr<T>
,这个新的 shared_ptr
会复用原有的引用计数,避免双重释放。
服务器类定义:
class Rserver : public std::enable_shared_from_this<Loop_Epollserver>
{
public:// ......
};
在Connection
类中增加一个成员:weak_ptr<Rserver> _loop_svr
.
对于创建Connection
对象部分也要进行修改:
std::shared_ptr<Connection> conptr(new Connection(listensock,shared_from_this(),std::bind(&Rserver::Accept, this, std::placeholders::_1),nullptr, nullptr));
在第二个实参中,传入this
指针来构建Connection
中的weak_ptr
。
关于线程池部分的代码因为文章篇幅就不再叙述了,大家可以自己试试写一下。