在 Apache Arrow DataFusion 中快速聚合数百万组数据

导航至

TLDR(太长不看)

分组聚合是任何分析工具的核心部分,它可以创建对海量数据的易于理解的摘要。Apache Arrow DataFusion 的并行聚合功能在 28.0.0 版本中,对于具有大量(10,000 个或更多)组的查询,速度提高了 2-3 倍。

改进聚合性能对我们 DataFusion 的用户至关重要。InfluxDB,一个 时间序列数据平台,和 Coralogix,一个 全栈可观测性 平台,都聚合了大量的原始数据,以监控并为我们的客户创建洞察。提高 DataFusion 的性能使我们能够通过更快地生成洞察并减少资源消耗,从而提供更好的用户体验。由于 DataFusion 是开源的,并根据宽松的 Apache 2.0 许可证发布,因此整个 DataFusion 社区都将受益。

通过新的优化,DataFusion 的分组速度现在已接近 DuckDB,该系统经常报告 出色分组 基准测试性能数据。图 1 包含在单个 Parquet 文件上运行的 ClickBench 的代表性样本,完整结果在本文章末尾。

ClickBench-single Parquet file

图 1: 在单个 Parquet 文件上对 DataFusion 27.0.0、DataFusion 28.0.0 和 DuckDB 0.8.1 运行的 ClickBench 查询 16、17、18 和 19 的查询性能。

高基数分组简介

聚合是一个花哨的词汇,用于计算在具有一个或多个列中相同值的多行数据上的摘要统计信息。我们将具有相同值的行称为,“高基数”意味着数据集中存在大量不同的组。在撰写本文时,分析引擎中的“大量”组约为 10,000 个。

例如,ClickBench hits 数据集包含 1 亿次匿名用户点击,这些点击分布在一系列网站上。ClickBench 查询 17 是

SELECT "UserID", "SearchPhrase", COUNT(*) 
FROM hits
GROUP BY "UserID", "SearchPhrase" 
ORDER BY COUNT(*) 
DESC LIMIT 10;

用英语来说,此查询查找“所有点击中排名前十的(用户,搜索短语)组合”,并生成以下结果(前十名用户没有搜索短语)

用户 ID 搜索短语 计数 (UInt8(1))
1313338681122956954   29097
1907779576417363396   25333
2305303682471783379   10597
7982623143712728547   6669
7280399273658728997   6408
1090981537032625727   6196
5730251990344211405   6019
6018350421959114808   5990
835157184735512989   5209
770542365400669095   4906

ClickBench 数据集包含

  • 99,997,497 行总数 1

  • 17,630,976 个不同的用户(不同的用户 ID) 2

  • 6,019,103 个不同的搜索短语 3

  • 24,070,560 种不同的 (用户 ID, 搜索短语) 组合 4

因此,为了回答该查询,DataFusion 必须将 1 亿个不同的输入行中的每一个映射到 2400 万个不同的组 之一,并记录每个组中有多少行。

解决方案

与数据库和其他分析系统中的大多数概念一样,此算法的基本思想很简单,并在计算机科学入门课程中教授。您可以使用如下程序计算查询 5

import pandas as pd
from collections import defaultdict
from operator import itemgetter

# read file
hits = pd.read_parquet('hits.parquet', engine='pyarrow')

# build groups
counts = defaultdict(int)
for index, row in hits.iterrows():
    group = (row['UserID'], row['SearchPhrase']);
    # update the dict entry for the corresponding key
    counts[group] += 1

# Print the top 10 values
print (dict(sorted(counts.items(), key=itemgetter(1), reverse=True)[:10]))

这种方法虽然简单,但速度慢且内存效率非常低。计算不到 1% 的数据集的结果需要 40 多秒 6。DataFusion 28.0.0 和 DuckDB 0.8.1 都在 10 秒内计算出整个数据集的结果。

为了快速有效地回答此查询,您必须编写代码,使其能够

  1. 通过并行计算保持所有核心繁忙地进行聚合

  2. 快速更新聚合值,使用向量化循环,编译器很容易将其转换为现代 CPU 中可用的高性能 SIMD 指令。

本文的其余部分解释了 DataFusion 中的分组工作原理以及我们在 28.0.0 中所做的改进。

两阶段并行分区分组

DataFusion 27 和 28 都使用了最先进的两阶段并行哈希分区分组,类似于其他高性能向量化引擎,如 DuckDB 的并行分组聚合。在图中,它看起来像

parallel partitioned grouping

图 2: 两阶段重分区分组:数据从底部(源)到顶部(结果)分两个阶段流动。首先(步骤 1 和 2),每个核心将数据读取到核心特定的哈希表中,计算中间聚合,而无需任何跨核心协调。然后(步骤 3 和 4),DataFusion 按组值将数据(“重分区”)划分为不同的子集,每个子集都发送到特定的核心,该核心计算最终聚合。

这两个阶段对于在多核系统中保持核心繁忙至关重要。两个阶段都使用相同的哈希表方法(在下一节中解释),但组的分布方式和累加器发出的部分结果有所不同。第一阶段在数据生成后尽快聚合数据。但是,如图 2 所示,组可以位于任何输入中的任何位置,因此通常在许多不同的核心上找到相同的组。第二阶段使用哈希函数将数据均匀地重新分配到各个核心,因此每个组值都由一个核心处理,该核心发出该组的最终结果。

Core-A-Core-B

图 3: 聚合阶段中跨 2 个核心的组值分布。在第一阶段,每个核心处理的输入流中都存在每个组值 1, 2, 3, 4,。在第二阶段,重分区后,组值 12 由核心 A 处理,值 34 仅由核心 B 处理。

DataFusion 实现 中还有一些未在上面提及的其他细微之处,由于篇幅限制,例如

  1. 何时从第一阶段的哈希表中发出数据的策略(例如,因为数据是部分排序的)

  2. 处理每个聚合的特定过滤器(由于 FILTER SQL 子句)

  3. 中间值的数据类型(对于某些聚合(如 AVG),中间值的数据类型可能与最终输出不同)。

  4. 当内存使用量超过预算时采取的措施。

哈希分组

DataFusion 查询可以为每个组计算许多不同的聚合函数,包括 内置 和/或用户定义的 AggregateUDF。每个聚合函数的状态(称为累加器)都使用哈希表进行跟踪(DataFusion 使用出色的 HashBrown RawTable API),该哈希表逻辑上存储标识特定组值的“索引”。

27.0.0 中的哈希分组

如图 3 所示,DataFusion 27.0.0 将数据存储在 GroupState 结构中,不出所料,该结构跟踪每个组的状态。每个组的状态包括

  1. 组列的实际值,采用 Arrow Row 格式。

  2. 正在进行的累积(例如,COUNT 聚合的运行计数)对于每个组,采用两种可能的格式之一 AccumulatorRowAccumulator

  3. 用于跟踪哪些行与每个批次中每个聚合匹配的暂存空间。

Hash grouping in 27-0-0

图 4: DataFusion 27.0.0 中的哈希组运算符结构。哈希表将每个组映射到 GroupState,其中包含所有按组状态。

为了计算聚合,DataFusion 对每个输入批次执行以下步骤

  1. 使用 高效的向量化代码 计算哈希,该代码针对每种数据类型进行了专门优化。

  2. 使用哈希表确定每个输入行的组索引(为新看到的组创建新条目)。

  3. 更新 Accumulators 对于每个具有输入行的组, 如果有足够数量的行,则将这些行组装成连续的范围以进行向量化累加器处理。

DataFusion 还在表中存储哈希值,以避免在调整哈希表大小时可能代价高昂的哈希重新计算。

对于相对较少数量的不同组,此方案效果非常好:所有累加器都使用大量连续的行批次进行有效更新。

但是,由于以下原因,此方案对于高基数分组并不理想

  1. 每个组的多次分配,用于组值行格式,以及 RowAccumulators 和每个 AccumulatorAccumulator 也可能在其内部进行额外的分配。

  2. 非向量化更新: 累加器更新通常回退到较慢的非向量化形式,因为在每个输入批次中,不同组的数量很大(因此每个组的值数量很小)。

28.0.0 中的哈希分组

对于 28.0.0,我们遵循传统的系统优化原则:减少分配、类型专门化和积极的向量化,重写了核心分组实现。

DataFusion 28.0.0 使用相同的 RawTable,并且仍然存储组索引。如图 4 所示,主要区别在于

  1. 组值存储在以下位置之一

    • 内联在 RawTable 中(对于原始类型的单列),其中转换为 Row 格式的成本高于其收益

    • 在一个单独的 Rows 结构中,为所有组值进行单次连续分配,而不是每个组分配一次。累加器在内部管理所有组的状态,因此更新中间值的代码是一个紧密的类型专门化循环。新的 GroupsAccumulator 接口产生了高效的类型累加器更新循环。

Group State

图 5: DataFusion 28.0.0 中的哈希组运算符结构。组值可以直接存储在哈希表中,也可以使用 Arrow Row 格式在单个分配中存储。哈希表包含组索引。单个 GroupsAccumulator 存储所有组的每个聚合状态。

由于以下原因,这种新结构显着提高了高基数组的性能

  1. 减少分配: 不再有每个组的单独分配。

  2. 连续的本地累加器状态: 类型专门化的累加器使用 Rust Vec<T> 的某种本地类型,在单个连续分配中存储所有组的值。

  3. 向量化状态更新: 内部聚合更新循环是类型专门化的,并且以本地 Vecs 的形式,Rust 编译器(感谢 LLVM!)对其进行了良好的向量化处理。

注释

一些向量化分组实现将累加器状态按行方式直接存储在哈希表中,这通常可以有效地利用现代 CPU 缓存。以列式方式管理累加器状态可能会牺牲一些缓存局部性,但是,即使存在大量组和聚合,它也可以确保哈希表的大小保持较小,从而使编译器更容易向量化累加器更新。

根据重新计算哈希值的成本,DataFusion 28.0.0 可能会也可能不会在表中存储哈希值。这优化了计算哈希值的成本(例如,对于字符串,哈希值计算成本很高)与将其存储在哈希表中的成本之间的权衡。

将状态更新推送到 GroupsAccumulators 带来的一个微妙之处是,每个累加器都必须处理具有/不具有过滤以及输入中具有/不具有空值的类似变化。DataFusion 28.0.0 使用模板化的 NullState,它封装了跨累加器的这些常见模式。

代码结构在很大程度上受到 DataFusion 使用 Rust 实现这一事实的影响,Rust 是一种专注于速度和安全性的新(ish)系统编程语言。Rust 强烈反对 C/C++ 哈希分组实现中使用的许多传统指针转换“技巧”。DataFusion 聚合代码几乎完全是 safe 的,仅在必要时才偏离到 unsafe。(Rust 是一个绝佳的选择,因为它使 DataFusion 速度快、易于嵌入,并防止了许多与多线程 C/C++ 代码相关的崩溃和安全问题)。

ClickBench 结果

在单个 Parquet 文件上针对 DataFusion 27.0.0、DataFusion 28.0.0 和 DuckDB 0.8.1 运行 ClickBench 查询的完整结果如下。这些数字是在具有 8 个核心和 32 GB RAM 的 GCP e2-standard-8 machine 上使用 此处 的脚本运行的。

随着行业朝着由组件组装而成的数据系统发展,使用 Apache ArrowParquet 等开放标准而不是自定义存储和内存格式交换数据变得越来越重要。因此,此基准测试使用代表许多 DataFusion 用户的单个输入 Parquet 文件,并符合当前分析趋势,即避免在查询之前将数据加载/转换为自定义存储格式的高昂成本。

DataFusion 现在达到了接近 DuckDB 的速度来查询 Parquet 数据。虽然我们不打算与一个字面上写了 公平基准测试被认为很困难 的团队进行基准测试对决,但希望每个人都同意 DataFusion 28.0.0 是一个重大改进。

ClickBench - all queries

图 6: DataFusion 27.0.0、DataFusion 28.0.0 和 DuckDB 0.8.1 在针对单个 hits.parquet 文件 的所有 43 个 ClickBench 查询上的性能。数值越低越好。

注释

DataFusion 27.0.0 由于计划器错误(Q9、Q11、Q12、14)或内存不足(Q33)而无法运行多个查询。DataFusion 28.0.0 解决了这些问题。

DataFusion 在查询 21 和 22 中比 DuckDB 更快,这可能是由于字符串模式匹配的优化实现。

结论:性能至关重要

将聚合性能提高两倍以上,使使用 DataFusion 构建产品和项目的开发人员可以将更多时间用于增值领域特定功能。我们相信使用 DataFusion 构建系统比尝试从头开始构建类似系统要快得多。DataFusion 提高了生产力,因为它消除了重建众所周知但实施成本高昂的分析数据库技术的需要。虽然我们对 DataFusion 28.0.0 中的改进感到满意,但我们绝不会止步于此,并且正在追求 (更多)聚合性能。性能的未来是光明的。

致谢

DataFusion 是一项 社区努力,如果没有社区中许多人的贡献,这项工作是不可能实现的。特别感谢 sunchaoyjshenyahoNanJingmingmwangozankabakmustafasrepo 以及所有其他在 工作 期间贡献了想法、评论和鼓励的人。

关于 DataFusion

Apache Arrow DataFusion 是一个可扩展的查询引擎和数据库工具包,使用 Rust 编写,并使用 Apache Arrow 作为其内存格式。DataFusion 与 Apache Calcite、Facebook 的 Velox 和类似技术是下一代“解构数据库”架构的一部分,在这些架构中,新系统构建在快速、模块化组件的基础上,而不是作为单个紧密集成的系统。


1 SELECT COUNT(*) FROM 'hits.parquet';
2 SELECT COUNT(DISTINCT "UserID") as num_users FROM 'hits.parquet';
3 SELECT COUNT(DISTINCT "SearchPhrase") as num_phrases FROM 'hits.parquet';
4 SELECT COUNT(*) FROM (SELECT DISTINCT "UserID", "SearchPhrase" FROM 'hits.parquet')
5 完整脚本请访问 hash.py
6 hits_0.parquet,来自分区的 ClickBench 数据集的文件之一,它有 100,000 行,大小为 117 MB。整个数据集在一个 14 GB Parquet 文件中有 100,000,000 行。该脚本在 40 分钟后未完成整个数据集的运行,并且峰值时使用了 212GB RAM。