Flux 中级用户面临的 5 大障碍以及优化 Flux 的资源

导航至

既然您已经阅读了 Flux 初学者面临的 5 大障碍以及学习使用 Flux 的资源,您已经走在成为高级 Flux 用户的道路上。这篇文章包括 Flux 中级用户面临的一些主要障碍以及解决方案。

障碍 1:不了解常用的 Flux 包和实用函数

有时人们在生活中面临的最大障碍仅仅是不了解现有的解决方案。虽然 Flux 拥有各种各样的 Flux 包 和实用函数,但如果您计划编写 Flux,有一些是您真正离不开的。如果您还不熟悉下面描述的常用 Flux 包和实用函数,我强烈建议您充分利用它们。

解决方案 1:一些重要的 Flux 技巧和 Flux 实用程序,使编写 Flux 更容易

如果您计划编写任何 Flux,您应该了解以下四个 Flux 包和实用函数

  1. map():此函数对于使用 Flux 至关重要。它将函数应用于输入表中的每一行,并将输出分配给指定的列。如果您还不熟悉 map(),请花一些时间查看文档。我们也会在下面使用它。
  2. aggregateWindow():aggregateWindow() 函数允许您将聚合或选择器函数(如 mean(), count(), min(), max(), sum(), first()last()) 应用于窗口内的数据,窗口由 every 参数指定。您可能已经熟悉 aggregateWindow(),因为它在您将查询构建器与 InfluxDB UI 一起使用时默认应用于您的数据。例如,您可以将 mean() 应用于每 1 分钟窗口的数据,使用:
    from(bucket:"items")
       |> range(start:-1h)
     	   |> aggregateWindow(every: 1m, fn: mean())
    但是您是否知道可以创建自己的实用函数来聚合您的用例数据?在 fn 参数中传入您自己的函数即可。例如,假设我们想要 编写自定义聚合函数,计算数据平均值的 2 倍。要将此自定义聚合函数传递到我们的 aggregateWindow() 函数中,首先将您的自定义函数定义为变量。请注意,您必须在自定义函数中包含 column 参数,因为 aggregateWindow() 仅对具有该输入参数的函数进行操作。
    multByX = (x) => (column, tables=<-) =>
      tables
        |> mean(column:column)
        |> map(fn: (r) => ({ r with _value: r._value * x}))
    from(bucket: "my-bucket")
      |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
      |> filter(fn: (r) => r["_measurement"] == "cpu")
      |> filter(fn: (r) => r["_field"] == "usage_system")
      |> limit(n: 10)
      |> aggregateWindow(every: v.windowPeriod, fn: multByX(x:2.0))
  3. Flux InfluxDB Monitor 包:此包提供用于 使用 InfluxDB 进行监控和警报 的工具。如果您通过 UI 创建了警报,那么您要么创建了简单的阈值警报,要么创建了死信警报。但是,包中的函数可能有助于您制作更复杂的检查和自定义警报。我们建议您在任务中制作高级监控逻辑,然后将阈值警报或死信警报应用于该任务的输出。例如,您可以使用任务中的 map() 函数标记具有自定义状态的数值数据(例如,当数据的斜率为负或正时)。stateChanges() 函数检测列中随时间变化的任何变化。将这些描述性标签与 stateChanges() 函数结合使用,以检测数据斜率的每次变化。将这些斜率变化事件写入新存储桶并使用阈值警报接收通知。请参阅 使用任务和检查进行 InfluxDB 监控的 TL;DR 技术提示如何使用 InfluxDB 监控状态的 TL;DR 技术提示,了解有关如何创建自定义任务以及如何分别将自定义状态与监控包结合使用的更多信息和示例。 请参阅 deadman() 函数文档,了解如何将监控包与 subDuration() 函数结合使用以检测主机在过去 5 分钟内未报告数据时的示例。
  4. 用于 操作时间戳 的函数:时间序列数据很难处理。Flux 包含多个处理时间的函数。但是,其中一些函数分散在文档中各个社区包贡献和 Flux 包之间。所有这些函数的整合列表使您可以更深入地了解如何使用 Flux 处理时间。以下函数允许您操作时间戳并执行各种时间序列转换。包括: * 来自 Flux Experimental Package 的函数。此包包含各种有用的函数,除了时间序列转换之外,这些函数可能对您有用。

障碍 2:不了解使查询更快的性能提升

在制作 Flux 脚本时,需要遵循某些准则,以优化查询以获得更快的性能。

解决方案 2:了解内存优化和新的下推模式以优化您的 Flux 脚本

为了为优化准则提供上下文,让我们首先花一些时间了解 Flux 的工作原理。Flux 能够有效地查询数据,因为某些函数将数据转换工作负载下推到存储,而不是在内存中执行转换。执行此工作的函数组合称为下推模式。

最初,唯一的下推模式是基本的 Flux 查询:from() |> range() |> filter()。自 2019 年 9 月 InfluxDB Cloud 首次发布以来,Flux 中添加了新的下推模式。现在也支持以下下推模式:

  • 裸聚合和选择器
    • |> count()
    • |> sum()
    • |> first()
    • |> last()
    • |> min()
    • |> max()
    • |> mean()
  • Group + 裸聚合和选择器
    • |>group() |> count()
    • |>group() |> sum()
    • ...等等。
  • Window + 裸聚合和选择器
    • |>window() |> count()
    • |>window() |> sum()
    • ...等等。
  • AggregateWindow + 裸聚合和选择器
    • |>aggregateWindow(fn: count)
    • |>aggregateWindow(fn: sum)
    • ...等等。

将这些模式中的任何一种附加到您的基本 Flux 查询 from() |> range() |> filter(),以利用查询性能的新提升。如果您希望文档团队提供添加到 Flux 的下推模式的 公共列表,请随时访问我们的 社区站点 或我们的 Slack 频道并告知我们。

对于任何想要更详细了解 Flux 如何运行的高级 Flux 用户,下图是 Flux 执行查询方式的可视化。首先,您编写一个查询,然后 Flux 计划器检查您的查询是否与现有的下推模式匹配。如果存在匹配,则计划器编写一个关于如何执行查询的计划。接下来,Flux 执行器通过调用启动存储读取 API 调用的隐藏操作来执行您的查询。这些隐藏操作因您的下推模式而异。例如,如果我们执行一个带有 from |>range() |> filter() |>group() |>max() 的查询,则相应的隐藏存储操作(称为 ReadGroupMax)会在存储端启动数据转换。然后,数据通过 gRPC 流回 Flux,Flux 随后可以将数据转换为 Flux 表或带注释的 CSV。

Flux 如何执行查询<figcaption> Flux 执行查询方式的可视化。如果查询与下推模式匹配,则存储在隐藏存储操作的帮助下执行数据转换(下推模式具有相应的隐藏存储操作)。gRPC 用于将数据从存储流回 Flux。</figcaption>

障碍 3:在错误的时间使用模式更改函数

用户可能会因在 Flux 查询中的错误位置应用模式更改函数而意外降低查询性能。

解决方案 3:了解何时应用模式更改

模式更改函数是任何更改 Flux 表中列的函数。它们包括诸如 keep()drop()rename()duplicate()set() 等函数。如果可以,最好将这些函数保留在 Flux 脚本的末尾。如果您在查询中使用 aggregatesselector 函数,请尝试在应用聚合函数后包含模式更改函数,以保留您可能拥有的任何下推模式。

障碍 4:不确定任务编写的最佳实践

有时用户不了解如何最好地利用 UI 来制作任务。此外,一些用户可能不了解如何利用变量来提高任务的效率。

解决方案 4:任务编写建议。

如果您阅读过 我的其他任何文章,您就会知道我真的相信 InfluxDB UI 是您学习如何编写 Flux 的最佳朋友。学习如何制作任务也不例外。在数据浏览器中制作任务后,使用右上角的另存为按钮将您的 Flux 脚本转换为任务。

InfluxDB UI -另存为<figcaption> InfluxDB UI 的图像。使用“另存为”按钮将查询构建器或数据浏览器中的脚本转换为任务。配置名称、every、cron 和 offset 选项。</figcaption>

下面的 Flux 脚本是通用 Flux 任务的示例

option task = {name: "rateTask", every: 1h}

rate = from(bucket: "items")
  |> range(start: -task.every)
  |> filter(fn: (r)  => (r._measurement == "totalItems"))
  |> filter(fn: (r)  => (r._field == "items"))
  |> group(columns: ["itemGroupName"])
  |> aggregateWindow(fn: sum, every: task.every)
  |> map(fn: (r) => {
    return: _time: r._time,  _stop: r._stop, _start: r._start, _measurement: task.name, newTag: "rate", _value: r._value, itemGroupName: itemGroupName, 
  }
})

rate 
  |> to(bucket: "newItems")

使用任务配置选项来指定您希望如何执行任务。虽然您可以使用用户界面配置名称、every、cron、offset 选项,但还有其他选项可供您使用。您还可以决定是否要包含并发重试选项。 在上面的示例中,我们使用 every 选项来指定我们希望任务运行的频率以及我们希望使用 aggregateWindow() 函数聚合数据的频率。我们还使用 name 选项来轻松重命名我们的 measurement。使用 map() 函数向您的数据添加新的字段或标签。如果您想区分来自共享目标存储桶的各种任务输出,向您的数据添加新的字段或标签可能会有所帮助。在上面的示例中,我们正在向我们的数据添加一个标签“newTag”。现在我们可以使用以下内容轻松查询该任务的输出

from(bucket: "neItems")
  |> range(start: -task.every)
  |> filter(fn: (r)  => (r._measurement == "rateTask")
  |> filter(fn: (r)  => (r.newTag == "rate")

最后,如果您在任务中执行多个数据转换或聚合,您需要使用变量来提高该任务的效率。例如,如果您想对 CPU 统计数据进行降采样,您的任务可能如下所示

option task = {name: "Downsampling CPU", every: 1m}

data = from(bucket: "my-bucket")
|> range(start: -task.every)
|> filter(fn: (r) => r._measurement == "my_measurement")

data
|> mean()
|> set(key: "agg_type",value: "mean_temp")
|> to(bucket: "downsampled", org: "my-org", tagColumns: ["agg_type"])

data
|> count()
|> set(key: "agg_type",value: "count_temp")
|> to(bucket: "downsampled", org: "my-org", tagColumns: ["agg_type"])

执行顶层查询并将结果存储在变量中。然后使用该变量来计算平均值和计数。将顶层查询的输出存储在变量中,而不是查询数据两次,这可以将 Flux 查询引擎的工作负载减少一半。在这里,我们使用 set() 函数而不是 map() 函数来向我们的数据添加标签值。最后,值得注意的是,您可以使用 aggregateWindow() 函数编写类似的任务。该任务可以每周运行一次,并且您可以计算 aggregateWindow() 函数中每小时的计数和平均值。与执行频繁地在短时间内计算聚合的任务相比,使用 aggregateWindow() 函数在长时间范围内计算多个聚合会给 Flux 查询引擎带来更多工作负载。

障碍 5:不确定何时编写降采样任务

用户有时不确定何时应该编写降采样任务。他们可能会有这样的问题:“我如何知道我何时需要降采样任务?”或“降采样任务如何影响我的基数?”。

解决方案 5:了解何时编写降采样任务

这个障碍的解决方案并不简单。我将尝试在此处总结一下,但我强烈建议您参考降采样和保留数据以及使用 InfluxDB v2.0 进行降采样以获得详细的解释。用户编写降采样任务主要有两个原因。首先,您需要改进时间序列数据的可视化效果和仪表板的性能。考虑聚合您的数据以捕获时间序列数据的总体趋势,从而减少数据中的噪声并提高您的查询/可视化性能。其次,您认识到您的数据具有无界标签,并且您可能会遇到失控的序列基数。在这种情况下,考虑使用短期的保留策略来短期保留具有无界标签的原始数据。然后使用降采样任务来删除问题标签并根据您的需要聚合您的数据。您或许可以使用 drop() 函数来删除所述标签。您可以使用操作监控模板来识别您何时遇到基数问题。

请参阅监控任务和查找失控基数的来源,以获取有关如何识别您何时遇到基数问题的更多信息。

针对 Flux 中级用户的障碍的后续步骤

我希望这篇文章能帮助您解决您可能遇到的一些困难的 Flux 问题。如果您在编写 Flux 时仍然遇到问题,或者想要关于如何优化 Flux 脚本的建议,请向我们寻求帮助并分享您的故事!在评论区、我们的社区网站或我们的 Slack 频道中分享您的想法、疑虑或问题。我们很乐意获得您的反馈并帮助您解决您遇到的任何问题!您的好问题是这些帖子的灵感来源。