当前位置: 首页 > news >正文

Linux C IO多路复用

在上一节利用管道实现了一个简单的聊天室,但这个聊天室有一个很明显的问题就是,当A处于读阻塞情况下是不能向B发送消息的,只有收到B的消息才能发送。如何实现同时既能接受B的消息,又能向其发送消息?

很遗憾,依靠基本的编程思维似乎无法解决这个问题。因为当A处于读阻塞状态时,程序是不可能往下执行的。那么现在的聊天软件又是如何实现同时收发消息的?这个时候,我们就需要把问题交给OS帮我们来解决,操作系统的内核通过控制底层操作帮助我们实现了一些逻辑上的”并行“,也就是我们今天所说的 IO多路复用

什么是IO多路复用?

IO多路复用(I/O Multiplexing)是一种同时监控多个文件描述符(socket、管道、文件等)的IO状态的机制。当其中任意一个或多个文件描述符就绪(可读、可写或异常)时,内核会通知应用程序,从而避免阻塞等待单个IO操作。

在传统阻塞IO模型中,每个IO操作(如read/write)会阻塞线程直到完成。若需处理多个连接(如Web服务器),必须为每个连接创建一个线程/进程,导致资源浪费(线程上下文切换、内存占用)。
IO多路复用通过单线程监控多个IO事件,实现高并发、低资源消耗

当聊天室使用了IO多路复用,就可以同时监控读和写对应的文件描述符,任何一个文件描述符就绪就会立即响应,然后继续轮询等待,从而实现了逻辑上的“并行”。

核心机制与系统调用

IO多路复用包含两种系统调用,select与epoll。他们之间的实现方式是完全不同的

select

底层实现

  • 维护一个位图(fd集合),每次调用需遍历所有fd检查就绪状态,轮询和通知由OS完成。

  • 位图机制:通过一个固定大小的位图(fd_set)来管理文件描述符(fd)。每个 fd 占用一个位,最大支持的 fd 数量通常为 1024。

  • 支持跨平台(POSIX标准)。

  • 线性扫描:每次调用时,内核会遍历所有 fd,检查它们是否就绪。时间复杂度为 O(n),其中 n 是最大 fd 数。fd数量增加时性能下降。

  • 每次调用都需要重新传递 fd 集合:调用时需要将用户态的 fd 集合拷贝到内核态,内核处理后再次拷贝回用户态,效率较低。

使用流程

需要使用的系统调用:

#include <sys/select.h>
#include <sys/time.h>
//readset、writeset、exceptionset都是fd_set集合
//集合的相关操作如下:
void FD_ZERO(fd_set *fdset); /* 将所有fd清零 */
void FD_SET(int fd, fd_set *fdset);/* 增加一个fd */
void FD_CLR(int fd, fd_set *fdset);/* 删除一个fd */
int FD_ISSET(int fd, fd_set *fdset);/* 判断一个fd是否有设置 */
int select(int maxfd, fd_set *readset,fd_set *writeset, fd_set *exceptionset,
struct timeval * timeout);

这里先简要地介绍一下 select 使用流程:

  • 首先,需要先为监听集合申请内存;
  • 使用 FD_ZERO 初始化监听集合;
  • 将所有需要监听的文件描述符使用 FD_SET 加入监听集合;
  • 调用 select 系统调用使进程陷入阻塞状态;
  • 从阻塞当中被唤醒以后,使用 FD_ISSET 遍历所有监听的文件描述符,找到真正就绪的文件描述符;
  • 对就绪的文件描述符执行IO操作。

存在的问题:

  • 每调用一次select 就需要3个事件类型的fd_set需从用户空间拷贝到内核空间去,返回时select也会把保留了活跃事件的fd_set返回(从内核拷贝到用户空间)。当fd_set数据大的时候,这个过程消耗是很大的。
  • select需要逐个遍历fd_set集合 ,然后去检查对应fd的可读写状态,如果fd_set 数据量多,那么遍历fd_set 就是一个比较耗时的过程。
  • fd_set是个集合类型的数据结构有长度限制,32位系统长度1024,64位系统长度2048,这个就限制了select最多能同时监控1024个连接。

系统调用

FR_ZERO

清空一个文件描述符集合。

void FD_ZERO(fd_set *fdset);fd_set readfds;
FD_ZERO(&readfds); // 清空集合
  • fdset:指向 fd_set 类型的指针,表示要清空的文件描述符集合。
  • fd_set 中的所有位清零,表示集合中没有任何文件描述符被设置。
FD_SET

将一个文件描述符加入到集合中。

void FD_SET(int fd, fd_set *fdset);fd_set readfds;
FD_ZERO(&readfds); // 清空集合
FD_SET(sockfd1, &readfds); // 将 sockfd1 加入集合
FD_SET(sockfd2, &readfds); // 将 sockfd2 加入集合
  • fd:要加入集合的文件描述符。

  • fdset:指向 fd_set 类型的指针,表示要操作的文件描述符集合。

  • 将指定的文件描述符 fd 设置为 1,表示该文件描述符被加入到集合中。
select

监控多个文件描述符的可读、可写和异常状态。

int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
  • nfds:需要监控的最大文件描述符值加 1(即 maxfd + 1)。OS会从0~nfds的范围内的轮询,从而减少不必要的监听(nfds+1~1024的位图被忽略)

  • readfds:指向 fd_set 类型的指针,表示要监控的可读文件描述符集合。

  • writefds:指向 fd_set 类型的指针,表示要监控的可写文件描述符集合。

  • exceptfds:指向 fd_set 类型的指针,表示要监控的异常状态文件描述符集合。

  • timeout:指向 struct timeval 类型的指针,表示超时时间。如果为 NULL,表示阻塞等待;如果为 {0, 0},表示非阻塞。

返回值

  • 大于 0:表示就绪的文件描述符数量。

  • 等于 0:表示超时,没有任何文件描述符就绪。

  • 小于 0:表示出错。

select 函数会阻塞当前线程,直到集合中的某个文件描述符就绪(可读、可写或异常)或超时。如果某个文件描述符就绪,select 会返回就绪的文件描述符数量,并修改对应的集合。

示例:

fd_set readfds;
struct timeval tv;FD_ZERO(&readfds);
FD_SET(sockfd1, &readfds);
FD_SET(sockfd2, &readfds);tv.tv_sec = 5; // 设置超时时间为 5 秒
tv.tv_usec = 0;int ret = select(sockfd2 + 1, &readfds, NULL, NULL, &tv);
if (ret > 0) {if (FD_ISSET(sockfd1, &readfds)) {printf("sockfd1 is ready for reading\n");}if (FD_ISSET(sockfd2, &readfds)) {printf("sockfd2 is ready for reading\n");}
} else if (ret == 0) {printf("Timeout occurred\n");
} else {printf("Error occurred\n");
}
FD_ISSET

检查一个文件描述符是否在集合中。

int FD_ISSET(int fd, const fd_set *fdset);
  • fd:要检查的文件描述符。

  • fdset:指向 fd_set 类型的指针,表示要检查的文件描述符集合。

  • 非零:表示文件描述符 fd 在集合中。

  • 零:表示文件描述符 fd 不在集合中。

检查指定的文件描述符 fd 是否被设置为 1,即是否在集合中。

FD_CLR

FD_CLR 的主要功能是从一个文件描述符集合中移除一个指定的文件描述符。

  • 当某个文件描述符不再需要被监控时(例如,关闭了某个 socket)。

  • select 调用后,需要清理某些不再需要的文件描述符。

void FD_CLR(int fd, fd_set *fdset);
  • fd:要从集合中移除的文件描述符。

  • fdset:指向 fd_set 类型的指针,表示要操作的文件描述符集合。

实战:使用select对于基于管道的简易聊天程序进行改进:

基于上节我们通过管道实现的简易聊天程序,我们对其进行改进实现同时收发消息:

//客户端Aint main(int argc, char const *argv[])
{ARGS_CHECK(argc, 3);printf("waiting for connect\n");int fdr = open(argv[1], O_RDONLY);ERROR_CHECK(fdr, -1, "open fdr error");int fdw = open(argv[2], O_RDWR);ERROR_CHECK(fdr, -1, "open fdw error");printf("connected\n");char buf[1024];fd_set rdset;while (1){FD_ZERO(&rdset);FD_SET(STDIN_FILENO, &rdset);FD_SET(fdr, &rdset);select(fdr+1, &rdset, NULL, NULL, NULL);if(FD_ISSET(STDIN_FILENO, &rdset)){memset(buf, 0, sizeof(buf));int ret = read(STDIN_FILENO, buf, sizeof(buf));ERROR_CHECK(ret, -1, "STDIN ERROR");write(fdw, buf, strlen(buf));}if(FD_ISSET(fdr, &rdset)){memset(buf, 0, sizeof(buf));int ret = read(fdr, buf, sizeof(buf));ERROR_CHECK(ret, -1, "STDIN ERROR");if(ret == 0){printf("B is disconnected\n");break;}printf("B:%s", buf);}}return 0;
}//客户端Bint main(int argc, char const *argv[])
{ARGS_CHECK(argc, 3);printf("waiting for connect\n");int fdw = open(argv[1], O_RDWR);ERROR_CHECK(fdw, -1, "open fdw error");int fdr = open(argv[2], O_RDONLY);ERROR_CHECK(fdr, -1, "open fdr error");printf("connected\n");char buf[1024];fd_set rdset;while (1){FD_ZERO(&rdset);FD_SET(STDIN_FILENO, &rdset);FD_SET(fdr, &rdset);select(fdr+1, &rdset, NULL, NULL, NULL);if(FD_ISSET(STDIN_FILENO, &rdset)){memset(buf, 0, sizeof(buf));int ret = read(STDIN_FILENO, buf, sizeof(buf));ERROR_CHECK(ret, -1, "STDIN ERROR");write(fdw, buf, strlen(buf));}if(FD_ISSET(fdr, &rdset)){memset(buf, 0, sizeof(buf));int ret = read(fdr, buf, sizeof(buf));ERROR_CHECK(ret, -1, "STDIN ERROR");if(ret == 0){printf("A is disconnected\n");break;}printf("A:%s", buf);}}return 0;
}

输出结果:

ubuntu@ubuntu:~/MyProject/Linux/IO$ ./selectA 1.pipe 2.pipe
waiting for connect
connected
hello!
B:who are you?
B:what are you doing?
I am A
I am eat dinner?
goodbye!
^Cubuntu@ubuntu:~/MyProject/Linux/IO$ ./selectB 1.pipe 2.pipe
waiting for connect
connected
A:hello!
who are you?
what are you doing?
A:I am A
A:I am eat dinner?
A:goodbye!
A is disconnected

可以看到,A和B可以同时收发消息,无需等待收到消息之后再发送

epoll(Linux特有)

在早期计算机网络并不发达,所以并发网络请求并不会很高,select模型也足够使用了,但是随着网络的高速发展,高并发的网络请求程序越来越多,而select模式下 fd_set 长度限制就开始成为了致命的缺陷。下图显示了随着并发量的提升,不同IO多路复用机制的响应速度。

显然,根据select的底层实现,不难发现它有如下缺陷:

  • 位图靠数组实现,当改变长度需要重新编译
  • 每次从内核态读取就绪集合,和重新将文件描述符放入集合会产生大量的内核态和用户态之间的冗余拷贝
  • 监听集合和就绪集合的耦合度高
  • 就绪集合的处理性能低

吸取了select的教训,epoll模式就不再使用数组的方式来保存自己所监控的fd信息了,epoll 可以在内核态空间当中维持两个数据结构:监听事件集合和就绪事件队列

监听事件集合用来存储所有需要关注的设备(即文件描述符)和对应操作(比如读、写、挂起和异常等等),当监听的设备有事件产生时,比如网卡上接收到了数据并传输到了缓冲区当中时,硬件会采用中断等方式通知操作系统,操作系统会将就绪事件拷贝到就绪事件队列中,并且找到阻塞在 epoll_wait 的线程,让其就绪。监听事件集合通常是一个红黑树,就绪事件队列是一个线性表。

底层实现

  • 红黑树 + 就绪链表:使用红黑树管理所有注册的 fd,当 fd 就绪时,将其加入就绪链表。时间复杂度为 O(1)。

  • 边缘触发(ET)和水平触发(LT)
    • ET:仅通知一次,需一次性处理完所有数据(减少事件触发次数,高效但需非阻塞IO)。

    • LT:默认模式,fd就绪后,若未处理完,下次epoll_wait仍会通知。

  • 事件驱动:内核维护一个事件表,只返回已经就绪的 fd,无需每次遍历所有 fd。

  • 无需重复传递 fd 集合:通过 epoll_ctl 动态管理 fd,无需在每次调用时重新传递 fd 集合。

  • 优势

    • 时间复杂度O(1):仅返回就绪的fd,无需遍历。

    • 无fd数量限制:理论上仅受系统内存限制。

    • 高效:通过epoll_ctl注册/修改事件,避免每次调用时重复传递fd集合。

有了这些优势之后, epoll 逐渐取代了 select 的市场地位,尤其是在管理巨大量连接的高
并发场景中, epoll 的性能要远超 select 。

使用流程

需要使用的系统调用

#include<sys/epoll.h>
int epoll_create(int size);   //创建 epoll 实例
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); //注册文件描述符
//等待事件
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);//用于描述 epoll 就绪的事件及其关联数据。
struct epoll_event {uint32_t events;    //表示文件描述符上发生的事件类型。EPOLLIN 表示读, EPOLLOUT 表示写epoll_data_t data;  //存储与事件相关的数据,具体类型由用户决定。
};//用于存储就绪事件中与事件相关的不同类型的数据。
typedef union epoll_data {void*ptr;int fd;        //存储就绪事件对应的文件描述符uint32_t u32;uint64_t u64;
} epoll_data_t;
events 是一个 32 位的无符号整数,用于表示文件描述符上发生的事件类型。它可以是一个或多个事件标志的组合(通过位或操作)。常见的事件类型包括:
EPOLLIN:表示文件描述符可读。
EPOLLOUT:表示文件描述符可写。
EPOLLRDHUP:表示对端关闭连接(仅适用于 TCP 套接字)。
EPOLLPRI:表示有紧急数据可读。
EPOLLERR:表示发生错误。
EPOLLHUP:表示挂起(文件描述符关闭)。
EPOLLET:表示边缘触发模式(Edge-Triggered)。
EPOLLONESHOT:表示一次性事件,事件处理完成后需要重新注册。
  • 创建 epoll 实例:使用 epoll_createepoll_create1 创建一个 epoll 文件描述符(epfd)。这个文件描述符用于后续的 epoll 操作。

  • 注册文件描述符:使用 epoll_ctl 将需要监控的文件描述符(如 socket)注册到 epoll 实例中,并指定感兴趣的事件(如可读、可写)。

  • 等待事件:使用 epoll_wait 等待 epoll 实例中的事件。epoll_wait 会阻塞当前线程,直到有文件描述符就绪或超时。

  • 处理事件:epoll_wait 返回时,它会返回就绪的文件描述符数量(nfds),并填充 events 数组。程序可以遍历 events 数组,处理每个就绪的文件描述符。

  • 清理资源:当不再需要 epoll 实例时,可以关闭 epoll 文件描述符,释放相关资源。

 系统调用

epoll_create 和 epoll_create1

使用 epoll_createepoll_create1 创建一个 epoll 文件描述符(epfd)。这个文件描述符用于后续的 epoll 操作。

int epoll_create(int size);
int epoll_create1(int flags);
  • size:建议的初始文件描述符数量(一般选择1即可)。
  • flags:可以设置一些标志,如 EPOLL_CLOEXEC(设置文件描述符为关闭执行)。
  • 成功时返回一个有效的 epoll 文件描述符(非负整数)。

  • 失败时返回 -1,并设置 errno 以指示错误原因。

 epoll_ctl

使用 epoll_ctl 将需要监控的文件描述符(如 socket)注册到 epoll 实例中,并指定感兴趣的事件(如可读、可写)。

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
  • epfdepoll 实例的文件描述符。

  • op:操作类型,可以是 EPOLL_CTL_ADD(添加)、EPOLL_CTL_MOD(修改)或 EPOLL_CTL_DEL(删除)。

  • fd:要操作的文件描述符。

  • event:指向 epoll_event 结构的指针,包含要监控的事件和附加数据。

  • 成功时返回 0

  • 失败时返回 -1,具体错误码可以通过 errno 获取。

epoll_wait

使用 epoll_wait 等待 epoll 实例中的事件。epoll_wait 会阻塞当前线程,直到有文件描述符就绪或超时。

int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
  • epfdepoll 实例的文件描述符。

  • events:指向 epoll_event 数组的指针,用于存储就绪的事件。

  • maxeventsevents 数组的最大容量,等同于插入就绪的文件描述符数量。

  • timeout:超时时间(单位为毫秒),-1 表示阻塞等待,0 表示非阻塞。

  • 返回值 > 0:表示有就绪的文件描述符,返回值为就绪的文件描述符数量。

  • 返回值 == 0:表示超时,没有任何文件描述符就绪。

  • 返回值 < 0:表示发生错误,具体错误码可以通过 errno 获取。

示例:

int main(int argc, char const *argv[])
{int epfd = epoll_create1(0);ERROR_CHECK(epfd, -1, "epoll_create")int sockfd = socket(AF_INET, SOCK_STREAM, 0);ERROR_CHECK(sockfd, -1, "socket");struct epoll_event ev;ev.events = EPOLLIN; // 监听可读事件ev.data.fd = sockfd; // 存储文件描述符int ret = epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &ev);ERROR_CHECK(ret, -1, "epoll_ctl: add");struct epoll_event events[10];int nfds = epoll_wait(epfd, events, 10, -1);ERROR_CHECK(nfds, -1, "epoll_wait");char buf[1024];for (int i = 0; i < nfds; i++) {if (events[i].events & EPOLLIN) {int fd = events[i].data.fd; // 获取文件描述符// 处理可读事件read(fd, buf, sizeof(buf));}}return 0;
}

 实战:使用epoll对于基于管道的简易聊天程序进行改进

//客户A
//客户端A
#include<54func.h>
int main(int argc, char const *argv[])
{ARGS_CHECK(argc, 3);printf("waiting for connect\n");int fdr = open(argv[1], O_RDONLY);ERROR_CHECK(fdr, -1, "open fdr error");int fdw = open(argv[2], O_RDWR);ERROR_CHECK(fdr, -1, "open fdw error");printf("connected\n");int epfd = epoll_create(1);ERROR_CHECK(epfd, -1, "epoll_create");struct epoll_event ev, evs[2];ev.events = EPOLLIN; ev.data.fd = STDIN_FILENO;int ret = epoll_ctl(epfd, EPOLL_CTL_ADD, STDIN_FILENO, &ev);ERROR_CHECK(ret, -1, "epoll_ctl: fdr");ev.data.fd = fdr; ret = epoll_ctl(epfd, EPOLL_CTL_ADD, fdr, &ev);ERROR_CHECK(ret, -1, "epoll_ctl: fdw");char buf[1024];int readyNum = 0;while (1){readyNum = epoll_wait(epfd, evs, 2, -1);for(int i = 0; i < readyNum; i++){if(evs[i].data.fd == STDIN_FILENO){memset(buf, 0, sizeof(buf));read(STDIN_FILENO, buf, sizeof(buf));write(fdw, buf, strlen(buf));}else if(evs[i].data.fd == fdr){memset(buf, 0, sizeof(buf));int sret = read(fdr, buf, sizeof(buf));if(sret == 0){printf("B is disconnected\n");return 0;}printf("B:%s",buf);}}}return 0;
}
//客户B
//客户端B
#include<54func.h>
int main(int argc, char const *argv[])
{ARGS_CHECK(argc, 3);printf("waiting for connect\n");int fdw = open(argv[1], O_RDWR);ERROR_CHECK(fdw, -1, "open fdw error");int fdr = open(argv[2], O_RDONLY);ERROR_CHECK(fdr, -1, "open fdr error");printf("connected\n");int epfd = epoll_create(1);ERROR_CHECK(epfd, -1, "epoll_create");struct epoll_event ev, evs[2];ev.events = EPOLLIN; ev.data.fd = STDIN_FILENO;int ret = epoll_ctl(epfd, EPOLL_CTL_ADD, STDIN_FILENO, &ev);ERROR_CHECK(ret, -1, "epoll_ctl: fdr");ev.data.fd = fdr; ret = epoll_ctl(epfd, EPOLL_CTL_ADD, fdr, &ev);ERROR_CHECK(ret, -1, "epoll_ctl: fdw");char buf[1024];int readyNum = 0;while (1){readyNum = epoll_wait(epfd, evs, 2, -1);for(int i = 0; i < readyNum; i++){if(evs[i].data.fd == STDIN_FILENO){memset(buf, 0, sizeof(buf));read(STDIN_FILENO, buf, sizeof(buf));write(fdw, buf, strlen(buf));}else if(evs[i].data.fd == fdr){memset(buf, 0, sizeof(buf));int sret = read(fdr, buf, sizeof(buf));if(sret == 0){printf("A is disconnected\n");return 0;}printf("A:%s",buf);}}}return 0;
}

输出结果:

//客户A
ubuntu@ubuntu:~/MyProject/Linux/IO$ ./epollA 1.pipe 2.pipe 
waiting for connect
connected
hello I am A
who are you?
B:I am B
Am I alone?
B:You are not alone.
Someboday tell me, Why it feel more real when I dream than truth?
B:There is some fiction in your truth, and some truth in your fiction.
B is disconnected
//客户B
ubuntu@ubuntu:~/MyProject/Linux/IO$ ./epollB 1.pipe  2.pipe 
waiting for connect
connected
A:hello I am A
A:who are you?
I am B
A:Am I alone?
You are not alone.
A:Someboday tell me, Why it feel more real when I dream than truth?
There is some fiction in your truth, and some truth in your fiction.
^C

epoll的边缘触发

epoll_wait 的就绪触发有两种方式:一种是默认的水平触发方式(Level-triggered),另一种是边缘触发模式(Edge-triggered)。以读事件为例子:水平触发模式下,只要缓冲区当中存在数据,就可以使 epoll_wait 就绪在边缘触发的情况下,如果缓冲区中存在数据,但是数据一直没有增多,那么 epoll_wait 就不会就绪,只有缓冲区的数据增多的时候,即下图中绿色的上升沿部分时,才能使 epoll_wait 就绪。

使用水平触发的话,线程能够以更短的响应时间来处理事件,但是这可能会导致饥饿问题,如果存在某个事件传输的数据量过大,那么线的epoll_wait就会多次就绪直到处理完所有数据为止,而一些其他的任务所占用的资源就会相对变少而一直无法得到响应。使用边缘触发可以避免这个问题。为了确保读操作可以将所有数据读完,可以考虑使用循环配合非阻塞的形式来处理。
在线程池架构中,主线程通常会将实际的IO交给子线程即工作线程完成,采用边缘触发可以有效地降低主线程的响应频率,提高整体的性能。除此以外,如果一次请求对应一次响应是用户追求的通信模式,那么边缘触发正好符合。 

设置文件描述符为非阻塞模式

在边缘触发模式下,文件描述符必须设置为非阻塞模式(O_NONBLOCK),否则可能会导致程序阻塞在 readwrite 操作上。

我们需要使用fcntl设置文件的状态

int flags = fcntl(sockfd, F_GETFL, 0);
ERROR_CHECK(flags, -1, "fcntl:get");int ret = fcntl(sockfd, F_SETFL, flags | O_NONBLOCK);
ERROR_CHECK(ret, -1, "fcntl: set");
设置边缘触发模式

在使用 epoll_ctl 注册文件描述符时,通过在 events 字段中添加 EPOLLET 标志来启用边缘触发模式。

int epfd = epoll_create(1);
ERROR_CHECK(epfd, -1, "epoll_create");
struct epoll_event ev;
ev.events = EPOLLIN | EPOLLET; // 启用边缘触发模式,监听可读事件
ev.data.fd = sockfd;
int ret = epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &ev)
ERROR_CHECK(ret, -1, "epoll_ctl: add")
处理边缘触发事件

在边缘触发模式下,必须确保在每次通知后处理完所有数据。否则,如果数据没有被完全读取或写入,可能会错过后续的数据。

struct epoll_event events[10];
int nfds = epoll_wait(epfd, events, 10, -1);
ERROR_CHECK(nfds, -1, "epoll_wait");for (int i = 0; i < nfds; i++) {if (events[i].events & EPOLLIN) {char buf[1024];// 使用非阻塞读取,确保读取所有数据while (1) {memset(buf, 0, sizeof(buf));ssize_t ret = read(sockfd, buf, sizeof(buf));ERROR_CHECK(ret, -1, "read");if(ret == -1 || ret == 0){printf("finish\n");break;}printf("%s\n",buf);}}
}
处理所有数据

在边缘触发模式下,必须确保在每次通知后处理完所有数据。如果数据没有被完全读取或写入,可能会错过后续的数据。

 一次性触发模式(EPOLLONESHOT)

如果需要在处理完事件后自动禁用该文件描述符的事件通知,可以结合 EPOLLONESHOT 标志使用。

ev.events = EPOLLIN | EPOLLET | EPOLLONESHOT; // 启用边缘触发模式和一次性触发模式
ev.data.fd = sockfd;int ret = epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &ev);
ERROR_CHECK(ret, -1, "epoll_ctl");
使用水平触发和边缘触发的效果区别:

我们先看一下对文件描述符使用阻塞和非阻塞的效果,设置边缘触发必须将文件描述符设置为非阻塞:

写端:

int main(int argc, char const *argv[])
{ARGS_CHECK(argc, 2);int fdw = open(argv[1], O_RDWR);ERROR_CHECK(fdw, -1, "open");char buf[256];while (1){read(STDIN_FILENO, buf, sizeof(buf));write(fdw, buf, strlen(buf));}close(fdw);return 0;
}

 读端为阻塞的情况:

int main(int argc, char const *argv[])
{ARGS_CHECK(argc, 2);int fdr = open(argv[1], O_RDONLY);char buf[20];while (1){memset(buf, 0, sizeof(buf));ssize_t sret = read(fdr, buf, sizeof(buf));printf("%ld %s\n", sret, buf);if(sret == 0)    break;sleep(1);}close(fdr);return 0;
}

输出结果:

//写端
ubuntu@ubuntu:~/MyProject/Linux/IO$ ./testETW 1.pipe
hello
what are you doing?
^C
//读端
ubuntu@ubuntu:~/MyProject/Linux/IO$ ./testET 1.pipe
6 hello
20 what are you doing?
0 

可以看到,当缓冲区没有数据时,读端管道就会阻塞等待。只有当写端关闭时,读端才会变为非阻塞状态,当写端没有数据时,读端收到的是0进而退出。 

读端为非阻塞的情况:

int main(int argc, char const *argv[])
{ARGS_CHECK(argc, 2);int fdr = open(argv[1], O_RDONLY);ERROR_CHECK(fdr, -1, "open");int flags = fcntl(fdr, F_GETFL, 0);ERROR_CHECK(flags, -1, "fcntl:get");int ret = fcntl(fdr, F_SETFL, flags | O_NONBLOCK);   //设置为非阻塞ERROR_CHECK(ret, -1, "fcntl:set");char buf[20];while (1){memset(buf, 0, sizeof(buf));ssize_t sret = read(fdr, buf, sizeof(buf));printf("%ld %s\n", sret, buf);if(sret == 0)    break;sleep(1);}close(fdr);return 0;
}

输出结果:

//写端
ubuntu@ubuntu:~/MyProject/Linux/IO$ ./testETW 1.pipe
hello
who?
^C
//读端
ubuntu@ubuntu:~/MyProject/Linux/IO$ ./testET 1.pipe
-1 
-1 
6 hello-1 
-1 
-1 
-1 
6 who?0 

把读端设置为非阻塞状态时,不会因为没有数据而等待。而是不断去查询,注意这里与阻塞状态下是有区别的,当端没有收到数据时,如果写端未断开,读端会非阻塞的一直收到 sret = -1 ;当写端断开时,读端才会收到 sret = 0。非阻塞模式允许程序在 I/O 操作无法完成时立即返回,从而可以快速处理其他任务,提高程序的响应速度。为什么在使用边缘触发时必须设置文件描述符为非阻塞模式这个问题我们先放一放,先看水平触发和边缘触发的区别。

现在我们继续观察使用水平触发和边缘触发读取数据的区别:

为了使结果便于辨别,我们把接收缓冲区调小

使用水平触发:

//读端
int main(int argc, char const *argv[])
{ARGS_CHECK(argc, 2);printf("waiting for connect\n");int fdr = open(argv[1], O_RDONLY);ERROR_CHECK(fdr, -1, "open fdr error");printf("connected\n");int epfd = epoll_create(1);ERROR_CHECK(epfd, -1, "epoll_create");struct epoll_event ev, evs[2];ev.data.fd = fdr; ev.events = EPOLLIN ;int ret = epoll_ctl(epfd, EPOLL_CTL_ADD, fdr, &ev);ERROR_CHECK(ret, -1, "epoll_ctl: fdr");char buf[5];int readyNum = 0;while (1){readyNum = epoll_wait(epfd, evs, 2, -1);printf("epoll wait is ready\n");for(int i = 0; i < readyNum; i++){if(evs[i].data.fd == fdr){memset(buf, 0, sizeof(buf));int sret = read(fdr, buf, sizeof(buf));if(sret == -1){printf("B is disconnected\n");return 0;}printf("B:%s\n",buf);}}}return 0;
}//写端
int main(int argc, char const *argv[])
{ARGS_CHECK(argc, 3);printf("waiting for connect\n");int fdw = open(argv[1], O_RDWR);ERROR_CHECK(fdw, -1, "open fdw error");printf("connected\n");int epfd = epoll_create(1);ERROR_CHECK(epfd, -1, "epoll_create");struct epoll_event ev, evs[2];ev.events = EPOLLIN; ev.data.fd = STDIN_FILENO;int ret = epoll_ctl(epfd, EPOLL_CTL_ADD, STDIN_FILENO, &ev);ERROR_CHECK(ret, -1, "epoll_ctl: fdw");char buf[5];int readyNum = 0;while (1){readyNum = epoll_wait(epfd, evs, 2, -1);for(int i = 0; i < readyNum; i++){if(evs[i].data.fd == STDIN_FILENO){memset(buf, 0, sizeof(buf));read(STDIN_FILENO, buf, sizeof(buf));write(fdw, buf, strlen(buf));}}}return 0;
}

输出结果:

//客户端A
ubuntu@ubuntu:~/MyProject/Linux/IO$ ./ETA1 1.pipe
waiting for connect
connected
epoll wait is ready
B:hello
epoll wait is ready
B:, I a
epoll wait is ready
B:m B, 
epoll wait is ready
B:what 
epoll wait is ready
B:is yo
epoll wait is ready
B:ur na
epoll wait is ready
B:me?
//客户端B
ubuntu@ubuntu:~/MyProject/Linux/IO$ ./ETB 1.pipe 2.pipe
waiting for connect
connected
hello, I am B, what is your name?

可以观察到,在使用水平触发时,我们并未一次性读取全部数据,而是部分读取,只要读端有数据, epoll 就会多次就绪,直到把数据全部取出。

现在我们使用边缘触发看看效果:

//读端
int main(int argc, char const *argv[])
{ARGS_CHECK(argc, 2);printf("waiting for connect\n");int fdr = open(argv[1], O_RDONLY);ERROR_CHECK(fdr, -1, "open fdr error");int flags = fcntl(fdr, F_GETFL, 0);ERROR_CHECK(flags, -1, "fcntl:get");int ret = fcntl(fdr, F_SETFL, flags | O_NONBLOCK);ERROR_CHECK(ret, -1, "fcntl:set");printf("connected\n");int epfd = epoll_create(1);ERROR_CHECK(epfd, -1, "epoll_create");struct epoll_event ev, evs[2];ev.data.fd = fdr; ev.events = EPOLLIN | EPOLLET;ret = epoll_ctl(epfd, EPOLL_CTL_ADD, fdr, &ev);ERROR_CHECK(ret, -1, "epoll_ctl: fdr");char buf[5];int readyNum = 0;while (1){readyNum = epoll_wait(epfd, evs, 2, -1);printf("epoll wait is ready\n");for(int i = 0; i < readyNum; i++){if(evs[i].data.fd == fdr){memset(buf, 0, sizeof(buf));int sret = read(fdr, buf, sizeof(buf));if(sret == 0){printf("B is disconnected\n");return 0;}printf("B:%s\n",buf);}}}return 0;
}

输出结果:

//读端
ubuntu@ubuntu:~/MyProject/Linux/IO$ ./ETA1 1.pipe
waiting for connect
connected
epoll wait is ready
B:hello
epoll wait is ready
B:, I a
//写端
ubuntu@ubuntu:~/MyProject/Linux/IO$ ./ETB1 1.pipe
waiting for connect
connected
hello, I am B, what is your name?    
can you hear me?

 可以看到,使用边缘触发时,无论读端是否有数据,epoll 只会在每次收到数据时就绪一次。即使端没有一次性读取全部数据,也要等待下一次收到数据时才能再读取数据。

那么如何在缓冲区一次无法接收全部数据时进行多次读取呢?这个时候我们可以使用循环,效果如下:

while (1){readyNum = epoll_wait(epfd, evs, 2, -1);printf("epoll wait is ready\n");for(int i = 0; i < readyNum; i++){if(evs[i].data.fd == fdr){while(1){memset(buf, 0, sizeof(buf));int sret = read(fdr, buf, sizeof(buf));if(sret == 0 || sret == -1){printf("finish\n");break;}printf("B:%s\n",buf);}}}}

输出结果:

ubuntu@ubuntu:~/MyProject/Linux/IO$ ./ETA1 1.pipe
waiting for connect
connected
epoll wait is ready
B:hello
B:, I a
B:m B, 
B:what 
B:is yo
B:ur na
B:me?finish

这样,我们就可以在边缘触发下一次性读取全部数据。

以下是我在使用边缘触发读取数据的几点疑问:

为什么使用边缘触发时要把文件描述符设置为非阻塞的?

我们把文件描述符设置为阻塞模式看看效果:

int fdr = open(argv[1], O_RDONLY);
ERROR_CHECK(fdr, -1, "open fdr error");
int flags = fcntl(fdr, F_GETFL, 0);
ERROR_CHECK(flags, -1, "fcntl:get");
//int ret = fcntl(fdr, F_SETFL, flags | O_NONBLOCK);
//ERROR_CHECK(ret, -1, "fcntl:set");
printf("connected\n");

 输出结果:

//读端
ubuntu@ubuntu:~/MyProject/Linux/IO$ ./ETA1 1.pipe
waiting for connect
connected
epoll wait is ready
B:hello, I am B, what 
B:is your name?B:hello, I am B, what 
B:is your name?
//写端
ubuntu@ubuntu:~/MyProject/Linux/IO$ ./ETB1 1.pipe
waiting for connect
connected
hello, I am B, what is your name?
hello, I am B, what is your name?

可以看到,在发送两次消息时,epoll只就绪了一次,这明显是有问题的 !原因如下:

当 read() 读取的数据量少于缓冲区大小时,程序无法区分是数据已读完(需等待新事件)还是数据未读完(需继续读)。若此时继续调用阻塞的 read(),它会一直等待新数据到来,导致线程阻塞。这导致其他文件描述符的事件得不到处理(饥饿),且当前描述符的后续事件可能丢失(因为状态未再次变化)。

ET 模式要求程序在收到事件后必须一次性处理完所有数据(直到返回 EAGAIN)。非阻塞模式的 read()/write() 在数据不足时会立即返回 EAGAIN 或 EWOULDBLOCK,程序可据此安全停止读取。

阻塞模式下,若最后一次 read() 时内核缓冲区数据恰好读完,调用会阻塞线程,直到新数据到来。非阻塞模式确保 read() 总是立即返回,避免线程意外阻塞。

结论:ET 必须非阻塞,LT 可容忍阻塞(但仍推荐非阻塞)

什么时候时候可以用ET,什么时候可以用LT?

从之前的输出结果,我们可以看到二者一个很明显的区别就是,使用ET时可以极大的减少通知次数,减少 epoll_wait() 的返回次数(仅在状态变化时触发),降低系统调用开销。在有成千上万连接的高并发情况下,减少通知次数可以有效缓解服务器处理请求的压力。把后续处理任务交给线程自行处理,能够更好的响应其他的连接,而不是一直把精力耗费在单个连接上。

由此,我们便很容易发现二者之间的区别和优势:

边缘触发(ET)的优势

更高的性能潜力

        减少 epoll_wait() 的返回次数(仅在状态变化时触发),降低系统调用开销。

        适合高并发场景(如 >10k 连接),能显著减少 CPU 占用。

避免重复事件风暴

        对高频事件(如套接字持续可写)更友好,不会因状态未变化而重复通知。

更精细的控制

强制要求程序一次性处理所有数据,避免逻辑分散。

水平触发(LT)的优势

编程简单可靠

        允许分批处理数据(例如一次 read() 部分数据),未处理完的事件会持续触发。

        不易遗漏事件,适合快速开发。

行为可预测

        与传统 select/poll 行为一致,迁移成本低。

        对异常情况(如未处理完数据)更宽容。

资源友好

        适合低频或突发流量场景(如 HTTP 短连接),不会因单次未处理完而卡死。

其次,epoll出现的时间较晚,它生而就是为高并发而生的。最初只支持边缘触发,算是一个历史遗留问题,所以在使用epoll时使用边缘触发更常见。

何时更适合使用边缘触发(ET)?

高性能服务器

        需要处理 >10k 并发连接(如游戏服务器、交易所系统)。

        例如:WebSocket 长连接服务,ET 能减少可写事件的重复通知。

需避免事件风暴的场景

        监听大量持续可写的套接字(如日志广播服务),LT 会频繁通知,而 ET 仅在缓冲区从满变为非满时通知一次。

精细控制数据吞吐

        需要最大化单次 I/O 效率的场景(如文件传输服务),配合非阻塞 I/O 一次性读写完整数据块。

延迟敏感型应用

        金融交易系统等低延迟场景,ET 减少内核到用户态的事件传递次数。

何时更适合水平触发(LT)?

开发效率优先的应用

        原型开发、内部工具等,LT 的简单性可降低调试成本。

低频 I/O 场景

        命令行工具、低频数据采集服务(如传感器上报)。

需要兼容旧代码

        从 select/poll 迁移到 epoll 时,LT 行为一致,兼容性更好

对吞吐要求不极端

        普通 Web 服务器(如 Nginx 默认使用 ET,但 Apache 可选 LT)。

超时处理

超时机制是IO多路复用中的一个重要功能,它允许程序在等待IO事件时设置一个时间限制,防止程序无限期地阻塞。实际应用中,程序可能需要在等待IO事件的同时执行其他任务,或者在超时后采取某种默认行为。

  • 在网络编程中,客户端可能需要在一定时间内等待服务器响应,超时后重试或断开连接。

  • 在多任务环境中,程序可能需要在等待IO事件的同时处理其他任务。

一个典型的例子就是游戏中的挂机党,当游戏服务器中存在大量的挂机玩家会严重占用资源,也会影响其他正常玩家的游戏体验。使用超时机制可以在规定时间内清除无响应的玩家,使资源平衡到其他正常玩家中。

在设置IO多路复用的超时机制时,需要传入一个时间结构体,用于设置超时时间,可以精确到微秒级别

struct timeval {time_t      tv_sec;   // 秒数suseconds_t tv_usec;  // 微秒数(1秒 = 1,000,000 微秒)
};

selcet示例:

#include<54func.h>int main(int argc, char const *argv[])
{ARGS_CHECK(argc, 3);printf("waiting for connect\n");int fdr = open(argv[1], O_RDONLY);ERROR_CHECK(fdr, -1, "open fdr error");int fdw = open(argv[2], O_RDWR);ERROR_CHECK(fdr, -1, "open fdw error");printf("connected\n");char buf[1024];fd_set rdset;while (1){FD_ZERO(&rdset);FD_SET(STDIN_FILENO, &rdset);FD_SET(fdr, &rdset);struct timeval timeout;timeout.tv_sec = 3;timeout.tv_usec = 0;int ret = select(fdr+1, &rdset, NULL, NULL, &timeout);if(ret == 0){printf("timeout! disconnect\n");break;}if(FD_ISSET(STDIN_FILENO, &rdset)){memset(buf, 0, sizeof(buf));int ret = read(STDIN_FILENO, buf, sizeof(buf));ERROR_CHECK(ret, -1, "STDIN ERROR");write(fdw, buf, strlen(buf));}if(FD_ISSET(fdr, &rdset)){memset(buf, 0, sizeof(buf));int ret = read(fdr, buf, sizeof(buf));ERROR_CHECK(ret, -1, "STDIN ERROR");if(ret == 0){printf("B is disconnected\n");break;}printf("B:%s", buf);}}return 0;
}
ubuntu@ubuntu:~/MyProject/Linux/IO$ ./timeoutA 1.pipe
waiting for connect
connected
timeout! disconnect

对于有多个用户同时进行连接时,仅向select里面放一个timeout,因为每收到一个消息,计时器就会重新计时,无法做到超时踢出的效果。这个时候我们可以改变一下思路,可以设置一个本地每秒钟都会响应的计时器,并存储上一次活动的时间。每当计时器响应时,就检查当前时间与上一次的差值是否超过规定时间,超过就会自动下线。代码实现如下:

#include<54func.h>int main(int argc, char const *argv[])
{ARGS_CHECK(argc, 3);printf("waiting for connect\n");int fdr = open(argv[1], O_RDONLY);ERROR_CHECK(fdr, -1, "open fdr error");int fdw = open(argv[2], O_RDWR);ERROR_CHECK(fdr, -1, "open fdw error");printf("connected\n");char buf[1024];fd_set rdset;time_t curtime = time(NULL);time_t lastactive = time(NULL);while (1){FD_ZERO(&rdset);FD_SET(STDIN_FILENO, &rdset);FD_SET(fdr, &rdset);struct timeval timeout;timeout.tv_sec = 1;timeout.tv_usec = 0;int ret = select(fdr+1, &rdset, NULL, NULL, &timeout);curtime = time(NULL);if(FD_ISSET(STDIN_FILENO, &rdset)){memset(buf, 0, sizeof(buf));int ret = read(STDIN_FILENO, buf, sizeof(buf));ERROR_CHECK(ret, -1, "STDIN ERROR");write(fdw, buf, strlen(buf));lastactive = time(NULL);}if(FD_ISSET(fdr, &rdset)){memset(buf, 0, sizeof(buf));int ret = read(fdr, buf, sizeof(buf));ERROR_CHECK(ret, -1, "STDIN ERROR");if(ret == 0){printf("B is disconnected\n");break;}printf("B:%s", buf);}if(curtime - lastactive > 3){printf("timeout! disconnect\n");break;}}return 0;
}


文章转载自:
http://boll.hnsdj.cn
http://apellation.hnsdj.cn
http://aspartate.hnsdj.cn
http://boxlike.hnsdj.cn
http://assayer.hnsdj.cn
http://autocratic.hnsdj.cn
http://aircraftman.hnsdj.cn
http://architecturally.hnsdj.cn
http://basipetal.hnsdj.cn
http://blowmobile.hnsdj.cn
http://blandly.hnsdj.cn
http://amadis.hnsdj.cn
http://charisma.hnsdj.cn
http://atomise.hnsdj.cn
http://certiorari.hnsdj.cn
http://authenticator.hnsdj.cn
http://awhirl.hnsdj.cn
http://catskinner.hnsdj.cn
http://browse.hnsdj.cn
http://chernozem.hnsdj.cn
http://beeves.hnsdj.cn
http://brach.hnsdj.cn
http://acolyte.hnsdj.cn
http://bartizan.hnsdj.cn
http://antrum.hnsdj.cn
http://beagle.hnsdj.cn
http://ashler.hnsdj.cn
http://bregma.hnsdj.cn
http://burns.hnsdj.cn
http://aeroamphibious.hnsdj.cn
http://www.dtcms.com/a/280368.html

相关文章:

  • 静态补丁脚本 - 修改 libtolua.so
  • Unity音游开发全指南:模板与免费资源高效构建节奏游戏
  • Ubuntu 22.04 安装 mysql-server服务端
  • docker拉取nacos镜像失败
  • golang语法-----标准化输入输出
  • 渗透测试技术_Nessus工具(三):输出报告
  • 构建 Go 可执行文件镜像 | 探索轻量级 Docker 基础镜像(我应该选择哪个 Docker 镜像?)
  • STM32小实验三--让蜂鸣器响起来
  • Pytorch中张量的索引和切片使用详解和代码示例
  • CSS的初步学习
  • 用语音识别芯片驱动TFT屏幕还有链接蓝牙功能?
  • cursor使用mcp连接mysql数据库,url方式
  • java截取视频帧
  • c#进阶之数据结构(字符串篇)----String
  • C++中list各种基本接口的模拟实现
  • 【Java代码审计(2)】MyBatis XML 注入审计
  • 153.在 Vue 3 中使用 OpenLayers + Cesium 实现 2D/3D 地图切换效果
  • java中的接口
  • JavaScript 动态访问嵌套对象属性问题记录
  • HarmonyOS-ArkUI: Web组件加载流程1
  • 暴力破解:攻破系统的终极密钥
  • Rust指针选择
  • 安装带GPU的docker环境
  • 20250715使用荣品RD-RK3588开发板在Android13下接入USB3.0接口的红外相机
  • 【I3D 2024】Deblur-GS: 3D Gaussian Splatting from Camera Motion Blurred Images
  • 记录一条面试sql题目
  • JS中async/await功能介绍和使用演示
  • 普通字符类型和new String有什么区别
  • 使用JS编写动态表格
  • 【env环境】rtthread5.1.0使用fal组件