如何阅读 InfluxDB 3.0 查询计划

导航至

这篇博文解释了如何在 InfluxDB 3.0 中阅读查询计划,并需要具备 InfluxDB 3.0 系统架构 的基本知识。

InfluxDB 3.0 支持两种查询语言:SQL 和 InfluxQL。数据库根据查询计划的指令执行以 SQL 或 InfluxQL 编写的查询。要在不运行查询的情况下查看计划,请在查询前面添加关键字 EXPLAIN,如下所示

EXPLAIN 
SELECT   city, min_temp, time 
FROM     temperature 
ORDER BY city ASC, time DESC;

输出将如下所示:
图 1:查询计划的简化输出

计划有两种类型:逻辑计划和物理计划。

逻辑计划: 这是为特定的 SQL 或 InfluxQL 查询生成的计划,无需了解底层数据组织或集群配置。由于 InfluxDB 3.0 构建于 DataFusion 之上,因此逻辑计划与您在 DataFusion 中看到的任何数据格式或存储非常相似。

物理计划: 这是从查询的相应逻辑计划加上集群配置(例如,CPU 数量)和底层数据组织(例如,文件数量、文件中的数据布局等)信息生成的计划。物理计划特定于您的数据和 InfluxDB 集群配置。如果您将相同的数据加载到具有不同配置的不同集群,则相同的查询可能会生成不同的物理查询计划。同样,在不同时间在同一集群上运行相同的查询可能会有不同的计划,具体取决于您当时的数据。

理解查询计划可以帮助解释为什么查询速度慢。例如,如果计划显示您的查询读取了许多文件,您可以添加更多过滤器以减少需要读取的数据量,或者修改您的集群配置/设计以创建更少但更大的文件。本文档重点介绍如何阅读查询计划。提高查询运行速度的技术取决于查询速度慢的原因,并且超出了这篇博文的范围。

查询计划是一棵树

查询计划是一棵倒置的树,应该从下往上阅读。在树形格式中,我们可以用以下方式表示图 1 的物理计划:
图 2:图 1 中物理计划的树结构

树中每个节点的名称都以 Exec 结尾,表示一个 ExecutionPlan,它处理、转换数据并将数据发送到树的下一层。首先,两个 ParquetExec 节点并行读取 Parquet 文件,每个节点将其数据流输出到其对应的 SortExec 节点。SortExc 节点负责按 city 升序和 time 降序对数据进行排序。UnionExec 节点合并来自两个 SortExec 节点的已排序输出,然后由 SortPreservingMergeExec 节点(排序)合并,以返回已排序的数据。

如何理解大型查询计划

大型查询计划可能看起来令人生畏,但如果您遵循以下步骤,您可以快速了解计划的作用。

  1. 始终从下往上,一次阅读一个 Exec 节点。
  2. 了解每个 Exec 节点的工作。大部分信息可在 DataFusion Physical Plan 文档 中找到,或直接从 其 repo 中获取。DataFusion 文档中没有的 ExecutionPlans 是 InfluxDB 特有的——更多信息可在 InfluxDB repo 中找到。
  3. 回顾 Exec 节点的输入数据是什么样的,以及它可能有多大/多小。
  4. 考虑 Exec 节点可能发送多少数据以及数据会是什么样子的。

使用这些步骤,您可以估计计划必须完成多少工作。但是,explain 命令显示的是计划,而没有执行它。如果您想确切知道计划及其每个 ExecutionPlan 执行需要多长时间,您需要其他工具。

显示每个 ExecutionPlan 确切运行时的工具

  1. 运行 EXPLAIN ANALYZE, 以打印出“explain plan”(参见图 1),其中注释了执行计数器和诸如运行时和生成行数的信息。
  2. 还有其他工具,例如使用 Jaeger 的分布式跟踪,我们将在以后的博文中介绍。

更多用于调试的信息

如果计划必须读取许多文件,则 EXPLAIN 报告不会显示所有文件。要查看所有文件,请使用 EXPLAIN VERBOSE.EXPLAIN 一样,EXPLAIN VERBOSE 不会运行查询,也不会告诉您运行时。相反,您将获得从 EXPLAIN 报告中省略的所有信息,以及 InfluxDB 3.0 查询器和 DataFusion 在返回最终物理计划之前生成的所有中间物理计划。这对于调试非常有用,因为您可以看到计划何时添加或删除运算符,以及 InfluxDB 和 DataFusion 为优化您的查询所做的工作。

前沿数据的典型计划示例

让我们深入研究一个示例,该示例涵盖了典型的 ExecutionPlans 以及 InfluxDB 特有的关于前沿数据的 ExecutionPlans。

数据组织

为了更容易解释下面的计划,图 3 显示了计划读取的数据组织。一旦您习惯了阅读查询计划,您就可以从计划本身中弄清楚这一点。需要注意的一些细节

  • 系统中可能存在更多数据。这只是查询在应用查询的谓词以修剪超出范围的分区后读取的数据。
  • 最近接收到的数据正在被摄取,尚未持久化。在计划中,RecordBatchesExec 表示来自摄取器但尚未持久化到 Parquet 文件的数据。
  • 从存储中检索了四个 Parquet 文件,由两个 ParquetExec 节点表示,每个节点包含两个文件
    • 在第一个节点中,两个文件 file_1file_2时间上不与任何其他文件重叠,并且没有任何重复数据。文件内的数据永远不会有重复项,因此对于非重叠文件,永远不需要重复数据删除。
    • 在第二个节点中,两个文件 file_3file_4 彼此重叠,并且与 RecordBatchesExec 表示的摄取数据重叠。


图 3:图 4 中查询计划的数据

查询和查询计划

EXPLAIN 
SELECT city, count(1) 
FROM   temperature 
WHERE  time >= to_timestamp(200) AND time < to_timestamp(700) 
AND state = 'MA' 
GROUP BY city 
ORDER BY city ASC;

Reading Query Plans Diagram 4_01.17.24v1.png 图 4:前沿(最新)数据的典型查询计划。注意:左列中的颜色对应于下面的图。

阅读逻辑计划

图 5 中的逻辑计划显示,表扫描首先发生,然后查询谓词过滤数据。接下来,计划聚合数据以计算每个城市的行数计数。最后,计划对数据进行排序并返回数据。 图 5:来自图 4 的逻辑计划

阅读物理计划

让我们从下往上开始阅读。底部或叶节点始终是 ParquetExecRecordBatchExec。此计划中有三个,因此让我们逐一介绍。

底部三个叶节点由两个 ParquetExec 节点和一个 RecordBatchesExec 节点组成。

第一个 ParqetExec 图 6:第一个 ParquetExec

  • ParquetExec 包括两组文件。每组可以包含一个或多个文件,但在本示例中,每组中有一个文件。该节点并行执行这些组,并按顺序读取每组中的文件。因此,在本示例中,这两个文件是并行读取的。
  • 1/1/237/2cbb3992-4607-494d-82e4-66c480123189.parquet:这是对象存储中文件的路径。它的结构是 db_id/table_id/partition_hash_id/uuid_of_the_file.parquet,每个段分别告诉我们
    • 查询哪个数据库和表
    • 文件属于哪个分区(您可以计算此查询读取了多少分区)
    • 它是哪个文件
  • projection=[__chunk_order, city, state, time]:此表中有许多列,但该节点仅读取这四列。__chunk_order 列是 InfluxDB 代码生成的人工列,用于保持块/文件排序以进行重复数据删除。
  • output_ordering=[state@2 ASC, city@1 ASC, time@3 ASC, __chunk_order@0 ASC]:此 ParquetExec 节点将按 state ASC, city ASC, time ASC, __chunk_order ASC 对其输出进行排序。InfluxDB 在存储 Parquet 文件时会自动对其进行排序,以提高存储压缩率和查询效率。
  • predicate=time@5 >= 200 AND time@5 < 700 AND state@4 = MA:这是查询中用于数据修剪的过滤器。
  • pruning_predicate=time_max@0 >= 200 AND time_min@1 < 700 AND state_min@2 <= MA AND MA <= state_max@3:这是从上面的谓词转换而来的实际修剪谓词。它用于过滤该谓词之外的文件。目前(2023 年 12 月),InfluxDB 3.0 仅根据 time. 过滤文件。请注意,此谓词用于从选定的分区中修剪文件

RecordBatchesExec

图 7:RecordBatchesExec

来自摄取器的数据可能位于许多块中,但通常情况下,如本示例所示,只有一个块。此节点仅将来自四列的数据发送到输出,就像 ParquetExec 节点一样。我们将过滤列的操作称为投影下推。因此,它在查询计划中具有名称 projection

第二个 ParquetExec 图 8:第二个 ParquetExec

读取第二个 ParquetExec 节点与上面的节点类似。请注意,两个 ParquetExec 节点中的文件都属于同一分区 (237)。

数据扫描结构

为什么我们将来自同一分区的 Parquet 文件发送到不同的 ParquetExec?原因有很多,但主要原因有两个

  1. 通过将非重叠部分与重叠部分分开(在本示例中就是这种情况),最大限度地减少重复数据删除所需的工作。
  2. 通过拆分非重叠部分来提高并行性。

我们如何知道数据是否重叠? 图 9:DeduplicationExec 是重叠数据的信号

图 9 中的 DeduplicationExec 告诉我们,前面的数据(即下面的数据)是重叠的。更具体地说,两个文件中的数据重叠和/或与来自摄取器的数据重叠。

  • FilterExec: time@3 >= 200 AND time@3 < 700 AND state@2 = MA:这是我们过滤掉满足条件 time@3 >= 200 AND time@3 < 700 AND state@2 = MA 的所有内容的地方。之前的操作仅在可能的情况下修剪数据。它不保证修剪所有数据。我们需要此过滤器来执行完整而精确的过滤。
  • CoalesceBatchesExec: target_batch_size=8192 是一种将小数据分组为更大组的方法(如果可能)。有关其工作原理,请参阅 DataFusion 文档
  • SortExec: expr=[state@2 ASC,city@1 ASC,time@3 ASC,__chunk_order@0 ASC]:这按 state ASC, city ASC, time ASC, __chunk_order ASC 对数据进行排序。请注意,此排序仅适用于来自摄取器的数据,因为来自 Parquet 文件的数据已按该顺序排序。
  • UnionExec 只是一个将多个数据流拉到一起的地方。它执行速度很快,并且不合并任何内容。
  • SortPreservingMergeExec: [state@2 ASC,city@1 ASC,time@3 ASC,__chunk_order@0 ASC]:此操作合并预排序的数据。当您看到此操作时,您知道其下方的数据已排序,并且输出在一个数据流中。
  • DeduplicateExec: [state@2 ASC,city@1 ASC,time@3 ASC]:此操作严格从一个输入流中删除重复的排序数据。这就是为什么您经常在 DeduplicateExec 下看到 SortPreservingMergeExec,但这不是必需的。只要 DeduplicateExec 的输入是单个排序数据流,它就可以正常工作。

我们如何知道数据不重叠? 图 10:没有 DeduplicateExec 表示文件不重叠

ParquetExecRecordBatchesExec 分支不通向 DeduplicateExec 时,我们知道该 Exec 处理的文件不重叠。

  • ProjectionExec: expr=[city@0 as city]:这过滤列数据,仅发送来自 city 列的数据。

其他 ExecutionPlans

现在让我们看看计划的其余部分。 图 11:计划结构的其余部分

  • UnionExec:联合数据流。请注意,输出流的数量与输入流的数量相同。上面的 ExecutionPlan 负责进一步合并或拆分数据流。此 UnionExec 是合并/拆分的中间步骤。
  • RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=3:这以轮询方式将三个输入流拆分为四个输出流。此集群有四个可用核心,因此此 RepartitionExec 将数据划分为四个流以提高并行执行。
  • AggregateExec: mode=Partial, gby=[city@0 as city], aggr=[COUNT(Int64(1))]:这会将数据分组为具有相同 city 值的分组。由于有四个输入流,因此每个流都分别聚合,这将创建四个输出流。这也意味着输出数据未完全聚合,如 mode=Partial 标志所示。
  • RepartitionExec: partitioning=Hash([city@0], 4), input_partitions=4:这会在 hash(city) 上重新分区数据到四个流中,以便同一城市进入同一流。
  • AggregateExec: mode=FinalPartitioned, gby=[city@0 as city], aggr=[COUNT(Int64(1))]:由于同一城市的行位于同一流中,因此我们只需要进行最终聚合。
  • SortExec: expr=[city@0 ASC NULLS LAST]:根据查询请求,对四个数据流中的每一个按 city 进行排序。
  • SortPreservingMergeExec: [city@0 ASC NULLS LAST]:(排序)合并四个已排序的流以返回最终结果。

如果您看到一个计划读取许多文件并对所有文件执行重复数据删除,您可能会问:“所有文件都重叠还是不重叠?” 答案是肯定或否定,具体取决于具体情况。有时,压缩器可能会落后,如果您给它一些时间来压缩小型和重叠的文件,您的查询将读取更少的文件,速度更快。如果仍然有很多文件,您可能需要检查压缩器的工作负载并根据需要添加更多资源。由于查询器的内存限制,我们重复数据删除非重叠文件还有其他原因,但这些是未来博文的主题。

结论

EXPLAIN 是一种了解 InfluxDB 如何执行您的查询以及为什么它速度快或慢的方法。您通常可以重写查询以添加更多过滤器或删除不必要的排序(查询中的 order by)以使查询运行更快。其他时候,查询速度慢是因为您的系统缺少资源。在这种情况下,是时候重新评估集群配置或咨询 InfluxDB 支持团队了。