使用InfluxDB作为物联网边缘历史记录器
作者:Michael Hall / 产品,用例,开发者
2022年3月17日
导航到
InfluxDB越来越多地被用于物联网解决方案中以存储连接设备的数据。现在它也可以用作物联网边缘网关的数据历史记录器,用于分析、可视化,并最终将聚合的物联网数据传输到集中的服务器。在这篇文章中,我们将探讨三种简单的方法,您可以将您边缘设备上的InfluxDB实例连接到云中的另一个InfluxDB实例。
准备您的InfluxDB边缘实例
首先,我们需要一个本地的InfluxDB实例来用作我们的边缘历史记录器。最简单的方法是在本地开发环境中运行官方的Docker镜像
docker run -p 8086:8086 influxdb
然后,您需要通过访问https://127.0.0.1:8086并遵循屏幕上的说明来完成设置。记住您在此设置期间使用的组织名称——您稍后需要这些信息!
接下来,导航到“加载数据” -> “API令牌”页面,并获取在设置期间为您生成的令牌字符串——您稍后也需要它。
提示:默认令牌允许您控制InfluxDB中的所有内容——它非常强大。当您将解决方案部署到生产环境中时,您将需要创建一个新令牌,该令牌的权限较弱,只能从特定的桶中读取或写入。
然后使用这些值设置您的Influx CLI,我们将在本文的其余部分广泛使用它
influx config create –active –name "edge" –host "https://127.0.0.1:8086" –org "your_edge_org" –token "your_edge_token"
现在,我们需要在这个实例上设置一个桶来存储我们的设备数据
influx bucket create --name "devices" --retention 1h
我们在这个桶上使用1小时的保留策略。在现实世界中,您可能在一个相对低功耗的物联网边缘设备上收集大量的高频传感器数据,您不会有太多空间来无限期地存储它。InfluxDB的保留策略使您很容易对它进行限制。不过,不用担心丢失数据——本文将向您展示如何通过将其传输到云实例来长期保留这些数据。
模拟物联网设备
虽然本文的其余部分适用于您收集的任何物联网数据,但我们将通过使用模拟设备来生成一些人工传感器数据来保持示例简单。为此,我们将使用来自Telegraf项目的新的inputs.mock插件。我们将生成两个字段:温度和湿度,就像您可能从廉价的DHT传感器中获得的那样。
首先,我们将使用仅适用于本文的插件生成一个新的Telegraf配置
telegraf --aggregator-filter none --processor-filter none --input-filter mock --output-filter influxdb_v2 config > telegraf.conf
这将为您创建一个名为 telegraf.conf
的新文件,其中包含 inputs.mock
和 outputs.influxdb_v2
的部分。
您需要填写以下数据到这些部分
[[outputs.influxdb_v2]]
urls = ["http://127.0.0.1:8086"]
token = "your_edge_token"
organization = "your_edge_org"
bucket = "devices"
并且
[[inputs.mock]]
## Set the metric name to use for reporting
metric_name = "dht"
[[inputs.mock.stock]]
name = "temperature"
price = 20.0
volatility = 0.05
[[inputs.mock.stock]]
name = "humidity"
price = 50.0
volatility = 0.05
[inputs.mock.tags]
"device_id" = "001"
第一部分告诉 Telegraf 如何连接到您的边缘实例以写入数据。使用您在边缘实例设置过程中捕获的组织名称和令牌字符串。
第二部分定义了我们的模拟设备,包含 temperature
和 humidity
字段以及 device_id
标签。在这里,我使用 inputs.mock.stock
类型来生成一个值随时间随机但逐渐变化的字段,就像温度和湿度一样。您还可以使用其他生成器类型来更精确地模拟其他形式的数据。
提示:您可以使用多个 inputs.mock
部分同时模拟多个设备,并为每个设备分配不同的 device_id
标签值。
我们还将默认的 Telegraf 收集间隔更改为每秒一次,因为物联网设备通常不会在记录之间等待10秒,所以滚动到代理部分,将 interval
改为 1s
,如下所示
[agent]
## Default data collection interval for all inputs
interval = "1s"
现在您可以使用此配置启动 Telegraf,并让它开始将数据发送到您的边缘实例,同时我们准备您的云实例
telegraf –config ./telegraf.conf
准备您的 InfluxDB 云实例
您的云实例将是您一次从多个边缘设备聚合数据的地方。它不必是“在云中”具体,它可以在您的私有数据中心或任何其他地方运行,关键在于它位于所有边缘设备都能与之通信的地方。如果您还没有可用的此类实例,您可以在 InfluxDB 云平台上 注册一个免费账户。本文中的所有内容都将运行在免费层内,因此您跟随它不会产生任何费用。
就像我们的边缘实例一样,您需要从您的云实例中收集主机、组织名称和访问令牌。然后我们将使用它们在 CLI 中创建第二个配置,这样您就可以在边缘实例和云实例之间切换
influx config create –active –name "cloud" –host "your_cloud_host" –org "your_cloud_org" –token "your_cloud_token"
提示:您可以通过运行 influx config list 来查看您的两个配置,并使用 influx config edge
或 influx config cloud
分别在它们之间切换。然后我们将创建一个存储从我们的边缘设备传入的数据的 bucket
influx bucket create --name "edge_devices" --retention 30d
与我们的边缘实例上的 devices
bucket 不同,这里我们将使用30天的保留策略,因为我们的云实例有足够的空间(并且它符合 InfluxDB 云的免费层使用限制)。
将云身份验证存储在您的边缘主机上
此步骤是可选的,但强烈推荐。您需要使用云凭据在 Flux 任务中连接到您的云实例,但将凭据以纯文本形式保留在脚本中是安全性较差的做法。
因此,我们将利用 InfluxDB 内置的秘密存储来将这些放入一个只写空间,我们可以在脚本中使用以下命令从中引用它们
influx config edge
influx secret update --key cloud_host --value "your_cloud_host"
influx secret update --key cloud_org --value "your_cloud_org"
influx secret update --key cloud_token --value "your_cloud_token"
基本存储和转发
最简单的设置是从您的边缘实例获取所有数据,并将其原样转发到云实例。为此,在您的边缘实例上创建一个新的任务,设置为每分钟运行一次。
您可以通过用户界面在https://127.0.0.1:8086进行此操作,或使用VisualStudio Code插件,这就是我这样做的方式。
我们将使用以下Flux脚本来完成这项任务。
import "influxdata/influxdb/tasks"
import "influxdata/influxdb/secrets"
option task = {name: "northbound", every: 1m, offset: 0s}
cloud_host = secrets.get(key: "cloud_host")
cloud_org = secrets.get(key: "cloud_org")
cloud_token = secrets.get(key: "cloud_token")
from(bucket: "devices")
|> range(start: tasks.lastSuccess(orTime: -1h))
|> set(key: "edge_id", value: "001")
|> to(host: cloud_host, org: cloud_org, token: cloud_token, bucket: "edge_devices")
让我们一步一步来分析。
import "influxdata/influxdb/tasks"
import "influxdata/influxdb/secrets"
导入行将一些关键功能引入我们的任务脚本中。第一行为我们提供了一个任务包,我们将使用它来找到此任务上次成功运行的时间。第二行提供了一个密钥对象,它将使我们能够访问之前保存到InfluxDB密钥存储中的云凭证。
option task = {name: "northbound", every: 1m, offset: 0s}
此行是在我们创建此新任务时自动为我们包含的,使用了我们在VSCode表单中输入的值。这就是InfluxDB任务存储这些值的方式。
cloud_host = secrets.get(key: "cloud_host")
cloud_org = secrets.get(key: "cloud_org")
cloud_token = secrets.get(key: "cloud_token")
接下来的三行检索我们的保存的密钥,并将它们放置在Flux变量中。我们将将这些变量传递给我们的Flux to()函数,以便它可以连接到我们的云实例。
from(bucket: "devices")
|> range(start: tasks.lastSuccess(orTime: -1h))
查询的第一部分指定从我们的本地边缘实例获取什么数据。我们正在从配置Telegraf写入的设备桶中读取数据。在这里,我们使用之前导入的任务包来找到此任务上次运行的时间,这将使我们仅获取尚未发送到云的新数据,从而节省一些带宽。这也使此任务更健壮,因为如果由于任何原因(例如边缘和云之间的连接问题)它失败,下一次运行将包括前一次运行的所有数据。
orTime
参数提供了一个默认时间,第一次运行任务时将使用该时间。
|> set(key: "edge_id", value: "001")
下一行添加了一些额外的信息到我们的数据中。在一个典型的边缘物联网场景中,您将有多个边缘位置,每个位置将有多个设备。我们已经在Telegraf中收集了一个device_id,但该数据中没有说明是从哪个边缘位置收集的。一旦来自多个位置的数据在云中混合在一起,就很难区分它们。因此,在发送数据之前,我们将注入一个edge_id到数据流中,以标识我们的边缘位置。
|> to(host: cloud_host, org: cloud_org, token: cloud_token, bucket: "edge_devices")
最后,我们准备好将数据发送到云了!最后一步只需要一个函数调用,即内置的to()函数,它可以写入查询数据到本地桶,或者在这种情况下,通过传递远程InfluxDB实例的认证凭证,写入远程桶。
就是这样!保存您的任务,从现在开始,您所有的本地数据将每分钟自动复制到您的云实例中!
转发前降采样
虽然我们刚才编写的任务很简单且运行良好,但它仍然会将大量数据发送到您的云。在现实世界中,您在每个边缘上可能会有数十个甚至数百个设备,其中一些设备每秒会产生多次或甚至微秒级的数据。即使在我们模拟的设备中,我们每秒也会产生一个新值。这将是大量您可能不需要用于在云中运行的高级用例的非常详细的数据,它只会占用额外的带宽来传输和额外的空间来存储。
相反,如果我们能在发送之前减少数据的量,即使这意味着丢失一些粒度,那会怎么样?这个过程被称为降采样或物化视图,当将数据提升到更高的层级时,这极为常见。使用Flux进行操作也非常简单,就在我们的任务中!下面是它的样子
import "influxdata/influxdb/tasks"
import "influxdata/influxdb/secrets"
option task = {name: "northbound", every: 1m, offset: 0s}
cloud_host = secrets.get(key: "cloud_host")
cloud_org = secrets.get(key: "cloud_org")
cloud_token = secrets.get(key: "cloud_token")
from(bucket: "devices")
|> range(start: tasks.lastSuccess(orTime: -1h))
|> aggregateWindow(every: 5m, fn: mean, createEmpty: false)
|> set(key: "edge_id", value: "001")
|> to(host: cloud_host, org: cloud_org, token: cloud_token, bucket: "edge_devices")
如您所见,我们只需在Flux代码中添加一行
|> aggregateWindow(every: 5m, fn: mean, createEmpty: false)
这行代码从我们的桶中读取数据,并计算每个字段在5分钟滚动窗口内的平均值,将300个单独的测量值缩减为一个。然后,这些降采样数据就像之前一样被发送到我们的云实例。
Flux提供了许多内置的聚合函数,除了mean
之外,您还可以计算最小值、最大值、积分、标准差等等!如果您想实现更复杂的函数,例如摆门压缩算法,一切皆由您决定。
针对边缘高可靠性进行适配
到目前为止,我们有一个任务,它在向云发送数据方面既高效,又能应对短暂的连接问题。但如果连接问题持续超过一小时呢?这对智能建筑或工厂来说可能不是问题,但如果您的边缘是风力涡轮机或石油钻井平台,这确实可能发生。因为我们的设备桶只保留1小时的数据保留策略,任何超过这个时间的故障都会导致数据丢失。我们可以延长保留期,但这样存储空间就成为一个问题,因为您需要保存越来越多的数据。
解决这个问题的方法是将较小的降采样数据存储在边缘更长时间。要做到这一点,我们首先需要一个新桶
influx config edge
influx bucket create --name northbound --retention 1d
对于我们的新桶,我们将其命名为northbound
,我们使用1天的保留策略,而不是只有1小时。这将允许我们的边缘实例在完全断开连接24小时后才开始丢失数据。
下一步是将任务分成两部分。首先,创建一个名为aggregate_local
的新任务,并设置为每分钟运行一次。这个任务将执行从devices
桶中的数据降采样,并将其存储在northbound
桶中。
import "influxdata/influxdb/tasks"
option task = {name: "aggregate_local", every: 1m, offset: 0s}
from(bucket: "devices")
|> range(start: tasks.lastSuccess(orTime: -1h))
|> aggregateWindow(every: 5m, fn: mean, createEmpty: false)
|> set(key: "edge_id", value: "001")
|> to(bucket: "northbound")
这个任务不再需要导入influxdata/influxdb/secrets
或从密钥存储中获取凭证,因为它将我们的降采样数据写入本地桶。在这个任务中,我们使用aggregateWindow
执行降采样,并使用set
函数注入edge_id
,这样写入northbound的数据就正好是我们想要发送到云的数据。
第二个任务将读取northbound
桶中的数据,并将其未经修改地发送到我们的云实例。创建一个名为sync_northbound
的任务,并设置为每5分钟运行一次。
import "influxdata/influxdb/tasks"
import "influxdata/influxdb/secrets"
option task = {name: "sync_northbound", every: 5m, offset: 0s}
cloud_host = secrets.get(key: "cloud_host")
cloud_org = secrets.get(key: "cloud_org")
cloud_token = secrets.get(key: "cloud_token")
from(bucket: "northbound")
|> range(start: tasks.lastSuccess(orTime: -1h))
|> to(host: cloud_host, org: cloud_org, token: cloud_token, bucket: "edge_devices")
在这里,我们再次使用密钥存储检索我们的凭证,但我们的查询不需要aggregateWindow
或set
调用,因为我们已经在这之前执行了这些步骤,在将数据写入northbound
桶之前。
结论
现在我们已经在边缘运行InfluxDB,从连接的设备(在本例中是模拟设备)收集数据,暂时存储在边缘,同时也将其发送到云中运行的另一个InfluxDB实例。我们通过多次迭代任务脚本来使这个过程更高效,通过扩展本地保留期来提高对连接问题的鲁棒性。所有这些仅使用InfluxDB和少量的Flux脚本就完成了。
这只是您能做的事情的一小部分!一旦您开始根据自身具体需求调整此设置,您可以使用更智能的下采样算法增强您的Flux脚本,使用来自SQL服务器的元数据增强您的设备数据,根据阈值或设备数据丢失触发警报,以及更多。