I/O模式之epoll,本文会讲到epoll的相关接口以及底层,还会涉及水平和边缘工作模式,以及通过epoll相关接口实现一个水平工作模式服务端
1.epoll相关系统调用
简单总结就是create用于创建句柄ctl用于添加或者删除关心事件wait用于获取有响应的事件
2.epoll工作原理
当某一进程调用epoll_create方法时,Linux内核会创建一个eventpoll结构体,这个结构体中有两个成员与epoll的使用方式密切相关
struct eventpoll{ .... /*红黑树的根节点,这颗树中存储着所有添加到epoll中的需要监控的事件*/ struct rb_root rbr; /*双链表中则存放着将要通过epoll_wait返回给用户的满足条件的事件*/ struct list_head rdlist; ....
};
每一个epoll对象都有一个独立的eventpoll结构体,用于存放通过epoll_ctl方法向epoll对象中添加进来的事件.
这些事件都会挂载在红黑树中,如此,重复添加的事件就可以通过红黑树而高效的识别出来(红黑树的插入时间效率是lgn,其中n为树的高度).
而所有添加到epoll中的事件都会与设备(网卡)驱动程序建立回调关系,也就是说,当响应的事件发生时会调用这个回调方法.
这个回调方法在内核中叫ep_poll_callback,它会将发生的事件添加到rdlist双链表中.
在epoll中,对于每一个事件,都会建立一个epitem结构体
struct epitem{ struct rb_node rbn;//红黑树节点 struct list_head rdllink;//双向链表节点 struct epoll_filefd ffd; //事件句柄信息 struct eventpoll *ep; //指向其所属的eventpoll对象 struct epoll_event event; //期待发生的事件类型
}
当调用epoll_wait检查是否有事件发生时,只需要检查eventpoll对象中的rdlist双链表中是否有epitem元素即可.
如果rdlist不为空,则把发生的事件复制到用户态,同时将事件数量返回给用户. 这个操作的时间复杂度是O(1).
epoll的优点(和 select 的缺点对应)
接口使用方便: 虽然拆分成了三个函数, 但是反而使用起来更方便高效. 不需要每次循环都设置关注的文件描述符, 也做到了输入输出参数分离开
数据拷贝轻量: 只在合适的时候调用 EPOLL_CTL_ADD 将文件描述符结构拷贝到内核中, 这个操作并不频繁(而select/poll都是每次循环都要进行拷贝)
事件回调机制: 避免使用遍历, 而是使用回调函数的方式, 将就绪的文件描述符结构加入到就绪队列中,epoll_wait 返回直接访问就绪队列就知道哪些文件描述符就绪. 这个操作时间复杂度O(1). 即使文件描述符数目很多, 效率也不会受到影响.没有数量限制: 文件描述符数目无上限
3.epoll工作方式
epoll工作方式分为水平触发和边缘触发
水平触发:当一个socket上的事件在第一次响应时内容没有读完,那么下一次wait后这个socket还是会响应直到这个socket内的数据读完
水平触发Level Triggered 工作模式
epoll默认状态下就是LT工作模式.
当epoll检测到socket上事件就绪的时候, 可以不立刻进行处理. 或者只处理一部分.
如上面的例子, 由于只读了1K数据, 缓冲区中还剩1K数据, 在第二次调用 epoll_wait 时, epoll_wait仍然会立刻返回并通知socket读事件就绪.直到缓冲区上所有的数据都被处理完, epoll_wait 才不会立刻返回.
支持阻塞读写和非阻塞读写
边缘触发:wait后如果响应了,服务端这里没有做处理那么下一次wait并不会响应
边缘触发Edge Triggered工作模式
如果我们在第1步将socket添加到epoll描述符的时候使用了EPOLLET标志, epoll进入ET工作模式.当epoll检测到socket上事件就绪时, 必须立刻处理.如上面的例子, 虽然只读了1K的数据, 缓冲区还剩1K的数据, 在第二次调用 epoll_wait 的时候,epoll_wait 不会再返回了.也就是说, ET模式下, 文件描述符上的事件就绪后, 只有一次处理机会.
ET的性能比LT性能更高( epoll_wait 返回的次数少了很多). Nginx默认采用ET模式使用epoll.
只支持非阻塞的读写
理解ET模式和非阻塞文件描述符
使用 ET 模式的 epoll, 需要将文件描述设置为非阻塞. 这个不是接口上的要求, 而是 "工程实践" 上的要求.
假设这样的场景: 服务器接受到一个10k的请求, 会向客户端返回一个应答数据. 如果客户端收不到应答, 不会发送第二个10k请求.
如果服务端写的代码是阻塞式的read, 并且一次只 read 1k 数据的话(read不能保证一次就把所有的数据都读出来,参考 man 手册的说明, 可能被信号打断), 剩下的9k数据就会待在缓冲区中
此时由于 epoll 是ET模式, 并不会认为文件描述符读就绪. epoll_wait 就不会再次返回. 剩下的 9k 数据会一直在缓冲区中. 直到下一次客户端再给服务器写数据. epoll_wait 才能返回但是问题来了.服务器只读到1k个数据, 要10k读完才会给客户端返回响应数据.
客户端要读到服务器的响应 , 才会发送下一个请求而客户端发送了下一个请求, epoll_wait 才会返回, 才能去读缓冲区中剩余的数据.
所以, 为了解决上述问题(阻塞read不一定能一下把完整的请求读完), 于是就可以使用非阻塞轮训的方式来读缓冲区,保证一定能把完整的请求都读出来.
而如果是LT没这个问题. 只要缓冲区中的数据没读完, 就能够让 epoll_wait 返回文件描述符读就绪.
4.epoll服务端实现
这里提供的服务器代码是水平(LT)工作模式的
#include <iostream>
#include <unistd.h>
#include <sys/epoll.h>class Epoll
{const int size = 128;public:Epoll() : _timeout(3000){_epfd = epoll_create(size);if (_epfd < 0){std::cout<<"epoll_create fail..."<<std::endl;close(_epfd);}}~Epoll() { close(_epfd); }public:void Epoll_Ctl(int op, int fd, uint32_t event){if (op == EPOLL_CTL_DEL){int n = epoll_ctl(_epfd, op, fd, nullptr);if (n != 0)std::cout << "epoll_ctl delete fail..." << std::endl;}else{struct epoll_event ev;ev.data.fd = fd;ev.events = event;int n = epoll_ctl(_epfd, op, fd, &ev);if (n != 0)std::cout << "epoll_ctl add epoll_event fail..." << std::endl;}}int Epoll_Wait(struct epoll_event *events, int maxevents){int n = epoll_wait(_epfd, events, maxevents, _timeout);if (n == 0)std::cout << "epoll_wait timeout..." << std::endl;else if (n < 0)std::cout << "epoll_wait fail..." << std::endl;return n;}private:int _epfd;int _timeout;
};
#include <iostream>
#include <memory>
#include <string>
#include "epoll.hpp"
#include "Socket.hpp"const uint16_t default_port = 8888;
const int max_event = 64;
const int max_buffer = 1024;class epoll_server
{
public:epoll_server() :_listen_fd(new Sock),_epoll_fd(new Epoll){}~epoll_server(){}
public:void init(uint16_t port = default_port){_listen_fd->Socket();_listen_fd->Bind(port);_listen_fd->Listen();}void start(){_epoll_fd->Epoll_Ctl(EPOLL_CTL_ADD,_listen_fd->Fd(),EPOLLIN);for(;;){struct epoll_event events[max_event];int n = _epoll_fd->Epoll_Wait(events,max_event);for(int i = 0;i<n;i++){if(events[i].data.fd == _listen_fd->Fd()){//客户端请求连接事件uint16_t client_port;std::string client_ip;int client_fd = _listen_fd->Accept(&client_ip,&client_port);_epoll_fd->Epoll_Ctl(EPOLL_CTL_ADD,client_fd,EPOLLIN);}else{char buffer[max_buffer];memset(buffer,0,sizeof buffer);int len = read(events[i].data.fd,buffer,sizeof buffer);if(len < 0){std::cout<<"read fail..."<<std::endl;return ;}else if(len == 0){std::cout<<"read content is 0..."<<std::endl;_epoll_fd->Epoll_Ctl(EPOLL_CTL_DEL,events[i].data.fd,0);}else{std::cout<<"client talk#"<<buffer<<std::endl;}}}}}
private:std::shared_ptr<Sock> _listen_fd;std::shared_ptr<Epoll> _epoll_fd;
};