当前位置: 首页 > news >正文

【高并发服务器】八、Poller描述符监控类实现

文章目录

  • Ⅰ. Poller描述符监控类实现
    • 1、成员变量设计
    • 2、成员接口设计
    • 3、完整代码实现
  • Ⅱ. 与`Channel`类的联调
    • 1、Channel类的部分完善
    • 2、服务端测试代码
    • 3、客户端测试代码

在这里插入图片描述

Ⅰ. Poller描述符监控类实现

​ 该模块是 epoll 的操作进行封装的一个模块,主要实现 epollIO 事件添加、修改、移除、获取活跃连接的功能。

​ 而该模块的这些功能其实和 Channel 模块是相关联的,因为 Channel 模块管理的就是文件描述符的事件,又因为后面的 EventLoop 模块其实就封装了 Poller 模块,对所有的监控事件进行管理,所以 Channel 模块管理事件仅仅是对事件进行状态监控,而没有对事件的实际管理操作,真正的操作其实都在 EventLoop 中封装的 Poller 模块中,所以 Channel 模块就必须和 Poller 模块联系起来!

在这里插入图片描述

1、成员变量设计

​ 首先 Poller 模块就是对 epoll 模型的操作,所以肯定**需要一个 epoll 模型的描述符 _epollfd。然后因为该模块也要监控当前有哪些事件是活跃的,所以就需要一个 struct epoll_event 结构体类型的数组**,用来管理活跃事件!

​ 此外,因为在该模块管理的事件需要关注什么,都是由 Channel 对象来决定的,并且每个 Channel 对象就对应一个文件描述符,所以我们就要建立起两者的关系,所以就需要一个哈希表来存放管理的文件描述符和对应的事件管理对象 Channel 的关系,即**需要一个 unordered_map<int, Channel*>**。

​ 该模块对文件描述符进行监控,而通过文件描述符对应的 Channel 对象才能知道其需要监控什么事件、触发后回调什么事件!

2、成员接口设计

接口其实也很简单,就是如下几个:

  1. 添加或修改监控事件接口(根据操作类型调用下面的辅助接口完成操作,所以这只是一个封装接口!)
  2. 移除监控接口
  3. 启动监控接口,并且通过输出型参数返回活跃连接

此时就需要两个辅助接口:

  1. epoll 的直接操作接口,由上面的接口调用
  2. 判断一个事件管理对象 Channel 是否已经添加了事件监控
const static int MAX_EPOLL_EVENTS = 1024;class Poller
{
private:int _epollfd;                                struct epoll_event _events[MAX_EPOLL_EVENTS]; // 存放活跃连接的数组std::unordered_map<int, Channel*> _channels;  // 存放文件描述符与其对应的事件管理对象的哈希表
public:Poller() {}~Poller() {}// 添加或修改监控事件(只是一个封装)void update_event(Channel* channel);// 移除监控void remove_event(Channel* channel);// 启动监控,返回活跃连接void start_event(std::vector<Channel*>* active);
private:// 对epoll的实际操作接口void control(Channel* channel, int option);// 判断一个事件管理对象Channel是否已经添加了事件监控bool has_channel(Channel* channel);
};

3、完整代码实现

const static int MAX_EPOLL_EVENTS = 1024;class Poller
{
private:int _epollfd;                                struct epoll_event _events[MAX_EPOLL_EVENTS]; // 存放活跃连接的数组std::unordered_map<int, Channel*> _channels;  // 存放文件描述符与其对应的事件管理对象的哈希表
public:Poller() {_epollfd = epoll_create(MAX_EPOLL_EVENTS);if(_epollfd == -1){ELOG("epoll create error!!");abort();}DLOG("epoll create succuess, epollfd is: %d", _epollfd);}~Poller() {if(_epollfd != -1)close(_epollfd);}// 添加或修改监控事件(只是一个封装)void update_event(Channel* channel){// 如果事件存在的话则属于修改事件,不存在的话属于添加事件bool ret = has_channel(channel);if(ret == false){_channels[channel->get_fd()] = channel;control(channel, EPOLL_CTL_ADD); // 交给辅助函数去完成}elsecontrol(channel, EPOLL_CTL_MOD);}// 移除监控void remove_event(Channel* channel){// 移除监控包括两个步骤://   1. 从epoll模型中移除control(channel, EPOLL_CTL_DEL);//   2. 从哈希表中去除关系auto it = _channels.find(channel->get_fd());if(it != _channels.end())_channels.erase(it);}// 启动监控,返回活跃连接void start_event(std::vector<Channel*>* active){// 等待事件就绪,我们这里设为阻塞式等待int n = epoll_wait(_epollfd, _events, MAX_EPOLL_EVENTS, -1);  if(n <= 0){if(errno == EINTR) // 如果是被中断则不代表等待失败return;ELOG("epoll_wait error:%s\n", strerror(errno));abort();}// 将获取到的事件设置到对应的Channel对象中,并且尾插到active中for(int i = 0; i < n; ++i){auto it = _channels.find(_events[i].data.fd);assert(it != _channels.end()); // 认为是一定能找到的,找不到说明是程序问题,直接退出it->second->set_revents(_events[i].events); // 设置实际就绪的事件active->push_back(it->second);}}
private:// 对epoll的实际操作接口void control(Channel* channel, int option){struct epoll_event ev;ev.data.fd = channel->get_fd();ev.events = channel->get_events();int ret = epoll_ctl(_epollfd, option, channel->get_fd(), &ev);if(ret == -1)ELOG("epoll_ctl error!!");}// 判断一个事件管理对象Channel是否已经添加了事件监控bool has_channel(Channel* channel){auto it = _channels.find(channel->get_fd());if(it == _channels.end())return false;return true;}
};

Ⅱ. 与Channel类的联调

1、Channel类的部分完善

​ 虽说 Channel 模块是和 EventLoop 模块有关联的,但是因为 EventLoop 模块内就包含了 Poller 模块,所以我们现在实现了 Poller 模块之后就可以直接用它来和 Channel 模块联调了,等我们后面实现了 EventLoop 之后就不需要说去再联调一次了,到时候只需要改一下变量即可!

​ 因为之前 Channel 模块我们是没完成的,因为涉及到了当前的事件管理操作,所以现在我们就能来完善 Channel 模块了,如下所示:(下面代码中只需要关注注释部分,也就是新增和修改部分

class Poller; // 需要搞个前置声明using eventcallback_t  = std::function<void()>; 
class Channel
{
private:int _fd;           uint32_t _events; uint32_t _revents; eventcallback_t _read_callback;       eventcallback_t _write_callback;      eventcallback_t _error_callback;      eventcallback_t _close_callback;      eventcallback_t _arbitrary_callback; Poller* _poller; // 描述符监控类对象的指针
public:Channel(int fd, Poller* poller) : _fd(fd), _events(0), _revents(0), _poller(poller){}~Channel() { close(_fd); }int get_fd() { return _fd; }                               uint32_t get_events() { return _events; }                 void set_revents(uint32_t revents) { _revents = revents; } void set_read_callback(const eventcallback_t& cb) { _read_callback = cb; }void set_write_callback(const eventcallback_t& cb) { _write_callback = cb; }void set_error_callback(const eventcallback_t& cb) { _error_callback = cb; }void set_close_callback(const eventcallback_t& cb) { _close_callback = cb; }void set_arbitrary_callback(const eventcallback_t& cb) { _arbitrary_callback = cb; }bool _is_read_able()  { return (_events & EPOLLIN); }   bool _is_write_able() { return (_events & EPOLLOUT); } void _handler(); { // 实现省略,具体参考前面笔记 }// 清除所有的回调函数void clear_callback() { _read_callback = _write_callback = _error_callback = _close_callback = _arbitrary_callback = nullptr; }// 启动读事件监控void _enable_read() {_events |= EPOLLIN; update(); // 实际更新涉及到epoll操作,所以交给Poller来实现}// 启动写事件监控void _enable_write() {_events |= EPOLLOUT; update(); // 实际更新涉及到epoll操作,所以交给Poller来实现}// 关闭读事件监控void _disable_read(){_events &= (~EPOLLIN);update(); // 实际更新涉及到epoll操作,所以交给Poller来实现}// 关闭写事件监控void _disable_write(){_events &= (~EPOLLOUT);update(); // 实际更新涉及到epoll操作,所以交给Poller来实现}// 关闭所有事件监控void _disable_all(){_events = 0;update(); // 实际更新涉及到epoll操作,所以交给Poller来实现} 
public:// 添加或者修改事件监控void update();// 移除事件监控void remove();
};const static int MAX_EPOLL_EVENTS = 1024;
class Poller
{// 这里省略Poller的实现,只是为了体现出其位置
};// 这两个函数因为涉及到了Poller的成员接口,在上面虽然前置声明了Poller类,但并不是定义,所以需要放到这里来实现!
void Channel::update() { _poller->update_event(this); }
void Channel::remove() { _poller->remove_event(this); }

2、服务端测试代码

主函数逻辑就是这样子:

  1. 创建服务器套接字。
  2. 创建一个 poller 对象。
  3. 创建一个用于监听套接字的 Channel 对象,然后利用 bind 函数设置可读回调函数,并且启动可读监控。
  4. 开始监听事件,然后获取就绪事件之后进行函数回调。

​ 然后全局中给出了 Channel 对象要回调的函数,如可读、可写等等事件的回调函数,它们都是由 Acceptor 函数设置的,Acceptor 函数是专门给监听套接字管理对象 listen_channel 使用的,用来监听新链接,然后给新链接设置我们上面给出的回调函数,最后启动这些新链接的可读事件开始监听!

#include "../source/server.hpp"void CloseEvent(Channel* channel)
{std::cout << "close: " << channel->get_fd() << std::endl;channel->remove(); // 移除监控channel->clear_callback();delete channel;
}void ReadEvent(Channel* channel)
{// 这里读事件处理,我们就做简单的打印、启动可写事件监控即可int fd = channel->get_fd();char buffer[1024] = { 0 };int n = recv(fd, buffer, sizeof(buffer) - 1, 0);if(n > 0){buffer[n] = 0;std::cout << buffer << std::endl;// 接收到数据之后,启动可写事件监控channel->enable_write();}else{CloseEvent(channel); // 其实不应该释放,但是因为当前只是测试,所以需要关闭}
}void WriteEvent(Channel* channel)
{// 这里做个简单的发送即可int fd = channel->get_fd();const char* data = "lirendada 你好呀!";int n = send(fd, data, strlen(data), 0);if(n < 0){return CloseEvent(channel); // 错误的话释放该对象}channel->disable_write(); // 然后关闭可写事件监控
}void ErrorEvent(Channel* channel)
{CloseEvent(channel); // 错误的话释放该对象
}void ArbitraryEvent(Channel* channel)
{std::cout << "有一个事件发生,fd:" << channel->get_fd() << std::endl;
}void Acceptor(Channel* listen_channel, Poller* poller)
{// 获取新链接int newfd = accept(listen_channel->get_fd(), nullptr, nullptr);if(newfd < 0){ELOG("accept error, the error is: ", strerror(errno));return;}// 设置新链接的回调函数Channel* channel = new Channel(newfd, poller);channel->set_read_callback(std::bind(ReadEvent, channel));channel->set_write_callback(std::bind(WriteEvent, channel));channel->set_close_callback(std::bind(CloseEvent, channel));channel->set_error_callback(std::bind(ErrorEvent, channel));channel->set_arbitrary_callback(std::bind(ArbitraryEvent, channel));// 启动新链接的可读事件监控channel->enable_read();
}int main()
{// 创建服务器套接字Socket server;server.create_server(8080);// 创建一个poller对象Poller poller;// 创建一个用于监听套接字的Channel对象,然后利用bind函数设置可读回调函数,并且启动可读监控Channel listen_channel(server.get_fd(), &poller);listen_channel.set_read_callback(std::bind(Acceptor, &listen_channel, &poller));listen_channel.enable_read();while(true){// 开始监听事件,然后获取事件之后进行函数回调std::vector<Channel*> actives;poller.start_event(&actives);for(int i = 0; i < actives.size(); ++i){actives[i]->handler();}}server.Close();return 0;
}

3、客户端测试代码

​ 就是一个简单的发送消息和回响消息的程序!

#include "../source/server.hpp"int main()
{// 创建客户端套接字Socket client_sock;client_sock.create_client(8080, "127.0.0.1");while(true){// 做一个简单的发送和回响std::string str;getline(std::cin, str);client_sock.Send(str.c_str(), str.size());char buf[1024] = { 0 };client_sock.Recv(buf, sizeof(buf) - 1);DLOG("%s", buf);sleep(1);}client_sock.Close();return 0;
}

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

在这里插入图片描述

http://www.dtcms.com/a/511425.html

相关文章:

  • 用vs2013网站开发四川最好的网络优化公司
  • 如何开发一个 IDEA 插件通过 Ollama 调用大模型为方法生成仙侠风格的注释
  • 【论文精读】Latent-Shift:基于时间偏移模块的高效文本生成视频技术
  • unity基础学习笔记<上>
  • C# WPF Dragablz使用记录 TabControl选项卡可拖拽为单独界面或停靠
  • 机器人场景落地步入技术验证阶段,微美全息加快创新势能探索AI多元路径变革
  • YOLOv4 核心内容笔记
  • 网站开发工程师待遇家庭网站建设
  • 医疗门户网站模板wordpress3.8
  • iOS的多线程下数据安全和内存泄漏以及工具使用监测内存泄漏
  • 『CMake』关于使用CMake构建项目时的现代/传统指令
  • 请被人做网站怎么做倒计时网站
  • App开发框架调研对比
  • Ubuntu下载以及安装详解以及应用安装
  • 亚马逊云代理:AWS的EC2, S3, RDS,Lambda具体简介
  • 2640. QYQ在艾泽拉斯
  • 基于 React + TypeScript + Fabric.js 构建一个封面生成器网站
  • 营销型电子商务网站品牌建设与推广思路
  • 更新网站 seo公司的管理方式与管理方法
  • BZV49-C22,115稳压二极管 NXP安世半导体 工业电源芯片 芯片解析
  • 职场发展—如何避雷垃圾公司
  • 【Linux篇】软链接vs硬链接:Linux文件系统中的两种引用机制
  • C++ list核心接口与实战技巧
  • 微服务框架
  • 网站模块结构图wordpress调用栏目名称
  • 算法学习记录03——二叉树学习笔记:从两道题看透后序位置的关键作用​
  • Rust高性能分布式任务调度系统开发实践:从设计到性能优化
  • go tools安装
  • 阿里云代理商:如何给阿里云配置网络ACL?
  • 阿里巴巴 Java 开发手册解读:DO、DTO、BO、AO、VO、Query 的区别与用法