使用毫秒级延迟查询 Parquet

导航至

我们认为,直接查询 Apache Parquet 文件中的数据可以实现与大多数专用文件格式相似或更好的存储效率和查询性能。虽然这需要大量的工程投入,但 Parquet 开放格式和广泛的生态系统支持使其成为各种数据系统的显而易见的选择。

在本文中,我们将解释快速查询 Parquet 格式存储的数据所需的几种高级技术,这些技术我们在 Apache Arrow Rust Parquet 读取器中实现。这些技术共同使 Rust 实现成为查询 Parquet 文件(无论是在本地磁盘还是远程对象存储上)速度最快(如果不是最快)的实现之一。它能够在 毫秒级的时间内查询 GB 级的 Parquet 数据。

背景

Apache Parquet 是一种越来越流行的开放格式,用于存储 分析数据集,并且由于其引人注目的组合,已成为经济高效、与 DBMS 无关的数据存储的事实标准:

  • 高压缩率
  • 适用于 S3 等商品化 blob 存储
  • 广泛的生态系统和工具支持
  • 跨许多不同平台和工具的可移植性
  • 支持 任意结构化数据

越来越多的其他系统,例如 DuckDBRedshift,允许直接查询 Parquet 存储的数据,但与它们的本地(自定义)文件格式相比,支持通常仍然是次要考虑因素。此类格式包括 DuckDB .duckdb 文件格式、Apache IOT TsFileGorilla 格式 等。

首次,以前仅在闭源商业实现中可用的相同复杂查询技术现在可以作为开源使用。所需的工程能力来自大型、运营良好的开源项目,这些项目拥有全球贡献者社区,例如 Apache ArrowApache Impala

Parquet 文件格式

在深入研究高效读取 Parquet 的细节之前,重要的是要了解文件布局。文件格式经过精心设计,可以快速定位所需信息,跳过不相关的部分,并高效地解码剩余部分。

  • Parquet 文件中的数据被分成称为 RowGroup 的水平切片
  • 每个 RowGroup 包含模式中每列的单个 ColumnChunk

例如,下图说明了一个 Parquet 文件,其中包含三列“A”、“B”和“C”,存储在两个 RowGroup 中,总共有 6 个 ColumnChunk。

Parquet File Format Diagram 12.05.2022v1

ColumnChunk 的逻辑值使用多种 可用编码之一写入到一个或多个 Data Page 中,这些 Data Page 在文件中按顺序附加。Parquet 文件的末尾是一个页脚,其中包含重要的元数据,例如

  • 文件的模式信息,例如列名和类型
  • RowGroup 和 ColumnChunk 在文件中的位置。页脚也可能包含其他专用数据结构
  • 每个 ColumnChunk 的可选统计信息,包括最小值/最大值和空值计数
  • 指向 OffsetIndexes 的可选指针,其中包含每个单独 Page 的位置
  • 指向 ColumnIndex 的可选指针,其中包含每个 Page 的行计数和摘要统计信息
  • 指向 BloomFilterData 的可选指针,可以快速检查某个值是否在 ColumnChunk 中

例如,前一个图中 2 个 Row Group 和 6 个 ColumnChunk 的逻辑结构可以存储在 Parquet 文件中,如下图所示(未按比例)。ColumnChunk 的页面首先出现,然后是页脚。数据、编码方案的有效性以及 Parquet 编码器的设置决定了每个 ColumnChunk 所需页面的数量和大小。在本例中,ColumnChunk 1 需要 2 页,而 ColumnChunk 6 只需要 1 页。除了其他信息外,页脚还包含每个 Data Page 的位置和列的类型。

Parquet File Format Diagram 2 12.05.2022v1

在创建 Parquet 文件时,需要考虑许多重要的标准,例如如何最佳地排序/聚类数据并将其结构化为 RowGroup 和 Data Page。这些“物理设计”考虑因素很复杂,值得单独撰写一系列文章,并且不在本博文中讨论。相反,我们专注于如何使用可用的结构来使查询非常快速。

优化查询

在任何查询处理系统中,以下技术通常可以提高性能:

  1. 减少必须从辅助存储器传输以进行处理的数据(减少 I/O)
  2. 减少解码数据的计算负载(减少 CPU)
  3. 交错/流水线化数据的读取和解码(提高并行性)

相同的原则也适用于查询 Parquet 文件,如下所述:

解码优化

Parquet 通过使用 复杂的编码技术(例如游程长度压缩、字典编码、增量编码等)来实现令人印象深刻的压缩率。因此,CPU 密集型的解码任务可能会主导查询延迟。Parquet 读取器可以使用多种技术来提高此任务的延迟和吞吐量,正如我们在 Rust 实现中所做的那样。

向量化解码

大多数分析系统一次解码多个值到列式内存格式(例如 Apache Arrow),而不是逐行处理数据。这通常称为向量化或列式处理,这样做的好处是:

  • 分摊调度开销,以切换到正在解码的列的类型
  • 通过从 ColumnChunk 读取连续值来提高缓存局部性
  • 通常允许在单个指令中解码多个值。
  • 通过单个大型分配避免许多小型堆分配,从而为字符串和字节数组等可变长度类型节省大量开销

因此,Rust Parquet Reader 实现了专用解码器,用于将 Parquet 直接读取到 列式内存格式(Arrow Arrays)。

流式解码

RowGroup 中哪些行存储在哪些 Page 中,ColumnChunk 中哪些行存储在哪些 Page 中,两者之间没有关系。例如,第 10,000 行的逻辑值可能在列 A 的第一个页面中,而在列 B 的第三个页面中。

向量化解码最简单的方法,也是 Parquet 解码器最初经常实现的方法,是一次解码整个 RowGroup(或 ColumnChunk)。

但是,考虑到 Parquet 的高压缩率,单个 RowGroup 很可能包含数百万行。一次解码如此多的行是不理想的,因为它:

  • 需要大量的中间 RAM:为处理优化的典型内存格式(例如 Apache Arrow)需要的内存远多于其 Parquet 编码形式。
  • 增加查询延迟:后续处理步骤(如过滤或聚合)只能在整个 RowGroup(或 ColumnChunk)解码完成后才能开始。

因此,最好的 Parquet 读取器支持“流式”数据输出,按需生成可配置大小的行批次。批次大小必须足够大,以分摊解码开销,但又足够小,以实现高效的内存使用,并允许下游处理在解码后续批次时并发开始。

Parquet File Streaming Decode Diagram 12.05.2022v1

虽然流式处理不是一个难以解释的功能,但解码的状态性质,尤其是在跨多列和 任意嵌套数据的情况下,其中行和值之间的关系不是固定的,这需要 复杂的中间缓冲和大量的工程工作才能正确处理。

字典保留

字典编码,也称为 分类编码,是一种技术,其中列中的每个值都不直接存储,而是存储在单独的列表(称为“字典”)中的索引。对于具有重复值(低基数)的列,此技术实现了 第三范式 的许多优点,并且对于字符串列(例如“城市”)尤其有效。

ColumnChunk 中的第一个页面可以选择是字典页面,其中包含列类型的值列表。然后,此 ColumnChunk 中的后续页面可以编码到此字典的索引,而不是直接编码值。

考虑到这种编码的有效性,如果 Parquet 解码器只是将字典数据解码为本机类型,它将重复且低效地复制相同的值,这对于字符串数据尤其灾难性。为了高效地处理字典编码的数据,必须在解码期间保留编码。方便的是,许多列式格式(例如 Arrow DictionaryArray)都支持这种兼容的编码。

保留字典编码可以大大提高读取到 Arrow 数组时的性能,在某些情况下超过 60 倍,并显著减少内存使用量。

保留字典的主要复杂因素是字典是按 ColumnChunk 存储的,因此字典在 RowGroup 之间会发生变化。读取器必须自动为跨多个 RowGroup 的批次重新计算字典,同时还要针对批次大小均匀划分为每个 RowGroup 的行数的情况进行优化。此外,一列可能只是 部分字典编码,这进一步复杂了实现。有关此技术及其复杂性的更多信息,请参见关于将此技术应用于 C++ Parquet 读取器的博客文章

投影下推

最基本的 Parquet 优化,也是最常描述的 Parquet 文件优化是投影下推,它可以减少 I/O 和 CPU 需求。此上下文中的投影意味着“选择部分列而不是全部列”。鉴于 Parquet 组织数据的方式,直接读取和解码引用列所需的 ColumnChunk 即可。

例如,考虑以下形式的 SQL 查询:

SELECT B from table where A > 35

此查询仅需要列 A 和 B(而不是 C)的数据,并且可以将投影“下推”到 Parquet 读取器。

具体来说,使用页脚中的信息,Parquet 读取器可以完全跳过获取 (I/O) 和解码 (CPU) 存储列 C 数据的 Data Page(在我们示例中为 ColumnChunk 3 和 ColumnChunk 6)。

Parquet File Projection Pushdown Diagram 12.05.2022v1

谓词下推

与投影下推类似,谓词下推也避免从 Parquet 文件中获取和解码数据,但它是使用过滤器表达式来实现的。此技术通常需要与查询引擎(例如 DataFusion)更紧密的集成,以确定有效谓词并在扫描期间对其进行评估。不幸的是,如果没有仔细的 API 设计,Parquet 解码器和查询引擎最终可能会紧密耦合,从而阻止重用(例如,Cloudera Parquet 谓词下推文档中存在不同的 Impala 和 Spark 实现)。Rust Parquet 读取器使用 RowSelection API 来避免这种耦合。

RowGroup 剪枝

许多基于 Parquet 的查询引擎支持的最简单的谓词下推形式是使用存储在页脚中的统计信息来跳过整个 RowGroup。我们将此操作称为 RowGroup 剪枝,它类似于许多经典 数据仓库 系统中的 分区剪枝

对于上面的示例查询,如果特定 RowGroup 中 A 的最大值小于 35,则解码器可以跳过获取和解码来自该整个 RowGroup 的任何 ColumnChunk。

Parquet File RowGroup Pruning Diagram 12.05.2022v1

请注意,基于最小值和最大值进行剪枝对于许多数据布局和列类型都有效,但并非全部有效。具体来说,对于具有许多不同的伪随机值(例如标识符或 uuid)的列,它不如有效。值得庆幸的是,对于这种情况,Parquet 还支持每个 ColumnChunk 的 Bloom 过滤器。我们正在积极致力于在 Apache Rust 的实现中 添加 Bloom 过滤器 支持。

Page 剪枝

更复杂的谓词下推形式是使用页脚元数据中的可选 页面索引 来排除整个 Data Page。解码器仅解码来自其他列的相应行,通常跳过整个页面。

由于多种原因,不同 ColumnChunk 中的页面通常包含不同数量的行,这一事实使这种优化变得复杂。虽然页面索引可以识别来自一列的所需页面,但从一列中剪枝页面并不会立即排除其他列中的整个页面。

Page 剪枝按如下步骤进行:

  • 将谓词与页面索引结合使用,以识别要跳过的页面
  • 使用偏移索引来确定哪些行范围对应于未跳过的页面
  • 计算跨未跳过页面的范围的交集,并仅解码这些行

最后一点非常难以实现,特别是对于嵌套列表,其中 单行可能对应于多个值。幸运的是,Rust Parquet 读取器在内部隐藏了这种复杂性,并且可以解码任意 RowSelections

例如,要扫描列 A 和 B,它们存储在 5 个 Data Page 中,如下图所示:

如果谓词是 A > 35,

  • Page 1 使用页面索引进行剪枝(最大值为 20),留下 [200-> onwards] 的 RowSelection,
  • Parquet 读取器完全跳过 Page 3(因为其最后一行索引为 99)
  • (仅)通过读取页面 2、4 和 5 来读取相关行。

如果谓词改为 A > 35 AND B = “F”,则页面索引甚至更有效:

  • 使用 A > 35,与之前一样产生 [200-> onwards] 的 RowSelection
  • 在 B 的剩余 Page 4 和 Page 5 上使用 B = “F”,产生 [100-244] 的 RowSelection
  • 相交两个 RowSelection 后,留下组合的 RowSelection [200-244]
  • Parquet 读取器仅解码来自 Page 2 和 Page 4 的这 50 行。

Telegraf configuration

对从 Arrow C++ 以及扩展的 pyarrow/pandas 读取和写入这些索引的支持在 PARQUET-1404 中进行了跟踪。

延迟物化

前两种形式的谓词下推仅对 RowGroup、ColumnChunk 和 Data Page 的元数据进行操作,在解码值之前。但是,相同的技术也扩展到在一个或多个列的值解码后但在解码其他列之前,这通常称为“延迟物化”。

当以下情况时,此技术尤其有效:

  • 谓词非常具有选择性,即过滤掉大量行
  • 每行很大,要么是因为行很宽(例如 JSON blob),要么是因为列很多
  • 选定的数据聚集在一起
  • 谓词所需的列解码相对便宜,例如 PrimitiveArray / DictionaryArray

SPARK-36527Impala 中有关于此技术优势的更多讨论。

例如,给定上面的谓词 A > 35 AND B = “F”,其中引擎使用页面索引确定只有 [100-244] 的 RowSelection 中的 50 行可能匹配,使用延迟物化,Parquet 解码器:

  • 解码列 A 的 50 个值
  • 在这些 50 个值上评估 A > 35
  • 在本例中,只有 5 行通过,导致 RowSelection:
    • RowSelection[205-206]
    • RowSelection[238-240]
  • 仅解码这些选择的列 B 的 5 行。

Parquet File Late Materialization Diagram 12.05.2022v1

在某些情况下,例如我们的示例中 B 存储单字符值,延迟物化机制的成本可能会超过解码的节省。然而,当满足上述某些条件时,节省可能是巨大的。查询引擎必须决定下推哪些谓词以及以何种顺序应用它们才能获得最佳结果。

虽然这超出了本文档的范围,但相同的技术也可以应用于多个谓词以及多列上的谓词。 有关更多信息,请参阅 Parquet crate 中的 RowFilter 接口,以及 DataFusion 中的 row_filter 实现。

I/O 下推

虽然 Parquet 被设计用于在 HDFS 分布式文件系统上高效访问,但它也适用于商品 blob 存储系统(如 AWS S3),因为它们具有非常相似的特性

  • 相对较慢的“随机访问”读取:在每个请求中读取大量(MB)数据段比为较小部分发出许多请求更有效
  • 检索首字节之前的显著延迟
  • 高单次请求成本:通常按请求收费,无论读取的字节数如何,这鼓励减少请求次数,而每次请求读取较大的连续数据段。

为了从这些系统中获得最佳读取效果,Parquet 读取器必须

  1. 最大限度地减少 I/O 请求的数量,同时应用各种下推技术,以避免获取大量未使用的数据。
  2. 与适当的任务调度机制集成,以交错 I/O 和对已获取数据的处理,以避免管道瓶颈。

由于这些都是重大的工程和集成挑战,许多 Parquet 读取器仍然需要将文件完整地获取到本地存储。

为了处理而获取整个文件对于以下几个原因来说并不理想

  1. 高延迟: 在获取整个文件之前无法开始解码(Parquet 元数据位于文件末尾,因此解码器必须先看到末尾才能解码其余部分)
  2. 浪费的工作: 获取整个文件会获取所有必要的数据,但也可能获取大量不必要的数据,这些数据在读取页脚后将被跳过。 这不必要地增加了成本。
  3. 需要昂贵的“本地连接”存储(或内存): 许多云环境不提供具有本地连接存储的计算资源 – 它们要么依赖昂贵的网络块存储(如 AWS EBS),要么将本地存储限制为某些 VM 类。

避免需要缓冲整个文件需要一个复杂的 Parquet 解码器,该解码器与 I/O 子系统集成,可以首先获取和解码元数据,然后为相关数据块进行范围获取,并与 Parquet 数据的解码交错进行。 这种优化需要精心的工程设计,以便从对象存储中获取足够大的数据块,从而使单次请求的开销不会超过减少传输字节数所带来的收益。 SPARK-36529 更详细地描述了顺序处理的挑战。

Parquet File IO Pushdown Diagram 12.05.2022v1

此图表中未包含诸如合并请求和确保实际实现所需的最小请求大小等细节。

Rust Parquet crate 提供了一个异步 Parquet 读取器,可以有效地从任何支持范围请求的 AsyncFileReader 中读取,该读取器

  • 可以有效地从任何支持范围请求的存储介质中读取
  • 与 Rust 的 futures 生态系统集成,以避免阻塞线程等待网络 I/O 并且可以轻松地交错 CPU 和网络
  • 同时请求多个范围,以允许实现合并相邻范围、并行获取范围等。
  • 使用先前描述的下推技术来消除尽可能多地获取数据的情况
  • 轻松与 Apache Arrow object_store crate 集成,您可以在 此处 阅读更多相关信息

为了让您了解可能实现的效果,下图显示了从远程文件获取页脚元数据、使用该元数据确定要读取哪些数据页,然后同时获取数据和解码的时间线。 为了匹配网络延迟、带宽和可用 CPU,此过程通常必须一次针对多个文件完成。

Parquet File IO Pushdown Diagram 2 12.05.2022v1

结论

我们希望您喜欢阅读有关 Parquet 文件格式以及用于快速查询 parquet 文件的各种技术的文章。

我们认为,大多数 Parquet 开源实现不具备本文中所述的广泛功能的原因在于,这需要付出巨大的努力,而这种努力以前只有在资金充足的商业企业中才有可能实现,而这些企业对其实现保持了闭源。

然而,随着 Apache Arrow 社区(包括 Rust 从业者和更广泛的 Arrow 社区)的成长和质量的提高,我们协作构建尖端开源实现的能力令人振奋且非常令人满意。 本博客中描述的技术是许多工程师在公司、业余爱好者和全球范围内在多个存储库(特别是 Apache Arrow DataFusionApache ArrowApache Arrow Ballista.)贡献的结果。

如果您有兴趣加入 DataFusion 社区,请联系我们