TL;DR InfluxDB 技术小贴士 ─ 在 InfluxDB Cloud 中优化 Flux 性能

导航至

因此,您正在使用 InfluxDB Cloud,并充分利用 Flux 创建自定义数据处理任务、检查和通知。然而,您发现一些 Flux 脚本没有您期望的执行得那么快。在这篇文章中,我们将学习优化 Flux 性能的最佳实践和工具。以下信息部分摘自Flux 初学者五大难题及学习使用 Flux 的资源Flux 中级用户五大难题及优化 Flux 的资源。这些博客基于 Faith Chikwekwe(Flux 团队工程师)和我的InfluxDays 演讲。我建议您也关注这些博客。

Flux 性能优化的通用建议

在深入了解一些可用于优化 Flux 性能的工具之前,让我们先深入了解一些 Flux 性能优化的通用建议。

  1. 充分利用下推模式。
  2. 应将模式突变函数应用于查询的末尾。
  3. 使用变量以避免多次查询数据。
  4. 当需要时,将处理工作分散到多个任务中。

我们将在以下各节中详细讨论这些技巧。

充分利用下推模式

为了提供优化指南的背景,让我们首先花一点时间来了解 Flux 的工作原理。Flux 能够高效地查询数据,因为一些函数将数据转换的工作负载下推到存储,而不是在内存中执行转换。执行此工作的函数组合称为下推模式。在可能的情况下,最好尝试使用下推模式来优化您的 Flux 查询。有关下推模式和 Flux 的工作原理的更多信息,请参阅Flux 中级用户五大难题及优化 Flux 的资源中的“解决方案 2:了解内存优化和新的下推模式以优化您的 Flux 脚本”。

正确使用模式突变

模式突变函数是指任何会改变您Flux表中列的函数。这些函数包括keep()drop()rename()duplicate()set()等。如果您在查询中使用聚合选择器函数,请在应用聚合函数后尝试包括模式突变函数,以保留您可能有的任何下推模式。此外,尽可能用对分组键的更改来替换keep()drop()。例如,在从两个桶的两个字段执行连接时,连接到所有类似列,而不是之后删除列。我们使用array.from()函数生成此示例的数据

import "array"
import "experimental"
start = experimental.subDuration(
d: -10m,
from: now(),
)
bucket1 = array.from(rows: [{_start: start, _stop: now(), _time: now(),_measurement: "mymeas", _field: "myfield", _value: "foo1"}])
|> yield(name: "bucket1")
 
bucket2 = array.from(rows: [{_start: start, _stop: now(), _time: now(),_measurement: "mymeas", _field: "myfield", _value: "foo2"}])
|> yield(name: "bucket2")

查询的注释CSV输出如下

The annotated CSV output

不要连接后不必要地使用drop()

join(tables: {bucket1: bucket1, bucket2: bucket2}, on: ["_time"], method: "inner")
|> drop(columns:["_start_field1", "_stop_field1", "_measurement_field1", "myfield1"])
|> yield(name: "bad_join")

应该通过连接类似列来替换对分组键的更改

join(tables: {bucket1: bucket1, bucket2: bucket2}, on: ["_start","_stop""_time", "_measurement","_field"], method: "inner")
|> yield(name: "good_join")

以得到相同的结果

The annotated CSV output the same result

使用变量以避免多次查询数据

与其多次查询数据,不如将结果存储在变量中并引用它。换句话说

不要这样做

from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "my_measurement")
|> mean()
|> set(key: "agg_type",value: "mean_temp")
|> to(bucket: "downsampled", org: "my-org", tagColumns:["agg_type"]) 

from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "my_measurement")
|> count()
|> set(key: "agg_type",value: "count_temp")
|> to(bucket: "downsampled", org: "my-org", tagColumns: ["agg_type"])

而是这样做

data = from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "my_measurement")

data
|> mean()
|> set(key: "agg_type",value: "mean_temp")
|> to(bucket: "downsampled", org: "my-org", tagColumns: ["agg_type"])

data
|> count()
|> set(key: "agg_type",value: "count_temp")
|> to(bucket: "downsampled", org: "my-org", tagColumns: ["agg_type"])

将处理工作分配到多个任务中

您是否尝试在同一个任务中执行模式突变、转置、连接、复杂数学和映射?如果您这样做并且遇到了执行时间过长的问题,请考虑将一些工作分配到多个任务中。将处理工作分离并在并行执行这些任务可以帮助您减少整体执行时间。

Flux分析器包是什么?

Flux分析器包根据您的查询提供性能信息。根据文档,以下Flux查询

import "profiler"

option profiler.enabledProfilers = ["query", "operator"]

from(bucket: "noaa")
  |> range(start: 2019-08-17T00:00:00Z, stop: 2019-08-17T00:30:00Z)
  |> filter(fn: (r) =>
    r._measurement == "h2o_feet" and
    r._field == "water_level" and
    r.location == "coyote_creek"
  )
  |> map(fn: (r) => ({ r with
    _value: r._value * 12.0,
    _measurement: "h2o_inches"
  }))
  |> drop(columns: ["_start", "_stop"])

从分析器输出以下表格

Profiler tables

Flux分析器以纳秒为单位输出查询的性能信息。

  • 第一个表格提供了关于整个查询的信息,包括查询执行的总持续时间以及编译、排队等所花费的时间。
  • 第二个表格提供了关于查询花费最多时间的地方的信息。

最重要的两个列是第一个表中的TotalDuration列和第二个表中的DurationSum列。此查询执行得非常快,所以我无需担心优化它。然而,我会描述进一步优化的思路。

首先,我会尝试确定查询中执行时间最长的部分。从上述查询中,我们可以看到merged_ReadRange5_filter操作具有最大的DurationSum为529282 ns。如果我要计划将此查询转换为任务并在计划中执行此转换工作,我首先应该考虑查询更短范围的数据并更频繁地运行任务。

接下来,我注意到map()函数对第二长的DurationSum值做出了贡献。回顾我的map()函数,我不得不怀疑使用map()函数重命名测量值是否是最有效的方法。或许我应该尝试使用set()函数,如下所示

from(bucket: "noaa")
  |> range(start: 2019-08-17T00:00:00Z, stop: 2019-08-17T00:30:00Z)
  |> filter(fn: (r) =>
    r._measurement == "h2o_feet" and
    r._field == "water_level" and
    r.location == "coyote_creek"
  )
 |> drop(columns: ["_start", "_stop"])
  |> set(key: "_measurement",value: "h2o_inches")
  |> map(fn: (r) => ({ r with
    _value: r._value * 12.0,
  }))

此外,我还注意到我更改了函数的顺序,并在map()函数之前应用了drop()函数set()函数。运行Profiler后,我发现TotalDuration时间有所下降,这表明这些更改都是好的。由于Flux的性能优化是持续进行的,每个人的模式都各不相同,所以没有针对Flux性能优化的硬性规则。相反,我鼓励您利用Profiler进行一些实验,以找到最适合您的解决方案。

使用Visual Studio Code的Flux扩展来简化Flux优化发现

如果您还没有尝试过,我鼓励您安装Flux扩展,用于Visual Studio Code。要使用Flux扩展查询您的InfluxDB Cloud账户,您必须先配置它并连接到您的云账户。我喜欢使用Flux扩展和VS Code来调试复杂的Flux脚本或尝试优化Flux脚本的性能,因为我可以在保存Flux脚本的同时比较Profiler的输出。

blog-flux<figcaption>原始的“bad_join”查询(红色)已注释,因为我先运行了它。它的TotalDuration时间为17608617 ns。通过在多个like列上连接并移除drop(),性能提高到了14160858 ns。</figcaption>

我决定测试上面“正确使用模式突变”部分中描述的查询。Profiler证实了我的假设:在多个like列上连接比事后删除冗余列更有效。虽然您也可以在InfluxDB UI中执行这项工作,但我发现Profiler输出的并排比较对于此类实验很有帮助。

其他提示

以下是您在尝试优化Flux查询性能时可以考虑的其他替换或想法列表

  1. 您是否可以使用experimental.join()函数而不是join()函数
  2. 在应用map()函数之前,您是否可以应用任何将减少您表中的行数的组?
  3. 您是否可以调整正则表达式以尽可能具体?
  4. 是否有|> sort(columns: ["_time"], desc: false) |> limit(n:1)|> last()表现得更好?

如果您找到了对您有用的其他替换,我鼓励您在社区论坛或下面的评论部分分享。

寻求帮助的最佳实践

当在社区论坛Slack或通过支持寻求帮助以优化您的Flux脚本性能时,请在您的帖子或请求中包括以下信息

  • 出现问题的查询是什么?
    • 请确保分享它以及包括Profiler的输出。
  • 您的数据基数是多少?(有多少系列)
  • 您数据的密度是多少?(每个系列中每单位时间有多少点)
  • 关于您数据结构的通用信息(存在哪些度量、字段和标签键)总是有帮助的。
    • 尝试使用schema包来共享您数据结构。
  • 您对查询运行速度的期望是什么?这个期望的基础是什么?

在帖子中包含尽可能多的这些信息将帮助我们更好地更快地为您提供帮助。上述点也适用于达到内存限制的问题。

关于在InfluxDB云中优化Flux性能的最终思考

如果您从这个博客中获得了一个重要的启示,那就是优化Flux性能是一门科学也是一门艺术。您需要通过实验来发现适合您的优化方法。我鼓励您与社区分享您的劳动成果。在社区论坛中分享您的发现不仅可以帮助您的InfluxData社区成员,还可以帮助Flux团队进一步推进性能优化。

我希望这篇InfluxDB技术技巧文章能帮助您优化您的Flux脚本在InfluxDB中的性能。如果您正在使用Flux分析器并需要帮助,请在我们的社区网站Slack频道中寻求帮助。如果您正在开发酷炫的物联网应用程序或在InfluxDB上监控您的应用程序,我们很乐意了解这些情况,所以请确保分享您的经历!此外,请分享您的想法、关注点或问题在评论区。我们很高兴得到您的反馈并帮助您解决遇到的问题!

进一步阅读

尽管本文旨在提供关于如何优化Flux查询性能的全面概述,以下资源也可能引起您的兴趣

  1. Flux初学者的五大障碍和如何使用Flux的资源:这篇文章描述了Flux初学者常见的问题以及如何通过使用InfluxDB UI、理解标注的CSV等来解决这些问题。
  2. 中级Flux用户的五大障碍和优化Flux的资源:这篇文章描述了中级和高级Flux用户的常见问题,同时提供了更多关于推下模式、Flux引擎的工作方式等详细信息。
  3. 使用和理解InfluxDB云使用模板:这篇文章描述了如何使用InfluxDB云使用模板,该模板可以帮助您确定InfluxDB账户中的数据,并帮助Flux团队更好地了解您的Flux性能是否良好。
  4. TL;DR InfluxDB技术技巧 – 监控任务和找到过载基数的原因:这篇文章描述了如何使用InfluxDB运营监控模板,该模板可以帮助您理解您数据的大小(基数)。