当前位置: 首页 > news >正文

交易网站开发2016网站建设总结

交易网站开发,2016网站建设总结,室内设计风格,百度营销是什么目录 日志宏定义 Buffer模块 Buffer缓冲区设计思想 Buffer缓冲区代码实现 基本功能 协议支持 Socket模块 Channel模块 Poller模块 Poller类编写 Poller与Channel整合 EventLoop模块 eventfd认识 EventLoop模块设计思想 EventLoop代码设计 TimerWheel定时器 Ti…

目录

日志宏定义

Buffer模块

Buffer缓冲区设计思想

Buffer缓冲区代码实现

基本功能

协议支持

Socket模块

Channel模块

Poller模块

Poller类编写

Poller与Channel整合

EventLoop模块

eventfd认识

EventLoop模块设计思想

EventLoop代码设计

TimerWheel定时器

TimerWheel与EventLoop整合

Connection模块

Connection模块功能思想

Connection模块类设计

Connection模块代码设计

Acceptor模块

LoopThread模块 

LoopThreadPool模块

TcpServer模块

基于TcpServer实现回显服务器

服务器模块的总结


日志宏定义

#define INF 0
#define DBG 1
#define ERR 2
#define LOG_LEVEL DBG#define LOG(level, format, ...) do{\if (level < LOG_LEVEL) break;\time_t t = time(NULL);\struct tm *ltm = localtime(&t);\char tmp[32] = {0};\strftime(tmp, 31, "%H:%M:%S", ltm);\fprintf(stdout, "[%p %s %s:%d] " format "\n", (void*)pthread_self(), tmp, __FILE__, __LINE__, ##__VA_ARGS__);\}while(0)#define INF_LOG(format, ...) LOG(INF, format, ##__VA_ARGS__)
#define DBG_LOG(format, ...) LOG(DBG, format, ##__VA_ARGS__)
#define ERR_LOG(format, ...) LOG(ERR, format, ##__VA_ARGS__)

Buffer模块

Buffer缓冲区设计思想

Buffer是缓冲区模块,需要进行存储数据和取出数据。需要有一块内存空间,在这里我们采用vector<char>。因为使用的是vector,所以我们不能对数据进行频繁地挪动,所以我们需要有以下几个数据:

  • 默认空间大小
  • 当前的读取数据位置
  • 当前的写入数据位置

主要的操作有写入数据和读取数据。对于写入数据:

  • 写入的数据大小小于等于写入位置到末尾的距离,直接写入
  • 写入的数据大小大于写入位置到末尾的距离,但小于等于写入位置到末尾的距离 + 起始到写入位置的距离,先挪动数据再写入
  • 写入的数据大小大于写入位置到末尾的距离 + 起始到写入位置的距离,扩容
  • 写入完成之后,写入位置要向后偏移,会发现,因为写入数据之前都需要先保证后面的空间足够才会进行写入,不够则会进行偏移或扩容,所以写偏移永远只会向后移动

对于读取数据,只要缓冲区存有的数据大于读取的数据即可。

Buffer缓冲区代码实现

基本功能

基于上面的设计,缓冲区需要实现下面的接口:

#define BUFFER_DEFAULT_SIZE 1024
class Buffer
{
public:Buffer():_reader_idx(0), _writer_idx(0), _buffer(BUFFER_DEFAULT_SIZE) {}// 获取vector的起始地址char *Begin() { return &*_buffer.begin(); }// 获取当前写入起始地址char *WritePosition() { return Begin() + _writer_idx; }// 获取当前读取起始地址char *ReadPosition() { return Begin() + _reader_idx; }// 获取缓冲区末尾空闲空间大小 - 写偏移之后的空闲空间uint64_t TailIdleSize() {return _buffer.size() - _writer_idx; }// 获取缓冲区起始空闲空间大小 - 读偏移之前的空闲空间uint64_t HeadIdleSize() { return _reader_idx; }// 获取可读数据大小uint64_t ReadAbleSize() {return _writer_idx - _reader_idx; }// 将读偏移向后移动void MoveReadOffset(uint64_t len){if(len == 0) return ;// 读偏移向后移动的距离一定要小于可读数据大小assert(len <= ReadAbleSize());_reader_idx += len;}// 将写偏移向后移动void MoveWriteOffset(uint64_t len){// 写偏移向后移动的距离一定要小于后边空闲空间的大小assert(len <= TailIdleSize());_writer_idx += len;// 因为写入数据之前都需要先保证后面的空间足够才会进行写入// 不够则会进行偏移或扩容,所以写偏移永远只会向后移动}// 确保可写空间足够void EnsureWriteSpace(uint64_t len){// 如果末尾空闲空间大小足够,直接返回if(TailIdleSize() >= len) return;// 末尾空间不够,则判断加上起始的空闲空间是否足够if(len <= TailIdleSize() + HeadIdleSize()){// 足够则将数据移动到起始位置uint64_t rsz = ReadAbleSize(); // 保存数据大小std::copy(ReadPosition(), ReadPosition() + rsz, Begin()); // 将可读数据拷贝到起始位置// 修改读偏移与写偏移_writer_idx = 0;_reader_idx = rsz;}else{// 不够则进行扩容// 这里不移动数据,直接给写偏移之后扩容足够的空间即可_buffer.resize(_writer_idx + len);}}// 写入数据void Write(const void *data, uint64_t len){// 1. 保证有足够的空间   2. 将数据拷贝进去if(len == 0) return ;EnsureWriteSpace(len);// 不能直接使用data,因为void*没有步长const char *d = (const char*)data;std::copy(d, d + len, WritePosition());}// 读取数据void Read(void *buf, uint64_t len){// 要读取的数据大小必须小于可读数据大小assert(len <= ReadAbleSize());std::copy(ReadPosition(), ReadPosition() + len, (char*)buf);}// 清空缓冲区void Clear(){// 不需要真的清0,只需要覆盖即可_reader_idx = 0;_writer_idx = 0;}
private:std::vector<char> _buffer; // 使用vector进行内存空间管理// 这里的读、写位置是相对于vector起始地址的偏移量uint64_t _reader_idx;      // 读偏移uint64_t _writer_idx;      // 写偏移
};

我们在写入和读取这里提供更多的操作,现在无论是写入,还是读取都是根据一个指针,未来可能需要根据string或者Buffer。

// 将string中的数据全部写入缓冲区
void WriteString(const std::string &data)
{return Write(data.c_str(), data.size());
}
// 将Buffer中的数据全部写入缓冲区
void WriteBuffer(Buffer &data)
{return Write(data.ReadPosition(), data.ReadAbleSize());
}
// 将读取的数据当作一个string返回
std::string ReadAsString(uint64_t len)
{// 要读取的数据大小必须小于可读数据大小assert(len <= ReadAbleSize());std::string str;str.resize(len);// 因为str.c_str()返回的是一个const char*,所以不使用Read(&str[0], len);return str;
}

读取了数据、写入了数据之后,指针是需要偏移的,上面有实现偏移的接口,但是如果每次写完之后都需要再调用,就太麻烦了,这里将写入和指针移动整合到一起。

// 读取或写入数据,并让指针偏移
void ReadAndPop(void* buf, uint64_t len)
{Read(buf, len);MoveReadOffset(len);
}
std::string ReadAsStringAndPop(uint64_t len)
{assert(len <= ReadAbleSize());std::string str = ReadAsString(len);MoveReadOffset(len);return str;
}
void WriteAndPush(const void* data, uint64_t len)
{Write(data, len);MoveWriteOffset(len);
}
void WriteStringAndPush(const std::string& data)
{WriteString(data);MoveWriteOffset(data.size());
}
void WriteBufferAndPush(Buffer& data)
{WriteBuffer(data);MoveWriteOffset(data.ReadAbleSize());
}

协议支持

上面的接口对于高并发服务器使用已经足够了,但是如果要支持HTTP协议的话,还需要提供获取一行数据的接口。

// 支持HTTP协议,获取一行数据
// 寻找换行符
char* FindCRLF()
{char* res = (char*)memchr(ReadPosition(), '\n', ReadAbleSize());return res;
}
// 获取一行数据
std::string GetLine()
{char* pos = FindCRLF();if (pos == nullptr) return "";// +1是为了将换行符也取出来return ReadAsString(pos - (char*)ReadPosition() + 1);
}
std::string GetLineAndPop()
{std::string str = GetLine();MoveReadOffset(str.size());return str;
}

我们现在来对Buffer模块进行测试。先测试以下能否存入数据,再取出数据。

int main()
{Buffer buf;std::string str = "hello!!";buf.WriteStringAndPush(str);std::string tmp;tmp = buf.ReadAsStringAndPop(buf.ReadAbleSize());std::cout << tmp << std::endl;std::cout << buf.ReadAbleSize() << std::endl;return 0;
}

int main()
{Buffer buf;std::string str = "hello!!";buf.WriteStringAndPush(str);Buffer buf1;buf1.WriteBufferAndPush(buf);std::string tmp;tmp = buf.ReadAsStringAndPop(buf.ReadAbleSize());std::cout << tmp << std::endl;std::cout << buf.ReadAbleSize() << std::endl;std::cout << buf1.ReadAbleSize() << std::endl;return 0;
}

是可以正常存储数据和取出数据的。

再测试一下扩容功能是否正常。

int main()
{Buffer buf;for(int i = 0;i < 300;i ++){std::string str = "hello!!" + std::to_string(i) + '\n';buf.WriteStringAndPush(str);}while(buf.ReadAbleSize() > 0){std::string line = buf.GetLineAndPop();std::cout << line;}return 0;
}

是正常的。

Socket模块

Socket模块是对套接字接口进行封装,让上层能够更方便地使用套接字进行通信。需要封装以下接口:

  • 创建套接字
  • 绑定地址信息
  • 开始监听
  • 向服务器发起连接
  • 获取新连接
  • 接收数据
  • 发送数据
  • 关闭套接字
  • 创建一个服务端连接
  • 创建一个客户端连接
  • 设置套接字选项---开启地址端口重用
  • 设置套接字阻塞属性--设置为非阻塞
#define MAX_LISTEN 1024
class Socket
{
public:Socket():_sockfd(-1) {}Socket(int fd):_sockfd(fd) {}~Socket() { Close(); }int Fd() { return _sockfd; }// 创建套接字bool Create(){_sockfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);if(_sockfd < 0){ERR_LOG("CREATE SOCKET FAILED!!!");return false;}return true;}// 绑定地址信息bool Bind(const std::string &ip, uint16_t port){struct sockaddr_in addr;addr.sin_family = AF_INET;addr.sin_port = htons(port);addr.sin_addr.s_addr = inet_addr(ip.c_str());socklen_t len = sizeof(struct sockaddr_in);int ret = bind(_sockfd, (struct sockaddr*)&addr, len);if(ret < 0){ERR_LOG("BIND ADDRESS FAILED!!!");return false;}return true;}// 开始监听bool Listen(int backlog = MAX_LISTEN){int ret = listen(_sockfd, backlog);if(ret < 0){ERR_LOG("SOCKET LISTEN FAILED!!!");return false;}return true;}// 向服务器发起连接bool Connect(const std::string &ip, uint16_t port){struct sockaddr_in addr;addr.sin_family = AF_INET;addr.sin_port = htons(port);addr.sin_addr.s_addr = inet_addr(ip.c_str());socklen_t len = sizeof(struct sockaddr_in);int ret = connect(_sockfd, (struct sockaddr*)&addr, len);if(ret < 0){ERR_LOG("CONNECT SERVER FAILED!!!");return false;}return true;}// 获取新连接int Accept(){// 直接返回连接对应的文件描述符,不关心客户端的地址信息,所以直接传入nullptrint newfd = accept(_sockfd, nullptr, nullptr);if(newfd < 0){ERR_LOG("SOCKET ACCEPT FAILED!!!");return -1;}return newfd;}// 接收数据 - 以阻塞方式ssize_t Recv(void *buf, size_t len, int flag = 0){ssize_t ret = recv(_sockfd, buf, len, flag);if(ret < 0){// EAGAIN 当前socket的接收缓冲区中没有数据,在非阻塞的情况下会有这个错误// EINTR  表示当前socket的阻塞等待被信号打断了if(errno == EAGAIN || errno == EINTR)return 0;ERR_LOG("SOCKET RECV FAILED!!!");return -1;}return ret; // 实际接收到的数据长度}// 发送数据 - 以阻塞方式ssize_t Send(const void *buf, size_t len, int flag = 0){ssize_t ret = send(_sockfd, buf, len, flag);if(ret < 0){if(errno == EAGAIN || errno == EINTR){return 0;}ERR_LOG("SOCKET SEND FAILED!!!");return -1;}return ret; // 实际发送的数据长度}// 接收数据 - 以非阻塞方式ssize_t NonBlockRecv(void *buf, size_t len){return Recv(buf, len, MSG_DONTWAIT);}// 发送数据 - 以非阻塞方式ssize_t NonBlockSend(void *buf, size_t len){if(len == 0) return 0;return Send(buf, len, MSG_DONTWAIT);}// 关闭套接字void Close(){if(_sockfd != -1){close(_sockfd);_sockfd = -1;}}// 创建一个服务端连接, 最后一个参数表示是否将套接字设置为非阻塞bool CreateServer(uint16_t port, const std::string &ip = "0.0.0.0", bool block_flag = false){// 1. 创建套接字  2. 绑定地址  3. 开始监听  4. 设置非阻塞  5. 启动地址重用if(Create() == false) return false;if(block_flag) NonBlock();if(Bind(ip, port) == false) return false;if(Listen() == false) return false;ReuseAddress();return true;}// 创建一个客户端连接bool CreateClient(uint16_t port, const std::string &ip){// 1. 创建套接字  2. 连接服务器if(Create() == false) return false;if(Connect(ip, port) == false) return false;return true;}// 设置套接字选项 - 开启地址端口重用void ReuseAddress(){// 地址重用int val = 1;setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR, (void*)&val, sizeof(int));// 端口复用val = 1;setsockopt(_sockfd, SOL_SOCKET, SO_REUSEPORT, (void*)&val, sizeof(int));}// 设置套接字阻塞属性 - 设置为非阻塞void NonBlock(){int flag = fcntl(_sockfd, F_GETFL, 0);fcntl(_sockfd, F_SETFL, flag | O_NONBLOCK);}
private:int _sockfd;
};

现在来测试一下基于Socket能否搭建出一个客户端和服务端进行通信。服务端:

int main()
{Socket lst_sock;lst_sock.CreateServer(8080);while(true){int newfd = lst_sock.Accept();if(newfd < 0) continue;// 将新连接对应的文件描述符封装成一个SocketSocket cli_sock(newfd);// 接收数据char buf[1024] = {0};int ret = cli_sock.Recv(buf, 1023);if(ret < 0){cli_sock.Close();continue;}// 直接将读到的消息返回cli_sock.Send(buf, ret);cli_sock.Close();}lst_sock.Close();return 0;
}

客户端:

int main()
{Socket cli_sock;cli_sock.CreateClient(8080, "127.0.0.1");std::string str = "hello world!";cli_sock.Send(str.c_str(), str.size());char buf[1024] = {0};cli_sock.Recv(buf, 1023);DBG_LOG("%s", buf);return 0;
}

此时是能够正常进行通信的。

Channel模块

Channel是Connection中对事件进行管理的模块,表示要监控一个连接(或者说文件描述符)的什么事件,以及事件触发了之后要如何进行处理。所以在接口设计上,可以分为事件管理和事件触发后处理的管理两部分,对于事件管理:

  • 描述符是否可读
  • 描述符是否可写
  • 对描述符监控可读
  • 对描述符监控可写
  • 解除可读事件监控
  • 解除可写事件监控
  • 解除所有事件监控

对于事件触发后处理的管理:

  • 需要处理的事件:可读,可写,挂断,错误,任意
  • 事件处理的回调函数。提供给EventLoop模块,当这个文件描述符有事件触发了,就调用这个函数,内部会根据具体是什么事件触发,调用相应的函数。
class Channel
{using EventCallback = std::function<void()>;
public:Channel();void SetReadCallback(const EventCallback &cb);void SetWriteCallback(const EventCallback &cb);void SetErrorCallback(const EventCallback &cb);void SetCloseCallback(const EventCallback &cb);void SetEventCallback(const EventCallback &cb);bool ReadAble();     // 当前是否监控了可读bool WriteAble();    // 当前是否监控了可写void EnableRead();   // 启动读事件监控void EnableWrite();  // 启动写事件监控void DisableRead();  // 关闭读事件监控void DisableWrite(); // 关闭写事件监控void DisableAll();   // 关闭所有事件监控void Remove();       // 移除监控void HandleEvent();  // 事件处理函数,一旦连接触发了事件,就调用这个函数
private:int _fd;uint32_t _events;  // 当前需要监控的事件uint32_t _revents; // 当前连接触发的事件EventCallback _read_callback;   // 可读事件触发时的回调函数EventCallback _write_callback;  // 可写事件触发时的回调函数EventCallback _error_callback;  // 错误事件触发时的回调函数EventCallback _close_callback;  // 连接断开事件触发时的回调函数EventCallback _event_callback;  // 任意事件触发时的回调函数
};

Channel模块是Connection模块的一个子模块,Channel模块里面对事件的回调函数就是由Connection设置进去的。当连接建立完成之后,会创建一个Connection,启动对这个连接的监控时,是需要将这个Connection内部的Channel挂到EventLoop上的,其实就是调用EventLoop的添加事件监控接口,将这个Channel交给EventLoop内部的Poller模块。因为Channel模块只是说明了需要监控一个连接的什么事件,并且当事件触发了之后要如何处理,但是并没有真正地进行监控,真正进行监控是EventLoop模块的子模块Poller的任务。

class Channel
{using EventCallback = std::function<void()>;
public:Channel(int fd):_fd(fd), _events(0), _revents(0) {}int Fd() { return _fd; }uint32_t Events() { return _events; }  // 获取想要监控的事件void SetEvents(uint32_t events) { _revents = events; }  // 设置实际就绪的事件void SetReadCallback(const EventCallback &cb) { _read_callback = cb;}void SetWriteCallback(const EventCallback &cb) { _write_callback = cb; }void SetErrorCallback(const EventCallback &cb) { _error_callback = cb; }void SetCloseCallback(const EventCallback &cb) { _close_callback = cb; }void SetEventCallback(const EventCallback &cb) { _event_callback = cb; }// 当前是否监控了可读bool ReadAble() { return (_events & EPOLLIN); }   // 当前是否监控了可写bool WriteAble() { return (_events & EPOLLOUT); }// 启动读事件监控void EnableRead() { _events |= EPOLLIN; /*添加到EventLoop的事件监控中*/ }// 启动写事件监控void EnableWrite() { _events |= EPOLLOUT; /*添加到EventLoop的事件监控中*/ }// 关闭读事件监控void DisableRead() { _events &= ~EPOLLIN; /*添加到EventLoop的事件监控中*/ }// 关闭写事件监控void DisableWrite() { _events &= ~EPOLLOUT; /*添加到EventLoop的事件监控中*/ }// 关闭所有事件监控void DisableAll() { _events = 0; }// 移除监控void Remove() { /*调用EventLoop的接口来移除监控*/ }// 设置就绪事件void SetEvents(uint32_t events) { _revents = events; }// 事件处理函数,一旦连接触发了事件,就调用这个函数void HandleEvent(){if((_revents & EPOLLIN) || (_revents & EPOLLRDHUP) || (_revents & EPOLLPRI)){if(_event_callback) _event_callback();if(_read_callback) _read_callback();}// 有可能会释放连接的操作事件,一次只处理一个if(_revents & EPOLLOUT){if(_event_callback) _event_callback();if(_write_callback) _write_callback();}else if(_revents & EPOLLERR){if(_event_callback) _event_callback();if(_error_callback) _error_callback();}else if(_revents & EPOLLHUP){if(_event_callback) _event_callback();if(_close_callback) _close_callback();}}
private:int _fd;uint32_t _events;  // 当前需要监控的事件uint32_t _revents; // 当前连接触发的事件EventCallback _read_callback;   // 可读事件触发时的回调函数EventCallback _write_callback;  // 可写事件触发时的回调函数EventCallback _error_callback;  // 错误事件触发时的回调函数EventCallback _close_callback;  // 连接断开事件触发时的回调函数EventCallback _event_callback;  // 任意事件触发时的回调函数
};

这里重点看一下事件处理函数。任意事件触发的回调函数意思就是所有事件触发了,都要调用一下,一般的功能就是刷新连接的活跃度,避免被当成一个超时连接。在可读事件触发、对端连接关闭(自己这端的连接是正常的)、紧急数据可读事件触发时,是不会关闭自己这端的连接的,此时可以处理完这个事件后,继续处理其他的就绪事件,并且也可以先调用读事件触发回调处理函数,再调用任意事件触发回调处理函数。而可写事件触发、错误事件触发、连接断开事件触发,均有可能导致释放连接,就不应该继续向下处理了,所以使用else if,并且任意事件触发回调处理函数需要在前面调用,因为如果连接已经释放了,再调用就没有意义了。

Poller模块

Poller类编写

Poller模块其实就是对epoll的封装。Poller模块是EventLoop模块的子模块,是进行描述符事件监控的。因为Channel模块已经封装了一个连接关心的事件,并且有了这些事件触发的回调函数,所以Poller模块实际上就是对一个一个的Channel对象进行操作。

接口设计:

  • 添加/修改文件描述符的事件监控(一个接口,不存在则添加,存在则修改)
  • 移除文件描述符的事件监控
  • 启动监控

成员变量:

  • 拥有一个epoll的操作句柄
  • 拥有一个struct epoll_event的数组,用于保存获取到的就绪事件
  • 使用哈希表管理描述符与描述符对应的事件管理Channel对象

逻辑流程:

  • 通过Channel对文件描述符进行监控
  • 当文件描述符就绪了,通过哈希表找到对应的Channel对象,设置就绪事件,然后调用Channel内部的事件处理函数,即可完成对就绪事件的处理。
#define MAX_EPOLLEVENTS 1024
class Poller
{
public:Poller(){_epfd = epoll_create(MAX_EPOLLEVENTS);if(_epfd < 0){ERR_LOG("EPOLL CREATE FAILED!!!");abort(); // 退出程序}}// 添加或修改监控事件void UpdateEvent(Channel *channel){bool ret = HasChannel(channel);if(ret == false){// 不存在则添加_channels.insert(std::make_pair(channel->Fd(), channel));return Update(channel, EPOLL_CTL_ADD);}return Update(channel, EPOLL_CTL_MOD);}// 移除监控void RemoveEvent(Channel *channel){auto it = _channels.find(channel->Fd());if(it != _channels.end()){_channels.erase(it);}Update(channel, EPOLL_CTL_DEL);}// 开始监控,返回活跃连接void Poll(std::vector<Channel*> *active){int nfds = epoll_wait(_epfd, _evs, MAX_EPOLLEVENTS, -1);if(nfds < 0){if(errno == EINTR) return ;ERR_LOG("EPOLL WAIT ERROR: %s\n", strerror(errno));abort(); }// 设置就绪事件for(int i = 0;i < nfds;i ++){auto it = _channels.find(_evs[i].data.fd);assert(it != _channels.end());it->second->SetEvents(_evs[i].events); active->push_back(it->second);}return;}
private:// 对epoll的直接操作void Update(Channel *channel, int op){int fd = channel->Fd();struct epoll_event ev;ev.data.fd = fd;ev.events = channel->Events();int ret = epoll_ctl(_epfd, op, fd, &ev);if(ret < 0){ERR_LOG("EPOLLCTL FAILED!!!");}return ;}// 判断一个Channel是否已经添加了事件监控bool HasChannel(Channel *channel){auto it = _channels.find(channel->Fd());if(it == _channels.end()) return false;return true;}
private:int _epfd;struct epoll_event _evs[MAX_EPOLLEVENTS];std::unordered_map<int, Channel*> _channels;
};

Poller与Channel整合

我们前面说过,启动对一个连接的某个事件的监控时,就是将这个Connection内部的Channel添加到EventLoop内部的Poller中,让Poller进行监控。虽然我们现在既没有Connection,也没有EventLoop,但是我们可以先完成将Channel交给Poller进行监控的操作。

Channel中就需要有一个Poller。

class Channel
{using EventCallback = std::function<void()>;
public:Channel(Poller *poller, int fd):_fd(fd), _events(0), _revents(0), _poller(poller) {}int Fd() { return _fd; }uint32_t Events() { return _events; }  // 获取想要监控的事件void SetEvents(uint32_t events) { _revents = events; }  // 设置实际就绪的事件void SetReadCallback(const EventCallback &cb) { _read_callback = cb;}void SetWriteCallback(const EventCallback &cb) { _write_callback = cb; }void SetErrorCallback(const EventCallback &cb) { _error_callback = cb; }void SetCloseCallback(const EventCallback &cb) { _close_callback = cb; }void SetEventCallback(const EventCallback &cb) { _event_callback = cb; }// 当前是否监控了可读bool ReadAble() { return (_events & EPOLLIN); }   // 当前是否监控了可写bool WriteAble() { return (_events & EPOLLOUT); }// 启动读事件监控void EnableRead() { _events |= EPOLLIN; Update(); }// 启动写事件监控void EnableWrite() { _events |= EPOLLOUT; Update(); }// 关闭读事件监控void DisableRead() { _events &= ~EPOLLIN; Update(); }// 关闭写事件监控void DisableWrite() { _events &= ~EPOLLOUT; Update(); }// 关闭所有事件监控void DisableAll() { _events = 0; Update(); }// 移除监控void Remove();// 添加/修改监控void Update();// 事件处理函数,一旦连接触发了事件,就调用这个函数void HandleEvent(){if((_revents & EPOLLIN) || (_revents & EPOLLRDHUP) || (_revents & EPOLLPRI)){if(_event_callback) _event_callback();if(_read_callback) _read_callback();}// 有可能会释放连接的操作事件,一次只处理一个if(_revents & EPOLLOUT){if(_event_callback) _event_callback();if(_write_callback) _write_callback();}else if(_revents & EPOLLERR){if(_event_callback) _event_callback();if(_error_callback) _error_callback();}else if(_revents & EPOLLHUP){if(_event_callback) _event_callback();if(_close_callback) _close_callback();}}
private:int _fd;Poller *_poller;uint32_t _events;  // 当前需要监控的事件uint32_t _revents; // 当前连接触发的事件EventCallback _read_callback;   // 可读事件触发时的回调函数EventCallback _write_callback;  // 可写事件触发时的回调函数EventCallback _error_callback;  // 错误事件触发时的回调函数EventCallback _close_callback;  // 连接断开事件触发时的回调函数EventCallback _event_callback;  // 任意事件触发时的回调函数
};void Channel::Remove() { return _poller->RemoveEvent(this); }
void Channel::Update() { return _poller->UpdateEvent(this); }

因为Channel定义在Poller前面,所以只能在前面声明,因为没有定义,所以后面两个函数只能在类外实现。

对于服务端,现在我们创建套接字之后,就不能立即获取新连接了,而是创建一个Channel,让Channel对这个套接字进行事件管理。对于监听套接字,需要监控可读事件,回调函数是获取新连接,并为新连接创建Channel。

void HandleClose(Channel *channel)
{std::cout << "close: " << channel->Fd() << std::endl;channel->Remove(); // 移除监控delete channel;
}
void HandleRead(Channel *channel)
{int fd = channel->Fd();char buf[1024] = {0};int ret = recv(fd, buf, 1023, 0);if(ret <= 0){return HandleClose(channel);}// 读到数据之后,给客户端回复,启动可写事件channel->EnableWrite();std::cout << buf << std::endl;
}
void HandleWrite(Channel *channel)
{int fd = channel->Fd();// 作为测试,向客户端写回固定的数据const char *data = "天气还不错!!!";int ret = send(fd, data, strlen(data), 0);if(ret < 0){return HandleClose(channel);}channel->DisableWrite(); // 关闭写监控
}
void HandleError(Channel *channel)
{return HandleClose(channel);
}
void HandleEvent(Channel *channel)
{std::cout << "有一个事件触发了!!!\n";
}// 对于监听套接字读事件触发的回调函数
void Acceptor(Poller *poller, Channel *lst_channel)
{// 获取新连接int fd = lst_channel->Fd();int newfd = accept(fd, nullptr, nullptr);if(newfd < 0) return ;// 为新连接封装ChannelChannel *channel = new Channel(poller, newfd);channel->SetReadCallback(std::bind(HandleRead, channel));channel->SetWriteCallback(std::bind(HandleWrite, channel));channel->SetCloseCallback(std::bind(HandleClose, channel));channel->SetErrorCallback(std::bind(HandleError, channel));channel->SetEventCallback(std::bind(HandleEvent, channel));channel->EnableRead();
}int main()
{Poller poller;Socket lst_sock;lst_sock.CreateServer(8080);// 为监听套接字创建一个ChannelChannel channel(&poller, lst_sock.Fd());channel.SetReadCallback(std::bind(Acceptor, &poller, &channel));// 启动可读事件监控channel.EnableRead();while(true){// 开始事件监控,获取事件就绪的连接,并进行处理std::vector<Channel*> actives;poller.Poll(&actives);for(auto &a : actives){a->HandleEvent();}}lst_sock.Close();return 0;
}

在获取新连接时,为了简便,没有使用Socket,直接使用文件描述符。客户端:

int main()
{Socket cli_sock;cli_sock.CreateClient(8080, "127.0.0.1");while(true){std::string str = "今天天气怎么样?";cli_sock.Send(str.c_str(), str.size());char buf[1024] = {0};cli_sock.Recv(buf, 1023);DBG_LOG("%s", buf);sleep(1);}return 0;
}

我们让客户端每隔1秒给服务端发送一条固定的消息,并将得到的响应打印出来。服务端会读取客户端发送过来的数据,并回复一条固定的响应。


服务端每次会打印出两个事件触发,是因为读事件触发后,又启动了写事件。

EventLoop模块

EventLoop模块是真正进行事件管理的模块,Poller只是它的一个子模块,负责进行事件监控。

eventfd认识

eventfd是一种事件通知机制。在内核中本质上就是一个计数器,创建一个eventfd在内核中就会创建一个计数器。每当向eventfd中写入一个数值,表示进行了一次事件通知,可以使用read将eventfd中的数据读取出来,读取到的数据就是通知的次数。

用处:在EventLoop模块中实现线程间事件通知的功能。

#include <sys/eventfd.h>int eventfd(unsigned int initval, int flags);

功能:创建一个eventfd对象,实现事件通知。参数:

  • initval:计数初值
  • flags:EFD_CLOEXEC表示禁止进程复制;EFD_NONBLOCK表示启动非阻塞属性

返回值:返回一个文件描述符。所以,eventfd是可以使用read/write/close进行操作的。注意:read和write在进行IO的时候,数据只能是一个8字节数据。

int main()
{int efd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);if(efd < 0){perror("eventfd failed!!!");return -1;}uint64_t val = 1;write(efd, &val, sizeof(val));write(efd, &val, sizeof(val));write(efd, &val, sizeof(val));uint64_t res = 0;read(efd, &res, sizeof(res));std::cout << res << std::endl;close(efd);return 0;
}

可以看到,写入多次时,只需要一次就能读取完。

EventLoop模块设计思想

EventLoop模块是进行事件监控,以及事件处理的模块,这个模块与线程是一一对应的。也就是说,一个线程一个EventLoop

监控一个连接,一旦这个连接有事件就绪了,就需要对事件进行处理。如果一个文件描述符的IO事件被多个线程同时处理,是会存在线程安全问题的,因此我们需要将一个连接的事件监控、事件处理,以及其他操作都交给同一个线程处理。如何将这一系列的操作都放在一个线程中处理呢?我们是没办法将一个连接与一个线程进行绑定的,此时就可以利用一个线程一个EventLoop,将一个连接与一个EventLoop进行绑定。在上面,每一个Channel绑定一个Poller,未来只需要将Poller修改为EventLoop即可。

此时仍然是没办法保证所有的操作均由EventLoop的线程完成的。当其他线程需要与EventLoop交互时,例如主线程获取到了新线程后,将文件描述符注册到子线程的EventLoop中,也就是将文件描述符添加到EventLoop的Poller中,如果EventLoop允许主线程直接进行操作,必然是线程不安全的。此时可以定义一个任务队列,将向EventLoop注册一个文件描述符的操作封装成一个任务,放入到任务队列中,后序由EventLoop来执行这个任务,即可保证线程安全。

可以在EventLoop内部定义一个函数,这个函数的参数是一个要执行的任务,函数内部可以判断出调用这个函数的线程是否是EventLoop所在的线程,如果是,直接执行任务,如果不是,则将这个任务放入到任务队列当中,等就绪事件处理完成之后,再统一处理任务队列中的任务。未来当其他线程想与EventLoop交互时,可以直接调用这个函数。

eventfd的处理流程:

  • 在线程中对文件描述符进行事件监控
  • 有文件描述符就绪则对文件描述符进行事件处理
  • 所有就绪事件处理完成后,将任务队列中的所有任务一一执行

这里是需要保证任务队列线程安全的,所以可以给任务队列提供一把锁。任务队列也不一定要是队列,未来任务队列中就是一个一个的函数对象,如果我们加锁后对任务队列中的任务一个一个执行,效率太低了。所以我们可以使用一个vector来保存任务对象,未来要执行任务队列时,直接将vector拷贝一份,解锁,然后根据拷贝结果执行。

EventLoop代码设计

EventLoop模块根据目前的分析,需要实现以下功能:

  1. 事件监控。使用Poller模块,有事件就绪则进行事件处理。
  2. 执行任务队列中的任务。

注意:如果其他的线程将任务放到了任务队列中,而EventLoop的线程仍然阻塞在事件监控处,此时会导致任务队列中的任务得不到及时处理,所以EventLoop中要有一个eventfd对象,用于进行事件通知,唤醒阻塞状态的EventLoop线程。

并且还要有一个Channel对象,用于对eventfd进行管理,并将其添加到EventLoop的事件监控当中。因为当需要唤醒时,会向内部写入数据,所以可以监控它的可读事件。当可读事件触发了,EventLoop就会从阻塞状态恢复,从而执行任务队列中的任务。

前面我们说了,我们可以定义一个函数,这个函数可以判断出调用这个函数的线程是否是EventLoop线程。所以给成员变量增加一个线程ID,用于保存EventLoop所在线程的ID,专门用来判断用户的某个操作的线程与EventLoop所在线程是否是同一个线程,若是同一个线程则直接执行,若不是同一个线程则将任务压入任务队列。

class EventLoop
{using Functor = std::function<void()>;
private:// 执行任务池中的所有任务void RunAllTask(){// 拷贝一个任务池std::vector<Functor> functor;{std::unique_lock<std::mutex> _lock(_mutex);_tasks.swap(functor);}for(auto &f : functor){f();}return ;}// 创建一个eventfdstatic int CreateEventFd(){int efd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);if(efd < 0){ERR_LOG("CREATE EVENTFD FAILED!!!");abort();}return efd;}// eventfd的可读事件回调函数void ReadEventFd(){uint64_t res = 0;int ret = read(_event_fd, &res, sizeof(res));if(ret < 0){// EINTR -- 被信号打断  EAGAIN -- 表示无数据可读if(errno == EINTR || errno == EAGAIN) return ;ERR_LOG("READ EVENTFD FAILED!!!");abort();}return ;}// 给eventfd中写入一个数据,触发它的可读事件void WeakUpEventFd(){uint64_t val = 1;int ret = write(_event_fd, &val, sizeof(val));if(ret < 0){if(errno == EINTR) return ;ERR_LOG("READ EVENTFD FAILED!!!");abort();}return ;}
public:EventLoop():_thread_id(std::this_thread::get_id()),_event_fd(CreateEventFd()),_event_channel(new Channel(this, _event_fd)){// 给eventfd添加可读事件回调函数,读取eventfd事件通知次数_event_channel->SetReadCallback(std::bind(&EventLoop::ReadEventFd, this));// 启动eventfd的读事件监控_event_channel->EnableRead();}// 判断当前线程是否是EventLoop对应的线程bool IsInLoop(){return (_thread_id == std::this_thread::get_id());}// 判断将要执行的任务是否处于EventLoop所在线程,如果是则执行,不是则压入任务池void RunInLoop(const Functor &cb){if(IsInLoop()) return cb();return QueueInLoop(cb);}// 将操作压入任务池void QueueInLoop(const Functor &cb){{std::unique_lock<std::mutex> _lock(_mutex);_tasks.push_back(cb);}// 唤醒可能因为没有事件就绪,而导致的epoll阻塞// 其实就是给eventfd写入一个数据,触发eventfd的可读事件WeakUpEventFd();}// 添加/修改描述符的事件监控void UpdateEvent(Channel *channel){return _poller.UpdateEvent(channel);}// 删除描述符的事件监控void RemoveEvent(Channel *channel){return _poller.RemoveEvent(channel);}// 启动EventLoopvoid Start(){// 1. 事件监控std::vector<Channel*> actives;_poller.Poll(&actives);// 2. 事件处理for(auto &channel : actives){channel->HandleEvent();} // 3. 执行任务RunAllTask();}
private:std::thread::id _thread_id;  // 线程IDint _event_fd;               // eventfd唤醒IO事件监控有可能导致的阻塞std::unique_ptr<Channel> _event_channel; // 管理_event_fdPoller _poller; // 进行所有文件描述符的事件监控std::vector<Functor> _tasks; // 任务池std::mutex _mutex;           // 保证任务池的线程安全
};

当外部要与EventLoop进行交互时,必须通过EventLoop提供的线程安全接口RunInLoop进行。比方说外部要向这个EventLoop注册一个文件描述符,此时就需要将任务压入到任务队列中,让任务在EventLoop所在线程内部执行。

当其他线程调用了RunInLoop,将任务压入到任务队列之后,如果EventLoop所在线程阻塞在了Start的Poll,任务队列中的任务就得不到及时的执行,所以在将任务压入任务队列之后,就需要唤醒EventLoop线程。

我们还需要对之前的Channel进行修改,内部不再使用Poller,而是使用EventLoop。

class Channel
{using EventCallback = std::function<void()>;
public:Channel(EventLoop *loop, int fd):_fd(fd), _events(0), _revents(0), _loop(loop) {}int Fd() { return _fd; }uint32_t Events() { return _events; }  // 获取想要监控的事件void SetEvents(uint32_t events) { _revents = events; }  // 设置实际就绪的事件void SetReadCallback(const EventCallback &cb) { _read_callback = cb;}void SetWriteCallback(const EventCallback &cb) { _write_callback = cb; }void SetErrorCallback(const EventCallback &cb) { _error_callback = cb; }void SetCloseCallback(const EventCallback &cb) { _close_callback = cb; }void SetEventCallback(const EventCallback &cb) { _event_callback = cb; }// 当前是否监控了可读bool ReadAble() { return (_events & EPOLLIN); }   // 当前是否监控了可写bool WriteAble() { return (_events & EPOLLOUT); }// 启动读事件监控void EnableRead() { _events |= EPOLLIN; Update(); }// 启动写事件监控void EnableWrite() { _events |= EPOLLOUT; Update(); }// 关闭读事件监控void DisableRead() { _events &= ~EPOLLIN; Update(); }// 关闭写事件监控void DisableWrite() { _events &= ~EPOLLOUT; Update(); }// 关闭所有事件监控void DisableAll() { _events = 0; Update(); }// 移除监控void Remove();// 添加/修改监控void Update();// 事件处理函数,一旦连接触发了事件,就调用这个函数void HandleEvent(){if((_revents & EPOLLIN) || (_revents & EPOLLRDHUP) || (_revents & EPOLLPRI)){if(_event_callback) _event_callback();if(_read_callback) _read_callback();}// 有可能会释放连接的操作事件,一次只处理一个if(_revents & EPOLLOUT){if(_event_callback) _event_callback();if(_write_callback) _write_callback();}else if(_revents & EPOLLERR){if(_event_callback) _event_callback();if(_error_callback) _error_callback();}else if(_revents & EPOLLHUP){if(_event_callback) _event_callback();if(_close_callback) _close_callback();}}
private:int _fd;EventLoop *_loop;uint32_t _events;  // 当前需要监控的事件uint32_t _revents; // 当前连接触发的事件EventCallback _read_callback;   // 可读事件触发时的回调函数EventCallback _write_callback;  // 可写事件触发时的回调函数EventCallback _error_callback;  // 错误事件触发时的回调函数EventCallback _close_callback;  // 连接断开事件触发时的回调函数EventCallback _event_callback;  // 任意事件触发时的回调函数
};void Channel::Remove() { return _loop->RemoveEvent(this); }
void Channel::Update() { return _loop->UpdateEvent(this); }

我们来测试一下EventLoop的事件监控功能是否正常。服务端:

void HandleClose(Channel *channel)
{std::cout << "close: " << channel->Fd() << std::endl;channel->Remove(); // 移除监控delete channel;
}
void HandleRead(Channel *channel)
{int fd = channel->Fd();char buf[1024] = {0};int ret = recv(fd, buf, 1023, 0);if(ret <= 0){return HandleClose(channel);}// 读到数据之后,给客户端回复,启动可写事件channel->EnableWrite();std::cout << buf << std::endl;
}
void HandleWrite(Channel *channel)
{int fd = channel->Fd();// 作为测试,向客户端写回固定的数据const char *data = "天气还不错!!!";int ret = send(fd, data, strlen(data), 0);if(ret < 0){return HandleClose(channel);}channel->DisableWrite(); // 关闭写监控
}
void HandleError(Channel *channel)
{return HandleClose(channel);
}
void HandleEvent(Channel *channel)
{std::cout << "有一个事件触发了!!!\n";
}// 对于监听套接字读事件触发的回调函数
void Acceptor(EventLoop *loop, Channel *lst_channel)
{// 获取新连接int fd = lst_channel->Fd();int newfd = accept(fd, nullptr, nullptr);if(newfd < 0) return ;// 为新连接封装ChannelChannel *channel = new Channel(loop, newfd);channel->SetReadCallback(std::bind(HandleRead, channel));channel->SetWriteCallback(std::bind(HandleWrite, channel));channel->SetCloseCallback(std::bind(HandleClose, channel));channel->SetErrorCallback(std::bind(HandleError, channel));channel->SetEventCallback(std::bind(HandleEvent, channel));channel->EnableRead();
}int main()
{EventLoop loop;Socket lst_sock;lst_sock.CreateServer(8080);// 为监听套接字创建一个ChannelChannel channel(&loop, lst_sock.Fd());channel.SetReadCallback(std::bind(Acceptor, &loop, &channel));// 启动可读事件监控channel.EnableRead();while(true){// 开始事件监控,获取事件就绪的连接,并进行处理loop.Start();}lst_sock.Close();return 0;
}

客户端:

int main()
{Socket cli_sock;cli_sock.CreateClient(8080, "127.0.0.1");while(true){std::string str = "今天天气怎么样?";cli_sock.Send(str.c_str(), str.size());char buf[1024] = {0};cli_sock.Recv(buf, 1023);DBG_LOG("%s", buf);sleep(1);}return 0;
}

两者可以正常进行通信,所以EventLoop的事件监控功能是没有问题的。

TimerWheel定时器

我们之前实现的TimerWheel时间轮需要每秒执行一次定时任务,到了时间后,要如何通知定时器执行任务呢?此时就需要使用timerfd了,每隔1秒,通知一次时间轮。

定时器模块的整合:

  • timerfd:实现内核每隔一段时间,给进程一次超时事件(timerfd可读)
  • timerwheel:实现每次执行Runtimetask,都可以执行一次到期的定时任务

要实现一个完整的秒级定时器,就需要将上面的两个模块整合到一起,具体做法是:timerfd设置为每秒触发一次定时事件,当事件被触发,则运行一次timerwheel的runtimetask,执行一下所有的过期的定时任务,而timerfd的事件监控与触发,可以融合EventLoop来实现。业务EventLoop是对文件描述符进行事件监控的,所以可以使用EventLoop对timerfd进行事件监控,这样当timerfd触发了事件,就可以通过EventLoop得知,从而对事件进行处理,也就是调用timerwheel的rumtimetask。

// 定时任务对象类
using TaskFunc = std::function<void()>;
using ReleaseFunc = std::function<void()>;
class TimerTask
{
public:TimerTask(uint64_t id, uint32_t delay, const TaskFunc &cb): _id(id), _timeout(delay), _task_cb(cb), _canceled(false) {    // 设置timerfd读事件触发时的回调函数_timer_channel->SetReadCallback(std::bind(&TimerWheel::OnTime, this));// 启动读事件监控_timer_channel->EnableRead(); }~TimerTask() {// 只有定时任务没有被取消才执行定时任务if(_canceled == false) _task_cb(); _release(); }void SetRelease(const ReleaseFunc &cb) { _release = cb; }uint32_t DelayTime() { return _timeout; }// 取消定时任务void Cancel() { _canceled = true; }
private:uint64_t _id;          // 定时任务对象的IDuint32_t _timeout;     // 定时任务的超时时间bool _canceled;        // 该定时任务是否被取消TaskFunc _task_cb;     // 定时任务对象要执行的定时任务ReleaseFunc _release;  // 删除TimerWheel中保存的定时任务对象信息
};// 时间轮定时器
class TimerWheel
{
public:TimerWheel(EventLoop *loop):_capacity(60), _tick(0), _wheel(_capacity), _loop(loop),_timerfd(CreateTimerfd()), _timer_channel(new Channel(_loop, _timerfd)){}// 添加定时任务void TimerAdd(uint64_t id, uint32_t delay, const TaskFunc &cb);// 取消定时任务void TimerCancel(uint64_t id);void TimerRefresh(uint64_t id);// 查找是否存在指定的定时任务// 这个接口是存在线程安全的,只能在EventLoop所在线程内部调用bool HasTimer(uint64_t id){auto it = _timers.find(id);if(it == _timers.end()){return false;}return true;}
private:// 将ID与weak_ptr的映射关系从_timers中移除void RemoveTimer(uint64_t id){auto it = _timers.find(id);if(it != _timers.end()){_timers.erase(it);}}// 创建一个timerfdstatic int CreateTimerfd(){// 创建timerfdint timerfd = timerfd_create(CLOCK_MONOTONIC, 0);if(timerfd < 0){ERR_LOG("TIMERFD CREATE FAILED!!!");abort();}// 设置超时时间为1秒struct itimerspec itime;itime.it_value.tv_sec = 1;itime.it_value.tv_nsec = 0;itime.it_interval.tv_sec = 1;itime.it_interval.tv_nsec = 0;timerfd_settime(timerfd, 0, &itime, nullptr);return timerfd;}// 读取timerfd底层文件内的数据int ReadTimerfd(){uint64_t times;int ret = read(_timerfd, &times, 8);if(ret < 0){ERR_LOG("READ TIMEFD FAILED!!!");abort();}return times;}// 运转时间轮,让秒针每秒向后走一步void RunTimerTask(){_tick = (_tick + 1) % _capacity;_wheel[_tick].clear();}// timerfd时间到了之后执行的任务void OnTime(){// 时间到了之后,读取定时器的数值,并执行定时任务// 必须读取定时器的数值,否则会一直触发可读时间int times = ReadTimefd();for (int i = 0; i < times; i++) {RunTimerTask();}}// 线程安全地添加定时任务void TimerAddInLoop(uint64_t id, uint32_t delay, const TaskFunc &cb){PtrTask pt(new TimerTask(id, delay, cb));pt->SetRelease(std::bind(&TimerWheel::RemoveTimer, this, id));int pos = (_tick + delay) % _capacity;_wheel[pos].push_back(pt);_timers[id] = WeakTask(pt);}// 线程安全地刷新定时任务void TimerRefreshInLoop(uint64_t id){// 通过保存的定时任务对象的weak_ptr构造一个shared_ptr,并添加到时间轮中auto it = _timers.find(id);if(it == _timers.end()){return; }PtrTask pt = it->second.lock();int delay = pt->DelayTime();int pos = (_tick + delay) % _capacity;_wheel[pos].push_back(pt);}// 线程安全地取消定时任务void TimerCancelInLoop(uint64_t id){auto it = _timers.find(id);if(it == _timers.end()){return ;}PtrTask pt = it->second.lock();if(pt) pt->Cancel();}
private:using WeakTask = std::weak_ptr<TimerTask>;using PtrTask = std::shared_ptr<TimerTask>;int _tick;      // 秒针int _capacity;  // 表盘容量,其实就是最大延迟时间std::vector<std::vector<PtrTask>> _wheel;// 保存定时任务ID与weak_ptr的映射关系,这里一定不能是shared_ptr// 否则shared_ptr的引用计数永远不为0std::unordered_map<uint64_t, WeakTask> _timers;EventLoop *_loop;// 定时器timerfd的描述符,可读事件回调就是读取计数器,执行定时任务int _timerfd;std::unique_ptr<Channel> _timer_channel;
};// 添加定时任务
void TimerWheel::TimerAdd(uint64_t id, uint32_t delay, const TaskFunc &cb)
{_loop->RunInLoop(std::bind(&TimerWheel::TimerAddInLoop, this, id, delay, cb));
}// 刷新/延迟定时任务
void TimerWheel::TimerRefresh(uint64_t id)
{_loop->RunInLoop(std::bind(&TimerWheel::TimerRefreshInLoop, this, id));
}// 取消定时任务
void TimerWheel::TimerCancel(uint64_t id)
{_loop->RunInLoop(std::bind(&TimerWheel::TimerCancelInLoop, this, id));
}

时间轮定时器一定要确保线程安全,有两点原因:

  • 一个时间轮定时器是会被多个线程同时使用的,向定时器添加定时任务时,是在哪一个线程都有可能添加的,而定时器是与EventLoop保存在一起的,里面的很多操作都是对连接进行操作的,所以定时任务的添加、刷新、取消等都需要在EventLoop所在线程中执行,并且定时任务也需要在EventLoop所在线程中执行
  • TimerWheel中是使用一个哈希表管理所有定时任务的,如果如何线程都可以对其进行操作,那么是存在线程安全问题的,所以可以将对定时器的操作都放在一个线程中进行,确保线程安全。

TimerWheel与EventLoop整合

class EventLoop
{using Functor = std::function<void()>;
private:// 执行任务池中的所有任务void RunAllTask(){// 拷贝一个任务池std::vector<Functor> functor;{std::unique_lock<std::mutex> _lock(_mutex);_tasks.swap(functor);}for(auto &f : functor){f();}return ;}// 创建一个eventfdstatic int CreateEventFd(){int efd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);if(efd < 0){ERR_LOG("CREATE EVENTFD FAILED!!!");abort();}return efd;}// eventfd的可读事件回调函数void ReadEventFd(){uint64_t res = 0;int ret = read(_event_fd, &res, sizeof(res));if(ret < 0){// EINTR -- 被信号打断  EAGAIN -- 表示无数据可读if(errno == EINTR || errno == EAGAIN) return ;ERR_LOG("READ EVENTFD FAILED!!!");abort();}return ;}// 给eventfd中写入一个数据,触发它的可读事件void WeakUpEventFd(){uint64_t val = 1;int ret = write(_event_fd, &val, sizeof(val));if(ret < 0){if(errno == EINTR) return ;ERR_LOG("READ EVENTFD FAILED!!!");abort();}return ;}
public:EventLoop():_thread_id(std::this_thread::get_id()),_event_fd(CreateEventFd()),_event_channel(new Channel(this, _event_fd)),_timer_wheel(this){// 给eventfd添加可读事件回调函数,读取eventfd事件通知次数_event_channel->SetReadCallback(std::bind(&EventLoop::ReadEventFd, this));// 启动eventfd的读事件监控_event_channel->EnableRead();}// 判断当前线程是否是EventLoop对应的线程bool IsInLoop(){return (_thread_id == std::this_thread::get_id());}// 判断将要执行的任务是否处于EventLoop所在线程,如果是则执行,不是则压入任务池void RunInLoop(const Functor &cb){if(IsInLoop()) return cb();return QueueInLoop(cb);}// 将操作压入任务池void QueueInLoop(const Functor &cb){{std::unique_lock<std::mutex> _lock(_mutex);_tasks.push_back(cb);}// 唤醒可能因为没有事件就绪,而导致的epoll阻塞// 其实就是给eventfd写入一个数据,触发eventfd的可读事件WeakUpEventFd();}// 添加/修改描述符的事件监控void UpdateEvent(Channel *channel){return _poller.UpdateEvent(channel);}// 删除描述符的事件监控void RemoveEvent(Channel *channel){return _poller.RemoveEvent(channel);}// 启动EventLoopvoid Start(){// 1. 事件监控std::vector<Channel*> actives;_poller.Poll(&actives);// 2. 事件处理for(auto &channel : actives){channel->HandleEvent();} // 3. 执行任务RunAllTask();}// 向定时器添加定时任务 void TimerAdd(uint64_t id, uint32_t delay, const TaskFunc &cb) { return _timer_wheel.TimerAdd(id, delay, cb); }// 刷新定时任务void TimerRefresh(uint64_t id) { return _timer_wheel.TimerRefresh(id); }// 取消定时任务void TimerCancel(uint64_t id) { return _timer_wheel.TimerCancel(id); }// 检查定时器中是否有指定的定时任务bool HasTimer(uint64_t id) { return _timer_wheel.HasTimer(id); }
private: std::thread::id _thread_id;  // 线程IDint _event_fd;               // eventfd唤醒IO事件监控有可能导致的阻塞std::unique_ptr<Channel> _event_channel; // 管理_event_fdPoller _poller; // 进行所有文件描述符的事件监控std::vector<Functor> _tasks; // 任务池std::mutex _mutex;           // 保证任务池的线程安全TimerWheel _timer_wheel;	 // 定时器,用于超时管理
};

我们来测试一下定时功能是否正常。服务端:

void HandleClose(Channel *channel)
{DBG_LOG("close fd: %d", channel->Fd());channel->Remove(); // 移除监控delete channel;
}
void HandleRead(Channel *channel)
{int fd = channel->Fd();char buf[1024] = {0};int ret = recv(fd, buf, 1023, 0);if(ret <= 0){return HandleClose(channel);}DBG_LOG("%s", buf);// 读到数据之后,给客户端回复,启动可写事件channel->EnableWrite();
}
void HandleWrite(Channel *channel)
{int fd = channel->Fd();// 作为测试,向客户端写回固定的数据const char *data = "天气还不错!!!";int ret = send(fd, data, strlen(data), 0);if(ret < 0){return HandleClose(channel);}channel->DisableWrite(); // 关闭写监控
}
void HandleError(Channel *channel)
{return HandleClose(channel);
}
// 任意事件触发时,就刷新定时任务
void HandleEvent(EventLoop *loop, Channel *channel, uint64_t timerid)
{loop->TimerRefresh(timerid);
}// 对于监听套接字读事件触发的回调函数
void Acceptor(EventLoop *loop, Channel *lst_channel)
{// 获取新连接int fd = lst_channel->Fd();int newfd = accept(fd, nullptr, nullptr);if(newfd < 0) return ;// 为新连接封装Channeluint64_t timerid = rand() % 10000;Channel *channel = new Channel(loop, newfd);channel->SetReadCallback(std::bind(HandleRead, channel));channel->SetWriteCallback(std::bind(HandleWrite, channel));channel->SetCloseCallback(std::bind(HandleClose, channel));channel->SetErrorCallback(std::bind(HandleError, channel));channel->SetEventCallback(std::bind(HandleEvent, loop, channel, timerid));// 非活跃连接10秒后释放// 注意:设置定时销毁任务时,必须在启动读事件之前,因为有可能启动了事件监控后,立即就有了事件,但是这个时候还没用任务loop->TimerAdd(timerid, 10, std::bind(HandleClose, channel));channel->EnableRead();
}int main()
{srand(time(nullptr));EventLoop loop;Socket lst_sock;lst_sock.CreateServer(8080);// 为监听套接字创建一个ChannelChannel channel(&loop, lst_sock.Fd());channel.SetReadCallback(std::bind(Acceptor, &loop, &channel));// 启动可读事件监控channel.EnableRead();while(true){// 开始事件监控,获取事件就绪的连接,并进行处理loop.Start();}lst_sock.Close();return 0;
}

客户端:

int main()
{Socket cli_sock;cli_sock.CreateClient(8080, "127.0.0.1");for(int i = 0;i < 5;i ++){std::string str = "今天天气怎么样?";cli_sock.Send(str.c_str(), str.size());char buf[1024] = {0};cli_sock.Recv(buf, 1023);DBG_LOG("%s", buf);sleep(1);}while(1) sleep(1);return 0;
}

我们让客户端给服务端发送5次数据后就不再发送了。

可以看到,定时功能是正常的。

Connection模块

Connection模块功能思想

Connection是对通信连接进行整体管理的模块,对通信连接的所有操作都是通过这个模块提供的功能完成的。整合了Buffer模块、Socket模块、Channel模块。

管理:

  • 套接字的管理,能够进行套接字的操作
  • 连接事件的管理,可读、可写、错误、挂断、任意
  • 缓冲区的管理,便于Socket数据的接收和发送
  • 协议上下文的管理,记录请求数据的处理过程
  • 回调函数的管理

因为连接接收到数据之后该如何处理,需要由用户决定,因此必须有业务处理回调函数,一个连接建立成功后,该如何处理,由用户决定,因此必须有连接建立成功的回调函数,一个连接关闭前,该如何处理,由用户决定,因此必须由关闭连接回调函数。任意事件的产生,有没有某些处理,由用户决定,因此必须有任意事件的回调函数。

功能:

  • 发送数据。给用户提供的发送数据的接口,并不是真正的发送接口,而是将数据放到发送缓冲区,然后启动写事件监控。写事件触发的回调就是将发送缓冲区内的数据发送出去。
  • 关闭连接。给用户提供的关闭连接的接口,并不是真正的关闭接口,在真正关闭之前需要看看输入输出缓冲区是否有数据待处理。
  • 启动非活跃连接的超时销毁功能。
  • 取消非活跃连接的超时销毁功能。
  • 协议切换。一个连接接收到数据后如何进行业务处理,取决于上下文和数据的业务处理回调函数。切换协议就是切换数据的处理方式,所以需要清空上下文,更改回调函数。

Connection模块类设计

我们来看一个场景:当有多个线程同时对某个连接操作时,可能出现一个线程将连接释放,另一个线程操作连接的情况,此时就会导致程序崩溃。即使每个线程对连接的操作都由EventLoop线程进行处理,但是也是会存在时序问题的,可能先释放了连接,再对连接操作。此时可以使用智能指针shared_ptr对Connection对象进行管理,这样就能保证任意一个地方对Connection对象进行操作的时候,保存了一份shared_ptr,因此就算其他地方进行了释放操作,也只是对shared_ptr的计数器-1,而不会导致Connection的实际释放。

协议上下文是记录上一次对于不完整请求处理到了哪里。

还要有一个连接状态:

  • 连接关闭状态:马上要被释放了,不能再对连接进行任何操作
  • 连接建立未完成状态:获取了连接,但是还未设置回调函数
  • 连接建立完成状态
  • 待关闭状态:需要先清空缓冲区内的数据后,才能设置为连接关闭状态

上面的很多操作都是对于连接的操作,是不能让使用者直接操作的,为了保证线程安全,必须将对连接的所有操作均放在EventLoop线程中才行,所以Connection中需要有一个EventLoop。

typedef enum
{DISCONNECTED,  // 连接关闭状态CONNECTING,    // 连接建立未完成状态CINNECTED,     // 连接建立完成状态DISCONNECTING  // 连接待关闭状态
}ConnStatu;
using PtrConnection = std::shared_ptr<Connection>;
class Connection
{using ConnectedCallback = std::function<void(const PtrConnection&)>;using MessageCallback = std::function<void(const PtrConnection&, Buffer *)>;using ClosedCallback = std::function<void(const PtrConnection&)>;using AnyEventCallback = std::function<void(cosst PtrConnection&)>;
public:Connection();~Connection();// 发送数据,将数据放到发送缓冲区,启动写事件监控void Send(char *data, size_t len);// 提供给组件使用者的关闭连接接口,不会真的关闭连接,需要判断有没有数据待处理void Shutdown();// 启动非活跃销毁void EnableInactiveRelease(int sec);// 取消非活跃销毁void CancelInactiveRelease();// 协议切换 --- 重置上下文以及阶段性处理函数void Upgrade(const Any &context, const ConnectedCallback &conn, const MessageCallback &msg,const ClosedCallback &closed, const AnyEventCallback &event);// 获取连接对应的文件描述符int Fd();// 获取连接IDint Id();// 连接是否处于CONNECTED状态bool Connected();// 设置上下文,连接建立完成时进行调用void SetContext(const Any &context);// 获取上下文,返回的是指针Any *GetContext();void SetConnectedCallback(const ConnectedCallback &cb);void SetMessageCallback(const MessageCallback &cb);void SetClosedCallback(const ClosedCallback &cb);void SetAnyEventCallback(const AnyEventCallback &cb);// 连接建立完成后,进行Channel回调设置,启动读事件监控,调用用户设置的连接建立完成的回调函数void Established();
private:// 对连接的操作要放到EventLoop线程中// 连接建立完成之后,进行设置(启动读监控,调用回调函数)void EstablishedInLoop();// 真正释放连接的接口void ReleaseInLoop();// 线程安全的发送数据接口void SendInLoop(char *data, size_t len);// 线程安全的关闭连接接口void ShutdownInLoop();// 线程安全的启动非活跃销毁void EnableInactiveReleaseInLoop(int sec);// 线程安全的取消非活跃销毁void CancelInactiveReleaseInLoop();// 线程安全的协议切换void UpgradeInLoop(const Any &context, const ConnectedCallback &conn, const MessageCallback &msg,const ClosedCallback &closed, const AnyEventCallback &event);// 下面5个函数是给Channel的事件回调函数void HandlerRead();void HandlerWrite();void HandlerClose();void HandlerError();void HandlerEvent();
private:uint64_t _conn_id; // 连接的ID,便于进行管理和查找// uint64_t _timer_id; // 定时器ID,必须是唯一的,这里简化为使用_conn_id作为定时器IDint _sockfd;	   // 连接关联的文件描述符bool _enable_inactive_release; // 连接是否启动了非活跃连接销毁EventLoop *_loop;  // 连接所属的EventLoopConnStatu _statu;  // 连接状态Socket _socket;    // 套接字操作管理Channel _channel;  // 连接的事件管理Buffer _in_buffer; // 输入缓冲区 --- 存放从Socket中读取到的数据Buffer _out_buffer;// 输出缓冲区 --- 存放要发送给对端的数据Any _context;	   // 请求的接收处理上下文// 下面4个函数是服务器模块设置的,是由组件使用者设置给服务器模块的ConnectedCallback _connected_callback; // 连接建立时的回调函数MessageCallback _message_callback;	   // 接收到消息时的回调函数ClosedCallback _closed_callback;	   // 连接关闭时的回调函数AnyEventCallback _event_callback;	   // 任意事件触发时的回调函数// 服务器模块会将所有的连接管理起来,下面这个函数是当连接断开时,将自己从管理中删除CloseCallback _server_closed_callback;
}

Connection模块代码设计

class Connection;
typedef enum
{DISCONNECTED,  // 连接关闭状态CONNECTING,    // 连接建立未完成状态CONNECTED,     // 连接建立完成状态DISCONNECTING  // 连接待关闭状态
}ConnStatu;
using PtrConnection = std::shared_ptr<Connection>;
class Connection : public std::enable_shared_from_this<Connection>
{using ConnectedCallback = std::function<void(const PtrConnection&)>;using MessageCallback = std::function<void(const PtrConnection&, Buffer *)>;using ClosedCallback = std::function<void(const PtrConnection&)>;using AnyEventCallback = std::function<void(const PtrConnection&)>;
public:Connection(EventLoop *loop, uint64_t conn_id, int sockfd):_conn_id(conn_id), _sockfd(sockfd),_enable_inactive_release(false), _loop(loop), _statu(CONNECTING), _socket(_sockfd), _channel(loop, _sockfd){_channel.SetCloseCallback(std::bind(&Connection::HandlerClose, this));_channel.SetEventCallback(std::bind(&Connection::HandlerEvent, this));_channel.SetReadCallback(std::bind(&Connection::HandlerRead, this));_channel.SetWriteCallback(std::bind(&Connection::HandlerWrite, this));_channel.SetErrorCallback(std::bind(&Connection::HandlerError, this));}~Connection() { DBG_LOG("RELEASE CONNECTION: %p", this); }// 发送数据,将数据放到发送缓冲区,启动写事件监控void Send(const char *data, size_t len){// 外界传入的data可能是一个临时的空间,我们现在只是吧发送操作压入任务池,有可能没有立即被执行// 因此有可能执行的时候,data指向的空间已经被释放了Buffer buf;buf.WriteAndPush(data, len);_loop->RunInLoop(std::bind(&Connection::SendInLoop, this, buf));}// 提供给组件使用者的关闭连接接口,不会真的关闭连接,需要判断有没有数据待处理void Shutdown(){_loop->RunInLoop(std::bind(&Connection::ShutdownInLoop, this));}// 启动非活跃销毁void EnableInactiveRelease(int sec){_loop->RunInLoop(std::bind(&Connection::EnableInactiveReleaseInLoop, this, sec));}// 取消非活跃销毁void CancelInactiveRelease(){_loop->RunInLoop(std::bind(&Connection::CancelInactiveReleaseInLoop, this));}// 协议切换 --- 重置上下文以及阶段性处理函数void Upgrade(const Any &context, const ConnectedCallback &conn, const MessageCallback &msg,const ClosedCallback &closed, const AnyEventCallback &event){_loop->AssertInLoop();_loop->RunInLoop(std::bind(&Connection::UpgradeInLoop, this, context, conn, msg, closed, event));}// 获取连接对应的文件描述符int Fd() { return _sockfd; }// 获取连接IDint Id() { return _conn_id; }// 连接是否处于CONNECTED状态bool Connected() { return (_statu == CONNECTED); }// 设置上下文,连接建立完成时进行调用void SetContext(const Any &context) { _context = context; }// 获取上下文,返回的是指针Any *GetContext() { return &_context; }void SetConnectedCallback(const ConnectedCallback &cb) { _connected_callback = cb; }void SetMessageCallback(const MessageCallback &cb) { _message_callback = cb; }void SetClosedCallback(const ClosedCallback &cb) { _closed_callback = cb; }void SetAnyEventCallback(const AnyEventCallback &cb) { _event_callback = cb; }void SetSrvClosedCallback(const ClosedCallback &cb) { _server_closed_callback = cb; }// 连接建立完成后,进行Channel回调设置,启动读事件监控,调用用户设置的连接建立完成的回调函数void Established(){_loop->RunInLoop(std::bind(&Connection::EstablishedInLoop, this));}
private:// 对连接的操作要放到EventLoop线程中// 连接建立完成之后,进行设置(启动读监控, 调用回调函数)void EstablishedInLoop(){// 1. 修改连接状态,并且当前连接状态必须是连接未完成状态assert(_statu == CONNECTING);_statu = CONNECTED;// 2. 启动读事件监控_channel.EnableRead();if(_connected_callback) _connected_callback(shared_from_this());}// 真正释放连接的接口void ReleaseInLoop(){// 1. 修改连接状态,将其置为DISCONNECTED_statu = DISCONNECTED;// 2. 移除连接的事件监控_channel.Remove();// 3. 关闭文件描述符_socket.Close();// 4. 如果当前定时器队列中还有定时销毁任务,则取消任务if(_loop->HasTimer(_conn_id)) CancelInactiveReleaseInLoop();// 5. 调用连接关闭回调函数,避免先移除服务器管理的连接信息导致Connection被释放,再去处理会出错if(_closed_callback) _closed_callback(shared_from_this());// 6. 移除服务器模块对连接的管理if(_server_closed_callback) _server_closed_callback(shared_from_this());}// 线程安全的发送数据接口,并不是真正的发送接口,只是将数据放到了发送缓冲区,启动了可写事件监控void SendInLoop(Buffer buf){if(_statu == DISCONNECTED) return ;_out_buffer.WriteBufferAndPush(buf);if(_channel.WriteAble() == false){_channel.EnableWrite();}}// 线程安全的关闭连接接口,并不是真正的关闭连接接口void ShutdownInLoop(){_statu = DISCONNECTING;// 如果输入缓冲区有数据,将数据处理了,这里可能会因为数据不完整,上层不处理,这里我们直接不管if(_in_buffer.ReadAbleSize() > 0){if(_message_callback) _message_callback(shared_from_this(), &_in_buffer);}// 如果输出缓冲区还有数据,清空,然后调用真正关闭连接的接口释放连接if(_out_buffer.ReadAbleSize() > 0){if(_channel.WriteAble() == false) _channel.EnableWrite();}if(_out_buffer.ReadAbleSize() == 0){ReleaseInLoop();}}// 线程安全的启动非活跃销毁void EnableInactiveReleaseInLoop(int sec){// 1. 将_enable_inactive_release设置为true_enable_inactive_release = true;// 2. 如果当前定时器的销毁任务已经存在,那就延迟执行if(_loop->HasTimer(_conn_id)){return _loop->TimerRefresh(_conn_id);}// 3. 如果不存在定时销毁任务,则新增_loop->TimerAdd(_conn_id, sec, std::bind(&Connection::ReleaseInLoop, this));}// 线程安全的取消非活跃销毁void CancelInactiveReleaseInLoop(){_enable_inactive_release = false;if(_loop->HasTimer(_conn_id)){_loop->TimerCancel(_conn_id);}}// 线程安全的协议切换void UpgradeInLoop(const Any &context, const ConnectedCallback &conn, const MessageCallback &msg,const ClosedCallback &closed, const AnyEventCallback &event){_context = context;_connected_callback = conn;_message_callback = msg;_closed_callback = closed;_event_callback = event;}// 下面5个函数是给Channel的事件回调函数void HandlerRead(){// 1. 接收socket的数据,放到缓冲区char buf[65536];ssize_t ret = _socket.NonBlockRecv(buf, 65535);if(ret < 0){// 出错了,直接关闭连接return ShutdownInLoop();}_in_buffer.WriteAndPush(buf, ret);// 2. 调用message_callback进行业务处理if(_in_buffer.ReadAbleSize() > 0){return _message_callback(shared_from_this(), &_in_buffer);}}void HandlerWrite(){// _out_buffer中保存的数据就算要发送的数据ssize_t ret = _socket.NonBlockSend(_out_buffer.ReadPosition(), _out_buffer.ReadAbleSize());if(ret < 0){// 发送错误就关闭连接if(_in_buffer.ReadAbleSize() > 0){_message_callback(shared_from_this(), &_in_buffer);}return ReleaseInLoop();  // 释放连接}_out_buffer.MoveReadOffset(ret);// 如果输出缓冲区没有数据了,就关闭写事件监控if(_out_buffer.ReadAbleSize() == 0){_channel.DisableWrite();// 若此时连接是待关闭状态,并且现在输出缓冲区已经没有数据了,直接关闭连接if(_statu == DISCONNECTING){return ReleaseInLoop();}}}void HandlerClose(){// 一旦连接挂断,套接字就什么都干不了了,因此有数据待处理就处理一下,完毕直接关闭连接if(_in_buffer.ReadAbleSize() > 0){_message_callback(shared_from_this(), &_in_buffer);}return ReleaseInLoop();}void HandlerError(){return HandlerClose();}void HandlerEvent(){// 1. 刷新连接的活跃度,延迟定时销毁任务if(_enable_inactive_release == true) _loop->TimerRefresh(_conn_id);// 2. 调用组件使用者设置的任意事件回调函数if(_event_callback) _event_callback(shared_from_this());}
private:uint64_t _conn_id; // 连接的ID,便于进行管理和查找// uint64_t _timer_id; // 定时器ID,必须是唯一的,这里简化为使用_conn_id作为定时器IDint _sockfd;	   // 连接关联的文件描述符bool _enable_inactive_release; // 连接是否启动了非活跃连接销毁EventLoop *_loop;  // 连接所属的EventLoopConnStatu _statu;  // 连接状态Socket _socket;    // 套接字操作管理Channel _channel;  // 连接的事件管理Buffer _in_buffer; // 输入缓冲区 --- 存放从Socket中读取到的数据Buffer _out_buffer;// 输出缓冲区 --- 存放要发送给对端的数据Any _context;	   // 请求的接收处理上下文// 下面4个函数是服务器模块设置的,是由组件使用者设置给服务器模块的,再由服务器模块设置给ConnectionConnectedCallback _connected_callback; // 连接建立时的回调函数MessageCallback _message_callback;	   // 接收到消息时的回调函数ClosedCallback _closed_callback;	   // 连接关闭时的回调函数AnyEventCallback _event_callback;	   // 任意事件触发时的回调函数// 服务器模块会将所有的连接管理起来,下面这个函数是当连接断开时,将自己从管理中删除ClosedCallback _server_closed_callback;
};

这里说一下代码中几个需要注意的点:

  • 连接内部要将自身作为一个shared_ptr传递,可以使用shared_from_this,为了使用shared_from_this,要让Connection类继承于一个模板类
  • 在HandlerRead中,读取失败不会直接关闭连接,因为读取数据的时间可能比较长,我们需要先读取数据,再刷新连接的活跃度,而若是刷新连接活跃度时连接已经关闭,则会出错,所以我们在读取失败时不能直接关闭连接
  • Upgrade是必须由EventLoop线程进行调用的,因为如果是其他线程调用,会将切换协议的任务放入任务池中,如果此时触发了事件,而协议切换的任务还没有被执行,此时就会以原来的协议进行处理,只有在EventLoop内部执行才能够立即执行任务。所以,我们给EventLoop增加一个成员函数,保证调用接口的是EventLoop所在线程。
// 保证调用接口的是EventLoop线程
void AssertInLoop()
{assert(_thread_id == std::this_thread::get_id());
}

我们来进行测试,服务端:

// 管理所有的连接
std::unordered_map<uint64_t, PtrConnection> _conns;
uint64_t conn_id = 0;
// 连接销毁时的回调函数
void ConnectionDestory(const PtrConnection &conn)
{_conns.erase(conn->Id());
}
// 连接建立时的回调函数
void OnConnected(const PtrConnection &conn)
{DBG_LOG("NEW CONNECTING: %p", conn.get());
}
// 接收到消息时的回调函数
void OnMessage(const PtrConnection &conn, Buffer *buf)
{DBG_LOG("%s", buf->ReadPosition());buf->MoveReadOffset(buf->ReadAbleSize());std::string str = "今天天气不错!!!";conn->Send(str.c_str(), str.size());
}// 对于监听套接字读事件触发的回调函数
void Acceptor(EventLoop *loop, Channel *lst_channel)
{// 获取新连接int fd = lst_channel->Fd();int newfd = accept(fd, nullptr, nullptr);if(newfd < 0) return ;// 为新连接封装Connectionconn_id ++;PtrConnection conn(new Connection(loop, conn_id, newfd));conn->SetMessageCallback(std::bind(OnMessage, std::placeholders::_1, std::placeholders::_2));conn->SetSrvClosedCallback(std::bind(ConnectionDestory, std::placeholders::_1));conn->SetConnectedCallback(std::bind(OnConnected, std::placeholders::_1));conn->EnableInactiveRelease(10); // 启动非活跃超时销毁conn->Established();  // 就绪初始化_conns.insert(std::make_pair(conn_id, conn));
}int main()
{EventLoop loop;Socket lst_sock;lst_sock.CreateServer(8080);// 为监听套接字创建一个ChannelChannel channel(&loop, lst_sock.Fd());channel.SetReadCallback(std::bind(Acceptor, &loop, &channel));// 启动可读事件监控channel.EnableRead();while(true){// 开始事件监控,获取事件就绪的连接,并进行处理loop.Start();}lst_sock.Close();return 0;
}

获取到一个新连接之后,不再使用Channel,而是使用Connection对新连接进行管理。当监听套接字触发了可读事件后,就获取新连接,将文件描述符封装成一个connection,并使用智能指针管理起来。设置Connection的业务处理函数、连接销毁回调函数、连接就绪后的会回调函数。

客户端:

int main()
{Socket cli_sock;cli_sock.CreateClient(8080, "127.0.0.1");for(int i = 0;i < 5;i ++){std::string str = "今天天气怎么样?";cli_sock.Send(str.c_str(), str.size());char buf[1024] = {0};cli_sock.Recv(buf, 1023);DBG_LOG("%s", buf);sleep(1);}while(1) sleep(1);return 0;
}

功能是正常的。

Acceptor模块

前面的Connection模块是对普通套接字的管理。Acceptor模块是对监听套接字进行管理的模块,是对Socket模块和Channel模块的整合。需要有以下功能:

  • 创建一个监听套接字
  • 启动读事件监控
  • 事件触发后,获取新连接
  • 调用新连接获取成功后的回调函数。

新连接获取成功后的回调函数的内容是为新连接创建Connection进行管理,这个函数由服务器模块进行设置。因为Acceptor模块只进行监听连接进行管理,获取到新连接的文件描述符之后,对于新连接的文件描述符要如何处理并不关心。对于新连接如何处理,是服务器模块来管理的。未来服务器模块实现一个对于新连接的文件描述符的处理函数,将这个函数设置给Acceptor模块。

class Acceptor
{using AcceptCallback = std::function<void(int)>;
public:// 不能将启动事件监控放到构造函数中,必须设置了回调函数之后,再启动事件监控// 否则可能造成启动事件监控后,立即有事件,处理的时候,回调函数还未设置Acceptor(EventLoop *loop, int port):_socket(CreateServer(port)),_loop(loop),_channel(loop, _socket.Fd()){_channel.SetReadCallback(std::bind(&Acceptor::HandlerRead, this));}void SetAcceptCallback(const AcceptCallback &cb) { _accept_callback = cb; }// 开始监听,其实就是启动对监听套接字的读事件监控void Listen() { _channel.EnableRead(); }
private:// 监听套接字读事件触发时的回调函数void HandlerRead(){int newfd = _socket.Accept();if(newfd < 0) return ;if(_accept_callback) _accept_callback(newfd);}int CreateServer(int port){bool ret = _socket.CreateServer(port);assert(ret == true);return _socket.Fd();}
private:Socket _socket;   // 用于创建监听套接字EventLoop *_loop; // 用于对监听套接字进行事件监控Channel _channel; // 用于对监听套接字进行事件管理AcceptCallback _accept_callback; // 接收到新连接时的回调函数
};

修改一下EventLoop的Start,让其在内部即可进行循环。

// 启动EventLoop
void Start()
{while(true){// 1. 事件监控std::vector<Channel*> actives;_poller.Poll(&actives);// 2. 事件处理for(auto &channel : actives){channel->HandleEvent();} // 3. 执行任务RunAllTask();}
}

测试一下Acceptor模块,服务端:

// 管理所有的连接
std::unordered_map<uint64_t, PtrConnection> _conns;
uint64_t conn_id = 0;
EventLoop loop;// 连接销毁时的回调函数
void ConnectionDestory(const PtrConnection &conn)
{_conns.erase(conn->Id());
}
// 连接建立时的回调函数
void OnConnected(const PtrConnection &conn)
{DBG_LOG("NEW CONNECTING: %p", conn.get());
}
// 接收到消息时的回调函数
void OnMessage(const PtrConnection &conn, Buffer *buf)
{DBG_LOG("%s", buf->ReadPosition());buf->MoveReadOffset(buf->ReadAbleSize());std::string str = "今天天气不错!!!";conn->Send(str.c_str(), str.size());
}// 对于监听套接字读事件触发的回调函数
void NewConnection(int fd)
{// 为新连接封装Connectionconn_id ++;PtrConnection conn(new Connection(&loop, conn_id, fd));conn->SetMessageCallback(std::bind(OnMessage, std::placeholders::_1, std::placeholders::_2));conn->SetSrvClosedCallback(std::bind(ConnectionDestory, std::placeholders::_1));conn->SetConnectedCallback(std::bind(OnConnected, std::placeholders::_1));conn->EnableInactiveRelease(10); // 启动非活跃超时销毁conn->Established();  // 就绪初始化_conns.insert(std::make_pair(conn_id, conn));
}int main()
{Acceptor acceptor(&loop, 8080);acceptor.SetAcceptCallback(std::bind(NewConnection, std::placeholders::_1));acceptor.Listen();loop.Start();return 0;
}

未来在服务器模块会有一个EventLoop线程池,Acceptor是在主EventLoop线程中的,这里我们将EventLoop定义为全局变量。

客户端:

int main()
{Socket cli_sock;cli_sock.CreateClient(8080, "127.0.0.1");for(int i = 0;i < 5;i ++){std::string str = "今天天气怎么样?";cli_sock.Send(str.c_str(), str.size());char buf[1024] = {0};cli_sock.Recv(buf, 1023);DBG_LOG("%s", buf);sleep(1);}while(1) sleep(1);return 0;
}

可以正常进行通信。

LoopThread模块 

我们前面说过,一个线程一个EventLoop,而前面吗实现的是EventLoop的功能模块,并没有将EventLoop与线程进行关联。LoopThread模块就是将EventLoop与线程整合起来的模块。

EventLoop模块与线程是一一对应的,EventLoop在构造函数中会初始化_thread_id,后边运行一个操作时,判断执行操作的线程是否是EventLoop模块对应的线程,就算将线程ID与EventLoopm对象中的_thread_id进行比较,相同表示在同一个线程中,不同则表示不在同一个线程中。也就是说,在实例化EventLoop对象时,必须在线程内部,否则在构造函数中初始化线程ID时就会出错。

此时可能会想到,可以先创建多个EventLoop,然后再创建多个线程,将这些线程的ID设置进EventLoop中,这样是不行的,因为在构造EventLoop对象,到设置线程ID的过程中是不可控的。所以,我们必须先创建出线程,然后在线程的入口函数中去实例化EventLoop对象。

所以,我们构造一个新的模块LoopThread,这个模块将EventLoop与thread整合到一起。

思想:

  • 创建线程
  • 在线程中实例化EventLoop对象

功能:可以向外部返回实例化的EventLoop

当创建了线程,还没来得及实例化EventLoop对象时,就获取EventLoop对象是会出错的,所以使用锁和条件变量来进行同步。

class LoopThread
{
public:// 创建线程,设定线程的入口函数LoopThread():_loop(nullptr),_thread(std::thread(&LoopThread::ThreadEntry, this)){}// 返回当前线程关联的EventLoop对象指针EventLoop *GetLoop(){EventLoop *loop = nullptr;{std::unique_lock<std::mutex> lock(_mutex);// 条件变量,只要_loop为空时就一直等待_cond.wait(lock, [&](){ return _loop != nullptr; });loop = _loop;}return loop;}
private:// 线程入口函数// 实例化EventLoop对象,并运行EventLoop模块的功能void ThreadEntry(){// 线程的生命周期是与ThreadEntry的生命周期相同的// 所以,这里不使用new,而是定义类对象,让_loop指向该对象,是为了让_loop的生命周期随进程EventLoop loop;{std::unique_lock<std::mutex> lock(_mutex);_loop = &loop;_cond.notify_all();}loop.Start();}
private:std::mutex _mutex;				// 互斥锁std::condition_variable _cond;	// 条件变量EventLoop *_loop;				// EventLoop指针变量,这个对象需要在线程内实例化std::thread _thread; 			// EventLoop对应的线程
};

现在来测试一下,服务端:

// 管理所有的连接
std::unordered_map<uint64_t, PtrConnection> _conns;
uint64_t conn_id = 0;
EventLoop base_loop;  // 主线程的EventLoop
// 现在有两个从属Reactor线程,主Reactor获取新连接后,就可以选择性地将新连接分配给不同的从属Reactor线程
std::vector<LoopThread> threads(2);
int next_loop = 0;    // 下一次将新连接分配给哪一个从属Reactor线程// 连接销毁时的回调函数
void ConnectionDestory(const PtrConnection &conn)
{_conns.erase(conn->Id());
}
// 连接建立时的回调函数
void OnConnected(const PtrConnection &conn)
{DBG_LOG("NEW CONNECTING: %p", conn.get());
}
// 接收到消息时的回调函数
void OnMessage(const PtrConnection &conn, Buffer *buf)
{DBG_LOG("%s", buf->ReadPosition());buf->MoveReadOffset(buf->ReadAbleSize());std::string str = "今天天气不错!!!";conn->Send(str.c_str(), str.size());
}// 对于监听套接字读事件触发的回调函数
void NewConnection(int fd)
{// 为新连接封装Connectionconn_id ++;next_loop = (next_loop + 1) % 2;PtrConnection conn(new Connection(threads[next_loop].GetLoop(), conn_id, fd));conn->SetMessageCallback(std::bind(OnMessage, std::placeholders::_1, std::placeholders::_2));conn->SetSrvClosedCallback(std::bind(ConnectionDestory, std::placeholders::_1));conn->SetConnectedCallback(std::bind(OnConnected, std::placeholders::_1));conn->EnableInactiveRelease(10); // 启动非活跃超时销毁conn->Established();  // 就绪初始化_conns.insert(std::make_pair(conn_id, conn));DBG_LOG("NEW CONNECTION!!!");
}int main()
{// 主线程只负责监听新连接Acceptor acceptor(&base_loop, 8080);acceptor.SetAcceptCallback(std::bind(NewConnection, std::placeholders::_1));acceptor.Listen();base_loop.Start();return 0;
}

客户端:

int main()
{Socket cli_sock;cli_sock.CreateClient(8080, "127.0.0.1");for(int i = 0;i < 5;i ++){std::string str = "今天天气怎么样?";cli_sock.Send(str.c_str(), str.size());char buf[1024] = {0};cli_sock.Recv(buf, 1023);DBG_LOG("%s", buf);sleep(1);}while(1) sleep(1);return 0;
}

可以看到,线程是不同的。

LoopThreadPool模块

LoopThreadPool模块是针对LoopThread设计一个线程池,对所有的LoopThread进行管理及分配。

功能:

  • 线程数量可配置(0个或多个)。当线程数量为多个时,主Reactor线程只负责获取新连接,从属Reactor线程负责对新连接进行事件监控及处理。当线程数量为0个时,只有一个Reactor线程,这个线程既要负责获取新连接,也要负责对新连接进行事件监控以处理。
  • 对所有的线程进行管理,其实就是管理0个或多个LoopThread对象。
  • 提供线程分配功能。当主线程获取到一个新连接,需要将新连接挂到从属线程上进行事件监控及处理。
class LoopThreadPool
{
public:LoopThreadPool(EventLoop *baseloop):_thread_count(0),_next_id(0),_baseloop(baseloop){}// 设置线程数量void SetThreadCount(int count) { _thread_count = count; }// 创建所有的从属Reactor线程void Create(){if(_thread_count > 0){_threads.resize(_thread_count);_loops.resize(_thread_count);for(int i = 0;i < _thread_count;i ++){_threads[i] = new LoopThread();_loops[i] = _threads[i]->GetLoop();}}}// 获取下一个新连接分配给哪一个从属Reactor线程EventLoop* NextLoop(){if(_thread_count == 0){return _baseloop;}_next_id = (_next_id + 1) % _thread_count;return _loops[_next_id];}
private:int _thread_count; 	 	  		   // 从属Reactor线程的数量int _next_id; 	  	 	   		   // 下一次将新连接分配给哪一个从属Reactor线程EventLoop *_baseloop; 	  		   // 主Reactor线程中的EventLoopstd::vector<LoopThread*> _threads; // 从属Reactor线程的数量std::vector<EventLoop*> _loops;    // 从属Reactor线程中的EventLoop
};

测试一下,服务端:

// 管理所有的连接
std::unordered_map<uint64_t, PtrConnection> _conns;
uint64_t conn_id = 0;
EventLoop base_loop;  // 主线程的EventLoop
LoopThreadPool *loop_pool;// 连接销毁时的回调函数
void ConnectionDestory(const PtrConnection &conn)
{_conns.erase(conn->Id());
}
// 连接建立时的回调函数
void OnConnected(const PtrConnection &conn)
{DBG_LOG("NEW CONNECTING: %p", conn.get());
}
// 接收到消息时的回调函数
void OnMessage(const PtrConnection &conn, Buffer *buf)
{DBG_LOG("%s", buf->ReadPosition());buf->MoveReadOffset(buf->ReadAbleSize());std::string str = "今天天气不错!!!";conn->Send(str.c_str(), str.size());
}// 对于监听套接字读事件触发的回调函数
void NewConnection(int fd)
{// 为新连接封装Connectionconn_id ++;PtrConnection conn(new Connection(loop_pool->NextLoop(), conn_id, fd));conn->SetMessageCallback(std::bind(OnMessage, std::placeholders::_1, std::placeholders::_2));conn->SetSrvClosedCallback(std::bind(ConnectionDestory, std::placeholders::_1));conn->SetConnectedCallback(std::bind(OnConnected, std::placeholders::_1));conn->EnableInactiveRelease(10); // 启动非活跃超时销毁conn->Established();  // 就绪初始化_conns.insert(std::make_pair(conn_id, conn));DBG_LOG("NEW CONNECTION!!!");
}int main()
{// 主线程只负责监听新连接loop_pool = new LoopThreadPool(&base_loop);loop_pool->SetThreadCount(2);loop_pool->Create();Acceptor acceptor(&base_loop, 8080);acceptor.SetAcceptCallback(std::bind(NewConnection, std::placeholders::_1));acceptor.Listen();base_loop.Start();return 0;
}

客户端:

int main()
{Socket cli_sock;cli_sock.CreateClient(8080, "127.0.0.1");for(int i = 0;i < 5;i ++){std::string str = "今天天气怎么样?";cli_sock.Send(str.c_str(), str.size());char buf[1024] = {0};cli_sock.Recv(buf, 1023);DBG_LOG("%s", buf);sleep(1);}while(1) sleep(1);return 0;
}

TcpServer模块

TcpServer模块是对所有模块的整合,通过TcpServer模块实例化对象,可以非常简单地完成一个服务器的搭建。

管理:

  • Acceptor对象,创建一个监听套接字。
  • EventLoop对象,实现对监听套接字的事件监控。
  • std::unordered_map<uint64_t, PtrConnection> _conns,实现对所有连接的管理。
  • LoopThreadPool对象,对所有连接进行事件监控及处理。

功能:

  • 设置从属Reactor线程数量
  • 启动服务器
  • 设置各种回调函数,用户设置给TcpServer,TcpServer再设置给获取的新连接,所以Connection有什么回调函数,这里就有什么回调函数。
  • 是否启动非活跃连接超时销毁功能
  • 添加定时任务功能

流程:

  • 在TcpServer中实例化一个Acceptor对象,以及一个EventLoop对象baseloop
  • 将Acceptor对象挂到baseloop上,并启动读事件监控
  • 一旦Acceptor对象触发了可读事件,则调用读事件回调函数获取新连接
  • 在这个回调函数中,就会将新连接封装成一个Connection,并设置各种回调函数,启动Connection的非活跃连接的超时销毁规则,将Connection挂到LoopThreadPool中的从属Reactor线程中进行事件监控及处理
  • 当Connection对应的连接就绪了可读事件,则这时候执行读事件回调函数,读取数据,读取完毕后调用TcpServer设置的回调函数进行业务处理。
class TcpServer
{using ConnectedCallback = std::function<void(const PtrConnection&)>;using MessageCallback = std::function<void(const PtrConnection&, Buffer *)>;using ClosedCallback = std::function<void(const PtrConnection&)>;using AnyEventCallback = std::function<void(const PtrConnection&)>;using Functor = std::function<void()>;
public:TcpServer(int port):_port(port),_next_id(0),_enable_inactive_release(false),_acceptor(&_baseloop, port),_pool(&_baseloop){_acceptor.SetAcceptCallback(std::bind(&TcpServer::NewConnection, this, std::placeholders::_1));_acceptor.Listen();}void SetThreadCount(int count) { _pool.SetThreadCount(count); }void SetConnectedCallback(const ConnectedCallback &cb) { _connected_callback = cb; }void SetMessageCallback(const MessageCallback &cb) { _message_callback = cb; }void SetClosedCallback(const ClosedCallback &cb) { _closed_callback = cb; }void SetAnyEventCallback(const AnyEventCallback &cb) { _event_callback = cb; }// 启动非活跃连接超时销毁功能void EnableInactiveRelease(int timeout){_timeout = timeout;_enable_inactive_release = true;}// 添加定时任务,指定秒数之后执行一个任务void RunAfter(const Functor &task, int delay){_baseloop.RunInLoop(std::bind(&TcpServer::RunAfterInLoop, this, task, delay));}// 启动服务器void Start(){_pool.Create();_baseloop.Start();}
private:// 为新连接构造一个Connection进行管理void NewConnection(int fd){_next_id ++;PtrConnection conn(new Connection(_pool.NextLoop(), _next_id, fd));conn->SetMessageCallback(_message_callback);conn->SetClosedCallback(_connected_callback);conn->SetConnectedCallback(_connected_callback);conn->SetAnyEventCallback(_event_callback);conn->SetSrvClosedCallback(std::bind(&TcpServer::RemoveConnection, this, std::placeholders::_1));if(_enable_inactive_release) conn->EnableInactiveRelease(_timeout);conn->Established();_conns.insert(std::make_pair(_next_id, conn));}// 从_conns中移除一个Connectionvoid RemoveConnection(const PtrConnection &conn){_baseloop.RunInLoop(std::bind(&TcpServer::RemoveConnectionInLoop, this, conn));}// 线程安全地添加一个定时任务void RunAfterInLoop(const Functor &task, int delay){_next_id ++;_baseloop.TimerAdd(_next_id, delay, task);}// 线程安全地从_conns中移除一个Connectionvoid RemoveConnectionInLoop(const PtrConnection &conn){int id = conn->Id();auto it = _conns.find(id);if(it != _conns.end()){_conns.erase(it);}}
private:int _port;					   // 服务器监听的端口号uint64_t _next_id; 			   // 自动增长的ID,连接ID和定时任务ID都使用它int _timeout;				   // 非活跃连接的定义时间bool _enable_inactive_release; // 是否启动了非活跃连接超时销毁的判断标志EventLoop _baseloop; 		   // 主Reactor线程中的EventLoopAcceptor _acceptor;			   // 监听套接字LoopThreadPool _pool; 		   // 从属Reactor// 保存服务器的所有连接对应的shared_ptr对象std::unordered_map<uint64_t, PtrConnection> _conns; ConnectedCallback _connected_callback; // 连接建立时的回调函数MessageCallback _message_callback;	   // 接收到消息时的回调函数ClosedCallback _closed_callback;	   // 连接关闭时的回调函数AnyEventCallback _event_callback;	   // 任意事件触发时的回调函数
};

现在,要搭建一个服务器只需要实例化一个TcpServer对象,并设置好回调函数,然后Start即可。

我们来进行测试,服务端:

void OnConnected(const PtrConnection &conn)
{DBG_LOG("NEW CONNECTION: %p", conn.get());
}void OnClosed(const PtrConnection &conn)
{DBG_LOG("CLOSE CONNECTION: %p", conn.get());
}void OnMessage(const PtrConnection &conn, Buffer *buf)
{DBG_LOG("%s", buf->ReadPosition());buf->MoveReadOffset(buf->ReadAbleSize());std::string str = "今天天气很好!!!";conn->Send(str.c_str(), str.size());conn->Shutdown();
}int main()
{TcpServer server(8080);server.SetThreadCount(2);server.EnableInactiveRelease(10);server.SetClosedCallback(OnClosed);server.SetConnectedCallback(OnConnected);server.SetMessageCallback(OnMessage);server.Start();return 0;
}

客户端:

int main()
{Socket cli_sock;cli_sock.CreateClient(8080, "127.0.0.1");for(int i = 0;i < 5;i ++){std::string str = "今天天气怎么样?";cli_sock.Send(str.c_str(), str.size());char buf[1024] = {0};cli_sock.Recv(buf, 1023);DBG_LOG("%s", buf);sleep(1);}while(1) sleep(1);return 0;
}

基于TcpServer实现回显服务器

class EchoServer
{
public:EchoServer(int port):_server(port){_server.SetThreadCount(2);_server.EnableInactiveRelease(10);_server.SetClosedCallback(std::bind(&EchoServer::OnClosed, this, std::placeholders::_1));_server.SetConnectedCallback(std::bind(&EchoServer::OnConnected, this, std::placeholders::_1));_server.SetMessageCallback(std::bind(&EchoServer::OnMessage, this, std::placeholders::_1, std::placeholders::_2));}void Start() { _server.Start(); }
private:void OnConnected(const PtrConnection &conn){DBG_LOG("NEW CONNECTION: %p", conn.get());}void OnClosed(const PtrConnection &conn){DBG_LOG("CLOSE CONNECTION: %p", conn.get());}void OnMessage(const PtrConnection &conn, Buffer *buf){conn->Send(buf->ReadPosition(), buf->ReadAbleSize());buf->MoveReadOffset(buf->ReadAbleSize());conn->Shutdown();}
private:TcpServer _server;
};int main()
{EchoServer server(8080);server.Start();return 0;
}

我们对EchoServer进行性能测试。我们使用WebBench进行性能测试。WebBench是在程序当中创建指定数量的进程,是测试HTTP服务器的工具。-c选项是指定客户端数量,其实就算进程的数量;-t选项是指定测试的持续时间,单位是秒。

可以看到,在500个客户端的情况下,每分钟可以处理91526个请求,每秒可以处理100676个字节。当然,服务器和测试程序在一台云服务器上,会存在竞争资源的情况,肯定是不规范的,这里只是做一个简单的测试。

服务器模块的总结

服务器模块是一个主从Reactor模型,主Reactor线程负责对监听套接字进行监控和处理,从属Reactor线程负责对普通套接字进行监控和处理。对套接字进行监控依赖的是EventLoop模块,一个线程一个EventLoop,对于监听套接字,需要监控它的读事件,当读事件触发,说明有新连接到来了,获取新连接的文件描述符,封装成一个Connection,设置好回调方法,选择一个从属Reactor线程,将Connection交给这个线程的EventLoop进行监控。对于普通套接字,同样是监控它的读事件,当读事件触发,说明客户端发送了消息,将消息从连接对应的文件描述符的输入缓冲区读取到Connection的输入缓冲区中,调用消息处理回调函数,将处理完成的数据放到Connection的输出缓冲区中,启动写事件监控,当写事件监控触发了,就将Connection的输出缓冲区的数据拷贝到连接对应的文件描述符的输出缓冲区中。

Buffer模块:Buffer模块是一个缓冲区模块,对于客户端发送过来的数据,可以暂时进行保存,后序再处理,对于处理完成后的数据,要以响应的形式发送给客户端,也可以暂时先保存在缓冲区中,等待写事件触发后,再进行发送。

Socket模块

Channel模块:Channel模块是对连接对应的文件描述符的管理。表示对一个连接要监控它的什么事件,一般是监控它的读事件或者写事件。并且要设置当这个文件描述符的某个事件触发时,要如何处理,所以提供了一些设置回调函数的接口。有可读事件触发回调函数、可写事件触发回调函数、任意事件触发回调函数、错误事件触发回调函数、连接挂断事件触发回调函数。

Connection模块:是对连接进行管理的模块。由Buffer模块、Socket模块、Channel模块组合而成。Connection中有一个输入缓冲区和一个输出缓冲区,用于保存客户端发送过来的数据和要发送给客户端的数据。对于套接字的操作由Socket模块完成。对于这个连接要监控什么事件,以及事件触发后要如何处理,由Socket模块完成。所以,Channel模块的5个回调函数是由Connection模块设置的。同样,Connection也有自己的回调函数,连接建立时的回调函数、连接关闭时的回调函数、接收到消息时的回调函数、任意事件触发时的回调函数。Channel和Connection都有回调函数,这些回调函数之间有什么关系呢?Channel的回调函数是由Connection设置的,而Connection的回调函数是由TcpServer设置的,TcpServer的回调函数是由用户设置的。所以,Channel的回调函数是对底层数据的处理,而Connection的回调函数是业务的处理。例如,Channel的可读事件回调函数,是将客户端发送过来的数据放到了Connection的输入缓冲区中,然后调用Connection的接收到消息时的回调函数进行业务处理。

所以,组件使用者在使用TcpServer搭建服务器时,设置的回调函数就是业务处理回调函数,这里面的回调函数最重要的就算接收到消息时的回调函数,因为对于消息要如何处理,只有使用者才知道。

http://www.dtcms.com/a/447712.html

相关文章:

  • 专业网站开发设计深圳网站制作必找祥奔科技
  • 企业网站包括哪些wordpress能不能做管理系统
  • Linux使用kprobes跟踪内核函数
  • 公司网站优化哪家好做全屏网站图片显示不全
  • 春节网页设计素材重庆百度快照优化
  • 自建网站套现海外贸易在什么网站做
  • 义乌企业网站客户打不开网站
  • 中文网站开发工具wordpress文章首页设置
  • 企业网站建设计什么科目中国施工企业协会官网
  • 用爱站工具包如何做网站地图毕业ppt模板免费下载
  • logo设计网站官网wordpress link
  • 建立网站接受投注是什么意思做废铝的关注哪个网站好
  • 无极app定制开发公司网站模板三明市住房与建设局网站
  • 门户网站建设工作方案网页设计公司济南兴田德润优惠吗
  • 泰州专一做淘宝网站网络营销是什么工作主要干啥
  • 做网站是先做后台还是前端wordpress 培训
  • 怎样做企业学校网站本地wordpress如何传到服务器上
  • 百度站长联盟微信电商怎样开店
  • dw用设计视图做网站视频素材库网站下载
  • 做肝病科网站wordpress导航栏的文件在哪
  • 网站的发布方案有哪些免费大型网站
  • 网站开发信息文档宁波做网站首推荣盛网络
  • Redis-用户签到(BitMap)
  • 网站建设人力成本费用企业专业建站
  • PCIe协议之均衡篇之 3-TAP Coefficients的理解(一)
  • 西宁网站维护公司如何使用wordpress
  • 网站图片缩略图深圳住 建设局网站
  • 烟台芝罘区住房建设局网站程序员开源网站
  • 锟鹏建设招聘网站网站设计营销
  • 无锡网站建设兼职开发公司审计稽查的内容