当前位置: 首页 > news >正文

Async++ 源码分析13--parallel_reduce.h

一、Async++ 代码目录结构

Async++ 项目的目录结构清晰,主要包含根目录下的配置文件、源代码目录、头文件目录以及示例代码目录,具体结构如下:

asyncplusplus/
├── .gitignore               # Git 忽略文件配置
├── Async++Config.cmake.in   # CMake 配置模板文件
├── CMakeLists.txt           # CMake 构建脚本
├── LICENSE                  # 许可证文件(MIT 许可证)
├── README.md                # 项目说明文档
├── examples/                # 示例代码目录
│   └── gtk_scheduler.cpp    # GTK 调度器示例
├── src/                     # 源代码目录
│   ├── fifo_queue.h         # FIFO 队列实现
│   ├── internal.h           # 内部头文件(包含类型定义、宏等)
│   ├── scheduler.cpp        # 调度器实现
│   ├── singleton.h          # 单例模式实现
│   ├── task_wait_event.h    # 任务等待事件实现
│   ├── threadpool_scheduler.cpp  # 线程池调度器实现
│   └── work_steal_queue.h   # 工作窃取队列实现
└── include/                 # 头文件目录├── async++.h            # 主头文件(对外提供统一接口)└── async++/             # 子模块头文件目录├── aligned_alloc.h├── cancel.h├── continuation_vector.h├── parallel_for.h├── parallel_invoke.h├── parallel_reduce.h├── partitioner.h    # 分区器相关定义├── range.h          # 范围(迭代器对)相关定义├── ref_count.h├── scheduler.h      # 调度器接口定义├── scheduler_fwd.h├── task.h           # 任务类定义├── task_base.h      # 任务基类定义├── traits.h└── when_all_any.h

二、parallel_reduce源码分析

2.1 源码

// Copyright (c) 2015 Amanieu d'Antras
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.#ifndef ASYNCXX_H_
# error "Do not include this header directly, include <async++.h> instead."
#endifnamespace async {
namespace detail {// Task states
enum class task_state: unsigned char {pending, // Task has not completed yetlocked, // Task is locked (used by event_task to prevent double set)unwrapped, // Task is waiting for an unwrapped task to finishcompleted, // Task has finished execution and a result is availablecanceled // Task has been canceled and an exception is available
};// Determine whether a task is in a final state
inline bool is_finished(task_state s)
{return s == task_state::completed || s == task_state::canceled;
}// Virtual function table used to allow dynamic dispatch for task objects.
// While this is very similar to what a compiler would generate with virtual
// functions, this scheme was found to result in significantly smaller
// generated code size.
struct task_base_vtable {// Destroy the function and resultvoid (*destroy)(task_base*) LIBASYNC_NOEXCEPT;// Run the associated functionvoid (*run)(task_base*) LIBASYNC_NOEXCEPT;// Cancel the task with an exceptionvoid (*cancel)(task_base*, std::exception_ptr&&) LIBASYNC_NOEXCEPT;// Schedule the task using its schedulervoid (*schedule)(task_base* parent, task_ptr t);
};// Type-generic base task object
struct task_base_deleter;
struct LIBASYNC_CACHELINE_ALIGN task_base: public ref_count_base<task_base, task_base_deleter> {// Task statestd::atomic<task_state> state;// Whether get_task() was already called on an event_taskbool event_task_got_task;// Vector of continuationscontinuation_vector continuations;// Virtual function table used for dynamic dispatchconst task_base_vtable* vtable;// Use aligned memory allocationstatic void* operator new(std::size_t size){return aligned_alloc(size, LIBASYNC_CACHELINE_SIZE);}static void operator delete(void* ptr){aligned_free(ptr);}// Initialize task statetask_base(): state(task_state::pending) {}// Check whether the task is ready and include an acquire barrier if it isbool ready() const{return is_finished(state.load(std::memory_order_acquire));}// Run a single continuationtemplate<typename Sched>void run_continuation(Sched& sched, task_ptr&& cont){LIBASYNC_TRY {detail::schedule_task(sched, cont);} LIBASYNC_CATCH(...) {// This is suboptimal, but better than letting the exception leakcont->vtable->cancel(cont.get(), std::current_exception());}}// Run all of the task's continuations after it has completed or canceled.// The list of continuations is emptied and locked to prevent any further// continuations from being added.void run_continuations(){continuations.flush_and_lock([this](task_ptr t) {const task_base_vtable* vtable_ptr = t->vtable;vtable_ptr->schedule(this, std::move(t));});}// Add a continuation to this tasktemplate<typename Sched>void add_continuation(Sched& sched, task_ptr cont){// Check for task completiontask_state current_state = state.load(std::memory_order_relaxed);if (!is_finished(current_state)) {// Try to add the task to the continuation list. This can fail only// if the task has just finished, in which case we run it directly.if (continuations.try_add(std::move(cont)))return;}// Otherwise run the continuation directlystd::atomic_thread_fence(std::memory_order_acquire);run_continuation(sched, std::move(cont));}// Finish the task after it has been executed and the result setvoid finish(){state.store(task_state::completed, std::memory_order_release);run_continuations();}// Wait for the task to finish executingtask_state wait(){task_state s = state.load(std::memory_order_acquire);if (!is_finished(s)) {wait_for_task(this);s = state.load(std::memory_order_relaxed);}return s;}
};// Deleter for task_ptr
struct task_base_deleter {static void do_delete(task_base* p){// Go through the vtable to delete p with its proper typep->vtable->destroy(p);}
};// Result type-specific task object
template<typename Result>
struct task_result_holder: public task_base {union {alignas(Result) std::uint8_t result[sizeof(Result)];alignas(std::exception_ptr) std::uint8_t except[sizeof(std::exception_ptr)];// Scheduler that should be used to schedule this task. The scheduler// type has been erased and is held by vtable->schedule.void* sched;};template<typename T>void set_result(T&& t){new(&result) Result(std::forward<T>(t));}// Return a result using an lvalue or rvalue reference depending on the task// type. The task parameter is not used, it is just there for overload resolution.template<typename T>Result&& get_result(const task<T>&){return std::move(*reinterpret_cast<Result*>(&result));}template<typename T>const Result& get_result(const shared_task<T>&){return *reinterpret_cast<Result*>(&result);}// Destroy the result~task_result_holder(){// Result is only present if the task completed successfullyif (state.load(std::memory_order_relaxed) == task_state::completed)reinterpret_cast<Result*>(&result)->~Result();}
};// Specialization for references
template<typename Result>
struct task_result_holder<Result&>: public task_base {union {// Store as pointer internallyResult* result;alignas(std::exception_ptr) std::uint8_t except[sizeof(std::exception_ptr)];void* sched;};void set_result(Result& obj){result = std::addressof(obj);}template<typename T>Result& get_result(const task<T>&){return *result;}template<typename T>Result& get_result(const shared_task<T>&){return *result;}
};// Specialization for void
template<>
struct task_result_holder<fake_void>: public task_base {union {alignas(std::exception_ptr) std::uint8_t except[sizeof(std::exception_ptr)];void* sched;};void set_result(fake_void) {}// Get the result as fake_void so that it can be passed to set_result and// continuationstemplate<typename T>fake_void get_result(const task<T>&){return fake_void();}template<typename T>fake_void get_result(const shared_task<T>&){return fake_void();}
};template<typename Result>
struct task_result: public task_result_holder<Result> {// Virtual function table for task_resultstatic const task_base_vtable vtable_impl;task_result(){this->vtable = &vtable_impl;}// Destroy the exception~task_result(){// Exception is only present if the task was canceledif (this->state.load(std::memory_order_relaxed) == task_state::canceled)reinterpret_cast<std::exception_ptr*>(&this->except)->~exception_ptr();}// Cancel a task with the given exceptionvoid cancel_base(std::exception_ptr&& except_){set_exception(std::move(except_));this->state.store(task_state::canceled, std::memory_order_release);this->run_continuations();}// Set the exception value of the taskvoid set_exception(std::exception_ptr&& except_){new(&this->except) std::exception_ptr(std::move(except_));}// Get the exception a task was canceled withstd::exception_ptr& get_exception(){return *reinterpret_cast<std::exception_ptr*>(&this->except);}// Wait and throw the exception if the task was canceledvoid wait_and_throw(){if (this->wait() == task_state::canceled)LIBASYNC_RETHROW_EXCEPTION(get_exception());}// Delete the task using its proper typestatic void destroy(task_base* t) LIBASYNC_NOEXCEPT{delete static_cast<task_result<Result>*>(t);}
};
template<typename Result>
const task_base_vtable task_result<Result>::vtable_impl = {task_result<Result>::destroy, // destroynullptr, // runnullptr, // cancelnullptr // schedule
};// Class to hold a function object, with empty base class optimization
template<typename Func, typename = void>
struct func_base {Func func;template<typename F>explicit func_base(F&& f): func(std::forward<F>(f)) {}Func& get_func(){return func;}
};
template<typename Func>
struct func_base<Func, typename std::enable_if<std::is_empty<Func>::value>::type> {template<typename F>explicit func_base(F&& f){new(this) Func(std::forward<F>(f));}~func_base(){get_func().~Func();}Func& get_func(){return *reinterpret_cast<Func*>(this);}
};// Class to hold a function object and initialize/destroy it at any time
template<typename Func, typename = void>
struct func_holder {alignas(Func) std::uint8_t func[sizeof(Func)];Func& get_func(){return *reinterpret_cast<Func*>(&func);}template<typename... Args>void init_func(Args&&... args){new(&func) Func(std::forward<Args>(args)...);}void destroy_func(){get_func().~Func();}
};
template<typename Func>
struct func_holder<Func, typename std::enable_if<std::is_empty<Func>::value>::type> {Func& get_func(){return *reinterpret_cast<Func*>(this);}template<typename... Args>void init_func(Args&&... args){new(this) Func(std::forward<Args>(args)...);}void destroy_func(){get_func().~Func();}
};// Task object with an associated function object
// Using private inheritance so empty Func doesn't take up space
template<typename Sched, typename Func, typename Result>
struct task_func: public task_result<Result>, func_holder<Func> {// Virtual function table for task_funcstatic const task_base_vtable vtable_impl;template<typename... Args>explicit task_func(Args&&... args){this->vtable = &vtable_impl;this->init_func(std::forward<Args>(args)...);}// Run the stored functionstatic void run(task_base* t) LIBASYNC_NOEXCEPT{LIBASYNC_TRY {// Dispatch to execution functionstatic_cast<task_func<Sched, Func, Result>*>(t)->get_func()(t);} LIBASYNC_CATCH(...) {cancel(t, std::current_exception());}}// Cancel the taskstatic void cancel(task_base* t, std::exception_ptr&& except) LIBASYNC_NOEXCEPT{// Destroy the function object when canceling since it won't be// used anymore.static_cast<task_func<Sched, Func, Result>*>(t)->destroy_func();static_cast<task_func<Sched, Func, Result>*>(t)->cancel_base(std::move(except));}// Schedule a continuation task using its schedulerstatic void schedule(task_base* parent, task_ptr t){void* sched = static_cast<task_func<Sched, Func, Result>*>(t.get())->sched;parent->run_continuation(*static_cast<Sched*>(sched), std::move(t));}// Free the function~task_func(){// If the task hasn't completed yet, destroy the function object. Note// that an unwrapped task has already destroyed its function object.if (this->state.load(std::memory_order_relaxed) == task_state::pending)this->destroy_func();}// Delete the task using its proper typestatic void destroy(task_base* t) LIBASYNC_NOEXCEPT{delete static_cast<task_func<Sched, Func, Result>*>(t);}
};
template<typename Sched, typename Func, typename Result>
const task_base_vtable task_func<Sched, Func, Result>::vtable_impl = {task_func<Sched, Func, Result>::destroy, // destroytask_func<Sched, Func, Result>::run, // runtask_func<Sched, Func, Result>::cancel, // canceltask_func<Sched, Func, Result>::schedule // schedule
};// Helper functions to access the internal_task member of a task object, which
// avoids us having to specify half of the functions in the detail namespace
// as friend. Also, internal_task is downcast to the appropriate task_result<>.
template<typename Task>
typename Task::internal_task_type* get_internal_task(const Task& t)
{return static_cast<typename Task::internal_task_type*>(t.internal_task.get());
}
template<typename Task>
void set_internal_task(Task& t, task_ptr p)
{t.internal_task = std::move(p);
}// Common code for task unwrapping
template<typename Result, typename Child>
struct unwrapped_func {explicit unwrapped_func(task_ptr t): parent_task(std::move(t)) {}void operator()(Child child_task) const{// Forward completion state and result to parent tasktask_result<Result>* parent = static_cast<task_result<Result>*>(parent_task.get());LIBASYNC_TRY {if (get_internal_task(child_task)->state.load(std::memory_order_relaxed) == task_state::completed) {parent->set_result(get_internal_task(child_task)->get_result(child_task));parent->finish();} else {// We don't call the generic cancel function here because// the function of the parent task has already been destroyed.parent->cancel_base(std::exception_ptr(get_internal_task(child_task)->get_exception()));}} LIBASYNC_CATCH(...) {// If the copy/move constructor of the result threw, propagate the exceptionparent->cancel_base(std::current_exception());}}task_ptr parent_task;
};
template<typename Sched, typename Result, typename Func, typename Child>
void unwrapped_finish(task_base* parent_base, Child child_task)
{// Destroy the parent task's function since it has been executedparent_base->state.store(task_state::unwrapped, std::memory_order_relaxed);static_cast<task_func<Sched, Func, Result>*>(parent_base)->destroy_func();// Set up a continuation on the child to set the result of the parentLIBASYNC_TRY {parent_base->add_ref();child_task.then(inline_scheduler(), unwrapped_func<Result, Child>(task_ptr(parent_base)));} LIBASYNC_CATCH(...) {// Use cancel_base here because the function object is already destroyed.static_cast<task_result<Result>*>(parent_base)->cancel_base(std::current_exception());}
}// Execution functions for root tasks:
// - With and without task unwraping
template<typename Sched, typename Result, typename Func, bool Unwrap>
struct root_exec_func: private func_base<Func> {template<typename F>explicit root_exec_func(F&& f): func_base<Func>(std::forward<F>(f)) {}void operator()(task_base* t){static_cast<task_result<Result>*>(t)->set_result(detail::invoke_fake_void(std::move(this->get_func())));static_cast<task_func<Sched, root_exec_func, Result>*>(t)->destroy_func();t->finish();}
};
template<typename Sched, typename Result, typename Func>
struct root_exec_func<Sched, Result, Func, true>: private func_base<Func> {template<typename F>explicit root_exec_func(F&& f): func_base<Func>(std::forward<F>(f)) {}void operator()(task_base* t){unwrapped_finish<Sched, Result, root_exec_func>(t, std::move(this->get_func())());}
};// Execution functions for continuation tasks:
// - With and without task unwraping
// - For void, value-based and task-based continuations
template<typename Sched, typename Parent, typename Result, typename Func, typename ValueCont, bool Unwrap>
struct continuation_exec_func: private func_base<Func> {template<typename F, typename P>continuation_exec_func(F&& f, P&& p): func_base<Func>(std::forward<F>(f)), parent(std::forward<P>(p)) {}void operator()(task_base* t){static_cast<task_result<Result>*>(t)->set_result(detail::invoke_fake_void(std::move(this->get_func()), std::move(parent)));static_cast<task_func<Sched, continuation_exec_func, Result>*>(t)->destroy_func();t->finish();}Parent parent;
};
template<typename Sched, typename Parent, typename Result, typename Func>
struct continuation_exec_func<Sched, Parent, Result, Func, std::true_type, false>: private func_base<Func> {template<typename F, typename P>continuation_exec_func(F&& f, P&& p): func_base<Func>(std::forward<F>(f)), parent(std::forward<P>(p)) {}void operator()(task_base* t){if (get_internal_task(parent)->state.load(std::memory_order_relaxed) == task_state::canceled)task_func<Sched, continuation_exec_func, Result>::cancel(t, std::exception_ptr(get_internal_task(parent)->get_exception()));else {static_cast<task_result<Result>*>(t)->set_result(detail::invoke_fake_void(std::move(this->get_func()), get_internal_task(parent)->get_result(parent)));static_cast<task_func<Sched, continuation_exec_func, Result>*>(t)->destroy_func();t->finish();}}Parent parent;
};
template<typename Sched, typename Parent, typename Result, typename Func>
struct continuation_exec_func<Sched, Parent, Result, Func, fake_void, false>: private func_base<Func> {template<typename F, typename P>continuation_exec_func(F&& f, P&& p): func_base<Func>(std::forward<F>(f)), parent(std::forward<P>(p)) {}void operator()(task_base* t){if (get_internal_task(parent)->state.load(std::memory_order_relaxed) == task_state::canceled)task_func<Sched, continuation_exec_func, Result>::cancel(t, std::exception_ptr(get_internal_task(parent)->get_exception()));else {static_cast<task_result<Result>*>(t)->set_result(detail::invoke_fake_void(std::move(this->get_func()), fake_void()));static_cast<task_func<Sched, continuation_exec_func, Result>*>(t)->destroy_func();t->finish();}}Parent parent;
};
template<typename Sched, typename Parent, typename Result, typename Func>
struct continuation_exec_func<Sched, Parent, Result, Func, std::false_type, true>: private func_base<Func> {template<typename F, typename P>continuation_exec_func(F&& f, P&& p): func_base<Func>(std::forward<F>(f)), parent(std::forward<P>(p)) {}void operator()(task_base* t){unwrapped_finish<Sched, Result, continuation_exec_func>(t, detail::invoke_fake_void(std::move(this->get_func()), std::move(parent)));}Parent parent;
};
template<typename Sched, typename Parent, typename Result, typename Func>
struct continuation_exec_func<Sched, Parent, Result, Func, std::true_type, true>: private func_base<Func> {template<typename F, typename P>continuation_exec_func(F&& f, P&& p): func_base<Func>(std::forward<F>(f)), parent(std::forward<P>(p)) {}void operator()(task_base* t){if (get_internal_task(parent)->state.load(std::memory_order_relaxed) == task_state::canceled)task_func<Sched, continuation_exec_func, Result>::cancel(t, std::exception_ptr(get_internal_task(parent)->get_exception()));elseunwrapped_finish<Sched, Result, continuation_exec_func>(t, detail::invoke_fake_void(std::move(this->get_func()), get_internal_task(parent)->get_result(parent)));}Parent parent;
};
template<typename Sched, typename Parent, typename Result, typename Func>
struct continuation_exec_func<Sched, Parent, Result, Func, fake_void, true>: private func_base<Func> {template<typename F, typename P>continuation_exec_func(F&& f, P&& p): func_base<Func>(std::forward<F>(f)), parent(std::forward<P>(p)) {}void operator()(task_base* t){if (get_internal_task(parent)->state.load(std::memory_order_relaxed) == task_state::canceled)task_func<Sched, continuation_exec_func, Result>::cancel(t, std::exception_ptr(get_internal_task(parent)->get_exception()));elseunwrapped_finish<Sched, Result, continuation_exec_func>(t, detail::invoke_fake_void(std::move(this->get_func()), fake_void()));}Parent parent;
};} // namespace detail
} // namespace async

        这份代码是 async++ 库(一个 C++ 异步编程框架)的核心实现,主要定义了异步任务(Task)的底层数据结构、状态管理、生命周期控制及调度逻辑。以下从核心概念、代码结构、关键机制三个维度进行详细解析,帮助理解异步任务的运行原理。

2.2 核心概念与前置说明

在阅读代码前,需明确几个关键术语:

  • Task(任务):异步执行的单元,包含执行逻辑(函数)、结果 / 异常存储、状态(未完成 / 已完成 / 已取消等)。
  • Continuation(延续):一个任务完成后自动执行的后续任务(如 task.then(...) 中的后续逻辑)。
  • VTable(虚函数表):手动实现的动态分派机制,避免编译器自动生成虚函数的冗余代码,减小二进制体积。
  • Scheduler(调度器):负责将任务分配到线程执行的组件(代码中通过类型擦除隐藏具体实现)。

2.3 代码结构解析

代码位于 async::detail 命名空间(detail 表示内部实现,不对外暴露接口),核心结构分为 任务状态定义基础任务类结果存储类带执行逻辑的任务类辅助工具类 五部分。

2.3.1. 任务状态定义(task_state 与辅助函数)

定义任务的生命周期状态,通过原子变量保证线程安全。

enum class task_state: unsigned char {pending,       // 未执行(初始状态)locked,        // 锁定(用于 event_task,防止重复设置结果)unwrapped,     // 等待子任务完成(用于任务嵌套,如 task 返回另一个 task)completed,     // 执行完成(结果可用)canceled       // 已取消(异常可用)
};// 判断任务是否进入“终态”(完成或取消,不再变化)
inline bool is_finished(task_state s) {return s == task_state::completed || s == task_state::canceled;
}

2.3.2. 基础任务类(task_base

所有任务的抽象基类,定义任务的通用行为(状态管理、延续任务管理、等待等),采用 引用计数 管理内存(继承自 ref_count_basetask_base_deleter 为自定义删除器)。

关键成员变量
成员变量作用
std::atomic<task_state> state原子化的任务状态,保证多线程下状态读取 / 修改的安全性。
continuation_vector continuations存储当前任务的所有延续任务(任务完成后需执行的后续任务)。
const task_base_vtable* vtable手动实现的虚函数表,指向具体任务类型的 destroy/run/cancel 等方法。
bool event_task_got_task仅用于 event_task(事件任务),标记是否已获取关联任务,避免重复操作。
核心成员函数
(1)状态查询与内存管理
  • ready():判断任务是否完成(含内存屏障,确保读取到最新状态)。

    cpp

    运行

    bool ready() const {// memory_order_acquire:确保后续读取操作不重排到当前操作前(可见性)return is_finished(state.load(std::memory_order_acquire));
    }
    
  • 重载 operator new/delete:使用 缓存行对齐分配aligned_alloc),避免多线程下的 “伪共享”(False Sharing),提升性能。
(2)延续任务管理

延续任务是异步编程的核心(如 task.then([]{...})),task_base 提供 add_continuation(添加延续)和 run_continuations(执行所有延续)。

  • add_continuation(Sched& sched, task_ptr cont)

    1. 先检查任务是否已终态:若已终态,直接执行延续任务;
    2. 若未终态,尝试将延续任务加入 continuation_vector;若加入失败(任务刚进入终态),则直接执行。
    3. 用 std::atomic_thread_fence 保证内存可见性,避免指令重排导致的逻辑错误。
  • run_continuations()

    1. 清空 continuation_vector 并锁定(防止后续添加延续);
    2. 遍历所有延续任务,通过 vtable->schedule 调用调度器执行。
(3)任务完成与等待
  • finish():标记任务为 “已完成”,并触发所有延续任务。
    void finish() {// memory_order_release:确保任务执行的写操作对其他线程可见state.store(task_state::completed, std::memory_order_release);run_continuations(); // 执行所有延续
    }
    
  • wait():阻塞当前线程,等待任务进入终态(依赖 wait_for_task 底层实现,通常是条件变量或原子自旋)。

2.3.3. 结果存储类(task_result_holder 与 task_result

(1)task_result_holder:结果存储的模板特化

针对任务结果的不同类型(普通值、引用、void),提供差异化的存储和访问方式,避免冗余代码。

  • 普通值类型(Result:用 union 存储结果(result)或异常(except),sched 存储调度器指针(类型擦除)。
  • 引用类型(Result&:内部用指针存储引用(避免引用无法默认初始化的问题)。
  • void 类型:特化为 fake_void(占位类型),无需存储结果,仅需异常和调度器。

核心方法:

  • set_result(T&& t):构造结果(placement new,避免额外内存分配)。
  • get_result(const Task&):根据任务类型(task/shared_task)返回结果(右值引用 / 左值引用)。
  • 析构函数:仅在任务 “已完成” 时销毁结果,“已取消” 时销毁异常(避免内存泄漏)。
(2)task_result:继承 task_result_holder,添加异常与取消逻辑

在结果存储基础上,补充异常管理和任务取消的核心逻辑:

  • set_exception(std::exception_ptr&& except_):存储取消时的异常(如 std::future_error)。
  • get_exception():获取取消时的异常,用于后续抛出。
  • wait_and_throw():等待任务完成,若为 “已取消” 则抛出异常。
  • 静态 vtable_impl:定义 task_result 的虚函数表(destroy 指向自定义删除器,run/cancel 初始为 nullptr,由子类实现)。

2.3.4. 带执行逻辑的任务类(task_func

继承 task_result<Result> 和 func_holder<Func>(函数存储),是 可执行的具体任务类型,关联任务的执行函数(Func)和调度器(Sched)。

核心功能
  • 函数存储:通过 func_holder<Func> 存储执行逻辑(支持空基类优化,空函数对象不占内存)。
  • 执行逻辑run(task_base* t) 静态方法,调用存储的函数,捕获异常并转为任务取消。
    static void run(task_base* t) LIBASYNC_NOEXCEPT {LIBASYNC_TRY { // 自定义宏,封装 try-catch// 调用任务的执行函数,传入任务指针(用于设置结果/取消)static_cast<task_func<Sched, Func, Result>*>(t)->get_func()(t);} LIBASYNC_CATCH(...) {// 捕获所有异常,将任务标记为“已取消”cancel(t, std::current_exception());}
    }
    
  • 取消逻辑cancel(...) 静态方法,销毁执行函数(避免内存泄漏),并标记任务为 “已取消”。
  • 调度逻辑schedule(...) 静态方法,通过类型擦除的调度器指针(sched),将延续任务交给调度器执行。
  • 虚函数表vtable_impl 覆盖 task_result 的虚函数表,绑定 destroy/run/cancel/schedule 到 task_func 的静态方法。

2.3.5. 辅助工具类与函数

(1)函数存储优化(func_base 与 func_holder
  • func_base:对函数对象进行封装,支持 空基类优化(空函数对象不占用内存,如无捕获的 lambda)。
  • func_holder:用字节数组(std::uint8_t)和 placement new 手动管理函数对象的构造 / 析构,避免动态内存分配。
(2)任务嵌套支持(unwrapped_func 与 unwrapped_finish

处理 “任务返回任务” 的场景(如 task<task<int>>),自动 “解包” 子任务,将子任务的结果 / 异常传递给父任务:

  • unwrapped_func:子任务的延续函数,子任务完成后,将其结果 / 异常设置到父任务。
  • unwrapped_finish:标记父任务为 unwrapped 状态,销毁父任务的执行函数(已无用),并为子任务添加延续。
(3)执行函数模板(root_exec_func 与 continuation_exec_func
  • root_exec_func:“根任务”(无依赖的初始任务)的执行逻辑,支持是否解包子任务(Unwrap 模板参数)。
  • continuation_exec_func:“延续任务” 的执行逻辑,根据父任务类型(Parent)、结果类型(Result)、是否解包(Unwrap)提供多套实现,确保延续任务能正确接收父任务的结果或异常。

2.4 关键机制深度解析

2.4.1. 手动 VTable 机制(性能优化)

C++ 编译器自动生成的虚函数会带来额外的内存开销(每个对象一个 vptr)和代码冗余(每个虚函数一个调用点)。async++ 手动实现 VTable:

  • 每个具体任务类型(如 task_func)定义一个静态 vtable_impl,存储该类型的 destroy/run/cancel/schedule 函数指针。
  • 基类 task_base 仅存储一个 vtable 指针,指向具体类型的 vtable_impl
  • 调用时通过 vtable->run(t) 动态分派,避免编译器生成的虚函数 overhead,减小二进制体积。

2.4.2. 线程安全与内存可见性

所有共享状态(如 statecontinuation_vector)均通过原子操作或内存屏障保证线程安全:

  • 状态管理state 用 std::atomic<task_state>,读取用 memory_order_acquire,写入用 memory_order_release,确保多线程下状态的可见性和有序性。
  • 延续任务添加continuation_vector::try_add 是线程安全的(内部用原子操作),避免多个线程同时添加延续导致的数据竞争。
  • 内存屏障std::atomic_thread_fence 用于在非原子操作间插入内存屏障,防止指令重排(如 add_continuation 中检查状态后执行延续,需屏障确保状态读取的正确性)。

2.4.3. 任务生命周期全流程

以一个简单的异步任务为例,完整生命周期如下:

  1. 创建任务:用户调用 async::spawn([]{ return 42; }),内部构造 task_func 对象,初始化 state 为 pending,存储执行函数和调度器。
  2. 调度执行:调度器调用 task_func::run(t),执行用户函数,返回 42
  3. 设置结果run 中调用 task_result::set_result(42),用 placement new 构造结果到 result 数组。
  4. 标记完成:调用 task_base::finish(),将 state 设为 completedmemory_order_release),触发 run_continuations
  5. 执行延续run_continuations 遍历 continuation_vector,通过 vtable->schedule 将延续任务交给调度器执行。
  6. 内存释放:所有引用(如 task_ptr)释放后,ref_count_base 触发 task_base_deleter::do_delete,通过 vtable->destroy 销毁具体任务对象。

2.5 核心用法场景

task_base 的核心功能包括:

  1. 管理任务状态(pending/completed/canceled 等);
  2. 存储和执行延续任务(continuation);
  3. 线程安全的状态更新与同步。

示例:自定义基于 task_base 的任务

假设我们要实现一个简单的 “加法任务”,继承 task_base 并实现其虚函数表(VTable),展示任务的创建、执行、状态更新和延续任务的触发流程。

步骤 1:定义任务类型与虚函数表实现
#include <async++.h>
#include <iostream>
#include <atomic>
#include <exception>
#include <memory>using namespace async;
using namespace async::detail;// 1. 自定义任务类型:计算两个数的和,继承 task_base
struct add_task : public task_base {int a, b;       // 输入参数int result;     // 存储计算结果std::exception_ptr except; // 存储异常(若有)// 构造函数:初始化输入参数add_task(int a_, int b_) : a(a_), b(b_) {// 绑定自定义 VTable(必须在构造时设置)this->vtable = &vtable_impl;}// 2. 实现 VTable 所需的函数// (1)销毁任务(释放资源)static void destroy(task_base* t) noexcept {delete static_cast<add_task*>(t);}// (2)执行任务逻辑(计算 a + b)static void run(task_base* t) noexcept {auto* self = static_cast<add_task*>(t);try {self->result = self->a + self->b; // 执行计算// 标记任务为“已完成”,并触发延续任务self->state.store(task_state::completed, std::memory_order_release);self->run_continuations();} catch (...) {// 捕获异常,标记任务为“已取消”self->except = std::current_exception();self->state.store(task_state::canceled, std::memory_order_release);self->run_continuations();}}// (3)取消任务(设置异常)static void cancel(task_base* t, std::exception_ptr&& e) noexcept {auto* self = static_cast<add_task*>(t);self->except = std::move(e);self->state.store(task_state::canceled, std::memory_order_release);self->run_continuations();}// (4)调度延续任务(简化版:直接执行)static void schedule(task_base* parent, task_ptr cont) {// 实际中应通过调度器分配线程,这里简化为直接执行cont->vtable->run(cont.get());}// 自定义 VTable 实例(绑定上述静态函数)static const task_base_vtable vtable_impl;
};// 初始化 VTable(必须在类外定义)
const task_base_vtable add_task::vtable_impl = {add_task::destroy,   // destroyadd_task::run,       // runadd_task::cancel,    // canceladd_task::schedule   // schedule
};
步骤 2:使用自定义任务,测试状态与延续任务
int main() {// 1. 创建自定义任务(a=10, b=20)task_ptr task = task_ptr(new add_task(10, 20));add_task* add = static_cast<add_task*>(task.get());std::cout << "初始状态: " << (add->state.load() == task_state::pending ? "pending" : "错误") << std::endl; // 输出:初始状态: pending// 2. 为任务添加延续任务(任务完成后执行)// 延续任务:打印计算结果auto* cont = new add_task(0, 0); // 复用 add_task 作为延续(仅作示例)cont->vtable->run = [](task_base* t) noexcept {auto* parent = static_cast<add_task*>(t->continuations.parent); // 获取父任务if (parent->state.load() == task_state::completed) {std::cout << "延续任务:计算结果为 " << parent->result << std::endl;} else {std::cout << "延续任务:任务被取消" << std::endl;}};// 将延续任务添加到父任务的延续列表add->continuations.try_add(task_ptr(cont));// 3. 执行任务(模拟调度器调用)std::cout << "执行任务..." << std::endl;add->vtable->run(add); // 调用 run 方法执行计算// 4. 检查任务状态std::cout << "最终状态: " << (add->state.load() == task_state::completed ? "completed" : "错误") << std::endl; // 输出:最终状态: completed// 5. 手动获取结果(实际中通过上层接口封装)std::cout << "任务结果: " << add->result << std::endl; // 输出:任务结果: 30return 0;
}
输出结果
初始状态: pending
执行任务...
延续任务:计算结果为 30
最终状态: completed
任务结果: 30

关键代码解析

  1. VTable 绑定:自定义任务必须实现 task_base_vtable 中的 destroy/run/cancel/schedule 函数,并通过 vtable 指针绑定,实现动态分派(类似虚函数的多态行为)。

  2. 状态管理

    • 任务初始状态为 pending,执行完成后通过 state.store(..., std::memory_order_release) 更新为 completed 或 canceled,确保多线程可见性。
    • is_finished 函数判断任务是否进入终态(completed/canceled),避免重复处理。
  3. 延续任务

    • 通过 continuation_vector::try_add 添加延续任务(线程安全)。
    • 任务完成后调用 run_continuations 遍历并执行所有延续任务,通过 vtable->schedule 调度(示例中简化为直接执行,实际中由调度器分配线程)。
  4. 异常处理:任务执行中抛出的异常被捕获后,存储到 except 成员,任务状态设为 canceled,延续任务可通过 except 获取异常信息。

四、总结

这份代码是 async++ 库的 “骨架”,通过 模板特化手动 VTable原子操作内存对齐 等技术,实现了高效、轻量的异步任务管理:

  • 高效性:避免虚函数 overhead、伪共享,通过原子操作和内存屏障保证线程安全的同时最小化性能损耗。
  • 灵活性:支持任务嵌套(解包)、延续任务、自定义调度器,适配不同异步场景。
  • 鲁棒性:完整的异常处理(捕获执行异常并转为任务取消)、内存管理(引用计数 + 手动析构),避免内存泄漏和异常泄漏。

若需进一步理解,可结合 async++ 库的上层接口(如 task<T>shared_task<T>then 方法),观察底层 detail 组件如何协同工作。

http://www.dtcms.com/a/456650.html

相关文章:

  • 分布式api调用时间优化和问题排查
  • LeetCode每日一题,20251008
  • h5网站建设的具体内容电子商务平台网站模板
  • hive sql优化基础
  • Linux小课堂: Linux 系统的多面性与 CentOS 下载指南
  • 详解redis,MySQL,mongodb以及各自使用场景
  • 开发网站设计公司建设通网站会员共享密码
  • Linux相关工具vim/gcc/g++/gdb/cgdb的使用详解
  • Verilog和FPGA的自学笔记2——点亮LED
  • uniapp创建ts项目tsconfig.json报错的问题
  • Linux性能调优之内核网络栈发包收包认知
  • 静态网站挂马u钙网logo设计影视剪辑
  • Rust 基础语法指南
  • C11 安全字符串转整数函数详解:atoi_s、atol_s、strtol_s 与 strtoimax_s
  • 从入门到实战:全面解析Protobuf的安装配置、语法规范与高级应用——手把手教你用Protobuf实现高效数据序列化与跨语言通信
  • SaaS版MES系统PC端后台功能清单与设计说明
  • 广州建立公司网站多少钱php网站培训机构企业做网站
  • 若依前后端分离版学习笔记(十九)——导入,导出实现流程及图片,文件组件
  • SSM后台投票网站系统9h37l(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。
  • 基于springboot高校汉服租赁系统的设计与实现(文末附源码)
  • 【AI大模型】WPS 接入DeepSeek 实战项目详解
  • 12306网站为什么做那么差网站的统计代码是什么意思
  • 第二章 预备知识(线性代数)
  • 建设网站服务器的方式有自营方式山楂树建站公司
  • 10.8 树形dp
  • Java中第三方日志库-Log4J
  • Redis 键(Key)详解
  • 用AI帮忙,开发刷题小程序:软考真经微信小程序API接口文档(更新版)
  • soular入门到实战(5) - Kanass、sward、soular实现sso单点登录
  • 优秀平面设计作品网站wordpress 多人