Async++ Source Code Analysis 13 -- parallel_reduce.h
1. Async++ Directory Structure
The Async++ project has a clear directory layout: configuration files at the root, plus directories for source code, headers and examples:
asyncplusplus/
├── .gitignore                     # Git ignore rules
├── Async++Config.cmake.in         # CMake package configuration template
├── CMakeLists.txt                 # CMake build script
├── LICENSE                        # License file (MIT license)
├── README.md                      # Project documentation
├── examples/                      # Example code
│   └── gtk_scheduler.cpp          # GTK scheduler example
├── src/                           # Source directory
│   ├── fifo_queue.h               # FIFO queue implementation
│   ├── internal.h                 # Internal header (type definitions, macros, etc.)
│   ├── scheduler.cpp              # Scheduler implementation
│   ├── singleton.h                # Singleton implementation
│   ├── task_wait_event.h          # Task wait event implementation
│   ├── threadpool_scheduler.cpp   # Thread-pool scheduler implementation
│   └── work_steal_queue.h         # Work-stealing queue implementation
└── include/                       # Header directory
    ├── async++.h                  # Main header (single public entry point)
    └── async++/                   # Sub-module headers
        ├── aligned_alloc.h
        ├── cancel.h
        ├── continuation_vector.h
        ├── parallel_for.h
        ├── parallel_invoke.h
        ├── parallel_reduce.h
        ├── partitioner.h          # Partitioner definitions
        ├── range.h                # Range (iterator pair) definitions
        ├── ref_count.h
        ├── scheduler.h            # Scheduler interface
        ├── scheduler_fwd.h
        ├── task.h                 # Task class
        ├── task_base.h            # Task base class
        ├── traits.h
        └── when_all_any.h
2. task_base.h Source Code Analysis
2.1 Source Code
// Copyright (c) 2015 Amanieu d'Antras
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

#ifndef ASYNCXX_H_
# error "Do not include this header directly, include <async++.h> instead."
#endif

namespace async {
namespace detail {

// Task states
enum class task_state: unsigned char {
    pending, // Task has not completed yet
    locked, // Task is locked (used by event_task to prevent double set)
    unwrapped, // Task is waiting for an unwrapped task to finish
    completed, // Task has finished execution and a result is available
    canceled // Task has been canceled and an exception is available
};

// Determine whether a task is in a final state
inline bool is_finished(task_state s)
{
    return s == task_state::completed || s == task_state::canceled;
}

// Virtual function table used to allow dynamic dispatch for task objects.
// While this is very similar to what a compiler would generate with virtual
// functions, this scheme was found to result in significantly smaller
// generated code size.
struct task_base_vtable {
    // Destroy the function and result
    void (*destroy)(task_base*) LIBASYNC_NOEXCEPT;

    // Run the associated function
    void (*run)(task_base*) LIBASYNC_NOEXCEPT;

    // Cancel the task with an exception
    void (*cancel)(task_base*, std::exception_ptr&&) LIBASYNC_NOEXCEPT;

    // Schedule the task using its scheduler
    void (*schedule)(task_base* parent, task_ptr t);
};

// Type-generic base task object
struct task_base_deleter;
struct LIBASYNC_CACHELINE_ALIGN task_base: public ref_count_base<task_base, task_base_deleter> {
    // Task state
    std::atomic<task_state> state;

    // Whether get_task() was already called on an event_task
    bool event_task_got_task;

    // Vector of continuations
    continuation_vector continuations;

    // Virtual function table used for dynamic dispatch
    const task_base_vtable* vtable;

    // Use aligned memory allocation
    static void* operator new(std::size_t size)
    {
        return aligned_alloc(size, LIBASYNC_CACHELINE_SIZE);
    }
    static void operator delete(void* ptr)
    {
        aligned_free(ptr);
    }

    // Initialize task state
    task_base()
        : state(task_state::pending) {}

    // Check whether the task is ready and include an acquire barrier if it is
    bool ready() const
    {
        return is_finished(state.load(std::memory_order_acquire));
    }

    // Run a single continuation
    template<typename Sched>
    void run_continuation(Sched& sched, task_ptr&& cont)
    {
        LIBASYNC_TRY {
            detail::schedule_task(sched, cont);
        } LIBASYNC_CATCH(...) {
            // This is suboptimal, but better than letting the exception leak
            cont->vtable->cancel(cont.get(), std::current_exception());
        }
    }

    // Run all of the task's continuations after it has completed or canceled.
    // The list of continuations is emptied and locked to prevent any further
    // continuations from being added.
    void run_continuations()
    {
        continuations.flush_and_lock([this](task_ptr t) {
            const task_base_vtable* vtable_ptr = t->vtable;
            vtable_ptr->schedule(this, std::move(t));
        });
    }

    // Add a continuation to this task
    template<typename Sched>
    void add_continuation(Sched& sched, task_ptr cont)
    {
        // Check for task completion
        task_state current_state = state.load(std::memory_order_relaxed);
        if (!is_finished(current_state)) {
            // Try to add the task to the continuation list. This can fail only
            // if the task has just finished, in which case we run it directly.
            if (continuations.try_add(std::move(cont)))
                return;
        }

        // Otherwise run the continuation directly
        std::atomic_thread_fence(std::memory_order_acquire);
        run_continuation(sched, std::move(cont));
    }

    // Finish the task after it has been executed and the result set
    void finish()
    {
        state.store(task_state::completed, std::memory_order_release);
        run_continuations();
    }

    // Wait for the task to finish executing
    task_state wait()
    {
        task_state s = state.load(std::memory_order_acquire);
        if (!is_finished(s)) {
            wait_for_task(this);
            s = state.load(std::memory_order_relaxed);
        }
        return s;
    }
};

// Deleter for task_ptr
struct task_base_deleter {
    static void do_delete(task_base* p)
    {
        // Go through the vtable to delete p with its proper type
        p->vtable->destroy(p);
    }
};

// Result type-specific task object
template<typename Result>
struct task_result_holder: public task_base {
    union {
        alignas(Result) std::uint8_t result[sizeof(Result)];
        alignas(std::exception_ptr) std::uint8_t except[sizeof(std::exception_ptr)];

        // Scheduler that should be used to schedule this task. The scheduler
        // type has been erased and is held by vtable->schedule.
        void* sched;
    };

    template<typename T>
    void set_result(T&& t)
    {
        new(&result) Result(std::forward<T>(t));
    }

    // Return a result using an lvalue or rvalue reference depending on the task
    // type. The task parameter is not used, it is just there for overload resolution.
    template<typename T>
    Result&& get_result(const task<T>&)
    {
        return std::move(*reinterpret_cast<Result*>(&result));
    }
    template<typename T>
    const Result& get_result(const shared_task<T>&)
    {
        return *reinterpret_cast<Result*>(&result);
    }

    // Destroy the result
    ~task_result_holder()
    {
        // Result is only present if the task completed successfully
        if (state.load(std::memory_order_relaxed) == task_state::completed)
            reinterpret_cast<Result*>(&result)->~Result();
    }
};

// Specialization for references
template<typename Result>
struct task_result_holder<Result&>: public task_base {
    union {
        // Store as pointer internally
        Result* result;
        alignas(std::exception_ptr) std::uint8_t except[sizeof(std::exception_ptr)];
        void* sched;
    };

    void set_result(Result& obj)
    {
        result = std::addressof(obj);
    }

    template<typename T>
    Result& get_result(const task<T>&)
    {
        return *result;
    }
    template<typename T>
    Result& get_result(const shared_task<T>&)
    {
        return *result;
    }
};

// Specialization for void
template<>
struct task_result_holder<fake_void>: public task_base {
    union {
        alignas(std::exception_ptr) std::uint8_t except[sizeof(std::exception_ptr)];
        void* sched;
    };

    void set_result(fake_void) {}

    // Get the result as fake_void so that it can be passed to set_result and
    // continuations
    template<typename T>
    fake_void get_result(const task<T>&)
    {
        return fake_void();
    }
    template<typename T>
    fake_void get_result(const shared_task<T>&)
    {
        return fake_void();
    }
};

template<typename Result>
struct task_result: public task_result_holder<Result> {
    // Virtual function table for task_result
    static const task_base_vtable vtable_impl;
    task_result()
    {
        this->vtable = &vtable_impl;
    }

    // Destroy the exception
    ~task_result()
    {
        // Exception is only present if the task was canceled
        if (this->state.load(std::memory_order_relaxed) == task_state::canceled)
            reinterpret_cast<std::exception_ptr*>(&this->except)->~exception_ptr();
    }

    // Cancel a task with the given exception
    void cancel_base(std::exception_ptr&& except_)
    {
        set_exception(std::move(except_));
        this->state.store(task_state::canceled, std::memory_order_release);
        this->run_continuations();
    }

    // Set the exception value of the task
    void set_exception(std::exception_ptr&& except_)
    {
        new(&this->except) std::exception_ptr(std::move(except_));
    }

    // Get the exception a task was canceled with
    std::exception_ptr& get_exception()
    {
        return *reinterpret_cast<std::exception_ptr*>(&this->except);
    }

    // Wait and throw the exception if the task was canceled
    void wait_and_throw()
    {
        if (this->wait() == task_state::canceled)
            LIBASYNC_RETHROW_EXCEPTION(get_exception());
    }

    // Delete the task using its proper type
    static void destroy(task_base* t) LIBASYNC_NOEXCEPT
    {
        delete static_cast<task_result<Result>*>(t);
    }
};
template<typename Result>
const task_base_vtable task_result<Result>::vtable_impl = {
    task_result<Result>::destroy, // destroy
    nullptr, // run
    nullptr, // cancel
    nullptr // schedule
};

// Class to hold a function object, with empty base class optimization
template<typename Func, typename = void>
struct func_base {
    Func func;

    template<typename F>
    explicit func_base(F&& f)
        : func(std::forward<F>(f)) {}
    Func& get_func()
    {
        return func;
    }
};
template<typename Func>
struct func_base<Func, typename std::enable_if<std::is_empty<Func>::value>::type> {
    template<typename F>
    explicit func_base(F&& f)
    {
        new(this) Func(std::forward<F>(f));
    }
    ~func_base()
    {
        get_func().~Func();
    }
    Func& get_func()
    {
        return *reinterpret_cast<Func*>(this);
    }
};

// Class to hold a function object and initialize/destroy it at any time
template<typename Func, typename = void>
struct func_holder {
    alignas(Func) std::uint8_t func[sizeof(Func)];

    Func& get_func()
    {
        return *reinterpret_cast<Func*>(&func);
    }
    template<typename... Args>
    void init_func(Args&&... args)
    {
        new(&func) Func(std::forward<Args>(args)...);
    }
    void destroy_func()
    {
        get_func().~Func();
    }
};
template<typename Func>
struct func_holder<Func, typename std::enable_if<std::is_empty<Func>::value>::type> {
    Func& get_func()
    {
        return *reinterpret_cast<Func*>(this);
    }
    template<typename... Args>
    void init_func(Args&&... args)
    {
        new(this) Func(std::forward<Args>(args)...);
    }
    void destroy_func()
    {
        get_func().~Func();
    }
};

// Task object with an associated function object
// Using private inheritance so empty Func doesn't take up space
template<typename Sched, typename Func, typename Result>
struct task_func: public task_result<Result>, func_holder<Func> {
    // Virtual function table for task_func
    static const task_base_vtable vtable_impl;
    template<typename... Args>
    explicit task_func(Args&&... args)
    {
        this->vtable = &vtable_impl;
        this->init_func(std::forward<Args>(args)...);
    }

    // Run the stored function
    static void run(task_base* t) LIBASYNC_NOEXCEPT
    {
        LIBASYNC_TRY {
            // Dispatch to execution function
            static_cast<task_func<Sched, Func, Result>*>(t)->get_func()(t);
        } LIBASYNC_CATCH(...) {
            cancel(t, std::current_exception());
        }
    }

    // Cancel the task
    static void cancel(task_base* t, std::exception_ptr&& except) LIBASYNC_NOEXCEPT
    {
        // Destroy the function object when canceling since it won't be
        // used anymore.
        static_cast<task_func<Sched, Func, Result>*>(t)->destroy_func();
        static_cast<task_func<Sched, Func, Result>*>(t)->cancel_base(std::move(except));
    }

    // Schedule a continuation task using its scheduler
    static void schedule(task_base* parent, task_ptr t)
    {
        void* sched = static_cast<task_func<Sched, Func, Result>*>(t.get())->sched;
        parent->run_continuation(*static_cast<Sched*>(sched), std::move(t));
    }

    // Free the function
    ~task_func()
    {
        // If the task hasn't completed yet, destroy the function object. Note
        // that an unwrapped task has already destroyed its function object.
        if (this->state.load(std::memory_order_relaxed) == task_state::pending)
            this->destroy_func();
    }

    // Delete the task using its proper type
    static void destroy(task_base* t) LIBASYNC_NOEXCEPT
    {
        delete static_cast<task_func<Sched, Func, Result>*>(t);
    }
};
template<typename Sched, typename Func, typename Result>
const task_base_vtable task_func<Sched, Func, Result>::vtable_impl = {
    task_func<Sched, Func, Result>::destroy, // destroy
    task_func<Sched, Func, Result>::run, // run
    task_func<Sched, Func, Result>::cancel, // cancel
    task_func<Sched, Func, Result>::schedule // schedule
};

// Helper functions to access the internal_task member of a task object, which
// avoids us having to specify half of the functions in the detail namespace
// as friend. Also, internal_task is downcast to the appropriate task_result<>.
template<typename Task>
typename Task::internal_task_type* get_internal_task(const Task& t)
{
    return static_cast<typename Task::internal_task_type*>(t.internal_task.get());
}
template<typename Task>
void set_internal_task(Task& t, task_ptr p)
{
    t.internal_task = std::move(p);
}

// Common code for task unwrapping
template<typename Result, typename Child>
struct unwrapped_func {
    explicit unwrapped_func(task_ptr t)
        : parent_task(std::move(t)) {}
    void operator()(Child child_task) const
    {
        // Forward completion state and result to parent task
        task_result<Result>* parent = static_cast<task_result<Result>*>(parent_task.get());
        LIBASYNC_TRY {
            if (get_internal_task(child_task)->state.load(std::memory_order_relaxed) == task_state::completed) {
                parent->set_result(get_internal_task(child_task)->get_result(child_task));
                parent->finish();
            } else {
                // We don't call the generic cancel function here because
                // the function of the parent task has already been destroyed.
                parent->cancel_base(std::exception_ptr(get_internal_task(child_task)->get_exception()));
            }
        } LIBASYNC_CATCH(...) {
            // If the copy/move constructor of the result threw, propagate the exception
            parent->cancel_base(std::current_exception());
        }
    }
    task_ptr parent_task;
};
template<typename Sched, typename Result, typename Func, typename Child>
void unwrapped_finish(task_base* parent_base, Child child_task)
{
    // Destroy the parent task's function since it has been executed
    parent_base->state.store(task_state::unwrapped, std::memory_order_relaxed);
    static_cast<task_func<Sched, Func, Result>*>(parent_base)->destroy_func();

    // Set up a continuation on the child to set the result of the parent
    LIBASYNC_TRY {
        parent_base->add_ref();
        child_task.then(inline_scheduler(), unwrapped_func<Result, Child>(task_ptr(parent_base)));
    } LIBASYNC_CATCH(...) {
        // Use cancel_base here because the function object is already destroyed.
        static_cast<task_result<Result>*>(parent_base)->cancel_base(std::current_exception());
    }
}

// Execution functions for root tasks:
// - With and without task unwraping
template<typename Sched, typename Result, typename Func, bool Unwrap>
struct root_exec_func: private func_base<Func> {
    template<typename F>
    explicit root_exec_func(F&& f)
        : func_base<Func>(std::forward<F>(f)) {}
    void operator()(task_base* t)
    {
        static_cast<task_result<Result>*>(t)->set_result(detail::invoke_fake_void(std::move(this->get_func())));
        static_cast<task_func<Sched, root_exec_func, Result>*>(t)->destroy_func();
        t->finish();
    }
};
template<typename Sched, typename Result, typename Func>
struct root_exec_func<Sched, Result, Func, true>: private func_base<Func> {
    template<typename F>
    explicit root_exec_func(F&& f)
        : func_base<Func>(std::forward<F>(f)) {}
    void operator()(task_base* t)
    {
        unwrapped_finish<Sched, Result, root_exec_func>(t, std::move(this->get_func())());
    }
};

// Execution functions for continuation tasks:
// - With and without task unwraping
// - For void, value-based and task-based continuations
template<typename Sched, typename Parent, typename Result, typename Func, typename ValueCont, bool Unwrap>
struct continuation_exec_func: private func_base<Func> {
    template<typename F, typename P>
    continuation_exec_func(F&& f, P&& p)
        : func_base<Func>(std::forward<F>(f)), parent(std::forward<P>(p)) {}
    void operator()(task_base* t)
    {
        static_cast<task_result<Result>*>(t)->set_result(detail::invoke_fake_void(std::move(this->get_func()), std::move(parent)));
        static_cast<task_func<Sched, continuation_exec_func, Result>*>(t)->destroy_func();
        t->finish();
    }
    Parent parent;
};
template<typename Sched, typename Parent, typename Result, typename Func>
struct continuation_exec_func<Sched, Parent, Result, Func, std::true_type, false>: private func_base<Func> {
    template<typename F, typename P>
    continuation_exec_func(F&& f, P&& p)
        : func_base<Func>(std::forward<F>(f)), parent(std::forward<P>(p)) {}
    void operator()(task_base* t)
    {
        if (get_internal_task(parent)->state.load(std::memory_order_relaxed) == task_state::canceled)
            task_func<Sched, continuation_exec_func, Result>::cancel(t, std::exception_ptr(get_internal_task(parent)->get_exception()));
        else {
            static_cast<task_result<Result>*>(t)->set_result(detail::invoke_fake_void(std::move(this->get_func()), get_internal_task(parent)->get_result(parent)));
            static_cast<task_func<Sched, continuation_exec_func, Result>*>(t)->destroy_func();
            t->finish();
        }
    }
    Parent parent;
};
template<typename Sched, typename Parent, typename Result, typename Func>
struct continuation_exec_func<Sched, Parent, Result, Func, fake_void, false>: private func_base<Func> {
    template<typename F, typename P>
    continuation_exec_func(F&& f, P&& p)
        : func_base<Func>(std::forward<F>(f)), parent(std::forward<P>(p)) {}
    void operator()(task_base* t)
    {
        if (get_internal_task(parent)->state.load(std::memory_order_relaxed) == task_state::canceled)
            task_func<Sched, continuation_exec_func, Result>::cancel(t, std::exception_ptr(get_internal_task(parent)->get_exception()));
        else {
            static_cast<task_result<Result>*>(t)->set_result(detail::invoke_fake_void(std::move(this->get_func()), fake_void()));
            static_cast<task_func<Sched, continuation_exec_func, Result>*>(t)->destroy_func();
            t->finish();
        }
    }
    Parent parent;
};
template<typename Sched, typename Parent, typename Result, typename Func>
struct continuation_exec_func<Sched, Parent, Result, Func, std::false_type, true>: private func_base<Func> {
    template<typename F, typename P>
    continuation_exec_func(F&& f, P&& p)
        : func_base<Func>(std::forward<F>(f)), parent(std::forward<P>(p)) {}
    void operator()(task_base* t)
    {
        unwrapped_finish<Sched, Result, continuation_exec_func>(t, detail::invoke_fake_void(std::move(this->get_func()), std::move(parent)));
    }
    Parent parent;
};
template<typename Sched, typename Parent, typename Result, typename Func>
struct continuation_exec_func<Sched, Parent, Result, Func, std::true_type, true>: private func_base<Func> {
    template<typename F, typename P>
    continuation_exec_func(F&& f, P&& p)
        : func_base<Func>(std::forward<F>(f)), parent(std::forward<P>(p)) {}
    void operator()(task_base* t)
    {
        if (get_internal_task(parent)->state.load(std::memory_order_relaxed) == task_state::canceled)
            task_func<Sched, continuation_exec_func, Result>::cancel(t, std::exception_ptr(get_internal_task(parent)->get_exception()));
        else
            unwrapped_finish<Sched, Result, continuation_exec_func>(t, detail::invoke_fake_void(std::move(this->get_func()), get_internal_task(parent)->get_result(parent)));
    }
    Parent parent;
};
template<typename Sched, typename Parent, typename Result, typename Func>
struct continuation_exec_func<Sched, Parent, Result, Func, fake_void, true>: private func_base<Func> {
    template<typename F, typename P>
    continuation_exec_func(F&& f, P&& p)
        : func_base<Func>(std::forward<F>(f)), parent(std::forward<P>(p)) {}
    void operator()(task_base* t)
    {
        if (get_internal_task(parent)->state.load(std::memory_order_relaxed) == task_state::canceled)
            task_func<Sched, continuation_exec_func, Result>::cancel(t, std::exception_ptr(get_internal_task(parent)->get_exception()));
        else
            unwrapped_finish<Sched, Result, continuation_exec_func>(t, detail::invoke_fake_void(std::move(this->get_func()), fake_void()));
    }
    Parent parent;
};

} // namespace detail
} // namespace async
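This code is the core of the async++ library (a C++ asynchronous programming framework). It defines the low-level data structures of asynchronous tasks together with their state management, lifetime control and scheduling logic. The analysis below looks at it from three angles (core concepts, code structure and key mechanisms) to explain how asynchronous tasks work internally.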
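2.2 Core Concepts and Prerequisites
A few key terms are worth pinning down before reading the code:
- Task: the unit of asynchronous execution, consisting of the logic to run (a function), storage for a result or an exception, and a state (pending, completed, canceled, etc.).
- Continuation: a follow-up task that runs automatically after another task finishes (for example, the callback passed to `task.then(...)`).
- VTable (virtual function table): a hand-written dynamic dispatch mechanism used instead of compiler-generated virtual functions; according to the source comment, this scheme produced significantly smaller generated code.
- Scheduler: the component that assigns tasks to threads for execution (the code hides the concrete scheduler type through type erasure).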
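2.3 Code Structure
The code lives in the `async::detail` namespace (`detail` marks internal implementation that is not part of the public interface). It falls into five parts: the task state definition, the base task class, the result storage classes, the task class with execution logic, and helper utilities.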
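2.3.1. Task state definition (`task_state`) and helper function
`task_state` enumerates the lifecycle states of a task. The enum itself is a plain value type; thread safety comes from storing it in an atomic variable (`std::atomic<task_state> state` in `task_base`).
enum class task_state: unsigned char {
    pending,    // Not yet completed (initial state)
    locked,     // Locked (used by event_task to prevent setting the result twice)
    unwrapped,  // Waiting for a child task to finish (unwrapping, e.g. a task whose function returns another task)
    completed,  // Finished executing (result available)
    canceled    // Canceled (exception available)
};

// Check whether the task has reached a final state (completed or canceled), after which it no longer changes
inline bool is_finished(task_state s) {
    return s == task_state::completed || s == task_state::canceled;
}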
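2.3.2. Base task class (`task_base`)
The common base class of all task types. It defines the behavior shared by every task (state management, continuation management, waiting, etc.) and manages its memory with reference counting: it inherits from `ref_count_base`, with `task_base_deleter` as the custom deleter.
Key member variables
| Member variable | Purpose |
|---|---|
| `std::atomic<task_state> state` | The task state, stored atomically so that reads and writes from multiple threads are safe. |
| `continuation_vector continuations` | Holds all continuations of this task (the follow-up tasks to run once it finishes). |
| `const task_base_vtable* vtable` | Hand-written virtual function table pointing to the concrete task type's `destroy`/`run`/`cancel`/`schedule` functions. |
| `bool event_task_got_task` | Used only by `event_task`; records whether `get_task()` has already been called, to prevent calling it twice. |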
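Core member functions
(1) State queries and memory management
- `ready()`: checks whether the task has finished. The load uses acquire ordering, so if the task is seen as finished, the result (or exception) written before the state change is visible too.
bool ready() const {
    // memory_order_acquire: later reads (e.g. of the result) cannot be reordered before this load
    return is_finished(state.load(std::memory_order_acquire));
}
- Overloaded `operator new`/`operator delete`: allocate with cache-line alignment (the library's own `aligned_alloc` helper from `aligned_alloc.h`), so that a task object does not share a cache line with unrelated data; this avoids false sharing between threads and improves performance.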
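The aligned `operator new`/`operator delete` pattern can be sketched with standard C++17 facilities. This is a minimal stand-in, not the library's code: it calls the aligned global overloads taking `std::align_val_t` instead of async++'s `aligned_alloc`/`aligned_free`, and it assumes a 64-byte cache line (`kCacheLine` and `padded_state` are hypothetical names; the library uses `LIBASYNC_CACHELINE_SIZE`).

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <new>

// Assumed cache-line size; the real value is platform dependent.
constexpr std::size_t kCacheLine = 64;

// Hypothetical task-like object that always occupies whole cache lines.
struct alignas(kCacheLine) padded_state {
    int value = 0;

    // Class-specific allocation functions requesting cache-line alignment,
    // playing the role of task_base::operator new/delete.
    static void* operator new(std::size_t size) {
        return ::operator new(size, std::align_val_t(kCacheLine));
    }
    static void operator delete(void* ptr) {
        ::operator delete(ptr, std::align_val_t(kCacheLine));
    }
};

// Returns true if a freshly allocated object starts on a cache-line boundary.
bool allocated_on_cache_line() {
    padded_state* p = new padded_state;
    bool aligned = reinterpret_cast<std::uintptr_t>(p) % kCacheLine == 0;
    delete p;
    return aligned;
}
```

Because `alignas` also rounds `sizeof(padded_state)` up to 64, two such objects allocated next to each other can never share a cache line.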
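(2) Continuation management
Continuations are central to asynchronous programming (e.g. `task.then([]{...})`). `task_base` provides `add_continuation` (add a continuation) and `run_continuations` (run all continuations).
- `add_continuation(Sched& sched, task_ptr cont)`:
  - It first checks, with a relaxed load, whether the task has already reached a final state; if so, the continuation is scheduled immediately through `run_continuation` instead of being queued.
  - Otherwise it tries to append the continuation to `continuation_vector`. If that fails (the task has just finished and the list is already locked), the continuation is likewise scheduled immediately.
  - Before scheduling a continuation immediately, it issues `std::atomic_thread_fence(std::memory_order_acquire)`, so that the parent's result, written before the task finished, is visible to the continuation despite the earlier relaxed load.
- `run_continuations()`:
  - Empties `continuation_vector` and locks it (no further continuations can be added);
  - Passes each continuation to its own `vtable->schedule`, which hands it to the appropriate scheduler.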
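The `try_add` / `flush_and_lock` contract can be illustrated with a deliberately simplified, mutex-based sketch. This is not how `continuation_vector` is implemented; `simple_continuations` and the free function `add_continuation` below are hypothetical, and continuations are modeled as `std::function<void()>` rather than `task_ptr`.

```cpp
#include <cassert>
#include <functional>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical, lock-based stand-in for continuation_vector.
class simple_continuations {
public:
    using cont_fn = std::function<void()>;

    // Adds a continuation; fails once the list has been flushed and locked,
    // which tells the caller that the task has already finished.
    bool try_add(cont_fn f) {
        std::lock_guard<std::mutex> guard(mutex_);
        if (locked_)
            return false;
        items_.push_back(std::move(f));
        return true;
    }

    // Locks the list against further additions, then runs every stored
    // continuation outside the mutex.
    void flush_and_lock() {
        std::vector<cont_fn> pending;
        {
            std::lock_guard<std::mutex> guard(mutex_);
            locked_ = true;
            pending.swap(items_);
        }
        for (auto& f : pending)
            f();
    }

private:
    std::mutex mutex_;
    bool locked_ = false;
    std::vector<cont_fn> items_;
};

// Mirrors the shape of add_continuation: queue if possible, else run now.
void add_continuation(simple_continuations& list, const simple_continuations::cont_fn& f) {
    if (!list.try_add(f))
        f();
}
```

The important property is that a continuation is never lost: it is either queued before the lock and run by `flush_and_lock`, or rejected after the lock and run by the caller.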
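(3) Completion and waiting
- `finish()`: marks the task as completed and triggers all continuations.
void finish() {
    // memory_order_release: writes made while running the task (such as the result) become visible to threads that later load the state with acquire
    state.store(task_state::completed, std::memory_order_release);
    run_continuations(); // run all continuations
}
- `wait()`: blocks the current thread until the task reaches a final state. If the task is not finished yet, it calls `wait_for_task`, which is implemented outside this header (typically on top of a condition variable or a spin wait).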
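The release store in `finish()` and the acquire load in `ready()`/`wait()` form a publish/observe pair. A minimal sketch, assuming a hypothetical `mini_task` with a plain `int` result and a yielding busy-wait in place of the library's `wait_for_task`:

```cpp
#include <atomic>
#include <cassert>
#include <thread>

enum class state_t : unsigned char { pending, completed };

// Hypothetical miniature task: a non-atomic result published by an atomic state.
struct mini_task {
    std::atomic<state_t> state{state_t::pending};
    int result = 0; // plain int, protected only by the release/acquire pair

    void finish(int value) {
        result = value;                                             // 1. write the result
        state.store(state_t::completed, std::memory_order_release); // 2. publish it
    }

    bool ready() const {
        return state.load(std::memory_order_acquire) == state_t::completed;
    }

    // Spin until finished; the library calls wait_for_task() instead.
    int wait() const {
        while (!ready())
            std::this_thread::yield();
        return result; // safe: the acquire load synchronized with the release store
    }
};

// Finishes a task on another thread and waits for it on this one.
int run_publish_demo() {
    mini_task t;
    std::thread producer([&t] { t.finish(42); });
    int observed = t.wait();
    producer.join();
    return observed;
}
```

If either side used `memory_order_relaxed` instead, the waiting thread could see `completed` and still read a stale `result`.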
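2.3.3. Result storage classes (`task_result_holder` and `task_result`)
(1) `task_result_holder`: template specializations for result storage
It provides a different storage and access strategy for each kind of result type (plain value, reference, `void`), so that each case stores only what it needs.
- Plain value type (`Result`): an anonymous `union` holds either the result (`result`), the exception (`except`), or the scheduler pointer (`sched`, type-erased). They can share storage because each is needed at a different stage: the scheduler pointer while the task waits to be scheduled, then either the result or the exception.
- Reference type (`Result&`): the reference is stored internally as a pointer, because a union cannot contain a reference member.
- `void`: specialized for `fake_void` (a placeholder type); there is no result to store, only the exception and the scheduler.
Core methods:
- `set_result(T&& t)`: constructs the result in place with placement new (no extra memory allocation).
- `get_result(const Task&)`: returns the result according to the task type: an rvalue reference for `task` (the result is moved out) and a const lvalue reference for `shared_task` (the result can be read multiple times). The task argument is only used for overload resolution.
- Destructor: destroys the result only if the task completed. The exception of a canceled task is destroyed by the `task_result` destructor instead, so neither object leaks.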
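The storage technique behind `task_result_holder<Result>` (aligned raw bytes in a union, placement new on success, a destructor that checks the state) can be reproduced in isolation. A minimal sketch, assuming single-threaded use and C++17 (`std::launder`); `value_slot` and `slot_state` are hypothetical names, and the exception member is omitted.

```cpp
#include <cassert>
#include <new>
#include <string>
#include <utility>

enum class slot_state { pending, completed };

// Hypothetical holder mirroring task_result_holder<Result>: the result lives in
// raw, suitably aligned bytes and is only constructed once a value exists.
template<typename Result>
struct value_slot {
    slot_state state = slot_state::pending;
    union {
        alignas(Result) unsigned char result[sizeof(Result)];
        void* sched; // used before completion, like the type-erased scheduler
    };

    value_slot() : sched(nullptr) {}
    value_slot(const value_slot&) = delete; // raw bytes must not be copied blindly
    value_slot& operator=(const value_slot&) = delete;

    template<typename T>
    void set_result(T&& t) {
        new (&result) Result(std::forward<T>(t)); // placement new, no heap allocation
        state = slot_state::completed;
    }

    // Like get_result(const task<T>&): hand the value out as an rvalue.
    Result&& take() {
        return std::move(*std::launder(reinterpret_cast<Result*>(&result)));
    }

    // Run ~Result() only if a result was actually constructed.
    ~value_slot() {
        if (state == slot_state::completed)
            std::launder(reinterpret_cast<Result*>(&result))->~Result();
    }
};
```

A slot that never receives a value is destroyed without touching `~Result()`, which is exactly why the library checks `state` in its destructor.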
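(2) `task_result`: derives from `task_result_holder` and adds exception and cancellation logic
On top of result storage, it adds exception management and the core cancellation logic:
- `set_exception(std::exception_ptr&& except_)`: stores the exception the task was canceled with (for example, an exception thrown by the task's function), again using placement new.
- `cancel_base(std::exception_ptr&& except_)`: stores the exception, sets the state to `canceled` with a release store, and runs the continuations.
- `get_exception()`: returns the stored exception so that it can be rethrown later.
- `wait_and_throw()`: waits for the task to finish and rethrows the exception if it was canceled.
- Static `vtable_impl`: the virtual function table of `task_result`. `destroy` points to `task_result<Result>::destroy`, while `run`, `cancel` and `schedule` are `nullptr`; a plain `task_result` has no function to run, and derived types such as `task_func` install their own table.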
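Stripped of the union and memory ordering, `cancel_base`, `get_exception` and `wait_and_throw` amount to storing a `std::exception_ptr` and rethrowing it later with `std::rethrow_exception`. A minimal sketch, where `cancellable_result` is a hypothetical type and the wait is omitted (the task is assumed to be finished already):

```cpp
#include <cassert>
#include <exception>
#include <stdexcept>
#include <string>
#include <utility>

enum class result_state { pending, completed, canceled };

// Hypothetical stand-in for task_result: either a value or a stored exception.
struct cancellable_result {
    result_state state = result_state::pending;
    int value = 0;
    std::exception_ptr except;

    // Like cancel_base(): remember the exception and enter the canceled state.
    void cancel_base(std::exception_ptr&& e) {
        except = std::move(e);
        state = result_state::canceled;
    }

    // Like wait_and_throw() on a finished task: rethrow if it was canceled.
    int get() const {
        if (state == result_state::canceled)
            std::rethrow_exception(except);
        return value;
    }
};
```

The caller of `get()` sees the original exception type, which is what lets `task<T>::get()` surface errors thrown inside a task on another thread.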
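2.3.4. Task class with execution logic (`task_func`)
Derives from `task_result<Result>` and `func_holder<Func>` (function storage). It is the concrete, executable task type, tying the task's function (`Func`) to its scheduler type (`Sched`).
Core functionality
- Function storage: `func_holder<Func>` stores the execution logic; its specialization for empty function objects keeps them from taking up extra space.
- Execution: the static `run(task_base* t)` invokes the stored function and turns any exception into task cancellation.
static void run(task_base* t) LIBASYNC_NOEXCEPT {
    LIBASYNC_TRY { // library macro standing in for try
        // Call the task's execution function, passing the task pointer (used to set the result or cancel)
        static_cast<task_func<Sched, Func, Result>*>(t)->get_func()(t);
    } LIBASYNC_CATCH(...) {
        // Catch every exception and mark the task as canceled
        cancel(t, std::current_exception());
    }
}
- Cancellation: the static `cancel(...)` destroys the stored function object (it will never run now) and marks the task as canceled via `cancel_base`.
- Scheduling: the static `schedule(...)` reads the type-erased scheduler pointer (`sched`) stored in the continuation task, casts it back to `Sched*`, and hands the continuation to that scheduler through `parent->run_continuation`.
- VTable: the constructor points `vtable` at `task_func`'s own `vtable_impl`, replacing the one set by `task_result`, so that `destroy`/`run`/`cancel`/`schedule` all resolve to `task_func`'s static functions.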
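The `run`/`cancel` pair of `task_func` has a simple shape: a `noexcept` static function casts the base pointer back to the concrete type, invokes the stored callable, and turns any exception into cancellation. A sketch under stated assumptions: `base_task`, `func_task` and `mini_vtable` are hypothetical, the result is fixed to `int`, and the callable takes no arguments (in the library it receives the `task_base*` and sets the result itself).

```cpp
#include <cassert>
#include <exception>
#include <stdexcept>
#include <utility>

enum class st { pending, completed, canceled };

struct base_task;

// Hand-written dispatch table with the two entries this sketch needs.
struct mini_vtable {
    void (*run)(base_task*) noexcept;
    void (*cancel)(base_task*, std::exception_ptr&&) noexcept;
};

struct base_task {
    st state = st::pending;
    std::exception_ptr except;
    const mini_vtable* vtable = nullptr;
};

// Hypothetical analogue of task_func<Sched, Func, Result> with Result = int.
template<typename Func>
struct func_task : base_task {
    Func func;
    int result = 0;
    static const mini_vtable vtable_impl;

    explicit func_task(Func f) : func(std::move(f)) { vtable = &vtable_impl; }

    static void run(base_task* t) noexcept {
        auto* self = static_cast<func_task*>(t);
        try {
            self->result = self->func();
            self->state = st::completed;
        } catch (...) {
            // Any exception escaping the user function cancels the task
            cancel(t, std::current_exception());
        }
    }

    static void cancel(base_task* t, std::exception_ptr&& e) noexcept {
        t->except = std::move(e);
        t->state = st::canceled;
    }
};

template<typename Func>
const mini_vtable func_task<Func>::vtable_impl = {func_task<Func>::run, func_task<Func>::cancel};

// Builds a task from f, runs it through the table, and reports its final state.
template<typename Func>
st run_through_vtable(Func f) {
    func_task<Func> ft(std::move(f));
    base_task* base = &ft;
    base->vtable->run(base);
    return base->state;
}
```

Because `run` is `noexcept`, no exception can escape into the scheduler's worker loop; failures travel only through the task's state.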
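2.3.5. Helper classes and functions
(1) Function storage optimization (`func_base` and `func_holder`)
- `func_base`: wraps a function object. For an empty function object (such as a lambda without captures), a specialization has no data member and constructs the object in place inside itself, so when `func_base` is used as a base class (as the execution functions do) it takes up no space thanks to the empty base optimization.
- `func_holder`: uses an aligned byte array (`std::uint8_t`) and placement new to construct and destroy the function object manually at any time (`init_func`/`destroy_func`), without dynamic memory allocation.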
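The principle `func_base` relies on is the empty base optimization (EBO). The sketch below contrasts storing an empty callable as a data member with inheriting from it. It shows the principle only, not the library's exact mechanism (which constructs the empty `Func` inside an empty `func_base` that is then used as a base class); the struct names are hypothetical, and the exact sizes are implementation-defined, although GCC and Clang apply EBO in this simple case.

```cpp
#include <cassert>
#include <cstddef>

// An empty function object, like a lambda with no captures.
struct empty_fn {
    int operator()() const { return 42; }
};

// Storing the callable as a member: it still occupies at least one byte,
// and padding usually rounds the struct up further.
struct member_storage {
    empty_fn fn;
    int payload;
};

// Inheriting from the callable: the empty base can occupy zero bytes.
struct base_storage : private empty_fn {
    int payload;
    int call() const { return static_cast<const empty_fn&>(*this)(); }
};

// Sizes of the two layouts, exposed for inspection.
std::size_t member_size() { return sizeof(member_storage); }
std::size_t base_size() { return sizeof(base_storage); }
```

Since every task stores its function object, saving a padded word per task adds up when many small tasks are alive at once.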
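(2) Task unwrapping support (`unwrapped_func` and `unwrapped_finish`)
Handles tasks whose function returns another task (which would otherwise produce something like `task<task<int>>`): the child task is "unwrapped" automatically, and its result or exception is forwarded to the parent task:
- `unwrapped_func`: the continuation attached to the child task; when the child finishes, it sets the child's result (or exception) on the parent.
- `unwrapped_finish`: marks the parent as `unwrapped`, destroys the parent's execution function (no longer needed), takes an extra reference to the parent, and attaches `unwrapped_func` to the child through `inline_scheduler()`.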
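(3) Execution function templates (`root_exec_func` and `continuation_exec_func`)
- `root_exec_func`: execution logic of a root task (an initial task with no parent). The `Unwrap` template parameter selects whether a returned child task is unwrapped.
- `continuation_exec_func`: execution logic of a continuation. Specializations are selected by `ValueCont` and `Unwrap`: with `std::false_type` the function receives the parent task itself; with `std::true_type` it receives the parent's result; with `fake_void` it is called without a real value (the parent's result is void). The value-based variants first check whether the parent was canceled and, if so, cancel the continuation with the parent's exception instead of calling the function.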
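The behavioral difference between value-based and task-based continuations can be sketched without templates on tag types. Assumptions: `parent_result`, `cont_result` and the two `run_*_continuation` functions are hypothetical, the parent is already finished, and results are fixed to `int`.

```cpp
#include <cassert>
#include <exception>
#include <stdexcept>
#include <utility>

// Hypothetical snapshot of a finished parent: either a value or an exception.
struct parent_result {
    bool canceled = false;
    int value = 0;
    std::exception_ptr except;
};

// Hypothetical outcome of a continuation.
struct cont_result {
    bool canceled = false;
    int value = 0;
    std::exception_ptr except;
};

// Value-based continuation (the ValueCont = std::true_type case): the user
// function only sees the parent's value, so a canceled parent skips the
// function and forwards its exception instead.
template<typename Func>
cont_result run_value_continuation(const parent_result& parent, Func f) {
    cont_result out;
    if (parent.canceled) {
        out.canceled = true;
        out.except = parent.except; // copy, like std::exception_ptr(get_exception())
        return out;
    }
    try {
        out.value = f(parent.value);
    } catch (...) {
        out.canceled = true;
        out.except = std::current_exception();
    }
    return out;
}

// Task-based continuation (the std::false_type case): the function receives
// the whole parent and decides itself how to handle cancellation.
template<typename Func>
cont_result run_task_continuation(const parent_result& parent, Func f) {
    cont_result out;
    try {
        out.value = f(parent);
    } catch (...) {
        out.canceled = true;
        out.except = std::current_exception();
    }
    return out;
}
```

This is why a task-based continuation is the natural place for error handling: it still runs when the parent was canceled.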
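2.4 Key Mechanisms in Depth
2.4.1. Hand-written VTable (code-size optimization)
With ordinary virtual functions, the compiler emits a vtable, plus related data such as type information when RTTI is enabled, for every polymorphic class, and the task types here are heavily templated, so many classes get instantiated. async++ implements the VTable by hand instead; the source comment states that this "was found to result in significantly smaller generated code size". The per-object cost is unchanged: each task still carries one pointer (`vtable`), just as a polymorphic object carries one vptr.
- Each concrete task type (such as `task_func`) defines a static `vtable_impl` holding that type's `destroy`/`run`/`cancel`/`schedule` function pointers.
- The base class `task_base` stores only a `vtable` pointer, which points to the concrete type's `vtable_impl`.
- Calls dispatch dynamically through that pointer, e.g. `t->vtable->run(t)`; like a virtual call this is an indirect call, but the generated binary is smaller.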
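The pattern is general and can be shown outside async++. A minimal sketch with hypothetical types (`animal`, `dog`, `cat`): each concrete type defines a static table of function pointers, the base stores a pointer to it, and deletion also goes through the table, just as `task_base_deleter` calls `vtable->destroy`.

```cpp
#include <cassert>
#include <string>
#include <utility>

struct animal;

// Hand-written vtable: one function pointer per "virtual" operation.
struct animal_vtable {
    std::string (*speak)(const animal*);
    void (*destroy)(animal*) noexcept;
};

// Base object: stores only a pointer to its type's table (like task_base::vtable).
struct animal {
    const animal_vtable* vtable;
};

struct dog : animal {
    std::string name;
    static const animal_vtable vtable_impl;
    explicit dog(std::string n) : animal{&vtable_impl}, name(std::move(n)) {}

    static std::string speak(const animal* a) {
        return static_cast<const dog*>(a)->name + " says woof";
    }
    // Delete through the concrete type; animal has no virtual destructor
    static void destroy(animal* a) noexcept { delete static_cast<dog*>(a); }
};
const animal_vtable dog::vtable_impl = {dog::speak, dog::destroy};

struct cat : animal {
    static const animal_vtable vtable_impl;
    cat() : animal{&vtable_impl} {}
    static std::string speak(const animal*) { return "meow"; }
    static void destroy(animal* a) noexcept { delete static_cast<cat*>(a); }
};
const animal_vtable cat::vtable_impl = {cat::speak, cat::destroy};

// Dynamic dispatch without the virtual keyword.
std::string speak_of(const animal* a) { return a->vtable->speak(a); }
void destroy_animal(animal* a) { a->vtable->destroy(a); }
```

Only the operations that are actually needed get table entries, and no RTTI or compiler-generated deleting destructors are emitted for these types.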
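2.4.2. Thread safety and memory visibility
Shared state (such as `state` and `continuation_vector`) is protected by atomic operations and memory fences:
- State management: `state` is a `std::atomic<task_state>`. Final states are published with `memory_order_release` stores, and observers such as `ready()` and `wait()` use `memory_order_acquire` loads, so a thread that sees a final state also sees the result or exception written before it. Some loads (e.g. in `add_continuation` and in the destructors) are deliberately `memory_order_relaxed`.
- Adding continuations: `continuation_vector::try_add` is thread-safe, so several threads can add continuations concurrently without a data race.
- Memory fences: `std::atomic_thread_fence` adds ordering where a relaxed load alone is not enough. In `add_continuation`, the acquire fence placed before a continuation is scheduled directly ensures the parent's result is visible to it.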
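The "relaxed load, then acquire fence" shape used in `add_continuation` is a standard C++ idiom: if a relaxed load reads the value written by a release store, a following acquire fence synchronizes with that store. A minimal sketch with hypothetical names (`published`, `publish`, `consume`):

```cpp
#include <atomic>
#include <cassert>
#include <thread>

// Hypothetical data published by one thread and consumed by another.
struct published {
    int payload = 0;                // non-atomic data
    std::atomic<bool> ready{false}; // publication flag
};

void publish(published& p, int value) {
    p.payload = value;
    p.ready.store(true, std::memory_order_release);
}

// Polls with cheap relaxed loads; only after seeing the flag does it issue an
// acquire fence, which pairs with the release store and makes payload visible.
int consume(const published& p) {
    while (!p.ready.load(std::memory_order_relaxed))
        std::this_thread::yield();
    std::atomic_thread_fence(std::memory_order_acquire);
    return p.payload;
}

int run_fence_demo() {
    published p;
    std::thread producer([&p] { publish(p, 7); });
    int v = consume(p);
    producer.join();
    return v;
}
```

The benefit is paying for acquire ordering only on the path that actually needs it, rather than on every poll.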
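2.4.3. Full task lifecycle
Taking a simple asynchronous task as an example, the full lifecycle is:
- Creation: the user calls `async::spawn([]{ return 42; })`. Internally a `task_func` object is constructed, with `state` initialized to `pending` and the user function stored (wrapped in `root_exec_func`); the task is then handed to the scheduler.
- Execution: the scheduler calls `task_func::run(t)`, which invokes `root_exec_func`, which in turn calls the user function and obtains `42`.
- Setting the result: `root_exec_func` calls `set_result(42)` (inherited from `task_result_holder`), which constructs the result in the `result` buffer with placement new, and then destroys the function object, which is no longer needed.
- Marking completion: `task_base::finish()` sets `state` to `completed` (`memory_order_release`) and triggers `run_continuations`.
- Running continuations: `run_continuations` walks `continuation_vector` and hands each continuation to its scheduler via `vtable->schedule`.
- Releasing memory: once all references (e.g. `task_ptr`) are released, `ref_count_base` calls `task_base_deleter::do_delete`, which destroys the concrete task object through `vtable->destroy`.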
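The last step (intrusive reference counting whose final release deletes through the vtable) can be sketched on its own. The names `rc_task`, `int_task` and `rc_ptr` are hypothetical stand-ins for `ref_count_base`, a concrete task and `task_ptr`; the memory orderings follow the usual pattern for intrusive counters rather than being copied from async++.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <utility>

struct rc_task;
struct rc_vtable {
    void (*destroy)(rc_task*) noexcept;
};

// Hypothetical intrusively ref-counted base (cf. ref_count_base + task_base_deleter).
struct rc_task {
    std::atomic<std::size_t> refs{1}; // the creator holds the first reference
    const rc_vtable* vtable = nullptr;

    void add_ref() { refs.fetch_add(1, std::memory_order_relaxed); }
    void remove_ref() {
        // acq_rel so the thread that drops the last reference sees all prior writes
        if (refs.fetch_sub(1, std::memory_order_acq_rel) == 1)
            vtable->destroy(this); // delete with the proper concrete type
    }
};

// Counts live concrete tasks so the demo can observe destruction.
static int live_tasks = 0;

struct int_task : rc_task {
    int result = 0;
    static const rc_vtable vtable_impl;
    int_task() { vtable = &vtable_impl; ++live_tasks; }
    ~int_task() { --live_tasks; }
    static void destroy(rc_task* t) noexcept { delete static_cast<int_task*>(t); }
};
const rc_vtable int_task::vtable_impl = {int_task::destroy};

// Minimal owning handle, similar in role to task_ptr.
class rc_ptr {
public:
    explicit rc_ptr(rc_task* p) : p_(p) {} // adopts the initial reference
    rc_ptr(const rc_ptr& o) : p_(o.p_) { if (p_) p_->add_ref(); }
    rc_ptr(rc_ptr&& o) noexcept : p_(std::exchange(o.p_, nullptr)) {}
    rc_ptr& operator=(const rc_ptr&) = delete;
    ~rc_ptr() { if (p_) p_->remove_ref(); }
    rc_task* get() const { return p_; }
private:
    rc_task* p_;
};
```

The task stays alive as long as any handle (the user's task object, a queued continuation, the scheduler) still refers to it, and the last one to let go frees it with the correct type.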
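2.5 Core Usage Scenarios
The core responsibilities of `task_base` are:
- managing task state (`pending`/`completed`/`canceled`, etc.);
- storing and running continuations;
- thread-safe state updates and synchronization.
Example: a custom task based on `task_base`
Suppose we want a simple "addition task" that derives from `task_base` and supplies its own virtual function table (VTable), to show how a task is created and executed, how its state changes, and how continuations are triggered. Since everything in `async::detail` is internal, this example depends on implementation details that may differ between library versions.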
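Step 1: Define the task types and their VTable implementations
#include <async++.h>
#include <atomic>
#include <exception>
#include <iostream>
#include <utility>

using namespace async::detail;

// 1. Custom task type: computes the sum of two numbers, derives from task_base
struct add_task : public task_base {
    int a, b;                  // input parameters
    int result = 0;            // computed result
    std::exception_ptr except; // exception, if the task was canceled

    // Constructor: store the inputs and bind the custom VTable (must happen at construction)
    add_task(int a_, int b_) : a(a_), b(b_) {
        this->vtable = &vtable_impl;
    }

    // 2. Functions referenced by the VTable
    // (1) Destroy the task through its concrete type
    static void destroy(task_base* t) noexcept {
        delete static_cast<add_task*>(t);
    }

    // (2) Run the task logic (compute a + b)
    static void run(task_base* t) noexcept {
        auto* self = static_cast<add_task*>(t);
        try {
            self->result = self->a + self->b; // int addition cannot throw; the try only shows the pattern
        } catch (...) {
            cancel(t, std::current_exception());
            return;
        }
        // Mark the task completed (release store) and trigger its continuations
        self->finish();
    }

    // (3) Cancel the task: store the exception, publish the state, run continuations
    static void cancel(task_base* t, std::exception_ptr&& e) noexcept {
        auto* self = static_cast<add_task*>(t);
        self->except = std::move(e);
        self->state.store(task_state::canceled, std::memory_order_release);
        self->run_continuations();
    }

    // (4) Schedule this task when it is used as a continuation (simplified: run it inline)
    static void schedule(task_base* /*parent*/, task_ptr cont) {
        // A real implementation would hand cont to a scheduler thread
        cont->vtable->run(cont.get());
    }

    // Custom VTable instance (defined outside the class)
    static const task_base_vtable vtable_impl;
};

const task_base_vtable add_task::vtable_impl = {
    add_task::destroy,  // destroy
    add_task::run,      // run
    add_task::cancel,   // cancel
    add_task::schedule  // schedule
};

// 3. Continuation type: prints the result of its parent add_task.
// It keeps a task_ptr to the parent, which also keeps the parent alive.
struct print_task : public task_base {
    task_ptr parent;

    explicit print_task(task_ptr p) : parent(std::move(p)) {
        this->vtable = &vtable_impl;
    }

    static void destroy(task_base* t) noexcept {
        delete static_cast<print_task*>(t);
    }

    static void run(task_base* t) noexcept {
        auto* self = static_cast<print_task*>(t);
        auto* p = static_cast<add_task*>(self->parent.get());
        if (p->state.load(std::memory_order_acquire) == task_state::completed)
            std::cout << "Continuation: result is " << p->result << std::endl;
        else
            std::cout << "Continuation: task was canceled" << std::endl;
        self->finish();
    }

    static void cancel(task_base* t, std::exception_ptr&&) noexcept {
        t->state.store(task_state::canceled, std::memory_order_release);
        t->run_continuations();
    }

    // Called by the parent's run_continuations(): run this continuation inline
    static void schedule(task_base* /*parent*/, task_ptr cont) {
        cont->vtable->run(cont.get());
    }

    static const task_base_vtable vtable_impl;
};

const task_base_vtable print_task::vtable_impl = {
    print_task::destroy, print_task::run, print_task::cancel, print_task::schedule
};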
Step 2: Use the custom task and check its state and continuation
int main() {
    // 1. Create the task (a = 10, b = 20); sum_task holds the initial reference
    task_ptr sum_task(new add_task(10, 20));
    auto* add = static_cast<add_task*>(sum_task.get());
    std::cout << "Initial state: "
              << (add->state.load() == task_state::pending ? "pending" : "unexpected") << std::endl;

    // 2. Register a continuation that runs when the task finishes
    add->continuations.try_add(task_ptr(new print_task(sum_task)));

    // 3. Run the task (simulating a scheduler call)
    std::cout << "Running task..." << std::endl;
    add->vtable->run(add);

    // 4. Check the final state
    std::cout << "Final state: "
              << (add->state.load() == task_state::completed ? "completed" : "unexpected") << std::endl;

    // 5. Read the result directly (normally wrapped by the task<T> interface)
    std::cout << "Task result: " << add->result << std::endl;
    return 0;
}
Output
Initial state: pending
Running task...
Continuation: result is 30
Final state: completed
Task result: 30
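Key points
- VTable binding: a custom task must implement the `destroy`/`run`/`cancel`/`schedule` functions of `task_base_vtable` and bind them through the `vtable` pointer, which gives dynamic dispatch similar to virtual functions.
- State management:
  - A task starts as `pending`; after running it becomes `completed` or `canceled` through a `memory_order_release` store (`finish()` performs it in the completed case), so threads that load the state with acquire ordering also see the result.
  - The library uses `is_finished` to test for a final state (`completed`/`canceled`), i.e. whether the state can still change.
- Continuations:
  - Continuations are added with `continuation_vector::try_add` (thread-safe). The example calls it directly for brevity; the library itself goes through `add_continuation`, which also handles a task that has already finished.
  - When the task finishes, `run_continuations` walks all continuations and dispatches each through its own `vtable->schedule` (the example runs it inline; a real scheduler would pick a thread).
- Exception handling: an exception thrown while the task runs is caught, stored in the `except` member, and the state is set to `canceled`; a continuation can then read the exception from `except`.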
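3. Summary
This code is the skeleton of the async++ library. Using template specialization, a hand-written VTable, atomic operations and aligned memory, it implements efficient, lightweight asynchronous task management:
- Efficiency: smaller generated code than compiler-generated virtual functions, cache-line alignment against false sharing, and thread safety through atomics and memory fences at minimal cost.
- Flexibility: supports task unwrapping, continuations and custom schedulers, covering a range of asynchronous scenarios.
- Robustness: thorough exception handling (exceptions from task functions become cancellation) and memory management (reference counting plus manual destruction) prevent memory leaks and escaping exceptions.
To go further, read this together with async++'s higher-level interfaces (such as `task<T>`, `shared_task<T>` and the `then` method) and observe how the `detail` components work together underneath.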