Telegraf 对 MQTT 主题和有效载荷的解析

导航至


系好安全带,这篇文章可能不短,但我希望它能提供充分的信息!这篇文章是关于 Telegraf 作为 InfluxDB 写入上下文中 MQTT 消息的消费者。如果您对 Telegraf 感兴趣但又不熟悉它,您可以在这里查看文档。不确定 Telegraf 是否符合您的需求?我在这篇文章的“优化写入”部分对其进行了论述。

了解行协议(InfluxDB 默认接受格式)可能也有帮助。

在这篇文章中,我想通过以下两个方面来阐述 Telegraf 在 MQTT 上收集数据的有效性:a) 深入探讨最新版本的一个功能;b) 讨论解析方法。正如许多读者所知,MQTT 消息的有效载荷可以是…实际上可以是任何您想要的内容,因此,一个订阅多个主题的订阅者能够处理任何您向它扔来的内容是至关重要的。

Telegraf 可以订阅任意数量的主题,同时让用户控制它如何塑造传入的数据。控制数据形状允许用户写入不仅与目标数据存储兼容,而且优化的数据。

在此上下文中,数据形状控制有两种形式,每种形式都会在这篇文章的单独部分中进行说明

  1. 主题解析
  2. 有效载荷解析

维护

  • 以下所有 Telegraf 行为均假设具有标准 [Agent] 级别配置,其中 omit_hostname = true
  • 示例中省略了时间戳

主题解析

在 Telegraf 的最新版本(v1.21)中,MQTT 消费者输入插件 添加了从主题动态解析信息并将其附加到有效载荷的能力。这意味着 Telegraf 操作员可以使用 MQTT 消息有效载荷和传入主题中的数据。

以下是一个图例,说明了四个不同的 MQTT 消息数据包在 InfluxDB 数据模型中以相同的方式展开。

MQTT Topic and Payload Parsing with Telegraf

这是通过 Telegraf 实现的,这篇文章的其余部分将解释如何实现。

首先,我将通过例子来解释这个问题。假设我有一个基本传感器,向包含其唯一ID的主题发送单个温度值;例如:sensors/00191。MQTT消息将发送到这个主题,包含一个原始值——在这个例子中,让我们假设一个浮点数10.0的样本。

sensors/00191上的任何第三方订阅者都不会知道如何处理这个温度值

  1. 它本身不是有效的LP,所以Telegraf将拒绝它。

  2. 如果你帮助Telegraf设置data_format="value"data_type="float",它将生成LP如

mqtt_consumer,topic=sensors/00191 value=10.0 <timestamp>

(这是没有提供测量名称的输入插件的默认行为)。

请注意,Telegraf默认会将整个主题作为LP标签添加。这是Telegraf在方便你的同时保留任何可能需要的上下文。然而,将整个主题作为一个字符串包含在内,将需要一些正则表达式来使其有用。它存在的原因是,如果没有它

  • 这个value字段对任何消费数据的人或事物都没有意义。
  • 如果同时写入,每个发出这种数据的传感器将覆盖最后一个传感器的数据(需要标签来区分像value这样的字段)。

在这种情况下,所需的信息在主题中。这可能是因为供应商编写了传感器以这种方式发出数据,或者是因为用户正在处理高度受限的资源,并且需要将有效载荷保持尽可能小。

进入主题解析!Telegraf通过允许用户指定他们想要将主题的哪些部分作为测量、标签或字段带入,来解决此问题,而无需将整个主题作为标签存储。在此版本之前,这可以通过正则表达式处理器插件来完成,但新体验更简单、更方便。

只需对mqtt_consumer输入插件进行简单的配置更改,我们就可以得到我们所需的内容

[[inputs.mqtt_consumer]]
    servers = ["<address:port>"]
    topics = ["sensors/#"]
    [[inputs.mqtt_consumer.topic_parsing]]
        topic = "sensors/+"
        measurement = "measurement/_"
        tags = "_/sensor_id"

上面的Telegraf配置实例化了一个订阅了sensors/#(任何具有父级sensors的主题)的客户端。如果您熟悉SNMP等输入插件或JSON_v2等解析器(如果之前没有介绍过!),您将认出由[[inputs.mqtt_consumer.topic_parsing]]定义的嵌套TOML表格。可以有任意数量的这些,但我们只需要一个。在这个表中,您告诉Telegraf哪些主题要匹配,以及随着信息的到来,您将解析这些主题。之后是您告诉Telegraf将什么作为测量、标签和/或字段。在这种情况下,我们只提取测量和一个单个标签(毕竟没有多少信息需要解析)。没有字段出现,但这种情况并不总是如此。

我们使用引号中的measurement/_语法来告诉Telegraf哪些主题名称段将被解析成什么LP元素。键映射到主题中相应索引的值(按段索引),下划线映射到与该行协议元素不相关的段(占位符)。

在上面的示例配置中,我们告诉Telegraf第一个部分将是我们的测量值,第二个(最后一个)部分将是一个带有键sensor_id的标签。该段中的实际信息将作为该标签键的值。如果我们现在想移除默认的topic标签,我们可以在配置中包含一个topic_tag = ""参数,并得到以下输出

mqtt_consumer,sensor_id=00191 value=10.0 <timestamp>

此类修改在规模较大和更复杂的话题和有效载荷中很重要。它还极大地改善了下游的统计分析体验...

快速提示:最好(但不必要)的做法是让字段名称包含有用信息,因此“value”并不是理想的选择。为了解决这个问题,您只需使用Pivot处理器(本文不涉及此内容)。

无论如何,谈到更复杂的话题和有效载荷,让我们再看看两个简短的例子

话题:detroit/0f801/42.33/-83.04/front_left/09fse21/rpm 消息:846

我们希望我们的LP输出包含所有这些信息。第一个部分是该车辆所属的城市(站点)。第二部分是车辆的ID。第三部分是给定样本时间的纬度。第四部分是经度,第五部分是车辆上的轮胎位置,第六部分是轮胎的ID,最后是轮胎的rpm值。所有这些都可能是重要的信息!

我不会深入探讨哪些段应该对应哪些LP元素——关于这一点,您可以参考我上面链接的博客文章。对于这个例子,我们将对测量值进行通用处理,并让Telegraf为其命名。我们的测量值将是mqtt_consumer ;(如果需要,Telegraf可以在其他插件中重命名此值)。每个字符串都将是一个标签,每个数值元素将是一个字段。最终,我们希望我们的结果LP看起来像

mqtt_consumer,site=detroit,vehicle_id=0f801,tire_id=09fse21 lat=42.33,lon=-83.04,rpm=846 <timestamp>

以下相关的[[inputs.mqtt_consumer]] 配置将帮助我们实现目标

...snipped...
    [[inputs.mqtt_consumer.topic_parsing]]
        topic = "+/+/+/+/+/+/+"  # all topics with 7 segments
        tags = "site/vehicle_id/_/_/orientation/tire_id/_"
        fields = "_/_/_/_/_/_/rpm"

这就是它了——从遵循7段结构的每个话题中提取主题编码信息到行协议记录的简洁配置!

有效载荷解析!

解析插件在Telegraf中并不陌生,JSON解析也是如此,但我认为MQTT消费话题值得深入探讨JSON解析。首先,请注意Telegraf还提供了一份其他解析器的列表。解析器是通过data_format 参数在插件内部调用的。它们的配置略有不同,因为它们的格式在语义上有所不同。对于这篇文章,我们将专注于JSON解析。

Telegraf中的JSON解析已经存在很长时间了,但在几个版本之前进行了重写。新版本中尝试抛给它一些它无法解析的JSON!

在深入探讨之前,如果您想详细了解这个升级后的解析器,Samantha Wang(Telegraf产品经理)的这篇博客文章非常出色。此外,如果您更喜欢使用Python方言进行解析,我还在这里写了一篇文章。

如您所知,MQTT可以接受几乎所有类型的有效载荷。JSON是最常见的。话虽如此,JSON也可以是任何形状,因此可配置的解析至关重要!

为了连接部分,我们将继续使用上一个示例消息,但稍作改动,将一些原本属于主题的信息放入JSON有效负载中

主题:detroit/0f801/09fse21 消息

{
    "orientation": "front_left"
    "latitude": 42.33
    "longitude": -83.04
    "rpm": 846
}

为此,用户需要像之前一样解析主题。该配置如下

...snipped...
    [[inputs.mqtt_consumer.topic_parsing]]
        topic = "+/+/+"  # all topics with 3 segments
        tags = "site/vehicle_id/tire_id"

对于这个,我们保留测量值(如Telegaf的默认输入)并设置无字段。原因是这次我们知道我们可以从有效负载中获取字段。让我们解析它!

为了使JSON解析足够灵活,能够处理深度嵌套的blob和其他复杂性,这个新的JSON_v2解析器使用嵌套的TOML表和GJSON查询来访问所需的blob。上面的消息有效负载很小且扁平,所以这次解析器将被大量地未充分利用

结合上述主题和JSON正文,我们想要与之前主题解析示例中相同的输出。作为一个提醒,LP看起来如下

mqtt_consumer,site=detroit,vehicle_id=0f801,tire_id=09fse21 lat=42.33,lon=-83.04,rpm=846 <timestamp>

考虑到我们刚才解析的较短的主题,以下是相应的JSON解析配置

...snipped...
data_format = "json_v2" # invokes the parser -- lines following are parser config
[[inputs.mqtt_consumer.json_v2]]
    [[inputs.mqtt_consumer.json_v2.tag]]
        path = "orientation" # GJSON path: JSON is flat -- all keys at root
    [[inputs.mqtt_consumer.json_v2.field]]
        path = "rpm"
        type = "int"
    [[inputs.mqtt_consumer.json_v2.tag]]
        path = "latitude"
        rename = "lat"
        type = "float"
    [[inputs.mqtt_consumer.json_v2.field]]
        path = "longitude"
        rename = "lon"
        type = "float"

这将产生与所有元数据都在主题本身中的示例相同的输出。任何一种情况都是完全合理的。

我可以理解如果JSON解析看起来有点冗长,特别是对于如此简单的JSON片段。冗长在更复杂的情况下更为明显。对于这个案例,这里有一个使用不同的方式解析标签和字段来达到相同结果的示例

...snipped...
data_format = "json_v2" # invokes the parser -- lines following are parser config
[[inputs.mqtt_consumer.json_v2]]
    [[inputs.mqtt_consumer.json_v2.object]]
        tags = ["site"] # can list all Tags -- our case only has one
    [[inputs.mqtt_consumer.json_v2.object.field]] # fields by key and type
        rpm = "int"
        latitude = "float"
        longitude = "float"

这就是这个解析器我最喜欢的部分。如此简洁!同时请注意使用的object表。如果你只需要定义你的标签和字段,而不需要为每个元素做特别处理,这些方法可以很好地缩短你的配置!

总结

这就是它的核心内容。你现在知道了Telegraf如何从MQTT代理消费,订阅多个主题,以及如何同时解析主题和有效负载中的数据,将这些数据合并到LP记录中。InfluxDB会因此而喜欢你!

如果你处于开始开发物联网数据管道的阶段,MQTT在其中,并且作为消费者(甚至如果你愿意,也可以作为生产者)将数据分发到你的数据存储中,这可能是一个不错的选择。如果你正在使用或计划使用InfluxDB作为数据存储层的时序组件,Telegraf被设计为最适合为InfluxDB提供数据。无论你的数据最终在哪里,如果你需要从MQTT代理/集群以相对简单的方式获取各种形式和来源的数据,Telegraf都可以做到。