手撕线程池
文章目录
- 线程池
- 1. 概念
- 2. 线程池的应用场景
- 3. 手撕线程池
- Thread.hpp
- LockGuard.hpp
- ThreadPool.hpp
- Task.hpp
- Main.cpp
- 执行结果
- 4. 懒汉方式实现单例模式的线程池
- ThreadPool.hpp
- Main.cpp
- 执行结果
线程池
1. 概念
线程池是一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着
监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。
线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络 sockets 等的数量。
2. 线程池的应用场景
- 需要大量的线程来完成任务,且完成任务的时间比较短。 web 服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。 但对于长时间的任务,比如一个 Telnet 连接请求,线程池的优点就不明显了。因为 Telnet 会话时间比线程的创建时间大多了。
- 对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。
- 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限,出现错误。
3. 手撕线程池
目标:
- 创建固定数量线程池,循环从任务队列中获取任务对象。
- 获取到任务对象后,执行任务对象中的任务接口。
Thread.hpp
封装一个线程接口
// hpp文件可以把类的声明和定义放在一起
#pragma once#include <iostream>
#include <functional>
#include <string>
#include <cstdio>
#include <cstring>
#include <cstdlib>
#include <cassert>
#include <pthread.h>
#include <unistd.h>
using namespace std;// 把Thread封装在命名空间里
namespace ThreadNS
{using func_t = function<void *(void *)>;const int num = 1024;// 简单的一个线程封装class Thread{private:// 在类内创建线程, 想让线程执行对应的方法, 我们需要将方法设置为staic, 因为static没有所谓的this指针static void *start_routine(void *args){Thread *_this = static_cast<Thread *>(args);void *ret = _this->callback();}public:// 构造函数Thread(){// 推荐用下面的C接口写法char namebuffer[num];snprintf(namebuffer, sizeof namebuffer, "thread-%d", _threadnum++);_name = namebuffer;}// 线程启动void start(func_t func, void *args = nullptr){_func = func;_args = args;int n = pthread_create(&_tid, nullptr, start_routine, this);assert(n == 0);(void)n;}// 线程等待void join(){int n = pthread_join(_tid, nullptr);assert(0 == n);(void)n;}// 析构函数~Thread(){// do nothing...}// 获取线程名字string threadname(){return _name;}// 回调方法void *callback(){return _func(_args);}private:// 类的成员变量全部带_string _name; // 线程名字func_t _func; // 线程执行的函数void *_args; // 线程的第四个参数pthread_t _tid; // 线程idstatic int _threadnum; // 线程编号};int Thread::_threadnum = 1;
}
LockGuard.hpp
封装一把锁
#pragma once// 封装一把锁#include <iostream>
#include <pthread.h>class Mutex
{
public:// 构造函数Mutex(pthread_mutex_t *lock_p = nullptr) : _lock_p(lock_p){}// 加锁void lock(){if (_lock_p) // 如果锁不为空, 那么就加锁{pthread_mutex_lock(_lock_p);}}// 解锁void unlock(){if (_lock_p) // 如果锁不为空, 那么就解锁{pthread_mutex_unlock(_lock_p);}}// 析构函数~Mutex(){}private:pthread_mutex_t *_lock_p;
};//
class LockGuard
{
public:LockGuard(pthread_mutex_t* mutex) :_mutex(mutex){_mutex.lock(); // 在构造函数中进行加锁}~LockGuard(){_mutex.unlock(); // 在析构函数中进行解锁}
private:Mutex _mutex;
};
ThreadPool.hpp
线程池代码实现
#pragma once/*
名称: 设计一套线程池:目的: 一个用户他只需要向线程池当中去 push 任务, 或者是把自己的任务交给线程池, 然后他就不用管了,接着, 这个任务最终可以被指定的线程去获取并处理。
*/#include "Thread.hpp"
#include "LockGuard.hpp"
#include <vector>
#include <queue>
#include <pthread.h>using namespace ThreadNS;
using namespace std;const int gnum = 3; // 初始化5个线程// 声明
template <class T>
class ThreadPool;// 定义线程数据结构体
template <class T>
class ThreadData
{
public:ThreadPool<T> *threadpool;string name;public:ThreadData(ThreadPool<T> *tp, const string &n) : threadpool(tp), name(n) {}
};template <class T>
class ThreadPool
{
private:// 线程执行的方法static void* handlerTask(void* args){ThreadData<T>* td = (ThreadData<T> *)args;while (true){T t;{// td->threadpool->lockQueue(); // 加锁LockGuard lockguard(td->threadpool->mutex()); // 加锁while (td->threadpool->isQueueEmpty()) // 检测临界资源中是否有资源{td->threadpool->threadWait(); // 如果没有, 就先休眠等待}t = td->threadpool->pop(); // pop的本质是: 将任务从公共队列中, 拿到当前线程自己独立的栈中}cout << td->name << " 获取了一个任务: " << t.toTaskString() << " 并处理完成, 结果是: " << t() << endl; // 然后处理任务, 此时会调用 Task.hpp 中的 operator()() 仿函数, 直接打印}delete td;return nullptr;}public: // 封装所有方法// 加锁方法void lockQueue() { pthread_mutex_lock(&_mutex); } // 解锁方法void unlockQueue() { pthread_mutex_unlock(&_mutex); } // 队列判空bool isQueueEmpty() { return _taskQueue.empty(); }// 线程等待void threadWait() { pthread_cond_wait(&_cond, &_mutex); }// 获取任务T pop() { T t = _taskQueue.front(); _taskQueue.pop();return t;}// 获取锁的地址pthread_mutex_t *mutex() { return &_mutex; }public:ThreadPool(const int& num = gnum) : _num(num){ // 初始化: 锁、条件变量、线程pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_cond, nullptr);for (int i = 0; i < _num; ++i){_threads.push_back(new ThreadNS::Thread()); // 把new出来的Thread对象尾插到vector中}}// 线程启动void run(){for (const auto& t : _threads){ThreadData<T>* td = new ThreadData<T>(this, t->threadname());t->start(handlerTask, td);cout << t->threadname() << " start ..." << endl;}}// 往线程池中添加任务void Push(const T& in){LockGuard lockguard(&_mutex);_taskQueue.push(in); // 往队列(临界资源)中添加任务pthread_cond_signal(&_cond); // 去唤醒在特定条件变量下等的线程}~ThreadPool(){// 销毁: 锁、条件变量、线程pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_cond);for (const auto& t : _threads){delete t;}}private:int _num; // 线程池中的线程个数vector<Thread*> _threads; // 用vector来管理这一批线程queue<T> _taskQueue; // 任务队列(临界资源)pthread_mutex_t _mutex; // 定义锁pthread_cond_t _cond; // 定义条件变量
};
Task.hpp
创建任务
#pragma once#include <iostream>
#include <string>
#include <functional>
#include <cstdio>using namespace std;// 构建一个任务, 放入阻塞队列中
class Task
{using func_t = function<int(int, int, char)>;
public:Task() {}Task(int x, int y, char op, func_t func) : _x(x), _y(y), _op(op), _callback(func){}string operator()(){int ret = _callback(_x, _y, _op);char buffer[1024];snprintf(buffer, sizeof buffer, "%d %c %d = %d", _x, _op, _y, ret);return buffer;}string toTaskString(){char buffer[1024];snprintf(buffer, sizeof buffer, "%d %c %d = ?", _x, _op, _y);return buffer;}
private:int _x;int _y;char _op;func_t _callback; // 回调方法
};const string oper = "+-*/%";
int mymath(int x, int y, char op)
{int ret = 0;switch (op){case '+':ret = x + y;break;case '-':ret = x - y;break;case '*':ret = x * y;break;case '/':{if (y == 0){cerr << "div zero error!" << endl;ret = -1;}else{ret = x / y;}}break;case '%':{if (y == 0){cerr << "mod zero error!" << endl;ret = -1;}else{ret = x % y;}}break;default:// do nothingbreak;}return ret;
}
Main.cpp
主函数代码如下
#include "ThreadPool.hpp"
#include "Task.hpp"
#include "unistd.h"
#include <memory>/*
相当于我们写了一个小组件: 线程池, 它是生产者消费者模型。
你只需要把任务push到这个线程池里,那么线程池当中的线程会自动去进行处理。
因为我们有多个线程, 我自己手动的去给多线程派发任务, 那么最终每个线程没有任务的时候, 大家都在等; 有任务的时候,大家再处理.
*/int main()
{unique_ptr< ThreadPool<Task> > tp(new ThreadPool<Task>());tp->run();int x, y;char op;while (1){cout << "请输入数据1# ";cin >> x;cout << "请输入数据2# ";cin >> y;cout << "请输入你要进行的运算# ";cin >> op;Task t(x, y, op, mymath);tp->Push(t);sleep(1);}return 0;
}
执行结果
我这里设置线程数量只有 3 个,可以看到它们是依次轮流来获取任务的:
4. 懒汉方式实现单例模式的线程池
这里选择为把线程池改为 单例模式,只需要改动 ThreadPool.hpp
和 Main.cpp
即可。
ThreadPool.hpp
代码实现
#pragma once/*
名称: 设计一套线程池:目的: 一个用户他只需要向线程池当中去 push 任务, 或者是把自己的任务交给线程池, 然后他就不用管了,接着, 这个任务最终可以被指定的线程去获取并处理。
*/#include "Thread.hpp"
#include "LockGuard.hpp"
#include <vector>
#include <queue>
#include <mutex>
#include <pthread.h>using namespace ThreadNS;
using namespace std;const int gnum = 3; // 初始化5个线程// 声明
template <class T>
class ThreadPool;// 定义线程数据结构体
template <class T>
class ThreadData
{
public:ThreadPool<T> *threadpool;string name;public:ThreadData(ThreadPool<T> *tp, const string &n) : threadpool(tp), name(n) {}
};template <class T>
class ThreadPool
{
private:// 线程执行的方法static void* handlerTask(void* args){ThreadData<T>* td = (ThreadData<T> *)args;while (true){T t;{// td->threadpool->lockQueue(); // 加锁LockGuard lockguard(td->threadpool->Mutex()); // 加锁while (td->threadpool->isQueueEmpty()) // 检测临界资源中是否有资源{td->threadpool->threadWait(); // 如果没有, 就先休眠等待}t = td->threadpool->pop(); // pop的本质是: 将任务从公共队列中, 拿到当前线程自己独立的栈中}cout << td->name << " 获取了一个任务: " << t.toTaskString() << " 并处理完成, 结果是: " << t() << endl; // 然后处理任务, 此时会调用 Task.hpp 中的 operator()() 仿函数, 直接打印}delete td;return nullptr;}// 单例模式 --> 把构造函数设置成静态方法ThreadPool(const int& num = gnum) : _num(num){ // 初始化: 锁、条件变量、线程pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_cond, nullptr);for (int i = 0; i < _num; ++i){_threads.push_back(new ThreadNS::Thread()); // 把new出来的Thread对象尾插到vector中}}void operator=(const ThreadPool& ) = delete; // 删除赋值语句ThreadPool(const ThreadPool& ) = delete; // 删除拷贝构造public: // 封装所有方法// 加锁方法void lockQueue() { pthread_mutex_lock(&_mutex); } // 解锁方法void unlockQueue() { pthread_mutex_unlock(&_mutex); } // 队列判空bool isQueueEmpty() { return _taskQueue.empty(); }// 线程等待void threadWait() { pthread_cond_wait(&_cond, &_mutex); }// 获取任务T pop() { T t = _taskQueue.front(); _taskQueue.pop();return t;}// 获取锁的地址pthread_mutex_t *Mutex() { return &_mutex; }public:// 线程启动void run(){for (const auto& t : _threads){ThreadData<T>* td = new ThreadData<T>(this, t->threadname());t->start(handlerTask, td);cout << t->threadname() << " start ..." << endl;}}// 往线程池中添加任务void Push(const T& in){LockGuard lockguard(&_mutex);_taskQueue.push(in); // 往队列(临界资源)中添加任务pthread_cond_signal(&_cond); // 去唤醒在特定条件变量下等的线程}~ThreadPool(){// 销毁: 锁、条件变量、线程pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_cond);for (const auto& t : _threads){delete t;}}// 单例模式static ThreadPool<T> *getInstance(){if (nullptr == tp){_siglock.lock(); // 加锁if (nullptr == tp){tp = new ThreadPool<T>();}_siglock.unlock(); // 解锁}return tp;}private:int _num; // 线程池中的线程个数vector<Thread*> _threads; // 用vector来管理这一批线程queue<T> _taskQueue; // 任务队列(临界资源)pthread_mutex_t _mutex; // 定义锁pthread_cond_t _cond; // 定义条件变量static ThreadPool<T>* tp; // 定义一个静态的指针static std::mutex _siglock; // 定义一个静态的锁(c++11的锁)
};// 静态成员要在类外面初始化
template <class T>
ThreadPool<T>* ThreadPool<T>::tp = nullptr;template <class T>
mutex ThreadPool<T>::_siglock;
Main.cpp
代码实现
#include "ThreadPool.hpp"
#include "Task.hpp"
#include "unistd.h"
#include <memory>int main()
{// 获取单例ThreadPool<Task>::getInstance()->run();int x, y;char op;while (1){cout << "请输入数据1# ";cin >> x;cout << "请输入数据2# ";cin >> y;cout << "请输入你要进行的运算# ";cin >> op;Task t(x, y, op, mymath);ThreadPool<Task>::getInstance()->Push(t);sleep(1);}return 0;
}
执行结果
可以看到和之前的结果是一样的: