当前位置: 首页 > news >正文

【IO多路转接】高并发服务器实战:Reactor 框架与 Epoll 机制的封装与设计逻辑


请添加图片描述


半桔:个人主页

 🔥 个人专栏: 《IO多路转接》《手撕面试算法》《C++从入门到入土》

🔖无论什么样的灾难降临,只要生命还在,生活始终要继续。活着,就是最美丽的事。 《美丽人生》

文章目录

  • 前言
  • 一. Epoll的工作模式
  • 二. Reactor 服务器
    • 2.1 对网络套接字进行封装
    • 2.2 对Epoll接口进行封装
    • 2.3 设计一个管理连接的类
    • 2.4 设计 Reactor服务器 类
    • 2.5 将文件描述符设置为非阻塞
    • 2.6 所有文件描述符的处理方法
      • 2.6.1 普通文件描述符的处理方法
      • 2.6.2 套接字的处理方法
    • 2.7 初始化服务器
    • 2.8 进行任务派发
  • 三. 补充
    • 3.1 实现在线计算器
    • 3.2 引入线程池

前言

在高并发成为系统标配的今天,网络编程、中间件开发、分布式通信等场景中,“如何高效处理海量 IO 请求” 始终是开发者绕不开的核心命题。传统 “一连接一线程” 的同步阻塞模型,早已因线程资源耗尽、CPU 上下文切换频繁、内存占用过高等问题,难以应对万级甚至十万级的并发连接;即便引入线程池优化,也无法从根本上解决 “等待 IO 时线程闲置” 的资源浪费困境。​

正是在这样的需求下,基于 “事件驱动” 与 “IO 多路转接” 的 Reactor 模式应运而生 —— 它以 “少量线程监听多 IO、事件触发业务处理” 的核心逻辑,成为解决高并发 IO 的经典架构:小到 Netty 的网络通信内核、Redis 的事件循环,大到 Nginx 的请求处理框架、Kafka 的消息接收模块,其底层都能看到 Reactor 模式的影子。可以说,理解 Reactor 模式的实现逻辑,是掌握高并发系统设计的 “关键钥匙”。​

本文正是围绕 “Reactor 模式实现” 展开:不局限于抽象原理,而是从底层技术依赖(IO 多路转接调用)切入,一步步拆解事件循环的构建、组件间的协作逻辑,手把手帮助你构建出一个基于Reactor 模式的服务器。

一. Epoll的工作模式

Epoll有两种工作模式:水平触发(Level Triggered,简称 LT)边缘触发(Edge Triggered,简称 ET)。这两种模式的核心差异在于 “何时通知应用程序某个文件描述符(fd)就绪”,直接影响高并发 IO 处理的效率和编程复杂度。

  1. 水平触发(LT):默认模式,“状态持续” 触发:

当一个文件描述符(如 socket)处于就绪状态(例如:有数据可读、可写,或发生异常)时,epoll 会持续通知应用程序,直到该就绪状态被 “消除”(例如:数据被完全读取、缓冲区被写满)。

  1. 边缘触发(ET):“状态变化” 触发,高效但复杂:

epoll 仅在文件描述符的就绪状态发生 “变化瞬间” 通知一次,之后无论该状态是否持续,都不再通知。,即只有在读写资源从没就绪多就绪的时候才会进行通知。

我们通常会认为ET模式的效率更高:

  • ET当资源就绪的时候只会通知一次,并不需要反复通知。并且如果上层没有将数据读取完毕,也不会再进行通知了;
  • 因为ET模式只会进行通知一次,因此其==会倒逼着上层在进行读取时要将数据一次全部取完,这样就可以空出一个更大的接收缓冲区,对方也可以发送更多的。

二. Reactor 服务器

一下我们开始进行基于 Reactor 模式设计的高性能网络服务器,通过 “事件驱动” 和 “IO 多路转接” 技术,高效处理海量并发连接。

2.1 对网络套接字进行封装

关于网络套接字可以查看,我之前写的关于TCP的文章,改内容并不是本文的重点,所以此处直接贴实现代码了:

const std::string defaultip_ = "0.0.0.0";
enum SockErr
{SOCKET_Err, BIND_Err,
};class Sock
{
public:Sock(uint16_t port): port_(port),listensockfd_(-1){}void Socket(){listensockfd_ = socket(AF_INET, SOCK_STREAM, 0);if (listensockfd_ < 0){Log(Fatal) << "socket fail";exit(SOCKET_Err);}Log(Info) << "socket sucess";}void Bind(){struct sockaddr_in server;server.sin_family = AF_INET;server.sin_port = htons(port_);inet_pton(AF_INET, defaultip_.c_str(), &server.sin_addr);if (bind(listensockfd_, (struct sockaddr *)&server, sizeof(server)) < 0){Log(Fatal) << "bind fail";exit(BIND_Err);}Log(Info) << "bind sucess";}void Listen(){if (listen(listensockfd_, 10) < 0){Log(Warning) << "listen fail";}Log(Info) << "listen sucess";}int Accept(){struct sockaddr_in client;socklen_t len = sizeof(client);int fd = accept(listensockfd_ , (sockaddr*)&client , &len);return fd;}int Accept(std::string& ip , uint16_t& port){struct sockaddr_in client;socklen_t len = sizeof(client);int fd = accept(listensockfd_ , (sockaddr*)&client , &len);port = ntohs(client.sin_port);char bufferip[64];inet_ntop(AF_INET , &client.sin_addr , bufferip , sizeof(bufferip) - 1);ip = bufferip;return fd;}int Get_fd(){return listensockfd_;}~Sock(){close(listensockfd_);}private:uint16_t port_;int listensockfd_;
};

2.2 对Epoll接口进行封装

关于Epoll具体的细节,可以查看之前关于关于Epoll的文章,此处我们直接对封装的接口进行使用:

enum EpollErr
{CREAR_Err,
};class Epoll
{
public:Epoll(){// 创建epoll模型_epfd = epoll_create(1);if (_epfd < 0){Log(Fatal) << "epoll_create fail";exit(CREAR_Err);}Log(Info) << "epoll create sucess ";}void Add_fd(int fd, uint32_t event){// 添加文件描述符到红黑树中struct epoll_event epevt;epevt.events = event;epevt.data.fd = fd;if (epoll_ctl(_epfd, EPOLL_CTL_ADD , fd, &epevt) < 0){Log(Warning) << "epoll add error : " << strerror(errno);}Log(Info) << "epoll add sucess , fd : " << fd ;}void Del_fd(int fd){// 删除要进行等待的文件描述符if (epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, nullptr) < 0){Log(Warning) << "epoll del error : " << strerror(errno);}Log(Info) << "epoll del sucess  , fd : " << fd;}void Mod_fd(int fd, uint32_t event){// 对文件描述符的事件进行修改struct epoll_event epevt;epevt.events = event;epevt.data.fd = fd;if (epoll_ctl(_epfd, EPOLL_CTL_MOD, fd, &epevt) < 0){Log(Warning) << "epoll mod error : " << strerror(errno);}}int Wait(struct epoll_event *ep_array, int max_size, int timeout){// 进行等待return epoll_wait(_epfd, ep_array, max_size, timeout);}private:int _epfd;
};

2.3 设计一个管理连接的类

因为TCP通信传递的是字节流,因此我们无法确定每次获取到的数据是一个有效报文,因此我们需要将所有获取到的数据都先存储起来:

  1. 我们需要一个整形,存储连接对应的文件描述符;
  2. 需要两个缓冲区:输入缓冲区和输出缓冲区;
  3. 当然为将代码的耦合性尽量降低一些,此处我们将不同文件描述符处理读写以及异常事件的方法也放到Connection类中
    这些方法的参数统一都设置为:std::shared_ptr<Connection>来保证当跳转到外界去进行代码的执行时,依旧可以拿到文件描述符的相关资源。
class Connection;
using func_t = std::function<void(std::shared_ptr<Connection>)>;class Connection
{
public:Connection(int fd , func_t recv , func_t sender , func_t exception):_fd(fd) , _Recv(recv) , _Sender(sender) , _Exception(exception){}
private:int _fd;                 // 对应的文件描述符std::string _inbuffer ;  // 输入缓冲区std::string _outbuffer;  // 输出缓冲区
public:func_t _Recv;             // 处理接收的逻辑func_t _Sender;           // 处理发送的逻辑func_t _Exception;        // 处理出现异常时的逻辑
};

在该类中,毫无疑问我们在后续需要先缓冲区中进行读写操作:

    std::string& Get_Inbuffer(){return _inbuffer;}std::string& Get_Outbuffer(){return _outbuffer;}void Add_In(const std::string& mes){_inbuffer += mes;}void Add_Out(const std::string& mes){_outbuffer += mes;}int Get_fd(){return _fd;}

可能后续还需要使用一些操作,在后面再进行补充。

2.4 设计 Reactor服务器 类

  1. 需要一个Sock对象来从网路中获取客户端的连接;
  2. 需要一个Epoll对象来使用epoll多路转接的接口;
  3. 使用一个哈希表来存储每一个文件描述符与之对应的Connection资源,方便我们后面获取一个文件描述符的输入缓冲区和输出缓冲区;
  4. 还需要一个缓冲区,负责接收epoll模型等待结束后返回的就绪队列中的文件描述符信息。
class Rserver
{static const int array_num_max = 1024;
public:Rserver(uint16_t port):_sock_ptr(new Sock(port)) , _epoll_ptr(new Epoll){}private:std::shared_ptr<Sock> _sock_ptr;std::shared_ptr<Epoll> _epoll_ptr;std::unordered_map<int , std::shared_ptr<Connection> > _connections;struct epoll_event _epl_array[array_num_max];
};

2.5 将文件描述符设置为非阻塞

在ET模式下,我们要保证一次将所有的资源都获取上来,因此我们需要while式的对资源进行读取,这就使得如果没有资源了我们也不能让其堵塞住,因此要将所有文件描述符设置为非阻塞状态

此时使用int fcntl(int fd, int op , ... )接口进行设置:

int SetNoBlock(int fd)
{int fl = fcntl(fd, F_GETFL);fl |= O_NONBLOCK;int n = fcntl(fd, F_SETFL, fl); return n;
}

2.6 所有文件描述符的处理方法

2.6.1 普通文件描述符的处理方法

首先就是普通文件的接收方法:

  1. 将缓冲区中的数据全部读取到connection中;
  2. 调用外界函数判断是否含有一个完成的报文;
  3. 含有完整报文就进行处理。

对于第二步,我们可以先外界开放一个接口,让外界将数据进行处理,将处理好的数据再给我,由服务器进行发送,因此我们在服务端的类中添加一个成员,负责回调

using callback_func = std::function<std::string(std::shared_ptr<Connection>)>;class Rserver
{static const int array_num_max = 1024;
public:Rserver(uint16_t port , callback_func Onmessage):_sock_ptr(new Sock(port)) , _epoll_ptr(new Epoll) , _Onmessage(Onmessage){}private:std::shared_ptr<Sock> _sock_ptr;std::shared_ptr<Epoll> _epoll_ptr;std::unordered_map<int , std::shared_ptr<Connection> > _connections;struct epoll_event _epl_array[array_num_max];callback_func _Onmessage;           // 负责回调
};

关于普通文件描述符的接受问题,需要注意的就是read的不同返回值进行不同的处理:

    void Recv(std::shared_ptr<Connection> con_ptr){// 1. 将缓冲区中的数据全部读取到Connection中// 2. 调用外界函数判断是否含有一个完成的报文// 3. 先客户端返回结果char inbuffer[1024];while(1){int n = read(con_ptr->Get_fd() , inbuffer , sizeof(inbuffer) - 1);if(n > 0){// 有数据inbuffer[n] = 0;con_ptr->Add_In(inbuffer);}else if(n == 0){// 对方关闭了文件 , 断开连接了 // 1. 将文件描述符从epoll模型中移除// 2. 将文件描述符从哈希表中移除// 3. 将文件描述符关闭int fd = con_ptr->Get_fd();_epoll_ptr->Del_fd(fd);_connections.erase(fd);close(fd);return;}else{// 此次有两种情况: 1. 数据读取完了   2. 读取出错了if(errno == EAGAIN)  // 读取完了{break;     } else                 // 出错了{// 此处调用文件对应的异常处理con_ptr->_Exception(con_ptr);return;}}}std::string ret = _Onmessage(con_ptr);con_ptr->Add_Out(ret);}

接下来就是编写发送的接口:

思考:对于发送接口是否需要判断,写事件是否就绪???

在大多数时候,写事件都是就绪的;因此如果将其加入到判断中epoll_wait就会频繁的进行返回,会影响效率;所以一般不对写事件加入到等待中,除非写缓冲区满了,此时才将写加入到等待中

  • 在代码中表现为:在调用write接口的时候,实际写入的大小比我字符串要小。
 void Sender(std::shared_ptr<Connection> con_ptr){// 进行数据的发送// 直接进行发送std::string& outbuffer = con_ptr->Get_Outbuffer();int fd = con_ptr->Get_fd();// 循环式的进行发送while(1){int n = write(fd , outbuffer.c_str() , outbuffer.size());if(n > 0){// 1. 将已经发送的数据从字符串中移除outbuffer.erase(0 , n);if(outbuffer.empty()) break;           // 已经写完了}else if(n == 0){break;}else{if(errno == EAGAIN)        // 已经写完了break;else                       // 出错了{// 此处调用文件对应的异常处理con_ptr->_Exception(con_ptr);return;}}}// 判断发送缓冲区中是否还有数据if(!outbuffer.empty()){// 发送缓冲区满了_epoll_ptr->Mod_fd(fd , EPOLLIN | EPOLLOUT | EPOLLET);}else{// 缓冲区没满 , 不需要对写事件进行检测_epoll_ptr->Mod_fd(fd , EPOLLIN | EPOLLET);}}

最后一步就是对异常情况的处理了:

  1. 打印日志信息;
  2. 将文件描述符从epoll模型从移除;
  3. 将文件描述符从哈希表中移除;
  4. 关闭文件描述符。
void Exception(std::shared_ptr<Connection> con_ptr){int fd = con_ptr->Get_fd();_epoll_ptr->Del_fd(fd);_connections.erase(fd);close(fd);}

2.6.2 套接字的处理方法

对于套接字来说,只需要负责将建立好的链接拿上来就行了,不需要进行写入和异常处理。

在创建为新的文件描述符创建Connection对象的是时候,我们需要传入可执行对象,但是我们在进行统一接口的时候参数都是std::shared_ptr<Connection>,并且上述的Recv,Sender,Expection都是类成员函数,都有一个隐含的参数this指针,所以对于可调用对象在进行传参的是否要使用bind进行绑定

    void Accept(std::shared_ptr<Connection> con_ptr){// 1. 获取文件描述符while (1){int newfd = _sock_ptr->Accept();if (newfd >= 0){// 有新连接// 2. 将文件描述符设置为非阻塞// 3. 将文件加入到epoll模型中// 4. 将文件描述符加入到哈希表中if(SetNoBlock(newfd) < 0){Log(Warning) << "set no block fail";continue;}_epoll_ptr->Add_fd(newfd , EPOLLIN | EPOLLET);std::shared_ptr<Connection> con_ptr(new Connection(newfd ,std::bind(&Rserver::Recv , this , std::placeholders::_1), std::bind(&Rserver::Sender , this , std::placeholders::_1), std::bind(&Rserver::Exception , this , std::placeholders::_1)));_connections.emplace(newfd , con_ptr);}else{if(errno == EAGAIN) break;else{// 出错了Log(Warning) << "accept fail";}}}}

2.7 初始化服务器

  1. 创建套接字;
  2. 进行绑定;
  3. 设置监听模式;
  4. 将网络套接字加入到epoll模型中,并创建connection加入到_connections中进行管理;
  5. 在创建Connection对象的时候,我们还需要设计一个套接字的Recv方法.

关于建立好的

    void Init(){// 1. 创建套接字// 2. 进行绑定// 3. 设置监听模式// 4. 将网络套接字加入到epoll模型中,并创建Connection加入到_connections中进行管理_sock_ptr->Socket();_sock_ptr->Bind();_sock_ptr->Listen();int listensock = _sock_ptr->Get_fd();SetNoBlock(listensock);_epoll_ptr->Add_fd(listensock , EPOLLIN | EPOLLET);std::shared_ptr<Connection> conptr(new Connection(listensock, std::bind(&Rserver::Accept , this , std::placeholders::_1), nullptr, nullptr));_connections.emplace(listensock, conptr);// 将IP和端口号设置为可复用的int opt = 1;setsockopt(listensock , SOL_SOCKET , SO_REUSEADDR | SO_REUSEPORT , &opt , sizeof(opt));}

2.8 进行任务派发

因为我们之前已经将每个文件描述符对应的处理方法加入到了Connection对象中了,因此直接进行调用即可。

在进行任务派发的时候有一个细节:可以将异常处理嫁接到读写事件中的异常处理,这样就不需要再单独对异常进行处理了。

    void Dispatcher(int n){for (int i = 0; i < n; i++){int fd = _epl_array[i].data.fd;short events = _epl_array[i].events;auto &con_ptr = _connections[fd];// 将异常处理, 转化为读写处理if (events & EPOLLERR){events |= (EPOLLIN | EPOLLOUT);}if (_connections.count(fd) && con_ptr->_Recv){con_ptr->_Recv(con_ptr);}if (_connections.count(fd) && con_ptr->_Sender){con_ptr->_Sender(con_ptr);}}}```## 服务器的主循环服务器的主循环就比较简单了,直接进行`epoll_wait`即可,将操作系统中的就绪队列拿到:```cppvoid Run(){while (1){int n = _epoll_ptr->Wait(_epl_array, array_num_max, -1);if (n > 0){Dispatcher(n);}else if (n == 0){Log(Info) << "no message";}else{Log(Warning) << "epoll wait fail";}}}

以上就是整个服务器的实现过程了,下面我们对服务器接入一下事件,让服务器能够处理一些业务。

三. 补充

3.1 实现在线计算器

此处我们引入之前:手动私下序列化和换序列化的代码,来实现一个手动计算器:

std::string Onmessge(std::shared_ptr<Connection> con_ptr)
{static Calculator cal;std::string& inbuffer = con_ptr->Get_Inbuffer();  std::string ret = cal(inbuffer);  // 对请求进行处理 , 返回一个序列化后的字符串return ret;
}

3.2 引入线程池

对于引入线程池,此代码就需要进行重构了,在Connection对象中我们需要存储一个Server的回指指针,但是此处不能直接使用shared_ptr<>否则会出现循环引用,因此要采用weak_ptr来实现。

但是注意:我们是在类的成员函数中使用其this指针来构建一个sharead_ptr,从而初始化weak_ptr

如果在类的成员函数中,直接通过 this 指针创建新的 shared_ptr,会导致两个独立的 shared_ptr 管理同一个对象,但它们的引用计数是分开的:

  • 原有的 shared_ptr (创建服务器时候的)计数减到 0 时,会释放对象;
  • 新创建的 shared_ptr (this指针创建的)计数减到 0 时,会再次尝试释放已被销毁的对象,导致双重释放(double free) 或未定义行为。

此处我们需要使用enable_shared_from_this<T>继承来进行解决:

  • 当类 T 继承 enable_shared_from_this<T> 后,该类会隐式包含一个 weak_ptr<T> 成员(内部维护)。当 T 的对象被 shared_ptr 管理时,这个 weak_ptr 会与管理该对象的 shared_ptr 共享控制块(记录引用计数的结构)。

此时,通过调用 shared_from_this() 方法,可返回一个指向自身的 shared_ptr<T>,这个新的 shared_ptr 会复用原有的引用计数,避免双重释放。

服务器类定义:

class Rserver : public std::enable_shared_from_this<Loop_Epollserver>
{
public:// ......
};

Connection类中增加一个成员:weak_ptr<Rserver> _loop_svr.

对于创建Connection对象部分也要进行修改:

std::shared_ptr<Connection> conptr(new Connection(listensock,shared_from_this(),std::bind(&Rserver::Accept, this, std::placeholders::_1),nullptr, nullptr));

在第二个实参中,传入this指针来构建Connection中的weak_ptr

关于线程池部分的代码因为文章篇幅就不再叙述了,大家可以自己试试写一下。

http://www.dtcms.com/a/515260.html

相关文章:

  • 企业网站建设毕业设计论文龙岗网站建设开发设计公司
  • 力扣热题100道之238除自身以外数组的乘积
  • 把AI装进OS、批量落地智慧服务,智能手机革命2.0来了
  • 防爆手机可以通过普通智能手机改装吗?
  • 宇树H2仿生机器人登场
  • 语音识别技术之FireRedASR
  • 有什么指标可以判断手机是否降频
  • 禾赛科技与广和通战略合作,联合推出机器人解决方案加速具身智能商业化落地
  • 石家庄网站开发哪家好企业网站建设网站模板
  • Z3 Technology-适用于无人机和机器人的 4K 高清摄像机和视频编码器
  • 济南天桥区做网站的怎样做app推广
  • 建设河南网站外贸网站建设盲区
  • 运维逆袭志·第4期 | 安全风暴的绝地反击 :从告警地狱到智能防护
  • Java-集合求差集,如果B集合中的id在A集合中存在就移除,如果不在就返回A集合
  • 微服务即时通讯系统——整体架构和组件(1)
  • WPF入门
  • WPF布局控件(界面骨架核心)
  • WPF 常用样式属性及示例笔记
  • 【WPF】自定义颜色拾取器
  • MahApps.Metro WPF 开发使用过程中遇到的问题 - 未能加载文件或程序集“Microsoft.Xaml.Behaviors,
  • 【普中Hi3861开发攻略--基于鸿蒙OS】-- 第 26 章 WIFI实验-AP 建立网络
  • ARM架构深度解析:ARMv7、ARMv8、ARMv9的技术演进、芯片实现与未来展望
  • 线下剧本杀预约小程序核心功能玩法解析:轻量化载体重构娱乐消费生态
  • 【矩阵分析与应用】【第8章 特征分析】【8.3 凯莱-哈密顿定理求解矩阵高次幂详解】
  • 合肥制作企业网站免费收录网站推广
  • 阿里云安装docker-compose
  • Centos 7 :VMware Tools 启动脚本未能在虚拟机中成功运行
  • 基于vue的停车场管理系统
  • 短剧小程序系统开发:开启影视娱乐新纪元
  • 系统架构设计师备考第49天——数字孪生体云计算大数据技术