【计算机网络】非阻塞IO——epoll 编程与ET模式详解——(easy)高并发网络服务器设计
🔥个人主页🔥:孤寂大仙V
🌈收录专栏🌈:计算机网络
🌹往期回顾🌹:【计算机网络】非阻塞IO——poll实现多路转接
🔖流水不争,争的是滔滔不息
- 一、epoll实现多路转接
- epoll的参数
- **epoll_create**
- **epoll_ctl**
- epoll_wait
- epoll的原理
- epoll的优点
- epoll的工作方式:LT(水平触发)与ET(边缘触发)
- 二、代码简单实现epoll高并发服务器
- 构造函数 注册事件
- 启动服务,阻塞等待事件
- 事件派发(对不同就绪事件分类处理新连接 or 客户端发消息)
- 接收新连接并将其注册到 epoll
- 接收数据并输出
- 三、源码:[epoll实现非阻塞服务器](https://gitee.com/hanbuxuan/linux-warehouse/tree/master/Project/Network/Non-blocking%20IO/EPoll/epoll1)
一、epoll实现多路转接
epoll是Linux内核提供的一种高性能I/O事件通知机制,用于处理大量文件描述符的I/O事件。它是对传统select/poll模型的改进,避免了线性扫描文件描述符的开销,适合高并发场景。
epoll的参数
epoll_create
#include <sys/epoll.h>int epoll_create(int size);
用于创建 epoll 实例,返回一个 epoll 文件描述符(fd),用于后续的事件管理。
size:一个提示参数,建议内核预期的文件描述符数量。作用:告诉内核大致需要管理的 fd 数量,帮助内核分配初始内存。
返回值:
成功:返回 epoll 文件描述符(>= 0),如你的 _epfd。
失败:返回 -1,并设置 errno。
epoll_ctl
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
epoll_ctl 用于管理 epoll 实例中的文件描述符(添加、修改、删除)。
-
epfd:
epoll 实例的文件描述符,由 epoll_create 或 epoll_create1 返回。 -
op:
操作类型,控制对 fd 的操作:
EPOLL_CTL_ADD:将 fd 添加到 epoll 实例的红黑树,注册关注的事件。
EPOLL_CTL_MOD:修改 fd 的事件或用户数据。
EPOLL_CTL_DEL:从红黑树中移除 fd。 -
fd:
要操作的文件描述符。 -
event
指向 struct epoll_event 的指针,定义 fd 的事件和用户数据。
结构定义
struct epoll_event {uint32_t events; // 事件类型epoll_data_t data; // 用户数据
};
union epoll_data_t {void *ptr; // 指针int fd; // 文件描述符uint32_t u32; // 32 位整数uint64_t u64; // 64 位整数
};
- events 可选值(可组合):
EPOLLIN:fd 可读(例如 socket 有数据或新连接)。
EPOLLOUT:fd 可写。
EPOLLERR:fd 发生错误。
EPOLLHUP:fd 被挂起(例如 socket 关闭)。
EPOLLET:启用边沿触发(ET)模式(默认是水平触发 LT)。
EPOLLONESHOT:事件只触发一次,需重新注册。 - 返回值:
成功:返回 0。
失败:返回 -1,并设置 errno
epoll_wait
int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout);
epoll_wait 等待 epoll 实例中的就绪事件,并将就绪事件复制到用户空间。
-
epfd:
epoll 实例的文件描述符。 -
events:
指向 struct epoll_event 数组的指针,用于存储内核返回的就绪事件。
内核将就绪队列中的事件复制到这个数组,每个元素包含:
events:触发的事件类型(如 EPOLLIN、EPOLLERR)。
data:用户数据(如 data.fd)。 -
maxevents:
数组 events 能容纳的最大事件数,必须大于 0。 -
timeout:
等待的超时时间(毫秒):
-1:无限等待(阻塞直到有事件)。
0:立即返回(非阻塞)。
0:等待指定毫秒数。 -
返回值
成功:返回就绪事件数量(0 表示超时,> 0 表示有事件)。
失败:返回 -1,并设置 errno
epoll的原理
epoll_create 会创建红黑树和就绪队列。
红黑树:用于存储所有通过 epoll_ctl 注册的 fd 及其事件。
就绪队列:用于存储已经就绪的事件,供 epoll_wait 获取。
红黑树
- 作用:红黑树是一个高效的平衡二叉搜索树,用于存储和管理所有被监控的 fd 及其关联的事件。
- 初始化:在调用 epoll_create 时,内核会为 epoll 实例分配一个红黑树,初始为空。
- 用途:通过 epoll_ctl(EPOLL_CTL_ADD) 添加 fd 时,fd 和事件信息(struct epoll_event)被插入红黑树,内核通过红黑树高效管理(插入、删除、查找复杂度为 O(log n))。
红黑树的本质是用户告诉内核,你要帮我关心哪个一个fd,哪一个事件。
就绪队列
- 作用:就绪队列是一个双向链表,用于存储已经触发的事件,即 fd 上有用户关注的事件,如新连接或数据可读。
- 初始化:epoll_create 初始化一个空的就绪队列,等待事件触发时由内核填充。
- 用途:当 fd 上的事件触发(例如 socket 收到数据),内核通过回调函数将该 fd 的事件加入就绪队列,epoll_wait 直接从这个队列获取就绪事件。
就绪队列的本质是内核告诉用户,哪一个fd上面的哪些事件已经就绪了。
在这个就绪队列中,检测是否有fd就绪事件复杂度是O(1),获取就绪事件的事件复杂度是O(n)。
红黑树中,添加fd和事件时内核会为该 fd 注册一个回调函数,但是此时不会将fd加入就绪队列。当事件触发(listen 套接字上有新连接到来)时,回调函数才会将fd事件加入就绪队列。
[epoll_ctl 添加 fd 和事件]↓[红黑树维护 fd 状态]↓
[事件真正发生 → 回调触发 → 加入就绪队列]↓[epoll_wait 获取就绪事件处理]
这时候我们发现,epoll_create的返回值怎么是一个fd,它不应该是一个红黑树吗,为什么叫fd?
这其实是对内核中 epoll 实现机制的一种描述方式,不是说你创建的红黑树叫“fd”,而是epoll_ctl调用时,内核会把你要监控的 fd 加入到一棵红黑树结构中维护。这棵红黑树的作用是:快速查找/修改/删除 被监控的 fd 对应的事件信息。
系统就会把 fd 作为一个 key,加进 epoll 实例内部维护的这棵红黑树中。它不是真正“叫 fd 的红黑树”,而是以 fd 为索引的红黑树,用于管理大量的监听对象。
就绪队列,就像是基于事件就绪的生产者消费者模型,epoll接口是线程安全的。
epoll的优点
- 接口使用方便: 虽然拆分成了三个函数, 但是反而使用起来更方便高效.,不需要每次循环都设置关注的文件描述符, 也做到了输入输出参数分离开。
- 数据拷贝轻量: 只在合适的时候调用 EPOLL_CTL_ADD 将文件描述符结构拷贝到内核中, 这个操作并不频繁(而 select/poll 都是每次循环都要进行拷贝)。
- 事件回调机制: 避免使用遍历, 而是使用回调函数的方式, 将就绪的文件描述符。
- 结构加入到就绪队列中, epoll_wait 返回直接访问就绪队列就知道哪些文件描述符就绪. 这个操作时间复杂度 O(1). 即使文件描述符数目很多, 效率也不会受到影响。
- 没有数量限制: 文件描述符数目无上限。
epoll的工作方式:LT(水平触发)与ET(边缘触发)
LT水平触发
只要文件描述符(fd)处于就绪状态(如可读或可写),就会持续通知用户程序,直到事件被处理。
ET边缘触发
其核心特点是仅在状态变化(如从无数据到有数据)时触发一次事件通知,后续状态不变则不再重复通知。
水平触发(LT)只要条件满足(如缓冲区有数据),持续触发事件。边缘触发(ET)仅在条件变化(如空缓冲区变为非空)时触发一次,需一次性处理完所有数据。
LT:默认、友好、容错
- 只要内核检测到 fd 是就绪的,就会反复通知(哪怕你没一次性把数据读完)。
- 哪怕你处理得不彻底,它下次还会提醒你:“喂,还有数据没处理呢!”
假设你一次 recv() 只读了一半数据,LT 会继续告诉你这个 fd 可读,你下次还可以继续读。
ET:极致、狠、效率高
- 只有状态从“无”到“有”的那一刻才会通知你。
- -如果你没一次性处理干净,下次就永远不会再通知你,你就错过了数据,程序直接出 BUG!
内核有新数据时,调用一次 recv() 读了一点,但还有数据没读完。如果你这次没把数据都读光,下次 ET 不会再通知你这个 fd 可读了!
ET 模式本质上就是一种“倒逼程序员认真写代码、处理彻底”的机制。 让你不能偷懒!你必须写出健壮、完整、循环处理的逻辑,否则你就会错过数据、错过连接、错过人生(不是)。勇敢面对ET的“倒逼”,写出不漏不炸的代码。
那为啥要用 ET 呢?
因为它效率高!通知次数少,系统开销更低,适合高并发场景。常见于epoll + 非阻塞IO + ET 模式,配合使用。
LT 更像是老妈提醒你“吃饭了”,没吃她还会再喊;ET 更像是高铁广播,错过了就完蛋。
用ET模式,要把文件描述符设置为非阻塞,读数据要一直读,写数据要一直写。
为什么ET模式文件描述符要设置为非阻塞?
在ET模式下,epoll通知一次后,程序必须尽可能读取或写入所有可用数据,直到无数据可读或无空间可写为止。如果文件描述符是阻塞的,第一次读/写操作可能会因为缓冲区满或空而阻塞,导致程序无法继续处理其他事件,降低效率。
非阻塞模式允许程序在读/写操作返回EAGAIN或EWOULDBLOCK(表示暂时无数据或无法写入)时立即返回,从而继续处理其他事件或逻辑。
如果不设置为非阻塞,程序可能卡在某个文件描述符的读/写操作上,无法及时响应其他文件描述符的事件,这与ET模式高效、快速处理的设计初衷相悖。
示例场景: 假设一个socket上有100字节数据到达,ET模式只会通知一次。如果文件描述符是阻塞的,程序可能只读取部分数据(比如50字节)后阻塞,无法处理其他事件。而非阻塞模式下,程序可以反复读取直到返回EAGAIN,确保处理完所有数据。
为什么读数据要一直读,写数据要一直写?
在ET模式下,epoll只在事件状态发生变化时触发一次通知,因此程序必须在这一次通知中处理完所有可用的数据或完成所有可能的写操作,否则可能错过后续数据或导致数据堆积。
读取数据时的while循环:
当epoll通知可读事件时,程序需要反复调用read或recv函数,直到读取到EAGAIN或EWOULDBLOCK,表示当前没有更多数据可读。如果不使用while循环,只读取一次,可能只读到部分数据(例如,缓冲区一次只能读1024字节,但实际有4096字节数据)。后续数据不会再次触发epoll事件,导致数据丢失或处理不完整。
while (1) {ssize_t count = read(fd, buffer, sizeof(buffer));if (count == -1) {if (errno == EAGAIN || errno == EWOULDBLOCK) {// 缓冲区已读完break;}// 其他错误处理perror("read");break;} else if (count == 0) {// 对端关闭连接break;}// 处理读取到的count字节数据
}
写入数据时的while循环:
类似地,当epoll通知可写事件时,程序需要反复调用write或send函数,直到所有数据写入完成或返回EAGAIN(表示发送缓冲区已满)。
如果不使用while循环,只写入一次,可能只写出部分数据(例如,发送缓冲区一次只接受1024字节,但实际要发送4096字节)。后续写入可能不会再次触发epoll事件,导致数据发送不完整。
while (data_to_write > 0) {ssize_t count = write(fd, buffer, data_to_write);if (count == -1) {if (errno == EAGAIN || errno == EWOULDBLOCK) {// 缓冲区满,等待下次可写事件break;}// 其他错误处理perror("write");break;}data_to_write -= count;buffer += count;
}
ET模式的特性要求:
ET模式假定程序在收到事件通知后会“榨干”文件描述符的处理能力(读完所有可读数据或写完所有可写数据)。如果不通过while循环处理,可能会导致数据丢失或写操作不完整,因为后续事件可能不会再次触发。相比之下,LT模式会持续触发事件,直到条件不再满足(例如,缓冲区无数据或可写),因此LT模式对while循环的依赖较低,但效率也低于ET模式。
二、代码简单实现epoll高并发服务器
#pragma once
#include "Common.hpp"
#include "Socket.hpp"
#include "Log.hpp"
#include <sys/epoll.h>using namespace std;
using namespace LogModule;
using namespace SocketModule;class epollserver
{
const static int size = 4096;
const static int defaulted = -1;
public:epollserver(int port):_listensockfd(make_unique<TcpSocket>()),_isrunning(false),_epfd(defaulted){_listensockfd->BuildTcpSocketServer(port); //构造TCP服务器_epfd=epoll_create(256); //创建epoll模型if(_epfd<0){LOG(LogLevel::FATAL)<<"epoll_create error";exit(EPOLL_CREATE_ERR);}LOG(LogLevel::INFO)<<"epoll_create success";epoll_event ev; //创建对象ev.events=EPOLLIN;//关心读事件ev.data.fd=_listensockfd->FD();//listen套接字int n=epoll_ctl(_epfd,EPOLL_CTL_ADD,_listensockfd->FD(),&ev); //设置进内核关系读事件if(n<0){LOG(LogLevel::FATAL)<<"epoll_ctl error";exit(EPOLL_CTL_ERR);}LOG(LogLevel::INFO)<<"epoll_ctl listen_sockfd success";}void Start() //启动服务器{int timeout=-1;_isrunning=true;while(_isrunning){int n=epoll_wait(_epfd,_revs,size,timeout); //n就是等待队列里有几个fd就绪了switch (n){case -1:LOG(LogLevel::FATAL)<<"epoll error";break;case 0:LOG(LogLevel::DEBUG)<<"epoll timeout";break;default:LOG(LogLevel::INFO)<<"读事件就绪";Distribute(n);//事件派发break;}}}void Distribute(int rnum){for(int i=0;i<rnum;i++){if(_revs[i].events& EPOLLIN) //读事件就绪{if(_revs[i].data.fd==_listensockfd->FD()){//listen 套接字Accept();}else{//普通 套接字Recv(i);}}}}void Accept(){InetAddr client;int sockfd=_listensockfd->AcceptOrDie(&client);if(sockfd>0){LOG(LogLevel::DEBUG)<<"收到一个客户端连接"<<client.StringAddr();epoll_event ev;ev.events=EPOLLIN;ev.data.fd=sockfd;int n=epoll_ctl(_epfd,EPOLL_CTL_ADD,sockfd,&ev);//把accept的套接字设置进内核if(n<0){LOG(LogLevel::FATAL)<<"epoll_ctl error";exit(EPOLL_CTL_ERR);}LOG(LogLevel::INFO)<<"epoll_ctl accept_sockfd success";}} void Recv(int i){char buffer[1024];int n=recv(_revs[i].data.fd,buffer,sizeof(buffer)-1,0);if(n>0){buffer[n]=0;cout<<"client say->"<<buffer;}else if(n==0) //退出{LOG(LogLevel::WARNING)<<"client quit";int m=epoll_ctl(_epfd,EPOLL_CTL_DEL,_revs[i].data.fd,nullptr);if(m>0){LOG(LogLevel::INFO)<<"epoll_ctl remove sockfd success:"<<_revs[i].data.fd;}close(_revs[i].data.fd);}else //异常{LOG(LogLevel::FATAL)<<"recv error";int m=epoll_ctl(_epfd,EPOLL_CTL_DEL,_revs[i].data.fd,nullptr);if(m>0){LOG(LogLevel::INFO)<<"epoll_ctl remove sockfd success:"<<_revs[i].data.fd;}close(_revs[i].data.fd);}}~epollserver(){ _listensockfd->Close();if (_epfd > 0)close(_epfd);}
private:unique_ptr<Socket> _listensockfd;struct epoll_event _revs[size];bool _isrunning;int _epfd;
};
构造函数: 创建 socket、绑定监听、创建 epoll 实例、注册 listen 套接字读事件
Start(): 进入事件循环,使用 epoll_wait 阻塞等待事件
Distribute(): 对每个就绪事件分类处理(新连接 or 客户端发消息)
Accept(): 接收新连接并将其注册到 epoll
Recv(): 接收数据并输出 / 清理关闭连接
析构函数:清理监听套接字 + epoll fd
构造函数 注册事件
//私有成员变量
private:unique_ptr<Socket> _listensockfd;struct epoll_event _revs[size];bool _isrunning;int _epfd;
//构造epollserver(int port):_listensockfd(make_unique<TcpSocket>()),_isrunning(false),_epfd(defaulted){_listensockfd->BuildTcpSocketServer(port); //构造TCP服务器_epfd=epoll_create(256); //创建epoll模型if(_epfd<0){LOG(LogLevel::FATAL)<<"epoll_create error";exit(EPOLL_CREATE_ERR);}LOG(LogLevel::INFO)<<"epoll_create success";epoll_event ev; //创建对象ev.events=EPOLLIN;//关心读事件ev.data.fd=_listensockfd->FD();//listen套接字int n=epoll_ctl(_epfd,EPOLL_CTL_ADD,_listensockfd->FD(),&ev); //设置进内核关系读事件if(n<0){LOG(LogLevel::FATAL)<<"epoll_ctl error";exit(EPOLL_CTL_ERR);}LOG(LogLevel::INFO)<<"epoll_ctl listen_sockfd success";}
通过之前文章的模板方法模式的Tcp服务器,父类托管子类构造TCP服务器。通过epoll_create
创建epoll模型(红黑树和就绪队列)返回这个epoll_create
的套接字_epfd。接下来就用epoll_ctl
把关系的fd和关心的事件设置进内核,epoll_ctl
的参数第一个参数是epoll模型的fd、第二个参数EPOLL_CTL_ADD把 fd 添加到 epoll 实例的红黑树注册关注的事件、第三个参数是把要关注的fd(要监听哪个fd)添加进去、第四个参数设置关心的事件(是读事件还是写事件)和当这个事件被触发的时候返回的epoll_event中你能知道“那个fd触发了”是内核返回给用户的。
启动服务,阻塞等待事件
void Start() //启动服务器{int timeout=-1;_isrunning=true;while(_isrunning){int n=epoll_wait(_epfd,_revs,size,timeout); //n就是等待队列里有几个fd就绪了switch (n){case -1:LOG(LogLevel::FATAL)<<"epoll error";break;case 0:LOG(LogLevel::DEBUG)<<"epoll timeout";break;default:LOG(LogLevel::INFO)<<"读事件就绪";Distribute(n);//事件派发break;}}}
epoll_wait
方法 就是看就绪队列中有没有就绪事件。epoll_wait
的返回值就是有多少个事件就绪。对返回值进行判断n>0是读事件就绪进行事件派发。
事件派发(对不同就绪事件分类处理新连接 or 客户端发消息)
void Distribute(int rnum){for(int i=0;i<rnum;i++){if(_revs[i].events& EPOLLIN) //读事件就绪{if(_revs[i].data.fd==_listensockfd->FD()){//listen 套接字Accept();}else{//普通 套接字Recv(i);}}}}
传过来的参数是epoll_wait
的返回值,表示有几个事件就绪(epoll 的就绪队列中的就绪 fd 是顺序排列的)。遍历struct epoll_event _revs[size];
(是就绪队列),判断是否是读事件就绪,然后判断是监听套接字(新连接)还是accept套接字(普通套接字)。
接收新连接并将其注册到 epoll
void Accept(){InetAddr client;int sockfd=_listensockfd->AcceptOrDie(&client);if(sockfd>0){LOG(LogLevel::DEBUG)<<"收到一个客户端连接"<<client.StringAddr();epoll_event ev;ev.events=EPOLLIN;ev.data.fd=sockfd;int n=epoll_ctl(_epfd,EPOLL_CTL_ADD,sockfd,&ev);//把accept的套接字设置进内核if(n<0){LOG(LogLevel::FATAL)<<"epoll_ctl error";exit(EPOLL_CTL_ERR);}LOG(LogLevel::INFO)<<"epoll_ctl accept_sockfd success";}}
接收客户端连接,接着用epoll_ctl
把这个新的套接字设置进内核(epoll)。
接收数据并输出
void Recv(int i){char buffer[1024];int n=recv(_revs[i].data.fd,buffer,sizeof(buffer)-1,0);if(n>0){buffer[n]=0;cout<<"client say->"<<buffer;}else if(n==0) //退出{LOG(LogLevel::WARNING)<<"client quit";int m=epoll_ctl(_epfd,EPOLL_CTL_DEL,_revs[i].data.fd,nullptr);if(m>0){LOG(LogLevel::INFO)<<"epoll_ctl remove sockfd success:"<<_revs[i].data.fd;}close(_revs[i].data.fd);}else //异常{LOG(LogLevel::FATAL)<<"recv error";int m=epoll_ctl(_epfd,EPOLL_CTL_DEL,_revs[i].data.fd,nullptr);if(m>0){LOG(LogLevel::INFO)<<"epoll_ctl remove sockfd success:"<<_revs[i].data.fd;}close(_revs[i].data.fd);}}
接收数据,recv返回值=0是客户端退出返回值<0是出现异常。要对注册到epoll模型中的fd进行清理,然后关闭文件描述符。