当前位置: 首页 > news >正文

Linux——线程池的模拟实现

文章目录

  • 一、线程池的简单介绍
  • 二、单个线程的封装
  • 三、线程池封装
  • 四、线程池的单例化

一、线程池的简单介绍

线程池是一种线程使用模式

线程过多会带来调度开销,进而影响缓存局部性和整体性能
而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价
线程池不仅能够保证内核的充分利用,还能防止过分调度
可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量

线程池的应用场景:

  • 需要大量的线程来完成任务,且完成任务的时间比较短

    比如WEB服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。
    但对于长时间的任务,比如一个Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了

  • 对性能要求苛刻的应用,比如要求服务器迅速响应客户请求

  • 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用

    突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限,出现错误.

如果来一个任务就创建一个线程,虽然不是不行,但是创建线程也需要花费时间
所以我们可以预先创建一个线程池,线程池里面维护了一批线程,他们在没有任务的时候会阻塞等待有任务的时候只需要唤醒它们就行了
在这里插入图片描述

一般线程池中的线程总个数不会太多,并且我们可以控制线程的个数

  • 线程个数太多,其实效率提升也并不明显,因为可以并行的线程是有上限的(即cpu的核数有上限)
    甚至可能因为切换调度成本上升,而效率下降

  • 就算突然出现大量任务,线程池中的线程个数也不会有太大波动,只是会在任务队列里面堆积很多任务
    这样可以一定程度上保证系统的稳定性

  • 线程池适用于:任务量多,并且单个任务执行时长较短的场景

二、单个线程的封装

#ifndef _THREAD_H_
#define _THREAD_H_#include <iostream>
#include <pthread.h> 
#include <string>
#include <cstdio>
#include <cstring>
#include <functional>
using namespace std;#include "Log.hpp"namespace ThreadModlue
{using namespace LogModule;static uint32_t number = 1;class Thread{using func_t = function<void()>;private:void EnableDetach(){cout << "线程被分离了" << endl;_isdetach = true;}void EnableRunning(){_isrunning = true;}//类的成员函数默认包含this指针,static里面没有,所有传this指针static void* Routinue(void* args){//获得this指针Thread* self = static_cast<Thread*>(args);self->EnableRunning();if(self->_isdetach)self->Detach();//设置线程的名字pthread_setname_np(self->_tid, self->_name.c_str());//回调处理self->_func();return nullptr;}public:Thread(func_t func): _tid(0),_isdetach(false),_isrunning(false),res(nullptr),_func(func){_name = "thread- " + to_string(number++);}void Detach(){if(_isdetach)return;if(_isrunning)pthread_detach(_tid);EnableDetach();}bool Start(){if(_isrunning){return false;}int n = pthread_create(&_tid, nullptr, Routinue, this);if(n != 0){cerr << "create thread error " <<  strerror(n) << endl;return false;}else{cout << _name << " create thread success" << endl;return true;}}bool Stop(){if(_isrunning){int n = pthread_cancel(_tid);if(n != 0){cerr << "cancel thread error " << strerror(n) << endl;return false;}else{_isrunning = false;cout << _name << " stop success" << endl;return true;}}}void Join(){if(_isdetach){cout << "你的线程已经是分离的了" << endl;return;}int n = pthread_join(_tid, &res);if(n != 0){LOG(LogLevel::DEBUG) << "Join thread error ";}else{LOG(LogLevel::DEBUG) << "Join success";}}string Name(){return _name;}~Thread(){}private:pthread_t _tid;string _name;bool _isdetach;bool _isrunning;void* res;func_t _func;};
}#endif

三、线程池封装

#pragma once#include <iostream>
#include <string>
#include <vector>
#include <queue>
#include "Log.hpp"
#include "Thread.hpp"
#include "Cond.hpp"
#include "Mutex.hpp"namespace ThreadPoolModule
{using namespace ThreadModlue;using namespace LogModule;using namespace CondModule;using namespace MutexModue;static const int gnum = 5;template<typename T>class ThreadPool{private:void WakeUpAllThread(){LockGuard lockguard(_mutex);if(_sleepernum)_cond.Broadcast();LOG(LogLevel::INFO) << "唤醒所有的休眠线程";}void WakeUpOne(){_cond.Signal();LOG(LogLevel::INFO) << "唤醒一个休眠线程";}public:ThreadPool(int num = gnum) : _num(num), _isrunning(false), _sleepernum(0){for(int i = 0;i < num;i++){_threads.emplace_back([this](){HandlerTask();});}}void Start(){if(_isrunning){return;}_isrunning = true;for(auto& thread:_threads){thread.Start();LOG(LogLevel::INFO) << "start new thread success: " << thread.Name();}}ThreadPool(const ThreadPool<T>&) = delete;ThreadPool<T>& operator=(const ThreadPool<T>&) = delete;void HandlerTask(){char name[128];pthread_getname_np(pthread_self(), name, sizeof(name));while(true){T t;{LockGuard lockguard(_mutex);while(_taskq.empty() && _isrunning){_sleepernum++;_cond.Wait(_mutex);_sleepernum--;}//内部的线程被唤醒if(!_isrunning && _taskq.empty()){LOG(LogLevel::INFO) << name << "线程池退出 && 任务队列为空";break;}//一定有任务t = _taskq.front();//从q中获取任务,任务已经是线程私有的了_taskq.pop();}t();//处理任务// sleep(1);// LOG(LogLevel::DEBUG) << name << "is running hello mihayou";}}void Stop(){if(!_isrunning){return;}_isrunning = false;//唤醒所有线程WakeUpAllThread();}void Join(){for(auto& thread:_threads){thread.Join();}}bool Equeue(const T& in){if(_isrunning){LockGuard lockguard(_mutex);_taskq.push(in);if(_threads.size() == _sleepernum){WakeUpOne();}return true;}return false;}~ThreadPool(){}private:std::vector<Thread> _threads;int _num;//线程池中,线程的个数std::queue<T> _taskq;Cond _cond;Mutex _mutex;bool _isrunning;int _sleepernum;};}

四、线程池的单例化

我们之前说过,一个进程中的线程不应该太多,线程池的线程不应该太多
因为线程太多了并没有意义,对效率提升不明显,因为CPU的个数和核数就那么多

为什么要让线程池单例化
因为一个线程池如果设定它有5个线程,如果创建n个线程池,线程的个数就会有5n个

所以用户如果无意识地创建了多个线程池对象,会导致线程的个数偏多

所以我们直接让线程池单例化,让用户无论如何都最多只能创建一个线程池对象

线程池单例化的线程安全问题

  • 饿汉模式是在线程池类中,直接定义一个static修饰的线程池对象,在进程启动的时候就已经定义了,所以根本不可能存在线程安全的问题
  • 懒汉模式是在线程池类中,定义一个static修饰指针,然后通过一个static修饰的成员函数new出那唯一的对象

因为static修饰的成员指针是共享资源,而且可能同时有多个线程去访问它,此时就可能因为线程安全问题,导致创建了多个线程池对象

所以我们要给那个获取对象的static成员函数加锁

//懒汉模式
static ThreadPool<T>* inc;//单例指针
static Mutex _lock;

但是如果那唯一的一个线程池对象已经被创建了的话,每次调用这个函数如果都还是要加锁才能获取到那唯一的一个对象的指针
效率就会很低,因为加锁之后是互斥的,也就是说不能并行调用成员函数获取线程池对象的指针

那怎么办?
其实线程安全问题出现在,最开始的时候,static线程池对象指针为nullptr的时候,可能会有多个线程同时通过if判断,进而创建出多个线程池对象

当这个唯一的一个线程池对象被new出来之后,再调用这个静态成员函数,就只是获取指针,并不会修改,而且其他的地方里面也不可能修改成功私有的成员指针,所以不会有线程安全问题

所以我们只需要在私有static线程池指针为空时,才加锁就行了
即:

template<typename T>
ThreadPool<T>* ThreadPool<T>::inc = nullptr;
//单例模式
template<typename T>
Mutex ThreadPool<T>::_lock;static ThreadPool<T>* GetInstance(){if(inc == nullptr){LockGuard lockguard(_lock);LOG(LogLevel::DEBUG) << "获取单例对象...";if(inc == nullptr){LOG(LogLevel::DEBUG) << "首次使用单例,创建...";inc = new ThreadPool<T>();inc->Start();}}return inc;}
#pragma once#include <iostream>
#include <string>
#include <vector>
#include <queue>
#include "Log.hpp"
#include "Thread.hpp"
#include "Cond.hpp"
#include "Mutex.hpp"namespace ThreadPoolModule
{using namespace ThreadModlue;using namespace LogModule;using namespace CondModule;using namespace MutexModue;static const int gnum = 5;template<typename T>class ThreadPool{private:void WakeUpAllThread(){LockGuard lockguard(_mutex);if(_sleepernum)_cond.Broadcast();LOG(LogLevel::INFO) << "唤醒所有的休眠线程";}void WakeUpOne(){_cond.Signal();LOG(LogLevel::INFO) << "唤醒一个休眠线程";}private:ThreadPool(int num = gnum) : _num(num), _isrunning(false), _sleepernum(0){for(int i = 0;i < num;i++){_threads.emplace_back([this](){HandlerTask();});}}void Start(){if(_isrunning){return;}_isrunning = true;for(auto& thread:_threads){thread.Start();LOG(LogLevel::INFO) << "start new thread success: " << thread.Name();}}ThreadPool(const ThreadPool<T>&) = delete;ThreadPool<T>& operator=(const ThreadPool<T>&) = delete;public:static ThreadPool<T>* GetInstance(){if(inc == nullptr){LockGuard lockguard(_lock);LOG(LogLevel::DEBUG) << "获取单例对象...";if(inc == nullptr){LOG(LogLevel::DEBUG) << "首次使用单例,创建...";inc = new ThreadPool<T>();inc->Start();}}return inc;}void HandlerTask(){char name[128];pthread_getname_np(pthread_self(), name, sizeof(name));while(true){T t;{LockGuard lockguard(_mutex);while(_taskq.empty() && _isrunning){_sleepernum++;_cond.Wait(_mutex);_sleepernum--;}//内部的线程被唤醒if(!_isrunning && _taskq.empty()){LOG(LogLevel::INFO) << name << "线程池退出 && 任务队列为空";break;}//一定有任务t = _taskq.front();//从q中获取任务,任务已经是线程私有的了_taskq.pop();}t();//处理任务// sleep(1);// LOG(LogLevel::DEBUG) << name << "is running hello mihayou";}}void Stop(){if(!_isrunning){return;}_isrunning = false;//唤醒所有线程WakeUpAllThread();}void Join(){for(auto& thread:_threads){thread.Join();}}bool Equeue(const T& in){if(_isrunning){LockGuard lockguard(_mutex);_taskq.push(in);if(_threads.size() == _sleepernum){WakeUpOne();}return true;}return false;}~ThreadPool(){}private:std::vector<Thread> _threads;int _num;//线程池中,线程的个数std::queue<T> _taskq;Cond _cond;Mutex _mutex;bool _isrunning;int _sleepernum;//懒汉模式static ThreadPool<T>* inc;//单例指针static Mutex _lock;};template<typename T>ThreadPool<T>* ThreadPool<T>::inc = nullptr;//单例模式template<typename T>Mutex ThreadPool<T>::_lock;}
http://www.dtcms.com/a/303448.html

相关文章:

  • 解决c++静态成员编译报错:‘xxx‘ is not a member of ‘xxx‘ 问题
  • 第五届先进算法与神经网络国际学术会议(AANN 2025)
  • vue项目进首页不加载全部资源
  • 【数据结构初阶】--二叉树(三)
  • ICDC自动化部署方案概述
  • 如何规范化项目执行
  • 2024年7月19日全国青少年信息素养大赛图形化(Scratch)编程小学低年级组复赛真题+答案解析
  • KubeSphere离线部署Kubernetes集群
  • “量子通信”
  • 系统远程配置
  • 概率有限自动机定义与示例
  • 智慧社区项目开发(二)——基于 JWT 的登录验证功能实现详解
  • 吃透 lambda 表达式(匿名函数)
  • mysql详细知识点
  • python中类变量 __slots__ 解析
  • Matplotlib(三)- 图表辅助元素
  • Vue3判断对象是否为空方法
  • 飞鹤困局:增长神话的裂痕
  • 嵌软面试——通信协议
  • 7.项目起步(1)
  • 1.vue体验
  • 快速构建基于React.js的用户注册与登录的Web应用程序
  • vue element 封装表单
  • 代码随想录算法训练营第三十三天
  • 7.28PBR技术
  • Linux系统编程——数据库
  • 介绍一下static关键字
  • Sum-rate计算
  • 【代码解读】通义万相最新视频生成模型 Wan 2.2 实现解析
  • 同态滤波算法详解:基于频域变换的光照不均匀校正