← 返回首页

Rust 异步编程实战指南

2026/3/22

Rust 异步编程 Tokio

Rust 异步编程实战指南

异步 Rust 很强大,但也很复杂。这是实用指南,不是理论教材。


为什么要异步?

何时需要异步

场景同步 vs 异步
CPU 密集同步更好
I/O 密集(少量并发)同步足够
I/O 密集(大量并发)异步必要
混合负载异步 + spawn_blocking

异步的收益

// 同步:1000 个连接 = 1000 个线程
for stream in listener.incoming() {
    let stream = stream?;
    thread::spawn(|| handle_client(stream)); // 每个连接一个线程
}

// 异步:1000 个连接 = 几个线程
while let Ok((stream, _)) = listener.accept().await {
    tokio::spawn(async move { handle_client(stream).await });
}

运行时选择

Tokio vs async-std

特性Tokioasync-std
市场份额~80%~15%
API 风格更底层更像 std
启动速度更快稍慢
文档优秀良好
生态最丰富中等

推荐:新项目用 Tokio,除非有特殊原因。

最小 Tokio 程序

#[tokio::main]
async fn main() {
    println!("Hello, async!");
}

等价于:

fn main() {
    tokio::runtime::Runtime::new()
        .unwrap()
        .block_on(async {
            println!("Hello, async!");
        })
}

基础概念

Future

pub trait Future {
    type Output;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

pub enum Poll<T> {
    Ready(T),
    Pending,
}

async/.await

// async fn 返回 impl Future
async fn fetch(url: &str) -> Result<String, Error> {
    let response = reqwest::get(url).await?;  // 等待完成
    let body = response.text().await?;
    Ok(body)
}

spawn vs block_on

// block_on:阻塞当前线程
let result = tokio::runtime::Runtime::new().block_on(async { 42 });

// spawn:非阻塞,返回 JoinHandle
let handle = tokio::spawn(async { 42 });
let result = handle.await.unwrap();

常用模式

超时

use tokio::time::{timeout, Duration};

async fn fetch_with_timeout(url: &str) -> Result<String, Error> {
    match timeout(Duration::from_secs(5), reqwest::get(url)).await {
        Ok(response) => Ok(response?.text().await?),
        Err(_) => Err(Error::Timeout),
    }
}

重试

use tokio::time::{sleep, Duration};

async fn fetch_with_retry(url: &str, max_retries: u32) -> Result<String, Error> {
    let mut retries = 0;
    loop {
        match reqwest::get(url).await {
            Ok(response) => return Ok(response.text().await?),
            Err(e) if retries < max_retries => {
                retries += 1;
                sleep(Duration::from_millis(100 * retries as u64)).await;
            }
            Err(e) => return Err(e),
        }
    }
}

并发控制

use tokio::sync::Semaphore;

async fn fetch_all(urls: Vec<&str>) -> Vec<Result<String, Error>> {
    let semaphore = Arc::new(Semaphore::new(10)); // 最多 10 个并发
    let mut handles = vec![];

    for url in urls {
        let permit = semaphore.clone().acquire_owned().await.unwrap();
        let handle = tokio::spawn(async move {
            let result = reqwest::get(url).await?.text().await;
            drop(permit); // 释放许可
            result
        });
        handles.push(handle);
    }

    futures::future::join_all(handles).await
}

取消

use tokio::select;
use tokio::sync::oneshot;

async fn cancellable_operation(mut cancel: oneshot::Receiver<()>) {
    loop {
        select! {
            _ = &mut cancel => {
                println!("Cancelled!");
                return;
            }
            _ = do_work() => {
                // 继续工作
            }
        }
    }
}

错误处理

async fn 中的 ?

async fn fetch(url: &str) -> Result<String, reqwest::Error> {
    let response = reqwest::get(url).await?;  // ? 正常工作
    Ok(response.text().await?)
}

TryStream

use futures::TryStreamExt;

async fn process_items() -> Result<(), Error> {
    let mut stream = fetch_items().await?;

    while let Some(item) = stream.try_next().await? {
        process(item).await?;
    }

    Ok(())
}

常见陷阱

1. 阻塞异步运行时

// ❌ 错误:阻塞整个运行时
async fn bad() {
    std::thread::sleep(Duration::from_secs(1)); // 阻塞!
}

// ✅ 正确:使用异步 sleep
async fn good() {
    tokio::time::sleep(Duration::from_secs(1)).await;
}

// ✅ 正确:CPU 密集任务用 spawn_blocking
async fn cpu_intensive() -> Result<i32, Error> {
    tokio::task::spawn_blocking(|| {
        // 在这里做 CPU 密集工作
        expensive_computation()
    }).await?
}

2. 持有锁跨越 await

// ❌ 错误:死锁风险
async fn bad(mutex: Arc<Mutex<Data>>) {
    let data = mutex.lock().unwrap();
    something_async().await; // 持有锁时 await!
}

// ✅ 正确:使用 async Mutex
async fn good(mutex: Arc<tokio::sync::Mutex<Data>>) {
    let data = mutex.lock().await;
    something_async().await; // OK
}

3. 忘记 await

// ❌ 错误:future 被丢弃,不执行
async fn bad() {
    async_operation(); // 没有.await,不执行!
}

// ✅ 正确
async fn good() {
    async_operation().await;
}

4. Send trait 问题

// ❌ 错误:Rc 不是 Send
async fn bad() {
    let data = Rc::new(42);
    tokio::spawn(async move {
        println!("{}", *data); // Rc 不能跨线程
    });
}

// ✅ 正确:使用 Arc
async fn good() {
    let data = Arc::new(42);
    tokio::spawn(async move {
        println!("{}", *data); // OK
    });
}

性能优化

1. 避免不必要的 clone

// ❌ 每次迭代都 clone
while let Some(item) = items.next().await {
    let config = config.clone(); // 不必要的 clone
    process(item, &config).await;
}

// ✅ 只 clone 一次
let config = Arc::new(config);
while let Some(item) = items.next().await {
    process(item, &config).await;
}

2. 使用 buffer

use futures::stream::{self, StreamExt};

// 批量处理
let results = stream::iter(urls)
    .map(|url| async move { fetch(url).await })
    .buffer_unordered(10) // 10 个并发
    .collect::<Vec<_>>()
    .await;

3. 选择正确的 channel

Channel用途
oneshot单值,一次性
mpsc多生产者,单消费者
broadcast多生产者,多消费者
watch单值,多消费者,只保留最新

测试异步代码

tokio::test

#[tokio::test]
async fn test_fetch() {
    let result = fetch("https://example.com").await;
    assert!(result.is_ok());
}

Mock

use mockall::predicate::*;
use mockall::mock;

mock! {
    HttpClient {}

    #[tokio::main]
    impl HttpClientTrait for HttpClient {
        async fn fetch(&self, url: &str) -> Result<String, Error>;
    }
}

#[tokio::test]
async fn test_with_mock() {
    let mut mock = MockHttpClient::new();
    mock.expect_fetch()
        .with(eq("https://example.com"))
        .returning(|_| Ok("mocked response".to_string()));

    let result = mock.fetch("https://example.com").await;
    assert_eq!(result.unwrap(), "mocked response");
}

生态工具

常用 crate

Crate用途
tokio运行时
async-std另一个运行时
futuresFuture 工具
tokio-streamStream 扩展
reqwestHTTP 客户端
tokio-postgresPostgreSQL
deadpool-postgres连接池
redisRedis 客户端

监控

use tokio::metrics::RuntimeMetrics;

#[tokio::main]
async fn main() {
    let runtime = tokio::runtime::Handle::current();
    let metrics = runtime.metrics();

    println!("Active tasks: {}", metrics.active_tasks_count());
    println!("IO drivers: {}", metrics.io_driver_count());
}

最佳实践总结

  1. 不要阻塞运行时 - 用 spawn_blocking 处理 CPU 密集任务
  2. 选择正确的同步原语 - 跨 await 用 async Mutex
  3. 控制并发 - 用 Semaphore 限制并发数
  4. 处理取消 - 用 select! 和 cancellation tokens
  5. 错误处理 - 用 ?TryStream
  6. 测试 - 用 #[tokio::test] 和 mock

学习资源


总结

异步 Rust 的关键:

  1. 选择 Tokio - 生态最好
  2. 理解 Future - poll 机制
  3. 避免阻塞 - 不在 async 中阻塞
  4. 正确使用锁 - 跨 await 用 async Mutex
  5. 控制并发 - Semaphore、buffer_unordered

异步 Rust 确实复杂,但对于高并发 I/O 场景,它是值得的。

📝 文章反馈

你的反馈能帮助我写出更好的文章