Flux 中跨测量数学的实际应用

导航至

我最近花了很多时间研究 2.0 Alpha 版本,我想告诉大家:即将推出的一些新功能真的非常酷!特别是对于物联网。我最近一直在使用的功能是跨测量进行数学运算的能力,这对于 InfluxDB 中的物联网数据来说真是一个游戏规则改变者。

让我们花一点时间来看看原因。您可能知道,我一直在构建大量物联网传感器,将数据流式传输到 InfluxDB 的各个实例。我使用的主要传感器之一是 SenseAir K-30 CO2 传感器。它是一款非常精确的传感器,虽然价格不便宜。我已经使用它大约一年了,它非常可靠和准确,但我想知道温度和压力如何影响 CO2 读数。我的发现当然是,这些因素确实很重要。

我在这里 阅读了相关资料,但本质上,温度和大气压会影响测量腔室内的气体量,而不是浓度,因此应该对此进行补偿。使用一些参考值(如绝对零度和海平面压力)进行计算非常简单。

我们将使用 理想气体定律 公式

校正后的 CO2 ppm = 测量的 CO2 ppm * ((测量的温度 * 参考压力 ) / (测量的压力 * 参考温度))

太好了。简单!几乎。我有一个温度/压力传感器,将值存储在我的 InfluxDB 实例的“environment”测量中。我的 CO2 传感器将其读数存储在“k30_reader”测量中。如果您不使用 Flux,您已经看到了这里的问题:这些值位于不同的测量中,因此我要么无法进行此计算,要么必须进行大量的体操运动,才能先将所有这些值重写到公共测量中。这两种都不是真正可行的答案,对吗?

输入 Flux 和跨测量数学!因此,让我们逐步了解我如何在 Flux 中完成此操作(在 DevRel 同事 Anais 等人的大量帮助下)。首先回顾一下 Flux 如何返回值。让我们记住,当我在 Flux 中提交查询时,我会得到一个值表。当我们逐步了解 Flux 查询的构建方式时,这一点变得很重要,因此请牢记这一点。

Tref = 298.15
Pref = 1013.25

Tmeas = from(bucket: "telegraf")
   |> range(start: v.timeRangeStart)
   |> filter(fn: (r) => r._measurement == "environment" and (r._field == "temp_c"))
   |> fill(column: "_value", usePrevious: true)
   |> aggregateWindow(every: 30s, fn: mean)
   |> keep(columns: ["_value", "_time"])

首先,我定义了我的参考温度和压力值。参考温度通常为 25ºC,转换为开尔文,参考压力为海平面压力。然后,我查询测量的温度值,并将其存储在名为“Tmeas”的表中。如果我对这个表进行 yield() ,我将看到我的温度值

然后,我对 CO2 和压力值重复此操作

CO2meas = from(bucket: "telegraf")
   |> range(start: v.timeRangeStart)
   |> filter(fn: (r) => r._measurement == "k30_reader" and (r._field == "co2"))
   |> fill(column: "_value", usePrevious: true)
   |> aggregateWindow(every: 30s, fn: mean)
   |> keep(columns: ["_value", "_time"])
Pmeas = from(bucket: "telegraf")
   |> range(start: v.timeRangeStart)
   |> filter(fn: (r) => r._measurement == "environment" and (r._field == "pressure"))
   |> fill(column: "_value", usePrevious: true)
   |> aggregateWindow(every: 30s, fn: mean)
   |> keep(columns: ["_value", "_time"])

太棒了!我现在有 3 个表,其中包含进行计算所需的所有值。我现在要做的就是将它们组合在一起,我将通过一系列 join() 语句来完成

first_join = join(tables: {CO2meas: CO2meas, Tmeas: Tmeas}, on: ["_time"])
    |> fill(column: "_value_CO2meas", usePrevious: true)
    |>fill(column: "_value_CO2meas", value: 400.00)
    |> fill(column: "_value_Tmeas",usePrevious: true)
    |>fill(column: "_value_Tmeas", value: 20.0)

第一个 join() 为我提供了一个包含 CO2 和温度值的表,所以我已经完成了一半。

second_join = join(tables: {first_join: first_join, Pmeas: Pmeas}, on: ["_time"])
   |>fill(column: "_value", usePrevious: true)
   |>fill(column: "_value", value: 1013.25)
   |>map(fn: (r) => ({_time: r._time, _Pmeas: r._value, _CO2meas:r._value_CO2meas, _Tmeas:r._value_Tmeas}))

第二个 join() 为我提供了一个包含所有测量值的表!您可能认为我已经完成了,但为了进行最终计算,由于我在 time 上进行连接,因此我必须构建一个最终表,在其中为表中的每一行填充参考值。

final = second_join
    |>map(fn: (r) => ({Pmeas: r._Pmeas, CO2meas:r._CO2meas, Tmeas:r._Tmeas, Pref: Pref, Tref: Tref, _time: r._time,}))
    |> keep(columns: ["_time", "CO2meas", "Pmeas", "Tmeas", "Pref", "Tref"])

现在我可以创建一个最终表并计算每行(时间)的补偿 CO2 值。

CO2corr = final
    |> map(fn: (r) => ({"_time": r._time, "CO2-Measured": r.CO2meas, "CO2-Adjusted": r.CO2meas * (((r.Tmeas + 273.15) * r.Pref) / (r.Pmeas * r.Tref))}))
   |> yield()

由于这是我的最终表,我在末尾调用 yield() ,以便显示这些值。

现在我有一个图表,显示了原始测量的 CO2 值和补偿后的 CO2 值!所有这些都是使用 Flux 的跨测量数学即时完成的!

实际上,我一直想对传感器进行补偿很长时间,我很高兴现在可以使用 Flux 快速轻松地完成它!那么您将在 Flux 中使用跨测量数学做什么呢?请随时在 Twitter 上向我发送您的想法、解决方案等。 @davidgsIoT