如何阅读 InfluxDB 3.0 查询计划
由 Nga Tran / 开发者
2024年1月29日
导航至
本文解释了如何在 InfluxDB 3.0 中读取查询计划,并需要基本了解 InfluxDB 3.0 系统架构。
InfluxDB 3.0 支持两种查询语言:SQL 和 InfluxQL。数据库根据查询计划指令执行 SQL 或 InfluxQL 编写的查询。要查看查询计划而不运行查询,请在查询前添加关键词 EXPLAIN
,如下所示
EXPLAIN
SELECT city, min_temp, time
FROM temperature
ORDER BY city ASC, time DESC;
输出将如下所示:
图 1:查询计划的简化输出
有两种类型的计划:逻辑计划和物理计划。
逻辑计划:这是一个为特定 SQL 或 InfluxQL 查询生成的计划,不知道底层数据组织或集群配置。因为 InfluxDB 3.0 是建立在 DataFusion 之上,逻辑计划与您在任何数据格式或存储中看到的数据类似。
物理计划:这是一个从查询的对应逻辑计划加上集群配置(例如,CPU数量)和底层数据组织(例如,文件数量,文件中数据的布局等)信息生成的计划。物理计划针对您的数据和InfluxDB集群配置是特定的。如果您将相同的数据加载到具有不同配置的不同集群中,相同的查询可能会生成不同的物理查询计划。同样,在相同集群上在不同时间运行相同的查询,其计划可能根据当时的数据而有所不同。
理解查询计划可以帮助解释查询为什么慢。例如,如果计划显示您的查询读取了多个文件,您可以添加更多过滤器以减少其需要读取的数据量,或者修改您的集群配置/设计以创建较少但更大的文件。本文档重点介绍如何阅读查询计划。使查询运行更快的技术取决于它为什么慢,这超出了本文的范围。
查询计划是一棵树
查询计划是一棵倒置的树,应该从下往上阅读。在树格式中,我们可以用以下方式表示图1中的物理计划:
图2:图1中物理计划的树结构
树中每个节点的名称以Exec
结尾,表示处理、转换并将数据发送到树中下一级的ExecutionPlan
。首先,两个ParquetExec
节点并行读取Parquet文件,每个节点输出一个数据流到其对应的SortExec
节点。负责按city
升序和time
降序排序数据的SortExc
节点。然后将来自两个SortExec
节点的排序输出合并,由SortPreservingMergeExec
节点进行(排序)合并,以返回排序后的数据。
如何理解大型查询计划
大型查询计划可能看起来令人畏惧,但如果您遵循以下步骤,可以快速了解该计划做什么。
- 像往常一样,从下往上,一次一个
Exec
节点。 - 理解每个
Exec
节点的任务。大部分信息可以在DataFusion物理计划文档或直接从其仓库中获得。不在DataFusion文档中的ExecutionPlans
是InfluxDB特定的——更多信息可以在InfluxDB仓库中找到。 - 回想一下
Exec
节点的输入数据看起来是什么样子,以及它可能有多大或有多小。 - 考虑该
Exec
节点可能发送的数据量以及它的外观。
使用这些步骤,您可以估计计划需要做多少工作。然而,explain
命令显示计划而不执行它。如果您想确切知道计划和其每个ExecutionPlan的执行时间,您需要其他工具。
显示每个ExecutionPlan确切运行时间的工具
- 运行
EXPLAIN ANALYZE,
以打印带有执行计数器以及运行时间和生成的行数等信息的‘explain plan’(见图1)。 - 还有其他工具,例如使用Jaeger的分布式跟踪,我们将在未来的文章中描述。
调试更多信息
如果计划需要读取许多文件,则EXPLAIN
报告不会显示所有文件。要查看所有文件,请使用EXPLAIN VERBOSE.
与EXPLAIN
一样,EXPLAIN VERBOSE
不会运行查询,也不会告诉您运行时间。相反,您将获得从EXPLAIN
报告中省略的所有信息以及InfluxDB 3.0查询器和DataFusion在返回最终物理计划之前生成的所有中间物理计划。这对于调试非常有帮助,因为您可以看到计划何时添加或删除一个我用这个ExecutionPlan替换的操作,以及InfluxDB和DataFusion如何优化您的查询。
领先数据的典型计划的示例
让我们深入探讨一个例子,该例子涵盖了典型的ExecutionPlans以及针对领先数据的InfluxDB特定的计划。
数据组织
为了更容易解释下面的计划,图3显示了计划所读取的数据组织。一旦您习惯了阅读查询计划,您就可以从计划本身中找出这一点。一些需要注意的细节
- 系统中可能有更多数据。这仅仅是查询应用查询谓词剪枝超出范围的分区后的数据。
- 最近接收到的数据正在被摄取,尚未持久化。在计划中,
RecordBatchesExec
表示尚未持久化到Parquet文件中的摄取器数据。 - 从存储中检索了四个Parquet文件,并由两个包含每个两个文件的
ParquetExec
节点表示- 在第一个节点中,两个文件
file_1
和file_2
与任何其他文件在时间上不重叠,并且没有重复的数据。文件内的数据永远不会重复,因此对于非重叠文件,永远不会需要去重。 - 在第二个节点中,两个文件
file_3
和file_4
彼此重叠,并且与表示为RecordBatchesExec
的摄取数据重叠。
- 在第一个节点中,两个文件
图3:图4中查询计划的数据
查询和查询计划
EXPLAIN
SELECT city, count(1)
FROM temperature
WHERE time >= to_timestamp(200) AND time < to_timestamp(700)
AND state = 'MA'
GROUP BY city
ORDER BY city ASC;
图4:领先(最新)数据的典型查询计划。注意:左侧列的颜色与下面的图相对应。
阅读逻辑计划
图5中的逻辑计划显示,首先发生表扫描,然后查询谓词过滤数据。接下来,计划聚合数据以计算每个城市的行数。最后,计划排序并返回数据。 图5:图4的逻辑计划
阅读物理计划
让我们从下往上阅读。最底层或叶子节点始终是ParquetExec
或RecordBatchExec
。这个计划中有三个,所以让我们逐一介绍。
这三个底层的叶子节点由两个ParquetExec
节点和一个RecordBatchesExec
节点组成。
第一个ParquetExec
图6:第一个ParquetExec
- 这个
ParquetExec
包含两组文件。每组可以包含一个或多个文件,但在本例中,每组只有一个文件。节点并行执行这些组,并按顺序读取每个组中的文件。因此,在本例中,两个文件是并行读取的。 1/1/237/2cbb3992-4607-494d-82e4-66c480123189.parquet
:这是对象存储中文件的路径。它位于结构db_id/table_id/partition_hash_id/文件uuid.parquet
中,每个部分分别告诉我们- 查询的是哪个数据库和表
- 文件属于哪个分区(你可以计算这个查询读取了多少个分区)
- 这是哪个文件
projection=[__chunk_order, city, state, time]
:这个表中有很多列,但节点只读取这四个。__chunk_order
列是 InfluxDB 代码生成的一个人工列,用于在去重时保持块/文件的顺序。output_ordering=[state@2 ASC, city@1 ASC, time@3 ASC, __chunk_order@0 ASC]
:这个ParquetExec
节点将按state ASC, city ASC, time ASC, __chunk_order ASC
对其输出进行排序。InfluxDB 在存储 Parquet 文件时自动对其进行排序,以提高存储压缩和查询效率。predicate=time@5 >= 200 AND time@5 < 700 AND state@4 = MA
:这是用于数据修剪的查询中的过滤器。pruning_predicate=time_max@0 >= 200 AND time_min@1 < 700 AND state_min@2 <= MA AND MA <= state_max@3
:这是从上述谓词转换的实际修剪谓词。它用于过滤掉谓词之外的文件。截至 2023 年 12 月,InfluxDB 3.0 仅根据time
过滤文件。请注意,这个谓词用于修剪 选择分区中的文件。
RecordBatchesExec
图 7:RecordBatchesExec
来自 ingester 的数据可以包含许多块,但通常,就像这个例子一样,只有一个。这个节点只从四个列中发送数据到输出,就像 ParquetExec
节点一样。我们将 过滤列 的动作称为 投影下推。因此,它在查询计划中名为 projection
。
第二个 ParquetExec
图 8:第二个 ParquetExec
读取第二个 ParquetExec
节点的操作与上面类似。请注意,这两个 ParquetExec
节点中的文件属于同一个分区(237
)。
数据扫描结构
为什么我们要将同一分区的 Parquet 文件发送到不同的 ParquetExec
?有很多原因,但主要有两个
- 通过分割非重叠部分来最小化去重所需的工作(这是本例中的情况)。
- 通过分割非重叠部分来提高并行性。
我们如何知道数据重叠? 图 9:DeduplicationExec 是数据重叠的信号
图 9 中的 DeduplicationExec
告诉我们,前面的数据(即其下面的数据)重叠。更具体地说,两个文件中的数据重叠,并且/或者与 ingester 的数据重叠。
FilterExec: time@3 >= 200 AND time@3 < 700 AND state@2 = MA
:这是我们过滤掉满足条件time@3 >= 200 AND time@3 < 700 AND state@2 = MA
的所有内容的地方。前面的操作在可能的情况下修剪数据。它不保证修剪所有数据。我们需要这个过滤器来完成和精确的过滤。CoalesceBatchesExec: target_batch_size=8192
是一种在可能的情况下将小数据分组到更大的组中的方法。有关其工作原理,请参阅 DataFusion 文档。SortExec: expr=[state@2 升序,city@1 升序,time@3 升序,__chunk_order@0 升序]
:这将在state 升序, city 升序, time 升序, __chunk_order 升序
上对数据进行排序。请注意,此排序仅适用于来自ingesters的数据,因为来自Parquet文件的数据已经按照该顺序排序。UnionExec
只是一个将多个流拉在一起的地方。执行速度快,不进行合并。SortPreservingMergeExec: [state@2 升序,city@1 升序,time@3 升序,__chunk_order@0 升序]
:此操作合并预先排序的数据。当你看到这个时,你就知道下面的数据已经排序,并且输出是单个流。DeduplicateExec: [state@2 升序,city@1 升序,time@3 升序]
:此操作从单个输入流严格去重排序数据。这就是为什么你经常在DeduplicateExec
下看到SortPreservingMergeExec
,但它不是必需的。只要DeduplicateExec
的输入是单个排序数据流,它就会正确工作。
我们如何知道数据没有重叠? 图10:没有DeduplicateExec
意味着文件没有重叠
当一个ParquetExec
或RecordBatchesExec
分支没有指向DeduplicateExec
,我们知道该Exec
处理的文件没有重叠。
ProjectionExec: expr=[city@0 as city]
:这过滤列数据,并且只发送来自列city
的数据。
其他执行计划
现在让我们看看计划的其余部分。 图11:计划的其余部分结构
UnionExec
:合并数据流。请注意,输出流的数量与输入流的数量相同。上面的执行计划负责进一步合并或拆分流。这个UnionExec
是合并/拆分的中继步骤。RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=3
:以轮询方式将三个输入流拆分为四个输出流。这个集群有四个可用的核心,因此这个RepartitionExec将数据分为四个流以提高并行执行。AggregateExec: mode=Partial, gby=[city@0 as city], aggr=[COUNT(Int64(1))]
:将数据分组到具有相同city
值的组中。因为有四个输入流,所以每个流都单独聚合,这创建了四个输出流。这也意味着输出数据没有完全聚合,如mode=Partial
标志所示。RepartitionExec: partitioning=Hash([city@0], 4), input_partitions=4
:根据hash(city)
重新分配数据到四个流,以便相同的城市进入同一个流。AggregateExec: mode=FinalPartitioned, gby=[city@0 as city], aggr=[COUNT(Int64(1))]
:因为同一城市的行在同一个流中,所以我们只需要进行最终聚合。SortExec: expr=[city@0 升序 NULLS LAST]
:根据查询请求对每个四个数据流按city
排序。SortPreservingMergeExec: [city@0 升序 NULLS LAST]
:(排序)合并四个排序流以返回最终结果。
如果看到某个计划读取很多文件并对所有文件进行去重,你可能会问:“所有文件都重叠吗?”答案是是或否,这取决于具体情况。有时压缩器可能落后,如果你给它一些时间来压缩小而重叠的文件,你的查询将更快地读取更少的文件。如果文件仍然很多,你可能需要检查压缩器的负载,并根据需要添加更多资源。我们之所以对非重叠文件进行去重,还有其他原因,那就是由于你的查询器的内存限制,但这些是未来博客文章的主题。
结论
EXPLAIN
是一种了解 InfluxDB 如何执行你的查询以及为什么它运行得快或慢的方法。你通常可以重写你的查询来添加更多过滤器或删除不必要的排序(查询中的 order by
)以提高查询速度。有时,查询之所以慢,是因为你的系统缺少资源。在这种情况下,是时候重新评估集群配置或咨询 InfluxDB 支持团队了。