如何将 Flux 数据输出到 MQTT 的原生方式
作者:David G. Simmons / 用例,产品
2019年8月16日
导航到
使用 Flux 将数据从 InfluxDB 写入 MQTT
我早在 InfluxDB v2.0 Alpha 版本发布之初就开始使用开源版本。即使在早期的版本中,我也非常喜欢其发展态势。但是,如您所知,我做了很多 IoT 构建工作,并使用 InfluxDB 进行所有这些工作,因此我需要它完成一些它尚未完成的事情。
我所做的所有 IoT 示例之一是向 MQTT 代理写入警报。我还有其他 IoT 设备从该代理读取,并根据接收到的消息采取行动。但是,InfluxDB 2.0 Alpha 版本没有真正的输出功能。
注意: InfluxDB 2.0 非常快就要推出一个警报框架,但 a) 那时还没有可用,b) 我当时就需要它。
该怎么办呢?Flux是一种可扩展的语言,所以我决定扩展语言以写入MQTT。首先,重要的是要注意,Flux有用于读写数据的两种语言构造:from()
和to()
。如果你写过任何Flux,你会认出from()
的语法是你如何从InfluxDB获取数据的方式。而to()
就稍微复杂一些。语言内置了使用to()
语法的将数据写回InfluxDB的能力。我还发现了一个针对http的to()
扩展,它允许你将Flux查询的结果写入http端点。至少,我现在有一个起点了!
将MQTT添加到Flux
我开始研究Flux代码,看看http to()
方法是如何实现的,很快我就发现,使用这个相同的框架来使用MQTT几乎是微不足道的,所以我复制了所有http to()
的代码,并开始将其转移到MQTT。和所有这些事情一样,这比我最初想象的要复杂一些,但在几周的断断续续的工作后,我终于从Flux成功输出到了MQTT!
首先,我必须定义MQTT输出需要哪些选项,我决定采用一种默认的最小选项集。
golang
type ToMQTTOpSpec struct {
Broker string `json:"broker"`
Name string `json:"name"`
Topic string `json:"topic"`
Message string `json:"message"`
ClientID string `json:"clientid"`
Username string `json:"username"`
Password string `json:"password"`
QoS int `json:"qos"`
NameColumn string `json:"nameColumn"` // either name or name_column must be set, if none is set try to use the "_measurement" column.
Timeout time.Duration `json:"timeout"` // default to something reasonable if zero
NoKeepAlive bool `json:"noKeepAlive"`
TimeColumn string `json:"timeColumn"`
TagColumns []string `json:"tagColumns"`
ValueColumns []string `json:"valueColumns"`
}
当然,并非所有这些选项都是必需的
,但我将说明其中的一些。
当然,首先你需要定义一个代理。这是你想要使用的MQTT代理的URL。在你的URL中,你的代理应该被标识为tcp
、ws
或tls
,所以tcp://mqtt.mybroker.com:1883
就是它所寻找的。大部分其他选项在很大程度上是可选的。如果
你提供了用户名,那么你也必须提供密码。你不能没有其中之一!此外,如果你没有提供主题,则会为你创建一个由查询返回的所有标签连接起来的主题。我建议提供主题,因为像/tag1/tag_2/tag_3/...
这样的主题在许多情况下都不是理想的。
如何使用这个新功能?
我很高兴你问了!实际上,这还不是Flux的一部分。我已经提交了一个PR,它已经被接受,但(截至撰写本文时)尚未合并。如果你想自己构建Flux版本来获取这个功能,你需要拉取分支并从源代码构建。请参阅MQTT PR并从那里开始。
一旦你这样做,Flux代码开始写入MQTT代理实际上是非常简单的!你需要在InfluxDB 2.0 UI中创建一个任务,然后你可以粘贴以下代码
import "mqtt"
from(bucket: "telegraf")
|> range(start: -task.every)
|> filter(fn: (r) =>
(r._measurement == "cpu"))
|> filter(fn: (r) =>
(r._field == "usage_system"))
|> filter(fn: (r) =>
(r.cpu == "cpu-total"))
|> last()
|> mqtt.to(
broker: "tcp://davidgs.com:8883",
topic: "cpu",
clientid: "cpu-flux",
valueColumns: ["_value"],
tagColumns: ["cpu", "host"],
)
这将把最新的CPU usage_system
值写入你的MQTT代理。使用UI,你可以决定你想多久写入一次这些数据。
以下几点需要注意
重要的是要认识到Flux将查询的所有数据都作为表格返回。上述任务使用last()
函数的原因是限制返回值为只有一行数据的表格。MQTT的to()
函数会将整个表格以line-protocol的形式写入代理。如果你的查询返回一个非常大的表格,请准备好你的MQTT代理将接收到一个非常大的表格作为消息有效载荷。
此外,如果你的查询返回多个表格,MQTT的to()
函数将写入一条消息,每条消息包含一个完整的表格。如果你的行为不是这样的,你应该考虑如何构建查询,以便它返回一个单一(最好是小型)表格作为结果。
您可能也注意到了上面的一个可选字段 消息
。如果您想要发送预定义的消息而不是结果表格,您可以在调用 to()
时定义 message
参数,那么这条消息就会被发送。到目前为止,我已经使用了大约2个月,效果非常棒!我能够根据其他物联网设备的读数控制一些物联网设备,效果非常好!
更新: 这个PR现在已经合并到主分支,所以它应该会在Flux的下一个版本中显示!