【Linux】系统部分——线程池的基本设计思路
30.线程池
文章目录
- 30.线程池
- 线程池的基本概念与设计原理
- 自主设计线程池
- 线程池包含对象
- 线程池的接口设计
- 线程调用方法
- 测试用例
- 其他自定义类型
线程池的基本概念与设计原理
- 线程池是一种线程使用模式,通过预先创建多个线程来提高效率。每次任务到来时创建线程会产生开销,影响整体效率。线程池内部维护多个线程,无任务时大部分线程休眠,有任务时唤醒指定线程处理。
- 线程池的线程数目有上限,可以控制系统负载时的线程总数波动,保证系统稳定性。线程池适用于多种场景,如浏览器或手机app访问网站时的外部响应、批量化计算、数据存储、数据库访问等。虽然线程池不是编写服务器的最主要方式,但学习价值较高。
- 线程池种类包括固定数量线程池和浮动线程池,固定数量线程池从任务队列获取任务执行,浮动线程池根据任务量动态调整线程数量。这里采用固定线程池设计,内部预先创建一批线程,提供任务队列,用户从外部提交任务,有任务时唤醒线程,无任务时线程休眠。
自主设计线程池
#pragma once#include <iostream>
#include <unistd.h>
#include <string>
#include <vector>
#include <queue>
#include <memory>
#include "Log.hpp"
#include "Mutex.hpp"
#include "Cond.hpp"
#include "Thread.hpp"namespace My_ThreadPool
{using namespace My_Mutex;using namespace My_Log;using namespace My_Cond;using namespace My_Thread;using thread_t = std::shared_ptr<Thread>;const static int defaultnum = 5;void DefaultTest()//前期测试代码{while (true){LOG(LogLevel::DEBUG) << "Test......";sleep(1);}}template <class T>class ThreadPool{private:bool IsEmpty() { return _ptasks.empty(); }void HanderTask(std::string name){T t;LOG(LogLevel::INFO) << "线程: " << name << "执行HanderTask方法";while (true){{LockGuard lockguard(_lock);while (IsEmpty() && _isrunning){_wait_num++;_cond.Wait(_lock);_wait_num--;}if (IsEmpty() && !_isrunning)break;t = _ptasks.front();_ptasks.pop();}t(name);}LOG(LogLevel::INFO) << "线程: " << name << "退出HanderTask方法";}public:ThreadPool(int num = defaultnum) : _num(num), _isrunning(false){for (int i = 0; i < _num; i++){_threads.push_back(std::make_shared<Thread>(std::bind(&ThreadPool::HanderTask, this, std::placeholders::_1)));LOG(LogLevel::INFO) << "创建线程: " << _threads.back()->Name() << "...Success";}}void Equeue(T t){LockGuard lockguard(_lock); // 加锁if (!_isrunning)return;_ptasks.push(t);if (_wait_num)_cond.Weak();}void Start(){if (_isrunning)return;_isrunning = true;for (auto &t : _threads){t->Start();LOG(LogLevel::INFO) << "启动线程: " << t->Name() << "...Success";}}void Wait(){for (auto &t : _threads){t->Jion();LOG(LogLevel::INFO) << "线程: " << t->Name() << "回收";}}void Stop(){if (_isrunning){_isrunning = false;if (_wait_num)_cond.WeakAll(); // 如果有线程在可变参数下等待,唤醒所有线程,把没有执行完成的任务(如果有)去执行}}~ThreadPool() {}private:std::vector<thread_t> _threads; // 这里储存的是一个一个的指向Thread的智能指针int _num; // 线程个数std::queue<T> _ptasks; // 任务的指针队列Mutex _lock;Cond _cond;int _wait_num;bool _isrunning; // 线程池目前的工作状态};
}
- 线程池需要一个任务队列(task queue)来存储待执行的任务,任务队列可以存储各种类型的任务,包括对象和函数。由于需要支持不同函数类型和类类型,所以我们把线程池定义为一个类模版,方便后续使用时修改任务类型
线程池包含对象
std::vector<thread_t> _threads
:线程池的目的是用来管理一系列的线程,因此线程池需要一个容器(如vector)来管理所有的线程实体,可以使用智能指针(如shared_ptr)来管理线程对象,vector
用来储存指向Thread的智能指针。线程池在启动时需要创建指定数量的线程对象,并将这些对象存储在容器中。线程对象的创建可以通过循环和push_back操作完成,使用make_shared来创建线程对象并将其添加到容器中。(具体看构造函数的方法)_num
:线程个数,用于对所有线程的遍历操作比如线程等待,线程初始化std::queue<T> _ptasks;
:线程池需要一个任务队列(task queue)来存储待执行的任务,任务队列可以存储各种类型的任务,包括对象和函数Mutex _lock;
:对线程池的成员进行判断、++/–等操作都不是原子性的操作,多线程并行执行的时候可能会出现数据不一致的问题,因此在这些情况下需要加锁处理(保证线程互斥)Cond _cond;
:保证线程同步,让线程能够按照某种特定的顺序访问临界资源,从⽽有效避免饥饿问题_wait_num
:线程池运行时,某一时刻在条件变量下等待的线程数,用于判断是否需要执行唤醒操作_isrunning
:线程池目前的工作状态,修改这个状态可以让线程池停止
线程池的接口设计
线程池需要提供基本的接口,包括Start
方法用于启动线程池,Stop
方法用于停止线程池,Wait
方法用于回收线程池,以及Equeue
方法用于将任务加入队列。当然还有必不可少的构造和析构。
-
构造函数:在定义线程池时线程池需要调用构造函数,创建线程的智能指针,设置线程工作状态,并打印日志:
ThreadPool(int num = defaultnum) : _num(num), _isrunning(false) {for (int i = 0; i < _num; i++){_threads.push_back(std::make_shared<Thread>(std::bind(&ThreadPool::HanderTask, this, std::placeholders::_1)));LOG(LogLevel::INFO) << "创建线程: " << _threads.back()->Name() << "...Success";} }
_threads.push_back(std::make_shared<Thread>(std::bind(&ThreadPool::HanderTask, this, std::placeholders::_1)));
这行代码相当于为线程池创建了多个工作线程,每个线程都:- 绑定到同一个线程池实例(
this
) - 执行相同的任务处理逻辑(
HanderTask
) - 但可以并行处理不同的任务
- 绑定到同一个线程池实例(
可以把这个复杂的表达式理解为:
// 创建一个函数,这个函数会调用当前对象的HanderTask方法。这个代码片段定义了一个lambda表达式,并将其赋值给一个名为auto thread_func的变量。这个lambda表达式的功能是调用类成员函数HanderTask,并将传入的字符串参数name传递给该函数。
auto thread_func = [this](std::string name)
{this->HanderTask(name);
};// 用这个函数创建线程,并保存到线程列表中
_threads.push_back(std::make_shared<Thread>(thread_func));
-
析构函数:在线程池中我们没有用到动态开辟内存空间,所以析构函数没有必须要执行的代码
~ThreadPool() {}
-
Start
方法用于启动线程池:调用封装好的线程类的Start
方法创建线程并打印日志,注意需要提前判断当前线程池的状态_isrunning
,因为在构造函数中我们把线程池的初始状态设置为false,只有在这种状态下才需要Startvoid Start() {if (_isrunning)return;_isrunning = true;for (auto &t : _threads){t->Start();LOG(LogLevel::INFO) << "启动线程: " << t->Name() << "...Success";} }
-
Equeue
方法用于将任务加入队列:当线程池处于运行状态,我们可以调用Equeue
函数将需要执行的任务(函数)传入线程池,这个函数在把任务入队列之后需要唤醒正在_cond
条件变量下阻塞的线程void Equeue(T t) {LockGuard lockguard(_lock); // 加锁if (!_isrunning)return;_ptasks.push(t);if (_wait_num)_cond.Weak(); }
-
Stop
方法用于停止线程池:
void Stop()
{if (_isrunning){_isrunning = false;if (_wait_num)_cond.WeakAll(); // 如果有线程在可变参数下等待,唤醒所有线程,把没有执行完成的任务(如果有)去执行}
}
线程调用方法
void HanderTask(std::string name)
{T t;LOG(LogLevel::INFO) << "线程: " << name << "执行HanderTask方法";while (true){{LockGuard lockguard(_lock);while (IsEmpty() && _isrunning){_wait_num++;_cond.Wait(_lock);_wait_num--;}if (IsEmpty() && !_isrunning)break;t = _ptasks.front();_ptasks.pop();}t(name);}LOG(LogLevel::INFO) << "线程: " << name << "退出HanderTask方法";
}
-
T t;
: 声明一个类型为T的对象t。T是线程池模板参数,代表任务类型。这个对象将被用来存储从任务队列中取出的任务。 -
LOG(LogLevel::INFO) << "线程: " << name << "执行HanderTask方法";
: 打印日志,表示线程开始执行HanderTask方法。 -
while (true)
: 无限循环,直到线程池停止并且任务队列为空时跳出。 -
进入循环体,首先是一个代码块,用于限制锁的作用范围:
{LockGuard lockguard(_lock); // 获取互斥锁,保证对任务队列的访问是互斥的。 while (IsEmpty() && _isrunning) // 如果任务队列为空并且线程池正在运行,则进入等待。{_wait_num++; // 增加等待线程计数_cond.Wait(_lock); // 等待条件变量,释放锁并阻塞,直到被唤醒。被唤醒后会重新获取锁。_wait_num--; // 被唤醒后,减少等待线程计数}if (IsEmpty() && !_isrunning) // 如果任务队列为空且线程池已停止,则退出循环。break;t = _ptasks.front(); // 从任务队列中取出一个任务_ptasks.pop(); // 将任务从队列中移除 } // 锁的作用域结束,自动释放锁。
-
执行任务:
t(name);
。注意,这个操作是在锁外执行的,这样多个任务可以并发执行,而不会互相阻塞。 -
循环结束后,打印日志表示线程退出HanderTask方法。
注意:在条件变量等待时,使用while循环检查条件是为了防止虚假唤醒。即使被虚假唤醒,如果条件不满足(队列仍为空且线程池在运行),线程会继续等待。
另外,在等待条件变量时,先增加了_wait_num,等待后减少,这是为了在停止线程池时能够知道有多少线程正在等待,以便唤醒它们。
在取出任务后,锁就被释放了,这样其他线程就可以同时从队列中取任务(如果队列中有多个任务)。而任务执行是在锁外进行的,这允许并发执行任务。
最后,当线程池停止时,会唤醒所有等待的线程,它们会检查条件,发现线程池已停止且队列为空,就会退出循环。
测试用例
#include "ThreadPool.hpp"
#include "Task.hpp"int main()
{ENABLE_CONSOLE_LOG();std::unique_ptr<My_ThreadPool::ThreadPool<task_t>> tp = std::make_unique<My_ThreadPool::ThreadPool<task_t>>();tp->Start();for(int i = 0; i < 10; i++){tp->Equeue(Push);sleep(1);}tp->Stop();tp->Wait();return 0;
}
user@iZ7xvdsb1wn2io90klvtwlZ:~/lesson33/ThreadPool$ ./threadpool
[2025-09-22 20:21:50] [INFO] [103745] [ThreadPool.hpp] [74] - 创建线程: thread-1...Success
[2025-09-22 20:21:50] [INFO] [103745] [ThreadPool.hpp] [74] - 创建线程: thread-2...Success
[2025-09-22 20:21:50] [INFO] [103745] [ThreadPool.hpp] [74] - 创建线程: thread-3...Success
[2025-09-22 20:21:50] [INFO] [103745] [ThreadPool.hpp] [74] - 创建线程: thread-4...Success
[2025-09-22 20:21:50] [INFO] [103745] [ThreadPool.hpp] [74] - 创建线程: thread-5...Success
[2025-09-22 20:21:50] [INFO] [103745] [ThreadPool.hpp] [94] - 启动线程: thread-1...Success
[2025-09-22 20:21:50] [INFO] [103745] [ThreadPool.hpp] [94] - 启动线程: thread-2...Success
[2025-09-22 20:21:50] [INFO] [103745] [ThreadPool.hpp] [44] - 线程: thread-1执行HanderTask方法
[2025-09-22 20:21:50] [INFO] [103745] [ThreadPool.hpp] [94] - 启动线程: thread-3...Success
[2025-09-22 20:21:50] [INFO] [103745] [ThreadPool.hpp] [94] - 启动线程: thread-4...Success
[2025-09-22 20:21:50] [INFO] [103745] [ThreadPool.hpp] [94] - 启动线程: thread-5...Success
[2025-09-22 20:21:50] [INFO] [103745] [ThreadPool.hpp] [44] - 线程: thread-4执行HanderTask方法
[2025-09-22 20:21:50] [INFO] [103745] [ThreadPool.hpp] [44] - 线程: thread-2执行HanderTask方法
[2025-09-22 20:21:50] [INFO] [103745] [ThreadPool.hpp] [44] - 线程: thread-3执行HanderTask方法
[2025-09-22 20:21:50] [DEBUG] [103745] [Task.hpp] [19] - 我是一个推送数据到服务器的一个任务, 我正在被执行[thread-3]
[2025-09-22 20:21:50] [INFO] [103745] [ThreadPool.hpp] [44] - 线程: thread-5执行HanderTask方法
[2025-09-22 20:21:51] [DEBUG] [103745] [Task.hpp] [19] - 我是一个推送数据到服务器的一个任务, 我正在被执行[thread-4]
[2025-09-22 20:21:52] [DEBUG] [103745] [Task.hpp] [19] - 我是一个推送数据到服务器的一个任务, 我正在被执行[thread-2]
[2025-09-22 20:21:53] [DEBUG] [103745] [Task.hpp] [19] - 我是一个推送数据到服务器的一个任务, 我正在被执行[thread-3]
[2025-09-22 20:21:54] [DEBUG] [103745] [Task.hpp] [19] - 我是一个推送数据到服务器的一个任务, 我正在被执行[thread-5]
[2025-09-22 20:21:55] [DEBUG] [103745] [Task.hpp] [19] - 我是一个推送数据到服务器的一个任务, 我正在被执行[thread-1]
[2025-09-22 20:21:56] [DEBUG] [103745] [Task.hpp] [19] - 我是一个推送数据到服务器的一个任务, 我正在被执行[thread-4]
[2025-09-22 20:21:57] [DEBUG] [103745] [Task.hpp] [19] - 我是一个推送数据到服务器的一个任务, 我正在被执行[thread-2]
[2025-09-22 20:21:58] [DEBUG] [103745] [Task.hpp] [19] - 我是一个推送数据到服务器的一个任务, 我正在被执行[thread-3]
[2025-09-22 20:21:59] [DEBUG] [103745] [Task.hpp] [19] - 我是一个推送数据到服务器的一个任务, 我正在被执行[thread-5]
[2025-09-22 20:22:00] [INFO] [103745] [ThreadPool.hpp] [65] - 线程: thread-4退出HanderTask方法
[2025-09-22 20:22:00] [INFO] [103745] [ThreadPool.hpp] [65] - 线程: thread-2退出HanderTask方法
[2025-09-22 20:22:00] [INFO] [103745] [ThreadPool.hpp] [65] - 线程: thread-1退出HanderTask方法
[2025-09-22 20:22:00] [INFO] [103745] [ThreadPool.hpp] [102] - 线程: thread-1回收
[2025-09-22 20:22:00] [INFO] [103745] [ThreadPool.hpp] [102] - 线程: thread-2回收
[2025-09-22 20:22:00] [INFO] [103745] [ThreadPool.hpp] [65] - 线程: thread-3退出HanderTask方法
[2025-09-22 20:22:00] [INFO] [103745] [ThreadPool.hpp] [65] - 线程: thread-5退出HanderTask方法
[2025-09-22 20:22:00] [INFO] [103745] [ThreadPool.hpp] [102] - 线程: thread-3回收
[2025-09-22 20:22:00] [INFO] [103745] [ThreadPool.hpp] [102] - 线程: thread-4回收
[2025-09-22 20:22:00] [INFO] [103745] [ThreadPool.hpp] [102] - 线程: thread-5回收
其他自定义类型
//Mutex.hpp#pragma once#include <iostream>
#include <pthread.h>namespace My_Mutex
{class Mutex{public:Mutex(const Mutex &) = delete;const Mutex &operator=(const Mutex &) = delete;Mutex(){int n = pthread_mutex_init(&_lock, nullptr);if (n != 0){std::cerr << "err: pthread_mutex_init" << std::endl;exit(1);}}int Lock(){return pthread_mutex_lock(&_lock);}int Unlock(){return pthread_mutex_unlock(&_lock);}pthread_mutex_t* MutexPtr(){return &_lock;}~Mutex(){int n = pthread_mutex_destroy(&_lock);if (n != 0){std::cerr << "err: pthread_mutex_destroy" << std::endl;exit(2);}}private:pthread_mutex_t _lock;};class LockGuard{public:LockGuard(Mutex &lock):_lock(lock){_lock.Lock();}~LockGuard(){_lock.Unlock();}private:Mutex &_lock;};
}
//Cond.hpp
#pragma once#include <iostream>
#include <pthread.h>
#include "Mutex.hpp"namespace My_Cond
{class Cond{public:Cond(){pthread_cond_init(&_cond, nullptr);}void Wait(My_Mutex::Mutex &mutex){pthread_cond_wait(&_cond, mutex.MutexPtr());}void Weak(){pthread_cond_signal(&_cond);}void WeakAll(){pthread_cond_broadcast(&_cond);}~Cond(){pthread_cond_destroy(&_cond);}private:pthread_cond_t _cond;};} // namespace My_Cond
//Thread.hpp
#pragma once#include <iostream>
#include <string>
#include <functional>
#include <unistd.h>namespace My_Thread
{using func_t = std::function<void(std::string name)>;static int num = 1; // 线程个数计数enum class TSTATUS{NEW,RUNNING,STOP};class Thread{private:static void *routine(void *args){Thread *t = static_cast<Thread *>(args);t->_status = TSTATUS::RUNNING;t->_func(t->Name());return nullptr;}public:Thread(func_t func) : _func(func){_name = "thread-" + std::to_string(num++); // 默认命名_status = TSTATUS::NEW;_joinable = true;_pid = getpid();}bool Start(){if (_status == TSTATUS::RUNNING)return true;int n = ::pthread_create(&_tid, nullptr, routine, this);if (n != 0)return false;return true;}bool Jion(){if (_joinable){int n = ::pthread_join(_tid, nullptr);if (n != 0)return false;_status = TSTATUS::STOP;return true;}return false;}bool Stop(){if (_status == TSTATUS::RUNNING){int n = ::pthread_cancel(_tid);if (n != 0)return false;_status = TSTATUS::STOP;return true;}return false;}bool Detach(){_joinable = false;int n = ::pthread_detach(_tid);if (n != 0)return false;return true;}bool IsJoinable(){return _joinable;}std::string Name(){return _name;}~Thread(){}private:std::string _name;pthread_t _tid;pid_t _pid;bool _joinable;func_t _func;TSTATUS _status;};
}
日志类后面会有一篇博客讲解…