当前位置: 首页 > news >正文

CppCon 2014 学习:ASYNC SEQUENCES AND ALGORITHMS

“ASYNC SEQUENCES AND ALGORITHMS”和“BUILDING USABLE ASYNC PRIMITIVES”是现代C++或其他语言里,关于异步编程的主题,核心点在于:

Async Sequences and Algorithms

  • 异步序列:不像普通的同步序列一次性全部生成,异步序列的元素可能是“未来”才产生的,或者依赖异步事件。
  • 异步算法:能处理异步序列的算法,比如异步过滤、映射、归约等。
  • 应用场景:网络请求流、事件流、文件IO流、用户输入等。

Building Usable Async Primitives

  • 异步原语:基础的异步构件,比如异步任务、协程、future/promise、观察者模式、事件循环等。
  • 目标:让异步编程更简单、安全、高效,避免回调地狱,方便组合。
  • 设计原则:易用、可组合、避免竞态条件、易于取消和错误处理。

举个简单类比

同步算法处理的是“立刻可用”的数据流,异步算法处理的是“随时可能到达”的数据流,需要用异步原语来“等待”或“监听”。
如果你想,我可以帮你介绍某种语言(如C++20的std::async、协程,或者JavaScript的Promise/async-await)中如何实现这些,或者给你示范简单的异步序列代码示例。你感兴趣吗?

不同类型的异步数据流的资源使用特点

🔸 原文拆解与中文理解:

使用所有可用资源

意味着某些算法或流设计是“激进并发”的,可能同时启动多个任务、线程、I/O 请求等,目的是最大化吞吐。

独立且激进执行(即非惰性)

这类任务之间互不依赖,可以并行启动且立即开始计算,例如:并行 map、并发下载等。

使用最少资源

与“使用所有资源”相对,指设计上偏向保守、节省,避免过多内存、线程或 CPU 占用,例如延迟加载、懒惰求值等。

相互依赖且惰性执行

例如流式解析 JSON、逐帧处理视频等场景,前一项未完成,后一项不能启动。同时,只有被请求时才执行,属于懒执行。

异步与并发流不总是“字节流”

很关键:现代 async/await、reactive、generator 风格的“流”可以是:

  • stream<T> (泛型值)
  • Observable<int>IAsyncEnumerable<T>(.NET)
  • std::ranges::view(C++23)
    并不限于传统 socket 的“字节流”。

一切归根结底是算法

重点在于:是否**并行?懒惰?按需?回压?断流重试?**这些行为必须靠算法决定,而不是靠底层 API。

带时间索引的 <T> 序列

比如音视频帧流、事件流、传感器数据流,每个元素不仅有值 T,还有时间戳/顺序信息
这类流常用于:

  • 音视频处理(帧序列)
  • 时间序列分析(金融/IoT)
  • 延迟控制的网络算法(rate limiting)

有些异步流会尽可能使用全部资源并立即启动处理,尤其当任务彼此独立。而有些设计则非常节省资源,仅在必要时启动,特别当任

务之间存在依赖。异步流不一定是字节流,它们可以是任意类型的泛型数据流。关键在于选择合适的算法模式,特别是在处理带时间信息的异步数据序列时。
如你需要我将这些思想用在 C++ 代码、图示、或结合某个库(如 ICU、RxCpp、Coro、async-generator)来展示,请告诉我,我可以继续展开。

你列出的这些术语看起来像是在理解某种异步事件、系统信号或时间序列事件的“示例类别”场景。下面是对它们的逐一解释,适用于异步编程、事件驱动架构(如 C++ 协程、操作系统内核、信号处理等):

1. Periodic Timer(周期性定时器)

  • 定义:一个定时器每隔固定时间间隔触发一次事件。
  • 场景:心跳检测、UI 刷新、定期保存状态等。
  • 示例
    // 每隔 1 秒触发一次
    while (true) {co_await timer(1s);do_something();
    }
    

2. MouseClicks(鼠标点击)

  • 定义:用户点击鼠标时系统生成的事件。
  • 场景:UI 交互处理、游戏控制、绘图应用。
  • 特性:离散的、不确定发生时间,通常通过事件监听器处理。
  • 示例
    while (true) {auto click = co_await next_mouse_click();handle(click);
    }
    

3. MouseMoves(鼠标移动)

  • 定义:鼠标在屏幕上移动所触发的连续事件。
  • 场景:拖拽操作、实时绘图、悬停提示。
  • 特性:事件频率较高;可能需要节流处理(throttling)。

4. NetworkPackets(网络数据包)

  • 定义:网络接口接收到的每一个数据包。
  • 场景:聊天应用、实时通信、服务器监听请求。
  • 特性:异步到达;可能乱序;需解析和确认。

5. FileReads/Writes(文件读写)

  • 定义:对文件系统进行的读取/写入操作。
  • 场景:加载配置文件、日志记录、保存用户数据。
  • 异步特性
    • I/O 操作较慢
    • 通常通过异步文件 API 处理,以避免阻塞线程

这些是什么的“示例”?

这些是**异步序列(asynchronous sequences)**的典型例子,也就是:

系统中某些事件/操作 “以时间推移发生”,可以被程序“订阅”或“异步等待”。

在现代 C++(如 C++23)的协程、generator, awaitablesender/receiver 模型中,这些是常见的用例。

统一理解:

名称是否连续是否异步应用场景
Periodic Timer定期执行
MouseClicks用户交互事件
MouseMoves实时输入
NetworkPackets网络通信、协议处理
FileReads/WritesI/O 操作,适合异步处理
如果你正在看的是 CppCon 或某个关于 “异步序列 / generator / ranges / async” 的讲座,这些就是作为“数据流的例子”列出来的。
需要我配合具体代码讲解这些场景在 C++ 中如何实现吗?
你贴的这段文档是对 ReactiveX 响应式编程模型中 Observable 和 Observer 的系统性解释。我来用 C++ 的视角,逐步为你分解这段内容,帮助你彻底理解其中的机制和逻辑。

1. 基本结构:Observable 和 Observer 是什么?

🔹 Observable(事件源):

是一个可以发出(emit)数据或事件的对象。
它就像一个异步的生产者,在某个时机会发出数据(调用 onNext)、发出错误(onError),或结束(onCompleted)。

🔹 Observer(观察者):

是一个可以订阅 Observable 并处理数据的对象。
它提供三个处理函数:

  • onNext(T):处理发出的数据项
  • onError(exception):处理错误
  • onCompleted():处理结束信号
    在 C++ 中,我们会用函数对象(比如 lambda)来写这些处理函数。

2. 为什么说它是“非阻塞、并发友好”的?

传统调用(同步):

auto result = someMethod();
process(result); // 等 someMethod 完成

Rx 式调用(异步):

observable.subscribe([](int item) { process(item); },[](std::exception_ptr ep) { handleError(ep); },[]() { std::cout << "complete" << std::endl; }
);
// 不阻塞,继续做别的事情

也就是说:观察者是哨兵(sentinel),站在那里等待事件,而不是主动轮询或阻塞。

3. 冷 vs 热 Observable

Cold Observable:

  • 只有在订阅时才开始发出数据。
  • 每个订阅者都会看到完整的数据流(从头开始)。
    类似于:你点播一部电影(自己控制播放)。

Hot Observable:

  • 数据从创建时就开始发出。
  • 新订阅者只能看到订阅之后的数据。
    类似于:你收看直播(只看到你加入之后的内容)。

4. 订阅机制(subscribe)

调用 .subscribe() 的时候会触发 Observable 的数据流,并将数据发送给 Observer。
例如(RxCpp):

auto source = rxcpp::observable<>::range(1, 3);
source.subscribe([](int x) { std::cout << "onNext: " << x << std::endl; },[]() { std::cout << "onCompleted" << std::endl; }
);

这个订阅者会依次接收:1、2、3,然后收到“完成”信号。

5. Marble Diagram(弹珠图)

弹珠图是 ReactiveX 中用来表示数据流的图形语言。
它的结构是这样的:

--1--2--3--|        // 发出数据项 1, 2, 3,然后完成
--1--X         // 发出 1,然后出错(X 表示 error)
  • 横线表示时间线
  • 数字表示发出的数据项(onNext)
  • | 表示完成(onCompleted)
  • X 表示出错(onError)
    在调试 Rx 程序时,用它来理解流的“形态”很有帮助。

6. 操作符(Operators)

操作符就是对 Observable 进行处理的工具,类似于 C++ STL 中的 transform, filter, reduce

rxcpp::observable<>::range(1, 5).filter([](int i) { return i % 2 == 0; })    // 过滤偶数.map([](int i) { return i * 10; })           // 每个值乘 10.subscribe([](int x){ std::cout << x << std::endl; });

输出:

20
40

这和下面的 STL 写法很相似:

for (int i = 1; i <= 5; ++i) {if (i % 2 == 0) {std::cout << i * 10 << std::endl;}
}

7. 链式操作的本质(区别 builder)

Rx 的操作链不是在一个对象上操作,而是生成新的 Observable

每个操作符(map、filter 等)返回的是一个新的事件源,所以每次链式调用本质上都是构建一个 流的管道(pipeline)

8. 退订(Unsubscribe)

有时你想手动中断监听。

auto sub = observable.subscribe(onNext, onError, onCompleted
);
sub.unsubscribe(); // 停止接收

退订的作用:

  • 告诉 Observable:你不再关心数据
  • 它可能会中止发出数据(节省资源)

总结(C++角度记忆)

概念类比 C++ 概念
Observable异步数据生成器,像 std::function<void(Observer)>
Observer类似于回调组合(onNext/onError/onComplete)
subscribe()注册回调并开始数据流
冷 / 热 Observable点播 vs 直播
map/filter等操作符类似 STL 算法链
unsubscribe提前退出回调流
如果你希望,我可以帮你写一个完整的 RxCpp 项目模板或某个具体例子(比如:网络请求、传感器数据、数据流转换等)。
当然,下面是你这段代码的详细注释版,帮你理解每一步在做什么:

“COMBINE TIME AND MOVEMENT”这段代码的核心思想是:将基于时间的轨道运动和基于鼠标的移动位置结合起来,生成一个动态变化的点,供后续绘制使用。

// 生成基于时间的轨道点Observable,
// 先用updates.milliseconds()拿到时间流,再转成0~1周期,再映射到轨道点坐标
auto auto_orbit_points = orbitPointsFromTimeInPeriod(timeInPeriodFromMilliseconds(updates.milliseconds()    // 获取当前时间(毫秒),作为事件流)
);
// 生成基于鼠标移动的点Observable
auto auto_location_points = mouse.moves()          // 获取鼠标移动事件的Observable流,发出鼠标位置.map(pointFromMouse); // 把鼠标事件映射成ofPoint类型的位置坐标
// 把两个Observable:位置点和轨道点,合并起来
// combine_latest会在两个Observable中任意一个发出新值时,
// 用std::plus<>(即加法)合并它们的最新值,发出合并后的新点
location_points.combine_latest(std::plus<>(), orbit_points).subscribe([=](ofPoint c) {      // 订阅合并后的点流// 每当有新值时,更新绘图时用的中心点center = c;});

具体理解如下:

  1. 时间驱动的轨道点(auto_orbit_points)
    通过时间(毫秒)不断产生一个0到1之间的周期值,
    再将这个周期值映射成圆周上的点(轨道点)。
    这相当于让点随着时间在一个圆上运动。
  2. 鼠标位置点(auto_location_points)
    监听鼠标移动事件,获取当前鼠标位置。
    通过map转换成点的形式,实时反映鼠标当前坐标。
  3. 合并轨道点和鼠标点(combine_latest)
    使用combine_latest,当任一点(轨道点或鼠标点)更新时,
    取两个点的最新值相加,生成一个新的点。
    这个点代表轨道点加上鼠标移动的偏移,产生“轨道+鼠标”的叠加效果。
  4. 订阅合成后的点流,更新绘制中心
    通过订阅合成后的点流,每次产生新点时,
    更新绘制用的中心点center,让绘制跟随轨道和鼠标的结合动态变化。

简单说就是:

  • 点沿着轨道根据时间运动,
  • 点又根据鼠标移动做偏移,
  • 最终合成一个点,实时更新界面显示。
    这样可以实现一种既跟时间动又跟用户操作互动的动画效果。
    很适合制作交互式动画、游戏里跟随鼠标绕圈运动的元素。

总结:

  • auto_orbit_points:轨道上的点,随着时间变化持续发出。
  • auto_location_points:鼠标位置点流。
  • combine_latest:两个点相加,生成一个新的点流。
  • subscribe:订阅这个新的点流,每次更新时设置center,供后续绘制使用。

ORBIT FROM TIME

// 返回一个发出0.0到1.0之间周期值的Observable,
// 输入是以毫秒计时的时间流
rxcpp::observable<float>
ofxCircle::timeInPeriodFromMilliseconds(rxcpp::observable<unsigned long long> timeInMilliseconds) {return timeInMilliseconds.map([this](unsigned long long tick) {// 将tick(时间戳毫秒数)对轨道周期(orbit_period秒,转成毫秒)取模,// 使其周期性循环,然后映射到0.0~1.0区间return ofMap(tick % int(orbit_period * 1000), // 取模周期毫秒数0,                               // 输入范围起点int(orbit_period * 1000),        // 输入范围终点0.0,                             // 输出范围起点1.0                              // 输出范围终点);});
}
// 返回一个发出圆周上点的Observable,
// 输入是0.0到1.0之间表示周期进度的时间流
rxcpp::observable<ofPoint>
ofxCircle::orbitPointsFromTimeInPeriod(rxcpp::observable<float> timeInPeriod) {return timeInPeriod.map([this](float t) {// 根据t计算对应圆周上的点坐标// t乘以2π得到角度,计算圆上x,y坐标return ofPoint(orbit_radius * std::cos(t * 2 * 3.14),  // x坐标orbit_radius * std::sin(t * 2 * 3.14)   // y坐标);});
}

理解这个“ORBIT FROM TIME”代码片段的意图,可以总结为:

核心目的:

基于时间,计算一个点沿着圆周轨迹的位置,实现“轨道运动”的效果。

具体流程:

  1. 输入时间(毫秒)流
    • 程序接收一个不断变化的时间流(单位是毫秒,通常是系统更新时间)。
  2. 将时间转换为周期内的相对位置(0.0 到 1.0)
    • 以一个固定的轨道周期(orbit_period,单位秒)为基准,将当前毫秒时间转换成该周期内的相对进度,比如时间流在周期中的进度百分比。
    • 这个百分比用0到1的浮点数表示,表示当前位置在轨道周期里的相对位置。
  3. 将周期进度映射到圆周上的点坐标
    • 利用三角函数(cos 和 sin),把这个0~1的进度转换成对应圆上的点(ofPoint),其中 x = r*cos(θ), y = r*sin(θ),角度θ = 进度 * 2π。
    • 这里的orbit_radius是圆的半径。

代码用意:

  • 利用RxCpp的observablemap函数实现数据流的变换。
  • 不断根据时间变化产生新的点坐标,实现动画中点沿圆轨迹运动的效果。
  • 这种设计非常适合响应式编程,可以优雅地处理异步时间更新。

简单类比:

这就像你有一个秒针指针,它不停转动,通过时间计算秒针当前的位置,显示秒针的位置坐标。
你觉得这个讲解清楚吗?要不要我帮你写一个更简单的伪代码版本?

Rx 的起源(ORIGIN)

Reactive Extensions(简称 Rx)最初是由微软为 .NET 平台开发的,叫做 Rx.NET
Rx.NET 跟随了 LINQ 的设计理念
LINQ(Language Integrated Query)是 .NET 里用于查询数据的一个强大且优雅的语言特性,允许用统一的方式操作各种数据源(数组、数据库、XML 等)。
Rx.NET 借鉴了 LINQ 的设计思路,把 数据流和异步事件看作“可查询的序列”,并且用类似 LINQ 的操作符(比如 mapfiltercombineLatest 等)来组合和处理异步事件流。

换句话说:

Rx.NET 就是把 异步事件序列 看成类似于 LINQ 里操作的“集合”,用链式、声明式的风格来处理异步流,实现响应式编程。

这段 C# 代码展示了 LINQ 的基本用法:

List<string> fruits = new List<string> { "apple", "passionfruit", "banana", "mango","orange", "blueberry", "grape", "strawberry" };
// 从 fruits 里筛选出长度小于6的字符串
IEnumerable<string> query = fruits.Where(fruit => fruit.Length < 6);
// 生成1到10的整数序列,并计算每个数的平方
IEnumerable<int> squares = Enumerable.Range(1, 10).Select(x => x * x);

具体解释:

  1. fruits 是一个字符串列表,包含多种水果名。
  2. query 用 LINQ 的 Where 方法过滤 fruits,筛选出字符串长度小于6的水果名字,比如 "apple", "mango", "grape"
  3. Enumerable.Range(1, 10) 生成从 1 到 10 的整数序列(共10个数)。
  4. .Select(x => x * x) 把上面的序列每个数映射为它的平方,结果是 1, 4, 9, ..., 100

这段代码展示了 LINQ 的核心思想:

  • 把数据看作“序列”;
  • 通过高阶函数(Where, Select声明式地过滤和转换数据;
  • 返回结果是延迟执行的 IEnumerable<T>,直到真正遍历时才计算。
    如果你想,我可以帮你把这段代码用 Rx(Reactive Extensions)写个类似版本,方便理解两者异同!你需要吗?

LINQ(Language Integrated Query)理解

  • LINQ 是一组针对 IEnumerable<T>(和其他序列接口)的算法和查询操作符,用来方便地操作集合数据。
  • 它让你可以用类似 SQL 的语法来查询、过滤、投影(转换)、分组等操作序列数据。
  • 这些操作符底层实现类似于 C++ 中的范围算法(range-based algorithms),比如 std::transformstd::copy_ifstd::accumulate 等。
  • 命名和语法借鉴自 SQL,比如 Where 类似于 SQL 的 WHERE 条件过滤,Select 类似于 SELECT 投影,GroupBy 类似于 GROUP BY 分组。

举例

  • Where — 过滤满足条件的元素
  • Select — 对元素做映射/转换
  • OrderBy — 排序
  • GroupBy — 分组
  • Join — 连接两个序列
    这让 C# 集合操作更直观,更易于表达复杂查询。

LINQ 与 C++ range 算法的对比

  • C++ Range算法是面向迭代器的一组通用算法,比如 std::transformstd::copy_if,操作容器的元素。
  • LINQ是语言集成的查询表达式,支持延迟执行和链式调用,更像是声明式操作序列。
  • 两者目的类似:方便遍历和处理集合,但 LINQ 提供了更丰富的查询表达能力和语法糖。

“Rx is LINQ inverted”(Rx 是 LINQ 的反转),总结了 ReactiveX(Rx)和 LINQ 的核心区别和联系。我们来逐一解释这句话中涉及的概念:

LINQ:Pull 模型

  • LINQ(Language Integrated Query) 是用于操作集合的统一查询语法。
  • 它基于 IEnumerable<T>,是一种 Pull 模型(拉取)
  • 消费者主动调用 MoveNext() 来获取下一个数据项(就像 for 循环或迭代器)。
  • 特点:
    • 拉:数据消费者主动要数据。
    • 同步:调用者等待数据返回。
示例:
IEnumerable<int> squares = Enumerable.Range(1, 5).Select(x => x * x);
foreach (var s in squares) {Console.WriteLine(s); // 消费者主动拉取数据
}

Rx:Push 模型

  • Rx(Reactive Extensions) 是对异步数据流的函数式编程扩展。
  • 它基于 IObservable<T>,是一种 Push 模型(推送)
  • 数据源(Observable)主动调用 OnNext() 推送数据给观察者(Observer)。
  • 特点:
    • 推:数据源自动推送数据。
    • 异步:数据到来后自动触发回调,不用阻塞等待。
示例:
IObservable<int> numbers = Observable.Range(1, 5);
numbers.Subscribe(x => Console.WriteLine(x)); // 数据源推送数据到观察者

时间维度的关键差异:时间感知

  • LINQ 操作的是已存在的、同步的数据(例如内存中的集合),不涉及时间
  • Rx 操作的是未来的数据流,时间是第一类概念(First-class concept),因为:
    • 数据可以在任何时间发出(例如用户点击、传感器、网络响应)。
    • Rx 提供了丰富的时间操作符:如 Delay, Interval, Throttle, Timeout 等。

对照总结:

特性LINQ (IEnumerable)Rx (IObservable)
数据流方向Pull(拉)Push(推)
时间静态、同步集合动态、异步事件流
示例foreach 循环Subscribe 观察
应用场景数组、列表等数据结构鼠标事件、网络、动画等
终止方式MoveNext == falseOnCompleted()OnError()

总结一句话:

Rx 是对 LINQ 的反转 —— 它不是你去拉数据,而是数据自己找上门来。

如果你已经熟悉 LINQ,那么理解 Rx 就像“反转”你的思维方式,从「我要数据」变成「我等数据来」。
是否需要我用 C++ / RxCpp 举一个 Pull vs Push 的实际对比例子?

LINQ (Pull)

// IEnumerable<T>接口定义,表示一个可枚举的集合
// GetEnumerator方法启动对集合的一个独立的、有序的遍历
IEnumerator<T> IEnumerable<T>::GetEnumerator();  
// 返回一个IEnumerator<T>枚举器,允许按顺序访问集合元素
// IEnumerator<T>接口继承了IDisposable接口,支持资源释放和取消操作
void IEnumerator<T>::IDisposable::Dispose();  
// 释放枚举器占用的资源或取消枚举(如关闭文件、数据库连接等)
// IEnumerator<T>接口的核心方法,移动到下一个元素
bool IEnumerator<T>::MoveNext();  
// 移动到下一个元素,
// 如果还有元素,返回true;遍历结束返回false
// IEnumerator<T>接口的属性,获取当前元素
T IEnumerator<T>::Current;  
// 获取当前元素的值,
// 如果当前状态无效(未调用MoveNext或已遍历完),访问此属性会抛异常

Rx (Push)

// IObservable<T> 接口:表示一个事件源(Observable)
// Subscribe 方法建立订阅关系,Observer 开始接收数据流
IDisposable IObservable<T>::Subscribe(Observer<T> observer);
// 订阅一个观察者(Observer),开始接收事件(即数据项、错误或完成通知)
// 返回 IDisposable 接口的对象,用于取消订阅(停止接收事件)
// IDisposable 接口的 Dispose 方法:取消订阅
void IDisposable::Dispose();
// 调用此方法表示取消订阅,Observer 将不再接收来自 Observable 的任何事件
// Observer<T> 是观察者接口的一个实现,用于接收 Observable 发出的事件
Observer<T>::OnComplete();
// 当 Observable 正常结束(无错误)时调用,等价于 IEnumerator<T>::MoveNext() 返回 false
// 表示数据流已完成,后续不再有 OnNext 调用
Observer<T>::OnNext(T value);
// 当 Observable 发出一个新数据项时调用,等价于 IEnumerator<T>::Current
// 用于处理收到的数据项(数据流中的一个元素)
Observer<T>::OnError(Exception ex);
// 当 Observable 发生错误时调用,等价于 IEnumerator<T>::Current 抛出异常
// 一旦调用该方法,数据流立即终止,不再有 OnNext 或 OnComplete 调用

TIME

// 获取当前调度器的时间(当前时间点)
// 可以用来和调度时间进行比较
DateTimeOffset Scheduler::Now;
// 使用调度器在指定的未来时间执行某个动作
// 参数:dueTime 是希望执行动作的时间点(DateTimeOffset)
// 参数:action 是一个无参数的回调函数(Action)
// 返回:一个 IDisposable 对象,用于取消该调度任务(如果还未执行)
IDisposable Scheduler::Schedule(DateTimeOffset dueTime, Action action);
// 取消调度任务的执行(如果还未被调度器执行)
// 调用后,该任务不会被执行;如果已执行则没有作用
void IDisposable::Dispose();
// 定义要被调度执行的具体操作(无参数、无返回值)
// 调度器将在指定时间点调用这个函数
void Action::Action();

描述 ReactiveX(Rx)协议契约(contract) 中事件流的行为规则,尤其是有关 OnNext / OnError / OnCompleted / Dispose 的执行顺序与约束。

以下是逐条的解释与理解:

1. OnNext, OnError 和 OnComplete 调用总是串行的(serialized)

  • 意思是:OnNext()OnError()OnCompleted() 这些回调方法不会并发调用,一定是顺序一个接一个地执行
  • 目的:避免线程竞争和状态不一致问题。
举个例子:

即使 Observable 在多个线程上发出事件,Rx 会确保你在 Observer 中的处理是安全的:

observable.Subscribe(x => {// 这个回调不会与 OnError 或 OnComplete 同时执行
});

2. Dispose、OnError 和 OnComplete 都会终止订阅

  • Dispose():手动取消订阅,终止数据流。
  • OnError():发生错误后,数据流终止,不会再发出 OnNext 或 OnCompleted
  • OnCompleted():数据正常发完了,流终止。
    一旦其中一个发生,订阅生命周期结束,Observable 不会再推送数据。

3. OnError 和 OnComplete 会触发 Dispose

  • 在 Rx 中,你无需手动调用 Dispose
  • 如果 Observable 调用了 OnError()OnCompleted(),Rx 框架自动帮你调用 Dispose(),回收资源。
    等效逻辑如下:
OnError():-> stop further OnNext-> call Dispose()
OnCompleted():-> stop further OnNext-> call Dispose()

总结为一句话:

在 Rx 中,OnNext / OnError / OnCompleted 是严格按顺序调用的;一旦 OnErrorOnCompleted 被触发,订阅立即终止,并自动触发 Dispose 释放资源。

“Time in Rx is Discrete, not Continuous” 是在说明 Reactive Extensions (Rx)Functional Reactive Programming (FRP) 的一个核心区别,特别是它们如何处理“时间”这个概念。

理解这句话:

Rx 中的时间是离散的 (Discrete)

  • 事件发生时才传递值。每次 OnNext 表示一个离散的时间点
  • 时间由事件驱动(push):如果没有事件发生,Rx 系统是“静止”的。
  • 时间流不是连续线,而是像这样:
    ----x--------x---x--------x----->↑        ↑   ↑        ↑OnNext    OnNext ...   OnCompleted
    

FRP(Functional Reactive Programming)中的时间是连续的 (Continuous)

  • 在原始 FRP 模型(如 Yampa、Elm 最初版本)中,值是随时间连续变化的:
    • “行为 (Behavior)”:是一个关于时间的函数,例如:position(t) = sin(t)
    • 系统随时间自动演化,而不是靠事件触发。

比较:Rx vs FRP

特性Rx(ReactiveX)FRP(连续时间模型)
时间模型离散事件(Discrete)连续行为(Continuous)
表达方式Observable 推送数据流Behavior、Signal 等连续函数
是否事件驱动否(通常是基于时间步进或模拟)
实现复杂度简洁、工程实用抽象性强、实现成本高
示例库RxJS、RxCpp、RxJava、Rx.NETYampa(Haskell)、Elm、Sodium 等

举个例子:小球绕圆轨道运动

  • Rx 实现
    • Observable<float> 推送“时间”值(离散),每次用 cos(t)sin(t) 计算位置。
    • 你只在特定时间点更新位置。
  • FRP 实现
    • position(t) = (cos(t), sin(t)) 是一个关于时间的函数,系统随时间自然连续变化。

总结

在 Rx 中,时间是事件驱动的、离散的 —— 只有当事件发生时,才有“时间前进”的体现。这与理想化的 FRP 模型(时间连续演化)是不同的。

LIFETIME SUBSCRIPTION

// lifetime 生命周期相关
// 判断当前 subscription 是否仍处于订阅状态(未取消)
bool subscription::is_subscribed();
// 取消订阅,释放资源,停止接收事件(例如 onNext)
void subscription::unsubscribe();
// nested lifetimes 嵌套生命周期(组合订阅)
// 将一个 subscription 添加到 composite_subscription 中,形成父子关系。
// 返回一个 weak_subscription,可以用于单独移除该子订阅。
weak_subscription composite_subscription::add(Subscription /*通常是一个可取消的订阅对象*/);
// 从 composite_subscription 中移除一个子订阅,解除它与组合的绑定。
// 如果不再需要该订阅由 composite_subscription 管控,可以调用此方法。
void composite_subscription::remove(weak_subscription);

使用场景示例(简要)

  • unsubscribe():在组件销毁、窗口关闭或用户取消操作时手动停止观察流。
  • composite_subscription::add():例如 UI 页面中多个事件绑定,可以整体取消。
  • remove():例如某个按钮的监听不再需要响应时,从页面组合订阅中移除。
    如果你还想要实际例子或代码片段演示如何使用这些接口,我可以继续提供。

理解 RxCpp 中的 Subscription 与 Lifetime(生命周期) 是掌握反应式编程核心概念的关键。

什么是 Subscription(订阅)?

RxCpp(Reactive Extensions for C++) 中,一个 subscription 表示的是 你对一个 observable 的监听关系
它控制的是:

  • 你是否还在“监听”这个数据流。
  • 什么时候停止监听(即取消订阅)。

Lifetime(生命周期)

每个 subscription 都有生命周期(lifetime):

生命周期相关的方法:

bool subscription::is_subscribed();
  • 判断当前是否还处于订阅状态(true 表示还活着/有效)。
void subscription::unsubscribe();
  • 主动取消订阅。调用这个方法后,observable 就不再向你发送数据了。
  • 取消订阅后,is_subscribed() 将返回 false。

composite_subscription:管理多个订阅

weak_subscription composite_subscription::add(subscription s);
  • 把一个 subscription 添加到一个 composite_subscription 里,形成“主从”关系。
  • 返回的是一个 weak_subscription,可单独移除。
void composite_subscription::remove(weak_subscription ws);
  • 从组合中移除指定的 subscription,让它不再受统一管理。

应用场景举例

场景1:组件销毁时统一取消多个订阅

composite_subscription cs;
auto sub1 = observable1.subscribe(...);
auto sub2 = observable2.subscribe(...);
cs.add(sub1);
cs.add(sub2);
// 页面销毁时
cs.unsubscribe(); // 取消所有订阅

场景2:手动单独取消订阅

auto sub = observable.subscribe(...);
// 某个事件发生时停止监听
sub.unsubscribe();

小结

功能方法/类说明
判断是否仍在订阅is_subscribed()是否还活跃
主动取消订阅unsubscribe()停止监听
管理多个订阅composite_subscription如同“订阅集合”
添加/移除子订阅add() / remove()控制子订阅生命周期
如需我演示具体使用 RxCpp 的一个例子(比如定时器 + 用户点击),我可以给你完整代码。是否需要?

FACTORY OBSERVABLE<>

// 创建一个自定义的 observable,通过传入一个 OnSubscribe 函数,允许手动控制事件的发射(OnNext、OnError、OnCompleted)
static observable<T> observable<>::create<T>(OnSubscribe /* void(subscriber<T> out) */);
// 从多个单独的值创建 observable,依次发射这些值
static observable<T0> observable<>::from(T0, TN...);
// 从一个容器(如 Collection)中创建 observable,依次发射容器中的元素
static observable<T> observable<>::iterate(Collection<T>);
// 生成一个数值范围内的 observable,按给定步长发射值,类似于区间函数
static observable<T> observable<>::range(T first, T last, difference_type step);
// 生成一个定时发射递增 long 类型整数的 observable,从指定初始时间开始,周期性发射
static observable<long> observable<>::interval(rxsc::scheduler::clock_type::time_point initial, rxsc::scheduler::clock_type::duration period);
// 创建一个永不发射任何事件且永不完成的 observable,通常用于测试或特殊场景
static observable<T> observable<>::never<T>();
// 创建一个立即完成但不发射任何值的 observable,表示一个空的事件流
static observable<T> observable<>::empty<T>();
// 创建一个立即发射错误事件的 observable,常用于测试错误处理逻辑
static observable<T> observable<>::error<T>(Exception);

在RxCpp(Reactive Extensions for C++)中,observable<> 是一个模板类,表示一个事件源,可以发射一系列异步事件(数据流)。
“FACTORY”指的是一组静态工厂方法(static factory methods),用来创建不同类型和行为的observable对象
也就是说,observable<> 类本身不会直接实例化具体事件流,而是通过这些工厂函数来创建各种事件源,比如:

  • create():自定义事件发射逻辑。
  • from():从一组值创建事件流。
  • iterate():从集合创建事件流。
  • range():产生数值区间事件流。
  • interval():周期性发射事件流。
  • never():永不发射事件。
  • empty():立即完成的空事件流。
  • error():立即发出错误事件。
    这些工厂方法让你方便地根据需要构建不同的事件流,observable<>就是事件流的抽象接口。
    总结:
    FACTORY observable<> 就是指 observable<> 这个模板类通过一组工厂静态方法来创建各种事件源(observable),以支持灵活的异步数据流构造和组合。

INSTANCE OBSERVABLE

// 订阅函数:通过传入生命周期管理对象和事件回调来订阅Observable事件流
composite_subscription observable<T>::subscribe(composite_subscription lifetime,          // 管理订阅生命周期的对象,可用于取消订阅OnNext /*void(T)*/,                       // 事件到达时调用,处理每个发射的数据TOnError /*void(std::exception_ptr)*/,    // 发生错误时调用,接收异常信息OnCompleted /*void()*/                    // 流结束时调用,无参数
);
// 操作符:对Observable发射的事件流进行转换和组合
// 过滤操作符,根据传入的谓词函数保留满足条件的事件
observable<T> observable<T>::filter(Predicate /*bool(T)*/);
// 映射操作符,将事件流中类型为T的事件映射转换成类型为U的新事件
observable<U> observable<T>::map(Transform /*U(T)*/);
// 扁平映射操作符,
// 1) Extract:将事件T映射成一个新的Observable<U>(事件流)
// 2) Transform:将原事件T和内部Observable发射的事件U组合成新事件V
observable<V> observable<T>::flat_map(Extract /*observable<U>(T)*/,Transform /*V(T,U)*/
);
// 组合最新操作符,传入多个Observable,当任一Observable产生新事件时,
// 使用所有Observable的最新事件调用Transform生成新的事件U
observable<U> observable<T0>::combine_latest(Transform /*U(T0,TN...)*/,observable<TN>...
);
// 合并操作符,将多个Observable的事件交错合并成一个Observable<T>
observable<T> observable<T>::merge(observable<T>...);
// 连接操作符,按顺序连接多个Observable,前一个完成后才订阅下一个,保证事件顺序
observable<T> observable<T>::concat(observable<T>...);

理解“INSTANCE OBSERVABLE”的核心,就是了解Observable在ReactiveX中的本质和作用:

Observable 是什么?

  • Observable 是一个模板类(泛型类),表示一个事件源或数据流,发射的数据类型是 T
  • 它封装了异步或同步的数据流,允许你观察(subscribe)这些数据流的变化。
  • 你可以把它想象成一个可以“产生”类型为 T 的数据项序列的对象。

“INSTANCE Observable”的含义

  • 实例化的Observable 就是具体的事件流对象,可以发出(emit)T类型的事件。
  • 你可以对这个实例调用 subscribe 方法,注册观察者来监听事件。
  • 它可以通过各种操作符(map、filter、combine_latest 等)来转换、组合、过滤这些事件,产生新的Observable实例。

总结

  • Observable是事件流的模板定义。
  • 实例(instance)是特定数据流的实际对象。
  • 通过实例,你可以操作、转换和订阅事件流。
    如果用一句话总结:

Observable是一个“可观察的事件流”的抽象类型,实例就是具体的那个事件流对象。

THREAD-SAFETY COORDINATION

// 默认 - 非线程安全
// 当所有的 observable 都在同一个线程运行时,不做任何操作
auto noop_immediate = identity_immediate();
// 同样不做任何操作,但明确用于当前线程的 trampoline 调度
auto noop_trampoline = identity_current_thread();
// 线程安全选项(需要用户选择开启)
// 使用互斥锁(mutex)来序列化来自多个线程的调用,保证线程安全
auto serialize_with_pool = serialize_event_loop();
// 类似上面,但会创建一个新线程来序列化调用,保证线程安全
auto serialize_with_new = serialize_new_thread();
// 使用队列将所有调用转移到一个新的线程池中的线程,进行异步调度
auto observe_on_pool = observe_on_event_loop();
// 使用队列将所有调用转移到一个新创建的线程,进行异步调度
auto observe_on_new = observe_on_new_thread();

“THREAD-SAFETY”和“COORDINATION”在Rx(Reactive Extensions)和并发编程中是两个非常重要的概念。

THREAD-SAFETY(线程安全)

线程安全指的是代码在多线程环境下运行时,不会因为多个线程同时访问共享资源而导致数据竞态(race conditions)、不一致或崩溃。
在Rx里,事件流(Observable发出的事件)可能来自不同的线程。为了保证事件处理正确,必须确保对观察者的调用是线程安全的。也就是说,事件的发出(onNext/onError/onCompleted)不应该同时从多个线程调用,避免状态损坏。

COORDINATION(调度协调)

调度协调是指控制事件的产生和消费在哪个线程或线程池上执行。
在Rx中,“调度器”(Scheduler)负责管理事件的执行环境,比如:

  • 在哪个线程生成事件
  • 在哪个线程观察事件
  • 如何序列化事件调用,保证线程安全
    协调调度器帮助Rx实现跨线程切换,比如UI线程安全更新界面,或者后台线程异步处理任务。

结合起来理解:

  • 线程安全是“怎么保证多个线程访问不冲突”
  • 调度协调是“用什么机制/线程去调度事件的执行”
    Rx通过协调器(Scheduler)实现线程安全的调用和线程间切换,使得事件流在多线程环境下仍然有序且安全。

相关文章:

  • golang -- slice 底层逻辑
  • javaEE->多线程:定时器
  • 【Java学习笔记】枚举
  • 初学大模型部署以及案例应用(windows+wsl+dify+mysql+Ollama+Xinference)
  • python打卡day42
  • Mask_RCNN 环境配置及训练
  • leetcode hot100 二叉树(一)
  • 第七部分:第四节 - 在 NestJS 应用中集成 MySQL (使用 TypeORM):结构化厨房的原材料管理系统
  • 剑指offer hot100 第三周
  • 查看make命令执行后涉及的预编译宏定义的值
  • java synchronized关键字用法
  • io流2——字节输入流,文件拷贝
  • Codeforces 1027 Div3(ABCDEF)
  • Java网络编程基础:从阻塞式I/O到线程池模型
  • DAY 34 超大力王爱学Python
  • C++ —— STL容器——string类
  • ps中通过拷贝的图层和通过剪切的图层
  • java多线程与JUC
  • ck-editor5的研究 (4):初步使用 CKEditor5 的插件功能
  • Cesium快速入门到精通系列教程三
  • 用angular做的网站/深圳网络推广软件
  • python自学免费教程/seo软件安卓版
  • k网站建设/seo优化服务公司
  • 做网站设计服务商/电商培训机构有哪些哪家比较好
  • 网站建设 软文/线上运营的5个步骤
  • 单页销售网站如何赚钱/地推项目平台