InfluxDB:如何跨度量执行数学运算
作者:Anais Dotis-Georgiou / 产品,用例,开发者
2018 年 12 月 13 日
导航到
如果你是 InfluxData 社区的一员,那么你可能在某个时候想要在度量之间执行数学运算。你做了一些谷歌搜索,偶然发现了这个 GitHub 问题 3552 并流下了几滴眼泪。今天,我得以成为好消息的传递者。InfluxData 发布了 Flux 技术预览,这是新的时间序列数据查询语言和引擎,并且它带来了在度量之间执行数学运算的能力。
在这篇博客中,我分享了两个如何跨度量执行数学运算的例子
- 如何计算每次请求写入数据库的行大小。按照这个例子操作是探索跨度量数学运算的最快方式。您只需简单地启动 sandbox 并复制粘贴代码来亲自尝试。
- 如何随时间“监控”换热器的效率。您可以在以下仓库中找到此部分的dataset和Flux查询。
如何计算每个请求写入数据库的行批大小
在克隆sandbox并运行./sandbox up
之后,您将容器化运行整个TICK Stack。"telegraf"数据库包含从您的本地机器收集的几个指标。为了计算收集并发送到InfluxDB的指标批大小,我们需要找到在该时间段内写入数据库的行数,并将该值除以该时间段内的写入请求数。
首先,过滤数据以隔离发出的写入请求和写入的行数。将数据存储在“httpd”和“write”两个表中,分别。
httpd = from(bucket:"telegraf/autogen")
|> range(start: dashboardTime)
|> filter(fn:(r) => r._measurement == "influxdb_httpd" and r._field == "writeReq")
write = from(bucket:"telegraf/autogen")
|> range(start: dashboardTime)
|> filter(fn:(r) => r._measurement == "influxdb_write" and r._field == "pointReq")
接下来,连接两个表。Join默认为左连接。最后,我们使用Map函数将两个值相除,并计算Dashboard时间(-5m)内的平均批大小。
avg_batch_size = join(tables:{httpd:httpd, write:write}, on:["_time"])
|> map(fn:(r) => ({
_value: r._value_write / r._value_httpd}))
|> mean()
我将我的可视化类型更改为“表格”,因为我的Flux脚本只返回一个值。我们可以看到过去5分钟的平均批大小约为62行/写。
边注:虽然这个查询很简单,但效率不高。它只是为了演示目的。如果您想查看更长时间范围内的平均批大小,您可能需要1)对httpd表和write表进行窗口划分,并分别计算平均值和最大值。这样做可以在进行数学运算之前聚合数据,这将更快、更高效。
如何随时间“监控”换热器的效率
对于这个例子,我决定想象自己是一个化工厂的操作员,需要监控逆流换热器的温度。我从四个不同的温度传感器收集了冷流(TC
)和热流(TH
)的温度。在位置x1
和x2
分别有两组进料(Tc2
,Th1
)和出料(Tc1
,Th2
)传感器。
在做出一些假设后,我可以使用这个公式来计算传热效率
我从每个传感器在两个不同时间收集温度读数,总共8个点。这个数据集很小,仅用于演示。我的数据库结构如下
数据库 | 测量 | 标签键 | 标签值 | 字段键 | 字段值 | 时间戳 |
传感器 | Tc1, Tc2, Th1, Th2 | 位置 | x1, x2 | 温度 | 总计8个 | t1, t2 |
由于温度读数存储在不同的测量中,我再次应用Join和Map来计算效率。我使用Flux编辑器和Chronograf中的表格视图来可视化所有结果。
首先,我想收集每个传感器的温度读数。我以Th1开始。我需要准备数据。我删除了“_start”和“_stop”列,因为我没有进行任何分组或窗口操作。我可以删除“_measurement”和“_field”,因为它们对于所有数据都是相同的。最后,我对基于“位置”的分析不感兴趣,所以我也删除了这个。我将仅对相同时间戳上的值执行数学运算,所以我会保留“_time”列。
Th1 = from(bucket: "sensors")
|> range(start: dashboardTime)
|> filter(fn: (r) => r._measurement == "Th1" and r._field == "temperature")
|> drop(columns:["_start", "_stop", "_measurement", "position", "_field"])
现在我可以将相同的查询应用到Th2
上。
Th2 = from(bucket: "sensors")
|> range(start: dashboardTime)
|> filter(fn: (r) => r._measurement == "Th2" and r._field == "temperature")
|> drop(columns:["_start", "_stop", "_measurement", "position", "_field"])
接下来,我合并这两个表。
TH = join(tables: {Th1: Th1, Th2: Th2}, on: ["_time"])
连接默认为左连接。表:{Th1: Th1, Th2: Th2}
允许你指定后缀的命名(相当于Pandas中的“rsuffix/lsuffix”或SQL中的“table.id”语法)。
我也将这个逻辑应用到冷流上。
TC = join(tables: {Tc1: Tc1, Tc2: Tc2}, on: ["_time"])
接下来,我将TC与TH连接。
join(tables: {TC: TC, TH: TH}, on: ["_time"])
最后,我可以使用Map来计算所有测量的效率。这是整个代码的示例
Th1 = from(bucket: "sensors")
|> range(start: dashboardTime)
|> filter(fn: (r) => r._measurement == "Th1" and r._field == "temperature")
|> drop(columns:["_start", "_stop", "_measurement", "position", "_field"])
Th2 = from(bucket: "sensors")
|> range(start: dashboardTime)
|> filter(fn: (r) => r._measurement == "Th2" and r. _field == "temperature")
|> drop(columns:["_start", "_stop", "_measurement", "position", "_field"])
TH = join(tables: {Th1: Th1, Th2: Th2}, on: ["_time"])
Tc1 = from(bucket: "sensors")
|> range(start: dashboardTime)
|> filter(fn: (r) => r._measurement == "Tc1" and r._field == "temperature")
|> drop(columns:["_start", "_stop", "_measurement", "position", "_field"])
Tc2 = from(bucket: "sensors")
|> range(start: dashboardTime)
|> filter(fn: (r) => r._measurement == "Tc2" and r._field == "temperature")
|> drop(columns:["_start", "_stop", "_measurement", "position", "_field"])
TCTH = join(tables: {Tc1: Tc1, Tc2: Tc2}, on: ["_time"]])
join(tables: {TC: TC, TH: TH}, on: ["_time"])
|> map(fn: (r) => (r._value_Tc2 - r._value_Tc1)/(r._value_Th1 - r._value_Th2))
|> yield(name: "efficiency")
我发现热传递效率随时间下降。这是一个Flux强大功能非常简单的示例,但它激发了我的想象力。我能否仅使用OSS构建类似DeltaV Alarm Management解决方案的监控和警报工具?可能不行,但我可以梦想有人可能会这么做。
如果你像我一样,觉得情境化和比较很有用,我建议阅读我即将发布的用户体验回顾。在该回顾中,我比较了Flux连接与Pandas连接。Flux有一些独特之处。对我来说最显眼的是|>
,管道符号。起初,我不喜欢它。我很少使用管道,我的小指对学习新的操作感到抱怨。现在,我发现它们极大地提高了可读性。每个管道符号都会返回一个结果。阅读Flux查询的感觉就像阅读项目符号。