C++线程池实现
C++线程池实现
- 知识点补充
- 为什么要实现线程池
- 线程池的实现过程
知识点补充
在C++11中引入了对线程库的支持,接下来我们介绍一下跟线程相关的一些知识点:
线程对象的构造方式
在C++11中主要提供的三种线程的构造方式:无参构造、带参构造和调用移动构造函数。
#include <iostream>
#include <thread>void Func1(int a)
{std::cout << "void Func1(int a)" << std::endl;
}int main()
{std::thread t1; // 无参进行构造std::thread t2(Func1, 1); // 带参进行构造// t1 = t2; /*t2线程赋给t1线程,这样是不可取的因为在线程库底层实现过程中是将拷贝构造函数设置成delete的*/t1 = std::move(t2); // 使用移动构造函数进行构造std::cout << t1.get_id() << std::endl;/*注意:此时的t2线程是获取不到对应的id的因为他的状态已经转移给其他的线程对象了同时,也不可去进行join了,会系统报错*/// std::cout << t2.get_id() << std::endl;// std::cout << t2.joinable() << std::endl; == 0t1.join();// t2.join();return 0;
}
在使用的过程中我们需要注意几点:
- thread类是防拷贝的,不允许拷贝构造和拷贝赋值,但是可以移动构造和移动赋值,可以将一个线程对象关联线程的状态转移给其他线程对象,并且转移期间不影响线程的执行。
- 在移动构造或者移动赋值以后,此时一个线程对象关联线程的状态转移给其他线程对象,就不能在进行join了;
- detach调用之后,目标线程就成为了守护线程,驻留后台运行,与之关联的std::thread对象
失去对目标线程的关联,无法再通过std::thread对象取得该线程的控制权。当线程主函数执
行完之后,线程就结束了,运行时库负责清理与该线程相关的资源,如下所示:
void Func1(int a)
{std::cout << "void Func1(int a)" << std::endl;
}void Func2(int a)
{std::cout << "void Func2(int a)" << std::endl;
}int main()
{std::cout << "main start >>>>>>>>>>>>>>>" << std::endl;std::thread t1(Func1, 1);t1.join();std::thread t2(Func2, 2);// std::this_thread::sleep_for(std::chrono::milliseconds(5));/*如果在这儿不进行睡眠的话,会发现当前线程就不再被控制,他的打印也就根据他执行的时机和次数而定*/t2.detach();std::cout << "main end >>>>>>>>>>>>>>>" << std::endl;return 0;
}
当然,我们对应的线程传入的对参数也可以是类函数的,如下所示:
class A
{
public:static void Func4(int a){std::cout << "void Func4(int a)->" << a << std::endl;}
};int main()
{A* newA = new A();std::thread t1(A::Func4, 10);t1.join();return 0;
}
对于互斥量以及条件变量可以参考之前的文章:C++11之线程库
std::aysnc和std::future
std::aysnc
表示异步运行某个任务函数,而std::future
异步指向某个任务,然后通过 future 特性去获取任务函数的返回结果。
std::future
期待一个返回,从一个异步调用的角度来说,future 更像是执行函数的返回值,C++标准库使用std::future
为一次性事件建模,如果一个事件需要等待特定的一次性事件,那么这线程可以获取一个 future 对象来代表这个事件。
异步调用往往不知道何时返回,但是如果异步调用的过程需要同步,或者说后一个异步调用需要使前一个异步调用的结果。这个时候就要用到 future。
线程可以周期性的在这个 future 上等待一小段时间,检查future是否已经ready,如果没有,该线程可以先去做另一个任务,一旦 future 就绪,该 future 就无法复位(无法再次使用这个 future 等待这个事件),所以 future 代表的是一次性事件。
std::async
返回一个std::future
对象,而不是给你一个确定的值(所以当你不需要立刻使用此值的时才需要用到这个机制)。当你需要使用这个值的时候,对 future 使用get()
,线程就会阻塞直到 future 绪,然后返回该值。
我们用一下一段代码来理解一下:
int find_result1(int a)
{std::this_thread::sleep_for(std::chrono::seconds(5));std::cout << "thread id find_result1: " << std::this_thread::get_id() << std::endl;std::cout << "void find_result1(int a)->" << a << std::endl;return a;
}int find_result2(int a, int b)
{std::cout << "thread id find_result2: " << std::this_thread::get_id() << std::endl;std::cout << "void find_result1(int a)->" << a + b << std::endl;return a + b;
}void do_other_things()
{std::cout << "thread id do_other_things: " << std::this_thread::get_id() << std::endl;std::cout << "do_other_things !!!" << std::endl;
}int main()
{// std::future<int> result1 = std::async(find_result1, 1);std::future<decltype(find_result1(0))> result1 = std::async(find_result1, 1);// 两种写法,更推荐使用第二种do_other_things();std::cout << "result1: " << result1.get() << std::endl;return 0;
}
std::future
是 C++11 标准库(并发支持库)中的一个模板类,它表示一个异步操作的结果。当我们在多线程编程中使用异步任务时,std::future
可以帮助我们在需要的时候获取任务的执行结果。
作用
- 异步操作的结果获取:
std::future
提供了一种机制,允许我们在多线程环境中安全地获取异步操作的结果。 - 隐藏异步操作的细节:
std::future
将异步操作的结果封装起来,使程序员无需关注线程同步和通信的具体实现细节。 - 线程同步:通过阻塞等待异步操作完成,
std::future
可以确保我们在继续执行其他操作之前,已经获取了所需的结果。 - 异常处理:
std::future
可以捕获异步操作中抛出的异常,并在获取结果时重新抛出,也可以在主线程中对其进行处理,从而使得异常处理更加简单。 - 提高性能:
std::future
使得我们能够更好地利用多核处理器的性能,通过并行执行任务来提高程序的执行效率。
当我们需要在后台执行一些耗时操作时,如文件读写、网络请求或计算密集型任务,std::future
可以用来表示这些异步任务的结果。通过将任务与主线程分离,我们可以实现任务的并行处理,从而提高程序的执行效率。或者是在多线程编程中,我们可能需要等待某些任务完成后才能继续执行其他操作。通过使用std::future
,我们可以实现线程之间的同步,确保任务完成后再获取结果并继续执行后续操作。
std::promise
std::promise
提供了一种设置值的方式,它可以在这之后通过相关联的std::future
对象进行读取。换种
说法,之前已经说过std::future
可以读取一个异步函数的返回值了,那么这个std::promise
就提供一种
方式手动让 future 就绪。
看下面这段代码:
void print(std::promise<std::string>& p)
{std::this_thread::sleep_for(std::chrono::seconds(5));std::cout << "thread id print: " << std::this_thread::get_id() << std::endl;// 将结果设置到promise当中去p.set_value("There is the result which you want !!!");
}void do_other_things()
{std::cout << "thread id do_other_things: " << std::this_thread::get_id() << std::endl;std::cout << "do_other_things !!!" << std::endl;
}int main()
{std::promise<std::string> promise;std::future<std::string> result = promise.get_future();// 创建一个线程执行当前任务std::thread t(print, std::ref(promise));// 执行其他任务do_other_things();std::cout << result.get() << std::endl;t.join();return 0;
}
由此可以看出在 promise 创建好的时候 future 也已经创建好了线程在创建 promise 的同时会获得一个future,然后将 promise 传递给设置他的线程,当前线程则持有 future,以便随时检查是否可以取值。
std::packaged_task
如果说std::async
和std::future
还是分开看的关系的话,那么std::packaged_task
就是将任务和 future 绑定在一起的模板,是一种封装对任务的封装。
可以通过std::packaged_task
对象获取任务相关联的 future,调用get_future()方法可以获得
std::packaged_task对象绑定的函数的返回值类型的future。std::packaged_task的模板参数是函数签
名。例如:int add(int a, intb)
的函数签名就是int(int, int)
int add(int a, int b, int c)
{std::this_thread::sleep_for(std::chrono::seconds(5));std::cout << "call add\n";return a + b + c;
}void do_other_things()
{std::cout << "thread id do_other_things: " << std::this_thread::get_id() << std::endl;std::cout << "do_other_things !!!" << std::endl;
}int main()
{std::packaged_task<int(int, int, int)> task(add); // 封装任务// 执行其他任务do_other_things();std::future<int> result = task.get_future();// task(1, 2, 3); // 必须让任务执行起来,否则在get()获取future值时会一直阻塞std::cout << "result: " << result.get() << std::endl;return 0;
}
为什么要实现线程池
线程池(Thread Pool)是一种并发编程中常用的技术,用于管理和重用线程。它由线程池管理器、工作队列和线程池线程组成。
线程池的基本概念是,在应用程序启动时创建一定数量的线程,并将它们保存在线程池中。当需要执行任务时,从线程池中获取一个空闲的线程,将任务分配给该线程执行。当任务执行完毕后,线程将返回到线程池,可以被其他任务复用。
我们试想,如果某类任务的处理时间特别长,但是当前线程需要执行的任务也很多,那么就会导致没有执行完当前任务就无法执行其他任务的现象,极大的降低了工作效率,此时,使用线程池就可以解决掉这个问题,我们可以让其他线程来异步执行这个任务,这样也就极大地提高了效率。
而且,大量的线程的创建跟销毁会影响系统的性能与资源利用率的,程池的设计思想是为了避免频繁地创建和销毁线程的开销,以及控制并发执行的线程数量,从而提高系统的性能和资源利用率。
线程池的实现过程
以下就是线程池的实现代码:
threadPool.h
#ifndef __M_THREADPOOL_H__
#define __M_THREADPOOL_H__#include <thread>
#include <iostream>
#include <mutex>
#include <queue>
#include <condition_variable>
#include <atomic>
#include <functional>
#include <future>
#include <memory>
#include <string>const int NUM = 5; // 默认线程数量5个// template <class T>
class ThreadPool
{
protected:struct Task{Task(){ }std::function<void()> _func;};typedef std::shared_ptr<Task> taskPtr;public:ThreadPool();virtual ~ThreadPool();// 用线程池启用任务// 返回任务的future对象, 可以通过这个对象来获取返回值template <class F, class... Args>auto exec(F &&f, Args &&...args) -> std::future<decltype(f(args...))>{return exec1(f, args...);}template <class F, class... Args>auto exec1(F &&f, Args &&...args) -> std::future<decltype(f(args...))>{// 定义返回值类型using RetType = decltype(f(args...)); // 推导返回值// 封装任务auto task = std::make_shared<std::packaged_task<RetType()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));// 封装任务指针taskPtr tPtr = std::make_shared<Task>();// 具体执行函数tPtr->_func = [task](){(*task)();};// 插入任务std::unique_lock<std::mutex> lock(_mutex);_taskqueue.push(tPtr);_cond.notify_one();return task->get_future();}// 线程池的初始化bool init(const int num = NUM);// 停止线程池void stop();// 启动所有线程bool start();// 获取线程数量size_t getThreadNumber();// 获取任务数量size_t getTaskNumder();// 获取任务bool getTask(taskPtr &task);// 执行当前线程处理任务void run();// 等在所有任务执行完毕bool waitforAllDone();private:int _num; // 线程数量std::queue<taskPtr> _taskqueue; // 任务队列std::mutex _mutex; // 互斥锁std::condition_variable _cond; // 条件变量bool _stop; // 停止标志std::vector<std::thread *> _threads; // 线程组std::atomic<int> count{0}; // 标志
};#endif
threadPool.cpp
#include "threadPool.h"// 构造函数,初始化线程数量以及线程池状态
ThreadPool::ThreadPool() : _num(NUM), _stop(false)
{
}// 析构函数
ThreadPool::~ThreadPool()
{stop();
}// 初始化线程数量
bool ThreadPool::init(int num)
{std::unique_lock<std::mutex> lock(_mutex);if (!_taskqueue.empty()){std::cout << "thread already init !!!" << std::endl;return false;}_num = num;std::cout << "thread init succ !!!" << std::endl;return true;
}// 初始化线程池
bool ThreadPool::start()
{std::unique_lock<std::mutex> lock(_mutex);// 如果当前线程池的数量不为空,就不需要添加线程进去if (!_threads.empty()){std::cout << "threadpool not empty !!!\n";return false;}for (size_t i = 0; i < _num; i++){_threads.push_back(new std::thread(&ThreadPool::run, this));}std::cout << "threadpool init succ !!!\n";return true;
}// 停止线程池
void ThreadPool::stop()
{{std::unique_lock<std::mutex> lock(_mutex);_stop = true;// 唤醒所有线程_cond.notify_all();}for (size_t i = 0; i < _threads.size(); i++){if (_threads[i]->joinable()){_threads[i]->join();}delete _threads[i];_threads[i] = nullptr;}std::unique_lock<std::mutex> lock(_mutex);_threads.clear();
}// 执行当前线程的处理任务
void ThreadPool::run()
{while (!_stop){taskPtr task;auto ret = getTask(task);if (ret){// 证明当前正在执行该任务++count;task->_func();}}--count;// 所有任务执行完毕std::unique_lock<std::mutex> lock(_mutex);if (count == 0 && _taskqueue.empty()){// 为了waitforAllDone_cond.notify_all();}
}// 从任务队列中获取任务
bool ThreadPool::getTask(taskPtr &task)
{std::unique_lock<std::mutex> lock(_mutex);if (_taskqueue.empty()){_cond.wait(lock, [this](){ return _stop || !_taskqueue.empty(); });}if (_stop){return false;}if (!_taskqueue.empty()){task = std::move(_taskqueue.front());_taskqueue.pop();return true;}return false;
}// 获取线程数量
size_t ThreadPool::getThreadNumber()
{std::unique_lock<std::mutex> lock(_mutex);return _threads.size();
}// 获取任务数量
size_t ThreadPool::getTaskNumder()
{std::unique_lock<std::mutex> lock(_mutex);return _taskqueue.size();
}// 等待所有任务执行完毕
bool ThreadPool::waitforAllDone()
{std::unique_lock<std::mutex> lock(_mutex);if (_taskqueue.empty()){return true;}return _cond.wait_for(lock, std::chrono::milliseconds(100), [this] { return _taskqueue.empty(); });
}
main.cpp
#include <iostream>
#include <chrono>
#include "threadPool.cpp"void func1()
{// std::this_thread::sleep_for(std::chrono::seconds(5));std::cout << "void func1()" << std::endl;
}int func2(int a)
{// std::this_thread::sleep_for(std::chrono::seconds(3));std::cout << "void func2()->" << a << std::endl;return a;
}void func3(int a, std::string str)
{std::cout << "void func3()->" << a << ":" << str << std::endl;
}int main()
{ThreadPool thread_pool;thread_pool.init(3);thread_pool.start();// thread_pool.exec(func1);// thread_pool.exec(func2, 10);// thread_pool.exec(func3, 10, "hello");std::future<decltype(func2(1))> result = thread_pool.exec(func2, 1);std::cout << "result: " << result.get() << std::endl;thread_pool.waitforAllDone();thread_pool.stop();return 0;
}