使用 Telegraf 进行 MQTT 主题和负载解析
作者:Sam Dillard / 用例, 产品, 开发者
2021 年 12 月 20 日
导航至
请注意,这篇文章篇幅较长……但我希望它能提供充分的信息!这篇文章是关于 Telegraf 作为 MQTT 消息的消费者,并将其写入 InfluxDB。如果您对 Telegraf 感兴趣但不熟悉,您可以在此处查看文档。不确定 Telegraf 是否符合您的需求?我在这篇博客文章的“优化写入”部分中对其进行了说明。
了解 Line Protocol,InfluxDB 的默认接受格式,也可能会有所帮助。
在这篇文章中,我想通过以下方式传达 Telegraf 在通过 MQTT 收集数据方面的有效性:a) 深入研究最新版本的功能,以及 b) 讨论解析方法。正如许多读者所知,MQTT 消息负载实际上可以是您想要的任何内容,因此,对于许多主题的订阅者来说,能够处理您发送的任何内容至关重要。
Telegraf 可以订阅任意数量的主题,同时让用户控制如何塑造传入的数据。控制数据形状允许用户写出不仅与目标数据存储兼容,而且还经过优化的数据。
此上下文中的数据形状控制有两种形式,每种形式都将在本文中单独介绍
- 主题解析
- 负载解析
内务处理
- 以下所有 Telegraf 行为均假定标准
[Agent]
级别配置,其中omit_hostname = true
- 示例中省略了时间戳
主题解析
在 Telegraf 的最新版本 (v1.21) 中,MQTT Consumer 输入插件添加了动态解析主题信息并将该信息附加到负载的功能。这意味着 Telegraf 运营商可以使用 MQTT 消息负载和传入主题中的数据。
下图示例说明了四个不同的 MQTT 消息数据包在 InfluxDB 数据模型中以相同方式布局。
通过插入 Telegraf 可以实现这一点,本文的其余部分将解释如何实现。
我将首先通过示例解释此功能解决的问题。假设我有一个基本传感器,它向包含其唯一 ID 的主题发出单个温度值;主题如:sensors/00191
。MQTT 消息将发送到此主题,其中包含单个原始值——对于此示例,我们假设浮点数样本 10.0
。
sensors/00191
中此信息的任何第三方订阅者都不知道如何处理此温度值
-
它本身不是有效的 LP,因此 Telegraf 将拒绝它。
-
如果您使用
data_format="value"
和data_type="float"
帮助 Telegraf,它将生成如下 LP
mqtt_consumer,topic=sensors/00191 value=10.0 <timestamp>
(这是未给出测量名称的输入插件的默认行为)。
请注意,Telegraf 默认情况下将整个主题添加为 LP 标签。这是 Telegraf 的帮助功能,因此您可以保留任何可能必要的上下文。但是,将整个主题作为单个字符串包含在内,稍后需要进行一些正则表达式才能使其有用。它存在的原因是,如果没有它
- 这个
value
字段对任何/任何消耗数据的人来说都没有意义。 - 如果每个传感器都以这种方式发出数据并在同一时间写入,则会覆盖最后一个传感器的数据(需要标签来区分
value
等字段)。
在这种情况下,所需的信息在主题中。如果供应商对其传感器进行编程以这种方式发出数据,或者用户正在使用高度受限的资源并且需要尽可能保持其负载较小,则可能会发生这种情况。
输入主题解析!Telegraf 通过允许用户指定他们想要引入哪些主题部分作为度量、标签或字段来解决此问题,而无需将整个主题存储为标签。在此版本之前,这将通过 Regex Processor Plugin 完成,但新体验更简单、更方便。
对 mqtt_consumer 输入插件进行简单的配置更改将为我们提供所需的内容
[[inputs.mqtt_consumer]]
servers = ["<address:port>"]
topics = ["sensors/#"]
[[inputs.mqtt_consumer.topic_parsing]]
topic = "sensors/+"
measurement = "measurement/_"
tags = "_/sensor_id"
上面的 Telegraf 配置代码片段实例化了一个订阅 sensors/#
的客户端(任何具有父级 sensors
的主题)。如果您熟悉 SNMP 等输入插件或 JSON_v2 等解析器(如果不熟悉,接下来会介绍!),您会识别出由 [[inputs.mqtt_consumer.topic_parsing]]
定义的嵌套 TOML 表。可以有任意数量的此类表,但我们只需要一个。在此表中,您告诉 Telegraf 要匹配哪些主题,并使用后续信息解析到达的这些主题的消息。之后,您告诉 Telegraf 将哪些内容作为度量、标签和/或字段。在本例中,我们只提取度量和单个标签(毕竟没有太多信息要解析)。没有字段存在……但这并非总是如此。
我们使用语法 measurement/_
(用引号括起来)向 Telegraf 发出信号(双关语)主题名称的哪些段将解析为哪些 LP 元素。键映射到主题中各自索引的(按段)值,而下划线映射到与该 Line Protocol 元素无关的段(占位符)。
在上面的示例配置中,我们告诉 Telegraf 第一段将是我们的度量,第二段(最后一段)将是具有键 sensor_id
的标签。该段中的实际信息将填充为该标签键的值。如果我们现在想删除默认的 topic
标签,我们可以在配置中包含 topic_tag = ""
参数,并获得此输出
mqtt_consumer,sensor_id=00191 value=10.0 <timestamp>
像这样的修改在大规模以及更复杂的主题和负载中很重要。它还大大改善了下游的分析体验。
快速说明:最佳(非必要)做法是让字段名称包含有用的信息,因此“value”并不理想。要解决此问题,您可以简单地使用 Pivot Processor(超出本文范围)。
无论如何,说到更复杂的主题和负载,让我们再看两个快速简短的示例
主题:detroit/0f801/42.33/-83.04/front_left/09fse21/rpm
消息:846
我们希望我们的 LP 输出包含所有这些信息。第一段是此车辆所属的城市(站点)。第二个是车辆的 ID。第三个是给定采样时间的纬度。第四个是经度,第五个是轮胎在车辆上的位置,第六个是轮胎的 ID,最后一个是轮胎的 rpm。所有可能重要的信息!
我不会详细介绍哪些段应该包含哪些 LP 元素的原因——为此,您可以参考我链接在顶部的博客文章。对于此示例,我们将使用通用度量,并让 Telegraf 命名它。我们的度量将是 mqtt_consumer
(如有必要,Telegraf 可以在其他插件中重命名它)。每个字符串都将是一个标签,每个数值元素都将是一个字段。最终,我们希望我们生成的 LP 看起来像
mqtt_consumer,site=detroit,vehicle_id=0f801,tire_id=09fse21 lat=42.33,lon=-83.04,rpm=846 <timestamp>
以下相关的 [[inputs.mqtt_consumer]]
配置将使我们达到目标
...snipped...
[[inputs.mqtt_consumer.topic_parsing]]
topic = "+/+/+/+/+/+/+" # all topics with 7 segments
tags = "site/vehicle_id/_/_/orientation/tire_id/_"
fields = "_/_/_/_/_/_/rpm"
就是这样——一个简洁的配置,用于将主题编码的信息从每个遵循 7 段结构的主题拉入 Line Protocol 记录!
负载解析!
解析器插件对于 Telegraf 来说并不新鲜,JSON 解析也不是,但我认为 MQTT 消费主题值得深入研究 JSON 解析。首先,请注意,Telegraf 还提供了其他解析器列表。解析器使用 data_format
参数在插件内部调用。它们的配置略有不同,因为它们各自的格式在语义上有所不同。在本文中,我们将重点关注 JSON 解析。
JSON 解析在 Telegraf 中已经存在很长时间了,但在几个版本前的新版本中进行了改进。尝试向它抛出一些它无法解析的 JSON!
在我深入探讨之前,如果您想要全面了解此升级后的解析器,Telegraf 产品经理 Samantha Wang 的这篇博客文章非常精彩。此外,如果您喜欢使用 Python 方言进行解析,还有另一种方法,我在这里写过。
您可能知道,MQTT 将接受几乎任何类型的负载。JSON 在这种上下文中是最常见的。也就是说,JSON 也可以是几乎任何形状,因此可配置的解析至关重要!
为了连接各个部分,我们将继续使用上一个示例消息,但通过将主题中的一些信息放入 JSON 负载中来稍微更改它
主题:detroit/0f801/09fse21
消息
{
"orientation": "front_left"
"latitude": 42.33
"longitude": -83.04
"rpm": 846
}
为此,用户将希望像以前一样解析主题。该配置将如下所示
...snipped...
[[inputs.mqtt_consumer.topic_parsing]]
topic = "+/+/+" # all topics with 3 segments
tags = "site/vehicle_id/tire_id"
对于这个配置,我们保持了度量不变(Telegaf 默认的每个输入),并且我们没有设置任何字段。后者的原因是我们知道这次可以从负载中获取字段。那么让我们解析它!
为了使 JSON 解析足够灵活以处理深度嵌套的 blob 和其他复杂性,这个新的 JSON_v2 解析器使用嵌套的 TOML 表和 GJSON 查询来访问所需的 blob。上面的消息负载很小且扁平,因此这次解析器将被严重未充分利用。
从上面主题和 JSON 正文的组合中,我们希望获得与之前主题解析示例中相同的输出。作为回顾,该 LP 看起来像
mqtt_consumer,site=detroit,vehicle_id=0f801,tire_id=09fse21 lat=42.33,lon=-83.04,rpm=846 <timestamp>
请记住我们刚刚解析的较短主题,以下是随附的 JSON 解析配置
...snipped...
data_format = "json_v2" # invokes the parser -- lines following are parser config
[[inputs.mqtt_consumer.json_v2]]
[[inputs.mqtt_consumer.json_v2.tag]]
path = "orientation" # GJSON path: JSON is flat -- all keys at root
[[inputs.mqtt_consumer.json_v2.field]]
path = "rpm"
type = "int"
[[inputs.mqtt_consumer.json_v2.tag]]
path = "latitude"
rename = "lat"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "longitude"
rename = "lon"
type = "float"
这将产生与所有元数据都在主题本身中的示例相同的结果。这两种情况都完全合理。
我可以理解,对于如此简单的 JSON 简述,JSON 解析看起来有点冗长。冗长性在更复杂的情况下会凸显出来。对于这种情况,这里有一个使用不同方式解析标签和不同方式解析字段的相同结果示例
...snipped...
data_format = "json_v2" # invokes the parser -- lines following are parser config
[[inputs.mqtt_consumer.json_v2]]
[[inputs.mqtt_consumer.json_v2.object]]
tags = ["site"] # can list all Tags -- our case only has one
[[inputs.mqtt_consumer.json_v2.object.field]] # fields by key and type
rpm = "int"
latitude = "float"
longitude = "float"
定义字段的方式是我最喜欢这个解析器的部分。如此简洁!另请注意使用的 object
表。如果您只需要定义标签和字段,并且不需要对每个标签和字段执行特殊操作,那么这些是保持配置更简洁的好方法!
总结
这就是本质。您现在知道 Telegraf 从 MQTT 代理消费、订阅多个主题以及同时解析主题和负载中的数据,并将该数据合并到 LP 记录中是什么样子。InfluxDB 会因此而爱您的!
如果您正处于开始开发物联网数据管道并且 MQTT 混合在其中的阶段,那么插入 Telegraf 作为消费者(甚至生产者,如果您愿意)以将数据分发到您的数据存储可能是一个不错的选择。如果您正在使用或计划使用 InfluxDB 作为数据存储层的时间序列组件,那么 Telegraf 经过优化设计,可用于馈送 InfluxDB。无论您的数据最终位于何处,如果您需要一种相对简单的方法从 MQTT 代理/集群获取各种形式和来源的数据,Telegraf 都可以做到。