当前位置: 首页 > news >正文

【项目】 :C++ - 仿mudou库one thread one loop式并发服务器实现(代码实现)

C++ -仿mudou库one thread one loop式并发服务器实现代码实现

  • 一、SERVER模块
    • 缓存区 `Buffer` 类
    • 套接字`Socket`类
    • 事件管理`Channel`类
    • 描述符事件监控`Poller`类
    • 定时任务管理`TimerWheel`类
    • Reactor-EventLoop线程池类
    • 通用类型Any类
    • 通信连接管理`Connection`类
    • 监听描述符管理`Acceptor`类
    • 服务器`TcpServer`类
  • 二、HTTP协议模块
    • `Util`实用工具类
    • `HttpRequest`请求类
    • `HttpResponse`响应类
    • `HttpContext`上下文类
    • `HttpServer`类
    • 基于HttpServer搭建HTTP服务器
  • 三、功能测试
    • 使用浏览器进行基本功能测试
    • 长连接连续请求测试
    • 超时连接测试1
    • 超时连接释放测试2
    • 超时连接释放测试3
    • 数据中多条请求处理测试
    • PUT大文件上传测试
  • 四、性能测试
    • 服务器性能测试 —— 使用 Webbench
      • 测试目标
      • 测试原理
      • 重点衡量标准
      • 测试环境

上一个项目中我实现了日志系统,因此本项目的日志模块我便使用了该系统。
日志系统博客
日志代码仓库

不过在本项目中使用日志系统模块似乎有点大了,这里有个比较简便的日志宏的实现:

#define INF 0
#define DBG 1
#define ERR 2// 默认日志级别
#define DEFAULT_LOG_LEVEL DBG// 通用日志宏
#define LOG(level, format, ...) { \if (level >= DEFAULT_LOG_LEVEL) { \time_t t = time(NULL); \struct tm *m = localtime(&t); \char ts[32] = {0}; \strftime(ts, 31, "%H:%M:%S", m); \fprintf(stdout, "[%p %s %s:%d] " format "\n", \(void*)pthread_self(), ts, __FILE__, __LINE__, ##__VA_ARGS__); \} \
}// 不同级别日志宏
#define INF_LOG(format, ...) LOG(INF, format, ##__VA_ARGS__);
#define DBG_LOG(format, ...) LOG(DBG, format, ##__VA_ARGS__);
#define ERR_LOG(format, ...) LOG(ERR, format, ##__VA_ARGS__);

一、SERVER模块

缓存区 Buffer

Buffer 类用于实现用户态缓冲区,提供数据存储、读取和管理等功能。

#define BUFF_DEFAULT_SIZE 1024
class Buffer
{
private:std::vector<char> _buffer; // 缓冲区uint64_t _read_idx;        // 读偏移uint64_t _write_idx;       // 写偏移public:char *Begin() { return &*_buffer.begin(); }// 获取读写位置起始地址char *WritePosition() { return Begin() + _write_idx; }char *ReadPosition() { return Begin() + _read_idx; }// 获取当前缓冲区中 前后缓冲区位置大小uint64_t TailFreeSpace() { return _buffer.size() - _write_idx; }uint64_t HeadFreeSpace() { return _read_idx; }// 获取可读数据大小uint64_t ReadAbleSize() { return _write_idx - _read_idx; }// 读写idx 向后移动void MoveReadOffset(uint64_t len){if (len == 0)return;assert(len <= ReadAbleSize());_read_idx += len;}void MoveWriteOffset(uint64_t len){if (len == 0)return;assert(len <= TailFreeSpace());_write_idx += len;}void EnsureWriteSpace(uint64_t len){if (len <= TailFreeSpace())return;/*如果len 是小于等于缓冲区中所有空闲位置大小,那么将数据往前移动*/if (len <= TailFreeSpace() + HeadFreeSpace()){uint64_t res = ReadAbleSize();std::copy(ReadPosition(), ReadPosition() + res, Begin());_read_idx = 0;_write_idx = res;}else // 缓冲区的空间不足,进行扩容{Log::DEBUG("RESIZE %ld", _write_idx + len);_buffer.resize(_write_idx + len);}}public:Buffer() : _buffer(BUFF_DEFAULT_SIZE), _read_idx(0), _write_idx(0) {}// write接口void Write(const char *data, uint64_t len){if (len == 0)return;EnsureWriteSpace(len);std::copy(data, data + len, WritePosition());}void WriteAndPush(const char *data, uint64_t len){if (len == 0)return;Write(data, len);MoveWriteOffset(len);}void WriteString(const std::string &data){Write(data.c_str(), data.size());}void WriteStringAndPush(const std::string &data){WriteString(data);MoveWriteOffset(data.size());}void WriteBuffer(Buffer &buf){Write(buf.ReadPosition(), buf.ReadAbleSize());}void WriteBufferAndPush(Buffer &buf){WriteBuffer(buf);MoveWriteOffset(buf.ReadAbleSize());}// read接口void Read(char *buf, uint64_t len){if (len == 0)return;assert(len <= ReadAbleSize());std::copy(ReadPosition(), ReadPosition() + len, buf);}void ReadAndPop(char *buf, uint64_t len){Read(buf, len);MoveReadOffset(len);}std::string ReadAsString(uint64_t len){assert(len <= ReadAbleSize());std::string str;str.resize(len);Read(&str[0], len);return str;}std::string ReadAsStringAndPop(uint64_t len){assert(len <= ReadAbleSize());std::string str = ReadAsString(len);MoveReadOffset(len);return str;}// 这里我们就不需要再写一个ReadBuffer了 因为使用WriteBuffer可以实现同样的功能std::string GetLine(){char *pos = (char *)memchr(ReadPosition(), '\n', ReadAbleSize());if (pos == NULL){return "";}//  +1是为了把换行字符也取出来。读取出\nreturn ReadAsString(pos - ReadPosition() + 1);}std::string GetLineAndPop(){std::string str = GetLine();MoveReadOffset(str.size());return str;}void Clear(){_write_idx = 0;_read_idx = 0;}
};

套接字Socket

封装套接字操作。

class Socket
{
private:int _sockfd;public:Socket() : _sockfd(-1) {}Socket(int fd) : _sockfd(fd) {}~Socket() { Close(); }int GetFd() { return _sockfd; }// 创建套接字bool CreateSock(){_sockfd = socket(AF_INET, SOCK_STREAM, 0);if (_sockfd < 0){Log::ERROR("%s", "Create socket false");return false;}return true;}// fd绑定ip portbool Bind(const std::string &ip, uint16_t port){struct sockaddr_in addr;addr.sin_family = AF_INET;addr.sin_port = htons(port);addr.sin_addr.s_addr = inet_addr(ip.c_str());int n = bind(_sockfd, (const sockaddr *)&addr, sizeof(addr));if (n < 0){Log::ERROR("%s", "SOCKET bind false");return false;}return true;}// 监听bool Listen(int backlog = 15){int ret = listen(_sockfd, backlog);if (ret < 0){Log::ERROR("%s", "SOCKET listen false");return false;}return true;}// 客户端连接服务器bool Connect(const std::string &ip, uint16_t port){struct sockaddr_in addr;addr.sin_family = AF_INET;addr.sin_port = htons(port);addr.sin_addr.s_addr = inet_addr(ip.c_str());int n = connect(_sockfd, (const sockaddr *)&addr, sizeof(addr));if (n < 0){Log::ERROR("%s", "SOCKET Connect false");return false;}return true;}// 监听 接受新连接int Accept(){// 这里我不需要知道ip 和port 所以我设置了 nullint newfd = accept(_sockfd, NULL, NULL);if (newfd < 0){Log::ERROR("%s", "SOCKET accept false");return -1;}Log::DEBUG("%s", "get a link");return newfd;}// flag == MSG_DONTWAIT 表示当前接收为非阻塞。ssize_t Recv(void *buff, size_t len, int nonblock_flag = 0){ssize_t n = recv(_sockfd, buff, len, nonblock_flag);if (n <= 0){// EAGAIN 当前socket的接收缓冲区中没有数据了,在非阻塞的情况下才会有这个错误// EINTR  表示当前socket的阻塞等待,被信号打断了,if (errno == EAGAIN || errno == EINTR)return 0; // 表示这次接收没有接收到数据Log::ERROR("%s", "SOCKET recv false");return -1;}return n;}ssize_t NonBlockRecv(void *buff, size_t len){return Recv(buff, len, MSG_DONTWAIT);}ssize_t Send(const void *buff, size_t len, int nonblock_flag = 0){ssize_t n = send(_sockfd, buff, len, nonblock_flag);if (n <= 0){// EAGAIN 当前socket的接收缓冲区中没有数据了,在非阻塞的情况下才会有这个错误// EINTR  表示当前socket的阻塞等待,被信号打断了,if (errno == EAGAIN || errno == EINTR)return 0; // 表示这次接收没有接收到数据Log::ERROR("%s", "SOCKET send false");return -1;}return n;}ssize_t NonBlockSend(const void *buff, size_t len){return Send(buff, len, MSG_DONTWAIT);}// 关闭套接字void Close(){if (_sockfd != -1){close(_sockfd);_sockfd = -1;}}// 设置非堵塞void SetNonBlock(){int flag = fcntl(_sockfd, F_GETFL, 0);fcntl(_sockfd, F_SETFL, flag | O_NONBLOCK);}// 设置套接字选项---开启地址端口重用void ReuseAddress(){// int setsockopt(int fd, int leve, int optname, void *val, int vallen)int val = 1;setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR, (void *)&val, sizeof(int));val = 1;setsockopt(_sockfd, SOL_SOCKET, SO_REUSEPORT, (void *)&val, sizeof(int));}// 创建一个服务端bool CreateServer(uint16_t port, const std::string &ip = "0.0.0.0", bool nonblock_flag = false, int backlog = 15){// 1. 创建套接字,2. 设置非阻塞,3. 启动地址重用 ,4.绑定地址 ,5. 开始监听if (CreateSock() == false)return false;if (nonblock_flag == true)SetNonBlock();ReuseAddress();if (Bind(ip, port) == false)return false;if (Listen(backlog) == false)return false;Log::INFO("%s", "create server success");return true;}bool CreateClient(uint16_t port, const std::string &ip){// 1. 创建套接字。bind(对于客户端可以忽略)  2. 连接服务器if (CreateSock() == false)return false;if (Connect(ip, port) == false)return false;Log::INFO("%s", "create client success");return true;}
};

事件管理Channel

管理描述符的 I/O 事件(读、写、错误等)。
与 Poller 配合,触发事件时回调相应处理函数。

class Poller;
class EventLoop;
// Channel 实现单个 描述符与事件的管理
class Channel
{using EventCallback = std::function<void()>;private:int _fd;EventLoop *_loop;uint32_t _events;  // 监控的事件uint32_t _revents; // 已就绪的事件EventCallback _read_callback;EventCallback _write_callback;EventCallback _error_callback;EventCallback _close_callback; // 连接断开触发的回调函数EventCallback _event_callback; // 事件触发后调整活跃度
public:Channel(EventLoop *loop, int fd) : _loop(loop), _fd(fd), _events(0), _revents(0) {}void SetREvents(uint32_t events) { _revents = events; } // 设置实际就绪的事件void SetReadCallback(const EventCallback &cb) { _read_callback = cb; }void SetWriteCallback(const EventCallback &cb) { _write_callback = cb; }void SetErrorCallback(const EventCallback &cb) { _error_callback = cb; }void SetCloseCallback(const EventCallback &cb) { _close_callback = cb; }void SetEventCallback(const EventCallback &cb) { _event_callback = cb; }int GetFd() { return _fd; }uint32_t Events() { return _events; } // 获取需要监控的事件// 判断当前是否监控了可读事件 或者可写事件bool ReadAble() { return _events & EPOLLIN; }bool WriteAble() { return _events & EPOLLOUT; }// 启动读写事件监控void EnableRead(){_events |= EPOLLIN;Update();}void EnableWrite(){_events |= EPOLLOUT;Update();}// 关闭读写事件监控void DisableRead(){_events &= ~EPOLLIN;Update();}void DisableWrite(){_events &= ~EPOLLOUT;Update();}// 关闭所有事件监控void DisableAll() { _events = 0; }// 直接移除在epoll的监控, 因为需要调用poller的接口,所以类外定义void Remove();void Update();void HandleEvent(){// 读事件就绪 | 对端断开 |if ((_revents & EPOLLIN) || (_revents & EPOLLRDHUP) || (_revents & EPOLLPRI)){if (_read_callback)_read_callback();}// EPOLLOUT < EPOLLERR < EPOLLHUP 如果出错了,就别处理写;如果挂掉了,就别处理写/错。if (_revents & EPOLLHUP){if (_close_callback)_close_callback();return;}else if (_revents & EPOLLERR){if (_error_callback)_error_callback();return;}else if (_revents & EPOLLOUT){if (_write_callback)_write_callback();}if (_event_callback)_event_callback(); // 调整事件的活跃度}
};

描述符事件监控Poller

#define MAX_EPOLLEVENTS 100
class Poller
{
private:int _epfd; // epoll句柄struct epoll_event _evs[MAX_EPOLLEVENTS];std::unordered_map<int, Channel *> _channels; // fd和事件管理的映射
private:// 对epoll中的红黑树事件进行管理void Updata(Channel *channel, int op){int fd = channel->GetFd();struct epoll_event ev;ev.data.fd = fd;ev.events = channel->Events();int ret = epoll_ctl(_epfd, op, fd, &ev);if (ret < 0){Log::ERROR("%s", "epoll_ctl false");}return;}// 判断channel 是否已经添加到了事件管理中bool HasChannel(Channel *channel){int fd = channel->GetFd();auto it = _channels.find(fd);if (it == _channels.end())return false;return true;}public:Poller(){_epfd = epoll_create(MAX_EPOLLEVENTS);if (_epfd < 0){Log::ERROR("epoll_create false");abort(); // 退出程序}}// 修改事件void UpdateEvent(Channel *channel){// 判断channel 是否存在 事件管理中bool ret = HasChannel(channel);if (ret == false){_channels.insert(std::make_pair(channel->GetFd(), channel));Updata(channel, EPOLL_CTL_ADD);return;}Updata(channel, EPOLL_CTL_MOD);return;}void RemoveEvent(Channel *channel){auto it = _channels.find(channel->GetFd());if (it == _channels.end())return;// 在hash中删除_channels.erase(it);// 在epoll红黑树中删除Updata(channel, EPOLL_CTL_DEL);}// 开始监控,返回活跃连接void Poll(std::vector<Channel *> *active){int nfds = epoll_wait(_epfd, _evs, MAX_EPOLLEVENTS, -1); // 永久堵塞if (nfds < 0){// EINTR  表示当前socket的阻塞等待,被信号打断了,if (errno == EINTR) // 信号打断return;Log::ERROR("EPOLL WAIT ERROR:%s", strerror(errno));return;}for (int i = 0; i < nfds; i++){auto it = _channels.find(_evs[i].data.fd);assert(it != _channels.end());it->second->SetREvents(_evs[i].events); // 设置就绪事件active->push_back(it->second);          // 输出型参数}return;}
};

定时任务管理TimerWheel

using TaskFunc = std::function<void()>;
using ReleaseFunc = std::function<void()>;
class TimerTask
{private:uint64_t _id;          // 定时器任务IDuint32_t _timeout;     // 定时任务超时事件bool _canceled;        // 取消定时任务,false-表示没有被取消, true-表示被取消TaskFunc _task_cb;     // 定时器对象要执行的定时任务ReleaseFunc _relea_cb; // 用于删除TimerWheel中保存的定时器对象信息
public:TimerTask(uint64_t id, uint32_t timeout, const TaskFunc &cb) : _id(id), _timeout(timeout), _canceled(false), _task_cb(cb) {}~TimerTask(){if (_canceled == false)_task_cb();_relea_cb();}void Cancel() { _canceled = true; }void SetRelease(const ReleaseFunc &cb) { _relea_cb = cb; }uint32_t GetTimeout() { return _timeout; }
};class TimerWheel
{using WeakTask = std::weak_ptr<TimerTask>;using ShareTask = std::shared_ptr<TimerTask>;private:int _tick;                                      // 当前的秒针,走到哪里释放哪里,释放哪里,就相当于执行哪里的任务int _capacity;                                  // 表盘最大数量---其实就是最大延迟时间std::unordered_map<uint64_t, WeakTask> _timers; //_timers 的作用只是 快速查找任务(通过 id 找对应的任务)。它本身并不想延长任务的生命周期。std::vector<std::vector<ShareTask>> _wheel;// timefdEventLoop *_loop;int _timerfd; // 定时器描述符--可读事件回调就是读取计数器,执行定时任务std::unique_ptr<Channel> _timer_channel;private:void RemoveTimer(uint64_t id){auto it = _timers.find(id);if (it != _timers.end()){_timers.erase(it);}}static int CreateTimerfd(){int timerfd = timerfd_create(CLOCK_MONOTONIC, 0);if (timerfd < 0){Log::ERROR("timerfd create false");abort();}struct itimerspec itime;itime.it_value.tv_sec = 1; // 第一次超时时间为1s后itime.it_value.tv_nsec = 0;itime.it_interval.tv_sec = 1; // 第一次超时后,每次超时的间隔时itime.it_interval.tv_nsec = 0;timerfd_settime(timerfd, 0, &itime, NULL);return timerfd;}// 跑动时间轮void RunWheel(){_tick = (_tick + 1) % _capacity;_wheel[_tick].clear(); // 清空指定位置的数组,就会把数组中保存的所有管理定时器对象的shared_ptr释放掉}int ReadTimefd(){uint64_t times;// 有可能因为其他描述符的事件处理花费事件比较长,然后在处理定时器描述符事件的时候,有可能就已经超时了很多次// read读取到的数据times就是从上一次read之后超时的次数int ret = read(_timerfd, &times, 8);if (ret < 0){Log::ERROR("read Timerfd false");abort();}return times;}// 通过timerfd read来跑动时间轮void OnTime(){int times = ReadTimefd();for (int i = 0; i < times; i++){RunWheel();}}void TimerAddInLoop(uint64_t id, uint32_t timeout, const TaskFunc &cb){ShareTask ptr(new TimerTask(id, timeout, cb));ptr->SetRelease(std::bind(&TimerWheel::RemoveTimer, this, id));// 添加到timer 中_timers[id] = WeakTask(ptr);int pos = (_tick + timeout) % _capacity;_wheel[pos].push_back(ptr);}// 刷新/延迟定时任务void TimerRefreshInLoop(uint64_t id){// 通过保存的定时器对象的weak_ptr构造一个shared_ptr出来,添加到轮子中auto it = _timers.find(id);if (it == _timers.end()){return; // 定时任务没有找到}ShareTask ptr = it->second.lock(); // lock获取weak_ptr管理的对象对应的shared_ptr// 由于是shared_ptr,所以我们只需要将一个新的shared_ptr插入到_wheel中即可达到延迟的目的int timeout = ptr->GetTimeout();int pos = (_tick + timeout) %_capacity;_wheel[pos].push_back(ptr);}// 将定时任务的任务取消void TaskCancelInLoop(uint64_t id){auto it = _timers.find(id);if (it == _timers.end()){return;}ShareTask ptr = it->second.lock();if (ptr)ptr->Cancel();}public:TimerWheel(EventLoop *loop) : _tick(0), _capacity(60), _wheel(_capacity), _loop(loop),_timerfd(CreateTimerfd()), _timer_channel(new Channel(_loop, _timerfd)){_timer_channel->SetReadCallback(std::bind(&TimerWheel::OnTime, this));_timer_channel->EnableRead();}void TimerAdd(uint64_t id, uint32_t timeout, const TaskFunc &cb);void TimerRefresh(uint64_t id);void TimerCancel(uint64_t id);/*这个接口存在线程安全问题--这个接口实际上不能被外界使用者调用,只能在模块内,在对应的EventLoop线程内执行*/bool HasTimer(uint64_t id){auto it = _timers.find(id);if (it == _timers.end()){return false;}return true;}
};

Reactor-EventLoop线程池类

using Functor = std::function<void()>;
class EventLoop
{
private:std::mutex _mutex;                       // 给任务池上锁std::vector<Functor> _tasks;             // 任务池std::thread::id _thread_id;              // 每一个Evenetloop都由一个线程池来管理int _event_fd;                           // eventfd唤醒IO事件监控有可能导致的阻塞std::unique_ptr<Channel> _event_channel; // eventfd的ChannelPoller _poller;                          // 当前Loop下所有描述符的事件监控TimerWheel _timer_wheel;                 // 定时器模块
public:// 执行任务池中的任务void RunAllTask(){std::vector<Functor> tmp;{std::unique_lock<std::mutex> _lock(_mutex);_tasks.swap(tmp);}for (auto &f : tmp){f();}return;}// 创造一个 eventfd , readEvent , WeakUpEventfdstatic int CreateEvent(){int efd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);if (efd < 0){Log::ERROR("create eventfd false");abort(); // 关闭程序}return efd;}void ReadEventfd(){uint64_t ret = 0;int n = read(_event_fd, &ret, sizeof(ret));if (n < 0){// EINTR -- 被信号打断;   EAGAIN -- 表示无数据可读if (errno == EINTR || errno == EAGAIN){return;}Log::ERROR("read eventfd false");abort();}return;}void WakeUpEventfd(){uint64_t ret = 1;int n = write(_event_fd, &ret, sizeof(ret));if (n < 0){// EINTR -- 被信号打断;   EAGAIN -- 表示无数据可读if (errno == EINTR || errno == EAGAIN){return;}Log::ERROR("write eventfd false");abort();}return;}public:EventLoop() : _thread_id(std::this_thread::get_id()),_event_fd(CreateEvent()),_event_channel(new Channel(this, _event_fd)),_timer_wheel(this){// 给 evetfd设置可读回调_event_channel->SetReadCallback(std::bind(&EventLoop::ReadEventfd, this));// 启动读事件监控_event_channel->EnableRead();}// 事件监控->就绪事件处理->执行任务池中任务void Start(){while (1){std::vector<Channel *> actives;_poller.Poll(&actives);for (auto &channel : actives){channel->HandleEvent();}RunAllTask();}}// 判断当前线程是否是EventLoop对应的线程bool IsInloop() { return _thread_id == std::this_thread::get_id(); }void AssertInloop() { assert(_thread_id == std::this_thread::get_id()); }// 判断将要执行的任务是否处于当前线程中,如果是则执行,不是则压入队列。void RunInLoop(const Functor &cb){// 如果是eventloop线程内部直接执行if (IsInloop() == true){cb();return;}QueueInLoop(cb);}// 将操作压入任务池void QueueInLoop(const Functor &cb){// 涉及任务池的操作都要加锁{std::unique_lock<std::mutex> _lock(_mutex);_tasks.push_back(cb);}// 压入任务池的操作,代表任务池中存在数据,而start需要处理就绪事件// 如果没有就绪事件则导致epol_wait会堵塞// 所以我们需要通过给eventfd写入一个数据,eventfd就会触发可读事件WakeUpEventfd();}// 添加/修改描述符的事件监控void UpdateEvent(Channel *channel) { _poller.UpdateEvent(channel); }// 移除描述符的监控void RemoveEvent(Channel *channel) { _poller.RemoveEvent(channel); }// 定时器模块接口void TimerAdd(uint64_t id, uint32_t timeout, const TaskFunc &cb) { _timer_wheel.TimerAdd(id, timeout, cb); }void TimerRefresh(uint64_t id) { _timer_wheel.TimerRefresh(id); }void TimerCancel(uint64_t id) { _timer_wheel.TimerCancel(id); }bool HasTimer(uint64_t id) { return _timer_wheel.HasTimer(id); }
};void Channel::Remove() { _loop->RemoveEvent(this); }
void Channel::Update() { _loop->UpdateEvent(this); }void TimerWheel::TimerAdd(uint64_t id, uint32_t timeout, const TaskFunc &cb)
{_loop->RunInLoop(std::bind(&TimerWheel::TimerAddInLoop, this, id, timeout, cb));
}// 刷新/延迟定时任务
void TimerWheel::TimerRefresh(uint64_t id)
{_loop->RunInLoop(std::bind(&TimerWheel::TimerRefreshInLoop, this, id));
}
void TimerWheel::TimerCancel(uint64_t id)
{_loop->RunInLoop(std::bind(&TimerWheel::TaskCancelInLoop, this, id));
}class LoopThread
{
private:/*互斥锁和条件变量用于实现_loop获取的同步关系,避免线程创建了,但是_loop还没有实例化之前去获取_loop*/std::mutex _mutex;             // 互斥锁std::condition_variable _cond; // 条件变量EventLoop *_loop;              // 由线程实例化一个对象std::thread _thread;           // Loop所在的线程
private:void ThreadEntry(){ /*这里创造一个对象而不是new,是为了保持_loop的生命周期与线程的一致*/EventLoop loop;{std::unique_lock<std::mutex> lock(_mutex);_loop = &loop;_cond.notify_all();}_loop->Start(); // 循坏}public:LoopThread() : _loop(nullptr), _thread(std::thread(&LoopThread::ThreadEntry, this)) {}EventLoop *GetLoop(){EventLoop *loop = nullptr;{std::unique_lock<std::mutex> lock(_mutex);_cond.wait(lock, [&](){ return _loop != nullptr; });loop = _loop;}return loop;}
};class LoopThreadPool
{
private:int _thread_count;                  // 从属线程数量int _loops_idx;                     //_loops下标EventLoop *_base_loop;              // 主线程std::vector<LoopThread *> _threads; // 从属线程std::vector<EventLoop *> _loops;    // 从属线程的eventloop
public:LoopThreadPool(EventLoop *base_loop, int thread_count = 0) : _thread_count(thread_count), _loops_idx(0), _base_loop(base_loop) {}void SetThreadCount(int count) { _thread_count = count; }void CreateThread(){if (_thread_count > 0){_threads.resize(_thread_count);_loops.resize(_thread_count);for (int i = 0; i < _thread_count; i++){_threads[i] = new LoopThread();_loops[i] = _threads[i]->GetLoop();}}}EventLoop *NextLoop(){if (_thread_count == 0){return _base_loop;}EventLoop* loop = _loops[_loops_idx];_loops_idx = (_loops_idx + 1) % _thread_count;return loop;}
};

通用类型Any类

class Any
{
private:class holder{public:virtual ~holder() {}virtual const std::type_info &type() = 0;virtual holder *clone() = 0;};template <class T>class placeholder : public holder{public:T _val;public:placeholder(const T &val) : _val(val) {}// 获取对象保存的数据类型const std::type_info &type() override { return typeid(T); }holder *clone() override { return new placeholder(_val); }};private:holder *_content;public:Any() : _content(nullptr) {}template <class T>Any(const T &val) : _content(new placeholder<T>(val)) {}Any(const Any &other) : _content(other._content ? other._content->clone() : nullptr) {}~Any() { delete _content; }Any &swap(Any &other){std::swap(_content, other._content);return *this;}template <class T>Any &operator=(const T &val){// 构造一个临时对象Any(val).swap(*this);return *this;}Any &operator=(const Any &other){// 构造一个临时对象Any(other).swap(*this);return *this;}template <class T>T &GetContent(){assert(_content != nullptr);assert(typeid(T) == _content->type());auto p = dynamic_cast<placeholder<T> *>(_content);assert(p != nullptr);return p->_val;}
};

通信连接管理Connection

class Connection;
typedef enum
{DISCONNECTED, // 连接关闭状态CONNECTING,   // 连接建立成功-待处理状态CONNECTED,    // 连接建立完成,各种设置已完成,可以通信的状态DISCONNECTING // 待关闭状态
} ConnStatus;using PtrConnection = std::shared_ptr<Connection>;
// Connection 实现 单个套接字连接管理
class Connection : public std::enable_shared_from_this<Connection>
{using ConnectedCallback = std::function<void(const PtrConnection &)>;// 消息处理using MessageCallback = std::function<void(const PtrConnection &, Buffer *)>;using ClosedCallback = std::function<void(const PtrConnection &)>;using AnyEventCallback = std::function<void(const PtrConnection &)>;private:EventLoop *_loop;  // 连接锁管理的loopuint64_t _conn_id; // 连接的唯一ID,便于连接的管理和查找// uint64_t _timer_id;   //定时器ID,必须是唯一的,这块为了简化操作使用conn_id作为定时器IDint _sockfd;                   // 连接关联的文件描述符Socket _socket;                // 套接字操作管理Channel _channel;              // 连接的事件管理Buffer _in_buffer;             // 输入缓冲区---存放从socket中读取到的数据Buffer _out_buffer;            // 输出缓冲区---存放要发送给对端的数据ConnStatus _statu;             // 连接状态bool _enable_inactive_release; // 连接是否启动非活跃销毁的判断标志,默认为falseAny _context; // 请求的接收处理上下文ConnectedCallback _connected_callback;MessageCallback _message_callback;ClosedCallback _closed_callback;AnyEventCallback _event_callback;ClosedCallback _server_closed_callback;private:void HandleRead() // 描述符可读事件{char buffer[65536];// 1.接受socket套接字中的数据ssize_t ret = _socket.NonBlockRecv(buffer, 65535);if (ret < 0){ // recv出错ShutdownInLoop();return;}else if (ret == 0)return;// 将读到的数据放入缓冲区中_in_buffer.WriteAndPush(buffer, ret);if (_in_buffer.ReadAbleSize() > 0){ // shared_from_this--从当前对象自身获取自身的shared_ptr管理对象if (_message_callback)_message_callback(shared_from_this(), &_in_buffer);}}void HandleWrite() // 描述符可写事件{ssize_t ret = _socket.NonBlockSend(_out_buffer.ReadPosition(), _out_buffer.ReadAbleSize());if (ret < 0){ // send出错,即使发送失败、连接马上要释放,这部分已经读到的数据 还没被应用层处理。如果直接释放连接,这些数据就丢了。// 所以在释放前调用 _message_callback,可以让业务层先处理掉已经收到的数据,避免数据丢失。if (_in_buffer.ReadAbleSize() > 0){ // shared_from_this--从当前对象自身获取自身的shared_ptr管理对象if (_message_callback)_message_callback(shared_from_this(), &_in_buffer);}return Release();}else if (ret == 0)return;_out_buffer.MoveReadOffset(ret); // 千万不要忘了,将读偏移向后移动if (_out_buffer.ReadAbleSize() == 0){// 如果输出缓冲区没有可读数据,则关闭可写事件的监控_channel.DisableWrite();if (_statu == DISCONNECTING)Release();}return;}void HandleClose() // 描述符触发挂断事件{if (_in_buffer.ReadAbleSize() > 0){ // shared_from_this--从当前对象自身获取自身的shared_ptr管理对象if (_message_callback)_message_callback(shared_from_this(), &_in_buffer);}// 输出缓冲 _out_buffer 里的数据是 准备发给对端的。// 但既然挂断了,对端已经不再接收,所以发也没用,只会出错。Release();}// 描述符触发出错事件void HandleError() { HandleClose(); }// 描述符触发任意事件void HandleEvent(){// 刷新活跃度if (_enable_inactive_release == true){_loop->TimerRefresh(_conn_id);}if (_event_callback)_event_callback(shared_from_this());}// 非活跃的销毁任务void EnableInactiveReleaseInLoop(int sec){_enable_inactive_release = true;if (_loop->HasTimer(_conn_id)){_loop->TimerRefresh(_conn_id);return;}_loop->TimerAdd(_conn_id, sec, std::bind(&Connection::Release, this));}void CancelInactiveReleaseInLoop(){_enable_inactive_release = false;if (_loop->HasTimer(_conn_id)){_loop->TimerCancel(_conn_id);}}void EstablishedInLoop(){assert(_statu == CONNECTING); // 连接中_statu = CONNECTED;_channel.EnableRead();if (_connected_callback)_connected_callback(shared_from_this());//_statu = CONNECTED;}void SendInLoop(Buffer &buf){if (_statu == DISCONNECTED)return;_out_buffer.WriteBufferAndPush(buf);// 启动可写事件监控if (_channel.WriteAble() == false){_channel.EnableWrite();}}void ReleaseInLoop() // 实际的释放接口{_statu = DISCONNECTED;// 将channel 从loop的管理中移除_channel.Remove();_socket.Close();// loop中的定时任务器可能存在任务,我们要销毁非活跃任务if (_loop->HasTimer(_conn_id))CancelInactiveReleaseInLoop();// 组件使用者先关闭设置的回调函数if (_closed_callback)_closed_callback(shared_from_this());// 移除服务器内部管理的连接信息if (_server_closed_callback)_server_closed_callback(shared_from_this());}void ShutdownInLoop() // shutdown判断发送缓冲区中是否有数据,然后调用ReleaseInLoop{_statu = DISCONNECTING;if (_in_buffer.ReadAbleSize() > 0){if (_message_callback)_message_callback(shared_from_this(), &_in_buffer);}if (_out_buffer.ReadAbleSize() > 0){if (_channel.WriteAble() == false)_channel.EnableWrite();}if (_out_buffer.ReadAbleSize() == 0){Release();}}void UpgradeInLoop(const Any &context, const ConnectedCallback &conn,const MessageCallback &msg, const ClosedCallback &closed, const AnyEventCallback &event){_context = context;_connected_callback = conn;_message_callback = msg;_closed_callback = closed;_event_callback = event;}public:Connection(EventLoop *loop, uint64_t conn_id, int sockfd) : _loop(loop), _conn_id(conn_id), _sockfd(sockfd), _socket(_sockfd),_channel(_loop, _sockfd), _statu(CONNECTING), _enable_inactive_release(false){_channel.SetReadCallback(std::bind(&Connection::HandleRead, this));_channel.SetWriteCallback(std::bind(&Connection::HandleWrite, this));_channel.SetCloseCallback(std::bind(&Connection::HandleClose, this));_channel.SetErrorCallback(std::bind(&Connection::HandleError, this));_channel.SetEventCallback(std::bind(&Connection::HandleEvent, this));// 一旦启动读事件监控就有可能会立即触发读事件,如果这时候启动了非活跃连接销毁就会产生问题}~Connection() { Log::DEBUG("realse Connection:%p", this); }// 获取连接iduint64_t GetConnid() { return _conn_id; }// 获取sockfdint GetSockfd() { return _sockfd; }// 获取上下文Any *GetContext() { return &_context; }// 是否Connected状态bool IsConnected() { return _statu == CONNECTED; }/*这四个回调函数,是让服务器模块来设置的(即组件的使用者设置)*/void SetConnectedCallback(const ConnectedCallback &cb) { _connected_callback = cb; }void SetMessageedCallback(const MessageCallback &cb) { _message_callback = cb; }void SetClosedCallback(const ClosedCallback &cb) { _closed_callback = cb; }void SetAnyEventCallback(const AnyEventCallback &cb) { _event_callback = cb; }void SetSrvClosedCallback(const ClosedCallback &cb) { _server_closed_callback = cb; }// 设置上下文,连接处理后会调用void SetContext(const Any &context) { _context = context; }// 启动非活跃销毁void EnableInactiveRelease(int sec){_loop->RunInLoop(std::bind(&Connection::EnableInactiveReleaseInLoop, this, sec));}// 关闭非活跃销毁void CancelInactiveRelease(){_loop->RunInLoop(std::bind(&Connection::CancelInactiveRelease, this));}// 启动监控调用 _connected_callbackvoid Established(){_loop->RunInLoop(std::bind(&Connection::EstablishedInLoop, this));}// 发送数据到缓冲区void Send(const char *data, size_t len){Buffer buf;buf.WriteAndPush(data, len);// 这个操作是压入任务池,可能调用的时候临时变量已经释放,所以需要创造一个buff保存数据_loop->RunInLoop(std::bind(&Connection::SendInLoop, this, std::move(buf)));}// 关闭连接 提供给组件使用者-在关闭前需要判断发送缓冲区中是否有数据void Shutdown(){_loop->RunInLoop(std::bind(&Connection::ShutdownInLoop, this));}void Release(){_loop->QueueInLoop(std::bind(&Connection::ReleaseInLoop, this));}// 切换协议,设置相关的回调函数void Upgrade(const Any &context, const ConnectedCallback &conn,const MessageCallback &msg, const ClosedCallback &closed, const AnyEventCallback &event){// 该接口必须在本线程中执行// 防备新的事件触发后,处理的时候,切换任务还没有被执行--会导致数据使用原协议处理了。_loop->AssertInloop();_loop->RunInLoop(std::bind(&Connection::UpgradeInLoop, this, context, conn, msg, closed, event));}
};

监听描述符管理Acceptor

class Acceptor
{using AcceptCallback = std::function<void(int)>;private:Socket _lis_sock; // 监听套接字EventLoop *_loop;Channel _channel;AcceptCallback _accept_callback;private:void HandleRead(){int newfd = _lis_sock.Accept();if (newfd < 0)return;if (_accept_callback)_accept_callback(newfd);}// 创造监听套接字int CreateServer(int port){bool ret = _lis_sock.CreateServer(port);assert(ret == true);return _lis_sock.GetFd();}public:Acceptor(EventLoop *loop, int port) : _lis_sock(CreateServer(port)), _loop(loop), _channel(_loop, _lis_sock.GetFd()){_channel.SetReadCallback(std::bind(&Acceptor::HandleRead, this));/*不能将启动读事件监控,放到构造函数中,必须在设置回调函数后,再去启动*//*否则有可能造成启动监控后,立即有事件,处理的时候,回调函数还没设置:新连接得不到处理,且资源泄漏*/}void SetAcceptCallback(const AcceptCallback &cb) { _accept_callback = cb; }void SetLisEnableRead() { _channel.EnableRead(); }
};

服务器TcpServer

class TcpServer
{using ConnectedCallback = std::function<void(const PtrConnection &)>;// 消息处理using MessageCallback = std::function<void(const PtrConnection &, Buffer *)>;using ClosedCallback = std::function<void(const PtrConnection &)>;using AnyEventCallback = std::function<void(const PtrConnection &)>;private:int _port;                                          // 端口号uint64_t _next_id;                                  // 自增长的连接idint _timeout;                                       // 非活跃连接时间bool _enable_inactive_release;                      // 非活跃销毁启动标志FEventLoop _base_loop;                               // 主线程的Eventloop对象,负责监听处理Acceptor _acceptor;                                 // 这是监听套接字LoopThreadPool _thread_poll;                        // 从属线程池std::unordered_map<uint64_t, PtrConnection> _conns; // 保存管理所有连接对应的shared_ptr对象ConnectedCallback _connected_callback;MessageCallback _message_callback;ClosedCallback _closed_callback;AnyEventCallback _event_callback;private:void NewConnection(int newfd) // 为新连接构造一个Connection进行管理{_next_id++;PtrConnection conn(new Connection(_thread_poll.NextLoop(), _next_id, newfd));conn->SetConnectedCallback(_connected_callback);conn->SetMessageedCallback(_message_callback);conn->SetClosedCallback(_closed_callback);conn->SetAnyEventCallback(_event_callback);conn->SetSrvClosedCallback(std::bind(&TcpServer::RemoveConnection, this, std::placeholders::_1));if (_enable_inactive_release == true)conn->EnableInactiveRelease(10);conn->Established();_conns.insert(std::make_pair(_next_id, conn));}void RemoveConnectionInLoop(const PtrConnection &conn){uint64_t id = conn->GetConnid();auto it = _conns.find(id);if (it != _conns.end()){_conns.erase(it);}}void RemoveConnection(const PtrConnection &conn) // 删除_conns中保存的对象,这里删除才是真正的删除{_base_loop.RunInLoop(std::bind(&TcpServer::RemoveConnectionInLoop, this, conn));}void TaskRunAfterInLoop(const Functor &task, int delay){_next_id++;_base_loop.TimerAdd(_next_id, delay, task);}public:TcpServer(int port) : _port(port), _next_id(0), _enable_inactive_release(false),_acceptor(&_base_loop, _port), _thread_poll(&_base_loop){_acceptor.SetAcceptCallback(std::bind(&TcpServer::NewConnection, this, std::placeholders::_1));_acceptor.SetLisEnableRead();}void SetThreadCount(int count) { _thread_poll.SetThreadCount(count); }void SetConnectedCallback(const ConnectedCallback &cb) { _connected_callback = cb; }void SetMessageedCallback(const MessageCallback &cb) { _message_callback = cb; }void SetClosedCallback(const ClosedCallback &cb) { _closed_callback = cb; }void SetAnyEventCallback(const AnyEventCallback &cb) { _event_callback = cb; }void EnableInactiveRelease(int timeout){_timeout = timeout;_enable_inactive_release = true;}// 用于添加定时任务void TaskRunAfter(const Functor &task, int delay){_base_loop.RunInLoop(std::bind(&TcpServer::TaskRunAfterInLoop, this, task, delay));}void Start(){_thread_poll.CreateThread();_base_loop.Start();}
};class NetWork
{
public:NetWork(){Log::DEBUG("SIGPIPE INIT");signal(SIGPIPE, SIG_IGN);}
};
static NetWork nw;

二、HTTP协议模块

Util实用工具类

std::unordered_map<int, std::string> _status_msg = {{100, "Continue"},{101, "Switching Protocol"},{102, "Processing"},{103, "Early Hints"},{200, "OK"},{201, "Created"},{202, "Accepted"},{203, "Non-Authoritative Information"},{204, "No Content"},{205, "Reset Content"},{206, "Partial Content"},{207, "Multi-Status"},{208, "Already Reported"},{226, "IM Used"},{300, "Multiple Choice"},{301, "Moved Permanently"},{302, "Found"},{303, "See Other"},{304, "Not Modified"},{305, "Use Proxy"},{306, "unused"},{307, "Temporary Redirect"},{308, "Permanent Redirect"},{400, "Bad Request"},{401, "Unauthorized"},{402, "Payment Required"},{403, "Forbidden"},{404, "Not Found"},{405, "Method Not Allowed"},{406, "Not Acceptable"},{407, "Proxy Authentication Required"},{408, "Request Timeout"},{409, "Conflict"},{410, "Gone"},{411, "Length Required"},{412, "Precondition Failed"},{413, "Payload Too Large"},{414, "URI Too Long"},{415, "Unsupported Media Type"},{416, "Range Not Satisfiable"},{417, "Expectation Failed"},{418, "I'm a teapot"},{421, "Misdirected Request"},{422, "Unprocessable Entity"},{423, "Locked"},{424, "Failed Dependency"},{425, "Too Early"},{426, "Upgrade Required"},{428, "Precondition Required"},{429, "Too Many Requests"},{431, "Request Header Fields Too Large"},{451, "Unavailable For Legal Reasons"},{501, "Not Implemented"},{502, "Bad Gateway"},{503, "Service Unavailable"},{504, "Gateway Timeout"},{505, "HTTP Version Not Supported"},{506, "Variant Also Negotiates"},{507, "Insufficient Storage"},{508, "Loop Detected"},{510, "Not Extended"},{511, "Network Authentication Required"}};std::unordered_map<std::string, std::string> _mime_msg = {{".aac", "audio/aac"},{".abw", "application/x-abiword"},{".arc", "application/x-freearc"},{".avi", "video/x-msvideo"},{".azw", "application/vnd.amazon.ebook"},{".bin", "application/octet-stream"},{".bmp", "image/bmp"},{".bz", "application/x-bzip"},{".bz2", "application/x-bzip2"},{".csh", "application/x-csh"},{".css", "text/css"},{".csv", "text/csv"},{".doc", "application/msword"},{".docx", "application/vnd.openxmlformats-officedocument.wordprocessingml.document"},{".eot", "application/vnd.ms-fontobject"},{".epub", "application/epub+zip"},{".gif", "image/gif"},{".htm", "text/html"},{".html", "text/html"},{".ico", "image/vnd.microsoft.icon"},{".ics", "text/calendar"},{".jar", "application/java-archive"},{".jpeg", "image/jpeg"},{".jpg", "image/jpeg"},{".js", "text/javascript"},{".json", "application/json"},{".jsonld", "application/ld+json"},{".mid", "audio/midi"},{".midi", "audio/x-midi"},{".mjs", "text/javascript"},{".mp3", "audio/mpeg"},{".mpeg", "video/mpeg"},{".mpkg", "application/vnd.apple.installer+xml"},{".odp", "application/vnd.oasis.opendocument.presentation"},{".ods", "application/vnd.oasis.opendocument.spreadsheet"},{".odt", "application/vnd.oasis.opendocument.text"},{".oga", "audio/ogg"},{".ogv", "video/ogg"},{".ogx", "application/ogg"},{".otf", "font/otf"},{".png", "image/png"},{".pdf", "application/pdf"},{".ppt", "application/vnd.ms-powerpoint"},{".pptx", "application/vnd.openxmlformats-officedocument.presentationml.presentation"},{".rar", "application/x-rar-compressed"},{".rtf", "application/rtf"},{".sh", "application/x-sh"},{".svg", "image/svg+xml"},{".swf", "application/x-shockwave-flash"},{".tar", "application/x-tar"},{".tif", "image/tiff"},{".tiff", "image/tiff"},{".ttf", "font/ttf"},{".txt", "text/plain"},{".vsd", "application/vnd.visio"},{".wav", "audio/wav"},{".weba", "audio/webm"},{".webm", "video/webm"},{".webp", "image/webp"},{".woff", "font/woff"},{".woff2", "font/woff2"},{".xhtml", "application/xhtml+xml"},{".xls", "application/vnd.ms-excel"},{".xlsx", "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"},{".xml", "application/xml"},{".xul", "application/vnd.mozilla.xul+xml"},{".zip", "application/zip"},{".3gp", "video/3gpp"},{".3g2", "video/3gpp2"},{".7z", "application/x-7z-compressed"}};class Util
{
public:// 字符串分割static size_t Split(const std::string &str, const std::string &seq, std::vector<std::string> *arry){int offset = 0;while (offset < str.size()){// abc....bcd.aaa.size_t pos = str.find(seq, offset);if (pos == std::string::npos) // 如果没找到则表示要截取最后一个字符串{arry->push_back(str.substr(offset));return arry->size();}// 找到则pos指向【.】的下标if (pos == offset) // 代表存在多个【.】,截取的字符没有任何意义{offset = pos + seq.size();continue;}arry->push_back(str.substr(offset, pos - offset));offset = pos + seq.size();}return arry->size();}// 读取字符串内容static bool ReadFile(const std::string &filename, std::string *buf){std::ifstream ifs(filename, std::ios::binary);if (ifs.is_open() == false){Log::ERROR("open %s false", filename.c_str());return false;}size_t fsize = 0;ifs.seekg(0, ifs.end); // 跳转到文件末尾fsize = ifs.tellg();   // 获取当前读写位置相对于起始位置的偏移量,从末尾偏移刚好就是文件大小ifs.seekg(0, ifs.beg); // 跳转到文件起始位置buf->resize(fsize);ifs.read(&(*buf)[0], fsize);if (ifs.good() == false){Log::ERROR("read %s false", filename.c_str());ifs.close();return false;}ifs.close();return true;}// 向文件写入数据static bool WriteFile(const std::string &filename, const std::string &buf){std::ofstream ofs(filename, std::ios::binary | std::ios::trunc); // 在打开文件时,把已有内容全部清空(truncate to zero length)if (ofs.is_open() == false){Log::ERROR("open %s false", filename.c_str());return false;}ofs.write(buf.c_str(), buf.size());if (ofs.good() == false){Log::ERROR("read %s false", filename.c_str());ofs.close();return false;}ofs.close();return true;}// URL 编码static std::string UrlEncode(const std::string &url, bool convert_space_to_plus){std::string res;for (unsigned char c : url){if (c == '.' || c == '-' || c == '_' || c == '~' || isalnum(c)){res += c;}else if (c == ' ' && convert_space_to_plus){res += '+';}else{char tmp[4] = {0};snprintf(tmp, 4, "%%%02X", c);res += tmp;}}return res;}// %2B == 43// 只支持 '0'-'9' 和 'A'-'F'static int CtoInt(char c){if (c >= '0' && c <= '9')return c - '0';if (c >= 'A' && c <= 'F')return c - 'A' + 10;// 非法字符直接返回 -1(也可以选择抛异常或忽略)return -1;}// URL 解码static std::string UrlDecode(const std::string &url, bool convert_plus_to_space){std::string res;for (int i = 0; i < url.size(); i++){if (url[i] == '%'){if (i + 2 < url.size()){int c1 = CtoInt(url[i + 1]);int c2 = CtoInt(url[i + 2]);if (c1 != -1 && c2 != -1){res += static_cast<char>(c1 * 16 + c2);i += 2;continue;}}// 如果不是有效的 %xx,直接保留 %res += '%';}else if (url[i] == '+' && convert_plus_to_space){res += ' ';}elseres += url[i];}return res;}// 响应状态码的描述信息获取static std::string StatusDesc(int status){auto it = _status_msg.find(status);if (it != _status_msg.end()){return it->second;}return "Unknow status";}// 根据文件后缀名获取文件minestatic std::string GetMimeTypeFromExtension(const std::string &filename){// 获取文件扩展名 a.txtsize_t pos = filename.find_last_of('.');if (pos == std::string::npos){return "application/octet-stream";}std::string ext = filename.substr(pos);// 通过扩展名获取mimeauto it = _mime_msg.find(ext);if (it != _mime_msg.end()){return it->second;}return "application/octet-stream";}// 判断一个文件是否是目录static bool IsDirectory(const std::string &filename){struct stat st;int ret = stat(filename.c_str(), &st);if (ret < 0){return false;}return S_ISDIR(st.st_mode);}// 判断一个文件是否是普通文件static bool IsRegularFile(const std::string &filename){struct stat st;int ret = stat(filename.c_str(), &st);if (ret < 0){return false;}return S_ISREG(st.st_mode);}// http请求路径是否有效static bool ValidPath(const std::string &path){std::vector<std::string> sub;Split(path, "/", &sub);int level = 0;for (auto &s : sub){if (s == ".."){level--;if (level < 0)return false;continue;}if (s != ".")level++;}return true;}
};

HttpRequest请求类

class HttpRequest
{
public:std::string _method;                                   // 请求方法std::string _path;                                     // 资源路径std::string _version;                                  // 协议版本std::string _body;                                     // 请求正文std::smatch _matches;                                  // 资源路径的正则提取数据std::unordered_map<std::string, std::string> _headers; // http头部字段std::unordered_map<std::string, std::string> _params;  // 查询字符串
public:HttpRequest() : _version("HTTP/1.1") {}void ReSet(){_method.clear();_path.clear();_version = "HTTP/1.1";_body.clear();std::smatch matches;_matches.swap(matches);_headers.clear();_params.clear();}// 插入头部字段void SetHeader(const std::string &key, const std::string &val){_headers.insert(std::make_pair(key, val));}// 判断是否存在指定的头部字段bool HasHeader(const std::string &key) const{auto it = _headers.find(key);if (it == _headers.end()){return false;}return true;}// 获取指定头部字段的值std::string GetHeader(const std::string &key) const{auto it = _headers.find(key);if (it == _headers.end()){return "";}return it->second;}// 插入查询字符串void SetParam(const std::string &key, const std::string &val){_params.insert(std::make_pair(key, val));}// 判断是否有指定的查询字符串bool HasParam(const std::string &key){auto it = _params.find(key);if (it == _params.end()){return false;}return true;}// 获取指定的查询字符串std::string GetParam(const std::string &key) const{auto it = _params.find(key);if (it == _params.end()){return "";}return it->second;}// 获取正文长度size_t ContentLength() const{if (HasHeader("Content-Length") == false){return 0;}std::string content = GetHeader("Content-Length");return std::stol(content);}// 判断是否是短连接bool Close() const{if (HasHeader("Connection") == true && GetHeader("Connection") == "keep-alive"){return false;}//Log::DEBUG("长连接");return true;}
};

HttpResponse响应类

class HttpResponse
{
public:int _status;                                           // 状态码bool _redirect_flag;                                   // 重定向标志std::string _redirect_url;                             // 重定向路径std::unordered_map<std::string, std::string> _headers; // http头部字段std::string _body;                                     // 回复正文
public:HttpResponse() : _status(200), _redirect_flag(false) {}HttpResponse(int status) : _status(status), _redirect_flag(false) {}void ReSet(){_status = 200;_redirect_flag = false;_redirect_url.clear();_headers.clear();_body.clear();}// 插入头部字段void SetHeader(const std::string &key, const std::string &val){// std::cout << "插入的键值对是: " << key << ": " << val << std::endl;_headers.insert(std::make_pair(key, val));}// 判断是否存在指定的头部字段bool HasHeader(const std::string &key){auto it = _headers.find(key);if (it == _headers.end()){return false;}return true;}// 获取指定头部字段的值std::string GetHeader(const std::string &key){auto it = _headers.find(key);if (it == _headers.end()){return "";}return it->second;}void SetContent(const std::string &body, const std::string &type = "text/html"){_body = body;SetHeader("Content-Type", type);}void SetRedirect(const std::string &url, int status = 302){_status = status;_redirect_flag = true;_redirect_url = url;}// 判断是否是短连接bool Close(){if (HasHeader("Connection") == true && GetHeader("Connection") == "keep-alive"){return false;}return true;}
};

HttpContext上下文类

typedef enum
{RECV_HTTP_ERROR,RECV_HTTP_LINE,RECV_HTTP_HEAD,RECV_HTTP_BODY,RECV_HTTP_OVER
} HttpRecvStatus;#define MAX_LINE 8192
class HttpContext
{
private:int _resp_status;            // 响应状态码HttpRecvStatus _recv_status; // 当前接收及解析的阶段状态HttpRequest _request;        // 已经解析得到的请求信息
private:bool ParseHttpLine(const std::string &line){std::smatch matches;std::regex e("(GET|HEAD|POST|PUT|DELETE) ([^?]*)(?:\\?(.*))? (HTTP/1\\.[01])(?:\n|\r\n)?", std::regex::icase); // icase忽略大小写bool ret = std::regex_match(line, matches, e);if (ret == false){_recv_status = RECV_HTTP_ERROR;_resp_status = 400; // BAD REQUESTreturn false;}// 0 : GET /xxx/login?user=xiaoming&pass=123123 HTTP/1.1// 1 : GET// 2 : /xxx/login// 3 : user=xiaoming&pass=123123// 4 : HTTP/1.1// 请求方法的获取_request._method = matches[1];/*由于忽略了大小写,这里我们得到的方法可能是小写,所以需要转换成大写*/std::transform(_request._method.begin(), _request._method.end(), _request._method.begin(), ::toupper);// 资源路径的获取,需要进行URL解码操作,但是不需要+转空格_request._path = Util::UrlDecode(matches[2], false);// 协议版本获取_request._version = matches[4];// 参数的字符串的获取std::string query_string = Util::UrlDecode(matches[3], true);std::vector<std::string> query_arry;Util::Split(query_string, "&", &query_arry);for (auto &str : query_arry){size_t pos = str.find("="); // user=xiaominif (pos == std::string::npos){_recv_status = RECV_HTTP_ERROR;_resp_status = 400; // BAD REQUESTreturn false;}std::string key = Util::UrlDecode(str.substr(0, pos), true);std::string value = Util::UrlDecode(str.substr(pos + 1), true);_request.SetParam(key, value);}return true;}bool RecvHttpLine(Buffer *buf){if (_recv_status != RECV_HTTP_LINE)return false;std::string line = buf->GetLineAndPop();if (line.size() == 0) // 1.读取到的数据没有一行{if (buf->ReadAbleSize() > MAX_LINE) // 缓冲区的数据大于 MAX_LINE{_recv_status = RECV_HTTP_ERROR;_resp_status = 414; // URL TOO LONGreturn false;}// 缓冲区中数据不足一行,但是也不多,就等等新数据的到来return true;}if (line.size() > MAX_LINE) // 请求行的数据过大,存在异常{_recv_status = RECV_HTTP_ERROR;_resp_status = 414; // URL TOO LONGreturn false;}bool ret = ParseHttpLine(line);if (ret == false){Log::DEBUG("ParseHttpLine error");return false;}//Log::DEBUG("_recv_status 改变为RECV_HTTP_HEAD");// 处理完调整阶段状态_recv_status = RECV_HTTP_HEAD;return true;}// 头部处理bool ParseHttpHead(std::string &line){// key: val\r\nif (line.back() == '\n')line.pop_back(); // 末尾是换行则去掉换行字符if (line.back() == '\r')line.pop_back(); // 末尾是回车则去掉回车字符size_t pos = line.find(": ");if (pos == std::string::npos){_recv_status = RECV_HTTP_ERROR;_resp_status = 400; // BAD REQUESTreturn false;}std::string key = line.substr(0, pos);std::string value = line.substr(pos + 2);_request.SetHeader(key, value);//Log::DEBUG("key: %s, value:%s", key.c_str(), value.c_str());return true;}bool RecvHttpHead(Buffer *buf){int a = 0;if (_recv_status != RECV_HTTP_HEAD){Log::DEBUG("recv_status != RECV_HTTP_HEAD");return false;}while (1){std::string line = buf->GetLineAndPop();//Log::DEBUG("头部的数据是: %s", line.c_str());if (line.size() == 0) // 1.读取到的数据没有一行{if (buf->ReadAbleSize() > MAX_LINE) // 缓冲区的数据大于 MAX_LINE{_recv_status = RECV_HTTP_ERROR;_resp_status = 414; // URL TOO LONGreturn false;}// 缓冲区中数据不足一行,但是也不多,就等等新数据的到来return true;}if (line.size() > MAX_LINE) // 请求行的数据过大,存在异常{_recv_status = RECV_HTTP_ERROR;_resp_status = 414; // URL TOO LONGreturn false;}// 在头部处理中最后一个是\r\n 或者\nif (line == "\n" || line == "\r\n"){//Log::DEBUG("解析完毕");break;}bool ret = ParseHttpHead(line);if (ret == false){return false;}}_recv_status = RECV_HTTP_BODY;return true;}bool RecvHttpBody(Buffer *buf){if (_recv_status != RECV_HTTP_BODY)return false;// 1.获取正文长度size_t content_length = _request.ContentLength();//   std::cout << "获得的正文长度是: " << content_length << std::endl;if (content_length == 0){// 没有正文,则请求接收解析完毕_recv_status = RECV_HTTP_OVER;return true;}// 2. 缓冲区中数据,包含了当前请求的所有正文,则取出所需的数据size_t real_len = content_length - _request._body.size();// 3.1判断当前缓冲区中的数据是否>= real_lenif (buf->ReadAbleSize() >= real_len){_request._body.append(buf->ReadPosition(), real_len);buf->MoveReadOffset(real_len);_recv_status = RECV_HTTP_OVER;return true;}// 3.2缓冲区中数据,无法满足当前正文的需要,数据不足,取出数据,然后等待新数据到来_request._body.append(buf->ReadPosition(), buf->ReadAbleSize());buf->MoveReadOffset(buf->ReadAbleSize());// 这里不能改变状态,因为数据接收并没有完成return true;}public:HttpContext() : _resp_status(200), _recv_status(RECV_HTTP_LINE) {}void ReSet(){_resp_status = 200;_recv_status = RECV_HTTP_LINE;_request.ReSet();}int GetRespStatus() { return _resp_status; }HttpRecvStatus GerRecvStatus() { return _recv_status; }HttpRequest &GetRequest() { return _request; }void ParseHttpRequest(Buffer *buf) // 接收并解析http请求{// 不同的状态,做不同的事情,但是这里不要break, 因为处理完请求行后,应该立即处理头部,而不是退出等新数据//_recv_status标志位可以控制处理Recvhttp的阶段!!!switch (_recv_status){case RECV_HTTP_LINE:RecvHttpLine(buf);case RECV_HTTP_HEAD://Log::DEBUG("开始解析头部");RecvHttpHead(buf);case RECV_HTTP_BODY:RecvHttpBody(buf);}return;}
};

HttpServer

class HttpServer
{using Handler = std::function<void(const HttpRequest &, HttpResponse *)>;using Handlers = std::vector<std::pair<std::regex, Handler>>;private:Handlers _get_route;Handlers _post_route;Handlers _put_route;Handlers _delete_route;std::string _basedir; // 静态资源根目录TcpServer _server;private:// 错误页面设置void ErrorHandler(const HttpRequest &req, HttpResponse *rsp){//Log::DEBUG("错误页面");// 1. 组织一个错误展示页面std::string body;body += "<html>";body += "<head>";body += "<meta http-equiv='Content-Type' content='text/html;charset=utf-8'>";body += "</head>";body += "<body>";body += "<h1>";body += std::to_string(rsp->_status);body += " ";body += Util::StatusDesc(rsp->_status);body += "</h1>";body += "</body>";body += "</html>";// 2. 将页面数据,当作响应正文,放入rsp中rsp->SetContent(body, "text/html");}// 将HttpResponse的要素按照http协议格式进行组织并且发送出去void WriteResponse(const PtrConnection &conn, const HttpRequest &req, HttpResponse &rsp){// 完善头部if (req.Close() == true){rsp.SetHeader("Connection", "close");}else{rsp.SetHeader("Connection", "keep-alive");}if (rsp._body.empty() == false && rsp.HasHeader("Content-Length") == false){rsp.SetHeader("Content-Length", std::to_string(rsp._body.size()));}if (rsp._body.empty() == false && rsp.HasHeader("Content-Type") == false){rsp.SetHeader("Content-Type", "application/octet-stream"); // 默认二进制}if (rsp._redirect_flag == true){rsp.SetHeader("Location", rsp._redirect_url);}// 按照http格式进行组织std::stringstream rsp_str;rsp_str << req._version << " " << std::to_string(rsp._status) << " " << Util::StatusDesc(rsp._status) << "\r\n";for (auto &head : rsp._headers){rsp_str << head.first << ": " << head.second << "\r\n";}rsp_str << "\r\n";rsp_str << rsp._body;// 3.发送数据conn->Send(rsp_str.str().c_str(), rsp_str.str().size());}// 静态资源的请求处理bool IsFileHandler(const HttpRequest &req){// 静态根目录必须设置if (_basedir.empty() == true){return false;}// 请求方法 必须是GER 或者 HEADif (req._method != "GET" && req._method != "HEAD"){return false;}// 请求路径必须合法if (Util::ValidPath(req._path) == false){return false;}// 请求资源必须存在,且是普通文件/*这里存在一个特殊情况, 当访问的是 "/" 的时候 我们要返回一个默认页面*/// 为了避免直接修改请求的资源路径,因此定义一个临时对象,如果是功能性请求我们修改路径会造成错误std::string req_path = _basedir + req._path;if (req_path.back() == '/'){req_path += "index.html";}if (Util::IsRegularFile(req_path) == false){return false;}return true;}void FileHandler(const HttpRequest &req, HttpResponse *rsp){std::string req_path = _basedir + req._path;if (req._path.back() == '/'){req_path += "index.html";}// 读取文件内容bool ret = Util::ReadFile(req_path, &rsp->_body);if (ret == false){return;}// 设置Content-Typestd::string mime = Util::GetMimeTypeFromExtension(req_path);rsp->SetHeader("Content-Type", mime);}// 功能性请求的分类处理void Dispatcher(HttpRequest &req, HttpResponse *rsp, Handlers &handlers){// 在对应请求方法的路由表中,查找是否含有对应资源请求的处理函数,有则调用,没有则发挥404// 思想:路由表存储的时键值对 -- 正则表达式 & 处理函数// 使用正则表达式,对请求的资源路径进行正则匹配,匹配成功就使用对应函数进行处理for (auto &handler : handlers){const std::regex &re = handler.first;const Handler &functor = handler.second;bool ret = std::regex_match(req._path, req._matches, re);if (ret == false){continue;}functor(req, rsp); // 存在匹配的处理函数,则执行return;}// 没有匹配的函数rsp->_status = 404;}// 通过判断方法,来反馈一个对应的内容void Route(HttpRequest &req, HttpResponse *rsp){// 加上重定向逻辑if (req._path == "/oldpage"){rsp->SetRedirect("/newpage"); // 默认302return;}if (req._path == "/newpage"){std::string body = "<html><body><h1>Welcome to the new page!</h1></body></html>";rsp->SetContent(body, "text/html");return;}// 判断是否是静态资源if (IsFileHandler(req) == true){FileHandler(req, rsp);return;}if (req._method == "GET" || req._method == "HEAD"){return Dispatcher(req, rsp, _get_route);}else if (req._method == "POST"){return Dispatcher(req, rsp, _post_route);}else if (req._method == "PUT"){return Dispatcher(req, rsp, _put_route);}else if (req._method == "DELETE"){return Dispatcher(req, rsp, _delete_route);}rsp->_status = 405; // Method Not Allowedreturn;}// 设置上下文void OnConnected(const PtrConnection &conn){conn->SetContext(HttpContext());Log::DEBUG("NEW CONNECTION %p", conn.get());}// 缓冲区数据解析+处理void OnMessage(const PtrConnection &conn, Buffer *buffer){while (buffer->ReadAbleSize() > 0){// 1. 获取上下文HttpContext& context = conn->GetContext()->GetContent<HttpContext>(); // 这里是引用// 2. 通过上下文对缓冲区数据进行解析,得到HttpRequest对象//   1. 如果缓冲区的数据解析出错,就直接回复出错响应//   2. 如果解析正常,且请求已经获取完毕,才开始去进行处理context.ParseHttpRequest(buffer);HttpRequest &req = context.GetRequest();HttpResponse rsp(context.GetRespStatus());if (context.GetRespStatus() >= 400){// 进行错误响应,关闭连接ErrorHandler(req, &rsp);       // 填充一个错误显示页面数据到rsp的_body中WriteResponse(conn, req, rsp); // 将错误信息发送给用户context.ReSet();buffer->MoveReadOffset(buffer->ReadAbleSize()); // 清空缓冲区的数据conn->Shutdown();return;}if (context.GerRecvStatus() != RECV_HTTP_OVER){ // 当前请求还没有接收完整,则退出,等新数据到来再重新继续处理return;}// 3. 请求路由 + 业务处理,将处理后的数据放入到rsp._body中去Route(req, &rsp);// 4. 对HttpResponse进行组织发送WriteResponse(conn, req, rsp);// 5. 重置上下文context.ReSet();// 6. 根据长短连接判断是否关闭连接或者继续处理if (rsp.Close() == true)conn->Shutdown();}}public:HttpServer(int port, int timeout = 10) : _server(port){_server.EnableInactiveRelease(timeout);_server.SetConnectedCallback(std::bind(&HttpServer::OnConnected, this, std::placeholders::_1));_server.SetMessageedCallback(std::bind(&HttpServer::OnMessage, this, std::placeholders::_1, std::placeholders::_2));}void SetBaseDir(const std::string &path) { _basedir = path; }// Get等函数的作用是加入到映射中:pattern实际上是正则表达式void Get(const std::string &pattern, const Handler &handler) { _get_route.push_back(std::make_pair(std::regex(pattern), handler)); }void Post(const std::string &pattern, const Handler &handler) { _post_route.push_back(std::make_pair(std::regex(pattern), handler)); }void Put(const std::string &pattern, const Handler &handler) { _put_route.push_back(std::make_pair(std::regex(pattern), handler)); }void Delete(const std::string &pattern, const Handler &handler) { _delete_route.push_back(std::make_pair(std::regex(pattern), handler)); }void SetThreadCount(int count) { _server.SetThreadCount(count); }void Start() { _server.Start(); }
};

基于HttpServer搭建HTTP服务器

#include "http.hpp"std::string RequestToStr(const HttpRequest &req)
{std::stringstream ss;ss << req._method << " " << req._path << " " << req._version << "\r\n";for (auto &it : req._params){ss << it.first << ": " << it.second << "\r\n";}for (auto &it : req._headers){ss << it.first << ": " << it.second << "\r\n";}ss << "\r\n";ss << req._body;return ss.str();
}void Hello(const HttpRequest &req, HttpResponse *rsp)
{rsp->SetContent(RequestToStr(req), "text/plain");//sleep(15); //测试client4 
}
void Login(const HttpRequest &req, HttpResponse *rsp)
{rsp->SetContent(RequestToStr(req), "text/plain");
}
void PutFile(const HttpRequest &req, HttpResponse *rsp)
{std::string pathname = "./wwwroot" + req._path;Util::WriteFile(pathname, req._body);
}
void DelFile(const HttpRequest &req, HttpResponse *rsp)
{rsp->SetContent(RequestToStr(req), "text/plain");
}int main()
{Log::setDefaultLoggerLevel(Log::LogLevel::value::ERROR);HttpServer server(8080);server.SetThreadCount(2);server.SetBaseDir("./wwwroot");//这里的 Hello 是一个函数指针,它在传参时会转换成一个 右值(临时对象)server.Get("/hello", Hello);server.Post("/login", Login);server.Put("/1234.txt", PutFile);server.Delete("/1234.txt", DelFile);server.Start();return 0;
}

三、功能测试

使用浏览器进行基本功能测试

在这里插入图片描述


长连接连续请求测试

创建一个客户端持续给服务器发送数据,直到超过超时时间看看是否正常(当前默
认设置超时时间为10s)。
预期结果:连接不会释放,持续发送消息

#include "../server.hpp"int main()
{Socket cli;cli.CreateClient(8080, "127.0.0.1");std::string s = "GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 0\r\n\r\n";while(1){cli.Send(s.c_str(), s.size());char buff[1024] = {0};cli.Recv(buff, 1023);Log::DEBUG("[%s]", buff);sleep(3);}return 0;
}

输出:

[xrw@iZ7xv0vjzfc2mzsasssfooZ test]$ ./client 
[16:57:43][140259617371968][DEBUG][default][../server.hpp:1440] SIGPIPE INIT
[16:57:43][140259617371968][INFO][default][../server.hpp:353] create client success
[16:57:43][140259617371968][DEBUG][default][client1.cc:15] [HTTP/1.1 200 OK
Content-Length: 66
Connection: keep-alive
Content-Type: text/plainGET /hello HTTP/1.1
Content-Length: 0
Connection: keep-alive][16:58:01][140259617371968][DEBUG][default][client1.cc:15] [HTTP/1.1 200 OK
Content-Length: 66
Connection: keep-alive
Content-Type: text/plainGET /hello HTTP/1.1
Content-Length: 0
Connection: keep-alive]

可以看到16:57:43的时候连接建立,并接收请求,直到10s后连接依然存在。


超时连接测试1

创建一个客户端,给服务器发送一次数据后,不动了,查看服务器是否会正常的超时关闭连接

#include "../server.hpp"int main()
{Socket cli;cli.CreateClient(8080, "127.0.0.1");std::string s = "GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 0\r\n\r\n";while(1){cli.Send(s.c_str(), s.size());char buff[1024] = {0};cli.Recv(buff, 1023);Log::DEBUG("[%s]", buff);sleep(20);}return 0;
}

结果:

[17:03:06][140131753428736][DEBUG][default][http.hpp:970] NEW CONNECTION 0x146d5a0
[17:03:16][140131770832704][DEBUG][default][../server.hpp:1234] realse Connection:0x146d5a0
可以看到10s后连接关闭

超时连接释放测试2

连接服务器,并告诉服务器要发送 100 字节的正文数据,但实际发送的数据不足 100 字节,然后观察服务器的处理情况。

预期结果:
服务器第一次接收请求时由于数据不完整,可能会将后续请求的数据误认为是本次请求的正文。在处理剩余数据时可能出现错误,并最终关闭连接。

#include "../server.hpp"int main()
{Socket cli_sock;cli_sock.CreateClient(8080, "127.0.0.1");std::string req = "GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 100\r\n\r\nxxxxxxx";while(1) {// assert(cli_sock.Send(req.c_str(), req.size()) != -1);// assert(cli_sock.Send(req.c_str(), req.size()) != -1);// assert(cli_sock.Send(req.c_str(), req.size()) != -1);cli_sock.Send(req.c_str(), req.size());cli_sock.Send(req.c_str(), req.size());cli_sock.Send(req.c_str(), req.size());char buf[1024] = {0};assert(cli_sock.Recv(buf, 1023));  //堵塞等待数据Log::DEBUG("[%s]", buf);sleep(3);}cli_sock.Close();return 0;
}

结果:

[17:10:46][140088962811712][DEBUG][default][../server.hpp:1440] SIGPIPE INIT
[17:10:46][140088962811712][INFO][default][../server.hpp:353] create client success
[17:10:46][140088962811712][DEBUG][default][client3.cc:24] [HTTP/1.1 200 OK
Content-Length: 168
Connection: keep-alive
Content-Type: text/plainGET /hello HTTP/1.1
Content-Length: 100
Connection: keep-alivexxxxxxxGET /hello HTTP/1.1
Connection: keep-alive
Content-Length: 100xxxxxxxGET /hello HTTP/1.HTTP/1.1 400 Bad Request
Content-Length: 129
Connection: close
Content-Type: text/html<html><head><meta http-equiv='Content-Type' content='text/html;charset=utf-8'></head><body><h1>400 Bad Request</h1></body></html>]
[17:10:49][140088962811712][ERROR][default][../server.hpp:290] SOCKET send false

超时连接释放测试3

当服务器接收请求的数据时,如果业务处理耗时过长,超过了设置的超时销毁时间(即服务器性能达到瓶颈),需要观察服务端的处理情况。

预期结果:
一次业务处理耗时过长可能导致其他连接被连累超时,从而可能会释放其他连接。假设有描述符 1,2,3,4,5 就绪,而处理描述符 1 时耗时 30 秒:

  1. 如果接下来的 2,3,4,5 都是通信连接描述符,并且事件也都就绪,那么不会有问题。因为当 1 处理完后,接下来的描述符会依次处理并刷新活跃度。

  2. 如果接下来的 2 是定时器事件描述符,定时器触发超时任务,会释放掉 3,4,5 描述符对应的连接。此时,如果在处理 3,4,5 的事件时直接操作已释放的连接,会导致程序崩溃(内存访问错误)。

总结:
在任何事件处理过程中,不应直接释放连接。正确做法是将释放操作压入任务池,等所有连接事件处理完后,再统一执行任务池中的释放操作。

将搭建服务器的处理hello函数中做出小调整:

void Hello(const HttpRequest &req, HttpResponse *rsp)
{rsp->SetContent(RequestToStr(req), "text/plain");sleep(15); //测试client4 
}
/* 业务处理超时,查看服务器的处理情况当服务器达到了一个性能瓶颈,在一次业务处理中花费了太长的时间(超过了服务器设置的非活跃超时时间)1. 在一次业务处理中耗费太长时间,导致其他的连接也被连累超时,其他的连接有可能会被拖累超时释放假设现在  12345描述符就绪了, 在处理1的时候花费了30s处理完,超时了,导致2345描述符因为长时间没有刷新活跃度1. 如果接下来的2345描述符都是通信连接描述符,如果都就绪了,则并不影响,因为接下来就会进行处理并刷新活跃度2. 如果接下来的2号描述符是定时器事件描述符,定时器触发超时,执行定时任务,就会将345描述符给释放掉这时候一旦345描述符对应的连接被释放,接下来在处理345事件的时候就会导致程序崩溃(内存访问错误)因此这时候,在本次事件处理中,并不能直接对连接进行释放,而应该将释放操作压入到任务池中,等到事件处理完了执行任务池中的任务的时候,再去释放
*/#include "../server.hpp"
//在将realese 压入到任务池中执行,而不是RunInLoop中, 因为当触发事件监控时,若此时销毁了某个连接,而后续又触发了该连接的事件处理,那么将会产生内存访问的错误
int main()
{signal(SIGCHLD, SIG_IGN);for (int i = 0; i < 10; i++) {pid_t pid = fork();if (pid < 0) {Log::DEBUG("FORK ERROR");return -1;}else if (pid == 0) {Socket cli_sock;cli_sock.CreateClient(8080, "127.0.0.1");std::string req = "GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 0\r\n\r\n";while(1) {assert(cli_sock.Send(req.c_str(), req.size()) != -1);char buf[1024] = {0};assert(cli_sock.Recv(buf, 1023));Log::DEBUG("[%s]", buf);}cli_sock.Close();exit(0);}}while(1) sleep(1);return 0;
}

输出结果:

[17:16:13][140144714336064][DEBUG][default][../server.hpp:1440] SIGPIPE INIT
[17:16:13][140144714336064][INFO][default][../server.hpp:342] create server success
[17:16:31][140144714336064][DEBUG][default][../server.hpp:254] get a link
[17:16:31][140144696932096][DEBUG][default][http.hpp:970] NEW CONNECTION 0x1d505a0
[17:16:31][140144714336064][DEBUG][default][../server.hpp:254] get a link
[17:16:31][140144714336064][DEBUG][default][../server.hpp:254] get a link
[17:16:31][140144714336064][DEBUG][default][../server.hpp:254] get a link
[17:16:31][140144688539392][DEBUG][default][http.hpp:970] NEW CONNECTION 0x1d526a0
[17:16:31][140144714336064][DEBUG][default][../server.hpp:254] get a link
[17:16:31][140144688539392][DEBUG][default][http.hpp:970] NEW CONNECTION 0x1d53e60
[17:16:31][140144714336064][DEBUG][default][../server.hpp:254] get a link
[17:16:31][140144688539392][DEBUG][default][http.hpp:970] NEW CONNECTION 0x1d555f0
[17:16:31][140144714336064][DEBUG][default][../server.hpp:254] get a link
[17:16:31][140144714336064][DEBUG][default][../server.hpp:254] get a link
[17:16:31][140144688539392][DEBUG][default][http.hpp:970] NEW CONNECTION 0x1d56d90
[17:16:31][140144714336064][DEBUG][default][../server.hpp:254] get a link
[17:16:31][140144714336064][DEBUG][default][../server.hpp:254] get a link
[17:16:46][140144696932096][DEBUG][default][http.hpp:970] NEW CONNECTION 0x1d53250
[17:16:46][140144696932096][DEBUG][default][http.hpp:970] NEW CONNECTION 0x1d54ac0
[17:16:46][140144696932096][DEBUG][default][http.hpp:970] NEW CONNECTION 0x1d56190
[17:16:46][140144696932096][DEBUG][default][http.hpp:970] NEW CONNECTION 0x1d57990
[17:16:46][140144688539392][DEBUG][default][http.hpp:970] NEW CONNECTION 0x1d58510
[17:17:46][140144714336064][DEBUG][default][../server.hpp:1234] realse Connection:0x1d53250
[17:17:46][140144714336064][DEBUG][default][../server.hpp:1234] realse Connection:0x1d54ac0
[17:17:46][140144714336064][DEBUG][default][../server.hpp:1234] realse Connection:0x1d56190
[17:17:46][140144714336064][DEBUG][default][../server.hpp:1234] realse Connection:0x1d57990
[17:17:46][140144714336064][DEBUG][default][../server.hpp:1234] realse Connection:0x1d505a0
[17:17:46][140144714336064][DEBUG][default][../server.hpp:1234] realse Connection:0x1d58510
[17:17:46][140144714336064][DEBUG][default][../server.hpp:1234] realse Connection:0x1d53e60
[17:17:46][140144714336064][DEBUG][default][../server.hpp:1234] realse Connection:0x1d555f0
[17:17:46][140144714336064][DEBUG][default][../server.hpp:1234] realse Connection:0x1d526a0
[17:17:46][140144714336064][DEBUG][default][../server.hpp:1234] realse Connection:0x1d56d90

数据中多条请求处理测试

/*一次性给服务器发送多条数据,然后查看服务器的处理结果*/
/*每一条请求都应该得到正常处理*/
#include "../server.hpp"int main()
{Socket cli_sock;cli_sock.CreateClient(8080, "127.0.0.1");std::string req = "GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 0\r\n\r\n";req += "GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 0\r\n\r\n";req += "GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 0\r\n\r\n";assert(cli_sock.Send(req.c_str(), req.size()) != -1);char buf[1024] = {0};assert(cli_sock.Recv(buf, 1023)); // 堵塞等待数据Log::DEBUG("[%s]", buf);sleep(20);cli_sock.Close();return 0;
}

输出结果:

[17:21:35][140427968341824][DEBUG][default][../server.hpp:1440] SIGPIPE INIT
[17:21:35][140427968341824][INFO][default][../server.hpp:353] create client success
[17:21:35][140427968341824][DEBUG][default][client5.cc:16] [HTTP/1.1 200 OK
Content-Length: 66
Connection: keep-alive
Content-Type: text/plainGET /hello HTTP/1.1
Content-Length: 0
Connection: keep-aliveHTTP/1.1 200 OK
Content-Length: 66
Connection: keep-alive
Content-Type: text/plainGET /hello HTTP/1.1
Content-Length: 0
Connection: keep-aliveHTTP/1.1 200 OK
Content-Length: 66
Connection: keep-alive
Content-Type: text/plainGET /hello HTTP/1.1
Content-Length: 0
Connection: keep-alive]

PUT大文件上传测试

/*大文件传输测试,给服务器上传一个大文件,服务器将文件保存下来,观察处理结果*/
/*上传的文件,和服务器保存的文件一致
*/
#include "../http/http.hpp"int main()
{Socket cli_sock;cli_sock.CreateClient(8080, "127.0.0.1");std::string req = "PUT /1234.txt HTTP/1.1\r\nConnection: keep-alive\r\n";std::string body;Util::ReadFile("./hello.txt",&body);req += "Content-Length: " + std::to_string(body.size()) + "\r\n\r\n";std::cout << "[" << body.size() <<"]" << std::endl;;assert(cli_sock.Send(req.c_str(), req.size()) != -1);assert(cli_sock.Send(body.c_str(), body.size()) != -1);char buf[1024] = {0};assert(cli_sock.Recv(buf, 1023)); // 堵塞等待数据Log::DEBUG("[%s]", buf);sleep(20);cli_sock.Close();return 0;
}

输出结果:

xrw@iZ7xv0vjzfc2mzsasssfooZ test]$ md5sum hello.txt
0e4f8f9fb47564d51bd5d186f77e94e8  hello.txt
[xrw@iZ7xv0vjzfc2mzsasssfooZ wwwroot]$ md5sum 1234.txt 
0e4f8f9fb47564d51bd5d186f77e94e8  1234.txt

四、性能测试

服务器性能测试 —— 使用 Webbench

Webbench 是知名的网站压力测试工具,由 Lionbridge 公司开发(http://www.lionbridge.com)。

测试目标

Webbench 的标准测试主要展示服务器的两项内容:

  • 每秒响应请求数(Requests per Second, QPS)
  • 每秒传输数据量(Throughput)

测试原理

Webbench 的工作原理:

  1. 创建指定数量的进程。
  2. 每个进程不断创建套接字向服务器发送请求。
  3. 每个进程将结果通过管道返回给主进程进行汇总和统计。

重点衡量标准

  • 吞吐量(Throughput)
  • QPS(Queries Per Second)

测试环境

  • 服务器环境:2 核 2G 云服务器,CentOS Linux release 7.6.1810 (Core),服务器程序采用 1 主 2 从 Reactor 模式。
  • Webbench 客户端环境:同一服务器。

注意:因为客户端和服务器在同一主机上,会存在 CPU 资源争抢,所以测试结果仅作示例说明,主要目的是演示如何进行性能压力测试,而非得到准确的生产数据。

[xrw@iZ7xv0vjzfc2mzsasssfooZ WebBench-master]$ ./webbench -c 1000 -t 60 http://127.0.0.1:8080/hello
Webbench - Simple Web Benchmark 1.5
Copyright (c) Radim Kolar 1997-2004, GPL Open Source Software.Request:
GET /hello HTTP/1.0
User-Agent: WebBench 1.5
Host: 127.0.0.1Runing info: 1000 clients, running 60 sec.Speed=135803 pages/min, 339510 bytes/sec.
Requests: 135803 susceed, 0 failed.

测试其实意义不大,因为测试客户端和服务器都在同一台机器上,传输速度较快,但同时抢占 CPU 也会影响处理。最好的方式是在两台不同的机器上进行测试。目前受限于设备环境配置,尚未进行更多并发量的测试。


文章转载自:

http://dCIZ55jR.pmptm.cn
http://IqgukcdY.pmptm.cn
http://ZwQEYMSE.pmptm.cn
http://StdWVjEB.pmptm.cn
http://MKwtL895.pmptm.cn
http://Nainy1ae.pmptm.cn
http://hLxsKdLz.pmptm.cn
http://rJOf8ZEH.pmptm.cn
http://3geZJL9q.pmptm.cn
http://7ki8qFyH.pmptm.cn
http://XLp66Vag.pmptm.cn
http://4Sjm2mxx.pmptm.cn
http://hNEQz8Sm.pmptm.cn
http://9QZoEAUv.pmptm.cn
http://yjPthYI5.pmptm.cn
http://MxeqOFDh.pmptm.cn
http://cSTxua8k.pmptm.cn
http://ahXQc2Rs.pmptm.cn
http://YPYfWn7w.pmptm.cn
http://VKhNWTvj.pmptm.cn
http://SXsSFbWx.pmptm.cn
http://sb1Vn84F.pmptm.cn
http://ykcDNrT3.pmptm.cn
http://SrTH2sUT.pmptm.cn
http://YWutEOWA.pmptm.cn
http://iAdpkEMF.pmptm.cn
http://T7WfcgZE.pmptm.cn
http://ahs6GvET.pmptm.cn
http://BTiYwrzC.pmptm.cn
http://JQJVTAHi.pmptm.cn
http://www.dtcms.com/a/380121.html

相关文章:

  • 主动性算法-解决点:新陈代谢
  • 从0开始开发app(AI助手版)-架构及环境搭建
  • 服务器内存不足会造成哪些影响?
  • 缓存三大劫攻防战:穿透、击穿、雪崩的Java实战防御体系(二)
  • MongoDB BI Connector 详细介绍与使用指南(手动安装方式,CentOS 7 + MongoDB 5.0.5)
  • 【计算机网络】HTTP协议(一)——超文本传输协议
  • 【国内电子数据取证厂商龙信科技】被格式化的手机如何恢复数据
  • 【项目】 :C++ - 仿mudou库one thread one loop式并发服务器实现(模块划分)
  • 采集集群外的k8s(prometheus监控)
  • AI 玩转网页自动化无压力:基于函数计算 FC 构建 Browser Tool Sandbox
  • Redisson原理与面试问题解析
  • ICCV 2025 | 首次引入Flash Attention,轻量SR窗口扩至32×32还不卡!
  • 关于线性子空间(Linear Subspace)的数学定义
  • OpenHarmony AVSession深度解析(二):从本地会话到分布式跨设备协同的完整生命周期管理
  • 12.NModbus4在C#上的部署与使用 C#例子 WPF例子
  • 迅为RK3568开发板Linux_NVR_SDK 系统开发-扩展根文件系统
  • OpenCV:特征提取
  • Zynq开发实践(FPGA之第一个vivado工程)
  • 数字人技术如何与数字孪生深度融合?
  • 如何生成 GitHub Token(用于 Hexo 部署):保姆级教程+避坑指南
  • Python uv常用命令及使用详解
  • MySQL主从同步参数调优案例
  • Python的uv包管理工具使用
  • 构建python3.11+uv+openssh环境的docker镜像
  • RabbitMQ的核心使用示例
  • 大数据电商流量分析项目实战:Hive 数据仓库(三)
  • 【Kubernetes】Tomcat 启用 Prometheus 监控指标
  • 数字人分身 + 矩阵系统聚合的源码搭建与定制开发
  • 如何使用 OCR 提取扫描件 PDF 的文本(Python 实现)
  • 并发:使用volatile和不可变性实现线程安全