CppCon 2014 学习:ASYNC SEQUENCES AND ALGORITHMS
“ASYNC SEQUENCES AND ALGORITHMS”和“BUILDING USABLE ASYNC PRIMITIVES”是现代C++或其他语言里,关于异步编程的主题,核心点在于:
Async Sequences and Algorithms
- 异步序列:不像普通的同步序列一次性全部生成,异步序列的元素可能是“未来”才产生的,或者依赖异步事件。
- 异步算法:能处理异步序列的算法,比如异步过滤、映射、归约等。
- 应用场景:网络请求流、事件流、文件IO流、用户输入等。
Building Usable Async Primitives
- 异步原语:基础的异步构件,比如异步任务、协程、future/promise、观察者模式、事件循环等。
- 目标:让异步编程更简单、安全、高效,避免回调地狱,方便组合。
- 设计原则:易用、可组合、避免竞态条件、易于取消和错误处理。
举个简单类比
同步算法处理的是“立刻可用”的数据流,异步算法处理的是“随时可能到达”的数据流,需要用异步原语来“等待”或“监听”。
如果你想,我可以帮你介绍某种语言(如C++20的std::async
、协程,或者JavaScript的Promise/async-await)中如何实现这些,或者给你示范简单的异步序列代码示例。你感兴趣吗?
不同类型的异步数据流的资源使用特点:
🔸 原文拆解与中文理解:
使用所有可用资源
意味着某些算法或流设计是“激进并发”的,可能同时启动多个任务、线程、I/O 请求等,目的是最大化吞吐。
独立且激进执行(即非惰性)
这类任务之间互不依赖,可以并行启动且立即开始计算,例如:并行 map、并发下载等。
使用最少资源
与“使用所有资源”相对,指设计上偏向保守、节省,避免过多内存、线程或 CPU 占用,例如延迟加载、懒惰求值等。
相互依赖且惰性执行
例如流式解析 JSON、逐帧处理视频等场景,前一项未完成,后一项不能启动。同时,只有被请求时才执行,属于懒执行。
异步与并发流不总是“字节流”
很关键:现代 async/await、reactive、generator 风格的“流”可以是:
stream<T>
(泛型值)Observable<int>
、IAsyncEnumerable<T>
(.NET)std::ranges::view
(C++23)
并不限于传统 socket 的“字节流”。
一切归根结底是算法
重点在于:是否**并行?懒惰?按需?回压?断流重试?**这些行为必须靠算法决定,而不是靠底层 API。
带时间索引的
<T>
序列
比如音视频帧流、事件流、传感器数据流,每个元素不仅有值 T
,还有时间戳/顺序信息。
这类流常用于:
- 音视频处理(帧序列)
- 时间序列分析(金融/IoT)
- 延迟控制的网络算法(rate limiting)
有些异步流会尽可能使用全部资源并立即启动处理,尤其当任务彼此独立。而有些设计则非常节省资源,仅在必要时启动,特别当任
务之间存在依赖。异步流不一定是字节流,它们可以是任意类型的泛型数据流。关键在于选择合适的算法模式,特别是在处理带时间信息的异步数据序列时。
如你需要我将这些思想用在 C++ 代码、图示、或结合某个库(如 ICU、RxCpp、Coro、async-generator)来展示,请告诉我,我可以继续展开。
你列出的这些术语看起来像是在理解某种异步事件、系统信号或时间序列事件的“示例类别”场景。下面是对它们的逐一解释,适用于异步编程、事件驱动架构(如 C++ 协程、操作系统内核、信号处理等):
1. Periodic Timer(周期性定时器)
- 定义:一个定时器每隔固定时间间隔触发一次事件。
- 场景:心跳检测、UI 刷新、定期保存状态等。
- 示例:
// 每隔 1 秒触发一次 while (true) {co_await timer(1s);do_something(); }
2. MouseClicks(鼠标点击)
- 定义:用户点击鼠标时系统生成的事件。
- 场景:UI 交互处理、游戏控制、绘图应用。
- 特性:离散的、不确定发生时间,通常通过事件监听器处理。
- 示例:
while (true) {auto click = co_await next_mouse_click();handle(click); }
3. MouseMoves(鼠标移动)
- 定义:鼠标在屏幕上移动所触发的连续事件。
- 场景:拖拽操作、实时绘图、悬停提示。
- 特性:事件频率较高;可能需要节流处理(throttling)。
4. NetworkPackets(网络数据包)
- 定义:网络接口接收到的每一个数据包。
- 场景:聊天应用、实时通信、服务器监听请求。
- 特性:异步到达;可能乱序;需解析和确认。
5. FileReads/Writes(文件读写)
- 定义:对文件系统进行的读取/写入操作。
- 场景:加载配置文件、日志记录、保存用户数据。
- 异步特性:
- I/O 操作较慢
- 通常通过异步文件 API 处理,以避免阻塞线程
这些是什么的“示例”?
这些是**异步序列(asynchronous sequences)**的典型例子,也就是:
系统中某些事件/操作 “以时间推移发生”,可以被程序“订阅”或“异步等待”。
在现代 C++(如 C++23)的协程、generator
, awaitable
、sender/receiver
模型中,这些是常见的用例。
统一理解:
名称 | 是否连续 | 是否异步 | 应用场景 |
---|---|---|---|
Periodic Timer | 是 | 是 | 定期执行 |
MouseClicks | 否 | 是 | 用户交互事件 |
MouseMoves | 是 | 是 | 实时输入 |
NetworkPackets | 是 | 是 | 网络通信、协议处理 |
FileReads/Writes | 否 | 是 | I/O 操作,适合异步处理 |
如果你正在看的是 CppCon 或某个关于 “异步序列 / generator / ranges / async” 的讲座,这些就是作为“数据流的例子”列出来的。 | |||
需要我配合具体代码讲解这些场景在 C++ 中如何实现吗? | |||
你贴的这段文档是对 ReactiveX 响应式编程模型中 Observable 和 Observer 的系统性解释。我来用 C++ 的视角,逐步为你分解这段内容,帮助你彻底理解其中的机制和逻辑。 |
1. 基本结构:Observable 和 Observer 是什么?
🔹 Observable(事件源):
是一个可以发出(emit)数据或事件的对象。
它就像一个异步的生产者,在某个时机会发出数据(调用 onNext)、发出错误(onError),或结束(onCompleted)。
🔹 Observer(观察者):
是一个可以订阅 Observable 并处理数据的对象。
它提供三个处理函数:
onNext(T)
:处理发出的数据项onError(exception)
:处理错误onCompleted()
:处理结束信号
在 C++ 中,我们会用函数对象(比如 lambda)来写这些处理函数。
2. 为什么说它是“非阻塞、并发友好”的?
传统调用(同步):
auto result = someMethod();
process(result); // 等 someMethod 完成
Rx 式调用(异步):
observable.subscribe([](int item) { process(item); },[](std::exception_ptr ep) { handleError(ep); },[]() { std::cout << "complete" << std::endl; }
);
// 不阻塞,继续做别的事情
也就是说:观察者是哨兵(sentinel),站在那里等待事件,而不是主动轮询或阻塞。
3. 冷 vs 热 Observable
Cold Observable:
- 只有在订阅时才开始发出数据。
- 每个订阅者都会看到完整的数据流(从头开始)。
类似于:你点播一部电影(自己控制播放)。
Hot Observable:
- 数据从创建时就开始发出。
- 新订阅者只能看到订阅之后的数据。
类似于:你收看直播(只看到你加入之后的内容)。
4. 订阅机制(subscribe)
调用 .subscribe()
的时候会触发 Observable 的数据流,并将数据发送给 Observer。
例如(RxCpp):
auto source = rxcpp::observable<>::range(1, 3);
source.subscribe([](int x) { std::cout << "onNext: " << x << std::endl; },[]() { std::cout << "onCompleted" << std::endl; }
);
这个订阅者会依次接收:1、2、3,然后收到“完成”信号。
5. Marble Diagram(弹珠图)
弹珠图是 ReactiveX 中用来表示数据流的图形语言。
它的结构是这样的:
--1--2--3--| // 发出数据项 1, 2, 3,然后完成
--1--X // 发出 1,然后出错(X 表示 error)
- 横线表示时间线
- 数字表示发出的数据项(onNext)
|
表示完成(onCompleted)X
表示出错(onError)
在调试 Rx 程序时,用它来理解流的“形态”很有帮助。
6. 操作符(Operators)
操作符就是对 Observable 进行处理的工具,类似于 C++ STL 中的 transform
, filter
, reduce
。
rxcpp::observable<>::range(1, 5).filter([](int i) { return i % 2 == 0; }) // 过滤偶数.map([](int i) { return i * 10; }) // 每个值乘 10.subscribe([](int x){ std::cout << x << std::endl; });
输出:
20
40
这和下面的 STL 写法很相似:
for (int i = 1; i <= 5; ++i) {if (i % 2 == 0) {std::cout << i * 10 << std::endl;}
}
7. 链式操作的本质(区别 builder)
Rx 的操作链不是在一个对象上操作,而是生成新的 Observable
每个操作符(map、filter 等)返回的是一个新的事件源,所以每次链式调用本质上都是构建一个 流的管道(pipeline)。
8. 退订(Unsubscribe)
有时你想手动中断监听。
auto sub = observable.subscribe(onNext, onError, onCompleted
);
sub.unsubscribe(); // 停止接收
退订的作用:
- 告诉 Observable:你不再关心数据
- 它可能会中止发出数据(节省资源)
总结(C++角度记忆)
概念 | 类比 C++ 概念 |
---|---|
Observable | 异步数据生成器,像 std::function<void(Observer)> |
Observer | 类似于回调组合(onNext/onError/onComplete) |
subscribe() | 注册回调并开始数据流 |
冷 / 热 Observable | 点播 vs 直播 |
map/filter等操作符 | 类似 STL 算法链 |
unsubscribe | 提前退出回调流 |
如果你希望,我可以帮你写一个完整的 RxCpp 项目模板或某个具体例子(比如:网络请求、传感器数据、数据流转换等)。 | |
当然,下面是你这段代码的详细注释版,帮你理解每一步在做什么: |
“COMBINE TIME AND MOVEMENT”这段代码的核心思想是:将基于时间的轨道运动和基于鼠标的移动位置结合起来,生成一个动态变化的点,供后续绘制使用。
// 生成基于时间的轨道点Observable,
// 先用updates.milliseconds()拿到时间流,再转成0~1周期,再映射到轨道点坐标
auto auto_orbit_points = orbitPointsFromTimeInPeriod(timeInPeriodFromMilliseconds(updates.milliseconds() // 获取当前时间(毫秒),作为事件流)
);
// 生成基于鼠标移动的点Observable
auto auto_location_points = mouse.moves() // 获取鼠标移动事件的Observable流,发出鼠标位置.map(pointFromMouse); // 把鼠标事件映射成ofPoint类型的位置坐标
// 把两个Observable:位置点和轨道点,合并起来
// combine_latest会在两个Observable中任意一个发出新值时,
// 用std::plus<>(即加法)合并它们的最新值,发出合并后的新点
location_points.combine_latest(std::plus<>(), orbit_points).subscribe([=](ofPoint c) { // 订阅合并后的点流// 每当有新值时,更新绘图时用的中心点center = c;});
具体理解如下:
- 时间驱动的轨道点(auto_orbit_points)
通过时间(毫秒)不断产生一个0到1之间的周期值,
再将这个周期值映射成圆周上的点(轨道点)。
这相当于让点随着时间在一个圆上运动。 - 鼠标位置点(auto_location_points)
监听鼠标移动事件,获取当前鼠标位置。
通过map
转换成点的形式,实时反映鼠标当前坐标。 - 合并轨道点和鼠标点(combine_latest)
使用combine_latest
,当任一点(轨道点或鼠标点)更新时,
取两个点的最新值相加,生成一个新的点。
这个点代表轨道点加上鼠标移动的偏移,产生“轨道+鼠标”的叠加效果。 - 订阅合成后的点流,更新绘制中心
通过订阅合成后的点流,每次产生新点时,
更新绘制用的中心点center
,让绘制跟随轨道和鼠标的结合动态变化。
简单说就是:
- 点沿着轨道根据时间运动,
- 点又根据鼠标移动做偏移,
- 最终合成一个点,实时更新界面显示。
这样可以实现一种既跟时间动又跟用户操作互动的动画效果。
很适合制作交互式动画、游戏里跟随鼠标绕圈运动的元素。
总结:
auto_orbit_points
:轨道上的点,随着时间变化持续发出。auto_location_points
:鼠标位置点流。combine_latest
:两个点相加,生成一个新的点流。subscribe
:订阅这个新的点流,每次更新时设置center
,供后续绘制使用。
ORBIT FROM TIME
// 返回一个发出0.0到1.0之间周期值的Observable,
// 输入是以毫秒计时的时间流
rxcpp::observable<float>
ofxCircle::timeInPeriodFromMilliseconds(rxcpp::observable<unsigned long long> timeInMilliseconds) {return timeInMilliseconds.map([this](unsigned long long tick) {// 将tick(时间戳毫秒数)对轨道周期(orbit_period秒,转成毫秒)取模,// 使其周期性循环,然后映射到0.0~1.0区间return ofMap(tick % int(orbit_period * 1000), // 取模周期毫秒数0, // 输入范围起点int(orbit_period * 1000), // 输入范围终点0.0, // 输出范围起点1.0 // 输出范围终点);});
}
// 返回一个发出圆周上点的Observable,
// 输入是0.0到1.0之间表示周期进度的时间流
rxcpp::observable<ofPoint>
ofxCircle::orbitPointsFromTimeInPeriod(rxcpp::observable<float> timeInPeriod) {return timeInPeriod.map([this](float t) {// 根据t计算对应圆周上的点坐标// t乘以2π得到角度,计算圆上x,y坐标return ofPoint(orbit_radius * std::cos(t * 2 * 3.14), // x坐标orbit_radius * std::sin(t * 2 * 3.14) // y坐标);});
}
理解这个“ORBIT FROM TIME”代码片段的意图,可以总结为:
核心目的:
基于时间,计算一个点沿着圆周轨迹的位置,实现“轨道运动”的效果。
具体流程:
- 输入时间(毫秒)流
- 程序接收一个不断变化的时间流(单位是毫秒,通常是系统更新时间)。
- 将时间转换为周期内的相对位置(0.0 到 1.0)
- 以一个固定的轨道周期(
orbit_period
,单位秒)为基准,将当前毫秒时间转换成该周期内的相对进度,比如时间流在周期中的进度百分比。 - 这个百分比用0到1的浮点数表示,表示当前位置在轨道周期里的相对位置。
- 以一个固定的轨道周期(
- 将周期进度映射到圆周上的点坐标
- 利用三角函数(cos 和 sin),把这个0~1的进度转换成对应圆上的点(
ofPoint
),其中x = r*cos(θ)
,y = r*sin(θ)
,角度θ = 进度 * 2π。 - 这里的
orbit_radius
是圆的半径。
- 利用三角函数(cos 和 sin),把这个0~1的进度转换成对应圆上的点(
代码用意:
- 利用RxCpp的
observable
和map
函数实现数据流的变换。 - 不断根据时间变化产生新的点坐标,实现动画中点沿圆轨迹运动的效果。
- 这种设计非常适合响应式编程,可以优雅地处理异步时间更新。
简单类比:
这就像你有一个秒针指针,它不停转动,通过时间计算秒针当前的位置,显示秒针的位置坐标。
你觉得这个讲解清楚吗?要不要我帮你写一个更简单的伪代码版本?
Rx 的起源(ORIGIN)
Reactive Extensions(简称 Rx)最初是由微软为 .NET 平台开发的,叫做 Rx.NET。
Rx.NET 跟随了 LINQ 的设计理念
LINQ(Language Integrated Query)是 .NET 里用于查询数据的一个强大且优雅的语言特性,允许用统一的方式操作各种数据源(数组、数据库、XML 等)。
Rx.NET 借鉴了 LINQ 的设计思路,把 数据流和异步事件看作“可查询的序列”,并且用类似 LINQ 的操作符(比如 map
、filter
、combineLatest
等)来组合和处理异步事件流。
换句话说:
Rx.NET 就是把 异步事件序列 看成类似于 LINQ 里操作的“集合”,用链式、声明式的风格来处理异步流,实现响应式编程。
这段 C# 代码展示了 LINQ 的基本用法:
List<string> fruits = new List<string> { "apple", "passionfruit", "banana", "mango","orange", "blueberry", "grape", "strawberry" };
// 从 fruits 里筛选出长度小于6的字符串
IEnumerable<string> query = fruits.Where(fruit => fruit.Length < 6);
// 生成1到10的整数序列,并计算每个数的平方
IEnumerable<int> squares = Enumerable.Range(1, 10).Select(x => x * x);
具体解释:
fruits
是一个字符串列表,包含多种水果名。query
用 LINQ 的Where
方法过滤fruits
,筛选出字符串长度小于6的水果名字,比如"apple"
,"mango"
,"grape"
。Enumerable.Range(1, 10)
生成从 1 到 10 的整数序列(共10个数)。.Select(x => x * x)
把上面的序列每个数映射为它的平方,结果是1, 4, 9, ..., 100
。
这段代码展示了 LINQ 的核心思想:
- 把数据看作“序列”;
- 通过高阶函数(
Where
,Select
)声明式地过滤和转换数据; - 返回结果是延迟执行的
IEnumerable<T>
,直到真正遍历时才计算。
如果你想,我可以帮你把这段代码用 Rx(Reactive Extensions)写个类似版本,方便理解两者异同!你需要吗?
LINQ(Language Integrated Query)理解
- LINQ 是一组针对
IEnumerable<T>
(和其他序列接口)的算法和查询操作符,用来方便地操作集合数据。 - 它让你可以用类似 SQL 的语法来查询、过滤、投影(转换)、分组等操作序列数据。
- 这些操作符底层实现类似于 C++ 中的范围算法(range-based algorithms),比如
std::transform
、std::copy_if
、std::accumulate
等。 - 命名和语法借鉴自 SQL,比如
Where
类似于 SQL 的WHERE
条件过滤,Select
类似于SELECT
投影,GroupBy
类似于GROUP BY
分组。
举例
Where
— 过滤满足条件的元素Select
— 对元素做映射/转换OrderBy
— 排序GroupBy
— 分组Join
— 连接两个序列
这让 C# 集合操作更直观,更易于表达复杂查询。
LINQ 与 C++ range 算法的对比
- C++ Range算法是面向迭代器的一组通用算法,比如
std::transform
、std::copy_if
,操作容器的元素。 - LINQ是语言集成的查询表达式,支持延迟执行和链式调用,更像是声明式操作序列。
- 两者目的类似:方便遍历和处理集合,但 LINQ 提供了更丰富的查询表达能力和语法糖。
“Rx is LINQ inverted”(Rx 是 LINQ 的反转),总结了 ReactiveX(Rx)和 LINQ 的核心区别和联系。我们来逐一解释这句话中涉及的概念:
LINQ:Pull 模型
- LINQ(Language Integrated Query) 是用于操作集合的统一查询语法。
- 它基于
IEnumerable<T>
,是一种 Pull 模型(拉取)。 - 消费者主动调用
MoveNext()
来获取下一个数据项(就像for
循环或迭代器)。 - 特点:
- 拉:数据消费者主动要数据。
- 同步:调用者等待数据返回。
示例:
IEnumerable<int> squares = Enumerable.Range(1, 5).Select(x => x * x);
foreach (var s in squares) {Console.WriteLine(s); // 消费者主动拉取数据
}
Rx:Push 模型
- Rx(Reactive Extensions) 是对异步数据流的函数式编程扩展。
- 它基于
IObservable<T>
,是一种 Push 模型(推送)。 - 数据源(Observable)主动调用
OnNext()
推送数据给观察者(Observer)。 - 特点:
- 推:数据源自动推送数据。
- 异步:数据到来后自动触发回调,不用阻塞等待。
示例:
IObservable<int> numbers = Observable.Range(1, 5);
numbers.Subscribe(x => Console.WriteLine(x)); // 数据源推送数据到观察者
时间维度的关键差异:时间感知
- LINQ 操作的是已存在的、同步的数据(例如内存中的集合),不涉及时间。
- Rx 操作的是未来的数据流,时间是第一类概念(First-class concept),因为:
- 数据可以在任何时间发出(例如用户点击、传感器、网络响应)。
- Rx 提供了丰富的时间操作符:如
Delay
,Interval
,Throttle
,Timeout
等。
对照总结:
特性 | LINQ (IEnumerable ) | Rx (IObservable ) |
---|---|---|
数据流方向 | Pull(拉) | Push(推) |
时间 | 静态、同步集合 | 动态、异步事件流 |
示例 | foreach 循环 | Subscribe 观察 |
应用场景 | 数组、列表等数据结构 | 鼠标事件、网络、动画等 |
终止方式 | MoveNext == false | OnCompleted() 或 OnError() |
总结一句话:
Rx 是对 LINQ 的反转 —— 它不是你去拉数据,而是数据自己找上门来。
如果你已经熟悉 LINQ,那么理解 Rx 就像“反转”你的思维方式,从「我要数据」变成「我等数据来」。
是否需要我用 C++ / RxCpp 举一个 Pull vs Push 的实际对比例子?
LINQ (Pull)
// IEnumerable<T>接口定义,表示一个可枚举的集合
// GetEnumerator方法启动对集合的一个独立的、有序的遍历
IEnumerator<T> IEnumerable<T>::GetEnumerator();
// 返回一个IEnumerator<T>枚举器,允许按顺序访问集合元素
// IEnumerator<T>接口继承了IDisposable接口,支持资源释放和取消操作
void IEnumerator<T>::IDisposable::Dispose();
// 释放枚举器占用的资源或取消枚举(如关闭文件、数据库连接等)
// IEnumerator<T>接口的核心方法,移动到下一个元素
bool IEnumerator<T>::MoveNext();
// 移动到下一个元素,
// 如果还有元素,返回true;遍历结束返回false
// IEnumerator<T>接口的属性,获取当前元素
T IEnumerator<T>::Current;
// 获取当前元素的值,
// 如果当前状态无效(未调用MoveNext或已遍历完),访问此属性会抛异常
Rx (Push)
// IObservable<T> 接口:表示一个事件源(Observable)
// Subscribe 方法建立订阅关系,Observer 开始接收数据流
IDisposable IObservable<T>::Subscribe(Observer<T> observer);
// 订阅一个观察者(Observer),开始接收事件(即数据项、错误或完成通知)
// 返回 IDisposable 接口的对象,用于取消订阅(停止接收事件)
// IDisposable 接口的 Dispose 方法:取消订阅
void IDisposable::Dispose();
// 调用此方法表示取消订阅,Observer 将不再接收来自 Observable 的任何事件
// Observer<T> 是观察者接口的一个实现,用于接收 Observable 发出的事件
Observer<T>::OnComplete();
// 当 Observable 正常结束(无错误)时调用,等价于 IEnumerator<T>::MoveNext() 返回 false
// 表示数据流已完成,后续不再有 OnNext 调用
Observer<T>::OnNext(T value);
// 当 Observable 发出一个新数据项时调用,等价于 IEnumerator<T>::Current
// 用于处理收到的数据项(数据流中的一个元素)
Observer<T>::OnError(Exception ex);
// 当 Observable 发生错误时调用,等价于 IEnumerator<T>::Current 抛出异常
// 一旦调用该方法,数据流立即终止,不再有 OnNext 或 OnComplete 调用
TIME
// 获取当前调度器的时间(当前时间点)
// 可以用来和调度时间进行比较
DateTimeOffset Scheduler::Now;
// 使用调度器在指定的未来时间执行某个动作
// 参数:dueTime 是希望执行动作的时间点(DateTimeOffset)
// 参数:action 是一个无参数的回调函数(Action)
// 返回:一个 IDisposable 对象,用于取消该调度任务(如果还未执行)
IDisposable Scheduler::Schedule(DateTimeOffset dueTime, Action action);
// 取消调度任务的执行(如果还未被调度器执行)
// 调用后,该任务不会被执行;如果已执行则没有作用
void IDisposable::Dispose();
// 定义要被调度执行的具体操作(无参数、无返回值)
// 调度器将在指定时间点调用这个函数
void Action::Action();
描述 ReactiveX(Rx)协议契约(contract) 中事件流的行为规则,尤其是有关 OnNext
/ OnError
/ OnCompleted
/ Dispose
的执行顺序与约束。
以下是逐条的解释与理解:
1. OnNext, OnError 和 OnComplete 调用总是串行的(serialized)
- 意思是:
OnNext()
、OnError()
和OnCompleted()
这些回调方法不会并发调用,一定是顺序、一个接一个地执行。 - 目的:避免线程竞争和状态不一致问题。
举个例子:
即使 Observable
在多个线程上发出事件,Rx 会确保你在 Observer
中的处理是安全的:
observable.Subscribe(x => {// 这个回调不会与 OnError 或 OnComplete 同时执行
});
2. Dispose、OnError 和 OnComplete 都会终止订阅
Dispose()
:手动取消订阅,终止数据流。OnError()
:发生错误后,数据流终止,不会再发出 OnNext 或 OnCompleted。OnCompleted()
:数据正常发完了,流终止。
一旦其中一个发生,订阅生命周期结束,Observable 不会再推送数据。
3. OnError 和 OnComplete 会触发 Dispose
- 在 Rx 中,你无需手动调用 Dispose。
- 如果 Observable 调用了
OnError()
或OnCompleted()
,Rx 框架自动帮你调用 Dispose(),回收资源。
等效逻辑如下:
OnError():-> stop further OnNext-> call Dispose()
OnCompleted():-> stop further OnNext-> call Dispose()
总结为一句话:
在 Rx 中,
OnNext
/OnError
/OnCompleted
是严格按顺序调用的;一旦OnError
或OnCompleted
被触发,订阅立即终止,并自动触发Dispose
释放资源。
“Time in Rx is Discrete, not Continuous” 是在说明 Reactive Extensions (Rx) 与 Functional Reactive Programming (FRP) 的一个核心区别,特别是它们如何处理“时间”这个概念。
理解这句话:
Rx 中的时间是离散的 (Discrete)
- 事件发生时才传递值。每次
OnNext
表示一个离散的时间点。 - 时间由事件驱动(push):如果没有事件发生,Rx 系统是“静止”的。
- 时间流不是连续线,而是像这样:
----x--------x---x--------x----->↑ ↑ ↑ ↑OnNext OnNext ... OnCompleted
FRP(Functional Reactive Programming)中的时间是连续的 (Continuous)
- 在原始 FRP 模型(如 Yampa、Elm 最初版本)中,值是随时间连续变化的:
- “行为 (Behavior)”:是一个关于时间的函数,例如:
position(t) = sin(t)
- 系统随时间自动演化,而不是靠事件触发。
- “行为 (Behavior)”:是一个关于时间的函数,例如:
比较:Rx vs FRP
特性 | Rx(ReactiveX) | FRP(连续时间模型) |
---|---|---|
时间模型 | 离散事件(Discrete) | 连续行为(Continuous) |
表达方式 | Observable 推送数据流 | Behavior、Signal 等连续函数 |
是否事件驱动 | 是 | 否(通常是基于时间步进或模拟) |
实现复杂度 | 简洁、工程实用 | 抽象性强、实现成本高 |
示例库 | RxJS、RxCpp、RxJava、Rx.NET | Yampa(Haskell)、Elm、Sodium 等 |
举个例子:小球绕圆轨道运动
- Rx 实现:
- 用
Observable<float>
推送“时间”值(离散),每次用cos(t)
、sin(t)
计算位置。 - 你只在特定时间点更新位置。
- 用
- FRP 实现:
position(t) = (cos(t), sin(t))
是一个关于时间的函数,系统随时间自然连续变化。
总结
在 Rx 中,时间是事件驱动的、离散的 —— 只有当事件发生时,才有“时间前进”的体现。这与理想化的 FRP 模型(时间连续演化)是不同的。
LIFETIME SUBSCRIPTION
// lifetime 生命周期相关
// 判断当前 subscription 是否仍处于订阅状态(未取消)
bool subscription::is_subscribed();
// 取消订阅,释放资源,停止接收事件(例如 onNext)
void subscription::unsubscribe();
// nested lifetimes 嵌套生命周期(组合订阅)
// 将一个 subscription 添加到 composite_subscription 中,形成父子关系。
// 返回一个 weak_subscription,可以用于单独移除该子订阅。
weak_subscription composite_subscription::add(Subscription /*通常是一个可取消的订阅对象*/);
// 从 composite_subscription 中移除一个子订阅,解除它与组合的绑定。
// 如果不再需要该订阅由 composite_subscription 管控,可以调用此方法。
void composite_subscription::remove(weak_subscription);
使用场景示例(简要)
unsubscribe()
:在组件销毁、窗口关闭或用户取消操作时手动停止观察流。composite_subscription::add()
:例如 UI 页面中多个事件绑定,可以整体取消。remove()
:例如某个按钮的监听不再需要响应时,从页面组合订阅中移除。
如果你还想要实际例子或代码片段演示如何使用这些接口,我可以继续提供。
理解 RxCpp 中的 Subscription 与 Lifetime(生命周期) 是掌握反应式编程核心概念的关键。
什么是 Subscription(订阅)?
在 RxCpp(Reactive Extensions for C++) 中,一个 subscription
表示的是 你对一个 observable 的监听关系。
它控制的是:
- 你是否还在“监听”这个数据流。
- 什么时候停止监听(即取消订阅)。
Lifetime(生命周期)
每个 subscription 都有生命周期(lifetime):
生命周期相关的方法:
bool subscription::is_subscribed();
- 判断当前是否还处于订阅状态(true 表示还活着/有效)。
void subscription::unsubscribe();
- 主动取消订阅。调用这个方法后,observable 就不再向你发送数据了。
- 取消订阅后,
is_subscribed()
将返回 false。
composite_subscription:管理多个订阅
weak_subscription composite_subscription::add(subscription s);
- 把一个
subscription
添加到一个composite_subscription
里,形成“主从”关系。 - 返回的是一个
weak_subscription
,可单独移除。
void composite_subscription::remove(weak_subscription ws);
- 从组合中移除指定的
subscription
,让它不再受统一管理。
应用场景举例
场景1:组件销毁时统一取消多个订阅
composite_subscription cs;
auto sub1 = observable1.subscribe(...);
auto sub2 = observable2.subscribe(...);
cs.add(sub1);
cs.add(sub2);
// 页面销毁时
cs.unsubscribe(); // 取消所有订阅
场景2:手动单独取消订阅
auto sub = observable.subscribe(...);
// 某个事件发生时停止监听
sub.unsubscribe();
小结
功能 | 方法/类 | 说明 |
---|---|---|
判断是否仍在订阅 | is_subscribed() | 是否还活跃 |
主动取消订阅 | unsubscribe() | 停止监听 |
管理多个订阅 | composite_subscription | 如同“订阅集合” |
添加/移除子订阅 | add() / remove() | 控制子订阅生命周期 |
如需我演示具体使用 RxCpp 的一个例子(比如定时器 + 用户点击),我可以给你完整代码。是否需要? |
FACTORY OBSERVABLE<>
// 创建一个自定义的 observable,通过传入一个 OnSubscribe 函数,允许手动控制事件的发射(OnNext、OnError、OnCompleted)
static observable<T> observable<>::create<T>(OnSubscribe /* void(subscriber<T> out) */);
// 从多个单独的值创建 observable,依次发射这些值
static observable<T0> observable<>::from(T0, TN...);
// 从一个容器(如 Collection)中创建 observable,依次发射容器中的元素
static observable<T> observable<>::iterate(Collection<T>);
// 生成一个数值范围内的 observable,按给定步长发射值,类似于区间函数
static observable<T> observable<>::range(T first, T last, difference_type step);
// 生成一个定时发射递增 long 类型整数的 observable,从指定初始时间开始,周期性发射
static observable<long> observable<>::interval(rxsc::scheduler::clock_type::time_point initial, rxsc::scheduler::clock_type::duration period);
// 创建一个永不发射任何事件且永不完成的 observable,通常用于测试或特殊场景
static observable<T> observable<>::never<T>();
// 创建一个立即完成但不发射任何值的 observable,表示一个空的事件流
static observable<T> observable<>::empty<T>();
// 创建一个立即发射错误事件的 observable,常用于测试错误处理逻辑
static observable<T> observable<>::error<T>(Exception);
在RxCpp(Reactive Extensions for C++)中,observable<>
是一个模板类,表示一个事件源,可以发射一系列异步事件(数据流)。
“FACTORY”指的是一组静态工厂方法(static factory methods),用来创建不同类型和行为的observable对象。
也就是说,observable<>
类本身不会直接实例化具体事件流,而是通过这些工厂函数来创建各种事件源,比如:
create()
:自定义事件发射逻辑。from()
:从一组值创建事件流。iterate()
:从集合创建事件流。range()
:产生数值区间事件流。interval()
:周期性发射事件流。never()
:永不发射事件。empty()
:立即完成的空事件流。error()
:立即发出错误事件。
这些工厂方法让你方便地根据需要构建不同的事件流,observable<>
就是事件流的抽象接口。
总结:
FACTORY observable<> 就是指observable<>
这个模板类通过一组工厂静态方法来创建各种事件源(observable),以支持灵活的异步数据流构造和组合。
INSTANCE OBSERVABLE
// 订阅函数:通过传入生命周期管理对象和事件回调来订阅Observable事件流
composite_subscription observable<T>::subscribe(composite_subscription lifetime, // 管理订阅生命周期的对象,可用于取消订阅OnNext /*void(T)*/, // 事件到达时调用,处理每个发射的数据TOnError /*void(std::exception_ptr)*/, // 发生错误时调用,接收异常信息OnCompleted /*void()*/ // 流结束时调用,无参数
);
// 操作符:对Observable发射的事件流进行转换和组合
// 过滤操作符,根据传入的谓词函数保留满足条件的事件
observable<T> observable<T>::filter(Predicate /*bool(T)*/);
// 映射操作符,将事件流中类型为T的事件映射转换成类型为U的新事件
observable<U> observable<T>::map(Transform /*U(T)*/);
// 扁平映射操作符,
// 1) Extract:将事件T映射成一个新的Observable<U>(事件流)
// 2) Transform:将原事件T和内部Observable发射的事件U组合成新事件V
observable<V> observable<T>::flat_map(Extract /*observable<U>(T)*/,Transform /*V(T,U)*/
);
// 组合最新操作符,传入多个Observable,当任一Observable产生新事件时,
// 使用所有Observable的最新事件调用Transform生成新的事件U
observable<U> observable<T0>::combine_latest(Transform /*U(T0,TN...)*/,observable<TN>...
);
// 合并操作符,将多个Observable的事件交错合并成一个Observable<T>
observable<T> observable<T>::merge(observable<T>...);
// 连接操作符,按顺序连接多个Observable,前一个完成后才订阅下一个,保证事件顺序
observable<T> observable<T>::concat(observable<T>...);
理解“INSTANCE OBSERVABLE”的核心,就是了解Observable在ReactiveX中的本质和作用:
Observable 是什么?
- Observable 是一个模板类(泛型类),表示一个事件源或数据流,发射的数据类型是
T
。 - 它封装了异步或同步的数据流,允许你观察(subscribe)这些数据流的变化。
- 你可以把它想象成一个可以“产生”类型为
T
的数据项序列的对象。
“INSTANCE Observable”的含义
- 实例化的Observable 就是具体的事件流对象,可以发出(emit)
T
类型的事件。 - 你可以对这个实例调用 subscribe 方法,注册观察者来监听事件。
- 它可以通过各种操作符(map、filter、combine_latest 等)来转换、组合、过滤这些事件,产生新的Observable实例。
总结
- Observable是事件流的模板定义。
- 实例(instance)是特定数据流的实际对象。
- 通过实例,你可以操作、转换和订阅事件流。
如果用一句话总结:
Observable是一个“可观察的事件流”的抽象类型,实例就是具体的那个事件流对象。
THREAD-SAFETY COORDINATION
// 默认 - 非线程安全
// 当所有的 observable 都在同一个线程运行时,不做任何操作
auto noop_immediate = identity_immediate();
// 同样不做任何操作,但明确用于当前线程的 trampoline 调度
auto noop_trampoline = identity_current_thread();
// 线程安全选项(需要用户选择开启)
// 使用互斥锁(mutex)来序列化来自多个线程的调用,保证线程安全
auto serialize_with_pool = serialize_event_loop();
// 类似上面,但会创建一个新线程来序列化调用,保证线程安全
auto serialize_with_new = serialize_new_thread();
// 使用队列将所有调用转移到一个新的线程池中的线程,进行异步调度
auto observe_on_pool = observe_on_event_loop();
// 使用队列将所有调用转移到一个新创建的线程,进行异步调度
auto observe_on_new = observe_on_new_thread();
“THREAD-SAFETY”和“COORDINATION”在Rx(Reactive Extensions)和并发编程中是两个非常重要的概念。
THREAD-SAFETY(线程安全)
线程安全指的是代码在多线程环境下运行时,不会因为多个线程同时访问共享资源而导致数据竞态(race conditions)、不一致或崩溃。
在Rx里,事件流(Observable发出的事件)可能来自不同的线程。为了保证事件处理正确,必须确保对观察者的调用是线程安全的。也就是说,事件的发出(onNext/onError/onCompleted)不应该同时从多个线程调用,避免状态损坏。
COORDINATION(调度协调)
调度协调是指控制事件的产生和消费在哪个线程或线程池上执行。
在Rx中,“调度器”(Scheduler)负责管理事件的执行环境,比如:
- 在哪个线程生成事件
- 在哪个线程观察事件
- 如何序列化事件调用,保证线程安全
协调调度器帮助Rx实现跨线程切换,比如UI线程安全更新界面,或者后台线程异步处理任务。
结合起来理解:
- 线程安全是“怎么保证多个线程访问不冲突”
- 调度协调是“用什么机制/线程去调度事件的执行”
Rx通过协调器(Scheduler)实现线程安全的调用和线程间切换,使得事件流在多线程环境下仍然有序且安全。