InfluxDB和GeoData - 应急发电机
作者:Jay Clifford / 产品,用例,开发者
2021年11月11日
导航到
随着LTE(长期演进)的广泛应用,我们正在看到越来越多的物联网设备在我们的星球偏远地区上线。
设想以下场景:一个国家目前正在经历因电网故障而引发的全国性紧急情况。为了缓解电力短缺,政府已在偏远地区部署了发电机,以向最偏远的村庄供电。问题是,村庄仍然报告停电,因为应急发电机燃料耗尽。
那么我们作为物联网专家如何应对这种情况?本博客文章将使用InfluxDB的地理数据工具来转换和可视化远程发电机传感器产生的数据,目的是提高监控和响应速度。
地理数据
那么什么是地理数据?地理数据可以包含两种不同形式的地理数据,空间和时空。空间数据与描述与地球上的某个地点关联的信息(如指标)有关。一个常见的例子是经纬度。经纬度基本上是一组数值,与地球上的一个特定点相关
- (纬度:48.858093,经度:2.294694) = 埃菲尔铁塔
- (纬度:51.510357,经度:-0.116773) = 大本钟
您应该明白了。在这个场景中,我们将使用经纬度将我们的应急发电机放置在地图上。时空数据是空间和时间的数据集合。一个有效的时空数据例子是在某个地区索引一个物种,在这些统计数字在一段时间内根据环境因素波动。
我们的用例可以大致与时空数据相关联。我们正在存储
- 发电机位置(空间)
- 发电机随时间变化的状态(时间)
解决方案
本博客的模拟器代码和InfluxDB模板可以在这里找到。
模拟器
由于我们没有现成的应急发电机车队进行监控,我们必须运用想象力以及Python来生成我们的场景。我不会讨论模拟器的实现细节,但以下图表可以给您一个整体的概念。
现在我们已经将数据存储到InfluxDB中,让我们看看我们有什么
结果 | 表格 | _time | _value | _field | _measurement | generatorID |
0 | 2021-10-15T13:32:11Z | 624 | pressure | generator_stats | generator6 | |
1 | 2021-10-15T13:32:01Z | 619 | fuel | generator_stats | generator5 | |
2 | 2021-10-15T13:32:28Z | 86 | temperature | generator_stats | generator2 | |
3 | 2021-10-15T13:32:03Z | -91.58045 | lon | generator_stats | generator3 | |
4 | 2021-10-15T13:31:43Z | 44.27804 | lat | generator_stats | generator1 | |
5 | 2021-10-15T13:31:43Z | 174 | pressure | generator_stats | generator1 | |
6 | 2021-10-15T13:31:48Z | 415 | pressure | generator_stats | generator4 | |
7 | 2021-10-15T13:31:48Z | 33 | temperature | generator_stats | generator4 | |
8 | 2021-10-15T13:32:03Z | 48 | temperature | generator_stats | generator3 |
您可以在上面的表格中看到,我们拥有6个不同的发电机输出
- 温度
- 燃料水平
- 压力
- 经度和纬度
数据准备
在我们可以将数据可视化在InfluxDB Cloud的地图面板中之前,我们需要准备我们的数据。幸运的是,Flux让这个过程变得非常简单。
注意:在本用例中,我们假设应急发电机在每次样本发送时都会发送其地理数据。可能的情况是,静态传感器不会产生地理数据,或者仅在第一次初始化时产生。请参阅附录中的功能建议,其中我们可以使用变量来设置这些静态值。
from(bucket: "emergency _generator")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "generator_stats")
|> filter(fn: (r) => r["_field"] == "lon" or r["_field"] == "lat" or r["_field"] == "fuel")
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
|> rename(columns: {fuel: "_value"})
分解
- 选择开始时间和结束时间范围内的数据。
- 过滤包含测量值等于"generator_stats"的数据。
- 然后过滤这部分数据,只返回包含数据经度、纬度和燃油水平字段的数据。
- 将表转换,使_fields列的值成为列。与这些字段相关联的数据(目前位于_value列中)移动到相应的列下。
- 我们将新列重命名为"fuel",并将其改回"_value"(地图可视化工具期望此列)。
我们的数据现在看起来像这样
结果 | 表格 | _time | _measurement | generatorID | _value | lat | lon |
0 | 2021-10-15T12:40:57.103569828Z | generator_stats | generator1 | 784 | 44.27804 | -88.27205 | |
1 | 2021-10-15T12:40:36.03140913Z | generator_stats | generator2 | 624 | 34.09668 | -117.71978 | |
2 | 2021-10-15T12:50:42.905288313Z | generator_stats | generator3 | 756 | 41.6764 | -91.58045 |
InfluxDB地图
我们的数据已准备好!接下来是可视化。
-
- 创建一个新的仪表板并选择地图标题。
- 打开查询编辑器,复制并粘贴我们之前创建的Flux查询,并提交查询。
- 接下来,导航到地图的自定义面板。取消勾选使用S2 Cell ID用于经纬度。S2 cell id是用于聚合多个绘图点并合并为一个的地理位置计算。这对于跨越多个区域且有很多点的用例非常出色(S2为这些用例提供可扩展性和鲁棒性)。查看这篇文章了解更多信息)。由于我们的发电机有一个单一的位置点并且不会移动,因此我们将使用原始的经度和纬度来演示此示例。将纬度和经度下拉菜单更改为与我们Flux查询列匹配。它应该看起来像这样。
- 最后,让我们将阈值更新为交通灯系统。这对于一目了然地监控绘图中每个发电机的燃油水平将非常重要。
现在选择绿色勾选,你就可以完成了!我在完成仪表板时进行了一些创意发挥,现在看起来像这样:
Grafana Geomap
对于我们的Grafana高级用户,也有好消息。这个可视化也可以使用Geomap面板实现。
- 点击添加面板。
- 从可视化面板中选择地理图。
- 像在地图上一样,我们添加Flux查询。 注意:为了准备Grafana,我们必须进行一些额外的步骤来清理我们的数据。我们删除_start和_stop列,因为它们对于这个可视化不是必需的。然后,我们执行一个group()函数,将每个数据帧合并成一个。
- 最后,我们从地理图选项面板中选择我们的纬度和经度字段。
我使用了InfluxDB仪表板模板并进行了重现:
结论
在正确的环境中,地理数据是一个强大的工具。让我们回到我们的场景:通过使用地图标题,我们在地理图上绘制了每个应急发电机的位置,并附加了从每个发电机的收集到的燃料水平。这使得操作员能够以规模监控每个发电机,并根据燃料短缺规划燃料路线(我们可能在以后的博客中涉及这一点)。
那么您怎么看?我很高兴听到您对地理数据以及您当前如何使用它的看法!请访问InfluxData Slack和社区论坛(请确保@Jay Clifford)。让我们在那里继续讨论。
附录
添加静态地理数据
注意:目前无法在InfluxDB中完成此操作,因为变量只能在选择仪表板UI中选择。如果您想能够在Flux查询中选择变量值,请支持此功能请求。
如上“数据准备”部分所述,您可能需要提供手动地理数据来绘制地图。我们可以通过使用变量来完成此操作
- 让我们以我们的6个发电机为例,为每个创建一个键值对(
- 在创建我们的Flux查询时,我们使用map()函数而不是pivot()
import "strings" from(bucket: "emergency _generator") |> range(start: v.timeRangeStart, stop: v.timeRangeStop) |> filter(fn: (r) => r["_measurement"] == "generator_stats") |> filter(fn: (r) => r["_field"] == "fuel") |> map(fn: (r) => ({ r with lat: strings.split(v: v.gen_geo_data[r.generatorID], t: ":")[0], lon: strings.split(v: v.gen_geo_data[r.generatorID], t: ":")[1]}))
这将向我们的表格添加lat和lon列。对于每一行,generatorID用作我们的键值对变量的输入。然后我们使用split()函数使用冒号(:)作为分隔符拆分返回的坐标字符串。
SQL方法
最后,我们可以考虑时间序列和关系存储的混合。让我们考虑以下场景。在部署应急发电机时,我们将它们的详细信息注册到两个不同的数据存储中
- 关系型:此数据库将存储我们的静态数据:(GenID,地理数据,制造商等。)
- 时间序列:此数据库将存储我们的时间数据(GenID,燃料,温度,压力)
注意:GenID在我们两个数据存储中保持一致。这很快就会变得明显。
Flux允许我们摄取我们的关系型数据并将其与我们的时间序列数据相结合。让我们分解以下Flux查询
// Import the "sql" package
import "sql"
// Query data from PostgreSQL
GenGeo = sql.from(
driverName: "postgres",
dataSourceName: "postgresql://127.0.0.1?sslmode=disable",
query: "SELECT GenID, lat, lon FROM generators"
)
// Query data from InfluxDB
GenTemporal =from(bucket: "emergency _generator")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "generator_stats")
|> filter(fn: (r) => r["_field"] == "fuel")
// Join InfluxDB query results with PostgreSQL query results
join(tables: {metric: GenTemporal, info: GenGeo}, on: ["GenID"])
- 导入我们的SQL库。
- 创建一个名为 GenGeo 的变量。 GenGeo 存储了我们的sql.from() 查询结果。我们的查询检索了 GenID、lat 和 lon 列。
- 创建一个名为 GenTemporal 的变量,并存储原始查询的结果(省略 lat 和 lon 字段)。
- 最后,我们执行一个 join() 操作。这允许我们将存储在变量中的两个表合并在一起。GenID 提供了“锚点”。 如果 GenID(GenGeo) = GenID(GenTemporal),则将 EngineID 的 lat 和 lon 添加到时间序列行。
如你所见,有多个途径可以达到相同的结果。每种途径都有其自身的优点和缺点。这就是为什么灵活性对于任何平台架构都至关重要。