使用 Flux 从 MySQL 获取物联网传感器元数据

导航至

如果您已部署物联网解决方案,您必须决定在哪里以及如何存储所有数据。至少从我的角度来看,存储传感器数据的最佳和最简单的地方当然是InfluxDB。我这么说可能不会让您感到意外。但是,关于您需要存储的其他数据怎么办?关于传感器的数据?比如传感器制造商、投入使用日期、客户 ID、运行的平台等。您知道,所有这些传感器元数据。

一种解决方案当然是简单地将所有这些信息作为标签添加到 InfluxDB 中的传感器数据中,然后继续您的工作。但是,您真的希望将所有传感器数据与每个数据点一起存储吗?许多事情在当时看起来是个好主意,但随着现实的打击,很快就会变成一个糟糕的主意。由于大部分元数据不会经常更改,并且可能也与客户信息相关联,因此它们最有可能存储在传统的 RDBMS 中。您可能已经有一个包含客户数据的 RDBMS,那么为什么不继续利用这一投资呢?正如我反复说的,这不是存储传感器数据的最佳位置。因此,现在您的物联网数据分布在两个不同的数据库中。您如何访问它并将它们合并到一个您可以查看所有数据的地方呢?

Flux 就是答案

告诉我你早就预料到了。你肯定能预见到这一点。好吧,公平地说,你可能确实预料到了,毕竟能通过Flux获取基于SQL的数据,这是Flux的美丽之处:它是可扩展的!因此,我们现在有一个扩展,允许您通过Flux从MySQL、MariaDB或Postgres读取数据。当我听说这个SQL连接器已经准备就绪时,我不得不试试。我将向您展示我构建的内容以及如何构建。

构建客户数据库

首先,我创建了一个包含一些客户信息的MySQL数据库。我在一个名为IoTMeta的新数据库中创建了一个表,其中包含一些传感器元数据。我还添加了另一个包含有关这些传感器客户信息的表。

2 tables of data

这些表非常基础。我使用与InfluxDB实例中的Sensor_id标签相对应的数据填充了Sensor_ID字段。我敢打赌,你现在已经知道我要做什么了。我添加了大量数据

因此,我的传感器元数据库现在包含了我运行的所有传感器的相关信息,以及有关传感器所有者的“客户数据”。现在是时候将这些信息整合到有用的东西中。

使用Flux查询数据

首先,我在Flux中构建了一个查询,以获取一些传感器数据,但我对传感器数据本身并不感兴趣。我正在寻找一个标识标签值:Sensor_id。这个查询看起来可能有点奇怪,但请相信我,最后它会变得有道理。

temperature = from(bucket: "telegraf") 
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop) 
  |> filter(fn: (r) => r._measurement == "temperature" and (r._field == "temp_c")) 
  |> last() 
  |> map(fn: (r) => { 
       return { query: r.Sensor_id } 
       }) |> tableFind(fn: (key) => true) |> getRecord(idx: 0)

它返回一个包含一行数据的表,然后提取出Sensor_id标签,你可能想说“什么?”记住:Flux将所有内容都返回为表。我本质上需要从表中提取一个标量值。在这种情况下,它是特定标签的字符串值。这就是如何做到这一点的方法。

接下来,我将获取MySQL数据库的用户名和密码,它们方便地存储在InfluxDB秘密存储库中。

uname = secrets.get(key: "SQL_USER") 
pass = secrets.get(key: "SQL_PASSW")

等等,我是怎么把那些值放入这个秘密存储库的?好吧,让我们回顾一下。

curl -XPATCH https://127.0.0.1:9999/api/v2/orgs/<org-id>/secrets -H 'Authorization: Token <token>' -H 'Content-type: application/json' --data '{ "SQL_USER": "<username>" }'

需要注意的是,您可以从URL中获取<org-id>。它不是InfluxDB中您组织的实际名称。然后您为SQL_PASSW秘密做同样的事情。您实际上可以称它们为任何您想要的名字。现在您不必在查询中以明文形式放置用户名/密码

接下来,我将使用所有这些信息构建我的SQL查询

sq = sql.from(
  driverName: "mysql",
  dataSourceName: "${uname}:${pass}@tcp(localhost:3306)/IoTMeta",
  query: "SELECT * FROM Sensor_data, Customer_Data WHERE  Sensor_data.Sensor_ID = ${"\""+temperature.query+"\"  AND Sensor_data.measurement = \"temperature\" AND Sensor_data.CustomerID = Customer_Data.Customer_ID"}" //"SELECT * FROM Sensor_data WHERE Sensor_ID = ${"\""+temperature.query+"\" AND measurement = \"temperature\""}" //q //  humidity.query //"SELECT * FROM Sensor_Data WHERE Sensor_ID = \"THPL001\""// humidity.query 
)

您将看到我在SQL查询中使用了我第一个Flux查询的值。太酷了!您可能还会注意到,我在那个SQL查询中执行了一个join操作,以便我可以从数据库中的两个表中获取数据。这有多酷?接下来,我将格式化结果表,只包含我想要显示的列

fin = sq
|> map(fn: (r) => ({Sensor_id: r.Sensor_ID, Owner: r._Sensor_owner, Manufacturer: r.Sensor_mfg, MCU_Class: r.MCU_class, MCU_Vendor: r.MCU_vendor, Customer: r.Customer, Address: r.Address, Phone: r.phone}))

现在我有一个包含我的传感器所有元数据以及有关该传感器的客户联系数据的表。现在是时候施展魔法了

这是什么魔法?我有一个包含传感器所有元数据、一些客户数据以及传感器的读数的表?是的,我确实有。而且这里真正神奇的是:由于您可以同时从SQL数据库和InfluxDB桶中获取数据,因此您还可以将数据联合到一个单独的表中。

下面是我是如何做到这一点的

temp = from(bucket: "telegraf")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r._measurement == "temperature" and (r._field == "temp_c"))

得到了一个传感器数据的表。我已经有了SQL中的元数据表,所以……

j1 = join(tables: {temp: temp, fin: fin}, on: ["Sensor_id"] )
|> map(fn: (r) => ({_value: r._value, _time: r._time, Owner: r.Owner, Manufacturer: r.Manufacturer, MCU_Class: r.MCU_Class, MCU_Vendor: r.MCU_Vendor, Customer: r.Customer, Address: r.Address, Phone: r.phone}))
|> yield()

我已经根据公共元素(即 Sensor_id 字段)将这两个表连接起来,现在有一个表把所有信息都放在了一个地方!

你可以用无数种方法来使用这种合并来自不同来源数据的能力。我很乐意听听你如何实施类似的方法来更好地理解你的传感器部署。

我使用的是 InfluxDB 2.0 的 Alpha18 版本构建了所有这些,这就是我正在运行的版本——实际上,我基于 master 自定义构建了我的版本,因为我添加了一些 Flux 的功能,但这将是另一篇文章的主题。对于这些功能,开源 InfluxDB 2.0 的 Alpha 版本完全可以胜任。你绝对应该尝试一下!