目录
MQTT 是一种机器对机器 (M2M)/物联网通信协议,设计为轻量级发布/订阅消息传递工具。MQTT 适用于需要小资源占用空间和/或网络带宽受限的连接。MQTT 由 IBM 的 Andy Stanford-Clark 博士和 Arcom 的 Arlen Nipper 于 1999 年创建。当时,IBM 正在与一家石油和天然气公司合作,该公司需要从偏远地区的石油管道中提取数据,这需要一种新的协议来满足这些要求。结果就是 MQTT。
MQTT 在 IBM 内部使用,直到他们在 2010 年发布了 MQTT 3.1 规范,允许其他人创建自己的 MQTT 实现。开发人员很快意识到 MQTT 对于物联网相关用例的价值,并且采用迅速增长,创建了许多开源代理和客户端库。2013 年,IBM 将 MQTT 提交给 OASIS 进行维护和标准化。
为什么使用 MQTT 消费者 Telegraf 输入插件?
Telegraf 使您可以轻松收集 MQTT 数据,而无需编写自定义脚本来连接到 MQTT 代理。Telegraf 还使处理和转换已传递的 MQTT 消息变得简单,然后再将其发送到数据库进行存储。将 Telegraf 与 MQTT 结合使用的另一个好处是 40 多个输出插件。这意味着您可以将 MQTT 数据发送到几乎任何数据存储,甚至同时发送到多个数据存储。
如何使用 MQTT 消费者 Telegraf 输入插件
MQTT 消费者 Telegraf 输入插件易于设置且可以快速运行。您只需配置 Telegraf YAML 配置文件即可。这是一个包含所有可用配置选项的基本示例
# Read metrics from MQTT topic(s)
[[inputs.mqtt_consumer]]
## Broker URLs for the MQTT server or cluster. To connect to multiple
## clusters or standalone servers, use a separate plugin instance.
## example: servers = ["tcp://:1883"]
## servers = ["ssl://:1883"]
## servers = ["ws://:1883"]
servers = ["tcp://127.0.0.1:1883"]
## Topics that will be subscribed to.
topics = [
"telegraf/host01/cpu",
"telegraf/+/mem",
"sensors/#",
]
## The message topic will be stored in a tag specified by this value. If set
## to the empty string no topic tag will be created.
# topic_tag = "topic"
## QoS policy for messages
## 0 = at most once
## 1 = at least once
## 2 = exactly once
##
## When using a QoS of 1 or 2, you should enable persistent_session to allow
## resuming unacknowledged messages.
# qos = 0
## Connection timeout for initial connection in seconds
# connection_timeout = "30s"
## Maximum messages to read from the broker that have not been written by an
## output. For best throughput set based on the number of metrics within
## each message and the size of the output's metric_batch_size.
##
## For example, if each message from the queue contains 10 metrics and the
## output metric_batch_size is 1000, setting this to 100 will ensure that a
## full batch is collected and the write is triggered immediately without
## waiting until the next flush_interval.
# max_undelivered_messages = 1000
## Persistent session disables clearing of the client session on connection.
## In order for this option to work you must also set client_id to identify
## the client. To receive messages that arrived while the client is offline,
## also set the qos option to 1 or 2 and don't forget to also set the QoS when
## publishing.
# persistent_session = false
## If unset, a random client ID will be generated.
# client_id = ""
## Username and password to connect MQTT server.
# username = "telegraf"
# password = "metricsmetricsmetricsmetrics"
## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
## Enable extracting tag values from MQTT topics
## _ denotes an ignored entry in the topic path
# [[inputs.mqtt_consumer.topic_parsing]]
# topic = ""
# measurement = ""
# tags = ""
# fields = ""
## Value supported is int, float, unit
# [[inputs.mqtt_consumer.topic.types]]
# key = type
示例输出
您的 MQTT 消息将被转换为如下所示的行协议
mqtt_consumer,host=pop-os,topic=telegraf/host01/cpu value=45i 1653579140440951943 mqtt_consumer,host=pop-os,topic=telegraf/host01/cpu value=100i 1653579153147395661
主题解析
默认情况下,您的 MQTT 消息将仅以 MQTT 主题作为标签值存储。如果要在其他字段上标记和查询,可以使用 Telegraf 解析 MQTT 消息并创建标签。这是一个例子
[[inputs.mqtt_consumer]]
## Broker URLs for the MQTT server or cluster. To connect to multiple
## clusters or standalone servers, use a separate plugin instance.
## example: servers = ["tcp://:1883"]
## servers = ["ssl://:1883"]
## servers = ["ws://:1883"]
servers = ["tcp://127.0.0.1:1883"]
## Topics that will be subscribed to.
topics = [
"telegraf/+/cpu/23",
]
## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "value"
data_type = "float"
[[inputs.mqtt_consumer.topic_parsing]]
topic = "telegraf/one/cpu/23"
measurement = "_/_/measurement/_"
tags = "tag/_/_/_"
fields = "_/_/_/test"
[inputs.mqtt_consumer.topic_parsing.types]
test = "int"
使用 Telegraf 处理器插件
Telegraf 提供了许多处理器插件,可用于在将数据存储在 InfluxDB 中之前进一步转换数据。这是一个使用 Pivot 处理器插件将单个值指标转换为多个指标的示例
示例主题
/sensors/CLE/v1/device5/temp /sensors/CLE/v1/device5/rpm /sensors/CLE/v1/device5/ph /sensors/CLE/v1/device5/spin
默认情况下,这将产生以下指标
sensors,site=CLE,version=v1,device_name=device5,field=temp value=390 sensors,site=CLE,version=v1,device_name=device5,field=rpm value=45.0 sensors,site=CLE,version=v1,device_name=device5,field=ph value=1.45
如果您将以下配置与 Pivot 处理器一起使用
[[inputs.mqtt_consumer]]
....
topics = "/sensors/#"
[[inputs.mqtt_consumer.topic_parsing]]
measurement = "/measurement/_/_/_/_"
tags = "/_/site/version/device_name/field"
[[processors.pivot]]
tag_key = "field"
value_key = "value"
输出将如下所示
sensors,site=CLE,version=v1,device_name=device5 temp=390,rpm=45.0,ph=1.45