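[Linux] The Reactor Pattern
Contents
1. The Reactor Pattern
2. OneThreadOneLoop: Multi-Process Scheme
2.1 Core Concepts
2.2 OneThreadOneLoop: Multi-Thread Scheme 1
2.3 Event-Driven Notification with eventfd
2.4 eventfd Working Modes
2.5 OneThreadOneLoop: Multi-Thread Scheme 2
3. Reactor Example Code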
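1. The Reactor Pattern
The Reactor pattern is an event-driven programming model for handling concurrent I/O efficiently. One or more event loops listen for events (network requests, timer events, and so on) and dispatch each one to its handler, so many connections can be served without creating a thread or process per connection.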
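The idea can be sketched in a few dozen lines. The following is a minimal illustration, not the implementation used later in this article: `MiniReactor`, `Add`, `PollOnce`, and `ReactorDemo` are hypothetical names chosen for this example, and a pipe stands in for a network connection.

```cpp
#include <sys/epoll.h>
#include <unistd.h>
#include <cassert>
#include <functional>
#include <string>
#include <unordered_map>

// Hypothetical minimal reactor: maps each fd to a "readable" callback.
class MiniReactor {
public:
    MiniReactor() : _epfd(epoll_create1(0)) {}
    ~MiniReactor() { if (_epfd >= 0) close(_epfd); }

    // Register interest in readability of fd.
    bool Add(int fd, std::function<void(int)> on_readable) {
        struct epoll_event ev{};
        ev.events = EPOLLIN;
        ev.data.fd = fd;
        if (epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &ev) < 0) return false;
        _handlers[fd] = std::move(on_readable);
        return true;
    }

    // Wait once (up to timeout_ms) and dispatch every ready fd to its handler.
    int PollOnce(int timeout_ms) {
        struct epoll_event revs[16];
        int n = epoll_wait(_epfd, revs, 16, timeout_ms);
        for (int i = 0; i < n; ++i) {
            auto it = _handlers.find(revs[i].data.fd);
            if (it != _handlers.end()) it->second(revs[i].data.fd);
        }
        return n;
    }

private:
    int _epfd;
    std::unordered_map<int, std::function<void(int)>> _handlers;
};

// Demo: one pipe, one handler. Returns whatever the handler read.
std::string ReactorDemo() {
    int fds[2];
    if (pipe(fds) < 0) return "";
    std::string received;
    {
        MiniReactor reactor;
        reactor.Add(fds[0], [&received](int fd) {
            char buf[64];
            ssize_t n = read(fd, buf, sizeof(buf));
            if (n > 0) received.assign(buf, static_cast<size_t>(n));
        });
        if (write(fds[1], "ping", 4) == 4) reactor.PollOnce(1000);
    }
    close(fds[0]);
    close(fds[1]);
    return received;
}
```

A real server would call `PollOnce` in a loop and register sockets instead of a pipe; the dispatch step stays the same.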
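2. OneThreadOneLoop: Multi-Process Scheme
OneThreadOneLoop (OTOL) is a design pattern for event-driven architectures, seen most often in asynchronous I/O frameworks. Each thread runs its own independent event loop, which is how the design achieves efficient concurrency.
2.1 Core Concepts
Event loop:
- The event loop is the core of asynchronous programming. It listens for events (network requests, timer events, and so on) and invokes the matching callback.
- It is usually a single-threaded execution model that manages many I/O operations efficiently through multiplexing (select, poll, or epoll).
Single-threaded model:
- In the One Thread One Loop pattern, each thread runs one independent event loop.
- Because a connection is only ever touched by the thread that owns it, the design avoids most of the synchronization problems of multi-threaded code while still using the operating system's efficient I/O multiplexing.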
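As a rough sketch of "one loop per thread": each worker below creates its own epoll instance and watches only its own fd. `LoopThread` and `RunLoops` are names made up for this example, pipes stand in for sockets, and each loop handles a single event instead of running forever.

```cpp
#include <sys/epoll.h>
#include <unistd.h>
#include <atomic>
#include <cassert>
#include <functional>
#include <thread>
#include <vector>

// Hypothetical worker: owns a private epoll instance and a private fd.
// Only the result counter is shared between threads.
void LoopThread(int readfd, std::atomic<int>& handled) {
    int epfd = epoll_create1(0);
    if (epfd < 0) return;
    struct epoll_event ev{};
    ev.events = EPOLLIN;
    ev.data.fd = readfd;
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, readfd, &ev) == 0) {
        struct epoll_event rev{};
        // A real loop runs until stopped; this sketch handles one event.
        if (epoll_wait(epfd, &rev, 1, 2000) == 1) {
            char c = 0;
            if (read(rev.data.fd, &c, 1) == 1) handled.fetch_add(1);
        }
    }
    close(epfd);
}

// Start nthreads loops, send one byte to each, and count how many reacted.
int RunLoops(int nthreads) {
    std::atomic<int> handled{0};
    std::vector<std::thread> workers;
    std::vector<int> all_fds;
    std::vector<int> write_ends;
    for (int i = 0; i < nthreads; ++i) {
        int fds[2];
        if (pipe(fds) < 0) break;
        all_fds.push_back(fds[0]);
        all_fds.push_back(fds[1]);
        write_ends.push_back(fds[1]);
        workers.emplace_back(LoopThread, fds[0], std::ref(handled));
    }
    for (int wfd : write_ends) {
        char c = 'x';
        if (write(wfd, &c, 1) != 1) continue; // that worker simply times out
    }
    for (auto& t : workers) t.join();
    for (int fd : all_fds) close(fd);
    return handled.load();
}
```

Since no fd is shared between loops, the workers need no locks around their I/O.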

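- Once a single Reactor is written, extending it to multiple processes is straightforward.
- The connections and fds managed by each process must not overlap.
- Each Reactor handles the full lifecycle of its own sockfds, so there is no interleaved I/O and no ordering problem.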
2.2 OneThreadOneLoop: Multi-Thread Scheme 1

Note: a pipe also works for communication between threads, so the same design applies here.
#include <iostream>
#include <string>
#include <cstdio>
#include <cstring>
#include <unistd.h>
#include <thread>

// Read end (pipefd[0]) and write end (pipefd[1]) of the pipe
int pipefd[2];

// Worker thread function
void work_handle() {
    char inbuffer[1024];
    while (true) {
        ssize_t n = read(pipefd[0], inbuffer, sizeof(inbuffer) - 1);
        if (n < 0) {
            perror("read");
            break;
        } else if (n == 0) {
            std::cout << "No more data to read. Exiting thread." << std::endl;
            break;
        } else {
            inbuffer[n] = 0;
            std::cout << "thread received: " << inbuffer << std::endl;
        }
    }
}

int main()
{
    if (pipe(pipefd) == -1) {
        perror("pipe");
        return 1;
    }
    std::thread worker(work_handle);
    const char* message[] = {"Hello, I am from main thread.", "some message.", "Goodbye!"};
    for (const char* msg : message) {
        if (write(pipefd[1], msg, strlen(msg)) < 0) {
            perror("write");
            break;
        }
        std::cout << "main thread send: " << msg << std::endl;
        sleep(1);
    }
    // Close the write end before joining. The worker's read() returns 0 (EOF)
    // only after every write end is closed; joining first would block forever.
    close(pipefd[1]);
    worker.join();
    close(pipefd[0]);
    return 0;
}

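In addition:
- Threads in the same process share file descriptors. If we change the division of labor (in practice, by changing the connection callbacks) so that the master thread detects and accepts new connections itself, it can pass each new fd through a pipe to a worker thread. The worker takes the sockfd, adds it to its own Reactor, and handles the I/O. This is simpler.
- The pipe is then read 4 bytes at a time (one int fd per message), so no separate serialization or deserialization step is needed.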
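A minimal sketch of that hand-off, assuming an fd is sent as one raw `int`. `SendFd`, `RecvFd`, and `HandoffDemo` are hypothetical helper names, the fd numbers are placeholders rather than real sockets, and both ends run in one thread here only to keep the example short.

```cpp
#include <unistd.h>
#include <cassert>
#include <vector>

// Master side: hand one accepted fd to a worker. sizeof(int) is far below
// PIPE_BUF, so each write to the pipe is atomic.
bool SendFd(int pipe_wfd, int sockfd) {
    return write(pipe_wfd, &sockfd, sizeof(sockfd)) ==
           static_cast<ssize_t>(sizeof(sockfd));
}

// Worker side: read exactly one int; returns false on EOF or error.
bool RecvFd(int pipe_rfd, int* sockfd) {
    return read(pipe_rfd, sockfd, sizeof(*sockfd)) ==
           static_cast<ssize_t>(sizeof(*sockfd));
}

// Demo with placeholder fd numbers (no real sockets are involved).
std::vector<int> HandoffDemo() {
    std::vector<int> received;
    int fds[2];
    if (pipe(fds) < 0) return received;
    const int fake_fds[] = {7, 8, 9};
    for (int fd : fake_fds) SendFd(fds[1], fd);
    close(fds[1]); // nothing more to send: the reader sees EOF after the last fd
    int fd = -1;
    while (RecvFd(fds[0], &fd)) received.push_back(fd);
    close(fds[0]);
    return received;
}
```

In the threaded design the worker would register the pipe's read end in its own epoll set and call something like `RecvFd` from the read callback. Passing the number alone is enough only because threads share one fd table; separate processes would need a different mechanism.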
If you prefer not to use a pipe, there are other options.
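2.3 Event-Driven Notification with eventfd
- If you prefer not to use a pipe, the mechanism below can drive the notification instead.
- Note: an event-driven design requires I/O multiplexing, and multiplexing works on file descriptors. That is why a pipe was used in the first place.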
NAME
       eventfd - create a file descriptor for event notification
SYNOPSIS
       #include <sys/eventfd.h>
       int eventfd(unsigned int initval, int flags);
RETURN VALUE
       On success, eventfd() returns a new eventfd file descriptor. On error, -1 is returned and errno is set to indicate the error.
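- eventfd is a lightweight event notification mechanism built on a file descriptor.
- It can be combined with I/O multiplexing mechanisms such as epoll.
- The kernel maintains a 64-bit counter. write adds the written value to the counter. read returns the counter and resets it to 0, or, in semaphore mode, returns 1 and decrements the counter by 1.
Parameters
- initval: the initial counter value, usually 0.
- flags: flags that control behavior. Common options: EFD_CLOEXEC (close on exec), EFD_NONBLOCK (non-blocking mode), EFD_SEMAPHORE (semaphore mode).
Event notification between processes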
#include <iostream>
#include <cstdint>
#include <cstdio>
#include <stdlib.h>
#include <unistd.h>
#include <sys/eventfd.h>

int main()
{
    int efd = eventfd(0, EFD_CLOEXEC);
    if (efd == -1) {
        perror("eventfd");
        return 1;
    }
    pid_t pid = fork();
    if (pid == -1) {
        perror("fork");
        return 2;
    }
    if (pid == 0) {
        // eventfd requires an 8-byte buffer; reading into a 4-byte int fails with EINVAL
        uint64_t value = 0;
        if (read(efd, &value, sizeof(value)) != sizeof(value)) perror("read"); // read the event from eventfd
        std::cout << "child process received event!" << std::endl;
        close(efd);
    }
    else {
        uint64_t value = 3;
        if (write(efd, &value, sizeof(value)) != sizeof(value)) perror("write"); // write the event to eventfd
        std::cout << "parent process send event!" << std::endl;
        close(efd);
    }
    return 0;
}

Event notification between threads
#include <iostream>
#include <cstdint>
#include <cstdio>
#include <stdlib.h>
#include <unistd.h>
#include <sys/eventfd.h>
#include <pthread.h>

void* work_handler(void* arg) {
    int* efd = (int*)arg;
    // eventfd requires an 8-byte buffer; reading into a 4-byte int fails with EINVAL
    uint64_t value = 0;
    if (read(*efd, &value, sizeof(value)) != sizeof(value)) perror("read"); // read the event from eventfd
    std::cout << "worker thread received event!" << std::endl;
    return nullptr;
}

int main()
{
    int efd = eventfd(0, EFD_CLOEXEC);
    if (efd == -1) {
        perror("eventfd");
        return 1;
    }
    pthread_t tid;
    pthread_create(&tid, nullptr, work_handler, &efd);
    uint64_t value = 3;
    if (write(efd, &value, sizeof(value)) != sizeof(value)) perror("write"); // write the event to eventfd
    std::cout << "main thread send event!" << std::endl;
    pthread_join(tid, nullptr);
    close(efd);
    return 0;
}

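Features:
- Low overhead: an eventfd is just a 64-bit counter, which is cheap for the kernel to maintain.
- Works with multiplexing: it can be used with select, poll, or epoll.
- Atomicity: reads and writes are atomic, which suits highly concurrent use.
- Many-to-many notification: several writers and several readers can share one eventfd, so it is not limited to point-to-point communication. Each read consumes the counter, though, so a single write does not reach every reader.
- Efficiency: compared with a pipe, an eventfd needs one file descriptor instead of two and carries only an 8-byte counter, so the kernel overhead is smaller.
Notes:
- Notification only: an eventfd cannot carry message content; it only signals that something happened.
- Non-blocking mode: setting EFD_NONBLOCK is recommended so that read does not block when the counter is 0.
- Semaphore semantics: if each read should decrement the counter by 1, set EFD_SEMAPHORE.
- Resource management: close the file descriptor when the eventfd is no longer needed.
- Combine with I/O multiplexing: under high concurrency, using it together with epoll is recommended.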
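Combining eventfd with epoll might look like the sketch below. `WakeupDemo` is a name invented for this example, and the writer and the waiter share one thread only to keep it short; in practice another thread would do the `write`.

```cpp
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <unistd.h>
#include <cassert>
#include <cstdint>

// Registers an eventfd with epoll, posts two notifications, then drains the
// counter once epoll reports it readable. Returns the drained value (0 on failure).
uint64_t WakeupDemo() {
    int efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    int epfd = epoll_create1(EPOLL_CLOEXEC);
    uint64_t result = 0;
    if (efd >= 0 && epfd >= 0) {
        struct epoll_event ev{};
        ev.events = EPOLLIN;
        ev.data.fd = efd;
        epoll_ctl(epfd, EPOLL_CTL_ADD, efd, &ev);

        // Two notifications arrive before the loop wakes up.
        uint64_t one = 1;
        for (int i = 0; i < 2; ++i) {
            if (write(efd, &one, sizeof(one)) != static_cast<ssize_t>(sizeof(one))) break;
        }

        struct epoll_event rev{};
        if (epoll_wait(epfd, &rev, 1, 1000) == 1 && (rev.events & EPOLLIN)) {
            uint64_t count = 0;
            // In the default mode one read returns the whole counter and resets it.
            if (read(rev.data.fd, &count, sizeof(count)) ==
                static_cast<ssize_t>(sizeof(count))) {
                result = count;
            }
        }
    }
    if (efd >= 0) close(efd);
    if (epfd >= 0) close(epfd);
    return result;
}
```

The two writes are merged into a single wakeup whose value is their sum, which is why an eventfd suits "something happened" signals rather than message passing.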
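2.4 eventfd Working Modes
- Default mode (EFD_SEMAPHORE not set): read returns the current counter value and resets the counter to 0.
- Semaphore mode (EFD_SEMAPHORE set): each read returns 1 and decrements the counter by 1.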
#include <iostream>
#include <cstdint>
#include <cstdio>
#include <stdlib.h>
#include <unistd.h>
#include <sys/eventfd.h>

int main()
{
    // Mode 1: EFD_SEMAPHORE not set
    // int efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    // Mode 2: EFD_SEMAPHORE set
    int efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC | EFD_SEMAPHORE);
    if (efd == -1) {
        perror("eventfd");
        return 1;
    }
    uint64_t value;
    value = 1;
    write(efd, &value, sizeof(value));
    std::cout << "write 1 to eventfd" << std::endl;
    value = 2;
    write(efd, &value, sizeof(value));
    std::cout << "write 2 to eventfd" << std::endl;

    // Reset value before each read so a failed read (EAGAIN) shows up as 0
    value = 0;
    read(efd, &value, sizeof(value));
    std::cout << "Read value: " << value << std::endl;
    value = 0;
    read(efd, &value, sizeof(value));
    std::cout << "Read value: " << value << std::endl;
    value = 0;
    read(efd, &value, sizeof(value));
    std::cout << "Read value: " << value << std::endl;
    close(efd);
    return 0;
}
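To compare the two modes side by side, the same experiment can be wrapped in a function that collects every value read. `DrainEventfd` is a hypothetical helper written for this comparison and is not part of the eventfd API.

```cpp
#include <sys/eventfd.h>
#include <unistd.h>
#include <cassert>
#include <cstdint>
#include <vector>

// Writes 1 and then 2 to a fresh non-blocking eventfd, then reads until the
// counter is empty (read fails with EAGAIN). Returns every value read.
std::vector<uint64_t> DrainEventfd(bool semaphore) {
    std::vector<uint64_t> values;
    int flags = EFD_NONBLOCK | EFD_CLOEXEC | (semaphore ? EFD_SEMAPHORE : 0);
    int efd = eventfd(0, flags);
    if (efd < 0) return values;

    const uint64_t writes[] = {1, 2};
    for (uint64_t v : writes) {
        if (write(efd, &v, sizeof(v)) != static_cast<ssize_t>(sizeof(v))) break;
    }

    uint64_t out = 0;
    while (read(efd, &out, sizeof(out)) == static_cast<ssize_t>(sizeof(out))) {
        values.push_back(out);
    }
    close(efd);
    return values;
}
```

In the default mode the function returns a single value, 3 (the sum of both writes). In semaphore mode it returns 1 three times, because each read takes exactly one unit from the counter.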

2.5 OneThreadOneLoop: Multi-Thread Scheme 2

3. Reactor Example Code
Next, we use epoll and the Reactor pattern to rewrite the networked calculator built earlier.
// Calculator.hpp
#pragma once

#include <iostream>
#include <string>
#include "Protocol.hpp"

using namespace Protocol;

class Calculator
{
public:
    Calculator() {}

    // Compute one request. Error codes set on the response:
    // 1 = division by zero, 2 = modulo by zero, 3 = unknown operator.
    Response Excute(const Request &req)
    {
        Response resp;
        switch (req.GetOper())
        {
        case '+':
            resp.SetResult(req.GetX() + req.GetY());
            break;
        case '-':
            resp.SetResult(req.GetX() - req.GetY());
            break;
        case '*':
            resp.SetResult(req.GetX() * req.GetY());
            break;
        case '/':
            if (req.GetY() == 0)
                resp.SetCode(1);
            else
                resp.SetResult(req.GetX() / req.GetY());
            break;
        case '%':
            if (req.GetY() == 0)
                resp.SetCode(2);
            else
                resp.SetResult(req.GetX() % req.GetY());
            break;
        default:
            resp.SetCode(3);
            break;
        }
        return resp;
    }
} cal;

// Extract every complete request from inbuffer, compute it, and return the
// encoded responses. Incomplete trailing data stays in inbuffer.
std::string RequestHandler(std::string &inbuffer)
{
    std::string request_str;
    std::string result_str;
    while (Decode(inbuffer, &request_str))
    {
        std::string resp_str;
        if (request_str.empty())
            break;
        Request req;
        if (!req.Deserialize(request_str))
            break;
        Response resp = cal.Excute(req);
        resp_str = resp.Serialize();
        result_str += Encode(resp_str);
    }
    return result_str;
}
// Common.hpp
#pragma once

#include <iostream>
#include <string>
#include <cstdlib>
#include <fcntl.h>
#include <unistd.h>
#include <sys/socket.h>

// No trailing backslash after while (0): it would splice the next #define
// into this macro.
#define Die(code)   \
    do              \
    {               \
        exit(code); \
    } while (0)

#define CONV(addr_ptr) ((struct sockaddr *)(addr_ptr))

enum
{
    Usage_Err = 1,
    Socket_Err,
    Bind_Err,
    Listen_Err,
    EPOLL_CREATE_ERR,
    EPOLL_CTL_ERR
};

const static int gsockfd = -1;
const static int gbacklog = 6;

// Cut one line (without the separator) off the front of str.
bool ParseOneLine(std::string &str, std::string *out, const std::string &sep)
{
    auto pos = str.find(sep);
    if (pos == std::string::npos)
        return false;
    *out = str.substr(0, pos);
    str.erase(0, pos + sep.size()); // remove the separator too, so the next call sees the next line
    return true;
}

// Split a header such as "Connection: Keep-alive" into key and value.
bool SplitString(std::string &header, const std::string &sep, std::string *key, std::string *value)
{
    auto pos = header.find(sep);
    if (pos == std::string::npos)
        return false;
    *key = header.substr(0, pos);
    *value = header.substr(pos + sep.size());
    return true;
}

// Put sockfd into non-blocking mode.
void SetNonBlock(int sockfd)
{
    int fl = fcntl(sockfd, F_GETFL, 0);
    if (fl < 0)
        return;
    fcntl(sockfd, F_SETFL, fl | O_NONBLOCK);
}
// Connection.hpp
#pragma once

#include <iostream>
#include <string>
#include <cstdint>
#include <functional>
#include <memory>
#include <unistd.h>
#include "InetAddr.hpp"

class Reactor;

class Connection
{
public:
    Connection() : _sockfd(-1), _own(nullptr), _event(0), _timetamp(0) {}

    void SetSockfd(int sockfd) { _sockfd = sockfd; }
    void SetPeerAddr(const InetAddr &peer_addr) { _peer_addr = peer_addr; }
    void AppendToIn(const std::string &in) { _inbuffer += in; }
    void AppendToOut(const std::string &out) { _outbuffer += out; }
    void SetOwner(Reactor *own) { _own = own; }
    void SetEvent(uint32_t event) { _event = event; }

    int GetSockfd() { return _sockfd; }
    std::string &Inbuffer() { return _inbuffer; }
    std::string &Outbuffer() { return _outbuffer; }
    void EraseOutbuffer(int n) { _outbuffer.erase(0, n); }
    void EraseInbuffer(int n) { _inbuffer.erase(0, n); }
    bool OutbufferIsEmpty() { return _outbuffer.empty(); }
    Reactor *GetOwner() { return _own; }
    uint32_t GetEvent() { return _event; }

    void Close()
    {
        if (_sockfd >= 0)
            close(_sockfd);
    }

    // Callbacks invoked by the Reactor
    virtual void Recver() = 0;
    virtual void Sender() = 0;
    virtual void Excepter() = 0;

    // Virtual so that derived connections are destroyed correctly through a base pointer
    virtual ~Connection() {}

protected:
    int _sockfd;
    std::string _inbuffer;
    std::string _outbuffer;
    InetAddr _peer_addr; // the client this connection belongs to
    Reactor *_own;       // the Reactor that owns this connection
    uint32_t _event;     // events currently registered with epoll
    uint64_t _timetamp;  // timestamp of the last activity (not used yet)
};
// Epoller.hpp
#pragma once

#include <sys/epoll.h>
#include "Log.hpp"
#include "Common.hpp"

using namespace LogModule;

namespace EpollModule
{
    // Thin wrapper around the epoll system calls
    class Epoll
    {
    public:
        Epoll() : _epfd(-1) {}

        void Init()
        {
            _epfd = epoll_create(256);
            if (_epfd < 0)
            {
                LOG(LogLevel::ERROR) << "epoll create err";
                exit(EPOLL_CREATE_ERR);
            }
            LOG(LogLevel::DEBUG) << "epoll create success, epfd: " << _epfd;
        }

        int Wait(struct epoll_event revs[], int num, int timeout)
        {
            int n = epoll_wait(_epfd, revs, num, timeout);
            if (n < 0)
                LOG(LogLevel::ERROR) << "epoll wait error";
            return n;
        }

        void Ctrl(int sockfd, uint32_t event, int flag)
        {
            struct epoll_event ev;
            ev.events = event;
            ev.data.fd = sockfd;
            int n = epoll_ctl(_epfd, flag, sockfd, &ev);
            if (n < 0)
            {
                LOG(LogLevel::ERROR) << "epoll_ctl error";
            }
        }

        void Add(int sockfd, uint32_t event)
        {
            Ctrl(sockfd, event, EPOLL_CTL_ADD);
        }

        void Del(int sockfd)
        {
            int n = epoll_ctl(_epfd, EPOLL_CTL_DEL, sockfd, nullptr);
            if (n < 0)
                LOG(LogLevel::ERROR) << "epoll_ctl error";
        }

        void Updata(int sockfd, uint32_t event)
        {
            Ctrl(sockfd, event, EPOLL_CTL_MOD);
        }

        ~Epoll() {}

    private:
        int _epfd;
    };
}
// InetAddr.hpp
#pragma once

#include <iostream>
#include <string>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "Common.hpp"

class InetAddr
{
public:
    InetAddr() : _addr{}, _port(0) {}

    // Local address for a server: any interface, given port
    InetAddr(uint16_t port) : _addr{}, _port(port)
    {
        _addr.sin_port = htons(port);
        _addr.sin_family = AF_INET;
        _addr.sin_addr.s_addr = INADDR_ANY;
    }

    InetAddr(const struct sockaddr_in &addr) : _addr(addr)
    {
        Portntoh();
        Ipntoh();
    }

    bool operator==(const InetAddr &net)
    {
        return _port == net._port && _ip == net._ip;
    }

    struct sockaddr_in *GetNetAddr() { return &_addr; }
    socklen_t NetAddrLen() { return sizeof(_addr); }
    uint16_t GetPort() { return _port; }
    std::string GetIp() { return _ip; }
    std::string Addr() { return _ip + ":" + std::to_string(_port); }

    void SetAddr(struct sockaddr_in &client)
    {
        _addr = client;
        Portntoh();
        Ipntoh();
    }

    ~InetAddr() {}

private:
    // Convert the port from network to host byte order
    void Portntoh()
    {
        _port = ntohs(_addr.sin_port);
    }

    // Convert the IP address from binary to dotted-decimal text
    void Ipntoh()
    {
        char buffer[64];
        inet_ntop(AF_INET, &_addr.sin_addr, buffer, sizeof(buffer));
        _ip = buffer;
    }

private:
    struct sockaddr_in _addr;
    uint16_t _port;
    std::string _ip;
};
// IOService.hpp
#pragma once

#include <iostream>
#include <cerrno>
#include <functional>
#include <memory>
#include <sys/socket.h>
#include "Protocol.hpp"
#include "Log.hpp"
#include "Socket.hpp"
#include "InetAddr.hpp"
#include "Reactor.hpp"
#include "Connection.hpp"
#include "Epoller.hpp"
#include "Common.hpp"

using namespace LogModule;
using func_t = std::function<std::string(std::string &)>;

// One client connection: non-blocking, edge-triggered I/O
class IOService : public Connection
{
    static const int size = 4096;

public:
    IOService(int sockfd)
    {
        SetNonBlock(sockfd);
        SetSockfd(sockfd);
        SetEvent(EPOLLIN | EPOLLET);
    }

    virtual void Recver()
    {
        // Edge-triggered mode: keep reading until the kernel buffer is empty
        while (true)
        {
            char buffer[size];
            ssize_t n = recv(GetSockfd(), buffer, sizeof(buffer) - 1, 0);
            if (n > 0)
            {
                buffer[n] = 0;
                AppendToIn(buffer);
            }
            else if (n == 0)
            {
                // The peer closed the connection
                Excepter();
                return;
            }
            else
            {
                if (errno == EAGAIN || errno == EWOULDBLOCK)
                {
                    // Receive buffer drained; wait for the next event
                    break;
                }
                else if (errno == EINTR)
                    continue;
                else
                {
                    Excepter();
                    return;
                }
            }
        }
        // At this point all data available in this round has been read
        std::cout << "inbuffer: " << Inbuffer() << std::endl;
        // Can we be sure a complete message has arrived? No.
        // How do we know when a request is complete? The protocol decides.
        std::string res;
        if (_func)
            res = _func(Inbuffer());
        AppendToOut(res);
        if (!OutbufferIsEmpty())
            GetOwner()->EnableReadWrite(GetSockfd(), true, true);
    }

    virtual void Sender()
    {
        while (true)
        {
            ssize_t n = send(GetSockfd(), Outbuffer().c_str(), Outbuffer().size(), 0);
            if (n > 0)
            {
                // Sent n bytes; drop them from the out buffer
                EraseOutbuffer(n);
            }
            else if (n == 0)
            {
                break;
            }
            else
            {
                if (errno == EAGAIN || errno == EWOULDBLOCK)
                {
                    // Send buffer is full; try again on the next write event
                    break;
                }
                else if (errno == EINTR)
                    continue;
                else
                {
                    Excepter();
                    return;
                }
            }
        }
        // Two ways to get here:
        // 1. the out buffer is empty;
        // 2. the kernel send buffer is full while the out buffer still has data,
        //    so we must ask epoll to report when the socket becomes writable.
        if (!OutbufferIsEmpty())
        {
            // Enable write events only while they are needed
            GetOwner()->EnableReadWrite(GetSockfd(), true, true);
        }
        else
        {
            GetOwner()->EnableReadWrite(GetSockfd(), true, false);
        }
    }

    virtual void Excepter()
    {
        // Every I/O error is funneled into this one function:
        // log it, handle it, and close the connection.
        LOG(LogLevel::INFO) << "client connection may have ended, handling exception: " << GetSockfd();
        GetOwner()->DeleteConnection(GetSockfd());
    }

    void RegisterFunc(func_t func)
    {
        _func = func;
    }

    ~IOService() {}

private:
    func_t _func;
};
// Listener.hpp
#pragma once

#include <iostream>
#include <memory>
#include "Epoller.hpp"
#include "InetAddr.hpp"
#include "Socket.hpp"
#include "Log.hpp"
#include "Connection.hpp"
#include "IOService.hpp"
#include "Reactor.hpp"
#include "Protocol.hpp"
#include "Calculator.hpp"

using namespace SocketModule;
using namespace LogModule;

// Connection manager: accepts new connections on the listening socket
class Listener : public Connection
{
public:
    Listener(uint32_t port)
        : _listen_socket(std::make_unique<TcpSocket>()),
          _port(port)
    {
        _listen_socket->BuildListenMethod(_port);
        SetSockfd(_listen_socket->GetSockfd());
        SetEvent(EPOLLIN | EPOLLET);
    }

    virtual void Sender() {}

    virtual void Recver()
    {
        // Edge-triggered mode: accept until no pending connection is left
        while (true)
        {
            InetAddr client;
            int aerrno = 0;
            auto cli_socket = _listen_socket->Accept(&client, &aerrno);
            if (cli_socket)
            {
                LOG(LogLevel::INFO) << "Accept success: " << cli_socket->GetSockfd();
                auto conn = std::make_shared<IOService>(cli_socket->GetSockfd());
                conn->RegisterFunc(RequestHandler);
                GetOwner()->InsertConnection(conn);
            }
            else
            {
                if (aerrno == EAGAIN || aerrno == EWOULDBLOCK)
                {
                    LOG(LogLevel::DEBUG) << "accepter: all connections accepted";
                    break;
                }
                else if (aerrno == EINTR)
                {
                    LOG(LogLevel::DEBUG) << "accepter: interrupted by signal, continue";
                    continue;
                }
                else
                {
                    LOG(LogLevel::WARNING) << "accepter: error, ignored";
                    break;
                }
            }
        }
    }

    virtual void Excepter() {}

    int GetSockfd() { return _listen_socket->GetSockfd(); }

    ~Listener()
    {
        _listen_socket->Close();
    }

private:
    std::unique_ptr<Socket> _listen_socket;
    uint32_t _port;
};
// Lock.hpp
#pragma once

#include <iostream>
#include <string>
#include <pthread.h>

namespace LockModule
{
    // Mutex wrapper; can be used on its own
    class Mutex
    {
    public:
        Mutex(const Mutex &) = delete;
        const Mutex &operator=(const Mutex &) = delete;

        Mutex()
        {
            int n = pthread_mutex_init(&_mutex, nullptr);
            if (n != 0)
                std::cerr << "pthread_mutex_init" << std::endl;
        }

        void Lock()
        {
            int n = pthread_mutex_lock(&_mutex);
            if (n != 0)
                std::cerr << "pthread_mutex_lock" << std::endl;
        }

        void Unlock()
        {
            int n = pthread_mutex_unlock(&_mutex);
            if (n != 0)
                std::cerr << "pthread_mutex_unlock" << std::endl;
        }

        pthread_mutex_t *GetMutexPtr()
        {
            return &_mutex;
        }

        ~Mutex()
        {
            int n = pthread_mutex_destroy(&_mutex);
            if (n != 0)
                std::cerr << "pthread_mutex_destroy" << std::endl;
        }

    private:
        pthread_mutex_t _mutex;
    };

    // RAII-style lock management
    class LockGuard
    {
    public:
        LockGuard(Mutex &mutex) : _mutex(mutex)
        {
            _mutex.Lock();
        }

        ~LockGuard()
        {
            _mutex.Unlock();
        }

    private:
        Mutex &_mutex;
    };
}
// Log.hpp
#pragma once

#include <iostream>
#include <string>
#include <fstream>
#include <cstdio>
#include <ctime>
#include <memory>
#include <sstream>
#include <filesystem> // C++17
#include <unistd.h>
#include "Lock.hpp"

namespace LogModule
{
    using namespace LockModule;

    // Default log directory and file name
    const std::string defaultpath = "./log/";
    const std::string defaultname = "log.txt";

    // Log levels
    enum LogLevel
    {
        DEBUG,
        INFO,
        WARNING,
        ERROR,
        FATAL
    };

    // Convert a log level to its string form
    std::string LogLevelToString(LogLevel level)
    {
        switch (level)
        {
        case LogLevel::DEBUG:
            return "DEBUG";
        case LogLevel::INFO:
            return "INFO";
        case LogLevel::WARNING:
            return "WARNING";
        case LogLevel::ERROR:
            return "ERROR";
        case LogLevel::FATAL:
            return "FATAL";
        default:
            return "UNKNOWN";
        }
    }

    // Format the current time as "YYYY-MM-DD hh:mm:ss"
    std::string GetCurTime()
    {
        time_t tm = time(nullptr);
        struct tm curr;
        localtime_r(&tm, &curr);
        char timebuffer[64];
        snprintf(timebuffer, sizeof(timebuffer), "%4d-%02d-%02d %02d:%02d:%02d",
                 curr.tm_year + 1900,
                 curr.tm_mon + 1, // tm_mon is 0-based
                 curr.tm_mday,
                 curr.tm_hour,
                 curr.tm_min,
                 curr.tm_sec);
        return timebuffer;
    }

    // Strategy pattern: the strategy interface
    class LogStrategy
    {
    public:
        virtual ~LogStrategy() = default;
        virtual void SyncLog(const std::string &message) = 0; // strategies differ in how they flush
    };

    // Console strategy: print to the terminal only, convenient for debugging
    class ConsoleLogStrategy : public LogStrategy
    {
    public:
        void SyncLog(const std::string &message)
        {
            LockGuard guard(_mutex);
            std::cout << message << std::endl;
        }

        ~ConsoleLogStrategy() {}

    private:
        Mutex _mutex; // the terminal is a shared resource too; keep output thread-safe
    };

    // File strategy
    class FileLogStrategy : public LogStrategy
    {
    public:
        // Create the log directory if it does not exist yet
        FileLogStrategy(const std::string filepath = defaultpath, const std::string filename = defaultname)
            : _filepath(filepath), _filename(filename)
        {
            LockGuard guard(_mutex);
            if (std::filesystem::exists(_filepath))
                return;
            try
            {
                std::filesystem::create_directory(_filepath);
            }
            catch (const std::filesystem::filesystem_error &e)
            {
                std::cerr << e.what() << '\n';
            }
        }

        void SyncLog(const std::string &message)
        {
            LockGuard guard(_mutex);
            std::string log = _filepath + _filename;
            std::ofstream out(log.c_str(), std::ios::app); // open in append mode
            if (!out.is_open())
                return;
            out << message << "\n";
            out.close();
        }

        ~FileLogStrategy() {}

    private:
        std::string _filepath;
        std::string _filename;
        Mutex _mutex;
    };

    // The logger itself
    class Logger
    {
    public:
        Logger()
        {
            UseConsoleLogStrategy(); // console output by default
        }

        void UseConsoleLogStrategy()
        {
            _strategy = std::make_unique<ConsoleLogStrategy>();
        }

        void UseFileLogStrategy()
        {
            _strategy = std::make_unique<FileLogStrategy>();
        }

        // Nested class: RAII-style formatting and flushing.
        // One LogMessage object represents one complete log entry.
        class LogMessage
        {
        public:
            LogMessage(LogLevel level, const std::string filename, int line, Logger &log)
                : _curr_time(GetCurTime()), _level(level), _pid(getpid()),
                  _filename(filename), _line(line), _log(log)
            {
                std::stringstream ss;
                ss << "[" << _curr_time << "]"
                   << "[" << LogLevelToString(_level) << "]"
                   << "[" << _pid << "]"
                   << "[" << _filename << "]"
                   << "[" << _line << "]" << " - ";
                _log_info = ss.str();
            }

            // Overload << for C++-style logging; the template accepts any streamable type
            template <typename T>
            LogMessage &operator<<(const T &info)
            {
                std::stringstream ss;
                ss << info;
                _log_info += ss.str();
                return *this; // return this LogMessage so << can be chained
            }

            ~LogMessage()
            {
                if (_log._strategy)
                    _log._strategy->SyncLog(_log_info);
            }

        private:
            std::string _curr_time; // current time
            LogLevel _level;        // log level
            pid_t _pid;             // process id
            std::string _filename;  // source file name
            int _line;              // line number
            Logger &_log;           // the enclosing logger, used to flush through its strategy
            std::string _log_info;  // the complete log entry
        };

        // Returns a LogMessage temporary on purpose. Each following << is applied to
        // that temporary, and it is destroyed at the end of the full expression,
        // which is when the entry is flushed. Each temporary holds its own log data.
        // The LOG macro below supplies the file name and line number.
        LogMessage operator()(LogLevel level, const std::string filename, int line)
        {
            return LogMessage(level, filename, line, *this);
        }

    private:
        std::unique_ptr<LogStrategy> _strategy;
    };

    // Global Logger object
    Logger logger;

// A macro is expanded at the call site, so __FILE__ and __LINE__ refer to the caller
#define LOG(type) logger(type, __FILE__, __LINE__)

// Select which log strategy to use
#define ENABLE_CONSOLE_LOG_STRATEGY() logger.UseConsoleLogStrategy()
#define ENABLE_FILE_LOG_STRATEGY() logger.UseFileLogStrategy()
}
// Main.cc
#include <iostream>
#include <string>
#include "Log.hpp"
#include "Listener.hpp"
#include "Connection.hpp"
#include "Reactor.hpp"

using namespace LogModule;

int main(int argc, char *argv[])
{
    if (argc != 2)
    {
        std::cout << "Usage: " << argv[0] << " port" << std::endl;
        return 1;
    }
    uint16_t local_port = std::stoi(argv[1]);
    Reactor reactor;
    auto conn = std::make_shared<Listener>(local_port);
    reactor.InsertConnection(conn);
    reactor.Loop();
    return 0;
}
// Protocol.hpp
#pragma once

#include <iostream>
#include <string>
#include <memory>
#include <jsoncpp/json/json.h>

namespace Protocol
{
    const std::string ProtSep = " ";
    const std::string LineBreakSep = "\r\n";

    // Wrap a message as "len\r\nmessage\r\n"
    std::string Encode(const std::string &message)
    {
        std::string len = std::to_string(message.size());
        std::string package = len + LineBreakSep + message + LineBreakSep;
        return package;
    }

    // "len\r\nx op y\r\n" : by convention, \r\n is not part of the message itself.
    // There is no guarantee that package holds exactly one complete message.
    // It may be any of the following:
    // "l
    // "len
    // "len\r\n
    // "len\r\nx
    // "len\r\nx op
    // "len\r\nx op y
    // "len\r\nx op y\r\n"
    // "len\r\nx op y\r\n""len
    // "len\r\nx op y\r\n""len\r\n
    // "len\r\nx op
    // "len\r\nx op y\r\n""len\r\nx op y\r\n"
    // "len\r\nresult code\r\n""len\r\nresult code\r\n"
    bool Decode(std::string &package, std::string *message)
    {
        // Besides unpacking, this must also guarantee message integrity,
        // so message boundaries have to be handled correctly.
        auto pos = package.find(LineBreakSep);
        if (pos == std::string::npos)
            return false;
        std::string slen = package.substr(0, pos);
        int messagelen = std::stoi(slen);
        int totallen = slen.size() + messagelen + 2 * LineBreakSep.size();
        if (package.size() < static_cast<size_t>(totallen))
            return false;
        *message = package.substr(pos + LineBreakSep.size(), messagelen);
        package.erase(0, totallen);
        return true;
    }

    class Request
    {
    public:
        Request() {}
        Request(int x, int y, char op) : _data_x(x), _data_y(y), _oper(op) {}

        std::string Serialize()
        {
            Json::Value root;
            root["data_x"] = _data_x;
            root["data_y"] = _data_y;
            root["oper"] = _oper;
            Json::FastWriter writer;
            return writer.write(root);
            // Plain-text alternative: "_data_x _oper _data_y"
            // std::string message = std::to_string(_data_x) + ProtSep;
            // message += _oper;
            // message += ProtSep + std::to_string(_data_y);
            // return message;
        }

        bool Deserialize(const std::string &message)
        {
            Json::Value root;
            Json::Reader reader;
            bool res = reader.parse(message, root);
            if (res)
            {
                _data_x = root["data_x"].asInt();
                _data_y = root["data_y"].asInt();
                _oper = root["oper"].asInt(); // a char is an integer type too
            }
            return res;
            // Plain-text alternative:
            // auto pos = message.find(ProtSep);
            // if(pos == std::string::npos) return false;
            // std::string sx = message.substr(0, pos);
            // std::string sy = message.substr(pos + 2 * ProtSep.size() + 1);
            // _data_x = std::stoi(sx);
            // _data_y = std::stoi(sy);
            // _oper = message[pos + ProtSep.size()];
            // return true;
        }

        void SetX(int x) { _data_x = x; }
        void SetY(int y) { _data_y = y; }
        void SetOper(char oper) { _oper = oper; }
        int GetX() const { return _data_x; }
        int GetY() const { return _data_y; }
        char GetOper() const { return _oper; }

    private:
        // _data_x _oper _data_y
        // The length prefix is the self-describing field of the message:
        // len\r\n_data_x _oper _data_y\r\n : by convention, \r\n is not part of the message.
        // Much of the work here is string processing.
        int _data_x; // first operand
        int _data_y; // second operand
        char _oper;  // +, -, *, /, %
    };

    class Response
    {
    public:
        Response() {}
        Response(int result, int code) : _result(result), _code(code) {}

        std::string Serialize()
        {
            Json::Value root;
            root["result"] = _result;
            root["code"] = _code;
            Json::FastWriter writer;
            return writer.write(root);
            // Plain-text alternative: "_result _code"
            // return std::to_string(_result) + ProtSep + std::to_string(_code);
        }

        bool Deserialize(const std::string &message)
        {
            Json::Value root;
            Json::Reader reader;
            bool res = reader.parse(message, root);
            if (res)
            {
                _result = root["result"].asInt();
                _code = root["code"].asInt();
            }
            return res;
            // Plain-text alternative:
            // auto pos = message.find(ProtSep);
            // if(pos == std::string::npos) return false;
            // std::string sres = message.substr(0, pos);
            // std::string scode = message.substr(pos + ProtSep.size());
            // _result = atoi(sres.c_str());
            // _code = atoi(scode.c_str());
            // return true;
        }

        void SetResult(int result) { _result = result; }
        void SetCode(int code) { _code = code; }
        int GetResult() { return _result; }
        int GetCode() { return _code; }

    private:
        // len\r\n_result _code\r\n
        int _result = 0; // computation result
        int _code = 0;   // status: 0 (ok), 1 (division by zero), 2 (modulo by zero), 3 (unknown operator)
    };
}
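The boundary cases listed in the comments above can be exercised without a network. The sketch below re-implements the same "len\r\npayload\r\n" framing in a self-contained form; `FrameEncode`, `FrameDecode`, and `FeedChunks` are stand-in names for this illustration and are not part of Protocol.hpp. Unlike `Decode`, this version treats a malformed length as "not decodable" instead of letting an exception escape.

```cpp
#include <cassert>
#include <string>
#include <vector>

const std::string kSep = "\r\n";

// Wrap a payload as "len\r\npayload\r\n".
std::string FrameEncode(const std::string& payload) {
    return std::to_string(payload.size()) + kSep + payload + kSep;
}

// Try to cut one complete frame off the front of buffer.
// Returns false (and leaves buffer untouched) if the frame is still incomplete.
bool FrameDecode(std::string& buffer, std::string* payload) {
    auto pos = buffer.find(kSep);
    if (pos == std::string::npos) return false;
    size_t len = 0;
    try {
        len = std::stoul(buffer.substr(0, pos));
    } catch (...) {
        return false; // malformed length prefix
    }
    size_t total = pos + kSep.size() + len + kSep.size();
    if (buffer.size() < total) return false;
    *payload = buffer.substr(pos + kSep.size(), len);
    buffer.erase(0, total);
    return true;
}

// Simulates a byte stream arriving in arbitrary chunks, as recv() delivers it.
std::vector<std::string> FeedChunks(const std::vector<std::string>& chunks) {
    std::string buffer;
    std::vector<std::string> payloads;
    for (const auto& chunk : chunks) {
        buffer += chunk;
        std::string payload;
        while (FrameDecode(buffer, &payload)) payloads.push_back(payload);
    }
    return payloads;
}
```

Feeding two encoded requests split at arbitrary byte offsets yields exactly the two original payloads, in order. Leftover bytes wait in the buffer until the rest of their frame arrives, which is the same role `Inbuffer()` plays for each connection.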
// Reactor.hpp
#pragma once

#include <iostream>
#include <functional>
#include <unordered_map>
#include <memory>
#include "Epoller.hpp"
#include "Connection.hpp"

using namespace EpollModule;

using connection_t = std::shared_ptr<Connection>;

class Reactor
{
    static const int revs_num = 256;

public:
    Reactor() : _epoller(std::make_unique<Epoll>()), _running(false)
    {
        _epoller->Init();
    }

    void InsertConnection(connection_t conn)
    {
        auto it = _connections.find(conn->GetSockfd());
        if (it == _connections.end())
        {
            conn->SetOwner(this);
            _connections[conn->GetSockfd()] = conn;
            _epoller->Add(conn->GetSockfd(), conn->GetEvent());
            LOG(LogLevel::DEBUG) << "add a new connection, fd: " << conn->GetSockfd();
        }
    }

    void DeleteConnection(int sockfd)
    {
        auto it = _connections.find(sockfd);
        if (it != _connections.end())
        {
            _epoller->Del(sockfd);
            _connections[sockfd]->Close();
            _connections.erase(it);
        }
    }

    void EnableReadWrite(int sockfd, bool readable, bool writeable)
    {
        auto it = _connections.find(sockfd);
        if (it != _connections.end())
        {
            // Update the event set kept in the user-space connection
            uint32_t event = ((readable ? EPOLLIN : 0) | (writeable ? EPOLLOUT : 0) | EPOLLET);
            _connections[sockfd]->SetEvent(event);
            _epoller->Updata(sockfd, event); // write it into the kernel
        }
    }

    void Loop()
    {
        _running = true;
        int timeout = 1000;
        while (_running)
        {
            int n = _epoller->Wait(_revs, revs_num, timeout);
            Dispatcher(n);
            DebugPrint();
        }
    }

    void Dispatcher(int n)
    {
        for (int i = 0; i < n; i++)
        {
            int sockfd = _revs[i].data.fd;
            uint32_t revent = _revs[i].events;
            if ((revent & EPOLLERR) || (revent & EPOLLHUP))
                revent = (EPOLLIN | EPOLLOUT); // turn error events into read/write events

            auto it = _connections.find(sockfd);
            if (it != _connections.end() && (revent & EPOLLIN))
            {
                connection_t conn = it->second; // keeps the object alive if Recver() deletes the connection
                conn->Recver();
            }
            // Recver() may have removed the connection, so look it up again
            // instead of reusing the old iterator.
            it = _connections.find(sockfd);
            if (it != _connections.end() && (revent & EPOLLOUT))
            {
                connection_t conn = it->second;
                conn->Sender();
            }
        }
    }

    void DebugPrint()
    {
        std::cout << "fds managed by Epoller: ";
        for (auto &it : _connections)
            std::cout << it.first << " ";
        std::cout << std::endl;
    }

    void Stop() { _running = false; }

    ~Reactor() {}

private:
    std::unique_ptr<Epoll> _epoller;
    std::unordered_map<int, connection_t> _connections;
    bool _running;
    struct epoll_event _revs[revs_num];
};
// Socket.hpp
#pragma once

#include <iostream>
#include <string>
#include <memory>
#include <cerrno>
#include <cstdlib>
#include <sys/socket.h>
#include <unistd.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <strings.h>
#include "Log.hpp"
#include "Common.hpp"
#include "InetAddr.hpp"

namespace SocketModule
{
    using namespace LogModule;

    class Socket;
    using SocketPtr = std::shared_ptr<Socket>;

    // Base class: the Socket interface
    class Socket
    {
    public:
        virtual ~Socket() {}
        virtual SocketPtr Accept(InetAddr *client, int *out_err) = 0;
        virtual int GetSockfd() = 0;
        virtual void Close() = 0;
        virtual bool Recv(std::string *in) = 0;
        virtual void Send(std::string &out) = 0;

        // Template method: the fixed sequence for building a listening socket
        void BuildListenMethod(uint16_t port, int backlog = gbacklog)
        {
            CreateSocket();
            Setsockopt();
            Bind(port);
            Listen(backlog);
        }

        virtual void Setsockopt() = 0;
        virtual void CreateSocket() = 0;
        virtual void Bind(uint16_t port) = 0;
        virtual void Listen(int backlog) = 0;
    };

    class TcpSocket : public Socket
    {
    public:
        TcpSocket(int sockfd = gsockfd) : _sockfd(sockfd) {}

        // On failure returns nullptr and stores errno in *out_err
        virtual SocketPtr Accept(InetAddr *client, int *out_err)
        {
            if (!client)
                return nullptr;
            struct sockaddr_in peer;
            socklen_t len = sizeof(peer);
            int newsockfd = accept(_sockfd, CONV(&peer), &len);
            *out_err = errno;
            if (newsockfd < 0)
            {
                LOG(LogLevel::WARNING) << "accept err";
                return nullptr;
            }
            client->SetAddr(peer);
            return std::make_shared<TcpSocket>(newsockfd);
        }

        ~TcpSocket() {}

        virtual int GetSockfd()
        {
            return _sockfd;
        }

        virtual void Close()
        {
            if (_sockfd != gsockfd)
                close(_sockfd);
        }

        virtual bool Recv(std::string *in)
        {
            char inbuffer[1024 * 8];
            ssize_t n = recv(_sockfd, inbuffer, sizeof(inbuffer) - 1, 0);
            if (n > 0)
            {
                inbuffer[n] = 0;
                *in += inbuffer;
                return true;
            }
            else
                return false;
        }

        virtual void Send(std::string &out)
        {
            send(_sockfd, out.c_str(), out.size(), 0);
        }

        virtual void Setsockopt()
        {
            // Lets the server restart right after an abnormal exit
            // without bind() failing because the address is still in use
            int opt = 1;
            int n = setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
            (void)n;
        }

        virtual void CreateSocket()
        {
            _sockfd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
            if (_sockfd < 0)
            {
                LOG(LogLevel::ERROR) << "socket err";
                exit(Socket_Err);
            }
            SetNonBlock(_sockfd);
            LOG(LogLevel::DEBUG) << "socket success, sockfd: " << _sockfd;
        }

        virtual void Bind(uint16_t port)
        {
            InetAddr local(port);
            ssize_t n = bind(_sockfd, CONV(local.GetNetAddr()), local.NetAddrLen());
            if (n < 0)
            {
                LOG(LogLevel::ERROR) << "bind err";
                exit(Bind_Err);
            }
            LOG(LogLevel::DEBUG) << "bind success";
        }

        virtual void Listen(int backlog)
        {
            ssize_t n = listen(_sockfd, backlog);
            if (n < 0)
            {
                LOG(LogLevel::ERROR) << "listen err";
                exit(Listen_Err);
            }
            LOG(LogLevel::DEBUG) << "listen success";
        }

    private:
        int _sockfd;
    };
}
// TcpClient.cc
#include <iostream>
#include <string>
#include <cstring>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <sys/types.h>
#include <arpa/inet.h>

#include "Protocol.hpp" // the agreed protocol

using namespace Protocol;

// ./client_tcp server_ip server_port
int main(int argc, char *argv[])
{
    if (argc != 3)
    {
        std::cout << "Usage:./client_tcp server_ip server_port" << std::endl;
        return 1;
    }
    std::string server_ip = argv[1]; // "192.168.1.1"
    int server_port = std::stoi(argv[2]);
    int sockfd = ::socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0)
    {
        std::cout << "create socket failed" << std::endl;
        return 2;
    }
    struct sockaddr_in server_addr;
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(server_port);
    server_addr.sin_addr.s_addr = inet_addr(server_ip.c_str());

    // The client does not need an explicit bind: TCP is connection-oriented,
    // and connect() binds a local address automatically.
    int n = ::connect(sockfd, (struct sockaddr *)&server_addr, sizeof(server_addr));
    if (n < 0)
    {
        std::cout << "connect failed" << std::endl;
        return 3;
    }

    std::string message;
    while (true)
    {
        int x, y;
        char oper;
        std::cout << "input x: ";
        std::cin >> x;
        std::cout << "input y: ";
        std::cin >> y;
        std::cout << "input oper: ";
        std::cin >> oper;
        Request req(x, y, oper);
        // 1. Serialize
        message = req.Serialize();
        // 2. Encode
        message = Encode(message);
        // 3. Send
        n = ::send(sockfd, message.c_str(), message.size(), 0);
        if (n > 0)
        {
            char inbuffer[1024];
            // 4. Receive the reply (leave room for the terminating 0)
            int m = ::recv(sockfd, inbuffer, sizeof(inbuffer) - 1, 0);
            if (m > 0)
            {
                inbuffer[m] = 0;
                std::string package = inbuffer; // TODO
                std::string content;
                // 5. Assume the reply is complete for now, and decode it
                Decode(package, &content);
                // 6. Deserialize
                Response resp;
                resp.Deserialize(content);
                // 7. Use the structured data
                std::cout << resp.GetResult() << "[" << resp.GetCode() << "]" << std::endl;
            }
            else
                break;
        }
        else
            break;
    }
    ::close(sockfd);
    return 0;
}




