当前位置: 首页 > news >正文

【网络编程】十八、Reactor模式

文章目录

    • 1、服务器模型
    • 2、服务器编程框架
    • 3、I/O 模型
    • 4、两种高效的事件处理模式
    • 5、两种高效的并发模式
    • 6、简单的reactor代码
      • makefile
      • reactor.hpp
      • main.cc
      • epoller.hpp
      • connection.hpp
      • protocol.hpp
      • err.hpp
      • sock.hpp
      • log.hpp
      • lockguard.hpp

在这里插入图片描述

1、服务器模型

  • C/S 模型:服务端压力大
  • P2P 模型:网络负载高 (可以看成 C/S 模型的扩展)

2、服务器编程框架

  • IO 处理单元:服务器管理客户端连接的框架,通常等待并接受新的客户端连接,接受客户端数据,将服务器响应返回给客户端. 但是数据收发不一定在IO 处理单元,也可能在逻辑单元中执行,取决于事件处理模式。
  • 逻辑单元:通常是一个线程或者进程,分析并处理客户端数据,然后把结果传递给 IO 处理单元或者直接发送客户端
  • 网络存储单元:数据库、缓存、文件等,非必须的
  • 请求队列:各单元之间的通信方式的抽象,通常被实现为池的一部分。对于服务器机群,请求队列是服务器之间预先建立、静态的、永久的 TCP 连接

在这里插入图片描述

3、I/O 模型

socket 创建的时候默认是阻塞的,可以传递参数设置成非阻塞。

​ 针对 阻塞 I/O 执行的系统调用可能因为无法立即完成而被操作系统挂起,直到等待的事件发生为止。比如,客户端通过 connect 向服务器发起连接时,connect 将首先发送同步报文段给服务器,然后等待服务器返回确认报文段。如果服务器的确认报文段没有立即到达客户端,则 connect 调用将被挂起,直到客户端收到确认报文段并唤醒 connect 调用。socket 的基础 API 中,可能被阻塞的系统调用包括 acceptsendrecvconnect

​ 针对 非阻塞 I/O 执行的系统调用则总是立即返回,而不管事件是否已经发生。如果事件没有立即发生,这些系统调用就返回 -1,和出错的情况一样。此时我们必须根据 errno 来区分这两种情况。对 acceptsendrecv 而言,事件未发生时 errno 通常被设置成 EAGAIN(意为“再来一次”)或者**EWOULDBLOCK(意为“期望阻塞”);对 connect 而言,errno** 则被设置成 EINPROGRESS

​ 很显然,我们只有在事件已经发生的情况下操作非阻塞 I/O(读、写等),才能提高程序的效率。因此,非阻塞 I/O 通常要和其他 I/O 通知机制一起使用,比如 I/O 复用和 SIGIO 信号。

I/O 复用是最常使用的 I/O 通知机制。它指的是,应用程序通过 I/O 复用函数向内核注册一组事件,内核通过 I/O 复用函数把其中就绪的事件通知给应用程序。Linux 上常用的 I/O 复用函数是 selectpollepoll_wait。需要指出的是,I/O 复用函数本身是阻塞的,它们能提高程序效率的原因在于它们具有同时监听多个 I/O 事件的能力

4、两种高效的事件处理模式

​ 服务器程序通常需要处理三类事件 I/O 事件、信号及定时事件。 两种高效事件处理模式:

Reactor:同步 I/O 模型通常用于实现 Reactor。Reactor是这样一种模式,它要求主线程(I/O处理单元,下同)只负责监听文件描述上是否有事件发生,有的话就立即将该事件通知工作线程(逻辑单元,下同)。除此之外,主线程不做任何其他实质性的工作。读写数据,接受新的连接,以及处理客户请求均在工作线程中完成。

使用同步I/O模型(以epoll_wait为例)实现的Reactor模式的工作流程是:

  1. 主线程往epoll内核事件表中注册socket上的读就绪事件。
  2. 主线程调用epoll_wait等待socket上有数据可读。
  3. 当socket上有数据可读时,epoll_wait通知主线程。主线程则将socket可读事件放入请求队列。
  4. 睡眠在请求队列上的某个工作线程被唤醒,它从socket读取数据,并处理客户请求,然后往epoll内核事件表中注册该socket上的写就绪事件。
  5. 主线程调用epoll_wait等待socket可写。
  6. 当socket可写时,epoll_wait通知主线程。主线程将socket可写事件放入请求队列。
  7. 睡眠在请求队列上的某个工作线程被唤醒,它往socket上写入服务器处理客户请求的结果。

在这里插入图片描述

Proactor:异步 I/O 模型则用于实现 Proactor 模式。与Reactor模式不同,Proactor模式将所有I/O操作都交给主线程和内核来处理,工作线程仅仅负责业务逻辑。因此,Proactor模式更符合图8-4所描述的服务器编程框架。

使用异步I/O模型(以aio_read和aio_write为例)实现的Proactor模式的工作流程是:

  1. 主线程调用aio_read函数向内核注册socket上的读完成事件,并告诉内核用户读缓冲区的位置,以及读操作完成时如何通知应用程序(这里以信号为例,详情请参考sigevent的man手册)。
  2. 主线程继续处理其他逻辑。
  3. 当socket上的数据被读入用户缓冲区后,内核将向应用程序发送一个信号,以通知应用程序数据已经可用。
  4. 应用程序预先定义好的信号处理函数选择一个工作线程来处理客户请求。工作线程处理完客户请求之后,调用aio_write函数向内核注册socket上的写完成事件,并告诉内核用户写缓冲区的位置,以及写操作完成时如何通知应用程序(仍然以信号为例)。
  5. 主线程继续处理其他逻辑。
  6. 当用户缓冲区的数据被写入socket之后,内核将向应用程序发送一个信号,以通知应用程序数据已经发送完毕。
  7. 应用程序预先定义好的信号处理函数选择一个工作线程来做善后处理,比如决定是否关闭socket。

在这里插入图片描述

5、两种高效的并发模式

​ 要注意的是,在 IO 模型 中,同步和异步区分的是内核向应用程序通知的是何种 IO 事件(是就绪事件还是已完成事件),以及该由谁来完成 IO(是由应用程序还是内核)。而在 并发模式 中,同步指的是程序完全按照代码序列的顺序执行,异步指的是程序的执行需要由系统事件来驱动。

  • 半同步/半异步(half-sync/half-async):同步线程用来处理客户逻辑,异步线程处理I/O事件。

    • 其中,使用这种半同步半异步再配合 reactor 模式的话,就是一种变体,简称 半同步/半异步堆模式(half-sync/half-reactive) 如下所示:

      在这里插入图片描述

  • 领导者/追随者(Leader/Followers):领导者/追随者模式是多个工作线程轮流获得事件源集合,轮流监听、分发并处理事件的一种模式。在任意时间点,程序都仅有一个领导者线程,它负责监听I/O事件。而其他线程则都是追随者,它们休眠在线程池中等待成为新的领导者。当前的领导者如果检测到I/O事件,首先要从线程池中推选出新的领导者线程,然后处理I/O事件。此时,新的领导者等待新的I/O事件,而原来的领导者则处理I/O事件,二者实现了并发。

6、简单的reactor代码

​ 我们这里实现一个单线程来处理监听/连接/IO/业务处理,后面我们会写一个 muduo 项目,是对该模式的改进!

makefile

main : main.ccg++ -o $@ $^ -std=c++11 -lpthread -ljsoncpp
.PHONY:clean
clean:rm -rf main

reactor.hpp

#pragma once 
#include <unordered_map>
#include <functional>
#include "epoller.hpp"
#include "err.hpp"
#include "log.hpp"
#include "sock.hpp"
#include "connection.hpp"
#include "protocol.hpp"class connection;
using func_t = std::function<void(connection*)>;static const int default_port = 8080; // 默认端口
static const int event_size = 100;    // 就绪事件数组的大小
static const int epoll_size = 128;    // 内核中epoll_event的个数
static const int buffer_size = 1024;  // 收发缓冲区的大小class reactor
{
private:int _port;                                      // 服务器端口号sock _sock;                                     // 监听套接字对象int _epfd;                                      // epoll模型的文件描述符struct epoll_event* _events;                    // 就绪事件数组epoller _epoller;                               // epoller对象unordered_map<int, connection*> _connect_table; // connection对象的集合func_t _service;                                // 业务处理类型
public:reactor(func_t service, int port = default_port): _port(port), _epfd(default_num), _events(nullptr), _service(service){// 1. 完成套接字初始化_sock.Socket();_sock.Bind(_port);_sock.Listen();// 2. 创建epoll模型_epfd = _epoller.create(epoll_size);// 3. 将监听套接字交给epoll模型管理,并且添加到哈希表中管理_epoller.control(_sock.GetFD(), EPOLL_CTL_ADD, EPOLLIN | EPOLLET);addConnection(_sock.GetFD(), std::bind(&reactor::Accepter, this, std::placeholders::_1), nullptr, nullptr);// 4. 开辟就绪事件数组_events = new struct epoll_event[event_size];if(_events == nullptr){logMessage(Level::ERROR, "new epoll_event error, errno: %d, string_err: %s", errno, strerror(errno));exit(NEW_EVENTS_ERR);}logMessage(Level::NORMAL, "new epoll_event success");}~reactor(){if(_epfd != -1)close(_epfd);if(_events != nullptr)delete[] _events;}    // 运行函数,监听epoll中的就绪事件,根据类型派发给不同的处理函数void run(){while(true){int n = _epoller.wait(_events, event_size);if(n == -1){logMessage(Level::ERROR, "epoll wait error, errno: %d, string_err: %s", errno, strerror(errno));exit(EPOLL_WAIT_ERR);}else if(n == 0){logMessage(Level::NORMAL, "epoll wait timeout...");}else{// 遍历所有就绪事件for(int i = 0; i < n; ++i){int fd = _events[i].data.fd;uint32_t event = _events[i].events;// 如果是错误事件,则变成EPOLLIN和EPOLLOUT事件去解决,其处理函数中会转化为异常处理if((event & EPOLLERR) || (event & EPOLLHUP)) event |= (EPOLLIN & EPOLLOUT);if(event & EPOLLIN)_connect_table[fd]->_receiver(_connect_table[fd]);if(event & EPOLLOUT)_connect_table[fd]->_sender(_connect_table[fd]);}}}}void addConnection(int fd, func_t receiver, func_t sender, func_t exception){// 1. 创建一个connection对象,然后初始化connection* conn = new connection(fd, this, receiver, sender, exception);if(conn == nullptr){logMessage(Level::ERROR, "new connection error, errno: %d, string_err: %s", errno, strerror(errno));exit(NEW_CONNECTION_ERR);}// 2. 将其添加到哈希表中维护_connect_table[fd] = conn;logMessage(Level::DEBUG, "addConnection: %d in unordered_map", conn->_fd);}void EnableReadWrite(connection *conn, bool readable, bool writeable){uint32_t event = (readable ? EPOLLIN : 0) | (writeable ? EPOLLOUT : 0) | EPOLLET;_epoller.control(conn->_fd, event, EPOLL_CTL_MOD);}
private:void Accepter(connection* conn){// 必须循环读,直到内容都被读取上来为止while(true){// 获取新链接std::string clientip;std::uint16_t clientport;int err;int newfd = _sock.Accept(&clientip, &clientport, &err);if(newfd == -1){if(err == EAGAIN || err == EWOULDBLOCK) {logMessage(Level::NORMAL, "accept数据已经读取完整,退出!");break;}else if(err == EINTR){logMessage(Level::NORMAL, "被中断了,要继续读取!");continue;}else{logMessage(Level::ERROR, "accept数据已经读取完整,退出!");break;}}else{// 将新链接交给epoll管理,并且添加到哈希表中管理_epoller.control(newfd, EPOLL_CTL_ADD, EPOLLIN | EPOLLET);addConnection(newfd, std::bind(&reactor::Receiver, this, std::placeholders::_1),std::bind(&reactor::Sender, this, std::placeholders::_1),std::bind(&reactor::Excepter, this, std::placeholders::_1));logMessage(Level::DEBUG, "get a new link, info: [%s:%d]", clientip.c_str(), clientport);}}}void Receiver(connection* conn){// 必须循环读,直到内容都被读取上来为止char buffer[buffer_size];while(true){ssize_t n = recv(conn->_fd, buffer, sizeof(buffer) - 1, 0);if(n > 0){buffer[n] = 0;conn->_inbuffer += buffer; // 进行尾插到缓冲区logMessage(Level::DEBUG, "%s", conn->_inbuffer.c_str());// 将读到缓冲区的数据交给业务处理函数去拆解_service(conn);}else if(n == 0){// 请求断开连接,则直接交给异常处理if (conn->_excepter){conn->_excepter(conn);return;}}else{if (errno == EAGAIN || errno == EWOULDBLOCK) // 数据读完了直接breakreturn;else if (errno == EINTR) // 读取中断,则继续读取continue;else{if (conn->_excepter) // 异常的话交给异常处理{conn->_excepter(conn);return;}}}}}void Sender(connection* conn){// 必须循环读,直到内容都被读取上来为止while(true){ssize_t n = send(conn->_fd, conn->_outbuffer.c_str(), conn->_outbuffer.size(), 0);if(n > 0){if(conn->_outbuffer.empty())break;elseconn->_outbuffer.erase(0, n);}else{if (errno == EAGAIN || errno == EWOULDBLOCK) // 缓冲区满了直接breakbreak;else if (errno == EINTR) // 发送中断,则继续读取continue;else{if (conn->_excepter) // 异常的话交给异常处理{conn->_excepter(conn);return;}}}}// 如果没有发送完毕,需要对对应的sock开启对写事件的关系, 如果发完了,我们要关闭对写事件的关心!if(!conn->_outbuffer.empty())conn->_rp->EnableReadWrite(conn, true, true);elseconn->_rp->EnableReadWrite(conn, true, false);}void Excepter(connection* conn){logMessage(DEBUG, "关闭%d 文件描述符的所有的资源", conn->_fd);_epoller.control(conn->_fd, 0, EPOLL_CTL_DEL);conn->Close();_connect_table.erase(conn->_fd);delete conn;}
};

main.cc

#include "reactor.hpp"
#include "threadpool.hpp"
#include <memory>
using namespace std;static void Usage(const string& proc)
{cerr << "\nUsage:\n\t" << proc << " port\n\n"; 
}// req: 里面一定是我们的处理好的一个完整的请求对象
// resp: 根据req,进行业务处理填充resp,不用管理任何读取和写入、序列化和反序列化等任何细节
bool cal(const Request &req, Response &resp)
{// req已经有结构化完成的数据了,可以直接使用resp._exitcode = OK;resp._result = OK;switch(req._op){case '+':resp._result = req._x + req._y;break;case '-':resp._result = req._x - req._y;break;case '*':resp._result = req._x * req._y;break;case '/':{if(req._y == 0)resp._exitcode = DIV_ZERO;elseresp._result = req._x / req._y;}break;case '%':{if(req._y == 0)resp._exitcode = MOD_ZERO;elseresp._result = req._x % req._y;}break;default:resp._exitcode = OP_ERROR;break;}return true;
}// 业务处理函数
void service(connection* conn)
{std::string text;while(recvPackage(conn->_inbuffer, &text)){logMessage(Level::DEBUG, "service while begin");// 1. 去报头std::string body;if(!delRule(text, &body))return;std::cout << "去掉报头的正文:\n" << body << std::endl;// 2. 反序列化Request req;if(!req.deserialize(body))return;// 3. 业务处理Response resp;cal(req, resp);// 4. 序列化std::string out;if(!resp.serialize(&out))return;// 5. 添加报头// 6. 将其放到输出缓冲区中conn->_outbuffer += addRule(out);logMessage(Level::DEBUG, "service while end, %s", conn->_outbuffer.c_str());}// 7. 收到完整报文之后,调用发送函数进行响应if(conn->_sender)conn->_sender(conn);
}int main(int argc, char* argv[])
{if(argc != 2){Usage(argv[0]);exit(USAGE_ERR);}unique_ptr<reactor> server(new reactor(service, atoi(argv[1])));server->run();return 0;
}

epoller.hpp

#pragma once
#include <iostream>
#include <sys/epoll.h>
#include <fcntl.h>
#include <cstring>
#include <cerrno>
#include "sock.hpp"
#include "err.hpp"
#include "log.hpp"const static int time_out = 1000;  // epoll_wait的超时时间class epoller
{
private:int _epfd; // epoll模型的文件描述符
public:epoller(): _epfd(default_num){}// 创建epoll模型,并且开辟就绪事件数组int create(int epoll_size){// 创建epoll模型_epfd = epoll_create(epoll_size);if(_epfd == -1){logMessage(Level::ERROR, "epoll_create error, errno: %d, string_err: %s", errno, strerror(errno));exit(EPOLL_CREATE_ERR);}logMessage(Level::NORMAL, "epoll_create success");return _epfd;}// 操作epoll模型void control(int fd, int option, uint32_t events){if(option == EPOLL_CTL_ADD | option == EPOLL_CTL_MOD){struct epoll_event ev;ev.data.fd = fd;ev.events = events;// 设置非阻塞setnonblocking(fd);int ret = epoll_ctl(_epfd, option, fd, &ev);if(ret == -1){logMessage(Level::ERROR, "epoll_add or mod error, errno: %d, string_err: %s", errno, strerror(errno));exit(EPOLL_CTL_ERR);}logMessage(Level::NORMAL, "epoll_add or mod success");}else if(option == EPOLL_CTL_DEL){int ret = epoll_ctl(_epfd, option, fd, nullptr);if(ret == -1){logMessage(Level::ERROR, "epoll_del error, errno: %d, string_err: %s", errno, strerror(errno));exit(EPOLL_CTL_ERR);}logMessage(Level::NORMAL, "epoll_del success");}}// 等待就绪事件,并且进行任务的分配int wait(struct epoll_event* events, int maxevents){int n = epoll_wait(_epfd, events, maxevents, time_out);return n;}
private:// 设置非阻塞状态void setnonblocking(int fd){// 获取原先状态int old_option = fcntl(fd, F_GETFL);if(old_option == -1){logMessage(Level::ERROR, "get non_blocking error, errno: %d, string_err: %s", errno, strerror(errno));exit(GET_NON_BLOCKING_ERR);}// 设置非阻塞状态int new_option = fcntl(fd, F_SETFL, old_option | O_NONBLOCK);if(new_option == -1){logMessage(Level::ERROR, "set non_blocking error, errno: %d, string_err: %s", errno, strerror(errno));exit(SET_NON_BLOCKING_ERR);}}
};

connection.hpp

#pragma once
#include <iostream>
#include <string>
#include <functional>
#include <unistd.h>
#include "reactor.hpp"class reactor;
class connection;
using func_t = std::function<void(connection* conn)>;class connection
{
public:int _fd;std::string _inbuffer;  // 输入缓冲区std::string _outbuffer; // 输出缓冲区func_t _receiver;  // 可读事件处理函数func_t _sender;    // 可写事件处理函数func_t _excepter; // 异常事件处理函数reactor* _rp; // 指向reactor的指针,是为了方便找到_rp
public:connection(int fd, reactor* rp, func_t receiver, func_t sender, func_t excepter): _fd(fd), _rp(rp), _receiver(receiver), _sender(sender), _excepter(excepter){}~connection(){Close();}void Close(){if(_fd != -1){close(_fd);_fd = -1;}}
};

protocol.hpp

#pragma once
#include <iostream>
#include <string>
#include <cstring>
#include <sys/types.h>
#include <sys/socket.h>
#include <jsoncpp/json/json.h>
#include "err.hpp"
#include "connection.hpp"
using namespace std;const char* SEP = " ";                 // 分隔符,用于区分开序列化之间的字符
const int SEP_LEN = strlen(SEP);
const char* SEP_LINE = "\r\n";         // 行分隔符,用于区分开各报文之间
const int SEP_LINE_LEN = strlen(SEP_LINE);// 接收客户端或者服务端发来的报文,该函数的目的是拿到一个完整的报文
bool recvPackage(std::string &inbuffer, std::string *recv_string)
{*recv_string = "";// 分析处理auto pos = inbuffer.find(SEP_LINE);if (pos == std::string::npos)return false;logMessage(Level::DEBUG, "%d", stoi(inbuffer.substr(0, pos)));int body_size = stoi(inbuffer.substr(0, pos)); int package_size = pos + SEP_LINE_LEN*2 + body_size;if(package_size > inbuffer.size())return false;// 至少有一个完整的报文*recv_string = inbuffer.substr(0, package_size);logMessage(Level::DEBUG, "%s", recv_string->c_str());inbuffer.erase(0, package_size);return true;
}// 为报文添加自定义首部和尾部的函数
// 自定义首部规则:报文长度+行分隔符"\r\n",比如下面的:
// "x or yyyy"  -->  "有效载荷长度"\r\n"x or yyyy"\r\n
// "exitcode result"  -->  "有效载荷长度"\r\n"exitcode result"\r\n
// 其中有效载荷指的就是"x or yyyy"和"exitcode result"
string addRule(const string& body)
{string send_string = to_string(body.size()); // 有效载荷长度send_string += SEP_LINE; // 加上行分隔符send_string += body;     // 加上有效载荷send_string += SEP_LINE; // 加上行分隔符return send_string;
}// 为报文去掉自定义首部和尾部的函数,比如:
// "有效载荷长度"\r\n"x or yyyy"\r\n  -->  "x or yyyy"
bool delRule(const string& package, string* body)
{auto pos = package.find(SEP_LINE);if(pos == string::npos)return false;int body_size = stoi(package.substr(0, pos));*body = package.substr(pos + SEP_LINE_LEN, body_size);return true;
}// 请求一般是客户端给服务端的class Request
{
public:int _x;int _y;  char _op; // 运算符
public:Request(int x = 0, int y = 0, int op = 0):_x(x), _y(y), _op(op){}// 序列化相当于:  结构体 -》 字符串(更正确的说法是字节流)// 反序列化相当于:字符串 -》 结构体bool serialize(string* out) // 输出型参数{// 填写键值对Json::Value root;root["first"] = _x;root["second"] = _y;root["operator"] = _op;// 选择方式进行序列化Json::FastWriter writer;*out = writer.write(root);return true;}bool deserialize(const string& in){Json::Reader reader;Json::Value root;reader.parse(in, root); // 将in中的字符流反序列化到root中_x = root["first"].asInt();_y = root["second"].asInt();_op = root["operator"].asInt(); // 注意这里因为字符也是ASCII码,所以直接转为int即可return true;}
};// 响应一般是服务端发给客户端的
class Response
{
public:int _exitcode = 0; // 退出码,规定0表示计算成功,非0表示计算失败int _result = 0;   // 计算结果
public:Response(int exitcode = 0, int result = 0):_exitcode(exitcode), _result(result){}bool serialize(string* out) // 输出型参数{Json::Value root;root["exitcode"] = _exitcode;root["result"] = _result;Json::FastWriter writer;*out = writer.write(root);return true;}bool deserialize(const string& in){Json::Value root;Json::Reader reader;reader.parse(in, root);_exitcode = root["exitcode"].asInt();_result = root["result"].asInt();return true;}
};// 将字符串转化为请求结构体的工具函数
bool get_req_from_string(const string& msg, Request& req)
{string leftnum, rightnum;char op;int status = 0; // 0表示当前为左操作数范围,1表示当前为右操作数范围bool op_occur = false;for(int i = 0; i < msg.size(); ++i){if(!isdigit(msg[i])) // 非数字的情况{// 如果不是操作符则直接falseif(msg[i] != '+' && msg[i] != '-' && msg[i] != '*' && msg[i] != '/' && msg[i] != '%')return false;op_occur = true; // 标志出现过操作符op = msg[i];status = 1;      // 变成右操作数范围continue;}if(status == 0)leftnum += msg[i];elserightnum += msg[i];}// 如果没出现过操作符直接falseif(!op_occur)return false;req._x = stoi(leftnum), req._y = stoi(rightnum), req._op = op;return true;
}

err.hpp

#pragma once
#include <iostream>enum
{USAGE_ERR = 1,SOCKET_ERR,BIND_ERR,LISTEN_ERR,ACCEPT_ERR,EPOLL_CREATE_ERR,EPOLL_CTL_ERR,EPOLL_WAIT_ERR,NEW_EVENTS_ERR,GET_NON_BLOCKING_ERR,SET_NON_BLOCKING_ERR,NEW_CONNECTION_ERR,OK,DIV_ZERO,MOD_ZERO,OP_ERROR
};

sock.hpp

#pragma once
#include <iostream>
#include <string>
#include <cstring>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include "err.hpp"
#include "log.hpp"const static int maxbacklog = 32;
const static int default_num = -1;class sock
{
private:int _listensock = default_num; 
public:sock() {}~sock() { Close(); }int GetFD() { return _listensock; }void Socket(){// 1. 创建套接字_listensock = socket(AF_INET, SOCK_STREAM, 0);if(_listensock < 0){logMessage(Level::ERROR, "socket error: %s", strerror(errno));exit(SOCKET_ERR);}logMessage(Level::NORMAL, "create socket success: %d", _listensock);// 1.1 设置地址复用int opt = 1;setsockopt(_listensock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof opt);}void Bind(int port){// 2. 绑定套接字信息struct sockaddr_in local;memset(&local, 0, sizeof(local));local.sin_family = AF_INET;local.sin_port = htons(port);local.sin_addr.s_addr = htonl(INADDR_ANY);if(bind(_listensock, (struct sockaddr*)&local, sizeof(local)) < 0){logMessage(Level::ERROR, "bind error: %s", strerror(errno));exit(BIND_ERR);}logMessage(Level::NORMAL, "bind succuess");}void Listen(){// 3. 设置socket为监听状态if(listen(_listensock, maxbacklog) < 0){logMessage(Level::ERROR, "listen error: %s", strerror(errno));exit(LISTEN_ERR);}logMessage(Level::NORMAL, "listen succuess");}int Accept(std::string* clientip, uint16_t* clientport, int* err){struct sockaddr_in client;socklen_t len = sizeof(client);int fd = accept(_listensock, (struct sockaddr*)&client, &len);*err = errno;if(fd >= 0){*clientip = inet_ntoa(client.sin_addr);*clientport = ntohs(client.sin_port);}return fd;}
private:void Close(){if(_listensock != default_num){close(_listensock);_listensock = default_num;}}
};

log.hpp

#pragma once
#include <iostream>
#include <string>
#include <cstdarg>
#include <ctime>
#include <unistd.h>
#include <sys/types.h>
using namespace std;const int NUM = 1024;
enum Level{DEBUG = 0,NORMAL,WARING, ERROR,FATAL
};
const char* to_levelstr(int level)
{switch(level){case DEBUG: return "DEBUG";case NORMAL: return "NORMAL";case WARING: return "WARING";case ERROR: return "ERROR";case FATAL: return "FATAL";default: return nullptr;}
}// 日志格式:[日志等级][时间戳/时间][pid][message]
void logMessage(int level, const char* format, ...)
{// 1. 先将时间戳转化为本地时间然后格式化char timebuffer[128];time_t timestamp = time(nullptr); 			 // 获取当前时间戳struct tm* timeinfo = localtime(&timestamp); // 转化为本地时间结构strftime(timebuffer, sizeof(timebuffer), "%Y-%m-%d %H:%M:%S", timeinfo); // 格式化时间字符串// 2. 拼凑前缀部分,是固定的char prefixbuffer[NUM];snprintf(prefixbuffer, sizeof(prefixbuffer), "[%s][%s][%d]", to_levelstr(level), timebuffer, getpid());// 3. 格式化信息部分也就是后缀部分,是可变参数的内容 -- 通过vsnprintf格式化到数组中char msgbuffer[NUM];va_list start;va_start(start, format);vsnprintf(msgbuffer, sizeof(msgbuffer), format, start);// 4. 打印printf("%s%s\n", prefixbuffer, msgbuffer);
}

lockguard.hpp

#pragma once
#include <iostream>
#include <pthread.h>
#include "log.hpp"class lockguard
{
private:pthread_mutex_t _mtx;
public:lockguard(pthread_mutex_t& mtx): _mtx(mtx){if(pthread_mutex_lock(&_mtx) != 0){logMessage(Level::ERROR, "lock error");std::exception();}}~lockguard(){if(pthread_mutex_unlock(&_mtx) != 0){logMessage(Level::ERROR, "unlock error");std::exception();}}
};

在这里插入图片描述

相关文章:

  • 2025年05月28日Github流行趋势
  • 农业光合参数反演专栏
  • kubernate解决 “cni0“ already has an IP address different from 10.244.0.1/24问题
  • Caddy如何在测试环境中使用IP地址配置HTTPS服务
  • bug: uniCloud 查询数组字段失败
  • HTTP Accept简介
  • linux系统(centos7为例)将jar配置成服务操作教程
  • 浏览器之禁止打开控制台【F12】
  • 网页前端开发(基础进阶1)
  • Transformer核心技术解析LCPO方法:精准控制推理长度的新突破
  • 计算机内存管理全解析:从基础原理到前沿技术(含分页/分段/置换算法/大页/NVM/CXL等技术详解
  • LVS的DR模式部署
  • Linux文件权限相关
  • Oracle基础知识(五)——ROWID ROWNUM
  • 8.8 Primary ODSA service without ODSA Portal
  • 基于亚博K210开发板——物体分类测试
  • 企业信息化/数字化项目管理办法V3.0
  • 记一次前端逻辑绕过登录到内网挖掘
  • JAVA学习 DAY1 初识JAVA
  • MCP 登场:掘金开启 AI 前端项目部署新时代
  • 行业网站建设费用明细/国内广告投放平台
  • 禁止网站收录/在线生成个人网站免费
  • 哪个做企业网站/免费做网站自助建站
  • 可信赖的做pc端网站/网站优化平台
  • 淘宝 网站建设教程视频/怎么自己弄一个平台
  • 三级域名和二级域名的区别/广州seo代理