基于C++重构muduo网络库搭建HTTP服务器项目设计及反思
目录
基于muduo思想搭建TcpServer服务器
一、日志类的设计
二、C++17 any类的设计与实现
any类的设计思想
设计框架
测试用例
三、缓冲区Buffer类设计
四、Socket套接字封装
五、多路复用模型epoll封装
六、事件通道Channel封装
七、定时功能模块封装
时间轮思想
工作原理
时间轮的应用
优化方案:使用多级时间轮增加定时任务的精度与上限。
八、EventLoop、LoopThread、LoopThreadPool模块
EventLoop类封装设计
LoopThread类封装设计
LoopThreadPool封装设计
九、Connection模块设计
十、Acceptor模块设计
十一、TcpServer模块设计
基于TcpSever搭建支持HTTP协议的HttpServer服务器
一、工具类模块
二、HttpRequest与HttpReponse类
三、HttpContext类设计
四、HttpServer服务器类设计
使用WebBench对服务器进行压力测试
2. 基本使用方法
3. 示例
4. 测试结果分析
前言:为什么要做muduo库呢?在后端开发中,服务器往往是基于如httplib库、muduo库等基础组件的基础上进行搭建的。选择重构muduo库,我们不仅可以借此机会了解网络库底层的实现逻辑以便于我们日后进行服务器的搭建,更重要的,我们可以学习muduo库中高效的事件驱动、多线程并发处理等优秀的设计理念。同时,可以加深我们对Epoll LT与ET模式、主从Reactor模式、非阻塞I/O读取、线程池,TCP、HTTP协议等等知识的巩固和掌握。
在本项目中,我们将基于重构的muduo网络库搭建一个HTTP服务器件。
muduo是一个备受推崇的高性能C++开源网络库,用于网络编程领域。它以高效的事件驱动、多线程并发处理和优秀的设计理念而闻名。Muduo基于非阻塞IO和事件驱动,采用基于对象而非面向对象的设计风格,使用线程模型是one loop per thread,主从线程Reactor模式,适用于高并发TCP网络编程。
主从线程Reactor模式图:
muduo网络库模块关联图:
陈硕大神muduo库源码链接:GitHub - chenshuo/muduo: Event-driven network library for multi-threaded Linux server in C++11
文档介绍:Linux多线程服务端编程 (豆瓣)
在学习muduo时建议还是不要上来就去看源码,否则代码中大量的回调函数就可能把你绕晕。网上有很多muduo的学习笔记,大部分针对muduo的思想和代码逻辑做出了讲解。在这里博主建议,在学习完每个模块的功能与作用之后,自己要“从上至下”过一遍muduo的逻辑。例如关注一条连接从建立到通信再到连接关闭和销毁的过程。当理解透彻后,写起代码来会容易得多。
前置知识:C、C++11、多线程、网络编程、线程池、多路复用、Reactor模式
开发环境:Ubuntu 22.04、vscode,xshell,g++,gdb,Makefile,WebBench
基于muduo思想搭建TcpServer服务器
一、日志类的设计
在项目代码设计中,我们可能无法预测代码设计不周带来的错误。而日志可以帮助我们快速定位问题所在。在一个复杂的函数中,通过在关键位置添加日志输出变量的值,就能够清晰地了解程序的执行流程和数据的变化情况。
设计日志类我们需要考虑多种情况,我们所要设计的日志类支持如下功能:
- 支持自主选择选择向屏幕输入或向文件输入日志内容
- 提供五个日志信息等级,比提供根据日志等级过滤日志信息的功能
- 因为显示器文件和记录日志的文件在多线程中属于公共资源,使用互斥锁保护日志的打印预防在多线程环境下日志信息打印可能导致的信息混淆
- 使用宏定义提供类似于printf函数风格的日志打印接口,支持可变参数
// 日志信息等级
enum LogLevel {
DEBUG = 1, // 调试
INFO, // 信息输出
WARNING, // 警告
ERROR, // 错误
FATAL // 致命错误
};
// 将日志等级转换为字符串
std::string LevelToString(LogLevel level) {
switch (level) {
case DEBUG:
return "DEBUG";
case INFO:
return "INFO";
case WARNING:
return "WARNING";
case ERROR:
return "ERROR";
case FATAL:
return "FATAL";
default:
return "UNKNOWN";
}
}
// 日志消息结构体
struct LogMessage {
std::string level;
pid_t pid;
std::string fileName;
int fileNumber;
std::string logInfo;
std::string currentTime;
};
// 打印选项
const int TO_SCREEN = 1;
const int TO_FILE = 2;
const int TIME_BUFF = 128;
const int INFO_BUFF = 1024;
// 日志类
class Log {
private:
LogMessage logMessage;
int printChoice;
std::string printFileName;
std::mutex logMutex;
LogLevel miniErrLevel; // 最低错误等级,只打印等于或高于该错误等级的日志信息
// 获取当前时间
std::string getCurrentTime() {
auto now = std::time(nullptr);
auto time = std::localtime(&now);
char timeBuff[TIME_BUFF];
std::snprintf(timeBuff, TIME_BUFF, "%d/%02d/%02d %02d:%02d:%02d",
time->tm_year + 1900, time->tm_mon + 1,
time->tm_mday, time->tm_hour, time->tm_min, time->tm_sec);
return timeBuff;
}
// 创建日志消息
void createLogMessage(LogLevel level, const std::string& filename, int filenumber, const char* format, va_list args) {
if (level < miniErrLevel) {
return; // 过滤低于最低错误等级的日志信息
}
logMessage.level = LevelToString(level);
logMessage.fileName = filename;
logMessage.fileNumber = filenumber;
logMessage.pid = getpid();
logMessage.currentTime = getCurrentTime();
char infoBuff[INFO_BUFF];
std::vsnprintf(infoBuff, INFO_BUFF, format, args);
logMessage.logInfo = infoBuff;
flushLog(logMessage);
}
// 向屏幕打印日志
void flushLogToScreen(const LogMessage& lg) {
std::printf("[%s][%d][%s][%d][%s] %s\n",
lg.level.c_str(),
lg.pid,
lg.fileName.c_str(),
lg.fileNumber,
lg.currentTime.c_str(),
lg.logInfo.c_str());
}
// 向文件打印日志
void flushLogToFile(const LogMessage& lg) {
std::ofstream out(printFileName, std::ios::app);
if (out.is_open()) {
char info[INFO_BUFF];
std::snprintf(info, sizeof(info), "[%s][%d][%s][%d][%s] %s\n",
lg.level.c_str(),
lg.pid,
lg.fileName.c_str(),
lg.fileNumber,
lg.currentTime.c_str(),
lg.logInfo.c_str());
out.write(info, std::strlen(info));
out.close();
}
}
// 刷新日志
void flushLog(const LogMessage& lg) {
std::lock_guard<std::mutex> lock(logMutex);
switch (printChoice) {
case TO_SCREEN:
flushLogToScreen(lg);
break;
case TO_FILE:
flushLogToFile(lg);
break;
}
}
public:
// 构造函数
Log(int printChoice = TO_SCREEN, const std::string& printFileName = "./log.txt", LogLevel minLevel = DEBUG)
: printChoice(printChoice), printFileName(printFileName), miniErrLevel(minLevel) {}
// 记录日志
void log(LogLevel level, const std::string& filename, int filenumber, const char* format, ...) {
va_list args;
va_start(args, format);
createLogMessage(level, filename, filenumber, format, args);
va_end(args);
}
// 改变日志打印路径
void changePrintChoice(int newChoice) {
printChoice = newChoice;
}
// 设置最低错误等级
void setMiniErrLevel(LogLevel level) {
miniErrLevel = level;
}
// 启用输出选项
void enableOutput(int choice) {
changePrintChoice(choice);
}
};
// 全局日志对象
Log lg;
// 日志打印宏
#define INF_LOG(format, ...) lg.log(INFO, __FILE__, __LINE__, format, ##__VA_ARGS__)
#define DBG_LOG(format, ...) lg.log(DEBUG, __FILE__, __LINE__, format, ##__VA_ARGS__)
#define ERR_LOG(format, ...) lg.log(ERROR, __FILE__, __LINE__, format, ##__VA_ARGS__)
#define WRN_LOG(format, ...) lg.log(WARNING, __FILE__, __LINE__, format, ##__VA_ARGS__)
#define FTL_LOG(format, ...) lg.log(FATAL, __FILE__, __LINE__, format, ##__VA_ARGS__)
#define EnableScreen() lg.enableOutput(TO_SCREEN)
#define EnableFILE() lg.enableOutput(TO_FILE)
二、C++17 any类的设计与实现
什么是any类?
std::any 是 C++17 引入的一个标准库类型,用于表示一个可以存储任意类型数据的容器。与 std::variant 不同,std::any 不限制存储的类型,因此它可以用来存储任意的对象。它的设计目标是提供一种简单的方式来存储和检索任意类型的值,而不需要像 void* 那样手动管理类型信息。
为什么要引入any类,在本项目中的用途是什么?
我们实现的是一个基于TCP的核心服务器组件,未来,我们会基于TCPServer搭建能够处理不同协议的服务器。这意味着,对于每个TcpConnection,它们内部能够存储不同协议请求的上下文。对于HTTP协议,TcpConnection需要处理HTTP_Request;对于websocket,我们要求Connection同样能对websocket_Request进行处理。这就要求TcpConnection内部存储Request的对象能够存储不同的协议请求上下文对象。因此,该部分我们使用any类对象来存储不同的协议请求的上下文。
any类的设计思想
⾸先Any类肯定不能是⼀个模板类,否则编译的时候 Any<int> a, Any<float>b,需要传类型作为模板参数,也就是说在使⽤的时候就要确定其类型。显然,我们希望的是直接定义一个Any对象能够接收不同类型的对象。如:Any a = std::string("hello"); 或 Any a = 10;。
我们如何在不进行显式实例化的同时还能接收各类型的对象呢?我们知道,想要接收任意类型的对象,就务必要使用模板,但模板参数的实际类型是在编译时才进行确认的,但我们又无法直接对Any类使用模板,因此Any类的成员变量也一定不是模板,而是一个既定类型的变量。因此考虑Any内部设计⼀个模板容器subholder类,该类可以保存各种类型数据。
而因为在Any类中⽆法定义这个subholder对象或指针,因为any也不知道这个类要保存什么类型的数据,因此⽆法传递类型参数。所以,定义一个基类placehoder,让subholder继承于placeholder,⽽Any类保存父类指针即可。
当需要保存数据时,由于模板可以自行推导所传入参数的类型,因此我们在Any类构造函数或赋值运算符重载函数的设计中可以使用模板函数,然后在函数内部new⼀个带有模板参数的子类subholder对象出来保存数据,然后让Any类中的⽗类指针指向这个子类对象就搞定了。例如:
template <typename T>
Any(const T &operand)
: _content(new subholder<T>(operand))
{}
这其实是使用了多态的思想:假设Any类中有一个非模板成员变量content,要使得该变量可以接收任意类型的值,我们可以使用“多态的思想”。在C++多态中,基类对象可以接收子类对象——会进行“切片处理”保证赋值的正确性,且在需要的时候,可以对基类对象进行强制转换使其转换为子类对象。
设计框架
Any 类的设计目标:
- 提供一个通用的容器,能够存储任意类型的数据(类似于 std::any)。
- 支持在运行时获取存储数据的类型信息(通过 typeid)。
- 支持深拷贝、赋值操作,以及类型安全的值提取(any_cast)。
核心设计思想:
- 使用多态和模板技术实现类型擦除(Type Erasure)。
- 通过派生类(subholder<T>)存储具体类型的数据,并通过基类指针(placeholder*)进行管理。
类层次结构:
placeholder 类:
- 是一个纯虚基类,定义了接口(type() 和 clone()),不能直接实例化。
- 所有具体类型的数据存储类(subholder<T>)都继承自它。
subholder<T> 类:
- 继承自 placeholder,用于存储具体类型的数据(T _val)。
- 实现了 type() 和 clone() 接口,提供类型信息和深拷贝功能。
class Any
{
// 基类
class placeholder // 包含纯虚函数,是抽象类,不能直接实例化对象
{
public:
virtual ~placeholder() {
/*placeholder 类是一个纯虚类,
它的析构函数需要提供一个实现,否则会导致链接错误。
纯虚析构函数的实现通常是一个空函数体。*/
}
virtual const std::type_info &type() = 0; // 返回参数的类型
virtual placeholder *clone() = 0; // 克隆出一个相同的对象,实现深拷贝
};
template <typename T>
class subholder : public placeholder // 子类
{
public:
subholder(const T &operand) : _val(operand) {}
~subholder() {} // 析构
const std::type_info &type() override // 返回对象的数据类型
{
return typeid(T);
}
placeholder *clone() override
{
return new subholder(_val);
}
public:
T _val; // 数据对象
};
private:
placeholder *_content; // 定义基类指针
public:
Any() {};
template <typename T>
Any(const T &operand);
bool has_value(); // 是否已经有值
Any(const Any &operand); //拷贝构造
template <typename T>
Any &operator=(const T &operand); // 赋值运算符重载
Any &operator=(Any operand); // 赋值运算符重载
void swap(Any &operand); // 交换数据
const std::type_info &type(); // 返回保存的对象的数据类型
template <typename T>
T *get_val(); // 获取指向数据对象的指针
};
#include <iostream>
#include <string>
#include <typeinfo>
#include <typeindex>
#include <cassert>
namespace MyAny
{
class Any
{
class placeholder // 包含纯虚函数,是抽象类,不能直接实例化对象
{
public:
virtual ~placeholder() {
/*placeholder 类是一个纯虚类,
它的析构函数需要提供一个实现,否则会导致链接错误。
纯虚析构函数的实现通常是一个空函数体。*/
}
virtual const std::type_info &type() = 0; // 返回参数的类型
virtual placeholder *clone() = 0; // 克隆出一个相同的对象,实现深拷贝
};
template <typename T>
class subholder : public placeholder // 子类
{
public:
subholder(const T &operand) : _val(operand) {}
~subholder() {} // 析构
const std::type_info &type() override // 返回对象的数据类型
{
return typeid(T);
}
placeholder *clone() override
{
return new subholder(_val);
}
public:
T _val; // 数据对象
};
private:
placeholder *_content; // 定义基类指针
public:
Any() : _content(nullptr) {};
template <typename T>
Any(const T &operand) : _content(new subholder<T>(operand)){};
bool has_value() // 是否已经有值
{
return _content != nullptr;
}
Any(const Any &operand)
{
_content = operand._content == nullptr ? nullptr : operand._content->clone();
}
template <typename T>
Any &operator=(const T &operand)
{
/*为operand构建⼀个临时对象出来,然后进⾏交换,这样临时对象销毁的时候,顺带原先
保存的placeholder也会被销毁*/
Any(operand).swap(*this);
return *this;
}
Any &operator=(Any operand) // 构造一个新的临时对象,直接交换资源即可
{
// if(has_value()) {delete _content; _content = nullptr;} 使用拷贝交换惯用法,已经将资源托管给临时对象,无需这一步
swap(operand);
return *this;
}
void swap(Any &operand)
{
std::swap(_content, operand._content);
}
const std::type_info &type()
{
return _content->type();
}
template <typename T>
T *get_val()
{
if (typeid(T) != type())
assert(nullptr);
return &(((subholder<T> *)_content)->_val);
}
~Any()
{
if(has_value()) delete _content;
_content = nullptr;
}
};
template <typename T>
T any_cast(Any &operand)
{
if (typeid(T) != operand.type())
throw("type mismatch");
return *(operand.get_val<T>());
}
template <typename T>
T *any_cast(Any *operand)
{
if (typeid(T) != operand->type())
throw("type mismatch");
return operand->get_val<T>();
}
}
测试用例
class Test
{
public:
std::string _data;
public:
Test(const std::string &data) : _data(data) { std::cout << "构造" << _data << std::endl; }
Test(const Test &other)
{
_data = other._data;
std::cout << "拷⻉" << _data << std::endl;
}
~Test() { std::cout << "析构" << _data << std::endl; }
};
using namespace MyAny;
int main()
{
// 测试用例1
Any a;
a = std::string("asdasdasd");
std::string ret = any_cast<std::string>(a);
std::cout << ret << std::endl;
Any b = 10;
std::cout << any_cast<int>(b) << std::endl;
b = a;
std::cout << *b.get_val<std::string>() << std::endl;
std::cout << any_cast<std::string>(a) << std::endl;
// // 测试用例2
// Any a;
// {
// Test t("1");
// a = t;
// }
// while(1);
return 0;
}
三、缓冲区Buffer类设计
应用层缓冲区Buffer的设计,有2个关键点:
1、为什么要需要有应用层缓冲区?
2、Buffer结构
为什么需要有应用层缓冲区?
non-blocking(非阻塞)网络编程中,non-blocking IO核心思想是避免阻塞在read()/write()或其他IO系统调用上,可以最大限度复用thread-of-control,让一个线程能服务于多个socket连接。而IO线程只能阻塞在IO-multiplexing(多路复用)函数上,如select()/poll()/epoll_wait(),这样应用层的缓冲区就是必须的,每个TCP socket都要有stateful的input buffer和output buffer。
具体来说,
1)TcpConnection必须要有output buffer
一个常见场景:程序想通过TCP连接发送100K byte数据,但在write()调用中,OS只接收80K(受TCP通告窗口advertised window的控制),而程序又不能原地阻塞等待,事实上也不知道要等多久。程序应该尽快交出控制器,返回到event loop。此时,剩余20K数据怎么办?
对应用程序,它只管生成数据,不应该关心数据是一次发送,还是分几次发送,这些应该由网络库操心,程序只需调用TcpConnection::send()即可。网络库应该接管剩余的20K数据,把它保存到TcpConnection的output buffer,然后注册EPOLLOUT事件,一旦socket变得可写就立刻发送数据。当然,第二次不一定能完全写入20K,如果有剩余,网络库应该继续关注EPOLLOUT事件;如果写完20K,网络库应该停止关注EPOLLOUT,以免造成busy loop。
如果程序又写入50K,而此时output buffer里还有待发20K数据,那么网络库不应该直接调用write(),而应该把这50K数据append到那20K数据之后,等socket变得可写时再一并写入。
如果output buffer里还有待发送数据,而程序又想关闭连接,但对程序而言,调用TcpConnection::send()后就认为数据迟早会发出去,此时网络库不应该直接关闭连接,而要等数据发送完毕。因为此时数据可能还在内核缓冲区中,并没有通过网卡成功发送给接收方。
将数据append到buffer,甚至write进内核,都不代表数据成功发送给对端。
综上,要让程序在write操作上不阻塞,网络库必须给每个tcp connection配置output buffer。
2)TcpConnection必须要有input buffer
TCP是一个无边界的字节流协议,接收方必须要处理“收到的数据尚不构成一条完整的消息”“一次收到多条消息的数据”等情况。
一个常见场景:发送方send 2条10K byte消息(共计20K),接收方收到数据的可能情况:
- 一次性收到20K数据
- 分2次收到,第一次5K,第二次15K
- 分3次收到,第一次6K,第二次8K,第三次6K
- 其他任何可能
这些情况统称为粘包问题。
LT模式下,如何解决的粘包问题?
可以一次把内核缓冲区中的数据读完,存至input buffer,通知应用程序,进行onMessage(Buffer* buffer)回调。在onMessage回调中,应用层协议判定是否是一个完整的包,如果不是一条完整的消息,不会取走数据,也不会进行相应的处理;如果是一条完整的消息,将取走这条消息,并进行相应的处理。
如何判断是一条完整的消息?
答:相应应用层协议制定协议,不由网络库负责。
网络库如何处理 “socket可读”事件?
答:一次性把socket中数据读完,从内核缓冲区读取到应用层buffer,否则会反复触发POLLIN事件,造成busy-loop。
应用层与input buffer、output buffer:
muduo中的IO都是带缓冲的IO(buffered IO),应用层不会自行去read()或write()某个socket,只会操作TcpConnection的input buffer和output buffer。更准确来说,是在onMessage()回调中读取input buffer;调用TcpConnection::send() 来间接操作output buffer,而不直接操作output buffer。
为什么要自主设计缓冲区,而不使用std::string呢?
我们考虑使用string的缺陷如下:
1、当使用string接收字符串时,当遇到'\0'字符时,默认该字符串已经结束。网络中传输的数据多种多样,我们无法保证缓冲区接收到的数据中一定不包含'\0',这就可能导致用户缓冲区接收通信数据不完善,导致信息处理错误。
2、我们在接收数据时总是从缓冲区头部提取数据,而需要从string头部提取数据后,string会自动将提取末尾后的数据向前填充,这一结果在最坏情况下所耗费的时间复杂度为O(N)。当面对大量数据时,显然使用string是个不划算的选择。
因此,我们考虑从解决上述两个问题的基础上设计一个Buffer类充当缓冲区。
1、Buffer要支持空间自动增长,以便在LT和ET模式下都能源源不断的接收可控长度的数据。
2、遇到'\0'字符也能够正常继续接收数据。
3、将数据删除的操作控制在O(1)的时间复杂度,并且支持空间的循环利用。
看到上述三个需求,大家脑海里有没有浮现出一个我们见过的且符合上述要求的缓冲区设计?
对,就是TCP协议缓冲区的设计。TCP的缓冲区采用双指针的形式,底层为字符数组,在逻辑上形同一个“循环队列”。在删除数据时并不是真正的“删除”,而是控制读指针向后移动,增加数据时则将写指针向后移动,当尾部空间不够时,继续从头部剩余空间覆盖写入数据。
为了避免资源的浪费,我们并不在一开始就定义大块空间的缓冲区,而是采取动态增长字符数组的形式。当尾部剩余空间+头部剩余空间不足以接收所有数据时,我们对其进行扩容,否则直接写入。
2个indices(标记)readIndex、writeIndex把vector内容分为3块:prependable、readable、writable,各块大小关系:
- prependable = readIndex
- readable = writeIndex - readIndex
- writable = size() - writeIndex
readIndex,writeIndex满足以下不变式:0 ≤ readIndex ≤ writeIndex ≤ data.size()
Buffer类的设计并不关心线程安全,原因如下:
- 对于input buffer,onMessage()回调发生在该TcpConnection所属IO线程,应用程序应该在onMessage()完成对input buffer的操作,并且不要把input buffer暴露给其他线程。这样,对input buffer的操作都在同一个IO线程,因此Buffer class不必是线程安全的。
- 对于output buffer,应用程序不会直接操作它,而是调用TcpConenction::send()来发送数据,后者是线程安全的。准确来说,是会让output buffer只会在所属IO线程操作。
muduo网络库源码中Buffer类的核心成员设计:
class Buffer : public muduo::copyable
{
public:
...
private:
std::vector<char> buffer_; // 存储数据的线性缓冲区, 大小可变
size_t readerIndex_; // 可读数据首地址, i.e. readable空间首地址
size_t writerIndex_; // 可写数据首地址, i.e. writable空间首地址
...
};
以上就是Buffer类的设计思想,下面是基于上述思想的代码实现:
// 定义缓冲区的默认大小
#define BUFFER_DEFAULT_SIZE 1024
// Buffer 类用于管理一个动态大小的缓冲区,支持数据的读写操作
class Buffer {
private:
// 使用 std::vector 管理缓冲区的内存空间
std::vector<char> _buffer;
// 读偏移量,指示当前可读数据的起始位置
uint64_t _reader_idx;
// 写偏移量,指示当前可写数据的起始位置
uint64_t _writer_idx;
// 确保缓冲区有足够的空间来写入指定长度的数据
// 如果尾部空闲空间足够,直接返回;若总空闲空间足够,将数据移到起始位置;否则进行扩容
void _EnsureSufficientWriteSpace(uint64_t len) {
// 若尾部空闲空间足够,无需额外操作
if (TailIdleSize() >= len) {
return;
}
// 若总空闲空间(头部空闲 + 尾部空闲)足够,移动数据到起始位置
if (len <= TailIdleSize() + HeadIdleSize()) {
// 记录当前可读数据的长度
uint64_t readable_size = ReadAbleSize();
// 将可读数据复制到缓冲区起始位置
std::copy(ReadPosition(), ReadPosition() + readable_size, Begin());
// 重置读偏移量为 0
_reader_idx = 0;
// 更新写偏移量为当前可读数据的长度
_writer_idx = readable_size;
} else {
// 总空闲空间不足,进行扩容操作
DBG_LOG("RESIZE %ld", _writer_idx + len);
_buffer.resize(_writer_idx + len);
}
}
public:
// 构造函数,初始化缓冲区、读偏移量和写偏移量
Buffer() : _reader_idx(0), _writer_idx(0), _buffer(BUFFER_DEFAULT_SIZE) {}
// 获取缓冲区的起始地址
char* Begin() {
return &*_buffer.begin();
}
// 获取当前写入的起始地址,即缓冲区起始地址加上写偏移量
char* WritePosition() {
return Begin() + _writer_idx;
}
// 获取当前读取的起始地址,即缓冲区起始地址加上读偏移量
char* ReadPosition() {
return Begin() + _reader_idx;
}
// 获取缓冲区末尾的空闲空间大小,即总空间大小减去写偏移量
uint64_t TailIdleSize() {
return _buffer.size() - _writer_idx;
}
// 获取缓冲区起始的空闲空间大小,即读偏移量的值
uint64_t HeadIdleSize() {
return _reader_idx;
}
// 获取缓冲区中可读数据的大小,即写偏移量减去读偏移量
uint64_t ReadAbleSize() {
return _writer_idx - _reader_idx;
}
// 将读偏移量向后移动指定的长度
// 移动长度需小于等于当前可读数据的大小
void MoveReadOffset(uint64_t len) {
if (len == 0) {
return;
}
// 确保移动长度不超过可读数据大小
assert(len <= ReadAbleSize());
_reader_idx += len;
}
// 将写偏移量向后移动指定的长度
// 移动长度需小于等于当前尾部的空闲空间大小
void MoveWriteOffset(uint64_t len) {
// 确保移动长度不超过尾部空闲空间大小
assert(len <= TailIdleSize());
_writer_idx += len;
}
// 确保可写空间足够,调用私有函数 _EnsureSufficientWriteSpace 实现
void EnsureWriteSpace(uint64_t len) {
_EnsureSufficientWriteSpace(len);
}
// 向缓冲区写入指定长度的数据
// 先确保有足够空间,再将数据复制到写入位置
void Write(const void* data, uint64_t len) {
if (len == 0) {
return;
}
// 确保有足够空间写入数据
EnsureWriteSpace(len);
const char* d = static_cast<const char*>(data);
// 将数据复制到写入位置
std::copy(d, d + len, WritePosition());
}
// 写入数据并将写偏移量向后移动相应长度
void WriteAndPush(const void* data, uint64_t len) {
Write(data, len);
MoveWriteOffset(len);
}
// 写入字符串数据,将字符串转换为字符指针并调用 Write 函数
void WriteString(const std::string& data) {
Write(data.c_str(), data.size());
}
// 写入字符串数据并将写偏移量向后移动相应长度
void WriteStringAndPush(const std::string& data) {
WriteString(data);
MoveWriteOffset(data.size());
}
// 从另一个缓冲区写入数据,调用 Write 函数
void WriteBuffer(Buffer& data) {
Write(data.ReadPosition(), data.ReadAbleSize());
}
// 从另一个缓冲区写入数据并将写偏移量向后移动相应长度
void WriteBufferAndPush(Buffer& data) {
WriteBuffer(data);
MoveWriteOffset(data.ReadAbleSize());
}
// 从缓冲区读取指定长度的数据到指定位置
// 读取长度需小于等于当前可读数据的大小
void Read(void* buf, uint64_t len) {
// 确保读取长度不超过可读数据大小
assert(len <= ReadAbleSize());
// 将数据从读取位置复制到指定位置
std::copy(ReadPosition(), ReadPosition() + len, static_cast<char*>(buf));
}
// 读取数据并将读偏移量向后移动相应长度
void ReadAndPop(void* buf, uint64_t len) {
Read(buf, len);
MoveReadOffset(len);
}
// 以字符串形式读取指定长度的数据
// 读取长度需小于等于当前可读数据的大小
std::string ReadAsString(uint64_t len) {
// 确保读取长度不超过可读数据大小
assert(len <= ReadAbleSize());
std::string str;
str.resize(len);
// 读取数据到字符串
Read(&str[0], len);
return str;
}
// 以字符串形式读取指定长度的数据并将读偏移量向后移动相应长度
std::string ReadAsStringAndPop(uint64_t len) {
assert(len <= ReadAbleSize());
std::string str = ReadAsString(len);
MoveReadOffset(len);
return str;
}
// 在可读数据中查找换行符('\n')的位置
char* FindCRLF() {
return static_cast<char*>(std::memchr(ReadPosition(), '\n', ReadAbleSize()));
}
// 获取一行数据,包含换行符
// 若未找到换行符,返回空字符串
std::string GetLine() {
char* pos = FindCRLF();
if (pos == nullptr) {
return "";
}
// 读取包含换行符的一行数据
return ReadAsString(pos - ReadPosition() + 1);
}
// 获取一行数据并将读偏移量向后移动相应长度
std::string GetLineAndPop() {
std::string str = GetLine();
MoveReadOffset(str.size());
return str;
}
// 清空缓冲区,将读偏移量和写偏移量都置为 0
void Clear() {
_reader_idx = 0;
_writer_idx = 0;
}
};
四、Socket套接字封装
这块相信大家只要学习过网络编程就自主封装过。此处不再进行过多讲解,仅作简要阐述。
该类的主要作用是将socket相关的函数进行封装,简化编码。值得注意的是:该类同时提供阻塞I/O和非阻塞I/O 两套I/O接口,以应对不同情况下的需求。
#define MAX_LISTEN 1024
class Socket {
private:
int _sockfd;
public:
Socket():_sockfd(-1) {}
Socket(int fd): _sockfd(fd) {}
~Socket() { Close(); }
int Fd() { return _sockfd; }
//创建套接字
bool Create() {
// int socket(int domain, int type, int protocol)
_sockfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (_sockfd < 0) {
ERR_LOG("CREATE SOCKET FAILED!!");
return false;
}
return true;
}
//绑定地址信息
bool Bind(const std::string &ip, uint16_t port) {
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = inet_addr(ip.c_str());
socklen_t len = sizeof(struct sockaddr_in);
// int bind(int sockfd, struct sockaddr*addr, socklen_t len);
int ret = bind(_sockfd, (struct sockaddr*)&addr, len);
if (ret < 0) {
ERR_LOG("BIND ADDRESS FAILED!");
return false;
}
return true;
}
//开始监听
bool Listen(int backlog = MAX_LISTEN) {
// int listen(int backlog)
int ret = listen(_sockfd, backlog);
if (ret < 0) {
ERR_LOG("SOCKET LISTEN FAILED!");
return false;
}
return true;
}
//向服务器发起连接
bool Connect(const std::string &ip, uint16_t port) {
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = inet_addr(ip.c_str());
socklen_t len = sizeof(struct sockaddr_in);
// int connect(int sockfd, struct sockaddr*addr, socklen_t len);
int ret = connect(_sockfd, (struct sockaddr*)&addr, len);
if (ret < 0) {
ERR_LOG("CONNECT SERVER FAILED!");
return false;
}
return true;
}
//获取新连接
int Accept() {
// int accept(int sockfd, struct sockaddr *addr, socklen_t *len);
int newfd = accept(_sockfd, NULL, NULL);
if (newfd < 0) {
ERR_LOG("SOCKET ACCEPT FAILED!");
return -1;
}
return newfd;
}
//接收数据
ssize_t Recv(void *buf, size_t len, int flag = 0) {
// ssize_t recv(int sockfd, void *buf, size_t len, int flag);
ssize_t ret = recv(_sockfd, buf, len, flag);
if (ret <= 0) {
//EAGAIN 当前socket的接收缓冲区中没有数据了,在非阻塞的情况下才会有这个错误
//EINTR 表示当前socket的阻塞等待,被信号打断了,
if (errno == EAGAIN || errno == EINTR) {
return 0;//表示这次接收没有接收到数据
}
ERR_LOG("SOCKET RECV FAILED!!");
return -1;
}
return ret; //实际接收的数据长度
}
ssize_t NonBlockRecv(void *buf, size_t len) {
return Recv(buf, len, MSG_DONTWAIT); // MSG_DONTWAIT 表示当前接收为非阻塞。
}
//发送数据
ssize_t Send(const void *buf, size_t len, int flag = 0) {
// ssize_t send(int sockfd, void *data, size_t len, int flag);
ssize_t ret = send(_sockfd, buf, len, flag);
if (ret < 0) {
if (errno == EAGAIN || errno == EINTR) {
return 0;
}
ERR_LOG("SOCKET SEND FAILED!!");
return -1;
}
return ret;//实际发送的数据长度
}
ssize_t NonBlockSend(void *buf, size_t len) {
if (len == 0) return 0;
return Send(buf, len, MSG_DONTWAIT); // MSG_DONTWAIT 表示当前发送为非阻塞。
}
//关闭套接字
void Close() {
if (_sockfd != -1) {
close(_sockfd);
_sockfd = -1;
}
}
//创建一个服务端连接
bool CreateServer(uint16_t port, const std::string &ip = "0.0.0.0", bool block_flag = false) {
//1. 创建套接字,2. 绑定地址,3. 开始监听,4. 设置非阻塞, 5. 启动地址重用
if (Create() == false) return false;
if (block_flag) NonBlock();
if (Bind(ip, port) == false) return false;
if (Listen() == false) return false;
ReuseAddress();
return true;
}
//创建一个客户端连接
bool CreateClient(uint16_t port, const std::string &ip) {
//1. 创建套接字,2.指向连接服务器
if (Create() == false) return false;
if (Connect(ip, port) == false) return false;
return true;
}
//设置套接字选项---开启地址端口重用
void ReuseAddress() {
// int setsockopt(int fd, int leve, int optname, void *val, int vallen)
int val = 1;
setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR, (void*)&val, sizeof(int));
val = 1;
setsockopt(_sockfd, SOL_SOCKET, SO_REUSEPORT, (void*)&val, sizeof(int));
}
//设置套接字阻塞属性-- 设置为非阻塞
void NonBlock() {
//int fcntl(int fd, int cmd, ... /* arg */ );
int flag = fcntl(_sockfd, F_GETFL, 0);
fcntl(_sockfd, F_SETFL, flag | O_NONBLOCK);
}
};
五、多路复用模型epoll封装
epoll使用LT模式原因:
- LT模式不会发生漏掉事件的BUG,但EPOLLOUT事件不能一开始就关注,否则会出现busy loop,而应该在write无法完全写入内核缓冲区的时候才关注,将未写入内核缓冲区的数据添加到应用层output buffer,直到应用层output buffer写完,停止关注POLLOUT事件。-- 相当于写完成回调,处理写完成事件。
- 读写的时候不必等候EAGAIN,可以节省系统调用次数,降低延迟。(注:如果用ET模式,读的时候读到EAGAIN,写的时候直到output buffer写完成或者EAGAIN)
该类对epoll相关函数进行封装。需要注意的是,只有拥有EventLoop的线程,才能调用EventLoop所拥有的Poller对象的接口。这也就意味着Poller对象并不由外界直接使用,而是存在于一个EventLoop对象中,又因为一个EventLoop只存在于一个Thread中,即One Thread One Loop。
因此Poller对象并不考虑线程安全问题,操作epoll的接口将由EventLoop进行二次封装向外界提供使用,由EventLoop保证操作epoll模型的线程安全问题。
// epoll 最大事件数量
#define MAX_EPOLLEVENTS 1024
// Poller 类用于管理 epoll 事件的监听和处理
class Poller {
private:
// epoll 实例的文件描述符
int _epoll_fd;
// 存储 epoll 等待返回的事件数组
struct epoll_event _epoll_events[MAX_EPOLLEVENTS];
// 存储文件描述符和对应的 Channel 指针的映射
std::unordered_map<int, Channel*> _channel_map;
// 直接操作 epoll_ctl 函数,更新指定 Channel 的事件监听状态
void _UpdateEpollEvent(Channel* channel, int op) {
// 获取 Channel 的文件描述符
int fd = channel->Fd();
// 构造 epoll_event 结构体
struct epoll_event ev;
ev.data.fd = fd;
ev.events = channel->Events();
// 调用 epoll_ctl 函数进行操作
int ret = epoll_ctl(_epoll_fd, op, fd, &ev);
if (ret < 0) {
ERR_LOG("EPOLLCTL FAILED!");
}
}
// 检查指定的 Channel 是否已经在 _channel_map 中,即是否已经添加了事件监控
bool _HasRegisteredChannel(Channel* channel) {
// 在 _channel_map 中查找指定文件描述符对应的 Channel
auto it = _channel_map.find(channel->Fd());
if (it == _channel_map.end()) {
return false;
}
return true;
}
public:
// 构造函数,创建 epoll 实例
Poller() {
_epoll_fd = epoll_create(MAX_EPOLLEVENTS);
if (_epoll_fd < 0) {
ERR_LOG("EPOLL CREATE FAILED!!");
abort();
}
}
// 添加或修改指定 Channel 的事件监听状态
void UpdateEvent(Channel* channel) {
// 检查 Channel 是否已经注册
bool ret = _HasRegisteredChannel(channel);
if (!ret) {
// 未注册则添加到 _channel_map 中,并调用 _UpdateEpollEvent 函数添加事件监听
_channel_map.insert(std::make_pair(channel->Fd(), channel));
_UpdateEpollEvent(channel, EPOLL_CTL_ADD);
} else {
// 已注册则调用 _UpdateEpollEvent 函数修改事件监听
_UpdateEpollEvent(channel, EPOLL_CTL_MOD);
}
}
// 移除指定 Channel 的事件监听
void RemoveEvent(Channel* channel) {
// 在 _channel_map 中查找指定文件描述符对应的 Channel
auto it = _channel_map.find(channel->Fd());
if (it != _channel_map.end()) {
// 找到则从 _channel_map 中移除
_channel_map.erase(it);
}
// 调用 _UpdateEpollEvent 函数移除事件监听
_UpdateEpollEvent(channel, EPOLL_CTL_DEL);
}
// 进行 epoll 事件监听,将活跃的 Channel 指针添加到 active 向量中
void Poll(std::vector<Channel*>* active) {
// 调用 epoll_wait 函数进行事件监听,-1 表示无限期等待
int nfds = epoll_wait(_epoll_fd, _epoll_events, MAX_EPOLLEVENTS, -1);
if (nfds < 0) {
if (errno == EINTR) {
// 被信号中断,直接返回
return;
}
// 发生其他错误,记录错误日志并终止程序
ERR_LOG("EPOLL WAIT ERROR:%s\n", strerror(errno));
abort();
}
// 遍历所有活跃的事件
for (int i = 0; i < nfds; i++) {
// 在 _channel_map 中查找对应的 Channel
auto it = _channel_map.find(_epoll_events[i].data.fd);
assert(it != _channel_map.end());
// 设置 Channel 的实际就绪事件
it->second->SetREvents(_epoll_events[i].events);
// 将活跃的 Channel 指针添加到 active 中
active->push_back(it->second);
}
}
};
六、事件通道Channel封装
Poller的存在,是为了监听事件,但具体监听什么事件呢?
这就需要用到Channel类。一个Channel对象绑定了一个fd(文件描述符),用来监听发生在fd上的事件,事件包括空事件(不监听)、可读事件、可写事件。当fd上被监听事件就绪时,对应Channel对象就会被Poller放入活跃队列中,进而在loop循环中调用封装在Channel的相应回调来进行事件处理。
Channel可以通过EventLoop,向Poller更新自己关心的(监听)事件。对于epoll,会同步更新传递给内核中epoll模型中的红黑树添加事件关心。
可以这样理解,epoll监听的是fd(上指定的事件epoll_events),Poller监听的是Channel对象(上指定的事件events),当监听到事件就绪时,将对应通道加入活跃通道队列,在EventLoop的loop循环中依次调用Channel中注册的事件回调。
class Channel {
private:
int _fd;
EventLoop *_loop;
uint32_t _events; // 当前需要监控的事件
uint32_t _revents; // 当前连接触发的事件
using EventCallback = std::function<void()>;
EventCallback _read_callback; //可读事件被触发的回调函数
EventCallback _write_callback; //可写事件被触发的回调函数
EventCallback _error_callback; //错误事件被触发的回调函数
EventCallback _close_callback; //连接断开事件被触发的回调函数
EventCallback _event_callback; //任意事件被触发的回调函数
public:
Channel(EventLoop *loop, int fd):_fd(fd), _events(0), _revents(0), _loop(loop) {}
int Fd() { return _fd; }
uint32_t Events() { return _events; }//获取想要监控的事件
void SetREvents(uint32_t events) { _revents = events; }//设置实际就绪的事件
void SetReadCallback(const EventCallback &cb) { _read_callback = cb; }
void SetWriteCallback(const EventCallback &cb) { _write_callback = cb; }
void SetErrorCallback(const EventCallback &cb) { _error_callback = cb; }
void SetCloseCallback(const EventCallback &cb) { _close_callback = cb; }
void SetEventCallback(const EventCallback &cb) { _event_callback = cb; }
//当前是否监控了可读
bool ReadAble() { return (_events & EPOLLIN); }
//当前是否监控了可写
bool WriteAble() { return (_events & EPOLLOUT); }
//启动读事件监控
void EnableRead() { _events |= EPOLLIN; Update(); }
//启动写事件监控
void EnableWrite() { _events |= EPOLLOUT; Update(); }
//关闭读事件监控
void DisableRead() { _events &= ~EPOLLIN; Update(); }
//关闭写事件监控
void DisableWrite() { _events &= ~EPOLLOUT; Update(); }
//关闭所有事件监控
void DisableAll() { _events = 0; Update(); }
//移除监控
void Remove();
void Update();
//事件处理,一旦连接触发了事件,就调用这个函数,自己触发了什么事件如何处理自己决定
void HandleEvent() {
if ((_revents & EPOLLIN) || (_revents & EPOLLRDHUP) || (_revents & EPOLLPRI)) {
/*不管任何事件,都调用的回调函数*/
if (_read_callback) _read_callback();
}
/*有可能会释放连接的操作事件,一次只处理一个*/
if (_revents & EPOLLOUT) {
if (_write_callback) _write_callback();
}else if (_revents & EPOLLERR) {
if (_error_callback) _error_callback();//一旦出错,就会释放连接,因此要放到前边调用任意回调
}else if (_revents & EPOLLHUP) {
if (_close_callback) _close_callback();
}
if (_event_callback) _event_callback();
}
};
// 由于EventLoop模块在Channel之后定义,所以对相关函数在类外进行定义
void Channel::Remove() { return _loop->RemoveEvent(this); }
void Channel::Update() { return _loop->UpdateEvent(this); }
七、定时功能模块封装
为什么要设计定时器模块?
作为一个服务器,有些任务并不要求服务器立即执行,而是等到合适的时机或者指定的时间进行处理。如,当前服务器开启了非活跃连接销毁。当新连接到来我们对其进行初始化设置时,就应为其添加一个定时任务,当该连接在规定时间内都没有进行任何I/O操作,我们则直接销毁该连接。如果在这期间内该连接保持活跃,我们则在该连接触发读/写事件之后更新该连接的定时销毁任务。
我们设计了两个类来实现如上功能:定时任务类TimerTask和定时任务时间轮TimerWheel类。
我们如何计时呢?—— timerfd简要介绍
timerfd 通过创建一个特殊的文件描述符来代表一个定时器。当定时器到期时,内核会将定时器的到期信息写入到对应的文件描述符中,此时该文件描述符变为可读状态。程序可以通过 read 系统调用从文件描述符中读取定时器的到期次数等信息。这样一来,定时器就可以和其他 I/O 事件(如 select、poll、epoll 等)一起进行统一管理。
所以我们建立一个Channel对象来管理timerfd,并将其挂载到epoll模型中进行事件监控。当timerfd触发可读事件,就意味着timerfd已经超时,此时我们读取timerfd的超时次数timeout_times,并执行timeout_times个时间单位所对应的定时任务。
timerfd相关系统调用接口
#include <sys/timerfd.h>
/* 创建一个定时器对象, 返回与之关联的fd
* clockid 可指定为CLOCK_REALTIME(系统范围时钟)或CLOCK_MONOTONIC(不可设置的时钟,不能手动修改)
* flags 可指定为TFD_NONBLOCK(为fd设置O_NONBLOCK),TFD_CLOEXEC(为fd设置close-on-exec)
*/
int timerfd_create(int clockid, int flags);
/* 启动或停止绑定到fd的定时器
* flags 指定0:启动一个相对定时器,由new_value->it_value指定相对定时值;TFD_TIMER_ABSTIME启动一个绝对定时器,由new_value->it_value指定定时值
* old_value 保存旧定时值
*/
int timerfd_settime(int fd, int flags, const struct itimerspec *new_value, struct itimerspec *old_value);
/* 获取fd对应定时器的当前时间值 */
int timerfd_gettime(int fd, struct itimerspec *curr_value);
时间轮思想
时间轮通常是一个环形结构(逻辑结构 / 物理结构),由多个槽组成,每个槽代表一个时间单位(如10ms、1s等——取决于使用者如何控制时间轮的粒度)。
每个槽中可以存储多个定时任务(通常使用数组/链表的形式组织起来),这些任务在对应的时间单位到期时会被触发。
时间轮的思想来源于钟表,如果我们定了⼀个3点钟的闹铃,则当时针⾛到3的时候,就代表时间到
了。
同样的道理,如果我们定义了⼀个环形数组,并且有⼀个tick指针,指向数组起始位置,这个指针每秒钟向后⾛动⼀步,⾛到哪⾥,则代表哪⾥的任务该被执⾏了。我们将时间轮的移动指针称为“滴答指针tick”,tick当前指向的位置称之为“槽”,tick与tick+1之间相隔的事件称之为一个“槽间隔”。
假设当前槽间隔时长为1s,那么如果我们想要定⼀个3s后的任务,则只需要将任务添加到tick+3位置,则每秒中⾛⼀步,3秒钟后tick⾛到对应位置,这时候执⾏对应位置的任务即可。
但是,同⼀时间可能会有⼤批量的定时任务,因此我们可以给数组对应位置下拉一个数组,这样就可以在同⼀个时刻上添加多个定时任务了。
为什么时间轮需要使用环形结构?
tick指针是一直向后走的,而tick指针之前的位置所存储的定时任务都已经执行完毕并被清空,且时间轮的大小是固定的,那么我们如何使时间轮中的资源可以被循环利用呢?假设当前时间轮仅有一层,槽间隔时长为1s,且最大容量为8,即最多允许设置当前8s后的定时任务。假设当前需要执行1秒后的定时任务,我们置tick = (tick + 1) % 8即可,并处理tick下标对应的任务队列。通过对tick的取模,我们就生成了一个“逻辑上的轮子”。
工作原理
任务添加:
- 当需要添加一个定时任务时,根据任务的超时时间计算出应在哪个槽中放置该任务。
- 如果超时时间超出当前时间轮的覆盖范围,可以将其放入下一级时间轮中。
时间轮的转动:
- 定时器会周期性地转动时间轮,每次转动一个槽。
- 当时间轮转动到某个槽时,会触发该槽中所有定时任务的回调函数。
任务的到期与回调:
- 当时间轮转动到包含到期任务的槽时,会执行这些任务的回调函数。
- 对于需要周期性执行的任务,可以重新计算下次执行时间并重新添加到时间轮中。
时间轮的应用
假设我们规定新接入的连接在30s内如果没有任何I/O操作,则该连接为非活跃连接。所以在未来,我们的服务器在添加一个新连接时,会同时向时间轮的任务队列中为该连接设置一个距当前时间相差30s的定时任务用于销毁非活跃连接。未来,我们的服务器会维持一个“系统定时器”,该定时器的超时时间设置为30s,则30s后该定时器超时,此时我们就需要执行时间轮中tick+30处的任务队列中的定时任务,且更新tick = (tick + 30) % 60。
同时,如果一个连接在30s内发生了I/O事件,说明该连接是“活跃连接”,我们就需要更新该连接的定时任务到发生本次I/O事件后的30s——即需要即时更新时间轮中活跃连接的定时任务。
但是这样就带来了一个问题:我们如何及时执行tick指针指向的任务队列中的任务呢?如何高效地更新的连接的定时任务呢?难道要先找到该连接,将它从当前任务队列中剔除,然后再添加到新的连接中吗?假设当前仅有秒针轮,如此我们便需要进行二维数组的遍历,时间复杂度是O(n²),无疑是低效的。
同时,我们也得考虑⼀个问题,当前的设计是时间到了,则主动去执⾏定时任务,释放连接,那能不能在时间到了后,自动执行定时任务呢,这时候我们就想到⼀个操作——类的析构函数。 ⼀个类的析构函数,在对象被释放时会⾃动被执行,那么我们如果将⼀个定时任务作为⼀个类的析构 函数内的操作,则这个定时任务在对象被释放的时候就会执行。
但是仅仅为了这个⽬的,⽽设计⼀个额外的任务类,好像有些不划算,因为以上操作我们在连接类Connection中就可设计。但是,这⾥我们⼜要考虑另⼀个问题,那就是假如有⼀个连接建⽴成功了,我们给这个连接设置了⼀个30s后的定时销毁任务,但是在第10s的时候,这个连接进⾏了⼀次通信,那么我们应该时在第30s的时候关闭,还是第40s的时候关闭呢?⽆疑应该是在第40s的时候。也就是说,这时候,我们需要让这个第30s的任务失效,但是我们该如何实现这个操作呢?
这⾥,我们就⽤到了智能指针shared_ptr,shared_ptr内部有个计数器,当计数为0的时候,才会真正释放⼀个对象,那么如果连接在第10s进⾏了⼀次通信,则我们继续向定时任务中,添加⼀个30s后(也就 是第40s)的任务类对象的shared_ptr,则这时候两个任务shared_ptr计数为2,则第30s的定时任务 被释放的时候,计数器减1,此时数值变为1,并不为0,则并不会执⾏实际的析构函数,那么就相当于这个第30s的任务失效了,只有在第40s的时候,计数器变为0时,这个任务才会被真正释放。
单独设计一个定时任务类TimerTask来参与时间轮对连接的定时任务的管理,使得Connection类与定时任务的具体实现解耦,自此Connection类的设计只关心连接的逻辑,同时便于未来对定时任务机制进行修改或替换。
using TaskFunc = std::function<void()>;
using ReleaseFunc = std::function<void()>;
class TimerTask{
private:
uint64_t _id; // 定时器任务对象ID
uint32_t _timeout; //定时任务的超时时间
uint32_t _delay; //定时任务的延时时间
bool _canceled; // false-表示没有被取消, true-表示被取消
TaskFunc _task_cb; //定时器对象要执行的定时任务
ReleaseFunc _release; //用于删除TimerWheel中保存的定时器对象信息
public:
TimerTask(uint64_t id, uint32_t delay, const TaskFunc &cb):
_id(id), _timeout(delay), _task_cb(cb), _canceled(false) {}
~TimerTask() {
if (_canceled == false) _task_cb();
_release();
}
void Cancel() { _canceled = true; } // 取消定时任务
void SetRelease(const ReleaseFunc &cb) { _release = cb; } // 设置回调
uint32_t TimeOut() { return _timeout; } // 返回定时时间
uint32_t DelayTime() { return _delay; } // 返回延时时间
void SetDelay(uint32_t delay) { _delay = delay; }
};
#define MAXCAPACITY 60
// 时间轮类设计——使用数组组织
class TimerWheel
{
using TimerTaskHandler_t = std::function<void (void)>; // 定时任务回调函数
using SharedPtr_t = std::shared_ptr<TimerTask>;
using WeakPtr_t = std::weak_ptr<TimerTask>;
private:
int _capacity; // 时间轮最大容量
int _tick; // 滴答指针
std::vector<std::vector<SharedPtr_t>> _timer_wheel; // 时间轮
std::unordered_map<uint64_t, WeakPtr_t> _timertask_management; // 管理时间轮中的所有定时任务,timer_id和weak_ptr作为键值对
/* 为什么定时任务管理容器中使用weak_ptr,而不是使用shared_ptr呢? */
/* shared_ptr内部包含一个计数器,如果使用shared_ptr为shared_ptr赋值,会导致计数器+1,也就无法保证时间轮的中的定时任务在规定的时间内正确清除 */
/* 而使用weak_ptr接收shared_ptr时不会增加shared_ptr的计数器数值,如此便既保证了对定时任务的管理,也保证了定时任务可以在对象析构的时候正确的执行 */
public:
TimerWheel()
:_tick(0), _capacity(MAXCAPACITY), _timer_wheel(_capacity)
{}
public:
// 为外界提供的一系列接口
// 判断定时任务是否存在
bool IsTimerTaskExist(uint64_t timer_id/*定时任务id*/)
{
if(_timertask_management.find(timer_id) != _timertask_management.end()) return true;
return false;
}
// 移除管理容器中的定时任务项,/*设置给Timer,最终定时任务执⾏完毕后从TimerWheel移除TimerTask信息的回调函数*/
void RemoveTaskFromContainer(uint64_t timer_id/*定时任务id*/)
{
auto it = _timertask_management.find(timer_id);
if(it == _timertask_management.end()) return ;
_timertask_management.erase(it);
}
// 添加定时任务:向管理容器和时间轮中同时添加
void AddTimerTask(TimerTaskHandler_t handle_task/*上层提供的定时任务函数*/, uint64_t timer_id/*定时任务id*/, int timeout/*定时时长*/)
{
// 当前时间轮最多支持60s内的定时任务
if(timeout <= 0 || timeout > 60) return ;
// 当前设计仅支持一个timer_id对应一个定时任务,所以如果定时任务存在则直接返回
if(IsTimerTaskExist(timer_id)) return;
// 构造定式任务类的shared_ptr
SharedPtr_t temp(new TimerTask(timer_id, timeout));
// 设置定时任务函数,对定时任务的管理回调函数
temp->SetTimerTaskHandler(handle_task);
temp->SetRemoveUpperManagement(std::bind(&TimerWheel::RemoveTaskFromContainer, this, timer_id));
// 加入时间轮相应槽的任务队列中
int index = (_tick + timeout) % _capacity;
_timer_wheel[index].emplace_back(temp);
// 托管给管理容器
_timertask_management[timer_id] = WeakPtr_t(temp);
}
// 刷新定时任务
void RefreshTimerTask(uint64_t timer_id/*定时任务id*/)
{
// 如果定时任务不存在,则直接返回
if(!IsTimerTaskExist(timer_id)) return;
// 方式一,不会导致重复计数,因为refresh离开作用域后会自动析构,计数器自动-1
SharedPtr_t refresh = _timertask_management[timer_id].lock();
int timeout = refresh->DelayTime();
// 刷新定时任务在时间轮中的位置
_timer_wheel[(_tick + timeout) % _capacity].emplace_back(refresh);
}
// 取消定时任务
void CancelTimerTask(uint64_t timer_id)
{
if(!IsTimerTaskExist(timer_id)) return ;
WeakPtr_t temp = _timertask_management[timer_id];
temp.lock()->Canceled();
}
/*每滴答⼀次被执⾏_tick++就会向后⾛⼀步,⾛到哪⾥,释放哪⾥的定时器,也就是执⾏哪⾥的定时任务*/
void RunOnTimeTask()
{
_tick = (_tick + 1) % _capacity;
_timer_wheel[_tick].clear();
}
};
单级时间轮的缺陷
1、时间轮的大小在初始化时间轮的时候就已经被设定,后续无法更改。
2、对于时间跨度较长的定时任务,当使用单级时间轮时可能会造成资源的浪费——即时间轮过大且空间资源使用率较低。
3、时间轮采用固定的时间间隔,虽然可以通过定时器对时间间隔进行控制,但缺乏灵活性,不易兼顾较低的时间精度。
使用小根堆实现定时器适合当前高并发服务器的场景吗?
好处:使用小根堆的方式控制定时任务,可以精确地管理任意时间间隔的定时任务,不受时间轮的槽位(时间间隔)限制。并且支持动态添加、删除和修改定时任务,操作灵活。
缺陷:小根堆的插入、删除操作都涉及到堆的调整,且调整操作的时间复杂度为 O(log n),对于大量任务可能会有性能瓶颈。当涉及到批量无序任务导入时,时间复杂度可能会达到O(n * log n)。因此并不适合高并发的场景。
小根堆更适合用于任务数量较少、任务触发不频繁的场景中。
优化方案:使用多级时间轮增加定时任务的精度与上限。
由于我们使用的时单层时间轮,其最大容量capacity则代表当前时间轮最多能存储capacity秒后的定时任务。比如我们如果要定义⼀个60s后的任务,则需要将数组的元素个数设置为60才可以,如果设置一小时后的定时任务,则需要定义3600个元素的数组,这样无疑是比较麻烦的。
为了支持更长的定时时间,可以将多个时间轮级联起来,形成多级时间轮。每一级时间轮负责不同时间范围的定时任务。例如,第一级时间轮负责短期定时(如60s内),第二级负责中期定时(如60分钟内),第三级负责长期定时(如12小时内)。
假如此时有一个80s后的定时任务,80s / 60s = 1min,所以此时我们将该定时任务添加到分针轮1分钟位置当中。当秒针轮走到0时,这意味着已经过去了1分钟,此时分针轮走到1分钟的位置。
注意:我们的定时任务一定是在秒针轮上执行的!!!因此,我们需要将1分钟处的定时任务重新映射到秒针轮上!
每个定时任务都是TimerTask对象,其中包含着定时任务的相对延时时间。因为此时已经经过了一分钟,而延时时间为80s,所以再次映射到秒针轮上的时候,应该映射至(80 - 60)% 60 = 20s的位置上!当1分钟处的任务映射完成后,清除分针轮中1分钟处的shared_ptr管理的定时任务,接着去执行秒针轮上的定时任务。
而时间的精度则是由timerfd的超时时间控制,我们可以通过改变 timerfd的超时时间和多级时间轮的重设计以支持更高精度的定时任务。由于我们本次项目设计无需毫秒级定时任务,所以将最低精度设置为秒级。
在muduo源码当中,定时器模块的定时任务的处理是使用有序集合set(底层为红黑树)实现的。其实类似于以红黑树、小根堆、跳表为底层实现的定时器模块与时间轮最大的不同点就在于前者是采用的是绝对时间延迟,而时间轮采用的是相对时间延迟。不可否认,前者在精度和灵活性的支持上更高,但面对对时间精度要求不高、高并发的场景,时间轮依然是不错的选择。
using TaskFunc = std::function<void()>;
using ReleaseFunc = std::function<void()>;
class TimerTask{
private:
uint64_t _id; // 定时器任务对象ID
uint32_t _timeout; //定时任务的超时时间
uint32_t _delay; //定时任务的延时时间
bool _canceled; // false-表示没有被取消, true-表示被取消
TaskFunc _task_cb; //定时器对象要执行的定时任务
ReleaseFunc _release; //用于删除TimerWheel中保存的定时器对象信息
public:
TimerTask(uint64_t id, uint32_t delay, const TaskFunc &cb):
_id(id), _timeout(delay), _task_cb(cb), _canceled(false) {}
~TimerTask() {
if (_canceled == false) _task_cb();
_release();
}
void Cancel() { _canceled = true; } // 取消定时任务
void SetRelease(const ReleaseFunc &cb) { _release = cb; } // 设置回调
uint32_t TimeOut() { return _timeout; } // 返回定时时间
uint32_t DelayTime() { return _delay; } // 返回延时时间
void SetDelay(uint32_t delay) { _delay = delay; }
};
// 时分秒三级时间轮类
class TimerWheel {
using TaskFunc = std::function<void()>;
using WeakTask = std::weak_ptr<TimerTask>;
using PtrTask = std::shared_ptr<TimerTask>;
private:
static constexpr int _SECOND_CAPACITY = 60;
static constexpr int _MINUTE_CAPACITY = 60;
static constexpr int _HOUR_CAPACITY = 12;
int _second_tick = 0;
std::vector<std::vector<PtrTask>> _second_wheel;
int _minute_tick = 0;
std::vector<std::vector<PtrTask>> _minute_wheel;
int _hour_tick = 0;
std::vector<std::vector<PtrTask>> _hour_wheel;
std::unordered_map<uint64_t, WeakTask> _timers;
class EventLoop* _loop;
int _timer_fd;
std::unique_ptr<Channel> _timer_channel;
// 移除定时器任务
void RemoveTimer(uint64_t id) {
auto it = _timers.find(id);
if (it != _timers.end()) {
_timers.erase(it);
}
}
// 创建定时器文件描述符
static int CreateTimerFd() {
int timer_fd = timerfd_create(CLOCK_MONOTONIC, 0);
if (timer_fd < 0) {
ERR_LOG("TIMERFD CREATE FAILED!");
std::abort();
}
struct itimerspec itime;
itime.it_value.tv_sec = 1;
itime.it_value.tv_nsec = 0;
itime.it_interval.tv_sec = 1;
itime.it_interval.tv_nsec = 0;
timerfd_settime(timer_fd, 0, &itime, nullptr);
return timer_fd;
}
// 读取定时器文件描述符
int ReadTimerFd() {
uint64_t times;
int ret = read(_timer_fd, ×, 8);
if (ret < 0) {
std::cerr << "READ TIMEFD FAILED!" << std::endl;
std::abort();
}
return times;
}
// 推进秒级时间轮
void AdvanceSecondWheel() {
_second_tick = (_second_tick + 1) % _SECOND_CAPACITY;
if (_second_tick == 0) {
AdvanceMinuteWheel();
}
// 执行秒级时间轮当前槽位的任务
_second_wheel[_second_tick].clear();
}
// 重新映射任务到合适的时间轮
void RemapTask(PtrTask task) {
int remaining_delay = task->DelayTime();
if (remaining_delay < 60) {
// 剩余时间小于 60 秒,放入秒级时间轮
_second_wheel[(remaining_delay + _second_tick) % _SECOND_CAPACITY].push_back(task);
} else if (remaining_delay < 3600) {
// 剩余时间小于 3600 秒,放入分钟级时间轮
int minutes = remaining_delay / 60;
_minute_wheel[(_minute_tick + minutes) % _MINUTE_CAPACITY].push_back(task);
} else {
// 剩余时间大于等于 3600 秒,放入小时级时间轮
int hours = remaining_delay / 3600;
_hour_wheel[(_hour_tick + hours) % _HOUR_CAPACITY].push_back(task);
}
}
// 推进分钟级时间轮
void AdvanceMinuteWheel() {
_minute_tick = (_minute_tick + 1) % _MINUTE_CAPACITY;
if (_minute_tick == 0) {
AdvanceHourWheel();
}
// 处理分钟级时间轮当前槽位的任务
for (auto& task : _minute_wheel[_minute_tick]) {
int remaining_delay = task->DelayTime() - 60;
task->SetDelay(remaining_delay); // 更新任务的延迟时间
RemapTask(task);
}
_minute_wheel[_minute_tick].clear();
}
// 推进小时级时间轮
void AdvanceHourWheel() {
_hour_tick = (_hour_tick + 1) % _HOUR_CAPACITY;
// 处理小时级时间轮当前槽位的任务
for (auto& task : _hour_wheel[_hour_tick]) {
int remaining_delay = task->DelayTime() - 3600;
task->SetDelay(remaining_delay); // 更新任务的延迟时间
RemapTask(task);
}
_hour_wheel[_hour_tick].clear();
}
// 处理定时器超时事件
void OnTime() {
int times = ReadTimerFd();
for (int i = 0; i < times; i++) {
AdvanceSecondWheel();
}
}
// 在事件循环中添加定时器任务
void TimerAddInLoop(uint64_t id, uint32_t delay, const TaskFunc& cb) {
PtrTask pt = std::make_shared<TimerTask>(id, delay, cb);
pt->SetRelease([this, id]() { RemoveTimer(id); });
if (delay < 60) {
// 延迟小于 60 秒,放入秒级时间轮
_second_wheel[(_second_tick + delay) % _SECOND_CAPACITY].push_back(pt);
} else if (delay < 3600) {
// 延迟小于 3600 秒,放入分钟级时间轮
int minutes = delay / 60;
_minute_wheel[(_minute_tick + minutes) % _MINUTE_CAPACITY].push_back(pt);
} else {
// 延迟大于等于 3600 秒,放入小时级时间轮
int hours = delay / 3600;
_hour_wheel[(_hour_tick + hours) % _HOUR_CAPACITY].push_back(pt);
}
_timers[id] = pt;
}
// 在事件循环中刷新定时器任务
void TimerRefreshInLoop(uint64_t id) {
auto it = _timers.find(id);
if (it == _timers.end()) {
return;
}
PtrTask pt = it->second.lock();
if (!pt) {
return;
}
int delay = pt->TimeOut();
if (delay < 60) {
// 延迟小于 60 秒,放入秒级时间轮
_second_wheel[(_second_tick + delay) % _SECOND_CAPACITY].push_back(pt);
} else if (delay < 3600) {
// 延迟小于 3600 秒,放入分钟级时间轮
int minutes = delay / 60;
_minute_wheel[(_minute_tick + minutes) % _MINUTE_CAPACITY].push_back(pt);
} else {
// 延迟大于等于 3600 秒,放入小时级时间轮
int hours = delay / 3600;
_hour_wheel[(_hour_tick + hours) % _HOUR_CAPACITY].push_back(pt);
}
}
// 在事件循环中取消定时器任务
void TimerCancelInLoop(uint64_t id) {
auto it = _timers.find(id);
if (it == _timers.end()) {
return;
}
PtrTask pt = it->second.lock();
if (pt) {
pt->Cancel();
}
}
public:
TimerWheel(class EventLoop* loop)
: _second_wheel(_SECOND_CAPACITY), _minute_wheel(_MINUTE_CAPACITY), _hour_wheel(_HOUR_CAPACITY),
_loop(loop), _timer_fd(CreateTimerFd()),
_timer_channel(new Channel(loop, _timer_fd)) {
_timer_channel->SetReadCallback([this]() { OnTime(); });
_timer_channel->EnableRead();
}
// 添加定时器任务
void TimerAdd(uint64_t id, uint32_t delay, const TaskFunc& cb);
// 刷新定时器任务
void TimerRefresh(uint64_t id);
// 取消定时器任务
void TimerCancel(uint64_t id);
// 检查定时器任务是否存在
bool HasTimer(uint64_t id) {
auto it = _timers.find(id);
return it != _timers.end();
}
};
八、EventLoop、LoopThread、LoopThreadPool模块
muduo网络库采用的是Reactor模式。服务器软件框架是one loop per thread,即一个线程一个事件循环。这个循环称为EventLoop,loop线程是指运行EventLoop::loop()的线程。这种以事件为驱动的编程模式称为事件驱动模式。
网络库是由Reactor + 线程池来完成的,线程池中每个线程都是一个Reactor模型。这种结构在处理大量并发I/O连接任务的服务器上,就很有优势。
我们实际上要实现的是主从Reactor模型——即主线程负责监听连接,并将新建连接采用RR轮询的方式分配给LoopThreadPool中LoopThread线程对应的EventLoop中进行时间监控。
什么是主从Reactor模式?——简要介绍
要求主线程(I/O单元)只负责监听文件描述符上是否有事件发生,有的话就立即将该事件通知工作线程(逻辑单元)。除此之外,主线程不做其他任何实质性的工作。读写数据,接受新连接,以及处理客户请求均在工作线程中完成。
使用同步I/O模型(以epoll(7)为例)实现Reactor模式工作流程:
1、主线程往epoll内核事件表注册socket上的读就绪事件(epoll_ctl + EPOLL_CTL_ADD);
2、主线程调用epoll_wait等待socket上有数据可读;
3、当socket上有数可读时,epoll_wait通知主线程。主线程则将socket可读事件放入请求队列;
4、阻塞在请求队列上的某个工作线程被唤醒,它从socket读取数据,并处理客户请求,然后往epoll内核事件表中注册该socket上写就绪事件。
5、主线程调用epoll_wait等待socket可写;
6、当socket可写时,epoll_wait通知主线程。主线程将socket可写事件放入请求队列。
7、阻塞在请求队列上的某个工作线程被唤醒,它往socket上写入服务器处理客户请求的结果。
当然,epoll不止能监听socket上的事件,其他文件描述符都能监听。
下图是主从Reactor模式的工作流程示意:
EventLoop类封装设计
EventLoop是一个接口类,不宜暴露太多内部细节给客户,接口及其使用应尽量简洁。EventLoop的主要职责是:
1、提供定时执行用户指定任务的方法,支持一次性、周期执行用户任务;
2、提供一个运行循环,每当Poller监听到有通道对应事件发生时,会将通道加入激活通道列表,运行循环要不断从取出激活通道,然后调用事件回调处理事件;
3、每个EventLoop对应一个线程,不允许一对多或者多对一,提供判断当前线程是否为创建EventLoop对象的线程的方法;
4、允许在其他线程中调用EventLoop的public接口,但同时要确保线程安全;
为什么要保证EventLoop类中方法的线程安全?
未来我们的接口是提供给服务器开发者的,我们并不确保开发者在上层是否在多线程/引入线程池的环境中进行I/O、定时器任务添加或业务处理,以上操作皆涉及到对公共资源的修改。如果我们在可能发生多个线程同时修改公共资源的地方添加互斥锁保证多线程的同步,在高并发量的情况下,锁的竞争势必会造成时间与资源的浪费。
而muduo的设计思想如下:将所有可能涉及到公共资源的操作统一放在EventLoop中进行,因为一个EventLoop对应一个线程,所以只要在EventLoop中执行任务,一定是单线程的,也不必关心会发生多个线程同时操作一个公共资源的情况。
为此,EventLoop中提供了如下函数来保证对于公共资源的安全操作。
IsInLoopThread()/AssertInLoopThread():判断/断言 当前线程是创建当前EventLoop对象的线程。
RunInLoop():在EventLoop线程中“立即”运行一次用户任务。如果当前执行流线程为EventLoop所属线程,则直接执行函数。否则将任务函数添加到EventLoop的待执行任务队列中。
QueueInLoop():在EventLoop线程中向EventLoop的待执行任务队列中添加一次用户任务。
在未来的函数设计中,但凡涉及到公共资源的操作的函数,都需要封装为函数对象交给RunInLoop()/QueueInLoop()进行执行/添加至待执行任务队列中。
使用eventfd进行事件通知。
eventfd是用来传递事件的fd, 在本项目中的作用是为了唤醒有可能epoll因为没有事件就绪而导致的阻塞, 因为要执行后续的任务, 所以通过eventfd来唤醒epoll_wait。
当函数调用QueueInLoop()时,就说明任务队列中新增了任务。此时向eventfd中写入数据用于唤醒阻塞状态,进而去执行任务队列中的任务。
原理:eventfd实现的是⼀个计数功能, 写进去⼀个 8 字节的整数,eventfd 实现的逻辑是累计计数;读的时候,读到总计数,并且会清零。使⽤ epoll 监听事件,那么都是监听读事件,因为监听写事件⽆意义, read eventfd 的时候,如果计数器的值为 0,就会阻塞, ⼀旦向该eventfd写⼊数据,则唤醒epoll。
为什么不使用普通的fd?
不是所有的 fd 类型都可用 epoll 池来监听事件的,只有实现了 file_operation- >poll 的调用的“文件” fd 才能被 epoll 管理。eventfd 刚好就实现了这个接口。
class EventLoop {
private:
using Functor = std::function<void()>;
std::thread::id _thread_id; // 线程ID
int _event_fd; // eventfd唤醒IO事件监控有可能导致的阻塞
std::unique_ptr<Channel> _event_channel;
Poller _poller; // 进行所有描述符的事件监控
std::vector<Functor> _tasks; // 任务池
std::mutex _mutex; // 实现任务池操作的线程安全
TimerWheel _timer_wheel; // 定时器模块
public:
// 执行任务池中的所有任务
void RunAllTask() {
std::vector<Functor> functor;
{
std::unique_lock<std::mutex> _lock(_mutex);
_tasks.swap(functor);
}
for (auto& f : functor) {
f();
}
return;
}
static int CreateEventFd() {
int efd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
if (efd < 0) {
ERR_LOG("CREATE EVENTFD FAILED!!");
abort(); // 让程序异常退出
}
return efd;
}
void ReadEventfd() {
uint64_t res = 0;
int ret = read(_event_fd, &res, sizeof(res));
if (ret < 0) {
// EINTR -- 被信号打断; EAGAIN -- 表示无数据可读
if (errno == EINTR || errno == EAGAIN) {
return;
}
ERR_LOG("READ EVENTFD FAILED!");
abort();
}
return;
}
void WeakUpEventFd() {
uint64_t val = 1;
int ret = write(_event_fd, &val, sizeof(val));
if (ret < 0) {
if (errno == EINTR) {
return;
}
ERR_LOG("READ EVENTFD FAILED!");
abort();
}
return;
}
public:
EventLoop()
: _thread_id(std::this_thread::get_id()), _event_fd(CreateEventFd()),
_event_channel(new Channel(this, _event_fd)), _timer_wheel(this) {
// 给eventfd添加可读事件回调函数,读取eventfd事件通知次数
_event_channel->SetReadCallback(
std::bind(&EventLoop::ReadEventfd, this));
// 启动eventfd的读事件监控
_event_channel->EnableRead();
}
// 启动事件循环
void Start() {
while (1) {
// 1. 事件监控,
std::vector<Channel*> actives;
_poller.Poll(&actives);
// 2. 事件处理。
for (auto& channel : actives) {
channel->HandleEvent();
}
// 3. 执行任务
RunAllTask();
}
}
// 用于判断当前线程是否是EventLoop对应的线程;
bool IsInLoop() { return (_thread_id == std::this_thread::get_id()); }
void AssertInLoop() { assert(_thread_id == std::this_thread::get_id()); }
// 判断将要执行的任务是否处于当前线程中,如果是则执行,不是则压入队列。
void RunInLoop(const Functor& cb) {
if (IsInLoop()) {
return cb();
}
return QueueInLoop(cb);
}
// 将操作压入任务池
void QueueInLoop(const Functor& cb) {
{
std::unique_lock<std::mutex> _lock(_mutex);
_tasks.push_back(cb);
}
// 唤醒有可能因为没有事件就绪,而导致的epoll阻塞;
// 其实就是给eventfd写入一个数据,eventfd就会触发可读事件
WeakUpEventFd();
}
// 添加/修改描述符的事件监控
void UpdateEvent(Channel* channel) { return _poller.UpdateEvent(channel); }
// 移除描述符的监控
void RemoveEvent(Channel* channel) { return _poller.RemoveEvent(channel); }
void TimerAdd(uint64_t id, uint32_t delay, const TaskFunc& cb) {
return _timer_wheel.TimerAdd(id, delay, cb);
}
void TimerRefresh(uint64_t id) { return _timer_wheel.TimerRefresh(id); }
void TimerCancel(uint64_t id) { return _timer_wheel.TimerCancel(id); }
bool HasTimer(uint64_t id) { return _timer_wheel.HasTimer(id); }
};
LoopThread类封装设计
为了保证一个EventLoop只属于一个LoopThread,所以我们的EventLoop对象只在LoopThread类对象中进行创建。
LoopThread类对外提供一个GetLoop()函数,用于获取LoopThread对象中的EventLoop。
需要注意:LoopThread中EventLoop的创建与获取需要使用互斥锁+条件变量的方式保证执行顺序的同步性。
我们将EventLoop对象的实例化放入线程的执行函数中,假如此时执行函数并未执行完毕,且有另一个线程想要获取该LoopThread对象中的EventLoop对象指针,只能获取到一个空指针。因为EventLoop对象的指针并未实例化完成。
因此,我们在两个函数中使用互斥锁,并在GetLoop()函数中使用条件变量判断,如果EventLoop对象指针为空就一致等待,直到EventLoop对象创建完成。
class LoopThread {
private:
/*用于实现_loop获取的同步关系,避免线程创建了,但是_loop还没有实例化之前去获取_loop*/
std::mutex _mutex; // 互斥锁
std::condition_variable _cond; // 条件变量
EventLoop* _loop; // EventLoop指针变量,这个对象需要在线程内实例化
std::thread _thread; // EventLoop对应的线程
private:
/*实例化 EventLoop
* 对象,唤醒_cond上有可能阻塞的线程,并且开始运行EventLoop模块的功能*/
void ThreadEntry() {
EventLoop loop;
{
std::unique_lock<std::mutex> lock(_mutex); // 加锁
_loop = &loop;
_cond.notify_all();
}
loop.Start();
}
public:
/*创建线程,设定线程入口函数*/
LoopThread()
: _loop(NULL), _thread(std::thread(&LoopThread::ThreadEntry, this)) {}
/*返回当前线程关联的EventLoop对象指针*/
EventLoop* GetLoop() {
EventLoop* loop = NULL;
{
std::unique_lock<std::mutex> lock(_mutex); // 加锁
_cond.wait(lock,
[&]() { return _loop != NULL; }); // loop为NULL就一直阻塞
loop = _loop;
}
return loop;
}
};
LoopThreadPool封装设计
LoopThreadPool为外层提供使用RR轮询的方式分配EventLoop的函数接口、提供设置线程数量的接口。
需要注意,上层使用者可能并不使用多线程。当线程池中的数量为0时,当调用EventLoop的分配接口时,直接返回主线程baseLoop。由主线程承担各项工作——此时为单线程Reactor模式。
class LoopThreadPool {
private:
int _thread_count;
int _next_idx;
EventLoop* _baseloop;
std::vector<LoopThread*> _threads;
std::vector<EventLoop*> _loops;
public:
LoopThreadPool(EventLoop* baseloop)
: _thread_count(0), _next_idx(0), _baseloop(baseloop) {}
void SetThreadCount(int count) { _thread_count = count; }
void Create() {
if (_thread_count > 0) {
_threads.resize(_thread_count);
_loops.resize(_thread_count);
for (int i = 0; i < _thread_count; i++) {
_threads[i] = new LoopThread();
_loops[i] = _threads[i]->GetLoop();
}
}
return;
}
EventLoop* NextLoop() {
if (_thread_count == 0) {
return _baseloop;
}
_next_idx = (_next_idx + 1) % _thread_count;
return _loops[_next_idx];
}
};
九、Connection模块设计
Connection模块是对Buffer模块,Socket模块,Channel模块的⼀个整体封装,实现了对一个通信套接字的整体的管理,每⼀个进行数据通信的套接字(也就是accept获取到的新连接)都会使用Connection进行管理。
Connection模块内部包含有三个由组件使用者传入的回调函数:连接建立完成回调,事件回调,新数据回调,关闭回调。
Connection模块内部包含有两个组件使用者提供的接口:数据发送接口,连接关闭接口
Connection模块内部包含有两个用户态缓冲区:用户态接收缓冲区,用户态发送缓冲区
Connection模块内部包含有⼀个Socket对象:完成描述符面向系统的IO操作
Connection模块内部包含有⼀个Channel对象:完成描述符IO事件就绪的处理
Connection模块内部包含为Channel设置的回调函数:读、写、错误等事件的触发函数
具体处理流程如下:
1. 实现向Channel提供可读,可写,错误等不同事件的IO事件回调函数,然后将Channel和对应的描述符添加到Poller事件监控中。
2. 当描述符在Poller模块中就绪了IO可读事件,则调用描述符对应Channel中保存的读事件处理函数,进⾏数据读取,将socket接收缓冲区全部读取到Connection管理的用户态接收缓冲区中。然后调用由组件使用者传⼊的新数据到来回调函数进行处理。
3. 组件使用者进行数据的业务处理完毕后,通过Connection向使用者提供的数据发送接口,将数据写入Connection的发送缓冲区中。
4. 启动描述符在Poller模块中的IO写事件监控,就绪后,调用Channel中保存的写事件处理函数,将发送缓冲区中的数据通过Socket进⾏面向系统的实际数据发送。
基于Connection不同的状态,采用状态机的方式确保代码执行流程正确。
因此为 Connection定义如下状态:
- CONNECTING -- 连接建立成功-待处理状态
- CONNECTED -- 连接建立完成,各种设置已完成,可以通信的状态;
- DISCONNECTING -- 待关闭状态
- DISCONECTED -- 连接关闭状态;
只有当当前状态的所有操作完成并满足下一个执行函数的状态后,才能执行后续代码。
例如:如果此时服务器启动了非活跃链接的定时销毁任务,在Connection的构造函数中,我们并不能立刻就为其添加定时销毁任务,因为此时还可能在进行各项初始化工作或并没有开启读关心事件。因此我们在构造时设置为CONNECTING状态,当构造完成后进行读关心事件的开启和定时任务的添加,完成这些操作后将状态设置为CONNECTED表示连接初始化彻底完成,可以进行通信。当连接通信完毕需要关闭时,将状态设置为DISCONNECTING状态,我们需要处理Connection缓冲区中所剩余的数据,并在数据处理完之后关闭连接。在关闭连接的函数中,其状态一定是上层的DISCONNECTING状态,代表数据处理完成可以关闭连接,此时将连接状态设置为DISCONECTED并执行连接关闭相关的函数。
优势:状态机可以避免不同操作对连接状态的混淆,确保每个状态的转换都是明确和可控的。比如,在 CONNECTED 状态下,才能进行数据的读写操作,如果处于其他状态则不允许,这样可以避免因错误的操作导致程序出现异常。
Connection类继承了一个类:std::enable_shared_from_this<Connection>
如果Connection中有正在发送的数据,怎么保证在某些回调函数中触发Connection关闭机制后,能让Connection先把数据发送完再释放Connection对象的资源?
这就需要我们始终保持Connection不因为在业务处理/IO相关的函数中发生错误时触发关闭机制后被意外删除。因此,对相关函数传参时使用shared_ptr智能指针传参,并且在TcpServer中始终保存一份Connection的shared_ptr。因此,即时在其他函数触发关闭机制后,由于shared_ptr中引用计数的存在Connection也不会被真正的销毁。只有在处理完Connection缓冲区中的数据后,在关闭连接时才会调用上层TcpServer设置的回调函数进行彻底销毁。
Connection中的Any类对象用于在协议切换时即时更新协议的上下文。
// 前向声明 Connection 类
class Connection;
// 定义连接状态的枚举类型
// CONNECTING -- 连接建立成功,但处于待处理状态
// CONNECTED -- 连接建立完成,各项设置已完成,可以进行通信的状态
// DISCONNECTING -- 连接处于待关闭状态
// DISCONNECTED -- 连接关闭状态
typedef enum { DISCONNECTED, CONNECTING, CONNECTED, DISCONNECTING } ConnStatu;
// 使用智能指针管理 Connection 对象
using PtrConnection = std::shared_ptr<Connection>;
// Connection 类,用于管理网络连接,继承自 std::enable_shared_from_this 以便安全地获取自身的 shared_ptr
class Connection : public std::enable_shared_from_this<Connection> {
private:
uint64_t _conn_id; // 连接的唯一标识符,用于连接的管理和查找
int _sockfd; // 连接关联的文件描述符
bool _enable_inactive_release; // 连接是否启用非活跃销毁的判断标志,默认为 false
EventLoop *_loop; // 连接所关联的 EventLoop 对象,用于事件循环和任务调度
ConnStatu _statu; // 连接的当前状态
Socket _socket; // 套接字操作管理对象,负责与网络套接字的交互
Channel _channel; // 连接的事件管理对象,用于处理文件描述符的各种事件
Buffer _in_buffer; // 输入缓冲区,用于存放从套接字中读取到的数据
Buffer _out_buffer; // 输出缓冲区,用于存放要发送给对端的数据
Any _context; // 请求的接收处理上下文,可用于存储和传递额外的信息
// 定义各种回调函数类型,这些回调函数由服务器模块或组件使用者设置
// ConnectedCallback -- 连接建立完成时的回调函数
using ConnectedCallback = std::function<void(const PtrConnection&)>;
// MessageCallback -- 接收到消息时的回调函数
using MessageCallback = std::function<void(const PtrConnection&, Buffer *)>;
// ClosedCallback -- 连接关闭时的回调函数
using ClosedCallback = std::function<void(const PtrConnection&)>;
// AnyEventCallback -- 任意事件触发时的回调函数
using AnyEventCallback = std::function<void(const PtrConnection&)>;
ConnectedCallback _connected_callback; // 连接建立完成时的回调函数
MessageCallback _message_callback; // 接收到消息时的回调函数
ClosedCallback _closed_callback; // 连接关闭时的回调函数
AnyEventCallback _event_callback; // 任意事件触发时的回调函数
// 组件内的连接关闭回调函数,用于在服务器组件内移除连接信息
ClosedCallback _server_closed_callback;
private:
// 处理文件描述符可读事件的回调函数
// 当文件描述符可读时,从套接字读取数据到输入缓冲区,并调用消息处理回调函数
void HandleRead() {
// 1. 从套接字接收数据到缓冲区
char buf[65536];
ssize_t ret = _socket.NonBlockRecv(buf, 65535);
if (ret < 0) {
// 接收数据出错,不能直接关闭连接,调用 ShutdownInLoop 处理
return ShutdownInLoop();
}
// 将读取到的数据写入输入缓冲区,并更新写偏移
_in_buffer.WriteAndPush(buf, ret);
// 2. 如果输入缓冲区有可读数据,调用消息处理回调函数进行业务处理
if (_in_buffer.ReadAbleSize() > 0) {
// 使用 shared_from_this 获取当前对象的 shared_ptr,确保对象在回调期间不会被意外销毁
return _message_callback(shared_from_this(), &_in_buffer);
}
}
// 处理文件描述符可写事件的回调函数
// 当文件描述符可写时,将输出缓冲区中的数据发送出去
void HandleWrite() {
// 从输出缓冲区读取数据并发送
ssize_t ret = _socket.NonBlockSend(_out_buffer.ReadPosition(), _out_buffer.ReadAbleSize());
if (ret < 0) {
// 发送数据出错,先处理输入缓冲区中的数据,然后释放连接
if (_in_buffer.ReadAbleSize() > 0) {
_message_callback(shared_from_this(), &_in_buffer);
}
return Release();
}
// 更新输出缓冲区的读偏移
_out_buffer.MoveReadOffset(ret);
// 如果输出缓冲区没有待发送的数据,关闭写事件监控
if (_out_buffer.ReadAbleSize() == 0) {
_channel.DisableWrite();
// 如果连接处于待关闭状态,释放连接
if (_statu == DISCONNECTING) {
return Release();
}
}
return;
}
// 处理文件描述符挂断事件的回调函数
// 当文件描述符触发挂断事件时,处理输入缓冲区中的数据,然后释放连接
void HandleClose() {
// 如果输入缓冲区有数据,调用消息处理回调函数处理
if (_in_buffer.ReadAbleSize() > 0) {
_message_callback(shared_from_this(), &_in_buffer);
}
return Release();
}
// 处理文件描述符出错事件的回调函数
// 当文件描述符触发出错事件时,调用 HandleClose 处理
void HandleError() {
return HandleClose();
}
// 处理文件描述符任意事件的回调函数
// 当文件描述符触发任意事件时,刷新连接的活跃度(如果启用了非活跃销毁),并调用任意事件回调函数
void HandleEvent() {
if (_enable_inactive_release == true) {
// 刷新连接的活跃度,延迟定时销毁任务
_loop->TimerRefresh(_conn_id);
}
if (_event_callback) {
// 调用任意事件回调函数
_event_callback(shared_from_this());
}
}
// 连接建立完成后,在 EventLoop 中执行的初始化操作
// 修改连接状态,启动读事件监控,并调用连接建立完成的回调函数
void EstablishedInLoop() {
// 确保当前连接状态为 CONNECTING
assert(_statu == CONNECTING);
// 修改连接状态为 CONNECTED
_statu = CONNECTED;
// 启动读事件监控
_channel.EnableRead();
// 如果设置了连接建立完成的回调函数,调用该函数
if (_connected_callback) {
_connected_callback(shared_from_this());
}
}
// 实际释放连接资源的函数
// 修改连接状态,移除事件监控,关闭文件描述符,取消定时销毁任务,调用关闭回调函数
void ReleaseInLoop() {
// 1. 修改连接状态为 DISCONNECTED
_statu = DISCONNECTED;
// 2. 移除连接的事件监控
_channel.Remove();
// 3. 关闭文件描述符
_socket.Close();
// 4. 如果定时器队列中存在定时销毁任务,取消该任务
if (_loop->HasTimer(_conn_id)) {
CancelInactiveReleaseInLoop();
}
// 5. 调用用户设置的关闭回调函数
if (_closed_callback) {
_closed_callback(shared_from_this());
}
// 移除服务器内部管理的连接信息
if (_server_closed_callback) {
_server_closed_callback(shared_from_this());
}
}
// 发送数据的内部函数
// 将数据放入输出缓冲区,启动写事件监控
void SendInLoop(Buffer &buf) {
// 如果连接已经关闭,直接返回
if (_statu == DISCONNECTED) {
return;
}
// 将数据写入输出缓冲区
_out_buffer.WriteBufferAndPush(buf);
// 如果写事件未启用,启动写事件监控
if (_channel.WriteAble() == false) {
_channel.EnableWrite();
}
}
// 关闭连接的内部函数
// 设置连接为待关闭状态,处理输入缓冲区中的数据,根据输出缓冲区的情况决定是否释放连接
void ShutdownInLoop() {
// 设置连接状态为 DISCONNECTING
_statu = DISCONNECTING;
// 如果输入缓冲区有数据,调用消息处理回调函数处理
if (_in_buffer.ReadAbleSize() > 0) {
if (_message_callback) {
_message_callback(shared_from_this(), &_in_buffer);
}
}
// 如果输出缓冲区有数据,启动写事件监控
if (_out_buffer.ReadAbleSize() > 0) {
if (_channel.WriteAble() == false) {
_channel.EnableWrite();
}
}
// 如果输出缓冲区没有数据,释放连接
if (_out_buffer.ReadAbleSize() == 0) {
Release();
}
}
// 启用非活跃连接超时释放规则的内部函数
// 设置启用标志,根据定时任务的存在情况进行刷新或新增
void EnableInactiveReleaseInLoop(int sec) {
// 1. 设置启用非活跃销毁的标志为 true
_enable_inactive_release = true;
// 2. 如果定时销毁任务已经存在,刷新该任务
if (_loop->HasTimer(_conn_id)) {
return _loop->TimerRefresh(_conn_id);
}
// 3. 如果定时销毁任务不存在,新增定时销毁任务
_loop->TimerAdd(_conn_id, sec, std::bind(&Connection::Release, this));
}
// 取消非活跃连接超时释放规则的内部函数
// 禁用标志,取消定时销毁任务
void CancelInactiveReleaseInLoop() {
// 禁用非活跃销毁标志
_enable_inactive_release = false;
// 如果定时销毁任务存在,取消该任务
if (_loop->HasTimer(_conn_id)) {
_loop->TimerCancel(_conn_id);
}
}
// 升级连接协议的内部函数
// 重置上下文和回调函数
void UpgradeInLoop(const Any &context,
const ConnectedCallback &conn,
const MessageCallback &msg,
const ClosedCallback &closed,
const AnyEventCallback &event) {
// 更新上下文
_context = context;
// 更新连接建立完成的回调函数
_connected_callback = conn;
// 更新消息处理回调函数
_message_callback = msg;
// 更新连接关闭的回调函数
_closed_callback = closed;
// 更新任意事件的回调函数
_event_callback = event;
}
public:
// 构造函数,初始化连接对象
Connection(EventLoop *loop, uint64_t conn_id, int sockfd)
: _conn_id(conn_id), _sockfd(sockfd),
_enable_inactive_release(false), _loop(loop), _statu(CONNECTING), _socket(_sockfd),
_channel(loop, _sockfd) {
// 设置 Channel 的各种回调函数
_channel.SetCloseCallback(std::bind(&Connection::HandleClose, this));
_channel.SetEventCallback(std::bind(&Connection::HandleEvent, this));
_channel.SetReadCallback(std::bind(&Connection::HandleRead, this));
_channel.SetWriteCallback(std::bind(&Connection::HandleWrite, this));
_channel.SetErrorCallback(std::bind(&Connection::HandleError, this));
}
// 析构函数,输出连接释放的调试信息
~Connection() {
DBG_LOG("RELEASE CONNECTION:%p", this);
}
// 获取连接关联的文件描述符
int Fd() {
return _sockfd;
}
// 获取连接的唯一标识符
int Id() {
return _conn_id;
}
// 判断连接是否处于 CONNECTED 状态
bool Connected() {
return (_statu == CONNECTED);
}
// 设置连接的上下文信息
void SetContext(const Any &context) {
_context = context;
}
// 获取连接的上下文信息指针
Any *GetContext() {
return &_context;
}
// 设置连接建立完成的回调函数
void SetConnectedCallback(const ConnectedCallback& cb) {
_connected_callback = cb;
}
// 设置消息处理的回调函数
void SetMessageCallback(const MessageCallback& cb) {
_message_callback = cb;
}
// 设置连接关闭的回调函数
void SetClosedCallback(const ClosedCallback& cb) {
_closed_callback = cb;
}
// 设置任意事件的回调函数
void SetAnyEventCallback(const AnyEventCallback& cb) {
_event_callback = cb;
}
// 设置服务器内部的连接关闭回调函数
void SetSrvClosedCallback(const ClosedCallback& cb) {
_server_closed_callback = cb;
}
// 连接建立就绪后,在 EventLoop 中执行初始化操作
void Established() {
_loop->RunInLoop(std::bind(&Connection::EstablishedInLoop, this));
}
// 发送数据的公共接口
// 将数据放入缓冲区,在 EventLoop 中执行发送操作
void Send(const char *data, size_t len) {
// 创建临时缓冲区,避免传入的 data 可能是临时空间
Buffer buf;
buf.WriteAndPush(data, len);
// 在 EventLoop 中执行发送操作
_loop->RunInLoop(std::bind(&Connection::SendInLoop, this, std::move(buf)));
}
// 关闭连接的公共接口
// 在 EventLoop 中执行关闭操作
void Shutdown() {
_loop->RunInLoop(std::bind(&Connection::ShutdownInLoop, this));
}
// 释放连接资源的公共接口
// 将释放操作放入 EventLoop 的任务队列中执行
void Release() {
_loop->QueueInLoop(std::bind(&Connection::ReleaseInLoop, this));
}
// 启用非活跃连接超时释放规则的公共接口
// 在 EventLoop 中执行启用操作
void EnableInactiveRelease(int sec) {
_loop->RunInLoop(std::bind(&Connection::EnableInactiveReleaseInLoop, this, sec));
}
// 取消非活跃连接超时释放规则的公共接口
// 在 EventLoop 中执行取消操作
void CancelInactiveRelease() {
_loop->RunInLoop(std::bind(&Connection::CancelInactiveReleaseInLoop, this));
}
// 升级连接协议的公共接口
// 在 EventLoop 线程中立即执行升级操作
void Upgrade(const Any &context, const ConnectedCallback &conn, const MessageCallback &msg,
const ClosedCallback &closed, const AnyEventCallback &event) {
// 确保在 EventLoop 线程中执行
_loop->AssertInLoop();
// 在 EventLoop 中执行升级操作
_loop->RunInLoop(std::bind(&Connection::UpgradeInLoop, this, context, conn, msg, closed, event));
}
};
十、Acceptor模块设计
实现向Channel提供可读事件的IO事件处理回调函数,函数的功能其实也就是获取新连接。
为新连接构建一个Connection对象的回调函数由TcpServer模块提供,TcpServer是Connection的实际管理者。
class Acceptor {
private:
Socket _socket;//用于创建监听套接字
EventLoop *_loop; //用于对监听套接字进行事件监控
Channel _channel; //用于对监听套接字进行事件管理
using AcceptCallback = std::function<void(int)>;
AcceptCallback _accept_callback;
private:
/*监听套接字的读事件回调处理函数---获取新连接,调用_accept_callback函数进行新连接处理*/
void HandleRead() {
int newfd = _socket.Accept();
if (newfd < 0) {
return ;
}
if (_accept_callback) _accept_callback(newfd);
}
int CreateServer(int port) {
bool ret = _socket.CreateServer(port);
assert(ret == true);
return _socket.Fd();
}
public:
/*不能将启动读事件监控,放到构造函数中,必须在设置回调函数后,再去启动*/
/*否则有可能造成启动监控后,立即有事件,处理的时候,回调函数还没设置:新连接得不到处理,且资源泄漏*/
Acceptor(EventLoop *loop, int port): _socket(CreateServer(port)), _loop(loop),
_channel(loop, _socket.Fd()) {
_channel.SetReadCallback(std::bind(&Acceptor::HandleRead, this));
}
void SetAcceptCallback(const AcceptCallback &cb) { _accept_callback = cb; }
void Listen() { _channel.EnableRead(); }
};
十一、TcpServer模块设计
muduo使用Connection类来管理TCP连接,使用接受器Acceptor来接受连接。TcpServer管理Acceptor获得Connection,生命周期由用户控制。
使用TcpServer类即可搭建一个简单的服务器。
我们从一个新连接的生命周期来观察TcpServer的工作流程:
1、TcpServer管理Acceptor模块,创建监听套接字。
2、TcpServer设置各项回调函数,其中包含新连接到来时创建新连接的回调函数。
3、TcpServer控制Acceptor开启监听——即将监听套接字的读事件设置为关心,当触发读事件就绪后,调用OnNewConnection回调函数对新连接进行初始化。
4、在初始化时,为新连接设置业务处理等回调函数,设置epoll关心事件如读、写事件的回调函数。为Connection分配一个EventLoop事件循环对象,并将该事件设置的关心事件更新至Connection所属EventLoop的Poller对象中。
5、当连接的读/写事件就绪后,触发Connection模块内部的读/写事件回调,进行数据I/O和业务处理。如果当前服务器开启了非活跃连接销毁,则在业务处理完成后更新当前连接的定时销毁任务。
6、当长时间未通信或发生意外错误时,对连接的缓冲区中未处理完的数据进行处理。当处理完成后,将关闭连接的任务推送至EventLoop的任务池中。【注意:为保证数据安全处理,我们不直接销毁连接,而是等epoll中就绪的事件处理完成后再去执行任务池中的销毁任务,这样可以防止处理完数据尚未发送完成时,连接就已经被销毁的情况。尽管主动关闭的一方会进入TIME_WAIT状态,我们还是做好应用层的预防工作】。
7、调用TcpServer设置的清除连接的回调函数,清除TcpServer中所关闭的连接。因为Connection在项目中都是使用shared_ptr进行管理的,只有TcpServer所管理的那一份连接的引用计数被彻底清除之后,连接才会真正地被销毁。
TcpServer模块关联图:
class TcpServer {
private:
uint64_t _next_id; //这是一个自动增长的连接ID,
int _port;
int _timeout; //这是非活跃连接的统计时间---多长时间无通信就是非活跃连接
bool _enable_inactive_release;//是否启动了非活跃连接超时销毁的判断标志
EventLoop _baseloop; //这是主线程的EventLoop对象,负责监听事件的处理
Acceptor _acceptor; //这是监听套接字的管理对象
LoopThreadPool _pool; //这是从属EventLoop线程池
std::unordered_map<uint64_t, PtrConnection> _conns;//保存管理所有连接对应的shared_ptr对象
using ConnectedCallback = std::function<void(const PtrConnection&)>;
using MessageCallback = std::function<void(const PtrConnection&, Buffer *)>;
using ClosedCallback = std::function<void(const PtrConnection&)>;
using AnyEventCallback = std::function<void(const PtrConnection&)>;
using Functor = std::function<void()>;
ConnectedCallback _connected_callback;
MessageCallback _message_callback;
ClosedCallback _closed_callback;
AnyEventCallback _event_callback;
private:
void RunAfterInLoop(const Functor &task, int delay) {
_next_id++;
_baseloop.TimerAdd(_next_id, delay, task);
}
//为新连接构造一个Connection进行管理
void NewConnection(int fd) {
_next_id++;
PtrConnection conn(new Connection(_pool.NextLoop(), _next_id, fd));
conn->SetMessageCallback(_message_callback);
conn->SetClosedCallback(_closed_callback);
conn->SetConnectedCallback(_connected_callback);
conn->SetAnyEventCallback(_event_callback);
conn->SetSrvClosedCallback(std::bind(&TcpServer::RemoveConnection, this, std::placeholders::_1));
if (_enable_inactive_release) conn->EnableInactiveRelease(_timeout);//启动非活跃超时销毁
conn->Established();//就绪初始化
_conns.insert(std::make_pair(_next_id, conn));
}
void RemoveConnectionInLoop(const PtrConnection &conn) {
int id = conn->Id();
auto it = _conns.find(id);
if (it != _conns.end()) {
_conns.erase(it);
}
}
//从管理Connection的_conns中移除连接信息
void RemoveConnection(const PtrConnection &conn) {
_baseloop.RunInLoop(std::bind(&TcpServer::RemoveConnectionInLoop, this, conn));
}
public:
TcpServer(int port):
_port(port),
_next_id(0),
_enable_inactive_release(false),
_acceptor(&_baseloop, port),
_pool(&_baseloop) {
_acceptor.SetAcceptCallback(std::bind(&TcpServer::NewConnection, this, std::placeholders::_1));
_acceptor.Listen();//将监听套接字挂到baseloop上
}
void SetThreadCount(int count) { return _pool.SetThreadCount(count); }
void SetConnectedCallback(const ConnectedCallback&cb) { _connected_callback = cb; }
void SetMessageCallback(const MessageCallback&cb) { _message_callback = cb; }
void SetClosedCallback(const ClosedCallback&cb) { _closed_callback = cb; }
void SetAnyEventCallback(const AnyEventCallback&cb) { _event_callback = cb; }
void EnableInactiveRelease(int timeout) { _timeout = timeout; _enable_inactive_release = true; }
//用于添加一个定时任务
void RunAfter(const Functor &task, int delay) {
_baseloop.RunInLoop(std::bind(&TcpServer::RunAfterInLoop, this, task, delay));
}
void Start() { _pool.Create(); _baseloop.Start(); }
};
// 忽略客户端连接中断时,服务端向客户端发送数据引发SIGPIPE信号导致的服务端进程崩溃
class NetWork {
public:
NetWork() {
DBG_LOG("SIGPIPE INIT");
signal(SIGPIPE, SIG_IGN); // 忽略SIGPIPE信号
}
};
static NetWork nw;
基于TcpSever搭建支持HTTP协议的HttpServer服务器
一、工具类模块
包含URL编码、解码,状态码转化为状态描述,文件名后缀转化为mime,判断资源路径有效性,文件I/O等相关http协议解析常用工具函数。
#include <iostream>
#include <fstream>
#include <string>
#include <vector>
#include <regex>
#include <sys/stat.h>
#include "server.hpp"
#define DEFALT_TIMEOUT 10
std::unordered_map<int, std::string> _statu_msg = {
{100, "Continue"},
{101, "Switching Protocol"},
{102, "Processing"},
{103, "Early Hints"},
{200, "OK"},
{201, "Created"},
{202, "Accepted"},
{203, "Non-Authoritative Information"},
{204, "No Content"},
{205, "Reset Content"},
{206, "Partial Content"},
{207, "Multi-Status"},
{208, "Already Reported"},
{226, "IM Used"},
{300, "Multiple Choice"},
{301, "Moved Permanently"},
{302, "Found"},
{303, "See Other"},
{304, "Not Modified"},
{305, "Use Proxy"},
{306, "unused"},
{307, "Temporary Redirect"},
{308, "Permanent Redirect"},
{400, "Bad Request"},
{401, "Unauthorized"},
{402, "Payment Required"},
{403, "Forbidden"},
{404, "Not Found"},
{405, "Method Not Allowed"},
{406, "Not Acceptable"},
{407, "Proxy Authentication Required"},
{408, "Request Timeout"},
{409, "Conflict"},
{410, "Gone"},
{411, "Length Required"},
{412, "Precondition Failed"},
{413, "Payload Too Large"},
{414, "URI Too Long"},
{415, "Unsupported Media Type"},
{416, "Range Not Satisfiable"},
{417, "Expectation Failed"},
{418, "I'm a teapot"},
{421, "Misdirected Request"},
{422, "Unprocessable Entity"},
{423, "Locked"},
{424, "Failed Dependency"},
{425, "Too Early"},
{426, "Upgrade Required"},
{428, "Precondition Required"},
{429, "Too Many Requests"},
{431, "Request Header Fields Too Large"},
{451, "Unavailable For Legal Reasons"},
{501, "Not Implemented"},
{502, "Bad Gateway"},
{503, "Service Unavailable"},
{504, "Gateway Timeout"},
{505, "HTTP Version Not Supported"},
{506, "Variant Also Negotiates"},
{507, "Insufficient Storage"},
{508, "Loop Detected"},
{510, "Not Extended"},
{511, "Network Authentication Required"}
};
std::unordered_map<std::string, std::string> _mime_msg = {
{".aac", "audio/aac"},
{".abw", "application/x-abiword"},
{".arc", "application/x-freearc"},
{".avi", "video/x-msvideo"},
{".azw", "application/vnd.amazon.ebook"},
{".bin", "application/octet-stream"},
{".bmp", "image/bmp"},
{".bz", "application/x-bzip"},
{".bz2", "application/x-bzip2"},
{".csh", "application/x-csh"},
{".css", "text/css"},
{".csv", "text/csv"},
{".doc", "application/msword"},
{".docx", "application/vnd.openxmlformats-officedocument.wordprocessingml.document"},
{".eot", "application/vnd.ms-fontobject"},
{".epub", "application/epub+zip"},
{".gif", "image/gif"},
{".htm", "text/html"},
{".html", "text/html"},
{".ico", "image/vnd.microsoft.icon"},
{".ics", "text/calendar"},
{".jar", "application/java-archive"},
{".jpeg", "image/jpeg"},
{".jpg", "image/jpeg"},
{".js", "text/javascript"},
{".json", "application/json"},
{".jsonld", "application/ld+json"},
{".mid", "audio/midi"},
{".midi", "audio/x-midi"},
{".mjs", "text/javascript"},
{".mp3", "audio/mpeg"},
{".mpeg", "video/mpeg"},
{".mpkg", "application/vnd.apple.installer+xml"},
{".odp", "application/vnd.oasis.opendocument.presentation"},
{".ods", "application/vnd.oasis.opendocument.spreadsheet"},
{".odt", "application/vnd.oasis.opendocument.text"},
{".oga", "audio/ogg"},
{".ogv", "video/ogg"},
{".ogx", "application/ogg"},
{".otf", "font/otf"},
{".png", "image/png"},
{".pdf", "application/pdf"},
{".ppt", "application/vnd.ms-powerpoint"},
{".pptx", "application/vnd.openxmlformats-officedocument.presentationml.presentation"},
{".rar", "application/x-rar-compressed"},
{".rtf", "application/rtf"},
{".sh", "application/x-sh"},
{".svg", "image/svg+xml"},
{".swf", "application/x-shockwave-flash"},
{".tar", "application/x-tar"},
{".tif", "image/tiff"},
{".tiff", "image/tiff"},
{".ttf", "font/ttf"},
{".txt", "text/plain"},
{".vsd", "application/vnd.visio"},
{".wav", "audio/wav"},
{".weba", "audio/webm"},
{".webm", "video/webm"},
{".webp", "image/webp"},
{".woff", "font/woff"},
{".woff2", "font/woff2"},
{".xhtml", "application/xhtml+xml"},
{".xls", "application/vnd.ms-excel"},
{".xlsx", "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"},
{".xml", "application/xml"},
{".xul", "application/vnd.mozilla.xul+xml"},
{".zip", "application/zip"},
{".3gp", "video/3gpp"},
{".3g2", "video/3gpp2"},
{".7z", "application/x-7z-compressed"}
};
class Util {
public:
// 字符串分割函数,将 src 字符串按照 sep 字符进行分割,得到的各个子串放到 arry 中,最终返回子串的数量
// 参数:
// src: 待分割的源字符串
// sep: 分割字符串的分隔符
// arry: 用于存储分割后子串的向量
// 返回值:
// 分割后子串的数量
static size_t Split(const std::string &src, const std::string &sep, std::vector<std::string> *arry) {
size_t offset = 0;
// 遍历源字符串,直到偏移量超出字符串长度
while (offset < src.size()) {
// 在 src 字符串偏移量 offset 处开始向后查找 sep 字符/子串,返回查找到的位置
size_t pos = src.find(sep, offset);
if (pos == std::string::npos) {
// 没有找到特定的字符,将剩余的部分当作一个子串放入 arry 中
if (offset < src.size()) {
arry->push_back(src.substr(offset));
}
return arry->size();
}
if (pos == offset) {
// 当前子串是一个空的,没有内容,跳过
offset = pos + sep.size();
continue;
}
// 将找到的子串添加到 arry 中
arry->push_back(src.substr(offset, pos - offset));
offset = pos + sep.size();
}
return arry->size();
}
// 读取文件的所有内容,将读取的内容放到一个字符串中
static bool ReadFile(const std::string &filename, std::string *buf) {
// 以二进制模式打开文件
std::ifstream ifs(filename, std::ios::binary);
if (!ifs.is_open()) {
std::cerr << "OPEN " << filename << " FILE FAILED!!" << std::endl;
return false;
}
// 获取文件大小
ifs.seekg(0, ifs.end);
size_t fsize = ifs.tellg();
ifs.seekg(0, ifs.beg);
// 调整字符串大小以容纳文件内容
buf->resize(fsize);
// 读取文件内容
ifs.read(&(*buf)[0], fsize);
if (!ifs.good()) {
std::cerr << "READ " << filename << " FILE FAILED!!" << std::endl;
ifs.close();
return false;
}
ifs.close();
return true;
}
// 向文件写入数据
static bool WriteFile(const std::string &filename, const std::string &buf) {
// 以二进制模式打开文件,并截断原有内容
std::ofstream ofs(filename, std::ios::binary | std::ios::trunc);
if (!ofs.is_open()) {
std::cerr << "OPEN " << filename << " FILE FAILED!!" << std::endl;
return false;
}
// 写入数据
ofs.write(buf.c_str(), buf.size());
if (!ofs.good()) {
std::cerr << "WRITE " << filename << " FILE FAILED!" << std::endl;
ofs.close();
return false;
}
ofs.close();
return true;
}
// URL 编码,避免 URL 中资源路径与查询字符串中的特殊字符与 HTTP 请求中特殊字符产生歧义
// 编码格式:将特殊字符的 ASCII 值,转换为两个十六进制字符,前缀 % C++ -> C%2B%2B
// 不编码的特殊字符:RFC3986 文档规定 . - _ ~ 字母,数字属于绝对不编码字符
// RFC3986 文档规定,编码格式 %HH
// W3C 标准中规定,查询字符串中的空格,需要编码为 +, 解码则是 + 转空格
// url: 待编码的 URL 字符串
// convert_space_to_plus: 是否将空格转换为 +
static std::string UrlEncode(const std::string url, bool convert_space_to_plus) {
std::string res;
for (char c : url) {
// 检查字符是否为不需要编码的字符
if (c == '.' || c == '-' || c == '_' || c == '~' || std::isalnum(c)) {
res += c;
continue;
}
// 处理空格
if (c == ' ' && convert_space_to_plus) {
res += '+';
continue;
}
// 对特殊字符进行编码
char tmp[4] = {0};
std::snprintf(tmp, 4, "%%%02X", static_cast<unsigned char>(c));
res += tmp;
}
return res;
}
// 将十六进制字符转换为对应的整数值
static char HEXTOI(char c) {
if (c >= '0' && c <= '9') {
return c - '0';
} else if (c >= 'a' && c <= 'z') {
return c - 'a' + 10;
} else if (c >= 'A' && c <= 'Z') {
return c - 'A' + 10;
}
return -1;
}
// URL 解码
static std::string UrlDecode(const std::string url, bool convert_plus_to_space) {
std::string res;
for (size_t i = 0; i < url.size(); ++i) {
// 处理 + 转换为空格
if (url[i] == '+' && convert_plus_to_space) {
res += ' ';
continue;
}
// 处理 % 编码的字符
if (url[i] == '%' && i + 2 < url.size()) {
char v1 = HEXTOI(url[i + 1]);
char v2 = HEXTOI(url[i + 2]);
if (v1 != -1 && v2 != -1) {
char v = static_cast<char>(v1 * 16 + v2);
res += v;
i += 2;
continue;
}
}
res += url[i];
}
return res;
}
// 响应状态码的描述信息获取
static std::string StatuDesc(int statu) {
static const std::unordered_map<int, std::string> _statu_msg = {
{200, "OK"},
{404, "Not Found"},
// 可以根据需要添加更多状态码描述
};
auto it = _statu_msg.find(statu);
if (it != _statu_msg.end()) {
return it->second;
}
return "Unknow";
}
// 根据文件后缀名获取文件 MIME 类型
static std::string ExtMime(const std::string &filename) {
static const std::unordered_map<std::string, std::string> _mime_msg = {
{".html", "text/html"},
{".css", "text/css"},
{".js", "application/javascript"},
// 可以根据需要添加更多 MIME 类型映射
};
// 获取文件扩展名
size_t pos = filename.find_last_of('.');
if (pos == std::string::npos) {
return "application/octet-stream";
}
std::string ext = filename.substr(pos);
auto it = _mime_msg.find(ext);
if (it == _mime_msg.end()) {
return "application/octet-stream";
}
return it->second;
}
// 判断一个文件是否是一个目录
static bool IsDirectory(const std::string &filename) {
struct stat st;
int ret = stat(filename.c_str(), &st);
if (ret < 0) {
return false;
}
return S_ISDIR(st.st_mode);
}
// 判断一个文件是否是一个普通文件
static bool IsRegular(const std::string &filename) {
struct stat st;
int ret = stat(filename.c_str(), &st);
if (ret < 0) {
return false;
}
return S_ISREG(st.st_mode);
}
// HTTP 请求的资源路径有效性判断
// 如/../login, 这个路径中的 .. 会让路径的查找跑到相对根目录之外
static bool ValidPath(const std::string &path) {
std::vector<std::string> subdir;
Split(path, "/", &subdir);
int level = 0;
for (const auto &dir : subdir) {
if (dir == "..") {
// 任意一层走出相对根目录,就认为有问题
level--;
if (level < 0) return false;
continue;
}
level++;
}
return true;
}
};
二、HttpRequest与HttpReponse类
此处不再做过多详解,详细请见:【Linux】网络编程:应用层协议—HTTP协议(超详细)_账号登录密码user:pass的协议-CSDN博客
class HttpRequest {
public:
std::string _method; //请求方法
std::string _path; //资源路径
std::string _version; //协议版本
std::string _body; //请求正文
std::smatch _matches; //资源路径的正则提取数据
std::unordered_map<std::string, std::string> _headers; //头部字段
std::unordered_map<std::string, std::string> _params; //查询字符串
public:
HttpRequest():_version("HTTP/1.1") {}
void ReSet() {
_method.clear();
_path.clear();
_version = "HTTP/1.1";
_body.clear();
std::smatch match;
_matches.swap(match);
_headers.clear();
_params.clear();
}
//插入头部字段
void SetHeader(const std::string &key, const std::string &val) {
_headers.insert(std::make_pair(key, val));
}
//判断是否存在指定头部字段
bool HasHeader(const std::string &key) const {
auto it = _headers.find(key);
if (it == _headers.end()) {
return false;
}
return true;
}
//获取指定头部字段的值
std::string GetHeader(const std::string &key) const {
auto it = _headers.find(key);
if (it == _headers.end()) {
return "";
}
return it->second;
}
//插入查询字符串
void SetParam(const std::string &key, const std::string &val) {
_params.insert(std::make_pair(key, val));
}
//判断是否有某个指定的查询字符串
bool HasParam(const std::string &key) const {
auto it = _params.find(key);
if (it == _params.end()) {
return false;
}
return true;
}
//获取指定的查询字符串
std::string GetParam(const std::string &key) const {
auto it = _params.find(key);
if (it == _params.end()) {
return "";
}
return it->second;
}
//获取正文长度
size_t ContentLength() const {
// Content-Length: 1234\r\n
bool ret = HasHeader("Content-Length");
if (ret == false) {
return 0;
}
std::string clen = GetHeader("Content-Length");
return std::stol(clen);
}
//判断是否是短链接
bool Close() const {
// 没有Connection字段,或者有Connection但是值是close,则都是短链接,否则就是长连接
if (HasHeader("Connection") == true && GetHeader("Connection") == "keep-alive") {
return false;
}
return true;
}
};
class HttpResponse {
public:
int _statu;
bool _redirect_flag;
std::string _body;
std::string _redirect_url;
std::unordered_map<std::string, std::string> _headers;
public:
HttpResponse():_redirect_flag(false), _statu(200) {}
HttpResponse(int statu):_redirect_flag(false), _statu(statu) {}
void ReSet() {
_statu = 200;
_redirect_flag = false;
_body.clear();
_redirect_url.clear();
_headers.clear();
}
//插入头部字段
void SetHeader(const std::string &key, const std::string &val) {
_headers.insert(std::make_pair(key, val));
}
//判断是否存在指定头部字段
bool HasHeader(const std::string &key) {
auto it = _headers.find(key);
if (it == _headers.end()) {
return false;
}
return true;
}
//获取指定头部字段的值
std::string GetHeader(const std::string &key) {
auto it = _headers.find(key);
if (it == _headers.end()) {
return "";
}
return it->second;
}
void SetContent(const std::string &body, const std::string &type = "text/html") {
_body = body;
SetHeader("Content-Type", type);
}
void SetRedirect(const std::string &url, int statu = 302) {
_statu = statu;
_redirect_flag = true;
_redirect_url = url;
}
//判断是否是短链接
bool Close() {
// 没有Connection字段,或者有Connection但是值是close,则都是短链接,否则就是长连接
if (HasHeader("Connection") == true && GetHeader("Connection") == "keep-alive") {
return false;
}
return true;
}
};
三、HttpContext类设计
该模块是⼀个HTTP请求接收的上下文模块,主要是为了防止在一次接收的数据中,不是一个完整的HTTP请求,则解析过程并未完成,无法进行完整的请求处理,需要在下次接收到新数据后继续根据上下文进⾏解析,最终得到⼀个HttpRequest请求信息对象,因此在请求数据的接收以及解析部分需要⼀个上下文来进⾏控制接收和处理节奏。
同样的,HttpContext类也采用了状态机的方式来严格控制函数执行流程。
HttpContext类使用状态机的目的是什么?
当我们对数据缓冲区中的HTTP请求进行处理时,可能会遇到如下情况:
1、接收缓冲区刚好有一条完整的HTTP请求,此时我们直接处理即可。
2、假设本次的请求大小为10kb,缓冲区当前只接收到了8kb的数据,显然这并不是一条完整的HTTP请求。因此我们HttpContext类会存储当前已经处理好的HTTP请求中的报文,并且将状态切换到需要继续执行的状态。例如,当前HttpContext已经处理到正文,但正文还未接收完毕,此时应为RECV_HTTP_BODY状态,直到整个请求接收完成后才会切换到RECV_HTTP_OVER状态。
3、假设当前接收缓冲区中只有1.5条请求。当我们正常处理完第一个请求时,会去处理第二个,但第二个请求只有一半,并不完整。假设已经处理完请求行,当前正在进行请求报头的解析,因此当前HttpContext状态为RECV_HTTP_HEAD,直到处理完请求报头后才会去继续处理请求正文。
使用状态机可以很好地标识当前HttpRequest的处理阶段,防止处理粘包问题或请求数据不完整问题时可能会造成的错误。
typedef enum {
RECV_HTTP_ERROR,
RECV_HTTP_LINE,
RECV_HTTP_HEAD,
RECV_HTTP_BODY,
RECV_HTTP_OVER
}HttpRecvStatu;
#define MAX_LINE 8192
class HttpContext {
private:
int _resp_statu; //响应状态码
HttpRecvStatu _recv_statu; //当前接收及解析的阶段状态
HttpRequest _request; //已经解析得到的请求信息
private:
bool ParseHttpLine(const std::string &line) {
std::smatch matches;
std::regex e("(GET|HEAD|POST|PUT|DELETE) ([^?]*)(?:\\?(.*))? (HTTP/1\\.[01])(?:\n|\r\n)?", std::regex::icase);
bool ret = std::regex_match(line, matches, e);
if (ret == false) {
_recv_statu = RECV_HTTP_ERROR;
_resp_statu = 400;//BAD REQUEST
return false;
}
//0 : GET /bitejiuyeke/login?user=xiaoming&pass=123123 HTTP/1.1
//1 : GET
//2 : /bitejiuyeke/login
//3 : user=xiaoming&pass=123123
//4 : HTTP/1.1
//请求方法的获取
_request._method = matches[1];
std::transform(_request._method.begin(), _request._method.end(), _request._method.begin(), ::toupper);
//资源路径的获取,需要进行URL解码操作,但是不需要+转空格
_request._path = Util::UrlDecode(matches[2], false);
//协议版本的获取
_request._version = matches[4];
//查询字符串的获取与处理
std::vector<std::string> query_string_arry;
std::string query_string = matches[3];
//查询字符串的格式 key=val&key=val....., 先以 & 符号进行分割,得到各个字串
Util::Split(query_string, "&", &query_string_arry);
//针对各个字串,以 = 符号进行分割,得到key 和val, 得到之后也需要进行URL解码
for (auto &str : query_string_arry) {
size_t pos = str.find("=");
if (pos == std::string::npos) {
_recv_statu = RECV_HTTP_ERROR;
_resp_statu = 400;//BAD REQUEST
return false;
}
std::string key = Util::UrlDecode(str.substr(0, pos), true);
std::string val = Util::UrlDecode(str.substr(pos + 1), true);
_request.SetParam(key, val);
}
return true;
}
bool RecvHttpLine(Buffer *buf) {
if (_recv_statu != RECV_HTTP_LINE) return false;
//1. 获取一行数据,带有末尾的换行
std::string line = buf->GetLineAndPop();
//2. 需要考虑的一些要素:缓冲区中的数据不足一行, 获取的一行数据超大
if (line.size() == 0) {
//缓冲区中的数据不足一行,则需要判断缓冲区的可读数据长度,如果很长了都不足一行,这是有问题的
if (buf->ReadAbleSize() > MAX_LINE) {
_recv_statu = RECV_HTTP_ERROR;
_resp_statu = 414;//URI TOO LONG
return false;
}
//缓冲区中数据不足一行,但是也不多,就等等新数据的到来
return true;
}
if (line.size() > MAX_LINE) {
_recv_statu = RECV_HTTP_ERROR;
_resp_statu = 414;//URI TOO LONG
return false;
}
bool ret = ParseHttpLine(line);
if (ret == false) {
return false;
}
//首行处理完毕,进入头部获取阶段
_recv_statu = RECV_HTTP_HEAD;
return true;
}
bool RecvHttpHead(Buffer *buf) {
if (_recv_statu != RECV_HTTP_HEAD) return false;
//一行一行取出数据,直到遇到空行为止, 头部的格式 key: val\r\nkey: val\r\n....
while(1){
std::string line = buf->GetLineAndPop();
//2. 需要考虑的一些要素:缓冲区中的数据不足一行, 获取的一行数据超大
if (line.size() == 0) {
//缓冲区中的数据不足一行,则需要判断缓冲区的可读数据长度,如果很长了都不足一行,这是有问题的
if (buf->ReadAbleSize() > MAX_LINE) {
_recv_statu = RECV_HTTP_ERROR;
_resp_statu = 414;//URI TOO LONG
return false;
}
//缓冲区中数据不足一行,但是也不多,就等等新数据的到来
return true;
}
if (line.size() > MAX_LINE) {
_recv_statu = RECV_HTTP_ERROR;
_resp_statu = 414;//URI TOO LONG
return false;
}
if (line == "\n" || line == "\r\n") {
break;
}
bool ret = ParseHttpHead(line);
if (ret == false) {
return false;
}
}
//头部处理完毕,进入正文获取阶段
_recv_statu = RECV_HTTP_BODY;
return true;
}
bool ParseHttpHead(std::string &line) {
//key: val\r\nkey: val\r\n....
if (line.back() == '\n') line.pop_back();//末尾是换行则去掉换行字符
if (line.back() == '\r') line.pop_back();//末尾是回车则去掉回车字符
size_t pos = line.find(": ");
if (pos == std::string::npos) {
_recv_statu = RECV_HTTP_ERROR;
_resp_statu = 400;//
return false;
}
std::string key = line.substr(0, pos);
std::string val = line.substr(pos + 2);
_request.SetHeader(key, val);
return true;
}
bool RecvHttpBody(Buffer *buf) {
if (_recv_statu != RECV_HTTP_BODY) return false;
//1. 获取正文长度
size_t content_length = _request.ContentLength();
if (content_length == 0) {
//没有正文,则请求接收解析完毕
_recv_statu = RECV_HTTP_OVER;
return true;
}
//2. 当前已经接收了多少正文,其实就是往 _request._body 中放了多少数据了
size_t real_len = content_length - _request._body.size();//实际还需要接收的正文长度
//3. 接收正文放到body中,但是也要考虑当前缓冲区中的数据,是否是全部的正文
// 3.1 缓冲区中数据,包含了当前请求的所有正文,则取出所需的数据
if (buf->ReadAbleSize() >= real_len) {
_request._body.append(buf->ReadPosition(), real_len);
buf->MoveReadOffset(real_len);
_recv_statu = RECV_HTTP_OVER;
return true;
}
// 3.2 缓冲区中数据,无法满足当前正文的需要,数据不足,取出数据,然后等待新数据到来
_request._body.append(buf->ReadPosition(), buf->ReadAbleSize());
buf->MoveReadOffset(buf->ReadAbleSize());
return true;
}
public:
HttpContext():_resp_statu(200), _recv_statu(RECV_HTTP_LINE) {}
void ReSet() {
_resp_statu = 200;
_recv_statu = RECV_HTTP_LINE;
_request.ReSet();
}
int RespStatu() { return _resp_statu; }
HttpRecvStatu RecvStatu() { return _recv_statu; }
HttpRequest &Request() { return _request; }
//接收并解析HTTP请求
void RecvHttpRequest(Buffer *buf) {
//不同的状态,做不同的事情,但是这里不要break, 因为处理完请求行后,应该立即处理头部,而不是退出等新数据
switch(_recv_statu) {
case RECV_HTTP_LINE: RecvHttpLine(buf);
case RECV_HTTP_HEAD: RecvHttpHead(buf);
case RECV_HTTP_BODY: RecvHttpBody(buf);
}
return;
}
};
四、HttpServer服务器类设计
HttpServer模块内部包含有⼀个hash-map表存储请求与处理函数的映射表:组件使用者向HttpServer设置哪些请求应该使用哪些函数进行处理,等TcpServer收到对应的请求就会调用对应的函数进行处理。
该类设计了 RESTful 风格的功能接口,并且使用正则表达式对请求关键字段进行匹配,便于用户快速搭建基于HTTP协议的服务器。
需要注意的是:针对每个HTTP请求中,包含Connection报头,该报头包含属性:
keep-alive:表示当前连接为长连接。如果设置了非活跃连接销毁,则为长连接添加定时任务。
close:表示当前连接为短链接,当前通信结束后应立即关闭该连接。
要实现简便的搭建HTTP服务器,所需要的要素和提供的功能要素:
1、GET请求的路由映射表 2、POST请求的路由映射表
3、PUT请求的路由映射表 4、DELETE请求的路由映射表
【注】路由映射表记录对应请求方法的请求的处理函数映射关系,更多是功能性请求的处理
5.静态资源相对根目录 --- 实现静态资源请求的处理
6.高性能TCP服务器 --- 进行连接的IO操作接口
服务器处理流程:
1)从socket接收数据,放到接收缓冲区
2)调用OnMessage回调函数进行业务处理
3)对请求进行解析,得到了一个HttpRequest结构,包含了所有的请求要素
4)进行请求的路由查找 : 找到对应请求的处理方法
4.1、静态资源请求 ——些实体文件资源的请求,html,image文件等,将静态资源文件的数据读取出来,填充到HttpResponse结构中
4.2、功能性请求 —— 在请求路由映射表中查找处理函数,找到了则执行响应函数进行具体的业务处理,并进行HttpResponse结构的数据填充
5)对静态资源请求/功能性请求进行处理完毕后,得到了一个填充了响应信息的HttpResponse对象,组织http格式响应,进行发送。
class HttpServer {
private:
using Handler = std::function<void(const HttpRequest &, HttpResponse *)>;
using Handlers = std::vector<std::pair<std::regex, Handler>>;
Handlers _get_route;
Handlers _post_route;
Handlers _put_route;
Handlers _delete_route;
std::string _basedir; //静态资源根目录
TcpServer _server;
private:
void ErrorHandler(const HttpRequest &req, HttpResponse *rsp) {
//1. 组织一个错误展示页面
std::string body;
body += "<html>";
body += "<head>";
body += "<meta http-equiv='Content-Type' content='text/html;charset=utf-8'>";
body += "</head>";
body += "<body>";
body += "<h1>";
body += std::to_string(rsp->_statu);
body += " ";
body += Util::StatuDesc(rsp->_statu);
body += "</h1>";
body += "</body>";
body += "</html>";
//2. 将页面数据,当作响应正文,放入rsp中
rsp->SetContent(body, "text/html");
}
//将HttpResponse中的要素按照http协议格式进行组织,发送
void WriteReponse(const PtrConnection &conn, const HttpRequest &req, HttpResponse &rsp) {
//1. 先完善头部字段
if (req.Close() == true) {
rsp.SetHeader("Connection", "close");
}else {
rsp.SetHeader("Connection", "keep-alive");
}
if (rsp._body.empty() == false && rsp.HasHeader("Content-Length") == false) {
rsp.SetHeader("Content-Length", std::to_string(rsp._body.size()));
}
if (rsp._body.empty() == false && rsp.HasHeader("Content-Type") == false) {
rsp.SetHeader("Content-Type", "application/octet-stream");
}
if (rsp._redirect_flag == true) {
rsp.SetHeader("Location", rsp._redirect_url);
}
//2. 将rsp中的要素,按照http协议格式进行组织
std::stringstream rsp_str;
rsp_str << req._version << " " << std::to_string(rsp._statu) << " " << Util::StatuDesc(rsp._statu) << "\r\n";
for (auto &head : rsp._headers) {
rsp_str << head.first << ": " << head.second << "\r\n";
}
rsp_str << "\r\n";
rsp_str << rsp._body;
//3. 发送数据
conn->Send(rsp_str.str().c_str(), rsp_str.str().size());
}
bool IsFileHandler(const HttpRequest &req) {
// 1. 必须设置了静态资源根目录
if (_basedir.empty()) {
return false;
}
// 2. 请求方法,必须是GET / HEAD请求方法
if (req._method != "GET" && req._method != "HEAD") {
return false;
}
// 3. 请求的资源路径必须是一个合法路径
if (Util::ValidPath(req._path) == false) {
return false;
}
// 4. 请求的资源必须存在,且是一个普通文件
// 有一种请求比较特殊 -- 目录:/, /image/, 这种情况给后边默认追加一个 index.html
// index.html /image/a.png
// 不要忘了前缀的相对根目录,也就是将请求路径转换为实际存在的路径 /image/a.png -> ./wwwroot/image/a.png
std::string req_path = _basedir + req._path;//为了避免直接修改请求的资源路径,因此定义一个临时对象
if (req._path.back() == '/') {
req_path += "index.html";
}
if (Util::IsRegular(req_path) == false) {
return false;
}
return true;
}
//静态资源的请求处理 --- 将静态资源文件的数据读取出来,放到rsp的_body中, 并设置mime
void FileHandler(const HttpRequest &req, HttpResponse *rsp) {
std::string req_path = _basedir + req._path;
if (req._path.back() == '/') {
req_path += "index.html";
}
bool ret = Util::ReadFile(req_path, &rsp->_body);
if (ret == false) {
return;
}
std::string mime = Util::ExtMime(req_path);
rsp->SetHeader("Content-Type", mime);
return;
}
//功能性请求的分类处理
void Dispatcher(HttpRequest &req, HttpResponse *rsp, Handlers &handlers) {
//在对应请求方法的路由表中,查找是否含有对应资源请求的处理函数,有则调用,没有则发挥404
//思想:路由表存储的时键值对 -- 正则表达式 & 处理函数
//使用正则表达式,对请求的资源路径进行正则匹配,匹配成功就使用对应函数进行处理
// /numbers/(\d+) /numbers/12345
for (auto &handler : handlers) {
const std::regex &re = handler.first;
const Handler &functor = handler.second;
bool ret = std::regex_match(req._path, req._matches, re);
if (ret == false) {
continue;
}
return functor(req, rsp);//传入请求信息,和空的rsp,执行处理函数
}
rsp->_statu = 404;
}
void Route(HttpRequest &req, HttpResponse *rsp) {
//1. 对请求进行分辨,是一个静态资源请求,还是一个功能性请求
// 静态资源请求,则进行静态资源的处理
// 功能性请求,则需要通过几个请求路由表来确定是否有处理函数
// 既不是静态资源请求,也没有设置对应的功能性请求处理函数,就返回405
if (IsFileHandler(req) == true) {
//是一个静态资源请求, 则进行静态资源请求的处理
return FileHandler(req, rsp);
}
if (req._method == "GET" || req._method == "HEAD") {
return Dispatcher(req, rsp, _get_route);
}else if (req._method == "POST") {
return Dispatcher(req, rsp, _post_route);
}else if (req._method == "PUT") {
return Dispatcher(req, rsp, _put_route);
}else if (req._method == "DELETE") {
return Dispatcher(req, rsp, _delete_route);
}
rsp->_statu = 405;// Method Not Allowed
return ;
}
//设置上下文
void OnConnected(const PtrConnection &conn) {
conn->SetContext(HttpContext());
DBG_LOG("NEW CONNECTION %p", conn.get());
}
//缓冲区数据解析+处理
void OnMessage(const PtrConnection &conn, Buffer *buffer) {
while(buffer->ReadAbleSize() > 0){
//1. 获取上下文
HttpContext *context = conn->GetContext()->get<HttpContext>();
//2. 通过上下文对缓冲区数据进行解析,得到HttpRequest对象
// 1. 如果缓冲区的数据解析出错,就直接回复出错响应
// 2. 如果解析正常,且请求已经获取完毕,才开始去进行处理
context->RecvHttpRequest(buffer);
HttpRequest &req = context->Request();
HttpResponse rsp(context->RespStatu());
if (context->RespStatu() >= 400) {
//进行错误响应,关闭连接
ErrorHandler(req, &rsp);//填充一个错误显示页面数据到rsp中
WriteReponse(conn, req, rsp);//组织响应发送给客户端
context->ReSet();
buffer->MoveReadOffset(buffer->ReadAbleSize());//出错了就把缓冲区数据清空
conn->Shutdown();//关闭连接
return;
}
if (context->RecvStatu() != RECV_HTTP_OVER) {
//当前请求还没有接收完整,则退出,等新数据到来再重新继续处理
return;
}
//3. 请求路由 + 业务处理
Route(req, &rsp);
//4. 对HttpResponse进行组织发送
WriteReponse(conn, req, rsp);
//5. 重置上下文
context->ReSet();
//6. 根据长短连接判断是否关闭连接或者继续处理
if (rsp.Close() == true) conn->Shutdown();//短链接则直接关闭
}
return;
}
public:
HttpServer(int port, int timeout = DEFALT_TIMEOUT):_server(port) {
_server.EnableInactiveRelease(timeout);
_server.SetConnectedCallback(std::bind(&HttpServer::OnConnected, this, std::placeholders::_1));
_server.SetMessageCallback(std::bind(&HttpServer::OnMessage, this, std::placeholders::_1, std::placeholders::_2));
}
void SetBaseDir(const std::string &path) {
assert(Util::IsDirectory(path) == true);
_basedir = path;
}
/*设置/添加,请求(请求的正则表达)与处理函数的映射关系*/
void Get(const std::string &pattern, const Handler &handler) {
_get_route.push_back(std::make_pair(std::regex(pattern), handler));
}
void Post(const std::string &pattern, const Handler &handler) {
_post_route.push_back(std::make_pair(std::regex(pattern), handler));
}
void Put(const std::string &pattern, const Handler &handler) {
_put_route.push_back(std::make_pair(std::regex(pattern), handler));
}
void Delete(const std::string &pattern, const Handler &handler) {
_delete_route.push_back(std::make_pair(std::regex(pattern), handler));
}
void SetThreadCount(int count) {
_server.SetThreadCount(count);
}
void Listen() {
_server.Start();
}
};
使用WebBench对服务器进行压力测试
WebBench 是一个简单而高效的网站性能测试工具,由 Lionbridge 公司开发。它可以帮助我们评估 Web 服务器在不同负载下的性能表现,例如处理请求的能力、响应时间等。
下载源码
wget http://home.tiscali.cz/~cz210552/distfiles/webbench-1.5.tar.gz
解压源码
tar -zxvf webbench-1.5.tar.gz
cd webbench-1.5
编译和安装
make
sudo make install
基本使用方法
WebBench 的基本语法如下:
webbench [选项] [URL]
常用选项说明
-c
或--clients
:指定并发客户端的数量,即模拟同时访问网站的用户数量。例如,-c 100
表示模拟 100 个并发用户。-t
或--time
:指定测试的持续时间,单位为秒。例如,-t 60
表示测试持续 60 秒。-p
或--proxy
:指定使用的代理服务器,格式为[协议]://[代理地址]:[端口]
。-2
或--http0.9
:使用 HTTP 0.9 协议进行测试。-1
或--http1.0
:使用 HTTP 1.0 协议进行测试。--http1.1
:使用 HTTP 1.1 协议进行测试。-f
或--force
:即使服务器返回错误,也不停止测试。-r
或--reload
:在每次请求之间不保持连接,即每次请求都重新建立连接。
示例
简单测试
以下命令将模拟 100 个并发用户,对 http://example.com
网站进行 30 秒的性能测试:
webbench -c 100 -t 30 http://example.com
使用 HTTP 1.1 协议测试
webbench -c 200 -t 60 --http1.1 http://example.com
通过代理服务器进行测试
webbench -c 50 -t 45 -p http://proxy.example.com:8080 http://example.com
测试结果分析
WebBench 测试完成后会输出一系列测试结果信息,主要包括以下几个方面:
总请求数:表示在测试期间总共发送的请求数量。
成功请求数:表示成功处理的请求数量。
失败请求数:表示处理失败的请求数量。
每秒请求数(Requests/sec):这是一个重要的性能指标,反映了服务器在单位时间内能够处理的请求数量。数值越高,说明服务器的处理能力越强。
平均响应时间(Transfer/sec):表示每个请求的平均响应时间,单位通常为毫秒。响应时间越短,说明服务器的响应速度越快。
测试示例:
测试环境:
操作系统:Ubuntu 22.04;2核2GB的云服务器;1个主线程、3个从属线程,3000个客户端同时访问,运行1分钟。
测试结果:Speed=51861 pages/min, 131717 bytes/sec. Requests: 51861 susceed, 0 failed.
QPS为 864pages/sec,吞吐量为131717 bytes/sec
由于博主还未回学校,目前的电脑上没有安装虚拟机,测试环境有限。大家可以根据自己云服务器/虚拟机的核心数量和内存大小进行压力测试。
除了进行压力测试以外,我们还可以进行如下测试:
1、服务器对非活跃连接销毁的测试:
如对属性为keep-alive但长期不活跃的长连接,服务器是否能够及时销毁。
如对属性为close的短连接,服务器是否能够及时销毁。
2、一种场景:当某一就绪的连接业务处理超时时,导致该连接后续的已经就绪的连接同样超时。假设当当前连接的下一个就绪的事件就是定时器的读事件,此时会触发对非活跃连接进行销毁的定时任务。但此时服务器不应该释放该连接,因为该连接与当前连接后续的连接并非处于非活跃状态。所以我们将定时任务的执行时机统一放到EventLoop的任务池中进行处理。这样一来,当在Poller中已经就绪的连接即使因为业务处理超时,只要我们在其执行完业务处理后及时刷新定时任务,就能保证连接就绪事件的正常处理。
3、当前服务器能否正常处理粘包问题。能否解决当前请求不完善且等待后续信息到来请求完善后正常进行业务处理。
4、当接收超大数据量的请求时,能否正常处理。
篇幅有限,完整代码及测试用例存放至Gitee仓库中,有需要的老铁可以自取: