在 Rust 的异步生态中,常用 Tokio 或 async-std 作为运行时。
当系统进入高并发场景时,仅使用 async/await 还不够,还需要 并发控制、任务调度和超时机制。
下面是 Rust 异步编程高级模式的核心内容。
一、Rust 异步并发模型
Rust 的 async 模型基于 Future + Executor:
Future -> 表示异步任务
Executor -> 负责调度任务
Runtime -> 提供 I/O + 调度
示例:
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
sleep(Duration::from_secs(1)).await;
println!("done");
}
这里:
Future表示任务tokio runtime负责调度
二、并发控制(Concurrency Control)
高并发系统必须限制任务数量,否则会:
- 内存爆炸
- IO 资源耗尽
- 线程调度过载
Rust 常用 三种并发控制方式。
1 Semaphore(信号量)
使用 Tokio 的 Semaphore 限制并发数。
示例:
use tokio::sync::Semaphore;
use std::sync::Arc;
#[tokio::main]
async fn main() {
let semaphore = Arc::new(Semaphore::new(3));
for i in 0..10 {
let permit = semaphore.clone().acquire_owned().await.unwrap();
tokio::spawn(async move {
println!("task {} running", i);
drop(permit);
});
}
}
作用:
最多允许3个任务同时执行
适合场景:
- HTTP 请求限制
- 数据库连接控制
- API 限流
2 限制并发任务(buffer_unordered)
在 Futures 中可以使用:
use futures::stream::{self, StreamExt};
#[tokio::main]
async fn main() {
let results = stream::iter(0..10)
.map(|i| async move { i * 2 })
.buffer_unordered(3)
.collect::<Vec<_>>()
.await;
println!("{:?}", results);
}
作用:
最多3个任务并发执行
优点:
- 写法优雅
- 自动调度
3 Mutex / RwLock
当任务需要共享数据时:
use tokio::sync::Mutex;
use std::sync::Arc;
#[tokio::main]
async fn main() {
let counter = Arc::new(Mutex::new(0));
for _ in 0..10 {
let c = counter.clone();
tokio::spawn(async move {
let mut num = c.lock().await;
*num += 1;
});
}
}
适合:
- 共享状态
- 计数器
- 缓存
三、任务超时机制(Timeout)
在网络系统中必须有 超时控制,否则任务可能 永远等待。
Rust 中通常使用 tokio::time::timeout。
1 timeout
use tokio::time::{timeout, Duration};
async fn long_task() {
tokio::time::sleep(Duration::from_secs(5)).await;
}
#[tokio::main]
async fn main() {
let result = timeout(Duration::from_secs(2), long_task()).await;
match result {
Ok(_) => println!("completed"),
Err(_) => println!("timeout"),
}
}
输出:
timeout
四、select! 并发等待模式
select! 是 Rust async 的 核心并发工具。
示例:
use tokio::select;
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
select! {
_ = sleep(Duration::from_secs(2)) => {
println!("timer finished");
}
_ = sleep(Duration::from_secs(5)) => {
println!("long task finished");
}
}
}
结果:
timer finished
用途:
- 超时控制
- 多任务竞争
- 网络事件监听
五、任务取消(Cancellation)
Rust Future 可以被取消:
drop Future -> 任务终止
示例:
let handle = tokio::spawn(async {
tokio::time::sleep(Duration::from_secs(10)).await;
});
handle.abort();
作用:
主动取消任务
六、生产级并发模式
真实系统常见架构:
task queue
↓
worker pool
↓
semaphore limit
↓
timeout control
Rust 实现示例:
任务队列
↓
Tokio worker
↓
Semaphore 限制
↓
timeout 防止卡死
七、完整并发示例
use tokio::{sync::Semaphore, time::{timeout, Duration}};
use std::sync::Arc;
async fn job(i: i32) {
tokio::time::sleep(Duration::from_secs(1)).await;
println!("job {}", i);
}
#[tokio::main]
async fn main() {
let sem = Arc::new(Semaphore::new(5));
for i in 0..20 {
let permit = sem.clone().acquire_owned().await.unwrap();
tokio::spawn(async move {
let _ = timeout(Duration::from_secs(2), job(i)).await;
drop(permit);
});
}
}
效果:
最大并发5
任务超时2秒
八、Rust 异步并发最佳实践
1 使用 Tokio runtime
Tokio = Rust async 标准生态
2 控制并发数量
推荐:
Semaphore
buffer_unordered
3 所有 IO 必须有超时
例如:
HTTP
数据库
RPC
4 避免 async 中阻塞
错误示例:
std::thread::sleep()
正确:
tokio::time::sleep()
九、Rust 高并发架构示意
HTTP Server
│
Tokio Runtime
│
Task Scheduler
│
Semaphore Limit
│
Async IO
Rust 的 async 架构优势:
- 零成本抽象
- 内存安全
- 高并发性能
✅ 总结
Rust 异步高级模式核心:
并发控制
Semaphore
buffer_unordered
Mutex / RwLock
超时机制
timeout
select!
任务取消
abort
这些组合可以构建 高并发服务(API / 爬虫 / 网关 / 微服务)。
如果你愿意,我可以继续讲一个 Rust 高并发服务器完整架构(Tokio + Hyper + Tower + Axum),这基本是目前 Rust Web 后端的主流设计。