CppCon 2015 学习:C++ Coroutines
1. 核心思想:C++ 协程的“负开销抽象”
- C++ 协程(Coroutines)是 C++20 引入的语言特性,允许以更自然、结构化的方式编写异步代码或生成器。
- 通过协程,程序结构更清晰,而且性能可以比手写状态机还要更优。
- 本质是:协程 在抽象的同时不引入开销,甚至可能减少代码量、减少分支跳转等,从而带来性能收益。
张图展示了一个“异步状态机(Async State Machine)”。以下是其结构和运行方式的简要说明:
- 初始状态:流程从左侧的黑点(起点)开始。
- Connecting(连接中):第一个状态,尝试连接某个资源(例如服务器)。
- 如果连接失败,状态会转为“Failed(失败)”。
- 如果连接成功,状态会进入“Reading(读取中)”。
- Reading(读取中):读取数据的状态。
- 数据读取完成后,状态会转为“Completed(完成)”。
- 另外,Reading状态有一个循环,可以再次回到Reading(例如需要多次读取数据)。
- Failed(失败):连接失败的状态。从这个状态可以返回到“Connecting”重新尝试连接。
- Completed(完成):流程成功结束,进入最终状态(右侧的黑点)。
展示了同步 vs 异步编程的对比。这段示例是为了说明:
int tcp_reader(int total) {char buf[4 * 1024];auto conn = Tcp::Connect("127.0.0.1", 1337);for (;;) {auto bytesRead = conn.Read(buf, sizeof(buf));total -= bytesRead;if (total <= 0 || bytesRead == 0) return total;}
}
“如果是同步操作,代码很简单;但异步就不那么直观了。”
代码分解与理解
示例代码(同步版本)
int tcp_reader(int total)
{char buf[4 * 1024]; // 每次读取的缓存auto conn = Tcp::Connect("127.0.0.1", 1337); // 同步连接 TCPfor (;;){auto bytesRead = conn.Read(buf, sizeof(buf)); // 同步读取total -= bytesRead; // 剩余读取量递减if (total <= 0 || bytesRead == 0)return total; // 读完了或者对端关闭连接,退出循环}
}
核心逻辑说明
行 | 含义 | ||
---|---|---|---|
Tcp::Connect(...) | 建立 TCP 连接,同步调用,函数在连接成功前会阻塞 | ||
conn.Read(...) | 从连接中读取数据,也是同步的,阻塞直到数据到来或连接关闭 | ||
total -= bytesRead | 累计读取了多少字节,直到 total 为 0 为止 | ||
`if (total <= 0 | bytesRead == 0)` | 退出条件:读取完毕 or 连接关闭 |
演讲中要传达的重点:
“Trivial if synchronous”
这句话的意思是:
如果是同步调用(如上述代码),逻辑是线性且清晰的,比如:
- 先连接
- 然后读取
- 然后判断是否继续
这种风格的代码结构易于理解、易于维护。
但异步就不简单了…
在传统的异步编程中(比如回调、手写状态机),你无法这么直接写:
- 必须分阶段处理(连接 -> 回调 -> 读取 -> 回调)
- 状态必须保存和恢复(如哪个阶段、读到多少等)
- 会导致“回调地狱”或复杂的状态机
所以协程的意义是:
协程让你像写同步代码一样写异步逻辑
但编译器帮你构造状态机,不用你手动管理状态!
举个异步协程版本(C++20)
如果你用协程写同样的逻辑,可能长这样:
task<int> tcp_reader(int total)
{char buf[4 * 1024];auto conn = co_await Tcp::ConnectAsync("127.0.0.1", 1337); // 非阻塞连接for (;;){auto bytesRead = co_await conn.ReadAsync(buf, sizeof(buf)); // 非阻塞读取total -= bytesRead;if (total <= 0 || bytesRead == 0)co_return total;}
}
你可以看到:
- 逻辑保持清晰(像同步代码一样)
- 但实际是异步非阻塞的(不会卡住线程)
- 状态由编译器生成的协程状态机维护,开发者不用显式写
总结
同步版本 | 简单易懂,但阻塞线程 |
---|---|
异步回调 | 性能好,但结构复杂 |
协程版本 | 性能好 + 结构清晰,是理想结合 |
所以,演讲者通过这个例子在强调: |
C++ 协程 = 写同步风格的异步代码 = “负开销抽象”
C++ 标准库中 std::future<T>
和 std::promise<T>
的内部实现机制和它们共享的底层结构 shared_state<T>
。
核心概念:std::future<T>
与 std::promise<T>
它们是 线程通信机制:一个线程设置结果(promise
),另一个线程等待结果(future
)。
std::promise<T>
:生产者,用来设置值或异常std::future<T>
:消费者,用来等待并获取值
它们共享一个底层状态对象:shared_state<T>
你列出的结构说明
让我们一一拆解你列的字段,理解 C++ 标准库中 std::future
的底层实现逻辑。
shared_state<T>
这个类是**future
和 promise
共享的核心对象**。
template<typename T>
struct shared_state {std::atomic<long> refCnt; // 引用计数std::mutex lock; // 保证线程安全std::variant<empty, T, std::exception_ptr> value; // 存储结果或异常std::condition_variable ready; // 通知等待线程
};
std::future<T>
template<typename T>
class future {intrusive_ptr<shared_state<T>> state;
public:void wait(); // 等待结果变为 readyT get(); // 获取值或抛出异常
};
wait()
会在state->ready
上阻塞,直到结果被设置get()
会获取value
中的内容,如果是exception_ptr
,会重新抛出异常
std::promise<T>
template<typename T>
class promise {intrusive_ptr<shared_state<T>> state;
public:void set_value(T val); // 设置正常值void set_exception(std::exception_ptr); // 设置异常
};
set_value()
设置state->value = T
,并通知ready
set_exception()
设置state->value = exception_ptr
,并通知ready
intrusive_ptr 是什么?
intrusive_ptr
是一种引用计数智能指针,但它不像 shared_ptr
把引用计数封装在外部,而是把引用计数嵌入对象本身(如 shared_state
)中。
std::atomic<long> refCnt; // 在 shared_state 内部
好处是:
- 更少的内存分配
- 更高的性能
- 被 Boost 和一些高性能库广泛使用
举个使用例子
std::promise<int> p;
std::future<int> f = p.get_future();
std::thread t([&]() {std::this_thread::sleep_for(std::chrono::seconds(1));p.set_value(42); // 设置值,唤醒主线程
});
std::cout << f.get(); // 阻塞直到拿到结果
t.join();
这里:
promise
和future
共享一个shared_state<int>
- 主线程在
f.get()
上阻塞,内部wait()
在等待shared_state.ready
- 子线程调用
set_value(42)
后通知ready
,唤醒主线程继续
与协程的关系
这个模型和协程里的 co_await
有很大关联:
- 协程的挂起/恢复背后也是靠类似
shared_state
的机制来传递数据 - C++20 中的
std::future<T>
不能直接用于co_await
,但概念类似
协程库(如cppcoro
,folly::coro
)会自定义轻量的shared_state
实现来支持co_await
。
总结一图(简化):
+--------------------+| shared_state<T> ||--------------------|| atomic refCnt | <- intrusive_ptr 管理引用| mutex lock || variant value | <- T 或 异常| cond_var ready |+--------------------+^ ^| |+-------+ +-------+| |std::promise<T> std::future<T>set_value/set_exception wait()/get()
展示如何在没有协程(co_await)支持的 C++ 中,用 std::future
和 continuation (.then
) 链式调用来模拟协程风格的异步逻辑。
总体意图:异步 TCP 读取器
这个函数:
future<int> tcp_reader(int64_t total)
的目的是:
建立 TCP 连接,然后不断读取数据直到
total
字节被读取完,或连接断开(读到 0 字节),最终返回剩余的total
。
拆解结构分析
1. struct State
struct State {char buf[4 * 1024];int64_t total;Tcp::Connection conn;explicit State(int64_t total) : total(total) {}
};
用来在异步回调链中保存共享状态(类似于闭包环境):
- 缓冲区
buf
- 剩余字节数
total
- 连接对象
conn
通过make_shared<State>(total)
分配到堆上,确保生命周期跨越多次异步调用。
2. Tcp::Connect(...).then(...)
Tcp::Connect("127.0.0.1", 1337).then([state](future<Tcp::Connection> conn) {state->conn = std::move(conn.get());...});
这是发起连接的异步调用:
Tcp::Connect(...)
返回future<Tcp::Connection>
.then(...)
是链式 continuation,等连接完成后继续执行- 回调中将
conn.get()
的结果存入state->conn
3. do_while(...)
异步循环
return do_while([state]() -> future<bool> {if (state->total <= 0)return make_ready_future(false);return state->conn.read(state->buf, sizeof(state->buf)).then([state](future<int> nBytesFut) {auto nBytes = nBytesFut.get();if (nBytes == 0)return make_ready_future(false);state->total -= nBytes;return make_ready_future(true);});
});
do_while(...)
的意义:
这是一个模拟异步循环的辅助函数:
future<void> do_while(function<future<bool>()> body) {return body().then([=](future<bool> notDone) {return notDone.get() ? do_while(body) : make_ready_future();});
}
- 调用
body()
(返回future<bool>
) - 如果
true
,递归再次调用do_while(body)
- 如果
false
,完成循环,返回一个完成的future<void>
body()
的意义:
- 异步读取数据
- 如果
total <= 0
或read()
返回 0,就停止循环 - 否则减少
total
并继续循环
最终返回值是 future<int>
代码最后返回整个连接 + 循环结束后剩余的 total
,理论上应该包装在 .then(...)
的链尾:
.then([state]() {return make_ready_future(state->total);
});
这在你提供的代码中省略了,但从类型 future<int>
推断是应该加上的。
本质与目的
这段代码等价于同步逻辑:
Tcp::Connection conn = Tcp::Connect(...);
while (total > 0) {int n = conn.read(buf, sizeof(buf));if (n == 0) break;total -= n;
}
return total;
但由于它是 完全异步的,不阻塞线程,所以要用:
future<T>
.then()
链接操作do_while()
模拟异步循环State
来保存上下文
现代协程下的写法(C++20)
如果用协程,这段逻辑可以简化成几行:
task<int> tcp_reader(int64_t total) {char buf[4 * 1024];auto conn = co_await Tcp::ConnectAsync("127.0.0.1", 1337);while (total > 0) {int n = co_await conn.read_async(buf, sizeof(buf));if (n == 0) break;total -= n;}co_return total;
}
协程让你写“同步风格”的代码,但运行时仍是异步的,本质上还是状态机 + continuation + shared_state,只是你不用自己写这些了。
总结
概念 | 说明 |
---|---|
State | 保存异步操作跨调用的共享状态 |
.then(...) | 链接异步操作 |
do_while(...) | 模拟异步的循环 |
make_ready_future(...) | 创建立即完成的 future |
Tcp::Connect 、read() | 异步操作,返回 future<T> |
这段代码展示的是:在没有协程的情况下,如何手动实现非阻塞、异步控制流的方式。这为后来 C++ 协程设计提供了动机和反面教材。 |
这段代码是一个同步的 TCP 读取器,它展示了最“直白”的读取逻辑。我们来逐行分析和理解它的意义。
完整代码
int tcp_reader(int total)
{char buf[4 * 1024]; // 每次读取最多 4KBauto conn = Tcp::Connect("127.0.0.1", 1337); // 同步建立连接for (;;){auto bytesRead = conn.Read(buf, sizeof(buf)); // 同步读取数据total -= bytesRead; // 累计减少要读取的总字节数if (total <= 0 || bytesRead == 0)return total; // 读取完毕,或对端关闭连接(读到 0)}
}
含义逐步解释
行 | 含义 | ||
---|---|---|---|
char buf[4 * 1024]; | 设置读取缓冲区,大小为 4KB | ||
Tcp::Connect(...) | 建立 TCP 连接,同步阻塞 | ||
conn.Read(...) | 同步读取数据到 buf ,返回实际读取的字节数 | ||
total -= bytesRead; | 更新剩余要读取的字节总数 | ||
`if (total <= 0 | bytesRead == 0)` | 读取完目标字节数或连接关闭时退出循环 | |
return total; | 返回剩余未读取的字节数(理论上为 0) |
用意总结
同步读取固定长度的数据,直到完成或连接断开。
- 简单直观(没有异步逻辑、回调或状态机)
- 但是线程是阻塞的,在读取过程中无法做其他事情
适用场景
优点 | 缺点 |
---|---|
结构清晰、调试容易 | 阻塞线程,不适合高并发场景 |
无需管理复杂状态 | 线程资源浪费,响应慢 |
易于写和维护 | 只能线性执行,不可扩展 |
与协程版本对比
你之前提到的协程/异步版本,可以实现相同功能,但不会阻塞线程,适用于高性能服务器。
同步版本(现在的):
int tcp_reader(int total) {for (...) {conn.Read(...) // 阻塞}
}
协程版本(伪代码):
task<int> tcp_reader(int total) {while (total > 0) {int bytes = co_await conn.read_async();if (bytes == 0) break;total -= bytes;}co_return total;
}
同样的逻辑,但协程版本不会阻塞主线程,可以支持成千上万个并发连接。
总结一句话
这段同步代码展示了最基础的阻塞式读取逻辑:易写但不可扩展,是协程和异步模型的“对比起点”。
展示了如何用基于 std::future
和回调 (.then
) 的异步风格,完成一个异步 TCP 读取器的逻辑。它模拟了“异步循环读取数据直到满足条件”的场景。
代码结构拆解与理解
1. struct State
struct State {char buf[4 * 1024]; // 读缓冲区int64_t total; // 剩余待读取字节数Tcp::Connection conn; // 连接对象explicit State(int64_t total) : total(total) {}
};
- 保存跨异步调用共享的状态
- 通过
make_shared
在堆上创建,保证生命周期延续
2. auto state = make_shared<State>(total);
- 创建
State
对象,初始化剩余读取字节数
3. 异步连接
return Tcp::Connect("127.0.0.1", 1337).then([state](future<Tcp::Connection> connFut) {state->conn = std::move(connFut.get());// ...}
);
- 异步连接服务器
- 连接完成后,取得连接对象,存入
state->conn
4. 异步循环 do_while
return do_while([state]() -> future<bool> {if (state->total <= 0) return make_ready_future(false); // 读取完毕,停止循环return state->conn.read(state->buf, sizeof(state->buf)).then([state](future<int> nBytesFut) {auto nBytes = nBytesFut.get();if (nBytes == 0)return make_ready_future(false); // 连接关闭,停止循环state->total -= nBytes;return make_ready_future(true); // 继续循环});
});
- 每次循环尝试读取数据
- 如果读到 0 字节,或者总字节数读完,返回
false
停止循环 - 否则更新剩余字节数,返回
true
继续循环
5. do_while
的定义逻辑(你之前给的)
future<void> do_while(function<future<bool>()> body) {return body().then([=](future<bool> notDone) {return notDone.get() ? do_while(body) : make_ready_future();});
}
- 递归异步调用,直到
body()
返回的future<bool>
为false
- 这样实现了异步的循环行为
总结
组件 | 作用 |
---|---|
State | 持有跨异步调用的共享状态 |
Tcp::Connect() | 异步连接服务器 |
.then() | 异步链式回调 |
do_while() | 异步循环控制流 |
make_ready_future() | 创建一个已完成的 future |
这段代码的作用是:
异步连接服务器,异步循环读取数据直到读取完指定字节数或连接关闭,整个过程不阻塞线程,返回
future<int64_t>
(这里代码没写完整,但应该返回剩余total
)
这段代码展示了一个完整的异步 TCP 读取器实现,结合了异步连接、异步循环读取、以及最后返回剩余未读字节数的逻辑。下面帮你理清思路,确保每一步都理解:
代码整体结构分析
future<int> tcp_reader(int64_t total) {struct State {char buf[4 * 1024];int64_t total;Tcp::Connection conn;explicit State(int64_t total) : total(total) {}};auto state = make_shared<State>(total);return Tcp::Connect("127.0.0.1", 1337).then([state](future<Tcp::Connection> connFut) {state->conn = std::move(connFut.get());return do_while([state]() -> future<bool> {if (state->total <= 0)return make_ready_future(false);return state->conn.read(state->buf, sizeof(state->buf)).then([state](future<int> nBytesFut) {auto nBytes = nBytesFut.get();if (nBytes == 0)return make_ready_future(false);state->total -= nBytes;return make_ready_future(true);});});}).then([state](future<void>){return make_ready_future(state->total);});
}
各部分功能详解
1. State
结构体
- 保存缓冲区、剩余读字节数、TCP连接对象
- 用
shared_ptr
管理生命周期,确保跨多次异步调用有效
2. Tcp::Connect(...)
- 异步发起 TCP 连接,返回
future<Tcp::Connection>
.then(...)
连接完成后执行,获得连接对象
3. do_while(...)
异步循环
- 反复执行读取操作
- 直到剩余
total
≤ 0 或读取到 0 字节时停止 - 每次调用
conn.read(...)
返回异步的future<int>
,表示读取的字节数
4. 最后 .then(...)
- 在循环完成后,返回剩余未读的字节数
state->total
你这段代码为什么正确?
- 以异步方式表达了 TCP 连接及读数据过程
- 使用
do_while
实现异步循环,避免阻塞线程 - 利用
shared_ptr<State>
维持异步中间状态 - 通过链式调用
.then
实现异步流程编排 - 返回的是
future<int>
,即最终剩余字节数
小建议
- 代码中有一处
auto nBytes = nBytesFut.get()
后面缺少分号,补充下; then
前的.then
出现两次可去掉重复;
总结
这段代码是一个典型异步状态机实现,模拟了协程“暂停和恢复”的流程,帮助理解协程背后异步执行的本质。
如果你想,我可以帮你:
- 写出
do_while
和make_ready_future
的完整示例 - 给你演示如何用 C++20 协程改写这段代码
- 讲讲这段代码背后异步模型和协程的关联
这组代码演示了一个 手写的异步状态机 版本的 TCP 读取器,完全用回调和状态控制实现了异步读取过程。
代码理解拆解(分三部分)
1/3 部分:类定义和流程注释
class tcp_reader {char buf[64 * 1024];Tcp::Connection conn;promise<int> done; // 最终完成信号,返回剩余字节数int total; // 剩余要读取的字节数explicit tcp_reader(int total) : total(total) {}void OnConnect(error_code ec, Tcp::Connection newCon);void OnRead(error_code ec, int bytesRead);void OnError(error_code ec);void OnComplete();
public:static future<int> start(int total);
};
promise<int> done;
:外部获取结果的异步信号,内部通过set_value
或set_exception
通知完成或失败- 这里每个异步事件(连接、读取、错误、完成)对应一个成员函数处理,体现异步状态机模式
start
是入口,返回一个future<int>
,等待读取完成的结果
2/3 部分:启动流程和连接回调
future<int> tcp_reader::start(int total) {auto p = make_unique<tcp_reader>(total);auto result = p->done.get_future();Tcp::Connect("127.0.0.1", 1337,[raw = p.get()](auto ec, auto newConn) {raw->OnConnect(ec, std::move(newConn));});p.release(); // 放弃unique_ptr,生命周期交给异步状态机自己管理return result;
}
void tcp_reader::OnConnect(error_code ec, Tcp::Connection newCon) {if (ec) return OnError(ec);conn = std::move(newCon);// 开始异步读conn.Read(buf, sizeof(buf),[this](error_code ec, int bytesRead) {OnRead(ec, bytesRead);});
}
start
分配一个tcp_reader
实例并发起异步连接- 连接完成后调用
OnConnect
:- 如果有错误则调用
OnError
- 成功则开始异步读取,并指定回调为
OnRead
- 如果有错误则调用
3/3 部分:读取回调、错误和完成处理
void tcp_reader::OnRead(error_code ec, int bytesRead) {if (ec) return OnError(ec);total -= bytesRead;if (total <= 0 || bytesRead == 0) return OnComplete();// 继续异步读conn.Read(buf, sizeof(buf),[this](error_code ec, int bytesRead) {OnRead(ec, bytesRead);});
}
void tcp_reader::OnError(error_code ec) {auto cleanMe = unique_ptr<tcp_reader>(this); // 释放自身done.set_exception(make_exception_ptr(system_error(ec)));
}
void tcp_reader::OnComplete() {auto cleanMe = unique_ptr<tcp_reader>(this); // 释放自身done.set_value(total);
}
- 每次读取回调:
- 出错则
OnError
,设置异常并释放对象 - 读取完成(数据读完或连接关闭)则
OnComplete
,设置结果并释放对象 - 否则继续下一次异步读取,形成异步循环
- 出错则
unique_ptr<tcp_reader>(this)
是技巧,用于在异步完成后自动释放tcp_reader
对象,避免内存泄漏
总结
- 这是典型的手写异步状态机:每个事件对应一个回调函数,状态由类成员变量保存,异步流程通过连续回调展开
- 通过
promise
/future
把异步结果暴露给调用方 - 通过
unique_ptr(this)
技巧实现异步对象的自管理生命周期 - 代码逻辑清晰但回调链较深,写起来复杂,容易出错
- 这种写法是现代协程出现前广泛使用的异步编程范式
这段内容对比了几种不同的异步 TCP 读取器实现风格,目的是展示从传统阻塞、回调异步,到用 C++20 协程(coroutine) 的简洁表达。
传统同步写法(阻塞版)
auto tcp_reader(int total) -> int {char buf[4 * 1024];auto conn = Tcp::Connect("127.0.0.1", 1337); // 阻塞连接for (;;) {auto bytesRead = conn.Read(buf, sizeof(buf)); // 阻塞读total -= bytesRead;if (total <= 0 || bytesRead == 0) return total;}
}
Tcp::Connect
和conn.Read
都是同步阻塞的,线程会一直等待- 代码简单,流程直观
- 缺点是阻塞线程,无法利用异步优势
用 future
和回调实现的异步版本(之前写的)
- 通过
future
和.then()
实现异步链式调用 - 需要用回调写异步循环(例如
do_while
) - 代码复杂度较高,逻辑拆分到多个回调函数中
C++20 协程版本(trivial coroutine)
auto tcp_reader(int total) -> future<int> {char buf[4 * 1024];auto conn = co_await Tcp::Connect("127.0.0.1", 1337);for (;;) {auto bytesRead = co_await conn.Read(buf, sizeof(buf));total -= bytesRead;if (total <= 0 || bytesRead == 0)co_return total;}
}
- 关键点是使用
co_await
来等待异步操作完成 - 代码表现得像同步顺序代码,但底层是异步非阻塞的
- 更简洁、易读,写异步像写同步代码
- 编译器自动将代码转换成状态机,隐藏了复杂细节
总结
实现方式 | 优点 | 缺点 |
---|---|---|
同步阻塞 | 代码简单,直观 | 阻塞线程,性能受限 |
传统回调 + future | 支持异步,非阻塞 | 回调嵌套多,代码复杂,难维护 |
C++20 协程 (co_await) | 语义清晰,写法简洁,异步性能好 | 需要编译器和库支持 |
你这段“理解”就是在总结这个发展过程: |
用协程,异步代码写起来像同步代码,极大降低异步编程复杂度。
这段内容主要是讲协程(Coroutines)在性能和二进制大小上的表现,跟手写异步代码(Hand-Crafted async state machine)做了对比。
你提供的信息拆解:
实现方式 | 吞吐率(MB/s) | 二进制大小(KB) | 相对性能 |
---|---|---|---|
Coroutines | 495 | 30 | 1.3x |
Hand-Crafted | 380 | 25 | 0.85x |
- Coroutines(协程) 吞吐率高达 495 MB/s,二进制大小约 30 KB
- 手写异步状态机 吞吐率是 380 MB/s,二进制大小约 25 KB
- 协程在性能上比手写异步好约 1.3 倍,且二进制大小略大但差别不大
额外信息
- 测试环境是 Visual C++ 2015 RTM,测试机型是 Lenovo W540 笔记本
- 传输量是 1GB 数据,使用 loopback 地址(本地回环接口)
结论与理解
- 协程带来的开销非常小,反而因为编译器优化和减少复杂回调,协程实现的代码性能优于手工写的异步状态机
- 二进制大小差异很小,协程多出的代码大小并不显著,说明协程不会明显膨胀程序体积
- 实际吞吐率提升说明协程写法不仅易用,而且在高性能场景下也有优势
总结
- C++ 协程不是牺牲性能换简洁,相反它能提供性能提升,同时让代码更易维护
- 这是协程设计的“大杀器”:用更自然的写法带来更好的性能
协程更贴近底层(Closer to the metal)
它把传统异步编程和协程的关系,用分层结构来对比:
传统手写异步(Handcrafted State Machines)
- 层次结构:
State Machines (状态机逻辑)↓ I/O Abstractions (基于回调的异步 I/O 抽象)↓ OS / Low Level Libraries↓ Hardware
- 你需要自己写状态机逻辑,处理回调,非常繁琐。
- I/O 抽象是基于回调的,复杂度高。
- 逻辑比较“高层”,离底层硬件还有多层抽象。
协程(Coroutines)
- 层次结构:
I/O Abstractions (基于 awaitable 的异步 I/O 抽象)↓ OS / Low Level Libraries↓ Hardware
- 协程用
awaitable
(可等待对象)封装异步操作,省去了自己写状态机的麻烦。 - 这样写的代码在编译器自动生成的状态机后,更贴近 OS 和硬件层,实现了高效的协作式多任务。
- 简化了异步编程模型,同时性能更接近底层。
总结
- 传统回调状态机是高层逻辑,离硬件远且复杂。
- 协程把状态机隐藏在编译器中,直接建立在高效的底层异步 I/O 抽象之上,更贴近硬件执行。
- 这让异步代码更自然、更高效。
这就是为什么协程不仅写起来简单,而且还能实现接近底层的性能表现。
你这段内容主要讲的是如何将高级异步调用映射到底层操作系统的异步 I/O API,比如 conn.Read
这种接口,最终是怎么跟 OS 提供的接口对接的。
代码示例回顾
conn.Read(buf, sizeof(buf),[this](error_code ec, int bytesRead) { OnRead(ec, bytesRead); });
conn.Read
是一个异步读接口,接受一个缓冲区、大小和回调- 回调是读取完成时调用的,比如传入错误码和读取到的字节数
Read 函数模板接口
template <class Cb>
void Read(void* buf, size_t bytes, Cb && cb);
- 这是一个通用接口模板,接受任意可调用对象(lambda、函数指针、函数对象等)
- 这是异步 I/O 抽象层,隐藏底层实现细节
OS 底层对应 API
- Windows:
WSARecv(fd, ..., OVERLAPPED*);
WSARecv
是 Windows 套接字的异步读接口- 通过传入一个
OVERLAPPED*
结构体,告诉系统异步操作的状态和完成通知
- POSIX:
aio_read(fd, ..., aiocbp*);
aio_read
是 POSIX 异步 I/O 读接口- 传入一个
aiocbp
(aiocb*
,异步 I/O 控制块)描述读请求和完成状态
抽象与映射关系
层级 | 例子 | 作用 |
---|---|---|
函数对象/回调 | cb (lambda 或函数对象) | 用户传入的异步操作完成回调 |
异步操作结构体 | OVERLAPPED* / aiocbp* | OS 用来管理异步操作状态和结果 |
OS 异步接口 | WSARecv / aio_read | 底层操作系统提供的异步读写调用 |
Read
实现中,会把用户的回调cb
封装到一个函数对象里- 这个函数对象与底层的
OVERLAPPED
或aiocbp
结构体关联 - 当 OS 异步操作完成时,系统会触发对应事件,最终调用这个回调通知用户
总结
- 高级异步调用(
conn.Read
)是通过一个统一的异步抽象层实现的 - 抽象层隐藏了 OS 细节,将回调或函数对象与底层异步状态结构(OVERLAPPED/aiocbp)绑定
- 最终调用 OS 提供的异步读接口(
WSARecv
、aio_read
),实现非阻塞 I/O
这段代码展示了基于 Windows Overlapped I/O 模型封装异步读操作的实现思路,并且结合了回调函数对象,最终在异步完成时调用用户传入的回调。
关键点分析
1. OverlappedBase
抽象类
struct OverlappedBase : os_async_context {virtual void Invoke(std::error_code, int bytes) = 0;virtual ~OverlappedBase() {}static void io_complete_callback(CompletionPacket& p) {auto me = std::unique_ptr<OverlappedBase>(static_cast<OverlappedBase*>(p.overlapped));me->Invoke(p.error, p.byteTransferred);}
};
OverlappedBase
继承自某个底层os_async_context
(可能是包裹了 OVERLAPPED/aiocbp)- 抽象接口
Invoke
用于回调异步完成通知 - 静态函数
io_complete_callback
作为线程池回调入口,接收到完成事件后,把p.overlapped
指针转成OverlappedBase*
,然后调用Invoke
- 这里使用了
unique_ptr
,确保事件处理完毕后释放内存,避免泄漏
2. 线程池关联套接字句柄和回调
ThreadPool::AssociateHandle(sock.native_handle(), &OverlappedBase::io_complete_callback);
- 关联套接字
sock
和线程池,传入统一的完成回调io_complete_callback
- 这样 OS 异步完成后,线程池调用这个函数处理完成事件
3. 回调适配器 CompletionWithCount
template <typename Fn>
struct CompletionWithCount : OverlappedBase, private Fn {CompletionWithCount(Fn fn) : Fn(std::move(fn)) {}void Invoke(std::error_code ec, int count) override { Fn::operator()(ec, count); }
};
- 这是一个模板继承类,把任意用户函数对象
Fn
包装为OverlappedBase
的实现 - 重写
Invoke
,调用用户传入的函数对象(回调) - 让用户回调和底层异步完成处理耦合在一起
4. 创建包装回调对象的辅助函数
template <typename Fn>
std::unique_ptr<OverlappedBase> make_handler_with_count(Fn&& fn) {return std::make_unique<CompletionWithCount<std::decay_t<Fn>>>(std::forward<Fn>(fn));
}
- 根据传入的回调生成对应的
CompletionWithCount
对象,方便封装回调
5. Read
的实现(两个重载)
template <typename F>
void Read(void* buf, int len, F&& cb) {return Read(buf, len, make_handler_with_count(std::forward<F>(cb)));
}
void Read(void* buf, int len, std::unique_ptr<detail::OverlappedBase> o) {auto error = sock.Receive(buf, len, o.get());if (error) {if (error.value() != kIoPending) {o->Invoke(error, 0);return;}}o.release();
}
- 第一个模板版本:把用户传入的回调封装成
OverlappedBase
派生对象,调用第二个版本 - 第二个版本调用底层的
sock.Receive
,传入缓冲区、长度和OverlappedBase*
(对应底层的 OVERLAPPED) - 如果调用立即失败(非挂起状态),马上调用回调
Invoke
- 如果操作成功挂起,释放指针,交给 OS 异步完成后回调处理
总结
- 这是一个典型的封装底层 OVERLAPPED 异步模型的异步读实现
- 通过继承和函数对象,把用户回调绑定到底层异步上下文,确保完成后能正确回调用户
- 线程池负责监听 IO 完成事件,调用统一的
io_complete_callback
分发到具体的Invoke
- 这样封装后,用户只需传入普通回调,底层帮你处理了内存管理和异步完成的细节
C++20 协程中 co_await
表达式的本质机制 —— 它是如何工作的。下面是详细解释:
await conn.Read(buf, sizeof(buf));
是什么?
这是一个协程表达式(co_await 的语法糖):
auto bytesRead = await conn.Read(buf, sizeof(buf));
实际的执行是把 conn.Read(...)
这个返回值(是一个Awaitable 类型)传给 C++ 协程机制来拆解为:
auto awaitable = conn.Read(buf, sizeof(buf));
if (!awaitable.await_ready()) {awaitable.await_suspend(coroutine_handle);// 让出执行,挂起协程,等事件完成后恢复
}
auto result = awaitable.await_resume(); // 拿到最终结果
Awaitable 概念:和 Future<T>
类似,但有三个接口
C++ 协程不需要 std::future<T>
,它只需要支持以下这三件事的对象(称为 Awaitable):
方法名 | 说明 |
---|---|
bool await_ready() | 是否已经准备好?返回 true 表示不需要挂起,直接执行 await_resume() |
void await_suspend(std::coroutine_handle<>) | 如果 await_ready() 返回 false,则挂起协程,注册回调在某个时间点恢复协程 |
T await_resume() | 恢复协程时调用,获取协程表达式的最终结果(如读取到的字节数) |
这些接口的组合称为 Awaiter 或 Awaitable 实现(类型) |
举个例子
以 conn.Read(buf, size)
为例,它可能返回某种 ReadAwaiter
对象:
struct ReadAwaiter {bool await_ready() const noexcept { return false; } // 假设永远异步void await_suspend(std::coroutine_handle<> h) {conn.Read(buf, size, [h](error_code ec, int n) {// 保存结果this->nBytes = n;this->ec = ec;h.resume(); // 继续协程});}int await_resume() {if (ec) throw system_error(ec);return nBytes;}char* buf;size_t size;Tcp::Connection& conn;error_code ec;int nBytes;
};
然后你就可以这样写:
int n = co_await ReadAwaiter{conn, buf, size};
总结
你写的:
auto bytesRead = await conn.Read(buf, sizeof(buf));
其实等价于:
auto awaitable = conn.Read(...);
if (!awaitable.await_ready())awaitable.await_suspend(handle); // handle 是协程的上下文
auto bytesRead = awaitable.await_resume();
C++20 的协程魔法:
它把复杂的“回调地狱”变成“同步风格的写法”,底层还是异步 + 回调。
这页的内容讲解了 C++ 协程中 co_await <expr>
背后的展开机制,也就是 co_await
的底层等效逻辑。我们来逐行解释它的含义。
co_await <expr>
的展开过程
在 C++20 中:
auto result = co_await <expr>;
等价于:
{auto&& tmp = <expr>;if (!tmp.await_ready()) {tmp.await_suspend(coroutine_handle); // 挂起协程}return tmp.await_resume(); // 恢复时调用,拿结果
}
三个步骤详解
步骤 | 含义 |
---|---|
auto&& tmp = <expr>; | <expr> 是你写的 co_await 后面跟的那个 awaitable 对象,比如 conn.Read(...) ,这里先评估表达式并保存为一个临时对象 |
if (!tmp.await_ready()) | 判断是否已经完成(同步完成的话就不用挂起) |
tmp.await_suspend(coroutine_handle); | 如果还没完成,就把当前协程挂起,并传入当前协程的句柄,通常这时注册一个 I/O 回调 |
tmp.await_resume(); | 当协程恢复时执行,得到最终值(比如读取的字节数),或者抛出异常 |
举个实际的例子:
auto bytesRead = co_await conn.Read(buf, sizeof(buf));
实际展开成:
{auto&& awaitable = conn.Read(buf, sizeof(buf));if (!awaitable.await_ready()) {awaitable.await_suspend(handle); // handle 是当前协程的句柄}auto bytesRead = awaitable.await_resume();
}
其中:
await_ready()
返回true
代表同步完成(如从缓存读数据)await_suspend(...)
是关键,用于注册异步操作(比如网络 I/O 回调)await_resume()
返回真正的结果(如读了多少字节)
小结
co_await
是 C++20 协程的核心语法。- 它本质上是对对象的 3 个方法(
await_ready
,await_suspend
,await_resume
)的组合调用。 - 这是一个低开销的协程机制,不需要像
std::future
那样共享状态或锁。
这段代码展示的是一个封装底层异步 I/O 的 基类 OverlappedBase
,它作为抽象接口,让你可以把异步操作(如 Windows 的 OVERLAPPED
或 POSIX 的 aiocb
)与 C++ 的函数对象和 RAII 模式结合起来。我们来详细理解下:
struct OverlappedBase : os_async_context {virtual void Invoke(std::error_code, int bytes) = 0;virtual ~OverlappedBase() {}static void io_complete_callback(CompletionPacket& p) {auto me = static_cast<OverlappedBase*>(p.overlapped);auto cleanMe = unique_ptr<OverlappedBase>(me);me->Invoke(p.error, p.byteTransferred);}
};
代码逐行解析:
struct OverlappedBase : os_async_context {
- 这个结构体继承自
os_async_context
,这个类型代表系统原生的异步上下文(如 Windows 的OVERLAPPED
、POSIX 的aiocb
)。 - 它的目的,是为了可以被操作系统 I/O 接口识别,并参与底层事件通知。
virtual void Invoke(std::error_code, int bytes) = 0;
- 这是一个纯虚函数,子类必须实现。
- 操作完成后,系统会调用
Invoke()
,传入结果(错误码和读取的字节数)。
virtual ~OverlappedBase() {}
- 虚析构保证派生类析构函数会被正确调用,用于正确释放资源。
static void io_complete_callback(CompletionPacket& p) {auto me = static_cast<OverlappedBase*>(p.overlapped);auto cleanMe = unique_ptr<OverlappedBase>(me);me->Invoke(p.error, p.byteTransferred);
}
这个函数非常关键!
- 它是被操作系统异步框架调用的回调函数,比如当
WSARecv()
或aio_read()
完成后,系统会调用你注册的完成函数。 - 它从完成包
p
中提取出你的自定义对象OverlappedBase
。 - 用
unique_ptr
管理它,保证调用后自动销毁(RAII)。 - 最后调用
Invoke()
,将完成事件传递给业务逻辑。
为什么这样设计?
这是为了解决「如何把底层 I/O 回调变成 C++ 对象和协程可等待的形式」的问题。
- 把
OVERLAPPED
包装成一个继承自OverlappedBase
的对象。 - 系统 I/O 完成后会调用你提供的静态回调(例如
io_complete_callback()
)。 - 你可以在
Invoke()
中继续逻辑,比如恢复协程。
总结
OverlappedBase
是用于封装底层异步 I/O + 回调的抽象接口。- 它使得异步 I/O 能够和 C++ 的函数对象、RAII、协程模型融合。
- 它是从系统回调进入你的 C++ 世界的关键桥梁。
需要的话,我可以展示如何从这个结构继续构建awaitable
对象,让你可以写出:
int bytes = co_await conn.Read(buf, len);
这段代码是之前 OverlappedBase
的“协程化”版本,叫做 AwaiterBase
。它展示了如何将底层异步 I/O 封装为 co_await
可用的 awaiter。
struct AwaiterBase : os_async_context {coroutine_handle<> resume;std::error_code err;int bytes;static void io_complete_callback(CompletionPacket& p) {auto me = static_cast<AwaiterBase*>(p.overlapped);me->err = p.error;me->bytes = p.byteTransferred;me->resume();}
};
AwaiterBase
是什么?
它是一个继承自 os_async_context
的结构体,表示“某个异步操作的协程挂起点”。
你可以把它想成:
“当操作系统的 I/O 完成时,我要恢复挂起的协程。”
分析结构体成员
struct AwaiterBase : os_async_context {
- 继承自
os_async_context
(比如 Windows 的OVERLAPPED
或 POSIX 的aiocb
),用于操作系统识别和传递异步上下文。
coroutine_handle<> resume;
resume
存储的是协程的句柄(co_await
会挂起协程,并记录在哪里恢复)。
std::error_code err;
int bytes;
- 存储异步操作的结果(错误码 + 成功读取的字节数)。
操作系统回调
static void io_complete_callback(CompletionPacket& p) {auto me = static_cast<AwaiterBase*>(p.overlapped);me->err = p.error;me->bytes = p.byteTransferred;me->resume(); // 恢复协程执行
}
这是协程世界的入口。
- 操作系统完成异步操作后,调用这个函数。
- 将结果写入
AwaiterBase
的成员变量。 - 然后 通过
resume()
恢复协程执行。
底层实现说明
右边这几句是讲实现的低成本本质:
sizeof(void*) // 结构体只需指针大小
no dtor // 无析构函数,意味着可以不管理生命周期(交给协程)
mov rcx, [rcx] // 恢复协程指令很简单,只是调用句柄
jmp [rcx] // 跳转到恢复点
说明这段 AwaiterBase
实现非常轻量,可以被优化为几条汇编指令,性能极高。
总结
特性 | 说明 |
---|---|
用途 | 将 OS 异步操作包装成 co_await awaitable |
结构体职责 | 保存协程句柄、错误码、字节数 |
回调逻辑 | 操作完成时恢复协程 |
性能 | 极简指令开销(只读写寄存器 + 跳转) |
这正是 C++20 协程和底层 async I/O 桥接的关键点。 |
这段代码展示了如何将底层异步 I/O(例如 sock.Receive(...)
)封装成可以用 co_await
的 Awaiter 对象,从而实现:
await conn.Read(buf, sizeof(buf));
总体结构理解
这是一个 Connection::Read()
方法,它不是立即读取数据,而是返回一个 awaiter 对象,这个对象定义了如何在协程中挂起、恢复,以及如何获取结果。
分解说明
auto Connection::Read(void* buf, int len) {
返回的是一个自定义的 awaiter 类型的对象。
struct awaiter : AwaiterBase {Connection* me;void* buf;
定义了一个结构体 awaiter
继承自前面讲的 AwaiterBase
,它包含:
- 当前连接指针
me
(用于发起读取); - 缓冲区指针
buf
; - 读取的
bytes
继承自AwaiterBase
。
bool await_ready()
bool await_ready() { return false; }
表示 是否可以立即返回结果。这里返回 false
,意味着需要挂起协程。
⏸ void await_suspend(coroutine_handle<> h)
void await_suspend(coroutine_handle<> h) {this->resume = h;auto error = me->sock.Receive(buf, bytes, this);if (error.value() != kIoPending)throw system_error(err);
}
- 协程挂起时,系统会调用这个函数。
- 存下
coroutine_handle
用于后续恢复。 - 调用异步接收函数
Receive(...)
。 - 如果立刻完成(没有 pending),抛出异常表示失败。
注意:传入的this
是继承自os_async_context
的 Awaiter,所以底层 I/O 系统会在完成后使用它调用io_complete_callback(...)
。
int await_resume()
int await_resume() {if (this->err) throw system_error(err);return bytes;
}
当异步完成后恢复协程时,会调用这个方法:
- 如果出错,抛异常;
- 否则返回读取的字节数。
底层配合
这依赖前面定义的 AwaiterBase
:
struct AwaiterBase : os_async_context {coroutine_handle<> resume;std::error_code err;int bytes;static void io_complete_callback(CompletionPacket& p) {auto me = static_cast<AwaiterBase*>(p.overlapped);me->err = p.error;me->bytes = p.byteTransferred;me->resume(); // 恢复协程}
};
在 I/O 完成时系统调用这个回调函数,它设置结果并恢复协程。
最终效果
你就可以写这样的协程代码:
future<void> process() {char buf[1024];auto conn = co_await Tcp::Connect(...);int n = co_await conn.Read(buf, sizeof(buf));// 使用 buf 中的内容
}
而这背后通过 awaiter
对象,转换为操作系统的异步事件模型。
总结
部分 | 功能 |
---|---|
Read() 返回 awaiter | 支持协程 co_await |
await_ready() | 是否同步完成 |
await_suspend() | 异步挂起,发起系统 I/O |
await_resume() | 恢复后提取结果或抛异常 |
AwaiterBase | 连接系统回调,恢复协程 |
这段代码展示了使用 C++ 协程(co_await
) 实现的 异步 TCP 数据读取函数 的简化版本。我们逐步理解其结构和含义:
完整形式
auto tcp_reader(int total) -> future<int>
{char buf[4 * 1024];auto conn = await Tcp::Connect("127.0.0.1", 1337);for (;;){auto bytesRead = await conn.Read(buf, sizeof(buf));total -= bytesRead;if (total <= 0 || bytesRead == 0)return total;}
}
各部分解释
co_await Tcp::Connect(...)
- 发起异步连接,返回一个
Tcp::Connection
。 - 该连接类型应该实现了
co_await
机制(通过await_ready / await_suspend / await_resume
)。
循环读取数据
auto bytesRead = await conn.Read(buf, sizeof(buf));
- 每次循环异步读取最多 4 KB 数据。
conn.Read(...)
是协程版本,返回 awaiter。
判断退出条件
total -= bytesRead;
if (total <= 0 || bytesRead == 0)return total;
- 减少还需读取的字节数。
- 若已完成(
total <= 0
)或连接关闭(bytesRead == 0
),则返回剩余字节数(可能为 0)。
对比:同步版 vs 协程版
同步版:
int tcp_reader(int total)
{char buf[4 * 1024];auto conn = Tcp::Connect("127.0.0.1", 1337);for (;;) {auto bytesRead = conn.Read(buf, sizeof(buf)); // 阻塞total -= bytesRead;if (total <= 0 || bytesRead == 0)return total;}
}
协程版(非阻塞):
future<int> tcp_reader(int total)
{char buf[4 * 1024];auto conn = await Tcp::Connect("127.0.0.1", 1337);for (;;) {auto bytesRead = await conn.Read(buf, sizeof(buf)); // 非阻塞等待total -= bytesRead;if (total <= 0 || bytesRead == 0)return total;}
}
总结
要素 | 含义 |
---|---|
co_await | 非阻塞异步等待 |
future<int> | 返回结果在未来某时完成 |
await conn.Read(...) | 挂起直到有数据到达 |
if (bytesRead == 0) | 连接关闭时终止读取 |
异步风格 | 无需手写状态机或回调逻辑 |
如果你理解到这里,已经掌握了协程写法在网络 I/O 场景中的基本应用。 |
我们可以通过优化协程的底层实现来提升性能,尤其是应对如下情况:
背景:50% I/O 同步完成,50% 异步挂起
Windows 平台上的异步 I/O(如 ReadFile
, WSARecv
, 等)可能会:
- 同步完成:I/O 在调用时立刻完成,立即返回。
- 异步完成:返回
ERROR_IO_PENDING
,稍后再通知完成。
这就带来一个优化点:
优化:SetFileCompletionNotificationModes(...)
SetFileCompletionNotificationModes(h, FILE_SKIP_COMPLETION_PORT_ON_SUCCESS);
意图:
如果一个异步 I/O 操作同步完成,就不需要:
- 将完成事件排队到 I/O 完成端口(I/OCP)
- 不需要触发回调(节省上下文切换、调度、线程唤醒等成本)
实际用途
配合我们之前的 AwaiterBase
+ coroutine_handle
,你可以在 await_suspend
中检查是否同步完成:
void await_suspend(coroutine_handle<> h) {this->resume = h;auto error = me->sock.Receive(buf, bytes, this);if (error.value() != kIoPending) {// 同步完成:直接 resumeerr = error;resume(); // 不等回调,立即恢复协程}
}
配合上:
SetFileCompletionNotificationModes(sock.native_handle(), FILE_SKIP_COMPLETION_PORT_ON_SUCCESS);
好处
优化点 | 效果 |
---|---|
跳过 I/OCP 通知 | 减少线程切换、调度延迟 |
协程立即恢复 | 提升吞吐与响应速度 |
简化控制流 | 无需等待 completion callback |
总结
原始机制 | 优化点 |
---|---|
所有 I/O 都经过回调唤醒 | 利用 同步完成绕过回调路径 |
成本:内核态回调 + IOCP + resume | 成本:直接 resume 协程,低延迟 |
await_suspend → callback → resume | await_suspend → resume (if sync) |
代码展示了如何利用同步完成的 I/O 操作来提高效率。我们来逐行理解这段代码的核心逻辑和优化点:
代码分析:
void Read(void* buf, int len, std::unique_ptr<detail::OverlappedBase> o)
{auto error = sock.Receive(buf, len, o.get());if (error) {if (error.value() != kIoPending) {// 如果不是挂起,说明 I/O 是同步完成的o->Invoke(error, 0);return;}}// 异步完成:释放 ownership,等待系统调用完成并触发回调o.release();
}
核心思想:利用同步完成,减少延迟
1. sock.Receive(...)
:
- 这个是底层平台调用,例如 Windows 的
WSARecv
,可能会立即完成(同步)或返回ERROR_IO_PENDING
(异步)。
2. if (error && error.value() != kIoPending)
:
- 这个判断表示“同步完成”,不需要等待回调。
- 调用
o->Invoke(...)
立即触发后续逻辑(比如 resume 协程或调用用户回调)。
3. o.release()
:
- 否则是异步完成,释放
unique_ptr
的所有权,让系统异步调用完成时管理其生命周期。
为什么这么做高效?
同步完成 | 异步完成 |
---|---|
不需要上下文切换 | 需要 I/O 回调和唤醒协程 |
更低延迟 | 多一步调度流程 |
减少内核负担 | 需要内核管理 completion 事件 |
结合协程使用:
这段代码正是配合 co_await
或 awaiter
使用时的基础。协程里你可能会这样:
bool await_ready() { return false; }
void await_suspend(coroutine_handle<> h) {this->resume = h;auto error = sock.Receive(buf, len, this);if (error && error.value() != kIoPending) {// 同步完成,立即恢复协程err = error;resume(); }
}
int await_resume() {if (err) throw system_error(err);return bytes;
}
总结
你展示的这段代码是实现高效协程/异步模型的关键步骤之一:
- 同步完成:立即处理
- 异步完成:交由 I/O 完成端口处理
- 优化协程表现,尤其适用于高频、低延迟的网络场景
提到的内容重点在于如何**利用同步完成(synchronous I/O completions)**来优化性能,并结合了:
SetFileCompletionNotificationModes(h, FILE_SKIP_COMPLETION_PORT_ON_SUCCESS);
这句 Windows API 调用以及下面的 Read()
实现。
逐步理解
① SetFileCompletionNotificationModes
SetFileCompletionNotificationModes(h, FILE_SKIP_COMPLETION_PORT_ON_SUCCESS);
- Windows 专用优化
- 作用:如果 I/O 操作同步完成,就不向 I/O 完成端口发送通知(节省 CPU 和上下文切换)。
h
是 socket/file handle,通常设置在 socket 创建后、I/O 操作之前。
② Read()
实现:
void Read(void* buf, int len, std::unique_ptr<detail::OverlappedBase> o)
{auto error = sock.Receive(buf, len, o.get());if (error.value() != kIoPending) {o->Invoke(error, len); // 同步完成:立即回调return;}o.release(); // 异步完成:交给系统管理 overlapped 结构
}
优化点总结:
行为 | 解释 |
---|---|
error.value() != kIoPending | 同步完成,不等待回调,直接处理结果 |
o->Invoke(...) | 立即调用回调(或 resume 协程) |
o.release() | 异步完成,释放智能指针,由 OS 管理内存和回调 |
③ 为什么 FILE_SKIP_COMPLETION_PORT_ON_SUCCESS
很关键?
- 如果你不加这句设置,即使操作同步完成,I/O 完成端口还是会收到通知 —— 浪费资源。
- 加上这句后,避免不必要的回调队列入队,节省内核和线程池调度。
应用于协程
配合协程实现时,这种结构会大大减少上下文切换的成本:
bool await_ready() { return false; }
void await_suspend(coroutine_handle<> h) {this->resume = h;auto err = sock.Receive(buf, len, this);if (err && err.value() != kIoPending) {this->err = err;this->resume(); // 同步完成,立即恢复协程}
}
总结
你展示的 Read()
函数配合 SetFileCompletionNotificationModes
:
- 避免不必要的 IOCP 通知
- 实现“立即完成就立即响应”的高效 async 模式
- 是构建高性能 awaitable coroutine 网络 I/O的核心技巧之一
堆栈信息和代码逻辑说明了一个严重的问题:同步完成路径导致的递归调用 → 最终堆栈溢出(stack overflow)。
你的问题本质:同步完成触发同步回调,再次触发同步完成…
复现条件
你使用了:
SetFileCompletionNotificationModes(h, FILE_SKIP_COMPLETION_PORT_ON_SUCCESS);
并写了类似这样的逻辑:
auto error = sock.Receive(buf, len, o.get());
if (error.value() != kIoPending) {o->Invoke(error, len); // 立即调用回调return;
}
而 o->Invoke()
最终会调用你的回调,比如:
[&](std::error_code ec, int bytesRead) {this->OnRead(ec, bytesRead); // 立即开始下一次读取
}
OnRead()
再次调用 Read()
,再次同步完成,又进入 Invoke()
…
➡ 无限递归 → Stack Overflow
正确处理方式
你需要打破同步完成情况下的递归链,方法有:
方法 1:使用 post()
/ queue
异步跳出递归
if (error.value() != kIoPending) {io_context.post([o = std::move(o), error, len]() mutable {o->Invoke(error, len);});return;
}
或者使用 ThreadPool::QueueUserWorkItem()
之类函数来延迟调用。
方法 2:显式检测嵌套/递归
你也可以在 OnRead()
里检查是否在回调上下文中,跳转异步,但这比较脆弱。
方法 3:使用 trampoline / coroutine 更优雅地建模
协程天然解决这种“控制流爆炸”的问题。你之前提到的协程 Read()
示例就很好地避免了这个问题,因为 resume 是延迟调度:
void await_suspend(coroutine_handle<> h) {this->resume = h;auto error = me->sock.Receive(buf, bytes, this);if (error && error.value() != kIoPending) {this->err = error;this->resume(); // resume coroutine, not stack function}
}
因为协程 resume()
本身不会递归调用另一个协程,它使用调度器挂起与恢复。
总结建议
问题 | 原因 | 解决方法 |
---|---|---|
Stack Overflow | 同步完成导致 Invoke() 递归 | 异步跳出调用栈(如 post/reschedule) |
函数回调风格 | 容易出现“栈炸弹” | 用协程 await 模型更安全优雅 |
同步优化 | FILE_SKIP_COMPLETION_PORT_ON_SUCCESS 是有效优化 | 但要小心同步路径上的陷阱 |
这个 tcp_reader::OnRead
的用法目前最大问题是:
同步完成时直接递归调用 OnRead
,会导致栈溢出。
要点
- 当
conn.Read()
立即同步完成(Invoke()
直接调用了回调),回调又马上调用OnRead
,导致递归调用。 - 需要打断这种递归,让后续回调在调用栈外执行。
用例中的改进版写法示例
void tcp_reader::OnRead(std::error_code ec, int bytesRead) {if (ec) return OnError(ec);total -= (int)bytesRead;if (total <= 0 || bytesRead == 0) return OnComplete();auto bufSize = sizeof(buf);// 延迟调用OnRead,防止同步完成递归auto self = this;conn.Read(buf, bufSize,[self](std::error_code ec, int bytesRead) {// 使用事件循环或者线程池的post机制保证异步执行// 这里用一个假设的Post函数实现Post([self, ec, bytesRead]() {self->OnRead(ec, bytesRead);});});
}
关于 Post
的说明
Post
是把任务安排在事件循环/线程池异步执行,不是直接调用。- Windows下用
CreateThreadpoolWork
+SubmitThreadpoolWork
,或者用asio::post()
等类似。 - 如果没有事件循环,也可以用
std::async(std::launch::deferred, ...)
简单模拟。
为什么这样做?
这样能确保即使 conn.Read
同步完成,回调里的 OnRead
也不会马上递归调用,而是排队到事件循环里异步执行,避免调用栈爆炸。
这段代码思路是在 OnRead
里用 do { ... } while (conn.Read(...));
来处理同步完成的 I/O,避免递归:
解析
conn.Read(...)
返回一个bool
,表示是否同步完成(比如立即返回了数据);- 如果同步完成,
conn.Read
直接返回true
,循环继续,OnRead
在循环里自己处理下一批数据; - 如果异步完成,
conn.Read
返回false
,退出循环,后续由回调继续触发OnRead
。
优点
- 避免了递归调用栈溢出的问题;
- 利用了同步完成的 I/O,减少上下文切换开销。
你要保证的地方
conn.Read
实现必须保证同步完成时返回true
,异步时返回false
;- 回调里再调用
OnRead
,继续处理后续异步完成。
伪代码示意:
void tcp_reader::OnRead(std::error_code ec, int bytesRead) {do {if (ec) return OnError(ec);total -= (int)bytesRead;if (total <= 0 || bytesRead == 0) return OnComplete();bytesRead = sizeof(buf);} while (conn.Read(buf, bytesRead, [this](std::error_code ec, int bytesRead) {OnRead(ec, bytesRead);}));
}
总结
- 这是一个高效且简洁的同步+异步混合处理方式;
- 避免了递归栈溢出,也充分利用同步完成提升性能;
- 体现了 C++ 复杂异步 I/O 设计的典范。
这段代码展示了利用同步完成优化后的性能提升,以及结合了 awaitable 设计的 Connection::Read
实现。
性能对比总结
版本 | MB/s | 可执行文件大小(KB) |
---|---|---|
Handcrafted (原始) | 380 | 30 |
Coroutine (原始) | 495 | 25 |
Handcrafted (同步完成优化后) | 485 | 30 |
- 同步完成优化(
SetFileCompletionNotificationModes(h, FILE_SKIP_COMPLETION_PORT_ON_SUCCESS)
)让手写状态机版本性能大幅提升,几乎追平 Coroutine 版本。 - Coroutine 版本体积略小,性能也很强。
代码核心解析
auto Connection::Read(void* buf, int len) {struct awaiter : AwaiterBase {Connection* me;void* buf;awaiter(Connection* me, void* buf, int len) : me(me), buf(buf) { bytes = len; }bool await_ready() { return false; }void await_suspend(coroutine_handle<> h) {this->resume = h;auto error = me->sock.Receive(buf, bytes, this);if (error.value() == kIoPending) return; // 异步挂起,等待回调resumeif (error) throw system_error(err); // 出错直接抛异常return; // 同步完成,无需挂起}int await_resume() {if (this->err) throw system_error(err);return bytes;}};return awaiter{ this, buf, len };
}
await_ready()
总是返回false
,保证总是挂起(但实际是否挂起由await_suspend
里的逻辑决定)。await_suspend
发起异步请求,若同步完成(error.value() != kIoPending
),则不挂起,直接返回。await_resume
返回读取的字节数或者抛出异常。
结合之前的同步完成优化:
- 当 I/O 同步完成时,
await_suspend
不挂起 coroutine,直接返回,避免额外调度开销。 - 当异步时,coroutine 挂起,等待完成回调调用
resume()
恢复执行。
总结
- 这个设计将传统回调式 I/O 与 C++20 协程无缝结合。
- 利用操作系统的
FILE_SKIP_COMPLETION_PORT_ON_SUCCESS
优化,减少同步完成时的上下文切换。 - 性能接近手写状态机,但代码更简洁且易维护。
这段代码是一个典型的基于协程的 awaitable I/O 设计,实现了异步读取操作。
核心点解析:
1. AwaiterBase 结构体
struct AwaiterBase : os_async_context {coroutine_handle<> resume; // 用于恢复协程执行的句柄std::error_code err; // 记录I/O错误状态int bytes; // 记录读到的字节数static void io_complete_callback(CompletionPacket& p) {auto me = static_cast<AwaiterBase*>(p.overlapped);me->err = p.error;me->bytes = p.byteTransferred;me->resume(); // I/O完成后恢复协程}
};
- 继承自
os_async_context
,表示这是操作系统异步上下文。 resume
是协程恢复执行的关键。io_complete_callback
是 I/O 完成时由操作系统线程池回调的函数,更新状态后调用resume()
。
2. Connection::Read 的 Awaiter
auto Connection::Read(void* buf, int len) {struct awaiter : AwaiterBase {Connection* me;void* buf;awaiter(Connection* me, void* buf, int len) : me(me), buf(buf) { bytes = len; }bool await_ready() { return false; } // 总是挂起,等着 I/O 结果bool await_suspend(coroutine_handle<> h) {this->resume = h;auto error = me->sock.Receive(buf, bytes, this); // 发起异步读if (error.value() == kIoPending) return true; // 返回 true 表示挂起协程,等待回调恢复if (error) throw system_error(err); // 立即错误,抛异常return false; // 同步完成,无需挂起}int await_resume() {if (this->err) throw system_error(err);return bytes; // 返回实际读到的字节数}};return awaiter{ this, buf, len };
}
await_ready()
返回 false,保证进入await_suspend
。await_suspend()
发起读操作:- 如果返回
kIoPending
,表示异步操作,协程挂起,等待回调恢复。 - 如果立即完成(不是
kIoPending
),则不挂起协程,直接返回。 - 出错时抛异常。
- 如果返回
await_resume()
返回读取字节数或抛异常。
总结:
- 这就是典型的协程 Awaitable 对象,它把异步 I/O 通过协程无缝表达。
io_complete_callback
会在 I/O 完成时被系统调用,然后通过resume()
恢复挂起的协程,协程从await_suspend
之后继续执行。- 如果 I/O 同步完成(同步返回成功),则协程不挂起,提升性能。
这段内容描述的是 C++20 协程中的 co_await
运算符的语义展开过程,帮你理清它的执行流程:
await expr 的等价展开示意
{auto&& tmp = <expr>; // 计算表达式,得到一个 Awaitable 对象引用if (!tmp.await_ready()) { // 询问 Awaitable 是否准备好(是否需要挂起)tmp.await_suspend(<coroutine_handle>); // 如果没准备好,挂起协程并保存恢复点}return tmp.await_resume(); // 协程恢复时返回结果
}
具体步骤解释
auto&& tmp = <expr>;
将co_await
后的表达式求值,得到一个 Awaitable 类型对象(比如刚才说的awaiter
)。tmp.await_ready()
询问 Awaitable 是否已经准备好(同步完成)。如果返回true
,协程不挂起,直接继续执行。tmp.await_suspend(coroutine_handle)
如果异步,调用挂起函数保存协程上下文,协程状态挂起,等待事件完成后恢复。tmp.await_resume()
当事件完成时协程被恢复,从这里继续执行,返回操作结果或抛异常。
简单图示:
co_await expr
↓ 等价于 ↓
auto&& tmp = expr;
if (!tmp.await_ready()) {tmp.await_suspend(coroutine_handle);
}
return tmp.await_resume();
你可以把这个看成:
- 检查状态(是否准备好)
- 挂起协程,等待完成
- 恢复协程,返回结果
是 co_await <expr>
在编译器里的展开等价形式,具体是:
{auto&& tmp = <expr>;if (!await_ready(tmp) && await_suspend(tmp, <coroutine-handle>)) {// 协程挂起(suspend)return;}// 协程继续执行,调用 await_resume() 获取结果(resume)return await_resume(tmp);
}
解释
auto&& tmp = <expr>;
计算co_await
后的表达式,得到一个 awaitable 对象。await_ready(tmp)
询问该 awaitable 是否已经准备好(是否可同步返回结果)。- 返回
true
,表示准备好了,协程不用挂起直接执行下一步。 - 返回
false
,则可能需要挂起。
- 返回
await_suspend(tmp, coroutine_handle)
如果没准备好,调用挂起函数,让协程挂起等待异步操作完成。- 返回
true
,表示协程确实挂起了,控制权交出,函数直接返回等待恢复。 - 返回
false
,表示挂起失败或不需要挂起,协程继续执行。
- 返回
await_resume(tmp)
挂起恢复后,调用这个函数获取结果或抛异常。
重点区别于之前:
await_suspend
返回值决定是否真的挂起协程。- 只有当
await_ready
为 false 且await_suspend
返回 true 时才挂起。
如果理解这三步(ready、suspend、resume),就掌握了协程等待的核心机制。
这段代码展示了如何用一个固定的 OverlappedBase
对象(存在成员变量 wo
中),避免在每次异步读操作时都做内存分配,从而提升性能。
代码关键点解读
class tcp_reader {std::unique_ptr<detail::OverlappedBase> wo; // 保存回调对象,避免重复分配...tcp_reader(int64_t total) : total(total) {// 创建一次,绑定回调Lambda到wowo = detail::make_handler_with_count([this](auto ec, int nBytes) { OnRead(ec, nBytes); });...}void OnRead(std::error_code ec, int bytesRead) {if (ec) return OnError(ec);do {total -= (int)bytesRead;if (total <= 0 || bytesRead == 0) return OnComplete();bytesRead = sizeof(buf);} while (conn.Read(buf, bytesRead, wo.get())); // 复用同一个wo指针}
};
为什么这么写?
- 异步I/O的回调包装器
OverlappedBase
往往是动态分配的(new),每次都分配和释放内存会有性能开销。 - 这里将回调对象
wo
持有在类成员中,只分配一次,后续多次读操作共用它,避免频繁分配释放。 conn.Read(...)
传入的回调对象用的是这个复用的指针,保证异步完成时调用同一个回调处理逻辑。
这也符合之前的同步完成优化
在 Read()
中调用 conn.Read()
时,可能同步完成,则回调立即调用;否则I/O挂起,完成时异步调用回调。共用同一个回调对象简化管理。
总结就是:
将回调包装器对象生命周期绑定到读者对象,避免多次分配和释放,提升性能。
总结了几个主流语言中协程(coroutines)和异步编程的流行写法,展示了协程正在各大语言生态里普及的趋势。让我帮你总结和理解一下:
几个语言的简单协程代码示例
Dart 示例
import 'package:http/http.dart' as http;
Future<int> getPage(String t) async {var c = http.Client();try {var r = await c.get(Uri.parse('http://url/search?q=$t'));print(r.body);return r.body.length;} finally {await c.close();}
}
void main() async {int len = await getPage("coroutines");print("Length: $len");
}
Python 示例 (PEP 492)
import asyncio
async def abinary(n):if n <= 0:return 1l = await abinary(n - 1)r = await abinary(n - 1)return l + 1 + r
async def main():result = await abinary(3)print(result)
asyncio.run(main())
C# 示例
using System;
using System.Threading.Tasks;
class Program {static async Task<string> WaitAsynchronouslyAsync() {await Task.Delay(10000);return "Finished";}static async Task Main(string[] args) {string result = await WaitAsynchronouslyAsync();Console.WriteLine(result);}
}
PHP Hack 示例
async function gen1(): Awaitable<int> {$x = await Batcher::fetch(1);$y = await Batcher::fetch(2);return $x + $y;
}
C++20 示例 (简化示意)
#include <coroutine>
#include <iostream>
#include <thread>
#include <chrono>
struct Task {struct promise_type {Task get_return_object() { return {}; }std::suspend_never initial_suspend() { return {}; }std::suspend_never final_suspend() noexcept { return {}; }void return_void() {}void unhandled_exception() { std::terminate(); }};
};
Task example() {std::cout << "Start\n";co_await std::suspend_always{}; // 简单模拟awaitstd::cout << "Resume\n";
}
int main() {auto t = example();std::cout << "Main running\n";// 这里就不展开完整协程调度了,C++20协程还需要结合事件循环/调度器
}
要帮你写某个语言里具体复杂点的协程用法,或者讲讲底层原理,都可以告诉我!
主要点
- Dart (1.9)
使用async
和await
,写起来非常直观,类似同步代码。Future<int>
表示异步返回整数的“未来值”。 - Python (PEP 0492)
使用async def
和await
,写异步递归或其他异步逻辑。
例子里的递归函数abinary
是异步计算,体现异步代码也能写复杂递归。 - C#
用async Task<string>
和await
实现异步等待,C# 的异步/等待是非常成熟且被广泛使用的模型。 - HACK(Facebook的语言)
同样有async
和await
,并且用类型标注Awaitable<int>
,也是协程范式的应用。 - C++17 (当时还在尝试,后续C++20才正式有协程支持)
用future<string>
和类似await sleep_for()
的语法示例,展示了当时对异步支持的探索。
协程为什么这么火?
- 代码更简洁:异步操作写成同步风格,逻辑清晰。
- 性能好:相比线程,协程开销更小,适合大量并发。
- 跨语言趋势:几乎所有现代语言都支持协程/async-await,表明这是异步编程的未来方向。
这部分讲的是C++协程设计的总体原则和理念,很核心也很经典!
总结一下这几个设计原则和概念的含义:
Generalized Function — Coroutine设计理念
- POF(Plain Old Function)与Coroutine统一接口
用户和编译器不需要关心函数是普通函数还是协程,调用方式保持一致,这样降低了使用门槛。 - 设计师与用户关注点分离
协程库设计者可以自由实现不同的协程语义(比如任务、生成器、async-await),用户只需要用await
或co_yield
等简单语法使用协程。 - Monad(单子)思想
你可以理解为协程的await
相当于“绑定”(bind)操作,它将异步操作串联起来,类似函数式编程的单子概念。 - “await = suspend”
await
表达式触发协程挂起(suspend),之后等待异步操作完成。 - 支持多种协程模型
- Task(任务):比如异步计算
- Generator(生成器):支持
yield
,懒惰生成数据流 - Async Generator:结合
await
和yield
,既能等待异步结果,也能逐步产生数据
C++协程设计原则
- 可扩展性
能支持上亿协程同时运行(轻量级的切换机制) - 高效性
挂起和恢复操作的开销要接近普通函数调用 - 无缝集成现有设施
比如线程、异步IO、异常处理,协程机制不增加额外开销 - 开放的协程机制
允许库设计者定义自定义的协程类型,支持多种语义的协程库(生成器、任务、管道、goroutine等) - 适用无异常环境
支持在禁用异常的环境下依然可以使用协程
总结
这个设计理念让C++协程非常灵活和强大,既方便用户使用,也给库设计者留足了定制空间,同时又能做到高性能和大规模并发。