Linux:线程池
文章目录
- 一、线程池的基本认识
- 二、线程池的实现思路
- 三、代码实现详解
- 1. 类定义与成员
- 2. 线程任务函数HandlerTask
- 3. 启动
- 四、运行效果
- 五、单例模式下的线程池
- 1. 什么是单例模式?
- 六、💕💕(`看这里!!!`)💕💕线程池的单例实现
- 1. 怎么将线程池改为单例模式
- 2. 单例的线程池怎么启动?
- 3. 线程池怎么去执行任务?
- 4. 线程池怎么样去退出?
- 5. 线程池完整代码:
- 6. 运行结果
一、线程池的基本认识
线程池是一种经典的线程使用模式。为什么需要线程池?如果每来一个任务就创建一个线程,短时间内可能会产生大量线程,结果就是调度开销大,CPU 花更多时间在调度而不是执行任务上,同时还会破坏缓存局部性,降低整体性能。
线程池的思路就是提前创建一批线程,让它们在后台等待,有任务来了直接分配给空闲线程去执行。这样就避免了频繁创建与销毁线程的代价。
线程池的优势可以总结为:
- 避免线程创建/销毁开销 —— 对于短任务尤其明显。
- 提高资源利用率 —— 控制线程数不超过 CPU 核心数,避免系统因为线程爆炸而崩溃。
- 响应更快 —— 线程已经在等待队列中,来任务立即执行。
适用场景:
- Web 服务器处理网页请求(单个任务短,但任务量巨大)。
- 对性能要求很高,需要快速响应的场景。
- 突发性任务激增,但又不希望线程数无限增加的应用。
二、线程池的实现思路
常见的线程池有两种:
- 固定大小线程池:初始化时创建固定数量的线程,它们不断从任务队列中取任务来执行。
- 动态线程池:根据任务负载自动扩展或回收线程。
在这里,我们选择固定线程数的线程池,实现一个通用的 ThreadPool
模板类。
三、代码实现详解
下面的代码是一个完整的 C++17 线程池实现(基于 pthread,自己封装了 Thread、Mutex、Cond 等类)。
1. 类定义与成员
template <typename T>
class ThreadPool
{
private:int _threadnum; // 线程数std::vector<Thread> _threads; // 工作线程容器std::queue<T> _task_queue; // 任务队列Mutex _mutex; // 互斥锁,保证任务队列安全Cond _cond; // 条件变量,支持线程等待/唤醒int _waitnum; // 等待中的线程数bool _isrunning; // 线程池运行状态
_task_queue
用来存放外部提交的任务。_mutex
和_cond
保证任务队列的线程安全和线程同步。_threads
保存固定数量的线程对象。
2. 线程任务函数HandlerTask
这是线程池的核心逻辑,每个线程启动后都会执行这个函数:
void HandlerTask()
{std::string name = GetThreadNameFromNptl();LOG(LogLevel::INFO) << name << " is running...";while (true){_mutex.Lock();while (_task_queue.empty() && _isrunning){_waitnum++;_cond.Wait(_mutex); // 没任务就等待_waitnum--;}// 情况1:线程池关闭且队列为空 -> 退出if (_task_queue.empty() && !_isrunning){_mutex.Unlock();break;}// 情况2:有任务 -> 取任务T t = _task_queue.front();_task_queue.pop();_mutex.Unlock();LOG(LogLevel::DEBUG) << name << " get a task";t(); // 执行任务}
}
这里有几点关键:
- 任务等待:当任务队列为空时,线程进入
_cond.Wait()
状态,避免 CPU 空转。 - 退出机制:线程池停止时,所有线程会在完成任务后有序退出。
- 独占任务:任务被取出后由某个线程独立执行,互不干扰。
3. 启动
void Start()
{_isrunning = true;for (auto &thread : _threads){thread.Start();LOG(LogLevel::INFO) << "start thread " << thread.Name() << " done";}
}
四、运行效果
编译运行:
g++ main.cc -std=c++17 -lpthread
./a.out
日志输出示例:
[INFO] ThreadPool Construct()
[INFO] init thread Thread-0 done
[INFO] init thread Thread-1 done
[INFO] start thread Thread-0 done
[INFO] Thread-0 is running...
[DEBUG] 任务入队列成功
[DEBUG] Thread-0 get a task
this is a task
[DEBUG] 线程池退出中...
[INFO] Thread-0 退出...
[INFO] Thread-1 退出...
可以看到:线程池成功初始化线程,提交任务后线程执行,最后退出。
五、单例模式下的线程池
1. 什么是单例模式?
- 什么是单例模式
单例模式指的是:一个类在整个进程生命周期内只能存在一个实例,并且需要对外提供全局唯一的访问接口。
例如:
一个进程中的日志系统,通常只需要一个全局日志对象。
一个游戏中的配置管理器,只需要一个实例负责读取和管理配置文件。
本文的线程池,也应该全局唯一。
- 单例模式的两种实现方式
饿汉模式:类一加载就创建单例对象,之后直接使用即可。
懒汉模式:第一次调用 GetInstance() 时才创建单例对象,延迟加载,节省资源。
这里我们选择懒汉模式,因为懒汉模式可以在创建单例对象前,线程可以拿这个单例做其他事情,从而提高效率,适用于单例本身比较大的场景
六、💕💕(看这里!!!
)💕💕线程池的单例实现
1. 怎么将线程池改为单例模式
- 构造函数私有化
- 禁止拷贝构造和赋值构造
private:ThreadPool(int num = gnum): _num(num){for (int i = 0; i < _num; i++){_threads.emplace_back([this](){HandlerTask();});}}ThreadPool(const ThreadPool &) = delete;ThreadPool &operator=(const ThreadPool &) = delete;
- 在外部创建单例指针与单例锁
private:std::vector<Thread> _threads;int _num; // 线程池中线程的个数std::queue<T> _taskq; // 任务队列Mutex _mutex; // 互斥锁Cond _cond; // 条件变量static ThreadPool<T> *_inc; // 单例指针static Mutex _lock; // 单例锁bool _isrunning;int _sleepernum;};template <typename T>ThreadPool<T> *ThreadPool<T>::_inc = nullptr;template <typename T>Mutex ThreadPool<T>::_lock;
2. 单例的线程池怎么启动?
- 通过静态的static ThreadPool *GetInstance()来启动
static ThreadPool<T> *GetInstance(){if (_inc == nullptr) // 这里判断是因为多个生产者线程避免都等待这把锁导致效率降低,因此加锁{LockGuard lock(_lock); // 这里加锁是因为如果有多个生产者线程同时调用GetInstance()函数,可能会造成多个线程同时创建单例,因此加锁LOG(LogLevel::DEBUG) << "获取单例...";if (_inc == nullptr) // 此处判断的原因是如果创建单例了,就直接返回不应再创建了{LOG(LogLevel::DEBUG) << "创建单例...";_inc = new ThreadPool<T>();_inc->Start();}}return _inc;}
- 进入Start后按个让线程启动
// 启动也私有,外部不允许直接调用,只有通过GetInstance()获取单例void Start(){if (_isrunning)return;_isrunning = true;for (auto &thread : _threads){thread.Start();LOG(LogLevel::INFO) << "start new thread success: " << thread.Name();}}
3. 线程池怎么去执行任务?
- 所有的线程都需要去执行这个函数
void HandlerTask(){char name[128];pthread_getname_np(pthread_self(), name, sizeof(name));while (true){T t;{LockGuard lock(_mutex);// 1. a.队列为空 b. 线程池没有退出while (_taskq.empty() && _isrunning){_sleepernum++;_cond.Wait(_mutex);_sleepernum--;}// 2. 内部的线程被唤醒if (!_isrunning && _taskq.empty()){LOG(LogLevel::INFO) << name << " 退出了, 线程池退出&&任务队列为空";break;}// 一定有任务t = _taskq.front();_taskq.pop();}t(); // 处理任务在临界区外}}
- 细节1,当刚开始任务队列空的时候 while (_taskq.empty() && _isrunning),所有线程都会跑过去等待,同时记录等待的线程数
- 细节2, 我们的任务对象T是要在临界区外的,处理任务是并发的
- 所有的任务通过这个函数进入任务队列
bool Enqueue(const T &in){if (_isrunning){LockGuard lockguard(_mutex);_taskq.push(in);if (_threads.size() == _sleepernum)WakeUpOne();return true;}return false;}
当所有的线程都在等待且有一个任务来了,就唤醒一个线程让他去执行任务,此事被唤醒的线程就会被执行
// 一定有任务t = _taskq.front();_taskq.pop();}t(); // 处理任务在临界区外
4. 线程池怎么样去退出?
- 退出时需要先将标志位记为false,用户需要手动调用Stop
void Stop(){if (!_isrunning)return;_isrunning = false;// 唤醒所有线程WakeUpAllThread();}
- 然后所有的线程会被唤醒不让他等待了
- 接下来会进入 void HandlerTask()的判断
void HandlerTask(){char name[128];pthread_getname_np(pthread_self(), name, sizeof(name));while (true){T t;{LockGuard lock(_mutex);// 1. a.队列为空 b. 线程池没有退出while (_taskq.empty() && _isrunning){_sleepernum++;_cond.Wait(_mutex);_sleepernum--;}// 2. 内部的线程被唤醒if (!_isrunning && _taskq.empty()){LOG(LogLevel::INFO) << name << " 退出了, 线程池退出&&任务队列为空";break;}// 一定有任务t = _taskq.front();_taskq.pop();}t(); // 处理任务在临界区外}}
细节1:正在处理任务的线程,会将任务处理完t(); // 处理任务在临界区外
细节2:等待中的线程会因为标志位而进去if条件退出循环`_cond.Wait(_mutex);\// 2. 内部的线程被唤醒if (!_isrunning && _taskq.empty())`
细节3:准备等待的while (_taskq.empty() && _isrunning),也会因为标志位而进不去
5. 线程池完整代码:
#pragma once
#include <iostream>
#include <string>
#include <vector>
#include <queue>
#include "Log.hpp"
#include "Thread.hpp"
#include "Cond.hpp"
#include "Mutex.hpp"namespace ThreadPoolModule
{using namespace ThreadModlue;using namespace LogModule;using namespace CondModule;using namespace MutexModule;static const int gnum = 5;template <typename T>class ThreadPool{private:ThreadPool(int num = gnum): _num(num){for (int i = 0; i < _num; i++){_threads.emplace_back([this](){HandlerTask();});}}ThreadPool(const ThreadPool &) = delete;ThreadPool &operator=(const ThreadPool &) = delete;// 启动也私有,外部不允许直接调用,只有通过GetInstance()获取单例void Start(){if (_isrunning)return;_isrunning = true;for (auto &thread : _threads){thread.Start();LOG(LogLevel::INFO) << "start new thread success: " << thread.Name();}}void WakeUpAllThread(){LockGuard lock(_mutex);if (_sleepernum){_cond.Broadcast();}LOG(LogLevel::INFO) << "唤醒所有的休眠线程";}void WakeUpOne(){_cond.Signal();LOG(LogLevel::INFO) << "唤醒一个休眠线程";}public:static ThreadPool<T> *GetInstance(){if (_inc == nullptr) // 这里判断是因为多个生产者线程避免都等待这把锁导致效率降低,因此加锁{LockGuard lock(_lock); // 这里加锁是因为如果有多个生产者线程同时调用GetInstance()函数,可能会造成多个线程同时创建单例,因此加锁LOG(LogLevel::DEBUG) << "获取单例...";if (_inc == nullptr) // 此处判断的原因是如果创建单例了,就直接返回不应再创建了{LOG(LogLevel::DEBUG) << "创建单例...";_inc = new ThreadPool<T>();_inc->Start();}}return _inc;}void Stop(){if (!_isrunning)return;_isrunning = false;// 唤醒所有线程WakeUpAllThread();}void Join(){for (auto &t : _threads){t.Join();}}void HandlerTask(){char name[128];pthread_getname_np(pthread_self(), name, sizeof(name));while (true){T t;{LockGuard lock(_mutex);// 1. a.队列为空 b. 线程池没有退出while (_taskq.empty() && _isrunning){_sleepernum++;_cond.Wait(_mutex);_sleepernum--;}// 2. 内部的线程被唤醒if (!_isrunning && _taskq.empty()){LOG(LogLevel::INFO) << name << " 退出了, 线程池退出&&任务队列为空";break;}// 一定有任务t = _taskq.front();_taskq.pop();}t(); // 处理任务在临界区外}}bool Enqueue(const T &in){if (_isrunning){LockGuard lock(_mutex);_taskq.push(in);if (_threads.size() == _sleepernum)WakeUpOne();return true;}return false;}~ThreadPool() {}private:std::vector<Thread> _threads;int _num; // 线程池中线程的个数std::queue<T> _taskq; // 任务队列Mutex _mutex; // 互斥锁Cond _cond; // 条件变量static ThreadPool<T> *_inc; // 单例指针static Mutex _lock; // 单例锁bool _isrunning;int _sleepernum;};template <typename T>ThreadPool<T> *ThreadPool<T>::_inc = nullptr;template <typename T>Mutex ThreadPool<T>::_lock;}