InfluxDB 中的 Flux 聚合:现在还是以后

导航至

在处理大量时间序列数据时,聚合是一个强大的工具。实际上,你通常更关心数据集的 minmaxmeancountlast 值,而不是你收集的原始值。

了解这一点,InfluxDBFlux 语言都使得在这些需要的地方运行这些聚合尽可能简单,有时这导致人们以效率不高的方式进行聚合。以下是一些确保您的聚合查询尽可能快地运行的方法。

不要过早地进行聚合

虽然聚合函数很棒,您也很想使用它们,但请务必注意不要过早地使用它们。我们经常看到有人使用这样的查询

from(bucket: "myBucket")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "cpu")
  |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
  |> filter(fn: (r) => r["host"] == "myHost")
  |> filter(fn: (r) => r["cpu"] == "cpu-total")
  |> yield(name: "mean")

虽然这个查询将返回您想要的确切结果,即特定主机的平均总 CPU,但它也做了比它需要做的工作更多。

因为 aggregateWindow 函数在调用主机和 cpu 上的 filter 之前被调用,InfluxDB 首先计算出所有主机和 cpu 值的平均值,然后在完成所有这些艰苦的工作之后才丢弃原始数据。

相反,在调用聚合函数之前尽可能进行所有过滤。这减少了进入这些计算的总数据量,这将为您的大型数据集查询提供很大的速度提升。

from(bucket: "myBucket")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "cpu")
  |> filter(fn: (r) => r["host"] == "myHost")
  |> filter(fn: (r) => r["cpu"] == "cpu-total")
  |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
  |> yield(name: "mean")

您仍然可以在调用聚合后过滤数据,这在您想基于聚合结果进行过滤时很有用。但对于仅检查您桶中原始数据的过滤器,始终最好先应用它们。实际上,这正是 InfluxDB UI 的查询构建器自动执行的操作!

也不要太晚进行聚合

虽然您不希望在聚合数据时领先,但也可能在查询中太晚调用它们,导致响应时间变慢。例如

from(bucket: "myBucket")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "cpu")
  |> filter(fn: (r) => r["host"] == "myHost")
  |> filter(fn: (r) => r["cpu"] == "cpu-total")
  |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
  |> last(column: "_time")

这同样会做到您想要的效果:以单行形式呈现匹配系列中每个字段的最后值,字段作为列。然而,这个查询会让InfluxDB对整个数据集只进行一次pivot 操作,然后再立即丢弃这些辛苦的工作。

在这里,最好在数据转换之前调用last 聚合函数,以减少最终需要转换的数据量。

from(bucket: "myBucket")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "cpu")
  |> filter(fn: (r) => r["host"] == "myHost")
  |> filter(fn: (r) => r["cpu"] == "cpu-total")
  |> last(column: "_time")
  |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")

向下推送的机会

当您的Flux查询执行时,它首先会从InfluxDB的存储层读取您的测量数据,然后在内存中对这些数据进行计算和转换。其中一些步骤可以由存储层代替执行,这意味着开始时就不需要将大量数据读入内存。

当这种情况发生时,我们称之为“向下推送”这些步骤到Flux以下的一层,即存储引擎层,能够执行此操作的查询模式被称为向下推送模式。这不仅会导致读取到内存中的数据更少,还会减少Flux引擎需要完成的工作量。

第一个也是最基础的向下推送模式是大多数Flux查询使用的通用from() |> range() |> filter() 模式。如果您可以在其中之一之后立即放置聚合函数,其中一些也可以向下推送至存储层。让我们来看看我们的上一个查询

from(bucket: "myBucket")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "cpu")
  |> filter(fn: (r) => r["host"] == "myHost")
  |> filter(fn: (r) => r["cpu"] == "cpu-total")
  |> last(column: "_time")
  |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")

因为我们把last函数立即放置在一个from() |> range() |> filter()模式之后,它也会被向下推送至存储层。这意味着实际上只有我们选择的系列中cpu测量的最后值会被读入Flux运行时,而在内存中执行的唯一步骤是最后的pivot步骤。如果我们把它放在pivot之后调用last,不仅所有这些额外数据都需要读入内存,而且pivotlast函数都会在Flux层中执行。

总结

所以,总结一下,聚合很好,Flux在这方面做得很好,您可以通过尽可能多的过滤在应用聚合之前,等待在聚合之后执行任何数据转换,并留意使用可以由存储层执行更多繁重工作的向下推送模式的机会来让它为您工作得更好。

进一步阅读