Reactor模式--单线程版本
1.reactor模式作用
reactor模式是事件驱动的设计模式,用于处理高并发的IO请求,用单线程或多线程处理大量的连接,再通过事件分发来处理对应的请求。
2.单线程版本Reactor特点
单线程的Reactor模型是一种简单高效的事件驱动模型,适用于业务逻辑简单,处理速度快的场景。缺点是,如果业务复杂就会进入阻塞状态,要解决这个问题就需要变为多线程的模式,把业务分发给线程处理,主线程可以继续接收新连接。
3.ET和LT
1. 水平触发(LT)
生活中的例子:餐厅服务员
假设你在一家餐厅吃饭,你叫了服务员来点菜。服务员过来后,你开始点菜。服务员会一直站在你旁边,直到你点完所有的菜。即使你点了一道菜后停下来思考一下,服务员也会耐心等待,直到你点完所有想点的菜。
**类比到 `epoll` 的水平触发模式(LT):**
* **文件描述符可读** :就像服务员一直站在你旁边,只要文件描述符可读(有数据可读取),`epoll` 会持续通知应用程序。
* **持续通知** :即使你只读取了一部分数据,`epoll` 仍然会持续通知,直到所有数据都被读取完毕。
* **易于实现** :应用程序可以在事件处理函数中部分处理数据,而不用担心错过事件通知。2. 边缘触发(ET)
#### 生活中的例子:快递员送包裹
假设你在网上购物,快递员送包裹到你家。快递员只会在包裹到达时敲一次门,通知你包裹到了。如果你没有立即开门取包裹,快递员不会再次敲门。你需要自己去检查包裹是否还在门口,或者等待下一次包裹到达。
**类比到 `epoll` 的边缘触发模式(ET):**
* **文件描述符状态变化** :就像快递员只在包裹到达时敲一次门,`epoll` 只在文件描述符的状态发生变化时(如从不可读变为可读)通知应用程序一次。
* **单次通知** :即使文件描述符仍然可读(还有数据可读取),`epoll` 不会再次通知,直到文件描述符的状态再次发生变化(如从可读变为不可读,然后再变为可读)。
* **高效** :减少了事件通知的次数,提高了效率,但需要应用程序在事件处理函数中一次性处理所有数据,否则可能会错过后续的数据。对比
* **水平触发(LT)** :就像服务员一直站在你旁边,持续提供服务,直到你完成所有操作。`epoll` 会持续通知,直到文件描述符的状态不再满足条件。
* **边缘触发(ET)** :就像快递员只在包裹到达时敲一次门,不会再次通知。`epoll` 只在文件描述符的状态发生变化时通知一次,应用程序需要一次性处理所有数据。总结
通过这两个生活中的例子,我们可以更直观地理解 `epoll` 的两种触发模式:
* **水平触发(LT)** :适用于简单的应用场景,尤其是使用阻塞 I/O 时。由于 `epoll` 会持续通知,应用程序可以部分处理数据,而不用担心错过事件通知。
* **边缘触发(ET)** :适用于高并发场景,尤其是使用非阻塞 I/O 时。由于减少了事件通知的次数,边缘触发模式可以提高效率,但需要更复杂的事件处理逻辑。
4.代码实现
流程
R对象调用Loop函数,而里又会执行LoopOnce函数,这个函数会执行epoll_wait函数会等待就绪fd,而一开始只有一个监听套接字被epoll监控,所以当有新连接到来时就会触发dispathcer函数进行派发,而监听套接字是读事件的,所以就会执行Recver函数,这个函数在Listener里的方法,会为新连接创建普通套接字,如果没有问题的话,创建connection类型对象指针管理Channel类型对象,这个对象的创建需要普通套接字和client,然后为这个普通套接字设置关心事件,然后设置回调函数,并把这个套接字加入到reactor的map中,第二次循环进来因为没有就绪就退出,此时会回到loop函数循环中,此时普通套接字就绪了就会再次派发,这是执行的Recver函数就是Channel类型的方法,会调用recv函数读取套接字的请求,把读取到的都放在inbuffer里面,然后再把inbuffer信息给到handler函数处理,这里是计数器业务,所以会把计算结果返回,结果存储到outbuffer中,然后再调用Sender函数把结果发送到套接字中,完成后重新循环epoll_wait等待就绪。
Main.cc
智能指针cal管理Cal类型对象,这个对象表示的业务实现,接着构建协议对象,指针protocol管理Protocol类型对象,这个协议对象是业务层的,这里括号里面的是一个函数调用具体实现,接收到请求req就会返回一个Response类型对象,里面有业务处理后的结果信息。创建listener对象,这里的connection类是Listener类的父类,所以可以用指针指向这个类型对象,listener类主要是用来监听的,所以conn另外一个子类就是对应普通套接字的,用来进行IO交互。conn调用Registerhandler函数为了把回调函数实现,这个回调函数的功能是把接收的请求进行处理,根据协议规定进行解包,返回的就是请求解析的结果。创建reactor对象,把conn放到connection里面,reactor对象的成员有一个map可以存储这个类型对象,调用reactor类的loop方法开始接收连接。
#include <iostream>
#include <string>
#include "Reactor.hpp"
#include "Listener.hpp"
#include "Channel.hpp"
#include "Log.hpp"
#include "Common.hpp"
#include "Protocol.hpp"
#include "NetCal.hpp"static void Usage(std::string proc)
{std::cerr << "Usage: " << proc << " port" << std::endl;
}//./server port
int main(int argc, char *argv[])
{if (argc != 2){Usage(argv[0]);exit(USAGE_ERR);}LogModule::ConsoleLogStrategy();uint16_t port = std::stoi(argv[1]);// 构建一个业务模块std::shared_ptr<Cal> cal = std::make_shared<Cal>();// 构建协议对象std::shared_ptr<Protocol> protocol = std::make_shared<Protocol>([&cal](Request &req) -> Response{ return cal->Execute(req); });// 构建Listener对象std::shared_ptr<Connection> conn = std::make_shared<Listener>(port);conn->RegisterHandler([&protocol](std::string &inbuffer) -> std::string { // 这个匿名函数就是要被ChannelLOG(LogLevel::DEBUG) << "进入到匿名函数中...";std::string response_str;while (true){std::string package;if (!protocol->Decode(inbuffer, &package))break;// 我敢保证,我的packge一定是一个完整的请求,是字节流的response_str += protocol->Execute(package);}LOG(LogLevel::DEBUG) << "结束匿名函数中...: " << response_str;return response_str;});// 构建一个reactor模块std::unique_ptr<Reactor> R = std::make_unique<Reactor>();R->AddConnection(conn);R->Loop();return 0;
}
Connection.hpp
handler_t就是回调函数,处理请求的返回解析好的请求信息,因为是父类,所以很多函数都没有具体的实现方法,而是再子类实现,除了简单的函数,如提供事件类型,设置事件类型等。
#pragma once#include <iostream>
#include <string>
#include "InetAddr.hpp"// 封装fd,保证给每一个fd一套缓冲
class Reactor;
class Connection;using handler_t = std::function<std::string (std::string &)>;// 基类
class Connection
{
public:Connection():_events(0), _owner(nullptr){}void SetEvent(const uint32_t &events){_events = events;}uint32_t GetEvent(){return _events;}void SetOwner(Reactor *owner){_owner = owner;}Reactor *GetOwner(){return _owner;}virtual void Recver() = 0;virtual void Sender() = 0;virtual void Excepter() = 0;virtual int GetSockFd() = 0;void RegisterHandler(handler_t handler){_handler = handler;}~Connection(){}private:// 关心事件uint32_t _events;// 回指指针Reactor *_owner;public:handler_t _handler; // 基类中定义了一个回调函数
};
listener.hpp
私有成员是端口号和监听套接字,监听套接字只接收,所以实现了Recver函数,构建函数会创建TcpSocket对象,并调用对应的函数创建监听套接字,设置关心的事件类型,并把监听套接字设置为非阻塞模式,recver函数会while循环,为新来连接创建普通套接字,普通套接字创建成功,就满足else,用创建成功的套接字构建channel对象,并用指针管理,为这个套接re字设置关心事件类型,如果没有设置回调函数,就把回调函数设置,最后再把这个指针放到map里面存储,这里需要getowner函数来获取reactor对象的map,这个map是在Reactor类的,子类listener提供父类的方法获取到了Reactor类的私有成员。
注意:
.buildTcpservermethod函数中的listen函数,当第一次执行时listen函数是不会报错的,第二次就会报错,所以这个函数第一次创建是监听套接字,第二次就是普通套接字,同一个端口只能有一个监听套接字
EAGAIN服务于accept,read,write等函数,表示当前没有可用资源,或者没有连接可接收
EINTR表示系统调用被信号中断,如accept接收客服端的连接请求,按下ctrl+c就会触发信号中断
Json root是一个数据结果,再创建wirte对象就可以把这个数据结构序列化
调用find以及类似函数,如果没有找到就会返回npos
#pragma once#include <iostream>
#include <memory>
#include "Epoller.hpp"
#include "Socket.hpp"
#include "Common.hpp"
#include "Connection.hpp"
#include "Channel.hpp"using namespace SocketModule;// Listener 专门进行获取新连接
class Listener : public Connection
{
public:Listener(int port = defaultport):_port(port), _listensock(std::make_unique<TcpSocket>()){_listensock->BuildTcpSocketMethod(_port);SetEvent(EPOLLIN | EPOLLET); //ET todoSetNonBlock(_listensock->Fd());}~Listener(){}void Recver() override{//accept//LOG(LogLevel::DEBUG) << "进入Listener模块的Recver函数";InetAddr client;// 新连接就绪了,你能保证只有 一个连接到来吗? 一次把所有的连接全部获取上来// while, ET, sockfd设置为非阻塞!! ---- listensock本身设置为非阻塞while(true){int sockfd = _listensock->Accept(&client);if(sockfd == ACCEPT_ERR)break;else if(sockfd == ACCEPT_CONTINUE)continue;else if(sockfd == ACCEPT_DONE)break;else{// 我们获得的是一个合法的fd,普通的文件描述符std::shared_ptr<Connection> conn = std::make_shared<Channel>(sockfd, client);conn->SetEvent(EPOLLIN|EPOLLET);if(_handler != nullptr)conn->RegisterHandler(_handler);GetOwner()->AddConnection(conn);}}}void Sender() override{}void Excepter() override{}int GetSockFd() override{return _listensock->Fd();}
private:int _port;std::unique_ptr<Socket> _listensock;
};
Channnel.hpp
构造函数会把这个套接字设置为非阻塞状态,这里实现了Recver函数和Sender函数,之所以while循环接收是因为要保证接收的完整性,直到recv返回值为0表述连接断开,就会异常处理,而recv为负数表示错误,就需要判断是什么错误,第一种是没有数据可读,就退出,第二种信号中断,就需要继续读取,其它的都按异常处理。最后判断inbuffer里面是否空,不为空表示有数据就调用回调函数处理inbuffer里面的数据,把结果放到outbuffer里面,再判断outbuffer是否为空,不为空就表示有数据可发,调用Sender函数发送数据。Sender函数与Recver函数差不多,最后的if判断是否为空,不为空表示有数据,就需要把这个套接字的读事件关心,负责就设置为不关心,异常处理则是调用DelConnection函数进行断开连接。
#pragma once#include <iostream>
#include <string>
#include <sys/types.h>
#include <sys/socket.h>
#include <memory>
#include <functional>
#include "Common.hpp"
#include "Connection.hpp"
#include "Log.hpp"
#include "InetAddr.hpp"using namespace LogModule;#define SIZE 1024void SetNonBlock(int fd)
{int fl = fcntl(fd, F_GETFL);if(fl < 0){return;}fcntl(fd, F_SETFL, fl | O_NONBLOCK);
}// 普通sockfd的封装
class Channel : public Connection
{
public:Channel(int sockfd, const InetAddr &client) : _sockfd(sockfd), _client_addr(client){SetNonBlock(_sockfd);}// 问题1: 怎么保证我把本轮数据读取完毕? while 循环 --- 本层只解决IO问题 --- done// 问题2:即便是你把本轮数据读完,你怎么知道数据就有完整的报文,如果不完整呢?如果是多个报文呢?粘报问题?反序列化 --- 引入协议的void Recver() override{// 我们读到的是字符串char buffer[SIZE];while (true){buffer[0] = 0; // 清空字符串ssize_t n = recv(_sockfd, buffer, sizeof(buffer) - 1, 0); // 非阻塞读取的if (n > 0){buffer[n] = 0;_inbuffer += buffer; // 接受缓冲区中,入队列的过程}else if (n == 0){Excepter();return;}else{if (errno == EAGAIN || errno == EWOULDBLOCK){break;}else if (errno == EINTR){continue;}else{Excepter();return;}}}LOG(LogLevel::DEBUG) << "Channel: Inbuffer:\n"<< _inbuffer;if (!_inbuffer.empty())_outbuffer += _handler(_inbuffer); // 和protocol相关的匿名函数里面!if (!_outbuffer.empty()){Sender(); // 最佳实践//GetOwner()->EnableReadWrite(_sockfd, true, true);}}void Sender() override{while (true){ssize_t n = send(_sockfd, _outbuffer.c_str(), _outbuffer.size(), 0);if (n > 0){_outbuffer.erase(0, n);if (_outbuffer.empty())break;}else if (n == 0){break;}else{if (errno == EAGAIN || errno == EWOULDBLOCK)break;if (errno == EINTR)continue;else{Excepter();return;}}}// 1. 数据发送完毕// 2. 发送条件不具备if (!_outbuffer.empty()){// 开启对写事件的关心GetOwner()->EnableReadWrite(_sockfd, true, true);}else{GetOwner()->EnableReadWrite(_sockfd, true, false);}}void Excepter() override{// 所有的异常,都被我归一到了这个函数内部!!GetOwner()->DelConnection(_sockfd);}int GetSockFd() override{return _sockfd;}std::string &Inbuffer(){return _inbuffer;}void AppendOutBuffer(const std::string &out){_outbuffer += out;}~Channel(){}private:int _sockfd;std::string _inbuffer; // 充当缓冲区,vector<char>std::string _outbuffer;InetAddr _client_addr;// handler_t _handler;
};
Reactor.hpp
私有成员变量有Epoller类型对象,unordered_map类型对象以fd为键值,Connection类型value,以及epoll_event结构类型的数组。looponce函数就是调用epoll_wait函数去等待就绪fd。dispathcer函数事件派发,根据就绪fd个数用for循环遍历,取出每一个就绪的fd的关心事件,如果关心事件有EPOLLERR和EPOLLHUP表示出现异常,就让事件同时关心读和写,把问题都变为IO问题,如果事件关心EPOLLIN则为读事件关心,EPOLLOUT则为写,读事件则就调用Recver函数,这里的Recver函数看是什么,有两种情况,一个是Channel一个是Listener。而Sender函数则只有Channel有。Loop函数则是while循环调用LoopOnce函数一直等待fd就绪,并把就绪的fd派发出去。AddConnection函数是把connection类型对象加入到map中,先判断是否存在,不存在才加入,获取connection类型对象的关心事件和fd,如何调用addevent函数把fd和关心事件传过去,epoll就会关心这个fd,调用setowner函数回指到reactor,所以前面可以调用Getowner函数获取到reactor类的map,再把这个connection类型对象加入到map中。
#pragma once#include <iostream>
#include <memory>
#include <unordered_map>
#include "Epoller.hpp"
#include "Connection.hpp"
#include "Log.hpp"using namespace LogModule;// 反应堆
class Reactor
{static const int revs_num = 128;private:bool IsConnectionExistsHelper(int sockfd){auto iter = _connections.find(sockfd);if (iter == _connections.end())return false;elsereturn true;}bool IsConnectionExists(const std::shared_ptr<Connection> &conn){return IsConnectionExistsHelper(conn->GetSockFd());}bool IsConnectionExists(int sockfd){return IsConnectionExistsHelper(sockfd);}bool IsConnectionEmpty(){return _connections.empty();}int LoopOnce(int timeout){return _epoller_ptr->WaitEvents(_revs, revs_num, timeout);}void Dispatcher(int n) // 事件派发器{for (int i = 0; i < n; i++){int sockfd = _revs[i].data.fd; // 就绪的fduint32_t revents = _revs[i].events; // 就绪的事件// 1. 将所有的异常处理,统一转化成IO错误 2. 所有的IO异常,统一转换成为一个异常处理函数if (revents & EPOLLERR)revents |= (EPOLLIN | EPOLLOUT); // 1. 将所有的异常处理,统一转化成IO错误if (revents & EPOLLHUP)revents |= (EPOLLIN | EPOLLOUT); // 1. 将所有的异常处理,统一转化成IO错误if (revents & EPOLLIN){// 读事件就绪, 用不用区分是否异常?不用// 读事件就绪,还用不用区分是listenfd还是普通socketfd?不用if (IsConnectionExists(sockfd))_connections[sockfd]->Recver();}if (revents & EPOLLOUT){// 写事件就绪if (IsConnectionExists(sockfd))_connections[sockfd]->Sender();}}}public:Reactor(): _epoller_ptr(std::make_unique<Epoller>()),_isrunning(false){}void Loop(){if (IsConnectionEmpty())return;_isrunning = true;int timeout = -1;while (_isrunning){PrintConnection(); //debugint n = LoopOnce(timeout);Dispatcher(n);}_isrunning = false;}// 该接口要把所有的新连接添加到_connections,并且,写透到epoll内核中!!!!!void AddConnection(std::shared_ptr<Connection> &conn){// 0. 不要重复添加if (IsConnectionExists(conn)){LOG(LogLevel::WARNING) << "conn is exists: " << conn->GetSockFd();return;}// 1. conn对应的fd和他要关心的事件,写透到内核中!uint32_t events = conn->GetEvent();int sockfd = conn->GetSockFd();_epoller_ptr->AddEvent(sockfd, events);// 2. 设置当前conn的拥有者回指指针conn->SetOwner(this);// 3. 将具体的connection添加到_connections_connections[sockfd] = conn;}void EnableReadWrite(int sockfd, bool enableread, bool enablewrite){// 0. 不要重复添加if (!IsConnectionExists(sockfd)){LOG(LogLevel::WARNING) << "EnableReadWrite, conn is exists: " << sockfd;return;}// 1. 修改当前sockfd对应的connection关心的事件uint32_t new_event = (EPOLLET | (enableread ? EPOLLIN : 0) | (enablewrite ? EPOLLOUT:0));_connections[sockfd]->SetEvent(new_event);// 2. 写透到内核,调整sockfd对特定事件的关心_epoller_ptr->ModEvent(sockfd, new_event);}void DelConnection(int sockfd){//1. epoll移除的时候,sockfd必须是合法的_epoller_ptr->DelEvent(sockfd);//2. 从_connections移除自己_connections.erase(sockfd);//3. 关闭不要的sockfdclose(sockfd);LOG(LogLevel::INFO) << "client quit: " << sockfd;}void Stop(){_isrunning = false;}void PrintConnection(){std::cout << "当前Reactor正在进行管理的fd List:";for(auto &conn : _connections){std::cout << conn.second->GetSockFd() << " ";}std::cout << "\r\n";}~Reactor(){}private:// 1. epoll模型std::unique_ptr<Epoller> _epoller_ptr;// 2. 是否启动bool _isrunning;// 3. 管理所有的connection,本质是管理未来所有我获取到的fd// fd : Connectionstd::unordered_map<int, std::shared_ptr<Connection>> _connections;// 4. 就绪的所有事件struct epoll_event _revs[revs_num];
};
补充:
EPOLL_CTL_ADD:注册新的 fd 到 epfd 中;
•
EPOLL_CTL_MOD:修改已经注册的 fd 的监听事件;
•
EPOLL_CTL_DEL:从 epfd 中删除一个 fd;
EPOLLIN : 表示对应的文件描述符可以读 (包括对端 SOCKET 正常关闭);
•
EPOLLOUT : 表示对应的文件描述符可以写;
•
EPOLLPRI : 表示对应的文件描述符有紧急的数据可读 (这里应该表示有带外
数据到来);
•
EPOLLERR : 表示对应的文件描述符发生错误;
•
EPOLLHUP : 表示对应的文件描述符被挂断;
•
EPOLLET : 将 EPOLL 设为边缘触发(Edge Triggered)模式, 这是相对于水平
触发(Level Triggered)来说的.
•
EPOLLONESHOT:只监听一次事件, 当监听完这次事件之后, 如果还需要继
续监听这个 socket 的话, 需要再次把这个 socket 加入到 EPOLL 队列里.