InfluxDB:如何在测量之间进行连接和数学运算

导航至


如果您是 InfluxData 社区的一份子,那么您可能在某些时候想要跨测量执行数学运算。您进行了一些谷歌搜索,偶然发现了这个 GitHub 问题 3552,并流下了一滴眼泪。好吧,今天我来给大家带来好消息。InfluxData 发布了 Flux 的技术预览版,这是一种用于时间序列数据的新查询语言和引擎,它带来了跨测量执行数学运算的能力。

在本博客中,我将分享两个关于如何跨测量执行数学运算的示例

  1. 如何计算每个请求写入数据库的行批大小。按照此示例是探索跨测量数学运算的最快方法。您可以简单地启动并运行 sandbox,并复制粘贴代码以亲自尝试。
  2. 如何“监控”热交换器随时间的效率。您可以在此 repo 中找到此部分的数据集和 Flux 查询。

要了解 Flux 的所有功能,请查看 规范 以及配套的 文档

如何计算每个请求写入数据库的行批大小

在您克隆 sandbox 并运行 ./sandbox up 后,您将以容器化方式运行整个 TICK Stack。“telegraf”数据库包含从本地机器收集的多个指标。为了计算批次大小 (正在收集并写入 InfluxDB 的指标),我们需要找到随时间写入数据库的行数,并将该值除以同一时间段内的写入请求数。

首先,过滤数据以隔离发出的写入请求数和写入的行数。将这些数据分别存储在两个表“httpd”和“write”中。

httpd = from(bucket:"telegraf/autogen")
|> range(start: dashboardTime)
|> filter(fn:(r) => r._measurement == "influxdb_httpd" and r._field == "writeReq")

write = from(bucket:"telegraf/autogen")
|> range(start: dashboardTime)
|> filter(fn:(r) => r._measurement == "influxdb_write" and r._field == "pointReq")

接下来,连接两个表。Join 默认为左连接。最后,我们使用 Map 函数来除以这两个值,并计算仪表板时间(-5 分钟)内的平均批大小。

avg_batch_size = join(tables:{httpd:httpd, write:write}, on:["_time"])
|> map(fn:(r) => ({
_value: r._value_write / r._value_httpd}))
|> mean()

我将可视化类型更改为“表格”,因为我的 Flux 脚本只返回一个值。我们可以看到,过去 5 分钟的平均批大小约为 62 行/写入。

旁注:虽然此查询很简单,但效率相当低。它仅用于演示目的。如果您想查看更长时间范围内的平均批大小,您可能需要 1) 对 httpd 表和写入表进行窗口化,以及 2) 分别计算平均值和最大值。这样做将允许您在跨测量执行数学运算之前聚合数据,这将更快、更高效。

如何“监控”热交换器随时间的效率

对于此示例,我决定想象自己是一家化工厂的操作员,我需要监控逆流热交换器的温度。我从四个不同的温度传感器收集冷流 (TC) 和热流 (TH) 的温度。位置 x1x2 分别有两个入口 (Tc2, Th1) 传感器和两个出口 (Tc1, Th2) 传感器。

在做出一些假设后,我可以用这个公式计算传热效率

我从每个传感器在 2 个不同时间收集温度读数,总共 8 个点。此数据集很小,仅用于演示目的。我的数据库结构如下

数据库 测量 标签键 标签值 字段键 字段值 时间戳
传感器 Tc1、Tc2、Th1、Th2 位置 x1、x2 温度 共 8 个 t1、t2

由于温度读数存储在不同的测量中,因此我再次应用 JoinMap 来计算效率。我正在使用 Chronograf 中的 Flux 编辑器和表格视图来可视化所有结果。

首先,我想收集每个传感器的温度读数。我从 Th1 开始。我需要准备数据。我删除“_start”和“_stop”列,因为我没有执行任何分组或窗口化。我可以删除“_measurement”和“_field”,因为它们对于我的所有数据都是相同的。最后,我对基于“position”执行任何分析不感兴趣,因此我也可以删除它。我将仅对相同时间戳上的值执行数学运算,因此我保留“_time”列。

Th1 = from(bucket: "sensors")
  |> range(start: dashboardTime)
  |> filter(fn: (r) => r._measurement == "Th1" and r._field == "temperature")
  |> drop(columns:["_start", "_stop", "_measurement", "position", "_field"])

现在我可以将相同的查询应用于 Th2

Th2 = from(bucket: "sensors")
  |> range(start: dashboardTime)
  |> filter(fn: (r) => r._measurement == "Th2" and r._field == "temperature")
  |> drop(columns:["_start", "_stop", "_measurement", "position", "_field"])

接下来,我连接两个表。

TH = join(tables: {Th1: Th1, Th2: Th2}, on: ["_time"])

Join 默认为左连接。tables: {Th1: Th1, Th2: Th2} 允许您指定后缀的命名(相当于 Pandas 中的“rsuffix/lsuffix”或 SQL 中的“table.id”语法)。

我也将此逻辑应用于冷流

TC = join(tables: {Tc1: Tc1, Tc2: Tc2}, on: ["_time"])

接下来,我将 TC 与 TH 连接起来。

join(tables: {TC: TC, TH: TH}, on: ["_time"])

最后,我可以使用 Map 来计算所有测量中的效率。这就是所有代码组合在一起的样子

Th1 = from(bucket: "sensors")
  |> range(start: dashboardTime)
  |> filter(fn: (r) => r._measurement == "Th1" and  r._field == "temperature")
  |> drop(columns:["_start", "_stop", "_measurement", "position", "_field"])

 Th2 = from(bucket: "sensors")
  |> range(start: dashboardTime)
  |> filter(fn: (r) => r._measurement == "Th2" and r. _field == "temperature")
  |> drop(columns:["_start", "_stop", "_measurement", "position", "_field"])

TH = join(tables: {Th1: Th1, Th2: Th2}, on: ["_time"])

Tc1 = from(bucket: "sensors")
  |> range(start: dashboardTime)
  |> filter(fn: (r) => r._measurement == "Tc1" and r._field == "temperature")
  |> drop(columns:["_start", "_stop", "_measurement", "position", "_field"])

Tc2 = from(bucket: "sensors")
  |> range(start: dashboardTime)
  |> filter(fn: (r) => r._measurement == "Tc2" and r._field == "temperature")
  |> drop(columns:["_start", "_stop", "_measurement", "position", "_field"])

 TCTH = join(tables: {Tc1: Tc1, Tc2: Tc2}, on: ["_time"]])
 join(tables: {TC: TC, TH: TH}, on: ["_time"])
|> map(fn: (r) => (r._value_Tc2 - r._value_Tc1)/(r._value_Th1 - r._value_Th2))
|> yield(name: "efficiency")

我可以看到传热效率随时间降低了。这是 Flux 功能的一个非常简单的示例,但这让我的想象力飞速运转。我可以用 OSS 构建一个类似于 DeltaV 报警管理解决方案的监控和警报工具吗?可能不会,但我可以梦想有人可能会。

如果您像我一样发现情境化和比较很有用,我建议您阅读我即将发布的 UX 评测。在该评测中,我将 Flux Joins 与 Pandas Joins 进行了比较。Flux 有一些特殊性。对我来说最明显的一个是 |>,管道前向。起初,我不喜欢它。我几乎从不使用管道,我的小指在想到必须学习新的笔画时就抱怨。现在,我发现它们大大提高了可读性。每个管道前向都返回一个结果。阅读 Flux 查询感觉就像阅读项目符号点。