Rust异步运行时最小实现 - extreme 分享

发布于:2025-09-09 ⋅ 阅读:(15) ⋅ 点赞:(0)

Rust语言通过定义了Future Trait , 奠定了异步语法的基石,而Rust的异步代码时惰性的,必须有一个运行时来驱动,Rust本身还没提供这样的实现,社区中有不少开源方案,比如tokio等。

Tokio的运行时是一个事件循环,利用了不同平台的异步非阻塞特性,比如kqueue,epoll等。

我一直想要弄清楚runtime是怎么调度Future,而Future完成时又是怎么通知runtime,extreme 实现了一个最小运行时,可以让一窥究竟。

use std::sync::{Arc, Condvar, Mutex};
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

#[derive(Default)]
struct Park(Mutex<bool>, Condvar);

fn unpark(park: &Park) {
    *park.0.lock().unwrap() = true;
    park.1.notify_one();
}

static VTABLE: RawWakerVTable = RawWakerVTable::new(
    |clone_me| unsafe {
        let arc = Arc::from_raw(clone_me as *const Park);
        std::mem::forget(arc.clone());
        RawWaker::new(Arc::into_raw(arc) as *const (), &VTABLE)
    },
    |wake_me| unsafe { unpark(&Arc::from_raw(wake_me as *const Park)) },
    |wake_by_ref_me| unsafe { unpark(&*(wake_by_ref_me as *const Park)) },
    |drop_me| unsafe { drop(Arc::from_raw(drop_me as *const Park)) },
);

/// Run a `Future`.
pub fn run<F: std::future::Future>(mut f: F) -> F::Output {
    let mut f = unsafe { std::pin::Pin::new_unchecked(&mut f) };
    let park = Arc::new(Park::default());
    let sender = Arc::into_raw(park.clone());
    let raw_waker = RawWaker::new(sender as *const _, &VTABLE);
    let waker = unsafe { Waker::from_raw(raw_waker) };
    let mut cx = Context::from_waker(&waker);

    loop {
        match f.as_mut().poll(&mut cx) {
            Poll::Pending => {
                let mut runnable = park.0.lock().unwrap();
                while !*runnable {
                    runnable = park.1.wait(runnable).unwrap();
                }
                *runnable = false;
            }
            Poll::Ready(val) => return val,
        }
    }
}

这个简短的例子表达了实现一个运行时的最低需求

  • 实现RawWakerVTable
  • 如何通过Waker唤醒runtime继续调度,这里用了信号量

本质上运行时可以抽象成一个不断运行的循环体,在循环体内不断调用Future的poll方法。

(当Future返回Poll::Pending时,此处简化为使用信号量的等待操作)

这个例子也说明了Future的调用能返回时,需要调用存储在ctx里面的Waker::waker()方法,唤醒运行时继续执行阻塞的异步任务


网站公告

今日签到

点亮在社区的每一天
去签到