InfluxDB 作为物联网边缘历史记录器:爬行/行走/跑步方法

导航至

本文最初发表在The New Stack

InfluxDB - IoT Edge

图片由Tim VanderhoydonckUnsplash提供

如何将数据放入数据库是开发者面临的最基本的数据处理问题之一。当处理本地设备时,数据收集可能已经足够具有挑战性。从边缘设备添加数据则带来了一整套全新的挑战。然而,物联网边缘设备数量的指数级增长意味着公司需要从这些设备上收集数据的可靠和经过验证的方法。

以下是从边缘设备收集数据的三种不同方法。边缘设备具有不同的功能——处理能力、内存容量、连接性等——因此,为您的边缘计算用例找到正确的解决方案可能需要一些尝试和错误。然而,您可以用这些方法作为构建解决方案的起点。

为了便于理解,我们使用 InfluxDB 作为处理和存储解决方案,这里是 InfluxDB 云版本的目标目的地。这些示例中的每个边缘设备也运行 InfluxDB 的开源版本。我们使用 Flux 语言创建执行数据转换和注解的任务。

爬行:基本的存储和转发

这是将数据从边缘设备传输到 InfluxDB 云数据库的最基本设置。在这里,原始数据直接从边缘实例的 InfluxDB 写入云实例。

基本存储和转发过程如下

  1. 在边缘运行的 InfluxDB 实例中创建一个桶。我们将该桶称为“设备”,并为这些数据设置一个保留策略,保留时间为一小时。
  2. 在您的云实例的 InfluxDB 中创建一个桶。我们将该桶称为“edge_devices”,并设置 30 天的保留策略。
  3. 在 InfluxDB 中创建一个任务,将边缘的“devices”桶中的原始数据每分钟推送到云端的“edge_devices”桶。
  4. 向传入的数据注入“edge_host”标签。
# Create edge bucket "devices" with ~1hr retention policy
influx bucket create --name "devices" --retention 1h

# Create cloud bucket "edge_devices" with ~30 day retention policy
influx config cloud2
influx bucket create --name "edge_devices" --retention 30d

# Write a task that pushes raw data to "edge_devices" bucket every ~1 minute
import "influxdata/influxdb/tasks"
import "influxdata/influxdb/secrets"

option task = {name: "raw_northbound", every: 1m, offset: 0s}

cloud_host = secrets.get(key: "cloud_host")
cloud_org = secrets.get(key: "cloud_org")
cloud_token = secrets.get(key: "cloud_token")

from(bucket: "devices")
|> range(start: tasks.lastSuccess(orTime: -1h))
|> set(key: "edge_id", value: "001")
|> to(host: cloud_host, org: cloud_org, token: cloud_token, bucket: "edge_devices")

# Inject an edge_id tag - See line 20

优点

  • 快速简单设置。
  • 边缘计算能力需求低。

缺点

  • 增加了云实例中不干净数据量。
  • 边缘数据连接需要更高的吞吐量。

行走:下采样并转发

在此方法中,边缘的原始数据在边缘内存中进行下采样,然后下采样数据被推送到云端,在那里它被写入InfluxDB。

基本存储和转发过程如下

  1. 在运行在边缘的InfluxDB实例中创建一个桶。再次,我们将该桶命名为“设备”,并为此数据设置一个一小时的保留策略。
  2. 在您的云实例中创建一个InfluxDB桶。我们将此桶命名为“edge_aggregate”,并使用30天的保留策略。
  3. 在您的边缘实例中创建一个任务,该任务将边缘“设备”桶中的原始数据下采样,并将下采样数据每分钟发送到云端的“edge_aggregate”桶。
  4. 向传入的数据注入“edge_host”标签。
# Create edge bucket "devices" with ~1hr retention policy
influx bucket create --name "devices" --retention 1h

# Create cloud bucket "edge_aggregate" with ~30 day retention policy
influx bucket create --name "edge_aggregate" --retention 30d

# Write a task that downsamples data and pushes to "edge_aggregate" bucket every ~1 minute
import "influxdata/influxdb/tasks"
import "influxdata/influxdb/secrets"

option task = {name: "aggregate_northbound", every: 1m, offset: 0s}

cloud_host = secrets.get(key: "cloud_host")
cloud_org = secrets.get(key: "cloud_org")
cloud_token = secrets.get(key: "cloud_token")

from(bucket: "devices")
|> range(start: tasks.lastSuccess(orTime: -1h))
|> aggregateWindow(every: 5m, fn: mean, createEmpty: false)
|> set(key: "edge_id", value: "001")
|> to(host: cloud_host, org: cloud_org, token: cloud_token, bucket: "edge_aggregate")

# Inject an edge_id tag - see line 20

优点

  • 快速简单设置。
  • 减少了发送到云端实例的未清理原始数据量。
  • 减少了边缘数据连接的吞吐量需求。

缺点

  • 需要更多的边缘计算能力。
  • 需要更多的边缘内存。
  • 需要边缘存储。

运行:鲁棒的边缘历史记录器

在此第三个示例中,我们在边缘做了更多的工作。在这里,数据历史记录器的角色实际上被移动到了边缘,因为InfluxDB的边缘实例收集、处理和写入数据。

遵循之前方法的步骤,原始数据在边缘进行下采样,但现在我们将下采样数据写入边缘的InfluxDB实例。最后,保存的下采样数据被推送到云端的InfluxDB实例。

  1. 在运行在边缘的InfluxDB实例中创建一个桶。将该桶命名为“设备”并为此数据设置一个一小时的保留策略。
  2. 在边缘的InfluxDB上创建另一个桶,并将其命名为“northbound”,保留策略为一天。
  3. 在您的云实例中创建一个InfluxDB桶。我们将此桶命名为“edge_aggregate”,并使用30天的保留策略。
  4. 在您的边缘实例中创建一个任务,将边缘“设备”桶中的原始数据下采样,并将下采样数据每分钟写入边缘的“northbound”桶。
  5. 在边缘创建另一个任务,每五分钟将“northbound”中的数据推送到云端的“edge_aggregate”。
  6. 向传入的数据注入“edge_host”标签。
# Create edge bucket "devices" with ~1hr retention policy
influx bucket create --name "devices" --retention 1h

# Create edge bucket "northbound" with ~1 day retention policy
influx bucket create --name northbound --retention 1d

# Create cloud bucket "edge_aggregate" with ~30 day retention policy
influx bucket create --name "edge_aggregate" --retention 30d

# Write a task that downsamples data into "northbound" bucket every ~1 minute
import "influxdata/influxdb/tasks"

option task = {name: "aggregate_local", every: 1m, offset: 0s}

from(bucket: "devices")
|> range(start: tasks.lastSuccess(orTime: -1h))
|> aggregateWindow(every: 5m, fn: mean, createEmpty: false)
|> set(key: "edge_id", value: "001")
|> to(bucket: "northbound")

# Write a task that pushes from "northbound" to "edge_aggregate" every ~5 minutes
import "influxdata/influxdb/tasks"
import "influxdata/influxdb/secrets"

option task = {name: "sync_northbound", every: 5m, offset: 0s}

cloud_host = secrets.get(key: "cloud_host")
cloud_org = secrets.get(key: "cloud_org")
cloud_token = secrets.get(key: "cloud_token")

from(bucket: "northbound")
|> range(start: tasks.lastSuccess(orTime: -1h))
|> to(host: cloud_host, org: cloud_org, token: cloud_token, bucket: "edge_aggregate")

# Inject an edge_id tag - see line 18

优点

  • 快速简单设置。
  • 显著减少了发送到云端实例的未清理原始数据量。
  • 减少了边缘数据连接的吞吐量需求。
  • 对云和边缘之间的连接中断具有弹性。

缺点

  • 需要更多的边缘计算能力。
  • 需要显著更多的边缘内存。
  • 需要边缘存储容量。

如我们所注,这些方法的每个方法都有其优缺点,因此请选择最适合您的技术堆栈和用例的方法。

您可以在分布式边缘设备上复制或调整这些任务,以最大限度地利用可用资源。

无论您选择哪种方法,此类解决方案都可以为您提供更好的边缘设备可观察性。它还有助于通过减少中央存储实例中的数据集大小来简化整个生态系统中的数据处理。