I/O 多路复用
I/O 多路复用对比
- 前言
- 一、select
- 1.理解 select 执行过程
- 2.select数据结构
- 3.select 的内核级实现机制
- 4.select特点
- 5.代码示例
- 6.总结
- 二、poll
- 1. poll 的工作核心流程:
- 2.poll 相比 select 的优势
- 3.poll 的性能瓶颈
- 4.代码示例
- 5 总结
- 三、epoll
- 1.`epoll 的三大核心系统调用:`
- 2.epoll的工作流程
- 3. epoll 内核级工作机制
- 4.epoll 的优点
- 5.epoll 的触发模式
- 6. select、poll 与 epoll 性能对比
- 7. epoll 的高级特性
- 8.代码示例
- 9.总结
前言
- 什么是 select?
select 是 I/O 多路复用 的一种机制,最早在 BSD Unix 中引入,用于监控多个文件描述符(FD)的 I/O 状态变化。它允许程序同时监听多个 FD,以判断哪些可以进行读、写或异常处理。
2.什么是poll?
poll 是一种 I/O 多路复用 技术,常用于处理大量并发连接,尤其在网络编程中广泛应用。它的设计目的是改进 select 的一些限制,尤其在处理大量文件描述符时更具优势。
3.什么是epoll?
epoll 是 Linux 内核 2.6 版本引入的 高效 I/O 多路复用机制,相较于传统的 select 和 poll,它在 处理大量文件描述符(FD) 时具有更高的性能和扩展性。
socket 就绪条件
在Linux I/O多路复用(如select/poll/epoll)中,读写事件就绪指的是文件描述符(fd)的状态满足非阻塞读写的条件。
读就绪
• socket 内核中, 接收缓冲区中的字节数, 大于等于低水位标记
SO_RCVLOWAT. 此时可以无阻塞的读该文件描述符, 并且返回值大于 0;
• socket TCP 通信中, 对端关闭连接, 此时对该 socket 读, 则返回 0;
• 监听的 socket 上有新的连接请求;
• socket 上有未处理的错误;
写就绪
• socket 内核中, 发送缓冲区中的可用字节数(发送缓冲区的空闲位置大小), 大于
等于低水位标记 SO_SNDLOWAT, 此时可以无阻塞的写, 并且返回值大于 0;
• socket 的写操作被关闭(close 或者 shutdown). 对一个写操作被关闭的 socket
进行写操作, 会触发 SIGPIPE 信号;
• socket 使用非阻塞 connect 连接成功或失败之后;
• socket 上有未读取的错误;
异常就绪
• socket 上收到带外数据. 关于带外数据, 和 TCP 紧急模式相关(回忆 TCP 协议
头中, 有一个紧急指针的字段)
一、select
函数原型:
#include <sys/select.h>int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
参数解释:
•nfds:监听的 FD 集合中 最大 FD + 1,因为 FD 是从 0 开始的。
⚠️ 限制: 通常 1024(可调整 FD_SETSIZE)。
•readfds: 监控是否可读的 FD 集合。
•writefds: 监控是否可写的 FD 集合。
•exceptfds: 监控异常状态的 FD 集合(如带外数据)。
•timeout: 超时时间,控制阻塞行为。
NULL:无限期阻塞。
0:立即返回,非阻塞。
0:阻塞等待指定时间。
返回值:
•>0: 表示就绪的 FD 数量。
•0: 超时,没有 FD 就绪。
•-1: 发生错误(如信号中断)
1.理解 select 执行过程
select 内核实现基于 线性扫描(O(N)),通过不断遍历 FD 集合,检测每个 FD 是否就绪。
工作流程概述:
拷贝用户态的 FD 集合到内核态。
内核遍历每个 FD,检查其状态(是否可读/可写/异常)。
将已就绪的 FD 标记为可用,返回给用户态。
应用程序处理就绪的 FD,然后重新调用 select 进入下一轮检测。
// 1. 创建 FD 集合
fd_set readfds;
FD_ZERO(&readfds);
FD_SET(sockfd, &readfds);// 2. 设置超时时间
struct timeval timeout;
timeout.tv_sec = 5; // 5 秒超时
timeout.tv_usec = 0;// 3. 调用 select 等待事件
int ret = select(sockfd + 1, &readfds, NULL, NULL, &timeout);if (ret > 0) {if (FD_ISSET(sockfd, &readfds)) {// sockfd 可读,进行处理}
} else if (ret == 0) {// 超时处理
} else {// 错误处理
}
2.select数据结构
fd_set:文件描述符集合
fd_set 是一个 位图(bitmap),每一位代表一个 FD 的状态(0 表示未监听,1 表示监听)。
常用宏:
FD_ZERO(fd_set *set):清空集合。
FD_SET(int fd, fd_set *set):将 FD 加入集合。
FD_CLR(int fd, fd_set *set):从集合中移除 FD。
FD_ISSET(int fd, fd_set *set):检测 FD 是否可用。
3.select 的内核级实现机制
(1)FD 集合的拷贝
用户态的 FD 集合通过 copy_from_user() 复制到内核态。
内核会在 select 期间修改这个集合,标记哪些 FD 已就绪。
2)内核遍历
内核调用 do_select(),对 FD 逐一检查。
每个 FD 都需要调用设备驱动的 poll 或 f_op->poll() 方法,检测其状态。
(3)阻塞 & 唤醒
如果没有 FD 就绪,进程会被挂起(睡眠状态)。
当 I/O 事件发生时,内核通过 wake_up() 唤醒进程,重新检查 FD 集合。
(4)返回结果
将已就绪的 FD 标记为可用,再通过 copy_to_user() 复制回用户态。
应用程序根据 FD_ISSET 检测哪些 FD 可用。
4.select特点
• 可监控的文件描述符个数取决于 sizeof(fd_set)的值. 我这边服务器上sizeof(fd_set)= 512, 每 bit 表示一个文件描述符, 则我服务器上支持的最大文件描述符是 512*8=4096.
• 将 fd 加入 select 监控集的同时, 还要再使用一个数据结构 array 保存放到 select
监控集中的 fd,
○ 一是用于再 select 返回后, array 作为源数据和 fd_set 进行 FD_ISSET 判断。
○ 二是 select 返回后会把以前加入的但并无事件发生的 fd 清空, 则每次开始
select 前都要重新从 array 取得 fd 逐一加入(FD_ZERO 最先), 扫描 array 的同时
取得 fd 最大值 maxfd, 用于 select 的第一个参数。
sselect 的性能瓶颈
1.FD 数量限制:
默认最多支持 1024 个 FD(可通过修改内核参数扩大)。
每次调用 select 都需要传递完整的 FD 集合,效率低下。
2.线性扫描(O(N))复杂度:
无论是否有事件发生,内核都要遍历所有 FD。
随着连接数增加,CPU 开销显著上升。
3.每次调用需要重复拷贝 FD 集合:
用户态和内核态频繁数据拷贝,增加了额外的系统开销。
5.代码示例
#pragma once#include <iostream>
#include <memory>
#include <unistd.h>
#include "Socket.hpp"
#include "Log.hpp"using namespace SocketModule;
using namespace LogModule;class SelectServer
{const static int size = sizeof(fd_set) * 8;const static int defaultfd = -1;public:SelectServer(int port) : _listensock(std::make_unique<TcpSocket>()), _isrunning(false){_listensock->BuildTcpSocketMethod(port);for (int i = 0; i < size; i++)_fd_array[i] = defaultfd;_fd_array[0] = _listensock->Fd();}void Start(){_isrunning = true;while (_isrunning){// 因为: listensockfd,也是一个fd,进程怎么知道listenfd上面有新连接到来了呢?// auto res = _listensock->Accept(); // 我们在select这里,可以进行accept吗?// 将listensockfd添加到select内部,让OS帮我关心listensockfd上面的读事件fd_set rfds; // 定义fds集合FD_ZERO(&rfds); // 清空fdsint maxfd = defaultfd;for (int i = 0; i < size; i++){if (_fd_array[i] == defaultfd)continue;// 1. 每次select之前,都要对rfds进行重置!FD_SET(_fd_array[i], &rfds);// 2. 最大fd,一定是变化的if (maxfd < _fd_array[i]){maxfd = _fd_array[i]; // 更新出最大fd}}PrintFd();// struct timeval timeout = {0, 0};// select 返回之后,你怎么还知道哪些fd需要被添加到rfds,让select关心呢?// 所以:select要进行完整的设计,需要借助一个辅助数组!保存服务器历史获取过的所有的fd// rfds: 1111 1111// select负责事件就绪检测int n = select(maxfd + 1, &rfds, nullptr, nullptr, nullptr);// rfds: 0000 0000switch (n){case -1:LOG(LogLevel::ERROR) << "select error";break;case 0:LOG(LogLevel::INFO) << "time out...";break;default:// 有事件就绪,就不仅仅是新连接到来了吧?读事件就绪啊?LOG(LogLevel::DEBUG) << "有事件就绪了..., n : " << n;Dispatcher(rfds); // 处理就绪的事件啊!break;}}_isrunning = false;}// 事件派发器void Dispatcher(fd_set &rfds /*, fd_set &wfds*/){// 就不仅仅是新连接到来了吧?读事件就绪啊? // 指定的文件描述符,在rfds里面,就证明该fd就绪了for (int i = 0; i < size; i++){if (_fd_array[i] == defaultfd)continue;// fd合法,不一定就绪if (FD_ISSET(_fd_array[i], &rfds)){// fd_array[i] 上面一定是读就绪了// listensockfd 新连接到来,也是读事件就绪啊// sockfd 数据到来,读事件就绪啊if (_fd_array[i] == _listensock->Fd()){// listensockfd 新连接到来Accepter();}else{// 普通的读事件就绪Recver(_fd_array[i], i);}}// if (FD_ISSET(fd_array[i], &wfds))// {// // fd_array[i] 上面一定是读就绪了// }}}// 链接管理器void Accepter(){InetAddr client;int sockfd = _listensock->Accept(&client); // accept会不会阻塞?if (sockfd >= 0){// 获取新链接到来成功, 然后呢??能不能直接// read/recv(), sockfd是否读就绪,我们不清楚// 只有谁最清楚,未来sockfd上是否有事件就绪?select!// 将新的sockfd,托管给select!// 如何托管? 将新的fd放入辅助数组!LOG(LogLevel::INFO) << "get a new link, sockfd: "<< sockfd << ", client is: " << client.StringAddr();int pos = 0;for (; pos < size; pos++){if (_fd_array[pos] == defaultfd)break;}if (pos == size){LOG(LogLevel::WARNING) << "select server full";close(sockfd);}else{_fd_array[pos] = sockfd;}}}// IO处理器void Recver(int fd, int pos){char buffer[1024];// 我在这里读取的时候,会不会阻塞?ssize_t n = recv(fd, buffer, sizeof(buffer)-1, 0); // recv写的时候有bug吗?if(n > 0){buffer[n] = 0;std::cout << "client say@ "<< buffer << std::endl;}else if(n == 0){LOG(LogLevel::INFO) << "clien quit...";// 1. 不要让select在关系这个fd了_fd_array[pos] = defaultfd;// 2. 关闭fdclose(fd);}else {LOG(LogLevel::ERROR) << "recv error";// 1. 不要让select在关系这个fd了_fd_array[pos] = defaultfd;// 2. 关闭fdclose(fd);}}void PrintFd(){std::cout << "_fd_array[]: ";for (int i = 0; i < size; i++){if (_fd_array[i] == defaultfd)continue;std::cout << _fd_array[i] << " ";}std::cout << "\r\n";}void Stop(){_isrunning = false;}~SelectServer(){}private:std::unique_ptr<Socket> _listensock;bool _isrunning;int _fd_array[size];
};
6.总结
•select 是 I/O 多路复用的鼻祖,简单但存在性能瓶颈,适用于小规模连接场景。
•在高并发场景中,推荐使用 epoll 或 kqueue 等更高效的机制。
•理解 select 的底层机制,有助于深入理解网络编程和 I/O 模型的本质。
二、poll
作用: 检测一组文件描述符(FD)是否就绪(可读、可写或有异常)。
系统调用:
#include <sys/poll.h>int poll(struct pollfd *fds, nfds_t nfds, int timeout);
参数解释:
• fds 是一个 poll 函数监听的结构列表. 每一个元素中, 包含了三部分内容: 文件描
述符, 监听的事件集合, 返回的事件集合.
• nfds 表示 fds 数组的长度.
• timeout 表示 poll 函数的超时时间, 单位是毫秒(ms).
返回值:
• 返回值小于 0, 表示出错;
• 返回值等于 0, 表示 poll 函数等待超时;
• 返回值大于 0, 表示 poll 由于监听的文件描述符就绪而返回
pollfd 结构体:
struct pollfd
{
int fd; // 需要监控的文件描述符
short events; // 关注的事件(如 POLLIN 可读,POLLOUT 可写)
short revents; // 实际发生的事件(内核填充)
};
事件
1. poll 的工作核心流程:
1.准备阶段:
用户空间将需要监控的文件描述符及其事件(如 POLLIN、POLLOUT)放入 pollfd 数组,调用 poll()。
2.内核检查:
内核遍历 pollfd 数组,检查每个文件描述符的状态。
已就绪: 立即返回,更新 revents 字段。
未就绪: 将当前进程加入等待队列(sleep),直到有事件发生或超时。
3.等待事件:
文件描述符状态改变时,内核唤醒进程。
超时(timeout)或被信号打断时也会唤醒进程。
4.返回结果:
内核更新 revents,用户空间检查每个文件描述符的状态。
#include <stdio.h>
#include <poll.h>
#include <unistd.h>int main() {struct pollfd fds[1];fds[0].fd = STDIN_FILENO; // 监控标准输入fds[0].events = POLLIN; // 关注可读事件printf("等待输入 (5秒超时):\n");int ret = poll(fds, 1, 5000); // 5秒超时if (ret == -1) {perror("poll 出错");} else if (ret == 0) {printf("超时,没有输入。\n");} else {if (fds[0].revents & POLLIN) {char buffer[100];read(STDIN_FILENO, buffer, sizeof(buffer));printf("读取到输入:%s\n", buffer);}}return 0;
}
2.poll 相比 select 的优势
•文件描述符不受限制: poll 不受 FD_SETSIZE 限制,select 通常最多支持 1024 个描述符。
•灵活的事件注册: poll 允许更灵活地监听多种事件。
•代码更简洁: pollfd 数组替代了复杂的 fd_set 操作。
3.poll 的性能瓶颈
尽管 poll 优于 select,但仍存在一些性能问题:
•线性扫描: 内核仍需遍历整个 pollfd 数组,O(N) 复杂度。
•重复传输: 每次调用 poll 都要重新传入 pollfd 数组。
•唤醒风暴: 大量就绪事件时,可能会导致不必要的 CPU 消耗。
4.代码示例
#pragma once#include <iostream>
#include <memory>
#include <unistd.h>
#include <sys/poll.h>
#include "Socket.hpp"
#include "Log.hpp"using namespace SocketModule;
using namespace LogModule;class PollServer
{const static int size = 4096;const static int defaultfd = -1;public:PollServer(int port) : _listensock(std::make_unique<TcpSocket>()), _isrunning(false){_listensock->BuildTcpSocketMethod(port);for (int i = 0; i < size; i++){_fds[i].fd = defaultfd;_fds[i].events = 0;_fds[i].revents = 0;}_fds[0].fd = _listensock->Fd();_fds[0].events = POLLIN;}void Start(){int timeout = -1;_isrunning = true;while (_isrunning){PrintFd();int n = poll(_fds, size, timeout);// rfds: 0000 0000switch (n){case -1:LOG(LogLevel::ERROR) << "poll error";break;case 0:LOG(LogLevel::INFO) << "poll time out...";break;default:// 有事件就绪,就不仅仅是新连接到来了吧?读事件就绪啊?LOG(LogLevel::DEBUG) << "有事件就绪了..., n : " << n;Dispatcher(); // 处理就绪的事件啊!break;}}_isrunning = false;}// 事件派发器void Dispatcher(){// 就不仅仅是新连接到来了吧?读事件就绪啊? // 指定的文件描述符,在rfds里面,就证明该fd就绪了for (int i = 0; i < size; i++){if (_fds[i].fd == defaultfd)continue;// fd合法,不一定就绪if (_fds[i].revents & POLLIN){// fd_array[i] 上面一定是读就绪了// listensockfd 新连接到来,也是读事件就绪啊// sockfd 数据到来,读事件就绪啊if (_fds[i].fd == _listensock->Fd()){// listensockfd 新连接到来Accepter();}else{// 普通的读事件就绪Recver(i);}}// else if(_fds[i].revents & POLLOUT)// {}}}// 链接管理器void Accepter(){InetAddr client;int sockfd = _listensock->Accept(&client); // accept会不会阻塞?if (sockfd >= 0){// 获取新链接到来成功, 然后呢??能不能直接// read/recv(), sockfd是否读就绪,我们不清楚// 只有谁最清楚,未来sockfd上是否有事件就绪?select!// 将新的sockfd,托管给select!// 如何托管? 将新的fd放入辅助数组!LOG(LogLevel::INFO) << "get a new link, sockfd: "<< sockfd << ", client is: " << client.StringAddr();int pos = 0;for (; pos < size; pos++){if (_fds[pos].fd == defaultfd)break;}if (pos == size){LOG(LogLevel::WARNING) << "poll server full";close(sockfd);}else{_fds[pos].fd = sockfd;_fds[pos].events = POLLIN;_fds[pos].revents = 0;}}}// IO处理器void Recver(int pos){char buffer[1024];// 我在这里读取的时候,会不会阻塞?ssize_t n = recv(_fds[pos].fd, buffer, sizeof(buffer) - 1, 0); // recv写的时候有bug吗?if (n > 0){buffer[n] = 0;std::cout << "client say@ " << buffer << std::endl;}else if (n == 0){LOG(LogLevel::INFO) << "clien quit...";// 2. 关闭fdclose(_fds[pos].fd);// 1. 不要让select在关系这个fd了_fds[pos].fd = defaultfd;_fds[pos].events = 0;_fds[pos].revents = 0;}else{LOG(LogLevel::ERROR) << "recv error";// 2. 关闭fdclose(_fds[pos].fd);// 1. 不要让select在关系这个fd了_fds[pos].fd = defaultfd;_fds[pos].events = 0;_fds[pos].revents = 0;}}void PrintFd(){std::cout << "_fds[]: ";for (int i = 0; i < size; i++){if (_fds[i].fd == defaultfd)continue;std::cout << _fds[i].fd << " ";}std::cout << "\r\n";}void Stop(){_isrunning = false;}~PollServer(){}private:std::unique_ptr<Socket> _listensock;bool _isrunning;struct pollfd _fds[size];// struct pollfd *_fds;
};
5 总结
•poll 是 select 的改进版,解决了文件描述符数量受限的问题。
•核心问题仍是 O(N) 的遍历复杂度,在高并发场景下性能不佳。
•在更高效的场景下,可以使用 epoll 替代 poll,降低系统调用开销。
三、epoll
1.epoll 的三大核心系统调用:
系统调用 | 作用 |
---|---|
int epoll_create(int size); | 创建一个 epoll 的句柄.•自从 linux2.6.8 之后, size 参数是被忽略的.• 用完之后, 必须调用 close()关闭 |
int epoll_ctl(int epfd, int op, int fd, struct epoll_event*event); | 控制接口,添加、修改、删除监听的文件描述符 |
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout); | 等待 I/O 事件的发生(阻塞或非阻塞) |
关于epoll_ctl接口介绍:
参数解释:
• 第一个参数是 epoll_create()的返回值(epoll 的句柄).
• 第二个参数表示动作, 用三个宏来表示.
• 第三个参数是需要监听的 fd.
• 第四个参数是告诉内核需要监听什么事
第二个参数的取值:
• EPOLL_CTL_ADD: 注册新的 fd 到 epfd 中;
• EPOLL_CTL_MOD: 修改已经注册的 fd 的监听事件;
• EPOLL_CTL_DEL: 从 epfd 中删除一个 fd;
struct epoll_event 结构如下:
events 可以是以下几个宏的集合:
• EPOLLIN : 表示对应的文件描述符可以读 (包括对端 SOCKET 正常关闭);
• EPOLLOUT : 表示对应的文件描述符可以写;
• EPOLLPRI : 表示对应的文件描述符有紧急的数据可读 (这里应该表示有带外
数据到来);
• EPOLLERR : 表示对应的文件描述符发生错误;
• EPOLLHUP : 表示对应的文件描述符被挂断;
• EPOLLET : 将 EPOLL 设为边缘触发(Edge Triggered)模式, 这是相对于水平
触发(Level Triggered)来说的.
• EPOLLONESHOT: 只监听一次事件, 当监听完这次事件之后, 如果还需要继
续监听这个 socket 的话, 需要再次把这个 socket 加入到 EPOLL 队列里.
返回值:
成功epoll_ctl()返回0。错误返回-1。
关于epoll_wait接口介绍:
参数解释:
• 参数 events 是分配好的 epoll_event 结构体数组.
• epoll 将会把发生的事件赋值到 events 数组中 (events 不可以是空指针, 内核
只负责把数据复制到这个 events 数组中, 不会去帮助我们在用户态中分配内存).
• maxevents 告之内核这个 events 有多大, 这个 maxevents 的值不能大于创建
epoll_create()时的 size.
• 参数 timeout 是超时时间 (毫秒, 0 会立即返回, -1 是永久阻塞).
返回值:
• 如果函数调用成功, 返回对应 I/O 上已准备好的文件描述符数目, 如返回 0 表
示已超时, 返回小于 0 表示函数失败.
2.epoll的工作流程
1.创建 epoll 实例:
int epfd = epoll_create(0);
2.注册文件描述符:
struct epoll_event event;
event.events = EPOLLIN; // 监听可读事件
event.data.fd = sockfd;
epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &event);
3.等待事件:
struct epoll_event events[10];
int nfds = epoll_wait(epfd, events, 10, -1); // 阻塞等待
//性能关键点: epoll_wait 只返回就绪的 FD,避免了大规模 FD 的遍历。
4.处理事件:
for (int i = 0; i < nfds; i++) {if (events[i].events & EPOLLIN) {// 处理可读事件}
}
5.关闭 epoll:
close(epfd);
3. epoll 内核级工作机制
epoll 主要基于 红黑树(RB-Tree) 和 就绪链表(Ready List) 的数据结构实现。
内部核心结构:
1.红黑树(RB-Tree):
存储所有被 epoll_ctl 注册的文件描述符。
具备快速的增删查改能力(O(log N) 复杂度)。
2.就绪链表(Ready List):
保存已经就绪的文件描述符。
只有发生事件的 FD 才会被添加,避免不必要的遍历。
(1)注册阶段: epoll_ctl
将 FD 添加到内核的红黑树中。
注册的事件类型保存在 epitem 结构中。
(2)事件检测阶段:epoll_wait
LT 模式: 内核轮询已注册的 FD,找到所有就绪事件。
ET 模式: 内核只在状态变化时通知用户态,避免重复通知。
(3)事件处理阶段
内核将就绪的 FD 放入就绪链表,epoll_wait 返回后,应用程序遍历处理。
4.epoll 的优点
•O(1) 事件获取:epoll_wait 直接遍历就绪队列(无需遍历所有 fd)。
回调驱动:仅当 socket 事件发生时调用回调,避免轮询。
• 没有数量限制: 文件描述符数目无上限。
•内核态数据结构:红黑树管理 fd,就绪队列维护事件,减少用户-内核切换。
5.epoll 的触发模式
1️⃣水平触发(Level Trigger,LT) - 默认模式
•只要 FD 处于就绪状态,epoll_wait 每次都会返回。
•适用场景: 兼容性好,逻辑简单,适合大多数场景。
示例:
event.events = EPOLLIN; // 默认 LT 模式
2️⃣ 边缘触发(Edge Trigger,ET)
•只有状态从未就绪到就绪时,才会触发通知。(从无到有)
•如果未处理完所有数据,后续不会再次通知,除非有新数据到来。(从有到多)
•适用场景: 高性能网络服务器,减少系统调用次数。
示例:
event.events = EPOLLIN | EPOLLET; // 启用 ET 模式
⚠️ 注意: 使用 ET 模式时,必须使用非阻塞 I/O,否则容易导致死锁或数据丢失。
ET VS LT
倒逼程序猿把缓冲区数据一次性全读完(如何做到呢?循环读取),(为什么呢?)
6. select、poll 与 epoll 性能对比
7. epoll 的高级特性
✅ (1)EPOLLONESHOT
•事件触发后,自动将 FD 从 epoll 列表中移除,需手动重新注册。
•适用于多线程场景,防止同一个 FD 被多个线程处理。
event.events = EPOLLIN | EPOLLONESHOT;
✅ (2)EPOLLEXCLUSIVE
•防止 “惊群效应”,提高多核服务器下的效率。
•适用于多个 epoll_wait 监听同一 FD 的场景。
惊群效应
event.events = EPOLLIN | EPOLLEXCLUSIVE;
8.代码示例
#pragma once#include <iostream>
#include <memory>
#include <unistd.h>
#include <sys/epoll.h>
#include "Socket.hpp"
#include "Log.hpp"using namespace SocketModule;
using namespace LogModule;// 暂时不做过多解耦
class EpollServer
{const static int size = 64;const static int defaultfd = -1;public:EpollServer(int port) : _listensock(std::make_unique<TcpSocket>()), _isrunning(false), _epfd(defaultfd){// 1. 创建listensocket_listensock->BuildTcpSocketMethod(port); // 3// 2. 创建epoll模型_epfd = epoll_create(256);if (_epfd < 0){LOG(LogLevel::FATAL) << "epoll_create error";exit(EPOLL_CREATE_ERR);}LOG(LogLevel::INFO) << "epoll_create success: " << _epfd; // 4// 3. 将listensocket设置到内核中!struct epoll_event ev; // 有没有设置到内核中,有没有rb_tree中新增节点??没有!!ev.events = EPOLLIN;ev.data.fd = _listensock->Fd(); // TODO : 这里未来是维护的是用户的数据,常见的是fd// ev.data.ptr = _listensock.get();int n = epoll_ctl(_epfd, EPOLL_CTL_ADD, _listensock->Fd(), &ev);if (n < 0){LOG(LogLevel::FATAL) << "add listensockfd failed";exit(EPOLL_CTL_ERR);}}void Start(){int timeout = -1;_isrunning = true;while (_isrunning){// 能不能直接accept呢??不能!应该干什么?int n = epoll_wait(_epfd, _revs, size, timeout);switch (n){case 0:LOG(LogLevel::DEBUG) << "timeout...";break;case -1:LOG(LogLevel::ERROR) << "epoll error";break;default:Dispatcher(n);break;}}_isrunning = false;}// 事件派发器void Dispatcher(int rnum){LOG(LogLevel::DEBUG) << "event ready ..."; // LT: 水平触发模式--epoll默认for (int i = 0; i < rnum; i++){// epoll也要循环处理就绪事件--这是应该的,本来就有可能有多个fd就绪!int sockfd = _revs[i].data.fd;uint32_t revent = _revs[i].events;if (revent & EPOLLIN){ // 读事件就绪// listensockfd ready? normal socfd ready??if (sockfd == _listensock->Fd()){// 读事件就绪 && 新连接到来Accepter();}else{// 读事件就绪 && 普通socket可读Recver(sockfd);}}// if(_revs[i].events & EPOLLOUT)// {// 写事件就绪// }}}// 链接管理器void Accepter(){InetAddr client;// 新连接到来 --- 至少有一个连接到来 --- accept一次 --- 绝对不会阻塞int sockfd = _listensock->Accept(&client); // accept会不会阻塞? 0 or 1if (sockfd >= 0){// 获取新链接到来成功, 然后呢??能不能直接// read/recv(), sockfd是否读就绪,我们不清楚// 只有谁最清楚,未来sockfd上是否有事件就绪?select!// 将新的sockfd,托管给select!// 如何托管? 将新的fd放入辅助数组!LOG(LogLevel::INFO) << "get a new link, sockfd: "<< sockfd << ", client is: " << client.StringAddr();// 能不能直接recv??? 不能!!!// 将新的sockfd添加到内核!struct epoll_event ev;ev.events = EPOLLIN;ev.data.fd = sockfd;int n = epoll_ctl(_epfd, EPOLL_CTL_ADD, sockfd, &ev);if (n < 0){LOG(LogLevel::WARNING) << "add listensockfd failed";}else{LOG(LogLevel::INFO) << "epoll_ctl add sockfd success: " << sockfd;}}}// IO处理器void Recver(int sockfd){char buffer[1024];// 我在这里读取的时候,会不会阻塞? 本次读取,不会被阻塞ssize_t n = recv(sockfd, buffer, sizeof(buffer) - 1, 0); // recv写的时候有bug吗?if (n > 0){buffer[n] = 0;std::cout << "client say@ " << buffer << std::endl;}else if (n == 0){LOG(LogLevel::INFO) << "clien quit...";// 2. 从epoll中移除fd的关心 && 关闭fd -- 细节:epoll_ctl: 只能移除合法fd -- 先移除,在关闭!!int m = epoll_ctl(_epfd, EPOLL_CTL_DEL, sockfd, nullptr);if(m > 0){LOG(LogLevel::INFO) << "epoll_ctl remove sockfd success: " << sockfd;}close(sockfd);}else{LOG(LogLevel::ERROR) << "recv error";// 2. 关闭fdint ret = epoll_ctl(_epfd, EPOLL_CTL_DEL, sockfd, nullptr);if(ret > 0){LOG(LogLevel::INFO) << "epoll_ctl remove sockfd success: " << sockfd;}close(sockfd);}}void Stop(){_isrunning = false;}~EpollServer(){_listensock->Close();if (_epfd > 0)close(_epfd);}private:std::unique_ptr<Socket> _listensock;bool _isrunning;int _epfd;struct epoll_event _revs[size];
};
9.总结
•epoll 是 Linux 高并发服务器开发的核心技术,适合处理上万级别的连接。
•通过事件驱动模型,结合红黑树和就绪链表,epoll 极大提高了 I/O 性能。
•关键优化点: 使用 ET 模式 + 非阻塞 I/O,配合 EPOLLONESHOT 进行多线程优化。
如果你对 多路复用原理 感兴趣,我可以进一步为你解析更深层的实现细节!