当前位置: 首页 > news >正文

rust之Tokio学习1

任务

一个 Tokio 任务是一个异步的绿色线程,它们通过 tokio::spawn 进行创建,该函数会返回一个 JoinHandle 类型的句柄
调用者可以使用该句柄跟创建的任务进行交互

示例

spawn 函数的参数是一个 async 语句块,该语句块甚至可以返回一个值,然后调用者可以通过 JoinHandle 句柄获取该值

#[tokio::main]
async fn main() {
    let handle = tokio::spawn(async {
       10086
    });

	// .await 会返回一个 Result
	// 若 spawn 创建的任务正常运行结束,则返回一个 Ok(T)的值,否则会返回一个错误 Err
	// 例如任务内部发生了 panic 或任务因为运行时关闭被强制取消时
    let out = handle.await.unwrap();
    // 打印 10086
    println!("GOT {}", out);
}

任务是调度器管理的执行单元。spawn生成的任务会首先提交给调度器,然后由它负责调度执行

注意:
执行任务的线程未必是创建任务的线程,任务完全有可能运行在另一个不同的线程上,而且任务在生成后,它还可能会在线程间被移动
任务在 Tokio 中远比看上去要更轻量,例如创建一个任务仅仅需要一次 64 字节大小的内存分配
当使用 Tokio 创建一个任务时,该任务类型的生命周期必须是 'static。意味着,在任务中不能使用外部数据的引用

错误用法

async block may outlive the current function, but it borrows v, which is owned by the current function

use tokio::task;

#[tokio::main]
async fn main() {
    let v = vec![1, 2, 3];

    task::spawn(async {
    	// spawn 出的任务引用了外部环境中的变量 v
        println!("Here's a vec: {:?}", v);
    });
}

默认情况下,变量并不是通过 move 的方式转移进 async 语句块, v 变量的所有权依然属于 main 函数
因为任务内部的 println! 是通过借用的方式使用了 v,但是这种借用并不能满足 'static 生命周期的要求

在报错的同时,Rust 编译器还给出了相当有帮助的提示:
为 async 语句块使用 move 关键字,这样就能将 v 的所有权从 main 函数转移到新创建的任务中

但 move 有一个问题,一个数据只能被一个任务使用。这时,Arc 起作用了,它还是线程安全的

共享状态

在 main 函数中对 HashMap 进行初始化,然后使用 Arc 克隆一份它的所有权并将其传入到生成的异步任务中
事实上在 Tokio 中,这里的 Arc 被称为 handle,或者更宽泛的说,handle 在 Tokio 中可以用来访问某个共享状态

use tokio::net::TcpListener;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

#[tokio::main]
async fn main() {
    let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();

    println!("Listening");

    let db = Arc::new(Mutex::new(HashMap::new()));

    loop {
        let (socket, _) = listener.accept().await.unwrap();
        // 将 handle 克隆一份
        let db = db.clone();

        println!("Accepted");
        tokio::spawn(async move {
            process(socket, db).await;
        });
    }
}

为何使用了 std::sync::Mutex 来保护 HashMap,而不是使用 tokio::sync::Mutex?

在使用 Tokio 编写异步代码时,一个常见的错误无条件地使用 tokio::sync::Mutex
而真相是 Tokio 提供的异步锁只应该在跨多个 .await 调用时使用
而且 Tokio 的 Mutex 实际上内部使用的也是 std::sync::Mutex

锁如果在多个 .await 过程中持有,应该使用 Tokio 提供的锁,原因是 .await的过程中锁可能在线程间转移
若使用标准库的同步锁存在死锁的可能性
例如某个任务刚获取完锁,还没使用完就因为 .await 让出了当前线程的所有权,结果下个任务又去获取了锁,造成死锁
锁竞争不多的情况下,使用 std::sync::Mutex
锁竞争多,可以考虑使用三方库提供的性能更高的锁,例如 parking_lot::Mutex

Send 约束

tokio::spawn 生成的任务必须实现 Send 特征,因为当这些任务在 .await 执行过程中发生阻塞时,Tokio 调度器会将任务在线程间移动

一个任务要实现 Send 特征,那它在 .await 调用的过程中所持有的全部数据都必须实现 Send 特征。当 .await 调用发生阻塞时,任务会让出当前线程所有权给调度器,然后当任务准备好后,调度器会从上一次暂停的位置继续执行该任务。该流程能正确的工作,任务必须将.await之后使用的所有状态保存起来,这样才能在中断后恢复现场并继续执行。若这些状态实现了 Send 特征(可以在线程间安全地移动),那任务自然也就可以在线程间安全地移动。

use tokio::task::yield_now;
use std::rc::Rc;

#[tokio::main]
async fn main() {
    tokio::spawn(async {
        // 语句块的使用强制了 `rc` 会在 `.await` 被调用前就被释放,
        // 因此 `rc` 并不会影响 `.await`的安全性
        {
            let rc = Rc::new("hello");
            println!("{}", rc);
        }

        // `rc` 的作用范围已经失效,因此当任务让出所有权给当前线程时,它无需作为状态被保存起来
        yield_now().await;
    });
}

但是下面代码就不行:

use tokio::task::yield_now;
use std::rc::Rc;

#[tokio::main]
async fn main() {
    tokio::spawn(async {
        let rc = Rc::new("hello");


        // `rc` 在 `.await` 后还被继续使用,因此它必须被作为任务的状态保存起来
        yield_now().await;


        // 事实上,注释掉下面一行代码,依然会报错
        // 原因是:是否保存,不取决于 `rc` 是否被使用,而是取决于 `.await`在调用时是否仍然处于 `rc` 的作用域中
        println!("{}", rc);

        // rc 作用域在这里结束
    });
}

报错如下:

error: future cannot be sent between threads safely
   --> src/main.rs:6:5
    |
6   | /     tokio::spawn(async {
7   | |         let rc = Rc::new("hello");
...   |
19  | |     });
    | |______^ future created by async block is not `Send`
    |
    = help: within `{async block@src/main.rs:6:18: 6:23}`, the trait `Send` is not implemented for `Rc<&str>`
note: future is not `Send` as this value is used across an await
   --> src/main.rs:11:21
    |
7   |         let rc = Rc::new("hello");
    |             -- has type `Rc<&str>` which is not `Send`
...
11  |         yield_now().await;
    |                     ^^^^^ await occurs here, with `rc` maybe used later
note: required by a bound in `tokio::spawn`
   --> /Users/bytedance/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.43.0/src/task/spawn.rs:168:21
    |
166 |     pub fn spawn<F>(future: F) -> JoinHandle<F::Output>
    |            ----- required by a bound in this function
167 |     where
168 |         F: Future + Send + 'static,
    |                     ^^^^ required by this bound in `spawn`

rc 是否会保存到任务状态中,取决于 .await 的调用是否处于它的作用域中
上面代码中,就算注释掉 println! 函数,该报错依然会报错,因为 rc 的作用域直到 async 的末尾才结束!

锁竞争

当竞争不多时,使用阻塞性的锁去保护共享数据是一个正确的选择
当一个锁竞争触发后,当前正在执行任务(请求锁)的线程会被阻塞,并等待锁被前一个使用者释放
锁竞争不仅仅会导致当前的任务被阻塞,还会导致执行任务的线程被阻塞,因此该线程准备执行的其它任务也会因此被阻塞

默认情况下,Tokio 调度器使用了多线程模式,此时如果有大量的任务都需要访问同一个锁,那么锁竞争将变得激烈起来
当同步锁的竞争变成一个问题时,使用 Tokio 提供的异步锁几乎并不能解决问题,可以考虑如下选项:
(1)创建专门的任务并使用消息传递的方式来管理状态
(2)将锁进行分片
(3)重构代码以避免锁

由于每一个 key 都是独立的,因此对锁进行分片将成为一个不错的选择:

type ShardedDb = Arc<Vec<Mutex<HashMap<String, Vec<u8>>>>>;

fn new_sharded_db(num_shards: usize) -> ShardedDb {
    let mut db = Vec::with_capacity(num_shards);
    for _ in 0..num_shards {
        db.push(Mutex::new(HashMap::new()));
    }
    Arc::new(db)
}

在这里,创建了 N 个不同的存储实例,每个实例都会存储不同的分片数据
例如有a-i共 9 个不同的 key, 可以将存储分成 3 个实例:第一个实例可以存储 a-c,第二个d-f,以此类推
在这种情况下,访问 b 时,只需要锁住第一个实例,此时二、三实例依然可以正常访问,因此锁被成功的分片了

在分片后,使用给定的 key 找到对应的值就变成了两个步骤:
(1)使用 key 通过特定的算法寻找到对应的分片
(2)使用该 key 从分片中查询到值

let shard = db[hash(key) % db.len()].lock().unwrap();
shard.insert(key, value);

这里使用 hash 算法来进行分片,但是该算法有个缺陷:
分片的数量不能变,一旦变了后,那之前落入分片 1 的key很可能将落入到其它分片中,最终全部乱掉
此时可以考虑 dashmap,它提供了更复杂、更精妙的支持分片的 hash map

.await期间持有锁

异常代码

use std::sync::{Mutex, MutexGuard};

async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
	// MutexGuard 是 Mutex 加锁后返回的一个守卫类型
	// 当调用 Mutex 的 lock 方法成功获取锁时,会返回一个 MutexGuard<T> 实例
	// 只要这个 MutexGuard 实例存在,锁就会一直被持有,其他线程无法获取该锁
	// 当 MutexGuard 实例离开作用域时,会自动释放锁
    let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
    *lock += 1;

    do_something_async().await;
} // 锁在这里超出作用域
4   |     let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
    |         -------- has type `std::sync::MutexGuard<'_, i32>` which is not `Send`

错误的原因在于 std::sync::MutexGuard 类型并没有实现 Send 特征,不能将一个 Mutex 锁发送到另一个线程,因为 .await 可能会让任务转移到另一个线程上执行

提前释放锁
让 Mutex 锁在 .await 被调用前就被释放掉

async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
    {
        let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
        *lock += 1;
    } // lock在这里超出作用域 (被释放)

    do_something_async().await;
}

下面的代码不工作

use std::sync::{Mutex, MutexGuard};

async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
    let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
    *lock += 1;
    drop(lock);

    do_something_async().await;
}

编译器在这里不够聪明,目前它只能根据作用域的范围来判断,drop 虽然释放了锁,但是锁的作用域依然会持续到函数的结束,未来也许编译器会改进,但是现在不行。

既然锁没有实现 Send, 那主动给它实现如何?
不可以,如果一个任务获取了锁,然后还没释放就在 .await 期间被挂起,接着开始执行另一个任务,这个任务又去获取锁,就会导致死锁

其它解决方法:
重构代码:在 .await 期间不持有锁
可以把 Mutex 放入一个结构体中,并且只在该结构体的非异步方法中使用该锁:

use std::sync::Mutex;

struct CanIncrement {
    mutex: Mutex<i32>,
}
impl CanIncrement {
    // 该方法不是 `async`
    fn increment(&self) {
        let mut lock = self.mutex.lock().unwrap();
        *lock += 1;
    }
}

async fn increment_and_do_stuff(can_incr: &CanIncrement) {
    can_incr.increment();
    do_something_async().await;
}

使用异步任务和通过消息传递来管理状态
该方法常常用于共享的资源是 I/O 类型的资源时

使用 Tokio 提供的异步锁
Tokio 提供的锁最大的优点就是:它可以在 .await 执行期间被持有,而且不会有任何问题
代价就是,这种异步锁的性能开销会更高,因此如果可以,使用之前的两种方法来解决会更好。

use tokio::sync::Mutex; // 注意,这里使用的是 Tokio 提供的锁

// 下面的代码会编译
// 但是就这个例子而言,之前的方式会更好
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
    let mut lock = mutex.lock().await;
    *lock += 1;

    do_something_async().await;
} // 锁在这里被释放

消息传递

错误示例

use mini_redis::client;

#[tokio::main]
async fn main() {
    // 创建到服务器的连接
    let mut client = client::connect("127.0.0.1:6379").await.unwrap();

    // 生成两个任务,一个用于获取 key, 一个用于设置 key
    let t1 = tokio::spawn(async {
        let res = client.get("hello").await;
    });

    let t2 = tokio::spawn(async {
        client.set("foo", "bar".into()).await;
    });

    t1.await.unwrap();
    t2.await.unwrap();
}

两个任务都需要去访问 client,但是 client 并没有实现 Copy 特征,再加上并没有实现相应的共享代码,因此自然会报错
方法 set 和 get 都使用了 client 的可变引用 &mut self,由此还会造成同时借用两个可变引用的错误

std::sync::Mutex 无法被使用,同步锁无法跨越 .await 调用时使用
是不是可以使用 tokio::sync:Mutex ,答案是可以用,但是同时就只能运行一个请求
若客户端实现了 redis 的 pipelining, 那这个异步锁就会导致连接利用率不足
因此,消息登场了

tokio的消息通道

Tokio 提供了多种消息通道,可以满足不同场景的需求:
mpsc, 多生产者,单消费者模式
oneshot, 单生产者,单消费者,一次只能发送一条消息
broadcast,多生产者,多消费者,其中每一条发送的消息都可以被所有接收者收到,因此是广播
watch,单生产者,多消费者,只保存一条最新的消息,因此接收者只能看到最近的一条消息,例如,这种模式适用于配置文件变化的监听
async-channel:多生产者、多消费者,且每一条消息只能被其中一个消费者接收

以上这些消息通道都有一个共同点:适用于 async 编程
对于其它场景,可以使用在多线程章节中提到过的 std::sync::mpsc 和 crossbeam::channel
这些通道在等待消息时会阻塞当前的线程,因此不适用于 async 编程

消息定义

use bytes::Bytes;

#[derive(Debug)]
// 枚举类型允许你定义一个类型,它可以有多个不同的变体,每个变体可以携带不同的数据。
enum Command {
    Get {
        key: String,
    },
    Set {
        key: String,
        val: Bytes,
    }
}

创建消息通道

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(32);
    
	// mpsc 是多发单收
    let tx2 = tx.clone();

	// tokio::spawn 创建的异步任务可能会在 main 函数结束之后才执行
	// 如果不使用 move 关键字,async 块会以引用的方式捕获 tx2
	// 当 main 函数结束时,tx2 的所有权会被释放
	// 此时异步任务再去访问 tx2 就会出现悬垂引用
    tokio::spawn(async move {
        tx.send("sending from first handle").await;
    });

    tokio::spawn(async move {
        tx2.send("sending from second handle").await;
    });

	// 接收者是在管理 redis 连接的任务中
	// 当该任务发现所有发送者都关闭时,它知道它的使命可以完成了,因此它会关闭 redis 连接
    while let Some(message) = rx.recv().await {
        println!("GOT = {}", message);
    }
}

一个任务可以通过此通道将命令发送给管理 redis 连接的任务,同时由于通道支持多个生产者,因此多个任务可以同时发送命令
创建该通道会返回一个发送和接收句柄,这两个句柄可以分别被使用,例如它们可以被移动到不同的任务中。

通道的缓冲队列长度是 32,意味着如果消息发送的比接收的快,这些消息将被存储在缓冲队列中,一旦存满了 32 条消息,使用send(...).await的发送者会进入睡眠,直到缓冲队列可以放入新的消息(被接收者消费了)

可以使用 clone 方法克隆多个发送者,但是接收者无法被克隆,因为通道是 mpsc 类型

当所有的发送者都被 Drop 掉后(超出作用域或被 drop(...)函数主动释放),就不再会有任何消息发送给该通道
此时 recv 方法将返回 None,也意味着该通道已经被关闭

生成管理任务

创建一个管理任务,它会管理 redis 的连接,当然,首先需要创建一条到 redis 的连接

use mini_redis::client;
// 将消息通道接收者 rx 的所有权转移到管理任务中
let manager = tokio::spawn(async move {
    // Establish a connection to the server
    // 建立到 redis 服务器的连接
    let mut client = client::connect("127.0.0.1:6379").await.unwrap();

    // 开始接收消息
    // 当从消息通道接收到一个命令时,该管理任务会将此命令通过 redis 连接发送到服务器
    while let Some(cmd) = rx.recv().await {
        use Command::*;

        match cmd {
            Get { key } => {
                client.get(&key).await;
            }
            Set { key, val } => {
                client.set(&key, val).await;
            }
        }
    }
});
// 由于有两个任务,因此需要两个发送者
let tx2 = tx.clone();

// 生成两个任务,一个用于获取 key,一个用于设置 key
let t1 = tokio::spawn(async move {
    let cmd = Command::Get {
        key: "hello".to_string(),
    };

    tx.send(cmd).await.unwrap();
});

let t2 = tokio::spawn(async move {
    let cmd = Command::Set {
        key: "foo".to_string(),
        val: "bar".into(),
    };

    tx2.send(cmd).await.unwrap();
});

在 main 函数的末尾,让 3 个任务,按照需要的顺序开始运行:

t1.await.unwrap();
t2.await.unwrap();
manager.await.unwrap();

可执行版本

use tokio::sync::mpsc;
use bytes::Bytes;

// 定义命令枚举类型
enum Command {
    Get {
        key: String,
    },
    Set {
        key: String,
        val: Bytes,
    }
}

#[tokio::main]
async fn main() {
    // 创建一个 mpsc 消息通道,缓冲区大小为 32
    let (tx, mut rx) = mpsc::channel::<Command>(32);

    // 将消息通道接收者 rx 的所有权转移到管理任务中
    let manager = tokio::spawn(async move {
        // 开始接收消息
        while let Some(cmd) = rx.recv().await {
            use Command::*;

            match cmd {
                Get { key } => {
                    // client.get(&key).await;
                    println!("get {}", key);
                }
                Set { key, val } => {
                    // client.set(&key, val).await;
                    println!("set {} {}", key, String::from_utf8_lossy(&val));
                }
            }
        }
    });

    // 由于有两个任务,因此我们需要两个发送者
    let tx2 = tx.clone();

    // 生成两个任务,一个用于获取 key,一个用于设置 key
    let t1 = tokio::spawn(async move {
        let cmd = Command::Get {
            key: "hello".to_string(),
        };

        tx.send(cmd).await.unwrap();
    });

    let t2 = tokio::spawn(async move {
        let cmd = Command::Set {
            key: "foo".to_string(),
            val: "bar".into(),
        };

        tx2.send(cmd).await.unwrap();
    });

    // 等待任务完成
    t1.await.unwrap();
    t2.await.unwrap();
    manager.await.unwrap();
}

接收响应消息

让发出命令的任务从管理任务那里获取命令执行的结果
使用 oneshot 消息通道,因为它针对一发一收的使用类型做过特别优化,且特别适用于此时的场景:接收一条从管理任务发送的结果消息

use tokio::sync::oneshot;

let (tx, rx) = oneshot::channel();

使用方式跟 mpsc 很像,但是它并没有缓存长度,因为只能发送一条,接收一条
还有一点不同:无法对返回的两个句柄进行 clone

为了让管理任务将结果准确的返回到发送者手中,这个管道的发送端必须要随着命令一起发送, 然后发出命令的任务保留管道的接收端
一个比较好的实现就是将管道的发送端放入 Command 的数据结构中,同时使用一个别名来代表该发送端

use tokio::sync::oneshot;
use bytes::Bytes;

#[derive(Debug)]
enum Command {
    Get {
        key: String,
        // oneshot::Sender<mini_redis::Result<Option<Bytes>>>
        resp: Responder<Option<Bytes>>,
    },
    Set {
        key: String,
        val: Bytes,
        // // oneshot::Sender<mini_redis::Result<()>>
        resp: Responder<()>,
    },
}


/// 管理任务可以使用该发送端将命令执行的结果传回给发出命令的任务
type Responder<T> = oneshot::Sender<mini_redis::Result<T>>;

下面,更新发送命令的代码:

let t1 = tokio::spawn(async move {
    let (resp_tx, resp_rx) = oneshot::channel();
    let cmd = Command::Get {
        key: "hello".to_string(),
        resp: resp_tx,
    };

    // 发送 GET 请求
    tx.send(cmd).await.unwrap();

    // 等待回复
    let res = resp_rx.await;
    println!("GOT = {:?}", res);
});

let t2 = tokio::spawn(async move {
    let (resp_tx, resp_rx) = oneshot::channel();
    let cmd = Command::Set {
        key: "foo".to_string(),
        val: "bar".into(),
        resp: resp_tx,
    };

    // 发送 SET 请求
    tx2.send(cmd).await.unwrap();

    // 等待回复
    let res = resp_rx.await;
    println!("GOT = {:?}", res);
});

最后,更新管理任务:

while let Some(cmd) = rx.recv().await {
    match cmd {
        Command::Get { key, resp } => {
            let res = client.get(&key).await;
            // 忽略错误
            let _ = resp.send(res);
        }
        Command::Set { key, val, resp } => {
            let res = client.set(&key, val).await;
            // 忽略错误
            let _ = resp.send(res);
        }
    }
}

注意
往 oneshot 中发送消息时,并没有使用 .await,原因是该发送操作要么直接成功、要么失败,并不需要等待
当 oneshot 的接受端被 drop 后,继续发送消息会直接返回 Err 错误,它表示接收者已经不感兴趣
对于本场景,接收者不感兴趣是非常合理的操作,并不是一种错误,因此可以直接忽略

无论何时使用消息通道,都需要对缓存队列的长度进行限制,这样系统才能优雅的处理各种负载状况。
Tokio 在设计时就考虑了这种状况,例如 async 操作在 Tokio 中是惰性的:

loop {
    async_op();
}

async_op 不是惰性的,而是在每次循环时立即执行,那该循环会立即将一个 async_op 发送到缓冲队列中,然后开始执行下一个循环
Async Rust 和 Tokio 中,async_op 根本就不会运行,也就不会往消息队列中写入消息。原因是没有调用 .await,就算使用了 .await 上面的代码也不会有问题,因为只有等当前循环的任务结束后,才会开始下一次循环。

loop {
    // 当前 `async_op` 完成后,才会开始下一次循环
    async_op().await;
}

总之,在 Tokio 中必须要显式地引入并发和队列:

tokio::spawn
select!
join!
mpsc::channel

当这么做时,需要小心的控制并发度来确保系统的安全。
例如,当使用一个循环去接收 TCP 连接时,要确保当前打开的 socket 数量在可控范围内,而不是毫无原则的接收连接
再比如,当使用 mpsc::channel 时,要设置一个缓冲值

附录

unwrap

主要用于处理 Option 和 Result<T, E> 类型的值

Option 类型中的 unwrap
Option 是 Rust 标准库中的一个枚举类型,用于表示一个值可能存在(Some(T))或不存在(None)的情况

enum Option<T> {
    Some(T),
    None,
}

Option 类型实现了 unwrap 方法

pub fn unwrap(self) -> T {
    match self {
        Some(val) => val,
        None => panic!("called `Option::unwrap()` on a `None` value"),
    }
}

unwrap 方法的功能是从 Option 中提取出其中的值
如果 Option 是 Some(T) 变体,它会返回其中包含的值
如果是 None 变体,它会触发一个 panic,导致程序崩溃

fn main() {
    let some_value: Option<i32> = Some(42);
    let result = some_value.unwrap();
    println!("The value is: {}", result); // 输出: The value is: 42

    let none_value: Option<i32> = None;
    let _ = none_value.unwrap(); // 这里会触发 panic
}

当非常确定 Option 一定是 Some(T) 变体时,可以使用 unwrap 来简洁地提取值
例如,在一些测试代码或者初始化过程中,某些值是确定存在的。
风险:如果 Option 是 None 变体,使用 unwrap 会导致程序崩溃。因此,在生产环境中,不建议直接使用 unwrap

Result<T, E> 类型中的 unwrap
Result<T, E> 也是 Rust 标准库中的一个枚举类型,用于表示一个操作可能成功(Ok(T))或失败(Err(E))的情况。其定义如下:

enum Result<T, E> {
    Ok(T),
    Err(E),
}

Result<T, E> 类型实现了 unwrap 方法:

pub fn unwrap(self) -> T {
    match self {
        Ok(t) => t,
        Err(e) => panic!("called `Result::unwrap()` on an `Err` value: {:?}", e),
    }
}

unwrap 方法的功能是从 Result<T, E> 中提取出其中的值
如果 Result<T, E> 是 Ok(T) 变体,它会返回其中包含的值
如果是 Err(E) 变体,它会触发一个 panic,并输出错误信息

fn divide(a: i32, b: i32) -> Result<i32, &'static str> {
    if b == 0 {
        Err("Division by zero")
    } else {
        Ok(a / b)
    }
}

fn main() {
    let result1 = divide(10, 2);
    let value1 = result1.unwrap();
    println!("The result is: {}", value1); // 输出: The result is: 5

    let result2 = divide(10, 0);
    let _ = result2.unwrap(); // 这里会触发 panic
}

当非常确定操作一定会成功,或者在开发和调试阶段需要快速验证代码逻辑时,可以使用 unwrap
风险:如果操作失败,unwrap 会导致程序崩溃。在生产环境中,应该避免直接使用 unwrap,而是使用更安全的错误处理方式,如 match 语句、if let 语句或 ? 运算符

为了避免程序因 unwrap 触发 panic 而崩溃,可以使用以下更安全的方法

  1. match 语句
let some_value: Option<i32> = Some(42);
match some_value {
    Some(val) => println!("The value is: {}", val),
    None => println!("No value found."),
}
  1. if let 语句
let some_value: Option<i32> = Some(42);
if let Some(val) = some_value {
    println!("The value is: {}", val);
} else {
    println!("No value found.");
}
  1. ? 运算符
use std::fs::File;

fn open_file() -> Result<File, std::io::Error> {
	// 当在一个返回 Result 类型的函数中使用 ? 运算符时
	// 如果 Result 是 Ok 变体,它会提取出其中的值并继续执行后续代码
	// 如果是 Err 变体,它会直接将错误返回给调用者,实现错误的快速传播
    let file = File::open("example.txt")?;
    Ok(file)
}

综上所述,unwrap 是一个方便但有风险的方法,在使用时需要谨慎考虑
在生产环境中,建议使用更安全的错误处理方式来避免程序崩溃

yield_now

在异步编程中,任务调度是一个关键问题。
有时一个异步任务可能会占用较多的执行时间,导致其他任务无法及时得到执行。
tokio::task::yield_now 函数的作用就是让当前正在执行的异步任务主动让出执行权,将控制权交还给 tokio 运行时,这样运行时就可以调度其他等待的任务执行。当其他任务执行一段时间后,运行时会再回来继续执行这个让出执行权的任务。

pub async fn yield_now() -> YieldNow

它是一个异步函数(async fn),意味着调用它会返回一个 Future
调用 yield_now 后,返回的 YieldNow 类型的 Future 在 .await 时会让出当前任务的执行权,当该 Future 完成时,当前任务会恢复执行

作用
避免长时间占用 CPU
当一个异步任务中有一段比较耗时的计算逻辑时,为了避免该任务长时间占用 CPU 资源,影响其他任务的执行
可以在适当的位置调用 yield_now().await 让出执行权,让其他任务有机会执行

任务协作
在多个异步任务协作的场景中,有时需要控制任务的执行顺序,通过调用 yield_now().await 可以实现任务之间的协作
确保各个任务按预期顺序执行

use tokio::task;

#[tokio::main]
async fn main() {
    // 创建两个异步任务
    let task1 = task::spawn(async {
        for i in 0..5 {
            println!("Task 1: Step {}", i);
            if i == 2 {
                // 当执行到第 3 步时,让出执行权
                task::yield_now().await;
            }
        }
    });

    let task2 = task::spawn(async {
        for i in 0..5 {
            println!("Task 2: Step {}", i);
        }
    });

    // 等待两个任务完成
    let _ = tokio::try_join!(task1, task2);
}

“锁竞争不仅会导致当前任务被阻塞,还会导致执行任务的线程被阻塞、该线程准备执行的其它任务也会被阻塞”

在 Rust 中,任务(Task)是异步执行的基本单元。
当一个任务尝试获取一个已经被其他任务或线程持有的锁时,该任务无法立即获得锁,就会进入阻塞状态
处于阻塞状态的任务会暂停执行,等待锁被释放,之后才有机会继续执行

use std::sync::{Mutex, Arc};
use tokio::task;

#[tokio::main]
async fn main() {
    let data = Arc::new(Mutex::new(0));

    let handle1 = task::spawn({
        let data = Arc::clone(&data);
        async move {
            let mut guard = data.lock().unwrap();
            // 模拟一些耗时操作
            tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
            *guard += 1;
        }
    });

    let handle2 = task::spawn({
        let data = Arc::clone(&data);
        async move {
            let mut guard = data.lock().unwrap();
            *guard += 1;
        }
    });

    let _ = tokio::try_join!(handle1, handle2);
}

如果 handle1 先获取到了锁,并且在进行耗时操作(这里用 tokio::time::sleep 模拟),那么 handle2 尝试获取锁时就会被阻塞,直到 handle1 释放锁。

在 Rust 的异步编程中,任务通常是在线程池中的线程上执行的。
当一个任务因为锁竞争被阻塞时,执行该任务的线程也会被阻塞
这是因为线程需要等待任务获取到锁并继续执行,在这期间线程无法去执行其他操作。

由于线程被阻塞,原本计划在该线程上执行的其他任务也无法得到执行。
线程就像一个执行者,当它被当前任务的锁竞争问题困住时,就没办法去处理其他待执行的任务,从而导致这些任务也被阻塞。

假设一个线程池中有一个线程负责执行多个异步任务,当其中一个任务因为锁竞争被阻塞时,该线程会等待这个任务获取锁
而线程池分配给这个线程的其他任务就只能等待,直到该线程从阻塞状态恢复并可以继续执行任务

current_thread运行时

在该设置下会使用一个单线程的调度器(执行器),所有的任务都会创建并执行在当前线程上,因此不再会有锁竞争
current_thread 是一个轻量级、单线程的运行时,当任务数不多或连接数不多时是一个很好的选择
例如想在一个异步客户端库的基础上提供给用户同步的 API 访问时,该模式就很适用

// Builder 是一个构建器模式的类型,用于创建和配置 tokio 运行时实例
use tokio::runtime::Builder;
use tokio::task;

fn main() {
	// Builder::new_current_thread():
	// 调用 Builder 类型的静态方法 new_current_thread,该方法返回一个用于构建单线程运行时的 Builder 实例
	// 单线程运行时意味着所有的异步任务都将在当前线程上执行,而不会创建额外的线程
	// 这种模式适用于对资源占用有严格要求或者不需要高并发的场景
	// 
	// .enable_all():
	// 调用 Builder 实例的 enable_all 方法,该方法会启用运行时的所有功能,包括 I/O 事件循环、定时器、任务调度等
	// 这样可以确保运行时具备完整的功能来执行各种异步操作
	// .build():
	// 调用 Builder 实例的 build 方法,尝试构建并初始化 tokio 运行时实例
	// 该方法返回一个 Result 类型,可能包含成功构建的运行时实例(Ok 变体),也可能包含构建过程中出现的错误(Err 变体)
    let rt = Builder::new_current_thread()
       .enable_all()
       .build()
       .unwrap();

	// 调用运行时实例 rt 的 block_on 方法,该方法会阻塞当前线程,直到传入的异步代码执行完成
	// block_on 方法接收一个 Future 作为参数,这里传入一个 async 块,async 块会被自动转换为一个 Future
    rt.block_on(async {
    	// 该函数接收一个 Future 作为参数,并返回一个 JoinHandle 类型的句柄,通过该句柄可以等待任务完成并获取其结果。
        let handle = task::spawn(async {
            println!("Running task on current thread runtime");
        });

        handle.await.unwrap();
    });
}

enum

use bytes::Bytes;

enum Command {
    Get {
        key: String,
    },
    Set {
        key: String,
        val: Bytes,
    }
}

fn main() {
    // 创建一个 Get 命令实例
    let get_command = Command::Get {
        key: "my_key".to_string(),
    };

    // 创建一个 Set 命令实例
    let set_command = Command::Set {
        key: "another_key".to_string(),
        val: Bytes::from("some_value"),
    };

    // 处理命令
    match get_command {
        Command::Get { key } => println!("Getting value for key: {}", key),
        Command::Set { key, val } => println!("Setting key: {} with value: {:?}", key, val),
    }
}

move关键字的功能

move 关键字的主要作用是将闭包(这里是 async 块)所捕获的变量的所有权转移到闭包内部。在 Rust 中,闭包默认以引用的方式捕获变量,但在异步任务中,由于任务的执行时机是不确定的,可能会在变量的原始所有者超出作用域之后才执行。如果闭包以引用的方式捕获变量,就会出现悬垂引用的问题,导致程序崩溃。

异步任务应该是独立的,不依赖于外部环境的生命周期。通过使用 move 关键字,将所需的变量的所有权转移到任务内部,确保任务可以在任何时候执行,而不受外部变量生命周期的限制。

tokio 的运行时可能会在多个线程之间调度异步任务。如果闭包以引用的方式捕获变量,可能会导致数据竞争和其他并发问题,因为多个线程可能同时访问同一个变量。使用 move 关键字将变量的所有权转移到闭包内部,避免了多个线程同时访问同一个变量的问题,提高了多线程环境下的安全性。

tokio::sync::oneshot

Tokio 异步运行时提供的一个用于一次性消息传递的同步原语
它允许一个任务向另一个任务发送单个值,并且接收任务可以等待这个值的到来

oneshot 通道由两个部分组成:发送端(Sender)和接收端(Receiver)
发送端用于发送单个值,接收端用于接收这个值
一旦发送端发送了值,或者被丢弃,接收端的等待操作就会完成。oneshot 通道是一次性的,即只能发送和接收一次消息

异步任务结果返回:
当一个异步任务完成后,需要将结果返回给另一个任务时,可以使用 oneshot 通道
例如,一个异步计算任务完成后,将计算结果发送给等待的任务

任务间同步:
一个任务需要等待另一个任务完成某个操作后才能继续执行,可以使用 oneshot 通道来实现这种同步

API 介绍

  1. channel 函数
pub fn channel<T>() -> (Sender<T>, Receiver<T>)

功能:创建一个新的 oneshot 通道,返回一个元组,包含发送端 Sender 和接收端 Receiver
参数:泛型参数 T 表示要发送的值的类型

  1. Sender 类型
pub struct Sender<T>;

send 方法:

pub fn send(self, t: T) -> Result<(), SendError<T>>

功能:尝试将值 t 发送到 oneshot 通道。如果接收端仍然在等待,则发送成功;如果接收端已经被丢弃,则发送失败
参数:self 表示发送端实例,t 是要发送的值
返回值:Result 类型,成功时返回 Ok(()),失败时返回 Err(SendError),SendError 包含未发送的值

  1. Receiver 类型
pub struct Receiver<T>;

await 操作:
Receiver 实现了 Future trait,因此可以使用 await 关键字等待接收值

pub async fn await(self) -> Result<T, Canceled>

功能:等待从 oneshot 通道接收值。如果发送端发送了值,则返回 Ok(T);如果发送端被丢弃,则返回 Err(Canceled)

use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    // 创建一个 oneshot 通道
    let (tx, rx) = oneshot::channel::<i32>();

    // 启动一个异步任务来发送值
    tokio::spawn(async move {
        // 模拟一些异步操作
        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
        // 发送值
        let _ = tx.send(42);
    });

    // 等待接收值
    match rx.await {
        Ok(value) => {
            println!("Received value: {}", value);
        }
        Err(_) => {
            println!("The sender was dropped before sending a value.");
        }
    }
}

注意:
一次性使用:oneshot 通道只能发送和接收一次消息。发送后再次尝试发送会失败,接收后再次尝试接收也不会有结果
错误处理:在使用 send 方法和 await 接收值时,都需要进行错误处理,以应对发送端或接收端被丢弃的情况
通过使用 tokio::sync::oneshot,可以方便地在异步任务之间进行一次性的消息传递和同步

为什么 use Command::*; 要放入到代码内部

放在代码块内部(如 while let 循环内部)而非模块顶部,主要是基于作用域和命名冲突方面的考量,

Rust 中的 use 语句用于引入模块中的项,它的作用范围遵循 Rust 的作用域规则
当把 use Command::*; 放在代码块内部时,其引入的 Command 枚举的所有变体(如 Get、Set 等)仅在该代码块内部可见

避免全局命名污染:
若把 use Command::*; 放在模块顶部,Command 枚举的所有变体将在整个模块中可见,可能会与其他模块或代码中的同名项产生冲突。
将其放在代码块内部可以限制这些变体的可见范围,减少命名冲突的可能性。

代码可读性:
将 use 语句放在使用这些变体的代码附近,能让读者更清晰地了解这些变体的使用上下文,增强代码的可读性

命名冲突避免
如果项目中有多个枚举或类型包含同名的项,把 use 语句放在代码块内部可以避免命名冲突。

enum Command {
    Get {
        key: String,
    },
    Set {
        key: String,
        val: String,
    }
}

enum AnotherCommand {
    Get {
        id: u32,
    }
}

fn main() {
    let (tx, rx) = tokio::sync::mpsc::channel::<Command>(32);
    let manager = tokio::spawn(async move {
        while let Some(cmd) = rx.recv().await {
        	// 引入的 Command 枚举的所有变体(如 Get、Set 等)仅在该代码块内部可见
            use Command::*;
            match cmd {
                Get { key } => {
                    println!("get {}", key);
                }
                Set { key, val } => {
                    println!("set {} {}", key, val);
                }
            }
        }
    });
    // 即使 AnotherCommand 也有 Get 变体,由于 use Command::*; 作用域限制,不会产生命名冲突
}

use Command::*; 放在 while let 循环内部,仅在该循环内引入 Command 枚举的变体,避免了与 AnotherCommand 枚举中的 Get 变体产生命名冲突。

往 oneshot 发送消息并没有使用 .await,原因是该发送操作要么直接成功/失败,并不需要等待

while let Some(cmd) = rx.recv().await {
    match cmd {
        Command::Get { key, resp } => {
            let res = client.get(&key).await;
            // 忽略错误
            let _ = resp.send(res);
        }
        Command::Set { key, val, resp } => {
            let res = client.set(&key, val).await;
            // 忽略错误
            let _ = resp.send(res);
        }
    }
}

和 resp.send 方法的返回类型有关

在 Rust 的异步编程中,await 关键字只能用于实现了 Future trait 的类型。当对一个 Future 类型的值使用 await 时,当前的异步任务会暂停执行,直到该 Future 完成,然后继续执行后续代码

通常情况下,resp 是一个消息通道的发送端(例如 tokio::sync::oneshot::Sender 或 tokio::sync::mpsc::Sender 等),其 send 方法返回的可能是一个普通的 Result 类型,而非 Future 类型。

以 tokio::sync::oneshot::Sender 为例

use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel::<i32>();
    let result = tx.send(42);
    // result 的类型是 Result<(), oneshot::error::SendError<i32>>
    // 它不是 Future 类型,所以不需要 await
    match result {
        Ok(_) => println!("Value sent successfully"),
        Err(_) => println!("Failed to send value"),
    }
}

send 方法会立即尝试发送值,并返回一个 Result 类型,表示发送操作的结果
由于它不是 Future 类型,所以不需要使用 await 来等待其完成

resp.send 不需要 await 是因为它返回的不是 Future 类型,而是一个普通的 Result 类型,表示发送操作的即时结果
如果返回的是 Future 类型,那么就需要使用 await 来等待其完成。

其他人提供的示例

数据 -> UDP 套接字 -> tx -> rx -> tx2 -> rx2 -> UDP 套接字

// mpsc:多生产者单消费者(MPSC)消息通道,用于在不同的异步任务之间传递消息
use tokio::{net::UdpSocket, sync::mpsc, time::{sleep, Duration}};
use std::{io::Result, sync::Arc, net::SocketAddr};

// 异步函数 process
// 接收一个字节切片的引用 data 作为输入
// 返回一个 Result<Vec<u8>> 类型,表示可能产生 I/O 错误的操作结果,结果类型是一个字节向量。
async fn process(data: &[u8]) -> Result<Vec<u8>> {
	// 空的字节向量
    let res = Vec::<u8>::new();
    println!("start process, len:{}, sleep 2s", data.len());
    sleep(Duration::from_secs(2)).await;
    println!("finish process, len:{}", data.len());
    // 返回空的字节向量
    Ok(res)
}

async fn start_server(port: u16) -> Result<()> {
    let server_addr = format!("0.0.0.0:{port}").parse::<SocketAddr>().unwrap();
    let sock = UdpSocket::bind(server_addr).await.unwrap();
    // 将 UDP 套接字包装在 Arc 中,以便在多个异步任务之间安全地共享
    let reader = Arc::new(sock);
    // 克隆 Arc 指针,得到另一个可以用于发送数据的引用
    let sender = reader.clone();

	// 用于将接收到的数据传递给数据处理任务,通道容量为 1000
	// 通道中传递的数据类型是 (Vec<u8>, SocketAddr),表示字节向量和客户端地址的元组。
    let (tx, mut rx) = mpsc::channel::<(Vec<u8>, SocketAddr)>(1_000);
    // 用于将处理后的数据传递给数据发送任务,通道容量同样为 1000。
    let (tx2, mut rx2) = mpsc::channel::<(Vec<u8>, SocketAddr)>(1_000);

	// 从 rx 接收消息,发送到 tx2
    tokio::spawn(async move { // process data
        while let Some((bytes, addr)) = rx.recv().await {
            let tx3 = tx2.clone();
            tokio::spawn(async move {
                let data = process(&bytes).await.unwrap();
                tx3.send((data, addr)).await.unwrap();
            });
        }
    });

	// 从 rx2 接收消息,发送到 udp 套接字
    tokio::spawn(async move { // send data
        while let Some((bytes, addr)) = rx2.recv().await {
            let len = sender.send_to(&bytes, &addr).await.unwrap();
            println!("send to data: {:?}", len);
        }
    });

	// 创建一个长度为 512 的字节数组 buf,用于存储接收到的数据
    let mut buf = [0; 512];
    loop { // receive data
    	// 从 UDP 套接字接收消息,然后发送到 tx
        let (len, addr) = reader.recv_from(&mut buf).await?;
        tx.send((buf[..len].to_vec(), addr)).await.unwrap();
    }
}



#[tokio::main]
async fn main() -> Result<()> {
    start_server(9999).await
}

相关文章:

  • 【GDB】 断点的相关设置
  • python turtle模块有哪几种命令
  • Web自动化之Selenium控制已经打开的浏览器(Chrome,Edge)
  • DPVS-4: dpvs.conf配置文件解读
  • 开源机器学习框架
  • 搭建Docker Harbor仓库
  • 【MySQL篇】持久化和非持久化统计信息的深度剖析(含analyze命令和mysqlcheck工具两种收集方式)
  • Leetcode350:两个数组的交集 II
  • 高通Camera点亮3——Camera Module
  • 记录此刻:历时两月,初步实现基于FPGA的NVMe SSD固态硬盘存储控制器设计!
  • SpringBoot+Mybatis-Plus实现动态数据源
  • Deepseek-强化学习算法(通俗易懂版)
  • 【带你 langchain 双排系列教程】9.LangChain基于RAG 实现文档问答:从入门到实战
  • inet_pton()函数的概念和使用案例
  • JavaScript作用域和闭包,原理与用途?
  • 面向对象程序设计-实验十一
  • CSS—盒模型(3分钟结合示例精通盒模型)
  • (十七)WebGL中 图像处理的初识
  • docker部署RustDesk自建服务器
  • 数据库(MySQL):使用命令从零开始在Navicat创建一个数据库及其数据表(一).创建基础表
  • 网站上地图怎么做的/搜狗登录入口
  • 新网网站空间购买/高端seo服务
  • 上海微信网站开发/合肥做网站的公司有哪些
  • 小程序制作流程微信/河北百度竞价优化
  • 做网站私活多少钱/怎么做游戏推广员
  • 怎么做淘课网站/营口建网站的公司