当前位置: 首页 > news >正文

【C++】异步操作

文章目录

  • std::async 与 std::future
  • std::shared_future
  • std::packaged_task
  • std::promise
  • C++11线程池




在 C++ 中,异步操作是指程序在执行一个操作时,不需要等待该操作完成就可以继续执行后续代码,从而提高程序的并发性能和响应性。C++11 及后续标准引入了一系列异步操作相关的库和机制,主要包括以下几个核心组件:

  • std::asyncstd::future
  • std::packaged_task
  • std::promise

下面就逐一介绍一下


std::async 与 std::future


std::async 是一个函数模板,它会异步执行一个函数,而 std::future 用于获取异步任务的结果,两者通常配合使用。

std::async

功能:自动创建线程并执行任务,无需手动管理线程生命周期。
原型:

template< class Function, class... Args >
std::future<std::result_of_t<std::decay_t<Function>(std::decay_t<Args>...)>>
async( std::launch policy, Function&& f, Args&&... args );

启动策略:

  • std::launch::async:强制立即在新线程中执行任务。
  • std::launch::deferred:延迟执行(任务会在调用 get()wait() 时在当前线程执行)。
  • 默认策略(不指定):由实现决定,可能是上述两种之一。

std::future
功能:作为异步操作结果的 “占位符”,提供获取结果、等待完成的接口。
主要方法:
get():获取异步任务的返回值(如果任务未完成,会阻塞当前线程)。注意:get() 只能调用一次,调用后 future 状态变为无效(valid() == false)。
wait():阻塞等待任务完成,但不获取结果。
wait_for(duration):等待指定时长,返回状态(ready/timeout/deferred)。
wait_until(timepoint):等待到指定时间点,返回状态同上。

基本用法:

#include <iostream>
#include <future>
int Add(int a, int b) {return a + b;
}int main()
{// 异步调用Add函数// std::launch::async策略: 内部创建一个线程执行函数, 函数运行结果通过future获取// std::launch::deferred策略: 函数不会立即执行, 直到调用get()或wait()时才执行std::future<int> result = std::async(std::launch::async, Add, 1, 2);// 获取结果std::cout << "1 + 2 = " << result.get() << std::endl;return 0;
}

在这里插入图片描述


std::shared_future


  • 共享结果所有权:与 std::future(只能移动,不能复制)不同,std::shared_future 可以被复制,多个 shared_future 对象可以指向同一个异步结果。
  • 获取结果的方式:通过 get() 方法获取异步操作的结果,该方法可以被多个线程调用,且每个线程都能得到完整的结果。
  • 与 std::future 的关系std::shared_future 通常通过 std::future::share() 方法从一个 std::future 对象转换而来。

使用场景
当需要多个线程获取同一个异步操作的结果时,std::shared_future 非常有用。例如,多个线程需要基于同一个计算结果执行后续操作。

#include <iostream>
#include <future>
#include <thread>
#include <chrono>// 异步执行的任务
int calculate() {std::this_thread::sleep_for(std::chrono::seconds(2)); // 模拟耗时计算return 42;
}// 多个线程共享结果
void print_result(std::shared_future<int> fut, std::mutex &mtx) {std::unique_lock<std::mutex> lck(mtx);std::cout << "线程 " << std::this_thread::get_id()<< " 获取结果: " << fut.get() << std::endl;
}int main() {// 启动异步任务,得到一个futurestd::future<int> fut = std::async(std::launch::async, calculate);// 转换为shared_future,允许多个线程共享std::shared_future<int> shared_fut = fut.share();// 启动多个线程共享结果std::mutex mtx; // 使用锁管理共享资源(输出位置--控制台)std::thread t1(print_result, shared_fut, std::ref(mtx));std::thread t2(print_result, shared_fut, std::ref(mtx));std::thread t3(print_result, shared_fut, std::ref(mtx));t1.join();t2.join();t3.join();return 0;
}

在这里插入图片描述


std::packaged_task


std::packaged_task 是一种模板类,可以对一个函数进行二次封装,封装成一个可调用对象放到其他线程去执行。我们可以通过std::packaged_task对象获取任务相关联的std::feature对象,通过调用get_future()方法获得。std::packaged_task的模板参数是函数签名
执行任务的步骤:

  1. 封装任务
  2. 执行任务
  3. 通过future获取任务执行结果

可以把std::futurestd::async看成是分开的, 而 std::packaged_task则是一个整体。

特性

  • 包装后的任务可以像函数一样被调用(通过 operator())。
  • 任务执行后,结果会自动存储,供关联的 future 获取。
  • 任务必须被执行(否则 future.get() 会永远阻塞)。

主要方法

  • get_future():返回与任务关联的 std::future 对象。
  • operator():执行包装的任务(可在任意线程中调用)。

基本用法:

#include <iostream>
#include <thread>
#include <memory>
#include <future>
#include <functional>
int Add(int a, int b) {return a + b;
}int main()
{// 封装任务// std::packaged_task<int(int, int)> task(add);// 此处可执行其他操作, 无需等待// std::future<int> result_future = task.get_future();//需要注意的是,task虽然重载了()运算符,但task并不是一个函数,//std::async(std::launch::async, task, 1, 2); //--错误用法//所以导致它作为线程的入口函数时,语法上看没有问题,但是实际编译的时候会报错//std::thread(task, 1, 2); //---错误用法//而packaged_task禁止了拷贝构造,//且因为每个packaged_task所封装的函数签名都有可能不同,因此也无法当作参数一样传递//传引用不可取,毕竟任务在多线程下执行存在局部变量声明周期的问题,因此不能传引用//因此想要将一个packaged_task进行异步调用,//简单方法就只能是new packaged_task,封装函数传地址进行解引用调用了//而类型不同的问题,在使用的时候可以使用类型推导来解决// 封装任务auto task = std::make_shared<std::packaged_task<int(int, int)>>(Add);// 获取任务包关联的future对象std::future<int> result = task->get_future();// 异步执行任务std::thread([task]() {(*task)(11, 22);}).detach();  // 使用detach使线程在后台运行// 等待任务完成并获取结果std::cout << "异步计算结果: " << result.get() << std::endl;return 0;
}

在这里插入图片描述

来看一个比较复杂的场景:

#include <iostream>
#include <future>   // 包含异步操作相关类(packaged_task, future等)
#include <thread>   // 包含线程操作相关类
#include <chrono>   // 包含时间相关功能
#include <queue>    // 包含队列容器
#include <mutex>    // 包含互斥锁相关类
#include <condition_variable>  // 包含条件变量类// 用于保护cout输出的互斥锁,避免多线程输出混乱
std::mutex cout_mtx;/*** 任务执行函数,由工作线程运行* @param task_queue 任务队列,存储待执行的任务* @param task_mtx 保护任务队列的互斥锁* @param cv 条件变量,用于通知工作线程有新任务*/
void taskRunner(std::queue<std::packaged_task<int()>> &task_queue, std::mutex &task_mtx, std::condition_variable &cv) {// 工作线程主循环,持续处理任务while (true) {// 存储从队列中取出的任务std::packaged_task<int()> task;{// 创建unique_lock用于条件变量等待,自动管理锁的生命周期std::unique_lock<std::mutex> lock(task_mtx);// 等待条件变量,直到队列不为空才继续执行// 第二个参数是谓词,避免虚假唤醒cv.wait(lock, [&task_queue]() {return !task_queue.empty();});// 将队列头部的任务移动到本地变量task = std::move(task_queue.front());// 从队列中移除已取出的任务task_queue.pop();}// 如果任务无效(空任务),则退出循环,结束工作线程if (!task.valid()) break;// 执行任务task();	{// 加锁保护cout输出std::lock_guard<std::mutex> lock(cout_mtx);std::cout << "任务在线程 " << std::this_thread::get_id() << " 执行完毕" << std::endl;}}
}int main()
{// 任务队列,存储所有待执行的任务std::queue<std::packaged_task<int()>> taskqueue;// 保护任务队列的互斥锁std::mutex task_mtx;// 条件变量,用于通知工作线程有新任务到来std::condition_variable cv;// 启动工作线程,执行taskRunner函数std::thread worker(taskRunner, std::ref(taskqueue), std::ref(task_mtx), std::ref(cv));// 创建并添加5个任务到队列for (int i = 0; i < 5; ++i) {// 创建一个打包任务,任务是一个lambda表达式// 该任务会休眠1秒,然后返回i的平方std::packaged_task<int()> task([i]() {std::this_thread::sleep_for(std::chrono::seconds(1));  // 模拟耗时操作return i * i;  // 返回计算结果});// 获取与该任务关联的future,用于获取任务执行结果auto fut = task.get_future();{// 加锁保护任务队列,将任务加入队列std::lock_guard<std::mutex> lock(task_mtx);taskqueue.push(std::move(task));  // 任务只能移动,不能复制}// 通知工作线程有新任务到来cv.notify_one();// 等待任务执行完成并获取结果(会阻塞主线程直到结果返回)int result = fut.get();// 加锁保护cout输出任务结果std::unique_lock<std::mutex> lock(cout_mtx);std::cout << "任务 " << i << " 结果: " << result << std::endl;}// 发送结束信号:添加一个空任务到队列{std::lock_guard<std::mutex> lock(task_mtx);taskqueue.emplace();  // 空任务用于通知工作线程退出}// 通知工作线程处理最后的空任务cv.notify_one();// 等待工作线程结束worker.join();return 0;
}

在这里插入图片描述


std::promise


std::promise提供了一种设置值的方式,在一个线程中 “承诺”(设置)一个值或异常,另一个线程通过关联的 std::future 获取该值或异常,实现线程间的数据传递。

使用步骤:

  1. 在使用的时候,先实例化一个指定结果的promise对象
  2. 通过promis对象,获取到关联的future对象
  3. 在任意位置给promise设置数据,就可以通过关联的future获取到这个设置的数据了

特性

  • std::promisestd::future 是 “一对多” 或 “一对一” 的关系(一个 promise 可对应多个 shared_future)。
  • 必须通过 set_value()set_exception()promise 设置结果,否则关联的 future.get() 会抛出异常。

主要方法

  • get_future():返回关联的 std::future
  • set_value(T):设置结果值(只能调用一次)。
  • set_exception(std::exception_ptr):设置异常(例如:传递错误信息)。

基本用法:

#include <iostream>
#include <thread>
#include <memory>
#include <future>
#include <functional>
int Add(int a, int b) {return a + b;
}int main()
{// 先创建一个 std::promise 对象std::promise<int> promise;// 获取与 promise 关联的 std::future 对象std::future<int> future = promise.get_future();// 在任意位置给promise设置数据, 就可以通过关联的future获取数据std::thread t([&promise]() {// 模拟一些耗时操作std::this_thread::sleep_for(std::chrono::seconds(2));// 设置结果promise.set_value(Add(10, 20));});// 等待结果std::cout << "Waiting for result..." << std::endl;// 获取结果int result = future.get();std::cout << "Result: " << result << std::endl;// 等待线程结束t.join();return 0;
}

在这里插入图片描述

& 
& 

C++11线程池


下面用上面所介绍的知识来实现一个线程池, 并且使用该线程池并行运算素数:

// 线程池类:管理一组工作线程,处理提交的任务
class ThreadPool {
public:// 构造函数,启动指定数量的工作线程// 参数threads:工作线程的数量ThreadPool(size_t threads) : stop(false) {// 创建threads个工作线程并加入线程池for (size_t i = 0; i < threads; ++i) {workers.emplace_back([this] {  // 每个线程执行的函数for (;;) {  // 无限循环,不断从任务队列取任务执行std::function<void()> task;  // 存储要执行的任务{  // 临界区开始(使用大括号限制锁的作用域)// 获取互斥锁,unique_lock会自动释放锁std::unique_lock<std::mutex> lock(this->queue_mutex);// 等待条件变量:当stop为true或任务队列不为空时唤醒// 如果条件不满足,线程会进入阻塞状态,释放锁this->condition.wait(lock,[this] { return this->stop || !this->tasks.empty(); });// 如果线程池已停止且任务队列为空,则退出线程if (this->stop && this->tasks.empty())return;// 从队列中取出一个任务task = std::move(this->tasks.front());this->tasks.pop();}  // 临界区结束,锁自动释放// 执行任务(此时已释放锁,其他线程可以操作队列)task();}});}}// 向线程池添加任务// 模板参数F:任务函数类型;Args:函数参数类型// 返回值:一个future对象,用于获取任务执行结果template<class F, class... Args>auto enqueue(F&& f, Args&&... args)-> std::future<typename std::result_of<F(Args...)>::type> {// 确定任务返回值类型using return_type = typename std::result_of<F(Args...)>::type;// 创建一个包装任务,将函数和参数绑定// 使用shared_ptr是因为需要在多个线程间共享auto task = std::make_shared<std::packaged_task<return_type()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));// 获取与任务关联的future对象,用于获取结果std::future<return_type> res = task->get_future();{  // 临界区:操作任务队列std::unique_lock<std::mutex> lock(queue_mutex);// 如果线程池已停止,不允许添加新任务if (stop)throw std::runtime_error("enqueue on stopped ThreadPool");// 将任务添加到队列,使用lambda包装以便无参数调用tasks.emplace([task]() { (*task)(); });}  // 释放锁// 通知一个等待的工作线程有新任务到来condition.notify_one();return res;}// 析构函数,等待所有任务完成后销毁线程池~ThreadPool() {{  // 临界区:设置停止标志std::unique_lock<std::mutex> lock(queue_mutex);stop = true;  // 告诉所有工作线程准备退出}// 通知所有工作线程(唤醒它们)condition.notify_all();// 等待所有工作线程完成当前任务并退出for (std::thread& worker : workers)worker.join();}private:// 工作线程集合std::vector<std::thread> workers;// 任务队列:存储待执行的任务std::queue<std::function<void()>> tasks;// 同步原语std::mutex queue_mutex;          // 保护任务队列的互斥锁std::condition_variable condition;// 用于线程间同步的条件变量bool stop;                       // 线程池是否停止的标志
};// 判断一个数是否为素数
// 参数n:要判断的整数
// 返回值:true表示是素数,false表示不是
bool isPrime(int n) {if (n <= 1) return false;    // 小于等于1的数不是素数if (n == 2) return true;     // 2是素数if (n % 2 == 0) return false;// 偶数不是素数(除了2)// 只需要检查到sqrt(n),且只检查奇数int sqrt_n = sqrt(n);for (int i = 3; i <= sqrt_n; i += 2) {if (n % i == 0) return false;  // 能被整除,不是素数}return true;  // 是素数
}// 计算指定范围内的素数个数
// 参数start:范围起始值;end:范围结束值
// 返回值:[start, end]范围内的素数数量
int countPrimes(int start, int end) {int count = 0;for (int i = start; i <= end; ++i) {if (isPrime(i)) ++count;  // 如果是素数则计数加1}return count;
}int main()
{const int MAX = 100000000;   // 计算1到MAX之间的素数const int THREADS = 8;       // 使用8个线程ThreadPool pool(THREADS);    // 创建线程池std::vector<std::future<int>> futures;  // 存储每个任务的future对象int chunk = MAX / THREADS;  // 每个线程处理的数字范围大小// 将任务分配到线程池for (int i = 0; i < THREADS; ++i) {int start = i * chunk + 1;  // 计算当前线程处理的起始值// 最后一个线程处理剩余的所有数字(处理整除不尽的情况)int end = (i == THREADS - 1) ? MAX : (i + 1) * chunk;// 向线程池添加任务,并保存返回的futurefutures.push_back(pool.enqueue(countPrimes, start, end));}// 收集所有任务的结果size_t begin1 = clock();  // 开始计时(收集结果的时间)int total1 = 0;for (auto& fut : futures) {// 等待任务完成并获取结果,累加到总数total1 += fut.get();}size_t end1 = clock();    // 结束计时// 输出结果和耗时std::cout << "1到" << MAX << "之间的素数数量-并行异步: "<< total1 << "->" << end1 - begin1 << std::endl;return 0;
}

在这里插入图片描述

http://www.dtcms.com/a/526667.html

相关文章:

  • 网站做推广团队郑州seo技术外包
  • 企业网站建设商城百度做的网站国外可以打开吗
  • .net 网站开发工程师com域名多少钱
  • 自己做资金盘网站推荐一个seo优化软件
  • asp网站后台管理教程企业人力资源管理师报名入口官网
  • 建设银行的网站网站建设那好
  • 找人做网站做的很烂香奈儿网站建设策划书
  • 北辰苏州网站建设小程序定制团队
  • 公司做推广做网站好还是宜昌做网站的公司
  • C++ STL 容器与算法详解
  • 网站推广途径和推广要点的案例讨论杭州市建设工程交易中心网站
  • 网站之间的区别中铁建设集团有限公司西南分公司
  • 做dj音乐网站漳州商城网站建设
  • 网站建设实验报告手写上海网站建设 知名觉
  • 网站功能设计建设通账号
  • 图片网站怎样选择虚拟主机网站改版后 存在大量404页面
  • 华为云速建站教程wordpress获取别名
  • 做中考试卷的网站企业网站的首页
  • iis配置wap网站外贸型网站
  • 深圳东莞的网站建设公司手机端开发工具
  • 设计高端网站易语言可以做网站吗
  • 企业产品做哪个网站推广好谷歌seo培训
  • 六安有哪些做网站的公司微信怎么创建小程序?
  • 数字化转型:概念性名词浅谈(第七十九讲)
  • 现在1做啥网站流量大最专业的手机网站建设
  • 开一个做网站的公司赚钱吗做网站能接到模具单吗
  • 可以用自己的电脑做网站主机wordpress主题不能用
  • 有哪些好的做问卷调查的网站做网站时如何上传图片
  • 中山专业网站制作苏州出名的网站公司
  • 网站上传附件目录格式计算机ui设计是什么