Rust 异步编程实战指南
2026/3/22
Rust 异步编程 Tokio
Rust 异步编程实战指南
异步 Rust 很强大,但也很复杂。这是实用指南,不是理论教材。
为什么要异步?
何时需要异步
| 场景 | 同步 vs 异步 |
|---|---|
| CPU 密集 | 同步更好 |
| I/O 密集(少量并发) | 同步足够 |
| I/O 密集(大量并发) | 异步必要 |
| 混合负载 | 异步 + spawn_blocking |
异步的收益
// 同步:1000 个连接 = 1000 个线程
for stream in listener.incoming() {
let stream = stream?;
thread::spawn(|| handle_client(stream)); // 每个连接一个线程
}
// 异步:1000 个连接 = 几个线程
while let Ok((stream, _)) = listener.accept().await {
tokio::spawn(async move { handle_client(stream).await });
}
运行时选择
Tokio vs async-std
| 特性 | Tokio | async-std |
|---|---|---|
| 市场份额 | ~80% | ~15% |
| API 风格 | 更底层 | 更像 std |
| 启动速度 | 更快 | 稍慢 |
| 文档 | 优秀 | 良好 |
| 生态 | 最丰富 | 中等 |
推荐:新项目用 Tokio,除非有特殊原因。
最小 Tokio 程序
#[tokio::main]
async fn main() {
println!("Hello, async!");
}
等价于:
fn main() {
tokio::runtime::Runtime::new()
.unwrap()
.block_on(async {
println!("Hello, async!");
})
}
基础概念
Future
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
pub enum Poll<T> {
Ready(T),
Pending,
}
async/.await
// async fn 返回 impl Future
async fn fetch(url: &str) -> Result<String, Error> {
let response = reqwest::get(url).await?; // 等待完成
let body = response.text().await?;
Ok(body)
}
spawn vs block_on
// block_on:阻塞当前线程
let result = tokio::runtime::Runtime::new().block_on(async { 42 });
// spawn:非阻塞,返回 JoinHandle
let handle = tokio::spawn(async { 42 });
let result = handle.await.unwrap();
常用模式
超时
use tokio::time::{timeout, Duration};
async fn fetch_with_timeout(url: &str) -> Result<String, Error> {
match timeout(Duration::from_secs(5), reqwest::get(url)).await {
Ok(response) => Ok(response?.text().await?),
Err(_) => Err(Error::Timeout),
}
}
重试
use tokio::time::{sleep, Duration};
async fn fetch_with_retry(url: &str, max_retries: u32) -> Result<String, Error> {
let mut retries = 0;
loop {
match reqwest::get(url).await {
Ok(response) => return Ok(response.text().await?),
Err(e) if retries < max_retries => {
retries += 1;
sleep(Duration::from_millis(100 * retries as u64)).await;
}
Err(e) => return Err(e),
}
}
}
并发控制
use tokio::sync::Semaphore;
async fn fetch_all(urls: Vec<&str>) -> Vec<Result<String, Error>> {
let semaphore = Arc::new(Semaphore::new(10)); // 最多 10 个并发
let mut handles = vec![];
for url in urls {
let permit = semaphore.clone().acquire_owned().await.unwrap();
let handle = tokio::spawn(async move {
let result = reqwest::get(url).await?.text().await;
drop(permit); // 释放许可
result
});
handles.push(handle);
}
futures::future::join_all(handles).await
}
取消
use tokio::select;
use tokio::sync::oneshot;
async fn cancellable_operation(mut cancel: oneshot::Receiver<()>) {
loop {
select! {
_ = &mut cancel => {
println!("Cancelled!");
return;
}
_ = do_work() => {
// 继续工作
}
}
}
}
错误处理
async fn 中的 ?
async fn fetch(url: &str) -> Result<String, reqwest::Error> {
let response = reqwest::get(url).await?; // ? 正常工作
Ok(response.text().await?)
}
TryStream
use futures::TryStreamExt;
async fn process_items() -> Result<(), Error> {
let mut stream = fetch_items().await?;
while let Some(item) = stream.try_next().await? {
process(item).await?;
}
Ok(())
}
常见陷阱
1. 阻塞异步运行时
// ❌ 错误:阻塞整个运行时
async fn bad() {
std::thread::sleep(Duration::from_secs(1)); // 阻塞!
}
// ✅ 正确:使用异步 sleep
async fn good() {
tokio::time::sleep(Duration::from_secs(1)).await;
}
// ✅ 正确:CPU 密集任务用 spawn_blocking
async fn cpu_intensive() -> Result<i32, Error> {
tokio::task::spawn_blocking(|| {
// 在这里做 CPU 密集工作
expensive_computation()
}).await?
}
2. 持有锁跨越 await
// ❌ 错误:死锁风险
async fn bad(mutex: Arc<Mutex<Data>>) {
let data = mutex.lock().unwrap();
something_async().await; // 持有锁时 await!
}
// ✅ 正确:使用 async Mutex
async fn good(mutex: Arc<tokio::sync::Mutex<Data>>) {
let data = mutex.lock().await;
something_async().await; // OK
}
3. 忘记 await
// ❌ 错误:future 被丢弃,不执行
async fn bad() {
async_operation(); // 没有.await,不执行!
}
// ✅ 正确
async fn good() {
async_operation().await;
}
4. Send trait 问题
// ❌ 错误:Rc 不是 Send
async fn bad() {
let data = Rc::new(42);
tokio::spawn(async move {
println!("{}", *data); // Rc 不能跨线程
});
}
// ✅ 正确:使用 Arc
async fn good() {
let data = Arc::new(42);
tokio::spawn(async move {
println!("{}", *data); // OK
});
}
性能优化
1. 避免不必要的 clone
// ❌ 每次迭代都 clone
while let Some(item) = items.next().await {
let config = config.clone(); // 不必要的 clone
process(item, &config).await;
}
// ✅ 只 clone 一次
let config = Arc::new(config);
while let Some(item) = items.next().await {
process(item, &config).await;
}
2. 使用 buffer
use futures::stream::{self, StreamExt};
// 批量处理
let results = stream::iter(urls)
.map(|url| async move { fetch(url).await })
.buffer_unordered(10) // 10 个并发
.collect::<Vec<_>>()
.await;
3. 选择正确的 channel
| Channel | 用途 |
|---|---|
oneshot | 单值,一次性 |
mpsc | 多生产者,单消费者 |
broadcast | 多生产者,多消费者 |
watch | 单值,多消费者,只保留最新 |
测试异步代码
tokio::test
#[tokio::test]
async fn test_fetch() {
let result = fetch("https://example.com").await;
assert!(result.is_ok());
}
Mock
use mockall::predicate::*;
use mockall::mock;
mock! {
HttpClient {}
#[tokio::main]
impl HttpClientTrait for HttpClient {
async fn fetch(&self, url: &str) -> Result<String, Error>;
}
}
#[tokio::test]
async fn test_with_mock() {
let mut mock = MockHttpClient::new();
mock.expect_fetch()
.with(eq("https://example.com"))
.returning(|_| Ok("mocked response".to_string()));
let result = mock.fetch("https://example.com").await;
assert_eq!(result.unwrap(), "mocked response");
}
生态工具
常用 crate
| Crate | 用途 |
|---|---|
tokio | 运行时 |
async-std | 另一个运行时 |
futures | Future 工具 |
tokio-stream | Stream 扩展 |
reqwest | HTTP 客户端 |
tokio-postgres | PostgreSQL |
deadpool-postgres | 连接池 |
redis | Redis 客户端 |
监控
use tokio::metrics::RuntimeMetrics;
#[tokio::main]
async fn main() {
let runtime = tokio::runtime::Handle::current();
let metrics = runtime.metrics();
println!("Active tasks: {}", metrics.active_tasks_count());
println!("IO drivers: {}", metrics.io_driver_count());
}
最佳实践总结
- 不要阻塞运行时 - 用
spawn_blocking处理 CPU 密集任务 - 选择正确的同步原语 - 跨 await 用 async Mutex
- 控制并发 - 用 Semaphore 限制并发数
- 处理取消 - 用
select!和 cancellation tokens - 错误处理 - 用
?和TryStream - 测试 - 用
#[tokio::test]和 mock
学习资源
总结
异步 Rust 的关键:
- 选择 Tokio - 生态最好
- 理解 Future - poll 机制
- 避免阻塞 - 不在 async 中阻塞
- 正确使用锁 - 跨 await 用 async Mutex
- 控制并发 - Semaphore、buffer_unordered
异步 Rust 确实复杂,但对于高并发 I/O 场景,它是值得的。