【网络编程】IO多路转接——epoll
文章目录
- 1. poll
- 1.1 poll 特性
- 1.2 函数解析
- 1.3 poll的使用
- 2. epoll
- 2.1 select、poll、epoll对比
- 2.2 epoll的使用
- 2.3 epoll函数
- 3. 基于epoll的TCP服务器
- 3.1 伪代码
- 3.2 代码
- 4. epoll 的工作模式
- 4.1 LT模式
- 4.2 ET模式
1. poll
1.1 poll 特性
- 不能跨平台,只能在LInux平台上使用
- select 有 1024 最大并发的上限,poll 可以检测更多数量的文件描述符,与硬件(内存)有关
- poll 的检测方式和 select 一样,是线性检测,效率很低
- poll 的使用方法是从select到epoll的一个过程
- poll 的使用更加直观一些
1.2 函数解析
数组元素:
struct pollfd {int fd; /* file descriptor */ short events; /* requested events */short revents; /* returned events */
};
- fd:要委托内核检测的文件描述符
- events:要检测 fd 的什么事件
- POLLIN:fd 的读缓冲区有数据可读
- POLLOUT:fd 对应的写缓冲区可写
- POLLERR:异常
- 同时检测文件描述符的读写事件:
events = POLLIN|POLLOUT;
- revents:给内核使用的变量,内核将文件描述符实际触发的事件写到这个变量中
#include <poll.h>int poll(struct pollfd *fds, nfds_t nfds, int timeout);
- 功能: 监听多个文件描述符的属性变化
- 参数:
- fds : 要检测的文件描述符的集合,传递
struct pollfd
类型的数组地址,监听的数组的首元素地址 - nfds: 数组有效元素的最大下标+1
- timeout : 超时时长,单位ms
- poll 函数默认是阻塞的,该函数可以检测一系列文件描述符状态
- 没有状态变化,一直阻塞,有状态变化,解除阻塞
- -1 是永久监听
- =0 限时等待
- poll 函数默认是阻塞的,该函数可以检测一系列文件描述符状态
- fds : 要检测的文件描述符的集合,传递
1.3 poll的使用
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <arpa/inet.h>
#include <sys/select.h>
#include <poll.h>int main() {int lfd = socket(AF_INET, SOCK_STREAM, 0);if (lfd == -1) {perror("socket");exit(0);}// 2. 绑定struct sockaddr_in addr;addr.sin_family = AF_INET;addr.sin_port = htons(8989);addr.sin_addr.s_addr = INADDR_ANY;int ret = bind(lfd, (struct sockaddr*)&addr, sizeof(addr));if (ret == -1) {perror("bind");exit(0);}// 3. 监听ret = listen(lfd, 100);if (ret == -1) {perror("listen");exit(0);}// 4. 等待连接 -> 循环// 检测 -> 读缓冲区,委托内核去处理// 数据初始化,创建自定义的文件描述符struct pollfd fds[1024];// 初始化for (int i = 0; i < 1024; i++) {fds[i].fd = -1;fds[i].events = POLLIN;}fds[0].fd = lfd;int maxfd = 0;while (1) {// 委托内核检测ret = poll(fds, maxfd + 1, -1);if (ret == -1) {perror("select");exit(0);}// 检测的读缓冲区有变化// 有新连接if (fds[0].revents & POLLIN) {// 接收连接请求struct sockaddr_in sockcli;int len = sizeof(sockcli);// 这个accept是不会阻塞的int connfd = accept(lfd, (struct sockaddr*)&sockcli, &len);int i;for (i = 0; i < 1024; i++) {if (fds[i].fd == -1) {fds[i].fd = connfd;break;}}maxfd = i > maxfd ? i : maxfd;}// 通信,有客户端发送数据过来for (int i = 1; i <= maxfd; i++) {// 如果在集合中,说明读缓冲区有数据if (fds[i].revents & POLLIN) {char buf[128];int ret = read(fds[i].fd, buf, sizeof(buf));if (ret == -1) {perror("read");exit(0);} else if (ret == 0) {printf("对方已经关闭了连接...\n");close(fds[i].fd);fds[i].fd = -1;} else {printf("客户端say: %s\n", buf);write(fds[i].fd, buf, strlen(buf) + 1);}}}}close(lfd);return 0;
}
2. epoll
2.1 select、poll、epoll对比
如果内存 1G,epoll 就支持10万连接
不能跨平台,只能在 Linux 使用
支持的并发量很大
- select
- 跨平台:支持
- 在 windows 平台
select
的第一个参数没有意义,写0即可
- 在 linux 平台
select
的第一个参数是检测的集合中最大文件描述符+1
- 在 windows 平台
- 检测的连接数
- 最大1024
- 检测方式和效率
- 线性检测,文件描述符越多,效率越低
- 使用 select 检测的集合会发生多次数据拷贝
- 用户区拷贝到内核区 -> 传入
- 内核区拷贝到用户区 -> 传出
- 传出信息的量
- 有多少文件描述符发生了变化 -> 返回值
- 到底哪个发生了状态变化,需要使用者检测
- 检测内核传出的revent
- 跨平台:支持
- poll
- 跨平台:不支持,只支持 Linux
- 检测的连接数
- 和内存有关
- 检测方式和效率
- 线性检测,文件描述符越多,效率越低
- epoll
- 跨平台:不支持,只支持 Linux
- 检测的连接数
- 和内存有关
- 检测方式和效率
- 树状==(红黑树)==模型,检测效率很高
- 委托 epoll 检测的文件描述符集合用户和内核使用的是同一块内存(共享内存),没有数据拷贝
- 传出信息的量
- 有多少文件描述符发生了变化 -> 返回值
- 可以精确地知道到底是哪个文件描述符发生了状态变化
2.2 epoll的使用
epoll 是一个模型,树状模型,使用 epoll 需要调用3个函数
epoll 的使用步骤:
- 需要先新创建一个树状模型,没有节点
- 将要检测的节点添加到 epoll 树上
- 文件描述符的类型
- 监听的
- 通信的
- 从检测的事件上说
- 读
- 写
- 异常
- 文件描述符的类型
- 开始委托内核对树上的节点进行检测
- 处理的过程
- 监听的:建立新的连接
- 通信的:接收和发送数据
2.3 epoll函数
1. 创建epoll模型,红黑树模型
#include <sys/epoll.h>
int epoll_create(int size); // 创建一个epoll模型
- 参数
size
:监听的文件描述符的上限,Linux2.6版本之后这个值没有实际意义,只需要大于0即可
- 返回值
- 成功:返回一个有效的文件描述符,可以理解为红黑树的根节点,通过这个文件描述符就可以访问创建的实例
- 失败:返回-1
2. 对epoll树的节点操作函数
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
- 功能:实现对epoll模型上节点的添加/删除/修改
- 参数
epfd
:树的句柄,epoll_create()
函数的返回值,找到epoll树的实例op
:EPOLL_CTL_ADD
:添加新节点EPOLL_CTL_MOD
:修改已经添加到树上的节点的属性- 比如原来检测的是读事件,可以修改为写事件
EPOLL_CTL_DEL
:将节点从树上删除
fd
:要操作的文件描述符- 如何操作:添加/修改/删除
- 种类:
- 监听的
- 通信的
event
:- 添加:设置要检测的文件描述符的什么事件
- 修改:修改对应的文件描述符的事件
- 删除:NULL
// union 不管里面有多少成员,这些成员共用同一块内存
// 使用的时候只能用其中一个成员,否则会发生数据覆盖
typedef union epoll_data {void *ptr;int fd; // 常用的一个成员uint32_t u32;uint64_t u64;
} epoll_data_t;struct epoll_event {uint32_t events; /* Epoll events */ // 需要监听的事件epoll_data_t data; /* User data variable */ // 需要监听的文件描述符
};
events
:EPOLLIN
:读事件,检测文件描述符的读缓冲区,检测有没有数据EPOLLOUT
:写事件,检测文件描述符的写缓冲区,检测是否可写(有空间就可写)
data.fd = epoll_ctl()第三个参数的fd的值
3. 监听
int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout);
-
这是一个阻塞函数
-
委托内核检测epoll树上的文件描述符状态,如果没有状态变化,该函数默认一直阻塞
-
有满足条件的文件描述符被检测到,函数返回
-
参数
epfd
:树的句柄,epoll_create()
函数的返回值,找到epoll树的实例events
:传出参数,里面记录了当前这轮检测 epoll 模型中有状态变化的文件描述符信息- 这个参数是一个结构体数组的地址
maxevents
:指定第二个参数events
数组的容量timeout
:超时时长,单位 ms,和 poll 是一样的-1
:委托内核检测epoll树上的文件描述符状态,如果没有状态变化,该函数默认一直阻塞,有满足条件的文件描述符被检测到,函数返回0
:epoll_wait
调用之后,函数马上返回>0
:委托内核检测epoll树上的文件描述符状态,如果没有状态变化,但是timeout的时间到达了,函数被强制解除阻塞
-
返回值
- 成功:有多少个文件描述符发生了状态变化
3. 基于epoll的TCP服务器
3.1 伪代码
int main() {1. 创建监听的套接字int lfd = socket();2. 绑定bind();3. 设置监听listen();4. 创建epoll模型int epfd = epoll_create();5. 将需要检测的文件描述符添加到epollstruct epoll_event ev;ev.events = EPOLLIN;ev.data.fd = lfd;epoll_ctl(epfd, epoll_ctl_add, lfd, &ev);6. 开始检测struct epoll_event events[1024];while(1) {int num = epoll_wait(epfd, events, 1024, -1);// 处理num个有状态变化的文件描述符for (int i = 0; i < num; i++) {// 更严谨的判断,如果不是读事件就不处理if (ev.events & EPOLLout) {// 如果是写事件,忽略continue;}int curfd = events[i].data.fd;if (curfd == lfd) {int cfd = accept(lfd, NULL, NULL);// cfd添加到epoll模型中ev.events = EPOLLIN;ev.data.fd = cfd;epoll_ctl(epfd, epoll_ctl_add, cfd, &ev);} else {// 通信int len = recv(curfd, buf, size, -1);if (len == 0) {printf("客户端已经断开连接...\n");// 从epoll模型中删除该节点epoll_ctl(epfd, epoll_ctl_del, curfd, NULL);close(curfd);} else if (len > 0) {send();} else {// len = -1perror("recv");}}}}
}
3.2 代码
epoll-server.h
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <arpa/inet.h>
#include <sys/epoll.h>int mian() {// 1. 创建监听 的套接字int lfd = socket(AF_INET, SOCK_STREAM, 0);if (lfd == -1) {perror("socket");exit(0);}// 2. 绑定struct sockaddr_in addr;addr.sin_family = AF_INET;addr.sin_port = htons(8989);addr.sin_addr.s_addr = INADDR_ANY;int ret = bind(lfd, (struct sockaddr*)&addr, sizeof(addr));if (ret == -1) {perror("bind");exit(0);}// 3. 设置监听ret = listen(lfd, 128);if (ret == -1) {perror("listen");exit(0);}// 4. 创建epoll模型int epfd = epoll_create(1);if (epfd == -1) {perror("epoll");exit(0);}// 5. 将要检测的节点添加到epoll模型中struct epoll_event ev;ev.events = EPOLLIN; // 检测lfd的读缓冲区ev.data.fd = lfd; // 要检测的文件描述符ret = epoll_ctl(epfd, EPOLL_CTL_ADD, lfd, &ev);if (ret == -1) {perror("epoll_ctl");exit(0);}// 6. 不停地委托内核检测epoll模型中的文件描述符状态struct epoll_event evs[1024];int size = sizeof(evs) / sizeof(evs[0]);while (1) {int num = epoll_wait(epfd, evs, size, -1);// 遍历 evs 数组,个数就是返回值for (int i = 0; i < num; i++) {// 取出数组元素中的文件描述符int curfd = evs[i].data.fd;if (curfd == lfd) {// 建立新连接,这里调用不会阻塞int cfd = accept(lfd, NULL, NULL);// cfd添加到检测的epoll模型中ev.events = EPOLLIN;ev.data.fd = cfd;epoll_ctl(epfd, EPOLL_CTL_ADD, cfd, &ev);} else {// 通信char buf[1024];memset(buf, 0, sizeof(buf));int len = recv(curfd, buf, sizeof(buf), 0);if (len == 0) {printf("客户端已经断开连接...\n");epoll_ctl(epfd, EPOLL_CTL_DEL, curfd, NULL);// 这里注意一定要先删除再关闭close(curfd);} else if (len > 0) {printf("recv data: %s\n", buf);send(curfd, buf, len, 0);} else {perror("recv");exit(0);}}}}// 6. 断开连接close(lfd);return 0;
}
4. epoll 的工作模式
- 两种工作模式
- LT(Level Trigger):水平触发(默认模式)
- 阻塞和非阻塞的套接字都是支持的
- 阻塞指的是接收和发送数据的状态
- read/recv
- write/send
- 阻塞指的是接收和发送数据的状态
- 阻塞和非阻塞的套接字都是支持的
- ET(Edge Trigger):边缘触发(需要手动设置)
- 效率高,只支持非阻塞的套接字
- 边沿模式需要手动进行设置
- LT(Level Trigger):水平触发(默认模式)
4.1 LT模式
LT 模式(Level Trigger,水平触发)是
epoll
的默认触发模式。在此模式下,只要被监听的文件描述符上仍然存在未处理的事件(如可读或可写),epoll_wait()
就会持续返回该事件,直到应用程序完全处理完毕。该模式具有良好的容错性和编程友好性,适用于对实时性要求不高、连接数中等的应用场景。在 LT 模式下,即使应用程序没有一次性读取或写完数据,内核仍会在后续的epoll_wait()
调用中继续通知该事件,保证事件不会被遗漏。
场景:
客户端给服务器发送数据,每次发送1k数据,服务器使用epoll检测(水平模式),检测到读缓冲区中有数据,每次接收500字节。(发送的快,接收的慢)
特点: 通知的频率高,只要满足条件epoll_wait()
函数就返回(相当于通知)
- 读事件:
- 接收端接收数据的量少,接收一次数据包接收不完,还有500字节数据在读缓冲区中
- 在这种场景下,只要是
epoll_wait
检测到读缓冲区有数据,就会通知用户一次- 不管数据有没有读完,只要有数据就通知
- 通知就是
epoll_wait()
函数返回,我们就可以处理传出参数中的文件描述符的状态
- 写事件:
- 检测写缓冲区是否可用(是否有容量),只要是可写(有容量),
epoll_wait()
就会返回
- 检测写缓冲区是否可用(是否有容量),只要是可写(有容量),
4.2 ET模式
ET 模式(Edge Trigger,边缘触发)是
epoll
提供的高性能触发模式,需要显式设置EPOLLET
标志启用。在 ET 模式下,只有当文件描述符的状态发生边缘变化(如从无数据变为有数据)时,epoll_wait()
才会通知事件,且仅通知一次。因此,应用程序必须在接收到事件后,采用非阻塞 I/O 并在单次回调中将数据全部处理完毕,否则后续数据到达时将不会再次触发通知。该模式减少了系统调用频率和重复通知,提升了在高并发环境下的性能,适用于高吞吐、低延迟的网络服务系统。
场景:
客户端给服务器发送数据,每次发送1k数据,服务器使用epoll检测(边沿模式),检测到读缓冲区中有数据,每次接收500字节。(发送的快,接收的慢)
特点: epoll_wait()
检测次数变少了,效率变高了(有满足条件的新状态才会通知)
- 读事件
- 接收端每次收到一条新的数据,
epoll_wait()
会通知一次- 如果在这一次通知后,没有将缓存中的数据全部读出,
epoll_wait()
也不会再次通知 - 接收到新的数据,
epoll_wait()
只通知一次,不管数据有没有读完
- 如果在这一次通知后,没有将缓存中的数据全部读出,
- 接收端每次收到一条新的数据,
- 写事件
- 检测写缓冲区是否可用(是否有容量)
- 检测到可用则通知一次,再检测到缓冲区可用就不通知了
- 写缓冲区原来是不可用的(满了),后来缓冲区可用(有容量了),
epoll_wait()
检测到之后通知一次(只有一次)
如何设置边沿模式?
- 在struct epoll_event 结构体的成员变量 events 事件中额外设置 EPOLLET
// 往epoll模型中添加新节点
int cfd = accept(lfd, NULL, NULL);
// cfd添加到检测的epoll模型中
ev.events = EPOLLIN | EPOLLET; // 设置文件描述符的边沿模式
ev.data.fd = cfd;
epoll_ctl(epfd, EPOLL_CTL_ADD, cfd, &ev);
通过测试,如果epoll_wait()
只通知一次,并且接收数据的缓存比较小,会导致服务器端通信的文件描述符中的数据越来越多,数据如果不能全部读出,就无法处理客户端请求,如何解决这个问题?
-
在
epoll_wait()
通知的这一次中,将客户端发送的数据全部读出- 方案一:接收端(服务器端)准备一个特别大的内存块,用来存储接收的数据
- 弊端:
- 客户端发送的数据有多大是不可预期的,大小的上限不太容易界定
- 向操作系统申请的内存太大,申请内存的操作会失败
- 弊端:
- 方案二:循环地进行数据接收
- 这种方案存在问题,会导致服务器端程序的阻塞
while(1) {int len = read(cfd, buf, sizeof(buf));}``` - 读完之后需要跳出循环 - 如果客户端和服务器的连接还保持着,如果数据接收完毕,read函数阻塞 - 服务器程序的单线程/进程,read阻塞会导致整个服务器程序阻塞
- 方案一:接收端(服务器端)准备一个特别大的内存块,用来存储接收的数据
-
解决上诉问题:将数据接收动作修改为非阻塞
read()/recv(), write()/send()
阻塞的函数行为,还是操作的文件描述符导致的?- 调用这些函数都是去检测操作的文件描述符的读写缓冲区,因此可知是文件描述符导致的
-
如何设置文件描述符的非阻塞?
- 使用
fcntl
函数设置文件描述符的非阻塞
- 使用
函数原型:
#include <fcntl.h>
int fcntl(int fd, int cmd, ... /* arg */ );
- 因为文件描述符行为默认是阻塞的,因此要追加非阻塞行为
- 获取文件描述符的 flag 属性
int flag = fcntl(cfd, F_GETFL);
意思是:把
cfd
这个 socket 的“当前设置”查出来
F_GETFL
表示“获取文件状态标志”- 常见的标志有:
O_RDONLY
、O_NONBLOCK
等 - 返回的
flag
是一个“二进制标志组合”,代表当前这个 socket 的所有设置
- 给 flag 追加非阻塞
flag = flag | O_NONBLOCK; // flag |= O_NONBLOCK;
意思是:我在原有设置上,加上一个“非阻塞”的功能位
- 将新的flag属性设置到文件描述符中
fcntl(cfd, F_SETFL, flag);
意思是:把“新的设置”应用回
cfd
这个 socket 上去
原本设置:O_RDWR (阻塞)↓
加上: | O_NONBLOCK (变成非阻塞)↓
设置回去:fcntl(fd, F_SETFL, ...)
- 在非阻塞模式下读数据遇到的错误
recv error: Resource temporarily unavailable //资源不可用,因为内存中没有数据了
- 错误出现的原因:
while(1) {int len = recv();}
- 循环地读数据,当通信的文件描述符对应读缓冲区数据被读完,recv/read 不会阻塞,继续读缓冲区
- 但是缓冲区中没有数据,这时候 read/recv 调用就失败了,返回 -1
- 这时候错误号 errno 的值为
errno = EAGAIN or EWOULDBLOCK
,一般情况下使用EAGAIN
判断就可以
代码
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <fcntl.h>
#include <errno.h>int main() {// 1. 创建监听的套接字int lfd = socket(AF_INET, SOCK_STREAM, 0);if (lfd == -1) {perror("socket");exit(0);}// 2. 绑定struct sockaddr_in addr;addr.sin_family = AF_INET;addr.sin_port = htons(8989);addr.sin_addr.s_addr = INADDR_ANY;int ret = bind(lfd, (struct sockaddr*)&addr, sizeof(addr));if (ret == -1) {perror("bind");exit(0);}// 3. 设置监听ret = listen(lfd, 128);if (ret == -1) {perror("listen");exit(0);}// 4. 创建poll模型int epfd = epoll_create(1);if (epfd == -1) {perror("epoll");exit(0);}// 5. 将要检测的节点添加到epoll模型中struct epoll_event ev;ev.events = EPOLLIN; // 检测lfd的读缓冲区ev.data.fd = lfd; // 要检测的文件描述符ret = epoll_ctl(epfd, EPOLL_CTL_ADD, lfd, &ev);if (ret == -1) {perror("epoll_ctl");exit(0);}// 6. 不停地委托内核检测epoll模型中的文件描述符状态struct epoll_event evs[1024];int size = sizeof(evs) / sizeof(evs[0]);while (1) {int num = epoll_wait(epfd, evs, size, -1);printf("num = %d\n", num);// 遍历 evs 数组,个数就是返回值for (int i = 0; i < num; i++) {// 取出数组元素中的文件描述符int curfd = evs[i].data.fd;if (curfd == lfd) {// 建立新连接,这里调用不会阻塞int cfd = accept(lfd, NULL, NULL);// 将通信的描述符设置为非阻塞int flag = fcntl(cfd, F_GETFL);flag |= O_NONBLOCK;fcntl(cfd, F_SETFL, flag);// cfd添加到检测的epoll模型中ev.events = EPOLLIN | EPOLLET; // 边沿模式->通信的时候ev.data.fd = cfd;epoll_ctl(epfd, EPOLL_CTL_ADD, cfd, &ev);} else {// 通信char buf[5];memset(buf, 0, sizeof(buf));while(1) {int len = recv(curfd, buf, sizeof(buf), 0);if (len > 0) {// 读到了数据,发送回客户端// 如果有实际的业务需求,需要接受一个完整的数据包// 数据包有包头,通过包头就指定当前消息的字节数send(curfd, buf, len, 0);} else if (len == 0) {printf("客户端已经断开连接...\n");epoll_ctl(epfd, EPOLL_CTL_DEL, curfd, NULL);close(curfd);break;} else {// 返回值 == -1if (errno == EAGAIN || errno == EWOULDBLOCK) {printf("数据读完了\n");break;} else {perror("recv");exit(0);}}}}}}// 6. 断开连接close(lfd);return 0;}
总结
使用 epoll 的边沿触发模式(ET),通过设置非阻塞 I/O 和循环读取机制,实现了高性能的多客户端 TCP 通信模型。ET 模式下,事件只在状态变化时通知一次,因此需要在回调中一次性将数据读取完毕,否则可能错过后续数据。相比于传统的水平触发(LT),ET 模式减少了 epoll_wait 的触发频率,提高了系统吞吐效率。