【Linux】线程池——详细讲解
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档
文章目录
一、为什么需要线程池?
❌ 传统多线程的问题:
✅ 线程池的解决方案:
二、线程池核心设计解析
1. 整体架构
2. 类定义详解
三、关键技术实现深度解析
1. 线程例程(Routine) - 核心工作循环
2. 线程池启动与停止
3. 任务投递机制
四、线程封装类详解
Thread类实现
五、同步机制实现
1. 互斥锁封装
2. 条件变量封装
3. 锁守卫(RAII模式)
六、使用示例
1. 定义任务类型
2. 创建和使用线程池
一、为什么需要线程池?
在现代服务器开发中,高并发处理是核心需求。想象一个Web服务器,每个请求都创建一个新线程来处理:
❌ 传统多线程的问题:
cpp
// 为每个请求创建新线程 void handle_request(int client_fd) {pthread_t tid;pthread_create(&tid, nullptr, process_request, &client_fd);// 问题:频繁创建销毁线程开销巨大! }
✅ 线程池的解决方案:
cpp
// 使用线程池处理请求 void handle_request(int client_fd) {ThreadPool pool(10); // 创建10个线程的池子pool.Enqueue([client_fd](){ process_request(client_fd); });// 优势:线程复用,避免频繁创建销毁 }
二、线程池核心设计解析
1. 整体架构
https://img-blog.csdnimg.cn/direct/thread_pool_workflow.png
三大核心组件:
-
🧵 工作线程:实际执行任务的线程
-
📦 任务队列:存储待处理的任务
-
🔒 同步机制:保证线程安全访问
2. 类定义详解
cpp
template <class T> class ThreadPool { private:std::queue<T> _q; // 任务队列std::vector<Thread> _threads; // 工作线程集合Mutex _lock; // 互斥锁Cond _cond; // 条件变量bool _is_running; // 线程池运行状态int _wait_thread_num; // 等待任务的线程数 };
三、关键技术实现深度解析
1. 线程例程(Routine) - 核心工作循环
cpp
void Routine(const std::string &name) {while (true){T task;{// 1. 加锁访问任务队列LockGuard lockguard(&_lock);// 2. 等待条件:队列不为空或线程池停止while (QueueIsEmpty() && _is_running){_wait_thread_num++;_cond.Wait(_lock); // 释放锁并等待_wait_thread_num--;}// 3. 退出条件检查if (!_is_running && QueueIsEmpty()){LOG(LogLevel::INFO) << "线程池退出 && 任务队列为空, " << name << " 退出";break;}// 4. 获取任务task = _q.front();_q.pop();} // 锁在这里自动释放// 5. 执行任务(不在临界区内!)task();LOG(LogLevel::DEBUG) << name << " 处理任务: " << task.Result2String();} }
关键设计要点:
-
🔒 锁范围最小化:只在访问共享数据时加锁
-
⏰ 条件变量等待:避免忙等待,节省CPU资源
-
🚀 任务执行外移:在锁外执行任务,提高并发性
2. 线程池启动与停止
启动线程池:
cpp
void Start() {if (_is_running) return;_is_running = true;for (auto &t : _threads){t.Start(); // 启动所有工作线程}LOG(LogLevel::INFO) << "线程池启动成功"; }
优雅停止线程池:
cpp
void Stop() {if (!_is_running) return;_is_running = false;// 唤醒所有等待的线程if (_wait_thread_num > 0)_cond.NotifyAll();LOG(LogLevel::INFO) << "线程池停止信号已发送"; }
等待线程结束:
cpp
void Wait() {for (auto &t : _threads){t.Join(); // 等待所有线程结束}LOG(LogLevel::INFO) << "所有线程已回收"; }
3. 任务投递机制
cpp
void EnqueuePush(const T &task) {if (!_is_running) return;{LockGuard lockguard(&_lock); // 加锁_q.push(task); // 任务入队// 如果有线程在等待,唤醒一个if (_wait_thread_num > 0)_cond.NotifyOne();} // 自动释放锁 }
四、线程封装类详解
Thread类实现
cpp
class Thread { public:Thread(func_t func, const std::string &name = "None-name"): _name(name), _func(func), _isrunning(false){LOG(LogLevel::INFO) << _name << " 线程对象创建成功";}static void *start_routine(void *args){Thread *self = static_cast<Thread *>(args);self->_isrunning = true;self->_lwpid = get_lwp_id(); // 获取系统线程IDself->_func(self->_name); // 执行线程函数pthread_exit((void *)0);}void Start(){pthread_create(&_tid, nullptr, start_routine, this);LOG(LogLevel::INFO) << _name << " 线程启动成功";}void Join(){if (!_isrunning) return;pthread_join(_tid, nullptr);LOG(LogLevel::INFO) << _name << " 线程回收成功";}private:bool _isrunning;pthread_t _tid;pid_t _lwpid;std::string _name;func_t _func; };
五、同步机制实现
1. 互斥锁封装
cpp
class Mutex { public:Mutex() { pthread_mutex_init(&_mutex, nullptr); }~Mutex() { pthread_mutex_destroy(&_mutex); }void Lock() { pthread_mutex_lock(&_mutex); }void Unlock() { pthread_mutex_unlock(&_mutex); }pthread_mutex_t* Get() { return &_mutex; }private:pthread_mutex_t _mutex; };
2. 条件变量封装
cpp
class Cond { public:Cond() { pthread_cond_init(&_cond, nullptr); }~Cond() { pthread_cond_destroy(&_cond); }void Wait(Mutex& mutex) {pthread_cond_wait(&_cond, mutex.Get());}void NotifyOne() { pthread_cond_signal(&_cond); }void NotifyAll() { pthread_cond_broadcast(&_cond); }private:pthread_cond_t _cond; };
3. 锁守卫(RAII模式)
cpp
class LockGuard { public:explicit LockGuard(Mutex* mutex) : _mutex(mutex) {_mutex->Lock();}~LockGuard() {_mutex->Unlock();}private:Mutex* _mutex; };
六、使用示例
1. 定义任务类型
cpp
// 任务类示例 class Task { public:void operator()() {// 执行具体任务std::cout << "处理任务..." << std::endl;}std::string Result2String() const {return "任务完成";} };
2. 创建和使用线程池
cpp
int main() {// 创建包含5个线程的线程池ThreadPool<Task> pool(5);// 启动线程池pool.Start();// 投递10个任务for (int i = 0; i < 10; ++i) {pool.EnqueuePush(Task());usleep(100000); // 100ms间隔}// 优雅停止pool.Stop();pool.Wait();return 0; }