【Linux】多路转接epoll
目录
一、poll
1.1 poll函数接口
1.2 poll的优点
1.3 poll的缺点
1.4 poll使用示例:使用poll监控标准输入
二、epoll
2.1 epoll初识
2.2 epoll相关的系统调用
2.3 epoll工作原理
2.4 epoll的优点
2.5 epoll工作方式
2.6 对比LT和ET
2.7 理解ET和非阻塞文件描述符
2.8 epoll使用场景
2.9 epoll示例:epoll服务器(LT模式)
2.10 epoll示例:epoll服务器(ET模式)
一、poll
1.1 poll函数接口
#include <poll.h>int poll(struct pollfd *fds, nfds_t nfds, int timeout);// pollfd结构
struct pollfd {int fd; /* file descriptor */short events; /* requested events */short revents; /* returned events */
};
参数说明:
- fds是一个poll函数监听的结构链表。每一个结构中包含了三部分内容:文件描述符,监听的事件集合,返回的事件集合。
- nfds表示fds数组的长度。
- timeout表示poll函数的超时时间,单位为毫秒(ms)。
events和revents的取值:


返回值:
- 返回值大于0:表示poll用于监听的文件描述符就绪并返回。
- 返回值等于0:表示poll等待超时。
- 返回值小于0:表示出错。
1.2 poll的优点
不同于select使用三个位图来表示三个fd_set的方式,poll使用一个pollfd的指针来实现。
- pollfd结构包含了要监视的event和发生的event,不再使用select “参数 - 值” 传递的方式。接口使用比select更方便。
- poll并没有最大数量的限制(但是数量过大性能也是会下降的)。
1.3 poll的缺点
poll中监听的文件描述符数量增多时:
- 和select函数一样,poll函数返回后,需要轮询pollfd来获取就绪的描述符。
- 每次调用poll都需要把大量的pollfd结构从用户态拷贝到内核态。
- 同时连接的大量客户端在一时刻可能只有很少的处于就绪状态,因此随着监视的文件描述符数量增多,其效率也会线性下降。
1.4 poll使用示例:使用poll监控标准输入
#include <iostream>
#include <poll.h>
#include <unistd.h>int main()
{struct pollfd pfd;pfd.fd = 0;pfd.events = POLLIN;while(1) {int n = poll(&pfd, 1, 1000);if(n < 0) {perror("poll");}else if(n == 0) {std::cout << "poll timeout" << std::endl;}else {if(pfd.revents == POLLIN) {char buf[1024];int n = read(pfd.fd, buf, sizeof(buf) - 1);buf[n] = 0;std::cout << "stdin: " << buf << std::endl;}}}return 0;
}

二、epoll
2.1 epoll初识
按照man手册的说法:是为了处理大批量句柄而做了改进的poll。
它是在2.5.44内核中被引进的(epoll(4) is a new API introduced in Linux kernel 2.5.44)。
它几乎具备了之前所说的一切优点,被公认为Linux2.6下性能最好的多路IO就绪通知方式。
2.2 epoll相关的系统调用
epoll有三个相关的系统调用。
epoll_create
int epoll_create(int size);
创建一个epoll的句柄
- 自从Linux2.6.8之后,size参数是被忽略的。
- 用完之后,必须调用close()关闭。
epoll_ctl
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
epoll的事件注册函数
- 它不同于select是在监听事件时告诉内核要监听什么类型的事件,而是在这里先注册要监听的事件类型。
- 第一个参数是epoll_create函数的返回值(epoll的句柄)。
- 第二个参数表示动作,用三个宏来表示。
- 第三个参数是需要监听的fd。
- 第四个参数是告诉内核需要监听什么事。
第二个参数的取值:
- EPOLL_CTL_ADD:注册新的fd到epfd中。
- EPOLL_CTL_MOD:修改已经注册的fd的监听事件。
- EPOLL_CTL_DEL:从epfd中删除一个fd。
struct epoll_event结构如下:

events可以是以下几个宏的集合:
- EPOLLIN:表示对应的文件描述符可读(包括对端socket正常关闭)。
- EPOLLOUT:表示对应的文件描述符可写。
- EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来)。
- EPOLLERR:表示对应的文件描述符发生错误。
- EPOLLHUP:表示对应的文件描述符被挂断。
- EPOLLET:将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说。
- EPOLLONESHOT:只监听一次事件,当监听完这次事件后,如果还需要监听这个socket的话,需要再次把这个socket加入到EPOLL红黑树里。
epoll_wait
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
收集在epoll监控的事件中已经发生的事件。
- 参数events是已经分配好的epoll_event结构体数组。
- epoll将会把已经发生的事件赋值到events数组中(events不可以是空指针,内核只负责把数据赋值到events这个数组中,不会去帮助我们在用户态中分配内存)。
- maxevents告知内核这个events有多大,这个maxevents的值不能大于创建epoll_create时size的值。
- timeout是超时时间(毫秒,0会立即返回,-1是永久阻塞)。
- 如果函数调用成功,返回对应IO上已经准备好的文件描述符的数量,如返回0表示已超时,返回-1表示出错。
2.3 epoll工作原理

- 当某一进程调用epoll_create函数时,Linux内核会创建一个eventpoll结构体,这个结构体中有两个成员与epoll的使用方法密切相关。
struct eventpoll{..../*红⿊树的根节点,这颗树中存储着所有添加到epoll中的需要监控的事件*/struct rb_root rbr;/*双链表中则存放着将要通过epoll_wait返回给⽤⼾的满⾜条件的事件*/struct list_head rdlist;....
};
- 每一个epoll对象有一个独立的eventpoll结构体,用于存放通过epoll_ctl方法向epoll对象中添加进来的事件。
- 这些事件都会挂载到红黑树中,如此,重复添加的事件就可以通过红黑树高效的识别出来(红黑树的插入时间效率是logn,其中n为事件个数)。
- 而所有添加到epoll中的事件都会与设备(网卡)驱动程序建立回调关系,也就是说,当响应的事件发生时会调用这个回调方法。
- 这个回调方法在内核中叫ep_poll_callback,他会将发生的事件添加到rdlist双链表中。
- 在epoll中,对于每一个事件,都会建立一个epitem结构体。
struct epitem{struct rb_node rbn;//红⿊树节点struct list_head rdllink;//双向链表节点struct epoll_filefd ffd; //事件句柄信息struct eventpoll *ep; //指向其所属的eventpoll对象struct epoll_event event; //期待发⽣的事件类型
}
- 当epoll_wait检查是否有事件发生时,只需要检查eventpoll对象中的rdlist双链表中是否有epitem元素即可。
- 如果rdlist不为空,则把发生的事件复制到用户态,同时将事件数量返回给用户,这个操作的时间复杂度是O(1)。
总结一下,epoll的使用过程就是三部曲。
- 调用epoll_create创建一个epoll句柄。
- 调用epoll_ctl,将要监控的文件描述符进行注册。
- 调用epoll_wait,等待文件描述符就绪。
2.4 epoll的优点
- 接口使用方便:虽然拆分成了三个函数,但是使用起来反而更加高效方便。不需要每次循环都要设置关注的文件描述符,也做到了输入输出参数分离。
- 数据拷贝轻量:只在合适的时候调用 EPOLL_CTL_ADD 将文件描述符结构拷贝到内核中,这个操作并不频繁(而select/poll都是每次循环都要进行拷贝)。
- 事件回调机制:避免使用遍历,而是使用回调函数的方式,将就绪的文件描述符结构加入到就绪队列中,epoll_wait 返回直接访问就绪队列就知道哪些文件描述符就绪了。这个操作时间复杂度O(1),即使文件描述符数量很多,也不会影响效率。
- 没有数量限制:文件描述符数目无上限。
2.5 epoll工作方式
举个父母喊我们吃饭为例:
假设你正在打王者,这时候饭做好了,父母喊你吃饭的方式一般不相同:
- 如果你妈喊你一次,你没动,你妈会接着喊第二次,第三次......(妈,水平触发)。
- 如果你爸喊你一次,你没动,你爸就不会管你了。(爸,边缘触发)。
epoll有两种工作方式:水平触发(LT)和边缘触发(ET)。
假设这么一个例子:
- 我们已经把一个tcp socket添加到epoll描述符中。
- 这时候socket的另一端被写入了2KB数据。
- 调用epoll_wait,它会返回,说明它已经准备好读取操作。
- 然后调用read,只读取了1KB数据。
- 然后继续调用epoll_wait......
水平触发Level Triggered 工作模式
epoll默认状态下就是LT工作模式
- 当epoll检测到socket上事件就绪的时候,可以不立刻处理,或者只处理其中的一部分。
- 如上面的例子,由于只读取了1KB的数据,缓冲区中还剩1KB数据,在第二次调用 epoll_wait 时,epoll_wait 仍然会返回并立刻通知socket读时间就绪。
- 直到缓冲区上所有数据都被处理完,epoll_wait 才不会立刻返回。
- 支持阻塞读写和非阻塞读写。
边缘触发Edge Triggered 工作模式
如果我们在第一步将socket添加到epoll描述符的时候使用了EPOLLET标志,epoll进入ET工作模式。
- 当epoll检测到socket上事件就绪时,必须立刻处理。
- 如上面的例子,虽然只读取了1KB的数据,缓冲区还剩1KB数据,在第二次调用 epoll_wait 的时候,epoll_wait 不再返回了。
- 也就是说,ET模式下,文件描述符上的事件就绪后,只有一次处理的机会。
- ET的性能比LT的性能更高(epoll_wait 返回的次数少了很多)。Nginx默认采用ET模式。
- 只支持非阻塞读写。
select和poll其实也是工作在LT模式下。epoll既支持LT,也支持ET。
2.6 对比LT和ET
LT是epoll的默认行为。
使用ET能够减少epoll触发次数,但是代价就是强逼着程序员一次响应就绪过程中就把所有的数据处理完。
相当于一个文件描述符就绪后,不会反复被提示就绪,看起来就比LT更高效一些。但是在LT情况下,也能做到每次就绪的文件描述符都立刻处理,不让这个就绪被重复提示的话,其实性能也是一样的。
2.7 理解ET和非阻塞文件描述符
使用ET模式的epoll,需要将文件描述符设为非阻塞。这个不是接口上的要求,而是“工程实践”的要求。
假设这样的场景:服务器接收到一个10K的请求,会向客户端返回一个应答数据。如果客户端收不到应答,不会发送第二个10K请求了。

如果服务端写的代码是阻塞式的read,并且一次只read 1K的数据(read不能保证一次就把所有数据都读出来,参考man手册上的说明,可能被信号打断),剩下的9K数据就会待在缓冲区中。

此时由于epoll是ET模式,并不会认为文件描述符读就绪。epoll_wait 就不会再次返回,剩下的9K数据会一直在缓冲区当中,直到下一次客户端再给服务器写数据,epoll_wait才能返回。
但是问题来了:
- 服务器只读到了1K的数据,要10K读完才会给客户端返回响应数据。
- 客户端要读到服务器的响应,才会发送下一个请求。
- 客户端发送了下一个请求,epoll_wait 才会返回,才能去读缓冲区中剩余的数据。

所以,为了解决上述问题(阻塞read不一定能一下把完整的请求读完),于是就可以使用非阻塞轮询的方式来读取缓冲区,保证一定能把完整的请求读出来。
而如果是LT这没有这个问题。只要缓冲区中的数据没有读完,就能够让 epoll_wait 返回文件描述符就绪的值。
2.8 epoll使用场景
epoll的高性能,是有一定的特定场景的。如果场景选择的不适宜,epoll的性能可能适得其反。
- 对于多连接,且多连接中只有一部分连接比较活跃时,比较适合使用epoll。
例如,典型的一个需要处理上万个客户端的服务器,例如各种互联网APP的入口服务器,这样的服务器就很适合epoll。
如果只是系统内部,服务器和服务器之间进行通信,只有少数的几个连接,这种情况下用epoll就并不合适,具体要根据需求和场景特点来决定使用哪种IO模型。
2.9 epoll示例:epoll服务器(LT模式)
// EpollServer.hpp#pragma once#include <iostream>
#include <string>
#include <memory>
#include <sys/epoll.h>
#include "Common.hpp"
#include "Log.hpp"
#include "Socket.hpp"using namespace LogModule;
using namespace SocketModule;const static int gdefaultfd = -1;class EpollServer
{const static int revs_num = 256;
public:EpollServer(uint16_t port):_port(port),_listen_socket(new TcpSocket),_running(false),_epfd(gdefaultfd){}void Init() {_listen_socket->BuildListenMethod(_port);_epfd = epoll_create(revs_num);if(_epfd < 0) {LOG(LogLevel::ERROR) << "epoll_create error!";exit(EPOLL_CREATE_ERR);}LOG(LogLevel::DEBUG) << "epoll_create success!";struct epoll_event ev;ev.events = EPOLLIN;ev.data.fd = _listen_socket->GetSockfd();int n = epoll_ctl(_epfd, EPOLL_CTL_ADD, _listen_socket->GetSockfd(), &ev);if(n < 0) {LOG(LogLevel::ERROR) << "epoll_ctl error";exit(EPOLL_CTL_ERR);}LOG(LogLevel::DEBUG) << "epoll_ctl success!";}void Loop() {_running = true;int timeout = -1;while(_running) {int n = epoll_wait(_epfd, _revs, revs_num, timeout);if(n < 0) perror("epoll_wait");else if(n == 0) std::cout << "time out..." << std::endl;else {std::cout << "有事件就绪了..." << std::endl;Dispatcher(n);}}_running = false;}void Dispatcher(int n) {for(int i = 0; i < n; i++) {if(_revs[i].data.fd == _listen_socket->GetSockfd())Accepter();else Recver(_revs[i].data.fd);}}void Accepter() {InetAddr client;auto cli_socket = _listen_socket->Accept(&client);if(!cli_socket) return;LOG(LogLevel::DEBUG) << "get a new client, info is: " << client.Addr();struct epoll_event ev;ev.events = EPOLLIN;ev.data.fd = cli_socket->GetSockfd();int n = epoll_ctl(_epfd, EPOLL_CTL_ADD, cli_socket->GetSockfd(), &ev);if(n < 0) {LOG(LogLevel::ERROR) << "epoll_ctl error";exit(EPOLL_CTL_ERR);}}void Recver(int fd) {char buf[1024];ssize_t n = recv(fd, buf, sizeof(buf) - 1, 0);if(n == 0) {LOG(LogLevel::DEBUG) << "客户端退出,sockfd: " << fd;int n = epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, nullptr);if(n < 0) {LOG(LogLevel::ERROR) << "epoll_ctl error";exit(EPOLL_CTL_ERR);}close(fd);}else if(n > 0) {buf[n] = 0;std::cout << "client# " << buf << std::endl;std::string message = "echo# ";message += buf;send(fd, message.c_str(), message.size(), 0);}else {LOG(LogLevel::ERROR) << "客户端读取出错,sockfd: " << fd;int n = epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, nullptr);if(n < 0) {LOG(LogLevel::ERROR) << "epoll_ctl error";exit(EPOLL_CTL_ERR);}close(fd);}}
private:uint16_t _port;Socket* _listen_socket;bool _running;int _epfd;struct epoll_event _revs[revs_num];
};
// EpollServer.cc#include "EpollServer.hpp"int main(int argc, char* argv[])
{if(argc != 2) {std::cout << "Usage: " << argv[0] << " server_port" << std::endl;return 1;}uint16_t port = std::stoi(argv[1]);EpollServer ss(port);ss.Init();ss.Loop();return 0;
}


2.10 epoll示例:epoll服务器(ET模式)
基于 LT 版本稍加修改即可。
- 修改Socket.hpp,新增设置文件描述符为非阻塞、非阻塞读和非阻塞写功能。
- 对于 accpet 返回的 cli_socket 加上 EPOLLET 选项。
注意:这里暂时未考虑 listen_socket ET 的情况,如果将 listen_socket 设为ET,则需要非阻塞轮询的方式accept,否则会导致同一时刻大量客户端同时连接的情况,只能accept一次的情况。
// 以下代码添加在 TcpSocket 类中
// ⾮阻塞 IO 接⼝
bool SetNoBlock() {int fl = fcntl(_sockfd, F_GETFL);if (fl < 0) {perror("fcntl F_GETFL");return false;}int ret = fcntl(_sockfd, F_SETFL, fl | O_NONBLOCK);if (ret < 0) {perror("fcntl F_SETFL");return false;}return true;
}bool RecvNoBlock(std::string* buf) const {// 对于⾮阻塞 IO 读数据, 如果 TCP 接受缓冲区为空, 就会返回错误// 错误码为 EAGAIN 或者 EWOULDBLOCK, 这种情况也是意料之中, 需要重试// 如果当前读到的数据⻓度⼩于尝试读的缓冲区的⻓度, 就退出循环// 这种写法其实不算特别严谨(没有考虑粘包问题)buf->clear();char tmp[1024 * 10] = { 0 };for (;;) {ssize_t read_size = recv(_sockfd, tmp, sizeof(tmp) - 1, 0);if (read_size < 0) {if (errno == EWOULDBLOCK || errno == EAGAIN) {continue;}perror("recv");return false;}if (read_size == 0) {// 对端关闭, 返回 falsereturn false;}tmp[read_size] = '\0';*buf += tmp;if (read_size < (ssize_t)sizeof(tmp) - 1) {break;}}return true;
}bool SendNoBlock(const std::string& buf) const {// 对于⾮阻塞 IO 的写⼊, 如果 TCP 的发送缓冲区已经满了, 就会出现出错的情况// 此时的错误号是 EAGAIN 或者 EWOULDBLOCK. 这种情况下不应放弃治疗// ⽽要进⾏重试ssize_t cur_pos = 0; // 记录当前写到的位置ssize_t left_size = buf.size();for (;;) {ssize_t write_size = send(_sockfd, buf.data() + cur_pos, left_size, 0);if (write_size < 0) {if (errno == EAGAIN || errno == EWOULDBLOCK) {// 重试写⼊continue;}return false;}cur_pos += write_size;left_size -= write_size;// 这个条件说明写完需要的数据了if (left_size <= 0) {break;}}return true;
}
剩下的一些调整,相信大家能够完成,这里就不写出来了。
