如何从 Flux 本机输出数据到 MQTT

导航至

使用 Flux 将数据从 InfluxDB 写入 MQTT

我很早就开始使用 InfluxDB v2.0 的开源 (OSS) 版本,当时还处于 Alpha 发布阶段。即使在早期版本中,我就对事情的发展方向非常着迷。但如您所知,我做了很多物联网构建,并且所有构建都使用 InfluxDB,因此有些事情我需要它来做,但它当时还不能做。

我的所有物联网演示都做的一件事是将警报写入 MQTT broker。我还有其他物联网设备从该 broker 读取数据,并根据它们收到的消息采取行动。但是 InfluxDB 2.0 Alpha 没有真正的输出功能。

注意: 一个警报框架即将添加到 InfluxDB 2.0 中,但 a) 当时它还不可用,并且 b) 我现在就需要它。

怎么办?嗯,Flux 是一种可扩展的语言,所以我决定扩展该语言以写入 MQTT。首先,重要的是要注意 Flux 有 2 种用于读取和写入数据的语言构造:from()to()。如果您编写过任何 Flux,您会认出 from() 语法是您如何从 InfluxDB 取回数据的。 to() 业务有点难。该语言内置了使用 to() 语法写回 InfluxDB 的能力。我还找到了一个 to() http 扩展,它允许您将 Flux 查询的结果写入 http 端点。至少我现在有了一个起点!

将 MQTT 添加到 Flux

我开始研究 Flux 代码,看看 http to() 方法是如何实现的,并很快发现使用相同的框架用于 MQTT 几乎是微不足道的,所以我复制了 http to() 输出的所有代码,并开始着手将其转移到 MQTT。与所有这些事情一样,它比我最初想象的要少一些“微不足道”,但在断断续续工作了几个星期后,我从 Flux 获得了到 MQTT 的工作输出!

首先,我必须定义 MQTT 输出需要哪些选项,我确定了一组默认的最小选项

golang
type ToMQTTOpSpec struct {
    Broker string `json:"broker"`
    Name string `json:"name"`
    Topic string `json:"topic"`
    Message string `json:"message"`
    ClientID string `json:"clientid"`
    Username string `json:"username"`
    Password string `json:"password"`
    QoS int `json:"qos"`
    NameColumn string `json:"nameColumn"` // either name or name_column must be set, if none is set try to use the "_measurement" column.
    Timeout time.Duration `json:"timeout"` // default to something reasonable if zero
    NoKeepAlive bool `json:"noKeepAlive"`
    TimeColumn string `json:"timeColumn"`
    TagColumns []string `json:"tagColumns"`
    ValueColumns []string `json:"valueColumns"`
}

当然,并非所有这些都是必需的,但我将解释一下必需的那些。

首先,当然,您需要定义一个 Broker。这是您要使用的 MQTT broker 的 URL。在您的 URL 中,您的 broker 应标识为 tcpwstls,因此 tcp://mqtt.mybroker.com:1883 将是它正在寻找的。 大部分其余部分在很大程度上是可选的。如果您提供 Username,则您还必须提供密码。您不能只有其中一个而没有另一个!此外,如果您不提供 Topic,那么将为您创建一个主题,方法是将查询返回的所有标签串在一起。我建议给出一个主题,因为主题为 /tag1/tag_2/tag_3/... 在很多情况下都不是很理想。

如何使用这个新事物?

很高兴您问到!首先,它实际上还不是 Flux 的一部分。我已经提交了一个 PR,它已被接受,但(截至撰写本文时)尚未合并。如果您想构建自己的 Flux 版本以便现在获得它,那么您需要拉取分支并从源代码构建。请参阅 MQTT PR 并从那里开始。

完成此操作后,开始写入 MQTT broker 的 Flux 代码实际上非常简单!您需要在 InfluxDB 2.0 UI 中创建一个 Task,然后您可以粘贴以下代码

import "mqtt"
from(bucket: "telegraf")
    |> range(start: -task.every)
    |> filter(fn: (r) =>
        (r._measurement == "cpu"))
    |> filter(fn: (r) =>
        (r._field == "usage_system"))
    |> filter(fn: (r) =>
        (r.cpu == "cpu-total"))
    |> last()
    |> mqtt.to(
        broker: "tcp://davidgs.com:8883",
        topic: "cpu",
        clientid: "cpu-flux",
        valueColumns: ["_value"],
        tagColumns: ["cpu", "host"],
    )

这将把最后一个 CPU usage_system 值写入您的 MQTT broker。使用 UI,您可以决定要以多高的频率写入此数据。

需要注意的几件事

重要的是要意识到 Flux 从查询返回的所有数据都是表。上述 Task 使用 last() 函数的原因是将返回值限制为恰好有一行的表。 MQTT to() 函数会将整个表作为行协议写入 broker。如果您的查询返回一个非常大的表,请准备好让您的 MQTT broker 获得一个非常大的表作为消息负载。

此外,如果您的查询返回多个表,则 MQTT to() 函数将每个表写入一条消息,每条消息都包含一个完整的表。如果这不是您想要的行为,您应该考虑如何编写您的查询,使其返回单个(最好是小的)表作为结果。

您可能还注意到上面有一个可选字段 Message。如果您想要发送预定义的消息而不是结果表,您可以在调用 to() 时定义 message 参数,并且将发送该消息。到目前为止,我已经使用了大约 2 个月,效果非常好!我能够根据其他物联网设备的读数来控制一些物联网设备,而且效果很好!

更新: 此 PR 现在已合并到 master 分支中,因此它应该很快就会出现在 Flux 的版本中!