MQTT 消费者 Telegraf 输入插件
免费使用此 InfluxDB 集成MQTT 是一种机器对机器(M2M)/物联网通信协议,设计为轻量级的发布/订阅消息工具。MQTT 适用于需要小资源占用和/或网络带宽有限的情况。MQTT 由 IBM 的 Andy Stanford-Clark 博士和 Arcom 的 Arlen Nipper 于 1999 年创建。当时,IBM 正在与一家石油和天然气公司合作,需要从偏远地区的石油管道中提取数据,这需要一种新的协议来满足这些需求。结果是 MQTT。
IBM 内部使用了 MQTT,直到他们在 2010 年发布了 MQTT 3.1 规范,允许其他人创建自己的 MQTT 实现。开发人员很快意识到 MQTT 对于物联网相关用例的价值,其采用率迅速增长,创建了众多开源代理和客户端库。2013 年,IBM 将 MQTT 提交给 OASIS 以进行维护和标准化。
为什么要使用 MQTT 消费者 Telegraf 输入插件?
Telegraf 使您能够轻松收集 MQTT 数据,而无需编写自定义脚本来连接到您的 MQTT 代理。Telegraf 还可以轻松处理和转换在发送到数据库进行存储之前交付的 MQTT 消息。使用 Telegraf 和 MQTT 的另一个好处是 40 多个输出插件。这意味着您可以将 MQTT 数据发送到几乎任何数据存储,甚至同时发送到多个数据存储。
如何使用 MQTT 消费者 Telegraf 输入插件
MQTT 消费者 Telegraf 输入插件易于设置和快速运行。您只需配置 Telegraf YAML 配置文件。以下是包含所有可用配置选项的基本示例
# Read metrics from MQTT topic(s) [[inputs.mqtt_consumer]] ## Broker URLs for the MQTT server or cluster. To connect to multiple ## clusters or standalone servers, use a separate plugin instance. ## example: servers = ["tcp://127.0.0.1:1883"] ## servers = ["ssl://127.0.0.1:1883"] ## servers = ["ws://127.0.0.1:1883"] servers = ["tcp://127.0.0.1:1883"] ## Topics that will be subscribed to. topics = [ "telegraf/host01/cpu", "telegraf/+/mem", "sensors/#", ] ## The message topic will be stored in a tag specified by this value. If set ## to the empty string no topic tag will be created. # topic_tag = "topic" ## QoS policy for messages ## 0 = at most once ## 1 = at least once ## 2 = exactly once ## ## When using a QoS of 1 or 2, you should enable persistent_session to allow ## resuming unacknowledged messages. # qos = 0 ## Connection timeout for initial connection in seconds # connection_timeout = "30s" ## Maximum messages to read from the broker that have not been written by an ## output. For best throughput set based on the number of metrics within ## each message and the size of the output's metric_batch_size. ## ## For example, if each message from the queue contains 10 metrics and the ## output metric_batch_size is 1000, setting this to 100 will ensure that a ## full batch is collected and the write is triggered immediately without ## waiting until the next flush_interval. # max_undelivered_messages = 1000 ## Persistent session disables clearing of the client session on connection. ## In order for this option to work you must also set client_id to identify ## the client. To receive messages that arrived while the client is offline, ## also set the qos option to 1 or 2 and don't forget to also set the QoS when ## publishing. # persistent_session = false ## If unset, a random client ID will be generated. # client_id = "" ## Username and password to connect MQTT server. # username = "telegraf" # password = "metricsmetricsmetricsmetrics" ## Optional TLS Config # tls_ca = "/etc/telegraf/ca.pem" # tls_cert = "/etc/telegraf/cert.pem" # tls_key = "/etc/telegraf/key.pem" ## Use TLS but skip chain & host verification # insecure_skip_verify = false ## Data format to consume. ## Each data format has its own unique set of configuration options, read ## more about them here: ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md data_format = "influx" ## Enable extracting tag values from MQTT topics ## _ denotes an ignored entry in the topic path # [[inputs.mqtt_consumer.topic_parsing]] # topic = "" # measurement = "" # tags = "" # fields = "" ## Value supported is int, float, unit # [[inputs.mqtt_consumer.topic.types]] # key = type
示例输出
您的 MQTT 消息将被转换为类似于以下行协议
mqtt_consumer,host=pop-os,topic=telegraf/host01/cpu value=45i 1653579140440951943 mqtt_consumer,host=pop-os,topic=telegraf/host01/cpu value=100i 1653579153147395661
主题解析
默认情况下,您的 MQTT 消息将仅存储 MQTT 主题作为标签值。如果您想要对附加字段进行标记和查询,可以使用 Telegraf 解析 MQTT 消息并创建标签。以下是一个示例
[[inputs.mqtt_consumer]] ## Broker URLs for the MQTT server or cluster. To connect to multiple ## clusters or standalone servers, use a separate plugin instance. ## example: servers = ["tcp://127.0.0.1:1883"] ## servers = ["ssl://127.0.0.1:1883"] ## servers = ["ws://127.0.0.1:1883"] servers = ["tcp://127.0.0.1:1883"] ## Topics that will be subscribed to. topics = [ "telegraf/+/cpu/23", ] ## Data format to consume. ## Each data format has its own unique set of configuration options, read ## more about them here: ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md data_format = "value" data_type = "float" [[inputs.mqtt_consumer.topic_parsing]] topic = "telegraf/one/cpu/23" measurement = "_/_/measurement/_" tags = "tag/_/_/_" fields = "_/_/_/test" [inputs.mqtt_consumer.topic_parsing.types] test = "int"
使用 Telegraf 处理器插件
Telegraf 提供了许多处理器插件,可以在将数据存储在 InfluxDB 之前进一步转换您的数据。以下是一个使用 Pivot 处理器插件的示例,将单个值度量转换为多个度量
示例主题
/sensors/CLE/v1/device5/temp /sensors/CLE/v1/device5/rpm /sensors/CLE/v1/device5/ph /sensors/CLE/v1/device5/spin
这将默认产生以下指标
sensors,site=CLE,version=v1,device_name=device5,field=temp value=390 sensors,site=CLE,version=v1,device_name=device5,field=rpm value=45.0 sensors,site=CLE,version=v1,device_name=device5,field=ph value=1.45
如果您使用以下配置与 Pivot 处理器一起使用
[[inputs.mqtt_consumer]] .... topics = "/sensors/#" [[inputs.mqtt_consumer.topic_parsing]] measurement = "/measurement/_/_/_/_" tags = "/_/site/version/device_name/field" [[processors.pivot]] tag_key = "field" value_key = "value"
输出将是以下内容
sensors,site=CLE,version=v1,device_name=device5 temp=390,rpm=45.0,ph=1.45