当前位置: 首页 > news >正文

Rust语言典型并发模式小结

摘要

Rust 语言以其独特的“所有权”系统和 Send/Sync trait 在编译时保证了内存安全与线程安全,为并发编程提供了坚实的基础。小子梳理了 Rust 中处理不同类型并发任务的经典模式,从底层的标准库线程抽象,到专为数据并行设计的 Rayon 框架,再到面向 I/O 密集型场景的 Tokio 异步运行时,以飨各位看官。

1. 基础并发原语:std::thread 与同步机制

Rust 标准库 std::thread 提供了与操作系统线程 1:1 映射的并发原语。每个 thread::spawn 调用都会创建一个新的 OS 线程,这决定了其适用于计算密集型(CPU-bound)任务,而非需要大规模并发的 I/O 密集型(I/O-bound)任务,因为 OS 线程的创建和上下文切换成本相对较高。

基于 std::thread,可以构建出以下几种基础并发模式:

  • 1.1 Fork-Join 模型:这是最基础的并行计算模式。通过将任务分解(Fork),分发给多个线程执行,然后等待所有线程完成并合并(Join)结果。早期的实现依赖于 move 闭包来转移所有权,这限制了线程间对外部栈数据的安全访问。

    代码示例:

    use std::thread;let data = vec![1, 2, 3, 4, 5, 6, 7, 8];
    let chunk1 = data[0..4].to_vec();
    let chunk2 = data[4..8].to_vec();let handle1 = thread::spawn(move || chunk1.iter().sum::<i32>());
    let handle2 = thread::spawn(move || chunk2.iter().sum::<i32>());let sum1 = handle1.join().unwrap();
    let sum2 = handle2.join().unwrap();assert_eq!(sum1 + sum2, 36);
    
  • 1.2 作用域线程 (Scoped Threads):自 Rust 1.63 引入的 std::thread::scope 是对 Fork-Join 模型的重大改进。它通过词法作用域确保了派生线程的生命周期不会超过其父作用域,从而允许线程安全地借用外部作用域的数据。

    代码示例:

       use std::thread;let mut numbers = vec![1, 2, 3, 4];// 使用作用域线程安全地借用并修改 `numbers`thread::scope(|s| {// 将 `numbers` 分割成两个不重叠的可变切片,以向编译器证明线程安全let (left, right) = numbers.split_at_mut(2);s.spawn(move || {for x in left {*x *= 2;}});s.spawn(move || {for x in right {*x *= 2;}});});assert_eq!(numbers, vec![2, 4, 6, 8]);
    
  • 1.3 共享状态并发 (Shared-State Concurrency):当多个线程需要访问可变共享数据时,Arc<Mutex<T>> 是 Rust 中的经典范式。Arc 允许多个线程拥有对同一份数据的引用,而 Mutex(互斥锁)则通过锁定机制保证了在任意时刻只有一个线程能修改数据,从而防止数据竞争。

    代码示例:

    use std::sync::{Arc, Mutex};
    use std::thread;let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];for _ in 0..10 {let counter_clone = Arc::clone(&counter);let handle = thread::spawn(move || {let mut num = counter_clone.lock().unwrap();*num += 1;});handles.push(handle);
    }for handle in handles {handle.join().unwrap();
    }assert_eq!(*counter.lock().unwrap(), 10);
    
  • 1.4 消息传递并发 (Message-Passing Concurrency):此模式遵循 “Do not communicate by sharing memory; instead, share memory by communicating” 的哲学。线程通过通道(Channel)进行通信,std::sync::mpsc(多生产者,单消费者)是标准库的实现。这种模式可以构建出高效的“工作者池”(Worker Pool)模型,其工作流程如下:

    Channels
    Worker Threads
    Main Thread
    sends jobs
    receives job
    receives job
    receives job
    sends result
    sends result
    sends result
    receives results
    Job Channel
    Result Channel
    Worker 1
    Worker 2
    Worker N
    Producer

    代码示例:

    use std::sync::{mpsc, Arc, Mutex};
    use std::thread;let jobs = vec![1, 2, 3, 4, 5];
    let job_count = jobs.len();
    let (job_tx, job_rx) = mpsc::channel();
    let job_rx = Arc::new(Mutex::new(job_rx)); // 允许多个工作者共享接收端
    let (result_tx, result_rx) = mpsc::channel();for _ in 0..2 { // 创建 2 个工作者线程let rx = Arc::clone(&job_rx);let tx = result_tx.clone();thread::spawn(move || {while let Ok(job) = rx.lock().unwrap().recv() {// 模拟工作:将数字加倍tx.send(job * 2).unwrap();}});
    }// 主线程发送任务
    for job in jobs {job_tx.send(job).unwrap();
    }
    drop(job_tx); // 关闭发送端,通知工作者任务已结束// 收集结果
    let mut results: Vec<i32> = result_rx.iter().take(job_count).collect();
    results.sort(); // 结果顺序不确定,排序后断言assert_eq!(results, vec![2, 4, 6, 8, 10]);
    

2. 数据并行框架:Rayon

Rayon 是一个专为数据并行(Data Parallelism)设计的第三方库,它极大地简化了将顺序计算转换为并行计算的过程。其核心优势在于对“工作窃取”(Work-Stealing)调度算法的精妙实现。

代码示例:

use rayon::prelude::*;let data = vec![1, 2, 3, 4, 5];// 顺序计算
let sum_sequential = data.iter().map(|x| x * x).sum::<i32>();// Rayon 并行计算
let sum_parallel = data.par_iter().map(|x| x * x).sum::<i32>();assert_eq!(sum_sequential, sum_parallel);

底层机制

  1. 全局线程池:Rayon 在首次使用时会创建一个全局的、与 CPU 逻辑核心数相等的线程池。

  2. 工作窃取调度器:每个工作线程都维护一个本地的双端队列(Deque)。线程从自身队列的尾部(LIFO)获取任务。当一个线程空闲时,它会成为“窃贼”,从其他线程队列的头部(FIFO)“窃取”任务。这种机制的示意图如下:

    Rayon Thread Pool
    Worker 1 (Victim)
    Worker 2 (Thief)
    Pops from its Tail (LIFO)
    Steals from other's Head (FIFO)
    Tail
    (empty)
    Head
    Tail
    Task B
    Task A
    Head
    Thread 1
    Thread 2
  3. 分治迭代器 (ParallelIterator):Rayon 的 par_iter() 采用分治策略,递归地将大数据块分割成小块,为工作窃取调度器提供了丰富的任务源。

3. I/O 并发运行时:Tokio

当应用程序的瓶颈在于等待外部资源(如网络、磁盘)时,异步编程是更优的解决方案。Tokio是 Rust 生态中最成熟的异步运行时,虽然不在标准库,但已经成为事实的“标准”库。它实现了 M:N 线程模型,即用少量(M个)系统线程来驱动大量(N个)轻量级的异步任务(Futures/Tasks)。

核心原理

  • 非阻塞 I/O:Tokio 的所有 I/O 操作都是非阻塞的。

  • 事件驱动调度:当一个异步任务等待 I/O 时,它会让出控制权。当 I/O 事件完成时,Tokio 的事件循环会唤醒相应的任务。

  • M:N 调度模型:如下图所示,少量 OS 线程高效地复用,执行成千上万的异步任务。

    Tokio Runtime
    OS Thread 1
    OS Thread 2
    OS Thread M
    Task N
    Task D
    Task E
    Task A
    Task B
    Task C

与 Go 语言协程的对比
熟悉Go 语言看官知道Go 语言的 go my_func() 语法非常简洁,它创建的是一个协程 (Goroutine)。协程是由 Go 运行时(Runtime)管理的、非常轻量级的用户态线程。

在 Rust 中,与 Goroutine 最直接的对应物是异步任务 (Asynchronous Task)

这里的关键区别在于:

  • Go: 协程和其运行时是内置于语言的。go 关键字就是语法的一部分。
  • Rust: 语言本身只提供了 async/.await 这两个关键字来定义异步行为。但如何执行这些异步任务,则是由一个称为异步运行时 (Async Runtime) 的库来负责的(如tokio)。

因此,go my_func() 在 Rust 中的等价操作是 tokio::spawn(async { ... }) (或者使用其他异步运行时如 async-stdspawn 函数)。

对比分析:
特性Go Goroutine (go)Rust Async Task (tokio::spawn)
概念轻量级用户态线程 (协程)轻量级用户态任务 (Task/Future)
创建语法go my_func()tokio::spawn(async { my_async_func().await; })
调度由 Go 运行时在 M:N 模型上调度由 Tokio 运行时在 M:N 模型上调度
运行时语言内置,开发者无需选择库提供,开发者需选择 (最常用的是 Tokio)
动态增长的栈栈大小在编译时确定(Future 是状态机)
通信channels 是首选通信方式同样有强大的 channels (如 tokio::sync::mpsc),也常用 Arc<Mutex<T>>
核心差异运行时与语言深度绑定,开箱即用。语言提供异步原语,运行时作为库解耦,提供了更大的灵活性和控制力。

当你看到 Go 代码里的 go 关键字时,你应该想到 Rust 代码里的 tokio::spawn。它们都实现了同样的目标:以极低的成本启动成千上万个并发任务,并由一个高效的运行时在少量系统线程上进行调度,非常适合 I/O 密集型场景。

4. 混合负载并发模型:Tokio + Rayon

在现实世界的复杂应用中,往往同时存在 I/O 密集和 CPU 密集的双重负载。经典的解决方案是隔离不同类型的工作负载,形成一种混合并发模型。

模型架构

  1. Tokio 作为主调度器:处理所有 I/O 密集型任务。
  2. Rayon 作为并行计算引擎:执行 CPU 密集型计算。
  3. tokio::task::spawn_blocking 作为桥梁:将同步阻塞的计算任务卸载到专用的线程池,防止其阻塞异步运行时。

该模型的工作流程如下图所示:

发现需要密集计算
将计算任务移交
在此执行
计算完成
.await 异步等待
封装响应
外部请求/事件
Tokio 运行时 (异步任务)
调用 spawn_blocking
阻塞任务线程池
Rayon 并行计算
.par_iter()...
返回结果
Tokio 运行时 (异步任务)
返回响应/完成

4.1 可运行示例:基于 Axum 的混合负载 Web 服务

以下示例使用 Axum Web 框架构建一个 API 端点,演示了该混合模型。

项目设置 (Cargo.toml):

[package]
name = "concurrency-example"
version = "0.1.0"
# use cargo 1.87+
edition = "2024" [dependencies]
tokio = { version = "1.45", features = ["full"] } 
axum = "0.8"
rayon = "1.10"futures = "0.3"

源代码 (src/main.rs):

use axum::{routing::get, Router};
use rayon::prelude::*;
use std::time::Instant;
use tokio::net::TcpListener;/// 模拟一个 CPU 密集型计算函数。
/// 它使用 Rayon 的并行迭代器来充分利用所有 CPU 核心。
fn complex_computation(data: Vec<u64>) -> u64 {data.par_iter().map(|&n| {// 这是一个任意的、耗时的计算,用于模拟真实负载// 修正:为 fold 的初始值 0 指定类型为 u64(0..n % 500).fold(0u64, |acc, x| acc.wrapping_add(x as u64))}).sum()
}/// Axum 的异步 Handler。
/// 它负责处理 Web 请求,并在需要时将计算任务卸载到阻塞线程池。
async fn handle_computation() -> String {println!("接收到计算请求...");let start_time = Instant::now();// 准备计算数据let data: Vec<u64> = (1000..2000).collect();// 使用 `spawn_blocking` 将 CPU 密集型任务移出异步运行时。// `move` 关键字确保 `data` 的所有权被移入闭包。let result = tokio::task::spawn_blocking(move || {complex_computation(data)}).await.unwrap(); // .await 异步等待计算完成并获取结果let duration = start_time.elapsed();println!("计算完成,耗时: {:?}", duration);format!("计算结果是: {}. 计算耗时: {:.2}ms",result,duration.as_secs_f64() * 1000.0)
}#[tokio::main]
async fn main() {let app = Router::new().route("/compute", get(handle_computation));// 修正:使用 tokio::net::TcpListener 和 axum::serve,以适配新版 axum APIlet listener = TcpListener::bind("127.0.0.1:8080").await.unwrap();println!("浏览器中请求 http://{}/compute", listener.local_addr().unwrap());axum::serve(listener, app).await.unwrap();
}

5. 小结

Rust 提供了层次分明、功能强大的并发工具集。开发者应根据任务的具体性质选择最合适的并发模型:

  • 对于数据并行CPU 密集型任务,Rayon 提供了较好的易用性和性能。
  • 对于I/O 密集型和需要大规模并发的场景,Tokio 的异步模型是标准选择。
  • 对于混合负载的现代应用,通过 tokio::task::spawn_blockingTokioRayon 结合,形成一种职责分离、高效协作的混合并发模型,是构建高性能、高响应性系统的关键策略。

对这些模式的深刻理解和恰当运用,是发挥 Rust 语言全部潜能的核心所在。希望这篇小总结对各位看官有所裨益。

相关文章:

  • Day32
  • verl multi-node train 教程
  • 创新项目实训纪实——总结与反思
  • vue中computed和watch区别
  • RabbitMQ--集群副本
  • 通过 O-RAN 传感进行异常识别和防护
  • 【橘子的AI | 每日一课】Day4!机器学习 (ML) 基础
  • Python 基础语法 (4)【适合0基础】
  • StarRocks Community Monthly Newsletter (May)
  • 线性代数(1)线性方程组的多种解法
  • 如何设计一个既提供绘图Tools又提供example_data的MCP服务器:
  • 计算机组成原理期末题目解析
  • nohz_full 参数对内核软硬锁检测机制的影响分析
  • 大模型笔记6:微调
  • Redis中的zset的底层实现
  • 【Create my OS】5 内核线程
  • 【图片识别改名】如何批量识别大量图片的文字并重命名图片,基于WPF和京东OCR识别接口的实现方案
  • srm管理系统供应商管理在线询价比价管理电子采购(java)
  • redis序列化
  • 嵌入式学习笔记C语言阶段--16函数指针
  • 视频发布到哪些平台可以赚钱/吉林关键词优化的方法
  • 男人女人做羞羞事网站/网络营销师怎么考
  • 动易网站怎么进入后台/深圳百度首页优化
  • 重庆电子工程职业学院教务网/seo是干啥的
  • 做类似淘宝的网站开发需要什么/seo优化公司如何做
  • 集团网站建设方案书/市场营销公司排名