当前位置: 首页 > news >正文

异步编程深度解析

Rust 异步编程深度解析

概述

Rust 的异步编程模型基于 Future trait 和 async/await 语法。它允许编写高效的并发代码,特别适合 I/O 密集型应用。

异步基础

async/await 是 Rust 异步编程的核心语法。async 函数返回一个 Future,await 用于等待 Future 完成。

基础示例

use std::time::Duration;async fn say_hello() {println!("Hello from async!");
}async fn delayed_greeting(name: String, delay_ms: u64) {tokio::time::sleep(Duration::from_millis(delay_ms)).await;println!("Hello, {}!", name);
}

复杂案例:实现异步任务管理系统

use tokio::time::{sleep, Duration, Instant};
use tokio::sync::{mpsc, RwLock, Semaphore};
use std::sync::Arc;
use std::collections::HashMap;// 任务状态
#[derive(Debug, Clone, PartialEq)]
enum TaskStatus {Pending,Running,Completed,Failed(String),
}// 任务定义
#[derive(Clone)]
struct Task {id: u64,name: String,priority: u8,estimated_duration: Duration,
}impl Task {fn new(id: u64, name: &str, priority: u8, estimated_duration: Duration) -> Self {Task {id,name: name.to_string(),priority,estimated_duration,}}
}// 任务结果
#[derive(Debug)]
struct TaskResult {task_id: u64,status: TaskStatus,duration: Duration,output: String,
}// 异步任务执行器
struct AsyncTaskExecutor {tasks: Arc<RwLock<HashMap<u64, TaskStatus>>>,max_concurrent: Arc<Semaphore>,result_sender: mpsc::Sender<TaskResult>,
}impl AsyncTaskExecutor {fn new(max_concurrent: usize) -> (Self, mpsc::Receiver<TaskResult>) {let (tx, rx) = mpsc::channel(100);let executor = AsyncTaskExecutor {tasks: Arc::new(RwLock::new(HashMap::new())),max_concurrent: Arc::new(Semaphore::new(max_concurrent)),result_sender: tx,};(executor, rx)}async fn submit(&self, task: Task) {// 记录任务状态{let mut tasks = self.tasks.write().await;tasks.insert(task.id, TaskStatus::Pending);}// 克隆需要的数据let tasks = Arc::clone(&self.tasks);let semaphore = Arc::clone(&self.max_concurrent);let sender = self.result_sender.clone();// 在新任务中执行tokio::spawn(async move {// 获取信号量许可let _permit = semaphore.acquire().await.unwrap();// 更新状态为运行中{let mut task_map = tasks.write().await;task_map.insert(task.id, TaskStatus::Running);}let start = Instant::now();// 模拟任务执行let result = Self::execute_task(&task).await;let duration = start.elapsed();// 更新状态let status = match &result {Ok(output) => {let mut task_map = tasks.write().await;task_map.insert(task.id, TaskStatus::Completed);TaskStatus::Completed}Err(e) => {let mut task_map = tasks.write().await;let failed_status = TaskStatus::Failed(e.clone());task_map.insert(task.id, failed_status.clone());failed_status}};// 发送结果let task_result = TaskResult {task_id: task.id,status,duration,output: result.unwrap_or_else(|e| e),};let _ = sender.send(task_result).await;});}async fn execute_task(task: &Task) -> Result<String, String> {println!("开始执行任务 {}: {}", task.id, task.name);// 模拟异步工作sleep(task.estimated_duration).await;// 模拟一些任务可能失败if task.priority < 3 && task.id % 7 == 0 {return Err(format!("任务 {} 执行失败", task.id));}Ok(format!("任务 {} 完成: {}", task.id, task.name))}async fn get_status(&self, task_id: u64) -> Option<TaskStatus> {let tasks = self.tasks.read().await;tasks.get(&task_id).cloned()}async fn get_all_statuses(&self) -> HashMap<u64, TaskStatus> {let tasks = self.tasks.read().await;tasks.clone()}
}// 任务监控器
struct TaskMonitor {completed_count: Arc<RwLock<usize>>,failed_count: Arc<RwLock<usize>>,total_duration: Arc<RwLock<Duration>>,
}impl TaskMonitor {fn new() -> Self {TaskMonitor {completed_count: Arc::new(RwLock::new(0)),failed_count: Arc::new(RwLock::new(0)),total_duration: Arc::new(RwLock::new(Duration::ZERO)),}}async fn record_result(&self, result: &TaskResult) {match &result.status {TaskStatus::Completed => {let mut count = self.completed_count.write().await;*count += 1;}TaskStatus::Failed(_) => {let mut count = self.failed_count.write().await;*count += 1;}_ => {}}let mut duration = self.total_duration.write().await;*duration += result.duration;}async fn print_stats(&self) {let completed = *self.completed_count.read().await;let failed = *self.failed_count.read().await;let total_duration = *self.total_duration.read().await;println!("\n=== 任务统计 ===");println!("完成: {}", completed);println!("失败: {}", failed);println!("总耗时: {:?}", total_duration);if completed + failed > 0 {let avg = total_duration / (completed + failed) as u32;println!("平均耗时: {:?}", avg);}}
}// 演示异步任务执行
async fn demonstrate_async_executor() {let (executor, mut result_receiver) = AsyncTaskExecutor::new(3);let monitor = TaskMonitor::new();// 提交多个任务let task_count = 15;for i in 1..=task_count {let task = Task::new(i,&format!("任务-{}", i),(i % 10) as u8,Duration::from_millis(100 * (i % 5 + 1)),);executor.submit(task).await;}// 接收并处理结果let monitor_clone = monitor.clone();let result_handler = tokio::spawn(async move {let mut received = 0;while received < task_count {if let Some(result) = result_receiver.recv().await {println!("收到结果: {:?}", result);monitor_clone.record_result(&result).await;received += 1;}}});// 等待所有任务完成result_handler.await.unwrap();// 打印统计信息monitor.print_stats().await;
}// 并发HTTP请求模拟
async fn fetch_url(url: &str, delay_ms: u64) -> Result<String, String> {println!("开始请求: {}", url);sleep(Duration::from_millis(delay_ms)).await;if delay_ms > 300 {Err(format!("请求超时: {}", url))} else {Ok(format!("响应来自: {}", url))}
}async fn demonstrate_concurrent_requests() {let urls = vec![("https://example.com/api/1", 100),("https://example.com/api/2", 150),("https://example.com/api/3", 200),("https://example.com/api/4", 350),("https://example.com/api/5", 120),];let mut tasks = vec![];for (url, delay) in urls {let task = tokio::spawn(fetch_url(url, delay));tasks.push(task);}// 等待所有请求完成let results = futures::future::join_all(tasks).await;println!("\n=== 请求结果 ===");for result in results {match result {Ok(Ok(response)) => println!("成功: {}", response),Ok(Err(e)) => println!("错误: {}", e),Err(e) => println!("任务错误: {}", e),}}
}// 实现异步流处理
use futures::stream::{self, StreamExt};async fn demonstrate_stream_processing() {let stream = stream::iter(1..=10).map(|x| async move {sleep(Duration::from_millis(50)).await;x * 2}).buffer_unordered(3); // 最多3个并发let results: Vec<i32> = stream.collect().await;println!("\n=== 流处理结果 ===");println!("结果: {:?}", results);
}// 超时控制
async fn task_with_timeout(duration: Duration) -> Result<String, &'static str> {sleep(duration).await;Ok("任务完成".to_string())
}async fn demonstrate_timeout() {println!("\n=== 超时控制 ===");// 任务1:在超时前完成match tokio::time::timeout(Duration::from_secs(1),task_with_timeout(Duration::from_millis(500))).await {Ok(Ok(result)) => println!("任务1完成: {}", result),Ok(Err(_)) => println!("任务1失败"),Err(_) => println!("任务1超时"),}// 任务2:超时match tokio::time::timeout(Duration::from_millis(500),task_with_timeout(Duration::from_secs(2))).await {Ok(Ok(result)) => println!("任务2完成: {}", result),Ok(Err(_)) => println!("任务2失败"),Err(_) => println!("任务2超时"),}
}// select! 宏使用示例
async fn demonstrate_select() {use tokio::time::sleep;println!("\n=== Select 示例 ===");let task1 = async {sleep(Duration::from_millis(100)).await;"任务1"};let task2 = async {sleep(Duration::from_millis(200)).await;"任务2"};tokio::select! {result = task1 => println!("首先完成: {}", result),result = task2 => println!("首先完成: {}", result),}
}// 异步锁示例
async fn demonstrate_async_locks() {let data = Arc::new(RwLock::new(vec![1, 2, 3]));let mut handles = vec![];for i in 0..5 {let data_clone = Arc::clone(&data);let handle = tokio::spawn(async move {{let mut d = data_clone.write().await;d.push(i);println!("线程 {} 写入完成", i);}sleep(Duration::from_millis(10)).await;{let d = data_clone.read().await;println!("线程 {} 读取: {:?}", i, *d);}});handles.push(handle);}for handle in handles {handle.await.unwrap();}let final_data = data.read().await;println!("最终数据: {:?}", *final_data);
}impl Clone for TaskMonitor {fn clone(&self) -> Self {TaskMonitor {completed_count: Arc::clone(&self.completed_count),failed_count: Arc::clone(&self.failed_count),total_duration: Arc::clone(&self.total_duration),}}
}#[tokio::main]
async fn main() {println!("=== 异步任务执行器 ===");demonstrate_async_executor().await;println!("\n=== 并发HTTP请求 ===");demonstrate_concurrent_requests().await;demonstrate_stream_processing().await;demonstrate_timeout().await;demonstrate_select().await;println!("\n=== 异步锁 ===");demonstrate_async_locks().await;
}

异步运行时选择

常用的异步运行时:

  • tokio: 功能完整,适合生产环境
  • async-std: 类似标准库的API
  • smol: 轻量级运行时

总结

Rust 的异步编程提供了高效的并发处理能力。通过 Future、async/await 和各种异步原语,可以编写高性能的异步应用。

http://www.dtcms.com/a/536232.html

相关文章:

  • Redis GEO 地理位置搜索:实战示例 + 底层原理解析
  • Java的中间件
  • 邢台学校网站建设价格百度企业服务平台
  • 建网站要注意的细节物流网站首页图片
  • 牙根尖挺使用失误的常见原因分析及规避方法
  • 麒光AI-OCT大模型:心血管诊疗的智能革命与未来展望
  • AI 驱动的 ITSM:数字化转型时代的 IT 服务新范式
  • EasyGBS视频实时监控系助力实现换热站全景可视化管理
  • HarmonyOS安全加密与TEE开发实战
  • 门户网站建设 简报网络培训视频如何加速
  • uniapp引入uniim后聊天列表无法加载出来
  • AWS Auto Scaling:自动扩容,让服务器像呼吸一样灵活
  • 实战|AWS Snowcone边缘计算落地工业场景:从技术原理到代码实现
  • uni-app facebook登录
  • 【设计模式笔记07】:迪米特法则
  • SIP协议详解:从请求到挂断的全生命周期
  • 药材网网站技术建设手机网站微信链接怎么做
  • 【Linux】 第一个系统程序——进度条
  • 旅游seo整站优化宁波做网站有哪些公司公司
  • CircleCI 让持续集成变得简单而高效的开源框架
  • Ribbon是如何与服务注册中心nacos交互的
  • 自然语言处理框架:Bert和Transformer
  • (N_157)基于springboot,vue服装商城系统
  • 介绍一下Ribbon
  • 潍坊有哪些网站旅行社网站规划与建设的流程图
  • (项目管理系列课程)项目规划阶段:项目进度管理-估算活动持续时间
  • STM32定时器的输入捕获模式(测量PWM的周期与占空比)
  • 智慧校园数字孪生选型指南:选对平台做好交付,从平台适配到交付落地的全流程解决方案
  • php招生网站开发标准网站建设价格
  • 【NestJS】在 nest.js 项目中,如何使用 Postgresql 来做缓存?