线程池的实现
目录
什么是线程池
线程池:
线程池的应用场景:
线程池的种类:
线程池示例:
线程池的实现
日志系统
1. 初始化阶段
2. 日志记录阶段
3. 线程安全保障
4. 输出格式
宏设计的核心优势
Thread.hpp
ThreadPool.hpp
核心功能与设计思路
代码结构分析
工作流程
设计亮点
Task.hpp
类设计与功能
设计亮点
Main.cc
测试程序功能分析
设计亮点
什么是线程池
线程池:
一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。
线程池的应用场景:
- 需要大量的线程来完成任务,且完成任务的时间比较短。 WEB服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。 但对于长时间的任务,比如一个 Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。
- 对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。
- 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限, 出现错误.
线程池的种类:
常见线程池类型包括FixedThreadPool(固定大小)、CachedThreadPool(动态缓存)、SingleThreadExecutor(单线程顺序执行)、ScheduledThreadPool(定时任务)和WorkStealingPool(工作窃取)。
线程池示例:
- 创建固定数量线程池,循环从任务队列中获取任务对象
- 获取到任务对象后,执行任务对象中的任务接口
线程池的实现
日志系统
在讲我们的线程池之前我们先引入一个日志系统,它可以帮助我们追踪程序的运行过程和执行状态。
代码部分
#pragma once#include <iostream> #include <string> #include <unistd.h> #include <sys/types.h> #include <ctime> #include <stdarg.h> #include <fstream> #include <string.h> #include <pthread.h>namespace log_ns {enum{DEBUG = 1,INFO,WARNING,ERROR,FATAL};std::string LevelToString(int level){switch (level){case DEBUG:return "DEBUG";case INFO:return "INFO";case WARNING:return "WARNING";case ERROR:return "ERROR";case FATAL:return "FATAL";default:return "UNKNOW";}}std::string GetCurrTime(){time_t now = time(nullptr);struct tm *curr_time = localtime(&now);char buffer[128];snprintf(buffer, sizeof(buffer), "%d-%02d-%02d %02d:%02d:%02d",curr_time->tm_year + 1900,curr_time->tm_mon + 1,curr_time->tm_mday,curr_time->tm_hour,curr_time->tm_min,curr_time->tm_sec);return buffer;}class logmessage{public:std::string _level;pid_t _id;std::string _filename;int _filenumber;std::string _curr_time;std::string _message_info;};#define SCREEN_TYPE 1#define FILE_TYPE 2const std::string glogfile = "./log.txt";pthread_mutex_t glock = PTHREAD_MUTEX_INITIALIZER;class Log{public:Log(const std::string &logfile = glogfile) : _logfile(logfile), _type(SCREEN_TYPE){}void Enable(int type){_type = type;}void FlushLogToScreen(const logmessage &lg){printf("[%s][%d][%s][%d][%s] %s",lg._level.c_str(),lg._id,lg._filename.c_str(),lg._filenumber,lg._curr_time.c_str(),lg._message_info.c_str());}void FlushLogToFile(const logmessage &lg){std::ofstream out(_logfile, std::ios::app);if (!out.is_open())return;char logtxt[2048];snprintf(logtxt, sizeof(logtxt), "[%s][%d][%s][%d][%s] %s",lg._level.c_str(),lg._id,lg._filename.c_str(),lg._filenumber,lg._curr_time.c_str(),lg._message_info.c_str());out.write(logtxt, strlen(logtxt));out.close();}void FlushLog(const logmessage &lg){pthread_mutex_lock(&glock);switch (_type){case SCREEN_TYPE:FlushLogToScreen(lg);break;case FILE_TYPE:FlushLogToFile(lg);break;}pthread_mutex_unlock(&glock);}void logMessage(std::string filename, int filenumber, int level, const char *format, ...){logmessage lg;lg._level = LevelToString(level);lg._id = getpid();lg._filename = filename;lg._filenumber = filenumber;lg._curr_time = GetCurrTime();va_list ap;va_start(ap, format);char log_info[1024];vsnprintf(log_info, sizeof(log_info), format, ap);va_end(ap);lg._message_info = log_info;// 打印出日志FlushLog(lg);}~Log(){}private:int _type;std::string _logfile;};Log lg;#define LOG(level, Format, ...) do {lg.logMessage(__FILE__, __LINE__, level, Format, ##__VA_ARGS__); }while (0)#define EnableScreen() do {lg.Enable(SCREEN_TYPE);}while(0)#define EnableFile() do {lg.Enable(FILE_TYPE);}while(0) }
日志系统的门道没有那么多,我只需要将核心部分讲一下就可以了。首先我们要打印运行的信息,那么我们运行的状态好坏就要标明出来:
enum{DEBUG = 1,INFO,WARNING,ERROR,FATAL};
DEBUG
:调试信息(值为 1)INFO
:普通信息(值为 2)WARNING
:警告信息(值为 3)ERROR
:错误信息(值为 4)FATAL
:致命错误(值为 5)std::string LevelToString(int level)
这个函数没什么好说的,就是把我们的日志等级转换成字符串类型方便我们观察
std::string GetCurrTime(){time_t now = time(nullptr);struct tm *curr_time = localtime(&now);char buffer[128];snprintf(buffer, sizeof(buffer), "%d-%02d-%02d %02d:%02d:%02d",curr_time->tm_year + 1900,curr_time->tm_mon + 1,curr_time->tm_mday,curr_time->tm_hour,curr_time->tm_min,curr_time->tm_sec);return buffer;}
这段代码定义了一个名为`GetCurrTime`的函数,用于获取并返回当前系统时间的字符串表示。它首先调用`time()`获取当前时间戳,再通过`localtime()`将其转换为本地时间结构体`tm`,包含年、月、日等信息。随后使用`snprintf()`将这些信息格式化为`YYYY-MM-DD HH:MM:SS`的字符串(如`2023-10-15 14:30:45`),存储在临时字符数组`buffer`中。最后通过隐式转换`std::string(buffer)`将其转换为C++字符串对象返回,这样做确保了内存安全,避免了直接返回局部数组可能导致的悬空指针问题。整个过程简洁高效,适用于日志记录、时间戳生成等场景。
class logmessage{public:std::string _level;pid_t _id;std::string _filename;int _filenumber;std::string _curr_time;std::string _message_info;};
这个类实际上就是我们的日志属性结构体,帮助我们更好的管理它的属性。
_level
:日志级别(如 "INFO", "WARNING", "ERROR" 等)_id
:进程 ID(PID),标识产生日志的进程_filename
:文件名,通常记录日志产生的源文件_filenumber
:文件行号,指示日志产生的代码位置_curr_time
:日志产生的时间戳_message_info
:具体的日志消息内容#define SCREEN_TYPE 1#define FILE_TYPE 2
这段宏定义是两个选项,让用户自主选择将日志信息打印到屏幕还是文件上。
const std::string glogfile = "./log.txt";pthread_mutex_t glock = PTHREAD_MUTEX_INITIALIZER;
这段代码表示的是如果我们要把打印信息放在文件里的话,我们要放在哪个文件,还有我们要设置一把锁,每次只允许一个线程对日志进行操作。
接下来就是我们上层操作的日志类了。
class Log{public:Log(const std::string &logfile = glogfile) : _logfile(logfile), _type(SCREEN_TYPE){}void Enable(int type){_type = type;}void FlushLogToScreen(const logmessage &lg){printf("[%s][%d][%s][%d][%s] %s",lg._level.c_str(),lg._id,lg._filename.c_str(),lg._filenumber,lg._curr_time.c_str(),lg._message_info.c_str());}void FlushLogToFile(const logmessage &lg){std::ofstream out(_logfile, std::ios::app);if (!out.is_open())return;char logtxt[2048];snprintf(logtxt, sizeof(logtxt), "[%s][%d][%s][%d][%s] %s",lg._level.c_str(),lg._id,lg._filename.c_str(),lg._filenumber,lg._curr_time.c_str(),lg._message_info.c_str());out.write(logtxt, strlen(logtxt));out.close();}void FlushLog(const logmessage &lg){pthread_mutex_lock(&glock);switch (_type){case SCREEN_TYPE:FlushLogToScreen(lg);break;case FILE_TYPE:FlushLogToFile(lg);break;}pthread_mutex_unlock(&glock);}void logMessage(std::string filename, int filenumber, int level, const char *format, ...){logmessage lg;lg._level = LevelToString(level);lg._id = getpid();lg._filename = filename;lg._filenumber = filenumber;lg._curr_time = GetCurrTime();va_list ap;va_start(ap, format);char log_info[1024];vsnprintf(log_info, sizeof(log_info), format, ap);va_end(ap);lg._message_info = log_info;// 打印出日志FlushLog(lg);}~Log(){}private:int _type;std::string _logfile;};Log lg;
日志系统的执行流程可以概括为以下几个关键步骤:
1. 初始化阶段
- 创建日志对象:通过全局变量
Log lg;
创建单例日志实例,默认输出到屏幕(SCREEN_TYPE
)。- 设置日志文件:构造函数接收日志文件路径(默认使用
glogfile
),但未立即打开文件。2. 日志记录阶段
当调用
lg.logMessage(...)
时:
创建日志消息对象:
- 自动获取当前进程 ID(
getpid()
)。- 调用
GetCurrTime()
获取时间戳。- 通过
LevelToString()
将整数级别转换为字符串(如 "INFO")。- 保存文件名、行号和用户传入的格式化消息。
格式化日志内容:
- 使用
va_list
和vsnprintf
处理可变参数,生成格式化的日志文本。输出日志:
- 调用
FlushLog()
根据_type
决定输出目标:
- 屏幕:调用
FlushLogToScreen()
,使用printf
打印。- 文件:调用
FlushLogToFile()
,每次打开文件、写入日志、关闭文件。3. 线程安全保障
- 在
FlushLog()
中使用pthread_mutex
加锁,确保多线程环境下日志不会交错。- 锁的范围覆盖整个输出过程(包括屏幕打印和文件 IO)。
4. 输出格式
日志统一格式为
[LEVEL][PID][FILE][LINE][TIME] MESSAGE
#define LOG(level, Format, ...) do {lg.logMessage(__FILE__, __LINE__, level, Format, ##__VA_ARGS__); }while (0)#define EnableScreen() do {lg.Enable(SCREEN_TYPE);}while(0)#define EnableFile() do {lg.Enable(FILE_TYPE);}while(0)
代码定义了三个宏,为日志系统提供了更便捷的接口:
LOG
宏
- 功能:简化日志记录,自动捕获当前文件路径和行号。
- 机制:
- 使用
__FILE__
和__LINE__
预处理器宏获取源码位置。- 通过
##__VA_ARGS__
支持可变参数,兼容printf
风格的格式化字符串。- 采用
do {...} while (0)
结构确保宏作为单个语句执行,避免分号或嵌套条件语句导致的错误。
EnableScreen
宏
- 功能:启用控制台输出。
- 机制:调用
lg.Enable(SCREEN_TYPE)
,将日志定向到屏幕。- 设计目的:提供简洁接口,避免直接操作全局日志对象。
EnableFile
宏
- 功能:启用文件输出。
- 机制:调用
lg.Enable(FILE_TYPE)
,将日志定向到文件。- 设计目的:与
EnableScreen
对称,统一配置方式。宏设计的核心优势
- 自动化上下文:
LOG
自动捕获源码位置,减少手动输入。- 安全性:
do-while
结构确保宏在任何场景下正确展开(如作为if
语句的子句)。- 易用性:用户无需关心底层
Log
类实现,直接使用类似printf
的语法。Thread.hpp
#pragma once #include <iostream> #include <unistd.h> #include <string> #include <functional> #include <pthread.h>namespace ThreadMudle {using func_t = std::function<void(const std::string &)>;class Thread{public:void Excute(){_isrunning = true;_func(_name);_isrunning = false;}public:Thread(const std::string &name, func_t func): _name(name), _func(func){}static void *ThreadRoutine(void *args) // 新线程都会执行该方法{Thread *self = static_cast<Thread *>(args); // 获得了当前对象self->Excute(); // 调用回调函数func的方法return nullptr;}bool Start() // 启动线程{//::强调此调用为系统调用int n = ::pthread_create(&_tid, nullptr, ThreadRoutine, this);if (n != 0)return false; // 创建失败返回falsereturn true;}std::string Status() // 获取当前状态{if (_isrunning)return "running";elsereturn "sleep";}void Stop() // 中止线程{if (_isrunning){::pthread_cancel(_tid);_isrunning = false;}}void Join() // 等待回收线程{::pthread_join(_tid, nullptr);}std::string Name() // 返回线程的名字{return _name;}~Thread(){}private:std::string _name; // 线程名pthread_t _tid; // 线程idbool _isrunning; // 线程是否在运行func_t _func; // 线程要执行的回调函数}; }
我们将linux的线程封装成类,方便我们更好地操作,它是使用 POSIX 线程库(pthread)实现。以下是对其执行过程的文字介绍:
类结构:
Thread
类封装了线程的创建、启动、状态查询和终止操作。- 成员变量包括线程名称、线程 ID、运行状态标志和回调函数。
构造函数:
- 初始化线程名称和回调函数。
- 此时线程尚未创建,状态标志为未运行。
启动线程:
- 调用
Start()
方法,内部使用pthread_create
创建新线程。- 指定静态方法
ThreadRoutine
作为线程入口点,并传入当前对象指针。线程执行体:
ThreadRoutine
通过传入的对象指针调用Excute()
方法。Excute()
设置状态标志为运行中,执行回调函数,执行完毕后设置状态标志为未运行。状态查询:
Status()
方法返回线程状态("running" 或 "sleep"),依赖状态标志判断。终止线程:
Stop()
方法调用pthread_cancel
发送取消请求,并直接设置状态标志为未运行。等待线程结束:
Join()
方法调用pthread_join
阻塞当前线程,直到目标线程结束。ThreadPool.hpp
我先展示完整代码再慢慢说细节:
#pragma once#include <iostream> #include <unistd.h> #include <string> #include <unistd.h> #include <vector> #include <functional> #include <queue> #include <pthread.h> #include "Thread.hpp" #include "Log.hpp" using namespace log_ns;using namespace ThreadMudle; // 开放封装好的线程的命名空间static const int gdefaultnum = 5; // 线程池的个数template <typename T> class ThreadPool { private:void LockQueue(){pthread_mutex_lock(&_mutex);}void UnlockQueue(){pthread_mutex_unlock(&_mutex);}void Wakeup(){pthread_cond_signal(&_cond);}void WakeupAll(){pthread_cond_broadcast(&_cond);}void Sleep(){pthread_cond_wait(&_cond, &_mutex);}bool IsEmpty(){return _task_queue.empty();}// 处理任务void HandlerTask(const std::string &name) // this{while (true){// 取任务LockQueue(); // 给任务队列上锁while (IsEmpty() && _isrunning) // 如果这个线程还在运行任务且任务队列为空,就让线程去休息{_sleep_thread_num++;LOG(INFO, "%s thread sleep begin!\n", name.c_str());Sleep();LOG(INFO, "%s thread wakeup!\n", name.c_str());_sleep_thread_num--;}// 判定一种情况if (IsEmpty() && !_isrunning) // 如果任务为空且线程不处于运行状态就可以让这个线程退出了{UnlockQueue();LOG(INFO, "%s thread quit\n", name.c_str());break;}// 有任务T t = _task_queue.front();_task_queue.pop();UnlockQueue();// 处理任务t(); // 处理任务,此处不用/不能再临界区中处理// std::cout << name << ": " << t.result() << std::endl;LOG(DEBUG, "hander task done, task is : %s\n", t.result().c_str());}}void Init() // 创建线程{func_t func = std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1);for (int i = 0; i < _thread_num; i++){std::string threadname = "thread-" + std::to_string(i + 1);_threads.emplace_back(threadname, func);LOG(DEBUG, "construct thread obj %s done, init sucess\n", threadname.c_str());}}void Start() // 复用封装好的线程类里面的Start方法{_isrunning = true;for (auto &thread : _threads){LOG(DEBUG, "start thread %s done.\n", thread.Name().c_str());thread.Start();}}ThreadPool(int thread_num = gdefaultnum): _thread_num(thread_num), _isrunning(false), _sleep_thread_num(0){// 创建锁和条件变量pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_cond, nullptr);}ThreadPool(const ThreadPool<T> &t) = delete;void operator=(const ThreadPool<T> &t) = delete;public:void Stop() // 停止执行任务{LockQueue();_isrunning = false;WakeupAll();UnlockQueue();LOG(INFO, "thread Pool Stop Success!\n");}static ThreadPool<T> *GetInstance(){if (_tp == nullptr){pthread_mutex_lock(&_sig_mutex);if (_tp == nullptr){LOG(INFO, "creat threadpool\n");_tp = new ThreadPool<T>();_tp->Init();_tp->Start();}else{LOG(INFO, "get threadpool\n");}pthread_mutex_unlock(&_sig_mutex);}return _tp;}void Equeue(const T &in) // 生产任务{LockQueue();if (_isrunning){_task_queue.push(in);if (_sleep_thread_num > 0){Wakeup(); // 唤醒之前Sleep的线程}}UnlockQueue();}~ThreadPool(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_cond);}private:int _thread_num; // 线程个数std::vector<Thread> _threads; // 用顺序表来存储线程std::queue<T> _task_queue; // 用队列来存储任务数据bool _isrunning; // 线程的运行状态int _sleep_thread_num; // 没有执行任务的线程数量pthread_mutex_t _mutex; // 锁pthread_cond_t _cond; // 条件变量// 单例模式volatile static ThreadPool<T> *_tp;static pthread_mutex_t _sig_mutex; };template <typename T> volatile ThreadPool<T> *ThreadPool<T>::_tp = nullptr; template <typename T> pthread_mutex_t ThreadPool<T>::_sig_mutex = PTHREAD_MUTEX_INITIALIZER;
核心功能与设计思路
单例模式实现
- 使用双重检查锁定(Double-Checked Locking)机制确保线程池实例的唯一性
- 通过静态方法
GetInstance()
获取线程池实例,延迟初始化任务处理机制
- 使用
std::queue<T>
存储待处理任务,支持线程安全的任务入队(Equeue
)和出队操作- 线程通过条件变量(
pthread_cond_t
)实现等待和唤醒机制,避免忙等待线程管理
- 线程池初始化时创建固定数量的工作线程
- 支持线程池的启动(
Start
)和停止(Stop
)操作- 记录休眠线程数量(
_sleep_thread_num
),优化任务调度代码结构分析
私有方法
- 锁操作封装:
LockQueue()
、UnlockQueue()
- 条件变量操作:
Wakeup()
、WakeupAll()
、Sleep()
- 任务处理核心逻辑:
HandlerTask()
- 线程池初始化:
Init()
、Start()
公共接口
- 单例获取:
GetInstance()
- 任务入队:
Equeue()
- 线程池停止:
Stop()
关键成员变量
_task_queue
:任务队列,存储待处理的任务_isrunning
:线程池运行状态标志_mutex
和_cond
:保护任务队列的互斥锁和条件变量_sig_mutex
:单例模式创建实例时使用的互斥锁工作流程
线程池初始化
- 首次调用
GetInstance()
时创建线程池实例- 初始化固定数量的工作线程,线程启动后进入
HandlerTask
循环任务处理流程
- 外部通过
Equeue()
向任务队列添加任务- 工作线程从队列获取任务:
- 若队列为空且线程池运行中,线程进入休眠状态
- 若队列为空且线程池停止,线程退出循环
- 若队列有任务,取出任务并执行
线程池停止流程
- 调用
Stop()
设置运行标志为false
- 唤醒所有休眠线程,线程检查状态后退出
设计亮点
线程安全实现
- 使用互斥锁保护任务队列的读写操作
- 条件变量实现高效的线程等待和唤醒
资源管理
- 使用 RAII 原则管理锁和条件变量的生命周期
- 线程池析构时销毁锁和条件变量
任务执行优化
- 任务处理与队列操作分离,减少临界区范围
- 优先唤醒休眠线程处理新任务
Task.hpp
#pragma once#include <iostream> #include <functional>// using task_t = std::function<void()>;// void Download() // { // std::cout << "我是一个下载的任务" << std::endl; // }// 做加法 class Task { public:Task(int x, int y): _x(x), _y(y){}Task(){}void operator()(){Excute();}void Excute(){_result = _x + _y;}std::string debug(){std::string msg = std::to_string(_x) + "+" + std::to_string(_y) + "=?";return msg;}std::string result(){std::string msg = std::to_string(_x) + "+" + std::to_string(_y) + "=" + std::to_string(_result);return msg;}private:int _x;int _y;int _result; };
这段代码定义了一个名为
Task
的类,它代表一个可以在线程池中执行的任务。以下是对该类的详细分析:类设计与功能
Task
类是一个可调用对象(Functor),它封装了一个简单的加法运算。主要功能包括:
数据成员:
_x
和_y
:需要相加的两个整数_result
:存储计算结果构造函数:
- 带参数的构造函数:初始化
_x
和_y
- 默认构造函数:未初始化成员变量(可能导致未定义行为)
核心功能:
operator()
:重载函数调用运算符,使 Task 对象可直接调用Excute()
:执行加法运算,将结果存入_result
debug()
:返回运算表达式字符串(如 "3+4=?")result()
:返回带结果的字符串(如 "3+4=7")设计亮点
可调用对象设计:
- 通过重载
operator()
,Task 对象可以像函数一样被调用,符合线程池对任务类型的要求- 这种设计使得 Task 可以无缝集成到之前提供的线程池实现中
结果处理:
- 分离计算过程和结果展示,提高了代码的可维护性
- 提供格式化的结果输出,方便日志记录和调试
Main.cc
#include "ThreadPool.hpp" #include "Task.hpp" #include "Thread.hpp" #include <memory> #include "Log.hpp" using namespace log_ns;int main() {EnableScreen();int cnt = 10;while (cnt){// 不断的向线程池推送任务sleep(1);Task t(1, 1);ThreadPool<Task>::GetInstance()->Equeue(t);LOG(INFO, "equeue a task, %s\n", t.debug().c_str());sleep(1);cnt--;}ThreadPool<Task>::GetInstance()->Stop();LOG(INFO, "thread pool stop!\n");// sleep(10);return 0; }
这段代码是线程池的测试程序,主要功能是向线程池发送任务并验证其工作流程。以下是对该测试程序的详细分析:
测试程序功能分析
初始化与配置:
- 启用屏幕日志输出(
EnableScreen()
)- 创建线程池单例实例(通过
ThreadPool<Task>::GetInstance()
)任务生成与提交:
- 循环 10 次,每次间隔 1 秒生成一个加法任务(固定参数 1+1)
- 将任务提交到线程池(
Equeue(t)
)- 记录任务提交日志
线程池管理:
- 所有任务提交完成后,调用
Stop()
停止线程池- 记录线程池停止日志
设计亮点
简单直观的测试流程:
- 通过固定参数的任务简化测试逻辑
- 适当的延时使日志输出更易于观察
资源管理:
- 利用单例模式确保线程池资源正确初始化和释放
- 显式调用
Stop()
方法清理线程资源运行截图: