使用 Telegraf 将 Flume Water 连接到 InfluxDB Cloud

导航至

我喜欢让我的设备与我对话并告诉我事情。事实上,我的房子里到处都是智能设备:智能电视、扬声器、灯开关、电器,甚至我的牙刷都有应用程序。然而,一个很大的谜团是我的公用事业使用情况。在大多数情况下,美国的公共事业公司并不以其尖端技术而闻名。是时候自己动手了。以下是我将在本文中讨论的主题

使用 Flume Water Monitor 跟踪我的用水量

我决定专注于我的用水量。我住在加利福尼亚州,这个州一直在鼓励人们节约用水。这是因为加利福尼亚州容易发生干旱,而且 2022 年加利福尼亚州新法律将每户家庭的用水量限制为每人 55 加仑。

当我每月查看我的水费账单时,我看到我的用水量随时间变化的曲线图,但我无法从公用事业公司实时获取该数据。

My real water usage for the past 6 months

过去 6 个月的实际用水量

在 2021 年圣诞节,我向圣诞老人要了一个 Flume Water 传感器,一个智能家居水表监控器。Flume 智能水表监控设备的工作原理是感应我所在城市水表中的旋转盘并估算用水量。旋转速度越快,我使用的水就越多。该设备在跟踪用水量方面做得出奇地好。我选择 Flume Water 水表主要有两个原因

  • 我不需要切割任何管道,而且它可以在几分钟内安装完成。我只需要将它的橡胶带绑在水表上,他们甚至还在设备中附赠了一副橡胶手套。
  • 其次,Flume Water 拥有一个 API,允许我基于我的用水量构建应用程序和自动化。

安装后它实际上是什么样子的?这是我的水表,它连接到进入我家的水表。

Flume water sensor

Flume Water 传感器连接到我的城市水表

在您的水表中安装 Flume 智能水表监控器

您可以通过简单的自助安装在您的水表中安装 Flume 智能家居水表监控器。您可以按照他们网站上的教程进行操作。

我承认,打开我家门前人行道上的其中一个面板感觉很奇怪。但是他们的教程引导我完成了整个过程。它还向我保证,我在美国所做的事情是完全合法的。

不仅仅是设置 Flume 应用程序

我在我的 iPhone 上下载了 flume 应用程序,但它也适用于 Android。Flume 还有一个直观的 Web 门户。

在我通过 iPhone 应用程序完成设置和校准后,Flume 开始跟踪我的用水量。事实证明,我每次冲马桶大约使用 1.3 加仑的水。真棒!

Flume 提供了一个不错的 Web 门户和一个 iPhone Flume 应用程序,用于查看我的用水量、设置警报等。但在 InfluxData 工作,我想弄清楚如何将这些数据导入到我的 InfluxDB Cloud 帐户中,以便在其上构建自定义仪表板和警报。所以,我开始着手制作它。

flume-graph

构建 Flume Water Golang 客户端

我查看了 Flume Water API 文档,它看起来非常简单易于交互。我一直在寻找简单的解决方案。所以我开始寻找一些用我熟悉的语言编写的客户端库。在 Google 搜索后,我找到了一个有用的项目,用于将数据整合到 Samsung Smartthings 中。但是没有我熟悉的语言的版本。

我一直想多学习一点关于编写 Go 代码的知识,因为 InfluxDB 就是用 Go 编写的。这就是为什么我开始构建一个客户端库,以便更轻松地与 Flume Water API 进行交互。

在查看了一些其他的 Go 客户端并从网上复制粘贴了一些 Go 示例后,我得到了一个可用的客户端。该客户端通过身份验证,从 Flume Water 令牌中解码了我的用户 ID,并查询了我的用水量。即使我不需要它,我还是决定添加一些函数来与整个 Flume API 进行交互,包括订阅和通知。

Use case using InfluxDB, Go and Flume water meter

我不是一个经验丰富的 Go 开发人员,但我能够拼凑出一些看起来可以用于身份验证和查询数据的东西。这是一个使用客户端查询数据的示例。

# These credentials can be generated in your Flume Water portal
client := NewClient(os.Getenv("FLUME_CLIENT_ID"), os.Getenv("FLUME_CLIENT_SECRET"), os.Getenv("FLUME_USERNAME"), os.Getenv("FLUME_PASSWORD"))

devices, _ := client.FetchUserDevices(FlumeWaterFetchDeviceRequest{})

query := FlumeWaterQuery{
	Bucket:        FlumeWaterBucketDay,
	SinceDatetime: "2021-03-12 00:00:00",
	RequestID:     "test",
}
results, err := client.QueryUserDevice(devices[0].ID, FlumeWaterQueryRequest{
	Queries: []FlumeWaterQuery{query},
})

如果您有兴趣在您的项目中使用此库,它被称为 Flume Water Go Client。它在 MIT 许可证下可用,供任何人使用和贡献。

现在我有一个可用的 Go 客户端,我需要找到一种方法将数据导入到我的 InfluxDB Cloud 帐户中。我是 Telegraf 数据采集代理的忠实粉丝。Telegraf 团队最近发布了一项功能,允许用户构建和托管外部插件。我想我可以尝试一下。

什么是 Telegraf 外部插件?

Telegraf Execd 输入插件是一种非常强大的向 Telegraf 添加新插件的方法。该插件旨在启动并与长时间运行的外部应用程序交互。该应用程序可以用任何语言编写并编译成可执行文件。Telegraf 通过 STDIN 与该应用程序通信,并且可以侦听输出或以固定的时间间隔向应用程序发出信号。

这是对现有 Exec 输入插件的改进,因为应用程序不会在每个 Telegraf 收集间隔重新启动。对于较大的应用程序,每次重新启动进程的开销对于数据收集的频率来说可能过大。

如果进程崩溃或被终止,Telegraf 也会管理进程的重新启动。如果您的代码不能很好地处理网络问题(例如我的代码),这也非常方便。

Telegraf 的现有外部插件

我开始查看 Telegraf 的现有外部插件,看看构建自己的插件需要做些什么。《Rand 示例》非常出色。它提供了一些样板代码,使创建插件变得非常容易。我过去曾贡献过自定义 Telegraf 插件。这个项目配置为,您真正需要做的就是像在 Telegraf 中通常那样实现相同的 Gather 函数,因此它非常直观。

作为外部插件运行它的好处是,该示例提供了一种简单的方法来测试插件并将结果打印到终端,以便您可以验证它是否正常工作。

Makefile 文件

我还添加了一个 Makefile 来简化构建和运行这些命令。make run

./bin/flume-water --config plugin.dev.conf --poll_interval 5s
flume_water,bridge_id=34535334534534534,device_id=34534534534534543,device_type=2,location_building_type=SINGLE_FAMILY_HOME,location_city=San\ Francisco,location_name=SF\ Bernal,location_postal_code=94110,location_state=CA,request_id=flume-water-telegraf-input,units=gallons,[email protected] value=0.10389612 1616521800000000000

结果:自定义 Telegraf Flume Water 输入插件

我最终得到的是一个自定义的 Telegraf Flume Water 输入插件,它可以像原生 Telegraf 插件一样轻松配置,并通过 Telegraf Execd 输入运行。一个示例配置如下所示

[[inputs.flume_water]]
    client_id = "clientid"
    client_secret = "secret"
    username = "username"
    password = "password"
    ## If this isn't set, we will fetch your device list and pick the first one
    #device_id = ""
    ## lookback_mins is the amount of minutes to look back when querying data. This helps catch any late arriving data
    #lookback_mins = 5
    ## units can be one of GALLONS, LITERS, CUBIC_FEET, or CUBIC_METERS
    #units = "GALLONS"

Telegraf 的相应 execd 配置如下所示

[[inputs.execd]]
  ## One program to run as daemon.
  ## NOTE: process and each argument should each be their own string
  command = ["/path/to/telegraf-flume-water-input/bin/flume-water", "--config", "/path/to/telegraf-flume-water-input/plugin.conf"]

  ## Define how the process is signaled on each collection interval.
  ## Valid values are:
  ##   "none"    : Do not signal anything. (Recommended for service inputs)
  ##               The process must output metrics by itself.
  ##   "STDIN"   : Send a newline on STDIN. (Recommended for gather inputs)
  ##   "SIGHUP"  : Send a HUP signal. Not available on Windows. (not recommended)
  ##   "SIGUSR1" : Send a USR1 signal. Not available on Windows.
  ##   "SIGUSR2" : Send a USR2 signal. Not available on Windows.
  signal = "none"

  ## Delay before the process is restarted after an unexpected termination
  restart_delay = "10s"

  ## Data format to consume.
  ## Each data format has its own unique set of configuration options, read
  ## more about them here:
  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
  data_format = "influx"

最后,我像往常一样启动 Telegraf,使用

telegraf --config telegraf.conf
2021-03-19T00:08:19Z I! Starting Telegraf 1.18.0
2021-03-19T00:08:19Z I! Loaded inputs: execd
2021-03-19T00:08:19Z I! Loaded aggregators:
2021-03-19T00:08:19Z I! Loaded processors:
2021-03-19T00:08:19Z I! Loaded outputs: influxdb_v2
2021-03-19T00:08:19Z I! Tags enabled: host=myhost.lan
2021-03-19T00:08:19Z I! [agent] Config: Interval:10s, Quiet:false, Hostname:"myhost.lan", Flush Interval:10s
2021-03-19T00:08:19Z I! [inputs.execd] Starting process: /Users/rsavage/workspace/telegraf-flume-water-input/bin/flume-water [--config /Users/rsavage/workspace/telegraf-flume-water-input/plugin.conf]

好的,我已经通过外部插件将数据流式传输到 Telegraf 中。现在我需要将其连接到我的 InfluxDB Cloud 帐户中。

将所有内容连接到 InfluxDB Cloud

我已经有一个免费的 InfluxDB Cloud 帐户,所以我打开浏览器,跳转到 UI 的“加载数据 > Telegraf”部分。我从 UI 中抓取了输出插件配置,并将其添加到我的 Telegraf 配置中。我也能够通过浏览器生成 API 令牌。

我启动了所有内容并浏览了我的数据。几秒钟后,我开始看到我的用水量“流入”(我的意思是,我必须这样做)我的 InfluxDB Cloud 帐户。我现在可以准确地看到我使用了多少水。

仅仅是开始

此博客文章的所有代码都是开源的。如果您有 Flume Water 水表,您可以将其配置为连接到您的 InfluxDB Cloud 帐户。我目前正在构建 InfluxDB 模板。一旦数据流入您的帐户,就可以使用它快速启动并运行,除非有人抢先一步。

如果您有最喜欢的 IoT 设备,并且它具有导出数据的 API,为什么不贡献您自己的自定义 Telegraf 输入插件?我们很乐意了解您正在进行的工作,并向社区重点介绍。

如果您想亲自体验 InfluxDB,请注册一个免费的 InfluxDB Cloud 帐户。注册后,欢迎在我们始终乐于助人的 InfluxDB 社区和社区 Slack 频道中提出任何问题。祝您在将您的 flume water 水表配置到您的 InfluxDB Cloud 帐户时玩得开心!

常见问题

flume 水传感器如何工作?

Flume 智能水表监控器有三个部分

  1. Flume 水传感器
  2. Flume 网桥
  3. Flume Water 应用程序

要安装 Flume,您只需将其绑在水表上即可。它测量水表的磁场,并使用 RF 将此信息发送到 Flume 网桥。网桥连接到您家中的电源并链接到您的 Wifi 网络。然后,它将数据传输到云端。您可以从智能手机上运行的 flume water 应用程序中查看您的水流量。它还可以检测泄漏并在发生泄漏时向您发出漏水警报。

Flume Water Monitor 准确吗?

Flume 提供的详细用水报告与您的自来水公司报告的用水量非常接近。Flume 不玩猜测游戏。他们在各种水表上测试了 Flume – Sensus Sr2、Badger 25 和 Hersey 420。在所有这些水表中,Flume 关于用水量的使用数据与水表相比,差异仅为 ± 1%。

安装 Flume 的好处是什么?

水是我们最宝贵的资源之一,有了 Flume,您就可以实时查看您的用水量。此外,它还可以用作泄漏检测器,告诉您您的家中是否存在不寻常的水流 – 即使是来自滴水的水龙头。这种实时数据可以帮助您节约用水,并在此过程中省钱。