Python MQTT 教程:使用 InfluxDB 存储 IoT 指标
作者:社区 / 产品, 用例, 开发者
2022 年 5 月 5 日
导航至
本文由 Alexandre Couëdelo 撰写,最初发表于 The New Stack。向下滚动查看作者简介和照片。
MQTT 是一种标准的 messaging 协议,用于 物联网 (IoT),因为它只需要最少的资源,并且可以由连接设备中的小型微控制器执行。
IoT 设备对这种轻量级协议有真正的需求,因为它保证了快速可靠的通信,同时硬件要求最低,从而降低了功耗和制造成本。
像智能温度传感器这样的 IoT 设备定期通过互联网传输信息,但在您可以从该测量中推断出任何有意义的信息之前,您需要将其存储在合适的数据库中。智能传感器测量并不复杂,但它们高度依赖于时间常数 - 测量发生的时间 - 因此, 时间序列数据库,如 InfluxDB,提供了一种高效的选项来存储和操作这类数据。
在本文中,您将学习如何使用 Python 创建一个智能传感器,该传感器使用 MQTT 通过互联网传输测量数据,并将数据存储在 InfluxDB 中。您还将了解 InfluxDB 生态系统,该生态系统提供数据库和 UI 工具,可以可视化和查询您的数据。
什么是 MQTT?
MQTT 是一种基于事件(发布和订阅)的通信协议,专门为使 IoT 设备能够在高延迟和低带宽环境中高效通信而设计。高延迟和低带宽环境通常指蜂窝网络,如 2G 或 3G。网络成本是 运营 IoT 基础设施的主要费用之一。使用像 MQTT 这样的轻量级协议有助于降低 IoT 设备的费用。
MQTT 的核心组件是 broker,它协调 pub/sub(发布和订阅)通信。这个概念用图表可以更好地解释
当传感器(发布者)将其消息发送到主题时,MQTT broker 管理主题;然后 broker 将消息发送回订阅该主题的任何应用程序(消费者)。消费者可以是用户的智能手机或云中的后端应用程序。
最常见的模式是拥有用于原始数据(温度、湿度等)的主题和用于处理数据的主题。后端应用程序将监听 temperature,处理数据,并在相关时发布消息到 temperature analyzed。这就是 InfluxDB 发挥作用的地方;后端应用程序需要将数据存储在某个地方,以便能够获得每日平均温度或比单点测量更相关的温度预测。
IoT 传感器 最流行的用例是资产跟踪(世界各地卡车和货物的位置)、远程区域监控(例如,农场的温度和湿度)、资源优化(能源和用水消耗)以及位置/工作场所分析(污染、噪音和空气质量)。
在本文中,您将通过创建一个伪造的智能温度传感器来构建一个基于远程区域监控用例的示例。
使用 InfluxDB 存储 IoT 指标
本教程分三个部分展开。首先,您创建一个发布 temperaturetopic 的智能传感器。然后您设置 InfluxDB,最后,您创建一个后端应用程序,该应用程序使用来自 temperature 主题的消息并将它们存储在数据库中。
您可以使用此 GitHub 存储库 来学习本教程。
使用 Python 创建智能传感器
首先,您将创建一个简单的 Python 脚本,将数据发送到 MQTT;这代表智能温度传感器。此脚本将是您系统中的 MQTT 发布者。
安装 paho-mqtt 库
pip install paho-mqtt
然后您需要生成随机数据。对于此任务,您将使用 Faker。如果您尚未安装它,则需要安装它
pip install Faker
现在创建您的智能传感器(MQTT 发布者)。在本示例中,您将使用一个名为 mqtt.eclipseprojects.io 的公共测试 MQTT broker,并将随机整数(代表温度)每秒发送到一个名为 temperature
的主题。
创建一个名为 smart_sensor.py
的新 Python 文件,并使用以下代码
"""
MQTT Smart temperature Sensor
"""
import time
import paho.mqtt.client as mqtt
from faker import Faker
# let's connect to the MQTT broker
MQTT_BROKER_URL = "mqtt.eclipseprojects.io"
MQTT_PUBLISH_TOPIC = "temperature"
mqttc = mqtt.Client()
mqttc.connect(MQTT_BROKER_URL)
# Init faker our fake data provider
fake = Faker()
# Infinite loop of fake data sent to the Broker
while True:
temperature = fake.random_int(min=0, max=30)
mqttc.publish(MQTT_PUBLISH_TOPIC, temperature)
print(f"Published new temperature measurement: {temperature}")
time.sleep(1)
现在您可以运行您的脚本
python smart_sensor.py
太棒了!现在您正在向 MQTT broker 发送数据,因此是时候设置 InfluxDB 并创建另一个脚本来订阅您的 temperature 主题并将数据存储在数据库中。
设置 InfluxDB
您可以使用 Docker 来运行 InfluxDB,这非常适合本地开发。但是,InfluxDB 支持多种平台:Linux、macOS、Windows、Docker 和 Kubernetes。您可以在 InfluxDB 的安装页面 上选择最适合您需求的安装方式。您还可以使用 InfluxDB Cloud ,无需在您的机器上安装任何东西即可开始使用。
要开始设置,您需要定义一个 docker-compose.yml
文件,该文件定义以下配置
version: '3.3'
services:
influxdb:
image: influxdb:2.0.7
environment:
DOCKER_INFLUXDB_INIT_MODE: setup
DOCKER_INFLUXDB_INIT_USERNAME: ${INFLUXDB_USERNAME}
DOCKER_INFLUXDB_INIT_PASSWORD: ${INFLUXDB_PASSWORD}
DOCKER_INFLUXDB_INIT_ORG: ${INFLUXDB_ORG}
DOCKER_INFLUXDB_INIT_BUCKET: ${INFLUXDB_BUCKET}
DOCKER_INFLUXDB_INIT_ADMIN_TOKEN: ${INFLUXDB_TOKEN}
ports:
- "8086:8086"
您还需要创建一个 .env
文件来定义此 docker-compose.yml
中需要的环境变量
INFLUXDB_USERNAME=admin
INFLUXDB_PASSWORD=admin1234
INFLUXDB_TOKEN=F-QFQpmCL9UkR3qyoXnLkzWj03s6m4eCvYgDl1ePfHBf9ph7yxaSgQ6WN0i9giNgRTfONwVMK1f977r_g71oNQ==
INFLUXDB_URL="http://localhost:8086"
INFLUXDB_ORG=iot
INFLUXDB_BUCKET=temperature
现在,启动 InfluxDB。您应该在您的 docker-compose
命令中使用 --env-file
,以强制 Docker 考虑 .env
docker-compose --env-file .env up
使用本地运行的 InfluxDB 转到 http://localhost:8086
,您应该会进入 InfluxDB UI。您将在您的 .env
文件中找到凭据。正如您所看到的,InfluxDB
不仅仅是一个数据库;它是一个帮助管理和可视化您的数据的生态系统。您稍后将了解更多关于这方面的信息。
现在是时候创建一个 MQTT 消费者,它接收您的温度测量值并将它们存储在 InfluxDB 中。
创建 MQTT 和 InfluxDB 客户端
首先,您需要安装 influxdb-client
pip install 'influxdb-client[ciso]'
将所有常量保存在一个地方也是一个好主意。它可以防止您重复自己并犯错误。由于您已经将最重要的一个存储在 .env
中,您将需要 dotenv
来在您的脚本中加载它们
pip install python-dotenv
现在您需要从 MQTT 逻辑开始。订阅主题需要两个回调函数:on_connect
和 on_message
。
当您的应用程序成功连接到 broker 时,会调用 on_connect
。您将使用此函数订阅主题 temperature
。因此,每当您的智能传感器在该主题上发布消息时,都会调用 on_message
函数。您将使用 on_message
回调将温度测量值发送到 InfluxDB。
使用下面的代码创建一个名为 influxdb_consumer.py
的新 Python 文件
"""
MQTT subscriber - Listen to a topic and sends data to InfluxDB
"""
import os
from dotenv import load_dotenv
import paho.mqtt.client as mqtt
load_dotenv() # take environment variables from .env.
# InfluxDB config
# TODO
# MQTT broker config
MQTT_BROKER_URL = "mqtt.eclipseprojects.io"
MQTT_PUBLISH_TOPIC = "temperature"
mqttc = mqtt.Client()
mqttc.connect(MQTT_BROKER_URL)
def on_connect(client, userdata, flags, rc):
""" The callback for when the client connects to the broker."""
print("Connected with result code "+str(rc))
# Subscribe to a topic
client.subscribe(MQTT_PUBLISH_TOPIC)
def on_message(client, userdata, msg):
""" The callback for when a PUBLISH message is received from the server."""
print(msg.topic+" "+str(msg.payload))
## InfluxDB logic
# TODO
## MQTT logic - Register callbacks and start MQTT client
mqttc.on_connect = on_connect
mqttc.on_message = on_message
mqttc.loop_forever()
InfluxDB 相关代码仍然在上面的示例中缺失,但在您执行此操作之前,您应该测试您的系统,以查看智能传感器发送的消息是否被消费者接收。确保 smart_sensor.py
仍在运行,然后运行 influxdb_consumer.py
python influxdb_consumer.py
您应该看到温度测量值正在传入
现在您已经能够从 broker 接收消息,将它们存储在 InfluxDB 中。
接下来,您需要配置 InfluxDB 客户端。您将使用预配置的 INFLUXDB_TOKEN
,这对于测试目的很方便,但您也可以通过 UI 创建新令牌。您的 InfluxDB 实例应该仍在运行。
返回到 UI (http://localhost:8086) 并生成一个新的身份验证令牌;然后单击 Data。之后,在 Client Libraries 中,您需要选择 Python。此部分允许您创建身份验证令牌
import os
from dotenv import load_dotenv
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import ASYNCHRONOUS
import paho.mqtt.client as mqtt
load_dotenv() # take environment variables from .env.
# InfluxDB config
BUCKET = os.getenv('INFLUXDB_BUCKET')
client = InfluxDBClient(url=os.getenv('INFLUXDB_URL'),
token=os.getenv('INFLUXDB_TOKEN'), org=os.getenv('INFLUXDB_ORG'))
write_api = client.write_api()
关于 InfluxDB,一个重要的知识点是字段和标签之间的区别。两者都是键值对,但标签充当您记录的索引。在本例中,field
是测量值,tag
可以是过滤后的数据,例如,按 location
。InfluxDB 的其他关键概念在此页面 上定义。
现在更新 on_message
回调,使其成为将测量值写入 InfluxDB 的代码
def on_message(client, userdata, msg):
""" The callback for when a PUBLISH message is received from the server."""
print(msg.topic+" "+str(msg.payload))
# We received bytes we need to convert into something usable
measurement = int(msg.payload)
## InfluxDB logic
point = Point(MQTT_PUBLISH_TOPIC).tag("location", "New York").field("temperature", measurement )
write_api.write(bucket=BUCKET, record=point)
确保 influxdb_consumer.py
和 smart_sensor.py
仍在运行。如果它们正在运行,则新的数据点应每秒添加到数据库中,以便您可以在 InfluxDB UI 中可视化您的数据。
在 InfluxDB Web UI 中可视化数据
转到 http://localhost:8086 并单击 Data;然后选择 Buckets。您应该看到您的 temperature bucket
选择 temperature bucket。现在您应该已进入 Data Explorer,您可以在其中查询您的数据并将其可视化
这里的一个不错的功能是,一旦您创建了满足您需求的可视化,您可以单击“另存为”,并将其添加到您选择的仪表板
感谢 UI,您的应用程序不需要任何第三方可视化工具和仪表板。
结论
现在您知道如何使用 Python 为 MQTT 创建发布者和消费者。不仅如此,您还可以将您的智能传感器测量值存储在时间序列数据库中,并实时可视化它们。
InfluxDB 数据库通过提供快速查询和类型相关数据的聚合,在操作 IoT 数据方面特别高效。它还附带强大的 UI 的额外好处,您可以在其中可视化您的数据并在同一位置创建仪表板。
关于作者
Alexandre 是一位复杂系统工程和管理专家。自从他开始职业生涯,通过为加拿大一家领先的金融机构的数字化转型做出贡献以来,他就一直在拥抱 DevOps 文化。他的热情是 DevOps 革命和工业工程。他很喜欢他有足够的事后见之明来获得两全其美的效果。