当前位置: 首页 > news >正文

【Rust线程池】如何构建Rust线程池、Rayon线程池用法详细解析

在这里插入图片描述

✨✨ 欢迎大家来到景天科技苑✨✨

🎈🎈 养成好习惯,先赞后看哦~🎈🎈

🏆 作者简介:景天科技苑
🏆《头衔》:大厂架构师,华为云开发者社区专家博主,阿里云开发者社区专家博主,CSDN全栈领域优质创作者,掘金优秀博主,51CTO博客专家等。
🏆《博客》:Rust开发,Python全栈,Golang开发,云原生开发,PyQt5和Tkinter桌面开发,小程序开发,人工智能,js逆向,App逆向,网络系统安全,数据分析,Django,fastapi,flask等框架,云原生K8S,linux,shell脚本等实操经验,网站搭建,数据库等分享。

所属的专栏:Rust高性能并发编程
景天的主页:景天科技苑

在这里插入图片描述

文章目录

  • Rust线程池
    • rayon 线程池
      • 一、Rayon核心API详解
        • 1.1 常用方法
        • 1.2 常见并行组合子(Combinators)
        • 1.3 自定义并行任务
      • 二、Rayon线程池ThreadPoolBuilder
        • 1. new() 方法
        • 2. num_threads() 方法
        • 3. thread_name() 方法
        • 4. build() 方法
        • 5. build_global 方法
        • 6. 其他方法
        • 7. 它还提供了一些回调函数的设置
        • 9. 注意事项

Rust线程池

线程池是一种并发编程的设计模式,它由一组预先创建的线程组成,用于执行多个任务。
线程池的主要作用是在任务到达时,重用已创建的线程,避免频繁地创建和销毁线程,从而提高系统的性能和资源利用率。
线程池通常用于需要处理大量短期任务或并发请求的应用程序。

线程池的优势包括:
• 减少线程创建和销毁的开销:线程的创建和销毁是一项昂贵的操作,线程池通过重用线程减少了这些开销,提高了系统的响应速度和效率。
• 控制并发度:线程池可以限制同时执行的线程数量,从而有效控制系统的并发度,避免资源耗尽和过度竞争。
• 任务调度和负载均衡:线程池使用任务队列和调度算法来管理和分配任务,确保任务按照合理的方式分配给可用的线程,实现负载均衡和最优的资源利用。

rayon 线程池

Rayon 是 Rust 中的一个并行计算库,它可以让你更容易地编写并行代码,以充分利用多核处理器。
Rayon 提供了一种简单的 API,允许你将迭代操作并行化,从而加速处理大规模数据集的能力。
除了这些核心功能外,它还提供构建线程池的能力。
rayon::ThreadPoolBuilder 是 Rayon 库中的一个结构体,用于自定义和配置 Rayon线程池的行为。
线程池是 Rayon 的核心部分,它管理并行任务的执行。
通过使用ThreadPoolBuilder,你可以根据你的需求定制 Rayon 线程池的行为,以便更好地适应你的并行计算任务。
在创建线程池之后,你可以使用 Rayon 提供的方法来并行执行任务,利用多核处理器的性能优势。

一、Rayon核心API详解

Rayon最核心的API是并行迭代器(ParallelIterator),其中包含丰富的方法集。

1.1 常用方法

par_iter():创建并行迭代器(只读)。
par_iter_mut():创建可变并行迭代器。
into_par_iter():消耗数据源,创建并行迭代器。
示例:

use rayon::prelude::*;fn main() {let mut nums = vec![10, 20, 30, 40];// 并行修改元素值nums.par_iter_mut().for_each(|x| *x += 1);println!("{:?}", nums);
}

在这里插入图片描述

不能直接在 par_iter 中嵌套 par_iter,否则会阻塞或 panic。使用独立线程池可以避免嵌套并行死锁。

1.2 常见并行组合子(Combinators)

map():并行映射。
filter():并行过滤。
reduce():并行归约,合并结果。
fold():类似reduce,但支持初始状态和结果合并。
find_any() / find_first():并行查找元素。

示例(并行筛选与转换):

use rayon::prelude::*;fn main() {let nums = (0..1_000_000).collect::<Vec<_>>();// 并行筛选出偶数并求平方let squares: Vec<_> = nums.par_iter().filter(|&&x| x % 2 == 0).map(|&x| x * x).collect();println!("筛选后元素个数:{}", squares.len());
}

这段代码,报错就是要计算的数据超过i32类型的最大值导致的
我们在创建squares的时候,类型Vec<_>,编译器会默认为i32,计算的数据很大,迭代0到1000000,然后计算偶数的平方,超过i32最大值,导致报错
在这里插入图片描述

🚩 解决方案:
创建squares时,指定更大的数据类型:

use rayon::prelude::*;fn main() {let nums = (0..1_000_000).collect::<Vec<_>>();// 并行筛选出偶数并求平方//将squares指定更大的数据类型let squares: Vec<u128> = nums.par_iter().filter(|&&x| x % 2 == 0).map(|&x| x * x).collect();println!("筛选后元素个数:{}", squares.len());
}

在这里插入图片描述

1.3 自定义并行任务

Rayon提供了更底层的接口,让你可以手动并行执行任务。
1.3.1 join方法
执行两个并行任务,等待任务完成后继续执行。

use rayon::join;fn fib(n: usize) -> usize {if n < 2 {return n;}let (a, b) = join(|| fib(n - 1),|| fib(n - 2));a + b
}fn main() {let result = fib(20);println!("斐波那契数:{}", result);
}

在这里插入图片描述

1.3.2 scope方法
并行执行多个互相独立的任务,生命周期灵活控制。

use rayon::scope;fn main() {let mut a = 0;let mut b = 0;scope(|s| {s.spawn(|_| {a = expensive_compute_a();});s.spawn(|_| {b = expensive_compute_b();});});println!("结果:{}, {}", a, b);
}fn expensive_compute_a() -> i32 {100
}
fn expensive_compute_b() -> i32 {200
}

在这里插入图片描述

二、Rayon线程池ThreadPoolBuilder

rayon::ThreadPoolBuilder 是 Rayon 库中用于 自定义线程池配置 的结构体,适用于对并发行为有更精细控制需求的场景。
Rayon 默认使用一个全局线程池(rayon::spawn、par_iter 默认使用它),但在某些情况下,我们希望:
控制线程池线程数量;
设置线程名、线程栈大小;
使用多个独立的线程池隔离并发任务;
嵌套 Rayon 调用时避免死锁(多线程池互不干扰);
这时,就需要用到 ThreadPoolBuilder。

ThreadPoolBuilder 是以设计模式中的构建者模式设计的, 以下是一些ThreadPoolBuilder 的主要方法:

1. new() 方法

创建一个新的 ThreadPoolBuilder 实例

use rayon::ThreadPoolBuilder;
fn main() {let builder = ThreadPoolBuilder::new();
}
2. num_threads() 方法

设置线程池的线程数量。
你可以通过这个方法指定线程池中的线程数,以控制并行度。
默认情况下,Rayon 会根据 CPU 内核数量自动设置线程数。

use rayon::ThreadPoolBuilder;
fn main() {let builder = ThreadPoolBuilder::new().num_threads(4); //设置线程池中的线程数量为4
}
3. thread_name() 方法

为线程池中的线程设置一个名称,这可以帮助你在调试时更容易识别线程。

use rayon::ThreadPoolBuilder;
fn main() {let builder = ThreadPoolBuilder::new().thread_name(|i| format!("worker-{}", i));
}

查看每次执行的线程名

use rayon::prelude::*;
use rayon::ThreadPoolBuilder;
use std::thread;fn main() {let pool = ThreadPoolBuilder::new().num_threads(4).thread_name(|i| format!("my-pool-{}", i)).build().unwrap();pool.install(|| {let v: Vec<_> = (0..1000).into_par_iter().map(|x| {// 获取当前线程名let thread_thread = thread::current();let thread_name = thread_thread.name().unwrap_or("unknown");println!("元素 {} 由线程 {} 处理", x, thread_name);x * x}).collect();println!("结果: {:?}", &v[..10]);});
}

在这里插入图片描述

4. build() 方法

通过 build 方法来创建线程池。
这个方法会将之前的配置应用于线程池并返回一个 rayon::ThreadPool 实例。

use rayon::ThreadPoolBuilder;
fn main() {let pool = ThreadPoolBuilder::new().num_threads(4).thread_name(|i| format!("worker-{}", i)).build().unwrap(); // 使用unwrap()来处理潜在的错误
}
5. build_global 方法

通过 build_global 方法创建一个全局的线程池
不推荐你主动调用这个方法初始化全局的线程池,使用默认的配置就好,记得全局的线程池只会初始化一次。多次调用会 panic

fn main() {rayon::ThreadPoolBuilder::new().num_threads(22).build_global().unwrap();
}
6. 其他方法

ThreadPoolBuilder 还提供了其他一些方法,用于配置线程池的行为,
如 stack_size() 用于设置线程栈的大小。
设置每个线程的栈大小(单位:字节):

let builder = rayon::ThreadPoolBuilder::new().stack_size(8 * 1024 * 1024); // 8MB

适用于递归深度大、调用栈复杂的程序。

7. 它还提供了一些回调函数的设置

start_handler() 用于设置线程启动时的回调函数等。
线程启动时调用的回调函数,可以用于初始化日志、TLS 等:

let builder = rayon::ThreadPoolBuilder::new().start_handler(|idx| {println!("线程 {} 启动", idx);});

spawn_handler 实现定制化的函数来产生线程。
panic_handler 提供对panic 处理的回调函数。
exit_handler 提供线程退出时的回调。

let builder = rayon::ThreadPoolBuilder::new().exit_handler(|idx| {println!("线程 {} 退出", idx);});

下面这个例子演示了使用 rayon 线程池计算斐波那契数列:

//使用ranyon线程池计算斐波那契数列
fn fib(n: u128) -> u128 {if n == 0 {return 0;}if n == 1 {return 1;}let (a, b) = rayon::join(|| fib(n - 1),|| fib(n - 2));a + b
}fn rayon_threadpool() {let pool = rayon::ThreadPoolBuilder::new().num_threads(10).build().unwrap();pool.install(|| {let result = fib(20);println!("result = {}", result);});
}fn main() {rayon_threadpool();
}

在这里插入图片描述

• rayon::ThreadPoolBuilder 用来创建一个线程池。设置使用 10 个线程
• pool.install() 在线程池中运行 fib
• rayon::join 用于并行执行两个函数并等待它们的结果。它使得你可以同时执行两个独立的任务,然后等待它们都完成,以便将它们的结果合并到一起。
通过在 join 中传入 fib 递归任务, 实现并行计算 fib 数列
与直接 spawn thread 相比, 使用 rayon 的线程池有以下优点:
• 线程可重用, 避免频繁创建/销毁线程的开销
• 线程数可配置, 一般根据 CPU 核心数设置
• 避免大量线程造成资源竞争问题

9. 注意事项

build_global() 只能调用一次;多次调用会 panic;
自定义线程池的 .install() 不能跨线程池嵌套调用 .par_iter();
不建议将阻塞操作(如IO)放入线程池中执行,Rayon主要用于CPU密集型任务;
一旦线程池创建完成,线程数不可更改,需重新构建。

http://www.dtcms.com/a/297441.html

相关文章:

  • SQLFluff
  • 数字增加变化到目标数值动画,js实现
  • react+threejs实现自适应分屏查看/3D场景对比功能/双场景对比查看器
  • GitHub git push 推送大文件
  • Linux: network: wireshark: tcp的segment重组是怎么判断出来的
  • Git下载与安装全攻略
  • reflections:Java非常好用的反射工具包
  • SEC_FirePower 第二天作业
  • 【深度学习新浪潮】Claude code是什么样的一款产品?
  • Keepalived 原理及配置(高可用)
  • 校园二手交易小程序的设计与实现
  • 局域网 IP地址
  • mid360连接机载电脑,远程桌面连接不上的情况
  • 智慧校园(智能出入口控制系统,考勤,消费机,电子班牌等)标准化学校建设,加速业务规模发展,满足学校、家长及学生对智能化、便捷化校园管理的需求清单如下
  • 三骏破局AI时代:电科金仓以“马背智慧”重定义数据库一体机
  • 从数据脱敏到SHAP解释:用Streamlit+XGBoost构建可复现的川崎病诊断系统
  • 12. isaacsim4.2教程-ROS 导航
  • 剖析 Web3 与传统网络模型的安全框架
  • IAR编辑器如何让左侧的工具栏显示出来?
  • Spring之【Bean后置处理器】
  • ELK Stack技术栈
  • 编译器-gcc/g++和自动化构建-make/Makefile
  • 软件工程:软件需求
  • Maximator增压器DLE 5-1-GG Artikelnr.1000015617
  • 走入Linux的世界:编辑器Vim
  • WPF高级学习(一)
  • 仙人掌cacti中的RCE案例
  • 虚拟直线阈值告警人员计数算法暑期应用
  • VoWiFi技术深度解析:架构、流程与演进
  • Oracle MCP本地部署测试