【高并发服务器】Socket套接字类 Channel事件管理类设计与实现
文章目录
- Socket套接字类设计与实现
- 1、成员变量设计
- 2、成员函数设计
- 3、完整的代码实现
- 4、测试样例
- Channel事件管理类设计与实现
- 1、成员变量的设计
- 2、成员接口的设计
- 3、接口的实现

Socket套接字类设计与实现
该模块就是对我们基本的套接字操作进行一个封装,方便我们在使用的时候直接调用,减少重复的编程工作!
1、成员变量设计
成员变量很简单,因为涉及到对套接字的操作,所以只需要一个文件描述符 _sockfd
即可!
2、成员函数设计
成员函数比较简单,无非就是对套接字操作的封装,如下所示:
const static int MAXBACKLOG = 1024;
class Socket
{
private:int _sockfd;
public:Socket();Socket(int sockfd);~Socket();// 1. 创建套接字bool Create();// 2. 绑定套接字信息bool Bind(const std::string& ip, uint16_t port);// 3. 监听套接字bool Listen(int backlog = MAXBACKLOG);// 4. 获取新连接int Accept();// 5. 客户端发起请求bool Connect(const std::string& ip, uint16_t port);// 6. 接收数据ssize_t Send(void* buffer, size_t size, int flag = 0);// 7. 发送数据ssize_t Recv(void* buffer, size_t size, int flag = 0);// 8. 关闭套接字void Close();// 9. 创建一个服务端链接的接口bool create_server(uint16_t port, const std::string& ip = "0.0.0.0", bool isBlock = false)// 10. 创建一个客户端连接的接口bool create_client(uint16_t port, const std::string& ip);// 11. 设置套接字选项 -- 开启地址端口复用void reuse_addr();// 12. 设置套接字阻塞属性 -- 设置为非阻塞void set_nonblock();
};
3、完整的代码实现
就是对套接字操作的简单封装,并不难!
const static int MAXBACKLOG = 1024;
class Socket
{
private:int _sockfd;
public:Socket() : _sockfd(-1) {}Socket(int sockfd) : _sockfd(sockfd) {}~Socket() { Close(); };// 1. 创建套接字bool Create(){_sockfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); // IPPROTO_TCP指定的是TCP协议if(_sockfd < 0){ELOG("create socket error!!");return false;}return true;}// 2. 绑定套接字信息bool Bind(const std::string& ip, uint16_t port){// 填充信息struct sockaddr_in local;memset(&local, 0, sizeof(struct sockaddr_in));local.sin_family = AF_INET;local.sin_port = htons(port);local.sin_addr.s_addr = inet_addr(ip.c_str());// 绑定信息int n = bind(_sockfd, (struct sockaddr*)&local, sizeof(struct sockaddr_in));if(n < 0){ELOG("bind error!!");return false;}return true;}// 3. 监听套接字bool Listen(int backlog = MAXBACKLOG){int n = listen(_sockfd, backlog);if(n < 0){ELOG("listen error!!");return false;}return true;}// 4. 获取新连接int Accept(){// 这里不关心获取到的新连接的属性int newfd = accept(_sockfd, nullptr, nullptr);if(newfd < 0){ELOG("accept error!!");return -1;}return newfd;}// 5. 客户端发起请求bool Connect(const std::string& ip, uint16_t port){// 填充信息struct sockaddr_in client;memset(&client, 0, sizeof client);client.sin_family = AF_INET;client.sin_port = htons(port);client.sin_addr.s_addr = inet_addr(ip.c_str());// 发起连接int n = connect(_sockfd, (struct sockaddr*)&client, sizeof(struct sockaddr_in));if(n < 0){ELOG("connect server error!!");return false;}return true;}// 6. 接收数据ssize_t Send(const void* buffer, size_t size, int flag = 0){ssize_t n = send(_sockfd, buffer, size, flag);if(n <= 0){// EAGAIN 表示当前socket的发送缓冲区满了,在非阻塞的情况下才会有这个错误// EINTR 表示当前socket的阻塞等待,被信号打断了if(errno == EAGAIN || errno == EINTR)return 0;ELOG("send error");return -1;}return n; // 返回实际发送的数据长度}// 非等待式发送数据ssize_t send_with_noblock(const void* buffer, size_t size){if(size == 0)return 0;return Send(buffer, size, MSG_DONTWAIT); // MSG_DONTWAIT 表示当前接收为非阻塞}// 7. 发送数据ssize_t Recv(void* buffer, size_t size, int flag = 0){ssize_t n = recv(_sockfd, buffer, size, flag);if(n <= 0){// EAGAIN 表示当前socket的接收缓冲区中没有数据了,在非阻塞的情况下才会有这个错误// EINTR 表示当前socket的阻塞等待,被信号打断了if(errno == EAGAIN || errno == EINTR)return 0;ELOG("recv error");return -1;}return n; // 返回实际发送的数据长度}// 非等待式接收数据ssize_t recv_with_noblock(void* buffer, size_t size){return Recv(buffer, size, MSG_DONTWAIT); // MSG_DONTWAIT 表示当前接收为非阻塞}// 8. 关闭套接字void Close(){if(_sockfd != -1){close(_sockfd);_sockfd = -1;}}// 9. 创建一个服务端链接的接口bool create_server(uint16_t port, const std::string& ip = "0.0.0.0", bool isNonBlock = false){// 1. 创建套接字 2. 绑定地址 3. 开始监听 4. 设置非阻塞 5. 启动地址重用if(Create() == false) return false;if(Bind(ip, port) == false) return false;if(Listen() == false) return false;if(isNonBlock)set_nonblock();reuse_addr();return true;}// 10. 创建一个客户端连接的接口bool create_client(uint16_t port, const std::string& ip){// 1. 创建套接字 2. 发起连接if(Create() == false) return false;if(Connect(ip, port) == false)return false;return true;}// 11. 设置套接字选项 -- 开启地址端口复用void reuse_addr(){// 设置地址复用int val = 1;setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR, (void*)&val, sizeof(int));// 设置端口复用val = 1;setsockopt(_sockfd, SOL_SOCKET, SO_REUSEPORT, (void*)&val, sizeof(int));}// 12. 设置套接字阻塞属性 -- 设置为非阻塞void set_nonblock(){int old = fcntl(_sockfd, F_GETFL, 0);fcntl(_sockfd, F_SETFL, old | O_NONBLOCK);}
};
4、测试样例
创建一个服务端和客户端,这里只做简单的收发数据:
// 服务端
#include "../source/server.hpp"int main()
{Socket server;server.create_server(8080);while(true){int newfd = server.Accept();if(newfd != -1){Socket client(newfd);char buf[1024] = {0};client.Recv(buf, 1023);DLOG("%s", buf);sleep(1);std::string str = "hello lirendada!";client.Send(str.c_str(), str.size());client.Close();}sleep(1);}server.Close();return 0;
}
客户端代码:
// 客户端
#include "../source/server.hpp"int main()
{Socket cli_sock;cli_sock.create_client(8080, "127.0.0.1");for (int i = 0; i < 5; i++) {std::string str = "hha";cli_sock.Send(str.c_str(), str.size());char buf[1024] = {0};cli_sock.Recv(buf, 1023);DLOG("%s", buf);sleep(1);}while(1) sleep(1);return 0;
}
Channel事件管理类设计与实现
该模块是对一个描述符需要进行的 IO
事件管理的模块,实现对描述符可读,可写,错误………事件的管理操作,以及 Poller
模块对描述符进行 IO
事件监控就绪后,根据不同的事件,回调不同的处理函数功能。
之所以需要有该模块,是为了更方便的在编码的时候进行一个文件描述符对应事件的维护处理,比如说对一个文件描述符可写事件的设置,这个动作是很频繁的,我们可以将其封装成接口使用等等情况。
简单地说,就是 一个连接监听什么事件、触发什么事件都由该模块处理!
1、成员变量的设计
因为我们要对事件进行管理,并且使用的是 epoll
模型,所以就 要有一个文件描述符 _fd
。
而对这些事件的管理其实就是在操作的时候用按位或操作将要管理的事件添加上即可,而这些事件比如说可读事件 EPOLLIN
等等,其实都是一个 uint32_t
的数据类型,所以我们的成员变量也就 需要一个 uint32_t
的数据来存放要管理的事件!
因为触发后我们需要有对应的回调事件,所以可以 用包装器创建几个回调函数成员,方便设置和调用,这个包装器中设置的回调函数的类型就涉及到了 Connection
模块的指针,因为 Channel
模块中这些回调函数其实是通过 Connection
模块设置的,如下图所示:
此外,在连接建立完成之后,就需要将 Channel
模块中的监控事件挂到 EventLoop
中,所以势必会和 EventLoop
模块产生联系,因为 EventLoop
模块就是对所有连接的监控事件的管理,所以必须要通知 EventLoop
模块,所以 Channel
模块中还需要一个 EventLoop
模块的指针,方便找到其模块中的添加、移除、修改事件监控的接口,如下图所示:
但是因为 EventLoop
模块还没实现,这里就先不给出该成员变量!
2、成员接口的设计
对于功能设计我们分为两块:对文件描述符监控事件的管理、对文件描述符监控事件触发后的处理。
- 对文件描述符监控事件的管理:
- 判断描述符释放可读
- 判断描述符释放可写
- 设置描述符监控可读
- 设置描述符监控可写
- 解除可读事件的监控
- 解除可写事件的监控
- 解除所有事件的监控
- ……
- 对文件描述符监控事件出发后的处理:
- 需要处理的事件:可读、可写、挂断、错误、任意事件(其实对应的就是各各宏)
- 设置对于不同事件的 回调处理函数,明确触发了某个事件之后应该如何处理。
但是因为这里接口还涉及到与 EventLoop
的联系,所以还没办法实现完善,具体的完善接口设计,会在后面 Poller
类以及 EventLoop
类的联调中给出!
所以当前的设计成员如下所示:
using eventcallback_t = std::function<void()>; // 事件触发的函数类型
class Channel
{
private:int _fd; // 文件描述符uint32_t _events; // 当前需要监控的事件uint32_t _revents; // 当前触发或者就绪的事件(由外部设置)eventcallback_t _read_callback; // 可读事件被触发的回调函数eventcallback_t _write_callback; // 可写事件被触发的回调函数eventcallback_t _error_callback; // 错误事件被触发的回调函数eventcallback_t _close_callback; // 关闭事件被触发的回调函数eventcallback_t _arbitrary_callback; // 任意事件被触发的回调函数
public:Channel(int fd) : _fd(fd), _events(0), _revents(0) {}int get_fd(); // 获取文件描述符uint32_t get_events(); // 获取当前监控的事件void set_revents(uint32_t revents); // 设置实际就绪的事件(就绪事件其实由EventLoop管理,只是触发后来通知Channel模块调用回调函数而已)// 设置对应触发事件的回调函数void set_read_callback(const eventcallback_t& cb);void set_write_callback(const eventcallback_t& cb);void set_error_callback(const eventcallback_t& cb);void set_close_callback(const eventcallback_t& cb);void set_arbitrary_callback(const eventcallback_t& cb);bool is_read_able(); // 当前是否监控了可读bool is_write_able(); // 当前是否监控了可写void enable_read(); // 启动读事件监控void enable_write(); // 启动写事件监控void disable_read(); // 关闭读事件监控void disable_write(); // 关闭写事件监控void disable_all(); // 关闭所有事件监控void handler(); // 事件总处理函数。一旦触发了事件,就调用这个函数,而触发了什么事件如何处理由连接管理者决定void update(); // 添加或者修改事件监控(需要和EventLoop模块联系起来)void remove(); // 移除事件监控(需要和EventLoop模块联系起来)
};
3、接口的实现
其实实现并不难,就是一些事件的设置等等,最重要的还是 _handler()
函数中对触发事件的管理!
using eventcallback_t = std::function<void()>; // 事件触发的函数类型
class Channel
{
private:int _fd; // 文件描述符uint32_t _events; // 当前需要监控的事件uint32_t _revents; // 当前触发或者就绪的事件(由外部设置)eventcallback_t _read_callback; // 可读事件被触发的回调函数eventcallback_t _write_callback; // 可写事件被触发的回调函数eventcallback_t _error_callback; // 错误事件被触发的回调函数eventcallback_t _close_callback; // 关闭事件被触发的回调函数eventcallback_t _arbitrary_callback; // 任意事件被触发的回调函数
public:Channel(int fd) : _fd(fd), _events(0), _revents(0) {}~Channel() {close(_fd); // 记得要释放文件描述符 }int get_fd() { return _fd; } // 获取文件描述符uint32_t get_events() { return _events; } // 获取当前监控的事件void set_revents(uint32_t revents) { _revents = revents; } // 设置实际就绪的事件// 设置对应触发事件的回调函数void set_read_callback(const eventcallback_t& cb) { _read_callback = cb; }void set_write_callback(const eventcallback_t& cb) { _write_callback = cb; }void set_error_callback(const eventcallback_t& cb) { _error_callback = cb; }void set_close_callback(const eventcallback_t& cb) { _close_callback = cb; }void set_arbitrary_callback(const eventcallback_t& cb) { _arbitrary_callback = cb; }bool _is_read_able() { return (_events & EPOLLIN); } // 当前是否监控了可读bool _is_write_able() { return (_events & EPOLLOUT); } // 当前是否监控了可写// 启动读事件监控void enable_read() {_events |= EPOLLIN; update(); // 实际更新涉及到epoll操作,所以交给Poller来实现}// 启动写事件监控void enable_write() {_events |= EPOLLOUT; update(); // 实际更新涉及到epoll操作,所以交给Poller来实现}// 关闭读事件监控void disable_read(){_events &= (~EPOLLIN);update(); // 实际更新涉及到epoll操作,所以交给Poller来实现}// 关闭写事件监控void disable_write(){_events &= (~EPOLLOUT);update(); // 实际更新涉及到epoll操作,所以交给Poller来实现}// 关闭所有事件监控void disable_all(){_events = 0;update(); // 实际更新涉及到epoll操作,所以交给Poller来实现} // 清除所有的回调函数void clear_callback() {_read_callback = _write_callback = _error_callback = _close_callback = _arbitrary_callback = nullptr;}// 事件总处理函数。一旦触发了事件,就调用这个函数,而触发了什么事件如何处理由连接管理者决定void handler() {// 下面因为错误和关闭事件触发的时候会释放连接,此时就不能再调用_arbitrary_callback了,所以需要提前先调用if((_revents & EPOLLIN) || (_revents & EPOLLRDHUP) ||(_revents & EPOLLPRI)){// 如果是有数据可读、对端关闭写入、有带外数据的事件触发的话,则都属于是可读事件处理if(_read_callback)_read_callback();if(_arbitrary_callback)_arbitrary_callback(); // 不管任何事件,都调用的回调函数}// 下面的三个事件有可能会释放连接,所以只能处理一个,要用else if连接if(_revents & EPOLLOUT) {if(_write_callback)_write_callback(); // 可读事件触发的处理if(_arbitrary_callback)_arbitrary_callback(); // 不管任何事件,都调用的回调函数}else if(_revents & EPOLLERR) {if(_arbitrary_callback)_arbitrary_callback(); // 不管任何事件,都调用的回调函数if(_error_callback)_error_callback(); // 错误事件触发的处理}else if(_revents & EPOLLHUP) {if(_arbitrary_callback)_arbitrary_callback(); // 不管任何事件,都调用的回调函数if(_close_callback)_close_callback(); // 关闭事件触发的处理}}// 添加或者修改事件监控void update();// 移除事件监控void remove();
};