当前位置: 首页 > news >正文

【仿Mudou库one thread per loop式并发服务器实现】SERVER服务器模块实现

SERVER服务器模块实现

  • 1. Buffer模块
  • 2. Socket模块
  • 3. Channel模块
  • 4. Poller模块
  • 5. EventLoop模块
    • 5.1 TimerQueue模块
    • 5.2 TimeWheel整合到EventLoop
    • 5.1 EventLoop与线程结合
    • 5.2 EventLoop线程池
  • 6. Connection模块
  • 7. Acceptor模块
  • 8. TcpServer模块

1. Buffer模块

在这里插入图片描述

Buffer模块:缓冲区模块
提供的功能:存储数据,取出数据

实现思想:

  1. 实现缓冲区得有一块内存空间,采用vector,vector底层其实使用的就是一个线性的内存空间。不用string的原因是因为,网络传输数据什么都有而string在读取遇 ‘\0’ 就停止了。
  2. 要素:
  • 默认的空间大小
  • 当前的读取数据位置
  • 当前的写入数据位置
  1. 操作:
    a. 写入数据:
    当前写入位置指向哪里,就从哪里开始写入。
    如果后续剩余空闲空间不够了,考虑整体缓冲区空闲空间是否足够(因为读位置也会向后偏移,前边有可能会有空闲空间)。
    足够:将数据移动到起始位置即可
    不够:扩容,从当前写位置开始扩容足够大小
    数据一旦写入成功,当前写位置,就要向后偏移
    b. 读取数据:
    当前的读取位置指向哪里,就从哪里开始读取,前提是有数据可读
    可读数据大小:当前写入位置,减去当前读取位置
// 缓存区Buffer模块
#define BUFFER_SIZE 1024
class Buffer
{
private:std::vector<char> _buffer; // 使⽤vector进⾏内存空间管理uint64_t _read_idx;        // 读偏移uint64_t _write_idx;       // 写偏移public:Buffer() : _read_idx(0), _write_idx(0), _buffer(BUFFER_SIZE) {}// 获得数组起始位置,注意并不是对象的位置// void* Begin()char *Begin(){return &(*_buffer.begin());}// 获取当前写⼊起始地址, _buffer的空间起始地址,加上写偏移量// void* ReadPos()char *WritePosition(){return Begin() + _write_idx;}// 获取当前读取起始地址// void* WritePos()char *ReadPosition(){return Begin() + _read_idx;}// 获取缓冲区末尾空闲空间大小--写偏移之后的空闲空间, 总体空间大小减去写偏移uint64_t TailIdleSize(){return _buffer.size() - _write_idx;// 这里也可以用_buffer.size() - _write_idx// 因为vector构造使用的函数即开辟空间也初始化了// 也就是说size()和capacity()是一样的大小// 后续也用resize()会调整空间}// 获取缓冲区起始空闲空间大小--读偏移之前的空闲空间uint64_t HeadIdleSize(){return _read_idx;}// 获取可读数据⼤⼩ = 写偏移 - 读偏移uint64_t ReadAbleSize(){return _write_idx - _read_idx;}// 确保可写空间⾜够(整体空闲空间够了就移动数据,否则就扩容)void EnsureWriteSpace(uint64_t len){// 如果末尾空闲空间⼤⼩⾜够,直接返回if (TailIdleSize() >= len)return;// 末尾空闲空间不够,则判断加上起始位置的空闲空间大小是否足够, 够了就将数据移动到起始位置if (HeadIdleSize() + TailIdleSize() >= len){// 将数据移动到起始位置// 把当前数据大小先保存起来uint64_t size = ReadAbleSize();     // 把可读数据拷贝到起始位置                     std::copy(ReadPosition(), ReadPosition() + size, Begin()); // 将读偏移归0_read_idx = 0;   // 将写位置置为可读数据大小,因为当前的可读数据大小就是写偏移量                                        _write_idx = size;                                       }else{// 总体空间不够,则需要扩容,不移动数据,直接给写偏移之后扩容足够空间即可_buffer.resize(_write_idx + len);}}// 将读偏移向后移动void MoveReadOffset(uint64_t len){if (len == 0) return; _read_idx += len;}// 将写偏移向后移动void MoveWriteOffset(uint64_t len){//向后移动的大小,必须小于当前后边的空闲空间大小assert(len <= TailIdleSize());_write_idx += len;}// 写入数据void Write(const void *date, uint64_t len){// 1. 保证有足够空间,2. 拷⻉数据进去if (len == 0)return;EnsureWriteSpace(len);// void* 不能解引用 和 ++const char *d = (const char *)date;//copy是一个模板函数,类型需要匹配,因此上面读写位置返回都是char*std::copy(d, d + len, WritePosition());}void WriteAndPush(const void *date, uint64_t len){Write(date, len);MoveWriteOffset(len);}void WriteString(const std::string &data){return Write(data.c_str(), data.size());}void WriteStringAndPush(const std::string &data){WriteString(data);MoveWriteOffset(data.size());}void WriteBuffer(Buffer &data){return Write(data.ReadPosition(), data.ReadAbleSize());}void WriteBufferAndPush(Buffer &data){WriteBuffer(data);MoveWriteOffset(data.ReadAbleSize());}// 读取数据void Read(void *buff, uint64_t len){// 要求要获取的数据大小必须⼩于可读数据大小assert(len <= ReadAbleSize());std::copy(ReadPosition(), ReadPosition() + len, (char *)buff);}void ReadAndPop(void *buff, uint64_t len){Read(buff, len);MoveReadOffset(len);}std::string ReadString(uint64_t len){//要求要获取的数据大小必须小于可读数据大小assert(len <= ReadAbleSize());std::string str;str.resize(len);// str.c_str()返回的是const char*Read(&str[0], len);return str;}std::string ReadStringAndPop(uint64_t len){assert(len <= ReadAbleSize());std::string str = ReadString(len);MoveReadOffset(len);return str;}char *FindCRLE(){char *p = (char *)memchr(ReadPosition(), '\n', ReadAbleSize());return p;}// 通常获取⼀⾏数据,这种情况针对是std::string GetLine(){char *p = FindCRLE();if (p == nullptr)return "";return ReadString(p - ReadPosition() + 1); //+是为了把\n也读出来}std::string GetLineAndPop(){std::string str = GetLine();MoveReadOffset(str.size());return str;}void Clear(){_read_idx = 0;_write_idx = 0;}
};

这里补充一个按照日志等级打印的模块,方便后面调试。顺便把这个项目用到的所有头文件都写出来。

#include <iostream>
#include <vector>
#include <string>
#include <algorithm>
#include <functional>
#include <unordered_map>
#include <thread>
#include <mutex>
#include <memory>
#include <typeinfo>
#include <condition_variable>#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <sys/timerfd.h>
#include <signal.h>#include <cassert>
#include <cstring>
#include <stdint.h>// 日志打印
#define NORMAL 0
#define DEBUG 1
#define WARRING 2
#define FATAL 3// 根据等级打印日志
#define LOG(level, format, ...)                                                               \{                                                                                         \if (level >= FATAL)                                                                   \{                                                                                     \time_t t = time(NULL);                                                            \struct tm *m = localtime(&t);                                                     \char ts[32] = {0};                                                                \strftime(ts, 31, "%H:%M:%S", m);                                                  \fprintf(stdout, "[%p %s %s:%d]" format "\n", (void*)pthread_self(),ts, __FILE__, __LINE__, ##__VA_ARGS__); \}                                                                                     \}// ... 表示可变参数,可以传0个、1个、多个
// _VA_ARGS__ 将...的内容原封不动抄到__VA_ARGS__位置
// ##__VA_ARGS__ ...没有参数,就移除这个_VA_ARGS_,相对于没有这个东西#define LOG_NORMAL(format, ...) LOG(NORMAL, format, ##__VA_ARGS__)
#define LOG_DEBUG(format, ...) LOG(DEBUG, format, ##__VA_ARGS__)
#define LOG_WARRING(format, ...) LOG(WARRING, format, ##__VA_ARGS__)
#define LOG_FATAL(format, ...) LOG(FATAL, format, ##__VA_ARGS__)

2. Socket模块

在这里插入图片描述

Socket模块:对socket套接字操作进行分资

功能:

  1. 创建套接字
  2. 绑定地址信息
  3. 开始监听
  4. 向服务器发起连接
  5. 获取新连接
  6. 接收数据
  7. 发送数据
  8. 关闭套接字
  9. 创建一个服务端连接
  10. 创建一个客户端连接
  11. 设置套接字选项—开启地址端口重用
  12. 设置套接字阻塞属性-设置为非阻塞
// 套接字Socket模块
#define LISTEN_SIZE 1024class Socket
{
private:int _sockfd;public:Socket() : _sockfd(-1) {}Socket(int fd) : _sockfd(fd) {}~Socket() { Close(); }int Fd() { return _sockfd; }// 创建套接字bool Create(){_sockfd = socket(AF_INET, SOCK_STREAM, 0);if (_sockfd < 0){LOG_FATAL("CREATE SOCKET FAILED!!");return false;}return true;}// 绑定ip+portbool Bind(const uint16_t &port){struct sockaddr_in peer;memset(&peer, 0, sizeof peer);peer.sin_family = AF_INET;peer.sin_port = htons(port);peer.sin_addr.s_addr = INADDR_ANY; // 任意地址绑定if (bind(_sockfd, (sockaddr *)&peer, sizeof peer) < 0){LOG_FATAL("BIND ADDRESS FAILED!");return false;}return true;}// 开始监听bool Listen(int backlog = LISTEN_SIZE){if (listen(_sockfd, backlog) < 0){LOG_FATAL("SOCKET LISTEN FAILED!");return false;}return true;}// 向服务器发起连接bool Connect(const uint16_t &port, const std::string ip){struct sockaddr_in peer;memset(&peer, 0, sizeof peer);peer.sin_family = AF_INET;peer.sin_port = htons(port);peer.sin_addr.s_addr = inet_addr(ip.c_str());if (connect(_sockfd, (sockaddr *)&peer, sizeof peer) < 0){LOG_FATAL("CONNECT SERVER FAILED!");return false;}return true;}// 获取连接int Accpet(){// 不关注是那个ip和port发起的连接int newfd = accept(_sockfd, nullptr, nullptr);if (newfd < 0){LOG_DEBUG("SOCKET ACCEPT FAILED!");return -1;}return newfd;}// 阻塞读取数据ssize_t Recv(void *buffer, size_t len, int flag = 0){ssize_t ret = recv(_sockfd, buffer, len, flag);if (ret <= 0){// EAGAIN 当前socket的接收缓冲区中没有数据了,在⾮阻塞的情况下才会有这个错误// EINTR 表⽰当前socket的阻塞等待,被信号打断了,if (errno == EAGAIN || errno == EINTR){return 0; // 表⽰这次接收没有接收到数据}LOG_FATAL("SOCKET RECV FAILED!");return -1;}return ret; // 实际接收的数据长度}// 非阻塞读取数据ssize_t NonBlockRecv(void *buffer, size_t len){return Recv(buffer, len, MSG_DONTWAIT); // MSG_DONTWAIT 表⽰当前接收为非阻塞}// 发出数据ssize_t Send(const void *buffer, size_t len, int flag = 0){ssize_t ret = send(_sockfd, buffer, len, flag);if (ret < 0){if (errno == EAGAIN || errno == EINTR){return 0;}LOG_FATAL("SOCKET SEND FAILED!");return -1;}return ret; // 实际发送的数据长度}// 非阻塞发出数据ssize_t NonBlockSend(const void *buffer, size_t len){if(len == 0) return 0;return Send(buffer, len, MSG_DONTWAIT); // MSG_DONTWAIT 表⽰当前接收为非阻塞}// 关闭套接字void Close(){if (_sockfd != -1){close(_sockfd);_sockfd = -1;}}// 创建一个服务端连接bool CreateServer(const uint16_t &port,bool block_flag = false){// 1.创建套接字if (Create() == false)return false;// 2.设置非阻塞if(block_flag) NonBlock();// 3.绑定if (Bind(port) == false)return false;// 4.监听if (Listen() == false)return false;// 5.开启地址端口复用ReuseAddress();return true;}// 创建一个客服端连接bool CreateClient(const uint16_t &port, const std::string ip){// 1.创建套接字if (Create() == false)return false;// 2.发起连接if (Connect(port, ip) == false)return false;return true;}// 设置套接字选项--开启地址端口复用void ReuseAddress(){int opt = 1;setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof opt);}// 设置套接字属性 -- 非阻塞void NonBlock(){int fl = fcntl(_sockfd, F_GETFL, 0);fcntl(_sockfd, F_SETFL, fl | O_NONBLOCK);}
};

3. Channel模块

在这里插入图片描述
Channel类设计
目的:对描述符的监控事件管理

  1. 成员:
    因为后边使用epoll进行事件监控,
    EPOLLIN 可读
    EPOLLOUT 可写
    EPOLLRDHUP 连接断开
    EPOLLPRI 优先数据
    EPOLLERR 出错了
    EPOLLHUP 挂断
    而以上的事件都是一个数值uint32_t进行保存,要进行事件管理,就需要有一个uint32t类型的成员保存当前需要监控的事件。
    事件处理这里,因为有五种事件需要处理,就需要五个回调函数。

功能:

  1. 事件管理:
    描述符是否可读
    描述符是否可写
    对描述符监控可读
    对描述符监控可写
    解除可读事件监控
    解除可写事件监控
    解除所有事件监控

  2. 事件触发后的处理的管理
    a.需要处理的事件:可读,可写,挂断,错误,任意
    b.事件处理的回调函数

// 描述符事件管理Channel模块
class EventLoop;class Channel
{
private:int _fd; //当前需要监控的文件描述符EventLoop *_loop; //放在自己所在的EventLoop监控uint32_t _events;  // 当前需要监控的事件uint32_t _revents; // 当前连接触发的事件using EventCallback = std::function<void()>;EventCallback _read_callback;  // 可读事件被触发的回调函数EventCallback _write_callback; // 可写事件被触发的回调函数EventCallback _error_callback; // 错误事件被触发的回调函数EventCallback _close_callback; // 连接断开事件被触发的回调函数EventCallback _event_callback; // 任意事件被触发的回调函数public:// Channel(Poller *poller,int fd) : _fd(fd), _events(0), _revents(0), _poller(poller) {}Channel(EventLoop *loop, int fd) : _fd(fd), _events(0), _revents(0), _loop(loop) {}int Fd(){return _fd;}uint32_t Events() // 获得想要监控的事情{return _events;}void SetREvents(uint32_t events) // 设置实际就绪的事情{_revents = events;}void SetReadCallback(const EventCallback &cb){_read_callback = cb;}void SetWriteCallback(const EventCallback &cb){_write_callback = cb;}void SetErrorCallback(const EventCallback &cb){_error_callback = cb;}void SetCloseCallback(const EventCallback &cb){_close_callback = cb;}void SetEventCallback(const EventCallback &cb){_event_callback = cb;}// 当前是否监控了可读bool ReadAble(){return _events & EPOLLIN;}// 当前是否监控了可写bool WriteAble(){return _events & EPOLLOUT;}// 启动读事件监控void EnableRead(){_events |= EPOLLIN;Update();}// 启动写事件监控void EnableWrite(){_events |= EPOLLOUT;Update();}// 关闭读事件监控void DisableRead(){_events &= ~EPOLLIN;Update();}// 关闭写事件监控void DisableWrite(){_events &= ~EPOLLOUT;Update();}// 关闭所有事件监控,文件描述符还在红黑树上,但是没有事件让它监控void DisableAll(){_events = 0;Update();}// 移除监控,文件描述符从红黑树中删除void Remove();// 添加到监控void Update();// 事件处理,一旦连接触发了事件,就调用这个函数,自己触发了什么事情如何处理由调用它的人决定void HandleEvent(){// 读事件发现连接出现问题,是不去释放连接的,因为我们还看看是否还有数据待发送// 等把数据都发送完了,或者发送出错了,再去关闭连接// 写/出错/挂断 都会关闭连接,我们只需要关闭一次就好了//关于把任意事件回调放在最后,在测试阶段会有说明。if ((_revents & EPOLLIN) || (_revents & EPOLLRDHUP) || (_revents & EPOLLPRI)){// if (_event_callback)//     _event_callback();if (_read_callback)_read_callback();}// 有可能会释放连接的操作事件,⼀次只处理⼀个if (_revents & EPOLLOUT){       // if (_event_callback)//     _event_callback();if (_write_callback)_write_callback();}else if (_revents & EPOLLERR){// if (_event_callback)//     _event_callback();if (_error_callback)_error_callback();}else if (_revents & EPOLLHUP){// if (_event_callback)//     _event_callback();if (_close_callback)_close_callback();}if (_event_callback){_event_callback();}}
};//这里先不做接释,等到了EventLoop在细说。
void Channel::Remove()
{return _loop->RemoveEvent(this);
}void Channel::Update()
{return _loop->UpdateEvent(this);
}

客户端异常退出,比如断电断网,客户端底层操作系统没有办法及时服务器进行四次挥手,因此服务器也收不到任何信息,除非服务器主动去读或者写,客户端会给服务器发送一个RST,服务器读写的返回值都是-1,错误码设置为ECOMNNRESET,并且在epoll模型下的服务器会触发EPOLLERR和EPOLLHUOP事件。

客户端正常调用close或者ctrl+c终止进程,客户端操作系统有机会和服务器进行四次挥手,read返回值是0,如果发送缓存区还有数据可能首次发送回成功,第二次再次发送返回值是-1,并且会触发SIGPIPE信号,如果服务器不对这个信号做处理,就会导致服务器奔溃退出。因此我们还需要对这个信号进行特殊处理。

/*避免服务器因为给断开连接的客⼾端进⾏send触发异常导致程序崩溃,因此忽略SIGPIPE信号*/
/*定义静态全局是为了保证构造函数中的信号忽略处理能够在程序启动阶段就被直接执⾏*/
class NetWork
{public:NetWork(){LOG_DEBUG("SIGPIPE INIT");signal(SIGPIPE,SIG_IGN);}
};static NetWork nw;

4. Poller模块

在这里插入图片描述

Poller模块:描述符IO事件监控模块
意义:通过epoll实现对描述符的lO事件监控
功能:

  1. 添加/修改描述符的事件监控(不存在则添加,存在则修改)
  2. 移除描述符的事件监控

封装思想:

  1. 必须拥有一个epoll的操作句柄
  2. 拥有一个structepoll_event结构数组,监控时保存所有的活跃事件
  3. 使用hash表管理描述符与描述符对应的事件管理Channel对象

逻辑流程:

  1. 对描述符进行监控,通过Channel才能知道描述符需要监控什么事件
  2. 当描述符就绪了,通过描述符在hash表中找到对应的Channel(得到了Channel才能知道什么事件如何处理)当描述符就绪了,返回就绪描述符对应的Channel
// 描述符I/O事件监控Poller模块
#define MAX_EPOLLEVEVTS 1024
class Poller
{
private:int _epfd;struct epoll_event _evs[MAX_EPOLLEVEVTS];std::unordered_map<int, Channel *> _channels;private:// 对epoll直接操作void Update(Channel *channel, int op){int fd = channel->Fd();struct epoll_event ev;ev.data.fd = fd;ev.events = channel->Events();int ret = epoll_ctl(_epfd, op, fd, &ev);if (ret < 0){LOG_WARRING("EPOLLCTL FAILED!");}return;}public:Poller(){_epfd = epoll_create(MAX_EPOLLEVEVTS);if (_epfd < 0){LOG_FATAL("EPOLL CREATE FAILED!");abort();//退出程序}}// 添加/修改监控事件void UpdateEvent(Channel *channel){auto it = _channels.find(channel->Fd());if (it == _channels.end()){// 不存在则添加_channels.insert(std::make_pair(channel->Fd(), channel));return Update(channel, EPOLL_CTL_ADD);}// 存在则修改return Update(channel, EPOLL_CTL_MOD);}// 移除监控void RemoveEvent(Channel *channel){auto it = _channels.find(channel->Fd());if (it != _channels.end()){_channels.erase(it);}Update(channel, EPOLL_CTL_DEL);}// 开始监控,返回就绪事件void Poll(std::vector<Channel *> *active){int nfds = epoll_wait(_epfd, _evs, MAX_EPOLLEVEVTS, -1);if (nfds < 0){if (errno == EINTR){return;}LOG_FATAL("EPOLL WAIT ERROR:%s\n", strerror(errno));abort();//退出程序}for (int i = 0; i < nfds; ++i){auto it = _channels.find(_evs[i].data.fd);assert(it != _channels.end());it->second->SetREvents(_evs[i].events); // 设置实际就绪的事件active->push_back(it->second);}return;}
};

5. EventLoop模块

在这里插入图片描述

首先要先介绍一下eventfd,eventfd是一种事件通知机制。创建一个描述符用于实现事件通知。eventfd本质在内核里边管理的就是一个计数器。创建eventfd就会在内核中创建一个计数器(结构)。

每当向evenfd中写入一个数值—用于表示事件通知次数,可以使用read进行数据的读取,读取到的数据就是通知的次数。

假设每次给eventfd中写入一个1,就表示通知了一次,连续写了三次之后,
再去read读取出来的数字就是3,读取之后计数清0。

用处:在EventLoop模块中实现线程间的事件通知功能

#include<iostream>
#include<sys/eventfd.h>
#include<stdint.h>
#include<unistd.h>/*int eventfd(unsigned int initval, int flags);
功能:创建一个eventfd对象,实现事件通知
参数:initval:计数初值flags:EFD_CLOEXEC-禁止进程复制EFDNONBLOCK-启动非阻塞属性
返回值:返回一个文件描述符用于操作
eventfd也是通过read/write/close进行操作的。
注意点:read&write进行IO的时候数据只能是一个8字节数据*/int main()
{int evfd = eventfd(0,EFD_CLOEXEC | EFD_NONBLOCK);if(evfd < 0){std::cout<<"EVENTFD FAIL!"<<std::endl;abort();}uint64_t len = 1;int ret = write(evfd,&len,sizeof len);ret = write(evfd,&len,sizeof len);ret = write(evfd,&len,sizeof len);if(ret < 0){std::cout<<"WRITE FAIL!"<<std::endl;abort();}uint64_t val;ret = read(evfd,&val,sizeof val);if(ret < 0){std::cout<<"WRITE FAIL!"<<std::endl;abort();}std::cout<<val<<std::endl;return 0;
}

EventLoop:进行事件监控,以及事件处理的模块
关键点:这个模块与线程是一一对应关联的,一个EventLoop对应一个线程

监控了一个连接,而这个连接一旦就绪,就要进行事件处理。但是如果这个描述符,在多个EventLoop线程中都触发了事件,然后进行处理,就会存在线程安全问题。就比如说即时通信,当某个EventLoop管理的连接触发了读事件,然后要把这条消息发送给所有在线的连接,此时就会涉及到多个EventLoop线程,就有线程安全的问题。如果不想加锁怎么办,毕竟加锁解锁有一定的消耗。不加锁怎么保证多线程之间线程安全问题?

我们可以将一个EventLoop管理所有连接的事件监控,以及连接事件处理,以及其他操作都放在同一个线程中进行!

如何保证一个连接的所有操作都在EventLoop对应的线程中?

解决方案:给EventLoop模块中,添加一个任务队列

对连接的所有操作,都进行一次封装,将对连接的操作并不直接执行,而是当作任务添加到任务队列中。

EventLoop处理流程:

  1. 在线程中对描述符进行事件监控
  2. 有描述符就绪则对描述符进行事件处理(如何保证处理回调函数中的操作都在线程中)
  3. 所有的就绪事件处理完了,这时候再去将任务队列中的所有任务一一执行

这样能够保证对于连接的所有操作,都是在一个线程中进行的,不涉及线程安全问题但是对于任务队列的操作有线程安全的问题,只需要给task的操作加一把锁即可。

这里还有一个注意点:当事件就绪,需要处理的时候,处理过程中,如果对连接要进行某些操作:这些操作必须在EventLoop对应的线程中执行,保证对连接的各项操作都是线程安全的。

  1. 如果执行的操作本就在线程中,不需要将操作压入队列了,可以直接执行
  2. 如果执行的操作不再线程中,才需要加入任务池,等到事件处理完了然后执行任务

上面是整个项目最核心的知识点。

EventLoop模块的成员:

  1. 事件监控
    使用Poller模块,有事件就绪则进行事件处理
  2. 执行任务队列中的任务
    一个线程安全的任务队列

注意点:

因为有可能因为等待描述符IO事件就绪,导致执行流流程阻塞,这时候任务队列中的任务将得不到执行,因此得有一个事件通知的东西,能够唤醒事件监控的阻塞,上面eventfd在这里就派上用场了。

// EvevtLoop模块,对事件进行监控,处理class EventLoop
{
private:using Functor = std::function<void()>;std::thread::id _thread_id;  // EvevtLoop对应的线程IDint _event_fd;               // eventfd唤醒IO事件监控有可能导致的阻塞Channel _event_channel;      // 对_event_fd做事件管理Poller _poller;              // 对所有事件进行监控std::vector<Functor> _tasks; // 任务池std::mutex _mutex;           // 实现任务池操作的线程安全public:// 执行任务池中所有任务void RunAllTask(){// 为了不让每次拿任务都加锁解锁,因此一次把任务都拿出来// 这也是不用队列做任务池的原因std::vector<Functor> functor;{std::unique_lock<std::mutex> _lock(_mutex);_tasks.swap(functor);}// 依次执行任务for (auto &f : functor){f();}}static int CreateEventFd(){int efd = eventfd(0, O_CLOEXEC | O_NONBLOCK);if (efd < 0){LOG_FATAL("CREATE EVENTFD FAILED!!");abort();}return efd;}void ReadEvent(){uint64_t res = 0;int ret = read(_event_fd, &res, sizeof res);if (ret <= 0){if (errno == EINTR || errno == EWOULDBLOCK){return;}LOG_FATAL("READ EVENTFD FAILED!!");abort();}}void WeakUpEventfd(){uint64_t val = 1;int ret = write(_event_fd, &val, sizeof val);if (ret < 0){if (errno == EINTR){return;}LOG_FATAL("WRITE EVENTFD FAILED!!");abort();}}public:EventLoop(): _thread_id(std::this_thread::get_id()), _event_fd(CreateEventFd()), _event_channel(this, _event_fd), _time_wheel(this){// 给eventfd添加可读事件的回调,就绪了,读取eventfd事件通知次数_event_channel.SetReadCallback(std::bind(&EventLoop::ReadEvent, this));// 启动eventfd读事件监控_event_channel.EnableRead();//std::cout<<"_event_fd: "<<_event_fd<<std::endl;};// 1.事件监控 2.就绪事件处理 3.执行任务void Statr(){while(1){// 1.事件监控std::vector<Channel *> active;_poller.Poll(&active);// 2.就绪事件处理for (auto &channel : active){channel->HandleEvent();}// 3.执行任务RunAllTask();}}// 判断当前线程是否是EventLoop对应的线程bool IsInLoop(){return _thread_id == std::this_thread::get_id();}// 判断将要执行的任务是否处于当前线程,如果是则执行,不是则压入队列void RunInLoop(const Functor &cb){if (IsInLoop()){return cb();}return QueueInLoop(cb);}void AssertInLoop() {assert(_thread_id == std::this_thread::get_id());}// 将操作压入任务池void QueueInLoop(const Functor &cb){{std::unique_lock<std::mutex> _lock(_mutex);_tasks.push_back(cb);}// 唤醒有可能因为没有事件就绪,而导致的epoll阻塞// 其实就是给eventfd写入一个数据,eventfd就会触发可读事件WeakUpEventfd();}// 添加/修改描述符的事件监控void UpdateEvent(Channel *channel){_poller.UpdateEvent(channel);}// 移除描述符监控void RemoveEvent(Channel *channel){_poller.RemoveEvent(channel);}
};

5.1 TimerQueue模块

在这里插入图片描述

一个EventLoop模块里面也有一个TimerQueue对象用于定时任务的管理,里面放着每个连接的非活跃超时销毁的任务。这里有个问题,如何让整个程序启动之后,时间轮一秒走一步呢?

可以将时间轮和Linux下的定时器time_create和time_settime整合在一起。

定时器模块的整合:

timerfd:实现内核每隔一段时间,给进程一次超时事件(timerfd可读)
timerwheel:实现每次执行Runtimetask,都可以执行一波到期的定时任务

要实现一个完整的秒级定时器,就需要将这两个功能整合到一起

timerfd设置为每秒钟触发一次定时事件,当事件被触发,则运行一次timerwheel的runtimertask,执行一下所有的过期定时任务。

// 定时器模块
using TaskFunc = std::function<void()>;
using ReleaseFunc = std::function<void()>;class TimerTask
{
private:uint64_t _id;      // 定时器任务对象IDuint32_t _timeout; // 定时任务的超时时间bool _cancel;      // 定时任务是否取消,false不取消,true取消TaskFunc _task_cb;      // 定时器对象要执行的定时任务ReleaseFunc _release_cb;   /// 用于删除TimerWheel中保存的定时器对象信息public:TimerTask(uint64_t id, uint32_t time,const TaskFunc& cb) : _id(id), _timeout(time), _task_cb(cb), _cancel(false) {}~TimerTask(){if (_cancel == false){_task_cb();}_release_cb();}uint32_t DelayTime(){return _timeout;}void SetRelease(const ReleaseFunc &rb){_release_cb = rb;}void Cancel(){_cancel = true;}
};class TimeWheel
{
private:using WeakPtr = std::weak_ptr<TimerTask>;using SharedPtr = std::shared_ptr<TimerTask>;int _tick;                                  // 滴答指针,当前的秒,走到哪里释放哪里,释放哪里,就相当于执行哪里的任务int _capacity;                              // 表盘最大数量---其实就是最大延迟时间std::vector<std::vector<SharedPtr>> _wheel; // 时间轮std::unordered_map<uint64_t, WeakPtr> _timers; // 通过id找到任务对象,方便后续刷新任务// unordered_map第二个参数不能用shared_ptr,如果还用用shared_ptr去指向原始对象,那么就和之前用shared_ptr指向原始对象// 而产生的引用计数根本就不是同一个引用计数,这样如果unordered_map第二个参数引用计数是1,而其他指向它引用计数是2// 万一unordered_map第二个参数shared_ptr被释放了,这个原始对象就会被释放,这是由问题的.// 如何保证定时器的滴答指针,在程序启动之后,每秒走一次// 与linux下得到定时器time_create和time_settime结合一起用// time_settime超时时间间隔设置为1秒,每秒都会给timefd写超时次数// 用EventLoop把timefd管理起来,启动读事件监控,一写读就绪,然后就让滴答指针往后走一步EventLoop *_loop;int _timerfd;             // 定时器描述符--可读事件回调就是读取计数器,执行定时任务Channel _timerfd_channel; //// 释放时间轮上任务对象void RemoveTimer(uint64_t id){auto it = _timers.find(id);if (it == _timers.end()){return; }_timers.erase(it);}static int CreateTimerFd(){int timerfd = timerfd_create(CLOCK_MONOTONIC, 0);if (timerfd < 0){LOG_FATAL("CREATE TIMEFD FAILED!!");abort();}struct itimerspec item;item.it_value.tv_sec = 1;item.it_value.tv_nsec = 0; // 第一次超时时间item.it_interval.tv_sec = 1;item.it_interval.tv_nsec = 0; // 第⼀次之后的超时间隔时间timerfd_settime(timerfd, 0, &item, nullptr);return timerfd;}int ReadTimeFd(){//有可能因为其他描述符的事件处理花费事件⽐较⻓,然后在处理定时器描述符事件的时候,有可能就已经超时了很多次//read读取到的数据times就是从上⼀次read之后超时的次数uint64_t times = 0;int ret = read(_timerfd, &times, 8);if (ret < 0){LOG_FATAL("READ TIMERFD FAILED!!");abort();}//std::cout<<"cnt:"<<times<<std::endl;return times;}// 这个函数应该每秒钟被执行一次,相当于秒针向后走了一步void RunTimerTask(){_tick = (_tick + 1) % _capacity;// 清空指定位置的数组,就会把数组中保存的所有管理定时器对象的shared_ptr释放掉_wheel[_tick].clear(); // 执行任务,释放shared_ptr指针,当引用计数到0,调用Task析构,执行定时任务}void OneTime(){//根据实际超时的次数,执⾏对应的超时任务int times = ReadTimeFd();for (int i = 0; i < times; ++i){RunTimerTask();}}// 设置定时任务void TimerAddInLoop(uint64_t id, uint32_t delay, const TaskFunc &cb){SharedPtr spt(new TimerTask(id, delay, cb));spt->SetRelease(std::bind(&TimeWheel::RemoveTimer, this, id));int pos = (_tick + delay) % _capacity;_wheel[pos].push_back(spt);_timers[id] = WeakPtr(spt);}// 刷新/延迟定时任务void TimerRefreshInLoop(uint64_t id){auto it = _timers.find(id);if (it == _timers.end()){return;//没找着定时任务,没法刷新,没法延迟}// spt是临时对象,出栈后会自动销毁,不会增加引用计数SharedPtr spt = it->second.lock(); // lock获取weak_ptr管理的对象对应的shared_ptrint delaytime = spt->DelayTime();int pos = (_tick + delaytime) % _capacity;_wheel[pos].push_back(spt);}// 取消定时任务,等时间到了,释放任务对象,但是析构里面的超时任务不去执行void TimerCancelInLoop(uint64_t id){auto it = _timers.find(id);if (it == _timers.end()){return;//没找着定时任务,没法刷新,没法延迟}// spt是临时对象,出栈后会自动销毁,不会增加引用计数SharedPtr spt = it->second.lock();//如果没有if判断,超时先去执行的释放任务,然后才删除unordered_map管理的对象,//释放任务里面有如果有定时任务就去取消这一步,因为对象还没有删除,所有会进来这个函数//但TimerTask对象正在析构的时候,没有if判断 就去执行spt->Cancel()程序就会奔溃if(spt) spt->Cancel();}public:TimeWheel(EventLoop *loop): _tick(0), _capacity(60), _wheel(_capacity), _loop(loop), _timerfd(CreateTimerFd()), _timerfd_channel(_loop, _timerfd){// timerfd读事件就绪回调函数_timerfd_channel.SetReadCallback(std::bind(&TimeWheel::OneTime, this));_timerfd_channel.EnableRead(); // 启动timerfd读事件监控}~TimeWheel() {}// 定时器中有个_timers成员,定时器信息的操作有可能在多个EventLoop线程中进行,因此需要考虑线程安全的问题// 但是我们又不想加锁,那就把对定时器的所有操作,都放在对应的线程中进行// 一个线程都是串行化执行的,那就不存在线程安全问题了void TimerAdd(uint64_t id, uint32_t delay, const TaskFunc &cb);void TimerRefresh(uint64_t id);void TimerCancel(uint64_t id);// 虽然这个接口存在线程安全问题,这个接口实际上不能被外界使用者调用,只能在模块EventLoop线程内执行bool HasTimer(uint64_t id){auto it = _timers.find(id);if (it == _timers.end()){return false;}return true;}
};

5.2 TimeWheel整合到EventLoop

// EvevtLoop模块,对事件进行监控,处理class EventLoop
{
private:using Functor = std::function<void()>;std::thread::id _thread_id;  // 线程IDint _event_fd;               // eventfd唤醒IO事件监控有可能导致的阻塞Channel _event_channel;      // 对_event_fd做事件管理Poller _poller;              // 对所有事件进行监控std::vector<Functor> _tasks; // 任务池std::mutex _mutex;           // 实现任务池操作的线程安全TimeWheel _time_wheel;       // 定时器模块public:// 执行任务池中所有任务void RunAllTask(){// 为了不让每次拿任务都加锁解锁,因此一次把任务都拿出来// 这也是不用队列做任务池的原因std::vector<Functor> functor;{std::unique_lock<std::mutex> _lock(_mutex);_tasks.swap(functor);}// 依次执行任务for (auto &f : functor){f();}}static int CreateEventFd(){int efd = eventfd(0, O_CLOEXEC | O_NONBLOCK);if (efd < 0){LOG_FATAL("CREATE EVENTFD FAILED!!");abort();}return efd;}void ReadEvent(){uint64_t res = 0;int ret = read(_event_fd, &res, sizeof res);if (ret <= 0){if (errno == EINTR || errno == EWOULDBLOCK){return;}LOG_FATAL("READ EVENTFD FAILED!!");abort();}}void WeakUpEventfd(){uint64_t val = 1;int ret = write(_event_fd, &val, sizeof val);if (ret < 0){if (errno == EINTR){return;}LOG_FATAL("WRITE EVENTFD FAILED!!");abort();}}public:EventLoop(): _thread_id(std::this_thread::get_id()), _event_fd(CreateEventFd()), _event_channel(this, _event_fd), _time_wheel(this){// 给eventfd添加可读事件的回调,就绪了,读取eventfd事件通知次数_event_channel.SetReadCallback(std::bind(&EventLoop::ReadEvent, this));// 启动eventfd读事件监控_event_channel.EnableRead();//std::cout<<"_event_fd: "<<_event_fd<<std::endl;};// 1.事件监控 2.就绪事件处理 3.执行任务void Statr(){while(1){// 1.事件监控std::vector<Channel *> active;_poller.Poll(&active);// 2.就绪事件处理for (auto &channel : active){channel->HandleEvent();}// 3.执行任务RunAllTask();}}// 判断当前线程是否是EventLoop对应的线程bool IsInLoop(){return _thread_id == std::this_thread::get_id();}// 判断将要执行的任务是否处于当前线程,如果是则执行,不是则压入队列void RunInLoop(const Functor &cb){if (IsInLoop()){return cb();}return QueueInLoop(cb);}void AssertInLoop() {assert(_thread_id == std::this_thread::get_id());}// 将操作压入任务池void QueueInLoop(const Functor &cb){{std::unique_lock<std::mutex> _lock(_mutex);_tasks.push_back(cb);}// 唤醒有可能因为没有事件就绪,而导致的epoll阻塞// 其实就是给eventfd写入一个数据,eventfd就会触发可读事件WeakUpEventfd();}// 添加/修改描述符的事件监控void UpdateEvent(Channel *channel){_poller.UpdateEvent(channel);}// 移除描述符监控void RemoveEvent(Channel *channel){_poller.RemoveEvent(channel);}// 添加定时任务void TimerAdd(uint64_t id, uint32_t delay, const TaskFunc &cb){return _time_wheel.TimerAdd(id, delay, cb);}// 刷新定时任务void TimerRefresh(uint64_t id){return _time_wheel.TimerRefresh(id);}// 删除定时任务void TimerCancel(uint64_t id){return _time_wheel.TimerCancel(id);}// 是否有某个定时任务存在bool HasTimer(uint64_t id){return _time_wheel.HasTimer(id);}
};//Chaneel模块之前只有EventLoop的声明,没有内部函数。因此需要放在EventLoop后面,编译才不会报错
void Channel::Remove()
{return _loop->RemoveEvent(this);
}void Channel::Update()
{return _loop->UpdateEvent(this);
}//同理TimeWheel模块也是一样
void TimeWheel::TimerAdd(uint64_t id, uint32_t delay, const TaskFunc &cb)
{_loop->RunInLoop(std::bind(&TimeWheel::TimerAddInLoop, this, id, delay, cb));
}void TimeWheel::TimerRefresh(uint64_t id)
{_loop->RunInLoop(std::bind(&TimeWheel::TimerRefreshInLoop, this, id));
}void TimeWheel::TimerCancel(uint64_t id)
{_loop->RunInLoop(std::bind(&TimeWheel::TimerCancelInLoop, this, id));
}

5.1 EventLoop与线程结合

目标:将EventLoop模块与线程整合起来

EventLoop模块与线程是——对应的。

EventLoop模块实例化的对象,在构造的时候就会初始化_thread_id,
而后边当运行一个连接操作的时候判断当前是否运行在连接自己的所在EventLoop模块对应的线程中,就是将线程ID与EventLoop模块中的thread_id进行一个比较,相同就表示在同一个线程,不同就表示当前运行线程并不是自己这个连接所在EventLoop线程。

上面的含义:EventLoop模块在实例化对象的时候,必须在线程内部

EventLoop实例化对象时会设置自己的thread_id,如果我们先创建了多个EventLoop对象,然后创建了多个线程,将各个线程的id,重新给EventLoop进行设置。

存在问题:在构造EventLoop对象,到设置新的thread_id期间将是不可控的。因此我们必须先创建线程,然后在线程的入口函数中,去实例化EventLoop对象

造一个新的模块:LoopThread
这个模块的功能:将EventLoop与thread整合到一起

思想:

  1. 创建线程
  2. 在线程中实例化EventLoop对象

功能:可以向外部返回所实例化的EventLoop

//一个EventLoop一个线程,将EventLoop和线程整合在一起
class LoopThread
{private://互斥锁+条件变量,用于实现_loop获取的同步关系,避免线程创建了,但是_loop还没有实例化之前去获取_loopstd::mutex _mutex;//互斥锁std::condition_variable _cond;//条件变量EventLoop* _loop;//EventLoop指针变量,这个对象需要在线程内部实例化std::thread _thread;//EventLoop对应的线程private://实例化EventLoop对象,唤醒_cond有可能阻塞的线程,并且开始运行EventLoop模块的功能void ThreadEntry(){//由LoopThread管理这个变量的生命周期EventLoop loop;{std::unique_lock<std::mutex> _mtx(_mutex);_loop = &loop;_cond.notify_all();}loop.Statr();}public://创建线程,设定线程入口函数LoopThread():_loop(nullptr),_thread(std::thread(&LoopThread::ThreadEntry,this)){}//返回当前线程关联的EventLoop对象的指针,得到EventLoop指针相对于找到这个线程,把连接绑定到对应EventLoop线程中EventLoop* GetLoop(){EventLoop* loop = nullptr;{std::unique_lock<std::mutex> _mtx(_mutex);_cond.wait(_mtx,[&](){ return _loop != nullptr;});//_loop为nullptr就一直阻塞loop = _loop;}return loop;}
};

5.2 EventLoop线程池

针对LoopThread设计一个线程池:

LoopThreadPool模块:对所有的LoopThread进行管理及分配

功能:

  1. 线程数量可配置(0个或多个)
    注意事项:在服务器中,主从Reactor模型是主线程只负责新连接获取,从属线程负责新连接的事件监控及处理
    因此当前的线程池,有可能从属线程会数量为0,也就是实现单Reactor服务器,一个线程及负责获取连接,也负责连接的处理
  2. 对所有的线程进行管理,其实就是管理0个或多个LoopThread对象
  3. 提供线程分配的功能
    当主线程获取了一个新连接,需要将新连接挂到从属线程上进行事件监控及处理
    假设有0个从属线程,则直接分配给主线程的EventLoop,进行处理
    假设有多个从属线程,则采用RR轮转思想,进行线程的分配(将对应线程的EventLoop获取到,设置给对应的Connection)
//针对LoopThread做一个线程池
class LoopThreadPoll
{private:int _thread_count;//从属线程的数量int _next_loop_idx;//给连接分配那个EventLoopEventLoop* _base_loop;//主EventLoop,运行在主线程,从属EventLoop线程数量为0,则所有操作都在主EventLoop中进行std::vector<LoopThread*> _threads;//保存所有LoopThread对象std::vector<EventLoop*> _loops;//从属EventLoop线程数量大于0则从_loops进行线程EventLoop分配public:LoopThreadPoll(EventLoop* loop):_thread_count(0),_next_loop_idx(0),_base_loop(loop){};//设置从属EventLoop线程数量void SetThreadCount(int count){_thread_count = count;}//创建从属线程void Create(){if(_thread_count > 0){_threads.resize(_thread_count);_loops.resize(_thread_count);for(int i = 0; i < _thread_count; ++i){_threads[i] = new LoopThread();_loops[i] = _threads[i]->GetLoop();}}}//获取从属线程EvetnLoop指针EventLoop* NextLoop(){if(_thread_count == 0) {return _base_loop;}//RR轮转_next_loop_idx = (_next_loop_idx + 1) % _thread_count;return _loops[_next_loop_idx];}
};

6. Connection模块

在这里插入图片描述
Connection模块:

目的:对连接进行全方位的管理,对通信连接的所有操作都是通过这个模块提供的功能完成。

管理:

  1. 套接字的管理,能够进行套接字的操作
  2. 连接事件的管理,可读,可写,错误,挂断,任意
  3. 缓冲区的管理,便于socket数据的接收和发送
  4. 协议上下文的管理,记录请求数据的处理过程
  5. 回调函数的管理
    a. 因为连接接收到数据之后该如何处理,需要由用户决定,因此必须有业务处理回调函数
    b. 一个连接建立成功后,该如何处理,由用户决定,因此必须有连接建立成功的回调函数
    c. 一个连接关闭前,该如何处理,由用户决定,因此必须由关闭连接回调函数。
    d. 任意事件的产生,有没有某些处理,由用户决定,因此必须有任意事件的回调函数

功能:

  1. 发送数据—给用户提供的发送数据接口,并不是真正的发送接口,而只是把数据放到发送缓冲区,然后启动写事件监控
  2. 关闭连接—给用户提供的关闭连接接口,应该在实际释放连接之前,看看输入输出缓冲区是否有数据待处理
  3. 启动非活跃连接的超时销毁功能
  4. 取消非活跃连接的超时销毁功能
  5. 协议切换—一个连接接收数据后如何进行业务处理,取决于上下文,以及数据的业务处理回调函数
// 通用类型Any
class Any
{class holder{public:virtual ~holder() {}virtual const std::type_info &type() = 0;virtual holder *clone() = 0;};template <class T>class placeholder : public holder{public:placeholder(const T &val) : _val(val) {}// 获取子类对象保存的数据类型virtual const std::type_info &type(){return typeid(T);}// 针对当前的对象自身,克隆出一个新的子类对象virtual holder *clone(){return new placeholder(_val);}public:T _val;};private:holder *_content;public:Any() : _content(nullptr) {}~Any(){delete _content;}template <class T>Any(const T &val){_content = new placeholder<T>(val);}Any(const Any &other){_content = _content != nullptr ? other._content->clone() : nullptr;}// 返回子类对象保存的数据的指针template <class T>T *Get(){assert(typeid(T) == _content->type());// 父类指针指向子类中属于父类的,找不到子类中成员,因此需要转成子类指针return &(((placeholder<T> *)_content)->_val);}Any &swap(Any &other){std::swap(_content, other._content);return *this;}// 赋值运算符的重载函数template <class T>Any &operator=(const T &val){Any(val).swap(*this);return *this;}Any &operator=(const Any &other){Any(other).swap(*this);return *this;}
};class Connection;
// DISCONNECTED 连接关闭状态  CCONNECTING 连接建立成功成功,待处理状态
// CONNECTED 连接建立完成,各种设置已完成,可以通信状态     DISCONNECTING 连接待关闭状态
typedef enum
{DISCONNECTED,CONNECTING,CONNECTED,DISCONNECTING
} ConStatus;
//使用shared_ptr管理每个Connection对象,防止用户把Connection在某个地方删除
//最终导致程序奔溃退出
using PtrConnection = std::shared_ptr<Connection>;
//enable_shared_from_this<Connection>里面保存了一个weak_ptr,可以从weak_ptr得到一个shared_ptr
class Connection: public std::enable_shared_from_this<Connection>
{
private:uint64_t _conn_id; // 连接唯一ID,便于连接的管理和查找// uint64_t _timer_fd; // 定时器ID,必须是唯一,这块为了简化操作使用_conn_id作为定时器idint _sockfd;                   // 连接关联的文件描述符bool _enable_inactive_release; // 连接是否启动非活跃销毁的判断标志,默认为falseEventLoop *_loop;              // 连接所关联的一个EventLoopConStatus _status;             // 连接状态Socket _socket;                // 套接字管理Channel _channel;              // 连接事件管理Buffer _in_buffer;             // 输入缓存区,存放从socket中读取到的数据Buffer _out_buffer;            // 输出缓存区,存放经过业务处理要给对端发出的数据Any _context;                  // 请求接收处理上下文// 这四个回调函数,是让服务器模块来设置的(其实服务器模块的处理回调也是组件使用者设置的)// 换句话说,这⼏个回调都是组件使用者使用的//连接建立回调using ConnectedCallback = std::function<void(const PtrConnection &)>;//接收到数据回调using MessageCallback = std::function<void(const PtrConnection &, Buffer *)>;//关闭连接回调using ClosedCallback = std::function<void(const PtrConnection &)>;//任意事件回调using AnyeEventCallback = std::function<void(const PtrConnection &)>;ConnectedCallback _connected_callback;MessageCallback _message_callback;ClosedCallback _closed_callback;AnyeEventCallback _event_callback;//组件内的连接关闭回调--组件内设置的,因为服务器组件内也会把所有的连接管理起来,⼀旦某个连接要关闭//就应该从管理的地⽅移除掉自己的信息ClosedCallback _server_closed_callback;private://五个channel的事件回调函数//描述符可读事件触发后调⽤的函数,接收socket数据放到接收缓冲区中,然后调⽤_message_callback进行业务处理void HandleRead(){   //1.接收socket缓存区数据,放到缓存区char buffer[65535];ssize_t ret = _socket.NonBlockRecv(buffer,65535);if(ret < 0){  //出错,不能直接关闭连接return ShutdownInLoop();}//这里的ret等于0表示的是没有读取到数据,⽽并不是连接断开了,连接断开返回的是-1//将数据放⼊输⼊缓冲区,写⼊之后顺便将写偏移向后移动_in_buffer.WriteAndPush(buffer,ret);//2.调用_message_callback进行业务处理if(_in_buffer.ReadAbleSize() > 0){if(_message_callback){//shared_from_this--从当前对象自身获取自身的shared_ptr管理对象return _message_callback(shared_from_this(),&_in_buffer);}}}//描述符可写事件出发后调用的函数,将发送缓存区中数据进行发送void HandleWrite(){//_out_buffer中保存的是待发送的数据ssize_t ret = _socket.NonBlockSend(_out_buffer.ReadPosition(),_out_buffer.ReadAbleSize());if(ret < 0){//发送错误关闭连接,如果接收缓存区还有数据先处理一下if(_in_buffer.ReadAbleSize() > 0){if(_message_callback){//shared_from_this--从当前对象自身获取自身的shared_ptr管理对象_message_callback(shared_from_this(),&_in_buffer);}}//这时候就是实际的关闭释放操作了//先把释放连接任务压到任务队列中,等所有就绪事件处理后,在处理任务队列中释放连接任务//return ReleaseInLoop();return Release();}//千万不要忘了,将读偏移向后移动_out_buffer.MoveReadOffset(ret);if(_out_buffer.ReadAbleSize() == 0){_channel.DisableWrite();// 没有数据待发送了,关闭写事件监控if(_status == DISCONNECTING)//如果当前是连接待关闭状态,还有数据,发送完数据释放连接,没有数据则直接释放{//先把释放连接任务压到任务队列中,等所有就绪事件处理后,在处理任务队列中释放连接任务//return ReleaseInLoop();return Release();}}return;}//描述符触发挂断事件void HandleClose(){//⼀旦连接挂断了,套接字就什么都⼲不了了,因此有数据待处理就处理⼀下,完毕关闭连接if(_in_buffer.ReadAbleSize() > 0){if(_message_callback){//shared_from_this--从当前对象⾃⾝获取⾃⾝的shared_ptr管理对象_message_callback(shared_from_this(),&_in_buffer);}}//这时候就是实际的关闭释放操作了//先把释放连接任务压到任务队列中,等所有就绪事件处理后,在处理任务队列中释放连接任务//return ReleaseInLoop();return Release();}//描述符触发出错事件void HandleError(){return HandleClose();}//描述符触发任意事件: 1. 刷新连接的活跃度--延迟定时销毁任务; 2. 调⽤组件使用者的任意事件回调void HandleEvent(){if(_enable_inactive_release == true){_loop->TimerRefresh(_conn_id);}if(_event_callback){_event_callback(shared_from_this());}}//连接获取后,所处的状态下要进行各种设置(启动读事件监控,调用连接建立阶段回调函数)void EstablishedInLoop(){// 1. 修改连接状态assert(_status == CONNECTING);//当前的状态必须⼀定是上层的半连接状态_status = CONNECTED;//当前函数执⾏完毕,则连接进⼊已完成连接状态// 2. 启动读事件监控 _channel.EnableRead();// ⼀旦启动读事件监控就有可能会⽴即触发读事件,如果这时候启动了⾮活跃连接销毁//3. 调用组件调用者设置的连接建立阶段的回调函数if(_connected_callback){_connected_callback(shared_from_this());}}//这个接口才是实际的释放接口void ReleaseInLoop(){//1. 修改连接状态,将其置为DISCONNECTED_status == DISCONNECTED;//2. 移除连接的事件监控_channel.Remove();//3. 关闭描述符_socket.Close();//4. 如果当前定时器队列中还有定时销毁任务,则取消任务if(_loop->HasTimer(_conn_id)){CancelInactiveReleaseInLoop();}//5. 调用关闭回调函数,避免先移除服务器管理的连接信息导致Connection被释放,// 再去处理会出错,因此先调用用户的回调函数if(_closed_callback){_closed_callback(shared_from_this());}//移除服务器内部管理的连接信息if(_server_closed_callback){_server_closed_callback(shared_from_this());}}//这个接⼝并不是实际的发送接⼝,⽽只是把数据放到了发送缓冲区,启动了可写事件监控void SendInLoop(Buffer& buf){if(_status == DISCONNECTED) return;_out_buffer.WriteBufferAndPush(buf);if(_channel.WriteAble() == false){_channel.EnableWrite();}}//这个关闭操作并⾮实际的连接释放操作,需要判断还有没有数据待处理,待发送void ShutdownInLoop(){// 设置连接为半关闭状态_status = DISCONNECTING;if(_in_buffer.ReadAbleSize() > 0){if(_message_callback){_message_callback(shared_from_this(),&_in_buffer);}}//要么就是写⼊数据的时候出错关闭,要么就是没有待发送数据,直接关闭if(_out_buffer.ReadAbleSize() > 0){if(_channel.WriteAble() == false){_channel.EnableWrite();}}if(_out_buffer.ReadAbleSize() == 0){//ReleaseInLoop();Release();}}//启动⾮活跃连接超时释放规则void EnableInactiveReleaseInLoop(int sec){//1. 将判断标志 _enable_inactive_release 置为true_enable_inactive_release = true;//2. 如果当前定时销毁任务已经存在,那就刷新延迟⼀下即可if(_loop->HasTimer(_conn_id)){return _loop->TimerRefresh(_conn_id);}//3. 如果不存在定时销毁任务,则新增/* 业务处理超时,查看服务器的处理情况当服务器达到了一个性能瓶颈,在一次业务处理中花费了太长的时间(超过了服务器设置的非活跃超时时间)1. 在一次业务处理中耗费太长时间,导致其他的连接也被连累超时,其他的连接有可能会被拖累超时释放假设现在  12345描述符就绪了, 在处理1的时候花费了30s处理完,超时了,导致2345描述符因为长时间没有刷新活跃度1. 如果接下来的2345描述符都是通信连接描述符,如果都就绪了,则并不影响,因为接下来就会进行处理并刷新活跃度2. 如果接下来的2号描述符是定时器事件描述符,定时器触发超时,执行定时任务,就会将345描述符给释放掉这时候一旦345描述符对应的连接被释放,接下来在处理345事件的时候就会导致程序崩溃(内存访问错误)因此这时候,在本次事件处理中,并不能直接对连接进行释放,而应该将释放操作压入到任务队列中,等到事件处理完了执行任务池中的任务的时候,再去释放*/ //_loop->TimerAdd(_conn_id,sec,std::bind(&Connection::ReleaseInLoop,this));_loop->TimerAdd(_conn_id,sec,std::bind(&Connection::Release,this));}void CancelInactiveReleaseInLoop(){_enable_inactive_release = false;if(_loop->HasTimer(_conn_id)){_loop->TimerCancel(_conn_id);}}void UpgradeInLoop(const Any& context,const ConnectedCallback& conn,const MessageCallback& msg,const ClosedCallback& closed,const AnyeEventCallback& event){_context = context;_connected_callback = conn;_message_callback = msg;_closed_callback = closed;_event_callback = event;}public:Connection(EventLoop* loop,uint64_t conn_id,int sockfd):_loop(loop),_conn_id(conn_id),_sockfd(sockfd),_enable_inactive_release(false),_status(CONNECTING),_socket(_sockfd),_channel(_loop,_sockfd){_channel.SetReadCallback(std::bind(&Connection::HandleRead,this));_channel.SetWriteCallback(std::bind(&Connection::HandleWrite,this));_channel.SetErrorCallback(std::bind(&Connection::HandleError,this));_channel.SetCloseCallback(std::bind(&Connection::HandleClose,this));_channel.SetEventCallback(std::bind(&Connection::HandleEvent,this));}~Connection(){LOG_DEBUG("RELEASE CONNECTION:%p", this);}//获得管理的文件描述符int Fd(){return _sockfd;}//获取连接IDint Id(){return _conn_id;}//当前是否处于CONNECTED状态bool Connected(){return (_status == CONNECTED);}//设置上下文--连接建立完成时进行调用void SetContent(const Any& context){_context = context;}//获取上下文Any* GetContent(){return &_context;}void SetConnectedCallback(const ConnectedCallback& cb){_connected_callback = cb;}void SetMessageCallback(const MessageCallback& cb){_message_callback = cb;}void SetClosedCallback(const ClosedCallback& cb){_closed_callback = cb;}void SetAnyeEventCallback(const AnyeEventCallback& cb){_event_callback = cb;}void SetSevClosedCallback(const ClosedCallback& cb){_server_closed_callback = cb;}//多线程执行,涉及到线程安全的问题,我们都放在EventLoop线程下的任务队列中执行//连接建立就绪后,启动读监控,调用_connected_callbackvoid Established(){_loop->RunInLoop(std::bind(&Connection::EstablishedInLoop,this));}//发送数据,实际是将数据放到发送缓存区,启动写事件监控void Send(const void* data,size_t len){//外界传⼊的data,可能是个临时的空间,我们现在只是把发送操作压⼊了任务池,有可能并没有被⽴即执⾏//因此有可能执⾏的时候,data指向的空间有可能已经被释放了Buffer buf;buf.WriteAndPush(data,len);//std::move(buf))是将右值参数(如 std::move(buf))会被移动构造到 std::bind 内部‌,生成一个 ‌独立存储的 Buffer 对象‌。//当异步任务执行时,std::bind 会将存储的 Buffer 对象以 "‌左值‌" 形式传递给 SendInLoop//所以void SendInLoop(Buffer& buf),Buffer应为左值引用//并且这里std::move(buf))想要减少一次拷贝,减少的是将buf拷贝到bind内部生成Buffer对象这次拷贝_loop->RunInLoop(std::bind(&Connection::SendInLoop,this,std::move(buf)));}//提供给组件使用者的关闭窗口,并不是真实关闭,需要判断输入输出有没有数据待处理void Shutdown(){_loop->RunInLoop(std::bind(&Connection::ShutdownInLoop,this));}void Release(){//定时超时释放连接的任务,压到任务队列中//甚至可以所有的释放连接的地方都先先把任务压到任务队列中,等所有就绪事件处理自后再去处理任务队列中的任务//这样可以把Channel类中HandleEvent中改改,不用每个事件执行之前都先去执行任意事件的回调,而是最后在执行任意事件回调//这样更符合逻辑,当事件就绪处理自后在刷新活跃度,如果超时在释放//现在不会在处理就绪事件的时候去释放连接了,所有不用担心因为释放连接销毁Connection对象而导致调用任意事件回调而导致程序奔溃了_loop->QueueInLoop(std::bind(&Connection::ReleaseInLoop,this));}//启动非活跃消费,并定义多长时间五通信就是非活跃,添加定时任务void EnableInactiveRelease(int sec){_loop->RunInLoop(std::bind(&Connection::EnableInactiveReleaseInLoop,this,sec));}//取消非活跃销毁void CancelInactiveRelease(){_loop->RunInLoop(std::bind(&Connection::CancelInactiveReleaseInLoop,this));}//切换协议---重置上下⽂以及阶段性回调处理函数 -- ⽽是这个接⼝必须在EventLoop线程中⽴即执⾏//防备新的事件触发后,处理的时候,切换任务还没有被执⾏--会导致数据使⽤原协议处理了。void Upgrade(const Any& content,const ConnectedCallback& conn,const MessageCallback& msg,const ClosedCallback& closed,const AnyeEventCallback& event){_loop->AssertInLoop();_loop->RunInLoop(std::bind(&Connection::UpgradeInLoop,this,content,conn,msg,closed,event));}
};

7. Acceptor模块

在这里插入图片描述

Acceptor模块:对监听套接字进行管理

  1. 创建一个监听套接字
  2. 启动读事件监控
  3. 事件触发后,获取新连接
  4. 调用新连接获取成功后的回调函数
  5. 为新连接创建Connection进行管理(这一步不是Acceptor模块操作,应该是服务器模块)
    因为Acceptor模块只进行监听连接的管理,因此获取到新连接的描述符之后,对于新连接描述符如何处理其实并不关心

对于新连接如何处理,应该是服务器模块来管理的
服务器模块,实现了一个对于新连接描述符处理的函数,将这个函数设置给Acceptor模块中的回调函数

//Acceptor模块,对监听套接字进行管理
class Acceptor
{private:EventLoop* _loop;//对监听套接字进行监控Socket _socket;//创建监听套接字Channel _channel;//对监听套接字进行事件管理using AcceptCallback = std::function<void(int)>;AcceptCallback _accept_callback;private://监听套接字读事件就绪回调函数 --- 获取新连接,调用_accept_callback函数进行新连接处理void HandleRead(){int newfd = _socket.Accpet();if(newfd < 0){return;}if(_accept_callback) _accept_callback(newfd);}int CreateServer(uint16_t port){int ret = _socket.CreateServer(port);assert(ret == true);return _socket.Fd();}public:Acceptor(EventLoop* loop,uint16_t port):_loop(loop),_socket(CreateServer(port)),_channel(_loop,_socket.Fd()){_channel.SetReadCallback(std::bind(&Acceptor::HandleRead,this));}void SetAcceptCallback(const AcceptCallback& cb){_accept_callback = cb;}//不能将启动读事件监控,放到构造函数,必须设置在回调函数后,再去启动//否则有可能造成启动监控之后,立刻就有了事件,处理的时候,回调函数还没有设置,新连接得不到处理,且资源泄漏void Listen(){_channel.EnableRead();}      
};

8. TcpServer模块

在这里插入图片描述
TcpServer模块:对所有模块的整合,通过TcpServer模块实例化的对象,可以非常简单的完成一个服务器的搭建

管理:

  1. Acceptor对象,创建一个监听套接字
  2. EventLoop对象,baseloop对象,实现对监听套接字的事件监控
  3. std:unordered_map<uint64_t, PtrConnection>_conns,实现对所有新建连接的管理
  4. LoopThreadPool对象,创建loop线程池,对新建连接进行事件监控及处理

功能:

  1. 设置从属线程池数量
  2. 启动服务器
  3. 设置各种回调函数(连接建立完成,消息,关闭,任意),用户设置给TcpServer,TcpServer设置给获取的新连接
  4. 是否启动非活跃连接超时销毁功能
  5. 给baseloop添加定时任务功能(如果用户需要的话可以设置)

流程:

  1. 在TcpServer中实例化一个Acceptor对象,以及一个EventLoop对象(baseloop)
  2. 将Acceptor挂到baseloop上进行事件监控
  3. 一旦Acceptor对象就绪了可读事件,则执行读事件回调函数获取新建连接
  4. 对新连接,创建一个Connection进行管理
  5. 对连接对应的Connection设置功能回调(连接完成回调,消息回调,关闭回调,任意事件回调)
  6. 启动Connection的非活跃连接的超时销毁规则
  7. 将新连接对应的Connection挂到LoopThreadPool中的从属线程对应的Eventloop中进行事件监控
  8. 一旦Connection对应的连接就绪了可读事件,则这时候执行读事件回调函数,读取数据,读取完毕后调用TcpServer设置的消息回调
class TcpServer
{private:uint64_t _next_id; //这是一个自动增长的连接IDuint16_t _port;//服务器端口int _timeout;//这是非活跃连接的统计时间---多长时间无通信就是非活跃连接bool _enable_inactive_release;//是否启动了非活跃连接超时销毁的判断标志EventLoop _baseloop;//这是主线程的EventLoop对象,负责监听事件的处理Acceptor _acceptor;//这是监听套接字的管理对象LoopThreadPoll _pool;//这是从属EventLoop线程池std::unordered_map<uint64_t,PtrConnection> _conns;//保存管理所有连接对应的shared_ptr对象using Functor = std::function<void()>;//连接建立回调using ConnectedCallback = std::function<void(const PtrConnection &)>;//接收到数据回调using MessageCallback = std::function<void(const PtrConnection &, Buffer *)>;//关闭连接回调using ClosedCallback = std::function<void(const PtrConnection &)>;//任意事件回调using AnyeEventCallback = std::function<void(const PtrConnection &)>;ConnectedCallback _connected_callback;MessageCallback _message_callback;ClosedCallback _closed_callback;AnyeEventCallback _event_callback;private://为新连接构造一个Connection进行管理void NewConnection(int fd){LOG_DEBUG("NEW CONNECTION");_next_id++;PtrConnection conn(new Connection(_pool.NextLoop(),_next_id,fd));conn->SetConnectedCallback(_connected_callback);conn->SetClosedCallback(_closed_callback);conn->SetMessageCallback(_message_callback);conn->SetAnyeEventCallback(_event_callback);conn->SetSevClosedCallback(std::bind(&TcpServer::RemoveConnection,this,std::placeholders::_1));if(_enable_inactive_release) conn->EnableInactiveRelease(_timeout);//启动非活跃销毁conn->Established();//就绪初始化_conns.insert(std::make_pair(_next_id,conn));}void RemoveConnectionInLoop(const PtrConnection& conn){   //int id = conn->Id();auto it = _conns.find(conn->Id());if(it != _conns.end()){_conns.erase(it);}}//给Connection模块的_server_closed_callback设置一个回调函数//从管理Connection的_conns中移除管理信息//这里才是真正释放Connection的地方void RemoveConnection(const PtrConnection& conn){_baseloop.RunInLoop(std::bind(&TcpServer::RemoveConnectionInLoop,this,conn));} void RunAfterInLoop(const Functor& task, int delay){_next_id++;_baseloop.TimerAdd(_next_id,delay,task);}public:TcpServer(uint16_t port):_port(port),_next_id(0),_timeout(0),_enable_inactive_release(false),_acceptor(&_baseloop,_port),_pool(&_baseloop){_acceptor.SetAcceptCallback(std::bind(&TcpServer::NewConnection,this,std::placeholders::_1));_acceptor.Listen();}//设置从属EventLoop线程数量void SetThreadCount(int count){_pool.SetThreadCount(count);}void SetConnectedCallback(const ConnectedCallback& cb){_connected_callback = cb;}void SetMessageCallback(const MessageCallback& cb){_message_callback = cb;}void SetClosedCallback(const ClosedCallback& cb){_closed_callback = cb;}void SetAnyeEventCallback(const AnyeEventCallback& cb){_event_callback = cb;}//启动非活跃定时销毁任务void EnableInactiveRelease(int timeout){_enable_inactive_release = true;_timeout = timeout;}//提供给主线程添加定时任务的接口,是否调用由用户决定void RunAfter(const Functor& task, int delay){_baseloop.RunInLoop(std::bind(&TcpServer::RunAfterInLoop,this,task,delay));}//启动服务器void Start(){//创建从属EventLoop线程池_pool.Create();//启动监听套接字读事件监控_baseloop.Statr();}     
};

相关文章:

  • 科技快讯 | 智谱开源最新GLM模型系列;“AI 洗头店”现身广州;ChatGPT上线图库功能
  • 虚拟卡可以解决订阅 ChatGPT 时无法付款的问题
  • 基于 ZYNQ MPSOC 异构平台的道路交通目标检测设计,也支持RK3588+FPGA
  • 5.9 《GPT-4调试+测试金字塔:构建高可靠系统的5大实战策略》
  • 当纺织车间遇上“数字魔法”--天拓四方飞鸟物联平台+边缘计算采集网关的智造革命
  • 记录待办事项的便签软件有没有推荐的?
  • TailwindCss快速上手
  • Halcon应用:九点标定-手眼标定
  • 可灵AI进入2.0时代,全新视频生成模式将怎么改变市场?
  • Flutter 从零到一
  • Node.js 文件读取与复制相关内容
  • 自然科技部分详解
  • Java开发中的设计模式之观察者模式详细讲解
  • 音频炼金术:Threejs 让 3D 场景「听」起来更真实
  • 邀请函 | 知从科技邀您共赴2025上海车展
  • 【学习笔记】计算机网络(八)—— 音频/视频服务
  • Qwen2.5-Omni 部署框架选择指南:PyTorch vs. TensorFlow 深度对比
  • MCP协议,.Net 使用示例
  • Runnable和Callable接口的区别【简单易懂】
  • 营销自动化实战指南:如何用全渠道工作流引爆线索转化率?
  • 西宁网站建设君博正规/陕西整站关键词自然排名优化
  • 户外网站 整站下载/打开网站搜索
  • 网站流程示意/网站推广怎么推广
  • 网站界面是什么做的/seo专业优化方法
  • 外贸平台网站的营销方式/口碑营销的产品
  • 深圳金融投资网站建设/seo建站公司