当前位置: 首页 > news >正文

【Linux网络】多路转接poll、epoll

    🌈个人主页:秦jh__https://blog.csdn.net/qinjh_?spm=1010.2135.3001.5343
🔥 系列专栏:https://blog.csdn.net/qinjh_/category_12891150.html

9efbcbc3d25747719da38c01b3fa9b4f.gif

目录

poll

poll 函数接口

poll 的优点

poll 的缺点

代码样例

 epoll

epoll 的相关系统调用

epoll_create

​编辑

epoll_ctl

epoll_wait

​编辑

epoll 工作原理 

 代码样例

epoll 的优点(和 select 的缺点对应)

epoll 工作方式 

对比 LT 和 ET 

理解 ET 模式和非阻塞文件描述符 

 epoll 的使用场景

多路转接的write

Reactor 反应堆模式

 Comm.hpp

Connection.hpp

Epoller.hpp

 HandlerConnection.hpp

 Listener.hpp

 Main.cc

 PackageParse.hpp

 Reactor.hpp


前言

    💬 hello! 各位铁子们大家好哇。

             今日更新了Linux多路转接poll,epoll的内容
    🎉 欢迎大家关注🔍点赞👍收藏⭐️留言📝

poll

poll 函数接口

poll的作用和select一样。

参数说明

  • fds 是一个 poll 函数监听的结构列表. 每一个元素中, 包含了三部分内容: 文件描述符, 监听的事件集合, 返回的事件集合.
  • nfds 表示 fds 数组的长度.
  • timeout 表示 poll 函数的超时时间, 单位是毫秒(ms).比如:timeout==5000,也就是5秒以内,阻塞等待所有的文件描述符。超时的话,poll就直接timeout。如果时间内有事件就绪,poll就会提交返回。;如果timeout==0,就采用非阻塞方式等待;如果timeout==-1,他就会阻塞等待,直到有事件就绪。
  • 返回值>0,有几个fd就绪了;==0,超时了;<0,poll出错了。

pollfd的结构 

1.用户告诉内核,你要帮我关系哪些fd上的哪些事件:fd和events;

2.内核告诉用户,你让我关心的哪些fd上的哪些事件,已经就绪了:fd和revents;

poll 的优点

不同于 select 使用三个位图来表示三个 fdset 的方式,poll 使用一个pollfd 的指针实现.

  • pollfd 结构包含了要监视的 event 和发生的 event,不再使用select“参数-值”传递的方式. 接口使用比 select 更方便.
  • poll 并没有最大数量限制 (但是数量过大后性能也是会下降). 

poll 的缺点

poll 中监听的文件描述符数目增多时

  • 和select 函数一样,poll 返回后,需要轮询 pollfd 来获取就绪的描述符.
  • 每次调用 poll 都需要把大量的 pollfd 结构从用户态拷贝到内核中.
  • 同时连接的大量客户端在一时刻可能只有很少的处于就绪状态, 因此随着监视的描述符数量的增长, 其效率也会线性下降 

代码样例

PollSever.hpp

#pragma once#include <iostream>
#include <poll.h>
#include "Socket.hpp"
#include "Log.hpp"
#include "InetAddr.hpp"using namespace socket_ns;class PollServer
{const static int gnum = sizeof(fd_set) * 8;const static int gdefaultfd = -1;public:PollServer(uint16_t port) : _port(port), _listensock(std::make_unique<TcpSocket>()){_listensock->BuildListenSocket(_port);}void InitServer(){for (int i = 0; i < gnum; i++){fd_events[i].fd = gdefaultfd;fd_events[i].events = 0;fd_events[i].revents = 0;}fd_events[0].fd = _listensock->Sockfd(); // 默认直接添加listensock到数组中fd_events[0].events = POLLIN;}// 处理新连接的void Accepter(){// 我们叫做连接事件就绪,等价于读事件就绪InetAddr addr;int sockfd = _listensock->Accepter(&addr); // 会不会被阻塞!一定不会!if (sockfd > 0){LOG(DEBUG, "get a new link, client info %s:%d\n", addr.Ip().c_str(), addr.Port());// 已经获得了一个新的sockfd// 接下来我们可以读取吗?绝对不能读!读取的时候,条件不一定满足// 谁最清楚底层fd的数据是否就绪了呢??通过select!// 想办法把新的fd添加给select,由select统一进行监管。怎么做到??// select 为什么等待的fd会越来越多??// 只要将新的fd,添加到fd_array中即可!bool flag = false;for (int pos = 1; pos < gnum; pos++){if (fd_events[pos].fd == gdefaultfd){flag = true;fd_events[pos].fd = sockfd;fd_events[pos].events = POLLIN;LOG(INFO, "add %d to fd_array success!\n", sockfd);break;}}if (!flag){LOG(WARNING, "Server Is Full!\n");::close(sockfd);// 扩容// 添加}}}// 处理普通的fd就绪的void HandlerIO(int i){// 下面的读写对吗?// 普通的文件描述符,正常的读写char buffer[1024];ssize_t n = ::recv(fd_events[i].fd, buffer, sizeof(buffer) - 1, 0); // 这里读取会阻塞吗?不会if (n > 0){buffer[n] = 0;std::cout << "client say# " << buffer << std::endl;std::string content = "<html><body><h1>hello bite</h1></body></html>";std::string echo_str = "HTTP/1.0 200 OK\r\n";echo_str += "Content-Type: text/html\r\n";echo_str += "Content-Length: " + std::to_string(content.size()) + "\r\n\r\n";echo_str += content;// echo_str += buffer;::send(fd_events[i].fd, echo_str.c_str(), echo_str.size(), 0); // 临时方案}else if (n == 0){LOG(INFO, "client quit...\n");// 关闭fd::close(fd_events[i].fd);// select 不要在关心这个fd了fd_events[i].fd = gdefaultfd;fd_events[i].events = 0;fd_events[i].revents = 0;}else{LOG(ERROR, "recv error\n");// 关闭fd::close(fd_events[i].fd);// select 不要在关心这个fd了fd_events[i].fd = gdefaultfd;fd_events[i].events = 0;fd_events[i].revents = 0;}}// 一定会存在大量的fd就绪,可能是普通sockfd,也可能是listensockfdvoid HandlerEvent(){// 事件派发for (int i = 0; i < gnum; i++){if (fd_events[i].fd == gdefaultfd)continue;// fd一定是合法的fd// 合法的fd不一定就绪, 判断fd是否就绪?if (fd_events[i].revents & POLLIN){// 读事件就绪// 1. listensockfd 2. normal sockfd就绪?if (_listensock->Sockfd() == fd_events[i].fd){Accepter();}else{HandlerIO(i);}}}}void Loop(){int timeout = -1;while (true){// _listensock->Accepter();// 不能,listensock && accept 我们把他也看做IO类的函数。只关心新链接到来,等价于读事件就绪!int n = ::poll(fd_events, gnum, timeout); // 临时switch (n){case 0:LOG(DEBUG, "time out\n");break;case -1:LOG(ERROR, "poll error\n");break;default:// LOG(DEBUG, "time out, %d.%d\n", timeout.tv_sec, timeout.tv_usec);LOG(INFO, "haved event ready, n : %d\n", n); // 如果事件就绪,但是不处理,select会一直通知我,直到我处理了!HandlerEvent();PrintDebug();// sleep(1);break;}}}void PrintDebug(){std::cout << "fd list: ";for (int i = 0; i < gnum; i++){if (fd_events[i].fd == gdefaultfd)continue;std::cout << fd_events[i].fd << " ";}std::cout << "\n";}~PollServer() {}private:uint16_t _port;std::unique_ptr<Socket> _listensock;// 1. select要正常工作,需要借助一个辅助数组,来保存所有合法fdstruct pollfd fd_events[gnum];
};

 epoll

按照 man 手册的说法:是为处理大批量句柄而作了改进的 poll

epoll 的相关系统调用

epoll_create

 size 参数是被忽略的,直接设为大于0即可。

用完之后, 必须调用 close()关闭.

返回值是一个文件描述符。

epoll_ctl

epoll 的事件注册函数.

  • 它不同于 select()是在监听事件时告诉内核要监听什么类型的事件, 而是在这里先注册要监听的事件类型.
  • 第一个参数是 epoll_create()的返回值(epoll 的句柄).
  • 第二个参数表示动作,用三个宏来表示.
  • 第三个参数是需要监听的 fd.
  • 第四个参数是告诉内核需要监听什么事

第二个参数的取值:

  • EPOLL_CTL_ADD:注册新的 fd 到 epfd 中;
  • EPOLL_CTL_MOD:修改已经注册的 fd 的监听事件;
  • EPOLL_CTL_DEL:从 epfd 中删除一个 fd;

struct epoll_event结构如下:

events 可以是以下几个宏的集合:

  • EPOLLIN : 表示对应的文件描述符可以读 (包括对端 SOCKET 正常关闭);
  • EPOLLOUT : 表示对应的文件描述符可以写;
  • EPOLLPRI : 表示对应的文件描述符有紧急的数据可读 (这里应该表示有带外数据到来);
  • EPOLLERR : 表示对应的文件描述符发生错误;
  • EPOLLHUP : 表示对应的文件描述符被挂断;
  • EPOLLET : 将 EPOLL 设为边缘触发(Edge Triggered)模式, 这是相对于水平触发(Level Triggered)来说的.
  • EPOLLONESHOT:只监听一次事件, 当监听完这次事件之后, 如果还需要继续监听这个 socket 的话, 需要再次把这个 socket 加入到 EPOLL 队列里.

epoll_wait

收集在 epoll 监控的事件中已经发送的事件

  • 参数 events 是分配好的 epoll_event 结构体数组.
  • epoll 将会把发生的事件赋值到 events 数组中 (events 不可以是空指针,内核只负责把数据复制到这个 events 数组中,不会去帮助我们在用户态中分配内存).
  • maxevents 告之内核这个 events 有多大,这个 maxevents 的值不能大于创建epoll_create()时的 size.
  • 参数 timeout 是超时时间 (毫秒,0 会立即返回,-1 是永久阻塞).
  • 如果函数调用成功,返回对应 I/O 上已准备好的文件描述符数目,如返回0 表示已超时, 返回小于 0 表示函数失败.

epoll 工作原理 

  • 当某一进程调用 epoll_create 方法时,Linux 内核会创建一个eventpoll 结构体,这个结构体中有两个成员与 epoll 的使用方式密切相关.
  • 每一个 epoll 对象都有一个独立的 eventpoll 结构体,用于存放通过epoll_ctl 方法向 epoll 对象中添加进来的事件.
  • 这些事件都会挂载在红黑树中,如此,重复添加的事件就可以通过红黑树而高效的识别出来(红黑树的插入时间效率是 lgn,其中 n 为树的高度).
  • 而所有添加到 epoll 中的事件都会与设备(网卡)驱动程序建立回调关系,也就是说,当响应的事件发生时会调用这个回调方法.
  • 这个回调方法在内核中叫 ep_poll_callback,它会将发生的事件添加到rdlist 双链表中.
  • 在 epoll 中,对于每一个事件,都会建立一个 epitem 结构体
  • 当调用 epoll_wait 检查是否有事件发生时,只需要检查 eventpoll 对象中的rdlist 双链表中是否有 epitem 元素即可.
  • 如果 rdlist 不为空,则把发生的事件复制到用户态,同时将事件数量返回给用户. 这个操作的时间复杂度是 O(1).

总结一下, epoll 的使用过程就是三部曲:

  • 调用 epoll_create 创建一个 epoll 句柄;
  • 调用 epoll_ctl, 将要监控的文件描述符进行注册; 
  • 调用 epoll_wait, 等待文件描述符就绪;

 代码样例

EpollServer.hpp

#pragma once#include <iostream>
#include <string>
#include <memory>
#include <sys/epoll.h>
#include "Log.hpp"
#include "Socket.hpp"using namespace socket_ns;class EpollServer
{const static int size = 128;const static int num = 128;public:EpollServer(uint16_t port) : _port(port), _listensock(std::make_unique<TcpSocket>()){_listensock->BuildListenSocket(port);_epfd = ::epoll_create(size);if (_epfd < 0){LOG(FATAL, "epoll_create error!\n");exit(1);}LOG(INFO, "epoll create success, epfd: %d\n", _epfd);}void InitServer(){// 新链接到来,我们认为是读事件就绪struct epoll_event ev;ev.events = EPOLLIN;// ev.events = EPOLLIN | EPOLLET;ev.data.fd = _listensock->Sockfd(); // 为了在事件就绪的时候,得到是那一个fd就绪了// 必须先把listensock 添加到epoll中.int n = ::epoll_ctl(_epfd, EPOLL_CTL_ADD, _listensock->Sockfd(), &ev);if (n < 0){LOG(FATAL, "epoll_ctl error!\n");exit(2);}LOG(INFO, "epoll_ctl success, add new sockfd : %d\n", _listensock->Sockfd());}std::string EventsToString(uint32_t events){std::string eventstr;if (events & EPOLLIN)eventstr = "EPOLLIN";if (events & EPOLLOUT)eventstr += "|EPOLLOUT";return eventstr;}void Accepter(){InetAddr addr;int sockfd = _listensock->Accepter(&addr); // 肯定不会被阻塞if (sockfd < 0){LOG(ERROR, "获取连接失败\n");return;}LOG(INFO, "得到一个新的连接: %d, 客户端信息: %s:%d\n", sockfd, addr.Ip().c_str(), addr.Port());// 得到了一个新的sockfd,我们能不能要进行read、recv?不能.// 等底层有数据(读事件就绪), read/recv才不会被阻塞// 底层有数据 谁最清楚呢?epoll!// 将新的sockfd添加到epoll中!怎么做呢?struct epoll_event ev;ev.data.fd = sockfd;ev.events = EPOLLIN;::epoll_ctl(_epfd, EPOLL_CTL_ADD, sockfd, &ev);LOG(INFO, "epoll_ctl success, add new sockfd : %d\n", sockfd);}void HandlerIO(int fd){char buffer[4096];// 你怎么保证buffer就是一个完整的请求?或者有多个请求??// 一个fd,都要有一个自己的缓冲区// 引入协议int n = ::recv(fd, buffer, sizeof(buffer) - 1, 0); // 会阻塞吗?不会if (n > 0){buffer[n] = 0;std::cout << buffer;std::string response = "HTTP/1.0 200 OK\r\n";std::string content = "<html><body><h1>hello bite, hello world</h1></body></html>";response += "Content-Type: text/html\r\n";response += "Content-Length: " + std::to_string(content.size()) + "\r\n";response += "\r\n";response += content;::send(fd, response.c_str(), response.size(), 0);}else if (n == 0){LOG(INFO, "client quit, close fd: %d\n", fd);// 1. 从epoll中移除, 从epoll中移除fd,这个fd必须是健康&&合法的fd. 否则会移除出错::epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, nullptr);// 2. 关闭fd::close(fd);}else{LOG(ERROR, "recv error, close fd: %d\n", fd);// 1. 从epoll中移除, 从epoll中移除fd,这个fd必须是健康&&合法的fd. 否则会移除出错::epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, nullptr);// 2. 关闭fd::close(fd);}}void HandlerEvent(int n){for (int i = 0; i < n; i++){int fd = revs[i].data.fd;uint32_t revents = revs[i].events;LOG(INFO, "%d 上面有事件就绪了,具体事件是: %s\n", fd, EventsToString(revents).c_str());if (revents & EPOLLIN){// listensock 读事件就绪, 新连接到来了if (fd == _listensock->Sockfd())Accepter();elseHandlerIO(fd);}}}void Loop(){int timeout = -1;while (true){// 事件通知,事件派发int n = ::epoll_wait(_epfd, revs, num, timeout);switch (n){case 0:LOG(INFO, "epoll time out...\n");break;case -1:LOG(ERROR, "epoll error\n");break;default:LOG(INFO, "haved event happend!, n : %d\n", n);// HandlerEvent(n);break;}}}~EpollServer(){if (_epfd >= 0)::close(_epfd);_listensock->Close();}private:uint16_t _port;std::unique_ptr<Socket> _listensock;int _epfd;struct epoll_event revs[num];
};

epoll 的优点(和 select 的缺点对应)

  • 接口使用方便: 虽然拆分成了三个函数, 但是反而使用起来更方便高效. 不需要每次循环都设置关注的文件描述符, 也做到了输入输出参数分离开
  • 数据拷贝轻量: 只在合适的时候调用 EPOLL_CTL_ADD 将文件描述符结构拷贝到内核中, 这个操作并不频繁(而 select/poll 都是每次循环都要进行拷贝)
  • 事件回调机制: 避免使用遍历, 而是使用回调函数的方式, 将就绪的文件描述符结构加入到就绪队列中, epoll_wait 返回直接访问就绪队列就知道哪些文件描述符就绪. 这个操作时间复杂度 O(1). 即使文件描述符数目很多, 效率也不会受到影响.
  • 没有数量限制: 文件描述符数目无上限. 

epoll 工作方式 

例子:
 你正在吃鸡, 眼看进入了决赛圈, 你妈饭做好了, 喊你吃饭的时候有两种方式:

  1. 如果你妈喊你一次, 你没动, 那么你妈会继续喊你第二次, 第三次...(亲妈,水平触发)
  2. 如果你妈喊你一次, 你没动, 你妈就不管你了(后妈, 边缘触发)

epoll 有 2 种工作方式-水平触发(LT)和边缘触发(ET) 

假如有这样一个例子: 

  • 我们已经把一个 tcp socket 添加到 epoll 描述符
  • 这个时候 socket 的另一端被写入了 2KB 的数据
  • 调用 epoll_wait,并且它会返回. 说明它已经准备好读取操作
  • 然后调用 read, 只读取了 1KB 的数据
  • 继续调用 epoll_wait...... 

水平触发 Level Triggered 工作模式

epoll 默认状态下就是 LT 工作模式.

  • 当 epoll 检测到 socket 上事件就绪的时候, 可以不立刻进行处理. 或者只处理一部分.
  • 如上面的例子, 由于只读了 1K 数据, 缓冲区中还剩 1K 数据, 在第二次调用epoll_wait 时, epoll_wait 仍然会立刻返回并通知 socket 读事件就绪.
  • 直到缓冲区上所有的数据都被处理完, epoll_wait 才不会立刻返回.
  • 支持阻塞读写和非阻塞读写 

边缘触发 Edge Triggered 工作模式

如果我们在第 1 步将 socket 添加到 epoll 描述符的时候使用了EPOLLET 标志, epoll 进入 ET 工作模式. 

  • 当 epoll 检测到 socket 上事件就绪时, 必须立刻处理.
  • 如上面的例子, 虽然只读了 1K 的数据, 缓冲区还剩 1K 的数据, 在第二次调用epoll_wait 的时候, epoll_wait 不会再返回了.
  • 也就是说, ET 模式下, 文件描述符上的事件就绪后, 只有一次处理机会.
  • ET 的性能比 LT 性能更高( epoll_wait 返回的次数少了很多). Nginx 默认采用ET 模式使用 epoll.
  • 只支持非阻塞的读写

select 和 poll 其实也是工作在 LT 模式下. epoll 既可以支持 LT, 也可以支持ET。 

ET模式下,所有的fd都必须是非阻塞的(因为要求把本轮数据全部读取完毕);LT模式下,fd阻塞或者非阻塞都行。

对比 LT 和 ET 

LT 是 epoll 的默认行为.

使用 ET 能够减少 epoll 触发的次数. 但是代价就是强逼着程序猿一次响应就绪过程中就把所有的数据都处理完.

相当于一个文件描述符就绪之后, 不会反复被提示就绪, 看起来就比LT 更高效一些. 但是在 LT 情况下如果也能做到每次就绪的文件描述符都立刻处理, 不让这个就绪被重复提示的话, 其实性能也是一样的.

另一方面, ET 的代码复杂程度更高了.

理解 ET 模式和非阻塞文件描述符 

 使用 ET 模式的 epoll, 需要将文件描述设置为非阻塞. 这个不是接口上的要求, 而是"工程实践" 上的要求.

假设这样的场景: 服务器接收到一个 10k 的请求, 会向客户端返回一个应答数据. 如果客户端收不到应答, 不会发送第二个 10k 请求.

如果服务端写的代码是阻塞式的 read, 并且一次只 read 1k 数据的话(read 不能保证一次就把所有的数据都读出来, 参考 man 手册的说明, 可能被信号打断), 剩下的9k 数据就会待在缓冲区中。

此时由于 epoll 是 ET 模式, 并不会认为文件描述符读就绪. epoll_wait 就不会再次返回. 剩下的 9k 数据会一直在缓冲区中. 直到下一次客户端再给服务器写数据. epoll_wait 才能返回 

但是问题来了.

  • 服务器只读到 1k 个数据, 要 10k 读完才会给客户端返回响应数据.
  • 客户端要读到服务器的响应, 才会发送下一个请求
  • 客户端发送了下一个请求, epoll_wait 才会返回, 才能去读缓冲区中剩余的数据. 

所以, 为了解决上述问题(阻塞 read 不一定能一下把完整的请求读完), 于是就可以使用非阻塞轮训的方式来读缓冲区, 保证一定能把完整的请求都读出来.

而如果是 LT 没这个问题. 只要缓冲区中的数据没读完, 就能够让epoll_wait 返回文件描述符读就绪.

 epoll 的使用场景

epoll 的高性能, 是有一定的特定场景的. 如果场景选择的不适宜, epoll 的性能可能适得其反。

对于多连接, 且多连接中只有一部分连接比较活跃时, 比较适合使用epoll. 

多路转接的write

  • 当我们获取一个新的sockfd的时候,输入和输出缓冲区默认是空的。
  • 读事件就绪:就是输入缓冲区有了数据/底层有新连接
  • 写事件就绪:不是关心数据,而是关心发送缓冲区中没有空间。如果有空间,发送条件就是就绪的。否则,发送条件不满足。
  • 把一个sockfd托管给select,poll,epoll,是因为sockfd上事件没有就绪。
  • 默认sockfd新建的情况下,读事件默认不是就绪的!因为输入缓冲区没有数据。所以,对于读事件,默认要常添加到epoll中。
  • 默认sockfd新建的情况下,写事件默认就是就绪的!因为输出缓冲区本来就有空间。所以,对于写,我们直接写。
  • 当写条件不满足的时候,我们才按需开启对sockfd的EPOLLOUT事件的关心。

所以对于写,我们直接发。当发送条件不满足,开启写事件关心。后续剩余的数据,epoll会自动给我进行发送。

Reactor 反应堆模式

部分代码

 Comm.hpp

#pragma once#include <iostream>
#include <unistd.h>
#include <fcntl.h>enum
{SOCKET_ERROR = 1,BIND_ERROR,LISTEN_ERROR,EPOLL_CREATE_ERROR
};void SetNonBlock(int fd)
{int fl = ::fcntl(fd, F_GETFL);if (fl < 0){std::cout << "fcntl error" << std::endl;return;}::fcntl(fd, F_SETFL, fl | O_NONBLOCK);
}

Connection.hpp

#pragma once#include <iostream>
#include <string>
#include <functional>
#include "InetAddr.hpp"#define ListenConnection 0
#define NormalConnection 1class Connection;
class Reactor;
using handler_t = std::function<void(Connection *conn)>;// 未来我们的服务器,一切皆Connection,对我们来将listensockfd也是一样
class Connection
{
public:Connection(int sockfd) : _sockfd(sockfd){}void RegisterHandler(handler_t recver, handler_t sender, handler_t excepter){_handler_recver = recver;_handler_sender = sender;_handler_excepter = excepter;}void SetEvents(uint32_t events){_events = events;}uint32_t Events(){return _events;}void SetConnectionType(int type){_type = type;}int Type(){return _type;}int Sockfd(){return _sockfd;}void SetReactor(Reactor *R){_R = R;}void SetAddr(const InetAddr &addr){_addr = addr;}void AppendInbuffer(const std::string &in){_inbuffer += in;}void AppendOutbuffer(const std::string &in){_outbuffer += in;}std::string &Inbuffer(){return _inbuffer;}std::string &OutBuffer(){return _outbuffer;}void DiscardOutbuffer(int n){_outbuffer.erase(0, n);}void Close(){if(_sockfd >= 0)::close(_sockfd);}~Connection(){}private:int _sockfd;uint32_t _events;std::string _inbuffer; // 我们在这里,用string充当缓冲区std::string _outbuffer;int _type;public:handler_t _handler_recver;   // 处理读取handler_t _handler_sender;   // 处理写入handler_t _handler_excepter; // 处理异常Reactor *_R; // 回指向自己所属的ReactorInetAddr _addr;
};

Epoller.hpp

#pragma once
#include <iostream>
#include <stdlib.h>
#include <sys/epoll.h>
#include "Log.hpp"
#include "Comm.hpp"static const int gsize = 128;using namespace log_ns;class Multiplex
{
public:virtual bool AddEvent(int fd, uint32_t events) = 0;virtual int Wait(struct epoll_event revs[], int num, int timeout) = 0;virtual bool ModEvent(int fd, uint32_t events) = 0;virtual bool DelEvent(int fd) = 0;
};class Epoller : public Multiplex
{
private:bool ModEventHelper(int fd, uint32_t events, int oper){struct epoll_event ev;ev.events = events;ev.data.fd = fd; // 特别重要,通过这个字段,向上触发int n = ::epoll_ctl(_epfd, oper, fd, &ev);if (n < 0){LOG(ERROR, "epoll_ctl %d events is %s failed\n", fd, EventsToString(events).c_str());return false;}LOG(INFO, "epoll_ctl %d events is %s success\n", fd, EventsToString(events).c_str());return true;}public:Epoller(){_epfd = ::epoll_create(gsize);if (_epfd < 0){LOG(FATAL, "epoll_create error\n");exit(EPOLL_CREATE_ERROR);}LOG(INFO, "epoll_create success, epfd: %d\n", _epfd);}std::string EventsToString(uint32_t events){std::string eventstr;if (events & EPOLLIN)eventstr = "EPOLLIN";if (events & EPOLLOUT)eventstr += "|EPOLLOUT";if (events & EPOLLET)eventstr += "|EPOLLET";return eventstr;}bool AddEvent(int fd, uint32_t events) override{return ModEventHelper(fd, events, EPOLL_CTL_ADD);}bool ModEvent(int fd, uint32_t events) override{return ModEventHelper(fd, events, EPOLL_CTL_MOD);}bool DelEvent(int fd) override{return 0 == ::epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, nullptr);}int Wait(struct epoll_event revs[], int num, int timeout){return ::epoll_wait(_epfd, revs, num, timeout);}~Epoller(){}private:int _epfd;
};// class Poller : public Multiplex
// {// };// class Selector: public Multiplex
// {// };

 HandlerConnection.hpp

#pragma once#include <iostream>
#include <sys/types.h>
#include <sys/socket.h>
#include "Log.hpp"
#include "Connection.hpp"using namespace log_ns;
const static int buffersize = 512;// 不应该让HandlerConnection来处理报文
class HandlerConnection
{
public:HandlerConnection(handler_t process) : _process(process){}void HandlerRecver(Connection *conn) // conn就是就绪的conn{errno = 0;// 1. 直接读取// LOG(DEBUG, "client 给我发了消息: %d\n", conn->Sockfd());while (true){char buffer[buffersize];ssize_t n = ::recv(conn->Sockfd(), buffer, sizeof(buffer) - 1, 0);if (n > 0){buffer[n] = 0; // 数据块conn->AppendInbuffer(buffer);}else{if (errno == EWOULDBLOCK){break;}else if (errno == EINTR){continue;}else{conn->_handler_excepter(conn); // 统一执行异常处理return;}}}// 2. 交给业务处理// 一定是读取完毕了,我们应该处理数据了std::cout << "%d Inbuffer 内容: " << conn->Inbuffer() << std::endl;// 未来不让他做处理, 我让他把报文解析,处理的方法,作为任务,交给线程池!// std::bind()_process(conn); // 内容分析}void HandlerSender(Connection *conn){errno = 0;// 1. 直接写while (true){ssize_t n = ::send(conn->Sockfd(), conn->OutBuffer().c_str(), conn->OutBuffer().size(), 0);if (n > 0){conn->DiscardOutbuffer(n);if (conn->OutBuffer().empty())break;}else if (n == 0){break;}else{if (errno == EWOULDBLOCK){// 发送条件不满足break;}else if (errno == EINTR){continue;}else{conn->_handler_excepter(conn);return;}}}// 2. 只能是发送条件不满足 && 缓冲区还有数据if (!conn->OutBuffer().empty()){// 开启对写事件关心.conn->_R->EnableConnectionReadWrite(conn->Sockfd(), true, true);// 发送完了呢?}else{conn->_R->EnableConnectionReadWrite(conn->Sockfd(), true, false);}}void HanlderExcepter(Connection *conn){// 整个代码的所有的逻辑异常处理,全部都在这里// 删除连接conn->_R->DelConnection(conn->Sockfd());}private:handler_t _process;
};

 Listener.hpp

#pragma once
#include <iostream>
#include <memory>
#include "Socket.hpp"
#include "Connection.hpp"// 该类未来统一进行listensock的管理工作,获取新连接using namespace socket_ns;class Listener
{
public:Listener(uint16_t port) : _port(port), _listensock(std::make_unique<TcpSocket>()){_listensock->BuildListenSocket(_port);}int ListenSockfd(){return _listensock->Sockfd();}void Accepter(Connection *conn){while (true){errno = 0;InetAddr addr;int code = 0;int sockfd = _listensock->Accepter(&addr, &code);if (sockfd > 0){// 该socket获取成功了,我们该怎么办??LOG(INFO, "获取连接成功, 客户端: %s:%d, sockfd: %d\n", addr.Ip().c_str(), addr.Port(), sockfd);conn->_R->AddConnection(sockfd, EPOLLIN|EPOLLET, addr, NormalConnection);}else{if (code == EWOULDBLOCK){LOG(INFO, "底层连接全部获取完毕\n");break;}else if (code == EINTR){continue;}else{LOG(ERROR, "获取连接失败.!\n");break;}}}}~Listener(){}private:uint16_t _port;std::unique_ptr<Socket> _listensock;
};

 Main.cc

#include "Log.hpp"
#include "Reactor.hpp"
#include "Listener.hpp"
#include "HandlerConnection.hpp"
#include "PackageParse.hpp"#include <iostream>
#include <memory>using namespace log_ns;// ./tcpserver 8888
int main(int argc, char *argv[])
{if (argc != 2){std::cerr << "Usage: " << argv[0] << " local-port" << std::endl;exit(0);}uint16_t port = std::stoi(argv[1]);EnableScreen();InetAddr localaddr("0.0.0.0", port);PackageParse parse;// 专门用来处理新连接到来的模块Listener listener(port); // 连接管理器// 专门用来处理普通sockfd的模块HandlerConnection handlers(std::bind(&PackageParse::Excute, &parse, std::placeholders::_1)); // IO处理器// 主模块,事件派发std::unique_ptr<Reactor> R = std::make_unique<Reactor>(); // 事件派发器// 模块之间产生关联R->SetOnConnect(std::bind(&Listener::Accepter, &listener, std::placeholders::_1));R->SetOnNormalHandler(std::bind(&HandlerConnection::HandlerRecver, &handlers, std::placeholders::_1),std::bind(&HandlerConnection::HandlerSender, &handlers, std::placeholders::_1),std::bind(&HandlerConnection::HanlderExcepter, &handlers, std::placeholders::_1));R->AddConnection(listener.ListenSockfd(), EPOLLIN|EPOLLET, localaddr, ListenConnection);R->Dispatcher();return 0;
}

 PackageParse.hpp

#pragma once
#include <iostream>
#include <functional>
#include "InetAddr.hpp"
#include "Log.hpp"
#include "Protocol.hpp"
#include "Connection.hpp"
#include "NetCal.hpp"using namespace log_ns;
class PackageParse
{
public:void Excute(Connection *conn){while (true){// 我们能保证我们读到的是一个完整的报文吗?不能!// 2. 报文解析,提取报头和有效载荷std::string package = Decode(conn->Inbuffer());if (package.empty())break;// 我们能保证我们读到的是一个完整的报文吗?能!!auto req = Factory::BuildRequestDefault();std::cout << "package: \n"<< package << std::endl;// 3. 反序列化req->Deserialize(package);// 4. 业务处理// auto resp = _process(req); // 通过请求,得到应答auto resp = cal.Calculator(req);// 5. 序列化应答std::string respjson;resp->Serialize(&respjson);std::cout << "respjson: \n"<< respjson << std::endl;// 6. 添加len长度报头respjson = Encode(respjson);std::cout << "respjson add header done: \n"<< respjson << std::endl;// 7. 发回conn->AppendOutbuffer(respjson);}// 我们已经至少处理了一个请求,同时至少会有一个应答// if(!conn->OutBuffer().empty()) // conn->_handler_sender(conn); // 方法1:直接发送数据if (!conn->OutBuffer().empty())conn->_R->EnableConnectionReadWrite(conn->Sockfd(), true, true); // 方法2:我只要进行激活对写事件的关心即可}private:NetCal cal;
};

 Reactor.hpp

#pragma once
#include <iostream>
#include <memory>
#include <string>
#include <unordered_map>#include "Connection.hpp"
#include "Epoller.hpp"// 暂时叫做TcpServer->rename
// Reactor类似一个connection的容器,核心工作就是
// 1. 管理connection和对应的内核事件
// 2. 事件派发
class Reactor
{static const int gnum = 64;public:Reactor() : _epoller(std::make_unique<Epoller>()), _isrunning(false){}void AddConnection(int fd, uint32_t events, const InetAddr &addr, int type){// 1. 构建一个connectionConnection *conn = new Connection(fd);conn->SetEvents(events);conn->SetConnectionType(type);conn->SetAddr(addr);conn->SetReactor(this); // 将当前对象设置进入所有的conn对象中// 设置对connection的上层处理,即,如果该connection就绪被激活,我们应该如何处理它?if (conn->Type() == ListenConnection){conn->RegisterHandler(_OnConnect, nullptr, nullptr);}else{conn->RegisterHandler(_OnRecver, _OnSender, _OnExcepter);}// 2. fd和events写透到内核中,托管给epollif (!_epoller->AddEvent(conn->Sockfd(), conn->Events()))return;// 3. 托管给_connections_connections.insert(std::make_pair(fd, conn));}void EnableConnectionReadWrite(int sockfd, bool readable, bool writeable){if (!IsConnectionExists(sockfd)){return;}uint32_t events = ((readable ? EPOLLIN : 0) | (writeable ? EPOLLOUT : 0) | EPOLLET);_connections[sockfd]->SetEvents(events);// 写透到内核中!_epoller->ModEvent(_connections[sockfd]->Sockfd(), _connections[sockfd]->Events());}void DelConnection(int sockfd){// 0. 安全检测if (!IsConnectionExists(sockfd)){return;}LOG(INFO, "sockfd %d quit, 服务器释放所有资源\n", sockfd);// 1. 在内核中移除对sockfd的关心EnableConnectionReadWrite(sockfd, false, false);_epoller->DelEvent(sockfd);// 2. sockfd 关闭_connections[sockfd]->Close();// 3. 在Reactor中移除对Connection的关心delete _connections[sockfd];_connections.erase(sockfd);}void LoopOnce(int timeout){int n = _epoller->Wait(revs, gnum, timeout);for (int i = 0; i < n; i++){int sockfd = revs[i].data.fd;uint32_t revents = revs[i].events;if (revents & EPOLLERR)revents |= (EPOLLIN | EPOLLOUT);if (revents & EPOLLHUP)revents |= (EPOLLIN | EPOLLOUT);if (revents & EPOLLIN){if (IsConnectionExists(sockfd) && _connections[sockfd]->_handler_recver)_connections[sockfd]->_handler_recver(_connections[sockfd]); // 读事件就绪,派发给对应的conn}if (revents & EPOLLOUT){if (IsConnectionExists(sockfd) && _connections[sockfd]->_handler_sender)_connections[sockfd]->_handler_sender(_connections[sockfd]); // 写事件就绪,派发给对应的conn}}}void Dispatcher() // 事件派发{int timeout = -1;_isrunning = true;while (_isrunning){LoopOnce(timeout); // 默认阻塞等待的.// 做做其他事情// 就可以获取新的fd,并添加了!PrintDebug();}_isrunning = false;}bool IsConnectionExists(int sockfd){return _connections.find(sockfd) != _connections.end();}void SetOnConnect(handler_t OnConnect){_OnConnect = OnConnect;}void SetOnNormalHandler(handler_t recver, handler_t sender, handler_t excepter){_OnRecver = recver;_OnSender = sender;_OnExcepter = excepter;}void PrintDebug(){std::string fdlist;for (auto &conn : _connections){fdlist += std::to_string(conn.second->Sockfd()) + " ";}LOG(DEBUG, "epoll管理的fd列表: %s\n", fdlist.c_str());}~Reactor(){}private:// key: sockfd// value: Connetion*std::unordered_map<int, Connection *> _connections; // Eventstd::unique_ptr<Multiplex> _epoller;bool _isrunning;struct epoll_event revs[gnum];// Reactor中添加处理socket的方法集合// 1. 处理新连接handler_t _OnConnect;// 2. 处理普通的sockfd,主要是IO处理handler_t _OnRecver;handler_t _OnSender;handler_t _OnExcepter;
};

http://www.dtcms.com/a/279466.html

相关文章:

  • vue3 JavaScript 获取 el-table 单元格 赋红色外框
  • mac上用datagrip连接es
  • MFC/C++语言怎么比较CString类型最后一个字符
  • K8S的平台核心架构思想[面向抽象编程]
  • LVS(Linux Virtual Server)集群技术详解
  • linux 内核: 访问当前进程的 task_struct
  • 【NLP舆情分析】基于python微博舆情分析可视化系统(flask+pandas+echarts) 视频教程 - 架构搭建
  • C++-linux 6.makefile和cmake
  • 深入掌握Performance面板与LCP/FCP指标优化指南
  • 学习笔记——农作物遥感识别与大范围农作物类别制图的若干关键问题
  • 计算两个经纬度之间的距离(JavaScript 实现)
  • HashMap的长度为什么要是2的n次幂以及HashMap的继承关系(元码解析)
  • 前缀和题目:使数组互补的最少操作次数
  • 闲庭信步使用图像验证平台加速FPGA的开发:第十四课——图像二值化的FPGA实现
  • 如何集成光栅传感器到FPGA+ARM系统中?
  • JVM 内存模型详解:GC 是如何拯救内存世界的?
  • Oracle Virtualbox 虚拟机配置静态IP
  • 《亿级流量系统架构设计与实战》通用高并发架构设计 读场景
  • 1. 深入理解ArrayList源码
  • ae如何安装在非C盘
  • 7.15 窗口函数 | 二分 | 位运算
  • 逻辑代数中的基本规则,代入规则和反演规则,对偶规则
  • LLM notes
  • GitCode 使用高频问题及解决方案
  • TextIn:大学生的文档全能助手,让学习效率飙升
  • 【Linux庖丁解牛】— 信号的产生!
  • SwiftUI 常用控件分类与使用指南
  • SCI特刊征稿
  • 延迟双删懂不
  • .net swagger的API项目里面 同时可以运行wwwroot里面的网页