【Liunx】高级IO
目录
- 1. 五种IO模型
- 1.1 阻塞IO (Blocking IO)
- 1.2 非阻塞IO (Non-blocking IO)
- 1.3 IO多路复用 (IO Multiplexing)
- 1.4 信号驱动IO (Signal-driven IO)
- 1.5 异步IO (Asynchronous IO)
- 2. 高级IO重要概念
- 2.1 同步IO与异步IO
- 2.2 阻塞和非阻塞对读和写都有影响
- 3. 非阻塞IO
- 3.1 fcntl设置为非阻塞
- 3.2 I/O多路转接
- 3.2.1 select
- 3.2.2 poll
- 3.2.3 epoll
- 3.2.3.1 epoll核心的三个ARI
- 3.2.3.2 epoll的原理
- 3.2.3.3 水平触发 vs 边缘触发
- 3.2.3.4 epoll相对于select/poll的优点
1. 五种IO模型
IO本质就是等+拷贝
把IO效率变高,其实就是把等待时间变短
我们直接看五种IO模型在真实代码层面的核心区别。核心是理解应用进程/线程在等待数据到达内核缓冲区和将数据从内核拷贝到用户空间这两个阶段的状态。
- 阻塞IO (Blocking IO)
- 非阻塞IO (Non-blocking IO)
- IO多路复用 (IO Multiplexing)
- 信号驱动IO (Signal-driven IO)
- 异步IO (Asynchronous IO)
1.1 阻塞IO (Blocking IO)
真实过程:
- 应用调用recvfrom()系统调用,向内核索要数据。
- 内核开始准备数据(对于网络IO,就是等待数据包到达)。
- 应用进程被挂起(阻塞),什么也干不了,直到内核数据准备好。
- 内核数据准备好后,将其从内核空间拷贝到应用提供的用户空间缓冲区。
- 拷贝完成,recvfrom()返回成功信号,应用进程解除阻塞,继续执行。
代码感受:
// 这段代码会一直停在这里,直到有数据到来或出错
ssize_t num_bytes = recvfrom(sockfd, buf, BUFSIZE, 0, (struct sockaddr *)&src_addr, &addrlen);
// 这行代码之后,数据已经在buf里了,可以直接处理
process_data(buf, num_bytes);
特点: 最简单,但性能最差,一个线程只能处理一个连接。
1.2 非阻塞IO (Non-blocking IO)
真实过程:
- 应用通过fcntl(sockfd, F_SETFL, O_NONBLOCK)将Socket设置为非阻塞模式。
- 应用调用recvfrom()系统调用。
- 如果内核中没有数据 ready,recvfrom()会立即返回一个错误码(如EWOULDBLOCK),而不是阻塞应用进程。
- 应用进程可以继续做其他事情(比如处理其他连接的业务逻辑)。
- 应用需要不断地循环调用recvfrom()(轮询),直到某一次调用返回成功,表示数据已就绪并拷贝完成。
// 将socket设置为非阻塞
fcntl(sockfd, F_SETFL, O_NONBLOCK);while (1) {ssize_t num_bytes = recvfrom(sockfd, buf, BUFSIZE, 0, ...);if (num_bytes > 0) {// 成功收到数据,处理process_data(buf, num_bytes);break;} else if (errno == EWOULDBLOCK) {// 内核数据还没准备好,可以做点别的事,比如处理其他连接do_other_work();usleep(1000); // 避免疯狂循环消耗CPU} else {// 真正的错误发生了handle_error();break;}
}
特点: CPU消耗极大,因为大部分轮询都是空转。实践中很少直接这样用。
1.3 IO多路复用 (IO Multiplexing)
真实过程 (以epoll为例):
- 应用不是直接调用recvfrom(),而是先调用epoll_create()创建一个epoll对象。
- 通过epoll_ctl()将需要监听的Socket(文件描述符)添加到这个epoll对象中,并关心的事件(如EPOLLIN,即可读事件)。
- 应用调用epoll_wait()阻塞等待。注意:这里是阻塞在epoll_wait上,而不是阻塞在单个Socket上。
- 当epoll监控的任何一个Socket有数据到达(即可读)时,epoll_wait()会返回,告知应用哪些Socket发生了事件。(也可以检测是否可写)
- 应用遍历有事件发生的Socket,然后对每一个可读的Socket调用recvfrom()。此时调用recvfrom()会立即将数据从内核拷贝到用户空间,因为内核数据早已准备就绪。
代码感受:
// 1. 创建epoll实例
int epoll_fd = epoll_create1(0);
// 2. 添加要监听的socket
struct epoll_event event;
event.events = EPOLLIN;
event.data.fd = sockfd;
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sockfd, &event);// 3. 等待事件发生(此处阻塞)
struct epoll_event events[MAX_EVENTS];
int nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);// 4. 处理所有就绪的事件
for (int i = 0; i < nfds; i++) {if (events[i].events & EPOLLIN) {// 确认是这个socket可读,再调用recvfrom,此时不会阻塞ssize_t num_bytes = recvfrom(events[i].data.fd, buf, BUFSIZE, 0, ...);process_data(buf, num_bytes);}
}
特点: 一个线程可以高效管理成千上万个连接。select/poll
是早期实现,需要遍历所有连接;epoll/kqueue
是更高效的现代实现。
1.4 信号驱动IO (Signal-driven IO)
真实过程:
- 应用通过fcntl()和sigaction()为一个Socket设置信号驱动标志O_ASYNC并指定一个信号处理函数(如SIGIO)。
- 应用可以继续执行其他代码,不会被阻塞。
- 当该Socket的数据到达内核 ready 时,内核会向应用进程发送一个SIGIO信号。
- 应用进程在预先注册的信号处理函数中调用recvfrom()去读取数据(从内核拷贝到用户空间)。
代码感受:
// 设置信号处理函数
signal(SIGIO, sigio_handler);// 设置当前进程为接收信号的进程
fcntl(sockfd, F_SETOWN, getpid());
// 启用信号驱动IO
int flags = fcntl(sockfd, F_GETFL);
fcntl(sockfd, F_SETFL, flags | O_ASYNC | O_NONBLOCK);// ... 主线程可以去做别的事了 ...// 信号处理函数
void sigio_handler(int sig) {ssize_t num_bytes;// 在信号处理函数中读取数据while ((num_bytes = recvfrom(sockfd, buf, BUFSIZE, 0, ...)) > 0) {process_data(buf, num_bytes);}
}
特点: 编程复杂(信号处理有很多限制),通知是边缘触发(ET,即状态变化才会通知),容易丢失事件,不常用。
1.5 异步IO (Asynchronous IO)
真实过程 (以Linux AIO为例):
- 应用发起一个aio_read()操作,并提供一个缓冲区、Socket描述符、回调函数等。
- aio_read()调用会立即返回,应用进程继续执行,完全不被阻塞。
- 内核会自己完成所有工作:等待数据到达内核 -> 将数据从内核拷贝到应用在第1步提供的那个用户空间缓冲区。
- 当整个IO操作(等待+拷贝)全部完成后,内核会通过信号或回调函数等方式通知应用。
代码感受:
// 定义一个AIO控制块
struct aiocb my_aiocb;
// 填充控制块:socket、缓冲区、大小、回调函数等
bzero(&my_aiocb, sizeof(my_aiocb));
my_aiocb.aio_fildes = sockfd;
my_aiocb.aio_buf = buf; // 应用程序提供的缓冲区
my_aiocb.aio_nbytes = BUFSIZE;
// ... 其他设置 ...// 发起异步读请求,函数立即返回,不会阻塞
int ret = aio_read(&my_aiocb);
// ... 这里可以立刻做其他事情 ...// 之后,可以通过aio_error()轮询检查是否完成,或者等待信号通知
特点: 真正的异步。应用只需要发起请求和接收完成通知,中间过程完全不参与。与IO多路复用的核心区别在于:AIO通知的是“IO操作已完成”(数据已在你的缓冲区),而IO多路复用通知的是“IO操作可以开始了”(内核数据已就绪,需要你自己调用recvfrom来拷贝)。
总结:
IO模型 | 等待数据阶段 (recvfrom第一阶段) | 拷贝数据阶段 (recvfrom第二阶段) | 通知内容 |
---|---|---|---|
阻塞IO | 应用阻塞 | 应用阻塞 | 操作完成 |
非阻塞IO | 应用轮询 | 应用阻塞 | 操作可立即执行 |
IO多路复用 | 应用阻塞于select/epoll | 应用阻塞 | 数据已就绪(可读了) |
信号驱动IO | 应用继续执行 | 应用阻塞 | 数据已就绪(可读了) |
异步IO | 应用继续执行 | 应用继续执行 | 数据已拷贝完成 |
2. 高级IO重要概念
2.1 同步IO与异步IO
核心判断法则
只看一个阶段:拷贝数据阶段(recvfrom第二阶段)。
- 如果应用进程在“拷贝数据阶段”需要被阻塞(即不能执行其他代码),等待内核将数据拷贝到自己的缓冲区,那么就是同步I/O。
- 如果应用进程在“拷贝数据阶段”完全不需要关心,可以继续执行其他代码,拷贝工作由内核在后台独立完成,那么就是异步I/O。
那么上述5种IO前5种都是同步IO,最后一个是异步IO
2.2 阻塞和非阻塞对读和写都有影响
阻塞模式:
- 读操作(read):如果没有数据可读,进程会一直阻塞,直到有数据到来或连接关闭。
- 写操作(write):如果内核发送缓冲区已满,进程会一直阻塞,直到有足够空间容纳要写入的数据。
非阻塞模式:
- 读操作(read):如果没有数据可读,立即返回-1,并设置 errno 为 EAGAIN/EWOULDBLOCK。
- 写操作(write):如果内核发送缓冲区已满,立即返回-1,并设置 errno 为 EAGAIN/EWOULDBLOCK。数据不能写进,直接丢弃。
3. 非阻塞IO
3.1 fcntl设置为非阻塞
一个文件描述符, 默认都是阻塞IO
函数原型如下.
#include <unistd.h>
#include <fcntl.h>
int fcntl(int fd, int cmd, ... /* arg */ );
传入的cmd的值不同, 后面追加的参数也不相同.
获得/设置文件状态标记(cmd=F_GETFL或F_SETFL).
标准三步法——GET → 修改 → SET模式
将一个文件描述符设置为非阻塞模式的标准代码如下:
#include <unistd.h>
#include <fcntl.h>
#include <stdio.h>
#include <errno.h>int set_nonblocking(int fd) {// 第一步:获取当前的文件状态标志int flags = fcntl(fd, F_GETFL, 0); //0可以省略。if (flags == -1) {perror("fcntl(F_GETFL) failed");return -1;}// 第二步:给标志加上 O_NONBLOCK 位flags |= O_NONBLOCK;// 第三步:将新的标志设置回文件描述符if (fcntl(fd, F_SETFL, flags) == -1) {perror("fcntl(F_SETFL) failed");return -1;}return 0; // 成功
}
3.2 I/O多路转接
3.2.1 select
select 是什么?
select 是最早出现、最基础的一种I/O多路转接技术。它允许程序告知内核:“帮我监视一组描述符,直到其中一个或多个描述符准备好进行I/O操作时,再唤醒我。”
#include <sys/select.h>int select(int nfds, fd_set *readfds, fd_set *writefds,fd_set *exceptfds, struct timeval *timeout);
参数详解
- nfds(最大的文件描述符编号 + 1)
- 这是所有被监视的描述符集合中最大的那个描述符数值 + 1。
- 内核通过这个值来限制需要检查的描述符范围,提高效率。例如,你要监视描述符 3, 5, 8,那么 nfds 就应该设为 9 (8+1)。
- readfds(可读事件集合)
- 传入参数:你希望内核帮你监视哪些描述符有数据可读。
- 传出参数:当 select 返回时,内核会修改这个集合,只保留那些真正可读的描述符。
- 如果对某个事件不感兴趣,可以传入 NULL。
- writefds(可写事件集合)
- 传入参数:你希望内核帮你监视哪些描述符可以写入数据(发送缓冲区有空间)。
- 传出参数:返回时,只保留那些真正可写的描述符。
- exceptfds(异常事件集合)
- 用于监视某些异常条件,如带外数据(OOB data)的到来。
- timeout(超时时间)
- 控制 select 的阻塞行为。
- NULL:永远阻塞,直到有事件发生。
- 设为 0 即 {0,0}:非阻塞,检查一下立刻返回。
- 设为大于0的值:阻塞指定时间,超时后即使无事件也返回。当没超时时,只有要一个套接字可写或可读(前提是检测了读或写),就直接返回
返回值
大于0:表示就绪的描述符的总数。
= 0:超时时间到,没有任何描述符就绪。
-1:发生错误。
fd_set的结构:
- fd_set 本质上是一个位图(bit array 或 bitmask)。 它的每一位(bit)对应一个文件描述符(file descriptor)。
- 如果某个位被设置为 1:表示对应的文件描述符在集合中,需要被监视。
- 如果某个位是 0:表示对应的文件描述符不在集合中。
- fd_set最多1024bit,所以最多检测1024个套接字,有局限性
// 简化理解的定义typedef struct {long int fds_bits[16]; // 或者类似整型数组} fd_set;
- fd_set的操作函数:
- FD_ZERO(&set):清空集合。
- FD_SET(fd, &set):将描述符 fd 加入集合。
- FD_CLR(fd, &set):将描述符 fd 从集合中移除。
- FD_ISSET(fd, &set):检查 fd 是否在集合中(即是否就绪)。
三、select 的工作流程(核心)
这是一个标准的使用 select 的服务器伪代码流程,它完美体现了其工作模式:
// 1. 初始化
int listen_fd = socket(...); // 创建监听socket
bind(listen_fd, ...);
listen(listen_fd, ...);// 2. 定义并初始化描述符集合
fd_set read_fds; // 读事件集合
FD_ZERO(&read_fds); // 清空集合
FD_SET(listen_fd, &read_fds); // 将监听socket加入集合
int max_fd = listen_fd; // 当前最大描述符while(1) {// 3. 每次调用select前,需要重置集合(因为内核会修改它)fd_set tmp_fds = read_fds;// 4. 调用select,阻塞等待事件发生int ret = select(max_fd + 1, &tmp_fds, NULL, NULL, NULL);if (ret > 0) {// 5. 有事件发生!遍历所有被监视的描述符,找出谁就绪了for (int fd = 0; fd <= max_fd; fd++) {if (FD_ISSET(fd, &tmp_fds)) { // 检查fd是否在就绪集合中if (fd == listen_fd) {// 6. 监听socket可读,表示有新连接到来int client_fd = accept(listen_fd, ...);FD_SET(client_fd, &read_fds); // 将新连接加入监视集合if (client_fd > max_fd) max_fd = client_fd; // 更新max_fd} else {// 7. 客户端socket可读,表示有数据到来char buf[1024];ssize_t n = read(fd, buf, sizeof(buf));if (n <= 0) {// 连接关闭或出错close(fd);FD_CLR(fd, &read_fds); // 从集合中移除} else {// 处理数据...}}}}}
}
四、select 的优缺点
优点
- 跨平台:几乎所有的操作系统都支持。
- 解决了基础问题:实现了单进程同时处理多个I/O。
缺点(非常严重,导致其被淘汰)
- 连接数限制:受固定大小的 fd_set 结构限制,默认最多仅支持 1024 个并发连接。
- 线性扫描效率低:
内核需要遍历所有被监视的描述符(0 到 max_fd)来检测就绪事件。
用户在调用返回后需要遍历整个集合以识别就绪的描述符。
算法复杂度为 O(n),性能随连接数增加而线性下降。 - 状态无法持久化:每次调用 select 前必须重新设置传入的描述符集合,因为内核会修改该参数,产生重复初始化开销。
- 内存数据拷贝:每次调用都需在用户态和内核态之间拷贝整个 fd_set 结构,当监控大量描述符时存在额外开销。
3.2.2 poll
函数原型详解
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
参数说明:
- fds - 指向 struct pollfd 数组的指针
- 这个数组包含了所有要监视的文件描述符及其关注的事件
- 每个元素对应一个文件描述符
- nfds - 要监视的文件描述符数量
- 类型 nfds_t 通常是 unsigned long
- 指定 fds 数组中有多少个有效的元素
- timeout - 超时时间(毫秒)
- -1: 无限阻塞,直到有事件发生
- 0: 立即返回,不阻塞(轮询)
- 大于0: 阻塞指定的毫秒数,超时后返回 0
返回值:
大于 0: 就绪的文件描述符总数
0: 超时,没有文件描述符就绪
-1: 出错,并设置相应的 errno
struct pollfd 结构体详解
struct pollfd {int fd; /* 文件描述符 */short events; /* 我们关心的事件 (输入参数) */short revents; /* 实际发生的事件 (输出参数) */
};
成员说明:
- fd - 文件描述符
- 要监视的文件描述符
- 如果设置为负数,poll 会忽略这个条目
- events - 请求监视的事件(输入)
- 应用程序设置,告诉内核"我关心什么事件"
- revents - 返回的事件(输出)
- 内核设置,返回"实际发生了什么事件"
- 调用后检查这个字段来判断文件描述符状态
事件标志位详解:
比较于select有更多类型,select只能检测读和写
事件标志 | 含义 | 可设置在 | 说明 |
---|---|---|---|
POLLIN | 有数据可读 | events | 普通数据或优先级数据可读 |
POLLPRI | 紧急数据可读 | events | 高优先级数据(如TCP带外数据) |
POLLOUT | 可写 | events | 现在可以写入数据而不阻塞 |
POLLRDHUP | 对端关闭连接 | events | 流套接字的对端关闭连接 |
POLLERR | 错误 | revents | 发生错误(仅返回) |
POLLHUP | 挂起 | revents | 文件描述符被挂起(仅返回) |
POLLNVAL | 无效请求 | revents | 文件描述符未打开(仅返回) |
事件组合示例:
// 监视可读和可写事件
events = POLLIN | POLLOUT;
// 监视可读和紧急数据
events = POLLIN | POLLPRI;
实际网络编程示例
#include <sys/socket.h>
#include <netinet/in.h>
#include <poll.h>
#include <arpa/inet.h>#define MAX_CLIENTS 100int main() {int server_fd, new_socket;struct sockaddr_in address;struct pollfd fds[MAX_CLIENTS + 1]; // +1 for server socketint nfds = 1; // Start with server socket only// 创建服务器套接字server_fd = socket(AF_INET, SOCK_STREAM, 0);address.sin_family = AF_INET;address.sin_addr.s_addr = INADDR_ANY;address.sin_port = htons(8080);bind(server_fd, (struct sockaddr*)&address, sizeof(address));listen(server_fd, 10);// 设置服务器套接字到poll数组fds[0].fd = server_fd;fds[0].events = POLLIN;printf("服务器启动,监听端口 8080...\n");while (1) {int ret = poll(fds, nfds, -1); // 无限阻塞if (ret <= 0) continue;// 检查所有文件描述符for (int i = 0; i < nfds; i++) {if (fds[i].revents & POLLIN) {if (fds[i].fd == server_fd) {// 新的连接请求new_socket = accept(server_fd, NULL, NULL);if (nfds < MAX_CLIENTS + 1) {fds[nfds].fd = new_socket;fds[nfds].events = POLLIN;nfds++;printf("新客户端连接,fd=%d\n", new_socket);} else {close(new_socket);}} else {// 客户端数据可读char buffer[1024];int bytes_read = recv(fds[i].fd, buffer, sizeof(buffer), 0);if (bytes_read <= 0) {// 客户端断开连接printf("客户端 fd=%d 断开连接\n", fds[i].fd);close(fds[i].fd);// 从数组中移除(简单做法:用最后一个元素覆盖)fds[i] = fds[nfds - 1];nfds--;i--; // 重新检查当前位置} else {// 处理接收到的数据buffer[bytes_read] = '\0';printf("从 fd=%d 收到: %s", fds[i].fd, buffer);}}}}}return 0;
}
关键要点总结
- events 和 revents 分离:输入输出参数分开,使用更方便
- 无数量限制:使用动态数组,可以监视大量文件描述符
- 需要遍历:返回后仍需遍历所有描述符检查 revents
- 事件类型丰富:支持更多类型的事件检测
3.2.3 epoll
3.2.3.1 epoll核心的三个ARI
- epoll_create - 创建 epoll 实例
函数原型
#include <sys/epoll.h>int epoll_create(int size);
int epoll_create1(int flags);
详细说明:
作用: 创建一个 epoll 实例,返回一个文件描述符,用于后续的所有 epoll 操作。
参数:
- size (在 epoll_create 中):
- 现代内核中这个参数被忽略,但必须大于 0
- 为了兼容性,通常传递一个合理的估计值(如 128)
- flags (在 epoll_create1 中):
- 0: 默认行为
返回值:
- 成功:返回 epoll 文件描述符(非负整数)
- 失败:返回 -1,并设置 errno
重要说明:
- 每个 epoll 实例在内核中对应一个兴趣列表(interest list)
- 使用完毕后必须用 close() 关闭 epoll 文件描述符
- 一个进程可以创建多个 epoll 实例
- epoll_ctl - 管理兴趣列表
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
详细说明
作用: 向 epoll 实例的兴趣列表中添加、修改或删除一个文件描述符。
参数:
- epfd: epoll_create 返回的文件描述符
- op: 操作类型,取以下值之一:
- EPOLL_CTL_ADD: 添加新的文件描述符到兴趣列表
- EPOLL_CTL_MOD: 修改已存在的文件描述符的监视事件
- EPOLL_CTL_DEL: 从兴趣列表中删除文件描述符
- fd: 要操作的目标文件描述符
- event: 指向 epoll_event 结构体的指针,描述该文件描述符要监视的事件
epoll_event 结构体:
typedef union epoll_data {void *ptr; // 用户自定义数据指针int fd; // 文件描述符uint32_t u32; // 32位整数uint64_t u64; // 64位整数
} epoll_data_t;struct epoll_event {uint32_t events; // Epoll 事件掩码(要监视的事件类型)epoll_data_t data; // 用户数据(事件发生时返回的数据)
};
两个重要成员
(1) events - 事件掩码(epoll_ctl 为输入参数,epoll_wait为输出参数)
作用: 告诉 epoll 你要监视这个文件描述符的什么事件。
常用事件标志:
// 基础事件
ev.events = EPOLLIN; // 可读事件
ev.events = EPOLLOUT; // 可写事件
ev.events = EPOLLPRI; // 紧急数据可读// 组合事件
ev.events = EPOLLIN | EPOLLOUT; // 同时监视可读和可写
ev.events = EPOLLIN | EPOLLET; // 可读 + 边缘触发模式
ev.events = EPOLLIN | EPOLLOUT | EPOLLET; // 可读可写 + 边缘触发
(2)data - 用户数据(epoll_ctl 为输入参数,epoll_wait为输出参数)
作用: 当事件发生时,epoll 会把这个数据原样返回给你,让你知道是哪个文件描述符就绪了。
但是,需要注意的是:在使用 epoll_ctl 前,你必须主动填入 data 字段的内容。
data的作用就是存储文件描述符
- epoll_wait - 等待事件
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
详细说明
作用: 等待在兴趣列表中的文件描述符上发生事件,返回就绪的事件列表。
参数:
- epfd: epoll 实例的文件描述符
- events: 指向 epoll_event 数组的指针,用于接收就绪事件
- maxevents: events 数组的大小,必须大于 0
- timeout: 超时时间(毫秒)
- -1: 无限阻塞,直到有事件发生
- 0: 立即返回,用于轮询
- 大于0: 阻塞指定的毫秒数
返回值:
- 大于0: 返回就绪的文件描述符数量
- 0: 超时,没有事件发生
- -1: 出错,设置 errno
简单的 epoll 服务器:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>#define MAX_EVENTS 10
#define PORT 8080
#define BUFFER_SIZE 1024void set_nonblocking(int sock) {int flags = fcntl(sock, F_GETFL, 0);fcntl(sock, F_SETFL, flags | O_NONBLOCK);
}int main() {int server_fd, epoll_fd;struct sockaddr_in address;struct epoll_event ev, events[MAX_EVENTS];// 创建服务器套接字server_fd = socket(AF_INET, SOCK_STREAM, 0);address.sin_family = AF_INET;address.sin_addr.s_addr = INADDR_ANY;address.sin_port = htons(PORT);bind(server_fd, (struct sockaddr*)&address, sizeof(address));listen(server_fd, 128);// 创建 epoll 实例epoll_fd = epoll_create1(0);// 添加服务器套接字到 epollev.events = EPOLLIN;ev.data.fd = server_fd;epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_fd, &ev);printf("Epoll 服务器启动,监听端口 %d...\n", PORT);while (1) {int nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);for (int i = 0; i < nfds; i++) {if (events[i].data.fd == server_fd) {// 新的客户端连接struct sockaddr_in client_addr;socklen_t client_len = sizeof(client_addr);int client_fd = accept(server_fd, (struct sockaddr*)&client_addr, &client_len);set_nonblocking(client_fd);// 添加客户端到 epollev.events = EPOLLIN | EPOLLET; // 边缘触发模式ev.data.fd = client_fd;epoll_ctl(epoll_fd, EPOLL_CTL_ADD, client_fd, &ev);printf("新客户端连接: %s:%d, fd=%d\n", inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port), client_fd);} else {// 客户端数据可读int client_fd = events[i].data.fd;char buffer[BUFFER_SIZE];if (events[i].events & EPOLLIN) {// ET 模式:必须循环读取直到 EAGAINwhile (1) {ssize_t count = read(client_fd, buffer, BUFFER_SIZE - 1);if (count == -1) {if (errno != EAGAIN && errno != EWOULDBLOCK) {perror("read");close(client_fd);epoll_ctl(epoll_fd, EPOLL_CTL_DEL, client_fd, NULL);}break;} else if (count == 0) {// 客户端断开连接printf("客户端 fd=%d 断开连接\n", client_fd);close(client_fd);epoll_ctl(epoll_fd, EPOLL_CTL_DEL, client_fd, NULL);break;} else {buffer[count] = '\0';printf("从 fd=%d 收到: %s", client_fd, buffer);// 回显数据write(client_fd, buffer, count);}}}if (events[i].events & (EPOLLERR | EPOLLHUP)) {printf("客户端 fd=%d 错误或挂起\n", client_fd);close(client_fd);epoll_ctl(epoll_fd, EPOLL_CTL_DEL, client_fd, NULL);}}}}close(epoll_fd);close(server_fd);return 0;
}
3.2.3.2 epoll的原理
我来详细讲解 epoll 的核心原理和三个 API 对应的内核过程。
epoll 在内核中维护了两个关键数据结构:
// 内核中的 epoll 实例(简化)
struct eventpoll {// 1. 红黑树根节点 - 存储所有注册的 fdstruct rb_root rbr;// 2. 就绪队列 - 存储所有就绪的 fd struct list_head rdllist;};
三个核心 API 的内核过程
- epoll_create - 创建 epoll 实例
用户空间调用:
int epfd = epoll_create1(0);
内核过程:
- 分配一个 struct eventpoll 对象
- 初始化红黑树 rbr(空树)
- 初始化就绪队列 rdllist(空队列)
- 返回一个文件描述符 epfd,关联到这个 eventpoll 对象
结果: 创建了一个空的"监视中心"
- epoll_ctl - 注册文件描述符
用户空间调用:
struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.fd = sockfd;
epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &ev);
epoll_ctl 函数(ADD操作)
关键步骤详解:
- 查找红黑树:检查 该sockfd 是否已经注册,已经创建,则放回错误,反之,接着下面操作
- 创建 epitem:为这个fd 创建一个监控项
- 将这个epitem插入红黑数中
- 注册回调函数:向该socket 的文件操作表注册 ep_poll_callback(注册即加入),这步是关键,也是为什么epoll性能大大提升的原因。
struct epitem {struct rb_node rbn; // 红黑树节点struct list_head rdllink; // 就绪队列节点int fd; // 监控的文件描述符struct epoll_event event; // 用户设置的事件// ... 其他字段
};
什么是"文件操作表"?
在 Linux 内核中,每个文件描述符背后都有一个 struct file 结构,其中包含:
struct file {// ...struct file_operations *f_op; // 文件操作表void *private_data; // 私有数据// ...
};// 文件操作表 - 函数指针集合
struct file_operations {ssize_t (*read)(struct file *, char __user *, size_t, loff_t *);ssize_t (*write)(struct file *, const char __user *, size_t, loff_t *);unsigned int (*poll)(struct file *, struct poll_table_struct *);// ... 其他操作
};
- epoll_wait - 等待事件
用户空间调用:
struct epoll_event events[10];
int n = epoll_wait(epfd, events, 10, -1);
操作:
(1)检查就绪队列是否为空,为空,则进程进入等待队列,等待就绪队列不为空为止。
(2)不为空,则将就绪队列的事件的信息拷贝到struct epoll_event *events
中,epoll_wait
返回就绪队列节点个数
(3)LT模式:如果socket还有数据,不动,仍在就绪队列;ET模式:不管是否还有数据,节点都从就绪队列移除
红黑树的作用:
红黑树是 epoll 的"注册表"或"花名册",它记录了所有你要求 epoll 监视的文件描述符。
- 持久化存储:记住所有通过 epoll_ctl(ADD) 注册的 fd
- 快速查找:O(logN) 时间查找、插入、删除 fd
- 去重保证:防止同一个 fd 被重复注册
- epoll_ctl 的相关操作都是通过红黑树实现的。
epoll_wait 函数其实和网络中的accept函数类似,它只是提取结果的作用,不参与过程的实现。epoll_wait 其实只是将就绪队列中的事件信息拷贝到了用户层。那么操作系统为什么可以实现,"记录册"即红黑树中的socket的事件一就绪,就自动形成节点,放入就绪队列中呢?
其实,这就是我们之前说的回调函数的功劳。每一个红黑树中的socket的文件操作表中都有这个回调函数,这个回调函数的作用就是该socket一就绪,就直接被插入到就绪队列中。
3.2.3.3 水平触发 vs 边缘触发
水平触发 (LT)(默认模式) - 像"持续亮着的灯"
- 只要还有数据,灯就一直亮着
- 你看到灯亮,去拿一部分数据
- 如果还有剩余数据,灯继续亮着提醒你
- 直到你拿完所有数据,灯才熄灭
边缘触发 (ET) - 像"按一次响一次的门铃"
- 只在状态变化时响一次
- 门铃响,你去开门拿数据
- 如果没拿完,门铃不会再响
- 你必须一次性把所有数据搬进来
两者在epoll中的差异主要是:
看epoll_wait 返回时决定是否重新放回就绪队列
- LT:如果socket还有数据,节点不动,仍在就绪队列中
- ET:不管socket是否还有数据,节点都从就绪队列移除
3.2.3.4 epoll相对于select/poll的优点
select/poll:
select:
int select(int nfds, fd_set *readfds, fd_set *writefds,fd_set *exceptfds, struct timeval *timeout);
poll:
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
struct pollfd {int fd; /* 文件描述符 */short events; /* 我们关心的事件 (输入参数) */short revents; /* 实际发生的事件 (输出参数) */
};
select/poll 的瓶颈:
- 调用前,需要传递所有要监视的文件描述符,假设a个
- 函数检测时,内核需要线性扫描所有文件描述符(不止是a个,是进程打开的所有文件)——遍历
- 我们拿结果时,我们需要线性扫描所有返回的文件描述符(a个)——遍历
在理解了 select/poll 的痛点后,epoll 的出现就是为了解决它们的根本问题:
epoll 的解决方案:
- epoll_ctl :fd 一次注册,长期有效,直接被红黑树记录在册
// select/poll:每次调用都要传递所有 fd
// 伪代码:
while (1) {reset_fd_sets(); // 每次都要重置add_all_fds_to_sets(); // 每次都要添加所有 fdselect(max_fd+1, &readfds, ...); // 每次传递所有 fd
}// epoll:fd一次注册,永久有效
epoll_ctl(epfd, EPOLL_CTL_ADD, fd1, &event); // 注册 fd1
epoll_ctl(epfd, EPOLL_CTL_ADD, fd2, &event); // 注册 fd2
// ... 注册所有需要的 fdwhile (1) {epoll_wait(epfd, events, MAX, -1); // 不需要重新注册!
}
关键区别:
- select/poll:每次调用都是"一次性"的监视
- epoll:在内核中维护一个持久的兴趣列表
- 内核检测:O(1) 事件通知
epoll 内核实现的核心数据结构:
// 内核内部数据结构(简化理解)
struct epoll_instance {struct rb_root rbr; // 红黑树:存储所有注册的 fdstruct list_head rdlist; // 就绪链表:存储就绪的 fd(快速获取)
};
- epoll_ctl - 管理兴趣列表:通过红黑树快速查找,达到O(1)
- epoll_wait - 就绪事件的捕获:就绪链表,达到O(1)
- epoll_wait 调用之后:只返回就绪的 fd
// select/poll:需要遍历所有 fd
for (int i = 0; i <= max_fd; i++) {if (FD_ISSET(i, &readfds)) { // 可能检查 1000 次,只有 1 个就绪handle_fd(i);}
}// epoll:只返回就绪的 fd
int nfds = epoll_wait(epfd, events, MAX, -1);
for (int i = 0; i < nfds; i++) { // 直接遍历就绪的 fdhandle_fd(events[i].data.fd); // 比如只有 3 个就绪,就循环 3 次
}