当前位置: 首页 > news >正文

【基于one-loop-per-thread的高并发服务器】--- Server模块

 Welcome to 9ilk's Code World

       

(๑•́ ₃ •̀๑) 个人主页:       9ilk

(๑•́ ₃ •̀๑) 文章专栏:     项目 


本篇博客主要是对SERVER模块进行梳理总结,SERVER模块的最终目的是封装成一个主从Reactor模型的支持协议替换的高性能TCP服务器。

Buffer模块

        该模块是一个缓冲区模块,用来实现通信中用户态的接收缓冲区和发送缓冲区,其主要的功能就是存放外界数据/供外界读取数据。

        我们的主要实现思想是用STL容器vector<char>作为一块内存空间维护,它底层是线性的方便维护。同时还要维护两个指针,一个读指针read,一个写指针write。一个缓冲区可读区域的写指针与读指针之间的距离,因此write一般是大于等于read的。而一个缓冲区可写区域大小,除了写指针后续的内存空间,其实读指针之前的也算是空闲空间,也可以覆盖写。

        基于此,在扩容的时候,如果在写入数据时,写指针后续剩余空闲空间不足,首先应该考虑整体缓冲区空闲空间(写指针后+读指针前)是否足够,足够则将原来区域的可读数据移动到起始位置 ,更新指针,然后再写入新数据;如果整体空闲空间不足,则进行扩容,从当前写位置开始扩容足够大小,再写入新数据,更新指针。

class Buffer
{
private:vector<char> _buffer;//使用vector进行内存管理uint64_t _read_index;//读位置uint64_t _write_index;//写位置public:Buffer():_buffer(defaultbuffersize),_read_index(0),_write_index(0){}//提供一个公共的缓冲区起始位置: 获取起始地址还可以使用data() at(0) [0] front()char* Begin(){return &*_buffer.begin();}//当前读位置 char*  ReadPos(){return Begin() + _read_index;} //当前写位置char*  WritePos(){return Begin() + _write_index;}//移动读指针void MoveReadIdx(uint64_t len){if(len == 0) return;assert(len<=ReadAbleSize());//要读取长度不可超过可以读取数据大小_read_index += len;}//移动写指针void MoveWriteIdx(uint64_t len){if(len == 0) return;//1.需要先保证后续空闲空间足够assert(TailSpace()>=len);//2.移动写指针_write_index += len;}//获取可读数据大小uint64_t ReadAbleSize(){return _write_index - _read_index;}//获取缓冲区末尾空闲空间大小---写偏移之后的空闲空间uint64_t TailSpace(){return  _buffer.size()-_write_index;}//获取缓冲区起始空闲空间大小 --- 读偏移之前的空闲空间uint64_t HeadSpace(){return _read_index;}//确保可写空间足够(移动+扩容)void EnsureWriteSpace(uint64_t len){//1.先判断末尾空间大小是否足够if(len<=TailSpace()) return;//2.末尾空间不足,再加上起始空闲空间判断,此时向后移动if(len<=TailSpace()+HeadSpace()){uint64_t readsize = ReadAbleSize();//先保存可读数据位置std::copy(ReadPos(),ReadPos()+readsize,Begin());_read_index = 0;_write_index = readsize;return;}//3.起始+末尾空闲时间都不足,则扩容_buffer.resize(_write_index+len);}//写入数据void Write(const void*data,uint64_t len){if(len == 0) return;//1.确保空间足够EnsureWriteSpace(len);//2.拷贝数据const char* d = (const char*)data;std::copy(d,d+len,WritePos());}//读取数据void Read(void* buf,uint64_t len){//1.确保可读数据足够assert(len<=ReadAbleSize());//2.读取数据到bufstd::copy(ReadPos(),ReadPos()+len,(char*)buf);}//清空缓冲区void clear(){_write_index = _read_index = 0;}//将string内容写入 void WriteString(const string& data){Write(data.c_str(),data.size());}//将另一个Buffer对象内容写入void WriteBuffer(Buffer& data){  Write(data.ReadPos(),data.ReadAbleSize());}//缓冲区数据当作string返回string ReadAsString(uint64_t len){assert(len<=ReadAbleSize());//确保len不超过可读数据大小string str;str.resize(len);Read(&str[0],len);return str;}//读完并弹出void ReadAndPop(void*buf,uint64_t len){  //读取Read(buf,len);//移动指针MoveReadIdx(len);}uint64_t Size(){return _buffer.size();}//读取到string并弹出string ReadStringAndPop(uint64_t len){//读取到字符串string str = ReadAsString(len);MoveReadIdx(len);return str;}//写入数据并弹出void WriteAndPush(const void* data,uint64_t len){Write(data,len);MoveWriteIdx(len);}//写入string并弹出void WriteStringPush(const string& data){WriteString(data);MoveWriteIdx(data.size());}//写入另一个Buffer对象并弹出void WriteBufferPush(Buffer& data) {WriteBuffer(data);MoveWriteIdx(data.ReadAbleSize());}//获取一行数据//1.获取换行符位置char* FindCRLF(){void* res = memchr(ReadPos(),'\n',ReadAbleSize());return (char*)res;}//2.获取一行string GetLine(){char* pos = FindCRLF();if(pos == nullptr) return "";//把换行也取出来return ReadAsString(pos-ReadPos()+1);}//3.获取一行并弹出string GetLineAndPop(){string str = GetLine();MoveReadIdx(str.size());return str;}
};

Socket模块

        这个模块主要是对Socket操作的封装,方便灵活操作。主要封装的功能如下:

  • 创建套接字
  • 绑定地址信息
  • 开始监听
  • 向服务器发起连接
  • 获取新连接
  • 接收数据
  • 发送数据
  • 关闭套接字
  • 设置套接字选项(开启地址端口重用)
  • 设置套接字阻塞属性 (设置为非阻塞,使用套接字接收数据时,取到没有数据为止,但套接字默认是阻塞操作,当缓冲区没有数据就会阻塞)

除了上述功能,为了更方便创建连接,因此可以再封装两个集成功能:

  • 创建一个服务端连接
  • 创建一个客户端连接
class Socket
{
private:int _sockfd;
public:Socket():_sockfd(-1){}~Socket(){Close();}Socket(int fd):_sockfd(fd){}int Fd(){return _sockfd;}//创建套接字bool Create(){_sockfd = ::socket(PF_INET,SOCK_STREAM,0);if(_sockfd < 0){ERR_LOG("创建套接字失败!\n");return false;}return true;}//绑定地址信息bool Bind(const string& ip,uint16_t port){struct sockaddr_in addr;addr.sin_family = AF_INET;addr.sin_port = htons(port);addr.sin_addr.s_addr = inet_addr(ip.c_str());socklen_t len = sizeof(addr);int n = ::bind(_sockfd,(struct sockaddr*)&addr,len);if(n < 0){ERR_LOG("绑定失败!\n");return false;}return true;}//开始监听bool listen(int backlog = MAX_LISTEN){int n = ::listen(_sockfd,backlog);if(n < 0){ERR_LOG("监听失败\n");return false;}return true;}//向服务器发起链接bool Connect(const string& ip,uint16_t port){struct sockaddr_in addr = {0};addr.sin_family = AF_INET;addr.sin_port = htons(port);addr.sin_addr.s_addr = inet_addr(ip.c_str());socklen_t len = sizeof(addr);int n = ::connect(_sockfd,(struct sockaddr*)&addr,len);if(n < 0){ERR_LOG("连接失败!");return false;}return true;}//获取新连接int Accept() //新连接直接返回fd方便后续操作{int newfd = ::accept(_sockfd,NULL,NULL);if(newfd < 0){ERR_LOG("获取新连接失败\n");return -1;}return newfd ;}//关闭套接字void Close(){if(_sockfd != -1){::close(_sockfd);_sockfd = -1; }}//创建一个服务端连接:默认非阻塞bool CreateServer(uint16_t port,const string& ip = "0.0.0.0",bool flag = 0) //服务器默认接收所有网卡的数据{//1.创建套接字 2.bind地址信息 3.监听 4.开启地址复用 5.设置非阻塞if(Create() == false) {ERR_LOG("Create Error");return false;}if(flag) NonBlock();ReuseAddress();if(Bind(ip,port) == false){ERR_LOG("Bind Error");return false;}if(listen() == false){ERR_LOG("Listen Error");return false;}return true;}//创建一个客户端连接bool CreateClient(uint16_t port,const string& ip){//1.创建套接字 2.连接if(Create() == false) return false;if(Connect(ip,port) == false) return false;return true;}//设置套接字选项 --- 开启地址端口重用void ReuseAddress(){int val = 1;setsockopt(_sockfd,SOL_SOCKET,SO_REUSEADDR,(void*)&val,sizeof(int));val = 1;setsockopt(_sockfd,SOL_SOCKET,SO_REUSEPORT,(void*)&val,sizeof(int));}//设置套接字阻塞属性void NonBlock(){int flag = ::fcntl(_sockfd,F_GETFL,0);::fcntl(_sockfd,F_SETFL,flag|O_NONBLOCK);}//接收数据ssize_t Recv(void* buf,size_t len,int flag = 0) //缺省为0,默认阻塞读写{ssize_t ret = ::recv(_sockfd,buf,len,flag);if(ret <= 0){   //EAGAIN:当前socket接收缓冲区中没有数据,在非阻塞情况下才有这个错误//EINTER:表示当前scoket的阻塞等待,被信号打断if(errno == EAGAIN || errno == EINTR)return 0;ERR_LOG("socket recv failed\n"); return -1; }return ret;}//发送数据 ssize_t Send(const void* buf,size_t len,int flag = 0){if(len == 0) return 0;ssize_t ret = ::send(_sockfd,buf,len,flag);if(ret < 0){if(errno == EINTR || errno == EAGAIN)return 0;ERR_LOG("scoket send error\n");return -1;}return ret;}//非阻塞接收数据ssize_t NonBlockRecv(void*buf,size_t len){return Recv(buf,len,MSG_DONTWAIT);}//非阻塞发送数据ssize_t NonBlockSend(void*buf,size_t len){return Send(buf,len,MSG_DONTWAIT);}
};

Channel模块

        该模块主要是对描述符/连接的事件监控管理,它主要完成的功能是下面几大类:

1. 判断当前描述符是否监控了某事件

2. 对当前描述符开启某事件的监控

3. 对当前描述符关闭对某事件的监控

4. 设置事件回调

5. 事件分发/事件出来:一旦连接事件触发,根据不同的事件调用不同的函数

由于我们采用的是epoll机制来进行事件监控,因此我们需要了解其相关的事件标志:

  • EPOLLIN:fd可读
  • EPOLLOUT:fd可写
  • EPOLLRDHUB:fd连接断开(TCP连接被对方关闭或半关闭)
  • EPOLLPRI:fd有紧急/带外数据可读
  • EPOLLHUB:fd挂断,比如socket被关闭,通常可以理解为描述符已经失效或关闭,不再使用。
  • EPOLLERR:fd发生错误,不能作为事件请求(即不能监控该种事件),只能作为返回事件。

        这几个事件都是用uint32_t类型保存的,通过是一个bitmask,每个事件占一个bit位,因此可以通过位运算来组合或判断事件,同时我们需要为这几种事件注册对应的回调函数来处理。

class Channel
{
private:int _fd;uint32_t _events;//当前链接需要监控的事件uint32_t _revnts; //当前链接触发的事件using EventCallBack = std::function<void()>;EventCallBack _write_callback; //可读事件回调EventCallBack _read_callback;  //可写事件回调EventCallBack _error_callback; //错误事件回调EventCallBack _close_callback; //关闭事件回调EventCallBack _any_callback;   //任意事件回调
public:Channel(EventLoop* loop,int fd = -1):_fd(fd),_events(0),_revnts(0){}  int Fd(){return _fd;}uint32_t Events()//监控的事件{return _events;}bool ReadAble()//当前是否监控可读:判断对应事件的比特位是否为1{return _events&EPOLLIN; }bool WriteAble()//当前是否监控可写{return _events&EPOLLOUT;}void EnableRead()//启动读事件监控:将对应比特位置1{_events |= EPOLLIN;}void EnableWrite() //启动写事件监控{_events |= EPOLLOUT;}void DisableRead()//关闭读事件监控:将对应比特位置0{_events &= ~EPOLLIN;}void DisableWrite() //关闭写事件监控{_events &= ~EPOLLOUT;}void DisableAllEvent()// 关闭任意事件监控:全设置为0{_events = 0;}void Update();void Remove();  //移除监控:目前无法实现,后面调用eventloop移除监控void SetReadCallBack(const EventCallBack& cb) //设置读事件回调{_read_callback = cb;}void SetWriteCallBack(const EventCallBack& cb) //设置读事件回调{_write_callback = cb;}void SetErrorCallBack(const EventCallBack& cb) //设置读事件回调{_error_callback = cb;}void SetCloseCallBack(const EventCallBack& cb) //设置读事件回调{_close_callback = cb;}void SetAnyCallBack(const EventCallBack& cb) //设置读事件回调{_any_callback = cb;}void HandlerEvent() //事件处理:一旦链接事件触发就调用该函数,自己触发了什么事件如何处理由自己决定{if(_revnts & EPOLLIN || _revnts & EPOLLRDHUP || _revnts & EPOLLPRI) //断开链接认为要求上层读取数据,关闭链接后没有数据发送了{if(_any_callback) _any_callback();if(_read_callback) _read_callback();}//有可能链接释放的事件,一次只处理一个因此使用else ifif(_revnts & EPOLLOUT){if(_any_callback) _any_callback();if(_write_callback) _write_callback();}else if(_revnts & EPOLLERR){if(_any_callback) _any_callback();if(_error_callback) _error_callback();}else if(_revnts & EPOLLHUP){if(_any_callback) _any_callback();if(_close_callback) _close_callback();}}
//当Poller模块监控到事件发生时,需要通知Channel哪些事件触发,再去根据对应的事件进行回调,所以我/们需要设置一个SetREvents()告知Channel什么事件发生了void SetREvents(uint32_t revnts){_revnts = revnts;}
};

   注意:目前我们只是在channel模块内部对该描述符的events和revents进行简单的设置,未来channel模块一定是要和Poller模块结合的,因为设置了事件监控,就要渗透到内核,也就是使用epoll模型来监控,移除监控也一样,而Poller模块就是对epoll操作的封装。

HandlerEvent说明:

1. 对于EPOLLIN、EPOLLRDHUP、EPOLLPRI我们都认为需要读取数据了。EPOLLRDHUP表示连接断开,此时需要上层读取数据,因为对方不可能再发送数据了。

2. 对于可写事件,是可能写入出错,导致链接释放的,此时如果继续往下执行EPOLLERR和EPOLLHUP的事件回调这样是会出错的,因此对于有可能导致链接释放的事件,一次只处理一个,安全第一,下次再处理,因此使用else if。

3. 对于可读事件和可写事件,它们在处理之后是需要调用任意事件回调的,主要是为了刷新连接的活跃度,我们目前认为他们出错也是释放链接的,因此将任意事件回调放在之前调。

4. 对于EPOLLHUP或者EPOLLERR,一旦出错就需要释放连接,因此需要在它们各自的事件回调之前调用任意回调。

Poller模块

        这个模块其实是真正的fd事件监控模块,本质是对epoll封装实现对事件的监控操作,比如添加/修改/移除描述符的事件监控。

        既然是对epoll封装,那一定是要有一个对epoll模型操作的fd,同时还要有一个struct epoll_event结构体数组,保存所有触发事件的描述符信息。最后还需要一个hash表管理描述符以及描述符对应的事件管理对象channel,当Poller监控到事件发生了,就可以通过channel对象设置revents进行通知,此时channel就可以在HandlerEvent内部进行事件派发。

        因此Poller和Channel模块主要结合的流程是:

  • Poller通过Channel知道描述符需要监控什么事件,然后对描述符进行事件监控,然后存到将fd和Channel存到hash表中。
  • 当描述符事件就绪,通过描述符在hash表中找到对应的Channel,这样才知道什么事件如何处理。
#define MAX_EPOLLSIZE 1024
class Poller
{
private:int _epfd;struct epoll_event _events[MAX_EPOLLSIZE];unordered_map<int,Channel*> _channels; //一个fd对应一个channel
private: //为实现对外功能的私有接口bool HasChannel(Channel*channel) //一个Channel是否已经添加了事件监控{auto it = _channels.find(channel->Fd());if(it == _channels.end()) return false;return true;}void Update(Channel*channel,int op) //本质就是对epoll进行操作--epoll_ctl{struct epoll_event event;event.events = channel->Events();event.data.fd = channel->Fd();int ret = ::epoll_ctl(_epfd,op,channel->Fd(),&event);if(ret < 0){ERR_LOG("epoll_ctrL error");return;}    }
public: //对外开放接口Poller(){_epfd = ::epoll_create(MAX_EPOLLSIZE);if(_epfd < 0){ERR_LOG("Epoll Create Error");exit(1);}}//添加或修改监控事件void UpdateEvent(Channel* channel){//1.先查询是否channel添加了事件监控bool ret = HasChannel(channel);if(ret == false) //不存在则添加{ _channels[channel->Fd()] = channel;Update(channel, EPOLL_CTL_ADD);return;}//存在则修改Update(channel,EPOLL_CTL_MOD);}   //移除监控事件void RemoveEvent(Channel* channel){//1.从哈希表中移除auto it = _channels.find(channel->Fd());if(it != _channels.end())_channels.erase(it);//2.移出红黑树Update(channel,EPOLL_CTL_DEL);   }void Poll(vector<Channel*>* active) //开始监控返回活跃链接{int ret = ::epoll_wait(_epfd,_events,MAX_EPOLLSIZE,-1);//-1表示调用阻塞直到有事件发生if(ret < 0){if(ret == EINTR) return;//信号打断ERR_LOG("epoll_wait error:%s\n",strerror(errno));return;}   for(int i = 0; i < ret ; i++){auto it = _channels.find(_events[i].data.fd);assert(it != _channels.end());//说明channel管理出现问题it->second->SetREvents(_events[i].events); //设置实际就绪事件active->push_back(it->second); //放进活跃对立 }}
};

具体Channel模块和Poll模块怎么结合呢?之前我们在Channel模块提供了设置启动/关闭对某事件监控的接口,我们只是对_events进行修改,但是还要渗透到内核,也就是添加到epoll模型,因此Channel内部需要封装一个Poller模块,修改完events字段之后,使用Poller添加到epoll里:

   void EnableRead()//启动读事件监控:将对应比特位置1{_events |= EPOLLIN;Update();}void EnableWrite() //启动写事件监控{_events |= EPOLLOUT;Update();}void DisableRead()//关闭读事件监控:将对应比特位置0{_events &= ~EPOLLIN;Update();}void DisableWrite() //关闭写事件监控{_events &= ~EPOLLOUT;Update();}void Channel::Remove(){_poller->RemoveEvent(this);}void Channel::Update(){_poller->UpdateEvent(this);}

下面我们就可以根据Socket、Poller、Channel这几个模块实现一个简单的单Reactor模型,主要流程如下:

1. 创建监听Socket和Channel对象,Poller对象,将监听套接字添加到Poller中进行管理,设置好可读事件回调(即获取新链接),然后开启可读事件监控。

2. 循环监控,当listen套接字事件就绪,此时就会将新链接获取上来,也为其封装一个Channel对象添加到Poller对象进行事件管理,然后设置好对应的回调,就可以开启事件监控了。

3. 后续就是Poller不断监控监听连接和通信连接的事件。

#include"server.hpp"void HandlerClose(Channel* channel)
{cout << "close:" << channel->Fd() << endl;channel->Remove();//移除监控delete channel;//释放文件描述符
}void HandlerAny()
{cout << "产生了一个事件!" << endl;return;
}void HandlerRead(Channel* channel)
{int fd = channel->Fd();char buff[1024] = {0};int ret = recv(fd,buff,1023,0);if(ret <= 0){cout << "链接释放" << endl;return HandlerClose(channel);}//链接释放channel->EnableWrite();//启动可写事件cout <<"client say: " << buff << endl;
}void HandlerWrite(Channel* channel)
{int fd = channel->Fd();char* data = "hello Client..";int ret = ::send(fd,data,strlen(data),0);if(ret <= 0){return HandlerClose(channel);}channel->DisableWrite();//关闭写监控
}//监听套接字触发可读事件时,获取新链接 为新链接设置回调,开启他们的可读事件
void Acceptor(Poller* poller,Channel* lis_channel) 
{int fd = lis_channel->Fd();int newfd = accept(fd,NULL,NULL);if(newfd < 0){cout << "accept error" << endl;return;}cout << "gain a new link" << endl;Channel* new_channel = new Channel(poller,newfd);new_channel->SetReadCallBack(std::bind(HandlerRead,new_channel));new_channel->SetWriteCallBack(std::bind(HandlerWrite,new_channel));new_channel->SetErrorCallBack(std::bind(HandlerClose,new_channel));new_channel->SetCloseCallBack(std::bind(HandlerClose,new_channel));new_channel->SetAnyCallBack(HandlerAny);new_channel->EnableRead();
}int main()
{ Poller poller;Socket lis_sock;bool ret = lis_sock.CreateServer(8081);if(ret == false) {ERR_LOG("Create Server Error");abort();}//为监听套接字创建一个Channel对象,对其进行事件监控Channel lis_channel(&poller,lis_sock.Fd());lis_channel.SetReadCallBack(std::bind(Acceptor,&poller,&lis_channel));lis_channel.EnableRead();while(1){   //开始监控vector<Channel*> actives;poller.Poll(&actives);for(const auto& a : actives){a->HandlerEvent();}sleep(2);}lis_sock.Close();return 0;
}
#include"server.hpp"int main()
{Socket cli_sock;bool ret = cli_sock.CreateClient(8081,"127.0.0.1");if(ret == false){ERR_LOG("create newlink error");abort();}cout << "Create Client Success"<< endl;while(1){string str = "hello world";cli_sock.Send(str.c_str(),str.size());char buf[1024] = {0};int size = cli_sock.Recv(buf,1023);buf[size] = 0;DBG_LOG("client recv:%s",buf);sleep(2);}return 0;
}

流程图:

EventLoop模块

        EventLoop模块相当于是对前面几个模块的整合。

eventfd

在梳理EventLoop模块之前,我们需要对eventfd进行一个大概的了解。

DESCRIPTIONeventfd - create a file descriptor for event notificationNAMEeventfd - create a file descriptor for event notificationSYNOPSIS#include <sys/eventfd.h>int eventfd(unsigned int initval, int flags);
  •   eventfd用来创建一个文件描述符用于事件通知,是一种Linux提供的事件通知机制。与信号不同的是,信号是针对进程进行事件通知的,但一个信号被进程的哪个线程处理事不一定的,而eventfd就可以被用来在EventLoop模块中实现线程间的事件通知,毕竟一个EventLoop是和一个线程绑定的。
  • 参数

1. initval:计数初值,不一定要设置为0

2. flags: 一般设置以下两个标志位

EFD_CLOEXEC --- 禁止进程复制,表示返回的eventfd文件描述符在fork后exec其他程序时会自动关闭这个文件描述符。

EFD_NONBLOCK --- 启动非阻塞属性

  • 返回值:返回一个文件描述符用于操作eventfd,因此eventfd也是可以通过read/write/close进行操作的。注意:read&write进行I/O的时候数据只能是一个8字节数据
  • eventfd事件通知的原理:它本质是内核管理的一个计数器,创建eventfd就会在内核中创建对应的计数器结构,计数不为0,那么它会触发可读事件,read之后计数清零,write则会递增计数器。也就是说,eventfd计数不为0就是一直可读,而可写事件是一直可写(因为可以一直累计计数),因此eventfd如果使用epoll监控事件,那么都是监控读事件,因为监控写事件无意义。
  • 对于 eventfd 呢?它的阻塞有可能是怎么样的?read eventfd 的时候,如果计数器的值为 0,就会阻塞(这种就等同于没“文件”内容)。这种可以设置 fd 的属性为非阻塞类型,这样读的时候,如果计数器为 0 ,返回 EAGAIN 即可,这样就不会阻塞整个系统

Q:为什么使用eventfd,而不使用普通的fd进行事件通知呢?

  • 不是所有的 fd 类型都可用 epoll 池来监听事件的,只有实现了 file_operation->poll 的调用的“文件” fd 才能被 epoll 管理。eventfd 刚好就实现了这个接口。

设计思想

        EventLoop是需要对事件监控和事件处理的,但是如果这个描述符在多个线程中都触发了事件进行处理,此时是会存在线程安全问题的,因此这个模块一定是和线程一一对应的,需要将一个连接的事件监控,以及连接事件处理和其他操作都放在同一个线程中处理。具体应该如何处理呢,如何保证对链接的操作和处理回调函数中的操作都在线程中?

        muduo库中对于出现的这种问题,为了高性能不加锁,采用了一种设计方案:将对fd的操作封装成一个任务,压入这个EventLoop线程对应的任务队列里,等到监控的事件结束之后,再把任务队列中的任务拿出来执行,这样就只需对任务队列加锁,转移了锁的粒度,所有操作都变成单线程串行执行。因此在EventLoop内部线程具体的工作流程为:

1. 在线程中对描述符进行事件监控。

2. 有描述符就绪则对描述符进行事件处理。

3. 所有就绪事件处理完了,此时再去将任务队列中的所有任务一一执行。

但是需要注意的是,在上面的流程中,由于epoll的事件监控(step 2),可能会因为没有事件到来而持续阻塞,导致任务队列中的任务不能及时得到执行(step 3),因此我们使用eventfd添加到Poller的事件监控中,用于实现每次向任务队列添加任务的时候,通过向eventfd写入数据来唤醒epoll的阻塞

class EventLoop
{private:std::thread::id _thread_id; //线程idint _eventfd; //eventfd唤醒事件监控可能导致的阻塞std::unique_ptr<Channel> _event_channel;//为eventfd创建一个channel进行事件监控Poller _poller;//进行所有描述发的事件监控using Functor = function<void()>;vector<Functor> _tasks; //任务池std::mutex _mtx; //保证任务队列线程安全的锁static int CreateEfd()//创建eventfd{int efd = ::eventfd(0,EFD_CLOEXEC|EFD_NONBLOCK); //initval flagif(efd < 0){ERR_LOG("create eventfd error");abort();}return efd;}void ReadEvent(){uint64_t val = 0;ssize_t ret = read(_eventfd,&val,sizeof(val));if(ret < 0){    //信号打断  无数据可读if(errno == EINTR || errno == EAGAIN) return;ERR_LOG("Read Eventfd Failed");abort();}}//其实就是向eventfd写入一个数据,达到事件通知的效果,触发可读事件void WeakEvent(){uint64_t val = 1;ssize_t ret = write(_eventfd,&val,sizeof(val));if(ret < 0){    //信号打断  无数据可读if(errno == EINTR || errno == EAGAIN) return;ERR_LOG("Write Eventfd Failed");abort();}}public:EventLoop():_thread_id(std::this_thread::get_id()),_eventfd(CreateEfd()),_event_channel(new Channel(this,_eventfd)) //TODO Channel修改为eventloop{//给eventfd设置可读事件回调,读取事件通知次数(注意绑定的时候注意指定类域还有注意this)_event_channel->SetReadCallBack(std::bind(&EventLoop::ReadEvent,this));//开启可读监控_event_channel->EnableRead();}//交换任务时需要加锁保证任务队列的线程安全void RunAllTask() //执行任务池中所有任务{vector<Functor> func;{std::unique_lock<std::mutex> _lock(_mtx);func.swap(_tasks);}//执行任务for(const auto& f : func){f();}}void RunInLoop(const Functor& cb) //判断将要执行的任务是否处于当前线程中,如果是则执行,不是则压入队列{if(isInLoop()) return cb();//将要执行的任务处于当前线程,直接返回return QueueInLoop(cb); //不是处于同一线程,压入线程池中}void QueueInLoop(const Functor& cb)//将操作压入任务池:需要加锁保证线程安全{{std::unique_lock<std::mutex> _lock(_mtx);_tasks.push_back(cb);}//唤醒可能因为事件没就绪,而导致的epoll阻塞//其实就是给eventfd写入一个数据,eventfd就会触发可读事件,此时epoll不会阻塞,执行RunAlltask()WeakEvent();}bool isInLoop()//用于判断当前线程是否是EventLoop对应的线程{return _thread_id == std::this_thread::get_id();}bool UpdateEvent(Channel* channel){_poller.UpdateEvent(channel);}//添加/修改描述符的事件监控void RemoveEvent(Channel* channel){_poller.RemoveEvent(channel);} //移除描述符的监控void Start() //三步走:1.事件监控 2.就绪事件处理 3.执行任务{ while(1){//1.监控vector<Channel*> actives;_poller.Poll(&actives);//2.就绪事件处理for(const auto& a : actives){a->HandlerEvent();}//3.执行任务RunAllTask();}
};
  • Start()就是将来和EventLoop绑定线程的入口函数主要执行的工作。
  • 在将对链接的操作封装成一个任务抛进任务队列,不是直接压入任务池中的,如果当前要执行的任务处于当前线程,则直接返回,不是处于同一线程才压入线程池中。
  • 双缓冲队列优化:push任务时我们需要加锁,而在RunAllTasks()我们使用两个队列,也就是说锁只保护任务队列的push和swap操作,不保护任务执行,任务执行在EventLoop线程中串行执行,不需要加锁。

EventLoop内部是封装了Poller模块的,因此可以将EventLoop和Channel模块整合,在Channel对事件监控开启/关闭时,调用EventLoop内部封装的Poller实现:

class Poller;
class EventLoop;
class Channel
{
private:int _fd;EventLoop* _loop;uint32_t _events;//当前链接需要监控的事件uint32_t _revnts; //当前链接触发的事件
///....
};void Channel::Remove()
{_loop->RemoveEvent(this);
}void Channel::Update()
{_loop->UpdateEvent(this);
}

TimeWheel模块

        这个模块主要是为了帮我们完成定时任务的功能,完成类似非活跃连接销毁的任务。因此这个模块的目的是实现一个完整的秒级定时器,通过timerfd决定每隔多少时间滴答一次,timerfd设置为每ns触发一次定时事件,当事件被触发,而运行一次timeWheel的runTimeTask,执行当前槽内的所有定期任务。

        首先timerWheel是根据timerfd来决定何时执行过期定时任务,因此我们需要一个timerfd成员;同时每秒钟触发一次事件,那就需要将fd添加到eventloop进行事件监控,所以我们要添加个eventLoop;而eventLoop基于channel进行事件监控,因此我们还要为timerfd创建一个channel对象进行事件监控:

//时间轮对象
class TimeWheel
{
private:using  WeakTask = std::weak_ptr<TimeTask>;using  PtrTask = std::shared_ptr<TimeTask>;int _tick;//表示执行定时任务的指针,走到哪里释放哪里,释放哪里相当于执行哪里的任务int _capacity;//相当于时间轮的一圈多少 --- 最大延迟时间vector<vector<PtrTask>> _wheel;//二维数组作为时间轮注意要比capacity后声明unordered_map<uint64_t,WeakTask> _timers; //注意这里需要使用WeakPtr是因为添加新的定时任务(shared_ptr<TimeTask>)再使用shared_Ptr会增加不必要的计数EventLoop* _loop; //事件循环int _timerfd;//定时器描述符unique_ptr<Channel> _timer_channel;////....
};

之前我们已经封装好一个时间轮的,剩下的就是在原来基础上创建好timerfd,timerfd_channel,然后设置好对应可读回调并开启读事件监控:

private:static int CreateTimerFd(){//1.创建定时器int timerfd = ::timerfd_create(CLOCK_MONOTONIC,0);if(timerfd < 0){ERR_LOG("timerfd create error!");abort();}//2.启动定时器struct itimerspec itime;itime.it_value.tv_sec = 1;//第一次超时itime.it_value.tv_nsec = 0;itime.it_interval.tv_sec = 1;//第一次超时之后的超时间隔itime.it_interval.tv_nsec = 0;int n = ::timerfd_settime(timerfd,0,&itime,NULL);if(n < 0){ERR_LOG("start timer error!");abort();}return  timerfd;}void ReadTimerfd(){uint64_t times;int ret = ::read(_timerfd,&times,sizeof(times));//写入数据之前会阻塞没有数据if(ret < 0){ERR_LOG("read error");;abort();}}//定时器fd可读事件回调:1.读取定时器数据进行清0 2.执行一波所有的过期定时任务void TimerCb(){ReadTimerfd();RunTimeTask();}public:TimeWheel(EventLoop* loop):_tick(0),_capacity(60),_wheel(_capacity),_loop(loop),_timerfd(CreateTimerFd()),_timer_channel(new Channel(loop,_timerfd))//注意成员声明顺序{_timer_channel->SetReadCallBack(std::bind(&TimeWheel::TimerCb,this));_timer_channel->EnableRead();}//指针移动执行任务void RunTimeTask(){_tick = (_tick+1)%_capacity;//走到哪释放哪_wheel[_tick].clear(); //清空指定位置的数组,就会把数组中保存的所有管理定时器对象的shared_ptr軽放掉}

  在时间轮添加定时任务/对指定任务进行刷新/取消定时任务这些接口都是在任何一个线程都有可能进行的,但是定时器是和eventLoop保存在一起的,很多定时任务都是对通信链接进行操作,所以我们定时任务必须在EventLoop线程里执行,即这几个函数最终要放在eventLoop线程中执行:

    //在时间轮添加定时任务void TimerAdd(uint64_t id,uint32_t delay,const TaskFunc& cb){_loop->RunInLoop(std::bind(&TimeWheel::TimerAddInloop,this,id,delay,cb));return;}void TimerAddInloop(uint64_t id,uint32_t delay,const TaskFunc& cb){//1.创建定时任务设置好删除函数PtrTask pt(new TimeTask(id,delay,cb));//注意:因为编译器不会将对象的成员函数隐式转换成函数指针,所以必须在TimeWheel::RemoveTimeTask前添加&;pt->SetRelease(std::bind(&TimeWheel::RemoveTimeTask,this,id));//注意这里需要使用bind这是因为RemoveTimeTask第一个参数是this,而且我们统一了ReleaseFunccout << "Create Task.." << _wheel.size() << endl ;//2.存放到时间轮int pos = (_tick+delay)%_capacity;_wheel[pos].push_back(pt);_timers[id] = WeakTask(pt);}//对指定定时任务进行刷新/延迟void RefreshTask(uint64_t id){_loop->RunInLoop(std::bind(&TimeWheel::RefreshTaskInLoop,this,id));return;}void RefreshTaskInLoop(uint64_t id){auto it = _timers.find(id);if(it == _timers.end())return; //没找到指定定时任务 返回PtrTask pt = it->second.lock();//相当于新建了一个shared_ptr共享计数int delay = pt->DelayTime();int pos = (_tick+delay)%_capacity;_wheel[pos].push_back(pt);}//取消定时任务void CancelTask(uint64_t id){_loop->RunInLoop(std::bind(&TimeWheel::CancelTaskInLoop,this,id));return;}void CancelTaskInLoop(uint64_t id){auto it = _timers.find(id);if(it == _timers.end())return; //没找到指定定时任务 返回PtrTask pt = it->second.lock();//相当于新建了一个shared_ptr共享计数if(pt) pt->Cancel();}//Class TimerWheel: 是否存在某个定时任务bool HasTimer(uint64_t id){auto it = _timers.find(id);if(it == _timers.end()) return false;return true;}

编写好TimeWheel模块,就可以和EventLoop模块整合了,主要是用于实现定时任务(非活跃连接)的添加、刷新、取消:

   void EventLoop::TimerAdd(uint64_t id,uint32_t delay,const TaskFunc& cb){return _timer_wheel->TimerAdd(id,delay,cb);}void RefreshTask(uint64_t id){return _timer_wheel->RefreshTask(id);}void CancelTask(uint64_t id){return _timer_wheel->CancelTask(id);}bool EventLoop::HasTimer(uint64_t id){return _timer_wheel->HasTimer(id);}

需要注意的是,添加新链接的定时销毁任务这个操作,必须在启动读事件之前,因为有可能启动了事件监控之后,立即有了事件,需要刷新连接的活跃度,而任意事件回调的操作主要就是刷新定时任务,如果先前没设置定时任务就麻烦了。

主要的流程图如下:

Connection模块

            Connection模块的目的是为了对连接进行全方位的管理,对于通信连接的所有操作都是通过这个模块来完成的。

            管理的内容主要包括套接字的管理,可以对套接字进行各种操作,还有对于连接事件的管理,比如可读、可写、挂断、任意,也有对于缓冲区的管理,方便和Socket进行数据的发送和接收,还有对于协议上下文的管理,用来记录请求数据的发送过程,以及对用户设置各种回调的功能,最后还有连接的状态方便对连接进行维护。

  • DISCONNECTED -- 链接关闭状态
  • CONNECTING -- 链接建立成功-待处理状态
  • CONNECTED -- 链接建立完成,各种设置已完成,可以通信的状态
  • DISCONNECTING --- 链接待关闭状态

        之前几个模块来进行通信,都是对新通信连接封装一个channel然后添加到EventLoop中进行管理,设置回调(编写HandleRead,HandlerWrite接收和发送数据)而Connection模块需要对用户更加友好,因此我们需要在内部编写好这几个Channel对应的回调数据,内部帮用户接收/发送数据,而用户只需要关心连接收到数据之后的处理、连接建立成功之后处理、连接关闭之后该如何处理,任意事件的产生如何处理,这些都是由用户设置的。

        Connection模块的功能主要由发送数据、关闭连接、启动和取消非活跃连接超时销毁、连接初始化、协议切换的功能,而该模块是对于连接的管理模块,因此对于连接的所有操作都是通过这个模块完成的,Connection内部封装了EventLoop和Channel,因此可以保证对链接的操作都是在对应EventLoop线程中串行执行的,这种做法虽解决了"数据竞争"问题,但是不能解决"逻辑时序"问题,比如对于连接进行操作的时候,排进任务队列,但是释放连接操作比它先排进队列,此时对连接的访问是有风险的,可能导致程序崩溃,其中一个解决方案是使用智能指针来对Connection对象进行管理,这样就能保证任意一个地方对于Connection对象进行操作的时候都会在内部保存一个shared_ptr,即使其他地方进行释放操作,也只是对shared_ptr的计数器减1,修改连接的状态,不会导致连接的实际释放,如果后续其他处使用Send操作,也会根据链接的状态判断是否进行Send操作。

        我们连接收到数据之后,用户处理完数据之后,需要调用MessageCallback使用Connection对象再将响应发回去,因此在HandleRead内部需要传递Connection对象给MessageCallBack的,我们是不能直接传递this指针的,因为执行的时候可能对象已经被销毁,导致程序崩溃,因此我们需要值传递对象本身的shared_ptr,此时需要Connection类继承std::enable_shared_from_this,再调用shared_from_this()返回自身的shared_ptr对象。

//链接管理模块Connection
typedef enum {DISCONNECTED,CONNECTING,CONNECTED,DISCONNECTING} ConnStatu;
//DISCONNECTED -- 链接关闭状态
//CONNECTING -- 链接建立成功-待处理状态
//CONNECTED -- 链接建立完成,各种设置已完成,可以通信的状态
//DISCONNECTING --- 链接待关闭状态
class Conncetion;
using PtrConnection = std::shared_ptr<Conncetion>;//解决链接释放野指针(?)
class Conncetion : public enable_shared_from_this<Conncetion>
{
private:uint64_t _conn_id;//标识一个链接的id,也作为定时器事件id//套接字管理int _sockfd;//链接关联的文件描述符bool _enable_inactive_release;//链接释放启动非活跃链接销毁的判断标志,默认为falseConnStatu _statu;Socket _socket; //套接字操作管理//链接事件管理EventLoop* _loop;Channel _channel; //链接事件管理//缓冲区管理Buffer _in_buffer;//输入缓冲区:存放从socket中读取到的数据Buffer _out_buffer;//输出缓冲区:请求的接收处理上下文//协议上下文管理Any _context;//请求的接收处理上下文//回调函数管理(不同阶段回调)using ConnectedCallBack = std::function<void(const PtrConnection&)>;//连接建立成功后如何处理using MessageCallBack   = std::function<void(const PtrConnection&,Buffer*)>;//连接收到数据如何处理using CloseCallBack = std::function<void(const PtrConnection&)>;//连接关闭前如何处理using AnyCallBack = std::function<void(const PtrConnection&)>;//任意事件有没有处理由用户决定ConnectedCallBack _connected_cb;MessageCallBack _message_cb;CloseCallBack _close_cb;AnyCallBack _any_cb;CloseCallBack _server_closed_cb;//从服务器组件内移除链接管理信息
private:
//保证在EventLoop线程中执行void SendInLoop( Buffer& buf){if(_statu == DISCONNECTED) return;//1.将数据放到缓冲区_out_buffer.WriteBufferPush(buf);//2.启动可写事件监控if(_channel.WriteAble() == false) _channel.EnableWrite();}void ShutDownInLoop(){_statu = DISCONNECTING; //细节:在这里设置完DISCONNECTING之后,如果后续发送缓冲区还有数据开启写监控调用回调HandlerWrite发现//是DISCONNECTING就会调用ReleaseInLoop()//1.设置待关闭状态//2.判断接收缓冲区是否还有数据要处理if(_in_buffer.ReadAbleSize() > 0){if(_message_cb) _message_cb(shared_from_this(),&_in_buffer);}//3.判断发送缓冲区是否还有数据if(_out_buffer.ReadAbleSize() > 0){if(_channel.WriteAble() == false)//开启写监控就会调用HandlerWrite关闭链接_channel.EnableWrite();// return;}//没有直接关闭if(_out_buffer.ReadAbleSize() == 0)Release();}void EnableInactiveReleaseInLoop(int sec){//1.设置标记位_enable_inactive_release = true;//2.延迟/添加定时任务if(_loop->HasTimer(_conn_id))return _loop->RefreshTask(_conn_id);_loop->TimerAdd(_conn_id,sec,std::bind(&Conncetion::Release,this));}void CancelInactiveReleaseInLoop(){//1.切换标记位_enable_inactive_release = false;//2.取消定时任务if(_loop->HasTimer(_conn_id))_loop->CancelTask(_conn_id);}void UpgradeInLoop(const Any& context,const ConnectedCallBack& conn,const MessageCallBack&msg,const CloseCallBack& closed,const AnyCallBack& event){_context = context;_connected_cb = conn;_message_cb = msg;_close_cb = closed;_any_cb = event;}void ReleaseInLoop()//实际释放接口{//1.修改状态_statu = DISCONNECTED;//2.移除事件监控_channel.Remove();//3.关闭文件描述符_socket.Close();//4.看是否还存在定时任务需要移除if(_loop->HasTimer(_conn_id))CancelInactiveReleaseInLoop();//5.调用回调:注意为避免因移除服务器管理而导致连接释放而出错,因此先调用用户的 if(_close_cb) _close_cb(shared_from_this());if(_server_closed_cb) _server_closed_cb(shared_from_this());}void EstablishedInLoop()//连接获取之后进行设置{assert(_statu == CONNECTING);//1.修改连接状态_statu = CONNECTED;//2.开启读事件监控:开启之后可能立即会有事件触发,如果此时启动了非活跃链接销毁,就会刷新活跃延迟销毁任务执行//因此启动读事件监控应该在设置非活跃链接是否销毁之后进行_channel.EnableRead();//3.调用用户设置的回调if(_connected_cb){_connected_cb(shared_from_this());// DBG_LOG("after coonnected_cb");}}public:Conncetion(EventLoop* loop,uint64_t conn_id,int sockfd):_conn_id(conn_id),_sockfd(sockfd),_enable_inactive_release(false),_statu(CONNECTING),_socket(sockfd),_loop(loop),_channel(loop,sockfd){_channel.SetReadCallBack(std::bind(&Conncetion::HandlerRead,this));_channel.SetWriteCallBack(std::bind(&Conncetion::HandlerWrite,this));_channel.SetCloseCallBack(std::bind(&Conncetion::HandlerClose,this));_channel.SetErrorCallBack(std::bind(&Conncetion::HandlerError,this));_channel.SetAnyCallBack(std::bind(&Conncetion::HandlerAnyEvent,this));}~Conncetion(){DBG_LOG("RELEASE CONNECTION:%p",this);}void Send(const char* data,size_t len)//发送数据,将数据发送到缓冲区,启动写事件监控{//bug:可能外界传入的data是个临时空间,我们现在只是把发送操作传入任务池,有可能并没被立即执行//因此有可能执行的时候,data指向的空间有可能已经被释放Buffer buf;buf.WriteAndPush(data,len);_loop->RunInLoop(std::bind(&Conncetion::SendInLoop,this,std::move(buf))); //使用move}void ShutDown()//提供给组件使用者的关闭接口--并不是实际关闭还需要判断是否有数据待处理{_loop->RunInLoop(std::bind(&Conncetion::ShutDownInLoop,this));}void EnableInactiveRelease(int sec)//启动非活跃销毁,并定义多长时间无通信是非活跃,添加定时任务{_loop->RunInLoop(std::bind(&Conncetion::EnableInactiveReleaseInLoop,this,sec));}void CancelInactiveRelease()//取消非活跃销毁{_loop->RunInLoop(std::bind(&Conncetion::CancelInactiveReleaseInLoop,this));}//切换协议 -- 重置上下文以及阶段性处理函数void Upgrade(const Any& context,const ConnectedCallBack& conn,const MessageCallBack&msg,const CloseCallBack& closed,const AnyCallBack& event){//保证必须在eventLoop()线程中执行_loop->AssertInLoop();//应该立即执行不能压入队列:能走到这里说明就是同线程,就会直接执行任务_loop->RunInLoop(std::bind(&Conncetion::UpgradeInLoop,this,context,conn,msg,closed,event));}void Release(){_loop->QueueInLoop(std::bind(&Conncetion::ReleaseInLoop,this));}void Established(){_loop->RunInLoop(std::bind(&Conncetion::EstablishedInLoop,this));}public:   int Fd()//获取链接管理的文件描述符{return _sockfd;}int Id()//获取链接ID{return _conn_id;}bool Connected()//是否处于Connected状态{return _statu == CONNECTED;}void SetContext(const Any& context)//设置上下文{_context = context;}Any* GetContext()//获取上下文,返回指针{return &_context;}void SetConnectedCallBack(const ConnectedCallBack& cb){_connected_cb = cb;}void SetMessageCallBack(const MessageCallBack& cb){_message_cb = cb;}void SetCloseCallBack(const CloseCallBack& cb){_close_cb = cb;}void SetAnyCallBack(const AnyCallBack& cb){_any_cb = cb;}void SetSvrClosedCallBack(const CloseCallBack& cb){_server_closed_cb = cb;}public: //五个Channel事件回调void HandlerRead()//描述符可读事件触发的回调,接收socket数据放到接收缓冲区再调用message_cb{//1.非阻塞接收socket数据,放到缓冲区char buf[65536];int ret = _socket.NonBlockRecv(buf,65535);if(ret < 0){//  DBG_LOG("ret < 0");//返回-1表示连接断开:出错不能直接关闭还要看缓冲区是否有数据要处理return ShutDownInLoop();}buf[ret] = 0;// DBG_LOG("%s",buf);//返回0表示未读取到数据,并不是连接断开//读取数据到缓冲区,并且移动写偏移_in_buffer.WriteAndPush(buf,ret);//2.调用用户message_cbif(_in_buffer.ReadAbleSize() > 0){//  DBG_LOG("message_cb");_message_cb(shared_from_this(),&_in_buffer);return;}}void HandlerWrite()//描述符可写事件触发的回调,将发送缓冲区中数据进行发送{//1.将输出缓冲区中数据发送ssize_t ret = _socket.NonBlockSend(_out_buffer.ReadPos(),_out_buffer.ReadAbleSize());if(ret < 0) //发送错误关闭连接{  //关闭连接之前查看接收缓冲区是否有数据要处理(出错也就不能发,查看发送缓冲区就没必要了)if(_in_buffer.ReadAbleSize() > 0)_message_cb(shared_from_this(),&_in_buffer);return Release();//直接关闭释放}//2.不要忘记移动读偏移(因为我们已经发送了)_out_buffer.MoveReadIdx(ret);if(_out_buffer.ReadAbleSize() == 0){_channel.DisableWrite();//关闭写监控//如果当前连接时待关闭,有数据则处理数据(前面部分代码处理),没有(或处理完了)则释放连接if(_statu == DISCONNECTING)return Release();}}void HandlerClose()//触发挂断事件{//输入缓冲区有数据就处理if(_in_buffer.ReadAbleSize() > 0)_message_cb(shared_from_this(),&_in_buffer);return Release();}void HandlerError()//触发出错事件{HandlerClose();}void HandlerAnyEvent()//触发任意事件{//1.刷新活跃度if(_enable_inactive_release)_loop->RefreshTask(_conn_id);//2.调用用户组件者设置的任意事件回调if(_any_cb) _any_cb(shared_from_this());}
};

说明:

1. 对于切换协议的函数这个接口必须在EventLoop()线程中立即执行,因为我们需要防备新事件触发后,处理的时候,切换任务还没有被执行,而导致数据使用原协议处理了。所以这个任务必须保证在eventLoop线程中执行,不能压入队列要立即执行。

2. 除了CloseCallBack,我们还需要设置一个组件内的连接关闭回调,这是在组件内设置的,因为未来服务器组件内会把所有连接管理起来,一旦某个连接关闭就应该从管理的地方移除自己的信息。

3. 注意shutdownInLoop()不是一个实际关闭连接的接口,而是一个判断接口,要判断发送缓冲区有没有数据待处理,有就处理,处理完再调用真正的连接实际释放接口ReleaseInLoop()。

4. 连接获取之后我们需要对连接进行各种设置,比如设置channel事件监控,启动读事件监控等,因此我们还需要设置一个EstablishedInLoop()

Connection内部函数调用关系:

Acceptor模块

        该模块其实就是对监听套接字进行管理,开启对监听套接字的读事件监控,当新链接到来需要对新链接创建Connection对象管理,然后为Connection对象设置回调、初始化、开启事件监控等。

        我们需要注意的是,必须在新链接获取回调设置好再启动监听套接字的读事件监控,否则可能造成启动监控之后立即有事件发生,但是回调还没有设置好,新链接得不到处理而造成内存泄漏。

class Acceptor
{
private:Socket _socket;//监听套接字EventLoop* _loop;//对监听套接字进行事件监控Channel _channel;//监听套接字事件管理using AcceptCallBack = std::function<void(int)>;AcceptCallBack _accept_cb;
private:void HandlerRead() //可读事件回调 --- 获取新连接,调用accept_cb进行新连接处理{//1.获取新连接int newfd = _socket.Accept();if(newfd < 0) return;//2.调用获取新连接回调if(_accept_cb) _accept_cb(newfd);}int CreateServer(int port){bool ret = _socket.CreateServer(port);assert(ret == true);return _socket.Fd();}public:int LstFd(){return _socket.Fd();}Acceptor(EventLoop* loop,int port):_socket(CreateServer(port)),_loop(loop),_channel(loop,_socket.Fd()){_channel.SetReadCallBack(std::bind(&Acceptor::HandlerRead,this));}void Listen(){_channel.EnableRead();}void SetAcceptCallBack(const AcceptCallBack& cb){_accept_cb = cb;}};

LoopThread模块

        EventLoop是与线程一一对应的,因此这个模块内部肯定要有一个线程然后和EventLoop绑定。EventLoop模块在实例化对象的时候,必须在线程内部,不能在LoopThread这个类构造时初始化否则实例化构造的线程id不是自己对应的线程,而是主线程),如果我们先创建EventLoop对象再创建线程,将线程id重新给EventLoop进行设置,,这样存在的问题是在构造EventLoop对象到设置新的thread_id期间将是不可控的(构造函数里记录的thread_id是旧线程的id,后面把成员变量改成新线程的id,这两次赋值之间,如果任何代码检查了AssertInLoop就会得到错误结论),因此我们必须先创建线程,然后在线程的入口函数中去实例化EventLoop对象

        将来我们是需要通过LoopThread获取EventLoop对象来对描述符进行事件监控的,因此我们需要互斥锁&&条件变量保证同步,用于实现EventLoop获取的同步互斥关心,避免EventLoop还没实例化之前去获取EventLoop对象指针。

//每个线程对应一个eventLoop
class LoopThread
{
private://互斥锁&&条件变量:保证获取EventLoop对象指针的同步关系,防止获取时还没实例化std::mutex _mtx;std::condition_variable _cv;EventLoop* _loop; //EventLoop对象对应指针,需要在线程入口函数初始化std::thread _thread;//EventLoop对象对应线程
private:void ThreadEntry(){//不new对象的原因是想,EventLoop生命周期随线程EventLoop loop;{unique_lock<std::mutex> lck(_mtx);_loop = &loop;_cv.notify_all();//唤醒所有在条件变量上等的线程}//  while(1)//  {loop.Start();//  }} public:LoopThread():_loop(nullptr),_thread(thread(&LoopThread::ThreadEntry,this)){}EventLoop* GetLoop(){EventLoop* loop = NULL;{unique_lock<std::mutex> lck(_mtx);_cv.wait(lck,[&](){return _loop != NULL;});//在条件变量上等EventLoop实例化loop = _loop;}return loop;}int Eventfd(){{unique_lock<std::mutex> lck(_mtx);_cv.wait(lck,[&](){return _loop != NULL;});//在条件变量上等EventLoop实例化}return _loop->EventFd();}};

        有了LoopThread模块,未来Channel模块怎么和EventLoop模块关联呢,或者说Channel模块怎么进行事件监控?在获取新链接之后,创建新的Connection对象,然后通过LoopThread内部的GetLoop获取EventLoop对象指针,传递给Connection内部的channel,后面Connection开启事件监控的时候,内部channel设置好监控事件之后,就可以通过Loop指针调用Update进行事件监控,至此两个模块就关联起来了。

LoopThreadPool模块

        该模块其实就是设计一个线程池,对所有的LoopThread进行管理和分配,one thread one Loop其实就是连接从这个线程池中选择一个loop进行事件监控。

 该模块主要有以下功能:

1. 线程数量可配置:在服务器中,主从Reactor模型是主线程只负责新连接获取,从线程新连接的事件监控及处理,而当前线程池也有可能从属线程数量为0,也就是实现单Reactor服务器一个线程既负责连接获取还负责连接的事件监控处理,要什么模型由用户灵活配置。

2. 对所有线程进行管理

3. 提供线程分配功能:当主线程获取新连接,需要将新连接挂到从线程进行事件监控看,假设有0个从属线程,则直接分配给主线程的EventLoop;假设有多个从线程,则采用RR轮转进行线程分配,将对应线程eventLoop返回设置给对应Connection。

class LoopThreadPool
{
private:int _thread_count;//从属线程数量int _next_loopix; //分配eventLoop对象EventLoop* _base_loop;//主Reactor线程vector<LoopThread*> _threads;//管理LoopThread线程vector<EventLoop*> _loops;//管理EventLoop对象public:LoopThreadPool(EventLoop* base_loop):_base_loop(base_loop),_thread_count(0),_next_loopix(0){}void SetThreadCount(int count)//设置从属线程数量{_thread_count = count;}void Create()//线程池初始化{if(_thread_count > 0){_threads.resize(_thread_count);_loops.resize(_thread_count);for(int i = 0 ; i < _thread_count ; i++){_threads[i] = new LoopThread();_loops[i] = _threads[i]->GetLoop();//细节:在线程入口函数内部实例化EventLoop对象之前GetLoop会阻塞}}}EventLoop* NextLoop()//给链接分配EventLoop对象进行事件监控{if(_thread_count == 0)return _base_loop;_next_loopix = (_next_loopix+1) % _thread_count;return _loops[_next_loopix];}
};

TcpServer模块

        这个模块是对所有模块的整合,通过TcpServer模块实例化的对象,可以非常简单的完成一个服务器搭建。

管理成员:

  1. Acceptor对象:创建一个监听套接字(需要端口信息)
  2. EventLoop对象:base_loop,实现对监听套接字的事件监控。
  3. vector<PtrConnection> conns:实现对所有新建连接的管理。
  4. LoopThreadPool:创建Loop线程池,对新建连接进行事件监控及处理(将新建连接挂到从属EventLoop线程进行事件监控)

功能:

  1. 设置从属线程池数量(内部创建Acceptorbase_loop,线程池,但由用户控制线程池线程数量)
  2. 启动服务器:线程池初始化,acceptor监听,base_loop开启start等(也可放到构造函数)
  3. 设置各种回调函数(连接建立完成回调,消息回调,连接关闭回调,任意回调),这是用户设置给TcpServerTcpServer设置给获取的新连接
  4. 是否启动非活跃连接超时销毁功能(用户设置超时时间)
  5. 添加定时任务功能

工作流程:

  1. TcpServer中实例化一个Acceptor对象 , 以及一个EventLoop对象(base_loop)
  2. Acceptor挂到base_loop上进行事件监控(设置好回调再监控!)
  3. 一旦Acceptor对象就绪可读事件,则执行可读事件回调获取新链接
  4. Acceptor可读回调对新链接创建一个Connection进行管理
  5. 对链接对应的Connection设置功能回调(链接完成回调,消息回调,关闭回调,任意回调)
  6. 启动Connection的非活跃链接的超时销毁。
  7. 将新连接对应的Connection挂到LoopThread_pool中的从属线程对应的EventLoop中进行事件监控。
  8. 一旦Connection对应连接就绪可读事件,则此时执行可读事件回调,读取数据,读取完毕,调用TcpServer设置的消息回调进行业务处理

由于LoopThreadPool的从属线程数量由用户自己指定,因此我们不能在TcpServer构造的时候设置线程数量,需要用户自己显式指定,用户指定完之后,调用Start()接口,内部根据实际线程数量初始化线程池,然后再进行监控。

//TcpServer模块将前面的模块整合
class TcpServer
{
private:int _next_id; //一个自增长的链接id/定时任务idint _port;EventLoop _baseloop; //主Reactor线程Acceptor _acceptor; //监听套接字LoopThreadPool _pool;//eventLoop线程池unordered_map<uint64_t,PtrConnection> _conns;//保存管理的所有链接shared_Ptr对象int _timeout;//非活跃链接的超时时间bool _enable_inactive_release;//表示是否开启非活跃链接销毁//各种回调using Functor = function<void()>;using ConnectedCallBack = std::function<void(const PtrConnection&)>;//连接建立成功后如何处理using MessageCallBack   = std::function<void(const PtrConnection&,Buffer*)>;//连接收到数据如何处理using CloseCallBack = std::function<void(const PtrConnection&)>;//连接关闭前如何处理using AnyCallBack = std::function<void(const PtrConnection&)>;//任意事件有没有处理由用户决定ConnectedCallBack _connectd_cb;MessageCallBack _message_cb;CloseCallBack _close_cb;AnyCallBack _any_cb;public:TcpServer(int port):_next_id(0),_port(port),_acceptor(&_baseloop,port),_pool(&_baseloop),_enable_inactive_release(false){_acceptor.SetAcceptCallBack(std::bind(&TcpServer::NewConnection,this,std::placeholders::_1));_acceptor.Listen();}void Start()//服务器启动{//1.初始化线程池_pool.Create();//2.开启主线程的事件监控_baseloop.Start();}//回调函数设置   void SetConnectedCallBack(const ConnectedCallBack& cb){_connectd_cb = cb;}void SetMessageCallBack(const MessageCallBack& cb){_message_cb = cb;}void SetCloseCallBack(const CloseCallBack& cb){_close_cb = cb;}void SetAnyCallBack(const AnyCallBack& cb){_any_cb = cb;}//添加定时任务void RunAfter(const Functor& task,int delay){_baseloop.RunInLoop(std::bind(&TcpServer::RunAfterInLoop,this,task,delay));}//启动非活跃超时链接销毁void EnableInactiveRelease(int timeout){_timeout = timeout;_enable_inactive_release = true;}//用户设置线程池中线程数量void SetThreadCount(int count){_pool.SetThreadCount(count);}private://监听套接字Acceptror获取新链接之后的回调(给获取的链接创建Connection对象,设置事件回调,分配eventLoop对象监控,开启事件监控等)void NewConnection(int fd){_next_id++;//自增链接id//分配线程池的线程给新链接,即分配eventLoop进行事件监控PtrConnection conn(new Conncetion(_pool.NextLoop(),_next_id,fd));cout << "gain a new link" << endl;conn->SetMessageCallBack(_message_cb);conn->SetConnectedCallBack(_connectd_cb);conn->SetCloseCallBack(_close_cb);conn->SetAnyCallBack(_any_cb);conn->SetSvrClosedCallBack(std::bind(&TcpServer::RemoveConnection,this,std::placeholders::_1));if(_enable_inactive_release) conn->EnableInactiveRelease(_timeout); //启动-非活跃链接销毁//先启动非活跃连接销毁再启动读监控conn->Established();//就绪初始化:设置回调 启动读监控_conns.insert(std::make_pair(_next_id,conn));//放进服务器组件管理}//移除链接的回调(给服务器组件设置的移除链接的回调,不从这删除,链接永远不会释放)void RemoveConnection(const PtrConnection& conn){_baseloop.RunInLoop(std::bind(&TcpServer::RemoveConnectionInLoop,this,conn));}//保证线程安全void RunAfterInLoop(const Functor& task,int delay){_next_id++;_baseloop.TimerAdd(_next_id,delay,task);}void RemoveConnectionInLoop(const PtrConnection& conn){int id = conn->Id();auto it = _conns.find(id);if(it != _conns.end())_conns.erase(it);}};

基于TcpServer实现EchoServer

        在封装完TcpServer模块之后,实现一个回显服务器其实就很简单了,其实就是基于TcpServer服务区修改下收到数据的业务处理,即在messageCallback中进行数据的回显。

#include"server.hpp"class EchoServer
{private:TcpServer _server;public:EchoServer(int port,int count,bool enable_inactive_release,int timeout):_server(port){//设置线程数量和是否启动非活跃链接超时销毁_server.SetThreadCount(count);if(enable_inactive_release)_server.EnableInactiveRelease(timeout);//设置回调_server.SetMessageCallBack(std::bind(&EchoServer::OnMessage,this,std::placeholders::_1,std::placeholders::_2));_server.SetCloseCallBack(std::bind(&EchoServer::OnClosed,this,std::placeholders::_1)); _server.SetConnectedCallBack(std::bind(&EchoServer::OnConnected,this,std::placeholders::_1));       }  void Start(){_server.Start();}private:void OnConnected(const PtrConnection& conn){DBG_LOG("NEW Connection:%p",conn.get());}void OnClosed(const PtrConnection& conn){DBG_LOG("Close Connection:%p",conn.get());}void OnMessage(const PtrConnection& conn,Buffer* buf){conn->Send(buf->ReadPos(),buf->ReadAbleSize());buf->MoveReadIdx(buf->ReadAbleSize());// conn->ShutDown();}};

模块关系图:

http://www.dtcms.com/a/590148.html

相关文章:

  • 免费网站推广网站在线怎么做网页啊
  • 泰安手机网站建设做校园文化的网站
  • 自助建站网站程序源码dw做网站弊端
  • 新城区网站建设做网站.服务器怎么买
  • 【教学类-98-01】20251109“兔子头像”(小班主题《小兔乖乖》)
  • 上海电子门户网站建设数据怎么用editplus做网站
  • iBiz开源:iBizPLM BOM插件来了
  • 8.游戏逆向-pxxx-获取GObject
  • 建立网站 数据分析网站怎么做动效
  • 什么软件做网站链接安宁市建设厅网站
  • 4.1.8【2016统考真题】
  • 第三章:处理机调度与死锁
  • 德州做网站公司电话谷歌系平台推广
  • 【01】Canny边缘检测:原理、实现与性能对比
  • 41. CMake
  • 11.string(上)
  • 【开题答辩全过程】以 基于SpringBoot的智慧教育系统的设计与实现为例,包含答辩的问题和答案
  • 360永久免费建网站网站建设及空间
  • 轻松阅读漫画的利器——Kotatsu漫画阅读器
  • 婚纱外贸网站怎么用PS做珠宝网站
  • 新乡网站网站建设网页制作软件是什么
  • C#权威指南第9课:方法
  • fastjson中的原生反序列化漏洞
  • 网站弹屏广告怎么做的如何修改网站后台的用户名和密码
  • Spring中如何使用@Resource注解?
  • 高频面试八股文用法篇(十二)Java 包装类缓存机制
  • 【Envi遥感图像处理】019:影像自动配准操作
  • 杭州网站开发制作公司排名邹平做网站的公司
  • 做家装的网站classplus wordpress
  • IO接口基本结构与内容