使用 Rustlang 的 Async Tokio 运行时处理 CPU 密集型任务
作者:Andrew Lamb / 用例, 开发者
2022 年 2 月 28 日
导航至
本文最初于 2022 年 1 月 14 日发表在 The New Stack 上,并经许可在此处重新发表。
尽管术语 async
及其与异步网络 I/O 的关联,但这篇博文认为,Rust async
生态系统核心的 Tokio 运行时也是 CPU 密集型作业(例如分析引擎中发现的作业)的理想选择。
什么是 Tokio?
Rust 内置了对异步 (async
) 编程模型的支持,类似于 JavaScript 等语言。
为了充分利用多核和异步 I/O,必须使用运行时。虽然 Rust 社区有多种替代方案,但 Tokio 是事实上的标准。 Tokio.rs 将其描述为:“Rust 编程语言的异步运行时。它提供了编写网络应用程序所需的构建块。”
虽然此描述强调了 Tokio 在网络通信中的应用,但运行时也可用于其他目的,我们将在下面探讨。
为什么将 Tokio 用于 CPU 任务?
事实证明,现代分析引擎总是需要处理来自网络的客户端请求,并使用网络与对象存储系统(如 AWS S3、GCP Cloud Storage 和 Azure Blob Storage)进行通信。
因此,任何以 Rust 实现的此类系统最终都将使用 Tokio 进行网络和至少一部分存储 I/O(是的,我知道,最初 Tokio 异步文件 I/O 并不是真正的异步,但它 即将到来 )。
分析系统还进行 CPU 密集型计算,我将其定义为以消耗大量 CPU 的方式处理数据,用于存储重组、预计算各种索引或直接回答客户端查询。这些计算通常分解为许多独立的块(我将其称为“任务”),然后并行运行这些任务,以利用现代 CPU 中提供的多个内核。
确定何时运行哪些任务通常由称为“任务调度器”的东西完成,该调度器将任务映射到可用的硬件内核/操作系统线程。
在各种类型的任务调度器、工作池、线程池等方面,已经进行了多年的学术和工业工作。
我使用(并实现,我有点惭愧地承认)多个自定义任务调度器的经验是,它们最初很容易工作(比如 99.9% 的时间),但随后需要大量(大量!)时间来处理边缘情况(快速关闭、任务取消、耗尽等)。由于它们使用了较低级别的线程原语,它们也出了名的难以测试,并且竞争条件比比皆是。我不建议这样做。
因此,当在 Rust 生态系统中寻找任务调度器时,正如我们为 InfluxDB IOx 和 DataFusion 所做的那样,您自然而然地会想到 Tokio,而且它看起来相当不错
- 您已经拥有 Tokio(没有新的依赖项)。
- Tokio 实现了 复杂的work-stealing 调度器。
- Tokio 有效地内置了对延续 (
async
/await
) 的语言支持,以及许多相对成熟的流、异步锁定、通道、取消等库。 - Tokio 以 良好的测试 而闻名,并在整个 Rust 生态系统中得到广泛使用。
- Tokio 通常将正在运行的任务和它们在其上运行的 future 保留在同一个执行器线程上,这对于缓存局部性非常有利。
- Tokio 文档齐全、积极维护并且一直在改进。(Tokio 控制台是在我撰写此博客时宣布的 )。
因此,使用 Tokio
作为 CPU 密集型任务的任务调度器是理所当然的,对吗?错!
对使用 Tokio 的常见异议
事实证明,使用 Tokio 是一个相当热门的话题,我想说的是,并非所有人仍然 100% 确信,因此才有了这篇博文。在 DataFusion 和 InfluxDB IOx 的早期,我们 非常担心 这个问题。以下是一些常见的异议
Tokio 文档说不要这样做
旧版本的 Tokio 文档(例如, 1.10.0)包含(在我看来)著名的告诫
“如果您的代码是 CPU 密集型的,并且您希望限制用于运行它的线程数,则应在另一个线程池(如 Rayon)上运行它。”
我相信这种措辞在我们团队内部以及更广泛的 Rust 社区中都引起了极大的困惑。许多人认为这意味着 Tokio Runtime
永远不应用于 CPU 密集型任务。关键点实际上是同一个 Runtime
实例(同一个线程池)不应用于 I/O 和 CPU,并且我们随后澄清了 文档的意图(关于 PR的详细信息)。
顺便说一句,Tokio 文档建议使用 Rayon 处理 CPU 密集型任务。Rayon 对于许多应用程序来说是一个不错的选择,但它不支持 async
,因此如果您的代码必须执行任何 I/O,您将不得不跨越痛苦的同步/异步边界。我还发现,将基于拉取的执行模型(其中任务必须等待所有输入就绪后才能运行)映射到 Rayon 具有挑战性。
尾部延迟会困扰您
智者说:“将 Tokio 用于 CPU 密集型工作会增加您的请求尾部延迟,这是不可接受的。” 但是等等!您可能会说:“尾部延迟?我正在编写数据库,这听起来像是 Web 服务器在高负载下才有的学术问题……”
并非如此:考虑一下活性检查,这对于当今使用容器编排系统(哎 Kubernetes)部署的系统来说是必不可少的。检查您的进程是否运行良好的方法通常是对类似 /health
的内容发出 HTTP 请求。如果该请求位于某个任务队列中,因为 Tokio 正在充分利用您的 CPU 有效地处理大量数据处理任务,则 Kubernetes 不会收到所需的“一切正常”响应,并会杀死您的进程。
这种推理得出的经典结论是,由于尾部延迟至关重要,因此您不能将 Tokio 用于 CPU 密集型任务。
但是,正如 Tokio 文档所建议的那样,为了避免在完全饱和 CPU 的同时被 Kubernetes 和朋友杀死,真正重要的是使用单独的线程池 - 一个用于“延迟很重要”的任务,例如响应 /health
,另一个用于 CPU 密集型任务。这些线程池的最佳线程数因您的需求而异,这是另一篇单独文章的好主题。
也许通过将 Tokio Runtime
视为一个复杂的线程池,使用不同 Runtime
实例的想法可能会显得更可接受,我们将在下面的专用执行器中演示如何做到这一点。
每个任务的高开销
但是“等等!” 我听到您说(或者每个人都在 Hacker News上听到您说),Tokio 具有每个任务的高开销。人们可以制造出比 Tokio 更快地处理微小任务的线程池,我一点也不感到惊讶。
但是,我还没有看到一个我可以信任其生产工作负载的系统,也没有一个具有如此强大的生态系统支持的系统。
值得庆幸的是,对于许多工作负载,可以使用“矢量化处理”来摊销每个任务的开销。这是一种花哨的说法,即每个任务一次处理数千行,而不是一行。您当然不能发疯;您确实需要将您的工作分解为合理大小的块,并且您不能摊销所有工作负载。但是,对于我的应用程序关心的所有实例,Tokio 任务开销都淹没在噪声中。
如何将 Tokio 用于 CPU 密集型任务?
因此,让我们假设我说服了您,使用 Tokio 处理 CPU 密集型工作可能是可以的。您如何做到这一点?
首先,至关重要的是,您的代码需要遵循格言“异步代码不应在没有达到 .await
的情况下花费很长时间”,正如 Alice Ryhl 的帖子中所解释的那样。这是为了让调度器有机会调度其他内容、窃取工作等。
当然,“很长时间”取决于您的应用程序;在优化响应尾部延迟时,Ryhl 建议 10 到 100 微秒。我认为在优化 CPU 时,10 到 100 毫秒对于任务来说也很好。但是,由于我 估计的每个任务的 Tokio 开销 在 ~10 纳秒范围内,因此即使是 10 毫秒的任务,也几乎不可能测量 Tokio Runtime 开销。
其次,在单独的 Runtime
实例上运行您的任务。您如何做到这一点?很高兴您问了。
专用执行器
这是我们在 InfluxDB IOx 中如何在单独的 Tokio Runtime 上运行任务的简化版本。(有关更多详细信息,请参阅 此仓库。)
pub struct DedicatedExecutor {
state: Arc<Mutex<State>>,
}
/// Runs futures (and any `tasks` that are `tokio::task::spawned` by
/// them) on a separate Tokio Executor
struct State {
/// Channel for requests -- the dedicated executor takes requests
/// from here and runs them.
requests: Option<std::sync::mpsc::Sender<Task>>,
/// Thread which has a different Tokio runtime
/// installed and spawns tasks there
thread: Option<std::thread::JoinHandle<()>>,
}
impl DedicatedExecutor {
/// Creates a new `DedicatedExecutor` with a dedicated Tokio
/// executor that is separate from the threadpool created via
/// `[tokio::main]`.
pub fn new(thread_name: &str, num_threads: usize) -> Self {
let thread_name = thread_name.to_string();
let (tx, rx) = std::sync::mpsc::channel::<Task>();
let thread = std::thread::spawn(move || {
// Create a new Runtime to run tasks
let runtime = Tokio::runtime::Builder::new_multi_thread()
.enable_all()
.thread_name(&thread_name)
.worker_threads(num_threads)
// Lower OS priority of worker threads to prioritize main runtime
.on_thread_start(move || set_current_thread_priority_low())
.build()
.expect("Creating Tokio runtime");
// Pull task requests off the channel and send them to the executor
runtime.block_on(async move {
while let Ok(task) = rx.recv() {
Tokio::task::spawn(async move {
task.run().await;
});
}
let state = State {
requests: Some(tx),
thread: Some(thread),
};
Self {
state: Arc::new(Mutex::new(state)),
}
}
此代码创建一个新的 std::thread
,它创建一个单独的多线程 Tokio Runtime
来运行任务,然后从 Channel
读取任务并将它们 spawn
到新的 Runtime
上。
注意:新线程是关键。如果您尝试在主线程或 Tokio 创建的线程之一上创建新的 Runtime
,您将收到错误,因为已经安装了 Runtime
。
这是将任务发送到第二个 Runtime
的相应代码。
impl DedicatedExecutor {
/// Runs the specified Future (and any tasks it spawns) on the
/// `DedicatedExecutor`.
pub fn spawn<T>(&self, task: T) -> Job<T::Output>
where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
let (tx, rx) = tokio::sync::oneshot::channel();
let fut = Box::pin(async move {
let task_output = task.await;
tx.send(task_output).ok()
});
let mut state = self.state.lock();
let task = Task {
fut,
};
if let Some(requests) = &mut state.requests {
// would fail if someone has started shutdown
requests.send(task).ok();
} else {
warn!("tried to schedule task on an executor that was shutdown");
}
Job { rx, cancel }
}
}
Job
上面的代码使用 Future
的包装器,称为 Job
,它处理将专用执行器的结果传输回主执行器,如下所示
#[pin_project(PinnedDrop)]
pub struct Job<T> {
#[pin]
rx: Receiver<T>,
}
impl<T> Future for Job<T> {
type Output = Result<T, Error>;
fn poll(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
let this = self.project();
this.rx.poll(cx)
}
}
就是这样!您可以在这个 GitHub gist中找到所有代码。
后续步骤
InfluxData 是开源的坚定拥护者和支持者。InfluxDB IOx 大量使用 Apache Arrow 列式内存格式的 Rust 实现以及 Apache Arrow DataFusion 查询引擎,并为之做出贡献,后者使用 Tokio 执行其查询计划。
我们热爱社区贡献,包括文档和代码。DataFusion 和 Arrow 都有 活跃的社区。欢迎随时过来打招呼。