第15章 并发编程

文章目录
- 第15章 并发编程
- 15.1 使用线程同时运行代码
- 线程基础与创建
- 基本线程操作
- 线程与所有权
- 线程管理与配置
- 错误处理与恐慌传播
- 15.2 消息传递并发
- 通道基础
- 基本通道操作
- 多生产者场景
- 同步通道
- 高级通道模式
- 15.3 共享状态并发
- Mutex互斥锁
- 基本Mutex使用
- 高级Mutex模式
- RwLock读写锁
- 条件变量
- 原子类型
- 15.4 Sync和Send trait
- Send Trait
- Sync Trait
- 自定义Send和Sync类型
- 实战:构建并发Web服务器
- 并发编程最佳实践
- 性能优化建议
- 总结
第15章 并发编程
并发编程是现代软件开发中不可或缺的重要组成部分,它允许程序同时执行多个任务,从而充分利用多核处理器的能力,提高程序的性能和响应性。Rust以其独特的所有权系统和类型系统,在保证内存安全的同时,提供了强大而灵活的并发编程能力。本章将深入探讨Rust的并发编程特性,包括线程管理、消息传递、共享状态以及Sync和Send trait。
15.1 使用线程同时运行代码
线程基础与创建
线程是操作系统能够进行运算调度的最小单位,Rust标准库提供了强大的线程支持,允许我们创建和管理多个并发执行的线程。
基本线程操作
use std::thread;
use std::time::Duration;fn thread_basics() {println!("=== 线程基础演示 ===");// 创建新线程let handle = thread::spawn(|| {for i in 1..=5 {println!("子线程: 计数 {}", i);thread::sleep(Duration::from_millis(500));}});// 主线程继续执行for i in 1..=3 {println!("主线程: 计数 {}", i);thread::sleep(Duration::from_millis(300));}// 等待子线程完成handle.join().unwrap();println!("所有线程执行完成");
}
线程与所有权
由于线程可能比创建它们的函数活得更久,因此线程闭包经常需要使用move关键字来获取所需数据的所有权。
fn thread_ownership() {println!("=== 线程与所有权 ===");let data = vec![1, 2, 3, 4, 5];// 使用move关键字将data的所有权转移到线程中let handle = thread::spawn(move || {println!("在线程中访问数据: {:?}", data);// data在这里被消费});// 这里不能再使用data,因为所有权已经移动到了线程中// println!("{:?}", data); // 这行会编译错误handle.join().unwrap();// 共享不可变数据let shared_data = Arc::new(vec!["hello", "world", "from", "threads"]);let mut handles = vec![];for i in 0..3 {let data_clone = Arc::clone(&shared_data);let handle = thread::spawn(move || {println!("线程 {}: 数据长度 = {}", i, data_clone.len());});handles.push(handle);}for handle in handles {handle.join().unwrap();}
}
线程管理与配置
Rust允许我们配置线程的各个方面,包括栈大小、名称等。
fn thread_management() {println!("=== 线程管理 ===");// 创建带配置的线程let builder = thread::Builder::new().name("worker-thread".into()).stack_size(32 * 1024); // 32KB栈大小let handle = builder.spawn(|| {let name = thread::current().name().unwrap_or("unnamed");println!("线程 '{}' 开始执行", name);// 模拟工作for i in 1..=3 {println!("线程 '{}' 工作 {}...", name, i);thread::sleep(Duration::from_millis(200));}println!("线程 '{}' 执行完成", name);}).unwrap();handle.join().unwrap();// 获取系统信息println!("可用的并行度: {:?}", thread::available_parallelism());
}
错误处理与恐慌传播
线程中的恐慌不会自动传播到主线程,但我们可以通过JoinHandle来检测和处理。
fn thread_error_handling() {println!("=== 线程错误处理 ===");// 正常执行的线程let success_handle = thread::spawn(|| {println!("这个线程会正常完成");42});// 会恐慌的线程let panic_handle = thread::spawn(|| {println!("这个线程将会恐慌");panic!("故意制造的恐慌!");});// 处理正常线程的结果match success_handle.join() {Ok(result) => println!("正常线程结果: {}", result),Err(_) => println!("正常线程意外恐慌"),}// 处理恐慌线程match panic_handle.join() {Ok(_) => println!("恐慌线程正常完成 - 这不应该发生"),Err(e) => {println!("捕获到线程恐慌");if let Some(s) = e.downcast_ref::<&str>() {println!("恐慌信息: {}", s);}}}
}
15.2 消息传递并发
通道基础
通道是Rust中实现消息传递并发的主要机制,它允许多个线程之间通过发送和接收消息来进行通信。
基本通道操作
use std::sync::mpsc; // mpsc: 多个生产者,单个消费者fn channel_basics() {println!("=== 通道基础 ===");// 创建通道let (tx, rx) = mpsc::channel();// 在子线程中发送消息thread::spawn(move || {let messages = vec!["hello", "world", "from", "thread"];for msg in messages {tx.send(msg.to_string()).unwrap();thread::sleep(Duration::from_millis(100));}// tx在这里被丢弃,通道会自动关闭});// 在主线程中接收消息for received in rx {println!("收到: {}", received);}println!("通道已关闭,所有消息已接收");
}
多生产者场景
fn multiple_producers() {println!("=== 多生产者示例 ===");let (tx, rx) = mpsc::channel();let mut handles = vec![];// 创建多个生产者线程for producer_id in 0..3 {let thread_tx = tx.clone(); // 克隆发送端let handle = thread::spawn(move || {for i in 0..3 {let message = format!("生产者 {} - 消息 {}", producer_id, i);thread_tx.send(message).unwrap();thread::sleep(Duration::from_millis(50));}println!("生产者 {} 完成", producer_id);});handles.push(handle);}// 丢弃原始的tx,这样当所有克隆的tx都被丢弃时,通道会关闭drop(tx);// 接收所有消息let mut received_count = 0;for message in rx {println!("消费者收到: {}", message);received_count += 1;}println!("总共收到 {} 条消息", received_count);// 等待所有生产者线程完成for handle in handles {handle.join().unwrap();}
}
同步通道
Rust还提供了同步通道,它在发送时会阻塞直到消息被接收。
fn sync_channels() {println!("=== 同步通道 ===");// 创建同步通道,缓冲区大小为2let (tx, rx) = mpsc::sync_channel(2);let producer_handle = thread::spawn(move || {for i in 0..5 {println!("生产者准备发送消息 {}", i);tx.send(i).unwrap();println!("生产者成功发送消息 {}", i);}});// 慢速消费,演示同步行为for _ in 0..5 {thread::sleep(Duration::from_millis(500));let msg = rx.recv().unwrap();println!("消费者收到: {}", msg);}producer_handle.join().unwrap();
}
高级通道模式
fn advanced_channel_patterns() {println!("=== 高级通道模式 ===");// 1. 带超时的接收let (tx, rx) = mpsc::channel();thread::spawn(move || {thread::sleep(Duration::from_millis(500));tx.send("延迟的消息".to_string()).unwrap();});match rx.recv_timeout(Duration::from_millis(100)) {Ok(msg) => println!("及时收到: {}", msg),Err(mpsc::RecvTimeoutError::Timeout) => println!("接收超时"),Err(mpsc::RecvTimeoutError::Disconnected) => println!("通道已断开"),}// 2. 非阻塞接收let (tx, rx) = mpsc::channel();match rx.try_recv() {Ok(msg) => println!("立即收到: {}", msg),Err(mpsc::TryRecvError::Empty) => println!("没有可用消息"),Err(mpsc::TryRecvError::Disconnected) => println!("通道已断开"),}tx.send("测试消息".to_string()).unwrap();// 3. 选择操作(使用crossbeam-channel,功能更强大)#[cfg(feature = "crossbeam")]{use crossbeam_channel::{select, unbounded};let (tx1, rx1) = unbounded();let (tx2, rx2) = unbounded();thread::spawn(move || {thread::sleep(Duration::from_millis(100));tx1.send("第一个通道").unwrap();});thread::spawn(move || {thread::sleep(Duration::from_millis(200));tx2.send("第二个通道").unwrap();});select! {recv(rx1) -> msg => println!("从 {} 收到消息", msg.unwrap()),recv(rx2) -> msg => println!("从 {} 收到消息", msg.unwrap()),}}
}
15.3 共享状态并发
Mutex互斥锁
Mutex(互斥锁)是共享状态并发的基础,它保证一次只有一个线程可以访问数据。
基本Mutex使用
use std::sync::{Mutex, Arc};fn mutex_basics() {println!("=== Mutex基础 ===");let counter = Arc::new(Mutex::new(0));let mut handles = vec![];for _ in 0..10 {let counter = Arc::clone(&counter);let handle = thread::spawn(move || {let mut num = counter.lock().unwrap();*num += 1;});handles.push(handle);}for handle in handles {handle.join().unwrap();}println!("最终计数: {}", *counter.lock().unwrap());
}
高级Mutex模式
fn advanced_mutex_patterns() {println!("=== 高级Mutex模式 ===");// 1. 带复杂数据的Mutexstruct SharedData {values: Vec<i32>,metadata: String,}let shared_data = Arc::new(Mutex::new(SharedData {values: Vec::new(),metadata: "初始状态".to_string(),}));let mut handles = vec![];for i in 0..5 {let data = Arc::clone(&shared_data);let handle = thread::spawn(move || {let mut guard = data.lock().unwrap();guard.values.push(i);guard.metadata = format!("由线程 {} 更新", i);println!("线程 {} 更新了数据", i);});handles.push(handle);}for handle in handles {handle.join().unwrap();}let final_data = shared_data.lock().unwrap();println!("最终数据: {:?}", final_data.values);println!("元数据: {}", final_data.metadata);// 2. 尝试锁let data = Arc::new(Mutex::new(0));let data_clone = Arc::clone(&data);// 在一个线程中持有锁let holder_handle = thread::spawn(move || {let _guard = data_clone.lock().unwrap();println!("持有锁的线程开始睡眠");thread::sleep(Duration::from_secs(2));println!("持有锁的线程释放锁");});thread::sleep(Duration::from_millis(100));// 尝试获取锁match data.try_lock() {Ok(mut guard) => {println!("成功获取锁");*guard += 1;}Err(_) => {println("无法获取锁,锁正被其他线程持有");}}holder_handle.join().unwrap();
}
RwLock读写锁
RwLock(读写锁)允许多个读取者或一个写入者同时访问数据,适用于读多写少的场景。
use std::sync::RwLock;fn rwlock_example() {println!("=== RwLock示例 ===");let data = Arc::new(RwLock::new(String::from("初始值")));let mut handles = vec![];// 创建多个读取者for reader_id in 0..3 {let data = Arc::clone(&data);let handle = thread::spawn(move || {for _ in 0..3 {let value = data.read().unwrap();println!("读取者 {}: {}", reader_id, *value);thread::sleep(Duration::from_millis(100));}});handles.push(handle);}// 创建一个写入者let writer_data = Arc::clone(&data);let writer_handle = thread::spawn(move || {for i in 0..2 {thread::sleep(Duration::from_millis(200));let mut value = writer_data.write().unwrap();*value = format!("被写入者修改的值 {}", i);println!("写入者: 更新值为 {}", *value);}});handles.push(writer_handle);for handle in handles {handle.join().unwrap();}println!("最终值: {}", *data.read().unwrap());
}
条件变量
条件变量允许线程在等待某个条件成立时挂起,并在条件可能成立时被唤醒。
use std::sync::{Condvar, Mutex};fn condition_variable_example() {println!("=== 条件变量示例 ===");let pair = Arc::new((Mutex::new(false), Condvar::new()));let pair2 = Arc::clone(&pair);// 等待条件的线程let waiter_handle = thread::spawn(move || {let (lock, cvar) = &*pair2;let mut started = lock.lock().unwrap();println!("等待线程: 等待条件成立");while !*started {started = cvar.wait(started).unwrap();}println!("等待线程: 条件成立,继续执行");});// 设置条件的线程thread::spawn(move || {let (lock, cvar) = &*pair;println!("设置线程: 工作2秒...");thread::sleep(Duration::from_secs(2));println!("设置线程: 设置条件并通知");{let mut started = lock.lock().unwrap();*started = true;}cvar.notify_one();});waiter_handle.join().unwrap();println!("所有线程完成");
}
原子类型
原子类型提供了无需互斥锁的线程安全操作,性能更高但功能有限。
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;fn atomic_operations() {println!("=== 原子操作 ===");let counter = Arc::new(AtomicUsize::new(0));let mut handles = vec![];// 创建多个增加计数的线程for _ in 0..10 {let counter = Arc::clone(&counter);let handle = thread::spawn(move || {for _ in 0..1000 {counter.fetch_add(1, Ordering::SeqCst);}});handles.push(handle);}for handle in handles {handle.join().unwrap();}println!("最终计数: {}", counter.load(Ordering::SeqCst));// 比较并交换操作let value = Arc::new(AtomicUsize::new(0));let mut swap_handles = vec![];for i in 0..5 {let value = Arc::clone(&value);let handle = thread::spawn(move || {let mut current = value.load(Ordering::SeqCst);loop {let new = current + 1;match value.compare_exchange_weak(current, new, Ordering::SeqCst, Ordering::SeqCst) {Ok(_) => {println!("线程 {} 成功将值从 {} 更新到 {}", i, current, new);break;}Err(actual) => {println!("线程 {} 更新失败,当前值为 {}", i, actual);current = actual;}}}});swap_handles.push(handle);}for handle in swap_handles {handle.join().unwrap();}println!("最终值: {}", value.load(Ordering::SeqCst));
}
15.4 Sync和Send trait
Send Trait
Send marker trait表示类型的所有权可以在线程间安全传递。
fn send_trait_demo() {println!("=== Send Trait 演示 ===");// 大多数类型都是Send的let value = 42;let handle = thread::spawn(move || {println!("在线程中使用值: {}", value);});handle.join().unwrap();// 一些类型不是Send的,比如Rc<T>let rc_value = std::rc::Rc::new(42);// 下面的代码会编译错误,因为Rc<T>不是Send的// let handle = thread::spawn(move || {// println!("Rc value: {}", rc_value);// });println!("Rc<T> 不是 Send,因此不能跨线程传递");// 但Arc<T>是Send的let arc_value = std::sync::Arc::new(42);let handle = thread::spawn(move || {println!("Arc value: {}", arc_value);});handle.join().unwrap();println!("Arc<T> 是 Send,可以跨线程传递");
}
Sync Trait
Sync marker trait表示类型的引用可以安全地在多个线程间共享。
fn sync_trait_demo() {println!("=== Sync Trait 演示 ===");// 不可变引用是Sync的let data = vec![1, 2, 3];let data_ref = &data;let handle = thread::spawn(move || {println!("在线程中访问数据长度: {}", data_ref.len());});handle.join().unwrap();// Mutex<T>是Sync的,即使T不是let mutex_data = Arc::new(Mutex::new(vec![1, 2, 3]));let mut handles = vec![];for i in 0..3 {let data = Arc::clone(&mutex_data);let handle = thread::spawn(move || {let mut guard = data.lock().unwrap();guard.push(i);println!("线程 {} 添加了值 {}", i, i);});handles.push(handle);}for handle in handles {handle.join().unwrap();}println!("最终数据: {:?}", mutex_data.lock().unwrap());// Cell<T>不是Sync的let cell_data = std::cell::Cell::new(42);// 下面的代码会编译错误,因为Cell<T>不是Sync的// let handle = thread::spawn(move || {// cell_data.set(100);// });println!("Cell<T> 不是 Sync,不能跨线程共享引用");
}
自定义Send和Sync类型
我们可以为自己的类型手动实现Send和Sync trait,但需要确保线程安全性。
// 自定义线程安全类型
struct ThreadSafeCounter {count: AtomicUsize,name: String,
}impl ThreadSafeCounter {fn new(name: &str) -> Self {ThreadSafeCounter {count: AtomicUsize::new(0),name: name.to_string(),}}fn increment(&self) -> usize {self.count.fetch_add(1, Ordering::SeqCst)}fn get(&self) -> usize {self.count.load(Ordering::SeqCst)}
}// 因为只包含Send和Sync的字段,所以自动实现Send和Sync
// 我们可以显式声明,但编译器会自动推导
// unsafe impl Send for ThreadSafeCounter {}
// unsafe impl Sync for ThreadSafeCounter {}fn custom_send_sync_demo() {println!("=== 自定义 Send + Sync 类型 ===");let counter = Arc::new(ThreadSafeCounter::new("global_counter"));let mut handles = vec![];for i in 0..5 {let counter = Arc::clone(&counter);let handle = thread::spawn(move || {for _ in 0..100 {let old = counter.increment();println!("线程 {}: 计数从 {} 增加到 {}", i, old, old + 1);}});handles.push(handle);}for handle in handles {handle.join().unwrap();}println!("最终计数: {}", counter.get());
}
实战:构建并发Web服务器
让我们构建一个简单的并发Web服务器来展示这些概念的组合使用。
use std::io::{Read, Write};
use std::net::{TcpListener, TcpStream};
use std::sync::{Arc, Mutex};
use std::collections::HashMap;struct ThreadPool {workers: Vec<Worker>,sender: mpsc::Sender<Message>,
}type Job = Box<dyn FnOnce() + Send + 'static>;enum Message {NewJob(Job),Terminate,
}impl ThreadPool {fn new(size: usize) -> ThreadPool {assert!(size > 0);let (sender, receiver) = mpsc::channel();let receiver = Arc::new(Mutex::new(receiver));let mut workers = Vec::with_capacity(size);for id in 0..size {workers.push(Worker::new(id, Arc::clone(&receiver)));}ThreadPool { workers, sender }}fn execute<F>(&self, f: F)whereF: FnOnce() + Send + 'static,{let job = Box::new(f);self.sender.send(Message::NewJob(job)).unwrap();}
}impl Drop for ThreadPool {fn drop(&mut self) {println!("正在停止所有工作线程...");for _ in &self.workers {self.sender.send(Message::Terminate).unwrap();}for worker in &mut self.workers {println!("关闭工作线程 {}", worker.id);if let Some(thread) = worker.thread.take() {thread.join().unwrap();}}}
}struct Worker {id: usize,thread: Option<thread::JoinHandle<()>>,
}impl Worker {fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker {let thread = thread::spawn(move || loop {let message = receiver.lock().unwrap().recv().unwrap();match message {Message::NewJob(job) => {println!("工作线程 {} 获得任务,执行中。", id);job();}Message::Terminate => {println!("工作线程 {} 收到终止信号。", id);break;}}});Worker {id,thread: Some(thread),}}
}fn handle_connection(mut stream: TcpStream, request_count: Arc<AtomicUsize>) {let mut buffer = [0; 1024];stream.read(&mut buffer).unwrap();let request_count = request_count.fetch_add(1, Ordering::SeqCst) + 1;let get = b"GET / HTTP/1.1\r\n";let sleep = b"GET /sleep HTTP/1.1\r\n";let (status_line, contents) = if buffer.starts_with(get) {("HTTP/1.1 200 OK", "<h1>Hello!</h1><p>这是快速响应</p>")} else if buffer.starts_with(sleep) {thread::sleep(Duration::from_secs(5));("HTTP/1.1 200 OK", "<h1>Hello!</h1><p>这是慢速响应</p>")} else {("HTTP/1.1 404 NOT FOUND", "<h1>Oops!</h1><p>页面未找到</p>")};let response = format!("{}\r\nContent-Length: {}\r\n\r\n{}",status_line,contents.len(),contents);stream.write(response.as_bytes()).unwrap();stream.flush().unwrap();println!("请求 #{} 已处理", request_count);
}fn concurrent_web_server() {println!("=== 并发Web服务器演示 ===");let listener = TcpListener::bind("127.0.0.1:8080").unwrap();let pool = ThreadPool::new(4);let request_count = Arc::new(AtomicUsize::new(0));println!("服务器运行在 http://127.0.0.1:8080");println!("可用端点:");println!(" GET / - 快速响应");println!(" GET /sleep - 5秒延迟响应");println!(" 其他路径 - 404响应");for stream in listener.incoming().take(10) { // 只处理10个连接用于演示let stream = stream.unwrap();let request_count = Arc::clone(&request_count);pool.execute(move || {handle_connection(stream, request_count);});}println!("已处理10个请求,服务器关闭");
}fn main() {thread_basics();thread_ownership();thread_management();thread_error_handling();channel_basics();multiple_producers();sync_channels();advanced_channel_patterns();mutex_basics();advanced_mutex_patterns();rwlock_example();condition_variable_example();atomic_operations();send_trait_demo();sync_trait_demo();custom_send_sync_demo();concurrent_web_server();
}
并发编程最佳实践
性能优化建议
fn concurrency_best_practices() {println!("=== 并发编程最佳实践 ===");println!("\n1. 选择合适的并发模型:");println!(" - 消息传递: 适用于任务间通信");println!(" - 共享状态: 适用于需要共享数据的场景");println!(" - 无锁数据结构: 适用于高性能场景");println!("\n2. 避免锁的滥用:");println!(" - 尽量使用更细粒度的锁");println!(" - 考虑使用RwLock替代Mutex");println!(" - 在可能的情况下使用原子操作");println!("\n3. 处理错误和恐慌:");println!(" - 总是处理线程的JoinHandle");println!(" - 考虑使用catch_unwind处理恐慌");println!(" - 确保资源在恐慌时也能正确清理");println!("\n4. 性能考虑:");println!(" - 避免不必要的线程创建");println!(" - 使用线程池管理线程生命周期");println!(" - 注意缓存一致性和false sharing");println!("\n5. 测试和调试:");println!(" - 使用工具检测数据竞争");println!(" - 编写并发测试用例");println!(" - 使用日志和指标监控并发行为");
}// 数据竞争检测示例
#[cfg(test)]
mod concurrency_tests {use super::*;use std::sync::atomic::{AtomicBool, Ordering};#[test]fn test_thread_safety() {let data = Arc::new(AtomicBool::new(false));let data_clone = Arc::clone(&data);let handle = thread::spawn(move || {data_clone.store(true, Ordering::SeqCst);});handle.join().unwrap();assert!(data.load(Ordering::SeqCst));}#[test]fn test_message_passing() {let (tx, rx) = mpsc::channel();thread::spawn(move || {tx.send(42).unwrap();});assert_eq!(rx.recv().unwrap(), 42);}
}
总结
并发编程是Rust的强项之一,其所有权系统和类型系统在编译时就能防止数据竞争和其他常见的并发错误。通过本章的学习,我们掌握了:
- 线程管理:创建、配置和管理线程,理解线程与所有权的关系
- 消息传递:使用通道进行线程间通信,包括多生产者和同步通道
- 共享状态:使用Mutex、RwLock、条件变量和原子类型安全地共享数据
- Sync和Send trait:理解这些trait如何保证线程安全,以及如何创建自定义的线程安全类型
Rust的并发编程模型既强大又安全,能够在编译期捕获许多其他语言在运行时才会发现的并发错误。通过合理运用这些工具和模式,我们可以构建出既高效又可靠的并发应用程序。
在下一章中,我们将探讨Rust的面向对象编程特性,学习如何用Rust实现面向对象的设计模式,以及如何在Rust的哲学和面向对象编程之间找到平衡。
