【项目】基于One Thread One Loop模型的高性能网络库实现 - 服务器模块实现
目录
日志宏定义
Buffer模块
Buffer缓冲区设计思想
Buffer缓冲区代码实现
基本功能
协议支持
Socket模块
Channel模块
Poller模块
Poller类编写
Poller与Channel整合
EventLoop模块
eventfd认识
EventLoop模块设计思想
EventLoop代码设计
TimerWheel定时器
TimerWheel与EventLoop整合
Connection模块
Connection模块功能思想
Connection模块类设计
Connection模块代码设计
Acceptor模块
LoopThread模块
LoopThreadPool模块
TcpServer模块
基于TcpServer实现回显服务器
服务器模块的总结
日志宏定义
#define INF 0
#define DBG 1
#define ERR 2
#define LOG_LEVEL DBG#define LOG(level, format, ...) do{\if (level < LOG_LEVEL) break;\time_t t = time(NULL);\struct tm *ltm = localtime(&t);\char tmp[32] = {0};\strftime(tmp, 31, "%H:%M:%S", ltm);\fprintf(stdout, "[%p %s %s:%d] " format "\n", (void*)pthread_self(), tmp, __FILE__, __LINE__, ##__VA_ARGS__);\}while(0)#define INF_LOG(format, ...) LOG(INF, format, ##__VA_ARGS__)
#define DBG_LOG(format, ...) LOG(DBG, format, ##__VA_ARGS__)
#define ERR_LOG(format, ...) LOG(ERR, format, ##__VA_ARGS__)
Buffer模块
Buffer缓冲区设计思想
Buffer是缓冲区模块,需要进行存储数据和取出数据。需要有一块内存空间,在这里我们采用vector<char>。因为使用的是vector,所以我们不能对数据进行频繁地挪动,所以我们需要有以下几个数据:
- 默认空间大小
- 当前的读取数据位置
- 当前的写入数据位置
主要的操作有写入数据和读取数据。对于写入数据:
- 写入的数据大小小于等于写入位置到末尾的距离,直接写入
- 写入的数据大小大于写入位置到末尾的距离,但小于等于写入位置到末尾的距离 + 起始到写入位置的距离,先挪动数据再写入
- 写入的数据大小大于写入位置到末尾的距离 + 起始到写入位置的距离,扩容
- 写入完成之后,写入位置要向后偏移,会发现,因为写入数据之前都需要先保证后面的空间足够才会进行写入,不够则会进行偏移或扩容,所以写偏移永远只会向后移动
对于读取数据,只要缓冲区存有的数据大于读取的数据即可。
Buffer缓冲区代码实现
基本功能
基于上面的设计,缓冲区需要实现下面的接口:
#define BUFFER_DEFAULT_SIZE 1024
class Buffer
{
public:Buffer():_reader_idx(0), _writer_idx(0), _buffer(BUFFER_DEFAULT_SIZE) {}// 获取vector的起始地址char *Begin() { return &*_buffer.begin(); }// 获取当前写入起始地址char *WritePosition() { return Begin() + _writer_idx; }// 获取当前读取起始地址char *ReadPosition() { return Begin() + _reader_idx; }// 获取缓冲区末尾空闲空间大小 - 写偏移之后的空闲空间uint64_t TailIdleSize() {return _buffer.size() - _writer_idx; }// 获取缓冲区起始空闲空间大小 - 读偏移之前的空闲空间uint64_t HeadIdleSize() { return _reader_idx; }// 获取可读数据大小uint64_t ReadAbleSize() {return _writer_idx - _reader_idx; }// 将读偏移向后移动void MoveReadOffset(uint64_t len){if(len == 0) return ;// 读偏移向后移动的距离一定要小于可读数据大小assert(len <= ReadAbleSize());_reader_idx += len;}// 将写偏移向后移动void MoveWriteOffset(uint64_t len){// 写偏移向后移动的距离一定要小于后边空闲空间的大小assert(len <= TailIdleSize());_writer_idx += len;// 因为写入数据之前都需要先保证后面的空间足够才会进行写入// 不够则会进行偏移或扩容,所以写偏移永远只会向后移动}// 确保可写空间足够void EnsureWriteSpace(uint64_t len){// 如果末尾空闲空间大小足够,直接返回if(TailIdleSize() >= len) return;// 末尾空间不够,则判断加上起始的空闲空间是否足够if(len <= TailIdleSize() + HeadIdleSize()){// 足够则将数据移动到起始位置uint64_t rsz = ReadAbleSize(); // 保存数据大小std::copy(ReadPosition(), ReadPosition() + rsz, Begin()); // 将可读数据拷贝到起始位置// 修改读偏移与写偏移_writer_idx = 0;_reader_idx = rsz;}else{// 不够则进行扩容// 这里不移动数据,直接给写偏移之后扩容足够的空间即可_buffer.resize(_writer_idx + len);}}// 写入数据void Write(const void *data, uint64_t len){// 1. 保证有足够的空间 2. 将数据拷贝进去if(len == 0) return ;EnsureWriteSpace(len);// 不能直接使用data,因为void*没有步长const char *d = (const char*)data;std::copy(d, d + len, WritePosition());}// 读取数据void Read(void *buf, uint64_t len){// 要读取的数据大小必须小于可读数据大小assert(len <= ReadAbleSize());std::copy(ReadPosition(), ReadPosition() + len, (char*)buf);}// 清空缓冲区void Clear(){// 不需要真的清0,只需要覆盖即可_reader_idx = 0;_writer_idx = 0;}
private:std::vector<char> _buffer; // 使用vector进行内存空间管理// 这里的读、写位置是相对于vector起始地址的偏移量uint64_t _reader_idx; // 读偏移uint64_t _writer_idx; // 写偏移
};
我们在写入和读取这里提供更多的操作,现在无论是写入,还是读取都是根据一个指针,未来可能需要根据string或者Buffer。
// 将string中的数据全部写入缓冲区
void WriteString(const std::string &data)
{return Write(data.c_str(), data.size());
}
// 将Buffer中的数据全部写入缓冲区
void WriteBuffer(Buffer &data)
{return Write(data.ReadPosition(), data.ReadAbleSize());
}
// 将读取的数据当作一个string返回
std::string ReadAsString(uint64_t len)
{// 要读取的数据大小必须小于可读数据大小assert(len <= ReadAbleSize());std::string str;str.resize(len);// 因为str.c_str()返回的是一个const char*,所以不使用Read(&str[0], len);return str;
}
读取了数据、写入了数据之后,指针是需要偏移的,上面有实现偏移的接口,但是如果每次写完之后都需要再调用,就太麻烦了,这里将写入和指针移动整合到一起。
// 读取或写入数据,并让指针偏移
void ReadAndPop(void* buf, uint64_t len)
{Read(buf, len);MoveReadOffset(len);
}
std::string ReadAsStringAndPop(uint64_t len)
{assert(len <= ReadAbleSize());std::string str = ReadAsString(len);MoveReadOffset(len);return str;
}
void WriteAndPush(const void* data, uint64_t len)
{Write(data, len);MoveWriteOffset(len);
}
void WriteStringAndPush(const std::string& data)
{WriteString(data);MoveWriteOffset(data.size());
}
void WriteBufferAndPush(Buffer& data)
{WriteBuffer(data);MoveWriteOffset(data.ReadAbleSize());
}
协议支持
上面的接口对于高并发服务器使用已经足够了,但是如果要支持HTTP协议的话,还需要提供获取一行数据的接口。
// 支持HTTP协议,获取一行数据
// 寻找换行符
char* FindCRLF()
{char* res = (char*)memchr(ReadPosition(), '\n', ReadAbleSize());return res;
}
// 获取一行数据
std::string GetLine()
{char* pos = FindCRLF();if (pos == nullptr) return "";// +1是为了将换行符也取出来return ReadAsString(pos - (char*)ReadPosition() + 1);
}
std::string GetLineAndPop()
{std::string str = GetLine();MoveReadOffset(str.size());return str;
}
我们现在来对Buffer模块进行测试。先测试以下能否存入数据,再取出数据。
int main()
{Buffer buf;std::string str = "hello!!";buf.WriteStringAndPush(str);std::string tmp;tmp = buf.ReadAsStringAndPop(buf.ReadAbleSize());std::cout << tmp << std::endl;std::cout << buf.ReadAbleSize() << std::endl;return 0;
}
int main()
{Buffer buf;std::string str = "hello!!";buf.WriteStringAndPush(str);Buffer buf1;buf1.WriteBufferAndPush(buf);std::string tmp;tmp = buf.ReadAsStringAndPop(buf.ReadAbleSize());std::cout << tmp << std::endl;std::cout << buf.ReadAbleSize() << std::endl;std::cout << buf1.ReadAbleSize() << std::endl;return 0;
}
是可以正常存储数据和取出数据的。
再测试一下扩容功能是否正常。
int main()
{Buffer buf;for(int i = 0;i < 300;i ++){std::string str = "hello!!" + std::to_string(i) + '\n';buf.WriteStringAndPush(str);}while(buf.ReadAbleSize() > 0){std::string line = buf.GetLineAndPop();std::cout << line;}return 0;
}
是正常的。
Socket模块
Socket模块是对套接字接口进行封装,让上层能够更方便地使用套接字进行通信。需要封装以下接口:
- 创建套接字
- 绑定地址信息
- 开始监听
- 向服务器发起连接
- 获取新连接
- 接收数据
- 发送数据
- 关闭套接字
- 创建一个服务端连接
- 创建一个客户端连接
- 设置套接字选项---开启地址端口重用
- 设置套接字阻塞属性--设置为非阻塞
#define MAX_LISTEN 1024
class Socket
{
public:Socket():_sockfd(-1) {}Socket(int fd):_sockfd(fd) {}~Socket() { Close(); }int Fd() { return _sockfd; }// 创建套接字bool Create(){_sockfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);if(_sockfd < 0){ERR_LOG("CREATE SOCKET FAILED!!!");return false;}return true;}// 绑定地址信息bool Bind(const std::string &ip, uint16_t port){struct sockaddr_in addr;addr.sin_family = AF_INET;addr.sin_port = htons(port);addr.sin_addr.s_addr = inet_addr(ip.c_str());socklen_t len = sizeof(struct sockaddr_in);int ret = bind(_sockfd, (struct sockaddr*)&addr, len);if(ret < 0){ERR_LOG("BIND ADDRESS FAILED!!!");return false;}return true;}// 开始监听bool Listen(int backlog = MAX_LISTEN){int ret = listen(_sockfd, backlog);if(ret < 0){ERR_LOG("SOCKET LISTEN FAILED!!!");return false;}return true;}// 向服务器发起连接bool Connect(const std::string &ip, uint16_t port){struct sockaddr_in addr;addr.sin_family = AF_INET;addr.sin_port = htons(port);addr.sin_addr.s_addr = inet_addr(ip.c_str());socklen_t len = sizeof(struct sockaddr_in);int ret = connect(_sockfd, (struct sockaddr*)&addr, len);if(ret < 0){ERR_LOG("CONNECT SERVER FAILED!!!");return false;}return true;}// 获取新连接int Accept(){// 直接返回连接对应的文件描述符,不关心客户端的地址信息,所以直接传入nullptrint newfd = accept(_sockfd, nullptr, nullptr);if(newfd < 0){ERR_LOG("SOCKET ACCEPT FAILED!!!");return -1;}return newfd;}// 接收数据 - 以阻塞方式ssize_t Recv(void *buf, size_t len, int flag = 0){ssize_t ret = recv(_sockfd, buf, len, flag);if(ret < 0){// EAGAIN 当前socket的接收缓冲区中没有数据,在非阻塞的情况下会有这个错误// EINTR 表示当前socket的阻塞等待被信号打断了if(errno == EAGAIN || errno == EINTR)return 0;ERR_LOG("SOCKET RECV FAILED!!!");return -1;}return ret; // 实际接收到的数据长度}// 发送数据 - 以阻塞方式ssize_t Send(const void *buf, size_t len, int flag = 0){ssize_t ret = send(_sockfd, buf, len, flag);if(ret < 0){if(errno == EAGAIN || errno == EINTR){return 0;}ERR_LOG("SOCKET SEND FAILED!!!");return -1;}return ret; // 实际发送的数据长度}// 接收数据 - 以非阻塞方式ssize_t NonBlockRecv(void *buf, size_t len){return Recv(buf, len, MSG_DONTWAIT);}// 发送数据 - 以非阻塞方式ssize_t NonBlockSend(void *buf, size_t len){if(len == 0) return 0;return Send(buf, len, MSG_DONTWAIT);}// 关闭套接字void Close(){if(_sockfd != -1){close(_sockfd);_sockfd = -1;}}// 创建一个服务端连接, 最后一个参数表示是否将套接字设置为非阻塞bool CreateServer(uint16_t port, const std::string &ip = "0.0.0.0", bool block_flag = false){// 1. 创建套接字 2. 绑定地址 3. 开始监听 4. 设置非阻塞 5. 启动地址重用if(Create() == false) return false;if(block_flag) NonBlock();if(Bind(ip, port) == false) return false;if(Listen() == false) return false;ReuseAddress();return true;}// 创建一个客户端连接bool CreateClient(uint16_t port, const std::string &ip){// 1. 创建套接字 2. 连接服务器if(Create() == false) return false;if(Connect(ip, port) == false) return false;return true;}// 设置套接字选项 - 开启地址端口重用void ReuseAddress(){// 地址重用int val = 1;setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR, (void*)&val, sizeof(int));// 端口复用val = 1;setsockopt(_sockfd, SOL_SOCKET, SO_REUSEPORT, (void*)&val, sizeof(int));}// 设置套接字阻塞属性 - 设置为非阻塞void NonBlock(){int flag = fcntl(_sockfd, F_GETFL, 0);fcntl(_sockfd, F_SETFL, flag | O_NONBLOCK);}
private:int _sockfd;
};
现在来测试一下基于Socket能否搭建出一个客户端和服务端进行通信。服务端:
int main()
{Socket lst_sock;lst_sock.CreateServer(8080);while(true){int newfd = lst_sock.Accept();if(newfd < 0) continue;// 将新连接对应的文件描述符封装成一个SocketSocket cli_sock(newfd);// 接收数据char buf[1024] = {0};int ret = cli_sock.Recv(buf, 1023);if(ret < 0){cli_sock.Close();continue;}// 直接将读到的消息返回cli_sock.Send(buf, ret);cli_sock.Close();}lst_sock.Close();return 0;
}
客户端:
int main()
{Socket cli_sock;cli_sock.CreateClient(8080, "127.0.0.1");std::string str = "hello world!";cli_sock.Send(str.c_str(), str.size());char buf[1024] = {0};cli_sock.Recv(buf, 1023);DBG_LOG("%s", buf);return 0;
}
此时是能够正常进行通信的。
Channel模块
Channel是Connection中对事件进行管理的模块,表示要监控一个连接(或者说文件描述符)的什么事件,以及事件触发了之后要如何进行处理。所以在接口设计上,可以分为事件管理和事件触发后处理的管理两部分,对于事件管理:
- 描述符是否可读
- 描述符是否可写
- 对描述符监控可读
- 对描述符监控可写
- 解除可读事件监控
- 解除可写事件监控
- 解除所有事件监控
对于事件触发后处理的管理:
- 需要处理的事件:可读,可写,挂断,错误,任意
- 事件处理的回调函数。提供给EventLoop模块,当这个文件描述符有事件触发了,就调用这个函数,内部会根据具体是什么事件触发,调用相应的函数。
class Channel
{using EventCallback = std::function<void()>;
public:Channel();void SetReadCallback(const EventCallback &cb);void SetWriteCallback(const EventCallback &cb);void SetErrorCallback(const EventCallback &cb);void SetCloseCallback(const EventCallback &cb);void SetEventCallback(const EventCallback &cb);bool ReadAble(); // 当前是否监控了可读bool WriteAble(); // 当前是否监控了可写void EnableRead(); // 启动读事件监控void EnableWrite(); // 启动写事件监控void DisableRead(); // 关闭读事件监控void DisableWrite(); // 关闭写事件监控void DisableAll(); // 关闭所有事件监控void Remove(); // 移除监控void HandleEvent(); // 事件处理函数,一旦连接触发了事件,就调用这个函数
private:int _fd;uint32_t _events; // 当前需要监控的事件uint32_t _revents; // 当前连接触发的事件EventCallback _read_callback; // 可读事件触发时的回调函数EventCallback _write_callback; // 可写事件触发时的回调函数EventCallback _error_callback; // 错误事件触发时的回调函数EventCallback _close_callback; // 连接断开事件触发时的回调函数EventCallback _event_callback; // 任意事件触发时的回调函数
};
Channel模块是Connection模块的一个子模块,Channel模块里面对事件的回调函数就是由Connection设置进去的。当连接建立完成之后,会创建一个Connection,启动对这个连接的监控时,是需要将这个Connection内部的Channel挂到EventLoop上的,其实就是调用EventLoop的添加事件监控接口,将这个Channel交给EventLoop内部的Poller模块。因为Channel模块只是说明了需要监控一个连接的什么事件,并且当事件触发了之后要如何处理,但是并没有真正地进行监控,真正进行监控是EventLoop模块的子模块Poller的任务。
class Channel
{using EventCallback = std::function<void()>;
public:Channel(int fd):_fd(fd), _events(0), _revents(0) {}int Fd() { return _fd; }uint32_t Events() { return _events; } // 获取想要监控的事件void SetEvents(uint32_t events) { _revents = events; } // 设置实际就绪的事件void SetReadCallback(const EventCallback &cb) { _read_callback = cb;}void SetWriteCallback(const EventCallback &cb) { _write_callback = cb; }void SetErrorCallback(const EventCallback &cb) { _error_callback = cb; }void SetCloseCallback(const EventCallback &cb) { _close_callback = cb; }void SetEventCallback(const EventCallback &cb) { _event_callback = cb; }// 当前是否监控了可读bool ReadAble() { return (_events & EPOLLIN); } // 当前是否监控了可写bool WriteAble() { return (_events & EPOLLOUT); }// 启动读事件监控void EnableRead() { _events |= EPOLLIN; /*添加到EventLoop的事件监控中*/ }// 启动写事件监控void EnableWrite() { _events |= EPOLLOUT; /*添加到EventLoop的事件监控中*/ }// 关闭读事件监控void DisableRead() { _events &= ~EPOLLIN; /*添加到EventLoop的事件监控中*/ }// 关闭写事件监控void DisableWrite() { _events &= ~EPOLLOUT; /*添加到EventLoop的事件监控中*/ }// 关闭所有事件监控void DisableAll() { _events = 0; }// 移除监控void Remove() { /*调用EventLoop的接口来移除监控*/ }// 设置就绪事件void SetEvents(uint32_t events) { _revents = events; }// 事件处理函数,一旦连接触发了事件,就调用这个函数void HandleEvent(){if((_revents & EPOLLIN) || (_revents & EPOLLRDHUP) || (_revents & EPOLLPRI)){if(_event_callback) _event_callback();if(_read_callback) _read_callback();}// 有可能会释放连接的操作事件,一次只处理一个if(_revents & EPOLLOUT){if(_event_callback) _event_callback();if(_write_callback) _write_callback();}else if(_revents & EPOLLERR){if(_event_callback) _event_callback();if(_error_callback) _error_callback();}else if(_revents & EPOLLHUP){if(_event_callback) _event_callback();if(_close_callback) _close_callback();}}
private:int _fd;uint32_t _events; // 当前需要监控的事件uint32_t _revents; // 当前连接触发的事件EventCallback _read_callback; // 可读事件触发时的回调函数EventCallback _write_callback; // 可写事件触发时的回调函数EventCallback _error_callback; // 错误事件触发时的回调函数EventCallback _close_callback; // 连接断开事件触发时的回调函数EventCallback _event_callback; // 任意事件触发时的回调函数
};
这里重点看一下事件处理函数。任意事件触发的回调函数意思就是所有事件触发了,都要调用一下,一般的功能就是刷新连接的活跃度,避免被当成一个超时连接。在可读事件触发、对端连接关闭(自己这端的连接是正常的)、紧急数据可读事件触发时,是不会关闭自己这端的连接的,此时可以处理完这个事件后,继续处理其他的就绪事件,并且也可以先调用读事件触发回调处理函数,再调用任意事件触发回调处理函数。而可写事件触发、错误事件触发、连接断开事件触发,均有可能导致释放连接,就不应该继续向下处理了,所以使用else if,并且任意事件触发回调处理函数需要在前面调用,因为如果连接已经释放了,再调用就没有意义了。
Poller模块
Poller类编写
Poller模块其实就是对epoll的封装。Poller模块是EventLoop模块的子模块,是进行描述符事件监控的。因为Channel模块已经封装了一个连接关心的事件,并且有了这些事件触发的回调函数,所以Poller模块实际上就是对一个一个的Channel对象进行操作。
接口设计:
- 添加/修改文件描述符的事件监控(一个接口,不存在则添加,存在则修改)
- 移除文件描述符的事件监控
- 启动监控
成员变量:
- 拥有一个epoll的操作句柄
- 拥有一个struct epoll_event的数组,用于保存获取到的就绪事件
- 使用哈希表管理描述符与描述符对应的事件管理Channel对象
逻辑流程:
- 通过Channel对文件描述符进行监控
- 当文件描述符就绪了,通过哈希表找到对应的Channel对象,设置就绪事件,然后调用Channel内部的事件处理函数,即可完成对就绪事件的处理。
#define MAX_EPOLLEVENTS 1024
class Poller
{
public:Poller(){_epfd = epoll_create(MAX_EPOLLEVENTS);if(_epfd < 0){ERR_LOG("EPOLL CREATE FAILED!!!");abort(); // 退出程序}}// 添加或修改监控事件void UpdateEvent(Channel *channel){bool ret = HasChannel(channel);if(ret == false){// 不存在则添加_channels.insert(std::make_pair(channel->Fd(), channel));return Update(channel, EPOLL_CTL_ADD);}return Update(channel, EPOLL_CTL_MOD);}// 移除监控void RemoveEvent(Channel *channel){auto it = _channels.find(channel->Fd());if(it != _channels.end()){_channels.erase(it);}Update(channel, EPOLL_CTL_DEL);}// 开始监控,返回活跃连接void Poll(std::vector<Channel*> *active){int nfds = epoll_wait(_epfd, _evs, MAX_EPOLLEVENTS, -1);if(nfds < 0){if(errno == EINTR) return ;ERR_LOG("EPOLL WAIT ERROR: %s\n", strerror(errno));abort(); }// 设置就绪事件for(int i = 0;i < nfds;i ++){auto it = _channels.find(_evs[i].data.fd);assert(it != _channels.end());it->second->SetEvents(_evs[i].events); active->push_back(it->second);}return;}
private:// 对epoll的直接操作void Update(Channel *channel, int op){int fd = channel->Fd();struct epoll_event ev;ev.data.fd = fd;ev.events = channel->Events();int ret = epoll_ctl(_epfd, op, fd, &ev);if(ret < 0){ERR_LOG("EPOLLCTL FAILED!!!");}return ;}// 判断一个Channel是否已经添加了事件监控bool HasChannel(Channel *channel){auto it = _channels.find(channel->Fd());if(it == _channels.end()) return false;return true;}
private:int _epfd;struct epoll_event _evs[MAX_EPOLLEVENTS];std::unordered_map<int, Channel*> _channels;
};
Poller与Channel整合
我们前面说过,启动对一个连接的某个事件的监控时,就是将这个Connection内部的Channel添加到EventLoop内部的Poller中,让Poller进行监控。虽然我们现在既没有Connection,也没有EventLoop,但是我们可以先完成将Channel交给Poller进行监控的操作。
Channel中就需要有一个Poller。
class Channel
{using EventCallback = std::function<void()>;
public:Channel(Poller *poller, int fd):_fd(fd), _events(0), _revents(0), _poller(poller) {}int Fd() { return _fd; }uint32_t Events() { return _events; } // 获取想要监控的事件void SetEvents(uint32_t events) { _revents = events; } // 设置实际就绪的事件void SetReadCallback(const EventCallback &cb) { _read_callback = cb;}void SetWriteCallback(const EventCallback &cb) { _write_callback = cb; }void SetErrorCallback(const EventCallback &cb) { _error_callback = cb; }void SetCloseCallback(const EventCallback &cb) { _close_callback = cb; }void SetEventCallback(const EventCallback &cb) { _event_callback = cb; }// 当前是否监控了可读bool ReadAble() { return (_events & EPOLLIN); } // 当前是否监控了可写bool WriteAble() { return (_events & EPOLLOUT); }// 启动读事件监控void EnableRead() { _events |= EPOLLIN; Update(); }// 启动写事件监控void EnableWrite() { _events |= EPOLLOUT; Update(); }// 关闭读事件监控void DisableRead() { _events &= ~EPOLLIN; Update(); }// 关闭写事件监控void DisableWrite() { _events &= ~EPOLLOUT; Update(); }// 关闭所有事件监控void DisableAll() { _events = 0; Update(); }// 移除监控void Remove();// 添加/修改监控void Update();// 事件处理函数,一旦连接触发了事件,就调用这个函数void HandleEvent(){if((_revents & EPOLLIN) || (_revents & EPOLLRDHUP) || (_revents & EPOLLPRI)){if(_event_callback) _event_callback();if(_read_callback) _read_callback();}// 有可能会释放连接的操作事件,一次只处理一个if(_revents & EPOLLOUT){if(_event_callback) _event_callback();if(_write_callback) _write_callback();}else if(_revents & EPOLLERR){if(_event_callback) _event_callback();if(_error_callback) _error_callback();}else if(_revents & EPOLLHUP){if(_event_callback) _event_callback();if(_close_callback) _close_callback();}}
private:int _fd;Poller *_poller;uint32_t _events; // 当前需要监控的事件uint32_t _revents; // 当前连接触发的事件EventCallback _read_callback; // 可读事件触发时的回调函数EventCallback _write_callback; // 可写事件触发时的回调函数EventCallback _error_callback; // 错误事件触发时的回调函数EventCallback _close_callback; // 连接断开事件触发时的回调函数EventCallback _event_callback; // 任意事件触发时的回调函数
};void Channel::Remove() { return _poller->RemoveEvent(this); }
void Channel::Update() { return _poller->UpdateEvent(this); }
因为Channel定义在Poller前面,所以只能在前面声明,因为没有定义,所以后面两个函数只能在类外实现。
对于服务端,现在我们创建套接字之后,就不能立即获取新连接了,而是创建一个Channel,让Channel对这个套接字进行事件管理。对于监听套接字,需要监控可读事件,回调函数是获取新连接,并为新连接创建Channel。
void HandleClose(Channel *channel)
{std::cout << "close: " << channel->Fd() << std::endl;channel->Remove(); // 移除监控delete channel;
}
void HandleRead(Channel *channel)
{int fd = channel->Fd();char buf[1024] = {0};int ret = recv(fd, buf, 1023, 0);if(ret <= 0){return HandleClose(channel);}// 读到数据之后,给客户端回复,启动可写事件channel->EnableWrite();std::cout << buf << std::endl;
}
void HandleWrite(Channel *channel)
{int fd = channel->Fd();// 作为测试,向客户端写回固定的数据const char *data = "天气还不错!!!";int ret = send(fd, data, strlen(data), 0);if(ret < 0){return HandleClose(channel);}channel->DisableWrite(); // 关闭写监控
}
void HandleError(Channel *channel)
{return HandleClose(channel);
}
void HandleEvent(Channel *channel)
{std::cout << "有一个事件触发了!!!\n";
}// 对于监听套接字读事件触发的回调函数
void Acceptor(Poller *poller, Channel *lst_channel)
{// 获取新连接int fd = lst_channel->Fd();int newfd = accept(fd, nullptr, nullptr);if(newfd < 0) return ;// 为新连接封装ChannelChannel *channel = new Channel(poller, newfd);channel->SetReadCallback(std::bind(HandleRead, channel));channel->SetWriteCallback(std::bind(HandleWrite, channel));channel->SetCloseCallback(std::bind(HandleClose, channel));channel->SetErrorCallback(std::bind(HandleError, channel));channel->SetEventCallback(std::bind(HandleEvent, channel));channel->EnableRead();
}int main()
{Poller poller;Socket lst_sock;lst_sock.CreateServer(8080);// 为监听套接字创建一个ChannelChannel channel(&poller, lst_sock.Fd());channel.SetReadCallback(std::bind(Acceptor, &poller, &channel));// 启动可读事件监控channel.EnableRead();while(true){// 开始事件监控,获取事件就绪的连接,并进行处理std::vector<Channel*> actives;poller.Poll(&actives);for(auto &a : actives){a->HandleEvent();}}lst_sock.Close();return 0;
}
在获取新连接时,为了简便,没有使用Socket,直接使用文件描述符。客户端:
int main()
{Socket cli_sock;cli_sock.CreateClient(8080, "127.0.0.1");while(true){std::string str = "今天天气怎么样?";cli_sock.Send(str.c_str(), str.size());char buf[1024] = {0};cli_sock.Recv(buf, 1023);DBG_LOG("%s", buf);sleep(1);}return 0;
}
我们让客户端每隔1秒给服务端发送一条固定的消息,并将得到的响应打印出来。服务端会读取客户端发送过来的数据,并回复一条固定的响应。
服务端每次会打印出两个事件触发,是因为读事件触发后,又启动了写事件。
EventLoop模块
EventLoop模块是真正进行事件管理的模块,Poller只是它的一个子模块,负责进行事件监控。
eventfd认识
eventfd是一种事件通知机制。在内核中本质上就是一个计数器,创建一个eventfd在内核中就会创建一个计数器。每当向eventfd中写入一个数值,表示进行了一次事件通知,可以使用read将eventfd中的数据读取出来,读取到的数据就是通知的次数。
用处:在EventLoop模块中实现线程间事件通知的功能。
#include <sys/eventfd.h>int eventfd(unsigned int initval, int flags);
功能:创建一个eventfd对象,实现事件通知。参数:
- initval:计数初值
- flags:EFD_CLOEXEC表示禁止进程复制;EFD_NONBLOCK表示启动非阻塞属性
返回值:返回一个文件描述符。所以,eventfd是可以使用read/write/close进行操作的。注意:read和write在进行IO的时候,数据只能是一个8字节数据。
int main()
{int efd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);if(efd < 0){perror("eventfd failed!!!");return -1;}uint64_t val = 1;write(efd, &val, sizeof(val));write(efd, &val, sizeof(val));write(efd, &val, sizeof(val));uint64_t res = 0;read(efd, &res, sizeof(res));std::cout << res << std::endl;close(efd);return 0;
}
可以看到,写入多次时,只需要一次就能读取完。
EventLoop模块设计思想
EventLoop模块是进行事件监控,以及事件处理的模块,这个模块与线程是一一对应的。也就是说,一个线程一个EventLoop。
监控一个连接,一旦这个连接有事件就绪了,就需要对事件进行处理。如果一个文件描述符的IO事件被多个线程同时处理,是会存在线程安全问题的,因此我们需要将一个连接的事件监控、事件处理,以及其他操作都交给同一个线程处理。如何将这一系列的操作都放在一个线程中处理呢?我们是没办法将一个连接与一个线程进行绑定的,此时就可以利用一个线程一个EventLoop,将一个连接与一个EventLoop进行绑定。在上面,每一个Channel绑定一个Poller,未来只需要将Poller修改为EventLoop即可。
此时仍然是没办法保证所有的操作均由EventLoop的线程完成的。当其他线程需要与EventLoop交互时,例如主线程获取到了新线程后,将文件描述符注册到子线程的EventLoop中,也就是将文件描述符添加到EventLoop的Poller中,如果EventLoop允许主线程直接进行操作,必然是线程不安全的。此时可以定义一个任务队列,将向EventLoop注册一个文件描述符的操作封装成一个任务,放入到任务队列中,后序由EventLoop来执行这个任务,即可保证线程安全。
可以在EventLoop内部定义一个函数,这个函数的参数是一个要执行的任务,函数内部可以判断出调用这个函数的线程是否是EventLoop所在的线程,如果是,直接执行任务,如果不是,则将这个任务放入到任务队列当中,等就绪事件处理完成之后,再统一处理任务队列中的任务。未来当其他线程想与EventLoop交互时,可以直接调用这个函数。
eventfd的处理流程:
- 在线程中对文件描述符进行事件监控
- 有文件描述符就绪则对文件描述符进行事件处理
- 所有就绪事件处理完成后,将任务队列中的所有任务一一执行
这里是需要保证任务队列线程安全的,所以可以给任务队列提供一把锁。任务队列也不一定要是队列,未来任务队列中就是一个一个的函数对象,如果我们加锁后对任务队列中的任务一个一个执行,效率太低了。所以我们可以使用一个vector来保存任务对象,未来要执行任务队列时,直接将vector拷贝一份,解锁,然后根据拷贝结果执行。
EventLoop代码设计
EventLoop模块根据目前的分析,需要实现以下功能:
- 事件监控。使用Poller模块,有事件就绪则进行事件处理。
- 执行任务队列中的任务。
注意:如果其他的线程将任务放到了任务队列中,而EventLoop的线程仍然阻塞在事件监控处,此时会导致任务队列中的任务得不到及时处理,所以EventLoop中要有一个eventfd对象,用于进行事件通知,唤醒阻塞状态的EventLoop线程。
并且还要有一个Channel对象,用于对eventfd进行管理,并将其添加到EventLoop的事件监控当中。因为当需要唤醒时,会向内部写入数据,所以可以监控它的可读事件。当可读事件触发了,EventLoop就会从阻塞状态恢复,从而执行任务队列中的任务。
前面我们说了,我们可以定义一个函数,这个函数可以判断出调用这个函数的线程是否是EventLoop线程。所以给成员变量增加一个线程ID,用于保存EventLoop所在线程的ID,专门用来判断用户的某个操作的线程与EventLoop所在线程是否是同一个线程,若是同一个线程则直接执行,若不是同一个线程则将任务压入任务队列。
class EventLoop
{using Functor = std::function<void()>;
private:// 执行任务池中的所有任务void RunAllTask(){// 拷贝一个任务池std::vector<Functor> functor;{std::unique_lock<std::mutex> _lock(_mutex);_tasks.swap(functor);}for(auto &f : functor){f();}return ;}// 创建一个eventfdstatic int CreateEventFd(){int efd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);if(efd < 0){ERR_LOG("CREATE EVENTFD FAILED!!!");abort();}return efd;}// eventfd的可读事件回调函数void ReadEventFd(){uint64_t res = 0;int ret = read(_event_fd, &res, sizeof(res));if(ret < 0){// EINTR -- 被信号打断 EAGAIN -- 表示无数据可读if(errno == EINTR || errno == EAGAIN) return ;ERR_LOG("READ EVENTFD FAILED!!!");abort();}return ;}// 给eventfd中写入一个数据,触发它的可读事件void WeakUpEventFd(){uint64_t val = 1;int ret = write(_event_fd, &val, sizeof(val));if(ret < 0){if(errno == EINTR) return ;ERR_LOG("READ EVENTFD FAILED!!!");abort();}return ;}
public:EventLoop():_thread_id(std::this_thread::get_id()),_event_fd(CreateEventFd()),_event_channel(new Channel(this, _event_fd)){// 给eventfd添加可读事件回调函数,读取eventfd事件通知次数_event_channel->SetReadCallback(std::bind(&EventLoop::ReadEventFd, this));// 启动eventfd的读事件监控_event_channel->EnableRead();}// 判断当前线程是否是EventLoop对应的线程bool IsInLoop(){return (_thread_id == std::this_thread::get_id());}// 判断将要执行的任务是否处于EventLoop所在线程,如果是则执行,不是则压入任务池void RunInLoop(const Functor &cb){if(IsInLoop()) return cb();return QueueInLoop(cb);}// 将操作压入任务池void QueueInLoop(const Functor &cb){{std::unique_lock<std::mutex> _lock(_mutex);_tasks.push_back(cb);}// 唤醒可能因为没有事件就绪,而导致的epoll阻塞// 其实就是给eventfd写入一个数据,触发eventfd的可读事件WeakUpEventFd();}// 添加/修改描述符的事件监控void UpdateEvent(Channel *channel){return _poller.UpdateEvent(channel);}// 删除描述符的事件监控void RemoveEvent(Channel *channel){return _poller.RemoveEvent(channel);}// 启动EventLoopvoid Start(){// 1. 事件监控std::vector<Channel*> actives;_poller.Poll(&actives);// 2. 事件处理for(auto &channel : actives){channel->HandleEvent();} // 3. 执行任务RunAllTask();}
private:std::thread::id _thread_id; // 线程IDint _event_fd; // eventfd唤醒IO事件监控有可能导致的阻塞std::unique_ptr<Channel> _event_channel; // 管理_event_fdPoller _poller; // 进行所有文件描述符的事件监控std::vector<Functor> _tasks; // 任务池std::mutex _mutex; // 保证任务池的线程安全
};
当外部要与EventLoop进行交互时,必须通过EventLoop提供的线程安全接口RunInLoop进行。比方说外部要向这个EventLoop注册一个文件描述符,此时就需要将任务压入到任务队列中,让任务在EventLoop所在线程内部执行。
当其他线程调用了RunInLoop,将任务压入到任务队列之后,如果EventLoop所在线程阻塞在了Start的Poll,任务队列中的任务就得不到及时的执行,所以在将任务压入任务队列之后,就需要唤醒EventLoop线程。
我们还需要对之前的Channel进行修改,内部不再使用Poller,而是使用EventLoop。
class Channel
{using EventCallback = std::function<void()>;
public:Channel(EventLoop *loop, int fd):_fd(fd), _events(0), _revents(0), _loop(loop) {}int Fd() { return _fd; }uint32_t Events() { return _events; } // 获取想要监控的事件void SetEvents(uint32_t events) { _revents = events; } // 设置实际就绪的事件void SetReadCallback(const EventCallback &cb) { _read_callback = cb;}void SetWriteCallback(const EventCallback &cb) { _write_callback = cb; }void SetErrorCallback(const EventCallback &cb) { _error_callback = cb; }void SetCloseCallback(const EventCallback &cb) { _close_callback = cb; }void SetEventCallback(const EventCallback &cb) { _event_callback = cb; }// 当前是否监控了可读bool ReadAble() { return (_events & EPOLLIN); } // 当前是否监控了可写bool WriteAble() { return (_events & EPOLLOUT); }// 启动读事件监控void EnableRead() { _events |= EPOLLIN; Update(); }// 启动写事件监控void EnableWrite() { _events |= EPOLLOUT; Update(); }// 关闭读事件监控void DisableRead() { _events &= ~EPOLLIN; Update(); }// 关闭写事件监控void DisableWrite() { _events &= ~EPOLLOUT; Update(); }// 关闭所有事件监控void DisableAll() { _events = 0; Update(); }// 移除监控void Remove();// 添加/修改监控void Update();// 事件处理函数,一旦连接触发了事件,就调用这个函数void HandleEvent(){if((_revents & EPOLLIN) || (_revents & EPOLLRDHUP) || (_revents & EPOLLPRI)){if(_event_callback) _event_callback();if(_read_callback) _read_callback();}// 有可能会释放连接的操作事件,一次只处理一个if(_revents & EPOLLOUT){if(_event_callback) _event_callback();if(_write_callback) _write_callback();}else if(_revents & EPOLLERR){if(_event_callback) _event_callback();if(_error_callback) _error_callback();}else if(_revents & EPOLLHUP){if(_event_callback) _event_callback();if(_close_callback) _close_callback();}}
private:int _fd;EventLoop *_loop;uint32_t _events; // 当前需要监控的事件uint32_t _revents; // 当前连接触发的事件EventCallback _read_callback; // 可读事件触发时的回调函数EventCallback _write_callback; // 可写事件触发时的回调函数EventCallback _error_callback; // 错误事件触发时的回调函数EventCallback _close_callback; // 连接断开事件触发时的回调函数EventCallback _event_callback; // 任意事件触发时的回调函数
};void Channel::Remove() { return _loop->RemoveEvent(this); }
void Channel::Update() { return _loop->UpdateEvent(this); }
我们来测试一下EventLoop的事件监控功能是否正常。服务端:
void HandleClose(Channel *channel)
{std::cout << "close: " << channel->Fd() << std::endl;channel->Remove(); // 移除监控delete channel;
}
void HandleRead(Channel *channel)
{int fd = channel->Fd();char buf[1024] = {0};int ret = recv(fd, buf, 1023, 0);if(ret <= 0){return HandleClose(channel);}// 读到数据之后,给客户端回复,启动可写事件channel->EnableWrite();std::cout << buf << std::endl;
}
void HandleWrite(Channel *channel)
{int fd = channel->Fd();// 作为测试,向客户端写回固定的数据const char *data = "天气还不错!!!";int ret = send(fd, data, strlen(data), 0);if(ret < 0){return HandleClose(channel);}channel->DisableWrite(); // 关闭写监控
}
void HandleError(Channel *channel)
{return HandleClose(channel);
}
void HandleEvent(Channel *channel)
{std::cout << "有一个事件触发了!!!\n";
}// 对于监听套接字读事件触发的回调函数
void Acceptor(EventLoop *loop, Channel *lst_channel)
{// 获取新连接int fd = lst_channel->Fd();int newfd = accept(fd, nullptr, nullptr);if(newfd < 0) return ;// 为新连接封装ChannelChannel *channel = new Channel(loop, newfd);channel->SetReadCallback(std::bind(HandleRead, channel));channel->SetWriteCallback(std::bind(HandleWrite, channel));channel->SetCloseCallback(std::bind(HandleClose, channel));channel->SetErrorCallback(std::bind(HandleError, channel));channel->SetEventCallback(std::bind(HandleEvent, channel));channel->EnableRead();
}int main()
{EventLoop loop;Socket lst_sock;lst_sock.CreateServer(8080);// 为监听套接字创建一个ChannelChannel channel(&loop, lst_sock.Fd());channel.SetReadCallback(std::bind(Acceptor, &loop, &channel));// 启动可读事件监控channel.EnableRead();while(true){// 开始事件监控,获取事件就绪的连接,并进行处理loop.Start();}lst_sock.Close();return 0;
}
客户端:
int main()
{Socket cli_sock;cli_sock.CreateClient(8080, "127.0.0.1");while(true){std::string str = "今天天气怎么样?";cli_sock.Send(str.c_str(), str.size());char buf[1024] = {0};cli_sock.Recv(buf, 1023);DBG_LOG("%s", buf);sleep(1);}return 0;
}
两者可以正常进行通信,所以EventLoop的事件监控功能是没有问题的。
TimerWheel定时器
我们之前实现的TimerWheel时间轮需要每秒执行一次定时任务,到了时间后,要如何通知定时器执行任务呢?此时就需要使用timerfd了,每隔1秒,通知一次时间轮。
定时器模块的整合:
- timerfd:实现内核每隔一段时间,给进程一次超时事件(timerfd可读)
- timerwheel:实现每次执行Runtimetask,都可以执行一次到期的定时任务
要实现一个完整的秒级定时器,就需要将上面的两个模块整合到一起,具体做法是:timerfd设置为每秒触发一次定时事件,当事件被触发,则运行一次timerwheel的runtimetask,执行一下所有的过期的定时任务,而timerfd的事件监控与触发,可以融合EventLoop来实现。业务EventLoop是对文件描述符进行事件监控的,所以可以使用EventLoop对timerfd进行事件监控,这样当timerfd触发了事件,就可以通过EventLoop得知,从而对事件进行处理,也就是调用timerwheel的rumtimetask。
// 定时任务对象类
using TaskFunc = std::function<void()>;
using ReleaseFunc = std::function<void()>;
class TimerTask
{
public:TimerTask(uint64_t id, uint32_t delay, const TaskFunc &cb): _id(id), _timeout(delay), _task_cb(cb), _canceled(false) { // 设置timerfd读事件触发时的回调函数_timer_channel->SetReadCallback(std::bind(&TimerWheel::OnTime, this));// 启动读事件监控_timer_channel->EnableRead(); }~TimerTask() {// 只有定时任务没有被取消才执行定时任务if(_canceled == false) _task_cb(); _release(); }void SetRelease(const ReleaseFunc &cb) { _release = cb; }uint32_t DelayTime() { return _timeout; }// 取消定时任务void Cancel() { _canceled = true; }
private:uint64_t _id; // 定时任务对象的IDuint32_t _timeout; // 定时任务的超时时间bool _canceled; // 该定时任务是否被取消TaskFunc _task_cb; // 定时任务对象要执行的定时任务ReleaseFunc _release; // 删除TimerWheel中保存的定时任务对象信息
};// 时间轮定时器
class TimerWheel
{
public:TimerWheel(EventLoop *loop):_capacity(60), _tick(0), _wheel(_capacity), _loop(loop),_timerfd(CreateTimerfd()), _timer_channel(new Channel(_loop, _timerfd)){}// 添加定时任务void TimerAdd(uint64_t id, uint32_t delay, const TaskFunc &cb);// 取消定时任务void TimerCancel(uint64_t id);void TimerRefresh(uint64_t id);// 查找是否存在指定的定时任务// 这个接口是存在线程安全的,只能在EventLoop所在线程内部调用bool HasTimer(uint64_t id){auto it = _timers.find(id);if(it == _timers.end()){return false;}return true;}
private:// 将ID与weak_ptr的映射关系从_timers中移除void RemoveTimer(uint64_t id){auto it = _timers.find(id);if(it != _timers.end()){_timers.erase(it);}}// 创建一个timerfdstatic int CreateTimerfd(){// 创建timerfdint timerfd = timerfd_create(CLOCK_MONOTONIC, 0);if(timerfd < 0){ERR_LOG("TIMERFD CREATE FAILED!!!");abort();}// 设置超时时间为1秒struct itimerspec itime;itime.it_value.tv_sec = 1;itime.it_value.tv_nsec = 0;itime.it_interval.tv_sec = 1;itime.it_interval.tv_nsec = 0;timerfd_settime(timerfd, 0, &itime, nullptr);return timerfd;}// 读取timerfd底层文件内的数据int ReadTimerfd(){uint64_t times;int ret = read(_timerfd, ×, 8);if(ret < 0){ERR_LOG("READ TIMEFD FAILED!!!");abort();}return times;}// 运转时间轮,让秒针每秒向后走一步void RunTimerTask(){_tick = (_tick + 1) % _capacity;_wheel[_tick].clear();}// timerfd时间到了之后执行的任务void OnTime(){// 时间到了之后,读取定时器的数值,并执行定时任务// 必须读取定时器的数值,否则会一直触发可读时间int times = ReadTimefd();for (int i = 0; i < times; i++) {RunTimerTask();}}// 线程安全地添加定时任务void TimerAddInLoop(uint64_t id, uint32_t delay, const TaskFunc &cb){PtrTask pt(new TimerTask(id, delay, cb));pt->SetRelease(std::bind(&TimerWheel::RemoveTimer, this, id));int pos = (_tick + delay) % _capacity;_wheel[pos].push_back(pt);_timers[id] = WeakTask(pt);}// 线程安全地刷新定时任务void TimerRefreshInLoop(uint64_t id){// 通过保存的定时任务对象的weak_ptr构造一个shared_ptr,并添加到时间轮中auto it = _timers.find(id);if(it == _timers.end()){return; }PtrTask pt = it->second.lock();int delay = pt->DelayTime();int pos = (_tick + delay) % _capacity;_wheel[pos].push_back(pt);}// 线程安全地取消定时任务void TimerCancelInLoop(uint64_t id){auto it = _timers.find(id);if(it == _timers.end()){return ;}PtrTask pt = it->second.lock();if(pt) pt->Cancel();}
private:using WeakTask = std::weak_ptr<TimerTask>;using PtrTask = std::shared_ptr<TimerTask>;int _tick; // 秒针int _capacity; // 表盘容量,其实就是最大延迟时间std::vector<std::vector<PtrTask>> _wheel;// 保存定时任务ID与weak_ptr的映射关系,这里一定不能是shared_ptr// 否则shared_ptr的引用计数永远不为0std::unordered_map<uint64_t, WeakTask> _timers;EventLoop *_loop;// 定时器timerfd的描述符,可读事件回调就是读取计数器,执行定时任务int _timerfd;std::unique_ptr<Channel> _timer_channel;
};// 添加定时任务
void TimerWheel::TimerAdd(uint64_t id, uint32_t delay, const TaskFunc &cb)
{_loop->RunInLoop(std::bind(&TimerWheel::TimerAddInLoop, this, id, delay, cb));
}// 刷新/延迟定时任务
void TimerWheel::TimerRefresh(uint64_t id)
{_loop->RunInLoop(std::bind(&TimerWheel::TimerRefreshInLoop, this, id));
}// 取消定时任务
void TimerWheel::TimerCancel(uint64_t id)
{_loop->RunInLoop(std::bind(&TimerWheel::TimerCancelInLoop, this, id));
}
时间轮定时器一定要确保线程安全,有两点原因:
- 一个时间轮定时器是会被多个线程同时使用的,向定时器添加定时任务时,是在哪一个线程都有可能添加的,而定时器是与EventLoop保存在一起的,里面的很多操作都是对连接进行操作的,所以定时任务的添加、刷新、取消等都需要在EventLoop所在线程中执行,并且定时任务也需要在EventLoop所在线程中执行
- TimerWheel中是使用一个哈希表管理所有定时任务的,如果如何线程都可以对其进行操作,那么是存在线程安全问题的,所以可以将对定时器的操作都放在一个线程中进行,确保线程安全。
TimerWheel与EventLoop整合
class EventLoop
{using Functor = std::function<void()>;
private:// 执行任务池中的所有任务void RunAllTask(){// 拷贝一个任务池std::vector<Functor> functor;{std::unique_lock<std::mutex> _lock(_mutex);_tasks.swap(functor);}for(auto &f : functor){f();}return ;}// 创建一个eventfdstatic int CreateEventFd(){int efd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);if(efd < 0){ERR_LOG("CREATE EVENTFD FAILED!!!");abort();}return efd;}// eventfd的可读事件回调函数void ReadEventFd(){uint64_t res = 0;int ret = read(_event_fd, &res, sizeof(res));if(ret < 0){// EINTR -- 被信号打断 EAGAIN -- 表示无数据可读if(errno == EINTR || errno == EAGAIN) return ;ERR_LOG("READ EVENTFD FAILED!!!");abort();}return ;}// 给eventfd中写入一个数据,触发它的可读事件void WeakUpEventFd(){uint64_t val = 1;int ret = write(_event_fd, &val, sizeof(val));if(ret < 0){if(errno == EINTR) return ;ERR_LOG("READ EVENTFD FAILED!!!");abort();}return ;}
public:EventLoop():_thread_id(std::this_thread::get_id()),_event_fd(CreateEventFd()),_event_channel(new Channel(this, _event_fd)),_timer_wheel(this){// 给eventfd添加可读事件回调函数,读取eventfd事件通知次数_event_channel->SetReadCallback(std::bind(&EventLoop::ReadEventFd, this));// 启动eventfd的读事件监控_event_channel->EnableRead();}// 判断当前线程是否是EventLoop对应的线程bool IsInLoop(){return (_thread_id == std::this_thread::get_id());}// 判断将要执行的任务是否处于EventLoop所在线程,如果是则执行,不是则压入任务池void RunInLoop(const Functor &cb){if(IsInLoop()) return cb();return QueueInLoop(cb);}// 将操作压入任务池void QueueInLoop(const Functor &cb){{std::unique_lock<std::mutex> _lock(_mutex);_tasks.push_back(cb);}// 唤醒可能因为没有事件就绪,而导致的epoll阻塞// 其实就是给eventfd写入一个数据,触发eventfd的可读事件WeakUpEventFd();}// 添加/修改描述符的事件监控void UpdateEvent(Channel *channel){return _poller.UpdateEvent(channel);}// 删除描述符的事件监控void RemoveEvent(Channel *channel){return _poller.RemoveEvent(channel);}// 启动EventLoopvoid Start(){// 1. 事件监控std::vector<Channel*> actives;_poller.Poll(&actives);// 2. 事件处理for(auto &channel : actives){channel->HandleEvent();} // 3. 执行任务RunAllTask();}// 向定时器添加定时任务 void TimerAdd(uint64_t id, uint32_t delay, const TaskFunc &cb) { return _timer_wheel.TimerAdd(id, delay, cb); }// 刷新定时任务void TimerRefresh(uint64_t id) { return _timer_wheel.TimerRefresh(id); }// 取消定时任务void TimerCancel(uint64_t id) { return _timer_wheel.TimerCancel(id); }// 检查定时器中是否有指定的定时任务bool HasTimer(uint64_t id) { return _timer_wheel.HasTimer(id); }
private: std::thread::id _thread_id; // 线程IDint _event_fd; // eventfd唤醒IO事件监控有可能导致的阻塞std::unique_ptr<Channel> _event_channel; // 管理_event_fdPoller _poller; // 进行所有文件描述符的事件监控std::vector<Functor> _tasks; // 任务池std::mutex _mutex; // 保证任务池的线程安全TimerWheel _timer_wheel; // 定时器,用于超时管理
};
我们来测试一下定时功能是否正常。服务端:
void HandleClose(Channel *channel)
{DBG_LOG("close fd: %d", channel->Fd());channel->Remove(); // 移除监控delete channel;
}
void HandleRead(Channel *channel)
{int fd = channel->Fd();char buf[1024] = {0};int ret = recv(fd, buf, 1023, 0);if(ret <= 0){return HandleClose(channel);}DBG_LOG("%s", buf);// 读到数据之后,给客户端回复,启动可写事件channel->EnableWrite();
}
void HandleWrite(Channel *channel)
{int fd = channel->Fd();// 作为测试,向客户端写回固定的数据const char *data = "天气还不错!!!";int ret = send(fd, data, strlen(data), 0);if(ret < 0){return HandleClose(channel);}channel->DisableWrite(); // 关闭写监控
}
void HandleError(Channel *channel)
{return HandleClose(channel);
}
// 任意事件触发时,就刷新定时任务
void HandleEvent(EventLoop *loop, Channel *channel, uint64_t timerid)
{loop->TimerRefresh(timerid);
}// 对于监听套接字读事件触发的回调函数
void Acceptor(EventLoop *loop, Channel *lst_channel)
{// 获取新连接int fd = lst_channel->Fd();int newfd = accept(fd, nullptr, nullptr);if(newfd < 0) return ;// 为新连接封装Channeluint64_t timerid = rand() % 10000;Channel *channel = new Channel(loop, newfd);channel->SetReadCallback(std::bind(HandleRead, channel));channel->SetWriteCallback(std::bind(HandleWrite, channel));channel->SetCloseCallback(std::bind(HandleClose, channel));channel->SetErrorCallback(std::bind(HandleError, channel));channel->SetEventCallback(std::bind(HandleEvent, loop, channel, timerid));// 非活跃连接10秒后释放// 注意:设置定时销毁任务时,必须在启动读事件之前,因为有可能启动了事件监控后,立即就有了事件,但是这个时候还没用任务loop->TimerAdd(timerid, 10, std::bind(HandleClose, channel));channel->EnableRead();
}int main()
{srand(time(nullptr));EventLoop loop;Socket lst_sock;lst_sock.CreateServer(8080);// 为监听套接字创建一个ChannelChannel channel(&loop, lst_sock.Fd());channel.SetReadCallback(std::bind(Acceptor, &loop, &channel));// 启动可读事件监控channel.EnableRead();while(true){// 开始事件监控,获取事件就绪的连接,并进行处理loop.Start();}lst_sock.Close();return 0;
}
客户端:
int main()
{Socket cli_sock;cli_sock.CreateClient(8080, "127.0.0.1");for(int i = 0;i < 5;i ++){std::string str = "今天天气怎么样?";cli_sock.Send(str.c_str(), str.size());char buf[1024] = {0};cli_sock.Recv(buf, 1023);DBG_LOG("%s", buf);sleep(1);}while(1) sleep(1);return 0;
}
我们让客户端给服务端发送5次数据后就不再发送了。
可以看到,定时功能是正常的。
Connection模块
Connection模块功能思想
Connection是对通信连接进行整体管理的模块,对通信连接的所有操作都是通过这个模块提供的功能完成的。整合了Buffer模块、Socket模块、Channel模块。
管理:
- 套接字的管理,能够进行套接字的操作
- 连接事件的管理,可读、可写、错误、挂断、任意
- 缓冲区的管理,便于Socket数据的接收和发送
- 协议上下文的管理,记录请求数据的处理过程
- 回调函数的管理
因为连接接收到数据之后该如何处理,需要由用户决定,因此必须有业务处理回调函数,一个连接建立成功后,该如何处理,由用户决定,因此必须有连接建立成功的回调函数,一个连接关闭前,该如何处理,由用户决定,因此必须由关闭连接回调函数。任意事件的产生,有没有某些处理,由用户决定,因此必须有任意事件的回调函数。
功能:
- 发送数据。给用户提供的发送数据的接口,并不是真正的发送接口,而是将数据放到发送缓冲区,然后启动写事件监控。写事件触发的回调就是将发送缓冲区内的数据发送出去。
- 关闭连接。给用户提供的关闭连接的接口,并不是真正的关闭接口,在真正关闭之前需要看看输入输出缓冲区是否有数据待处理。
- 启动非活跃连接的超时销毁功能。
- 取消非活跃连接的超时销毁功能。
- 协议切换。一个连接接收到数据后如何进行业务处理,取决于上下文和数据的业务处理回调函数。切换协议就是切换数据的处理方式,所以需要清空上下文,更改回调函数。
Connection模块类设计
我们来看一个场景:当有多个线程同时对某个连接操作时,可能出现一个线程将连接释放,另一个线程操作连接的情况,此时就会导致程序崩溃。即使每个线程对连接的操作都由EventLoop线程进行处理,但是也是会存在时序问题的,可能先释放了连接,再对连接操作。此时可以使用智能指针shared_ptr对Connection对象进行管理,这样就能保证任意一个地方对Connection对象进行操作的时候,保存了一份shared_ptr,因此就算其他地方进行了释放操作,也只是对shared_ptr的计数器-1,而不会导致Connection的实际释放。
协议上下文是记录上一次对于不完整请求处理到了哪里。
还要有一个连接状态:
- 连接关闭状态:马上要被释放了,不能再对连接进行任何操作
- 连接建立未完成状态:获取了连接,但是还未设置回调函数
- 连接建立完成状态
- 待关闭状态:需要先清空缓冲区内的数据后,才能设置为连接关闭状态
上面的很多操作都是对于连接的操作,是不能让使用者直接操作的,为了保证线程安全,必须将对连接的所有操作均放在EventLoop线程中才行,所以Connection中需要有一个EventLoop。
typedef enum
{DISCONNECTED, // 连接关闭状态CONNECTING, // 连接建立未完成状态CINNECTED, // 连接建立完成状态DISCONNECTING // 连接待关闭状态
}ConnStatu;
using PtrConnection = std::shared_ptr<Connection>;
class Connection
{using ConnectedCallback = std::function<void(const PtrConnection&)>;using MessageCallback = std::function<void(const PtrConnection&, Buffer *)>;using ClosedCallback = std::function<void(const PtrConnection&)>;using AnyEventCallback = std::function<void(cosst PtrConnection&)>;
public:Connection();~Connection();// 发送数据,将数据放到发送缓冲区,启动写事件监控void Send(char *data, size_t len);// 提供给组件使用者的关闭连接接口,不会真的关闭连接,需要判断有没有数据待处理void Shutdown();// 启动非活跃销毁void EnableInactiveRelease(int sec);// 取消非活跃销毁void CancelInactiveRelease();// 协议切换 --- 重置上下文以及阶段性处理函数void Upgrade(const Any &context, const ConnectedCallback &conn, const MessageCallback &msg,const ClosedCallback &closed, const AnyEventCallback &event);// 获取连接对应的文件描述符int Fd();// 获取连接IDint Id();// 连接是否处于CONNECTED状态bool Connected();// 设置上下文,连接建立完成时进行调用void SetContext(const Any &context);// 获取上下文,返回的是指针Any *GetContext();void SetConnectedCallback(const ConnectedCallback &cb);void SetMessageCallback(const MessageCallback &cb);void SetClosedCallback(const ClosedCallback &cb);void SetAnyEventCallback(const AnyEventCallback &cb);// 连接建立完成后,进行Channel回调设置,启动读事件监控,调用用户设置的连接建立完成的回调函数void Established();
private:// 对连接的操作要放到EventLoop线程中// 连接建立完成之后,进行设置(启动读监控,调用回调函数)void EstablishedInLoop();// 真正释放连接的接口void ReleaseInLoop();// 线程安全的发送数据接口void SendInLoop(char *data, size_t len);// 线程安全的关闭连接接口void ShutdownInLoop();// 线程安全的启动非活跃销毁void EnableInactiveReleaseInLoop(int sec);// 线程安全的取消非活跃销毁void CancelInactiveReleaseInLoop();// 线程安全的协议切换void UpgradeInLoop(const Any &context, const ConnectedCallback &conn, const MessageCallback &msg,const ClosedCallback &closed, const AnyEventCallback &event);// 下面5个函数是给Channel的事件回调函数void HandlerRead();void HandlerWrite();void HandlerClose();void HandlerError();void HandlerEvent();
private:uint64_t _conn_id; // 连接的ID,便于进行管理和查找// uint64_t _timer_id; // 定时器ID,必须是唯一的,这里简化为使用_conn_id作为定时器IDint _sockfd; // 连接关联的文件描述符bool _enable_inactive_release; // 连接是否启动了非活跃连接销毁EventLoop *_loop; // 连接所属的EventLoopConnStatu _statu; // 连接状态Socket _socket; // 套接字操作管理Channel _channel; // 连接的事件管理Buffer _in_buffer; // 输入缓冲区 --- 存放从Socket中读取到的数据Buffer _out_buffer;// 输出缓冲区 --- 存放要发送给对端的数据Any _context; // 请求的接收处理上下文// 下面4个函数是服务器模块设置的,是由组件使用者设置给服务器模块的ConnectedCallback _connected_callback; // 连接建立时的回调函数MessageCallback _message_callback; // 接收到消息时的回调函数ClosedCallback _closed_callback; // 连接关闭时的回调函数AnyEventCallback _event_callback; // 任意事件触发时的回调函数// 服务器模块会将所有的连接管理起来,下面这个函数是当连接断开时,将自己从管理中删除CloseCallback _server_closed_callback;
}
Connection模块代码设计
class Connection;
typedef enum
{DISCONNECTED, // 连接关闭状态CONNECTING, // 连接建立未完成状态CONNECTED, // 连接建立完成状态DISCONNECTING // 连接待关闭状态
}ConnStatu;
using PtrConnection = std::shared_ptr<Connection>;
class Connection : public std::enable_shared_from_this<Connection>
{using ConnectedCallback = std::function<void(const PtrConnection&)>;using MessageCallback = std::function<void(const PtrConnection&, Buffer *)>;using ClosedCallback = std::function<void(const PtrConnection&)>;using AnyEventCallback = std::function<void(const PtrConnection&)>;
public:Connection(EventLoop *loop, uint64_t conn_id, int sockfd):_conn_id(conn_id), _sockfd(sockfd),_enable_inactive_release(false), _loop(loop), _statu(CONNECTING), _socket(_sockfd), _channel(loop, _sockfd){_channel.SetCloseCallback(std::bind(&Connection::HandlerClose, this));_channel.SetEventCallback(std::bind(&Connection::HandlerEvent, this));_channel.SetReadCallback(std::bind(&Connection::HandlerRead, this));_channel.SetWriteCallback(std::bind(&Connection::HandlerWrite, this));_channel.SetErrorCallback(std::bind(&Connection::HandlerError, this));}~Connection() { DBG_LOG("RELEASE CONNECTION: %p", this); }// 发送数据,将数据放到发送缓冲区,启动写事件监控void Send(const char *data, size_t len){// 外界传入的data可能是一个临时的空间,我们现在只是吧发送操作压入任务池,有可能没有立即被执行// 因此有可能执行的时候,data指向的空间已经被释放了Buffer buf;buf.WriteAndPush(data, len);_loop->RunInLoop(std::bind(&Connection::SendInLoop, this, buf));}// 提供给组件使用者的关闭连接接口,不会真的关闭连接,需要判断有没有数据待处理void Shutdown(){_loop->RunInLoop(std::bind(&Connection::ShutdownInLoop, this));}// 启动非活跃销毁void EnableInactiveRelease(int sec){_loop->RunInLoop(std::bind(&Connection::EnableInactiveReleaseInLoop, this, sec));}// 取消非活跃销毁void CancelInactiveRelease(){_loop->RunInLoop(std::bind(&Connection::CancelInactiveReleaseInLoop, this));}// 协议切换 --- 重置上下文以及阶段性处理函数void Upgrade(const Any &context, const ConnectedCallback &conn, const MessageCallback &msg,const ClosedCallback &closed, const AnyEventCallback &event){_loop->AssertInLoop();_loop->RunInLoop(std::bind(&Connection::UpgradeInLoop, this, context, conn, msg, closed, event));}// 获取连接对应的文件描述符int Fd() { return _sockfd; }// 获取连接IDint Id() { return _conn_id; }// 连接是否处于CONNECTED状态bool Connected() { return (_statu == CONNECTED); }// 设置上下文,连接建立完成时进行调用void SetContext(const Any &context) { _context = context; }// 获取上下文,返回的是指针Any *GetContext() { return &_context; }void SetConnectedCallback(const ConnectedCallback &cb) { _connected_callback = cb; }void SetMessageCallback(const MessageCallback &cb) { _message_callback = cb; }void SetClosedCallback(const ClosedCallback &cb) { _closed_callback = cb; }void SetAnyEventCallback(const AnyEventCallback &cb) { _event_callback = cb; }void SetSrvClosedCallback(const ClosedCallback &cb) { _server_closed_callback = cb; }// 连接建立完成后,进行Channel回调设置,启动读事件监控,调用用户设置的连接建立完成的回调函数void Established(){_loop->RunInLoop(std::bind(&Connection::EstablishedInLoop, this));}
private:// 对连接的操作要放到EventLoop线程中// 连接建立完成之后,进行设置(启动读监控, 调用回调函数)void EstablishedInLoop(){// 1. 修改连接状态,并且当前连接状态必须是连接未完成状态assert(_statu == CONNECTING);_statu = CONNECTED;// 2. 启动读事件监控_channel.EnableRead();if(_connected_callback) _connected_callback(shared_from_this());}// 真正释放连接的接口void ReleaseInLoop(){// 1. 修改连接状态,将其置为DISCONNECTED_statu = DISCONNECTED;// 2. 移除连接的事件监控_channel.Remove();// 3. 关闭文件描述符_socket.Close();// 4. 如果当前定时器队列中还有定时销毁任务,则取消任务if(_loop->HasTimer(_conn_id)) CancelInactiveReleaseInLoop();// 5. 调用连接关闭回调函数,避免先移除服务器管理的连接信息导致Connection被释放,再去处理会出错if(_closed_callback) _closed_callback(shared_from_this());// 6. 移除服务器模块对连接的管理if(_server_closed_callback) _server_closed_callback(shared_from_this());}// 线程安全的发送数据接口,并不是真正的发送接口,只是将数据放到了发送缓冲区,启动了可写事件监控void SendInLoop(Buffer buf){if(_statu == DISCONNECTED) return ;_out_buffer.WriteBufferAndPush(buf);if(_channel.WriteAble() == false){_channel.EnableWrite();}}// 线程安全的关闭连接接口,并不是真正的关闭连接接口void ShutdownInLoop(){_statu = DISCONNECTING;// 如果输入缓冲区有数据,将数据处理了,这里可能会因为数据不完整,上层不处理,这里我们直接不管if(_in_buffer.ReadAbleSize() > 0){if(_message_callback) _message_callback(shared_from_this(), &_in_buffer);}// 如果输出缓冲区还有数据,清空,然后调用真正关闭连接的接口释放连接if(_out_buffer.ReadAbleSize() > 0){if(_channel.WriteAble() == false) _channel.EnableWrite();}if(_out_buffer.ReadAbleSize() == 0){ReleaseInLoop();}}// 线程安全的启动非活跃销毁void EnableInactiveReleaseInLoop(int sec){// 1. 将_enable_inactive_release设置为true_enable_inactive_release = true;// 2. 如果当前定时器的销毁任务已经存在,那就延迟执行if(_loop->HasTimer(_conn_id)){return _loop->TimerRefresh(_conn_id);}// 3. 如果不存在定时销毁任务,则新增_loop->TimerAdd(_conn_id, sec, std::bind(&Connection::ReleaseInLoop, this));}// 线程安全的取消非活跃销毁void CancelInactiveReleaseInLoop(){_enable_inactive_release = false;if(_loop->HasTimer(_conn_id)){_loop->TimerCancel(_conn_id);}}// 线程安全的协议切换void UpgradeInLoop(const Any &context, const ConnectedCallback &conn, const MessageCallback &msg,const ClosedCallback &closed, const AnyEventCallback &event){_context = context;_connected_callback = conn;_message_callback = msg;_closed_callback = closed;_event_callback = event;}// 下面5个函数是给Channel的事件回调函数void HandlerRead(){// 1. 接收socket的数据,放到缓冲区char buf[65536];ssize_t ret = _socket.NonBlockRecv(buf, 65535);if(ret < 0){// 出错了,直接关闭连接return ShutdownInLoop();}_in_buffer.WriteAndPush(buf, ret);// 2. 调用message_callback进行业务处理if(_in_buffer.ReadAbleSize() > 0){return _message_callback(shared_from_this(), &_in_buffer);}}void HandlerWrite(){// _out_buffer中保存的数据就算要发送的数据ssize_t ret = _socket.NonBlockSend(_out_buffer.ReadPosition(), _out_buffer.ReadAbleSize());if(ret < 0){// 发送错误就关闭连接if(_in_buffer.ReadAbleSize() > 0){_message_callback(shared_from_this(), &_in_buffer);}return ReleaseInLoop(); // 释放连接}_out_buffer.MoveReadOffset(ret);// 如果输出缓冲区没有数据了,就关闭写事件监控if(_out_buffer.ReadAbleSize() == 0){_channel.DisableWrite();// 若此时连接是待关闭状态,并且现在输出缓冲区已经没有数据了,直接关闭连接if(_statu == DISCONNECTING){return ReleaseInLoop();}}}void HandlerClose(){// 一旦连接挂断,套接字就什么都干不了了,因此有数据待处理就处理一下,完毕直接关闭连接if(_in_buffer.ReadAbleSize() > 0){_message_callback(shared_from_this(), &_in_buffer);}return ReleaseInLoop();}void HandlerError(){return HandlerClose();}void HandlerEvent(){// 1. 刷新连接的活跃度,延迟定时销毁任务if(_enable_inactive_release == true) _loop->TimerRefresh(_conn_id);// 2. 调用组件使用者设置的任意事件回调函数if(_event_callback) _event_callback(shared_from_this());}
private:uint64_t _conn_id; // 连接的ID,便于进行管理和查找// uint64_t _timer_id; // 定时器ID,必须是唯一的,这里简化为使用_conn_id作为定时器IDint _sockfd; // 连接关联的文件描述符bool _enable_inactive_release; // 连接是否启动了非活跃连接销毁EventLoop *_loop; // 连接所属的EventLoopConnStatu _statu; // 连接状态Socket _socket; // 套接字操作管理Channel _channel; // 连接的事件管理Buffer _in_buffer; // 输入缓冲区 --- 存放从Socket中读取到的数据Buffer _out_buffer;// 输出缓冲区 --- 存放要发送给对端的数据Any _context; // 请求的接收处理上下文// 下面4个函数是服务器模块设置的,是由组件使用者设置给服务器模块的,再由服务器模块设置给ConnectionConnectedCallback _connected_callback; // 连接建立时的回调函数MessageCallback _message_callback; // 接收到消息时的回调函数ClosedCallback _closed_callback; // 连接关闭时的回调函数AnyEventCallback _event_callback; // 任意事件触发时的回调函数// 服务器模块会将所有的连接管理起来,下面这个函数是当连接断开时,将自己从管理中删除ClosedCallback _server_closed_callback;
};
这里说一下代码中几个需要注意的点:
- 连接内部要将自身作为一个shared_ptr传递,可以使用shared_from_this,为了使用shared_from_this,要让Connection类继承于一个模板类
- 在HandlerRead中,读取失败不会直接关闭连接,因为读取数据的时间可能比较长,我们需要先读取数据,再刷新连接的活跃度,而若是刷新连接活跃度时连接已经关闭,则会出错,所以我们在读取失败时不能直接关闭连接
- Upgrade是必须由EventLoop线程进行调用的,因为如果是其他线程调用,会将切换协议的任务放入任务池中,如果此时触发了事件,而协议切换的任务还没有被执行,此时就会以原来的协议进行处理,只有在EventLoop内部执行才能够立即执行任务。所以,我们给EventLoop增加一个成员函数,保证调用接口的是EventLoop所在线程。
// 保证调用接口的是EventLoop线程
void AssertInLoop()
{assert(_thread_id == std::this_thread::get_id());
}
我们来进行测试,服务端:
// 管理所有的连接
std::unordered_map<uint64_t, PtrConnection> _conns;
uint64_t conn_id = 0;
// 连接销毁时的回调函数
void ConnectionDestory(const PtrConnection &conn)
{_conns.erase(conn->Id());
}
// 连接建立时的回调函数
void OnConnected(const PtrConnection &conn)
{DBG_LOG("NEW CONNECTING: %p", conn.get());
}
// 接收到消息时的回调函数
void OnMessage(const PtrConnection &conn, Buffer *buf)
{DBG_LOG("%s", buf->ReadPosition());buf->MoveReadOffset(buf->ReadAbleSize());std::string str = "今天天气不错!!!";conn->Send(str.c_str(), str.size());
}// 对于监听套接字读事件触发的回调函数
void Acceptor(EventLoop *loop, Channel *lst_channel)
{// 获取新连接int fd = lst_channel->Fd();int newfd = accept(fd, nullptr, nullptr);if(newfd < 0) return ;// 为新连接封装Connectionconn_id ++;PtrConnection conn(new Connection(loop, conn_id, newfd));conn->SetMessageCallback(std::bind(OnMessage, std::placeholders::_1, std::placeholders::_2));conn->SetSrvClosedCallback(std::bind(ConnectionDestory, std::placeholders::_1));conn->SetConnectedCallback(std::bind(OnConnected, std::placeholders::_1));conn->EnableInactiveRelease(10); // 启动非活跃超时销毁conn->Established(); // 就绪初始化_conns.insert(std::make_pair(conn_id, conn));
}int main()
{EventLoop loop;Socket lst_sock;lst_sock.CreateServer(8080);// 为监听套接字创建一个ChannelChannel channel(&loop, lst_sock.Fd());channel.SetReadCallback(std::bind(Acceptor, &loop, &channel));// 启动可读事件监控channel.EnableRead();while(true){// 开始事件监控,获取事件就绪的连接,并进行处理loop.Start();}lst_sock.Close();return 0;
}
获取到一个新连接之后,不再使用Channel,而是使用Connection对新连接进行管理。当监听套接字触发了可读事件后,就获取新连接,将文件描述符封装成一个connection,并使用智能指针管理起来。设置Connection的业务处理函数、连接销毁回调函数、连接就绪后的会回调函数。
客户端:
int main()
{Socket cli_sock;cli_sock.CreateClient(8080, "127.0.0.1");for(int i = 0;i < 5;i ++){std::string str = "今天天气怎么样?";cli_sock.Send(str.c_str(), str.size());char buf[1024] = {0};cli_sock.Recv(buf, 1023);DBG_LOG("%s", buf);sleep(1);}while(1) sleep(1);return 0;
}
功能是正常的。
Acceptor模块
前面的Connection模块是对普通套接字的管理。Acceptor模块是对监听套接字进行管理的模块,是对Socket模块和Channel模块的整合。需要有以下功能:
- 创建一个监听套接字
- 启动读事件监控
- 事件触发后,获取新连接
- 调用新连接获取成功后的回调函数。
新连接获取成功后的回调函数的内容是为新连接创建Connection进行管理,这个函数由服务器模块进行设置。因为Acceptor模块只进行监听连接进行管理,获取到新连接的文件描述符之后,对于新连接的文件描述符要如何处理并不关心。对于新连接如何处理,是服务器模块来管理的。未来服务器模块实现一个对于新连接的文件描述符的处理函数,将这个函数设置给Acceptor模块。
class Acceptor
{using AcceptCallback = std::function<void(int)>;
public:// 不能将启动事件监控放到构造函数中,必须设置了回调函数之后,再启动事件监控// 否则可能造成启动事件监控后,立即有事件,处理的时候,回调函数还未设置Acceptor(EventLoop *loop, int port):_socket(CreateServer(port)),_loop(loop),_channel(loop, _socket.Fd()){_channel.SetReadCallback(std::bind(&Acceptor::HandlerRead, this));}void SetAcceptCallback(const AcceptCallback &cb) { _accept_callback = cb; }// 开始监听,其实就是启动对监听套接字的读事件监控void Listen() { _channel.EnableRead(); }
private:// 监听套接字读事件触发时的回调函数void HandlerRead(){int newfd = _socket.Accept();if(newfd < 0) return ;if(_accept_callback) _accept_callback(newfd);}int CreateServer(int port){bool ret = _socket.CreateServer(port);assert(ret == true);return _socket.Fd();}
private:Socket _socket; // 用于创建监听套接字EventLoop *_loop; // 用于对监听套接字进行事件监控Channel _channel; // 用于对监听套接字进行事件管理AcceptCallback _accept_callback; // 接收到新连接时的回调函数
};
修改一下EventLoop的Start,让其在内部即可进行循环。
// 启动EventLoop
void Start()
{while(true){// 1. 事件监控std::vector<Channel*> actives;_poller.Poll(&actives);// 2. 事件处理for(auto &channel : actives){channel->HandleEvent();} // 3. 执行任务RunAllTask();}
}
测试一下Acceptor模块,服务端:
// 管理所有的连接
std::unordered_map<uint64_t, PtrConnection> _conns;
uint64_t conn_id = 0;
EventLoop loop;// 连接销毁时的回调函数
void ConnectionDestory(const PtrConnection &conn)
{_conns.erase(conn->Id());
}
// 连接建立时的回调函数
void OnConnected(const PtrConnection &conn)
{DBG_LOG("NEW CONNECTING: %p", conn.get());
}
// 接收到消息时的回调函数
void OnMessage(const PtrConnection &conn, Buffer *buf)
{DBG_LOG("%s", buf->ReadPosition());buf->MoveReadOffset(buf->ReadAbleSize());std::string str = "今天天气不错!!!";conn->Send(str.c_str(), str.size());
}// 对于监听套接字读事件触发的回调函数
void NewConnection(int fd)
{// 为新连接封装Connectionconn_id ++;PtrConnection conn(new Connection(&loop, conn_id, fd));conn->SetMessageCallback(std::bind(OnMessage, std::placeholders::_1, std::placeholders::_2));conn->SetSrvClosedCallback(std::bind(ConnectionDestory, std::placeholders::_1));conn->SetConnectedCallback(std::bind(OnConnected, std::placeholders::_1));conn->EnableInactiveRelease(10); // 启动非活跃超时销毁conn->Established(); // 就绪初始化_conns.insert(std::make_pair(conn_id, conn));
}int main()
{Acceptor acceptor(&loop, 8080);acceptor.SetAcceptCallback(std::bind(NewConnection, std::placeholders::_1));acceptor.Listen();loop.Start();return 0;
}
未来在服务器模块会有一个EventLoop线程池,Acceptor是在主EventLoop线程中的,这里我们将EventLoop定义为全局变量。
客户端:
int main()
{Socket cli_sock;cli_sock.CreateClient(8080, "127.0.0.1");for(int i = 0;i < 5;i ++){std::string str = "今天天气怎么样?";cli_sock.Send(str.c_str(), str.size());char buf[1024] = {0};cli_sock.Recv(buf, 1023);DBG_LOG("%s", buf);sleep(1);}while(1) sleep(1);return 0;
}
可以正常进行通信。
LoopThread模块
我们前面说过,一个线程一个EventLoop,而前面吗实现的是EventLoop的功能模块,并没有将EventLoop与线程进行关联。LoopThread模块就是将EventLoop与线程整合起来的模块。
EventLoop模块与线程是一一对应的,EventLoop在构造函数中会初始化_thread_id,后边运行一个操作时,判断执行操作的线程是否是EventLoop模块对应的线程,就算将线程ID与EventLoopm对象中的_thread_id进行比较,相同表示在同一个线程中,不同则表示不在同一个线程中。也就是说,在实例化EventLoop对象时,必须在线程内部,否则在构造函数中初始化线程ID时就会出错。
此时可能会想到,可以先创建多个EventLoop,然后再创建多个线程,将这些线程的ID设置进EventLoop中,这样是不行的,因为在构造EventLoop对象,到设置线程ID的过程中是不可控的。所以,我们必须先创建出线程,然后在线程的入口函数中去实例化EventLoop对象。
所以,我们构造一个新的模块LoopThread,这个模块将EventLoop与thread整合到一起。
思想:
- 创建线程
- 在线程中实例化EventLoop对象
功能:可以向外部返回实例化的EventLoop
当创建了线程,还没来得及实例化EventLoop对象时,就获取EventLoop对象是会出错的,所以使用锁和条件变量来进行同步。
class LoopThread
{
public:// 创建线程,设定线程的入口函数LoopThread():_loop(nullptr),_thread(std::thread(&LoopThread::ThreadEntry, this)){}// 返回当前线程关联的EventLoop对象指针EventLoop *GetLoop(){EventLoop *loop = nullptr;{std::unique_lock<std::mutex> lock(_mutex);// 条件变量,只要_loop为空时就一直等待_cond.wait(lock, [&](){ return _loop != nullptr; });loop = _loop;}return loop;}
private:// 线程入口函数// 实例化EventLoop对象,并运行EventLoop模块的功能void ThreadEntry(){// 线程的生命周期是与ThreadEntry的生命周期相同的// 所以,这里不使用new,而是定义类对象,让_loop指向该对象,是为了让_loop的生命周期随进程EventLoop loop;{std::unique_lock<std::mutex> lock(_mutex);_loop = &loop;_cond.notify_all();}loop.Start();}
private:std::mutex _mutex; // 互斥锁std::condition_variable _cond; // 条件变量EventLoop *_loop; // EventLoop指针变量,这个对象需要在线程内实例化std::thread _thread; // EventLoop对应的线程
};
现在来测试一下,服务端:
// 管理所有的连接
std::unordered_map<uint64_t, PtrConnection> _conns;
uint64_t conn_id = 0;
EventLoop base_loop; // 主线程的EventLoop
// 现在有两个从属Reactor线程,主Reactor获取新连接后,就可以选择性地将新连接分配给不同的从属Reactor线程
std::vector<LoopThread> threads(2);
int next_loop = 0; // 下一次将新连接分配给哪一个从属Reactor线程// 连接销毁时的回调函数
void ConnectionDestory(const PtrConnection &conn)
{_conns.erase(conn->Id());
}
// 连接建立时的回调函数
void OnConnected(const PtrConnection &conn)
{DBG_LOG("NEW CONNECTING: %p", conn.get());
}
// 接收到消息时的回调函数
void OnMessage(const PtrConnection &conn, Buffer *buf)
{DBG_LOG("%s", buf->ReadPosition());buf->MoveReadOffset(buf->ReadAbleSize());std::string str = "今天天气不错!!!";conn->Send(str.c_str(), str.size());
}// 对于监听套接字读事件触发的回调函数
void NewConnection(int fd)
{// 为新连接封装Connectionconn_id ++;next_loop = (next_loop + 1) % 2;PtrConnection conn(new Connection(threads[next_loop].GetLoop(), conn_id, fd));conn->SetMessageCallback(std::bind(OnMessage, std::placeholders::_1, std::placeholders::_2));conn->SetSrvClosedCallback(std::bind(ConnectionDestory, std::placeholders::_1));conn->SetConnectedCallback(std::bind(OnConnected, std::placeholders::_1));conn->EnableInactiveRelease(10); // 启动非活跃超时销毁conn->Established(); // 就绪初始化_conns.insert(std::make_pair(conn_id, conn));DBG_LOG("NEW CONNECTION!!!");
}int main()
{// 主线程只负责监听新连接Acceptor acceptor(&base_loop, 8080);acceptor.SetAcceptCallback(std::bind(NewConnection, std::placeholders::_1));acceptor.Listen();base_loop.Start();return 0;
}
客户端:
int main()
{Socket cli_sock;cli_sock.CreateClient(8080, "127.0.0.1");for(int i = 0;i < 5;i ++){std::string str = "今天天气怎么样?";cli_sock.Send(str.c_str(), str.size());char buf[1024] = {0};cli_sock.Recv(buf, 1023);DBG_LOG("%s", buf);sleep(1);}while(1) sleep(1);return 0;
}
可以看到,线程是不同的。
LoopThreadPool模块
LoopThreadPool模块是针对LoopThread设计一个线程池,对所有的LoopThread进行管理及分配。
功能:
- 线程数量可配置(0个或多个)。当线程数量为多个时,主Reactor线程只负责获取新连接,从属Reactor线程负责对新连接进行事件监控及处理。当线程数量为0个时,只有一个Reactor线程,这个线程既要负责获取新连接,也要负责对新连接进行事件监控以处理。
- 对所有的线程进行管理,其实就是管理0个或多个LoopThread对象。
- 提供线程分配功能。当主线程获取到一个新连接,需要将新连接挂到从属线程上进行事件监控及处理。
class LoopThreadPool
{
public:LoopThreadPool(EventLoop *baseloop):_thread_count(0),_next_id(0),_baseloop(baseloop){}// 设置线程数量void SetThreadCount(int count) { _thread_count = count; }// 创建所有的从属Reactor线程void Create(){if(_thread_count > 0){_threads.resize(_thread_count);_loops.resize(_thread_count);for(int i = 0;i < _thread_count;i ++){_threads[i] = new LoopThread();_loops[i] = _threads[i]->GetLoop();}}}// 获取下一个新连接分配给哪一个从属Reactor线程EventLoop* NextLoop(){if(_thread_count == 0){return _baseloop;}_next_id = (_next_id + 1) % _thread_count;return _loops[_next_id];}
private:int _thread_count; // 从属Reactor线程的数量int _next_id; // 下一次将新连接分配给哪一个从属Reactor线程EventLoop *_baseloop; // 主Reactor线程中的EventLoopstd::vector<LoopThread*> _threads; // 从属Reactor线程的数量std::vector<EventLoop*> _loops; // 从属Reactor线程中的EventLoop
};
测试一下,服务端:
// 管理所有的连接
std::unordered_map<uint64_t, PtrConnection> _conns;
uint64_t conn_id = 0;
EventLoop base_loop; // 主线程的EventLoop
LoopThreadPool *loop_pool;// 连接销毁时的回调函数
void ConnectionDestory(const PtrConnection &conn)
{_conns.erase(conn->Id());
}
// 连接建立时的回调函数
void OnConnected(const PtrConnection &conn)
{DBG_LOG("NEW CONNECTING: %p", conn.get());
}
// 接收到消息时的回调函数
void OnMessage(const PtrConnection &conn, Buffer *buf)
{DBG_LOG("%s", buf->ReadPosition());buf->MoveReadOffset(buf->ReadAbleSize());std::string str = "今天天气不错!!!";conn->Send(str.c_str(), str.size());
}// 对于监听套接字读事件触发的回调函数
void NewConnection(int fd)
{// 为新连接封装Connectionconn_id ++;PtrConnection conn(new Connection(loop_pool->NextLoop(), conn_id, fd));conn->SetMessageCallback(std::bind(OnMessage, std::placeholders::_1, std::placeholders::_2));conn->SetSrvClosedCallback(std::bind(ConnectionDestory, std::placeholders::_1));conn->SetConnectedCallback(std::bind(OnConnected, std::placeholders::_1));conn->EnableInactiveRelease(10); // 启动非活跃超时销毁conn->Established(); // 就绪初始化_conns.insert(std::make_pair(conn_id, conn));DBG_LOG("NEW CONNECTION!!!");
}int main()
{// 主线程只负责监听新连接loop_pool = new LoopThreadPool(&base_loop);loop_pool->SetThreadCount(2);loop_pool->Create();Acceptor acceptor(&base_loop, 8080);acceptor.SetAcceptCallback(std::bind(NewConnection, std::placeholders::_1));acceptor.Listen();base_loop.Start();return 0;
}
客户端:
int main()
{Socket cli_sock;cli_sock.CreateClient(8080, "127.0.0.1");for(int i = 0;i < 5;i ++){std::string str = "今天天气怎么样?";cli_sock.Send(str.c_str(), str.size());char buf[1024] = {0};cli_sock.Recv(buf, 1023);DBG_LOG("%s", buf);sleep(1);}while(1) sleep(1);return 0;
}
TcpServer模块
TcpServer模块是对所有模块的整合,通过TcpServer模块实例化对象,可以非常简单地完成一个服务器的搭建。
管理:
- Acceptor对象,创建一个监听套接字。
- EventLoop对象,实现对监听套接字的事件监控。
- std::unordered_map<uint64_t, PtrConnection> _conns,实现对所有连接的管理。
- LoopThreadPool对象,对所有连接进行事件监控及处理。
功能:
- 设置从属Reactor线程数量
- 启动服务器
- 设置各种回调函数,用户设置给TcpServer,TcpServer再设置给获取的新连接,所以Connection有什么回调函数,这里就有什么回调函数。
- 是否启动非活跃连接超时销毁功能
- 添加定时任务功能
流程:
- 在TcpServer中实例化一个Acceptor对象,以及一个EventLoop对象baseloop
- 将Acceptor对象挂到baseloop上,并启动读事件监控
- 一旦Acceptor对象触发了可读事件,则调用读事件回调函数获取新连接
- 在这个回调函数中,就会将新连接封装成一个Connection,并设置各种回调函数,启动Connection的非活跃连接的超时销毁规则,将Connection挂到LoopThreadPool中的从属Reactor线程中进行事件监控及处理
- 当Connection对应的连接就绪了可读事件,则这时候执行读事件回调函数,读取数据,读取完毕后调用TcpServer设置的回调函数进行业务处理。
class TcpServer
{using ConnectedCallback = std::function<void(const PtrConnection&)>;using MessageCallback = std::function<void(const PtrConnection&, Buffer *)>;using ClosedCallback = std::function<void(const PtrConnection&)>;using AnyEventCallback = std::function<void(const PtrConnection&)>;using Functor = std::function<void()>;
public:TcpServer(int port):_port(port),_next_id(0),_enable_inactive_release(false),_acceptor(&_baseloop, port),_pool(&_baseloop){_acceptor.SetAcceptCallback(std::bind(&TcpServer::NewConnection, this, std::placeholders::_1));_acceptor.Listen();}void SetThreadCount(int count) { _pool.SetThreadCount(count); }void SetConnectedCallback(const ConnectedCallback &cb) { _connected_callback = cb; }void SetMessageCallback(const MessageCallback &cb) { _message_callback = cb; }void SetClosedCallback(const ClosedCallback &cb) { _closed_callback = cb; }void SetAnyEventCallback(const AnyEventCallback &cb) { _event_callback = cb; }// 启动非活跃连接超时销毁功能void EnableInactiveRelease(int timeout){_timeout = timeout;_enable_inactive_release = true;}// 添加定时任务,指定秒数之后执行一个任务void RunAfter(const Functor &task, int delay){_baseloop.RunInLoop(std::bind(&TcpServer::RunAfterInLoop, this, task, delay));}// 启动服务器void Start(){_pool.Create();_baseloop.Start();}
private:// 为新连接构造一个Connection进行管理void NewConnection(int fd){_next_id ++;PtrConnection conn(new Connection(_pool.NextLoop(), _next_id, fd));conn->SetMessageCallback(_message_callback);conn->SetClosedCallback(_connected_callback);conn->SetConnectedCallback(_connected_callback);conn->SetAnyEventCallback(_event_callback);conn->SetSrvClosedCallback(std::bind(&TcpServer::RemoveConnection, this, std::placeholders::_1));if(_enable_inactive_release) conn->EnableInactiveRelease(_timeout);conn->Established();_conns.insert(std::make_pair(_next_id, conn));}// 从_conns中移除一个Connectionvoid RemoveConnection(const PtrConnection &conn){_baseloop.RunInLoop(std::bind(&TcpServer::RemoveConnectionInLoop, this, conn));}// 线程安全地添加一个定时任务void RunAfterInLoop(const Functor &task, int delay){_next_id ++;_baseloop.TimerAdd(_next_id, delay, task);}// 线程安全地从_conns中移除一个Connectionvoid RemoveConnectionInLoop(const PtrConnection &conn){int id = conn->Id();auto it = _conns.find(id);if(it != _conns.end()){_conns.erase(it);}}
private:int _port; // 服务器监听的端口号uint64_t _next_id; // 自动增长的ID,连接ID和定时任务ID都使用它int _timeout; // 非活跃连接的定义时间bool _enable_inactive_release; // 是否启动了非活跃连接超时销毁的判断标志EventLoop _baseloop; // 主Reactor线程中的EventLoopAcceptor _acceptor; // 监听套接字LoopThreadPool _pool; // 从属Reactor// 保存服务器的所有连接对应的shared_ptr对象std::unordered_map<uint64_t, PtrConnection> _conns; ConnectedCallback _connected_callback; // 连接建立时的回调函数MessageCallback _message_callback; // 接收到消息时的回调函数ClosedCallback _closed_callback; // 连接关闭时的回调函数AnyEventCallback _event_callback; // 任意事件触发时的回调函数
};
现在,要搭建一个服务器只需要实例化一个TcpServer对象,并设置好回调函数,然后Start即可。
我们来进行测试,服务端:
void OnConnected(const PtrConnection &conn)
{DBG_LOG("NEW CONNECTION: %p", conn.get());
}void OnClosed(const PtrConnection &conn)
{DBG_LOG("CLOSE CONNECTION: %p", conn.get());
}void OnMessage(const PtrConnection &conn, Buffer *buf)
{DBG_LOG("%s", buf->ReadPosition());buf->MoveReadOffset(buf->ReadAbleSize());std::string str = "今天天气很好!!!";conn->Send(str.c_str(), str.size());conn->Shutdown();
}int main()
{TcpServer server(8080);server.SetThreadCount(2);server.EnableInactiveRelease(10);server.SetClosedCallback(OnClosed);server.SetConnectedCallback(OnConnected);server.SetMessageCallback(OnMessage);server.Start();return 0;
}
客户端:
int main()
{Socket cli_sock;cli_sock.CreateClient(8080, "127.0.0.1");for(int i = 0;i < 5;i ++){std::string str = "今天天气怎么样?";cli_sock.Send(str.c_str(), str.size());char buf[1024] = {0};cli_sock.Recv(buf, 1023);DBG_LOG("%s", buf);sleep(1);}while(1) sleep(1);return 0;
}
基于TcpServer实现回显服务器
class EchoServer
{
public:EchoServer(int port):_server(port){_server.SetThreadCount(2);_server.EnableInactiveRelease(10);_server.SetClosedCallback(std::bind(&EchoServer::OnClosed, this, std::placeholders::_1));_server.SetConnectedCallback(std::bind(&EchoServer::OnConnected, this, std::placeholders::_1));_server.SetMessageCallback(std::bind(&EchoServer::OnMessage, this, std::placeholders::_1, std::placeholders::_2));}void Start() { _server.Start(); }
private:void OnConnected(const PtrConnection &conn){DBG_LOG("NEW CONNECTION: %p", conn.get());}void OnClosed(const PtrConnection &conn){DBG_LOG("CLOSE CONNECTION: %p", conn.get());}void OnMessage(const PtrConnection &conn, Buffer *buf){conn->Send(buf->ReadPosition(), buf->ReadAbleSize());buf->MoveReadOffset(buf->ReadAbleSize());conn->Shutdown();}
private:TcpServer _server;
};int main()
{EchoServer server(8080);server.Start();return 0;
}
我们对EchoServer进行性能测试。我们使用WebBench进行性能测试。WebBench是在程序当中创建指定数量的进程,是测试HTTP服务器的工具。-c选项是指定客户端数量,其实就算进程的数量;-t选项是指定测试的持续时间,单位是秒。
可以看到,在500个客户端的情况下,每分钟可以处理91526个请求,每秒可以处理100676个字节。当然,服务器和测试程序在一台云服务器上,会存在竞争资源的情况,肯定是不规范的,这里只是做一个简单的测试。
服务器模块的总结
服务器模块是一个主从Reactor模型,主Reactor线程负责对监听套接字进行监控和处理,从属Reactor线程负责对普通套接字进行监控和处理。对套接字进行监控依赖的是EventLoop模块,一个线程一个EventLoop,对于监听套接字,需要监控它的读事件,当读事件触发,说明有新连接到来了,获取新连接的文件描述符,封装成一个Connection,设置好回调方法,选择一个从属Reactor线程,将Connection交给这个线程的EventLoop进行监控。对于普通套接字,同样是监控它的读事件,当读事件触发,说明客户端发送了消息,将消息从连接对应的文件描述符的输入缓冲区读取到Connection的输入缓冲区中,调用消息处理回调函数,将处理完成的数据放到Connection的输出缓冲区中,启动写事件监控,当写事件监控触发了,就将Connection的输出缓冲区的数据拷贝到连接对应的文件描述符的输出缓冲区中。
Buffer模块:Buffer模块是一个缓冲区模块,对于客户端发送过来的数据,可以暂时进行保存,后序再处理,对于处理完成后的数据,要以响应的形式发送给客户端,也可以暂时先保存在缓冲区中,等待写事件触发后,再进行发送。
Socket模块:
Channel模块:Channel模块是对连接对应的文件描述符的管理。表示对一个连接要监控它的什么事件,一般是监控它的读事件或者写事件。并且要设置当这个文件描述符的某个事件触发时,要如何处理,所以提供了一些设置回调函数的接口。有可读事件触发回调函数、可写事件触发回调函数、任意事件触发回调函数、错误事件触发回调函数、连接挂断事件触发回调函数。
Connection模块:是对连接进行管理的模块。由Buffer模块、Socket模块、Channel模块组合而成。Connection中有一个输入缓冲区和一个输出缓冲区,用于保存客户端发送过来的数据和要发送给客户端的数据。对于套接字的操作由Socket模块完成。对于这个连接要监控什么事件,以及事件触发后要如何处理,由Socket模块完成。所以,Channel模块的5个回调函数是由Connection模块设置的。同样,Connection也有自己的回调函数,连接建立时的回调函数、连接关闭时的回调函数、接收到消息时的回调函数、任意事件触发时的回调函数。Channel和Connection都有回调函数,这些回调函数之间有什么关系呢?Channel的回调函数是由Connection设置的,而Connection的回调函数是由TcpServer设置的,TcpServer的回调函数是由用户设置的。所以,Channel的回调函数是对底层数据的处理,而Connection的回调函数是业务的处理。例如,Channel的可读事件回调函数,是将客户端发送过来的数据放到了Connection的输入缓冲区中,然后调用Connection的接收到消息时的回调函数进行业务处理。
所以,组件使用者在使用TcpServer搭建服务器时,设置的回调函数就是业务处理回调函数,这里面的回调函数最重要的就算接收到消息时的回调函数,因为对于消息要如何处理,只有使用者才知道。