当前位置: 首页 > news >正文

Rust 学习笔记:Stream

Rust 学习笔记:Stream

  • Rust 学习笔记:Stream
    • 组合流
    • 合并流

Rust 学习笔记:Stream

许多概念天然适合用 Stream 表示:

  • 队列中逐渐可用的项目
  • 文件系统中逐渐拉取的数据块
  • 网络中随时间到达的数据

消息传递中异步的 recv 方法会随时间产生一系列项目,称为流(Stream)。

迭代器和异步通道接收器之间有两个不同之处。第一个区别是时间:迭代器是同步的,而通道接收器是异步的。第二个是 API。当直接使用 Iterator 时,调用它的同步 next 方法,而对于 trpl::Receiver 流,我们调用异步 recv 方法。

Streams 类似于一种异步形式的迭代器。迭代器和 Rust 中的流之间的相似性意味着我们实际上可以从任何迭代器创建流。与使用迭代器一样,我们可以调用流的 next 方法,然后等待输出。

Stream trait 定义了一个底层接口,它有效地结合了 Iterator 和 Future trait。StreamExt 在 Stream 之上提供了一组更高级的 API,包括 next 方法以及其他类似于 Iterator trait 所提供的实用方法。Stream 和 StreamExt 还不是 Rust 标准库的一部分,但大多数生态系统的 crate 使用相同的定义。

Ext 是扩展的缩写,是 Rust 社区中用于将一个特性扩展到另一个特性的常用模式。

use trpl::StreamExt;fn main() {trpl::run(async {let values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];let iter = values.iter().map(|n| n * 2);let mut stream = trpl::stream_from_iter(iter);while let Some(value) = stream.next().await {println!("The value was: {value}");}});
}

程序从一个数字数组开始,将其转换为迭代器,然后调用 map 函数对所有值进行翻倍操作。使用 trpl::stream_from_iter 函数将迭代器转换为流。接下来,当流中的项到达时,使用 while let 循环遍历它们。

程序输出:

The value was: 2
The value was: 4
The value was: 6
The value was: 8
The value was: 10
The value was: 12
The value was: 14
The value was: 16
The value was: 18
The value was: 20

既然 StreamExt 已经在作用域中,我们可以使用它的所有实用方法,就像使用迭代器一样。

例如,我们使用 filter 方法过滤除 3 和 5 的倍数以外的所有内容。

use trpl::StreamExt;fn main() {trpl::run(async {let values = 1..101;let iter = values.map(|n| n * 2);let stream = trpl::stream_from_iter(iter);let mut filtered =stream.filter(|value| value % 3 == 0 || value % 5 == 0);while let Some(value) = filtered.next().await {println!("The value was: {value}");}});
}

组合流

因为流是 future,我们可以将它们与任何其他类型的 future 一起使用,并以有趣的方式组合它们。

让我们首先构建一个小消息流,作为我们网络数据流的替代:

use trpl::{ReceiverStream, Stream, StreamExt};fn get_messages() -> impl Stream<Item = String> {let (tx, rx) = trpl::channel();let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];for message in messages {tx.send(format!("Message: '{message}'")).unwrap();}ReceiverStream::new(rx)
}fn main() {trpl::run(async {let mut messages = get_messages();while let Some(message) = messages.next().await {println!("{message}");}});
}

首先,我们创建一个名为 get_messages 的函数,它返回 impl Stream<Item = String>。为了实现它,我们创建了一个异步通道,并通过通道发送一系列字符串。我们还使用了一个新的类型:ReceiverStream,它通过 next 方法将 rx 接收器从 trpl::channel 转换为流。

回到 main 中,我们使用 while let 循环打印流中的所有消息。当我们运行这段代码时,我们得到了我们所期望的结果:

Message: 'a'
Message: 'b'
Message: 'c'
Message: 'd'
Message: 'e'
Message: 'f'
Message: 'g'
Message: 'h'
Message: 'i'
Message: 'j'

这些用常规的 Receiver 或 Iterator API 都能完成。让我们添加一个需要流的特性:添加一个适用于流中的每个项的超时,以及我们发出的项的延迟。

use std::{pin::pin, time::Duration};
use trpl::{ReceiverStream, Stream, StreamExt};fn get_messages() -> impl Stream<Item = String> {let (tx, rx) = trpl::channel();trpl::spawn_task(async move {let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];for (index, message) in messages.into_iter().enumerate() {let time_to_sleep = if index % 2 == 0 { 100 } else { 300 };trpl::sleep(Duration::from_millis(time_to_sleep)).await;tx.send(format!("Message: '{message}'")).unwrap();}});ReceiverStream::new(rx)
}fn main() {trpl::run(async {let mut messages =pin!(get_messages().timeout(Duration::from_millis(200)));while let Some(result) = messages.next().await {match result {Ok(message) => println!("{message}"),Err(reason) => eprintln!("Problem: {reason:?}"),}}})
}

我们首先使用 timeout 方法向流添加一个超时,该方法来自 StreamExt trait。然后更新 while let 循环体,因为流现在返回一个 Result。Ok 变体表示消息及时到达;Err 变体表示在任何消息到达之前超时已经过了。我们匹配该结果,并在成功接收消息时打印消息,或者打印关于超时的通知。

接着为发送的消息添加一个可变延迟。在 get_messages 函数中,我们对偶数索引项应用 100 ms 的延迟,对奇索引项应用 300 ms 的延迟,因为我们的超时是 200 ms,所以这应该会影响一半的消息。

要在 get_messages 函数中的消息之间休眠而不阻塞,我们需要使用 async。然而,我们不能使 get_messages 本身成为一个异步函数,因为那样我们将返回 Future<Output = Stream<Item = String>> 而不是 Stream<Item = String>>。因此,我们将 get_messages 保留为返回流的常规函数,并生成一个任务来处理异步 sleep 调用。

请记住:在给定的 future 内,一切都是线性发生的。并发发生在 future 之间。

调用者必须等待 get_messages 本身来访问流,这将要求它在返回接收方流之前发送所有消息,包括每个消息之间的睡眠延迟。因此,我们设置的超时将是无用的。Stream 本身不会有延迟,它们都可能在流可用之前发生。

程序输出:

Message: 'a'
Problem: Elapsed(())
Message: 'b'
Message: 'c'
Problem: Elapsed(())
Message: 'd'
Message: 'e'
Problem: Elapsed(())
Message: 'f'
Message: 'g'
Problem: Elapsed(())
Message: 'h'
Message: 'i'
Problem: Elapsed(())
Message: 'j'

在接收偶数索引项消息前,都出现 Problem: Elapsed(()) 错误。

发送偶数索引项消息后需要休眠 300 ms,main 函数中轮询流的间隔是 200 ms。

第一次轮询的 result 匹配到 Err(reason),于是打印错误。

超时并不会阻止消息最终到达。我们仍然获得所有原始消息,因为我们的通道是无界的:它可以容纳内存中可以容纳的尽可能多的消息。

第二次轮询时,消息已经到达,result 匹配到 Ok(message),于是打印接收到的消息。

合并流

首先,让我们创建另一个流。在无限循环中,每隔 1 ms 发送一个数字。对于 async,只要在循环的每次迭代中至少有一个等待点,就不会阻塞其他任何东西。

fn get_intervals() -> impl Stream<Item = u32> {let (tx, rx) = trpl::channel();trpl::spawn_task(async move {let mut count = 0;loop {trpl::sleep(Duration::from_millis(1)).await;count += 1;tx.send(count).unwrap();}});ReceiverStream::new(rx)
}

返回类型将是 impl Stream<Item = u32>。因为所有这些都被封装在由 spawn_task创 建的任务中,所以所有这些(包括无限循环)都将随着运行时一起被清理。

回到 main 函数的 async 块,我们可以尝试合并消息和间隔流:

fn main() {trpl::run(async {let messages = get_messages().timeout(Duration::from_millis(200));let intervals = get_intervals();let merged = messages.merge(intervals);})
}

merge 方法将多个流合并为一个流,该流在项可用时立即从任何流生成项,而不强加任何特定的顺序。

但是,这个 merge 调用不能编译!这是因为这两个流具有不同的类型。messages 流的类型为 Timeout<impl Stream<Item = String>>,其中 Timeout 是为超时调用实现 Stream 的类型。interval 流的类型为 impl Stream<Item = u32>。要合并这两个流,我们需要转换其中一个以匹配另一个。

修改 interval 流:

fn main() {trpl::run(async {let messages = get_messages().timeout(Duration::from_millis(200));let intervals = get_intervals().map(|count| format!("Interval: {count}")).timeout(Duration::from_secs(10));let merged = messages.merge(intervals);let mut stream = pin!(merged);while let Some(result) = stream.next().await {match result {Ok(message) => println!("{message}"),Err(reason) => eprintln!("Problem: {reason:?}"),}}})
}

首先,我们使用 map 方法将流中的数字转换为字符串。

其次,我们需要匹配 messages 流的类型 Timeout<…>,这里我们用 timeout 创建了一个超时。

最后,我们需要使流可变,以便 while let 循环的 next 调用可以遍历流,并将其 pin,以便这样做是安全的。

运行程序,将会出现两个问题。首先,它永远不会停止!其次,来自英文字母的消息将被隐藏在所有间隔计数器消息的中间。

...
Interval: 329
Interval: 330
Interval: 331
Interval: 332
Interval: 333
Interval: 334
Interval: 335
Interval: 336
Interval: 337
Interval: 338
Message: 'd'
Interval: 339
Interval: 340
Interval: 341
Interval: 342
Interval: 343
Interval: 344
Interval: 345
Interval: 346
...

修改程序,解决这两个问题:

fn main() {trpl::run(async {let messages = get_messages().timeout(Duration::from_millis(200));let intervals = get_intervals().map(|count| format!("Interval: {count}")).throttle(Duration::from_millis(100)).timeout(Duration::from_secs(10));let merged = messages.merge(intervals).take(20);let mut stream = pin!(merged);while let Some(result) = stream.next().await {match result {Ok(message) => println!("{message}"),Err(reason) => eprintln!("Problem: {reason:?}"),}}})
}

首先,我们在 intervals 流上使用 throttle 方法。节流是一种限制函数调用速率的方法。在这种情况下,限制流轮询的频率为每 100 ms 一次,这样 intervals 流就不会压倒 messages 流。

take 方法用于限制从流中接受的项的数量。我们将 take 方法应用于合并的流,因为我们想限制最终的输出,而不仅仅是一个流或另一个流。

现在,当我们运行程序时,它在从合并流中取出 20 个项后就停止。

Interval: 1
Message: 'a'
Interval: 2
Interval: 3
Problem: Elapsed(())
Interval: 4
Message: 'b'
Interval: 5
Message: 'c'
Interval: 6
Interval: 7
Problem: Elapsed(())
Interval: 8
Message: 'd'
Interval: 9
Message: 'e'
Interval: 10
Interval: 11
Problem: Elapsed(())
Interval: 12

尽管我们有一个源流可以每毫秒产生一个事件,但是节流调用产生了一个新的流,它包装了原始流,这样原始流就只能以节流速率轮询,而不是它自己的“本地”速率。我们并非忽略 intervals 流发送的消息,而是从一开始就不会产生这些消息!这是 Rust future 在工作中固有的“懒惰”,允许我们选择我们的性能特征。

我们还需要处理最后一件事:错误!对于这两种基于通道的流,当通道的另一端关闭时,send 调用可能会失败。到目前为止,我们通过调用 unwrap 忽略了这种可能性,但在一个行为良好的应用程序中,我们应该显式处理错误,至少通过结束循环,这样我们就不会再发送任何消息。

修改两个函数:

fn get_messages() -> impl Stream<Item = String> {let (tx, rx) = trpl::channel();trpl::spawn_task(async move {let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];for (index, message) in messages.into_iter().enumerate() {let time_to_sleep = if index % 2 == 0 { 100 } else { 300 };trpl::sleep(Duration::from_millis(time_to_sleep)).await;if let Err(send_error) = tx.send(format!("Message: '{message}'")) {eprintln!("Cannot send message '{message}': {send_error}");break;}}});ReceiverStream::new(rx)
}fn get_intervals() -> impl Stream<Item = u32> {let (tx, rx) = trpl::channel();trpl::spawn_task(async move {let mut count = 0;loop {trpl::sleep(Duration::from_millis(1)).await;count += 1;if let Err(send_error) = tx.send(count) {eprintln!("Could not send interval {count}: {send_error}");break;};}});ReceiverStream::new(rx)
}

两个函数都使用了一个简单的错误策略:打印问题,然后跳出循环。

相关文章:

  • 光谱数据分析的方法有哪些?
  • “交错推理”降低首token耗时,并且显著提升推理准确性!!
  • 使用 PyMuPDF 和 PySide6/PyQt6 编写的 PDF 查看器 (显示树状书签和缩略图列表,没有文字选择功能)
  • 异步爬虫---
  • C++11 Generalized(non-trivial) Unions:从入门到精通
  • 音乐调性关系与音准训练指南
  • 深刻理解深度学习的注意力机制Attention
  • vLLM用2*(8 H800)部署DeepSeek-R1-0528-685B
  • ubuntu 拒绝ssh连接,连不上ssh,无法远程登录: Connection failed.
  • 第18篇:数据库中间件架构中的服务治理与限流熔断机制设计
  • [Java恶补day24] 74. 搜索二维矩阵
  • 【MacOS】系统数据占用超大存储空间,原因、定位、清理方式记录
  • vue常用框架,及更新内容
  • ServiceNow培训第1期
  • 50种3D效果演示(OpenGL)
  • openeuler 虚拟机:Nginx 日志分析脚本
  • 从开发到上线:iOS App混淆保护的完整生命周期管理(含Ipa Guard)
  • 直角坐标系-zernike多项式波面拟合
  • 初学时间复杂度
  • MRI中的“髓鞘探测器”:T1w/T2w比值揭秘
  • 网站改备案信息/百度的网站网址
  • django 开放api 做网站/免费seo视频教学
  • 小程序平台收费/郑州seo服务技术
  • 广州外贸网站效果/绍兴seo推广
  • 给孩子做的饭网站/西安seo推广公司
  • 有专门做市场分析的网站么/网站优化比较好的公司