InfluxDB:如何在测量之间进行连接和数学运算
作者:Anais Dotis-Georgiou / 产品, 用例, 开发者
2018 年 12 月 13 日
导航至
如果您是 InfluxData 社区的一份子,那么您可能在某些时候想要跨测量执行数学运算。您进行了一些谷歌搜索,偶然发现了这个 GitHub 问题 3552,并流下了一滴眼泪。好吧,今天我来给大家带来好消息。InfluxData 发布了 Flux 的技术预览版,这是一种用于时间序列数据的新查询语言和引擎,它带来了跨测量执行数学运算的能力。
在本博客中,我将分享两个关于如何跨测量执行数学运算的示例
- 如何计算每个请求写入数据库的行批大小。按照此示例是探索跨测量数学运算的最快方法。您可以简单地启动并运行 sandbox,并复制粘贴代码以亲自尝试。
- 如何“监控”热交换器随时间的效率。您可以在此 repo 中找到此部分的数据集和 Flux 查询。
要了解 Flux 的所有功能,请查看 规范 以及配套的 文档。
如何计算每个请求写入数据库的行批大小
在您克隆 sandbox 并运行 ./sandbox up
后,您将以容器化方式运行整个 TICK Stack。“telegraf”数据库包含从本地机器收集的多个指标。为了计算批次大小 (正在收集并写入 InfluxDB 的指标),我们需要找到随时间写入数据库的行数,并将该值除以同一时间段内的写入请求数。
首先,过滤数据以隔离发出的写入请求数和写入的行数。将这些数据分别存储在两个表“httpd”和“write”中。
httpd = from(bucket:"telegraf/autogen")
|> range(start: dashboardTime)
|> filter(fn:(r) => r._measurement == "influxdb_httpd" and r._field == "writeReq")
write = from(bucket:"telegraf/autogen")
|> range(start: dashboardTime)
|> filter(fn:(r) => r._measurement == "influxdb_write" and r._field == "pointReq")
接下来,连接两个表。Join 默认为左连接。最后,我们使用 Map 函数来除以这两个值,并计算仪表板时间(-5 分钟)内的平均批大小。
avg_batch_size = join(tables:{httpd:httpd, write:write}, on:["_time"])
|> map(fn:(r) => ({
_value: r._value_write / r._value_httpd}))
|> mean()
我将可视化类型更改为“表格”,因为我的 Flux 脚本只返回一个值。我们可以看到,过去 5 分钟的平均批大小约为 62 行/写入。
旁注:虽然此查询很简单,但效率相当低。它仅用于演示目的。如果您想查看更长时间范围内的平均批大小,您可能需要 1) 对 httpd 表和写入表进行窗口化,以及 2) 分别计算平均值和最大值。这样做将允许您在跨测量执行数学运算之前聚合数据,这将更快、更高效。
如何“监控”热交换器随时间的效率
对于此示例,我决定想象自己是一家化工厂的操作员,我需要监控逆流热交换器的温度。我从四个不同的温度传感器收集冷流 (TC
) 和热流 (TH
) 的温度。位置 x1
和 x2
分别有两个入口 (Tc2
, Th1
) 传感器和两个出口 (Tc1
, Th2
) 传感器。
在做出一些假设后,我可以用这个公式计算传热效率
我从每个传感器在 2 个不同时间收集温度读数,总共 8 个点。此数据集很小,仅用于演示目的。我的数据库结构如下
数据库 | 测量 | 标签键 | 标签值 | 字段键 | 字段值 | 时间戳 |
传感器 | Tc1、Tc2、Th1、Th2 | 位置 | x1、x2 | 温度 | 共 8 个 | t1、t2 |
由于温度读数存储在不同的测量中,因此我再次应用 Join 和 Map 来计算效率。我正在使用 Chronograf 中的 Flux 编辑器和表格视图来可视化所有结果。
首先,我想收集每个传感器的温度读数。我从 Th1 开始。我需要准备数据。我删除“_start”和“_stop”列,因为我没有执行任何分组或窗口化。我可以删除“_measurement”和“_field”,因为它们对于我的所有数据都是相同的。最后,我对基于“position”执行任何分析不感兴趣,因此我也可以删除它。我将仅对相同时间戳上的值执行数学运算,因此我保留“_time”列。
Th1 = from(bucket: "sensors")
|> range(start: dashboardTime)
|> filter(fn: (r) => r._measurement == "Th1" and r._field == "temperature")
|> drop(columns:["_start", "_stop", "_measurement", "position", "_field"])
现在我可以将相同的查询应用于 Th2
。
Th2 = from(bucket: "sensors")
|> range(start: dashboardTime)
|> filter(fn: (r) => r._measurement == "Th2" and r._field == "temperature")
|> drop(columns:["_start", "_stop", "_measurement", "position", "_field"])
接下来,我连接两个表。
TH = join(tables: {Th1: Th1, Th2: Th2}, on: ["_time"])
Join 默认为左连接。tables: {Th1: Th1, Th2: Th2}
允许您指定后缀的命名(相当于 Pandas 中的“rsuffix/lsuffix”或 SQL 中的“table.id”语法)。
我也将此逻辑应用于冷流
TC = join(tables: {Tc1: Tc1, Tc2: Tc2}, on: ["_time"])
接下来,我将 TC 与 TH 连接起来。
join(tables: {TC: TC, TH: TH}, on: ["_time"])
最后,我可以使用 Map 来计算所有测量中的效率。这就是所有代码组合在一起的样子
Th1 = from(bucket: "sensors")
|> range(start: dashboardTime)
|> filter(fn: (r) => r._measurement == "Th1" and r._field == "temperature")
|> drop(columns:["_start", "_stop", "_measurement", "position", "_field"])
Th2 = from(bucket: "sensors")
|> range(start: dashboardTime)
|> filter(fn: (r) => r._measurement == "Th2" and r. _field == "temperature")
|> drop(columns:["_start", "_stop", "_measurement", "position", "_field"])
TH = join(tables: {Th1: Th1, Th2: Th2}, on: ["_time"])
Tc1 = from(bucket: "sensors")
|> range(start: dashboardTime)
|> filter(fn: (r) => r._measurement == "Tc1" and r._field == "temperature")
|> drop(columns:["_start", "_stop", "_measurement", "position", "_field"])
Tc2 = from(bucket: "sensors")
|> range(start: dashboardTime)
|> filter(fn: (r) => r._measurement == "Tc2" and r._field == "temperature")
|> drop(columns:["_start", "_stop", "_measurement", "position", "_field"])
TCTH = join(tables: {Tc1: Tc1, Tc2: Tc2}, on: ["_time"]])
join(tables: {TC: TC, TH: TH}, on: ["_time"])
|> map(fn: (r) => (r._value_Tc2 - r._value_Tc1)/(r._value_Th1 - r._value_Th2))
|> yield(name: "efficiency")
我可以看到传热效率随时间降低了。这是 Flux 功能的一个非常简单的示例,但这让我的想象力飞速运转。我可以用 OSS 构建一个类似于 DeltaV 报警管理解决方案的监控和警报工具吗?可能不会,但我可以梦想有人可能会。
如果您像我一样发现情境化和比较很有用,我建议您阅读我即将发布的 UX 评测。在该评测中,我将 Flux Joins 与 Pandas Joins 进行了比较。Flux 有一些特殊性。对我来说最明显的一个是 |>
,管道前向。起初,我不喜欢它。我几乎从不使用管道,我的小指在想到必须学习新的笔画时就抱怨。现在,我发现它们大大提高了可读性。每个管道前向都返回一个结果。阅读 Flux 查询感觉就像阅读项目符号点。