InfluxDB 和 GeoData - 应急发电机
作者:Jay Clifford / 产品, 用例, 开发者
2021 年 11 月 11 日
导航至
随着 LTE(长期演进)的广泛使用,我们看到越来越多的物联网设备在地球的偏远地区上线。
设想一下这种情况: 一个国家目前正经历因电网故障引发的国家紧急状态。为了缓解电力短缺,政府在其国家的偏远地区部署了发电机,为最偏远的村庄供电。问题是?由于应急发电机燃料耗尽,村庄仍在报告停电。
那么,作为物联网专家,我们如何应对这种情况?这篇博文将部署 InfluxDB 的地理数据工具来转换和可视化远程发电机传感器生成的数据,目的是提高监控和响应时间。
地理数据
什么是地理数据?地理数据可以包含两种不同形式的地理数据,即空间数据和时空数据。空间数据与描述与地球上某个位置关联的信息(例如指标)有关。一个常见的例子是经度和纬度。经度和纬度本质上是一对数值,与地球上的特定点相关
- (纬度:48.858093,经度:2.294694)= 埃菲尔铁塔
- (纬度:51.510357,经度:-0.116773)= 大本钟
您明白了吧。在这种情况下,我们将使用经度和纬度将我们的应急发电机放置在地图上。时空数据是空间和时间的结合。时空数据的一个有效例子是在某个区域内对某个物种进行索引,在一段时间内,这些统计数据会根据环境因素而波动。
我们的用例可以松散地与时空数据相关联。我们正在存储
- 发电机的地点(空间)
- 发电机随时间的状态(时间)
解决方案
这篇博文的模拟器代码和 InfluxDB 模板可以在这里找到。
模拟器
由于我们没有在现场监控的应急发电机组,我们必须利用我们的想象力和 Python 来生成我们的场景。我不会讨论模拟器的实现细节,但此图表让您对它正在做什么有一个总体了解。
现在我们的数据已在 InfluxDB 中,让我们看看我们有什么
结果 | 表格 | _time | _value | _field | _measurement | generatorID |
0 | 2021-10-15T13:32:11Z | 624 | 压力 | generator_stats | generator6 | |
1 | 2021-10-15T13:32:01Z | 619 | 燃料 | generator_stats | generator5 | |
2 | 2021-10-15T13:32:28Z | 86 | 温度 | generator_stats | generator2 | |
3 | 2021-10-15T13:32:03Z | -91.58045 | 经度 | generator_stats | generator3 | |
4 | 2021-10-15T13:31:43Z | 44.27804 | 纬度 | generator_stats | generator1 | |
5 | 2021-10-15T13:31:43Z | 174 | 压力 | generator_stats | generator1 | |
6 | 2021-10-15T13:31:48Z | 415 | 压力 | generator_stats | generator4 | |
7 | 2021-10-15T13:31:48Z | 33 | 温度 | generator_stats | generator4 | |
8 | 2021-10-15T13:32:03Z | 48 | 温度 | generator_stats | generator3 |
您可以在上表中看到,我们有 6 个不同的发电机输出
- 温度
- 燃油液位
- 压力
- 经度和纬度
数据准备
在 InfluxDB Cloud 的地图面板中可视化我们的数据之前,我们需要准备我们的数据。值得庆幸的是,Flux 使这变得非常简单
注意:在本用例中,我们做了一个关键假设,即应急发电机在每个样本中都发送其地理数据。地理数据可能不是由静态传感器生成的,或者仅在首次初始化时生成。有关我们可以使用变量静态设置这些功能的特性建议,请参见附录。
from(bucket: "emergency _generator")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "generator_stats")
|> filter(fn: (r) => r["_field"] == "lon" or r["_field"] == "lat" or r["_field"] == "fuel")
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
|> rename(columns: {fuel: "_value"})
分解
- 选择开始时间和停止时间范围之间的数据。
- 筛选 measurement 等于 "generator_stats" 的数据
- 然后筛选该数据子集,仅返回包含数据的经度、纬度和燃油液位的字段。
- 转换表格,使 _fields 列值变为列。与这些字段关联的数据(当前在 _value 列中)移动到适当的列下。
- 我们将名为 "fuel" 的新列重命名回 "_value"(地图可视化工具需要此列)。
现在我们的数据看起来像这样
结果 | 表格 | _time | _measurement | generatorID | _value | 纬度 | 经度 |
0 | 2021-10-15T12:40:57.103569828Z | generator_stats | generator1 | 784 | 44.27804 | -88.27205 | |
1 | 2021-10-15T12:40:36.03140913Z | generator_stats | generator2 | 624 | 34.09668 | -117.71978 | |
2 | 2021-10-15T12:50:42.905288313Z | generator_stats | generator3 | 756 | 41.6764 | -91.58045 |
InfluxDB 地图
我们的数据已准备就绪!开始可视化。
-
- 创建一个新的仪表板并选择地图标题。
- 打开查询编辑器并复制粘贴我们之前制作的 Flux 查询并提交查询。
- 接下来,导航到地图的自定义面板。取消选中对经纬度使用 S2 单元 ID。S2 单元 ID 是一个计算出的地理位置,用于聚合多个汇总到一个的绘图点。这非常适合跨越多个区域且具有许多点的用例(S2 为这些用例提供可扩展性和稳健性)。查看这篇精彩文章以获取更多信息)。由于我们的发电机具有单个位置点且不移动,因此在本示例中我们将使用原始经度和纬度。更改纬度和经度下拉列表以匹配我们的 Flux 查询列。它应该看起来像这样。
- 最后,让我们将阈值更新为交通信号灯系统。这对于一目了然地监控每个绘制发电机的燃油液位非常重要。
- 创建一个新的仪表板并选择地图标题。
现在选择绿色勾号,您就完成了!我进行了一些创造性的自由来完成仪表板,现在看起来像这样:
Grafana Geomap
对于我们的 Grafana 高级用户来说,也有好消息。这种可视化也可以使用 Geomap 面板实现
- 单击添加面板。
- 从可视化面板中选择 Geomap。
- 与地图一样,我们添加我们的 Flux 查询。注意:我们必须采取一些额外的步骤来清理我们的数据,为 Grafana 做好准备。我们删除 _start 和 _stop 列,因为此可视化不需要它们。然后,我们执行 group(),它将每个数据帧组合成一个。
- 最后,我们在 Geomap 选项面板中选择我们的纬度和经度字段。
我采用了我们的 InfluxDB 仪表板模板并进行了重现:
结论
在正确的上下文中,地理数据是一个强大的工具。让我们回到我们的场景:通过使用地图标题,我们在地理地图上绘制了每个应急发电机,并附加了从每个发电机收集的燃油液位。这允许操作员大规模监控每个发电机,并根据燃油短缺情况规划燃料路线(我们可能会在以后的博客中介绍这一点)。
您觉得怎么样?我很高兴听到您对地理数据以及您目前如何使用它的想法!前往 InfluxData Slack 和 社区论坛(只需 @Jay Clifford)。让我们在那里继续讨论。
附录
添加静态地理数据
注意:这目前在 InfluxDB 中是不可能的,因为变量只能在仪表板 UI 中选择。如果您希望能够从 Flux 查询中选择变量值,请支持此功能请求。
如上文“数据准备”部分所述,可能需要您提供手动地理数据才能绘制地图。我们可以通过使用变量来做到这一点
- 让我们以我们的 6 个发电机为例,为每个发电机创建一个键值对(<generatorID,":")
- 在创建我们的 Flux 查询时,我们使用 map() 函数而不是 pivot()
import "strings" from(bucket: "emergency _generator") |> range(start: v.timeRangeStart, stop: v.timeRangeStop) |> filter(fn: (r) => r["_measurement"] == "generator_stats") |> filter(fn: (r) => r["_field"] == "fuel") |> map(fn: (r) => ({ r with lat: strings.split(v: v.gen_geo_data[r.generatorID], t: ":")[0], lon: strings.split(v: v.gen_geo_data[r.generatorID], t: ":")[1]}))
这将向我们的表格添加 lat 和 lon 列。对于每一行,generatorID 都用作我们的键值对变量的输入。然后我们使用 split() 使用冒号 (:) 作为分隔符来分解返回的坐标字符串。
SQL 方式
最后,我们可以考虑时间序列和关系存储的混合。让我们考虑以下场景。在部署应急发电机期间,我们在两个单独的数据存储中注册它们的详细信息
- 关系型:此数据库将保存我们的静态数据:(GenID,地理数据,制造商等)
- 时间序列:此数据库将保存我们的时间数据(GenID、燃料、温度、压力)
注意:GenID 在我们的两个数据存储中保持不变。很快就会明白为什么。
方便的是,Flux 允许我们摄取我们的关系数据并将其与我们的时间数据结合起来。让我们分解以下 Flux 查询
// Import the "sql" package
import "sql"
// Query data from PostgreSQL
GenGeo = sql.from(
driverName: "postgres",
dataSourceName: "postgresql://localhost?sslmode=disable",
query: "SELECT GenID, lat, lon FROM generators"
)
// Query data from InfluxDB
GenTemporal =from(bucket: "emergency _generator")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "generator_stats")
|> filter(fn: (r) => r["_field"] == "fuel")
// Join InfluxDB query results with PostgreSQL query results
join(tables: {metric: GenTemporal, info: GenGeo}, on: ["GenID"])
- 导入我们的 SQL 库。
- 创建一个名为 GenGeo 的变量。GenGeo 保存来自我们的 sql.from() 查询的结果。我们的查询提取列 GenID、lat 和 lon。
- 创建一个名为 GenTemporal 的变量并存储我们原始查询的结果(省略 lat 和 lon 字段)。
- 最后,我们执行 join()。这允许我们将存储在我们变量中的两个表合并。GenID 提供“锚点”。如果 GenID(GenGeo) = GenID(GenTempoeral),则将 EngineID 的 lat 和 lon 添加到时间序列行。
如您所见,有许多途径可以达到相同的结果。每条路线都有其自身的价值和缺点。这就是为什么灵活性对于任何平台架构都至关重要。