C++异步编程入门
在C++中,相关的接口被如下组织:
promise & future
要在C++中使用该组异步接口,首先需要定义一个promise结构体,它表示某一个数据的输入端。//头文件
#include <future>//定义一个promise,传递int类型的未来值:
std::promise<int> p;
//定义一个future,并于p绑定(直接调用了p的接口获取)
std::future<int> f = p.get_future();
当某个值准备好后,调用promise的set_value接口,设置好未来值,随后,就能使用future的接口将该值读取出来。
#include <future>
#include <iostream>
int main()
{std::promise<int> p;std::future<int> f = p.get_future();p.set_value(42); // 这个值对p而言是只写的std::cout << f.get() << std::endl; // 这个值对f而言是只读的
}
假设我们将p传入到某一个线程,在这个线程内部,调用p的set_value接口,在另一个线程内部,调用f的get接口,就可以实现线程间的异步同步处理。
#include <future>
#include <iostream>
#include <thread>
int main()
{std::promise<int> p;std::future<int> f = p.get_future();std::thread t([](std::promise<int> p) {p.set_value(42);},std::move(p));std::cout << f.get() << std::endl;t.join();
}
需要注意的是,对应的一套 p.set_value 和 f.get 只能被使用一次,否则会抛出future_error的exception。
wait、wait_for、wait_until
由于异步之间效率的不同,某一个未来值可能迟迟不出现,这就会出现另一个线程的阻塞等待,C++也提供了相应的接口,并给出了超时语义。- void wait():阻塞式的等待,直到future的值可以被读取
- future_status wait_for(const std::chrono::duration<…> &):等待存在最大时间限制,返回future_status
- future_status_wait_until(const std::chrono::time_point<…> &):等待到某个时间,返回future_status
future_status可以有以下取值:
future_status | 定义 |
---|---|
deferred | 这是一个惰性估值(async中会使用) |
ready | 已经可以读取值 |
timeout | 等待超时 |
shared_future
std::future 是不可拷贝的,不是线程安全的,只能通过移动(std::move) 语义传递。只能由单一消费者消费结果,如果存在对一个future的多个线程同时访问需求,需要使用到shared_futrue。shared_future 支持共享访问,允许多个线程共享访问异步操作的结果,它允许被拷贝,每个线程都可以对不同的shared_future拷贝获取相同的结果。
#include <iostream>
#include <future>
#include <thread>// 模拟一个耗时计算的函数
int compute()
{return 42;
}int main()
{// 创建一个 std::promise 和与之关联的 std::futurestd::promise<int> promise;std::future<int> fut = promise.get_future();// 将 std::future 转换为 std::shared_futurestd::shared_future<int> shared_fut = fut.share();// 在另一个线程中设置 promise 的值std::thread t([&promise]() {int result = compute(); // 计算结果promise.set_value(result); // 将结果传递给 promise});// 多个地方获取结果std::cout << "Thread 1: " << shared_fut.get() << std::endl;std::cout << "Thread 2: " << shared_fut.get() << std::endl;// 共享的 future 可以拷贝std::shared_future<int> shared_fut_copy = shared_fut;std::cout << "Thread 3 (copy): " << shared_fut_copy.get() << std::endl;t.join(); // 等待线程结束return 0;
}output:
Thread 1: 42
Thread 2: 42
Thread 3 (copy): 42
packaged_task
在上面的例子中,我们总是希望在某个方法中异步的完成它的值,因此,我们需要设计出这个方法,packaged_task类试图将promise和实现promise的方法进一步封装为一个完整的结构。#include <future>
#include <iostream>
int compute(int a, int b) {return 42 + a + b;
}
int main() {// 相当于定义了promise 并绑定了对应的方法std::packaged_task<int(int, int)> task(compute);std::future<int> f = task.get_future();// task内部重载了(),将直接调用computetask(3, 4);std::cout << f.get() << std::endl;return 0;
}
简化版实现如下:
#include <exception>
#include <functional>
#include <future>
template <typename Func>
class my_packaged_task;
template <typename Ret, typename... Args>
class my_packaged_task<Ret(Args...)> {
private:std::promise<Ret> promise_;std::function<Ret(Args...)> func_;
public:my_packaged_task(std::function<Ret(Args...)> func): func_(std::move(func)) {}// 重载 () 操作void operator()(Args&&... args) {try {promise_.set_value(func_(std::forward<Args&&>(args)...));} catch (...) {promise_.set_exception(std::current_exception());}}std::future<Ret> get_future() {return promise_.get_future();}
};
因此,promise被进一步封装,对外不再可见,当需要调用预设的方法为future赋值时,只需要简单的调用task重载后的函数即可。
async
在将promise封装为packaged_task后,我们依然有大量的需求是创建一个线程,去执行异步操作,在计算机的世界里面,没有什么是再封一层做不到的,如果有,那一定就是再封两层,所以我们将packaged_task和thread再向上封装,就形成了最后的async接口,这个接口直接返回future,然后在**某个时间点**完成异步操作。某个时间点取决于 async 当前的执行策略:
- std::launch::async:建立一个线程执行指定的异步操作回传到future。
2. std::launch::deferred:将该操作的触发时间,延迟到future.get被调用的时候。
分析如下代码:
#include <iostream>
#include <future>
#include <thread>
#include <chrono>
// 模拟一个耗时任务
int compute(int x) {std::cout << "Start computation with x = " << x << " in thread " << std::this_thread::get_id() << std::endl;std::this_thread::sleep_for(std::chrono::seconds(2)); // 模拟耗时操作return x * x;
}
int main() {// 使用 std::async 的 std::launch::async 模式std::future<int> async_future = std::async(std::launch::async, compute, 10);// 使用 std::async 的 std::launch::deferred 模式std::future<int> deferred_future = std::async(std::launch::deferred, compute, 20);// 主线程继续执行其他任务std::cout << "Main thread ID: " << std::this_thread::get_id() << std::endl;// deferred 模式任务尚未启动,只有调用 `get()` 时才会运行std::cout << "Waiting for deferred result..." << std::endl;std::cout << "Deferred result: " << deferred_future.get() << std::endl; // 在调用 get() 时任务才运行// 查看任务是否已启动(async 模式的任务已经开始)std::cout << "Waiting for async result..." << std::endl;std::cout << "Async result: " << async_future.get() << std::endl; // 获取结果并等待 async 模式完成return 0;
}output:
Main thread ID: 135193224783680
Waiting for deferred result...
Deferred result: Start computation with x = 20 in thread 135193224783680
Start computation with x = 10 in thread 135193218250304
400
Waiting for async result...
Async result: 100
可以观察到,在输出deferred模式的future值时会感受到2秒的明显停顿且thread id和主线程相同,而async模式的future值瞬间输出,输出的线程thread id和主线程不同,因为deferred模式下,并没有额外的线程产生,依靠主线程执行了对future的赋值操作,在async模式下,会额外创建一个新的线程异步执行操作。