InfluxDB 作为 IoT Edge Historian:爬取/步行/跑步方法

导航至

本文最初发表于 The New Stack

InfluxDB - IoT Edge

照片由 Tim Vanderhoydonck 在 Unsplash 上拍摄

如何将数据导入数据库是开发者面临的最基本的数据处理问题之一。当您处理本地设备时,数据收集可能已经足够具有挑战性。从边缘设备添加数据会带来一系列全新的挑战。然而,物联网边缘设备 的指数级增长意味着公司需要经过验证且可靠的方法来从中收集数据。

以下是从边缘设备收集数据的三种不同方法。边缘设备具有不同的功能——处理能力、内存容量、连接性等——因此,为您的 边缘计算 用例找到合适的解决方案可能需要一些试验和错误。但是,您可以将这些方法用作构建解决方案的起点。

对于上下文,我们使用 InfluxDB 作为处理和存储解决方案,InfluxDB 的云版本是此处的最终目标。在这些示例中的每个边缘设备上也运行着 InfluxDB 的开源版本。我们使用 Flux 语言来创建执行数据转换和注释的任务。

爬取:基本存储和转发

这是将数据从边缘设备传输到 InfluxDB 云数据库的最基本设置。此处,原始数据直接从 InfluxDB 的边缘实例写入云实例。

基本存储和转发方法的过程如下所示

  1. 在边缘上运行的 InfluxDB 实例中创建一个存储桶。我们将存储桶命名为“devices”,并将该数据的保留策略设置为一小时。
  2. 在您的 InfluxDB 云实例中创建一个存储桶。我们将此存储桶命名为“edge_devices”,并将其设置为 30 天的保留策略。
  3. 在 InfluxDB 中创建一个任务,该任务每分钟将来自边缘“devices”存储桶的原始数据推送到云中“edge_devices”存储桶。
  4. 将“edge_host”标签注入到传入数据中。
# Create edge bucket "devices" with ~1hr retention policy
influx bucket create --name "devices" --retention 1h

# Create cloud bucket "edge_devices" with ~30 day retention policy
influx config cloud2
influx bucket create --name "edge_devices" --retention 30d

# Write a task that pushes raw data to "edge_devices" bucket every ~1 minute
import "influxdata/influxdb/tasks"
import "influxdata/influxdb/secrets"

option task = {name: "raw_northbound", every: 1m, offset: 0s}

cloud_host = secrets.get(key: "cloud_host")
cloud_org = secrets.get(key: "cloud_org")
cloud_token = secrets.get(key: "cloud_token")

from(bucket: "devices")
|> range(start: tasks.lastSuccess(orTime: -1h))
|> set(key: "edge_id", value: "001")
|> to(host: cloud_host, org: cloud_org, token: cloud_token, bucket: "edge_devices")

# Inject an edge_id tag - See line 20

优点

  • 快速且易于设置。
  • 需要很少的边缘计算能力。

缺点

  • 增加了云实例中不干净数据的数量。
  • 需要更高的边缘数据连接吞吐量。

步行:下采样和转发

在这种方法中,来自边缘的原始数据在边缘进行内存中下采样,并且将下采样的数据推送到云端,在云端将其写入 InfluxDB。

基本存储和转发方法的过程如下所示

  1. 在边缘上运行的 InfluxDB 实例中创建一个存储桶。同样,我们将存储桶命名为“devices”,并将该数据的保留策略设置为一小时。
  2. 在您的 InfluxDB 云实例中创建一个存储桶。我们将此存储桶命名为“edge_aggregate”,并将其设置为 30 天的保留策略。
  3. 在您的 InfluxDB 边缘实例中创建一个任务,该任务对来自边缘“devices”存储桶的原始数据进行下采样,并将下采样的数据每分钟发送到云中“edge_aggregate”存储桶。
  4. 将“edge_host”标签注入到传入数据中。
# Create edge bucket "devices" with ~1hr retention policy
influx bucket create --name "devices" --retention 1h

# Create cloud bucket "edge_aggregate" with ~30 day retention policy
influx bucket create --name "edge_aggregate" --retention 30d

# Write a task that downsamples data and pushes to "edge_aggregate" bucket every ~1 minute
import "influxdata/influxdb/tasks"
import "influxdata/influxdb/secrets"

option task = {name: "aggregate_northbound", every: 1m, offset: 0s}

cloud_host = secrets.get(key: "cloud_host")
cloud_org = secrets.get(key: "cloud_org")
cloud_token = secrets.get(key: "cloud_token")

from(bucket: "devices")
|> range(start: tasks.lastSuccess(orTime: -1h))
|> aggregateWindow(every: 5m, fn: mean, createEmpty: false)
|> set(key: "edge_id", value: "001")
|> to(host: cloud_host, org: cloud_org, token: cloud_token, bucket: "edge_aggregate")

# Inject an edge_id tag - see line 20

优点

  • 快速且易于设置。
  • 减少了发送到云实例的不干净原始数据量。
  • 降低了边缘数据连接吞吐量需求。

缺点

  • 需要更多的 边缘计算 能力。
  • 需要更多边缘内存。
  • 需要边缘存储。

跑步:强大的边缘 Historian

在第三个示例中,我们在边缘端做了更多的工作。在这里,数据 Historian 的角色实际上被转移到了边缘,因为 InfluxDB 的边缘实例收集、处理和写入该数据。

按照先前方法的步骤,原始数据在边缘端进行下采样,但现在我们将下采样的数据也写入到边缘端的 InfluxDB 实例中。最后,将保存的下采样数据推送到 InfluxDB 的云实例。

  1. 在边缘上运行的 InfluxDB 实例中创建一个存储桶。将存储桶命名为“devices”,并将该数据的保留策略设置为一小时。
  2. 在边缘的 InfluxDB 中创建另一个存储桶,并将其命名为“northbound”,保留策略为一天。
  3. 在您的 InfluxDB 云实例中创建一个存储桶。我们将此存储桶命名为“edge_aggregate”,并将其设置为 30 天的保留策略。
  4. 在您的 InfluxDB 边缘实例中创建一个任务,该任务对来自边缘“devices”存储桶的原始数据进行下采样,并将下采样的数据每分钟写入边缘端的“northbound”存储桶。
  5. 在边缘端创建另一个任务,该任务每五分钟将数据从“northbound”推送到云端的“edge_aggregate”。
  6. 将“edge_host”标签注入到传入数据中。
# Create edge bucket "devices" with ~1hr retention policy
influx bucket create --name "devices" --retention 1h

# Create edge bucket "northbound" with ~1 day retention policy
influx bucket create --name northbound --retention 1d

# Create cloud bucket "edge_aggregate" with ~30 day retention policy
influx bucket create --name "edge_aggregate" --retention 30d

# Write a task that downsamples data into "northbound" bucket every ~1 minute
import "influxdata/influxdb/tasks"

option task = {name: "aggregate_local", every: 1m, offset: 0s}

from(bucket: "devices")
|> range(start: tasks.lastSuccess(orTime: -1h))
|> aggregateWindow(every: 5m, fn: mean, createEmpty: false)
|> set(key: "edge_id", value: "001")
|> to(bucket: "northbound")

# Write a task that pushes from "northbound" to "edge_aggregate" every ~5 minutes
import "influxdata/influxdb/tasks"
import "influxdata/influxdb/secrets"

option task = {name: "sync_northbound", every: 5m, offset: 0s}

cloud_host = secrets.get(key: "cloud_host")
cloud_org = secrets.get(key: "cloud_org")
cloud_token = secrets.get(key: "cloud_token")

from(bucket: "northbound")
|> range(start: tasks.lastSuccess(orTime: -1h))
|> to(host: cloud_host, org: cloud_org, token: cloud_token, bucket: "edge_aggregate")

# Inject an edge_id tag - see line 18

优点

  • 快速且易于设置。
  • 显著减少了发送到云实例的不干净原始数据量。
  • 降低了边缘数据连接吞吐量需求。
  • 能够抵抗云端和边缘端之间的连接中断。

缺点

  • 需要更多边缘计算能力。
  • 需要显著更多的边缘内存。
  • 需要边缘存储容量。

正如我们所指出的,每种方法都有其优点和缺点,因此请选择最适合您的技术堆栈和用例的方法。

您可以在分布式边缘设备之间复制或调整这些任务,以最大限度地利用可用资源。

无论您选择哪种方法,这种类型的解决方案都可以让您更好地观察边缘设备。它还有助于通过减小中央存储实例中的数据集大小来简化整个生态系统中的数据处理。