MQTT Consumer Telegraf Input Plugin
MQTT is a machine-to-machine (M2M)/Internet of Things (IoT) communication protocol designed as a lightweight publish/subscribe messaging transport. MQTT is useful for connections where a small resource footprint is required and/or network bandwidth is at a premium. MQTT was created in 1999 by Dr. Andy Stanford-Clark of IBM and Arlen Nipper of Arcom. At the time, IBM was working with an oil and gas company that needed to pull data from oil pipelines in remote areas, and a new protocol was required to meet those constraints. The result was MQTT.
MQTT was used internally by IBM until the company released the MQTT 3.1 specification in 2010, allowing others to create their own MQTT implementations. Developers soon recognized the value of MQTT for IoT use cases, and adoption grew rapidly, with numerous open source brokers and client libraries being created. In 2013, IBM submitted MQTT to OASIS to be maintained and standardized.
Why use the MQTT Consumer Telegraf Input Plugin?
Telegraf makes it easy to collect your MQTT data without having to write a custom script to connect to your MQTT broker. Telegraf also makes it simple to process and transform MQTT messages as they are delivered, before sending them to a database for storage. Another benefit of using Telegraf with MQTT is the 40+ output plugins. This means you can send your MQTT data to almost any data store and even multiple data stores at the same time.
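As a rough sketch of sending the same MQTT data to more than one data store, the fragment below pairs an InfluxDB output with a local file output. The URL, token variable, organization, bucket, and file path are placeholders chosen for illustration, not values from this page:

```toml
## Sketch only: every value below is a placeholder.
[[outputs.influxdb_v2]]
  urls = ["http://127.0.0.1:8086"]
  token = "${INFLUX_TOKEN}"        # assumed environment variable
  organization = "example-org"     # hypothetical organization name
  bucket = "mqtt"                  # hypothetical bucket name

## A second output receives the same metrics; here they are also
## written to a local file in line protocol.
[[outputs.file]]
  files = ["/tmp/mqtt_metrics.out"]
  data_format = "influx"
```

Each output plugin section listed in the configuration file receives the metrics collected by the input, so adding another destination is a matter of adding another output section.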
How to use the MQTT Consumer Telegraf Input Plugin
The MQTT Consumer Telegraf Input Plugin is easy to set up and get running quickly. All you need to do is configure your Telegraf TOML configuration file. Here is an example that lists the available configuration options:
# Read metrics from MQTT topic(s)
[[inputs.mqtt_consumer]]
  ## Broker URLs for the MQTT server or cluster. To connect to multiple
  ## clusters or standalone servers, use a separate plugin instance.
  ##   example: servers = ["tcp://127.0.0.1:1883"]
  ##            servers = ["ssl://127.0.0.1:1883"]
  ##            servers = ["ws://127.0.0.1:1883"]
  servers = ["tcp://127.0.0.1:1883"]

  ## Topics that will be subscribed to.
  topics = [
    "telegraf/host01/cpu",
    "telegraf/+/mem",
    "sensors/#",
  ]

  ## The message topic will be stored in a tag specified by this value. If set
  ## to the empty string no topic tag will be created.
  # topic_tag = "topic"

  ## QoS policy for messages
  ##   0 = at most once
  ##   1 = at least once
  ##   2 = exactly once
  ##
  ## When using a QoS of 1 or 2, you should enable persistent_session to allow
  ## resuming unacknowledged messages.
  # qos = 0

  ## Connection timeout for initial connection in seconds
  # connection_timeout = "30s"

  ## Maximum messages to read from the broker that have not been written by an
  ## output. For best throughput set based on the number of metrics within
  ## each message and the size of the output's metric_batch_size.
  ##
  ## For example, if each message from the queue contains 10 metrics and the
  ## output metric_batch_size is 1000, setting this to 100 will ensure that a
  ## full batch is collected and the write is triggered immediately without
  ## waiting until the next flush_interval.
  # max_undelivered_messages = 1000

  ## Persistent session disables clearing of the client session on connection.
  ## In order for this option to work you must also set client_id to identify
  ## the client. To receive messages that arrived while the client is offline,
  ## also set the qos option to 1 or 2 and don't forget to also set the QoS when
  ## publishing.
  # persistent_session = false

  ## If unset, a random client ID will be generated.
  # client_id = ""

  ## Username and password to connect MQTT server.
  # username = "telegraf"
  # password = "metricsmetricsmetricsmetrics"

  ## Optional TLS Config
  # tls_ca = "/etc/telegraf/ca.pem"
  # tls_cert = "/etc/telegraf/cert.pem"
  # tls_key = "/etc/telegraf/key.pem"
  ## Use TLS but skip chain & host verification
  # insecure_skip_verify = false

  ## Data format to consume.
  ## Each data format has its own unique set of configuration options, read
  ## more about them here:
  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
  data_format = "influx"

  ## Enable extracting tag values from MQTT topics
  ## _ denotes an ignored entry in the topic path
  # [[inputs.mqtt_consumer.topic_parsing]]
  #   topic = ""
  #   measurement = ""
  #   tags = ""
  #   fields = ""
  ## Supported values are int, float, uint
  #   [inputs.mqtt_consumer.topic_parsing.types]
  #      key = type
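The comments on qos and persistent_session describe options that only work in combination. A minimal sketch of that combination, assuming a local broker and a made-up client ID, might look like this:

```toml
[[inputs.mqtt_consumer]]
  servers = ["tcp://127.0.0.1:1883"]
  topics = ["sensors/#"]

  ## At-least-once delivery; publishers must also publish with QoS 1 or 2.
  qos = 1

  ## Keep the broker-side session between reconnects so that messages
  ## which arrive while Telegraf is offline can be delivered later.
  persistent_session = true

  ## Required for persistent sessions. "telegraf-mqtt-01" is a hypothetical
  ## ID; it should be stable and unique per client on the broker.
  client_id = "telegraf-mqtt-01"

  data_format = "influx"
```

A fixed client_id is what lets the broker associate a reconnecting Telegraf instance with its earlier session, which is why a randomly generated ID does not work with persistent_session.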
Example output
Your MQTT messages will be converted into line protocol that looks something like this:
mqtt_consumer,host=pop-os,topic=telegraf/host01/cpu value=45i 1653579140440951943
mqtt_consumer,host=pop-os,topic=telegraf/host01/cpu value=100i 1653579153147395661
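The sample output above is consistent with plain numeric payloads (such as 45 and 100) being parsed by Telegraf's value data format into an integer field named value. The fragment below sketches such a configuration; it is an assumption about how output of this shape could be produced, not necessarily the setup used to generate the sample:

```toml
[[inputs.mqtt_consumer]]
  servers = ["tcp://127.0.0.1:1883"]
  topics = ["telegraf/host01/cpu"]

  ## Treat the whole message payload as a single value
  ## (assumed payloads: 45, 100).
  data_format = "value"
  data_type = "integer"
```

In this sketch the topic tag comes from the plugin's default topic_tag setting, and the host tag is added by the Telegraf agent rather than by the plugin.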
Topic parsing
By default, the full MQTT topic is stored as a single tag value on each metric. If you want to tag and query on individual parts of the topic, you can have Telegraf parse the topic and create the measurement name, tags, and fields from its segments. Here is an example:
[[inputs.mqtt_consumer]]
  ## Broker URLs for the MQTT server or cluster. To connect to multiple
  ## clusters or standalone servers, use a separate plugin instance.
  ##   example: servers = ["tcp://127.0.0.1:1883"]
  ##            servers = ["ssl://127.0.0.1:1883"]
  ##            servers = ["ws://127.0.0.1:1883"]
  servers = ["tcp://127.0.0.1:1883"]

  ## Topics that will be subscribed to.
  topics = [
    "telegraf/+/cpu/23",
  ]

  ## Data format to consume.
  ## Each data format has its own unique set of configuration options, read
  ## more about them here:
  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
  data_format = "value"
  data_type = "float"

  [[inputs.mqtt_consumer.topic_parsing]]
    topic = "telegraf/one/cpu/23"
    measurement = "_/_/measurement/_"
    tags = "tag/_/_/_"
    fields = "_/_/_/test"
    [inputs.mqtt_consumer.topic_parsing.types]
      test = "int"
Using Telegraf processor plugins
Telegraf provides a number of processor plugins that can be used to further transform your data before storing it in InfluxDB. Here is an example of using the Pivot Processor Plugin to combine several single-value metrics into one metric with multiple fields:
Example topics:
/sensors/CLE/v1/device5/temp
/sensors/CLE/v1/device5/rpm
/sensors/CLE/v1/device5/ph
/sensors/CLE/v1/device5/spin
Without the Pivot Processor, these topics would produce metrics like the following:
sensors,site=CLE,version=v1,device_name=device5,field=temp value=390
sensors,site=CLE,version=v1,device_name=device5,field=rpm value=45.0
sensors,site=CLE,version=v1,device_name=device5,field=ph value=1.45
If you use the following configuration with the Pivot Processor:
[[inputs.mqtt_consumer]]
  ....
  ## topics is a list, so a single topic is still written as an array
  topics = ["/sensors/#"]
  [[inputs.mqtt_consumer.topic_parsing]]
    measurement = "/measurement/_/_/_/_"
    tags = "/_/site/version/device_name/field"

[[processors.pivot]]
  tag_key = "field"
  value_key = "value"
The output will be the following:
sensors,site=CLE,version=v1,device_name=device5 temp=390,rpm=45.0,ph=1.45