仿muduo库实现并发服务器
1.实现目标
仿muduo库One Thread One Loop式主从Reactor模型实现高并发服务器:
通过实现高并发服务器的组件,可以快速实现一个高并发服务器的搭建,并且,通过组内不同应用层协议的支持,可以快速完成高性能服务器的搭建,由于要实现的是一个服务器,所以并不涉及实际的业务代码。
Http服务器:
Http协议是位于应用层的一个协议,全称为一个超文本传输协议,是一个简单的基于请求响应的协议,运行在Tcp之上的协议,它是不安全的协议。
Reactor模式:
Reactor 模式,是指通过一个或多个输入同时传递给服务器进行请求处理时的事件驱动处理模式。
服务端程序处理传入多路请求,并将它们同步分派给请求对应的处理线程,Reactor 模式也叫Dispatcher 模式,也可以叫做发布者模式。
分类:
单Reacto模式:
单I/O多路复用+业务处理,优点为所有的操作都在一个线程内执行,不存在线程安全问题,没有死锁问题,代码容易编写,缺点为无法利用cup多核有效资源,资源浪费严重,容易达到性能瓶颈。
单Reactor多线程:
单I/O多路转接+线程池+业务处理,优点为可以充分发挥cpu多核资源,缺点为多个线程之间共享数据比较麻烦,单个Reactor承担了所有的事件的监听和响应,在单线程之下,高并发成为了性能瓶颈。
多Reactor多线程模式:
多个I/O多路转接+线程池+业务处理,充分利用CPU多核资源,主从Reactor各司其职,在主Reactor中处理新连接请求事件,有新连接到来则分发到子Reactor中监控,在子Reactor中进行客户端通信监控,有事件触发,则接收数据分发给Worker线程池。
可以充分利用CPU资源,将相应交给子Reactor来实现。
目标定位
One Thread One Loop主从Reactor模型高并发服务器
咱们实现的是主从Reactor模型服务器,也就是主Reactor线程仅仅监控监听描述符,获取新建连接,保证获取新连接的高效性,提高服务器的并发性能,主Reactor获取到新链接之后,发布到子Reactor进行通信事件监控,子Reactor进行对行的监控各自的文件描述符的增删查改以及上层的业务处理。
One Thread One Loop的思想就是把所有的操作都放到一个线程中进行,一个线程对应一个事件处理的循环。
当前实现中,因为并不确定组件使用者的使用意向,因此并不提供业务层工作线程池的实现,只实现主从Reactor,而Worker工作线程池,可由组件库的使用者的需要自行决定是否使用和实现。
功能模块划分:
基于以上的理解,我们要实现的是一个带有协议支持的Reactor模型高性能服务器,因此将整个项目的实现划分为两个大的模块:
• SERVER模块:实现Reactor模型的TCP服务器;
• 协议模块:对当前的Reactor模型服务器提供应用层协议支持。
SERVER模块:
SERVER模块:SERVER模块就是将所有的连接以及线程进行管理,在合适的时候做合适的事情,完成服务器的实现。具体的管理分为三个模块:
监听连接管理:对监听链接进行通信管理。
通信连接管理:对正在通信的链接进行管理。
超时连接管理:对已经超时的链接进行管理。
基于以上思想,又可以划分为多个模块:
Buffer 模块:
Boffer模块,是一个缓冲区模块,用于通信过程中接收和发送信息的管理。
Socket 模块:
Socket模块,主要对创建Socket做一个封装,实现网络通信的各项功能。
Channel模块:
Channel模块,主要对文件描述符需要进行的I/O事件管理模块,实现对描述符可读,可写,错误...事件的管理操作,以及Poller模块对描述符进行IO事件监控就绪后,根据不同的事件,回调不同的处理函数功能。
Connection 模块:
Connection模块是对Buffer模块,Socket模块,Channel模块的一个整体封装,实现了对一个通信套接字的整体的管理,每一个进行数据通信的套接字(也就是accept获取到的新连接)都会使用Connection进行管理。
Connection模块内部包含有三个由组件使用者传入的回调函数:连接建立完成回调,事件回调,新数据回调,关闭回调。
- Connection模块内部包含有两个组件使用者提供的接口:数据发送接口,连接关闭接口
- Connection模块内部包含有两个用户态缓冲区:用户态接收缓冲区,用户态发送缓冲区
- Connection模块内部包含有一个Socket对象:完成描述符面向系统的IO操作
- Connection模块内部包含有一个Channel对象:完成描述符IO事件就绪的处理
具体处理流程如下:
1. 实现向Channel提供可读,可写,错误等不同事件的IO事件回调函数,然后将Channel和对应的描述符添加到Poller事件监控中。
2. 当描述符在Poller模块中就绪了IO可读事件,则调用描述符对应Channel中保存的读事件处理函数,进行数据读取,将socket接收缓冲区全部读取到Connection管理的用户态接收缓冲区中。然后调用由组件使用者传入的新数据到来回调函数进行处理。
3. 组件使用者进行数据的业务处理完毕后,通过Connection向使用者提供的数据发送接口,将数据写入Connection的发送缓冲区中。
4. 启动描述符在Poll模块中的IO写事件监控,就绪后,调用Channel中保存的写事件处理函数,将发送缓冲区中的数据通过Socket进行面向系统的实际数据发送。Accept 模块:
Acceptor模块是对Socket模块,Channel模块的一个整体封装,实现了对一个监听套接字的整体的管理。
- Acceptor模块内部包含有一个Socket对象:实现监听套接字的操作。
- Acceptor模块内部包含有一个Channel对象:实现监听套接字IO事件就绪的处理。
具体流程:
1.实现向Channel提供可读事件的IO事件处理回调函数,函数的功能其实也就是获取新连接
2. 为新连接构建一个Connection对象出来。TimerQueue模块:
TimerQueue模块是实现固定时间定时任务的模块,可以理解就是要给定时任务管理器,向定时任务管理器中添加一个任务,任务将在固定时间后被执行,同时也可以通过刷新定时任务来延迟任务的执行。
这个模块主要是对Connection对象的生命周期管理,对非活跃连接进行超时后的释放功能。TimerQueue模块内部包含有一个timerfd:linux系统提供的定时器。
TimerQueue模块内部包含有一个Channel对象:实现对timerfd的IO时间就绪回调处理。Poller模块:
Poller模块是对epoll进行封装的一个模块,主要实现epoll的IO事件添加,修改,移除,获取活跃连接功能。
EverlLoop 模块:
EventLoop模块可以理解就是我们上边所说的Reactor模块,它是对Poller模块,TimerQueue模块,Socket模块的一个整体封装,进行所有描述符的事件监控。
EventLoop模块必然是一个对象对应一个线程的模块,线程内部的目的就是运行EventLoop的启动函数。
EventLoop模块为了保证整个服务器的线程安全问题,因此要求使用者对于Connection的所有操作一定要在其对应的EventLoop线程内完成,不能在其他线程中进行(比如组件使用者使用Connection发送数据,以及关闭连接这种操作)。
- EventLoop模块保证自己内部所监控的所有描述符,都要是活跃连接,非活跃连接就要及时释放避免资源浪费。
- EventLoop模块内部包含有一个eventfd:eventfd其实就是linux内核提供的一个事件fd,专门用于事件通知。
- EventLoop模块内部包含有一个Poller对象:用于进行描述符的IO事件监控。
- EventLoop模块内部包含有一个TimerQueue对象:用于进行定时任务的管理。
- EventLoop模块内部包含有一个PendingTask队列:组件使用者将对Connection进行的所有操作,都加入到任务队列中,由EventLoop模块进行管理,并在EventLoop对应的线程中进行执行。
- 每一个Connection对象都会绑定到一个EventLoop上,这样能保证对这个连接的所有操作都是在一个线程中完成的。
具体流程:
- 通过Poller模块对当前模块管理内的所有描述符进行IO事件监控,有描述符事件就绪后,通过描述符对应的Channel进行事件处理。
- 所有就绪的描述符IO事件处理完毕后,对任务队列中的所有操作顺序进行执行。
- 由于epoll的事件监控,有可能会因为没有事件到来而持续阻塞,导致任务队列中的任务不能及时得到执行,因此创建了eventfd,添加到Poller的事件监控中,用于实现每次向任务队列添加任务的时候,通过向eventfd写入数据来唤醒epoll的阻塞。
TcpServer模块:
- 这个模块是一个整体Tcp服务器模块的封装,内部封装了Acceptor模块EventLoopThrea-dpool模块。
- TcpServer中包含有一个EventLoop对象:以备在超轻量使用场景中不需要EventLoop线程池,只需要在主线程中完成所有操作的情况。
- TcpServer模块内部包含有一个EventLoopThreadPool对象:其实就是EventLoop线程池,也就是子Reactor线程池。
- TcpServer模块内部包含有一个Acceptor对象:一个TcpServer服务器,必然对应有一个监听套接字,能够完成获取客户端新连接,并处理的任务。
- TcpServer模块内部包含有一个std::shared_ptr<Connection>的hash表:保存了所有的新建连接对应的Connection,注意,所有的Connection使用shared_ptr进行管理,这样能够保证在hash表中删除了Connection信息后,在shared_ptr计数器为0的情况下完成对Connection资源的释放操作。
具体流程:
1. 在实例化TcpServer对象过程中,完成BaseLoop的设置,Acceptor对象的实例化,以及EventLoop线程池的实例化,以及std::shared_ptr<Connection>的hash表的实例化。
2. 为Acceptor对象设置回调函数:获取到新连接后,为新连接构建Connection对象,设置
Connection的各项回调,并使用shared_ptr进行管理,并添加到hash表中进行管理,并为
Connection选择一个EventLoop线程,为Connection添加一个定时销毁任务,为Connection添加事件监控,
3. 启动BaseLoop。
SERVER模式图:
Http协议模式:
Http协议模式:HTTP协议模块用于对高并发服务器模块进行协议支持,基于提供的协议支持能够更方便的完成指定协议服务器的搭建。而HTTP协议支持模块的实现,可以细分为以下几个模块。
Util模块:
这个模块是一个工具模块,主要提供HTTP协议模块所用到的一些工具函数,比如url编解码,文件读写....等。
HttpRequest模块:
这个模块是HTTP请求数据模块,用于保存HTTP请求数据被解析后的各项请求元素信息。HttpResponse模块:
这个模块是HTTP响应数据模块,用于业务处理后设置并保存HTTP响应数据的的各项元素信息,最终会被按照HTTP协议响应格式组织成为响应信息发送给客户端。
HttpContext模块:
这个模块是一个HTTP请求接收的上下文模块,主要是为了防止在一次接收的数据中,不是一个完整的HTTP请求,则解析过程并未完成,无法进行完整的请求处理,需要在下次接收到新数据后继续根据上下文进行解析,最终得到一个HttpRequest请求信息对象,因此在请求数据的接收以及解析部分需要一个上下文来进行控制接收和处理节奏。HttpServer模块:
这个模块是最终给组件使用者提供的HTTP服务器模块了,用于以简单的接口实现HTTP服务器的搭建。
HttpServer模块内部包含有一个TcpServer对象:TcpServer对象实现服务器的搭建
HttpServer模块内部包含有两个提供给TcpServer对象的接口:连接建立成功设置上下文接口,数据处理接口。
HttpServer模块内部包含有一个hash-map表存储请求与处理函数的映射表:组件使用者向
HttpServer设置哪些请求应该使用哪些函数进行处理,等TcpServer收到对应的请求就会使用对应的函数进行处理。
2.代码实现
2.1 SERVER服务器模块实现:
1.Buffer 模块实现:
总缓冲区大小是一定的,开始的时候readbuff等于writebuff,为0,当写入数据的时候,写大小为数剧最后一个位置的后一个位置开始,对依然从数据开头开始读,读完之后,我们不用删除,当再次写入的时候,写入到上一个数据的最后一个位置的下一个位置,读的大小等于写的大小。写继续往后走。如果总体空间不够,直接扩容,需要实现的函数接口比较多。
具体代码实现:
#define BUFFER_DEFAULT_SIZE 1024
class Buffer
{
private:std::vector<char> _buffer;//总的缓冲区uint64_t _readbuff;//读缓冲区大小uint64_t _writerbuff;//写缓冲区大小
public:Buffer() : _readbuff(0), _writerbuff(0), _buffer(BUFFER_DEFAULT_SIZE) {}//初始化// 起始地址char* Begin(){return &(*_buffer.begin());}// 写的起始地址char* GetWritePos(){return Begin() + _writerbuff;}// 读的起始地址char* GetReadPos(){return Begin() + _readbuff;}// 获取缓冲区末尾空闲空间大小--写偏移之后的空闲空间, 总体空间大小减去写偏移uint64_t TailIndexSize(){return _buffer.size() - _writerbuff;}// 获取缓冲区起始空闲空间大小--读偏移之前的空闲空间uint64_t HeadIndexSize(){return _readbuff;}// 获取可读数据大小 = 写偏移 - 读偏移uint64_t ReadIndexSize(){return _writerbuff - _readbuff;}// 将读偏移向后移动size大小void MoveReadOff(uint64_t size){if (size == 0) return;// 向后移动的大小,必须小于可读数据大小assert(size <= ReadIndexSize());_readbuff += size;}// 将写偏移向后移动void MoveWriterOff(uint64_t size){if (size == 0) return;assert(size <= TailIndexSize());_writerbuff += size;}//确保有足够的空间进行写void EnsureWriteSpace(uint64_t size){// 末尾空间足够,直接返回if (TailIndexSize() >= size) return;if (TailIndexSize() + HeadIndexSize() >= size){uint64_t rsz = ReadIndexSize();//写空间的大小std::copy(GetReadPos(), GetReadPos() + rsz, Begin());//直接复制_readbuff = 0;// 将读偏移归0_writerbuff = rsz; // 将写位置置为可读数据大小, 因为当前的可读数据大小就是写偏移量}else{// 总体空间不够,则需要扩容,不移动数据,直接给写偏移之后扩容足够空间即可DBG_LOG("RESIZE %ld", _writerbuff + size);//扩容_buffer.resize(_writerbuff + size);}}// 写数据void Write(const void* data, uint64_t size){if (size == 0) return;EnsureWriteSpace(size);const char* wd = (const char* )data;std::copy(wd, wd + size, GetWritePos());}// 尾插void WritePush(const void* data, uint64_t size){Write(data, size);MoveWriterOff(size);}//写stringvoid WriteString(const std::string& data){return Write(data.c_str(), data.size());}//写数据并插入void WriteStringAndPush(const std::string &data){WriteString(data);MoveWriterOff(data.size());}void WriteBuffer(Buffer &data){return Write(data.GetReadPos(), data.ReadIndexSize());}void WriteBufferAndPush(Buffer &data){WriteBuffer(data);MoveWriterOff(data.ReadIndexSize());}// 读取数据void Read(void *buf, uint64_t size){// 要求要获取的数据大小必须小于可读数据大小assert(size <= ReadIndexSize());std::copy(GetReadPos(), GetReadPos() + size, (char *)buf);}//读并删除void ReadAndPop(void *buf, uint64_t size){Read(buf, size);MoveReadOff(size);}//读std::string ReadAsString(uint64_t size){// 要求要获取的数据大小必须小于可读数据大小assert(size <= ReadIndexSize());std::string str;str.resize(size);Read(&str[0], size);return str;}//读std::string ReadAsStringAndPop(uint64_t size){assert(size <= ReadIndexSize());std::string str = ReadAsString(size);MoveReadOff(size);return str;}//查找CRLFchar* FindCRLF(){char* res = (char *)memchr(GetReadPos(), '\n', ReadIndexSize());//返回找到的地址return res;}std::string GetLine(){char *pos = FindCRLF();if (pos == NULL) return "";// +1是为了把换行字符也取出来。return ReadAsString(pos - GetReadPos() + 1);}std::string GetLineAndPop(){std::string str = GetLine();MoveReadOff(str.size());return str;}// 清空缓冲区void Clear(){// 只需要将偏移量归0即可_readbuff = 0;_writerbuff = 0;}
};
2.Socket实现:
Socket套接字的实现比较简单,就是将网络函数接口进行封装,Socket的主要功能:创建套接字,绑定地址和端口号,监听是否有链接,服务端获取新链接,客户端发起连接请求,接收数据,发送数据,关闭套接字,端口号是否重用,设为非阻塞等。
代码接口:
#define MAX_LISTEN 1024
class Socket
{
private:int _socket;
public:Socket() : _socket(-1){}Socket(int socket) : _socket(socket){}~Socket() { Close(); }int Fd() {return _socket;}// 创建socketbool Create(){_socket = socket(AF_INET, SOCK_STREAM, 0);if (_socket < 0){ERR_LOG("CREATE SOCKET FAILED!!");return false;}return true;}// 绑定地址信息bool Bind(const std::string &ip, uint16_t &port){struct sockaddr_in local;memset(&local, 0, sizeof local);local.sin_family = AF_INET;local.sin_port = htons(port);local.sin_addr.s_addr = inet_addr(ip.c_str());socklen_t len = sizeof(local);if (bind(_socket, (struct sockaddr *)&local, len) < 0){ERR_LOG("BIND ADDRESS FAILED!");return false;}return true;}// 监听地址bool Listen(int backlog = MAX_LISTEN){if (listen(_socket, backlog) < 0){ERR_LOG("SOCKET LISTEN FAILED!");return false;}else return true;}// 请求连接bool Connect(const std::string& ip,const uint16_t& port){struct sockaddr_in local;memset(&local, 0, sizeof local);local.sin_family = AF_INET;local.sin_port = htons(port);local.sin_addr.s_addr = inet_addr(ip.c_str());socklen_t len = sizeof(local);if (connect(_socket, (struct sockaddr *)&local, len) < 0) // 错误{ERR_LOG("BIND ADDRESS FAILED!");return false;}return true;}// 获取新链接int Accept(){int ret = accept(_socket, NULL, NULL);if (ret < 0){ERR_LOG("BIND ADDRESS FAILED!");return false;}return ret;}// 读取数据bool Recv(char *buf, ssize_t size, int flag = 0){int n = recv(_socket, buf, size, flag);if (n <= 0){if (errno == EAGAIN || n == EINTR) return 0; // 这次没有数据return false;}return true;}// 非阻塞读取bool NoneBlockRecv(char *buf, ssize_t size){if (size == 0) return 0;Recv(buf, size, MSG_DONTWAIT); // MSG_DONTWAIT表示当前为非阻塞}// 发送bool Send(const char* buf, ssize_t size, int flag = 0){int n = send(_socket, buf, size, flag);if (n <= 0){if (errno == EAGAIN || n == EINTR) return 0; // 这次没有数据return false;}return true;}// 非阻塞发送bool NoneBlockSend(char *buf, size_t size){if (size == 0) return 0;Send(buf, size, MSG_DONTWAIT);}// 关闭void Close(){if (_socket >= 0){close(_socket);_socket = -1;}}// 创建服务段端bool CreateServer(uint16_t &port, const std::string &ip = "0.0.0.0", bool block_flag = false){if (Create() == false) return false;if (block_flag) NoneBlock(); // 设为非阻塞状态if (Bind(ip, port) == false) return false;if (Listen() == false) return false;ReuseAddress();return true;}// 创建客户端bool CreateClient(const uint16_t& port, const std::string& ip = "0.0.0.0"){// 1. 创建套接字,2.指向连接服务器if (Create() == false) return false;if (Connect(ip, port) == false) return false;return true;}// 开启地址端口重用void ReuseAddress(){int val = 1;setsockopt(_socket, SOL_SOCKET, SO_REUSEADDR, (void *)&val, sizeof(int));val = 1;setsockopt(_socket, SOL_SOCKET, SO_REUSEPORT, (void *)&val, sizeof(int));}// 非阻塞void NoneBlock(){int flag = fcntl(_socket, F_GETFL, 0);fcntl(_socket, F_SETFL, flag | O_NONBLOCK);}
};
3.Channel模块:
Chennal 模块是对一个文件描述符的封装,现对描述符可读,可写,错误…事件的管理操作,以及Poller模块对描述符进行IO事件监控就绪后,根据不同的事件,回调不同的处理函数功能。
功能:对一个描述符进行监控时间管理,意义,对于描述符的状态更容易维护,对之后的操作流程更加方便,功能设计:描述符是否可读,是否可写,监控可读,监控可写,解除可读,解除可写,解除所有文件描述符。
代码框架:
class Channel {private:int _fd;uint32_t events; // 当前需要监控的事件uint32_t revents; // 当前连接触发的事件using eventCallback = std::function<void()>>;eventCallback _read_callback; // 可读事件被触发的回调函数eventCallback _error_callback; // 可写事件被触发的回调函数eventCallback _close_callback; // 连接关闭事件被触发的回调函数eventCallback _event_callback; // 任意事件被触发的回调函数eventCallback _write_callback; // 可写事件被触发的回调函数public:Channel(int fd) : fd(_fd) {}int Fd() {return _fd ;}void setReadCallback(const eventCallback &cb);//设置读监控void setWriteCallback(const eventCallback &cb);//写监控void setErrorCallback(const eventCallback &cb);//错误监控void setCloseCallback(const eventCallback &cb);//关闭监控void setEventCallback(const eventCallback &cb);//任意时间监控bool readAble(); // 当前是否可读bool writeAble(); // 当前是否可写void enableRead(); // 启动读事件监控void enableWrite(); // 启动写事件监控void disableRead(); // 关闭读事件监控void disableWrite(); // 关闭写事件监控void disableAll(); // 关闭所有事件监控void Remove(); //移除监控void handleEvent(); // 事件处理,一旦触发了某个事件,就调用这个函数!};
4.Poller模块
描述符事件监控模块,对任意描述符进行时间监控,就是对epoll函数得封装,时期变得更有意义。
功能设计:添加事件,修改事件,移除事件,取消定时任务。
封装思想: 1. 必须拥有一个epoll的操作句柄
2. 拥有一个struct epoll_event 结构数组,监控保存所有的活跃事件!
3. 使用hash表管理描述符与描述符对应的事件管理Channnel对象!
逻辑流程:
1. 对描述符进行监控,通过Channnel才能知道描述符监控什么事件
2. 当描述符就绪了,通过描述符在hash表中找到对应的Channel(得到了Channel才知 道什么事件如何处理)当描述符就绪了,返回就绪描述符对应的Channel。
代码框架
框架:
class Poller {
private:int _epfd;struct epoll_event_evs[xxx];std::unordered_map<int,Channel*> mp;
private:// 1. 判断要更新事件的描述符是否存在// 2. 针对epoll直接操作(添加,修改,移除)
public:// 1. 添加或者更新描述符所监控的事件void Update(Channel* channel);// 2. 移除描述符所监控的事件void Remove(Channel* )// 3. 开始监控,获取就绪Channel
};
*/
/*
5.EventLoop模块
这个模块和线程是一一对应的!
监听了一个链接,如果这个连接一旦就绪,就要进行事件处理!
但是如果这个描述符,在多个线程中都触发了了事件,进行处理,就会存在线程安全问题!
因此我们需要将一个链接的事件监控, 以及连接事件处理,以及其他操作都放在同一个线程中!
如何保证一个连接的所有操作都在eventloop对应的线程中!
给eventLOOP模块中,都添加一个任务队列!
对连接的所有操作,都进行一次封装,将对连接的操作当作任务都添加到任务队列中!
功能:进行事件监控的模块,一个模块对应一个线程。意义,所有的线程都在这个模块中完成,功能设计,将一个任务添加到任务队列中,定时任务的添加,取消,刷新。
事件功能:
- 事件监控
- 使用Poller模块
- 有事件就绪则进行事件处理!
- 执行任务队列中的任务!
- 注意点:
- 因为有可能因为等待描述符IO事件就绪,执行流流程阻塞,这个时候任务对立中的任务得不到执行!
- 因此得有一个事件通知的东西,能够唤醒事件监控的阻塞!
- 当事件就绪,需要处理的时候,处理过程中,如果对连接要进行某些操作!
- 这些操作必须要在Eventloop对应的线程中进行,保证对连接的各项操作都是线程安全的。
- 如果执行的操作就在本线程中,不需要将操作压入队列了,可以直接执行!
- 如果执行的操作不在线程中,才需要加入任务池,等到事件处理完了之后就行执行任务!
设计框架:
class Eventloop {
private:std::thread::id _thread_id; // 线程IDint _event_fd // eventfd 唤醒IO事件监控有可能的阻塞!!!Poller _poller; // 进行所有描述符的事件监控using Functor = std::function<void()>;std::vector<Functor> _task; // 任务池std::mutex _mutex; // 实现任务池操作的线程安全!!!
public:void runAllTask();
public:Eventloop();void runInLoop(const Functor&cb); // 判断将要执行的任务是否处于当前线程中,如果是则执行,不是则压入队列。void queueInLoop(const Functor&cb); // 将操作压入任务池!bool isInLoop(); //永远判断当前线程是否是EventLoop所对应的线程void updateEvent(Channel* channel); // 添加/修改描述符的事件监控void removeEvent(Channel* channel); // 移除描述符的监控void Start(); // 任务监控完毕进行处理任务! 三步走:事件监控-》就绪事件处理-》执行任务};
6.Connection模块
Connection模块是对Buffer模块,Socket模块,Channel模块的⼀个整体封装,实现了对一个通信套接字的整体的管理,每一个进行数据通信的套接字(也就是accept获取到的新连接)都会使用
Connection进行管理。
• Connection模块内部包含有三个由组件使用者传入的回调函数:连接建立完成回调,事件回调,
新数据回调,关闭回调。
• Connection模块内部包含有两个组件使用者提供的接口:数据发送接口,连接关闭接口
• Connection模块内部包含有两个用户态缓冲区:用户态接收缓冲区,用户态发送缓冲区
• Connection模块内部包含有⼀个Socket对象:完成描述符面向系统的IO操作
• Connection模块内部包含有⼀个Channel对象:完成描述符IO事件就绪的处理
这是对通信套接字进行通信管理的一个模块,对一个连接的操作都是通过这个模块来实现的。这各模块本省并不是一个单独的功能模块,是一个连接管理的模块。
Connection模块,一个连接有任何的事件怎么处理都是有这个模块来进行处理的,因为组件的设计也不知道使用者要如何处理事件,因此只能是提供一些事件回调函数由使用者设置。
设计框架:
DISCONECTED -- 连接关闭状态; CONNECTING -- 连接建立成功-待处理状态
//CONNECTED -- 连接建立完成,各种设置已完成,可以通信的状态; DISCONNECTING -- 待关闭状态
type enum { // 连接关闭;// 连接建立成功 —— 待处理状态;// 连接设立完成,可以通信;// 待关闭状态;DISCONECTED,CONNECTING,CONNECTED,DISCONECTING} ConnStatu;
using PreConnection = std::shared_ptr<Connection>;
class Connection {private:uint64_t _conn_id; //连接的唯一ID,便于连接的管理和查找bool _enable_inactive_release; // 连接是否启动非活跃销毁的判断标志,默认为falseint _sockfd; // 连接关联的文件描述符ConnStatu _statu; // Socket _socket; // 套接字操作管理Channel _channel; // 连接二点事件管理Buffer _in_buffer; // 输入缓冲区 —— 存放从socket中读到的数据buffer _out_buffer; // 输出缓冲区 —— 发送给对端的是数据,等到描述符事件可写,再发!Any _context; // 请求的接受处理上下文/*这四个回调函数,是让服务器模块来设置的(其实服务器模块的处理回调也是组件使用者设置的)*//*换句话说,这几个回调都是组件使用者使用的*/using ConnectCallback = std::function<void(const PreConnection&)>;using MessageCallback = std::function<void(const PtrConnection&, Buffer *)>;using ClosedCallback = std::function<void(const PtrConnection&)>;using AnyEventCallback = std::function<void(const PtrConnection&)>;ConnectedCallback _connected_callback;MessageCallback _message_callback;ClosedCallback _closed_callback;AnyEventCallback _event_callback;/*组件内的连接关闭回调--组件内设置的,因为服务器组件内会把所有的连接管理起来,一旦某个连接要关闭*//*就应该从管理的地方移除掉自己的信息*/ClosedCallback _server_closed_callback;private:// /*五个channel的事件回调函数*///描述符可读事件触发后调用的函数,接收socket数据放到接收缓冲区中,然后调用_message_callbackvoid HandleRead() {}void HandleRead() {}void HandleClose() {}void HandleError() {}//描述符触发任意事件: 1. 刷新连接的活跃度--延迟定时销毁任务; 2. 调用组件使用者的任意事件回调void HandleEvent() { }//连接获取之后,所处的状态下要进行各种设置(启动读监控,调用回调函数)void EstablishedInLoop() { }//这个接口才是实际的释放接口void ReleaseInLoop() {}//这个接口并不是实际的发送接口,而只是把数据放到了发送缓冲区,启动了可写事件监控void SendInLoop(Buffer &buf) {}//这个关闭操作并非实际的连接释放操作,需要判断还有没有数据待处理,待发送void ShutdownInLoop() {}//启动非活跃连接超时释放规则void EnableInactiveReleaseInLoop(int sec) {}void CancelInactiveReleaseInLoop() {}void UpgradeInLoop(const Any &context, const ConnectedCallback &conn, const MessageCallback &msg, const ClosedCallback &closed, const AnyEventCallback &event) {_context = context;_connected_callback = conn;_message_callback = msg;_closed_callback = closed;_event_callback = event;}public:Connection(EventLoop* loop,uint64_t _conn_id,int sockfd) : _sockfd(sockfd),_enable_inactive_release(false), _loop(loop), _statu(CONNECTING), _socket(_sockfd), _channel(loop, _sockfd) {_channel.SetCloseCallback(std::bind(&Connection::HandleClose, this));_channel.SetEventCallback(std::bind(&Connection::HandleEvent, this));_channel.SetReadCallback(std::bind(&Connection::HandleRead, this));_channel.SetWriteCallback(std::bind(&Connection::HandleWrite, this));_channel.SetErrorCallback(std::bind(&Connection::HandleError, this));}~Connection() { DBG_LOG("RELEASE CONNECTION:%p", this); }//获取管理的文件描述符int Fd() {return _sockfd; }// 获取连接IDint Id() {return _conn_id; }// 是否处于CONNECTED状态bool Connected() { return (_statu == CONNECTED); }//设置上下文--连接建立完成时进行调用void SetContext(const Any &context) { _context = context; }//获取上下文,返回的是指针Any *GetContext() { return &_context; }void SetConnectedCallback(const ConnectedCallback&cb) { _connected_callback = cb; }void SetMessageCallback(const MessageCallback&cb) { _message_callback = cb; }void SetClosedCallback(const ClosedCallback&cb) { _closed_callback = cb; }void SetAnyEventCallback(const AnyEventCallback&cb) { _event_callback = cb; }void SetSrvClosedCallback(const ClosedCallback&cb) { _server_closed_callback = cb; }//连接建立就绪后,进行channel回调设置,启动读监控,调用_connected_callbackvoid Established() {}_loop->RunInLoop(std::bind(&Connection::EstablishedInLoop, this));// 发送数据,将数据放到发送缓冲区,启动写事件监控void Send(const char *data, size_t len) {}//提供给组件使用者的关闭接口--并不实际关闭,需要判断有没有数据待处理void Shutdown() {}void Release() {}//启动非活跃销毁,并定义多长时间无通信就是非活跃,添加定时任务void EnableInactiveRelease(int sec) { }//取消非活跃销毁void CancelInactiveRelease() {}void Upgrade(const Any &context, const ConnectedCallback &conn, const MessageCallback &msg, const ClosedCallback &closed, const AnyEventCallback &event) {_loop->AssertInLoop();_loop->RunInLoop(std::bind(&Connection::UpgradeInLoop, this, context, conn, msg, closed, event));}};
7.Acception模块
对通信连接做整体管理的一个模块,对一个通信连接的模块都是通过这个模块来进行的。实现了对套接字整体的管理。
意义:当获取了一个新建连接的描述符后,需要为这个通信连接,封装一个connection对象,设置不同回调。
注意:因为Acceptor模块本身并不知道一个链接产生了某个事件该如何处理,因此获取一个通信连接后,Connection的封装,以及事件回调的设置都应该由服务器模块来进行!
设计框架:
/*
Acceptor 模块,对监听套接字进行管理!1. 创建一个监听套接字2. 启动读事件监控3. 事件触发后,获取新连接4. 调用新连接获取成功后的回调函数!4. 为新连接创建Connection进行管理(这一步不是Acceptor模块操作,应该是服务器模块)因为Acceptor模块只进行监听连接的管理,因此获取到新连接的描述符后,对于新连接描述符如何处理并不关心!对于新连接如何处理,应该是服务器模块关心管理!服务器模块,实现了一个对于新连接描述符处理的函数,将这个函数设置给Acceptor模块的回调函数!
*/
8.LoopThread模块
目标:将eventloop模块和线程整合起来!
eventloop 和 线程是一一对应的!
eventloop 模块实例化的对象,在构造的时候就会初始化! _thread_id;
而后面当运行一个操作的时候判断是否运行在eventloop所对应的线程中,就是将线程ID与EventLoop模块中的thread_id 进行一个比较,相同就表示在同一个线程,不同就表示当前运行线程并不是eventloop线程!
eventloop 模块在实例化对象的时候,必须在线程内部!
eventloop 实例化对象会设置自己的 thread_id;
如果我们先创建了多个 eventloop 对象,然后创建了多个线程,将各自的线程id,重新给 eventloop 进行设置!
存在问题:在构造 eventloop对象,到设置新的 thread_id 期间将是不可控的!
因此,必须先创建线程,然后在线程的入口函数中,去实例化 eventloop 对象!
构造一个新的模块:LoopThread。
class LoopThread {private:/*用于实现_loop获取的同步关系,避免线程创建了,但是_loop还没有实例化之前去获取_loop*/std::mutex _mutex; // 互斥锁std::condition_variable _cond; // 条件变量EventLoop *_loop; // EventLoop指针变量,这个对象需要在线程内实例化std::thread _thread; // EventLoop对应的线程private:/*实例化 EventLoop 对象,唤醒_cond上有可能阻塞的线程,并且开始运行EventLoop模块的功能*/void ThreadEntry() {EventLoop loop;{std::unique_lock<std::mutex> lock(_mutex);//加锁_loop = &loop;_cond.notify_all();}loop.Start();}public:/*创建线程,设定线程入口函数*/LoopThread():_loop(NULL), _thread(std::thread(&LoopThread::ThreadEntry, this)) {}/*返回当前线程关联的EventLoop对象指针*/EventLoop *GetLoop() {EventLoop *loop = NULL;{std::unique_lock<std::mutex> lock(_mutex);//加锁_cond.wait(lock, [&](){ return _loop != NULL; });//loop为NULL就一直阻塞loop = _loop;}return loop;}
};
9.LoopThreadPool模块
线程数量可配置,调节线程数量,对线程数量进行管理。提供线程分配的功能。
1.线程数量可配置(0或多个)
注意事项:在服务器中,主从Reactor模型是 主线程只负责新连接获取,丛书线程负责新连接的事件监控以及处理!因此当前的线程池,有可能从属线程会数量为0,也就是实现单 Reactor服务器,一个线程及负责获取连接以及连接的处理!
2. 对所有的线程进行管理,其实也就是管理0个或多个LoopThread对象!
3. 提供线程分配的功能!
4.当主线程获取了一个链接,需要将新的线程挂到从属线程上进行事件监控以及管理!
5.假设0个从属线程,则直接分配给主线程的EventLoop,进行处理!
6.假设有多个丛书线程,则采用RR轮转!(将对应线程的EventLoop获取到,设置给对应的Connection)。
设计框架:
class LoopThreadPool {private:int _thread_count;int _next_idx;EventLoop *_baseloop;std::vector<LoopThread*> _threads;std::vector<EventLoop *> _loops;public:LoopThreadPool(EventLoop *baseloop):_thread_count(0), _next_idx(0), _baseloop(baseloop) {}void SetThreadCount(int count) { _thread_count = count; }void Create() {if (_thread_count > 0) {_threads.resize(_thread_count);_loops.resize(_thread_count);for (int i = 0; i < _thread_count; i++) {_threads[i] = new LoopThread();_loops[i] = _threads[i]->GetLoop();}}return ;}EventLoop *NextLoop() {if (_thread_count == 0) {return _baseloop;}_next_idx = (_next_idx + 1) % _thread_count;return _loops[_next_idx];}
};
10.TcpServer模块
对之前所有的模块进行整合,完成一个服务器的搭建。
实现思想:
1.管理
- Acceptor对象,创建一个监听套接字!
- EventLoop 对象,baseloop对象,实现对监听套接字的事件监控!
- std::vector conns,实现对新建连接的管理!
- EventLoopPool 对象,创建loop线程池,对新建连接进行事件监控和处理!
2.流程
- 在TcpServer中实例一个Acceptor对象,以及一个EventLoop 对象(baseloop)
- 将Acceptor 挂在baseloop 进行事件监控
- 一旦Acceptor 对象就绪了可读事件,则执行时间回调函数获取新建连接!
- 对新连接,创造一个 Connection 进行管理!
- 对新连接对应的 Connection 设置功能回调 (连接完成回调,消息回调,关闭回调,任意事件监控!)
- 启动Connettion 的非活跃链接的超时销毁功能.
- 将新连接对应的Connection 挂到 LoopThreadPool 中的丛书线程对应的Eventloop 中进行事件监控!
- 一旦Connection对应的链接就绪了可读事件,则这个时候执行读事件回调函数,读取数据,读取完毕后调用TcpServer设置的消息回调!
(三)功能设计
- 设置从属线程池数量!
- 启动服务器
- 设置各种回调函数!(连接建立完成,消息,关闭,任意) 用户设置给TcpServer TcpServer设置获取的新连接!
- 是否启动非活跃连接超时销毁功能
- 添加任务!
设计框架:
class TcpServer {private:uint64_t _next_id; //这是一个自动增长的连接ID,int _port;int _timeout; //这是非活跃连接的统计时间---多长时间无通信就是非活跃连接bool _enable_inactive_release;//是否启动了非活跃连接超时销毁的判断标志EventLoop _baseloop; //这是主线程的EventLoop对象,负责监听事件的处理Acceptor _acceptor; //这是监听套接字的管理对象LoopThreadPool _pool; //这是从属EventLoop线程池std::unordered_map<uint64_t, PtrConnection> _conns;//保存管理所有连接对应的shared_ptr对象using ConnectedCallback = std::function<void(const PtrConnection&)>;using MessageCallback = std::function<void(const PtrConnection&, Buffer *)>;using ClosedCallback = std::function<void(const PtrConnection&)>;using AnyEventCallback = std::function<void(const PtrConnection&)>;using Functor = std::function<void()>;ConnectedCallback _connected_callback;MessageCallback _message_callback;ClosedCallback _closed_callback;AnyEventCallback _event_callback;private:void RunAfterInLoop(const Functor &task, int delay) {}//为新连接构造一个Connection进行管理void NewConnection(int fd) {}void RemoveConnectionInLoop(const PtrConnection &conn) {}//从管理Connection的_conns中移除连接信息void RemoveConnection(const PtrConnection &conn) {}public:TcpServer(int port):_port(port), _next_id(0), _enable_inactive_release(false), _acceptor(&_baseloop, port),_pool(&_baseloop) {_acceptor.SetAcceptCallback(std::bind(&TcpServer::NewConnection, this, std::placeholders::_1));}void SetThreadCount(int count) { }void SetConnectedCallback(const ConnectedCallback&cb) { }void SetMessageCallback(const MessageCallback&cb) { }void SetClosedCallback(const ClosedCallback&cb) {}void SetAnyEventCallback(const AnyEventCallback&cb) { }void EnableInactiveRelease(int timeout) { }//用于添加一个定时任务void RunAfter(const Functor &task, int delay) {}void Start() { }
};
2.2 Http模块实现
Http模块是处于应用层的一个简单的协议,包含五部分,每部分有不同的内容。
1.Util的处理
目的:目的:实现一些工具接口,读取文件内容,向文件写入内容,URL编码,URL解码,通过HTTP状态码获取描述信息,通过文件后缀名获取mime,判断一个文件是不是目录,判断一个文,是否是一个普通文件,HTTP资源路径有效性判断
框架设计:
class Util {public:// 字符串分割函数size_t Spilt();// 读取文件内容static bool ReadFile() {}// 向文件写入内容static bool WriteFile();// URL编码static bool UrlEncode();// URL解码static bool UrlDecode();// 通过HTTP状态码获取描述信息static std::string StatusDesc();// 根据文件后缀名获取文件MINEstatic std::string ExtMine();// 判断一个文件是不是目录static bool IsDirectory();//判断一个文件是否是一个普通文件static bool IsRegular();//HTTP资源路径有效性判断static bool VaildPath();
};
2.HttpRequest
目的:存储Http请求信息的,接收到一个数据,按照HTTP请求格式进行解析,得到各个关键要素放到Request中。
功能:
- HttpRequest模块
- 存储HTTP请求信息
- 接收到一个数据,按照HTTP请求格式进行解析,得到各个关键要素放到Request中
- HttpResponse模块
- 存储HTTP响应信息
- 进行业务处理的同时,让使用者向Response中填充响应要素,完毕后,将其组织成HTTP响应格式的数据,发给客户端。
class HttpRequest {public:std::string _method; //请求方法std::string _path; //资源路径std::string _version; //协议版本std::string _body; //请求正文std::smatch _matches; //资源路径的正则提取数据std::unordered_map<std::string, std::string> _headers; //头部字段std::unordered_map<std::string, std::string> _params; //查询字符串public:HttpRequest():_version("HTTP/1.1") {}void ReSet() {_method.clear();_path.clear();_version = "HTTP/1.1";_body.clear();std::smatch match;_matches.swap(match);_headers.clear();_params.clear();}//插入头部字段void SetHeader(const std::string &key, const std::string &val) {_headers.insert(std::make_pair(key, val));}//判断是否存在指定头部字段bool HasHeader(const std::string &key) const {auto it = _headers.find(key);if (it == _headers.end()) {return false;}return true;}//获取指定头部字段的值std::string GetHeader(const std::string &key) const {auto it = _headers.find(key);if (it == _headers.end()) {return "";}return it->second;}//插入查询字符串void SetParam(const std::string &key, const std::string &val) {_params.insert(std::make_pair(key, val));}//判断是否有某个指定的查询字符串bool HasParam(const std::string &key) const {auto it = _params.find(key);if (it == _params.end()) {return false;}return true;}//获取指定的查询字符串std::string GetParam(const std::string &key) const {auto it = _params.find(key);if (it == _params.end()) {return "";}return it->second;}//获取正文长度size_t ContentLength() const {// Content-Length: 1234\r\nbool ret = HasHeader("Content-Length");if (ret == false) {return 0;}std::string clen = GetHeader("Content-Length");return std::stol(clen);}//判断是否是短链接bool Close() const {// 没有Connection字段,或者有Connection但是值是close,则都是短链接,否则就是长连接if (HasHeader("Connection") == true && GetHeader("Connection") == "keep-alive") {return false;}return true;}
};
3.Http