C++异步通信-future学习
C++异步通信-future学习
future 的核心组件:
组件 | 作用 |
---|---|
std::future<T> | 用于异步获取某个操作的返回值 |
std::promise<T> | 提供一种主动设置 future 的方式 |
std::async | 异步启动一个任务,返回 future |
std::packaged_task | 封装函数任务,稍后执行并返回结果 |
std::shared_future<T> | future 的可共享版本 |
std::future
表示一个异步操作的结果,提供了一种访问异步操作结果的机制。
你可以把它想象成一张"欠条"——当你启动一个异步操作时,它会给你一个 future 对象,承诺将来会有一个结果
基本工作原理
- 1、你启动一个异步操作(比如通过 std::async)
- 2、该操作返回一个 std::future 对象
- 3、当需要结果时,调用 future.get() 获取值
- 4、如果结果还没准备好,get() 会阻塞当前线程直到结果可用
主要方法
- get(): 获取结果,如果结果未就绪则阻塞当前线程
- valid(): 检查 future 是否关联了共享状态
- wait(): 等待结果变为可用
- wait_for() / wait_until(): 带超时的等待
实例
#include <iostream>
#include <future>
#include <vector>int compute(int x) {// 模拟耗时计算std::this_thread::sleep_for(std::chrono::milliseconds(500));return x * x;
}
int main() {//并行计算std::vector<std::future<int>> futures;for (int i = 1; i <= 5; ++i) {futures.push_back(std::async(std::launch::async, compute, i));}for (auto& fut : futures) {if(fut.valid())std::cout << fut.get() << " ";}//超时处理std::future<int> fut = std::async([](){std::this_thread::sleep_for(std::chrono::seconds(3));return 42;});auto status = fut.wait_for(std::chrono::seconds(1));if (status == std::future_status::ready) {std::cout << "Get result: " << fut.get() << std::endl;} else {std::cout << "Timeout occurred\n";}return 0;
}
注意事项
- get() 只能调用一次 - 第二次调用是未定义行为
- 生命周期管理 - 确保 future 对象在获取结果前不被销毁
- 共享状态 - 没有关联共享状态的 future 是无效的
- 线程安全 - future 对象本身不是线程安全的
std::promise
std::promise 是 C++11 引入的一个与 std::future 配合使用的组件,它提供了一种显式设置异步操作结果(值或异常)的机制。std::promise 可以被看作是一个"承诺",它承诺将来会提供一个值。这个承诺可以通过与之关联的 std::future 来获取。
核心特点
- 生产者端:用于设置值或异常
- 一次性写入:只能设置一次值/异常
- 与 future 配对:每个 promise 都有一个关联的 future
主要方法
- set_value(): 设置结果值
- set_exception(): 设置异常
- get_future(): 获取关联的 future 对象
实例
#include <iostream>
#include <future>
#include <thread>void worker(std::promise<int> prom) {try {// 模拟工作std::this_thread::sleep_for(std::chrono::seconds(1));prom.set_value(42); // 履行承诺,设置值} catch (...) {prom.set_exception(std::current_exception()); // 设置异常}
}int main() {std::promise<int> prom;std::future<int> fut = prom.get_future(); // 获取关联的futurestd::thread t(worker, std::move(prom)); // 移动promise到线程// 在主线程中获取结果try {int result = fut.get(); // 阻塞直到获取结果std::cout << "Result: " << result << std::endl;} catch (const std::exception& e) {std::cerr << "Exception: " << e.what() << std::endl;}t.join();return 0;
}
生命周期管理
std::promise 的销毁行为:
- 如果 promise 被销毁且未设置值/异常:
- 关联的 future 会收到 std::future_error 异常
- 错误码为 std::future_errc::broken_promise
std::future<int> create_abandoned_future() {std::promise<int> prom;std::future<int> fut = prom.get_future();return fut; // promise被销毁,没有设置值
}auto fut = create_abandoned_future();
try {int val = fut.get(); // 抛出std::future_error
} catch (const std::future_error& e) {if (e.code() == std::future_errc::broken_promise) {std::cerr << "Promise was broken!\n";}
}
应用场景
- 线程间通信:一个线程计算结果,另一个线程获取结果
- 实现回调机制:将 promise 传递给工作线程,稍后获取结果
注意事项
- 总是设置值或异常:避免 broken_promise 情况
- 考虑使用 RAII:确保 promise 在异常情况下也能被正确设置
- 移动而非复制:记住 promise 只能移动
- 及时获取结果:不要长时间持有 future 而不调用 get()
- 线程安全注意:promise 对象本身不是线程安全的
std::async
启动一个异步任务,返回一个 future 对象。
std::async 是 C++11 提供的一个高级异步操作工具,它简化了异步任务的创建和管理,自动在后台线程执行任务并返回结果。
启动策略:
- std::launch::async: 在新线程中执行任务
- std::launch::deferred: 延迟执行,直到调用 future 的 get() 或 wait()
- 默认策略是两者都可能,由实现决定
实例
#include <iostream>
#include <future>
#include <thread>int compute() {std::this_thread::sleep_for(std::chrono::seconds(1));return 42;
}int main() {// 异步执行auto fut1 = std::async(std::launch::async, compute);// 延迟执行auto fut2 = std::async(std::launch::deferred, compute);std::cout << "Getting fut1 result...\n";std::cout << "fut1: " << fut1.get() << std::endl;std::cout << "Getting fut2 result...\n";std::cout << "fut2: " << fut2.get() << std::endl; // 此时才执行return 0;
}
注意事项
- 明确指定启动策略:避免依赖默认行为
- 优先使用 std::launch::async:如果确实需要异步
- 合理管理future生命周期:避免意外阻塞
- 考虑任务粒度:太小的任务可能不值得异步执行
- 异常安全:确保任务中的异常能被正确处理
std::packaged_task
std::packaged_task 是 C++11 提供的一个重要并发工具,它将可调用对象与 promise/future 机制相结合,提供了更灵活的异步任务管理方式。
将可调用对象包装起来,以便异步调用,其返回值或异常存储在共享状态中,可通过 future 对象访问。
概念
std::packaged_task 是一个类模板,它:
- 包装任何可调用对象(函数、lambda、函数对象等)
- 允许异步调用该对象
- 将调用结果存储在共享状态中,可通过 future 获取
实例
#include <iostream>
#include <future>
#include <thread>int compute(int x, int y) {return x * y;
}int main() {// 创建 packaged_task,包装 compute 函数std::packaged_task<int(int, int)> task(compute);// 获取关联的 futurestd::future<int> result = task.get_future();// 在单独线程中执行任务std::thread t(std::move(task), 6, 7); // 注意:task 只能移动// 获取结果std::cout << "6 * 7 = " << result.get() << std::endl;t.join();return 0;
}
核心特性
模板参数
std::packaged_task 的模板参数 F 是一个函数类型:
- int(int, int) 表示接受两个int参数,返回int的函数
- void() 表示无参数无返回值的函数
主要成员函数
- get_future() 返回与任务关联的 future 对象
- operator() 调用包装的可调用对象
- reset() 重置任务状态,允许重用(C++11不支持,C++14引入)
移动语义
std::packaged_task 只能移动,不能复制:
std::packaged_task<int()> task1([]{ return 42; });
// std::packaged_task<int()> task2 = task1; // 错误!不能复制
std::packaged_task<int()> task2 = std::move(task1); // OK
任务队列中使用用例
#include <queue>
#include <mutex>
#include <condition_variable>std::queue<std::packaged_task<int()>> tasks;
std::mutex queue_mutex;
std::condition_variable cv;void worker_thread() {while (true) {std::packaged_task<int()> task;{std::unique_lock<std::mutex> lock(queue_mutex);cv.wait(lock, []{ return !tasks.empty(); });task = std::move(tasks.front());tasks.pop();}task(); // 执行任务}
}int main() {std::thread worker(worker_thread);// 提交任务std::packaged_task<int()> task([]{ return 42; });std::future<int> result = task.get_future();{std::lock_guard<std::mutex> lock(queue_mutex);tasks.push(std::move(task));}cv.notify_one();std::cout << "Result: " << result.get() << std::endl;worker.join();return 0;
}
生命周期管理
- 共享状态生命周期:由 packaged_task 和 future 共同管理
- 任务对象销毁:如果 packaged_task 被销毁且未调用,关联的 future 会收到 broken_promise 错误
std::future<int> create_abandoned_future() {std::packaged_task<int()> task([]{ return 42; });std::future<int> fut = task.get_future();return fut; // task 被销毁,未调用
}auto fut = create_abandoned_future();
try {int val = fut.get(); // 抛出 std::future_error
} catch (const std::future_error& e) {if (e.code() == std::future_errc::broken_promise) {std::cerr << "Task was never executed!\n";}
}
应用场景
- 任务队列系统:将任务封装后放入队列,由工作线程取出执行
- 延迟执行:先创建任务,稍后决定何时执行
- 复杂任务调度:需要精细控制任务执行顺序的情况
- 回调机制:将 packaged_task 作为回调传递
注意事项
- 总是移动而非复制:记住 packaged_task 只能移动
- 确保任务被执行:避免 broken_promise 情况
- 合理管理线程:确保线程安全地访问任务对象
- 考虑异常安全:确保异常能正确传播
- 及时获取结果:不要长时间持有 future 而不调用 get()
std::shared_future
std::shared_future 是 C++11 引入的一个与 std::future 相关的并发工具,它解决了 std::future 的一些限制,允许多个线程等待和访问同一个异步结果。
与 future 的区别:
- future 只能移动,shared_future 可以复制
- future 的 get() 只能调用一次,shared_future 可以多次调用
基本用法
从 std::future 创建
std::promise<int> prom;
std::future<int> fut = prom.get_future();// 将 future 转换为 shared_future (移动构造)
std::shared_future<int> shared_fut = fut.share();
// 或者:
// std::shared_future<int> shared_fut(std::move(fut));// 现在 fut 不再有效 (valid() == false)
直接创建
std::promise<int> prom;
std::shared_future<int> shared_fut = prom.get_future().share();
多线程共享使用
#include <iostream>
#include <future>
#include <vector>
#include <thread>void worker(std::shared_future<int> sf, int id) {int result = sf.get(); // 所有线程获取相同结果std::cout << "Worker " << id << " got: " << result << std::endl;
}int main() {std::promise<int> prom;std::shared_future<int> shared_fut = prom.get_future().share();std::vector<std::thread> threads;for (int i = 0; i < 5; ++i) {threads.emplace_back(worker, shared_fut, i);}prom.set_value(42); // 设置值,所有等待的线程将被唤醒for (auto& t : threads) {t.join();}return 0;
}
核心特性
- 多次获取结果
std::promise<std::string> prom;
std::shared_future<std::string> shared_fut = prom.get_future().share();prom.set_value("Hello");std::cout << shared_fut.get() << std::endl; // 输出: Hello
std::cout << shared_fut.get() << std::endl; // 再次输出: Hello (可以多次调用)
- 线程安全访问
std::shared_future<int> shared_fut = /*...*/;// 线程1:
int val1 = shared_fut.get();// 线程2同时:
int val2 = shared_fut.get(); // 安全
- 复制语义
std::shared_future<int> sf1 = /*...*/;
std::shared_future<int> sf2 = sf1; // 复制构造
std::shared_future<int> sf3;
sf3 = sf2; // 复制赋值
使用实例:实现发布-订阅模式
class ResultBroadcaster {std::promise<void> prom;std::shared_future<void> shared_fut;
public:ResultBroadcaster() : shared_fut(prom.get_future().share()) {}std::shared_future<void> get_future() const { return shared_fut; }void notify_all() { prom.set_value(); }
};// 使用示例
ResultBroadcaster broadcaster;
auto fut = broadcaster.get_future();std::thread t1([fut]{ fut.wait(); std::cout << "Thread 1 notified\n"; });
std::thread t2([fut]{ fut.wait(); std::cout << "Thread 2 notified\n"; });broadcaster.notify_all();
t1.join(); t2.join();
注意事项
- 明确共享需求:只在多个消费者需要访问同一结果时使用
- 合理管理副本:避免不必要的复制
- 考虑使用 const:如果只是读取结果,可以使用 const shared_future
- 异常安全:确保异常能正确传播到所有等待线程
std::shared_future 特别适合需要广播结果或实现观察者模式的场景。