如何将 Flux 数据输出到 MQTT 的原生方式

导航到

使用 Flux 将数据从 InfluxDB 写入 MQTT

我早在 InfluxDB v2.0 Alpha 版本发布之初就开始使用开源版本。即使在早期的版本中,我也非常喜欢其发展态势。但是,如您所知,我做了很多 IoT 构建工作,并使用 InfluxDB 进行所有这些工作,因此我需要它完成一些它尚未完成的事情。

我所做的所有 IoT 示例之一是向 MQTT 代理写入警报。我还有其他 IoT 设备从该代理读取,并根据接收到的消息采取行动。但是,InfluxDB 2.0 Alpha 版本没有真正的输出功能。

注意: InfluxDB 2.0 非常快就要推出一个警报框架,但 a) 那时还没有可用,b) 我当时就需要它。

该怎么办呢?Flux是一种可扩展的语言,所以我决定扩展语言以写入MQTT。首先,重要的是要注意,Flux有用于读写数据的两种语言构造:from()to()。如果你写过任何Flux,你会认出from() 的语法是你如何从InfluxDB获取数据的方式。而to()就稍微复杂一些。语言内置了使用to()语法的将数据写回InfluxDB的能力。我还发现了一个针对http的to()扩展,它允许你将Flux查询的结果写入http端点。至少,我现在有一个起点了!

将MQTT添加到Flux

我开始研究Flux代码,看看http to()方法是如何实现的,很快我就发现,使用这个相同的框架来使用MQTT几乎是微不足道的,所以我复制了所有http to() 的代码,并开始将其转移到MQTT。和所有这些事情一样,这比我最初想象的要复杂一些,但在几周的断断续续的工作后,我终于从Flux成功输出到了MQTT!

首先,我必须定义MQTT输出需要哪些选项,我决定采用一种默认的最小选项集。

golang
type ToMQTTOpSpec struct {
    Broker string `json:"broker"`
    Name string `json:"name"`
    Topic string `json:"topic"`
    Message string `json:"message"`
    ClientID string `json:"clientid"`
    Username string `json:"username"`
    Password string `json:"password"`
    QoS int `json:"qos"`
    NameColumn string `json:"nameColumn"` // either name or name_column must be set, if none is set try to use the "_measurement" column.
    Timeout time.Duration `json:"timeout"` // default to something reasonable if zero
    NoKeepAlive bool `json:"noKeepAlive"`
    TimeColumn string `json:"timeColumn"`
    TagColumns []string `json:"tagColumns"`
    ValueColumns []string `json:"valueColumns"`
}

当然,并非所有这些选项都是必需的,但我将说明其中的一些。

当然,首先你需要定义一个代理。这是你想要使用的MQTT代理的URL。在你的URL中,你的代理应该被标识为tcpwstls,所以tcp://mqtt.mybroker.com:1883就是它所寻找的。大部分其他选项在很大程度上是可选的。如果你提供了用户名,那么你也必须提供密码。你不能没有其中之一!此外,如果你没有提供主题,则会为你创建一个由查询返回的所有标签连接起来的主题。我建议提供主题,因为像/tag1/tag_2/tag_3/... 这样的主题在许多情况下都不是理想的。

如何使用这个新功能?

我很高兴你问了!实际上,这还不是Flux的一部分。我已经提交了一个PR,它已经被接受,但(截至撰写本文时)尚未合并。如果你想自己构建Flux版本来获取这个功能,你需要拉取分支并从源代码构建。请参阅MQTT PR并从那里开始。

一旦你这样做,Flux代码开始写入MQTT代理实际上是非常简单的!你需要在InfluxDB 2.0 UI中创建一个任务,然后你可以粘贴以下代码

import "mqtt"
from(bucket: "telegraf")
    |> range(start: -task.every)
    |> filter(fn: (r) =>
        (r._measurement == "cpu"))
    |> filter(fn: (r) =>
        (r._field == "usage_system"))
    |> filter(fn: (r) =>
        (r.cpu == "cpu-total"))
    |> last()
    |> mqtt.to(
        broker: "tcp://davidgs.com:8883",
        topic: "cpu",
        clientid: "cpu-flux",
        valueColumns: ["_value"],
        tagColumns: ["cpu", "host"],
    )

这将把最新的CPU usage_system值写入你的MQTT代理。使用UI,你可以决定你想多久写入一次这些数据。

以下几点需要注意

重要的是要认识到Flux将查询的所有数据都作为表格返回。上述任务使用last() 函数的原因是限制返回值为只有一行数据的表格。MQTT的to()函数会将整个表格以line-protocol的形式写入代理。如果你的查询返回一个非常大的表格,请准备好你的MQTT代理将接收到一个非常大的表格作为消息有效载荷。

此外,如果你的查询返回多个表格,MQTT的to()函数将写入一条消息,每条消息包含一个完整的表格。如果你的行为不是这样的,你应该考虑如何构建查询,以便它返回一个单一(最好是小型)表格作为结果。

您可能也注意到了上面的一个可选字段 消息。如果您想要发送预定义的消息而不是结果表格,您可以在调用 to() 时定义 message 参数,那么这条消息就会被发送。到目前为止,我已经使用了大约2个月,效果非常棒!我能够根据其他物联网设备的读数控制一些物联网设备,效果非常好!

更新: 这个PR现在已经合并到主分支,所以它应该会在Flux的下一个版本中显示!