毫秒级延迟查询Parquet

导航至

我们认为,直接查询Apache Parquet文件中的数据可以与大多数专业文件格式相比,甚至实现更高的存储效率和查询性能。虽然这需要大量的工程努力,但Parquet的开源格式和广泛的生态系统支持使其成为广泛类别的数据系统的明显选择。

在这篇文章中,我们解释了在Apache Arrow Rust Parquet读取器中实现的一些查询存储在Parquet格式中的数据的先进技术。这些技术共同使得Rust实现成为查询Parquet文件最快的一种,无论是在本地磁盘还是远程对象存储。它能够在毫秒内查询GB级的Parquet文件。

背景

Apache Parquet是一种越来越受欢迎的用于存储分析数据集的开源格式,已成为成本效益高、与DBMS无关的数据存储的事实标准。最初是为Hadoop生态系统创建的,但由于其有吸引力的组合,Parquet的适用范围现在已广泛扩展到数据分析生态系统。

  • 高压缩率
  • 易于与S3等商品blob存储兼容
  • 广泛的生态系统和工具支持
  • 跨多个不同平台和工具的可移植性
  • 支持任意结构化数据

现在,其他系统,如DuckDBRedshift,允许直接查询存储在Parquet中的数据,但与它们本机(自定义)文件格式相比,支持通常是一个次要的考虑因素。这些格式包括DuckDB的.duckdb文件格式,Apache IOT的TsFileGorilla格式等。

首次,以前仅在闭源商业实现中可用的复杂查询技术,现在作为开源软件可用。所需的工程能力来自大型、高效的开源项目,如Apache ArrowApache Impala,它们拥有全球性的贡献者社区。

Parquet文件格式

在深入了解高效读取Parquet的细节之前,了解文件布局非常重要。文件格式设计得非常精心,可以快速定位所需信息,跳过无关部分,并高效地解码剩余内容。

  • Parquet文件中的数据被划分为水平切片,称为RowGroups
  • 每个RowGroup包含模式中每个列的单个ColumnChunk

例如,以下图表展示了包含三个列“A”、“B”和“C”的Parquet文件,这些列存储在两个RowGroups中,总共有6个ColumnChunks。

Parquet File Format Diagram 12.05.2022v1

ColumnChunk的逻辑值使用多种可用编码之一写入文件中的一个或多个数据页,这些数据页按顺序附加到文件中。在Parquet文件的末尾是页脚,其中包含重要元数据,例如:

  • 文件的架构信息,如列名和数据类型
  • 文件中RowGroup和ColumnChunk的位置。页脚还可能包含其他特殊数据结构
  • 可选的每个ColumnChunk的统计数据,包括最小/最大值和空值计数
  • 可选的指向包含每个单独页面位置的OffsetIndexes的指针
  • 可选的指向包含每个页面行数和摘要统计信息的ColumnIndex的指针
  • 可选的指向包含可以快速检查ColumnChunk中是否存在值的BloomFilterData的指针

例如,前面图中2个RowGroup和6个ColumnChunk的逻辑结构可能如图所示(非比例)存储在Parquet文件中。ColumnChunk的页面首先出现,然后是页脚。所需页面数量和大小由数据、编码方案的有效性和Parquet编码器的设置决定。在这种情况下,ColumnChunk 1需要2个页面,而ColumnChunk 6只需要1个页面。除了其他信息外,页脚还包含每个数据页的位置和列的类型。

Parquet File Format Diagram 2 12.05.2022v1

创建Parquet文件时有许多重要标准要考虑,例如如何最佳地排序/聚类数据并将其组织到RowGroup和数据页中。这种“物理设计”考虑因素很复杂,值得单独的文章系列,本博客文章没有涉及。相反,我们关注如何使用现有的结构来使查询非常快。

查询优化

在任何查询处理系统中,以下技术通常可以提高性能

  1. 减少必须从辅助存储器传输以进行处理的数据(减少I/O)
  2. 减少解码数据的计算负载(减少CPU)
  3. 交错/流水线数据的读取和解码(提高并行性)

以下原则也适用于查询Parquet文件,如下所述

解码优化

Parquet通过使用运行长度压缩、字典编码、delta编码等复杂编码技术实现了令人印象深刻的压缩比率。因此,解码的CPU密集型任务可能会主导查询延迟。Parquet读取器可以使用多种技术来提高该任务的延迟和吞吐量,正如我们在Rust实现中所做的那样。

向量化解码

大多数分析系统一次将多个值解码到列式内存格式,如Apache Arrow,而不是逐行处理数据。这通常称为向量或列式处理,它是有益的,因为它

  • 将类型切换的开销分摊到正在解码的列上
  • 通过从ColumnChunk读取连续值来提高缓存局部性
  • 通常允许在一次指令中解码多个值。
  • 通过单个大分配避免许多小堆分配,对于字符串和字节数组等可变长度类型可以节省大量空间

因此,Rust Parquet Reader实现了专门的解码器,可以直接将Parquet读取到列式内存格式(Arrow Arrays)中。

流式解码

在ColumnChunks中,存储在哪些页面中的行之间没有关系。例如,第10,000行的逻辑值可能在列A的第一个页面和列B的第三个页面中。

向量化解码的最简单方法是每次解码整个RowGroup(或ColumnChunk)。这是在Parquet解码器中通常最初实现的方法。

然而,考虑到Parquet的高压缩比,单个RowGroup可能包含数百万行。一次性解码这么多行不是最佳选择,因为它

  • 需要大量的中间RAM:典型的内存格式优化处理,如Apache Arrow,需要比它们的Parquet编码形式更多。
  • 增加查询延迟:后续处理步骤(如过滤或聚合)只能在整个RowGroup(或ColumnChunk)解码完成后才能开始。

因此,最佳的Parquet读取器支持“流式”数据输出,通过按需生成可配置大小的行批次来实现。批次大小必须足够大,以分摊解码开销,但又足够小,以便高效使用内存,并允许在解码下一个批次的同时开始下游处理。

Parquet File Streaming Decode Diagram 12.05.2022v1

虽然流式传输不是一项复杂的特性,但解码的状态性,特别是在多个列和任意嵌套数据的情况下,其中行和值之间的关系不是固定的,需要复杂的中间缓冲和大量的工程努力来正确处理。

字典保留

字典编码,也称为类别编码,是一种技术,其中列中的每个值不是直接存储,而是存储在一个单独的列表中,称为“字典”。这种技术实现了许多具有重复值的列(低基数)的第三范式的好处,对于字符串列(如“城市”)尤其有效。

ColumnChunk的第一个页面可以是可选的字典页面,其中包含列类型的值列表。在此ColumnChunk中的后续页面可以编码到这个字典中的索引,而不是直接编码值。

鉴于这种编码的有效性,如果Parquet解码器只是将字典数据解码为原生类型,那么它将低效地反复复制相同的值,这对字符串数据尤其灾难性。为了有效地处理字典编码的数据,必须在解码过程中保留编码。方便的是,许多列格式,如Arrow的DictionaryArray,支持这种兼容编码。

保留字典编码在将数据读入Arrow数组时显著提高了性能,在某些情况下超过60倍,同时使用显著更少的内存。

保留字典的主要复杂因素是字典是按ColumnChunk存储的,因此字典在RowGroup之间会发生变化。读取器必须自动为跨越多个RowGroup的批次重新计算字典,同时优化批次大小能够均匀地整除每个RowGroup的行数的情况。此外,一个列可能只部分字典编码,进一步复杂化实现。有关这种技术和其复杂性的更多信息,请参阅博客文章,该文章介绍了将此技术应用于C++ Parquet读取器。

投影下推

最基础的 Parquet 优化,也是最常见的 Parquet 文件优化方法,是投影下推,这可以减少 I/O 和 CPU 的需求。这里的“投影”意味着“选择一些而不是所有列”。鉴于 Parquet 的数据组织方式,只读取和解码所需列的 ColumnChunk 是很直接的。

例如,考虑一个如下形式的 SQL 查询:

SELECT B FROM table WHERE A > 35

此查询只需要 A 和 B 列的数据(而不是 C 列),并且投影可以“下推”到 Parquet 读取器。

具体来说,使用页脚中的信息,Parquet 读取器可以完全跳过获取(I/O)和解码(CPU)存储 C 列数据的 Data Pages(在我们的示例中是 ColumnChunk 3 和 ColumnChunk 6)。

Parquet File Projection Pushdown Diagram 12.05.2022v1

谓词下推

与投影下推类似,谓词下推也避免了从 Parquet 文件中获取和解析数据,但它使用过滤表达式来实现。这种技术通常需要与查询引擎(如 DataFusion)更紧密的集成,以确定有效的谓词并在扫描过程中评估它们。遗憾的是,如果没有仔细的 API 设计,Parquet 解码器和查询引擎可能会紧密耦合,从而防止重用(例如,在 Cloudera Parquet 谓词下推文档中有不同的 Impala 和 Spark 实现)。Rust Parquet 读取器使用 RowSelection API 来避免这种耦合。

RowGroup 剪枝

谓词下推的最简单形式,由许多基于 Parquet 的查询引擎支持,使用页脚中存储的统计信息来跳过整个 RowGroup。我们称此操作为 RowGroup 剪枝,这与许多经典 数据仓库 系统中的 分区剪枝 类似。

对于上面的示例查询,如果某个 RowGroup 中 A 的最大值小于 35,解码器可以跳过从该 整个 RowGroup 中获取和解析任何 ColumnChunk。

Parquet File RowGroup Pruning Diagram 12.05.2022v1

请注意,基于最小值和最大值的剪枝对于许多数据布局和列类型都是有效的,但并非所有类型都有效。具体来说,对于具有许多不同的伪随机值(例如标识符或 uuids)的列,这种剪枝效果较差。幸运的是,对于这种情况,Parquet 也支持每个 ColumnChunk 的 Bloom 过滤器。我们正在积极开发在 Apache Rust 实现中添加 bloom 过滤器的功能 (添加 bloom 过滤器)

页剪枝

谓词下推的一种更复杂的形式,使用页脚元数据中的可选 页面索引 来排除整个数据页面。解码器只解码其他列的对应行,通常跳过整个页面。

由于各种原因,不同 ColumnChunk 中的页面通常包含不同数量的行,这使得这种优化变得复杂。虽然页面索引可能标识了所需页面的一个列,但从一个列中剪枝页面并不立即排除其他列中的整个页面。

页剪枝的步骤如下:

  • 结合谓词和页面索引来识别要跳过的页面
  • 使用偏移量索引来确定非跳过页面对应的行范围
  • 计算非跳过页面范围之间的交集,并只解码这些行

这个最后一点实现起来非常复杂,尤其是在嵌套列表中,其中一行可能对应多个值。幸运的是,Rust Parquet读取器在内部隐藏了这种复杂性,并且可以解码任意的行选择

例如,扫描图下所示的5个数据页中存储的列A和B

如果谓词是A > 35,

  • 使用页面索引剪枝页面1(最大值为20),留下行选择[200-及以后],
  • Parquet读取器完全跳过页面3(因为它最后的行为99)
  • 只读取页面2、4和5的相关行。

如果谓词改为A > 35 AND B = “F”,页面索引将更加有效

  • 使用A > 35,得到与之前相同的行选择[200-及以后]
  • 使用B = “F”,在B的剩余页面4和页面5上,得到行选择[100-244]
  • 交集这两个行选择留下组合行选择[200-244]
  • Parquet读取器仅从页面2和页面4解码这50行。

Telegraf configuration

从Arrow C++读取和写入这些索引的支持,以及通过pyarrow/pandas扩展,在PARQUET-1404中跟踪。

后期实现

前两种谓词下推形式仅对解码值之前存储在行组、列块和数据页面上的元数据操作。然而,相同的技巧也扩展到解码一个或多个列值之后但在解码其他列之前,这通常被称为“后期实现”。

这种技术在以下情况下特别有效

  • 谓词非常具有选择性,即过滤掉大量行
  • 每一行都很大,这可能是由于宽行(例如JSON blob)或许多列
  • 选定的数据聚集在一起
  • 所需谓词的列相对容易解码,例如PrimitiveArray / DictionaryArray

有关此技术益处的更多讨论,请参阅SPARK-36527Impala

例如,考虑上述的谓词A > 35 AND B = “F”,其中引擎使用页面索引确定只有50行在行选择[100-244]中可能匹配,使用后期实现,Parquet解码器

  • 解码列A的50个值
  • 评估这些50个值上的A > 35
  • 在这种情况下,只有5行通过,导致行选择
    • 行选择[205-206]
    • 行选择[238-240]
  • 只对那些选择解码列B的5行

Parquet File Late Materialization Diagram 12.05.2022v1

在某些情况下,例如我们的例子中B存储单字符值,后期实现机制的成本可能会超过解码的节省。但是,如果满足上述某些条件,节省可能非常可观。查询引擎必须决定要下推哪些谓词以及以何种顺序应用它们,以获得最佳结果。

虽然这超出了本文件的范畴,但相同的技巧也可以应用于多个谓词以及多列谓词。有关更多信息,请参阅Parquet crate中的RowFilter接口,以及DataFusion中的row_filter实现。

I/O下推

虽然Parquet是为在HDFS分布式文件系统上高效访问而设计的,但它与通用blob存储系统(如AWS S3)配合得非常好,因为它们具有非常相似的特征

  • 相对较慢的“随机访问”读取:在每个请求中读取大量(MBs)数据部分比发出许多请求读取更小的部分要高效得多
  • 在检索第一个字节之前存在显著的延迟
  • 每请求成本高:通常按请求计费,而不管读取的字节数多少,这鼓励发出较少的请求,每个请求都读取大量连续的数据部分。

为了从这样的系统中读取最佳数据,Parquet读取器必须

  1. 最小化I/O请求的数量,同时应用各种下推技术以避免获取大量未使用的数据。
  2. 与适当的任务调度机制集成,以在获取的数据上交错I/O和处理,以避免管道瓶颈。

由于这些都是重大的工程和集成挑战,许多Parquet读取器仍然需要将整个文件从本地存储中检索出来。

为了处理而检索整个文件在几个方面并不理想

  1. 高延迟:解码必须在整个文件检索完毕后开始(Parquet元数据位于文件末尾,因此解码器必须在解码其余部分之前看到末尾)
  2. 浪费工作:检索整个文件检索了所有必要的数据,但也可能检索到大量不必要的数据,在读取页脚后会跳过。这无端增加了成本。
  3. 需要昂贵的“本地连接”存储(或内存):许多云环境不提供具有本地连接存储的计算资源——它们要么依赖昂贵的网络块存储,如AWS EBS,要么将本地存储限制为某些VM类的资源。

避免需要缓冲整个文件需要一个复杂的Parquet解码器,与I/O子系统集成,可以最初检索并解码元数据,然后进行相关数据块的区间检索,同时交错解码Parquet数据。这种优化需要仔细的工程,以从对象存储中检索足够大的数据块,这样每请求的开销就不会主导减少的字节数带来的收益。《SPARK-36529》更详细地描述了顺序处理中的挑战。

Parquet File IO Pushdown Diagram 12.05.2022v1

本图中未包含合并请求和确保实际实现所需的最低请求大小的细节。

Rust Parquet crate提供了一个异步Parquet读取器,可以有效地从任何AsyncFileReader读取

  • 有效地从支持范围请求的任何存储介质读取
  • 与Rust的未来生态系统集成,以避免线程在网络I/O上阻塞,并且可以轻松地交错CPU和网络
  • 同时请求多个范围,以便实现可以合并相邻范围,并行检索范围等。
  • 使用之前描述的下推技术尽可能消除数据检索
  • 可以轻松与Apache Arrow object_store crate集成,您可以在此处了解更多信息

为了说明可能实现的内容,以下图片显示从远程文件检索页脚元数据的时间线,使用该元数据确定要读取的数据页,然后同时检索数据和解码。这个过程通常必须同时处理多个文件,以匹配网络延迟、带宽和可用CPU。

Parquet File IO Pushdown Diagram 2 12.05.2022v1

结论

我们希望您喜欢阅读有关Parquet文件格式以及用于快速查询Parquet文件的各种技术的介绍。

我们认为,大多数开源Parquet实现没有本文中描述的广泛特性,原因是这需要巨大的努力,以往这只有在资金充足的商业企业才能实现,而且这些企业将它们的实现保持为闭源。

然而,随着Apache Arrow社区的增长和质量提升,包括Rust实践者和更广泛的Arrow社区,我们合作并构建前沿开源实现的能力令人振奋且非常满足。本文中描述的技术是多家公司、爱好者以及全球多个仓库中的许多工程师贡献的结果,特别是Apache Arrow DataFusionApache ArrowApache Arrow Ballista

如果您有兴趣加入DataFusion社区,请联系我们