MQTT、Telegraf 和 InfluxDB Cloud v3 入门
作者:Anais Dotis-Georgiou / 开发者
2024 年 11 月 19 日
导航至
如果您正在寻找深入了解物联网数据收集的世界,您可能已经接触过 MQTT——一种轻量级消息传递协议,旨在高效地在设备之间传输数据。在这篇文章中,我们将逐步介绍如何使用 Telegraf(一种插件驱动的代理,用于收集、处理和写入指标)从 MQTT 代理(我们将使用 Mosquitto,一种流行的开源 MQTT 代理)收集数据,并将其发送到 InfluxDB Cloud v3,这是一种为存储和分析实时生成器数据(如温度、负载、功率和燃料使用情况)而构建的时间序列数据库。无论您是跟踪智能家居设备还是监控任何其他使用 MQTT 的设备,我们都能满足您的需求。让我们开始吧!本博客文章的对应存储库可以在此处找到。 这是 InfluxDB Cloud Serverless UI 中三个发电机的燃料值的屏幕截图,由 Telegraf 使用 MQTT Consumer Plugin 收集。
要求和运行
有多种方法可以在此存储库中运行示例,但推荐且最有效的方法是使用 Docker。Docker 让您可以轻松启动模拟器,而无需担心手动依赖项安装。要使用 Docker,请按照以下步骤操作
- 克隆存储库。
git clone https://github.com/InfluxCommunity/MQTT_Simulators.git
- 在 telegraf.com 中包含您的 InfluxDB 凭据
- 构建 Docker 镜像
docker build emergency_generator/. -t emergency-generator:latest
- 最后,使用 Docker Compose 部署模拟器
docker-compose up -d
那些喜欢在本地不使用 Docker 运行模拟器的人员必须手动安装 Mosquitto MQTT 代理并设置 Python 环境。
-
在您的机器上安装 Mosquitto
sudo apt-add-repository ppa:mosquitto-dev/mosquitto-ppa sudo apt-get update sudo apt-get install mosquitto
-
启动 Mosquitto 代理
sudo systemctl enable mosquitto sudo systemctl start mosquitto
- 克隆存储库并导航到模拟器
git clone https://github.com/InfluxCommunity/MQTT_Simulators.git
- 安装所需的 Python 包
python3 -m pip install --no-cache-dir -r requirements.txt
- 设置您的环境变量
export GENERATORS=3 export BROKER=localhost
- 运行模拟器
python3 src/emergency_generator.py
虽然这两种方法都可以让您启动并运行,但 Docker 简化了流程,避免了潜在的配置问题,并确保了环境的一致性。
代码演练
该存储库包含几个示例,但我们将重点介绍 emergency_generator 示例。我们将分解模拟器的核心组件、MQTT 发布者和 Telegraf 配置,以便您清楚地了解所有内容是如何组合在一起的。通过本次演练结束时,您将对代码的工作原理以及如何将其应用到您的用例中有一个扎实的理解。
此脚本模拟紧急发电机,生成温度、负载、功率和燃油油位等数据。关键组件是
- emergency_generator 类:表示单个发电机。它使用 ID、位置(使用 Faker 获取随机美国坐标)和基本燃料油位初始化发电机,并根据负载计算温度、负载、功率和燃料使用量。每个发电机都通过
returnTemperature
、returnPower
和returnFuelLevel
返回数据来模拟其性能。 - runEmergencyGenerator 函数:此函数创建 emergency_generator 类的实例,连接到 MQTT 代理,并以随机间隔(5 到 15 秒之间)持续将发电机健康数据发布到 MQTT 代理。数据发布到名为 emergency_generator 的主题。
- 线程:该脚本可以使用 Python 的 threading 模块并发运行多个发电机实例。
GENERATORS
环境变量控制发电机的数量,每个线程运行一个单独的发电机,持续发布其状态。
此脚本处理将数据发布到 MQTT 代理
- mqtt_publisher 类:此类连接到 MQTT 代理,并提供用于将数据发送(发布)到 MQTT 主题的方法。它可以使用标准或安全连接(使用用户名和密码)连接到代理。
- publish_to_topic 方法:此方法格式化发电机数据并将其发布到指定的 MQTT 主题,并将发电机 ID 附加到主题名称。消息在发布之前被序列化为 JSON。
此配置文件用于 Telegraf,这是一种指标收集代理,用于从 MQTT 代理拉取数据并将其发送到 InfluxDB。
[[inputs.mqtt_consumer]]
:此输入插件订阅 MQTT 主题(在本例中为 #,它订阅所有主题)。数据应为 JSON 格式,它使用 json_v2 解析器处理传入的 MQTT 消息以提取发电机数据(例如,generatorID)。[[outputs.influxdb_v2]]
:此输出插件配置 Telegraf 将收集的数据发送到 InfluxDB 实例,指定 URL、身份验证令牌和数据将存储的目标存储桶。
这是 Mosquitto MQTT 代理的配置文件。
allow_anonymous true
:允许匿名连接到代理,这意味着客户端可以发布和订阅而无需身份验证。listener 1883
:配置代理侦听端口 1883 以进行 MQTT 连接。注释表明,在初始设置期间,您可以使用未加密的连接来生成安全通信证书。获得证书后,您可以禁用未加密的侦听器。
利用 Telegraf
Telegraf 是使用 MQTT 收集物联网数据的绝佳工具,因为它轻量级、高度可定制且支持实时数据摄取。凭借其原生的 mqtt_consumer 插件,Telegraf 可以轻松订阅 MQTT 主题,处理各种格式的消息(如 JSON),并直接将数据发送到 InfluxDB 或其他数据库。它高效处理大量数据的能力,加上最小的资源开销,使其成为低延迟、可扩展性和灵活性至关重要的物联网环境的理想选择。此外,Telegraf 庞大的插件库允许与其他系统无缝集成,从而增强物联网数据监控和分析。
Telegraf 在从 MQTT 代理摄取数据并将其转换为可以写入 InfluxDB 的格式方面发挥着至关重要的作用。让我们分解一下如何使用 Telegraf 使用以下负载处理 JSON 格式的发电机数据
{
"generatorID": "generator1",
"lat": 40.68066,
"lon": -73.47429,
"temperature": 186,
"power": 186,
"load": 2,
"fuel": 277
}
为了捕获此数据,我们配置了 Telegraf mqtt_consumer 插件。此插件订阅 MQTT 代理上的主题并侦听传入消息。以下是处理负载并提取关键字段的配置
[[inputs.mqtt_consumer]]
servers = ["tcp://mosquitto:1883"]
topics = ["#"]
qos = 2
connection_timeout = "30s"
data_format = "json_v2"
[[inputs.mqtt_consumer.json_v2]]
measurement_name = "genData"
[[inputs.mqtt_consumer.json_v2.object]]
path = "@this"
disable_prepend_keys = true
tags = ["generatorID"]
在此配置中
servers
定义了您的 MQTT 代理的地址。topics
设置为 #,这意味着我们订阅所有主题。如果需要,您可以指定更精确的主题。data_format
设置为 json_v2,这告诉 Telegraf 负载是 JSON 格式。- 在 JSON 解析部分内部,我们将
measurement_name
定义为 “genData”,这意味着来自此输入的所有数据都将写入 InfluxDB 中该指标下。 - 我们使用
json_v2.object
处理器来指定应将整个 JSON 对象视为原样,同时提取 generatorID 作为标签,以便在 InfluxDB 中更好地进行筛选和查询。
使用此配置,Telegraf 有效地摄取实时发电机数据,将其转换为结构化格式,以便在 InfluxDB 中进行存储和分析。
最终想法
与往常一样,开始使用 InfluxDB v3 Cloud 和 Telegraf。如果您需要帮助,请在我们的社区网站或 Slack 频道上联系我们。如果您也在使用 MQTT、Telegraf 或 InfluxDB 进行物联网项目,我们很乐意听到您的消息。我鼓励您与我们分享您的用例并获得一件连帽衫!