Linux——线程池的模拟实现
文章目录
- 一、线程池的简单介绍
- 二、单个线程的封装
- 三、线程池封装
- 四、线程池的单例化
一、线程池的简单介绍
线程池是一种线程使用模式
线程过多会带来调度开销,进而影响缓存局部性和整体性能
而线程池维护着多个线程,等待着监督管理者分配可并发
执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价
线程池不仅能够保证内核的充分利用,还能防止过分调度
可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量
线程池的应用场景:
-
需要大量的线程来完成任务,且完成任务的时间比较短
比如WEB服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。
但对于长时间的任务,比如一个Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了 -
对性能要求苛刻的应用,比如要求服务器迅速响应客户请求
-
接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用
突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限,出现错误.
如果来一个任务就创建一个线程,虽然不是不行,但是创建线程也需要花费时间
所以我们可以预先创建一个线程池,线程池里面维护了一批线程,他们在没有任务的时候会阻塞等待有任务的时候只需要唤醒它们就行了
一般线程池中的线程总个数不会太多,并且我们可以控制线程的个数
-
线程个数太多,其实效率提升也并不明显,因为可以并行的线程是有上限的(即cpu的核数有上限)
甚至可能因为切换调度成本上升,而效率下降 -
就算突然出现大量任务,线程池中的线程个数也不会有太大波动,只是会在任务队列里面堆积很多任务
这样可以一定程度上保证系统的稳定性 -
线程池适用于:任务量多,并且单个任务执行时长较短的场景
二、单个线程的封装
#ifndef _THREAD_H_
#define _THREAD_H_#include <iostream>
#include <pthread.h>
#include <string>
#include <cstdio>
#include <cstring>
#include <functional>
using namespace std;#include "Log.hpp"namespace ThreadModlue
{using namespace LogModule;static uint32_t number = 1;class Thread{using func_t = function<void()>;private:void EnableDetach(){cout << "线程被分离了" << endl;_isdetach = true;}void EnableRunning(){_isrunning = true;}//类的成员函数默认包含this指针,static里面没有,所有传this指针static void* Routinue(void* args){//获得this指针Thread* self = static_cast<Thread*>(args);self->EnableRunning();if(self->_isdetach)self->Detach();//设置线程的名字pthread_setname_np(self->_tid, self->_name.c_str());//回调处理self->_func();return nullptr;}public:Thread(func_t func): _tid(0),_isdetach(false),_isrunning(false),res(nullptr),_func(func){_name = "thread- " + to_string(number++);}void Detach(){if(_isdetach)return;if(_isrunning)pthread_detach(_tid);EnableDetach();}bool Start(){if(_isrunning){return false;}int n = pthread_create(&_tid, nullptr, Routinue, this);if(n != 0){cerr << "create thread error " << strerror(n) << endl;return false;}else{cout << _name << " create thread success" << endl;return true;}}bool Stop(){if(_isrunning){int n = pthread_cancel(_tid);if(n != 0){cerr << "cancel thread error " << strerror(n) << endl;return false;}else{_isrunning = false;cout << _name << " stop success" << endl;return true;}}}void Join(){if(_isdetach){cout << "你的线程已经是分离的了" << endl;return;}int n = pthread_join(_tid, &res);if(n != 0){LOG(LogLevel::DEBUG) << "Join thread error ";}else{LOG(LogLevel::DEBUG) << "Join success";}}string Name(){return _name;}~Thread(){}private:pthread_t _tid;string _name;bool _isdetach;bool _isrunning;void* res;func_t _func;};
}#endif
三、线程池封装
#pragma once#include <iostream>
#include <string>
#include <vector>
#include <queue>
#include "Log.hpp"
#include "Thread.hpp"
#include "Cond.hpp"
#include "Mutex.hpp"namespace ThreadPoolModule
{using namespace ThreadModlue;using namespace LogModule;using namespace CondModule;using namespace MutexModue;static const int gnum = 5;template<typename T>class ThreadPool{private:void WakeUpAllThread(){LockGuard lockguard(_mutex);if(_sleepernum)_cond.Broadcast();LOG(LogLevel::INFO) << "唤醒所有的休眠线程";}void WakeUpOne(){_cond.Signal();LOG(LogLevel::INFO) << "唤醒一个休眠线程";}public:ThreadPool(int num = gnum) : _num(num), _isrunning(false), _sleepernum(0){for(int i = 0;i < num;i++){_threads.emplace_back([this](){HandlerTask();});}}void Start(){if(_isrunning){return;}_isrunning = true;for(auto& thread:_threads){thread.Start();LOG(LogLevel::INFO) << "start new thread success: " << thread.Name();}}ThreadPool(const ThreadPool<T>&) = delete;ThreadPool<T>& operator=(const ThreadPool<T>&) = delete;void HandlerTask(){char name[128];pthread_getname_np(pthread_self(), name, sizeof(name));while(true){T t;{LockGuard lockguard(_mutex);while(_taskq.empty() && _isrunning){_sleepernum++;_cond.Wait(_mutex);_sleepernum--;}//内部的线程被唤醒if(!_isrunning && _taskq.empty()){LOG(LogLevel::INFO) << name << "线程池退出 && 任务队列为空";break;}//一定有任务t = _taskq.front();//从q中获取任务,任务已经是线程私有的了_taskq.pop();}t();//处理任务// sleep(1);// LOG(LogLevel::DEBUG) << name << "is running hello mihayou";}}void Stop(){if(!_isrunning){return;}_isrunning = false;//唤醒所有线程WakeUpAllThread();}void Join(){for(auto& thread:_threads){thread.Join();}}bool Equeue(const T& in){if(_isrunning){LockGuard lockguard(_mutex);_taskq.push(in);if(_threads.size() == _sleepernum){WakeUpOne();}return true;}return false;}~ThreadPool(){}private:std::vector<Thread> _threads;int _num;//线程池中,线程的个数std::queue<T> _taskq;Cond _cond;Mutex _mutex;bool _isrunning;int _sleepernum;};}
四、线程池的单例化
我们之前说过,一个进程中的线程不应该太多,线程池的线程不应该太多
因为线程太多了并没有意义,对效率提升不明显,因为CPU的个数和核数就那么多
为什么要让线程池单例化
?
因为一个线程池如果设定它有5个线程,如果创建n个线程池,线程的个数就会有5n个
所以用户如果无意识地创建了多个线程池对象,会导致线程的个数偏多
所以我们直接让线程池单例化,让用户无论如何都最多只能创建一个线程池对象
线程池单例化的线程安全问题
饿汉模式
是在线程池类中,直接定义一个static修饰的线程池对象,在进程启动的时候就已经定义了,所以根本不可能存在线程安全的问题懒汉模式
是在线程池类中,定义一个static修饰指针,然后通过一个static修饰的成员函数new出那唯一的对象
因为static修饰的成员指针是共享资源,而且可能同时有多个线程去访问它,此时就可能因为线程安全问题,导致创建了多个线程池对象
所以我们要给那个获取对象的static成员函数加锁
//懒汉模式
static ThreadPool<T>* inc;//单例指针
static Mutex _lock;
但是如果那唯一的一个线程池对象已经被创建了的话,每次调用这个函数如果都还是要加锁才能获取到那唯一的一个对象的指针
效率就会很低,因为加锁之后是互斥的,也就是说不能并行调用成员函数获取线程池对象的指针
那怎么办?
其实线程安全问题出现在,最开始的时候,static线程池对象指针为nullptr的时候,可能会有多个线程同时通过if判断,进而创建出多个线程池对象
当这个唯一的一个线程池对象被new出来之后,再调用这个静态成员函数,就只是获取指针,并不会修改,而且其他的地方里面也不可能修改成功私有的成员指针,所以不会有线程安全问题
所以我们只需要在私有static线程池指针为空时,才加锁就行了
即:
template<typename T>
ThreadPool<T>* ThreadPool<T>::inc = nullptr;
//单例模式
template<typename T>
Mutex ThreadPool<T>::_lock;static ThreadPool<T>* GetInstance(){if(inc == nullptr){LockGuard lockguard(_lock);LOG(LogLevel::DEBUG) << "获取单例对象...";if(inc == nullptr){LOG(LogLevel::DEBUG) << "首次使用单例,创建...";inc = new ThreadPool<T>();inc->Start();}}return inc;}
#pragma once#include <iostream>
#include <string>
#include <vector>
#include <queue>
#include "Log.hpp"
#include "Thread.hpp"
#include "Cond.hpp"
#include "Mutex.hpp"namespace ThreadPoolModule
{using namespace ThreadModlue;using namespace LogModule;using namespace CondModule;using namespace MutexModue;static const int gnum = 5;template<typename T>class ThreadPool{private:void WakeUpAllThread(){LockGuard lockguard(_mutex);if(_sleepernum)_cond.Broadcast();LOG(LogLevel::INFO) << "唤醒所有的休眠线程";}void WakeUpOne(){_cond.Signal();LOG(LogLevel::INFO) << "唤醒一个休眠线程";}private:ThreadPool(int num = gnum) : _num(num), _isrunning(false), _sleepernum(0){for(int i = 0;i < num;i++){_threads.emplace_back([this](){HandlerTask();});}}void Start(){if(_isrunning){return;}_isrunning = true;for(auto& thread:_threads){thread.Start();LOG(LogLevel::INFO) << "start new thread success: " << thread.Name();}}ThreadPool(const ThreadPool<T>&) = delete;ThreadPool<T>& operator=(const ThreadPool<T>&) = delete;public:static ThreadPool<T>* GetInstance(){if(inc == nullptr){LockGuard lockguard(_lock);LOG(LogLevel::DEBUG) << "获取单例对象...";if(inc == nullptr){LOG(LogLevel::DEBUG) << "首次使用单例,创建...";inc = new ThreadPool<T>();inc->Start();}}return inc;}void HandlerTask(){char name[128];pthread_getname_np(pthread_self(), name, sizeof(name));while(true){T t;{LockGuard lockguard(_mutex);while(_taskq.empty() && _isrunning){_sleepernum++;_cond.Wait(_mutex);_sleepernum--;}//内部的线程被唤醒if(!_isrunning && _taskq.empty()){LOG(LogLevel::INFO) << name << "线程池退出 && 任务队列为空";break;}//一定有任务t = _taskq.front();//从q中获取任务,任务已经是线程私有的了_taskq.pop();}t();//处理任务// sleep(1);// LOG(LogLevel::DEBUG) << name << "is running hello mihayou";}}void Stop(){if(!_isrunning){return;}_isrunning = false;//唤醒所有线程WakeUpAllThread();}void Join(){for(auto& thread:_threads){thread.Join();}}bool Equeue(const T& in){if(_isrunning){LockGuard lockguard(_mutex);_taskq.push(in);if(_threads.size() == _sleepernum){WakeUpOne();}return true;}return false;}~ThreadPool(){}private:std::vector<Thread> _threads;int _num;//线程池中,线程的个数std::queue<T> _taskq;Cond _cond;Mutex _mutex;bool _isrunning;int _sleepernum;//懒汉模式static ThreadPool<T>* inc;//单例指针static Mutex _lock;};template<typename T>ThreadPool<T>* ThreadPool<T>::inc = nullptr;//单例模式template<typename T>Mutex ThreadPool<T>::_lock;}