使用 InfluxDB 作为物联网边缘 Historian

导航至

InfluxDB 越来越多地用于物联网解决方案中,以存储来自连接设备的数据。现在,它也可以在物联网边缘网关上用作数据 Historian,以分析、可视化并最终将聚合的物联网数据传输到中央服务器。在本文中,我们将研究三种简单的方法,您可以将物联网边缘设备上的 InfluxDB 实例连接到云中的另一个 InfluxDB 实例。

准备您的边缘 InfluxDB 实例

首先,我们需要一个本地 InfluxDB 实例用作我们的边缘 Historian。最简单的方法是在您的本地开发中运行官方 Docker 镜像

docker run -p 8086:8086 influxdb

然后,您需要通过访问 http://localhost:8086 并按照屏幕上的说明完成设置。记住您在设置期间使用的组织 - 您稍后需要该信息!

接下来,导航到加载数据 -> API 令牌屏幕,并获取在设置期间为您生成的令牌字符串 - 您稍后也需要它。

提示:默认令牌允许您控制 InfluxDB 中的一切 - 它非常强大。当将您的解决方案部署到生产环境时,您将需要创建一个新的、功能较弱的令牌,该令牌仅具有从特定存储桶读取或写入的权限。

然后使用这些值设置您的 Influx CLI,我们将在本文的其余部分广泛使用它

influx config create –active –name "edge" –host "http://localhost:8086" –org "your_edge_org" –token "your_edge_token"

现在是时候在此实例上设置一个存储桶来存储我们的设备数据了

influx bucket create --name "devices" --retention 1h

我们在此存储桶上使用 1 小时的保留策略。在您在功率相对较低的物联网边缘设备上收集大量高频传感器数据的真实场景中,您将没有太多空间来无限期地存储它。InfluxDB 的保留策略使您可以轻松地限制它。不过,不用担心丢失数据 - 本文将向您展示如何通过将其传输到您的云实例来长期保留数据。

模拟物联网设备

虽然本文的其余部分适用于您正在收集的任何物联网数据,但我们将通过使用模拟设备生成一些人工传感器数据来保持示例简单。为此,我们将使用 Telegraf 项目中的 new inputs.mock 插件。我们将生成两个字段:温度和湿度,就像您可能从廉价的 DHT 传感器获得的那样。

首先,我们将仅使用本文所需的插件生成一个新的 Telegraf 配置

telegraf --aggregator-filter none --processor-filter none --input-filter mock --output-filter influxdb_v2 config > telegraf.conf

这将为您创建一个名为 telegraf.conf 的新文件,其中包含 inputs.mockoutputs.influxdb_v2 的部分。

您需要使用以下数据填写这些部分

[[outputs.influxdb_v2]]
  urls = ["http://127.0.0.1:8086"]
  token = "your_edge_token"
  organization = "your_edge_org"
  bucket = "devices"

[[inputs.mock]]
  ## Set the metric name to use for reporting
  metric_name = "dht"

[[inputs.mock.stock]]
  name = "temperature"
  price = 20.0
  volatility = 0.05

[[inputs.mock.stock]]
  name = "humidity"
  price = 50.0
  volatility = 0.05

[inputs.mock.tags]
  "device_id" = "001"

第一部分告诉 Telegraf 如何连接到您的边缘实例以向其写入数据。使用您在设置边缘实例期间捕获的组织名称和令牌字符串。

第二部分定义了我们的模拟设备,其中包含 temperaturehumidity 字段以及 device_id 标签。在这里,我使用 inputs.mock.stock 类型来生成一个字段,该字段的值随时间随机但逐渐变化,就像温度和湿度一样。还有其他生成器类型,您可以用来更准确地模拟其他形式的数据。

提示:您可以使用多个 inputs.mock 部分来同时模拟多个设备,为每个设备赋予不同的 device_id 标签值。

我们还将默认 Telegraf 收集间隔更改为一秒,因为物联网设备通常不会在记录之间等待 10 秒,因此向上滚动到 agent 部分并将 interval 更改为 1s,如下所示

[agent]
  ## Default data collection interval for all inputs
  interval = "1s"

现在您可以使用此配置启动 Telegraf,并让它开始向您的边缘实例发送数据,同时我们准备好您的云实例

telegraf –config ./telegraf.conf

准备您的云 InfluxDB 实例

您的云实例将是您一次聚合来自多个边缘设备的数据的地方。它不必专门“在云中”,它可以运行在您的私有数据中心或任何其他地方,关键是它位于您的所有边缘设备都可以与之通信的地方。如果您还没有可用的实例,您可以在 InfluxDB Cloud 平台上注册免费帐户。本文中的所有内容都将在免费层级内运行,因此您无需花费任何费用即可继续操作。

与我们的边缘实例一样,您将需要从您的云实例收集主机、组织名称和访问令牌。然后我们将使用它们在 CLI 中创建第二个配置,以便您可以在您的边缘实例和您的云实例之间来回切换

influx config create –active –name "cloud" –host "your_cloud_host" –org "your_cloud_org" –token "your_cloud_token"

提示:您可以通过运行 influx config list 查看您的两个配置,并分别使用 influx config edgeinflux config cloud 在它们之间切换。然后我们将创建一个存储桶来存储来自我们边缘设备的数据

influx bucket create --name "edge_devices" --retention 30d

与我们边缘实例上的 devices 存储桶不同,这里我们将使用 30 天的保留策略,因为我们的云实例有足够的空间(并且它符合 InfluxDB Cloud 的免费层级使用限制)。

在您的边缘主机中存储云身份验证

此步骤是可选的,但强烈建议使用。您需要在 Flux 任务中使用您的云凭据才能从您的边缘实例连接到您的云实例,但是将这些凭据以纯文本形式保存在您的脚本中安全性很差。

因此,我们将使用 InfluxDB 的内置密钥存储将它们放入只写空间,我们可以使用以下命令从我们的脚本中引用这些密钥

influx config edge
influx secret update --key cloud_host  --value "your_cloud_host"
influx secret update --key cloud_org   --value "your_cloud_org"
influx secret update --key cloud_token --value "your_cloud_token"

基本存储和转发

最简单的设置是获取您边缘实例中的所有数据,并按原样将其转发到您的云实例。为此,在您的边缘实例上创建一个新任务,设置为每分钟运行一次。

您可以从 http://localhost:8086 的 UI 或使用 Flux 的 VisualStudio Code 插件 来完成此操作,我就是这样做的

Timeline - Add-Task

我们将为该任务使用以下 Flux 脚本

import "influxdata/influxdb/tasks"
import "influxdata/influxdb/secrets"

option task = {name: "northbound", every: 1m, offset: 0s}

cloud_host = secrets.get(key: "cloud_host")
cloud_org = secrets.get(key: "cloud_org")
cloud_token = secrets.get(key: "cloud_token")

from(bucket: "devices")
|> range(start: tasks.lastSuccess(orTime: -1h))
|> set(key: "edge_id", value: "001")
|> to(host: cloud_host, org: cloud_org, token: cloud_token, bucket: "edge_devices")

让我们逐步了解一下

import "influxdata/influxdb/tasks"
import "influxdata/influxdb/secrets"

import 行将一些关键功能引入到我们任务的脚本中。第一行给了我们一个 tasks 包,我们将使用它来查找此任务上次成功运行的时间。第二行给了我们一个 secrets 对象,它将使我们能够访问我们之前保存到 InfluxDB 密钥存储的云凭据。

option task = {name: "northbound", every: 1m, offset: 0s}

当我们使用我们在 VSCode 表单中输入的值创建此新任务时,会自动为我们包含此行。这就是 InfluxDB 任务存储这些值的方式。

cloud_host = secrets.get(key: "cloud_host")
cloud_org = secrets.get(key: "cloud_org")
cloud_token = secrets.get(key: "cloud_token")

接下来的三行检索我们保存的密钥,并将它们放入 Flux 变量中。我们将这些变量传递给我们的 Flux to() 函数,以便它可以连接到我们的云实例。

from(bucket: "devices")
|> range(start: tasks.lastSuccess(orTime: -1h))

我们查询的第一部分指定要从我们的本地边缘实例获取哪些数据。我们正在从我们配置 Telegraf 写入的 devices 存储桶中读取数据。这里我们使用我们之前导入的 tasks 包来查找此任务上次运行的时间,这将使我们只获取尚未发送到云的新数据,从而节省一些带宽。它还使此任务更加健壮,因为如果它因任何原因(例如您的边缘和云之间的连接问题)而失败,则下一次运行将包含上一次运行中的所有数据。

orTime 参数提供了一个默认时间,该时间将在首次运行任务时使用。

|> set(key: "edge_id", value: "001")

下一行向我们的数据添加了一些额外信息。在典型的边缘物联网场景中,您将拥有多个边缘位置,并且每个位置都将拥有多个设备。我们已经从 Telegraf 收集了一个 device_id,但是该数据中没有任何信息说明它是从哪个边缘位置收集的。一旦我们从多个位置收集的数据在云中混合在一起,就很难区分它们。因此,在这里我们将在发送数据之前将 edge_id 注入到数据流中,以识别我们的边缘位置。

|> to(host: cloud_host, org: cloud_org, token: cloud_token, bucket: "edge_devices")

最后,我们准备好将数据发送到云端!最后一步只需要一个函数调用,即内置的 to() 函数,它可以将您的查询数据写入本地存储桶,或者在本例中,通过传入远程 InfluxDB 实例的身份验证凭据写入远程存储桶。

就是这样!保存您的任务,从现在开始,您的所有本地数据将每分钟自动复制到您的云实例中!

转发前进行降采样

虽然我们刚刚编写的任务很简单并且运行良好,但它仍然会向您的云端发送大量数据。在真实场景中,每个边缘都将有数十个(如果不是数百个)设备,其中一些设备将以每秒多次甚至微秒的速度生成数据!即使使用我们的模拟设备,我们每秒也会生成一个新值。对于您将在云中运行的高级用例来说,这可能是很多非常详细的数据,您很可能不需要这些数据,它只会占用额外的带宽来传输和额外的空间来存储。

相反,如果我们可以在发送数据之前减少数据量,即使这意味着丢失一些粒度,该怎么办?此过程称为降采样或物化视图,并且在将数据移动到越来越高的级别时非常常见。使用 Flux 非常容易做到这一点,就在我们的任务内部!它看起来是这样的

import "influxdata/influxdb/tasks"
import "influxdata/influxdb/secrets"

option task = {name: "northbound", every: 1m, offset: 0s}

cloud_host = secrets.get(key: "cloud_host")
cloud_org = secrets.get(key: "cloud_org")
cloud_token = secrets.get(key: "cloud_token")

from(bucket: "devices")
|> range(start: tasks.lastSuccess(orTime: -1h))
|> aggregateWindow(every: 5m, fn: mean, createEmpty: false)
|> set(key: "edge_id", value: "001")
|> to(host: cloud_host, org: cloud_org, token: cloud_token, bucket: "edge_devices")

如您所见,我们只需要在我们的 Flux 代码中添加一行

|> aggregateWindow(every: 5m, fn: mean, createEmpty: false)

此行获取我们从存储桶中读取的数据,并计算滚动 5 分钟窗口内每个字段的平均(均值)值,将 300 个单独的测量值减少为一个。然后,像以前一样,将降采样数据发送到我们的云实例。

Flux 提供了许多内置的 聚合函数,除了 mean 之外,您还可以计算最小值、最大值、积分、标准差等等!如果您想要更复杂的东西,例如摆门压缩算法,您也可以定义自己的聚合函数,这完全取决于您。

为边缘的高弹性进行调整

到目前为止,我们有一个任务,它在发送到云端的数据方面既高效,又能应对短暂的连接问题。但是,如果这些连接问题持续超过一个小时怎么办?对于智能建筑或工厂来说,这可能不是问题,但如果您的边缘是风力涡轮机或石油钻井平台,则这确实有可能发生。由于我们收集原始数据的 devices 存储桶只有 1 小时的保留策略,因此任何超过该时间的停机都将导致数据丢失。我们可以延长保留时间,但是当您不得不保存越来越多的高容量数据时,存储空间就会成为一个问题。

解决此问题的方法是将我们较小的降采样数据在边缘存储更长的时间。为此,我们首先需要一个新的存储桶

influx config edge
influx bucket create --name northbound --retention 1d

对于我们的新存储桶,我们将其称为 northbound,我们使用 1 天的保留策略,而不是仅仅一小时。这将使我们的边缘实例在开始丢失数据之前断开连接整整 24 小时。

下一步是将我们的任务分成两个。首先,创建一个名为 aggregate_local 的新任务,并将其设置为每分钟运行一次。此任务将执行来自 devices 存储桶的数据的降采样,并将该数据存储在 northbound 存储桶中。

import "influxdata/influxdb/tasks"

option task = {name: "aggregate_local", every: 1m, offset: 0s}

from(bucket: "devices")
|> range(start: tasks.lastSuccess(orTime: -1h))
|> aggregateWindow(every: 5m, fn: mean, createEmpty: false)
|> set(key: "edge_id", value: "001")
|> to(bucket: "northbound")

此任务不再需要导入 influxdata/influxdb/secrets,或从密钥存储中获取凭据,因为它正在将我们的降采样数据写入本地存储桶。在此任务中,我们同时执行使用 aggregateWindow 进行的降采样和使用 set 函数注入的 edge_id,以便写入 northbound 的数据正是我们想要发送到云端的数据。

然后,第二个任务将从 northbound 存储桶中读取数据,并将其未经修改地发送到我们的云实例。使用名称 sync_northbound 创建此任务,并将其设置为每 5 分钟运行一次。

import "influxdata/influxdb/tasks"
import "influxdata/influxdb/secrets"

option task = {name: "sync_northbound", every: 5m, offset: 0s}

cloud_host = secrets.get(key: "cloud_host")
cloud_org = secrets.get(key: "cloud_org")
cloud_token = secrets.get(key: "cloud_token")

from(bucket: "northbound")
|> range(start: tasks.lastSuccess(orTime: -1h))
|> to(host: cloud_host, org: cloud_org, token: cloud_token, bucket: "edge_devices")

在这里,我们再次使用密钥存储来检索我们的凭据,但是我们的查询不需要 aggregateWindowset 调用,因为我们已经在将数据写入 northbound 存储桶之前执行了这些步骤。

结论

现在我们已经在边缘运行了 InfluxDB,从连接的设备(在本例中是模拟设备)收集数据,临时存储数据并在边缘,同时将其传输到云中运行的另一个 InfluxDB 实例。我们经历了任务脚本的多次迭代,以通过降采样使此过程更有效,并通过延长本地保留时间来提高对连接问题的鲁棒性。所有这些都是仅使用 InfluxDB 和少量 Flux 脚本代码行完成的。

但这只是您可以做的事情的一小部分!一旦您开始根据自己的特定需求定制此设置,您就可以使用更智能的降采样算法来增强您的 Flux 脚本,使用来自 SQL 服务器的元数据增强您的设备数据,根据来自设备的阈值或数据丢失 触发警报 等等。