Python MQTT教程:使用InfluxDB存储IoT指标

导航到

本文由Alexandre Couëdelo撰写,并首次发布在The New Stack。向下滚动以查看作者的简介和照片。

MQTT 是一种标准消息协议,用于物联网 (IoT),因为它需要最少的资源,并且可以由连接设备中的小型微控制器执行。

物联网设备对这种轻量级协议有实际需求,因为它保证了与最小硬件要求下的快速和可靠通信,同时保持能耗和制造成本低。

物联网设备如智能温度传感器会定期通过互联网传输信息,但在从这些测量中推断出任何有意义的 信息之前,您需要将其存储在合适的数据库中。智能传感器的测量并不复杂,但它们高度依赖于时间常数——即测量时的时间——因此,像InfluxDB这样的时间序列数据库提供了存储和操作这种数据的有效选项。

在这篇文章中,您将学习如何使用Python创建一个智能传感器,该传感器通过MQTT将测量结果发送到互联网,并将数据存储在InfluxDB中。您还将了解InfluxDB生态系统,它提供了数据库和UI工具,可以可视化和查询您的数据。

什么是MQTT?

MQTT是一种基于事件(发布和订阅)的通信协议,专门设计用于使物联网设备在低延迟和低带宽环境中高效通信。高延迟和低带宽环境通常指2G或3G等蜂窝网络。网络成本是运营物联网基础设施的主要开支之一。使用轻量级协议如MQTT有助于降低物联网设备的开支。

MQTT的关键组件是代理,它协调发布/订阅(发布和订阅)通信。这个概念可以用图表更好地解释

MQTT broker architectural diagram courtesy of Alexandre Couëdelo

MQTT代理架构图由Alexandre Couëdelo提供

MQTT代理管理主题,当传感器(发布者)将其消息发送到主题时;然后代理将消息发送回任何订阅了该主题的应用程序(消费者)。消费者可以是用户的智能手机或云中的后端应用程序。

最常见的模式是为原始数据(温度、湿度等)和经过处理的数据创建主题。后端应用程序将监听温度数据,处理数据,并在相关时向温度分析主题发布消息。这正是InfluxDB发挥作用的地方;后端应用程序需要将数据存储在某个地方,以便能够获取每日平均温度或比单点测量更相关的温度预报。

物联网传感器的最常见用途包括资产跟踪(全球卡车和货物的位置)、远程区域监控(例如农场中的温度和湿度)、资源优化(能源和水消耗)以及位置/工作场所分析(污染、噪音和空气质量)。

在这篇文章中,您将通过创建一个模拟智能温度传感器来构建一个基于远程区域监控用例的示例。

使用InfluxDB存储物联网指标

本教程分为三部分。首先,您将创建一个智能传感器,该传感器发布温度主题。然后您设置InfluxDB,最后创建一个后端应用程序,该应用程序从温度主题消费消息并将其存储在数据库中。

您可以使用此GitHub存储库来跟随本教程。

使用Python创建智能传感器

首先,您将创建一个简单的Python脚本来将数据发送到MQTT;这代表智能温度传感器。此脚本将是您系统中的MQTT发布者。

安装paho-mqtt库

pip install paho-mqtt

然后您需要生成随机数据。为此任务,您将使用Faker。如果您还没有安装它,您需要安装它

pip install Faker

现在创建您的智能传感器(MQTT发布者)。在这个例子中,您将使用一个名为 mqtt.eclipseprojects.io 的公共测试MQTT代理,并且每秒发送代表温度的随机整数到名为 temperature 的主题。

创建一个新的Python文件,命名为 smart_sensor.py ,并使用以下代码

"""
MQTT Smart temperature Sensor
"""

import time

import paho.mqtt.client as mqtt
from faker import Faker

# let's connect to the MQTT broker
MQTT_BROKER_URL    = "mqtt.eclipseprojects.io"
MQTT_PUBLISH_TOPIC = "temperature"

mqttc = mqtt.Client()
mqttc.connect(MQTT_BROKER_URL)

# Init faker our fake data provider
fake = Faker()

# Infinite loop of fake data sent to the Broker
while True:
    temperature = fake.random_int(min=0, max=30)
    mqttc.publish(MQTT_PUBLISH_TOPIC, temperature)
    print(f"Published new temperature measurement: {temperature}")
    time.sleep(1)

现在您可以运行您的脚本

python smart_sensor.py

太好了!现在您正在向MQTT代理发送数据,所以是时候设置InfluxDB并创建另一个脚本来订阅您的温度主题并将数据存储在数据库中了。

Smart temperature sensor running

智能温度传感器正在运行

设置InfluxDB

您可以使用Docker运行InfluxDB,它非常适合本地开发。但是,InfluxDB支持许多平台:Linux、macOS、Windows、Docker和Kubernetes。您可以在InfluxDB安装页面上选择最适合您需求的安装方式。您也可以使用InfluxDB Cloud开始使用,而无需在您的机器上安装任何东西。

要开始设置,您需要定义一个docker-compose.yml文件,该文件定义了以下配置

version: '3.3'

services:
    influxdb:
        image: influxdb:2.0.7
        environment:
            DOCKER_INFLUXDB_INIT_MODE: setup
            DOCKER_INFLUXDB_INIT_USERNAME: ${INFLUXDB_USERNAME}
            DOCKER_INFLUXDB_INIT_PASSWORD: ${INFLUXDB_PASSWORD}
            DOCKER_INFLUXDB_INIT_ORG: ${INFLUXDB_ORG}
            DOCKER_INFLUXDB_INIT_BUCKET: ${INFLUXDB_BUCKET}
            DOCKER_INFLUXDB_INIT_ADMIN_TOKEN: ${INFLUXDB_TOKEN}
        ports:
        - "8086:8086"

您还需要创建一个.env文件来定义在上述docker-compose.yml中所需的环境变量

INFLUXDB_USERNAME=admin
INFLUXDB_PASSWORD=admin1234
INFLUXDB_TOKEN=F-QFQpmCL9UkR3qyoXnLkzWj03s6m4eCvYgDl1ePfHBf9ph7yxaSgQ6WN0i9giNgRTfONwVMK1f977r_g71oNQ==
INFLUXDB_URL="https://127.0.0.1:8086"
INFLUXDB_ORG=iot
INFLUXDB_BUCKET=temperature

现在,启动InfluxDB。您应该在docker-compose命令中使用--env-file来强制Docker考虑.env

docker-compose --env-file .env up

当InfluxDB在本地上运行时,请访问https://127.0.0.1:8086,您应该会看到InfluxDB的UI。您可以在.env文件中找到凭据。如您所见,InfluxDB不仅仅是一个数据库;它是一个生态系统,有助于管理和可视化您的数据。您稍后会了解更多关于它的信息。

现在是时候创建一个MQTT消费者,它接收您的温度测量数据并将其存储在InfluxDB中。

创建MQTT和InfluxDB客户端

首先,您需要安装influxdb-client

pip install 'influxdb-client[ciso]'

将所有常量放在一个地方也是一个好主意。它可以防止您重复自己并出错。由于您已经将最重要的一个存储在.env中,您将需要dotenv来在脚本中加载它们

pip install python-dotenv

现在,您需要从MQTT逻辑开始。订阅一个主题需要两个回调函数:on_connecton_message

on_connect在您的应用程序成功连接到代理时被调用。您将使用此函数来订阅主题temperature。结果,每当您的智能传感器在该主题上发布消息时,on_message函数将被调用。您将使用on_message回调将温度测量数据发送到InfluxDB。

使用以下代码创建一个新的Python文件,名为influxdb_consumer.py

"""
MQTT subscriber - Listen to a topic and sends data to InfluxDB
"""

import os
from dotenv import load_dotenv
import paho.mqtt.client as mqtt

load_dotenv()  # take environment variables from .env.

# InfluxDB config
# TODO

# MQTT broker config
MQTT_BROKER_URL    = "mqtt.eclipseprojects.io"
MQTT_PUBLISH_TOPIC = "temperature"

mqttc = mqtt.Client()
mqttc.connect(MQTT_BROKER_URL)

def on_connect(client, userdata, flags, rc):
    """ The callback for when the client connects to the broker."""
    print("Connected with result code "+str(rc))

    # Subscribe to a topic
    client.subscribe(MQTT_PUBLISH_TOPIC)

def on_message(client, userdata, msg):
    """ The callback for when a PUBLISH message is received from the server."""
    print(msg.topic+" "+str(msg.payload))

    ## InfluxDB logic
		# TODO

## MQTT logic - Register callbacks and start MQTT client
mqttc.on_connect = on_connect
mqttc.on_message = on_message
mqttc.loop_forever()

上述示例中缺少与InfluxDB相关的代码,但在您添加之前,您应该测试您的系统,以确保智能传感器发送的消息被消费者接收。确保smart_sensor.py仍在运行,然后运行influxdb_consumer.py

python influxdb_consumer.py

您应该看到温度测量数据正在到来

Backend application consuming messages

后端应用程序消费消息

现在,您能够从代理接收消息,并将它们存储在InfluxDB中。

接下来,您需要配置InfluxDB客户端。您将使用预配置的INFLUXDB_TOKEN,这对于测试目的来说很方便,但您也可以通过UI创建一个新的令牌。您的InfluxDB实例仍然应该运行。

返回到UI(https://127.0.0.1:8086),生成一个新的身份验证令牌;然后点击数据。之后,在客户端库中,您需要选择Python。本节允许您创建一个身份验证令牌

Generate token for InfluxDB

为InfluxDB生成令牌
import os
from dotenv import load_dotenv
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import ASYNCHRONOUS
import paho.mqtt.client as mqtt

load_dotenv()  # take environment variables from .env.

# InfluxDB config
BUCKET = os.getenv('INFLUXDB_BUCKET')
client = InfluxDBClient(url=os.getenv('INFLUXDB_URL'),
                token=os.getenv('INFLUXDB_TOKEN'), org=os.getenv('INFLUXDB_ORG'))
write_api = client.write_api()

了解 InfluxDB 的重要事项之一是区分字段和标签。两者都是键值对,但标签作为记录的索引。在这种情况下,field 是测量值,而 tag 可以是过滤数据,例如,通过 location。InfluxDB 的其他关键概念定义在此 页面 上。

现在更新 on_message 回调,使其能够将测量值写入 InfluxDB

def on_message(client, userdata, msg):
    """ The callback for when a PUBLISH message is received from the server."""
    print(msg.topic+" "+str(msg.payload))

    # We received bytes we need to convert into something usable
    measurement = int(msg.payload)

    ## InfluxDB logic
    point = Point(MQTT_PUBLISH_TOPIC).tag("location", "New York").field("temperature", measurement )
    write_api.write(bucket=BUCKET, record=point)

确保 influxdb_consumer.pysmart_sensor.py 仍在运行。如果它们正在运行,新的数据点应该每秒添加到数据库中,以便您可以在 InfluxDB UI 中可视化您的数据。

在 InfluxDB Web UI 中可视化数据

转到 https://127.0.0.1:8086 并点击数据;然后选择存储桶。您应该看到您的温度存储桶

InfluxDB UI: list buckets

InfluxDB UI:列出存储桶

选择温度存储桶。现在您应该已经到达数据探索器,在这里您可以查询和可视化您的数据

InfluxDB UI: Data Explorer

InfluxDB UI:数据探索器

这里的一个不错特性是,一旦您创建了一个满足您需求的可视化,您就可以点击“另存为”并将其添加到您选择的仪表板中

InfluxDB UI: Save to dashboard

InfluxDB UI:保存到仪表板

多亏了 UI,您不需要任何第三方可视化工具和仪表板来为您的应用程序。

InfluxDB UI: Dashboard

InfluxDB UI:仪表板

结论

现在您知道了如何使用 Python 创建 MQTT 的发布者和消费者。不仅如此,您还可以将您的智能传感器测量值存储在时间序列数据库中,并实时可视化它们。

InfluxDB 数据库特别适用于通过提供快速查询和类型相关数据的聚合来操作 IoT 数据。它还附带了一个好处,即拥有一个功能强大的 UI,您可以在同一位置可视化您的数据并创建仪表板。

关于作者

Alexandre Couëdelo

Alexandre 是一个复杂系统工程与管理专家。他从职业生涯开始就拥抱 DevOps 文化,为加拿大一家领先金融机构的数字化转型做出了贡献。他对 DevOps 革命和工业工程充满热情。他喜欢自己有足够的前瞻性,以充分利用两个世界。