如何阅读 InfluxDB 3.0 查询计划

导航至

本文解释了如何在 InfluxDB 3.0 中读取查询计划,并需要基本了解 InfluxDB 3.0 系统架构

InfluxDB 3.0 支持两种查询语言:SQL 和 InfluxQL。数据库根据查询计划指令执行 SQL 或 InfluxQL 编写的查询。要查看查询计划而不运行查询,请在查询前添加关键词 EXPLAIN,如下所示

EXPLAIN 
SELECT   city, min_temp, time 
FROM     temperature 
ORDER BY city ASC, time DESC;

输出将如下所示:
图 1:查询计划的简化输出

有两种类型的计划:逻辑计划和物理计划。

逻辑计划:这是一个为特定 SQL 或 InfluxQL 查询生成的计划,不知道底层数据组织或集群配置。因为 InfluxDB 3.0 是建立在 DataFusion 之上,逻辑计划与您在任何数据格式或存储中看到的数据类似。

物理计划:这是一个从查询的对应逻辑计划加上集群配置(例如,CPU数量)和底层数据组织(例如,文件数量,文件中数据的布局等)信息生成的计划。物理计划针对您的数据和InfluxDB集群配置是特定的。如果您将相同的数据加载到具有不同配置的不同集群中,相同的查询可能会生成不同的物理查询计划。同样,在相同集群上在不同时间运行相同的查询,其计划可能根据当时的数据而有所不同。

理解查询计划可以帮助解释查询为什么慢。例如,如果计划显示您的查询读取了多个文件,您可以添加更多过滤器以减少其需要读取的数据量,或者修改您的集群配置/设计以创建较少但更大的文件。本文档重点介绍如何阅读查询计划。使查询运行更快的技术取决于它为什么慢,这超出了本文的范围。

查询计划是一棵树

查询计划是一棵倒置的树,应该从下往上阅读。在树格式中,我们可以用以下方式表示图1中的物理计划:
图2:图1中物理计划的树结构

树中每个节点的名称以Exec结尾,表示处理、转换并将数据发送到树中下一级的ExecutionPlan。首先,两个ParquetExec节点并行读取Parquet文件,每个节点输出一个数据流到其对应的SortExec节点。负责按city升序和time降序排序数据的SortExc节点。然后将来自两个SortExec节点的排序输出合并,由SortPreservingMergeExec节点进行(排序)合并,以返回排序后的数据。

如何理解大型查询计划

大型查询计划可能看起来令人畏惧,但如果您遵循以下步骤,可以快速了解该计划做什么。

  1. 像往常一样,从下往上,一次一个Exec节点。
  2. 理解每个Exec节点的任务。大部分信息可以在DataFusion物理计划文档或直接从其仓库中获得。不在DataFusion文档中的ExecutionPlans是InfluxDB特定的——更多信息可以在InfluxDB仓库中找到。
  3. 回想一下Exec节点的输入数据看起来是什么样子,以及它可能有多大或有多小。
  4. 考虑该Exec节点可能发送的数据量以及它的外观。

使用这些步骤,您可以估计计划需要做多少工作。然而,explain命令显示计划而不执行它。如果您想确切知道计划和其每个ExecutionPlan的执行时间,您需要其他工具。

显示每个ExecutionPlan确切运行时间的工具

  1. 运行EXPLAIN ANALYZE,以打印带有执行计数器以及运行时间和生成的行数等信息的‘explain plan’(见图1)。
  2. 还有其他工具,例如使用Jaeger的分布式跟踪,我们将在未来的文章中描述。

调试更多信息

如果计划需要读取许多文件,则EXPLAIN报告不会显示所有文件。要查看所有文件,请使用EXPLAIN VERBOSE.EXPLAIN一样,EXPLAIN VERBOSE不会运行查询,也不会告诉您运行时间。相反,您将获得从EXPLAIN报告中省略的所有信息以及InfluxDB 3.0查询器和DataFusion在返回最终物理计划之前生成的所有中间物理计划。这对于调试非常有帮助,因为您可以看到计划何时添加或删除一个我用这个ExecutionPlan替换的操作,以及InfluxDB和DataFusion如何优化您的查询。

领先数据的典型计划的示例

让我们深入探讨一个例子,该例子涵盖了典型的ExecutionPlans以及针对领先数据的InfluxDB特定的计划。

数据组织

为了更容易解释下面的计划,图3显示了计划所读取的数据组织。一旦您习惯了阅读查询计划,您就可以从计划本身中找出这一点。一些需要注意的细节

  • 系统中可能有更多数据。这仅仅是查询应用查询谓词剪枝超出范围的分区后的数据。
  • 最近接收到的数据正在被摄取,尚未持久化。在计划中,RecordBatchesExec表示尚未持久化到Parquet文件中的摄取器数据。
  • 从存储中检索了四个Parquet文件,并由两个包含每个两个文件的ParquetExec节点表示
    • 在第一个节点中,两个文件file_1file_2与任何其他文件在时间上不重叠,并且没有重复的数据。文件内的数据永远不会重复,因此对于非重叠文件,永远不会需要去重。
    • 在第二个节点中,两个文件file_3file_4彼此重叠,并且与表示为RecordBatchesExec的摄取数据重叠。


图3:图4中查询计划的数据

查询和查询计划

EXPLAIN 
SELECT city, count(1) 
FROM   temperature 
WHERE  time >= to_timestamp(200) AND time < to_timestamp(700) 
AND state = 'MA' 
GROUP BY city 
ORDER BY city ASC;

Reading Query Plans Diagram 4_01.17.24v1.png 图4:领先(最新)数据的典型查询计划。注意:左侧列的颜色与下面的图相对应。

阅读逻辑计划

图5中的逻辑计划显示,首先发生表扫描,然后查询谓词过滤数据。接下来,计划聚合数据以计算每个城市的行数。最后,计划排序并返回数据。 图5:图4的逻辑计划

阅读物理计划

让我们从下往上阅读。最底层或叶子节点始终是ParquetExecRecordBatchExec。这个计划中有三个,所以让我们逐一介绍。

这三个底层的叶子节点由两个ParquetExec节点和一个RecordBatchesExec节点组成。

第一个ParquetExec 图6:第一个ParquetExec

  • 这个 ParquetExec 包含两组文件。每组可以包含一个或多个文件,但在本例中,每组只有一个文件。节点并行执行这些组,并按顺序读取每个组中的文件。因此,在本例中,两个文件是并行读取的。
  • 1/1/237/2cbb3992-4607-494d-82e4-66c480123189.parquet:这是对象存储中文件的路径。它位于结构 db_id/table_id/partition_hash_id/文件uuid.parquet 中,每个部分分别告诉我们
    • 查询的是哪个数据库和表
    • 文件属于哪个分区(你可以计算这个查询读取了多少个分区)
    • 这是哪个文件
  • projection=[__chunk_order, city, state, time]:这个表中有很多列,但节点只读取这四个。__chunk_order 列是 InfluxDB 代码生成的一个人工列,用于在去重时保持块/文件的顺序。
  • output_ordering=[state@2 ASC, city@1 ASC, time@3 ASC, __chunk_order@0 ASC]:这个 ParquetExec 节点将按 state ASC, city ASC, time ASC, __chunk_order ASC 对其输出进行排序。InfluxDB 在存储 Parquet 文件时自动对其进行排序,以提高存储压缩和查询效率。
  • predicate=time@5 >= 200 AND time@5 < 700 AND state@4 = MA:这是用于数据修剪的查询中的过滤器。
  • pruning_predicate=time_max@0 >= 200 AND time_min@1 < 700 AND state_min@2 <= MA AND MA <= state_max@3:这是从上述谓词转换的实际修剪谓词。它用于过滤掉谓词之外的文件。截至 2023 年 12 月,InfluxDB 3.0 仅根据 time 过滤文件。请注意,这个谓词用于修剪 选择分区中的文件

RecordBatchesExec

图 7:RecordBatchesExec

来自 ingester 的数据可以包含许多块,但通常,就像这个例子一样,只有一个。这个节点只从四个列中发送数据到输出,就像 ParquetExec 节点一样。我们将 过滤列 的动作称为 投影下推。因此,它在查询计划中名为 projection

第二个 ParquetExec 图 8:第二个 ParquetExec

读取第二个 ParquetExec 节点的操作与上面类似。请注意,这两个 ParquetExec 节点中的文件属于同一个分区(237)。

数据扫描结构

为什么我们要将同一分区的 Parquet 文件发送到不同的 ParquetExec?有很多原因,但主要有两个

  1. 通过分割非重叠部分来最小化去重所需的工作(这是本例中的情况)。
  2. 通过分割非重叠部分来提高并行性。

我们如何知道数据重叠? 图 9:DeduplicationExec 是数据重叠的信号

图 9 中的 DeduplicationExec 告诉我们,前面的数据(即其下面的数据)重叠。更具体地说,两个文件中的数据重叠,并且/或者与 ingester 的数据重叠。

  • FilterExec: time@3 >= 200 AND time@3 < 700 AND state@2 = MA:这是我们过滤掉满足条件 time@3 >= 200 AND time@3 < 700 AND state@2 = MA 的所有内容的地方。前面的操作在可能的情况下修剪数据。它不保证修剪所有数据。我们需要这个过滤器来完成和精确的过滤。
  • CoalesceBatchesExec: target_batch_size=8192 是一种在可能的情况下将小数据分组到更大的组中的方法。有关其工作原理,请参阅 DataFusion 文档
  • SortExec: expr=[state@2 升序,city@1 升序,time@3 升序,__chunk_order@0 升序]:这将在state 升序, city 升序, time 升序, __chunk_order 升序上对数据进行排序。请注意,此排序仅适用于来自ingesters的数据,因为来自Parquet文件的数据已经按照该顺序排序。
  • UnionExec只是一个将多个流拉在一起的地方。执行速度快,不进行合并。
  • SortPreservingMergeExec: [state@2 升序,city@1 升序,time@3 升序,__chunk_order@0 升序]:此操作合并预先排序的数据。当你看到这个时,你就知道下面的数据已经排序,并且输出是单个流。
  • DeduplicateExec: [state@2 升序,city@1 升序,time@3 升序]:此操作从单个输入流严格去重排序数据。这就是为什么你经常在DeduplicateExec下看到SortPreservingMergeExec,但它不是必需的。只要DeduplicateExec的输入是单个排序数据流,它就会正确工作。

我们如何知道数据没有重叠? 图10:没有DeduplicateExec意味着文件没有重叠

当一个ParquetExecRecordBatchesExec分支没有指向DeduplicateExec,我们知道该Exec处理的文件没有重叠。

  • ProjectionExec: expr=[city@0 as city]:这过滤列数据,并且只发送来自列city的数据。

其他执行计划

现在让我们看看计划的其余部分。 图11:计划的其余部分结构

  • UnionExec:合并数据流。请注意,输出流的数量与输入流的数量相同。上面的执行计划负责进一步合并或拆分流。这个UnionExec是合并/拆分的中继步骤。
  • RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=3:以轮询方式将三个输入流拆分为四个输出流。这个集群有四个可用的核心,因此这个RepartitionExec将数据分为四个流以提高并行执行。
  • AggregateExec: mode=Partial, gby=[city@0 as city], aggr=[COUNT(Int64(1))]:将数据分组到具有相同city值的组中。因为有四个输入流,所以每个流都单独聚合,这创建了四个输出流。这也意味着输出数据没有完全聚合,如mode=Partial标志所示。
  • RepartitionExec: partitioning=Hash([city@0], 4), input_partitions=4:根据hash(city)重新分配数据到四个流,以便相同的城市进入同一个流。
  • AggregateExec: mode=FinalPartitioned, gby=[city@0 as city], aggr=[COUNT(Int64(1))]:因为同一城市的行在同一个流中,所以我们只需要进行最终聚合。
  • SortExec: expr=[city@0 升序 NULLS LAST]:根据查询请求对每个四个数据流按city排序。
  • SortPreservingMergeExec: [city@0 升序 NULLS LAST]:(排序)合并四个排序流以返回最终结果。

如果看到某个计划读取很多文件并对所有文件进行去重,你可能会问:“所有文件都重叠吗?”答案是是或否,这取决于具体情况。有时压缩器可能落后,如果你给它一些时间来压缩小而重叠的文件,你的查询将更快地读取更少的文件。如果文件仍然很多,你可能需要检查压缩器的负载,并根据需要添加更多资源。我们之所以对非重叠文件进行去重,还有其他原因,那就是由于你的查询器的内存限制,但这些是未来博客文章的主题。

结论

EXPLAIN 是一种了解 InfluxDB 如何执行你的查询以及为什么它运行得快或慢的方法。你通常可以重写你的查询来添加更多过滤器或删除不必要的排序(查询中的 order by)以提高查询速度。有时,查询之所以慢,是因为你的系统缺少资源。在这种情况下,是时候重新评估集群配置或咨询 InfluxDB 支持团队了。