中级 Flux 用户面临的五大难题及优化 Flux 的资源

导航至

在阅读了Flux 初学者面临的五大难题及学习使用 Flux 的资源之后,您已经朝着成为高级 Flux 用户迈出了坚实的步伐。本文包括中级 Flux 用户面临的一些主要难题以及解决方案。

难题 1:不了解常见的 Flux 包和实用函数

有时人们在生活中遇到的最大难题仅仅是不知道现有解决方案的存在。虽然 Flux 拥有大量的 Flux 包 和实用函数,但有一些是如果您打算编写 Flux,则无法没有的。如果您不熟悉以下描述的常见 Flux 包和实用函数,我强烈建议您充分利用它们。

解决方案 1:一些重要的 Flux 小贴士和使编写 Flux 更容易的实用工具

以下是如果您打算编写任何 Flux 应该了解的四个 Flux 包和实用函数:

  1. map():此函数对输入表中的每一行应用一个函数,并将输出分配到指定的列。如果您不熟悉 map(),请花点时间查看文档。我们也会在下面使用它。
  2. aggregateWindow():aggregateWindow() 函数允许您将聚合函数或选择函数(例如 mean()count()min()max()sum()first()last())应用于由 every 参数指定的窗口中的数据。您可能已经熟悉 aggregateWindow(),因为它在使用 InfluxDB UI 的查询构建器时默认应用于您的数据。例如,您可能使用以下方法对每 1m 窗口的数据应用 mean():
    from(bucket:"items")
       |> range(start:-1h)
     	   |> aggregateWindow(every: 1m, fn: mean())
    但您知道您可以创建自己的实用函数来针对您的用例聚合数据吗?通过在 fn 参数中传递自己的函数来实现。例如,假设我们想 编写一个自定义聚合函数 来计算数据的平均值的两倍。要将此自定义聚合函数传递到我们的 aggregateWindow() 函数中,首先定义您的自定义函数作为变量。请注意,您必须在自定义函数中包含列参数,因为 aggregateWindow() 只操作具有该输入参数的函数。
    multByX = (x) => (column, tables=<-) =>
      tables
        |> mean(column:column)
        |> map(fn: (r) => ({ r with _value: r._value * x}))
    from(bucket: "my-bucket")
      |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
      |> filter(fn: (r) => r["_measurement"] == "cpu")
      |> filter(fn: (r) => r["_field"] == "usage_system")
      |> limit(n: 10)
      |> aggregateWindow(every: v.windowPeriod, fn: multByX(x:2.0))
  3. Flux InfluxDB 监控包:此包提供了用于 通过 InfluxDB 监控和警报 的工具。如果您通过 UI 创建了警报,那么您已经创建了一个简单的阈值或死信警报。然而,包中的函数可以帮助您构建更复杂的检查和自定义警报。我们建议您在任务中构建高级监控逻辑,然后将阈值或死信警报应用于该任务的输出。例如,您可以使用任务中的 map() 函数对数值数据应用自定义状态(例如,当数据的斜率为负或正时)。使用 stateChanges() 函数检测列随时间的变化。将这些描述性标签与 stateChanges() 函数结合使用,以检测数据斜率何时改变。使用任务将这些斜率变化事件写入新桶,并使用阈值警报接收通知。请参阅 使用 InfluxDB 的任务和检查进行监控如何使用 InfluxDB 监控状态 获取更多信息以及如何创建自定义任务以及如何与监控包结合使用自定义状态的示例。请参阅 deadman() 函数文档,了解如何使用 subDuration() 函数与监控包结合使用来检测主机在最后 5 分钟内没有报告数据的情况。
  4. 时间戳操作函数:操作时间戳:时间序列数据处理起来比较困难。Flux 包含了多个用于处理时间的函数。然而,其中一些函数散布在各种社区贡献的包和文档中的 Flux 包中。以下是一个所有这些函数的汇总列表,以便您更深入地了解如何使用 Flux 处理时间。以下函数允许您操作时间戳并执行各种时间序列变换。包括 这些函数来自 Flux 实验包。此包包括许多有用的函数,超出了时间序列变换的范围,可能对您很有用。

难题 2:未意识到性能提升,使查询更快

在编写 Flux 脚本时,有一些指导原则可以遵循,以优化您的查询以获得更快的性能。

解决方案 2:了解内存优化和新下推模式,以优化您的 Flux 脚本

为了提供优化指南的上下文,让我们首先花一点时间来了解 Flux 的工作原理。Flux 能够有效地查询数据,因为一些函数将数据转换的工作负载下推到存储,而不是在内存中执行转换。执行此类工作的函数组合称为下推模式。

最初,唯一的下推模式是一个基本的 Flux 查询:from() |> range() |> filter()。自 2019 年 9 月 InfluxDB Cloud 初次发布以来,Flux 已添加了新的下推模式。现在也支持以下下推模式

  • 裸聚合和选择器
    • |> count()
    • |> sum()
    • |> first()
    • |> last()
    • |> min()
    • |> max()
    • |> mean()
  • 组 + 裸聚合和选择器
    • |>group() |> count()
    • |>group() |> sum()
    • 等等。
  • 窗口 + 裸聚合和选择器
    • |>window() |> count()
    • |>window() |> sum()
    • 等等。
  • 聚合窗口 + 纯聚合和选择器
    • |>aggregateWindow(fn: count)
    • |>aggregateWindow(fn: sum)
    • 等等。

将以下任何一种模式添加到您的基本Flux查询中 from() |> range() |> filter(),以利用查询性能的新提升。如果您希望文档团队提供添加到Flux中的推下模式的公开列表,请毫不犹豫地通过我们的社区网站Slack频道联系我们。

对于任何希望了解更多关于Flux如何运行的进阶Flux用户,下面的图像是Flux执行查询的方式的可视化。首先,您编写一个查询,然后Flux规划器检查您的查询是否与现有的推下模式匹配。如果匹配,规划器将编写如何执行您的查询的计划。接下来,Flux执行器通过调用隐藏操作(这些隐藏操作基于您的推下模式)来执行您的查询,这些隐藏操作将数据读取出存储。然后,数据通过gRPC流回Flux,然后Flux将这些数据转换为Flux表或注解CSV。例如,如果我们执行一个查询,从|>range() |> filter() |>group() |>max(),一个相应的隐藏存储操作,称为ReadGroupMax,在存储端开始数据转换。然后,数据通过gRPC流回Flux。

Flux执行查询的方式<figcaption>Flux执行查询的方式的可视化。如果查询匹配推下模式,则存储在隐藏存储操作(推下模式有相应的隐藏存储操作)的帮助下执行数据转换。使用gRPC将数据从存储流回Flux。</figcaption>

障碍3:在错误的时间使用模式突变函数

用户可能会在Flux查询的错误位置应用模式突变函数,从而无意中降低查询性能。

解决方案3:学习何时应用模式突变

模式突变函数是指更改您的Flux表中列的任何函数。它们包括诸如keep()drop()rename()duplicate()set()之类的函数。如果可能的话,最好将这些函数保留在您的Flux脚本末尾。如果您在查询中使用聚合选择器函数,尝试在应用聚合函数后包括模式突变函数,以保留可能存在的任何推下模式。

障碍4:不确定任务编写的最佳实践

有时用户不知道如何最好地利用UI来构建任务。此外,一些用户可能不知道如何利用变量来提高任务效率。

解决方案4:任务编写建议。

如果您阅读过我的其他文章,您会知道我真心相信InfluxDB UI是您学习如何编写Flux时的最佳朋友。学习如何构建任务也不例外。在数据探索器中构建完您的任务后,使用右上角的另存为按钮将您的Flux脚本转换为任务。

InfluxDB UI - 保存为<figcaption> InfluxDB UI 的图片。使用“保存为”按钮将查询构建器或数据探索器中的脚本转换为任务。配置名称、每、cron 和偏移量选项。</figcaption>

下面的 Flux 脚本是通用 Flux 任务的示例。

option task = {name: "rateTask", every: 1h}

rate = from(bucket: "items")
  |> range(start: -task.every)
  |> filter(fn: (r)  => (r._measurement == "totalItems"))
  |> filter(fn: (r)  => (r._field == "items"))
  |> group(columns: ["itemGroupName"])
  |> aggregateWindow(fn: sum, every: task.every)
  |> map(fn: (r) => {
    return: _time: r._time,  _stop: r._stop, _start: r._start, _measurement: task.name, newTag: "rate", _value: r._value, itemGroupName: itemGroupName, 
  }
})

rate 
  |> to(bucket: "newItems")

使用任务配置选项来指定您希望任务如何执行。虽然您可以使用 UI 配置名称、每、cron、偏移量选项,但您还有其他选项可用。您还可以决定是否要包含并发重试选项。在上面的示例中,我们使用 every 选项来指定任务运行的频率以及使用 aggregateWindow() 函数聚合数据的频率。我们还使用 name 选项来轻松重命名我们的度量。使用 map() 函数向您的数据添加新字段或标签。向您的数据添加新字段或标签可以帮助您区分来自共享目标桶的各种任务输出。在上面的示例中,我们向我们的数据添加了一个标签“newTag”。现在我们可以轻松地查询该任务的输出:

from(bucket: "neItems")
  |> range(start: -task.every)
  |> filter(fn: (r)  => (r._measurement == "rateTask")
  |> filter(fn: (r)  => (r.newTag == "rate")

最后,如果您在任务中执行多个数据转换或聚合,请使用变量来提高该任务的效率。例如,如果您想对 CPU 统计数据进行下采样,您的任务可能如下所示:

option task = {name: "Downsampling CPU", every: 1m}

data = from(bucket: "my-bucket")
|> range(start: -task.every)
|> filter(fn: (r) => r._measurement == "my_measurement")

data
|> mean()
|> set(key: "agg_type",value: "mean_temp")
|> to(bucket: "downsampled", org: "my-org", tagColumns: ["agg_type"])

data
|> count()
|> set(key: "agg_type",value: "count_temp")
|> to(bucket: "downsampled", org: "my-org", tagColumns: ["agg_type"])

执行顶级查询并将结果存储在变量中。然后使用该变量计算平均值和计数。将顶级查询的输出存储在变量中,而不是两次查询数据,可以减少 Flux 查询引擎的工作量一半。在这里,我们使用 set() 函数而不是 map() 函数向我们的数据添加标签值。最后,值得注意的是,您可以使用 aggregateWindow() 函数代替类似的任务。该任务可以每周运行一次,您可以在 aggregateWindow() 函数中计算每小时的数据的计数和平均值。使用 aggregateWindow() 函数对长时间范围内多次聚合进行计算比频繁执行在短时间范围内进行聚合的任务对 Flux 查询引擎的工作量更大。

障碍 5:不确定何时编写下采样任务

用户有时不确定何时编写下采样任务。他们可能会有这样的问题:“我如何知道何时需要下采样任务?”或者“下采样任务如何影响我的基数?”。

解决方案 5:了解何时编写下采样任务

这个障碍的解决方案并不简单。我将尽量在这里总结它,但我强烈建议您参考下采样和保留数据InfluxDB v2.0 中的下采样以获取详细的解释。用户编写下采样任务有两个主要原因。首先,您需要改进您的时间序列数据的可视化效果和仪表板的性能。考虑对数据进行聚合以捕获您时间序列数据中的整体趋势,以减少数据中的噪声并提高您查询/可视化的性能。其次,您意识到您的数据具有无界的标签,并且您可能遇到系列基数失控的情况。在这种情况下,考虑通过利用短期保留策略将原始数据保留一段较短的时间,然后使用下采样任务删除问题标签并按您所需的方式聚合数据。例如,您可以使用 drop() 函数来删除该标签。您可以使用 操作监控模板来识别您是否遇到了基数问题。

请参阅监控任务和寻找无限维度问题的来源以获取更多有关何时存在维度问题的信息。

针对中级Flux用户的下一步行动

希望这篇文章能帮助您解决一些可能遇到的困难Flux问题。如果您在编写Flux时仍有问题,或者需要关于如何优化Flux脚本的建议,请向我们寻求帮助并分享您的经历!请在评论区分享您的想法、关注点或问题,在我们的社区网站Slack频道中。我们非常乐意获取您的反馈并帮助您解决遇到的问题!您的好问题是我们撰写这些文章的灵感来源。