C++ - 仿 RabbitMQ 实现消息队列--C++11 异步操作实现线程池
目录
std::future
介绍
应用场景
用法示例
使用 std::async 关联异步任务
使用 std::packaged_task 和 std::future 配合
使用 std::promise 和 std::future 配合
c++11 线程池实现
std::future
介绍
std::future 是 C++11 标准库中的一个模板类,它表示一个异步操作的结果。当我们在多线程编程中使用异步任务时,std::future 可以帮助我们在需要的时候获取任务的执行结果。std::future 的一个重要特性是能够阻塞当前线程,直到异步操作完成,从而确保我们在获取结果时不会遇到未完成的操作。
应用场景
- 异步任务: 当我们需要在后台执行一些耗时操作时,如网络请求或计算密集型任务等,std::future 可以用来表示这些异步任务的结果。通过将任务与主线程分离,我们可以实现任务的并行处理,从而提高程序的执行效率。
- 并发控制: 在多线程编程中,我们可能需要等待某些任务完成后才能继续执行其他操作。通过使用 std::future,我们可以实现线程之间的同步,确保任务完成后再获取结果并继续执行后续操作。
- 结果获取:std::future 提供了一种安全的方式来获取异步任务的结果。我们可以使用 std::future::get()函数来获取任务的结果,此函数会阻塞当前线程,直到异步操作完成。这样,在调用 get()函数时,我们可以确保已经获取到了所需的结果。
用法示例
使用 std::async 关联异步任务
std::async 是一种将任务与 std::future 关联的简单方法。它创建并运行一个异步任务,并返回一个与该任务结果关联的 std::future 对象。默认情况下,std::async 是否启动一个新线程,或者在等待 future 时,任务是否同步运行都取决于你给的 参数。这个参数为 std::launch 类型:
- ○ std::launch::deferred 表明该函数会被延迟调用,直到在 future 上调用 get()或者 wait()才会开始执行任务。
- std::launch::async 表明函数会在自己创建的线程上运行。
- std::launch::deferred | std::launch::async 内部通过系统等条件自动选择策略。
deferred策略:
#include <iostream>
#include <thread>
#include <future>
#include <unistd.h>int add(int num1, int num2)
{std::cout << "加法:" << num1+num2 << std::endl;return num1+num2;
}int main()
{std::cout << "------------------------1---------------------------" << std::endl;// std::launch::deferred: 调用get才会执行加法函数std::future<int> result = std::async(std::launch::deferred, add, 11, 22);// std::future<int> result = std::async(std::launch::async, add, 11, 22);sleep(1);std::cout << "------------------------2---------------------------" << std::endl;int sum = result.get(); std::cout << sum << std::endl;return 0;
}
运行结果:
async策略:
#include <iostream>
#include <thread>
#include <future>
#include <unistd.h>int add(int num1, int num2)
{std::cout << "加法:" << num1+num2 << std::endl;return num1+num2;
}int main()
{std::cout << "------------------------1---------------------------" << std::endl;// std::launch::deferred: 调用get才会执行加法函数// std::future<int> result = std::async(std::launch::deferred, add, 11, 22);std::future<int> result = std::async(std::launch::async, add, 11, 22);sleep(1);std::cout << "------------------------2---------------------------" << std::endl;int sum = result.get(); std::cout << sum << std::endl;return 0;
}
运行结果:
使用 std::packaged_task 和 std::future 配合
std::packaged_task 就是将任务和 std::future 绑定在一起的模板,是一种对任务的封装。我们可以通过 std::packaged_task 对象获取任务相关联的 std::future 对象,通过调用 get_future()方法获得。std::packaged_task 的模板参数是函数签名。
可以把 std::future 和 std::async 看成是分开的, 而 std::packaged_task 则是一个整体。
演示demo:
#include <iostream>
#include <thread>
#include <future>
#include <unistd.h>
#include <memory>int add(int num1, int num2)
{sleep(1);return num1+num2;
}int main()
{std::packaged_task<int(int, int)> pt(add);// pt 可以看做是一个可调用对象, 但不能作为一个完全的函数来使用std::future<int> fu = pt.get_future();pt(11, 22);int result = fu.get();std::cout << result << std::endl;return 0;
}
运行结果:
异步执行 std::packaged_task 任务:
#include <iostream>
#include <thread>
#include <future>
#include <unistd.h>
#include <memory>int add(int num1, int num2)
{sleep(1);return num1+num2;
}int main()
{// pt虽然重载了()运算符,但pt并不是一个函数,所以导致它作为线程的入口函数时,语法上看没有问题,但是实际编译的时候会报错// 而 packaged_task 禁止了拷贝构造,// 且因为每个 packaged_task 所封装的函数签名都有可能不同,因此也无法当作参数一样传递// 传引用不可取,毕竟任务在多线程下执行存在局部变量声明周期的问题,因此不能传引用// 因此想要将一个 packaged_task 进行异步调用,// 简单方法就只能是 new packaged_task,封装函数传地址进行解引用调用// 而类型不同的问题,在使用的时候可以使用类型推导来解决// 将pt定义为一个智能指针auto ptask = std::make_shared<std::packaged_task<int(int, int)>>(add);std::future<int> fu = ptask->get_future();std::thread th([ptask](){(*ptask)(11, 22);});int result = fu.get();std::cout << result << std::endl;th.join();return 0;
}
运行结果:
使用 std::promise 和 std::future 配合
std::promise 提供了一种设置值的方式,它可以在设置之后通过相关联的 std::future 对象进行读取。换种说法就是之前说过 std::future 可以读取一个异步函数的返回值了, 但是要等待就绪,而 std::promise 就提供一种 方式手动让 std::future 就绪。
#include <iostream>
#include <thread>
#include <future>
#include <unistd.h>void add(int num1, int num2, std::promise<int> &prom)
{sleep(1);prom.set_value(num1+num2);return;
}int main()
{std::promise<int> prom;std::future<int> fu = prom.get_future();std::thread th(add, 11, 22, std::ref(prom));int result = fu.get();std::cout << result << std::endl;th.join();return 0;
}
运行结果:
c++11 线程池实现
基于线程池执行任务的时候,入口函数内部执行逻辑是固定的,因此选择std::packaged_task 加上std::future 的组合来实现。
线程池的工作思想:
用户传入要执行的函数,以及需要处理的数据(函数的参数),由线程池中的
工作线程来执行函数完成任务
实现:
- 管理的成员
- 任务池:用 vector 维护的一个函数任务池子▪ 互斥锁 & 条件变量: 实现同步互斥
- 一定数量的工作线程:用于不断从任务池取出任务执行任务▪ 结束运行标志:以便于控制线程池的结束。
- 管理的操作:
- 入队任务:入队一个函数和参数
- 停止运行:终止线程池
#include <iostream>
#include <thread>
#include <functional>
#include <mutex>
#include <condition_variable>
#include <future>
#include <memory>
#include <vector>
#include <atomic>class ThreadPool
{
public:using Func = std::function<void(void)>;ThreadPool(int thread_num = 1):_stop(false){for (int i = 0; i < thread_num; i++){_threads.emplace_back(&ThreadPool::entry, this);}}~ThreadPool(){stop();}template <class F, class ...Args>auto push(F &&func, Args &&...args) -> std::future<decltype(func(args...))>{using return_type = decltype(func(args...));// 由于不确定参数,就先绑定进函数auto Func = std::bind(std::forward<F>(func), std::forward<Args>(args)...);// 利用智能指针管理一个new出来的packaged_task对象auto task = std::make_shared<std::packaged_task<return_type()>>(Func);std::future<return_type> fu = task->get_future();{// 推入任务池std::unique_lock<std::mutex> lock(_mutex);_tasks.emplace_back([task](){(*task)();});_cv.notify_one();}return fu;}void stop() {if(_stop) return;_stop = true;_cv.notify_all();for (auto &thread : _threads){thread.join();}}private:void entry(){while(!_stop){std::vector<Func> tmp_tasks;{// 加锁std::unique_lock<std::mutex> lock(_mutex);// 等待任务池不为空或线程池停止_cv.wait(lock, [this](){return !_tasks.empty() || _stop;});// 取出任务tmp_tasks.swap(_tasks);}// 执行任务for (auto &task : tmp_tasks){task();}}}private:std::mutex _mutex;std::condition_variable _cv;std::vector<std::thread> _threads;std::vector<Func> _tasks;std::atomic<bool> _stop;
};