InfluxDB:如何跨度量执行数学运算

导航到


如果你是 InfluxData 社区的一员,那么你可能在某个时候想要在度量之间执行数学运算。你做了一些谷歌搜索,偶然发现了这个 GitHub 问题 3552 并流下了几滴眼泪。今天,我得以成为好消息的传递者。InfluxData 发布了 Flux 技术预览,这是新的时间序列数据查询语言和引擎,并且它带来了在度量之间执行数学运算的能力。

在这篇博客中,我分享了两个如何跨度量执行数学运算的例子

  1. 如何计算每次请求写入数据库的行大小。按照这个例子操作是探索跨度量数学运算的最快方式。您只需简单地启动 sandbox 并复制粘贴代码来亲自尝试。
  2. 如何随时间“监控”换热器的效率。您可以在以下仓库中找到此部分的dataset和Flux查询。

要了解Flux的所有功能,请查看其规范和相关文档

如何计算每个请求写入数据库的行批大小

在克隆sandbox并运行./sandbox up之后,您将容器化运行整个TICK Stack。"telegraf"数据库包含从您的本地机器收集的几个指标。为了计算收集并发送到InfluxDB的指标批大小,我们需要找到在该时间段内写入数据库的行数,并将该值除以该时间段内的写入请求数。

首先,过滤数据以隔离发出的写入请求和写入的行数。将数据存储在“httpd”和“write”两个表中,分别。

httpd = from(bucket:"telegraf/autogen")
|> range(start: dashboardTime)
|> filter(fn:(r) => r._measurement == "influxdb_httpd" and r._field == "writeReq")

write = from(bucket:"telegraf/autogen")
|> range(start: dashboardTime)
|> filter(fn:(r) => r._measurement == "influxdb_write" and r._field == "pointReq")

接下来,连接两个表。Join默认为左连接。最后,我们使用Map函数将两个值相除,并计算Dashboard时间(-5m)内的平均批大小。

avg_batch_size = join(tables:{httpd:httpd, write:write}, on:["_time"])
|> map(fn:(r) => ({
_value: r._value_write / r._value_httpd}))
|> mean()

我将我的可视化类型更改为“表格”,因为我的Flux脚本只返回一个值。我们可以看到过去5分钟的平均批大小约为62行/写。

边注:虽然这个查询很简单,但效率不高。它只是为了演示目的。如果您想查看更长时间范围内的平均批大小,您可能需要1)对httpd表和write表进行窗口划分,并分别计算平均值和最大值。这样做可以在进行数学运算之前聚合数据,这将更快、更高效。

如何随时间“监控”换热器的效率

对于这个例子,我决定想象自己是一个化工厂的操作员,需要监控逆流换热器的温度。我从四个不同的温度传感器收集了冷流(TC)和热流(TH)的温度。在位置x1x2分别有两组进料(Tc2Th1)和出料(Tc1Th2)传感器。

在做出一些假设后,我可以使用这个公式来计算传热效率

我从每个传感器在两个不同时间收集温度读数,总共8个点。这个数据集很小,仅用于演示。我的数据库结构如下

数据库 测量 标签键 标签值 字段键 字段值 时间戳
传感器 Tc1, Tc2, Th1, Th2 位置 x1, x2 温度 总计8个 t1, t2

由于温度读数存储在不同的测量中,我再次应用JoinMap来计算效率。我使用Flux编辑器和Chronograf中的表格视图来可视化所有结果。

首先,我想收集每个传感器的温度读数。我以Th1开始。我需要准备数据。我删除了“_start”和“_stop”列,因为我没有进行任何分组或窗口操作。我可以删除“_measurement”和“_field”,因为它们对于所有数据都是相同的。最后,我对基于“位置”的分析不感兴趣,所以我也删除了这个。我将仅对相同时间戳上的值执行数学运算,所以我会保留“_time”列。

Th1 = from(bucket: "sensors")
  |> range(start: dashboardTime)
  |> filter(fn: (r) => r._measurement == "Th1" and r._field == "temperature")
  |> drop(columns:["_start", "_stop", "_measurement", "position", "_field"])

现在我可以将相同的查询应用到Th2上。

Th2 = from(bucket: "sensors")
  |> range(start: dashboardTime)
  |> filter(fn: (r) => r._measurement == "Th2" and r._field == "temperature")
  |> drop(columns:["_start", "_stop", "_measurement", "position", "_field"])

接下来,我合并这两个表。

TH = join(tables: {Th1: Th1, Th2: Th2}, on: ["_time"])

连接默认为左连接。表:{Th1: Th1, Th2: Th2}允许你指定后缀的命名(相当于Pandas中的“rsuffix/lsuffix”或SQL中的“table.id”语法)。

我也将这个逻辑应用到冷流上。

TC = join(tables: {Tc1: Tc1, Tc2: Tc2}, on: ["_time"])

接下来,我将TC与TH连接。

join(tables: {TC: TC, TH: TH}, on: ["_time"])

最后,我可以使用Map来计算所有测量的效率。这是整个代码的示例

Th1 = from(bucket: "sensors")
  |> range(start: dashboardTime)
  |> filter(fn: (r) => r._measurement == "Th1" and  r._field == "temperature")
  |> drop(columns:["_start", "_stop", "_measurement", "position", "_field"])

 Th2 = from(bucket: "sensors")
  |> range(start: dashboardTime)
  |> filter(fn: (r) => r._measurement == "Th2" and r. _field == "temperature")
  |> drop(columns:["_start", "_stop", "_measurement", "position", "_field"])

TH = join(tables: {Th1: Th1, Th2: Th2}, on: ["_time"])

Tc1 = from(bucket: "sensors")
  |> range(start: dashboardTime)
  |> filter(fn: (r) => r._measurement == "Tc1" and r._field == "temperature")
  |> drop(columns:["_start", "_stop", "_measurement", "position", "_field"])

Tc2 = from(bucket: "sensors")
  |> range(start: dashboardTime)
  |> filter(fn: (r) => r._measurement == "Tc2" and r._field == "temperature")
  |> drop(columns:["_start", "_stop", "_measurement", "position", "_field"])

 TCTH = join(tables: {Tc1: Tc1, Tc2: Tc2}, on: ["_time"]])
 join(tables: {TC: TC, TH: TH}, on: ["_time"])
|> map(fn: (r) => (r._value_Tc2 - r._value_Tc1)/(r._value_Th1 - r._value_Th2))
|> yield(name: "efficiency")

我发现热传递效率随时间下降。这是一个Flux强大功能非常简单的示例,但它激发了我的想象力。我能否仅使用OSS构建类似DeltaV Alarm Management解决方案的监控和警报工具?可能不行,但我可以梦想有人可能会这么做。

如果你像我一样,觉得情境化和比较很有用,我建议阅读我即将发布的用户体验回顾。在该回顾中,我比较了Flux连接与Pandas连接。Flux有一些独特之处。对我来说最显眼的是|>,管道符号。起初,我不喜欢它。我很少使用管道,我的小指对学习新的操作感到抱怨。现在,我发现它们极大地提高了可读性。每个管道符号都会返回一个结果。阅读Flux查询的感觉就像阅读项目符号。