TL;DR InfluxDB 技术提示 – 优化 InfluxDB Cloud 中的 Flux 性能

导航至

假设您正在使用 InfluxDB Cloud,并且充分利用 Flux 来创建自定义数据处理任务、检查和通知。但是,您注意到某些 Flux 脚本的执行速度不如预期。在这篇文章中,我们将学习优化 Flux 性能的最佳实践和工具。这里的一些信息在Flux 初学者面临的 5 大障碍以及学习使用 Flux 的资源Flux 中级用户面临的 5 大障碍以及优化 Flux 的资源中进行了概述。这些博客基于 Flux 团队的工程师 Faith Chikwekwe 和我的一次 InfluxDays 演示。我建议您也关注这些博客。

Flux 性能优化的一般建议

在深入了解一些可用于优化 Flux 性能的工具之前,让我们先深入了解 Flux 性能优化的一些一般建议

  1. 利用下推模式。
  2. 模式突变函数应在查询末尾应用。
  3. 使用变量以避免多次查询数据。
  4. 在需要时将处理工作分配到多个任务中。

我们将在以下部分详细讨论这些技巧。

利用下推模式

为了提供优化指南的背景,让我们首先花一点时间了解 Flux 的工作原理。Flux 能够高效地查询数据,因为某些函数将数据转换工作负载下推到存储,而不是在内存中执行转换。执行此工作的函数组合称为下推模式。最好尽可能尝试使用下推模式来优化 Flux 查询。要了解有关下推模式以及 Flux 工作原理的更多信息,请阅读Flux 中级用户面临的 5 大障碍以及优化 Flux 的资源中的“解决方案 2:了解内存优化和新的下推模式以优化 Flux 脚本”。

正确使用模式突变

模式突变函数是任何更改 Flux 表中列的函数。它们包括 keep()drop()rename()duplicate()set() 等函数。如果您在查询中使用 aggregatesselector 函数,请尝试在应用聚合函数后包含模式突变函数,以保留您可能拥有的任何下推模式。此外,请尽可能尝试使用更改组键来替换 keep()drop()。例如,当执行跨两个存储桶的两个字段的 join() 时,连接所有类似的列,而不是之后删除列。我们使用 array.from() 函数生成此示例的数据

import "array"
import "experimental"
start = experimental.subDuration(
d: -10m,
from: now(),
)
bucket1 = array.from(rows: [{_start: start, _stop: now(), _time: now(),_measurement: "mymeas", _field: "myfield", _value: "foo1"}])
|> yield(name: "bucket1")
 
bucket2 = array.from(rows: [{_start: start, _stop: now(), _time: now(),_measurement: "mymeas", _field: "myfield", _value: "foo2"}])
|> yield(name: "bucket2")

我们查询的 带注释的 CSV 输出如下所示

The annotated CSV output

不要join() 后不必要地使用 drop()

join(tables: {bucket1: bucket1, bucket2: bucket2}, on: ["_time"], method: "inner")
|> drop(columns:["_start_field1", "_stop_field1", "_measurement_field1", "myfield1"])
|> yield(name: "bad_join")

应该通过连接类似的列来替换为组键的更改

join(tables: {bucket1: bucket1, bucket2: bucket2}, on: ["_start","_stop""_time", "_measurement","_field"], method: "inner")
|> yield(name: "good_join")

以产生相同的结果

The annotated CSV output the same result

使用变量以避免多次查询数据

与其多次查询数据,不如将结果存储在变量中并引用它。换句话说

不要这样做

from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "my_measurement")
|> mean()
|> set(key: "agg_type",value: "mean_temp")
|> to(bucket: "downsampled", org: "my-org", tagColumns:["agg_type"]) 

from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "my_measurement")
|> count()
|> set(key: "agg_type",value: "count_temp")
|> to(bucket: "downsampled", org: "my-org", tagColumns: ["agg_type"])

应该这样做

data = from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "my_measurement")

data
|> mean()
|> set(key: "agg_type",value: "mean_temp")
|> to(bucket: "downsampled", org: "my-org", tagColumns: ["agg_type"])

data
|> count()
|> set(key: "agg_type",value: "count_temp")
|> to(bucket: "downsampled", org: "my-org", tagColumns: ["agg_type"])

将处理工作分配到多个任务中

您是否尝试在同一任务中执行模式突变、透视、连接、复杂数学和映射?如果您是,并且您正在经历较长的执行时间,请考虑将其中一些工作拆分到多个任务中。分离处理工作并并行执行这些任务可以帮助您缩短总体执行时间。

什么是 Flux Profiler 包?

Flux Profiler 包提供基于您的查询的性能信息。根据文档,以下 Flux 查询

import "profiler"

option profiler.enabledProfilers = ["query", "operator"]

from(bucket: "noaa")
  |> range(start: 2019-08-17T00:00:00Z, stop: 2019-08-17T00:30:00Z)
  |> filter(fn: (r) =>
    r._measurement == "h2o_feet" and
    r._field == "water_level" and
    r.location == "coyote_creek"
  )
  |> map(fn: (r) => ({ r with
    _value: r._value * 12.0,
    _measurement: "h2o_inches"
  }))
  |> drop(columns: ["_start", "_stop"])

从 Profiler 生成以下表格

Profiler tables

Flux Profiler 以纳秒为单位输出有关查询的性能信息。

  • 第一个表提供有关整个查询的信息,包括执行查询所花费的总持续时间以及花费在编译、队列等中的时间。
  • 第二个表提供有关查询花费最多时间的位置的信息。

需要注意的两个最重要的列是第一个表中的 TotalDuration 列和第二个表中的 DurationSum 列。此查询执行速度非常快,因此我不必担心对其进行优化。但是,我将描述进一步优化它的思考过程。

首先,我将尝试确定查询的哪个部分执行时间最长。从上面的查询中,我们可以看到 merged_ReadRange5_filter 操作的 DurationSum 最大,为 529282 纳秒。如果我计划将此查询转换为任务并按计划执行此转换工作,我应该考虑的第一件事是在较短的范围内查询数据并更频繁地运行任务。

接下来,我注意到 map() 函数贡献了第二长的 DurationSum 值。回顾我的 map() 函数,我不得不怀疑使用 map() 重命名测量是否是最有效的方法。也许我应该尝试使用 set() 函数,如下所示

from(bucket: "noaa")
  |> range(start: 2019-08-17T00:00:00Z, stop: 2019-08-17T00:30:00Z)
  |> filter(fn: (r) =>
    r._measurement == "h2o_feet" and
    r._field == "water_level" and
    r.location == "coyote_creek"
  )
 |> drop(columns: ["_start", "_stop"])
  |> set(key: "_measurement",value: "h2o_inches")
  |> map(fn: (r) => ({ r with
    _value: r._value * 12.0,
  }))

另请注意,我切换了函数的顺序,并在 map() 函数之前应用了 drop()set() 函数。运行 Profiler 后,我看到 TotalDuration 时间减少了,这表明这些都是很好的更改。由于 Flux 的性能优化不断进行,并且每个人的模式都大相径庭,因此没有针对 Flux 性能优化的硬性规定。相反,我鼓励您利用 Profiler 并进行一些实验,以找到最适合您的解决方案。

使用 Visual Studio Code 的 Flux 扩展来简化 Flux 优化发现

如果您尚未尝试,我鼓励您安装 Flux 扩展 for Visual Studio Code。要使用 Flux 扩展查询您的 InfluxDB Cloud 帐户,您必须先配置它并连接到您的云帐户。当尝试调试复杂的 Flux 脚本或尝试优化 Flux 脚本的性能时,我喜欢使用 Flux 扩展和 VS Code,因为我可以保存我的 Flux 脚本并同时比较 Profiler 的输出。

blog-flux<figcaption> 最初的“bad_join”查询(红色)被注释掉了,因为我首先运行了它。它的 TotalDuration 时间为 17608617 纳秒。连接多个类似的列并删除 drop() 将性能提高到 14160858 纳秒。</figcaption>

我决定测试上面“正确使用模式突变”部分中描述的查询。Profiler 证实了我的假设:连接多个类似的列比追溯删除冗余列更有效。虽然您也可以在 InfluxDB UI 中执行此工作,但我发现 Profiler 输出的并排比较对于此类实验很有帮助。

其他提示

以下是在尝试优化 Flux 查询性能时要考虑的其他替换或想法列表

  1. 您可以使用 experimental.join() 函数代替 join() 函数吗?
  2. 您可以在应用 map() 函数之前应用任何组来减少表中的行数吗?
  3. 您可以调整任何正则表达式以尽可能具体吗?
  4. |> sort(columns: ["_time"], desc: false) |> limit(n:1)|> last() 性能更好吗?

如果您找到其他对您有效的替换,我鼓励您在社区论坛或下面的评论部分分享它们。

获得帮助的最佳实践

当在社区论坛Slack 或通过 支持寻求有关优化 Flux 脚本性能的帮助时,请在您的帖子或请求中包含以下信息

  • 哪个查询有问题?
    • 确保共享它以及包含 Profiler 的输出。
  • 数据的基数是多少?(多少个序列)
  • 数据的密度是多少?(每个系列中每个单位时间有多少个点)
  • 有关数据结构(存在哪些测量、字段和标签键)的一般信息始终很有帮助。
    • 尝试使用 schema 包 来共享您的数据结构。
  • 您对查询应该运行多快的期望是什么?这种期望的基础是什么?

在帖子中包含尽可能多的这些信息将有助于我们更好、更快地为您提供帮助。以上几点也适用于命中内存限制的问题。

关于优化 InfluxDB Cloud 中的 Flux 性能的最终想法

如果这篇文章您应该获得一个收获,那就是优化 Flux 性能既是一门科学,也是一门艺术。这将需要实验来发现对您有效的优化。我鼓励您与社区分享您的劳动成果。在社区论坛中分享您的发现不仅可以帮助您的 InfluxData 社区成员,还可以帮助 Flux 团队取得进一步的性能优化进展。

我希望这篇 InfluxDB 技术提示文章能帮助您优化 InfluxDB 中的 Flux 脚本。如果您正在使用 Flux Profiler 并且需要帮助,请在我们的社区网站Slack 频道中寻求帮助。如果您正在开发一个很酷的物联网应用程序或在 InfluxDB 之上监控您的应用程序,我们很乐意听到您的故事,因此请务必分享您的故事!此外,请在评论部分分享您的想法、疑虑或问题。我们很乐意获得您的反馈并帮助您解决遇到的任何问题!

延伸阅读

虽然这篇文章旨在全面概述如何优化 Flux 查询的性能,但以下资源也可能让您感兴趣

  1. Flux 初学者面临的 5 大障碍以及学习使用 Flux 的资源:这篇文章描述了 Flux 初学者的常见障碍以及如何通过使用 InfluxDB UI、理解带注释的 CSV 等来解决这些障碍。
  2. Flux 中级用户面临的 5 大障碍以及优化 Flux 的资源:这篇文章描述了 Flux 中级和高级用户的常见障碍,同时更详细地介绍了下推模式、Flux 引擎的工作原理等等。
  3. 使用和理解 InfluxDB Cloud Usage Template:这篇文章描述了如何使用 InfluxDB Cloud Usage Template,它可以帮助您确定进入 InfluxDB 帐户的数据,并可以帮助 Flux 团队更好地了解您的 Flux 性能是否良好。
  4. TL;DR InfluxDB 技术提示 – 监控任务并查找失控基数的来源:这篇文章描述了如何使用 InfluxDB Operational Monitoring Template,它可以帮助您了解数据的基数。