TL;DR InfluxDB 技术技巧 使用 Flux 和 InfluxDB 提取值、可视化标量值和执行自定义聚合
作者:Anais Dotis-Georgiou / 产品, 用例, 开发者
2020年9月15日
导航至
在这篇文章中,我们将学习如何使用 reduce()、findColumn() 和 findRecord() Flux 函数在 InfluxDB 中执行自定义聚合。
本 TL;DR 假设您已经注册了 InfluxDB Cloud 账户(注册免费账户是开始使用 InfluxDB 的最简单方法)或安装了 InfluxDB 2.0 OSS。
为了便于演示这些函数的工作原理,我们将使用 array.from() 函数构建一个临时表,以便在查询中使用。
import "experimental/array"
data = array.from(rows: [{_time: 2020-01-01T00:00:00Z, mytag: "t0", _field: "f0", _value: "1.0"},
{_time: 2020-01-02T00:00:00Z, mytag: "t1", _field: "f1", _value: "1.5"},
{_time: 2020-01-02T00:00:00Z, mytag: "t0", _field: "f0", _value: "2.0"},
{_time: 2020-01-01T00:00:00Z, mytag: "t1", _field: "f1", _value: "2.5"}])
|> group(columns: ["mytag"], mode:"by")
|> yield(name: "mydata")
Array.from() 函数可以从一个对象列表中创建一个表格。按标签分组可以将表格拆分为两个表格,以更好地表示如果您通过行协议或 Telegraf 编写数据时数据的外观。如果您想了解更多关于 InfluxDB 输出格式的内容,请阅读这篇 TL;DR,或标注的 CSV。
问题:findRecord() 函数是什么?回答:findRecord() 函数从一系列表格中的第一个表格返回一个指定索引的记录。它期望一个表示输入流中组键的键参数。例如,让我们仔细看看上面 Flux 脚本的原始数据视图的输出。注意,组键已设置为“mytag”列的 true。这是因为我们按“mytag”分组,所以它是组键的一部分。我们将在我们的 findRecord() 函数调用中使用此键。另外,请注意,我们有两个表格。表 0(即我们的第一个表格)的第一行的“_value”列的值等于 1.0。
我们在以下脚本中使用 findRecord
import "experimental/array"
data = array.from(rows: [{_time: 2020-01-01T00:00:00Z, mytag: "t0", _field: "f0", _value: 1.0},
{_time: 2020-01-01T00:00:00Z, mytag: "t1", _field: "f1", _value: 1.5},
{_time: 2020-01-02T00:00:00Z, mytag: "t0", _field: "f0", _value: 2.0},
{_time: 2020-01-02T00:00:00Z, mytag: "t1", _field: "f1", _value: 2.5}])
|> group(columns: ["mytag"], mode:"by")
x = data |> findRecord(fn: (key) => key.mytag == "t0", idx: 0)
data |> group() |> limit(n:1, offset: 0) |> map(fn: (r) => ({ r with myRecord: x._value}))
变量 x
返回第一个表格,即表 0 的第一行,其中“mytag”列是组键的一部分,键等于“t0”。换句话说,x._value = 1.0
。
本节将解释此脚本的最后一行。
问题:我如何使用 InfluxDB 可视化标量?回答:您不能使用 InfluxDB UI 可视化标量。InfluxDB UI 只能可视化表格。但是,您可以使用一行 Flux 代码向表格添加新列,并在数据上下文中可视化您的标量值。脚本的最后一行使您能够可视化 findRecord 的输出,并验证您是否返回了正确的行。我们将所有数据分组在一起,并将输出限制为单行,以便将 findRecord() 的标量值映射到新的列“findRecord”。您还可以使用 array.from() 来 构建新表格 以可视化标量值。
尽管您不能在 UI 中可视化标量,但您可以利用其他工具来打印标量。我鼓励您尝试使用 Flux REPL,您可以在其中打印标量。如果您是 Visual Studio 用户,请查看 Visual Studio Flux 扩展,您也可以在其中打印标量。
注意:如果您迫切需要在 UI 中直接可视化标量,我们希望了解!请提供产品反馈作为功能请求或分享您的想法社区。
问题:如果我已经使用 Flux 完全过滤了我的数据并返回了所需的输出,我该如何使用 findRecord()?回答:如果您已经使用 Flux 将数据准备成所需的形状,请将键设置为 true 以获取类似以下格式的记录
import "experimental/array"
data = array.from(rows: [{_time: 2020-01-01T00:00:00Z, mytag: "t0", _field: "f0", _value: 1.0},
{_time: 2020-01-01T00:00:00Z, mytag: "t1", _field: "f1", _value: 1.5},
{_time: 2020-01-02T00:00:00Z, mytag: "t0", _field: "f0", _value: 2.0},
{_time: 2020-01-02T00:00:00Z, mytag: "t1", _field: "f1", _value: 2.5}])
|> group(columns: ["mytag"], mode:"by")
|> filter(fn: (r) => r.mytag == "t0")
x = data |> findRecord(fn: (key) => true, idx: 0)
data |> group() |> limit(n:1, offset: 0) |> map(fn: (r) => ({ r with myRecord: x._value}))
请注意,在这个例子中,我们在应用findRecord()函数之前对“t0”标签进行了过滤,而不是在findRecord()函数内部应用这个过滤。
问题:你能分享一个使用findRecord的实际例子吗?答案:为了回答这个问题,假设你正在处理地理时间数据。具体来说,你正在分析全球地震。首先,你想找到最后一天最大的地震。利用Flux Geo包可以帮助你找到过去24小时内最大的地震的坐标。
import "experimental/geo"
max_earthquake =
from(bucket:"Earthquake")
|> range(start: -24h)
|> filter(fn: (r) => r._measurement == "geo")
|> geo.toRows()
|> group()
|> max(column: "mag")
|> findRecord(fn: (key) => true, idx: 0)
请注意,我们已经使用max()函数将我们的数据过滤到单个值。我们不想执行额外的分组或过滤,因此我们将findRecord()的键参数设置为true,以便轻松返回最大地震的坐标作为标量。现在我们已经隔离了最大地震位置的坐标,我们可以查看围绕最大地震的200公里区域内的所有地震活动,通过将这些坐标传递给以下查询来更深入地了解事件
from(bucket:"Earthquake")
|> range(start: -7d)
|> geo.filterRows(region: {lat: max_eq.lat, lon: max_eq.lon, radius: 200.0}, strict: false)
|> group()
|> count(column: "mag")
此查询给出的是在最大地震200公里范围内地震总数的计数。
问题:findColumn()函数做什么?答案:findColumn()函数与findRecord()函数类似,但它从第一个表格中返回一个值数组,这些值在流表中的列值与指定的谓词匹配。以下Flux脚本会产生与上面相同的结果,但使用findColumn()代替。
import "experimental/array"
data = array.from(rows: [{_time: 2020-01-01T00:00:00Z, mytag: "t0", _field: "f0", _value: 1.0},
{_time: 2020-01-01T00:00:00Z, mytag: "t1", _field: "f1", _value: 1.5},
{_time: 2020-01-02T00:00:00Z, mytag: "t0", _field: "f0", _value: 2.0},
{_time: 2020-01-02T00:00:00Z, mytag: "t1", _field: "f1", _value: 2.5}])
|> group(columns: ["mytag"], mode:"by")
x = data |> findColumn(fn: (key) => key.mytag == "t0", column: "_value")
data |> group() |> limit(n:1, offset: 0) |> map(fn: (r) => ({ r with myRecord: x[0]}))
问题:reduce()函数做什么?答案:reduce()函数根据reducer,fn对每个表中的记录进行聚合,提供了一种创建自定义聚合的方法。每个表的输出是该表的组键,与reducer记录中的每个字段对应的列。以下是一个计算“_value”列中所有值的总和的示例。
import "experimental/array"
data = array.from(rows: [{_time: 2020-01-01T00:00:00Z, mytag: "t0", _field: "f0", _value: 1.0},
{_time: 2020-01-01T00:00:00Z, mytag: "t1", _field: "f1", _value: 1.5},
{_time: 2020-01-02T00:00:00Z, mytag: "t0", _field: "f0", _value: 2.0},
{_time: 2020-01-02T00:00:00Z, mytag: "t1", _field: "f1", _value: 2.5}])
|> group(columns: ["mytag"], mode:"by")
data
|> reduce(fn: (r, accumulator) => ({ sum: r._value + accumulator.sum }), identity: {sum: 0.0})
以下是输出
问题:我如何使用reduce()和findRecord()或findColumn一起使用?答案:findRecord()和reduce()函数通常一起使用以执行自定义聚合。让我们通过一个示例来了解如何一起使用这两个函数。我们使用reduce()函数来找到每个表中“_value”列值的总和,然后我们使用findRecord()来提取第一个总和。现在我们可以使用这个总和与map()函数进行额外的计算。
import "experimental/array"
data = array.from(rows: [{_time: 2020-01-01T00:00:00Z, mytag: "t0", _field: "f0", _value: 1.0},
{_time: 2020-01-01T00:00:00Z, mytag: "t1", _field: "f1", _value: 1.5},
{_time: 2020-01-02T00:00:00Z, mytag: "t0", _field: "f0", _value: 2.0},
{_time: 2020-01-02T00:00:00Z, mytag: "t1", _field: "f1", _value: 2.5}])
|> group(columns: ["mytag"], mode:"by")
x = data
|> reduce(fn: (r, accumulator) => ({ sum: r._value + accumulator.sum }), identity: {sum: 0.0})
|> findRecord(fn: (key) => key.mytag == "t0", idx: 0)
data
|> map(fn: (r) => ({ r with myRatio: r._value/x.sum}))
第一列值的总和等于3。“myRatio”列描述了“_value”列中每个值与总和的比率,其中x.sum=3。以下是相应的输出
我希望这个教程能帮助你更好地理解如何使用findRecord、findColumn和reduce()函数。一如既往,如果你遇到困难,请在我们社区网站或Slack频道上分享它们。我们很高兴得到你的反馈,并帮助你解决任何问题。