Rust异步运行时最小实现 - extreme 分享
Rust语言通过定义了Future Trait , 奠定了异步语法的基石,而Rust的异步代码时惰性的,必须有一个运行时来驱动,Rust本身还没提供这样的实现,社区中有不少开源方案,比如tokio等。
Tokio的运行时是一个事件循环,利用了不同平台的异步非阻塞特性,比如kqueue,epoll等。
我一直想要弄清楚runtime是怎么调度Future,而Future完成时又是怎么通知runtime,extreme 实现了一个最小运行时,可以让一窥究竟。
use std::sync::{Arc, Condvar, Mutex};
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};#[derive(Default)]
struct Park(Mutex<bool>, Condvar);fn unpark(park: &Park) {*park.0.lock().unwrap() = true;park.1.notify_one();
}static VTABLE: RawWakerVTable = RawWakerVTable::new(|clone_me| unsafe {let arc = Arc::from_raw(clone_me as *const Park);std::mem::forget(arc.clone());RawWaker::new(Arc::into_raw(arc) as *const (), &VTABLE)},|wake_me| unsafe { unpark(&Arc::from_raw(wake_me as *const Park)) },|wake_by_ref_me| unsafe { unpark(&*(wake_by_ref_me as *const Park)) },|drop_me| unsafe { drop(Arc::from_raw(drop_me as *const Park)) },
);/// Run a `Future`.
pub fn run<F: std::future::Future>(mut f: F) -> F::Output {let mut f = unsafe { std::pin::Pin::new_unchecked(&mut f) };let park = Arc::new(Park::default());let sender = Arc::into_raw(park.clone());let raw_waker = RawWaker::new(sender as *const _, &VTABLE);let waker = unsafe { Waker::from_raw(raw_waker) };let mut cx = Context::from_waker(&waker);loop {match f.as_mut().poll(&mut cx) {Poll::Pending => {let mut runnable = park.0.lock().unwrap();while !*runnable {runnable = park.1.wait(runnable).unwrap();}*runnable = false;}Poll::Ready(val) => return val,}}
}
这个简短的例子表达了实现一个运行时的最低需求
- 实现
RawWakerVTable
- 如何通过Waker唤醒runtime继续调度,这里用了信号量
本质上运行时可以抽象成一个不断运行的循环体,在循环体内不断调用Future的poll方法。
(当Future返回Poll::Pending时,此处简化为使用信号量的等待操作)
这个例子也说明了Future的调用能返回时,需要调用存储在ctx里面的Waker::waker()方法,唤醒运行时继续执行阻塞的异步任务