如何从 Flux 本机输出数据到 MQTT
作者:David G. Simmons / 用例, 产品
2019 年 8 月 16 日
导航至
使用 Flux 将数据从 InfluxDB 写入 MQTT
我很早就开始使用 InfluxDB v2.0 的开源 (OSS) 版本,当时还处于 Alpha 发布阶段。即使在早期版本中,我就对事情的发展方向非常着迷。但如您所知,我做了很多物联网构建,并且所有构建都使用 InfluxDB,因此有些事情我需要它来做,但它当时还不能做。
我的所有物联网演示都做的一件事是将警报写入 MQTT broker。我还有其他物联网设备从该 broker 读取数据,并根据它们收到的消息采取行动。但是 InfluxDB 2.0 Alpha 没有真正的输出功能。
注意: 一个警报框架即将添加到 InfluxDB 2.0 中,但 a) 当时它还不可用,并且 b) 我现在就需要它。
怎么办?嗯,Flux 是一种可扩展的语言,所以我决定扩展该语言以写入 MQTT。首先,重要的是要注意 Flux 有 2 种用于读取和写入数据的语言构造:from()
和 to()
。如果您编写过任何 Flux,您会认出 from()
语法是您如何从 InfluxDB 取回数据的。 to()
业务有点难。该语言内置了使用 to()
语法写回 InfluxDB 的能力。我还找到了一个 to()
http 扩展,它允许您将 Flux 查询的结果写入 http 端点。至少我现在有了一个起点!
将 MQTT 添加到 Flux
我开始研究 Flux 代码,看看 http to()
方法是如何实现的,并很快发现使用相同的框架用于 MQTT 几乎是微不足道的,所以我复制了 http to()
输出的所有代码,并开始着手将其转移到 MQTT。与所有这些事情一样,它比我最初想象的要少一些“微不足道”,但在断断续续工作了几个星期后,我从 Flux 获得了到 MQTT 的工作输出!
首先,我必须定义 MQTT 输出需要哪些选项,我确定了一组默认的最小选项
golang
type ToMQTTOpSpec struct {
Broker string `json:"broker"`
Name string `json:"name"`
Topic string `json:"topic"`
Message string `json:"message"`
ClientID string `json:"clientid"`
Username string `json:"username"`
Password string `json:"password"`
QoS int `json:"qos"`
NameColumn string `json:"nameColumn"` // either name or name_column must be set, if none is set try to use the "_measurement" column.
Timeout time.Duration `json:"timeout"` // default to something reasonable if zero
NoKeepAlive bool `json:"noKeepAlive"`
TimeColumn string `json:"timeColumn"`
TagColumns []string `json:"tagColumns"`
ValueColumns []string `json:"valueColumns"`
}
当然,并非所有这些都是必需的,但我将解释一下必需的那些。
首先,当然,您需要定义一个 Broker。这是您要使用的 MQTT broker 的 URL。在您的 URL 中,您的 broker 应标识为 tcp
、ws
或 tls
,因此 tcp://mqtt.mybroker.com:1883
将是它正在寻找的。 大部分其余部分在很大程度上是可选的。如果您提供 Username
,则您还必须提供密码。您不能只有其中一个而没有另一个!此外,如果您不提供 Topic
,那么将为您创建一个主题,方法是将查询返回的所有标签串在一起。我建议给出一个主题,因为主题为 /tag1/tag_2/tag_3/...
在很多情况下都不是很理想。
如何使用这个新事物?
很高兴您问到!首先,它实际上还不是 Flux 的一部分。我已经提交了一个 PR,它已被接受,但(截至撰写本文时)尚未合并。如果您想构建自己的 Flux 版本以便现在获得它,那么您需要拉取分支并从源代码构建。请参阅 MQTT PR 并从那里开始。
完成此操作后,开始写入 MQTT broker 的 Flux 代码实际上非常简单!您需要在 InfluxDB 2.0 UI 中创建一个 Task,然后您可以粘贴以下代码
import "mqtt"
from(bucket: "telegraf")
|> range(start: -task.every)
|> filter(fn: (r) =>
(r._measurement == "cpu"))
|> filter(fn: (r) =>
(r._field == "usage_system"))
|> filter(fn: (r) =>
(r.cpu == "cpu-total"))
|> last()
|> mqtt.to(
broker: "tcp://davidgs.com:8883",
topic: "cpu",
clientid: "cpu-flux",
valueColumns: ["_value"],
tagColumns: ["cpu", "host"],
)
这将把最后一个 CPU usage_system
值写入您的 MQTT broker。使用 UI,您可以决定要以多高的频率写入此数据。
需要注意的几件事
重要的是要意识到 Flux 从查询返回的所有数据都是表。上述 Task 使用 last()
函数的原因是将返回值限制为恰好有一行的表。 MQTT to()
函数会将整个表作为行协议写入 broker。如果您的查询返回一个非常大的表,请准备好让您的 MQTT broker 获得一个非常大的表作为消息负载。
此外,如果您的查询返回多个表,则 MQTT to()
函数将每个表写入一条消息,每条消息都包含一个完整的表。如果这不是您想要的行为,您应该考虑如何编写您的查询,使其返回单个(最好是小的)表作为结果。
您可能还注意到上面有一个可选字段 Message
。如果您想要发送预定义的消息而不是结果表,您可以在调用 to()
时定义 message
参数,并且将发送该消息。到目前为止,我已经使用了大约 2 个月,效果非常好!我能够根据其他物联网设备的读数来控制一些物联网设备,而且效果很好!
更新: 此 PR 现在已合并到 master 分支中,因此它应该很快就会出现在 Flux 的版本中!