【高并发服务器】八、Poller描述符监控类实现
文章目录
- Ⅰ. Poller描述符监控类实现
- 1、成员变量设计
- 2、成员接口设计
- 3、完整代码实现
- Ⅱ. 与`Channel`类的联调
- 1、Channel类的部分完善
- 2、服务端测试代码
- 3、客户端测试代码

Ⅰ. Poller描述符监控类实现
该模块是 对 epoll
的操作进行封装的一个模块,主要实现 epoll
的 IO
事件添加、修改、移除、获取活跃连接的功能。
而该模块的这些功能其实和 Channel
模块是相关联的,因为 Channel
模块管理的就是文件描述符的事件,又因为后面的 EventLoop
模块其实就封装了 Poller
模块,对所有的监控事件进行管理,所以 Channel
模块管理事件仅仅是对事件进行状态监控,而没有对事件的实际管理操作,真正的操作其实都在 EventLoop
中封装的 Poller
模块中,所以 Channel
模块就必须和 Poller
模块联系起来!
1、成员变量设计
首先 Poller
模块就是对 epoll
模型的操作,所以肯定**需要一个 epoll
模型的描述符 _epollfd
。然后因为该模块也要监控当前有哪些事件是活跃的,所以就需要一个 struct epoll_event
结构体类型的数组**,用来管理活跃事件!
此外,因为在该模块管理的事件需要关注什么,都是由 Channel
对象来决定的,并且每个 Channel
对象就对应一个文件描述符,所以我们就要建立起两者的关系,所以就需要一个哈希表来存放管理的文件描述符和对应的事件管理对象 Channel
的关系,即**需要一个 unordered_map<int, Channel*>
**。
该模块对文件描述符进行监控,而通过文件描述符对应的 Channel
对象才能知道其需要监控什么事件、触发后回调什么事件!
2、成员接口设计
接口其实也很简单,就是如下几个:
- 添加或修改监控事件接口(根据操作类型调用下面的辅助接口完成操作,所以这只是一个封装接口!)
- 移除监控接口
- 启动监控接口,并且通过输出型参数返回活跃连接
此时就需要两个辅助接口:
- 对
epoll
的直接操作接口,由上面的接口调用 - 判断一个事件管理对象
Channel
是否已经添加了事件监控
const static int MAX_EPOLL_EVENTS = 1024;class Poller
{
private:int _epollfd; struct epoll_event _events[MAX_EPOLL_EVENTS]; // 存放活跃连接的数组std::unordered_map<int, Channel*> _channels; // 存放文件描述符与其对应的事件管理对象的哈希表
public:Poller() {}~Poller() {}// 添加或修改监控事件(只是一个封装)void update_event(Channel* channel);// 移除监控void remove_event(Channel* channel);// 启动监控,返回活跃连接void start_event(std::vector<Channel*>* active);
private:// 对epoll的实际操作接口void control(Channel* channel, int option);// 判断一个事件管理对象Channel是否已经添加了事件监控bool has_channel(Channel* channel);
};
3、完整代码实现
const static int MAX_EPOLL_EVENTS = 1024;class Poller
{
private:int _epollfd; struct epoll_event _events[MAX_EPOLL_EVENTS]; // 存放活跃连接的数组std::unordered_map<int, Channel*> _channels; // 存放文件描述符与其对应的事件管理对象的哈希表
public:Poller() {_epollfd = epoll_create(MAX_EPOLL_EVENTS);if(_epollfd == -1){ELOG("epoll create error!!");abort();}DLOG("epoll create succuess, epollfd is: %d", _epollfd);}~Poller() {if(_epollfd != -1)close(_epollfd);}// 添加或修改监控事件(只是一个封装)void update_event(Channel* channel){// 如果事件存在的话则属于修改事件,不存在的话属于添加事件bool ret = has_channel(channel);if(ret == false){_channels[channel->get_fd()] = channel;control(channel, EPOLL_CTL_ADD); // 交给辅助函数去完成}elsecontrol(channel, EPOLL_CTL_MOD);}// 移除监控void remove_event(Channel* channel){// 移除监控包括两个步骤:// 1. 从epoll模型中移除control(channel, EPOLL_CTL_DEL);// 2. 从哈希表中去除关系auto it = _channels.find(channel->get_fd());if(it != _channels.end())_channels.erase(it);}// 启动监控,返回活跃连接void start_event(std::vector<Channel*>* active){// 等待事件就绪,我们这里设为阻塞式等待int n = epoll_wait(_epollfd, _events, MAX_EPOLL_EVENTS, -1); if(n <= 0){if(errno == EINTR) // 如果是被中断则不代表等待失败return;ELOG("epoll_wait error:%s\n", strerror(errno));abort();}// 将获取到的事件设置到对应的Channel对象中,并且尾插到active中for(int i = 0; i < n; ++i){auto it = _channels.find(_events[i].data.fd);assert(it != _channels.end()); // 认为是一定能找到的,找不到说明是程序问题,直接退出it->second->set_revents(_events[i].events); // 设置实际就绪的事件active->push_back(it->second);}}
private:// 对epoll的实际操作接口void control(Channel* channel, int option){struct epoll_event ev;ev.data.fd = channel->get_fd();ev.events = channel->get_events();int ret = epoll_ctl(_epollfd, option, channel->get_fd(), &ev);if(ret == -1)ELOG("epoll_ctl error!!");}// 判断一个事件管理对象Channel是否已经添加了事件监控bool has_channel(Channel* channel){auto it = _channels.find(channel->get_fd());if(it == _channels.end())return false;return true;}
};
Ⅱ. 与Channel
类的联调
1、Channel类的部分完善
虽说 Channel
模块是和 EventLoop
模块有关联的,但是因为 EventLoop
模块内就包含了 Poller
模块,所以我们现在实现了 Poller
模块之后就可以直接用它来和 Channel
模块联调了,等我们后面实现了 EventLoop
之后就不需要说去再联调一次了,到时候只需要改一下变量即可!
因为之前 Channel
模块我们是没完成的,因为涉及到了当前的事件管理操作,所以现在我们就能来完善 Channel
模块了,如下所示:(下面代码中只需要关注注释部分,也就是新增和修改部分)
class Poller; // 需要搞个前置声明using eventcallback_t = std::function<void()>;
class Channel
{
private:int _fd; uint32_t _events; uint32_t _revents; eventcallback_t _read_callback; eventcallback_t _write_callback; eventcallback_t _error_callback; eventcallback_t _close_callback; eventcallback_t _arbitrary_callback; Poller* _poller; // 描述符监控类对象的指针
public:Channel(int fd, Poller* poller) : _fd(fd), _events(0), _revents(0), _poller(poller){}~Channel() { close(_fd); }int get_fd() { return _fd; } uint32_t get_events() { return _events; } void set_revents(uint32_t revents) { _revents = revents; } void set_read_callback(const eventcallback_t& cb) { _read_callback = cb; }void set_write_callback(const eventcallback_t& cb) { _write_callback = cb; }void set_error_callback(const eventcallback_t& cb) { _error_callback = cb; }void set_close_callback(const eventcallback_t& cb) { _close_callback = cb; }void set_arbitrary_callback(const eventcallback_t& cb) { _arbitrary_callback = cb; }bool _is_read_able() { return (_events & EPOLLIN); } bool _is_write_able() { return (_events & EPOLLOUT); } void _handler(); { // 实现省略,具体参考前面笔记 }// 清除所有的回调函数void clear_callback() { _read_callback = _write_callback = _error_callback = _close_callback = _arbitrary_callback = nullptr; }// 启动读事件监控void _enable_read() {_events |= EPOLLIN; update(); // 实际更新涉及到epoll操作,所以交给Poller来实现}// 启动写事件监控void _enable_write() {_events |= EPOLLOUT; update(); // 实际更新涉及到epoll操作,所以交给Poller来实现}// 关闭读事件监控void _disable_read(){_events &= (~EPOLLIN);update(); // 实际更新涉及到epoll操作,所以交给Poller来实现}// 关闭写事件监控void _disable_write(){_events &= (~EPOLLOUT);update(); // 实际更新涉及到epoll操作,所以交给Poller来实现}// 关闭所有事件监控void _disable_all(){_events = 0;update(); // 实际更新涉及到epoll操作,所以交给Poller来实现}
public:// 添加或者修改事件监控void update();// 移除事件监控void remove();
};const static int MAX_EPOLL_EVENTS = 1024;
class Poller
{// 这里省略Poller的实现,只是为了体现出其位置
};// 这两个函数因为涉及到了Poller的成员接口,在上面虽然前置声明了Poller类,但并不是定义,所以需要放到这里来实现!
void Channel::update() { _poller->update_event(this); }
void Channel::remove() { _poller->remove_event(this); }
2、服务端测试代码
主函数逻辑就是这样子:
- 创建服务器套接字。
- 创建一个
poller
对象。 - 创建一个用于监听套接字的
Channel
对象,然后利用bind
函数设置可读回调函数,并且启动可读监控。 - 开始监听事件,然后获取就绪事件之后进行函数回调。
然后全局中给出了 Channel
对象要回调的函数,如可读、可写等等事件的回调函数,它们都是由 Acceptor
函数设置的,Acceptor
函数是专门给监听套接字管理对象 listen_channel
使用的,用来监听新链接,然后给新链接设置我们上面给出的回调函数,最后启动这些新链接的可读事件开始监听!
#include "../source/server.hpp"void CloseEvent(Channel* channel)
{std::cout << "close: " << channel->get_fd() << std::endl;channel->remove(); // 移除监控channel->clear_callback();delete channel;
}void ReadEvent(Channel* channel)
{// 这里读事件处理,我们就做简单的打印、启动可写事件监控即可int fd = channel->get_fd();char buffer[1024] = { 0 };int n = recv(fd, buffer, sizeof(buffer) - 1, 0);if(n > 0){buffer[n] = 0;std::cout << buffer << std::endl;// 接收到数据之后,启动可写事件监控channel->enable_write();}else{CloseEvent(channel); // 其实不应该释放,但是因为当前只是测试,所以需要关闭}
}void WriteEvent(Channel* channel)
{// 这里做个简单的发送即可int fd = channel->get_fd();const char* data = "lirendada 你好呀!";int n = send(fd, data, strlen(data), 0);if(n < 0){return CloseEvent(channel); // 错误的话释放该对象}channel->disable_write(); // 然后关闭可写事件监控
}void ErrorEvent(Channel* channel)
{CloseEvent(channel); // 错误的话释放该对象
}void ArbitraryEvent(Channel* channel)
{std::cout << "有一个事件发生,fd:" << channel->get_fd() << std::endl;
}void Acceptor(Channel* listen_channel, Poller* poller)
{// 获取新链接int newfd = accept(listen_channel->get_fd(), nullptr, nullptr);if(newfd < 0){ELOG("accept error, the error is: ", strerror(errno));return;}// 设置新链接的回调函数Channel* channel = new Channel(newfd, poller);channel->set_read_callback(std::bind(ReadEvent, channel));channel->set_write_callback(std::bind(WriteEvent, channel));channel->set_close_callback(std::bind(CloseEvent, channel));channel->set_error_callback(std::bind(ErrorEvent, channel));channel->set_arbitrary_callback(std::bind(ArbitraryEvent, channel));// 启动新链接的可读事件监控channel->enable_read();
}int main()
{// 创建服务器套接字Socket server;server.create_server(8080);// 创建一个poller对象Poller poller;// 创建一个用于监听套接字的Channel对象,然后利用bind函数设置可读回调函数,并且启动可读监控Channel listen_channel(server.get_fd(), &poller);listen_channel.set_read_callback(std::bind(Acceptor, &listen_channel, &poller));listen_channel.enable_read();while(true){// 开始监听事件,然后获取事件之后进行函数回调std::vector<Channel*> actives;poller.start_event(&actives);for(int i = 0; i < actives.size(); ++i){actives[i]->handler();}}server.Close();return 0;
}
3、客户端测试代码
就是一个简单的发送消息和回响消息的程序!
#include "../source/server.hpp"int main()
{// 创建客户端套接字Socket client_sock;client_sock.create_client(8080, "127.0.0.1");while(true){// 做一个简单的发送和回响std::string str;getline(std::cin, str);client_sock.Send(str.c_str(), str.size());char buf[1024] = { 0 };client_sock.Recv(buf, sizeof(buf) - 1);DLOG("%s", buf);sleep(1);}client_sock.Close();return 0;
}