使用Rustlang的Async Tokio运行时处理CPU密集型任务

导航至

本文最初于2022年1月14日发表在《The New Stack》上,经授权在此重发。

尽管“异步”一词及其与异步网络I/O的关联,但本文认为Rust异步生态系统核心的Tokio运行时也适合用于CPU密集型任务,如分析引擎中发现的任务。

什么是Tokio?

Rust内置了对异步(async)编程模型的支持,类似于JavaScript等语言。

为了充分利用多核心和异步I/O,必须使用运行时,虽然Rust社区有几种替代方案,但Tokio是事实上的标准。Tokio.rs将其描述为:“Rust编程语言的异步运行时。它提供了编写网络应用程序所需的构建块。”

尽管此描述强调了Tokio在网络通信方面的应用,但运行时也可以用于其他目的,如下文所述。

为什么要在CPU任务中使用Tokio?

现代分析引擎不可避免地需要处理来自网络的客户端请求,以及使用网络与对象存储系统(如AWS S3、GCP Cloud Storage和Azure Blob Storage)通信。

因此,任何用Rust实现的此类系统最终都会使用Tokio进行网络通信,以及至少部分存储I/O(是的,我知道,最初Tokio的异步文件I/O并不是真正的异步,但它即将到来)。

分析系统也进行CPU密集型计算,我将其定义为以消耗大量CPU进行存储重组、预计算各种索引或直接回答客户端查询的方式来处理数据。这些计算通常被分解成许多独立的块,我将其称为“任务”,然后并行运行以利用现代CPU中可用的众多核心。

确定何时运行哪些任务通常由称为“任务调度器”的东西来完成,它将任务映射到可用的硬件核心/操作系统线程。

在任务调度器、工作者池、线程池等不同类型方面,已经有许多年学术和工业界的研究工作。

根据我在几个自定义任务调度器(以及实现,我不得不承认有些尴尬)上的工作经验,它们最初启动起来很容易(99.9% 的时间),但随后需要大量的时间来解决边缘情况(快速关闭、任务取消、资源释放等)。由于它们使用了底层的线程原语,因此测试起来也特别困难,竞争条件随处可见。我不建议这样做。

因此,当我们为 InfluxDB IOx 和 DataFusion 寻找任务调度器时,正如我们之前所做的那样,自然会考虑 Tokio,而且它看起来相当不错。

  1. 您已经拥有 Tokio(无需新的依赖项)。
  2. Tokio 实现了一个复杂的任务窃取调度器
  3. Tokio 内置了对语言的延续性支持(async/await),并且拥有许多相对成熟的库,用于流、异步锁定、通道、取消等。
  4. Tokio 著名的是经过充分测试并且被 Rust 生态系统广泛使用。
  5. Tokio 通常会将正在运行的任务和它们运行的未来保持在同一个执行器线程上,这对缓存局部性来说是个优点。
  6. Tokio 有很好的文档,积极维护,并且一直在不断改进。(在我撰写这篇博客时Tokio 控制台已经宣布)。

因此,将 Tokio 用作 CPU 重量级任务的调度器似乎是顺理成章的?大错特错!

使用 Tokio 的常见反对意见

使用 Tokio 是一个相当热门的话题,我认为至今仍没有所有人都100%信服,因此我写了这篇博客。在 DataFusion 和 InfluxDB IOx 的早期阶段,我们非常担忧了这个问题。以下是一些常见的反对意见。

Tokio 文档说不

较旧版本的 Tokio 文档(例如,1.10.0)中包含了我心中著名的警告:

“如果你的代码是 CPU 密集型的,并且希望限制运行它所使用的线程数,你应该在另一个线程池(如 Rayon)上运行它。”

我认为这种措辞在我们团队以及更广泛的 Rust 社区中引起了很大的困惑。许多人将其解读为 Tokio 的 Runtime 从来不应该用于 CPU 密集型任务。关键点实际上在于不应使用相同的 Runtime实例(即相同的线程池)同时用于 I/O 和 CPU,我们随后在文档中澄清了 了意图(关于PR的详细内容)。

顺便说一句,Tokio 文档建议使用Rayon 进行 CPU 密集型任务。对于许多应用来说,Rayon 是一个很好的选择,但它不支持 async,因此如果你的代码需要进行任何 I/O,你将不得不跨越痛苦的同步/异步边界。我还发现将基于拉取的执行模型映射到 Rayon 上具有挑战性,在这种模型中,任务必须等待所有输入都准备就绪后才能运行。

尾部延迟将跟踪你

智者曾说:“使用Tokio进行CPU密集型工作会增加你的请求尾部延迟,这是不可接受的。”但等等!你可能说:“尾部延迟?我正在写一个数据库,这听起来像是针对高负载的Web服务器的学术问题……”

其实并非如此:考虑一下生存性检查,这是现在使用容器编排系统(比如Kubernetes)部署的系统所必需的。检查你的进程是否表现良好的通常是向类似/health的HTTP请求。如果这个请求因为Tokio正高效地使用你的CPU来处理大量数据处理任务而坐在某个任务队列中,Kubernetes将无法收到所需的“一切正常”响应,并会杀死你的进程。

这种推理导致了一个经典的结论:由于尾部延迟至关重要,你不能使用Tokio进行CPU密集型任务。

然而,正如Tokio文档所建议的,为了避免在完全饱和CPU的同时被Kubernetes及其同类程序“炸毁”,真正重要的是要使用单独的线程池——一个用于“延迟很重要”的任务,比如响应/health,另一个用于CPU密集型任务。这些线程池的最优线程数取决于你的需求,这是另一个单独文章的好主题。

也许通过将Tokio的Runtime视为一个复杂的线程池,使用不同Runtime实例的想法可能看起来更容易接受,以下我们将展示如何使用专门的执行器来实现这一点。

每项请求的高开销

但是,“等等!”我听到你这么说(或者每个人都在Hacker News上这么说),Tokio有很高的每任务开销。我并不惊讶人们可以创建出比Tokio更快处理微任务的线程池。

然而,我还没有看到这样一个我可以信赖用于我的生产工作负载的系统,也没有一个具有如此强大的生态系统支持的系统。

幸运的是,对于许多工作负载,可以通过“向量化处理”来摊销每任务的开销。这是一个术语,意思是指每个任务一次处理数千行数据,而不是一行。当然,你不能做得太过分;你需要将工作分成合理的块,并且不是所有的任务都可以摊销。然而,对于我应用程序关心的所有实例,Tokio的任务开销都微不足道。

如何使用Tokio进行CPU密集型任务?

所以,让我们假设我说服了你使用Tokio进行CPU密集型工作可能是可以的。你该如何做到这一点呢?

首先,最重要的是,你的代码需要遵循“异步代码不应长时间不达到.await”的原则,正如Alice Ryhl在其博文中解释的那样。这是为了让调度程序有机会安排其他任务,窃取工作等。

当然,“长时间”取决于你的应用程序;Ryhl建议在优化响应尾部延迟时使用10到100微秒。我认为在优化CPU时,10到100毫秒也是可以接受的。然而,鉴于我估计的每任务Tokio开销大约在10纳秒范围内,用10毫秒的任务甚至几乎无法测量Tokio Runtime的开销。

其次,在单独的Runtime实例上运行你的任务。你怎么做?很高兴你问了。

专用执行器

以下是InfluxDB IOx中在单独的Tokio运行时上运行任务的简化版本。(更多详情,请参阅这个仓库。)

pub struct DedicatedExecutor {
    state: Arc<Mutex<State>>,                                                                                                          
}                                                                                                                                      

/// Runs futures (and any `tasks` that are `tokio::task::spawned` by                                                                   
/// them) on a separate Tokio Executor                                                                                                 
struct State {                                                                                                    
    /// Channel for requests -- the dedicated executor takes requests                                                                  
    /// from here and runs them.                                                                                                       
    requests: Option<std::sync::mpsc::Sender<Task>>,                                                                                   

    /// Thread which has a different Tokio runtime
    /// installed and spawns tasks there                                                                                            
    thread: Option<std::thread::JoinHandle<()>>,                                                                                       
}                                               

impl DedicatedExecutor {                                                                                                               
    /// Creates a new `DedicatedExecutor` with a dedicated Tokio                                                                       
    /// executor that is separate from the threadpool created via                                                                      
    /// `[tokio::main]`.                                                                                                    
    pub fn new(thread_name: &str, num_threads: usize) -> Self {                                                                        
        let thread_name = thread_name.to_string();                                                                                     

        let (tx, rx) = std::sync::mpsc::channel::<Task>();                                                                             

        let thread = std::thread::spawn(move || { 
            // Create a new Runtime to run tasks                                                                                                                                                                                                                                
            let runtime = Tokio::runtime::Builder::new_multi_thread()                                                                  
                .enable_all()                                                                                                          
                .thread_name(&thread_name)                                                                                             
                .worker_threads(num_threads)
                // Lower OS priority of worker threads to prioritize main runtime                                                                                                                                                          
                .on_thread_start(move || set_current_thread_priority_low())                                                 
                .build()                                                                                                               
                .expect("Creating Tokio runtime");                                                                                     

         // Pull task requests off the channel and send them to the executor                                                                                                                                                         
         runtime.block_on(async move {                                                                                                        
                while let Ok(task) = rx.recv() {                                                                                                                                                                                                                              
                    Tokio::task::spawn(async move {                                                                                    
                        task.run().await;                                                                                              
                    });                                                                                                                
                }                                                                

        let state = State {                                                                                                            
            requests: Some(tx),                                                                                                        
            thread: Some(thread),                                                                                                      
        };                                                                                                                             

        Self {                                                                                                                         
            state: Arc::new(Mutex::new(state)),                                                                                        
        }                                                                                                                              
    }

此代码创建一个新的std::thread,它创建一个单独的多线程Tokio Runtime来运行任务,然后从Channel读取任务并在新的Runtime上启动它们。

注意:新线程是关键。如果您尝试在主线程或Tokio创建的线程之一上创建新的Runtime,您将遇到错误,因为已经安装了一个Runtime

以下是向第二个Runtime发送任务的相应代码。

impl DedicatedExecutor {                                                                                                               

    /// Runs the specified Future (and any tasks it spawns) on the                                                                     
    /// `DedicatedExecutor`.                                                                        
    pub fn spawn<T>(&self, task: T) -> Job<T::Output>                                                                                  
    where                                                                                                                              
        T: Future + Send + 'static,                                                                                                    
        T::Output: Send + 'static,                                                                                                     
    {                                                                                                                                  
        let (tx, rx) = tokio::sync::oneshot::channel();                                                                                

        let fut = Box::pin(async move {                                                                                                
            let task_output = task.await;                                                                                              
            tx.send(task_output).ok()                                                                                                                      
        });                                                                                                                            
        let mut state = self.state.lock();                                                                                             
        let task = Task {                                                                                                              
            fut,                                                                                                                       
        };                                                                                                                             

        if let Some(requests) = &mut state.requests {                                                                                  
            // would fail if someone has started shutdown                                                                              
            requests.send(task).ok();                                                                                                  
        } else {                                                                                                                       
            warn!("tried to schedule task on an executor that was shutdown");                                                          
        }                                                                                                                              

        Job { rx, cancel }                                                                                                             
    }  
 }

工作

上面的代码使用了一个名为JobFuture包装器,该包装器处理将专用执行器的结果传递回主执行器,其外观如下所示

#[pin_project(PinnedDrop)]                                                                                                             
pub struct Job<T> {                                                                                                  
    #[pin]                                                                                                                             
    rx: Receiver<T>,                                                                                                                   
}                                                                                                                                      

impl<T> Future for Job<T> {                                                                                                            
    type Output = Result<T, Error>;                                                                                                    

    fn poll(                                                                                                                           
        self: Pin<&mut Self>,                                                                                                          
        cx: &mut std::task::Context<'_>,                                                                                               
    ) -> std::task::Poll<Self::Output> {                                                                                               
        let this = self.project();                                                                                                     
        this.rx.poll(cx)                                                                                                               
    }                                                                                                                                  
}

就是这样!您可以在GitHub gist中找到所有代码。

下一步

InfluxData是开源的坚定信仰者和支持者。InfluxDB IOx在Apache Arrow列式内存格式和Apache Arrow DataFusion查询引擎的实现中做出了重要贡献,该查询引擎使用Tokio来执行查询计划。

我们热爱社区贡献,包括文档和代码。DataFusion和Arrow都有充满活力的社区。随时欢迎来打个招呼。