TL;DR InfluxDB 技术提示 – 如何使用 Flux 和 InfluxDB 提取值、可视化标量和执行自定义聚合

导航至

在这篇文章中,我们将学习如何使用 reduce()findColumn()findRecord() Flux 函数,以使用 InfluxDB 执行自定义聚合。

此 TL;DR 假设您已注册 InfluxDB Cloud 帐户 – 注册免费帐户是开始使用 InfluxDB 的最简单方法 – 或安装了 InfluxDB 2.0 OSS

为了轻松演示这些函数的工作原理,让我们使用 array.from() 函数构建一个临时表以在查询中使用

import "experimental/array"
 
data = array.from(rows: [{_time: 2020-01-01T00:00:00Z, mytag: "t0", _field: "f0", _value: "1.0"},
{_time: 2020-01-02T00:00:00Z, mytag: "t1", _field: "f1", _value: "1.5"},
{_time: 2020-01-02T00:00:00Z, mytag: "t0", _field: "f0", _value: "2.0"},
{_time: 2020-01-01T00:00:00Z, mytag: "t1", _field: "f1", _value: "2.5"}])
|> group(columns: ["mytag"], mode:"by")
|> yield(name: "mydata")

array.from() 函数从对象列表创建一个表。按标签分组以将表拆分为两个表,以更好地表示如果您通过行协议或使用 Telegraf 写入数据时数据的外观。如果您想了解有关 InfluxDB 输出格式 此 TL;DR带注释的 CSV 的更多信息,请阅读。

influxdb flux functions array

问: findRecord() 是做什么的? 答: findRecord() 函数从表流中第一个表的指定索引处返回记录。它需要一个 key 参数,该参数表示输入流中的组键。例如,让我们仔细看一下上面 Flux 脚本在原始数据视图中的输出。请注意,组键对于“mytag”列设置为 true。这是因为我们按“mytag”分组,因此它是组键的一部分。我们将在 findRecord() 函数调用中使用此键。另请注意,我们有两个表。表 0(我们的第一个表)的第一行中,“_value”列中的值等于 1.0。

influxdb flux functions findrecord

我们在以下脚本中使用 findRecord

import "experimental/array"
 
data = array.from(rows: [{_time: 2020-01-01T00:00:00Z, mytag: "t0", _field: "f0", _value: 1.0},
{_time: 2020-01-01T00:00:00Z, mytag: "t1", _field: "f1", _value: 1.5},
{_time: 2020-01-02T00:00:00Z, mytag: "t0", _field: "f0", _value: 2.0},
{_time: 2020-01-02T00:00:00Z, mytag: "t1", _field: "f1", _value: 2.5}])
|> group(columns: ["mytag"], mode:"by")
 
x = data |> findRecord(fn: (key) => key.mytag == "t0", idx: 0)
data |> group() |> limit(n:1, offset: 0) |> map(fn: (r) => ({ r with myRecord: x._value}))

变量 x 返回第一个表(表 0)的第一行 idx:0,其中“mytag”列是组键的一部分,并且 key 等于“t0”。换句话说,x._value = 1.0

本节将解释此脚本的最后一行。

问: 如何使用 InfluxDB 可视化标量? 答: 您无法使用 InfluxDB UI 可视化标量。InfluxDB UI 仅可视化表。但是,您可以编写一行 Flux 代码以将新列附加到表中,并包含标量值以在数据上下文中对其进行可视化。脚本中的最后一行使您能够可视化 findRecord 的输出,并验证您是否返回了正确的行。我们将所有数据分组在一起,并将输出限制为一行,以便我们可以将 findRecord() 中的标量值映射到标记为“findRecord”的新列中。您还可以使用 array.from()构建一个新表 以可视化您的标量值。

influxdb flux scalars

虽然您无法在 UI 中可视化标量,但您可以利用其他工具来打印标量。我鼓励您尝试使用 Flux REPL,您可以在其中打印标量。此外,如果您是 Visual Studio 用户,请查看 Visual Studio Flux 扩展,您也可以在其中打印标量。

注意:如果您强烈需要开箱即用地在 UI 中可视化标量,我们很想知道!请提供产品反馈作为功能请求,或在 社区 中分享您的想法。

问: 如果我已经完全过滤了我的数据并返回了所需的输出,我该如何使用 findRecord()? 答: 如果您已经使用 Flux 将数据准备成所需的形状,请将 key 设置为 true 以获取如下记录

import "experimental/array"
 
data = array.from(rows: [{_time: 2020-01-01T00:00:00Z, mytag: "t0", _field: "f0", _value: 1.0},
{_time: 2020-01-01T00:00:00Z, mytag: "t1", _field: "f1", _value: 1.5},
{_time: 2020-01-02T00:00:00Z, mytag: "t0", _field: "f0", _value: 2.0},
{_time: 2020-01-02T00:00:00Z, mytag: "t1", _field: "f1", _value: 2.5}])
|> group(columns: ["mytag"], mode:"by")
|> filter(fn: (r) => r.mytag == "t0")
 
 
x = data |> findRecord(fn: (key) => true, idx: 0)
data |> group() |> limit(n:1, offset: 0) |> map(fn: (r) => ({ r with myRecord: x._value}))

请注意,在此示例中,我们在应用 findRecord() 函数之前过滤了“t0”标签,而不是在 findRecord() 函数中应用此过滤器。

问: 您能分享一个使用 findRecord 的实际示例吗? 答: 为了回答这个问题,假设您正在处理地时数据。具体来说,您正在全球范围内分析地震。首先,您想找到昨天最大的地震。利用 Flux Geo 包,您可以找到过去 24 小时内最大地震的坐标

import "experimental/geo"
max_earthquake =
from(bucket:"Earthquake")
|> range(start: -24h)
|> filter(fn: (r) => r._measurement == "geo")
|> geo.toRows()
|> group()
|> max(column: "mag")
|> findRecord(fn: (key) => true, idx: 0)

请注意,我们已经使用 max() 函数将数据过滤到单个值。我们不想执行额外的分组或过滤,因此我们将 findRecord() key 参数设置为 true,以轻松返回最大地震的坐标作为标量。现在我们已经隔离了最大地震位置的坐标,我们可以查看最大地震周围 200 公里区域内的所有地震活动,以通过将这些坐标传递到以下查询中来更深入地了解事件

from(bucket:"Earthquake")
|> range(start: -7d)
|> geo.filterRows(region: {lat: max_eq.lat, lon: max_eq.lon, radius: 200.0}, strict: false)
|> group()
|> count(column: "mag")

此查询为我们提供了最大地震 200 公里半径范围内地震总数的 计数

问: findColumn() 是做什么的? 答: findColumn() 函数与 findRecord() 函数类似,不同之处在于它从表流中第一个表的指定列中返回一个值数组,其中组键值与指定的谓词匹配。以下 Flux 脚本将生成与上述相同的结果,但使用 findColumn() 代替

import "experimental/array"
 
data = array.from(rows: [{_time: 2020-01-01T00:00:00Z, mytag: "t0", _field: "f0", _value: 1.0},
{_time: 2020-01-01T00:00:00Z, mytag: "t1", _field: "f1", _value: 1.5},
{_time: 2020-01-02T00:00:00Z, mytag: "t0", _field: "f0", _value: 2.0},
{_time: 2020-01-02T00:00:00Z, mytag: "t1", _field: "f1", _value: 2.5}])
|> group(columns: ["mytag"], mode:"by")
 
x = data |> findColumn(fn: (key) => key.mytag == "t0", column: "_value")
data |> group() |> limit(n:1, offset: 0) |> map(fn: (r) => ({ r with myRecord: x[0]}))

问: reduce() 是做什么的? 答: reduce() 函数根据 reducer fn 聚合每个表中的记录,从而提供了一种创建自定义聚合的方法。每个表的输出都是表的组键,列对应于 reducer 记录中的每个字段。这是一个计算“_value”列中所有值之和的示例。

import "experimental/array"
data = array.from(rows: [{_time: 2020-01-01T00:00:00Z, mytag: "t0", _field: "f0", _value: 1.0},
{_time: 2020-01-01T00:00:00Z, mytag: "t1", _field: "f1", _value: 1.5},
{_time: 2020-01-02T00:00:00Z, mytag: "t0", _field: "f0", _value: 2.0},
{_time: 2020-01-02T00:00:00Z, mytag: "t1", _field: "f1", _value: 2.5}])
|> group(columns: ["mytag"], mode:"by")
 
data
|> reduce(fn: (r, accumulator) => ({ sum: r._value + accumulator.sum }), identity: {sum: 0.0})

这是输出

influxdb flux functions reduce

问: 如何将 reduce() 和 findRecord() 或 findColumn 一起使用? 答: findRecord() 和 reduce() 函数经常一起使用以执行自定义聚合。让我们逐步了解如何将这两个函数一起使用。我们使用 reduce() 函数查找每个表的“_value”列中值的总和,然后我们使用 findRecord() 提取第一个总和。现在我们可以将此总和与 map() 函数一起使用以进行其他计算。

import "experimental/array"
data = array.from(rows: [{_time: 2020-01-01T00:00:00Z, mytag: "t0", _field: "f0", _value: 1.0},
{_time: 2020-01-01T00:00:00Z, mytag: "t1", _field: "f1", _value: 1.5},
{_time: 2020-01-02T00:00:00Z, mytag: "t0", _field: "f0", _value: 2.0},
{_time: 2020-01-02T00:00:00Z, mytag: "t1", _field: "f1", _value: 2.5}])
|> group(columns: ["mytag"], mode:"by")
x = data
|> reduce(fn: (r, accumulator) => ({ sum: r._value + accumulator.sum }), identity: {sum: 0.0})
|> findRecord(fn: (key) => key.mytag == "t0", idx: 0)
data
|> map(fn: (r) => ({ r with myRatio: r._value/x.sum}))

第一列中值的总和等于 3。“myRatio”列描述了“_value”列中每个值与总和之间的比率,其中 x.sum=3。这是相应的输出

influxdb flux functions findcolumn

我希望本教程能帮助您更好地理解如何使用 findRecord、findColumn 和 reduce() 函数。与往常一样,如果您遇到障碍,请在我们的 社区站点Slack 频道上分享。我们很乐意获得您的反馈并帮助您解决遇到的任何问题。