使用 Flux 从 MySQL 获取 IoT 传感器元数据
作者:David G. Simmons / 用例, 开发者, 产品
2019年12月05日
导航至
如果您已部署 IoT 解决方案,则必须决定在何处以及如何存储所有数据。至少从我的角度来看,存储传感器数据最好和最简单的地方当然是 InfluxDB。我说这话您一定不会感到惊讶。但是您需要存储的其他数据呢?关于传感器的数据?例如传感器制造商、投入使用的日期、客户 ID、它运行在什么类型的平台上。您知道,所有传感器元数据。
当然,一种解决方案是将所有这些内容简单地作为标签添加到 InfluxDB 中的传感器数据中,然后继续您的一天。但是您真的想将所有传感器数据与每个数据点一起存储吗?在当时,很多事情看起来都是好主意,但是当现实来临时,很快就会演变成糟糕的主意。由于大多数元数据不会经常更改,并且可能还与客户信息相关联,因此最适合它的地方很可能是在传统的 RDBMS 中。很可能您已经拥有一个包含客户数据的 RDBMS,那么为什么不继续利用这项投资呢?正如我反复说过的,这不是存储传感器数据的最佳场所。因此,现在您的 IoT 数据存储在两个不同的数据库中。您如何访问它并将其合并到一个地方,以便您可以看到所有数据?
Flux 是答案
告诉我您看到了这一点。您肯定看到了这一点。好吧,公平地说,您可能已经看到了,因为毕竟,您将如何通过 Flux 获取基于 SQL 的数据?这就是 Flux 的美妙之处:它是可扩展的!因此,我们现在有一个扩展,允许您通过 Flux 从 MySQL、MariaDB 或 Postgres 读取数据。当我听说这个 SQL 连接器已准备就绪时,我不得不尝试一下。我将向您展示我构建的内容以及构建方式。
构建客户数据库
首先要做的是构建一个包含一些客户信息的 MySQL 数据库。我创建了一个名为 IoTMeta
的新数据库,我在其中放置了一个包含一些传感器元数据的表。我还添加了另一个表,其中包含有关这些传感器的客户信息。
非常基本的表,真的。Sensor_ID
字段我填充了与我的 InfluxDB 实例中的 Sensor_id
标签对应的数据。我敢打赌您已经可以看到我要做什么了。我添加了一堆数据
因此,现在我的传感器元数据数据库包含有关我在此处运行的每个传感器的一些信息,以及有关谁拥有这些传感器的“客户数据”。现在是时候将所有这些都拉入一些有用的东西中了。
使用 Flux 查询数据
首先,我在 Flux 中构建了一个查询来获取一些传感器数据,但我实际上对传感器数据本身不感兴趣。我正在寻找一个标识标签值:Sensor_id
。这个查询看起来有点奇怪,但最终会变得有意义,我保证。
temperature = from(bucket: "telegraf")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r._measurement == "temperature" and (r._field == "temp_c"))
|> last()
|> map(fn: (r) => {
return { query: r.Sensor_id }
}) |> tableFind(fn: (key) => true) |> getRecord(idx: 0)
它返回一个单行表,然后提取 Sensor_id
标签,您可能会说“什么?”请记住:Flux 以表格形式返回所有内容。我需要的是本质上是该表中的标量值。在这种情况下,它是所讨论标签的字符串值。这就是您执行此操作的方式。
接下来,我将获取我的 MySQL 数据库的用户名和密码,这些用户名和密码方便地存储在 InfluxDB 密钥存储中。
uname = secrets.get(key: "SQL_USER")
pass = secrets.get(key: "SQL_PASSW")
等等,我是如何将这些值放入此密钥存储中的?好的,让我们退后一步。
curl -XPATCH http://localhost:9999/api/v2/orgs/<org-id>/secrets -H 'Authorization: Token <token>' -H 'Content-type: application/json' --data '{ "SQL_USER": "<username>" }'
需要注意的一件事是,您从 URL 中获取 <org-id>
。它不是您在 InfluxDB 中的组织的实际名称。然后,您对 SQL_PASSW
密钥执行相同的操作。您可以随意调用它们。 现在您不必在查询中以纯文本形式输入用户名/密码。
接下来,我将使用所有这些来构建我的 SQL 查询
sq = sql.from(
driverName: "mysql",
dataSourceName: "${uname}:${pass}@tcp(localhost:3306)/IoTMeta",
query: "SELECT * FROM Sensor_data, Customer_Data WHERE Sensor_data.Sensor_ID = ${"\""+temperature.query+"\" AND Sensor_data.measurement = \"temperature\" AND Sensor_data.CustomerID = Customer_Data.Customer_ID"}" //"SELECT * FROM Sensor_data WHERE Sensor_ID = ${"\""+temperature.query+"\" AND measurement = \"temperature\""}" //q // humidity.query //"SELECT * FROM Sensor_Data WHERE Sensor_ID = \"THPL001\""// humidity.query
)
您会看到我在 SQL 查询中使用了我的第一个 Flux 查询中的值。酷!您可能还会注意到,我在该 SQL 查询中执行了 join
,以便我可以从数据库中的两个表中获取数据。这有多酷?接下来,我将格式化结果表,使其仅包含我要显示的列
fin = sq
|> map(fn: (r) => ({Sensor_id: r.Sensor_ID, Owner: r._Sensor_owner, Manufacturer: r.Sensor_mfg, MCU_Class: r.MCU_class, MCU_Vendor: r.MCU_vendor, Customer: r.Customer, Address: r.Address, Phone: r.phone}))
我现在有一个表,其中包含有关我的传感器的所有元数据,以及有关该传感器的所有客户联系数据。是时候进行一些魔法了
这是什么巫术?我有一个表,其中包含有关传感器的所有元数据、一些客户数据以及和传感器读数?是的。我确实有。这是真正神奇的事情:由于您可以从 SQL 数据库和 InfluxDB 存储桶中获取数据,因此您还可以将这些数据连接到一个表中。
这是我如何做到的
temp = from(bucket: "telegraf")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r._measurement == "temperature" and (r._field == "temp_c"))
为我获取传感器数据表。我已经有一个来自 SQL 的元数据表,所以 …
j1 = join(tables: {temp: temp, fin: fin}, on: ["Sensor_id"] )
|> map(fn: (r) => ({_value: r._value, _time: r._time, Owner: r.Owner, Manufacturer: r.Manufacturer, MCU_Class: r.MCU_Class, MCU_Vendor: r.MCU_Vendor, Customer: r.Customer, Address: r.Address, Phone: r.phone}))
|> yield()
我只需在公共元素(Sensor_id
字段)上连接这两个表,我就拥有了一个将所有内容都放在一个表中的表!
您可以使用此功能合并来自不同来源的数据的方式有很多。我很想听听您将如何实现类似的功能,以更好地了解您的传感器部署。
我使用 InfluxDB 2.0 的 Alpha18 版本完成了所有这些操作,这就是我运行的版本 — 实际上,我从 master
自定义构建了我的版本,因为我对 Flux 进行了一些添加,这是另一篇文章的全部内容。对于这些内容,OSS InfluxDB 2.0 的 Alpha 版本可以正常工作。您绝对应该尝试一下!