当前位置: 首页 > news >正文

Async++ 源码分析4---continuation_vector.h

一、Async++ 代码目录结构

Async++ 项目的目录结构清晰,主要包含根目录下的配置文件、源代码目录、头文件目录以及示例代码目录,具体结构如下:

asyncplusplus/
├── .gitignore               # Git 忽略文件配置
├── Async++Config.cmake.in   # CMake 配置模板文件
├── CMakeLists.txt           # CMake 构建脚本
├── LICENSE                  # 许可证文件(MIT 许可证)
├── README.md                # 项目说明文档
├── examples/                # 示例代码目录
│   └── gtk_scheduler.cpp    # GTK 调度器示例
├── src/                     # 源代码目录
│   ├── fifo_queue.h         # FIFO 队列实现
│   ├── internal.h           # 内部头文件(包含类型定义、宏等)
│   ├── scheduler.cpp        # 调度器实现
│   ├── singleton.h          # 单例模式实现
│   ├── task_wait_event.h    # 任务等待事件实现
│   ├── threadpool_scheduler.cpp  # 线程池调度器实现
│   └── work_steal_queue.h   # 工作窃取队列实现
└── include/                 # 头文件目录├── async++.h            # 主头文件(对外提供统一接口)└── async++/             # 子模块头文件目录├── aligned_alloc.h├── cancel.h├── continuation_vector.h├── parallel_for.h├── parallel_invoke.h├── parallel_reduce.h├── partitioner.h    # 分区器相关定义├── range.h          # 范围(迭代器对)相关定义├── ref_count.h├── scheduler.h      # 调度器接口定义├── scheduler_fwd.h├── task.h           # 任务类定义├── task_base.h      # 任务基类定义├── traits.h└── when_all_any.h

二、continuation_vector源码分析

2.1 源码

// Copyright (c) 2015 Amanieu d'Antras
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.#ifndef ASYNCXX_H_
# error "Do not include this header directly, include <async++.h> instead."
#endifnamespace async {
namespace detail {// Compress the flags in the low bits of the pointer if the structures are
// suitably aligned. Fall back to a separate flags variable otherwise.
template<std::uintptr_t Mask, bool Enable>
class compressed_ptr {void* ptr;std::uintptr_t flags;public:compressed_ptr() = default;compressed_ptr(void* ptr_, std::uintptr_t flags_): ptr(ptr_), flags(flags_) {}template<typename T>T* get_ptr() const{return static_cast<T*>(ptr);}std::uintptr_t get_flags() const{return flags;}void set_ptr(void* p){ptr = p;}void set_flags(std::uintptr_t f){flags = f;}
};
template<std::uintptr_t Mask>
class compressed_ptr<Mask, true> {std::uintptr_t data;public:compressed_ptr() = default;compressed_ptr(void* ptr_, std::uintptr_t flags_): data(reinterpret_cast<std::uintptr_t>(ptr_) | flags_) {}template<typename T>T* get_ptr() const{return reinterpret_cast<T*>(data & ~Mask);}std::uintptr_t get_flags() const{return data & Mask;}void set_ptr(void* p){data = reinterpret_cast<std::uintptr_t>(p) | (data & Mask);}void set_flags(std::uintptr_t f){data = (data & ~Mask) | f;}
};// Thread-safe vector of task_ptr which is optimized for the common case of
// only having a single continuation.
class continuation_vector {// Heap-allocated data for the slow pathstruct vector_data {std::vector<task_base*> vector;std::mutex lock;};// Flags to describe the state of the vectorenum flags {// If set, no more changes are allowed to internal_datais_locked = 1,// If set, the pointer is a vector_data* instead of a task_base*. If// there are 0 or 1 elements in the vector, the task_base* form is used.is_vector = 2};static const std::uintptr_t flags_mask = 3;// Embed the two bits in the data if they are suitably aligned. We only// check the alignment of vector_data here because task_base isn't defined// yet. Since we align task_base to LIBASYNC_CACHELINE_SIZE just use that.typedef compressed_ptr<flags_mask, (LIBASYNC_CACHELINE_SIZE & flags_mask) == 0 &&(std::alignment_of<vector_data>::value & flags_mask) == 0> internal_data;// All changes to the internal data are atomicstd::atomic<internal_data> atomic_data;public:// Start unlocked with zero elements in the fast pathcontinuation_vector(){// Workaround for a bug in certain versions of clang with libc++// error: no viable conversion from 'async::detail::compressed_ptr<3, true>' to '_Atomic(async::detail::compressed_ptr<3, true>)'atomic_data.store(internal_data(nullptr, 0), std::memory_order_relaxed);}// Free any left over data~continuation_vector(){// Converting to task_ptr instead of using remove_ref because task_base// isn't defined yet at this point.internal_data data = atomic_data.load(std::memory_order_relaxed);if (data.get_flags() & flags::is_vector) {// No need to lock the mutex, we are the only thread at this pointfor (task_base* i: data.get_ptr<vector_data>()->vector)(task_ptr(i));delete data.get_ptr<vector_data>();} else {// If the data is locked then the inline pointer is already goneif (!(data.get_flags() & flags::is_locked))task_ptr tmp(data.get_ptr<task_base>());}}// Try adding an element to the vector. This fails and returns false if// the vector has been locked. In that case t is not modified.bool try_add(task_ptr&& t){// Cache to avoid re-allocating vector_data multiple times. This is// automatically freed if it is not successfully saved to atomic_data.std::unique_ptr<vector_data> vector;// Compare-exchange loop on atomic_datainternal_data data = atomic_data.load(std::memory_order_relaxed);internal_data new_data;do {// Return immediately if the vector is lockedif (data.get_flags() & flags::is_locked)return false;if (data.get_flags() & flags::is_vector) {// Larger vectors use a mutex, so grab the lockstd::atomic_thread_fence(std::memory_order_acquire);std::lock_guard<std::mutex> locked(data.get_ptr<vector_data>()->lock);// We need to check again if the vector has been locked here// to avoid a race condition with flush_and_lockif (atomic_data.load(std::memory_order_relaxed).get_flags() & flags::is_locked)return false;// Add the element to the vector and returndata.get_ptr<vector_data>()->vector.push_back(t.release());return true;} else {if (data.get_ptr<task_base>()) {// Going from 1 to 2 elements, allocate a vector_dataif (!vector)vector.reset(new vector_data{{data.get_ptr<task_base>(), t.get()}, {}});new_data = {vector.get(), flags::is_vector};} else {// Going from 0 to 1 elementsnew_data = {t.get(), 0};}}} while (!atomic_data.compare_exchange_weak(data, new_data, std::memory_order_release, std::memory_order_relaxed));// If we reach this point then atomic_data was successfully changed.// Since the pointers are now saved in the vector, release them from// the smart pointers.t.release();vector.release();return true;}// Lock the vector and flush all elements through the given functiontemplate<typename Func> void flush_and_lock(Func&& func){// Try to lock the vector using a compare-exchange loopinternal_data data = atomic_data.load(std::memory_order_relaxed);internal_data new_data;do {new_data = data;new_data.set_flags(data.get_flags() | flags::is_locked);} while (!atomic_data.compare_exchange_weak(data, new_data, std::memory_order_acquire, std::memory_order_relaxed));if (data.get_flags() & flags::is_vector) {// If we are using vector_data, lock it and flush all elementsstd::lock_guard<std::mutex> locked(data.get_ptr<vector_data>()->lock);for (auto i: data.get_ptr<vector_data>()->vector)func(task_ptr(i));// Clear the vector to save memory. Note that we don't actually free// the vector_data here because other threads may still be using it.// This isn't a very significant cost since multiple continuations// are relatively rare.data.get_ptr<vector_data>()->vector.clear();} else {// If there is an inline element, just pass it onif (data.get_ptr<task_base>())func(task_ptr(data.get_ptr<task_base>()));}}
};} // namespace detail
} // namespace async

    这段代码是 Async++ 框架底层的任务延续向量(continuation_vector) 实现,位于 async::detail 命名空间,核心作用是线程安全地管理任务的延续列表(即一个任务完成后需要执行的后续任务),并针对 “单延续” 这一常见场景做了性能优化。以下是分层解析:

    2.2. 核心前置概念

    在并发框架中,“任务延续”(Continuation)指一个任务(Task)完成后自动触发的后续任务(例如 task.then([]{...}) 中的回调)。continuation_vector 负责存储和管理这些延续任务,需满足:

    • 线程安全(多线程可能同时添加延续任务);
    • 性能优化(大多数场景只有 1 个延续任务,避免过度使用复杂数据结构);
    • 支持 “锁定”(任务完成后禁止添加新延续,仅执行已有延续)。

    2.3 关键组件解析

    (1)compressed_ptr:指针压缩存储工具

    这是一个模板类,核心功能是将 “指针” 和 “低阶标志位” 压缩到同一块内存中,避免额外的内存开销(尤其适合高频访问的底层组件)。

    模板参数作用
    Mask标志位的掩码(例如 0b11 表示用低 2 位存储标志)
    Enable是否启用压缩(true 则压缩,false 则用独立变量存储指针和标志)

    工作原理:若内存对齐满足要求(例如指针地址的低 2 位恒为 0,因内存按 4 字节对齐),则可将这 2 位空闲位用作 “标志位”,无需额外分配内存存储标志。

    • 压缩模式(Enable=true):用 std::uintptr_t 存储 “指针地址 | 标志位”,通过 data & ~Mask 提取指针,data & Mask 提取标志;
    • 非压缩模式(Enable=false):用独立的 void* ptr 和 std::uintptr_t flags 存储,适合对齐不满足的场景。

    框架中的用途continuation_vector 用 compressed_ptr<3, ...>Mask=0b11,即 2 个标志位)存储 “延续任务的指针 + 向量状态标志”,节省内存并提升缓存命中率。

    (2)continuation_vector:延续向量类

    核心是线程安全的延续任务列表,针对 “0 个 / 1 个延续任务” 的常见场景做了 “快速路径(Fast Path)” 优化,仅在超过 1 个延续任务时才使用堆分配的向量和锁。

    内部状态定义
    • vector_data 结构体:“慢路径” 数据结构,用于存储多个延续任务:

      cpp

      运行

      struct vector_data {std::vector<task_base*> vector;  // 存储多个延续任务的指针std::mutex lock;                 // 保证多线程添加任务的线程安全
      };
      
    • flags 枚举:向量的状态标志(用 compressed_ptr 的低 2 位存储):
      标志值含义
      is_locked向量已锁定(任务完成,禁止添加新延续,仅执行已有延续)
      is_vector向量处于 “多任务模式”(用 vector_data 存储,而非单个任务指针)
    • atomic_data 成员:用 std::atomic<compressed_ptr> 存储向量的核心数据,确保多线程操作的原子性。
    核心方法解析

    continuation_vector 的方法围绕 “添加延续任务” 和 “执行并锁定延续任务” 两大核心逻辑展开,均保证线程安全。

    ① 构造 / 析构:初始化与资源释放
    • 构造函数:初始化 atomic_data 为 “空指针 + 0 标志”(即初始状态:无延续任务、未锁定、未启用多任务模式)。
    • 析构函数:释放所有延续任务的资源(避免内存泄漏):
      • 若处于 “多任务模式”(is_vector):遍历 vector_data 释放所有 task_base*,并删除 vector_data 对象;
      • 若处于 “单任务模式” 且未锁定:释放单个 task_base*
    ② try_add(task_ptr&& t):尝试添加延续任务

    功能:线程安全地向向量中添加一个延续任务,若向量已锁定则返回 false(拒绝添加)。核心逻辑(分路径优化)

    1. 快速路径(0→1 个任务):若当前无延续任务(指针为空、无标志),直接用 compare_exchange_weak 原子更新 atomic_data,将任务指针存入 compressed_ptr,无需锁,性能极高。

    2. 过渡路径(1→2 个任务):若已有 1 个任务,需分配 vector_data 对象,将已有任务和新任务存入 vector_data.vector,并原子更新 atomic_data 为 “vector_data* + is_vector 标志”,完成从 “单任务” 到 “多任务” 的切换。

    3. 慢路径(≥2 个任务):若已处于 “多任务模式”,先加 vector_data.lock 保证线程安全,再将新任务加入 vector,最后释放锁。

    关键保障

    • 用 compare_exchange_weak 实现原子更新,避免多线程竞争;
    • 若添加过程中向量被锁定,直接返回 false,确保任务完成后不再接受新延续。
    ③ flush_and_lock(Func&& func):执行延续任务并锁定向量

    功能:锁定向量(禁止后续添加),遍历所有延续任务并通过传入的 func 执行(例如调度任务到线程池)。核心步骤

    1. 原子锁定:通过 compare_exchange_weak 原子更新 atomic_data 的 is_locked 标志,确保一旦锁定,其他线程无法再添加任务;
    2. 执行延续任务
      • 多任务模式:加锁后遍历 vector_data.vector,将每个 task_base* 包装为 task_ptr 传入 func 执行,最后清空向量(节省内存);
      • 单任务模式:直接将单个 task_base* 传入 func 执行;
    3. 线程安全保障:多任务模式下用 std::lock_guard 保护遍历过程,避免添加任务与执行任务的竞争。

    2.4. 设计亮点

    (1)分层优化:针对常见场景降低开销
    • 快速路径(0/1 个延续):无锁、无堆分配,仅操作原子变量,适配 “大多数任务只有 1 个延续” 的实际场景,性能接近单变量操作;
    • 慢路径(≥2 个延续):仅在必要时使用堆分配和锁,平衡了通用性和性能。
    (2)线程安全与原子操作
    • 所有状态更新(添加任务、锁定)均通过 std::atomic 的原子操作(load/compare_exchange_weak)实现,避免数据竞争;
    • 多任务模式下用 std::mutex 保护向量修改,确保线程安全的同时最小化锁粒度。
    (3)内存高效:指针压缩存储

    通过 compressed_ptr 复用指针的空闲位存储标志,避免额外的内存开销(例如无需用 struct { void* ptr; int flags; } 占用更多字节),提升缓存利用率。

    2.5. 框架中的典型用途

    continuation_vector 是 Async++ 任务模型的底层支撑,例如:

    • 当调用 task.then(cb) 时,cb 会被包装为 task_base 并通过 try_add 存入原任务的 continuation_vector
    • 当原任务执行完成后,框架会调用 flush_and_lock,将所有延续任务调度到线程池执行,并锁定向量禁止后续添加。

    2.6. 关键注意事项

    • 命名空间限制:位于 detail 命名空间,属于框架内部实现,外部不应直接依赖(可能随版本变化);
    • 任务生命周期:依赖 task_ptr(智能指针)管理 task_base 的生命周期,避免内存泄漏;
    • 锁粒度控制:仅在 “多任务添加” 和 “多任务执行” 时加锁,快速路径无锁,最大化并发性能。

    三、Demo 使用 continuation_vector 管理延续任务

    3.1. 定义具体任务类型

    先定义两个实际的任务类(继承 task_base),用于作为 “延续任务”:

    // 任务1:打印字符串
    class PrintTask : public task_base {
    private:std::string msg;
    public:explicit PrintTask(std::string msg_) : msg(std::move(msg_)) {}void execute() const override {std::cout << "[Task] " << msg << " (Thread ID: " << std::this_thread::get_id() << ")\n";}
    };// 任务2:计算并打印结果
    class CalcTask : public task_base {
    private:int a, b;
    public:CalcTask(int a_, int b_) : a(a_), b(b_) {}void execute() const override {int sum = a + b;std::cout << "[Task] " << a << " + " << b << " = " << sum << " (Thread ID: " << std::this_thread::get_id() << ")\n";}
    };
    

    3.2. 多线程场景:测试 continuation_vector 的线程安全

    模拟 “多线程同时添加延续任务”,再触发 “执行所有延续并锁定”:

    int main() {// 1. 创建 continuation_vector 实例(管理延续任务)async::detail::continuation_vector cont_vec;// 2. 多线程同时添加延续任务(测试线程安全)const int THREAD_NUM = 3;  // 3个线程添加任务std::vector<std::thread> add_threads;for (int i = 0; i < THREAD_NUM; ++i) {add_threads.emplace_back([&cont_vec, i]() {// 创建任务(PrintTask 或 CalcTask)task_ptr task;if (i % 2 == 0) {task = task_ptr(new PrintTask("Hello from thread " + std::to_string(i)));} else {task = task_ptr(new CalcTask(i * 10, i * 20));}// 尝试添加任务到 continuation_vectorbool success = cont_vec.try_add(std::move(task));if (success) {std::cout << "Thread " << i << " added task successfully\n";} else {std::cout << "Thread " << i << " failed to add task (vector locked)\n";}});}// 等待所有线程完成添加for (auto& t : add_threads) {t.join();}std::cout << "=== All add threads finished ===\n\n";// 3. 执行所有延续任务并锁定向量(禁止后续添加)std::cout << "=== Executing all continuations ===\n";cont_vec.flush_and_lock([](task_ptr&& task) {// 执行延续任务(调用 task_base::execute())task.get()->execute();});std::cout << "=== All continuations executed ===\n\n";// 4. 尝试添加新任务(向量已锁定,应失败)task_ptr new_task(new PrintTask("Try adding after lock"));bool success = cont_vec.try_add(std::move(new_task));std::cout << "Add task after lock: " << (success ? "Success" : "Failed") << "\n";return 0;
    }
    

    3.3 Demo 输出与解析

    典型输出(线程 ID 可能不同)

    Thread 0 added task successfully
    Thread 1 added task successfully
    Thread 2 added task successfully
    === All add threads finished ====== Executing all continuations ===
    [Task] Hello from thread 0 (Thread ID: 140703372859136)
    [Task] 10 + 20 = 30 (Thread ID: 140703372859136)
    [Task] Hello from thread 2 (Thread ID: 140703372859136)
    === All continuations executed ===Add task after lock: Failed
    

    关键现象解析

    1. 线程安全添加:3 个线程同时调用 try_add,均成功添加任务(continuation_vector 通过原子操作和锁保证线程安全);
    2. 执行延续任务flush_and_lock 遍历所有任务,通过传入的 lambda 调用 execute(),执行所有延续逻辑;
    3. 锁定后禁止添加flush_and_lock 后调用 try_add 返回 false,符合 “任务完成后不再接受新延续” 的设计目标。

    四、核心知识点回顾

    1. 路径优化:Demo 中添加 3 个任务,continuation_vector 会自动从 “单任务快速路径” 切换到 “多任务慢路径”(分配 vector_data 并加锁);
    2. 资源安全task_ptr 确保任务不会内存泄漏(添加失败时自动释放,添加成功后由 continuation_vector 管理,执行后释放);
    3. 锁定语义flush_and_lock 是 “一次性操作”,锁定后向量状态不可逆转,确保延续任务仅执行一次。

    五、注意事项

    1. 依赖框架头文件:实际使用需确保包含 Async++ 的 continuation_vector.h 等头文件,且编译时链接框架库;
    2. 任务类型约束:自定义任务需继承 task_base 并实现 execute(),符合框架底层接口;
    3. 移动语义try_add 要求传入 task_ptr&&(移动语义),避免任务所有权拷贝导致的重复释放。

    通过该 Demo 可直观理解 continuation_vector 如何支撑 Async++ 的 “任务延续” 机制 —— 线程安全管理延续列表,兼顾性能(快速路径)与通用性(慢路径)。

    http://www.dtcms.com/a/438708.html

    相关文章:

  • 一个做搞笑类视频的网站取名网站建设平台协议书
  • 机器学习、数据科学、深度学习、神经网络的区别与联系
  • 上海嘉定网站设计商城首页网站
  • 显示系统(二):显示适配器(显卡)背后的历史
  • 嘉兴网站制作案例无代码制作网页
  • 网站页面seo做网站的语
  • 建设银行注册网站的用户名怎么写建设一个货架网站
  • 排序(Sort)
  • [Web网页] Web 基础
  • 做商铺的网站有那些怎么检查网站的死链
  • 网站做qq发送链接广东省建设监理协会网站 - 首页
  • 操作系统应用开发(十八)RustDesk-API服务器数据库——东方仙盟金丹期
  • 济南小型网站建设厦门人才网唯一官方网站
  • 1518. 换水问题
  • 中国空间站完成了多少404错误页面放在网站的哪里
  • 新媒体运营需要哪些技能seo知识分享
  • Java数据类型与字符串操作全解析
  • linux文件系统学习
  • Effective Python 第37条:用组合类实现多层结构,避免嵌套内置类型
  • C语言计算矩阵的逆
  • 如何传递上层变数到编译过的模组当中
  • 广东十大网站建设外贸公司招聘条件
  • C语言类型转换与错误处理
  • 线上宣传方式昆明网络推广优化
  • 日语学习-日语知识点小记-进阶-JLPT-N1阶段应用练习(3):语法 +考え方16+2022年7月N1
  • 51单片机AD/DA
  • 九.寄生参数对变换器参数设计的影响
  • MapSet练习OJ题讲解(2易2中1难)
  • 1.2.1 RAG:构建你的专属知识库
  • 做网站找个人还是公司电商网站建设的意义