使用 Rustlang 的 Async Tokio 运行时处理 CPU 密集型任务

导航至

本文最初于 2022 年 1 月 14 日发表在 The New Stack 上,并经许可在此处重新发表。 

尽管术语 async 及其与异步网络 I/O 的关联,但这篇博文认为,Rust async 生态系统核心的 Tokio 运行时也是 CPU 密集型作业(例如分析引擎中发现的作业)的理想选择。

什么是 Tokio?

Rust 内置了对异步 (async) 编程模型的支持,类似于 JavaScript 等语言。

为了充分利用多核和异步 I/O,必须使用运行时。虽然 Rust 社区有多种替代方案,但 Tokio 是事实上的标准。 Tokio.rs 将其描述为:“Rust 编程语言的异步运行时。它提供了编写网络应用程序所需的构建块。”

虽然此描述强调了 Tokio 在网络通信中的应用,但运行时也可用于其他目的,我们将在下面探讨。

为什么将 Tokio 用于 CPU 任务?

事实证明,现代分析引擎总是需要处理来自网络的客户端请求,并使用网络与对象存储系统(如 AWS S3、GCP Cloud Storage 和 Azure Blob Storage)进行通信。

因此,任何以 Rust 实现的此类系统最终都将使用 Tokio 进行网络和至少一部分存储 I/O(是的,我知道,最初 Tokio 异步文件 I/O 并不是真正的异步,但它 即将到来 )。

分析系统还进行 CPU 密集型计算,我将其定义为以消耗大量 CPU 的方式处理数据,用于存储重组、预计算各种索引或直接回答客户端查询。这些计算通常分解为许多独立的块(我将其称为“任务”),然后并行运行这些任务,以利用现代 CPU 中提供的多个内核。

确定何时运行哪些任务通常由称为“任务调度器”的东西完成,该调度器将任务映射到可用的硬件内核/操作系统线程。

在各种类型的任务调度器、工作池、线程池等方面,已经进行了多年的学术和工业工作。

我使用(并实现,我有点惭愧地承认)多个自定义任务调度器的经验是,它们最初很容易工作(比如 99.9% 的时间),但随后需要大量(大量!)时间来处理边缘情况(快速关闭、任务取消、耗尽等)。由于它们使用了较低级别的线程原语,它们也出了名的难以测试,并且竞争条件比比皆是。我不建议这样做。

因此,当在 Rust 生态系统中寻找任务调度器时,正如我们为 InfluxDB IOx 和 DataFusion 所做的那样,您自然而然地会想到 Tokio,而且它看起来相当不错

  1. 您已经拥有 Tokio(没有新的依赖项)。
  2. Tokio 实现了 复杂的work-stealing 调度器
  3. Tokio 有效地内置了对延续 (async/await) 的语言支持,以及许多相对成熟的流、异步锁定、通道、取消等库。
  4. Tokio 以 良好的测试 而闻名,并在整个 Rust 生态系统中得到广泛使用。
  5. Tokio 通常将正在运行的任务和它们在其上运行的 future 保留在同一个执行器线程上,这对于缓存局部性非常有利。
  6. Tokio 文档齐全、积极维护并且一直在改进。(Tokio 控制台是在我撰写此博客时宣布的 )。

因此,使用 Tokio 作为 CPU 密集型任务的任务调度器是理所当然的,对吗?错!

对使用 Tokio 的常见异议

事实证明,使用 Tokio 是一个相当热门的话题,我想说的是,并非所有人仍然 100% 确信,因此才有了这篇博文。在 DataFusion 和 InfluxDB IOx 的早期,我们 非常担心 这个问题。以下是一些常见的异议

Tokio 文档说不要这样做

旧版本的 Tokio 文档(例如, 1.10.0)包含(在我看来)著名的告诫

“如果您的代码是 CPU 密集型的,并且您希望限制用于运行它的线程数,则应在另一个线程池(如 Rayon)上运行它。”

我相信这种措辞在我们团队内部以及更广泛的 Rust 社区中都引起了极大的困惑。许多人认为这意味着 Tokio Runtime 永远不应用于 CPU 密集型任务。关键点实际上是同一个 Runtime实例(同一个线程池)不应用于 I/O 和 CPU,并且我们随后澄清了 文档的意图(关于 PR的详细信息)。

顺便说一句,Tokio 文档建议使用 Rayon 处理 CPU 密集型任务。Rayon 对于许多应用程序来说是一个不错的选择,但它不支持 async,因此如果您的代码必须执行任何 I/O,您将不得不跨越痛苦的同步/异步边界。我还发现,将基于拉取的执行模型(其中任务必须等待所有输入就绪后才能运行)映射到 Rayon 具有挑战性。

尾部延迟会困扰您

智者说:“将 Tokio 用于 CPU 密集型工作会增加您的请求尾部延迟,这是不可接受的。” 但是等等!您可能会说:“尾部延迟?我正在编写数据库,这听起来像是 Web 服务器在高负载下才有的学术问题……”

并非如此:考虑一下活性检查,这对于当今使用容器编排系统(哎 Kubernetes)部署的系统来说是必不可少的。检查您的进程是否运行良好的方法通常是对类似 /health 的内容发出 HTTP 请求。如果该请求位于某个任务队列中,因为 Tokio 正在充分利用您的 CPU 有效地处理大量数据处理任务,则 Kubernetes 不会收到所需的“一切正常”响应,并会杀死您的进程。

这种推理得出的经典结论是,由于尾部延迟至关重要,因此您不能将 Tokio 用于 CPU 密集型任务。

但是,正如 Tokio 文档所建议的那样,为了避免在完全饱和 CPU 的同时被 Kubernetes 和朋友杀死,真正重要的是使用单独的线程池 - 一个用于“延迟很重要”的任务,例如响应 /health,另一个用于 CPU 密集型任务。这些线程池的最佳线程数因您的需求而异,这是另一篇单独文章的好主题。

也许通过将 Tokio Runtime 视为一个复杂的线程池,使用不同 Runtime 实例的想法可能会显得更可接受,我们将在下面的专用执行器中演示如何做到这一点。

每个任务的高开销

但是“等等!” 我听到您说(或者每个人都在 Hacker News上听到您说),Tokio 具有每个任务的高开销。人们可以制造出比 Tokio 更快地处理微小任务的线程池,我一点也不感到惊讶。

但是,我还没有看到一个我可以信任其生产工作负载的系统,也没有一个具有如此强大的生态系统支持的系统。

值得庆幸的是,对于许多工作负载,可以使用“矢量化处理”来摊销每个任务的开销。这是一种花哨的说法,即每个任务一次处理数千行,而不是一行。您当然不能发疯;您确实需要将您的工作分解为合理大小的块,并且您不能摊销所有工作负载。但是,对于我的应用程序关心的所有实例,Tokio 任务开销都淹没在噪声中。

如何将 Tokio 用于 CPU 密集型任务?

因此,让我们假设我说服了您,使用 Tokio 处理 CPU 密集型工作可能是可以的。您如何做到这一点?

首先,至关重要的是,您的代码需要遵循格言“异步代码不应在没有达到 .await 的情况下花费很长时间”,正如 Alice Ryhl 的帖子中所解释的那样。这是为了让调度器有机会调度其他内容、窃取工作等。

当然,“很长时间”取决于您的应用程序;在优化响应尾部延迟时,Ryhl 建议 10 到 100 微秒。我认为在优化 CPU 时,10 到 100 毫秒对于任务来说也很好。但是,由于我 估计的每个任务的 Tokio 开销 在 ~10 纳秒范围内,因此即使是 10 毫秒的任务,也几乎不可能测量 Tokio Runtime 开销。

其次,在单独的 Runtime 实例上运行您的任务。您如何做到这一点?很高兴您问了。

专用执行器

这是我们在 InfluxDB IOx 中如何在单独的 Tokio Runtime 上运行任务的简化版本。(有关更多详细信息,请参阅 此仓库。)

pub struct DedicatedExecutor {
    state: Arc<Mutex<State>>,                                                                                                          
}                                                                                                                                      

/// Runs futures (and any `tasks` that are `tokio::task::spawned` by                                                                   
/// them) on a separate Tokio Executor                                                                                                 
struct State {                                                                                                    
    /// Channel for requests -- the dedicated executor takes requests                                                                  
    /// from here and runs them.                                                                                                       
    requests: Option<std::sync::mpsc::Sender<Task>>,                                                                                   

    /// Thread which has a different Tokio runtime
    /// installed and spawns tasks there                                                                                            
    thread: Option<std::thread::JoinHandle<()>>,                                                                                       
}                                               

impl DedicatedExecutor {                                                                                                               
    /// Creates a new `DedicatedExecutor` with a dedicated Tokio                                                                       
    /// executor that is separate from the threadpool created via                                                                      
    /// `[tokio::main]`.                                                                                                    
    pub fn new(thread_name: &str, num_threads: usize) -> Self {                                                                        
        let thread_name = thread_name.to_string();                                                                                     

        let (tx, rx) = std::sync::mpsc::channel::<Task>();                                                                             

        let thread = std::thread::spawn(move || { 
            // Create a new Runtime to run tasks                                                                                                                                                                                                                                
            let runtime = Tokio::runtime::Builder::new_multi_thread()                                                                  
                .enable_all()                                                                                                          
                .thread_name(&thread_name)                                                                                             
                .worker_threads(num_threads)
                // Lower OS priority of worker threads to prioritize main runtime                                                                                                                                                          
                .on_thread_start(move || set_current_thread_priority_low())                                                 
                .build()                                                                                                               
                .expect("Creating Tokio runtime");                                                                                     

         // Pull task requests off the channel and send them to the executor                                                                                                                                                         
         runtime.block_on(async move {                                                                                                        
                while let Ok(task) = rx.recv() {                                                                                                                                                                                                                              
                    Tokio::task::spawn(async move {                                                                                    
                        task.run().await;                                                                                              
                    });                                                                                                                
                }                                                                

        let state = State {                                                                                                            
            requests: Some(tx),                                                                                                        
            thread: Some(thread),                                                                                                      
        };                                                                                                                             

        Self {                                                                                                                         
            state: Arc::new(Mutex::new(state)),                                                                                        
        }                                                                                                                              
    }

此代码创建一个新的 std::thread,它创建一个单独的多线程 Tokio Runtime 来运行任务,然后从 Channel 读取任务并将它们 spawn 到新的 Runtime 上。

注意:新线程是关键。如果您尝试在主线程或 Tokio 创建的线程之一上创建新的 Runtime,您将收到错误,因为已经安装了 Runtime

这是将任务发送到第二个 Runtime 的相应代码。

impl DedicatedExecutor {                                                                                                               

    /// Runs the specified Future (and any tasks it spawns) on the                                                                     
    /// `DedicatedExecutor`.                                                                        
    pub fn spawn<T>(&self, task: T) -> Job<T::Output>                                                                                  
    where                                                                                                                              
        T: Future + Send + 'static,                                                                                                    
        T::Output: Send + 'static,                                                                                                     
    {                                                                                                                                  
        let (tx, rx) = tokio::sync::oneshot::channel();                                                                                

        let fut = Box::pin(async move {                                                                                                
            let task_output = task.await;                                                                                              
            tx.send(task_output).ok()                                                                                                                      
        });                                                                                                                            
        let mut state = self.state.lock();                                                                                             
        let task = Task {                                                                                                              
            fut,                                                                                                                       
        };                                                                                                                             

        if let Some(requests) = &mut state.requests {                                                                                  
            // would fail if someone has started shutdown                                                                              
            requests.send(task).ok();                                                                                                  
        } else {                                                                                                                       
            warn!("tried to schedule task on an executor that was shutdown");                                                          
        }                                                                                                                              

        Job { rx, cancel }                                                                                                             
    }  
 }

Job

上面的代码使用 Future 的包装器,称为 Job ,它处理将专用执行器的结果传输回主执行器,如下所示

#[pin_project(PinnedDrop)]                                                                                                             
pub struct Job<T> {                                                                                                  
    #[pin]                                                                                                                             
    rx: Receiver<T>,                                                                                                                   
}                                                                                                                                      

impl<T> Future for Job<T> {                                                                                                            
    type Output = Result<T, Error>;                                                                                                    

    fn poll(                                                                                                                           
        self: Pin<&mut Self>,                                                                                                          
        cx: &mut std::task::Context<'_>,                                                                                               
    ) -> std::task::Poll<Self::Output> {                                                                                               
        let this = self.project();                                                                                                     
        this.rx.poll(cx)                                                                                                               
    }                                                                                                                                  
}

就是这样!您可以在这个 GitHub gist中找到所有代码。

后续步骤

InfluxData 是开源的坚定拥护者和支持者。InfluxDB IOx 大量使用 Apache Arrow 列式内存格式的 Rust 实现以及 Apache Arrow DataFusion 查询引擎,并为之做出贡献,后者使用 Tokio 执行其查询计划。

我们热爱社区贡献,包括文档和代码。DataFusion 和 Arrow 都有 活跃的社区。欢迎随时过来打招呼。