InfluxDB 中的 Flux 聚合:现在还是稍后

导航至

在处理大量时间序列数据时,聚合是一个强大的工具。事实上,大多数时候,您更关心数据集的 minmaxmeancountlast 值,而不是您正在收集的原始值。

了解到这一点,InfluxDBFlux 语言使运行这些聚合变得尽可能容易,无论何时何地您需要,但有时这会导致人们以效率不高的方式运行它们。以下是一些确保您的聚合查询尽可能快速运行的方法。

不要过早聚合

尽管聚合函数很强大,并且您很想使用它们,但请注意不要过早使用它们。通常,我们会看到有人使用这样的查询

from(bucket: "myBucket")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "cpu")
  |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
  |> filter(fn: (r) => r["host"] == "myHost")
  |> filter(fn: (r) => r["cpu"] == "cpu-total")
  |> yield(name: "mean")

虽然此查询将返回您想要的确切结果,在本例中是特定主机的平均总 CPU,但它也做了很多不必要的工作。

由于 `aggregateWindow` 函数在对主机和 CPU 的 `filter` 调用之前被调用,InfluxDB 最终首先计算所有主机和 CPU 值的平均值,然后在完成所有这些繁重的工作后丢弃原始数据。

相反,在使用聚合函数之前,尽可能执行所有过滤。这减少了进入这些计算的数据总量,这将大大提高您的查询速度,尤其是在大型数据集上。

from(bucket: "myBucket")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "cpu")
  |> filter(fn: (r) => r["host"] == "myHost")
  |> filter(fn: (r) => r["cpu"] == "cpu-total")
  |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
  |> yield(name: "mean")

您仍然可以在调用聚合后过滤数据,这在您想根据聚合结果进行过滤时非常有用。但是对于仅检查存储桶中原始数据的过滤器,最好先应用它们。事实上,这正是 InfluxDB UI 的查询构建器自动执行的操作!

也不要过晚聚合

虽然您不想在聚合方面领先于您的数据,但也有可能在查询中过晚调用它们,从而导致响应时间变慢。例如

from(bucket: "myBucket")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "cpu")
  |> filter(fn: (r) => r["host"] == "myHost")
  |> filter(fn: (r) => r["cpu"] == "cpu-total")
  |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
  |> last(column: "_time")

同样,这将完成您想要的操作:为您提供匹配系列中每个字段的最后一个值,并将其呈现为以字段作为列的单行。但是,此查询将使 InfluxDB 对整个数据集执行 pivot 操作,然后再次立即丢弃那些辛勤工作。

在这里,最好在透视数据之前调用 last 聚合函数,以减少必须转换的数据量,使其仅为您最终想要的数据。

from(bucket: "myBucket")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "cpu")
  |> filter(fn: (r) => r["host"] == "myHost")
  |> filter(fn: (r) => r["cpu"] == "cpu-total")
  |> last(column: "_time")
  |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")

下推机会

当您的 Flux 查询执行时,它做的第一件事是从 InfluxDB 的存储层读取您的测量数据,然后在内存中对该数据执行计算和转换。其中一些步骤可以由存储层来执行,这意味着首先要读取到内存中的数据更少。

当这种情况发生时,我们称之为将这些步骤“下推”到 Flux 下面的层,即存储引擎层,而可以执行此操作的查询模式称为下推模式。这些不仅导致更少的数据被读取到内存中,而且还减少了 Flux 引擎必须完成的工作量。

第一个也是最基本的下推模式是大多数 Flux 查询使用的常见的 from() |> range() |> filter()。如果您可以将聚合函数紧接在其中一个之后,那么其中一些函数也可以下推到存储层。让我们看一下我们的最后一个查询

from(bucket: "myBucket")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "cpu")
  |> filter(fn: (r) => r["host"] == "myHost")
  |> filter(fn: (r) => r["cpu"] == "cpu-total")
  |> last(column: "_time")
  |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")

因为我们将 last 函数紧接在 from() |> range() |> filter() 模式之后,它也将被下推到存储层。这意味着实际读取到 Flux 运行时的唯一数据是我们选定系列的 CPU 测量的最后一个值,而内存中执行的唯一步骤是最后的透视步骤。如果我们将其留在 pivot 之后调用 last,那么不仅所有额外的数据都必须读取到内存中,而且 pivotlast 函数都将在 Flux 层中执行。

总结

总而言之,聚合非常棒,Flux 在这方面非常出色,您可以通过在应用聚合之前尽可能多地进行过滤,等待在聚合之后再执行任何数据转换,并留意使用下推模式的机会,让存储层完成更多繁重的工作,从而使其更好地为您服务。

延伸阅读