高级IO——多路转接方案epoll原理及简单应用
文章目录
- 多路转接方案epoll原理及简单应用
- epoll初识
- epoll相关系统调用
- 初步理解epoll底层实现原理
- 宏观结构
- 宏观执行流程
- 回调机制
- 细节部分
- 部分源码解析
- 代码实践——基于epoll的echo server
多路转接方案epoll原理及简单应用
虽然我们已经学习过两种多路转接的方案——select和poll。
但是,上述的两种方案还是各有一些缺点。实际上,使用最多的不是这两种方案。
在当今的系统内核中,最常用的,也是最好的解决方案是——epoll(extend poll),其实就是poll的扩展版本(但其实在实现上差别还是很大的)。
epoll非常重要,我们将分为两个部分进行讲解。
本篇文章是是epoll的第一个部分,理解epoll的原理以及能够简单地实现应用!
epoll初识
首先明确epoll核心地位:epoll是基于对多个fd等待的就绪事件的通知机制,同select和poll!
按照 man 手册的说法: 是为处理大批量句柄而作了改进的 poll. 它是在 2.5.44 内核中被引进的(epoll(4) is a new API introduced in Linux kernel 2.5.44)
它几乎具备了之前所说的一切优点,被公认为 Linux2.6 下性能最好的多路 I/O 就绪通知方法
epoll相关系统调用
epoll讲解起来是比较复杂的。我们先来认识接口,然后再反向通过原理来理解这些接口。
epoll_create:
SYNOPSIS#include <sys/epoll.h>int epoll_create(int size);RETURN VALUEOn success, these system calls return a file descriptor (a nonnegative integer). On error, -1 is returned, and errno is set to indicate the error.
作用:创建一个 epoll 的句柄,我们目前理解为创建一个epoll模型即可!先不做深究。
自从 linux 2.6.8 之后,size 参数是被忽略的.
用完之后, 必须调用 close()关闭
size参数在linux 2.6.8以前,是有作用的!表示epoll能够一次性等待的fd个数。
但是,在该版本后,该参数被忽略了。只要随便给一个大于0的数即可。
我们还会发现,epoll_create的返回值是一个文件描述符,所以,epoll是和Linux的文件管理相关的!但是具体如何关联的,我们放在原理部分来说。
epoll_ctl:
SYNOPSIS#include <sys/epoll.h>int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);RETURN VALUEWhen successful, epoll_ctl() returns zero. When an error occurs, epoll_ctl() returns -1 and errno is set appropriately.
这个从名字上来看:epoll control
,即控制epoll的!
第一个参数就是我们通过epoll_create返回的一个epoll_fd
第二个op是操作:
EPOLL_CTL_ADD
:注册新的 fd 到 epfd 中;
EPOLL_CTL_MOD
:修改已经注册的 fd 的监听事件;
EPOLL_CTL_DEL
:从 epfd 中删除一个 fd;
第三个参数是一个文件描述符,用户告诉内核要关注的!
第四个参数是一个结构体指针,结构体对象如下所示:
struct epoll_event {uint32_t events; /* Epoll events */epoll_data_t data; /* User data variable */
};
uint32_t events
:表示的对于fd这个文件描述符所需要关注的事件:
事件标志 | 描述 |
---|---|
EPOLLIN | 表示对应的文件描述符可以读(包括对端 Socket 正常关闭) |
EPOLLOUT | 表示对应的文件描述符可以写 |
EPOLLPRI | 表示对应的文件描述符有紧急的数据可读(通常指带外数据到来) |
EPOLLERR | 表示对应的文件描述符发生错误 |
EPOLLHUP | 表示对应的文件描述符被挂断 |
EPOLLET | 将 EPOLL 设为边缘触发(Edge Triggered)模式(相对于水平触发模式) |
EPOLLONESHOT | 只监听一次事件,如需继续监听需重新将此 socket 加入到 EPOLL 队列中 |
这里和poll是一样的,所谓的事件就是一个个的宏定义过的值!且只有一个比特位上为1。
这里我们重点关注EPOLLIN 读和EPOLLOUT 写即可。
epoll_data_t data
:这是一个联合体,其大小取决于内部最大的那个成员变量的大小。
typedef union epoll_data {void *ptr;int fd;uint32_t u32;uint64_t u64;
} epoll_data_t;
一般就是用来携带一些数据的,最常用的就是int fd
,表示的被关注的fd。
(font color = “REd”>Tips:这个联合体部分虽然传入函数,但内核不做修改!)
该函数的返回值很简单:
操作成功返回0,反之返回-1!
epoll_wait:
SYNOPSIS#include <sys/epoll.h>int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout);
RETURN VALUEWhen successful, epoll_wait() returns the number of file descriptorsready for the requested I/O, or zero if no file descriptor became readyduring the requested timeout milliseconds. When an error occurs, epoll_wait() returns -1 and errno is set appropriately.
这个函数,是用来获取已就绪事件的!
第一个参数也是通过epoll_create返回的epoll_fd
第二个参数和第三个参数是配套使用的,就是一段连续空间和空间的大小!
每个空间内存储的就是struct epoll_event
,这个在上面那个接口已经看过了
第四个参数就是超时时间,用法和poll那里是一致的,不再赘述。
返回值:
如果返回-1,说明出现错误。返回0代表超时,返回x(x > 0),代表返回来的就绪事件个数!
初步理解epoll底层实现原理
接下来,我们直接来看epoll的底层实现原理,当然,一次性讲完不太可能。
我们这里主要是需要有一个对epoll的宏观认识,后序我们我们还要在理解epoll的其他细节。
宏观结构
这里就不讲什么例子来作为引入了,直接上原理:
首先,epoll技术实现的时候,底层会有一个红黑树!还有一个就绪队列!
作用已经展示了,下面就不再说了!
首先,epoll_create是创建了什么?其实是创建了一整个epoll模型!(红色部分)
每个红黑树的节点大致是这样的:
struct RBTNode{color_t color; int fd;uint32_t events;RBTNode* left;RBTNode* right;//...
};
宏观执行流程
现在我们知道了epoll模型底层的结构,但是,它是如何运行起来的呢?
所以,epoll_create这个接口,创建的就是这个模型!注册好一些结构,方法。
(比如如何检测到事件就绪,如何把就绪事件链入到就绪队列)!然后返回一个文件描述符!
文件描述符指向的是一整个epoll模型!
epoll_ctl,控制的就是文件描述符是否需要关心,是否需要删除关注,是否修改关注事件!
本质上,就是在对模型内的红黑树进行增删改!(不需要进行查询!)
epoll_wait,就是从就绪队列里面拿取就绪事件到上层的缓冲区中!(缓冲区是用户管理的)。
回调机制
我们现在只是搞清楚了总体的流程,但是,对内的细节还是不太清楚。
现在,我们需要明白的是:
底层流程内,epoll如何知道事件就绪?如何检测到后再来进行红黑树节点链入队列呢?
首先,我们需要重新对epoll内的数据结构做重新认识:
我们上面那样画图,很容易会让人觉得,epoll内就是一个红黑树和一个队列。
当检测到红黑树内有一个fd就绪后,就把这节点从红黑树上复制一个,然后加入到队列。
实际上,这么做,完全没有问题,也没有错误!
但是,epoll内并不是这么做的,因为这样要存放两个数据结构!效率还是比较低的!
在最开始系统部分讲系统进程管理的时候,我们就知道:
不同类型的数据结构是可以共存的!比如进程中队列和链表共存!
其实就是通过在节点内存放一个链表节点list_head,真正起到节点连接的是这个链表节点,然后通过计算偏移量来获取下一个节点起始位置!
所以,在今天这个epoll模型内来看,红黑树和队列共存很难做到吗?很难理解吗?
一点也不难,队列实现基本上都是链表形式的!所以,只需要在红黑树的节点内加上这么一个list_head节点,然后有一个fd就绪,就把节点的链接节点进行连接操作即可!
其次,epoll是怎么检测到事件就绪的呢?我们就以读事件为例!
读事件就绪,就代表着底层有数据来了,可以进行读取了!在网络通信中,数据从对方的网卡发送来,也必然是我方主机的网卡先收到!
内核怎么知道数据已经发送来网卡处了呢? -> 硬件中断!
操作系统就是一个基于中断的软件!所以,一旦有中断 -> 执行中断向量表内的方法。
然后向上交付,解包分用,直到传输层!
epoll的介入时机:
当数据到达socket缓冲区后,内核会检查是否有epoll监视该socket
若有,则触发epoll就绪回调(ep_poll_callback()),将socket加入就绪队列
该回调可能通过wake_up_locked()唤醒阻塞在epoll_wait()的进程
一旦检测到有事件就绪,且有epoll在监视该socket,就会执行回调方法!
这个回调方法是:激活红黑树中就绪fd对应的节点,链入就绪队列(本质是指针的操作)
这个回调方法是早就被设置好的!具体什么时候设置的呢?我们在细节部分进行讲解。
细节部分
1.如何正确看待就绪队列?
就绪队列,不能把它真的当作成是一个完整的数据结构队列,其实是在红黑树节点上通过链表节点进行连接的一个指针链罢了!
还有就是,就绪队列,不就是一个标准的生产者消费者模型吗?
当有事件就绪,epoll内执行回调方法,激活该节点,加入到就绪队列中。否则没办法加入。
上层用户通过epoll_wait来获取就绪事件,可以是阻塞、也可以是定时轮询。
这就是一个典型的生产者消费者模型!
所以,有没有可能同时多个进程/线程来对这个就绪队列进行加入/获取呢?
当然有可能!所以,需要保证这个临界资源的安全!
所以,epoll底层实现的时候,就已经做了线程安全的保证了!否则不可能应用广泛的。
2.从就绪队列读取就绪事件到上层缓冲区存储时,如果缓冲区大小不足以读完所有就绪事件?
假设现在就绪队列中,有128个事件就绪,等待上层用户取走处理。但是,我们的缓冲区大小只有64,一次处理不完怎么办,剩下的怎么办?
不用担心,拿不走就不拿!因为epoll会一直对红黑树中的节点进行监测。如果事件就绪了添加到就绪队列后,但是又没拿走处理。
下一次epoll还是能检测到它是没有被处理的,还是会继续触发回调,再次激活节点!
又或是epoll还专门针对于这个场景做了不同模式的处理,这个我们不需要担心!
3.注册回调函数的时机是什么时候?
答案:就是在使用epoll_ctl添加新的节点的时候!
添加新的节点的时候,大致需要做两件事情:
1.创建新的节点,添加到红黑树中(不激活)
2.对这个新的节点,进行注册回调方法!
注意,这个回调方法在内核中是看的见的:ep_poll_callback
但是,我们这里要说的是:每个节点都要注册该节点自己使用的回调方法!
而且,所有的回调方法也会按照链式结构进行连接!
每个节点内都会有一个指针指向该回调函数,所有的回调函数会连接在一起形成回调队列。
(其实也就是也是链表 + 红黑树,上面为了逻辑清晰故意分开画的!)
问题:可问题是,上面说了回调函数都是
ep_poll_callback
,为什么还要每个节点都注册?
答案:因为每个节点关注的事件一定一样吗?那当然不一定!
所以,每个节点的回调函数虽然一样,但是需要根据该节点的关注的事件状况、来进行特定逻辑操作
所以,ep_poll_callback
函数内,必然是会有根据不同的事件而执行不同的操作的逻辑的!
4.事件就绪检测的时间复杂度及上层处理的优化
所以,对于今天的epoll技术来说,检测事件是否就绪,是否还需要像select和poll那样,遍历整个位图或者数组吗?答案是:根本不需要!
对于select和poll来说,这是无奈之举!因为它们实现的机制就导致了必须要进行线性遍历。
但是,epoll不需要!它把未就绪和就绪分离了!
通过就绪队列的设置,只要判断就绪队列是否为空,就能判断是否有事件就绪。
时间复杂度直接从O(N) -> O(1),在检测事件就绪这方面,效率大大提高!
上次不再需要使用额外的数据结构,只需要拿着一个缓冲区来进行接收即可。
内核会把就绪队列中的就绪事件,严格的按照从0下标的顺序拷贝至用户缓冲区的!
所以,最后得到的就绪事件个数 = min(上层缓冲区大小, 就绪队列大小)
而且上层从就绪队列中取出来的,都是已经就绪的!不需要再做检测!
部分源码解析
接下来,我们一起来看看epoll在底层实现的部分源码,加深对于epoll的理解!
首先,epoll_create的核心功能就是创建并初始化一个 struct eventpoll实例
它是整个 epoll机制管理结构:
struct eventpoll {spinlock_t lock;struct mutex mtx;wait_queue_head_t wq;wait_queue_head_t poll_wait;struct list_head rdllist; // 就绪队列struct rb_root_cached rbr; // 红黑树(优化版)// ...
};
其次即是红黑树节点、又是就绪队列节点的数据结构是struct epitem:
// fs/eventpoll.c
struct epitem {struct rb_node rbn; // 红黑树节点(用于挂载到 eventpoll->rbr)struct list_head rdllink; // 就绪队列链表节点(挂载到 eventpoll->rdllist)struct epitem *next; // 用于溢出列表(eventpoll->ovflist)struct epoll_filefd ffd; // 关联的文件描述符和文件对象struct eventpoll *ep; // 指向所属的 eventpoll 实例struct list_head pwqlist; // 等待队列(poll wait queues)链表头struct epoll_event event; // 用户设置的事件掩码(EPOLLIN/OUT/ET 等)// ... 其他调试和统计字段
};
// fs/eventpoll.c
struct epoll_filefd {struct file *file; // 指向文件对象的指针int fd; // 文件描述符(用户层传入的 fd)
};
挂载到红黑树的是struct rb_node rbn
,挂载到就绪队列的是struct list_head rdllink
具体的代码逻辑如上述所示!下面我们用一张图来总结一下这个流程:
但是,上面我们没有说到关于回调机制是如何实现的!现在我们一起来看一下:
首先,我们已经知道:每个节点的回调函数是在添加节点到红黑树的时候做的!
也就是操作:
struct epoll_event ev;
//设置ev相关信息
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &ev);
那我们要先找到这个操作:
// fs/eventpoll.c
asmlinkage long sys_epoll_ctl(int epfd, int op, int fd, struct epoll_event __user *event)
{int error;struct file *file, *tfile;struct eventpoll *ep;struct epitem *epi;struct epoll_event epds;// 1. 从用户空间拷贝 epoll_event 结构(ADD/MOD 操作需要)if (copy_from_user(&epds, event, sizeof(struct epoll_event)))return -EFAULT;// 2. 获取 epoll 实例的文件描述符(epfd)对应的 struct fileerror = -EBADF;file = fget(epfd);if (!file)goto error_return;// 3. 获取目标 fd(需要操作的文件描述符)对应的 struct filetfile = fget(fd);if (!tfile)goto error_fput;// 4. 检查 epfd 是否指向一个真正的 epoll 实例error = -EINVAL;if (!is_file_epoll(file))goto error_tgt_fput;// 5. 获取 epoll 实例的 eventpoll 结构ep = file->private_data;// 6. 针对不同操作进行处理switch (op) {case EPOLL_CTL_ADD:error = ep_insert(ep, &epds, tfile, fd);break;case EPOLL_CTL_DEL:error = ep_remove(ep, tfile, fd);break;case EPOLL_CTL_MOD:error = ep_modify(ep, &epds, tfile, fd);break;default:error = -EINVAL;break;}// 7. 清理资源
error_tgt_fput:fput(tfile);
error_fput:fput(file);
error_return:return error;
}
我们找到内部的添加新的节点到红黑树的逻辑:
然后进入接口ep_insert
查看:
// fs/eventpoll.c
static int ep_insert(struct eventpoll *ep, struct epoll_event *event,struct file *tfile, int fd)
{int error, revents, pwake = 0;unsigned long flags;struct epitem *epi;struct ep_pqueue epq;// 1. 分配 epitem 内存(开辟节点空间)if (!(epi = kmem_cache_alloc(epi_cache, GFP_KERNEL)))return -ENOMEM;// 2. 初始化 epitem 结构体memset(epi, 0, sizeof(*epi));INIT_LIST_HEAD(&epi->rdllink);INIT_LIST_HEAD(&epi->pwqlist);epi->ep = ep;ep_set_ffd(&epi->ffd, tfile, fd);epi->event = *event;epi->nwait = 0;epi->revents = 0;// 3. 初始化 poll 队列epq.epi = epi;init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);// 4. 调用文件系统的 poll 方法注册回调revents = tfile->f_op->poll(tfile, &epq.pt);// 5. 获取自旋锁保护数据结构(用于线程安全)spin_lock_irqsave(&ep->lock, flags);// 6. 检查是否成功注册了等待队列if (epi->nwait < 0) {error = -EINVAL;goto error_unlock;}// 7. 将 epitem 插入红黑树ep_rbtree_insert(ep, epi);// 8. 如果文件描述符已经就绪,将其加入就绪队列if ((revents & event->events) && !ep_is_linked(&epi->rdllink)) {list_add_tail(&epi->rdllink, &ep->rdllist);pwake = 1;}// 9. 释放自旋锁spin_unlock_irqrestore(&ep->lock, flags);// 10. 如果需要,唤醒等待的进程if (pwake)ep_poll_safewake(&ep->poll_wait);return 0;// 错误处理路径
error_unlock:spin_unlock_irqrestore(&ep->lock, flags);kmem_cache_free(epi_cache, epi);return error;
}
我们可以看到注册回调函数的逻辑!
我们进入到注册回调逻辑函数内部:
poll方法的作用:
检查当前文件描述符的就绪状态(如 socket 是否有数据可读)
调用 poll_table->qproc(即 ep_ptable_queue_proc)注册回调
poll方法的作用:
- 检查当前文件描述符的就绪状态(如 socket 是否有数据可读)
- 调用 poll_table->qproc(即 ep_ptable_queue_proc)注册回调
static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,poll_table *pt) {struct epitem *epi = ep_item_from_epqueue(pt);struct eppoll_entry *pwq;// 分配等待队列条目pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL);init_waitqueue_func_entry(&pwq->wait, ep_poll_callback); // 绑定 ep_poll_callbackpwq->whead = whead; // 设备的等待队列(如 socket 的 sk->sk_wq)pwq->base = epi; // 关联的 epitem// 将回调添加到设备的等待队列add_wait_queue(whead, &pwq->wait);// 记录到 epitem 的 pwqlist(用于后续删除)list_add_tail(&pwq->llink, &epi->pwqlist);epi->nwait++;
}
注册回调函数ep_poll_callback:
把回调函数添加到回调函数队列
回调函数内部实现:
// fs/eventpoll.c
static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key)
{int pwake = 0;unsigned long flags;struct epitem *epi = ep_item_from_wait(wait);struct eventpoll *ep = epi->ep;// 获取自旋锁保护数据结构spin_lock_irqsave(&ep->lock, flags);// 1. 检查事件是否匹配用户设置if (key && !((unsigned long)key & epi->event.events))goto out_unlock;// 2. 如果用户设置了 EPOLLONESHOT,标记为已触发if (epi->event.events & EPOLLONESHOT) {if (epi->event.events & EPOLLIN)epi->event.events &= ~EPOLLIN;if (epi->event.events & EPOLLOUT)epi->event.events &= ~EPOLLOUT;}// 3. 将 epitem 加入就绪队列(如果尚未加入)if (!ep_is_linked(&epi->rdllink)) {list_add_tail(&epi->rdllink, &ep->rdllist);ep_pm_stay_awake(epi);pwake = 1;}// 4. 唤醒阻塞在 epoll_wait 的进程if (pwake && waitqueue_active(&ep->wq))wake_up_locked(&ep->wq);// 5. 如果使用了嵌套 epoll,唤醒父 epollif (waitqueue_active(&ep->poll_wait))pwake++;out_unlock:spin_unlock_irqrestore(&ep->lock, flags);// 6. 如果需要,唤醒父 epollif (pwake)ep_poll_safewake(&ep->poll_wait);return 1;
}
回调函数本质工作就是激活红黑树的节点,链入就绪队列!具体的逻辑我们不需要看!
具体的注册回调流程:
ep_insert()
→ 文件系统的 poll()
→ ep_ptable_queue_proc()
→ 初始化等待队列项
→ 绑定ep_poll_callback
至此,我们就看到了回调函数是如何被设置到每个节点的!
执行流程:
回调机制具体是以什么方式设置到每个节点的,以及具体是如何找到回调的,真要看源码的来理解的话还是有些复杂的!我们只需要知道宏观逻辑即可。
至此,当双方使用套接字进行通信的时候:
如果该套接字对应的读/写事件没有就绪,进程会阻塞!
而一旦底层检测到了事件就绪,也是会触发对应的回调机制,即对事件进行处理,向上传输。
一直向上解包分用,直到传输层的时候:
1.若发现有epoll监视该套接字,就让epoll交付给就绪队列
2.反之,唤醒因为要对该套接字事件处理而阻塞在该套接字阻塞队列的进程!
最后来理解,为什么相比于epoll,poll和select会慢!
因为poll和select在检测是否有事件就绪的时候,需要从头至尾遍历存储的事件的数组/位图。
如果有就绪的就要把在该套接字下的阻塞队列的进程拿到运行队列!然后若干时间后返回。
如果未就绪的,就需要继续向后查找是否有就绪的。
这个过程需要遍历、还需要把进程的PCB进行队列移动操作,是比较耗低效的!
但是,epoll需要吗?不需要!
因为没有就绪的,会放在红黑树中,不会激活该节点!
只要有检测到就绪,就把它放在就绪队列中!检测是否有就绪事件的效率为O(1)!
所以,上层用户拿到的,一定是已经就绪的了!
核心差距:epoll用事件驱动替代轮询,从根本上减少了无效遍历!
-> 再说个大白话就是:epoll就绪和未就绪事件是分离的!
代码实践——基于epoll的echo server
本篇文章,还是基于epoll的技术实现一个简单地echo server(只处理读端)!
后序还要再讲解epoll的其他相关细节,也还要再讲解一种设计模式!到时候再来处理。
本篇文章,我们只需要能够理解到epoll相关的接口的使用即可。
代码其实非常简单,这里就不再过多讲述了
代码放在个人的gitee上,需要可自取:基于epoll实现的echo server