当前位置: 首页 > news >正文

【Linux】系统部分——线程池的基本设计思路

30.线程池

文章目录

  • 30.线程池
      • 线程池的基本概念与设计原理
      • 自主设计线程池
        • 线程池包含对象
        • 线程池的接口设计
        • 线程调用方法
      • 测试用例
      • 其他自定义类型

线程池的基本概念与设计原理

  • 线程池是一种线程使用模式,通过预先创建多个线程来提高效率。每次任务到来时创建线程会产生开销,影响整体效率。线程池内部维护多个线程,无任务时大部分线程休眠,有任务时唤醒指定线程处理。
  • 线程池的线程数目有上限,可以控制系统负载时的线程总数波动,保证系统稳定性。线程池适用于多种场景,如浏览器或手机app访问网站时的外部响应、批量化计算、数据存储、数据库访问等。虽然线程池不是编写服务器的最主要方式,但学习价值较高。
  • 线程池种类包括固定数量线程池和浮动线程池,固定数量线程池从任务队列获取任务执行,浮动线程池根据任务量动态调整线程数量。这里采用固定线程池设计,内部预先创建一批线程,提供任务队列,用户从外部提交任务,有任务时唤醒线程,无任务时线程休眠。

自主设计线程池

#pragma once#include <iostream>
#include <unistd.h>
#include <string>
#include <vector>
#include <queue>
#include <memory>
#include "Log.hpp"
#include "Mutex.hpp"
#include "Cond.hpp"
#include "Thread.hpp"namespace My_ThreadPool
{using namespace My_Mutex;using namespace My_Log;using namespace My_Cond;using namespace My_Thread;using thread_t = std::shared_ptr<Thread>;const static int defaultnum = 5;void DefaultTest()//前期测试代码{while (true){LOG(LogLevel::DEBUG) << "Test......";sleep(1);}}template <class T>class ThreadPool{private:bool IsEmpty() { return _ptasks.empty(); }void HanderTask(std::string name){T t;LOG(LogLevel::INFO) << "线程: " << name << "执行HanderTask方法";while (true){{LockGuard lockguard(_lock);while (IsEmpty() && _isrunning){_wait_num++;_cond.Wait(_lock);_wait_num--;}if (IsEmpty() && !_isrunning)break;t = _ptasks.front();_ptasks.pop();}t(name);}LOG(LogLevel::INFO) << "线程: " << name << "退出HanderTask方法";}public:ThreadPool(int num = defaultnum) : _num(num), _isrunning(false){for (int i = 0; i < _num; i++){_threads.push_back(std::make_shared<Thread>(std::bind(&ThreadPool::HanderTask, this, std::placeholders::_1)));LOG(LogLevel::INFO) << "创建线程: " << _threads.back()->Name() << "...Success";}}void Equeue(T t){LockGuard lockguard(_lock); // 加锁if (!_isrunning)return;_ptasks.push(t);if (_wait_num)_cond.Weak();}void Start(){if (_isrunning)return;_isrunning = true;for (auto &t : _threads){t->Start();LOG(LogLevel::INFO) << "启动线程: " << t->Name() << "...Success";}}void Wait(){for (auto &t : _threads){t->Jion();LOG(LogLevel::INFO) << "线程: " << t->Name() << "回收";}}void Stop(){if (_isrunning){_isrunning = false;if (_wait_num)_cond.WeakAll(); // 如果有线程在可变参数下等待,唤醒所有线程,把没有执行完成的任务(如果有)去执行}}~ThreadPool() {}private:std::vector<thread_t> _threads; // 这里储存的是一个一个的指向Thread的智能指针int _num;                       // 线程个数std::queue<T> _ptasks;          // 任务的指针队列Mutex _lock;Cond _cond;int _wait_num;bool _isrunning; // 线程池目前的工作状态};
}
  • 线程池需要一个任务队列(task queue)来存储待执行的任务,任务队列可以存储各种类型的任务,包括对象和函数。由于需要支持不同函数类型和类类型,所以我们把线程池定义为一个类模版,方便后续使用时修改任务类型
线程池包含对象
  • std::vector<thread_t> _threads:线程池的目的是用来管理一系列的线程,因此线程池需要一个容器(如vector)来管理所有的线程实体,可以使用智能指针(如shared_ptr)来管理线程对象,vector用来储存指向Thread的智能指针。线程池在启动时需要创建指定数量的线程对象,并将这些对象存储在容器中。线程对象的创建可以通过循环和push_back操作完成,使用make_shared来创建线程对象并将其添加到容器中。(具体看构造函数的方法)
  • _num:线程个数,用于对所有线程的遍历操作比如线程等待,线程初始化
  • std::queue<T> _ptasks;:线程池需要一个任务队列(task queue)来存储待执行的任务,任务队列可以存储各种类型的任务,包括对象和函数
  • Mutex _lock;:对线程池的成员进行判断、++/–等操作都不是原子性的操作,多线程并行执行的时候可能会出现数据不一致的问题,因此在这些情况下需要加锁处理(保证线程互斥)
  • Cond _cond;:保证线程同步,让线程能够按照某种特定的顺序访问临界资源,从⽽有效避免饥饿问题
  • _wait_num:线程池运行时,某一时刻在条件变量下等待的线程数,用于判断是否需要执行唤醒操作
  • _isrunning:线程池目前的工作状态,修改这个状态可以让线程池停止
线程池的接口设计

线程池需要提供基本的接口,包括Start方法用于启动线程池,Stop方法用于停止线程池,Wait方法用于回收线程池,以及Equeue方法用于将任务加入队列。当然还有必不可少的构造和析构。

  • 构造函数:在定义线程池时线程池需要调用构造函数,创建线程的智能指针,设置线程工作状态,并打印日志:

    ThreadPool(int num = defaultnum) : _num(num), _isrunning(false)
    {for (int i = 0; i < _num; i++){_threads.push_back(std::make_shared<Thread>(std::bind(&ThreadPool::HanderTask, this, std::placeholders::_1)));LOG(LogLevel::INFO) << "创建线程: " << _threads.back()->Name() << "...Success";}
    }
    

    _threads.push_back(std::make_shared<Thread>(std::bind(&ThreadPool::HanderTask, this, std::placeholders::_1)));这行代码相当于为线程池创建了多个工作线程,每个线程都:

    • 绑定到同一个线程池实例(this
    • 执行相同的任务处理逻辑(HanderTask
    • 但可以并行处理不同的任务

可以把这个复杂的表达式理解为:

// 创建一个函数,这个函数会调用当前对象的HanderTask方法。这个代码片段定义了一个lambda表达式,并将其赋值给一个名为auto thread_func的变量。这个lambda表达式的功能是调用类成员函数HanderTask,并将传入的字符串参数name传递给该函数。
auto thread_func = [this](std::string name) 
{this->HanderTask(name);
};// 用这个函数创建线程,并保存到线程列表中
_threads.push_back(std::make_shared<Thread>(thread_func));
  • 析构函数:在线程池中我们没有用到动态开辟内存空间,所以析构函数没有必须要执行的代码

    ~ThreadPool() {}
    
  • Start方法用于启动线程池:调用封装好的线程类的Start方法创建线程并打印日志,注意需要提前判断当前线程池的状态_isrunning,因为在构造函数中我们把线程池的初始状态设置为false,只有在这种状态下才需要Start

    void Start()
    {if (_isrunning)return;_isrunning = true;for (auto &t : _threads){t->Start();LOG(LogLevel::INFO) << "启动线程: " << t->Name() << "...Success";}
    }
    
  • Equeue方法用于将任务加入队列:当线程池处于运行状态,我们可以调用Equeue函数将需要执行的任务(函数)传入线程池,这个函数在把任务入队列之后需要唤醒正在_cond条件变量下阻塞的线程

    void Equeue(T t)
    {LockGuard lockguard(_lock); // 加锁if (!_isrunning)return;_ptasks.push(t);if (_wait_num)_cond.Weak();
    }
    
  • Stop方法用于停止线程池:

void Stop()
{if (_isrunning){_isrunning = false;if (_wait_num)_cond.WeakAll(); // 如果有线程在可变参数下等待,唤醒所有线程,把没有执行完成的任务(如果有)去执行}
}
线程调用方法
void HanderTask(std::string name)
{T t;LOG(LogLevel::INFO) << "线程: " << name << "执行HanderTask方法";while (true){{LockGuard lockguard(_lock);while (IsEmpty() && _isrunning){_wait_num++;_cond.Wait(_lock);_wait_num--;}if (IsEmpty() && !_isrunning)break;t = _ptasks.front();_ptasks.pop();}t(name);}LOG(LogLevel::INFO) << "线程: " << name << "退出HanderTask方法";
}
  1. T t;: 声明一个类型为T的对象t。T是线程池模板参数,代表任务类型。这个对象将被用来存储从任务队列中取出的任务。

  2. LOG(LogLevel::INFO) << "线程: " << name << "执行HanderTask方法";: 打印日志,表示线程开始执行HanderTask方法。

  3. while (true): 无限循环,直到线程池停止并且任务队列为空时跳出。

  4. 进入循环体,首先是一个代码块,用于限制锁的作用范围:

    {LockGuard lockguard(_lock); // 获取互斥锁,保证对任务队列的访问是互斥的。 while (IsEmpty() && _isrunning)  // 如果任务队列为空并且线程池正在运行,则进入等待。{_wait_num++;  // 增加等待线程计数_cond.Wait(_lock);  // 等待条件变量,释放锁并阻塞,直到被唤醒。被唤醒后会重新获取锁。_wait_num--;  // 被唤醒后,减少等待线程计数}if (IsEmpty() && !_isrunning)  // 如果任务队列为空且线程池已停止,则退出循环。break;t = _ptasks.front();  // 从任务队列中取出一个任务_ptasks.pop();        // 将任务从队列中移除
    } // 锁的作用域结束,自动释放锁。
    
  5. 执行任务:t(name);。注意,这个操作是在锁外执行的,这样多个任务可以并发执行,而不会互相阻塞。

  6. 循环结束后,打印日志表示线程退出HanderTask方法。

注意:在条件变量等待时,使用while循环检查条件是为了防止虚假唤醒。即使被虚假唤醒,如果条件不满足(队列仍为空且线程池在运行),线程会继续等待。

另外,在等待条件变量时,先增加了_wait_num,等待后减少,这是为了在停止线程池时能够知道有多少线程正在等待,以便唤醒它们。

在取出任务后,锁就被释放了,这样其他线程就可以同时从队列中取任务(如果队列中有多个任务)。而任务执行是在锁外进行的,这允许并发执行任务。

最后,当线程池停止时,会唤醒所有等待的线程,它们会检查条件,发现线程池已停止且队列为空,就会退出循环。

测试用例

#include "ThreadPool.hpp"
#include "Task.hpp"int main()
{ENABLE_CONSOLE_LOG();std::unique_ptr<My_ThreadPool::ThreadPool<task_t>> tp = std::make_unique<My_ThreadPool::ThreadPool<task_t>>();tp->Start();for(int i = 0; i < 10; i++){tp->Equeue(Push);sleep(1);}tp->Stop();tp->Wait();return 0;
}
user@iZ7xvdsb1wn2io90klvtwlZ:~/lesson33/ThreadPool$ ./threadpool 
[2025-09-22 20:21:50] [INFO] [103745] [ThreadPool.hpp] [74] - 创建线程: thread-1...Success
[2025-09-22 20:21:50] [INFO] [103745] [ThreadPool.hpp] [74] - 创建线程: thread-2...Success
[2025-09-22 20:21:50] [INFO] [103745] [ThreadPool.hpp] [74] - 创建线程: thread-3...Success
[2025-09-22 20:21:50] [INFO] [103745] [ThreadPool.hpp] [74] - 创建线程: thread-4...Success
[2025-09-22 20:21:50] [INFO] [103745] [ThreadPool.hpp] [74] - 创建线程: thread-5...Success
[2025-09-22 20:21:50] [INFO] [103745] [ThreadPool.hpp] [94] - 启动线程: thread-1...Success
[2025-09-22 20:21:50] [INFO] [103745] [ThreadPool.hpp] [94] - 启动线程: thread-2...Success
[2025-09-22 20:21:50] [INFO] [103745] [ThreadPool.hpp] [44] - 线程: thread-1执行HanderTask方法
[2025-09-22 20:21:50] [INFO] [103745] [ThreadPool.hpp] [94] - 启动线程: thread-3...Success
[2025-09-22 20:21:50] [INFO] [103745] [ThreadPool.hpp] [94] - 启动线程: thread-4...Success
[2025-09-22 20:21:50] [INFO] [103745] [ThreadPool.hpp] [94] - 启动线程: thread-5...Success
[2025-09-22 20:21:50] [INFO] [103745] [ThreadPool.hpp] [44] - 线程: thread-4执行HanderTask方法
[2025-09-22 20:21:50] [INFO] [103745] [ThreadPool.hpp] [44] - 线程: thread-2执行HanderTask方法
[2025-09-22 20:21:50] [INFO] [103745] [ThreadPool.hpp] [44] - 线程: thread-3执行HanderTask方法
[2025-09-22 20:21:50] [DEBUG] [103745] [Task.hpp] [19] - 我是一个推送数据到服务器的一个任务, 我正在被执行[thread-3]
[2025-09-22 20:21:50] [INFO] [103745] [ThreadPool.hpp] [44] - 线程: thread-5执行HanderTask方法
[2025-09-22 20:21:51] [DEBUG] [103745] [Task.hpp] [19] - 我是一个推送数据到服务器的一个任务, 我正在被执行[thread-4]
[2025-09-22 20:21:52] [DEBUG] [103745] [Task.hpp] [19] - 我是一个推送数据到服务器的一个任务, 我正在被执行[thread-2]
[2025-09-22 20:21:53] [DEBUG] [103745] [Task.hpp] [19] - 我是一个推送数据到服务器的一个任务, 我正在被执行[thread-3]
[2025-09-22 20:21:54] [DEBUG] [103745] [Task.hpp] [19] - 我是一个推送数据到服务器的一个任务, 我正在被执行[thread-5]
[2025-09-22 20:21:55] [DEBUG] [103745] [Task.hpp] [19] - 我是一个推送数据到服务器的一个任务, 我正在被执行[thread-1]
[2025-09-22 20:21:56] [DEBUG] [103745] [Task.hpp] [19] - 我是一个推送数据到服务器的一个任务, 我正在被执行[thread-4]
[2025-09-22 20:21:57] [DEBUG] [103745] [Task.hpp] [19] - 我是一个推送数据到服务器的一个任务, 我正在被执行[thread-2]
[2025-09-22 20:21:58] [DEBUG] [103745] [Task.hpp] [19] - 我是一个推送数据到服务器的一个任务, 我正在被执行[thread-3]
[2025-09-22 20:21:59] [DEBUG] [103745] [Task.hpp] [19] - 我是一个推送数据到服务器的一个任务, 我正在被执行[thread-5]
[2025-09-22 20:22:00] [INFO] [103745] [ThreadPool.hpp] [65] - 线程: thread-4退出HanderTask方法
[2025-09-22 20:22:00] [INFO] [103745] [ThreadPool.hpp] [65] - 线程: thread-2退出HanderTask方法
[2025-09-22 20:22:00] [INFO] [103745] [ThreadPool.hpp] [65] - 线程: thread-1退出HanderTask方法
[2025-09-22 20:22:00] [INFO] [103745] [ThreadPool.hpp] [102] - 线程: thread-1回收
[2025-09-22 20:22:00] [INFO] [103745] [ThreadPool.hpp] [102] - 线程: thread-2回收
[2025-09-22 20:22:00] [INFO] [103745] [ThreadPool.hpp] [65] - 线程: thread-3退出HanderTask方法
[2025-09-22 20:22:00] [INFO] [103745] [ThreadPool.hpp] [65] - 线程: thread-5退出HanderTask方法
[2025-09-22 20:22:00] [INFO] [103745] [ThreadPool.hpp] [102] - 线程: thread-3回收
[2025-09-22 20:22:00] [INFO] [103745] [ThreadPool.hpp] [102] - 线程: thread-4回收
[2025-09-22 20:22:00] [INFO] [103745] [ThreadPool.hpp] [102] - 线程: thread-5回收

其他自定义类型

//Mutex.hpp#pragma once#include <iostream>
#include <pthread.h>namespace My_Mutex
{class Mutex{public:Mutex(const Mutex &) = delete;const Mutex &operator=(const Mutex &) = delete;Mutex(){int n = pthread_mutex_init(&_lock, nullptr);if (n != 0){std::cerr << "err: pthread_mutex_init" << std::endl;exit(1);}}int Lock(){return pthread_mutex_lock(&_lock);}int Unlock(){return pthread_mutex_unlock(&_lock);}pthread_mutex_t* MutexPtr(){return &_lock;}~Mutex(){int n = pthread_mutex_destroy(&_lock);if (n != 0){std::cerr << "err: pthread_mutex_destroy" << std::endl;exit(2);}}private:pthread_mutex_t _lock;};class LockGuard{public:LockGuard(Mutex &lock):_lock(lock){_lock.Lock();}~LockGuard(){_lock.Unlock();}private:Mutex &_lock;};
}
//Cond.hpp
#pragma once#include <iostream>
#include <pthread.h>
#include "Mutex.hpp"namespace My_Cond
{class Cond{public:Cond(){pthread_cond_init(&_cond, nullptr);}void Wait(My_Mutex::Mutex &mutex){pthread_cond_wait(&_cond, mutex.MutexPtr());}void Weak(){pthread_cond_signal(&_cond);}void WeakAll(){pthread_cond_broadcast(&_cond);}~Cond(){pthread_cond_destroy(&_cond);}private:pthread_cond_t _cond;};} // namespace My_Cond
//Thread.hpp
#pragma once#include <iostream>
#include <string>
#include <functional>
#include <unistd.h>namespace My_Thread
{using func_t = std::function<void(std::string name)>;static int num = 1; // 线程个数计数enum class TSTATUS{NEW,RUNNING,STOP};class Thread{private:static void *routine(void *args){Thread *t = static_cast<Thread *>(args);t->_status = TSTATUS::RUNNING;t->_func(t->Name());return nullptr;}public:Thread(func_t func) : _func(func){_name = "thread-" + std::to_string(num++); // 默认命名_status = TSTATUS::NEW;_joinable = true;_pid = getpid();}bool Start(){if (_status == TSTATUS::RUNNING)return true;int n = ::pthread_create(&_tid, nullptr, routine, this);if (n != 0)return false;return true;}bool Jion(){if (_joinable){int n = ::pthread_join(_tid, nullptr);if (n != 0)return false;_status = TSTATUS::STOP;return true;}return false;}bool Stop(){if (_status == TSTATUS::RUNNING){int n = ::pthread_cancel(_tid);if (n != 0)return false;_status = TSTATUS::STOP;return true;}return false;}bool Detach(){_joinable = false;int n = ::pthread_detach(_tid);if (n != 0)return false;return true;}bool IsJoinable(){return _joinable;}std::string Name(){return _name;}~Thread(){}private:std::string _name;pthread_t _tid;pid_t _pid;bool _joinable;func_t _func;TSTATUS _status;};
}

日志类后面会有一篇博客讲解…

http://www.dtcms.com/a/396092.html

相关文章:

  • 专业的家居行业网站制作视频素材网站推荐
  • C++第九篇:静态函数
  • 手机网站全屏代码莱芜都市网房产频道
  • 上海高品质网站建设公司揭阳东莞网站建设
  • Roo Code 的消息队列功能
  • Windows安全机制--模块执行防御
  • 网站开发 沈阳怎样查询江西省城乡建设厅网站
  • 导航网站备案河南网站推广那家好
  • 衡水网站优化最新远程网站建设服务器
  • 都匀网站开发的公司官方商城入口
  • 网站采用什么方法建设新春祝福图片在线制作
  • C++第六篇:对象的创建
  • 山东定制网站建设公司大型门户网站建设功能
  • 只做一种产品的网站省级建筑信息平台
  • ps做网站边框深圳的深圳的网站建设公司
  • 网站经常被黑宜昌最权威网站建设公司
  • 网站建设 创意视频爱站网站seo查询工具
  • 网站如何优化关键词排名做餐饮加盟的网站建设
  • 怎样进入网站管理系统快速排名优化推广价格
  • Python 类
  • 网站维护公司推荐合作营销
  • wordpress快站平台免费企业网站如何建设
  • 哪个网站可以做奖状住房与城乡建设部网站建造师
  • 电商网站前端架构设计全球最大的购物网站
  • 酒泉网站建设培训云虚机安装wordpress教程
  • 从事高端网站建设产品宣传
  • 网站制作多久能完成房地产型网站建设
  • 使用建造者模式创建对象
  • python判断与循环
  • 买了dede模板之后就可以做网站域名如何连接wordpress