【C++】异步操作
文章目录
- std::async 与 std::future
- std::shared_future
- std::packaged_task
- std::promise
- C++11线程池
在 C++ 中,异步操作是指程序在执行一个操作时,不需要等待该操作完成就可以继续执行后续代码,从而提高程序的并发性能和响应性。C++11 及后续标准引入了一系列异步操作相关的库和机制,主要包括以下几个核心组件:
std::async与std::futurestd::packaged_taskstd::promise
下面就逐一介绍一下
std::async 与 std::future
std::async 是一个函数模板,它会异步执行一个函数,而 std::future 用于获取异步任务的结果,两者通常配合使用。
std::async
功能:自动创建线程并执行任务,无需手动管理线程生命周期。
原型:
template< class Function, class... Args >
std::future<std::result_of_t<std::decay_t<Function>(std::decay_t<Args>...)>>
async( std::launch policy, Function&& f, Args&&... args );
启动策略:
std::launch::async:强制立即在新线程中执行任务。std::launch::deferred:延迟执行(任务会在调用get()或wait()时在当前线程执行)。- 默认策略(不指定):由实现决定,可能是上述两种之一。
std::future
功能:作为异步操作结果的 “占位符”,提供获取结果、等待完成的接口。
主要方法:
get():获取异步任务的返回值(如果任务未完成,会阻塞当前线程)。注意:get() 只能调用一次,调用后 future 状态变为无效(valid() == false)。
wait():阻塞等待任务完成,但不获取结果。
wait_for(duration):等待指定时长,返回状态(ready/timeout/deferred)。
wait_until(timepoint):等待到指定时间点,返回状态同上。
基本用法:
#include <iostream>
#include <future>
int Add(int a, int b) {return a + b;
}int main()
{// 异步调用Add函数// std::launch::async策略: 内部创建一个线程执行函数, 函数运行结果通过future获取// std::launch::deferred策略: 函数不会立即执行, 直到调用get()或wait()时才执行std::future<int> result = std::async(std::launch::async, Add, 1, 2);// 获取结果std::cout << "1 + 2 = " << result.get() << std::endl;return 0;
}

std::shared_future
- 共享结果所有权:与
std::future(只能移动,不能复制)不同,std::shared_future可以被复制,多个 shared_future 对象可以指向同一个异步结果。 - 获取结果的方式:通过
get()方法获取异步操作的结果,该方法可以被多个线程调用,且每个线程都能得到完整的结果。 - 与 std::future 的关系:
std::shared_future通常通过std::future::share()方法从一个std::future对象转换而来。
使用场景
当需要多个线程获取同一个异步操作的结果时,std::shared_future 非常有用。例如,多个线程需要基于同一个计算结果执行后续操作。
#include <iostream>
#include <future>
#include <thread>
#include <chrono>// 异步执行的任务
int calculate() {std::this_thread::sleep_for(std::chrono::seconds(2)); // 模拟耗时计算return 42;
}// 多个线程共享结果
void print_result(std::shared_future<int> fut, std::mutex &mtx) {std::unique_lock<std::mutex> lck(mtx);std::cout << "线程 " << std::this_thread::get_id()<< " 获取结果: " << fut.get() << std::endl;
}int main() {// 启动异步任务,得到一个futurestd::future<int> fut = std::async(std::launch::async, calculate);// 转换为shared_future,允许多个线程共享std::shared_future<int> shared_fut = fut.share();// 启动多个线程共享结果std::mutex mtx; // 使用锁管理共享资源(输出位置--控制台)std::thread t1(print_result, shared_fut, std::ref(mtx));std::thread t2(print_result, shared_fut, std::ref(mtx));std::thread t3(print_result, shared_fut, std::ref(mtx));t1.join();t2.join();t3.join();return 0;
}

std::packaged_task
std::packaged_task 是一种模板类,可以对一个函数进行二次封装,封装成一个可调用对象放到其他线程去执行。我们可以通过std::packaged_task对象获取任务相关联的std::feature对象,通过调用get_future()方法获得。std::packaged_task的模板参数是函数签名。
执行任务的步骤:
- 封装任务
- 执行任务
- 通过
future获取任务执行结果
可以把std::future和std::async看成是分开的, 而 std::packaged_task则是一个整体。
特性
- 包装后的任务可以像函数一样被调用(通过
operator())。 - 任务执行后,结果会自动存储,供关联的
future获取。 - 任务必须被执行(否则
future.get()会永远阻塞)。
主要方法
get_future():返回与任务关联的std::future对象。operator():执行包装的任务(可在任意线程中调用)。
基本用法:
#include <iostream>
#include <thread>
#include <memory>
#include <future>
#include <functional>
int Add(int a, int b) {return a + b;
}int main()
{// 封装任务// std::packaged_task<int(int, int)> task(add);// 此处可执行其他操作, 无需等待// std::future<int> result_future = task.get_future();//需要注意的是,task虽然重载了()运算符,但task并不是一个函数,//std::async(std::launch::async, task, 1, 2); //--错误用法//所以导致它作为线程的入口函数时,语法上看没有问题,但是实际编译的时候会报错//std::thread(task, 1, 2); //---错误用法//而packaged_task禁止了拷贝构造,//且因为每个packaged_task所封装的函数签名都有可能不同,因此也无法当作参数一样传递//传引用不可取,毕竟任务在多线程下执行存在局部变量声明周期的问题,因此不能传引用//因此想要将一个packaged_task进行异步调用,//简单方法就只能是new packaged_task,封装函数传地址进行解引用调用了//而类型不同的问题,在使用的时候可以使用类型推导来解决// 封装任务auto task = std::make_shared<std::packaged_task<int(int, int)>>(Add);// 获取任务包关联的future对象std::future<int> result = task->get_future();// 异步执行任务std::thread([task]() {(*task)(11, 22);}).detach(); // 使用detach使线程在后台运行// 等待任务完成并获取结果std::cout << "异步计算结果: " << result.get() << std::endl;return 0;
}

来看一个比较复杂的场景:
#include <iostream>
#include <future> // 包含异步操作相关类(packaged_task, future等)
#include <thread> // 包含线程操作相关类
#include <chrono> // 包含时间相关功能
#include <queue> // 包含队列容器
#include <mutex> // 包含互斥锁相关类
#include <condition_variable> // 包含条件变量类// 用于保护cout输出的互斥锁,避免多线程输出混乱
std::mutex cout_mtx;/*** 任务执行函数,由工作线程运行* @param task_queue 任务队列,存储待执行的任务* @param task_mtx 保护任务队列的互斥锁* @param cv 条件变量,用于通知工作线程有新任务*/
void taskRunner(std::queue<std::packaged_task<int()>> &task_queue, std::mutex &task_mtx, std::condition_variable &cv) {// 工作线程主循环,持续处理任务while (true) {// 存储从队列中取出的任务std::packaged_task<int()> task;{// 创建unique_lock用于条件变量等待,自动管理锁的生命周期std::unique_lock<std::mutex> lock(task_mtx);// 等待条件变量,直到队列不为空才继续执行// 第二个参数是谓词,避免虚假唤醒cv.wait(lock, [&task_queue]() {return !task_queue.empty();});// 将队列头部的任务移动到本地变量task = std::move(task_queue.front());// 从队列中移除已取出的任务task_queue.pop();}// 如果任务无效(空任务),则退出循环,结束工作线程if (!task.valid()) break;// 执行任务task(); {// 加锁保护cout输出std::lock_guard<std::mutex> lock(cout_mtx);std::cout << "任务在线程 " << std::this_thread::get_id() << " 执行完毕" << std::endl;}}
}int main()
{// 任务队列,存储所有待执行的任务std::queue<std::packaged_task<int()>> taskqueue;// 保护任务队列的互斥锁std::mutex task_mtx;// 条件变量,用于通知工作线程有新任务到来std::condition_variable cv;// 启动工作线程,执行taskRunner函数std::thread worker(taskRunner, std::ref(taskqueue), std::ref(task_mtx), std::ref(cv));// 创建并添加5个任务到队列for (int i = 0; i < 5; ++i) {// 创建一个打包任务,任务是一个lambda表达式// 该任务会休眠1秒,然后返回i的平方std::packaged_task<int()> task([i]() {std::this_thread::sleep_for(std::chrono::seconds(1)); // 模拟耗时操作return i * i; // 返回计算结果});// 获取与该任务关联的future,用于获取任务执行结果auto fut = task.get_future();{// 加锁保护任务队列,将任务加入队列std::lock_guard<std::mutex> lock(task_mtx);taskqueue.push(std::move(task)); // 任务只能移动,不能复制}// 通知工作线程有新任务到来cv.notify_one();// 等待任务执行完成并获取结果(会阻塞主线程直到结果返回)int result = fut.get();// 加锁保护cout输出任务结果std::unique_lock<std::mutex> lock(cout_mtx);std::cout << "任务 " << i << " 结果: " << result << std::endl;}// 发送结束信号:添加一个空任务到队列{std::lock_guard<std::mutex> lock(task_mtx);taskqueue.emplace(); // 空任务用于通知工作线程退出}// 通知工作线程处理最后的空任务cv.notify_one();// 等待工作线程结束worker.join();return 0;
}

std::promise
std::promise提供了一种设置值的方式,在一个线程中 “承诺”(设置)一个值或异常,另一个线程通过关联的 std::future 获取该值或异常,实现线程间的数据传递。
使用步骤:
- 在使用的时候,先实例化一个指定结果的
promise对象 - 通过
promis对象,获取到关联的future对象 - 在任意位置给
promise设置数据,就可以通过关联的future获取到这个设置的数据了
特性
std::promise与std::future是 “一对多” 或 “一对一” 的关系(一个 promise 可对应多个 shared_future)。- 必须通过
set_value()或set_exception()为promise设置结果,否则关联的future.get()会抛出异常。
主要方法
get_future():返回关联的std::future。set_value(T):设置结果值(只能调用一次)。set_exception(std::exception_ptr):设置异常(例如:传递错误信息)。
基本用法:
#include <iostream>
#include <thread>
#include <memory>
#include <future>
#include <functional>
int Add(int a, int b) {return a + b;
}int main()
{// 先创建一个 std::promise 对象std::promise<int> promise;// 获取与 promise 关联的 std::future 对象std::future<int> future = promise.get_future();// 在任意位置给promise设置数据, 就可以通过关联的future获取数据std::thread t([&promise]() {// 模拟一些耗时操作std::this_thread::sleep_for(std::chrono::seconds(2));// 设置结果promise.set_value(Add(10, 20));});// 等待结果std::cout << "Waiting for result..." << std::endl;// 获取结果int result = future.get();std::cout << "Result: " << result << std::endl;// 等待线程结束t.join();return 0;
}

&
&
C++11线程池
下面用上面所介绍的知识来实现一个线程池, 并且使用该线程池并行运算素数:
// 线程池类:管理一组工作线程,处理提交的任务
class ThreadPool {
public:// 构造函数,启动指定数量的工作线程// 参数threads:工作线程的数量ThreadPool(size_t threads) : stop(false) {// 创建threads个工作线程并加入线程池for (size_t i = 0; i < threads; ++i) {workers.emplace_back([this] { // 每个线程执行的函数for (;;) { // 无限循环,不断从任务队列取任务执行std::function<void()> task; // 存储要执行的任务{ // 临界区开始(使用大括号限制锁的作用域)// 获取互斥锁,unique_lock会自动释放锁std::unique_lock<std::mutex> lock(this->queue_mutex);// 等待条件变量:当stop为true或任务队列不为空时唤醒// 如果条件不满足,线程会进入阻塞状态,释放锁this->condition.wait(lock,[this] { return this->stop || !this->tasks.empty(); });// 如果线程池已停止且任务队列为空,则退出线程if (this->stop && this->tasks.empty())return;// 从队列中取出一个任务task = std::move(this->tasks.front());this->tasks.pop();} // 临界区结束,锁自动释放// 执行任务(此时已释放锁,其他线程可以操作队列)task();}});}}// 向线程池添加任务// 模板参数F:任务函数类型;Args:函数参数类型// 返回值:一个future对象,用于获取任务执行结果template<class F, class... Args>auto enqueue(F&& f, Args&&... args)-> std::future<typename std::result_of<F(Args...)>::type> {// 确定任务返回值类型using return_type = typename std::result_of<F(Args...)>::type;// 创建一个包装任务,将函数和参数绑定// 使用shared_ptr是因为需要在多个线程间共享auto task = std::make_shared<std::packaged_task<return_type()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));// 获取与任务关联的future对象,用于获取结果std::future<return_type> res = task->get_future();{ // 临界区:操作任务队列std::unique_lock<std::mutex> lock(queue_mutex);// 如果线程池已停止,不允许添加新任务if (stop)throw std::runtime_error("enqueue on stopped ThreadPool");// 将任务添加到队列,使用lambda包装以便无参数调用tasks.emplace([task]() { (*task)(); });} // 释放锁// 通知一个等待的工作线程有新任务到来condition.notify_one();return res;}// 析构函数,等待所有任务完成后销毁线程池~ThreadPool() {{ // 临界区:设置停止标志std::unique_lock<std::mutex> lock(queue_mutex);stop = true; // 告诉所有工作线程准备退出}// 通知所有工作线程(唤醒它们)condition.notify_all();// 等待所有工作线程完成当前任务并退出for (std::thread& worker : workers)worker.join();}private:// 工作线程集合std::vector<std::thread> workers;// 任务队列:存储待执行的任务std::queue<std::function<void()>> tasks;// 同步原语std::mutex queue_mutex; // 保护任务队列的互斥锁std::condition_variable condition;// 用于线程间同步的条件变量bool stop; // 线程池是否停止的标志
};// 判断一个数是否为素数
// 参数n:要判断的整数
// 返回值:true表示是素数,false表示不是
bool isPrime(int n) {if (n <= 1) return false; // 小于等于1的数不是素数if (n == 2) return true; // 2是素数if (n % 2 == 0) return false;// 偶数不是素数(除了2)// 只需要检查到sqrt(n),且只检查奇数int sqrt_n = sqrt(n);for (int i = 3; i <= sqrt_n; i += 2) {if (n % i == 0) return false; // 能被整除,不是素数}return true; // 是素数
}// 计算指定范围内的素数个数
// 参数start:范围起始值;end:范围结束值
// 返回值:[start, end]范围内的素数数量
int countPrimes(int start, int end) {int count = 0;for (int i = start; i <= end; ++i) {if (isPrime(i)) ++count; // 如果是素数则计数加1}return count;
}int main()
{const int MAX = 100000000; // 计算1到MAX之间的素数const int THREADS = 8; // 使用8个线程ThreadPool pool(THREADS); // 创建线程池std::vector<std::future<int>> futures; // 存储每个任务的future对象int chunk = MAX / THREADS; // 每个线程处理的数字范围大小// 将任务分配到线程池for (int i = 0; i < THREADS; ++i) {int start = i * chunk + 1; // 计算当前线程处理的起始值// 最后一个线程处理剩余的所有数字(处理整除不尽的情况)int end = (i == THREADS - 1) ? MAX : (i + 1) * chunk;// 向线程池添加任务,并保存返回的futurefutures.push_back(pool.enqueue(countPrimes, start, end));}// 收集所有任务的结果size_t begin1 = clock(); // 开始计时(收集结果的时间)int total1 = 0;for (auto& fut : futures) {// 等待任务完成并获取结果,累加到总数total1 += fut.get();}size_t end1 = clock(); // 结束计时// 输出结果和耗时std::cout << "1到" << MAX << "之间的素数数量-并行异步: "<< total1 << "->" << end1 - begin1 << std::endl;return 0;
}

