使用 InfluxDB 和 AWS 创建物联网数据管道

导航到

本文最初发布在 The New Stack,并经授权在此重发。

可靠的数据管道对于工业至关重要,它提供了有关机械状态的关键信息,并用于训练预测模型以改进运营。

iot-data-pipeline

物联网(IoT)和运营技术(OT)领域充满了时序数据。传感器以稳定和大量的速度产生带时间戳的数据,处理所有这些数据可能是一项挑战。但您不希望收集一大堆数据,却让它只是闲置在那里,占用存储空间而不提供任何价值。

因此,为了从 IoT/OT 数据中获得价值,让我们看看如何使用 InfluxDB 和 亚马逊网络服务(AWS) 配置传感器以收集由 工业 设备生成的时间戳数据。您可以调整、扩展和扩展这里的概念以满足完整工业生产的需求。以下是我们将在示例中使用的工具以及原因。

  • InfluxDB:这个时序数据库刚刚发布了一个更新版本,在查询高基数数据、压缩和数据摄取等几个关键领域优于之前的版本。对于本例来说,至关重要的是 InfluxDB 的云产品 可在 AWS 中使用。将所有东西放在同一个地方可以减少许多情况中的数据传输和数据延迟问题。
  • Telegraf:Telegraf 是一个开源、基于插件的收集代理。轻量级且用 Go 语言编写,您可以在任何地方部署 Telegraf 实例。拥有超过 300 个可用的插件,Telegraf 可以从任何来源收集数据。您还可以编写自定义插件来收集尚未有插件的数据来源的数据。
  • AWS:亚马逊网络服务提供了一系列针对 物联网 的工具和服务。我们可以利用这些服务来简化数据处理和分析。
  • M5stickC+:这是一款简单的物联网设备,可以检测包括位置、加速度指标、陀螺仪指标、湿度、温度和压力在内的多种测量值。此设备提供多个数据流,类似于工业操作员在制造设备中面临的情况。

InfluxDB 和 AWS IoT 实际应用

以下示例展示了多种可能的数据管道之一,您可以根据各种工业设备的需要对其进行扩展。这可能包括工厂地面的机器或现场分布式设备。让我们先快速了解一下概述;然后我们将深入了解细节。

基本数据流是

设备 → AWS IoT Core → MQTT(规则/路由)→ Kinesis → Telegraf → InfluxDB → 可视化

aws-iot-core

M5stickC+设备被设置为直接与AWS进行身份验证。数据进入AWS IoT Core,这是一个使用MQTT将数据添加到特定于设备的主题的AWS服务。然后有一个规则选择通过该主题的任何数据,并将其重定向到监控工具Amazon Kinesis

在AWS中运行的虚拟机还运行了两个Docker容器。一个包含Telegraf的实例,另一个包含InfluxDB的实例。Telegraf从Kinesis收集数据流并将其写入InfluxDB。Telegraf还使用DynamoDB表作为此设置中的检查点,因此如果容器或实例关闭,当应用程序再次启动时,它将在数据流中的正确位置重新启动。

数据进入InfluxDB后,我们可以使用它来创建可视化。

好的,这就是基本的数据管道。现在,我们如何让它工作呢?

设备固件

第一步是从M5设备到AWS创建数据连接。为此,我们使用UI Flow,这是M5堆栈中的一部分拖放编辑器。查看下面的图像中的块,以了解设备正在收集什么数据以及这些数据如何映射到我们最终输出的内容。

device-firmware

在这里,我们可以看到这些数据发布到MQTT主题 m5sticks/MyThingGG/sensors

AWS IoT Core规则

设备数据发布到MQTT代理后,接下来我们需要在AWS IoT Core中订阅该主题。在主题过滤器字段中,输入m5sticks/+/sensors以确保所有来自设备的所有数据都进入MQTT主题。

接下来,我们需要创建另一个规则,以确保MQTT主题中的数据进入Kinesis。在IoT Core中,您可以使用SQL查询来完成此操作

SELECT *, topic(2) as thing, 'sensors' as measurement, timestamp() as timestamp FROM 'm5sticks/+/sensors'

在工业环境中,每个设备都应该有一个唯一名称。因此,为了将此数据管道扩展以容纳多个设备,我们使用MQTT主题中的+通配符,以确保所有设备的所有数据都进入正确的位置。

此查询向数据添加时间戳,以确保它符合InfluxDB数据模型的行协议

Telegraf策略

现在数据从设备流向Kinesis,我们需要将其放入InfluxDB。为此,我们使用Telegraf。下面的代码是确定Telegraf如何与Kinesis交互的策略。它启用了Telegraf从Kinesis流中读取的能力,并启用了对DynamoDB的读取和写入访问权限以进行检查点。

{
 "Version": "2012-10-17"
 "Statement": [
 {
 "Sid": "AllowReadFromKinesis",
 "Effect": "Allow",
 "Action": [
 "kinesis:GetShardIterator",
 "kinesis:GetRecords",
 "kinesis:DescribeStream"
 ],
 "Resource": [
 "arn:aws:kinesis:eu-west-3:xxxxxxxxx:stream/InfluxDBStream"
 ]
 },
 {
 "Sid": "AllowReadAndWriteDynamoDB",
 "Effect": "Allow",
 "Action": [
 "dynamodb:PutItem",
 "dynamodb:GetItem"
 ],
 "Resource": [
 "arn:aws:kinesis:eu-west-3:xxxxxxxxx:table/influx-db-telegraf"
 ] 
 }
 ]
}

Telegraf配置

以下Telegraf配置使用Docker容器网络,Telegraf Kinesis Consumer插件从Kinesis读取数据,以及InfluxDB v2输出插件将数据写入InfluxDB。请注意,字符串字段与设备固件UI中的值相匹配。

[agent]
debug = true

[[outputs.influxdb_v2]]

## The URLs of the InfluxDB Cluster nodes.
##
## Multiple URLs can be specified for a single cluster, only ONE of the
## urls will be written to each interval.
## urls exp: http://127.0.0.1:8086
urls = ["http://influxdb:8086"]

## Token for authentication.
token = "toto-token"

## Organization is the name of the organization you wish to write to; must exist.
organization = "aws"

## Destination bucket to write into.
bucket = "toto-bucket"

[[inputs.kinesis_consumer]]
## Amazon REGION of kinesis endpoint.
region = "eu-west-3"

## Amazon Credentials
## Credentials are loaded in the following order
## 1) Web identity provider credentials via STS if role_arn and web_identity_token_file are specified
## 2) Assumed credentials via STS if role_arn is specified
## 3) explicit credentials from 'access_key' and 'secret_key'
## 4) shared profile from 'profile'
## 5) environment variables
## 6) shared credentials file
## 7) EC2 Instance Profile

## Endpoint to make request against, the correct endpoint is automatically
## determined and this option should only be set if you wish to override the
## default.
##   ex: endpoint_url = "https://127.0.0.1:8000"
# endpoint_url = ""

## Kinesis StreamName must exist prior to starting telegraf.
streamname = "InfluxDBStream"

## Shard iterator type (only 'TRIM_HORIZON' and 'LATEST' currently supported)
# shard_iterator_type = "TRIM_HORIZON"

## Max undelivered messages
## This plugin uses tracking metrics, which ensure messages are read to
## outputs before acknowledging them to the original broker to ensure data
## is not lost. This option sets the maximum messages to read from the
## broker that have not been written by an output.
##
## This value needs to be picked with awareness of the agent's
## metric_batch_size value as well. Setting max undelivered messages too high
## can result in a constant stream of data batches to the output. While
## setting it too low may never flush the broker's messages.
# max_undelivered_messages = 1000

## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "json"

## Tag keys is an array of keys that should be added as tags.
tag_keys = [
 "thing"
]

## String fields is an array of keys that should be added as string fields.
json_string_fields = [
 "pressure",
 "xGyr",
 "yAcc",
 "batteryPower",
 "xAcc",
 "temperature",
 "zAcc",
 "zGyr",
 "y",
 "x",
 "yGry",
 "humidity"

]

## Name key is the key used as the measurement name.
json_name_key = "measurement"

## Time key is the key containing the time that should be used to create the
## metric.
json_time_key "timestamp"

## Time format is the time layout that should be used to interpret the
## json_time_key. The time must be 'unix', 'unix_ms' or a time in the 
## "reference_time".
##   ex: json_time_format = "Mon Jan 2 15:04:05 -0700 MST 2006"
##       json_time_format = "2006-01-02T15:04:05Z07:00"
##       json_time_format = "unix"
##       json_time_format = "unix_ms"
json_time_format = "unix_ms"

## Optional
## Configuration for a dynamodb checkpoint
[inputs.kinesis_consumer.checkpoint_dynamodb]
## unique name for this consumer
app_name = "default"
table_name = "influx-db-telegraf"

Docker Compose

此Docker Compose文件通过SSH上传到EC2实例。

version: '3'

services:
  influxdb:
    image: influxdb:2.0.6
    volumes:
      # Mount for influxdb data directory and configuration
      - influxdbv2:/root/.influxdbv2
    ports:
      - "8086:8086"
# Use the influx cli to set up an influxdb instance
  influxdb_cli:
    links:
      - influxdb
    image: influxdb:2.0.6
# Use these same configurations parameters in you telegraf configuration, mytelegraf.conf
 # Wait for the influxd service in the influxdb has been fully bootstrapped before trying to set up an influxdb
   restart: on-failure:10
   depends_on:
   - influxdb
  telegraf:
    image: telegraf:1.25-alpine
    links:
      - influxdb
    volumes:
      # Mount for telegraf config
      - ./telegraf.conf:/etc/telegraf/telegraf.conf
    depends_on:
      - influxdb_cli

volumes:
  influxdbv2:

可视化

一旦您的数据进入InfluxDB,运行InfluxDB的用户可以使用您选择的工具创建可视化和仪表板。InfluxDB提供与Grafana的原生集成,并支持使用Flight SQL兼容的工具进行SQL查询。

结论

构建可靠的数据管道是工业运营的关键方面。这些数据提供了关于机械和设备当前状态的关键信息,可以训练用于改进机械操作和效率的预测模型。结合InfluxDB和AWS等领先技术,可以提供捕获、存储、分析和可视化这些关键过程数据的必要工具。

这里描述的使用案例只是构建数据管道的一种方式。您可以根据广泛的工业物联网/运营技术对其进行更新、扩展和修改,或者使用InfluxDB和AWS提供的物联网解决方案构建完全定制的解决方案。