使用InfluxDB Cloud、Python和Flask构建IoT应用程序(第3部分)
作者:Rick Spencer / 产品,用例,开发者
2021年7月2日
导航到
去年我开始了一个IoT项目,植物伴侣。这个项目涉及到将一些传感器焊接到一个Arduino上,并教会这个设备如何直接与InfluxDB Cloud通信,以便我可以监控这些植物。现在我将这个概念进一步发展,并为plantbuddy.com编写应用程序。这个应用程序将使用户能够可视化并从他们上传的植物伴侣设备数据中创建警报,以获得定制的用户体验。
如果您想了解一些背景信息,可以查看第1部分,我在那里设计了设备的基本功能,并教会它进行通信并添加一些通知,以及第2部分,我在那里开始了一些降采样和仪表板。在第4部分中,Barbara Nelson提供了这篇文章的续篇,其中她展示了如何使用UI(而非CLI)来实现相同的功能。)
目标(基于InfluxDB Cloud构建)
这个教程非常注重应用开发者。我恰好使用Python和Flask来编写这个教程,但所有的概念都应该适用于任何语言或Web开发框架。
本教程将涵盖
- 如何设置开发环境以开始在InfluxDB Cloud上进行开发
- 如何使用InfluxDB Python客户端库从用户那里接收数据,并将其写入InfluxDB Cloud
- 如何查询InfluxDB Cloud,以便您的应用程序可以可视化用户数据
- 如何在InfluxDB中安装降采样任务,这不仅可以节省存储成本,还可以优化用户的体验
- 如何使用InfluxDB Cloud的强大“检查和通知”系统,以帮助您为用户提供定制的警报
本教程旨在专注于InfluxDB,而不是一般的IoT应用程序,因此我不会关注以下内容
- 创建您的设备并从设备发送数据
- 为您的Web应用程序进行用户授权
- 在您的Web应用程序中管理机密
- 如何使用不同平台上的许多强大的可视化库
代码在哪里?
本教程的代码可在GitHub上找到。有两个分支
- 如果您只想看到起点,请参阅demo-start分支。
- 如果您想查看完成的项目,可以在demo-done 分支中找到。
plantbuddy.com 概述
数据分割策略
在我们开始构建开发环境之前,这里有一些关于是否以及如何分割用户数据的思考。这一部分相当理论化,您可以自由地跳过。
您的应用程序可能拥有许多用户。您通常会想只向每个用户展示他们输入到应用程序中的数据。根据您应用程序的规模和其他需求,有三种一般方法可以考虑。
- 单用户桶。这是默认方法。在这种方法中,您将所有用户的数据放入InfluxDB的单个桶中,并为每个用户使用不同的标签来启用快速查询。
- 每个用户一个桶。在这种方法中,您为每个用户创建一个单独的桶。这种方法在应用程序中管理起来更复杂,但允许您向用户提供读取和写入令牌,以便在需要时他们可以直接从自己的桶中读取和写入。
- 多组织。在这种方法中,您为每个用户创建一个单独的组织。这提供了客户数据的最终隔离,但设置和管理起来更复杂。这种方法适用于您作为OEM角色时,您的客户有自己的客户,您需要为您的客户提供管理工具。
方法 | 优点 | 缺点 | 总结 |
多用户桶 |
|
|
默认方法,使用标签在单个桶中分割客户 |
每个用户一个桶 |
|
|
当客户数据分割很重要但需要更多努力时很有用。允许直接读取和写入。 |
多组织 |
|
|
客户数据分割的终极方式。当您的客户有自己的客户时很有用。 |
由于Plant Buddy相对简单且低敏感性,多用户桶的默认方法最为合理。Plant Buddy有一个相当简单的模式。但如果您想了解更多关于这些主题的信息,您可以在这里阅读有关数据布局和模式设计的内容这里。
IoT应用程序和InfluxDB云架构概述
从更广阔的角度来看,Plant Buddy将大致如此运行
在顶层是用户体验。他们可以在网络浏览器中查看传感器读数,并且每个用户可以有任意数量的Plant Buddies报告数据。
Plant Buddy应用程序本身大致分为两部分:我称之为“写入网关”和“读取网关”。这些“网关”实际上只是Plant Buddy Flask应用程序中的端点,如您将看到的,但它们可以很容易地分为自己的应用程序。
Plant Buddy本身处理用户与其设备之间的读取和写入身份验证。每个设备和每个浏览器请求都应该由Plant Buddy授权。
最后是后端,即InfluxDB。读写网关将写入同一个桶(关于这一点,稍后介绍下采样时再详细介绍)。我们还将利用InfluxDB的任务系统和提供的系统桶。
Plant Buddy代码与InfluxDB后端的身份验证由InfluxDB令牌管理,这些令牌作为秘密存储在Web应用中。
Plant Buddy入门代码
一开始,Plant Buddy可以接收用户的设备信息,但只能解析并发送的数据,并将其打印出来。写入InfluxDB尚未实现。
@app.route("/write", methods = ['POST'])
def write():
user = users.authorize_and_get_user(request)
d = parse_line(request.data.decode("UTF-8"), user["user_name"])
print(d, flush=True)
return {'result': "OK"}, 200
def parse_line(line, user_name):
data = {"device" : line[:2],
"sensor_name" : sensor_names.get(line[2:4], "unknown"),
"value" : line[4:],
"user": user_name}
return data
同样,还有一个包含用于渲染图表的插槽的HTML模板集,但到目前为止,它只打印从请求对象中提取的用户名。
@app.route("/")
def index():
user = users.authorize_and_get_user(request)
return render_template("home.html",
user_name = user["user_name"],
graph_code = "<i>put graph here</i>"
您可以在VSCode中看到所有这些内容。
您可以在这里看到完整的入门代码。基本上,Web应用配置如下:
- 一个用于验证用户和提取用户名的模块(这是虚构的)。
- 一个用于安全存储InfluxDB秘密的模块(也是虚构的)。
- 一个用于接收Plant Buddy设备数据的端点。
- 一个用于Flask服务器渲染的HTML模板。
- 一个用于渲染该HTML模板的端点。
如您所见,我正在本地主机上运行应用程序,在Docker容器中简化开发。
InfluxDB云的IoT应用程序开发入门
现在,我们已经介绍了所有这些入门材料,是时候开始使用InfluxDB了!
先决条件
- 设置InfluxDB云账户。免费账户即可。
- 设置VSCode。
- 下载并安装Influx CLI。
设置InfluxDB CLI
我有一个空白的云账户,并且在我的开发笔记本电脑上安装了CLI。可以直接使用CLI,但我发现不断提供信息很麻烦,比如我目标区域、组织名称等——所以,设置CLI配置会容易得多。实际上,你可以设置多个配置并轻松地在它们之间切换,但现在,我只需要一个。
Influx CLI有一个很好的帮助系统,所以我会询问如何创建一个
$ influx config create -h
The influx config create command creates a new InfluxDB connection configuration
and stores it in the configs file (by default, stored at ~/.influxdbv2/configs).
Examples:
# create a config and set it active
influx config create -a -n $CFG_NAME -u $HOST_URL -t $TOKEN -o $ORG_NAME
这告诉我,我需要提供一个令牌、组织名称、主机URL以及为配置命名。
首先,我需要导航到InfluxDB UI,到生成我的第一个令牌。我通过在UI中使用左侧导航到数据 -> 令牌来实现。
然后点击生成令牌,选择一个所有访问令牌,并为它提供一个名称。
然后我可以通过点击列表中的令牌来获取令牌字符串。
如果我忘记了组织名称,我可以从“关于”页面获取。
让我们将所有内容都放入CLI并创建配置文件
$ influx config create -t cgIsJ7uamKESRiDkRz2mNPScXw_K_zswiOfdZmIQMina1TCtWk2NGu3VssF7cJPPf-QR88nPdFFrlC9GleTpwQ== -o [email protected] -u https://eastus-1.azure.cloud2.influxdata.com/ -n plantbuddy -a
产生以下输出
Active Name URL Org
* plantbuddy https://eastus-1.azure.cloud2.influxdata.com/ [email protected]
我们现在已经创建了一个plantbuddy CLI配置文件。
创建一个桶来存储用户数据
下一步是在InfluxDB中创建一个桶来存储用户数据。这是通过桶创建命令完成的
$ influx bucket create -n plantbuddy
产生以下输出
ID Name Retention Shard group duration Organization ID
f8fbced4b964c6a4 plantbuddy 720h0m0s n/a f1d35b5f11f06a1d
这为我创建了一个带有默认最大保留期的桶,即桶保留数据的时长。
上传一些Line Protocol
现在我已经有了桶,我可以上传一些数据到里面,这样我就可以立即开始测试,而不是先编写Python代码来处理数据。我生成了2天的样本数据。
行协议格式有详细的文档。但这里有一段摘录
light,user=rick,device_id=01 reading=26i 1621978469468115968
soil_moisture,user=rick,device_id=01 reading=144i 1621978469468115968
humidity,user=rick,device_id=01 reading=68i 1621978469468115968
soil_temp,user=rick,device_id=01 reading=67i 1621978469468115968
air_temp,user=rick,device_id=01 reading=72i 1621978469468115968
light,user=rick,device_id=01 reading=28i 1621978529468115968
soil_moisture,user=rick,device_id=01 reading=140i 1621978529468115968
humidity,user=rick,device_id=01 reading=65i 1621978529468115968
soil_temp,user=rick,device_id=01 reading=67i 1621978529468115968
air_temp,user=rick,device_id=01 reading=72i 1621978529468115968
这表明我在报告5个测量值:光、土壤湿度、湿度、土壤温度和空气温度。对于每个点,都有一个用户标签和一个设备ID标签。请注意,只有在预期用户拥有有限数量的设备时,此解决方案才会有效。否则,标签值的组合可能会导致基数爆炸。字段名为“reading”,其值为整数。我使用一个简单的Python脚本来生成这些测量值,每分钟一个,所以文件有点长。
$ influx write -b plantbuddy -f ~/generated.lp
没有报告错误,所以看起来它成功了,但让我们运行一个查询来确保。
运行一个查询
我可以通过CLI中的influx query命令运行测试查询,如下所示
$ influx query "from(bucket: \"plantbuddy\") |> range(start: 0)
这产生了大量的输出,所以我知道上传行协议成功了,但我可以进一步优化查询以获得更紧凑的输出。
$ influx query "from(bucket: \"plantbuddy\") |> range(start: 0) |> last() |> keep(columns: [\"_time\",\"_measurement\",\"_value\"]) |> group()"
此查询的输出如下
Result: _result
Table: keys: []
_time:time _value:float _measurement:string
------------------------------ ---------------------------- ----------------------
2021-05-27T16:00:22.881946112Z 70 air_temp
2021-05-27T16:00:22.881946112Z 69 humidity
2021-05-27T16:00:22.881946112Z 36 light
2021-05-27T16:00:22.881946112Z 173 soil_moisture
2021-05-27T16:00:22.881946112Z 66 soil_temp
因此,我已经有了数据,可以开始使用查询来探索它,但是通过CLI进行查询迭代并不特别容易。所以,让我们安装并设置InfluxDB VSCode插件。
设置VSCode和InfluxDB Cloud的Flux扩展
作为VSCode用户,我自然希望使用他们的编辑器来编写Flux查询,并保持整个项目在源代码控制之下。所以,下一步是设置InfluxDB VSCode扩展。通过在扩展管理器中搜索“Influx”可以轻松找到Flux扩展。
安装扩展后,你可以在底部左侧看到添加了一个InfluxDB窗口。
现在我可以使用该窗口通过将焦点放在InfluxDB窗口并单击+按钮来设置与我的云账户的连接。
如果我忘记了需要配置Flux扩展的信息,我可以使用influx config命令来设置凭据
$ influx config
Active Name URL Org
* plantbuddy https://eastus-1.azure.cloud2.influxdata.com/ [email protected]
我可以通过influx auth list命令找到我的令牌字符串,如下所示
$ influx auth list
因为我的组织名称包含一些特殊字符,我需要提供我的组织ID而不是组织名称。我可以通过influx org list命令找到组织ID,如下所示
$ influx org list
ID Name
f1d35b5f11f06a1d [email protected]
然后完成表格。
使用测试按钮,我可以看到连接已成功,然后保存它。保存后,请注意,InfluxDB窗口现在已填充,我可以浏览测量值。
从代码编辑器查询InfluxDB Cloud
现在连接已经设置好,我通常会在项目中添加一个临时Flux文件来运行Flux查询。
首先要注意的是,格式化功能正常工作。Flux 扩展还包含了语句完成和您所期望的现场帮助。
然后我可以尝试直接从编辑器运行查询。
这以整洁的网格视图显示我的结果,比将结果输出到终端更容易使用。
请注意,有关扩展的信息被写入OUTPUT窗口的Flux语言选项卡。如果您有Flux错误,这特别有用。
现在我已将开发环境设置好以用于InfluxDB,让我们实现后端。
实现写入功能
现在,我将回到编写Python代码以使Web应用工作。首先,我将处理对/write端点的调用,将这些调用转换为实际上将数据放入InfluxDB的操作。
- 我已经为这次创建了一个名为"plantbuddy"的桶。
- 接下来,我将创建一个新的token,它有权读写该桶。
- 然后我将导入InfluxDB v2 Python客户端库。
- 然后我将创建点并用客户端库写入它们。
创建token
首先,我需要一个有权读写该桶的token。您可以从influx auth create帮助中看到,有许多选项可以控制token的权限。
k$ influx auth create -h
Create authorization
Usage:
influx auth create [flags]
Flags:
-c, --active-config string Config name to use for command; Maps to env var $INFLUX_ACTIVE_CONFIG
--configs-path string Path to the influx CLI configurations; Maps to env var $INFLUX_CONFIGS_PATH (default "/Users/rick/.influxdbv2/configs")
-d, --description string Token description
-h, --help Help for the create command
--hide-headers Hide the table headers; defaults false; Maps to env var $INFLUX_HIDE_HEADERS
--host string HTTP address of InfluxDB; Maps to env var $INFLUX_HOST
--json Output data as json; defaults false; Maps to env var $INFLUX_OUTPUT_JSON
-o, --org string The name of the organization; Maps to env var $INFLUX_ORG
--org-id string The ID of the organization; Maps to env var $INFLUX_ORG_ID
--read-bucket stringArray The bucket id
--read-buckets Grants the permission to perform read actions against organization buckets
--read-checks Grants the permission to read checks
--read-dashboards Grants the permission to read dashboards
--read-dbrps Grants the permission to read database retention policy mappings
--read-notificationEndpoints Grants the permission to read notificationEndpoints
--read-notificationRules Grants the permission to read notificationRules
--read-orgs Grants the permission to read organizations
--read-tasks Grants the permission to read tasks
--read-telegrafs Grants the permission to read telegraf configs
--read-user Grants the permission to perform read actions against organization users
--skip-verify Skip TLS certificate chain and host name verification.
-t, --token string Authentication token; Maps to env var $INFLUX_TOKEN
-u, --user string The user name
--write-bucket stringArray The bucket id
--write-buckets Grants the permission to perform mutative actions against organization buckets
--write-checks Grants the permission to create checks
--write-dashboards Grants the permission to create dashboards
--write-dbrps Grants the permission to create database retention policy mappings
--write-notificationEndpoints Grants the permission to create notificationEndpoints
--write-notificationRules Grants the permission to create notificationRules
--write-orgs Grants the permission to create organizations
--write-tasks Grants the permission to create tasks
--write-telegrafs Grants the permission to create telegraf configs
--write-user Grants the permission to perform mutative actions against organization users
--write-bucket
和 --read-bucket
是我想查找的选项。这些选项使用桶id而不是桶名称。您可以通过influx bucket list命令轻松找到id。
$ influx bucket list
ID Name Retention Shard group duration Organization ID
d6ec11a304c652aa _monitoring 168h0m0s n/a f1d35b5f11f06a1d
fe25b83e9e002181 _tasks 72h0m0s n/a f1d35b5f11f06a1d
f8fbced4b964c6a4 plantbuddy 720h0m0s n/a f1d35b5f11f06a1d
然后我可以使用influx auth create命令创建token,如下所示
$ influx auth create --write-bucket f8fbced4b964c6a4 --read-bucket f8fbced4b964c6a4
输出包括token字符串,然后我可以将其注册到我的应用程序服务器的密钥存储中。
ID Description Token User Name User ID Permissions
0797c045bc99e000 d0QnHz8bTrQU2XI798YKQzmMQY36HuDPRWiCwi8Lppo1U4Ej5IKhCC-rTgeRBs3MgWsomr-YXBbDO3o4BLJe9g== [email protected] 078bedcd5c762000 [read:orgs/f1d35b5f11f06a1d/buckets/f8fbced4b964c6a4 write:orgs/f1d35b5f11f06a1d/buckets/f8fbced4b964c6a4]
导入和设置InfluxDB Python库
在您的环境中安装InfluxDB Python客户端后,下一步是
- 导入库。
- 设置客户端。
- 创建写入和查询API。
import influxdb_client
client = influxdb_client.InfluxDBClient(
url = "https://eastus-1.azure.cloud2.influxdata.com/",
token = secret_store.get_bucket_secret(),
org = "f1d35b5f11f06a1d"
)
write_api = client.write_api()
query_api = client.query_api()
我在代码文件顶部添加了此代码,这样我就可以在整个模块中轻松使用写入和查询API。
创建并写入点
现在我已设置好客户端库和相关的API,我可以将我的代码从仅打印用户上传的数据更改为实际保存它。首先部分是创建一个点,这是一个表示我想要写入的数据的对象。
我将通过创建一个新的函数“write_to_influx”来完成此操作
def write_to_influx(data):
p = influxdb_client.Point(data["sensor_name"]).tag("user",data["user"]).tag("device_id",data["device"]).field("reading", int(data["value"]))
write_api.write(bucket="plantbuddy", org="f1d35b5f11f06a1d", record=p)
print(p, flush=True)
此方法接收一个Python字典,并提取用于标签和字段的值。您可以在点中包含多个标签和字段,但Plant Buddy仅使用单个字段,“reading”。它还在函数结束时打印出点,主要是为了让您能够看到它的工作。
现在我可以更新我的写入端点以使用该函数
@app.route("/write", methods = ['POST'])
def write():
user = users.authorize_and_get_user(request)
d = parse_line(request.data.decode("UTF-8"), user["user_name"])
write_to_influx(d)
return {'result': "OK"}, 200
我恰好已经有了一个植物伴侣,它正在向服务器写入数据点,看起来它正在正常工作。
通过运行一个简单的查询,我可以确认数据正在被加载到我的桶中。
实现读取
植物伴侣网页将非常简单。它将显示过去48小时的植物伴侣数据图。为了使这个工作,我需要
- 编写一个查询来获取数据
- 使用InfluxDB v2 Python客户端库来获取数据
- 创建一个图表
- 遍历结果并将它们添加到图表中
- 在网页中显示图表
编写查询
这是一个简单的查询。
from(bucket: "plantbuddy")
|> range(start: -48h)
|> filter(fn: (r) => r.user == "rick")
这会返回大量数据。重要的是要理解,数据是按时间序列组织返回的。时间序列首先按测量值组织,然后按标签值和字段进一步细分。每个时间序列本质上是一个按时间戳顺序排列的相关数据表,非常适合图表,每个时间序列表在图表中就是一条线。
然而,我将在运行时不知道用户名,所以我需要参数化查询。我将为此使用简单的字符串替换。所以,我只需稍微调整查询即可
from(bucket: "plantbuddy")
|> range(start: -48h)
|> filter(fn: (r) => r.user == "{}")
此外,我还需要在运行时读取此查询,所以我添加了一个名为“graph.flux”的文件,并将查询保存在该文件中。
现在,我已经准备好了查询,可以使用它来获取数据。
从InfluxDB获取数据
在我的app.py中的index()函数中,我首先打开Flux文件,替换用户名,然后使用之前实例化的查询API获取结果集,将其添加到index()函数中
query = open("graph.flux").read().format(user["user_name"])
result = query_api.query(query, org="f1d35b5f11f06a1d")
使用mpld3和InfluxDB Cloud可视化时间序列
对于植物伴侣,我选择了mpld3库来创建我的图表。我选择这个库是因为它非常容易使用。
在将mpld3库及其依赖项添加到我的环境中后,我需要导入一些内容
import matplotlib.pyplot as plt, mpld3
现在,我可以用结果构建图表。为此,对于每个测量值(光、湿度等),我需要创建x轴的值列表和y轴的值列表。当然,x轴将是时间。
如上所述,Flux数据模型以非常适合此应用程序的格式返回数据。它为每个测量值返回一个表,所以我只需遍历这些表,然后遍历每个表中的每条记录,构建列表,并让matplotlib绘制它们。
fig = plt.figure()
for table in result:
x_vals = []
y_vals = []
label = ""
for record in table:
y_vals.append(record["_value"])
x_vals.append(record["_time"])
label = record["_measurement"]
plt.plot(x_vals, y_vals, label=label)
然后我通过请求图例,将图表转换为html,并将其传递给模板进行渲染来完成。
plt.legend()
grph = mpld3.fig_to_html(fig)
return render_template("home.html",
user_name = user["user_name"],
graph_code = grph)
现在,当我加载索引页面时,你可以看到图表正在工作。
聚合和降采样
聚合
需要注意的一点是,matplotlib 创建图形需要相当长的时间,而 mpld3 转换为 html 也需要相当长的时间。但用户并不需要每个点都被绘制出来。因此,我们可以通过在 Flux 查询中添加一些聚合来加速用户界面。我们不再检索每个点,而是每10分钟检索一次平均值。我只是在查询中添加了一个使用 aggregateWindow() 函数的聚合
from(bucket: "plantbuddy")
|> range(start: -48h)
|> filter(fn: (r) => r.user == "rick")
|> aggregateWindow(every: 10m, fn: mean)
现在页面加载得更快了,而且图形看起来也更好看。
实际上,对于这样一个简单的查询和数据集,这样的查询在生产环境中将绰绰有余。然而,我们可以通过降采样进一步展示如何优化 UI 延迟。
降采样
降采样 涉及从高分辨率数据计算较低分辨率的显示数据,并将这些数据预先计算好以供显示或进一步计算。除了提供更流畅的用户体验外,它还可以节省存储成本,因为您可以将降采样数据存储在比原始数据保留期更长的存储桶中。
为了实现这一点,我需要
- 创建一个新的降采样存储桶和一个新的令牌,可以读取和写入该存储桶
- 创建一个降采样 Flux 脚本
- 创建一个任务,定期运行该脚本
- 将 graph.flux 中的 flux 更改为查询降采样存储桶
创建一个新的存储桶和令牌
首先,我将像之前一样创建一个新的存储桶,命名为“downsampled”。
$ influx bucket create -n downsampled
输出会友好地给出存储桶 ID
ID Name Retention Shard group duration Organization ID
c7b43676728de98d downsampled 720h0m0s n/a f1d35b5f11f06a1d
然而,为了简单起见,我将创建一个可以读取和写入两个存储桶的单个令牌。首先,列出存储桶以获取存储桶 ID
$ influx bucket list
ID Name Retention Shard group duration Organization ID
d6ec11a304c652aa _monitoring 168h0m0s n/a f1d35b5f11f06a1d
fe25b83e9e002181 _tasks 72h0m0s n/a f1d35b5f11f06a1d
c7b43676728de98d downsampled 720h0m0s n/a f1d35b5f11f06a1d
f8fbced4b964c6a4 plantbuddy 720h0m0s n/a f1d35b5f11f06a1d
然后创建一个具有两个存储桶的读取和写入权限的令牌
$ influx auth create --write-bucket c7b43676728de98d --write-bucket f8fbced4b964c6a4 --read-bucket c7b43676728de98d --read-bucket f8fbced4b964c6a4
ID Description Token User Name User ID Permissions
079820bd8b7c1000 hf356pobXyeoeqpIIt6t-ge7LI-UtcBBElq8Igf1K1wxm5Sv9XK8BleS79co32gCQwQ1voXuwXu1vEZg-sYDRg== [email protected] 078bedcd5c762000 [read:orgs/f1d35b5f11f06a1d/buckets/c7b43676728de98d read:orgs/f1d35b5f11f06a1d/buckets/f8fbced4b964c6a4 write:orgs/f1d35b5f11f06a1d/buckets/c7b43676728de98d write:orgs/f1d35b5f11f06a1d/buckets/f8fbced4b964c6a4]
然后获取这个新令牌并替换应用程序密钥存储中的旧令牌。重新启动并确保一切正常工作。
创建一个降采样 Flux 脚本
降采样脚本有两个基本步骤。首先进行一些聚合,然后写入聚合数据。
我已经通过创建新的 graph.flux 文件找到了想要进行的聚合。我将使用现有的聚合作为起点添加一个“downsample.flux”文件。一个关键的区别是,我想降采样所有数据,而不仅仅是特定用户的数据。因此,我的聚合步骤将跳过过滤器
from(bucket: "plantbuddy")
|> range(start: -48h)
|> aggregateWindow(every: 10m, fn: mean)
运行这个脚本,我可以看到这会聚合存储桶中的所有数据。
现在我只需要添加一个 to() 函数,将所有降采样数据写入我的降采样存储桶
from(bucket: "plantbuddy")
|> range(start: -48h)
|> aggregateWindow(every: 10m, fn: mean)
|> to(bucket: "downsampled")
查询降采样存储桶,我可以看到所有数据都在那里。
我已经降采样了所有现有数据,所以接下来我将设置一个任务,以便数据流入时也会进行降采样。
从 Flux 脚本创建降采样任务
这只是将我的 downsample.flux 脚本注册为每10分钟运行一次的任务这么简单。
第一步是将范围更改为仅回溯最后10分钟。我不想不断重新下采样已经下采样过的数据。
from(bucket: "plantbuddy")
|> range(start: -10m)
|> aggregateWindow(every: 10m, fn: mean)
|> to(bucket: "downsampled")
接下来,我需要添加每个任务都需要的一个选项。有多个字段可供选择,但对我来说只需要一个名称和“每个”,这告诉任务系统Flux的运行频率。
现在我的完整downsample.flux看起来是这样的
option task = {
name: "downsampled",
every: 10m
}
from(bucket: "plantbuddy")
|> range(start: -10m)
|> aggregateWindow(every: 10m, fn: mean)
|> to(bucket: "downsampled")
现在我只需将此任务注册到InfluxDB Cloud。这只需要使用CLI中的task create命令。
$ influx task create -f downsample.flux
ID Name Organization ID Organization Status Every Cron
079824d7a391b000 downsampled f1d35b5f11f06a1d [email protected] active 10m
由于输出提供了任务ID,我可以使用它来监视任务。
$ influx task log list --task-id 079824d7a391b000
我可以看到它已成功运行,还有一些其他有用的信息,例如实际运行的Flux。
RunID Time Message
079825437a264000 2021-05-28T01:30:00.099978246Z Started task from script: "option task = {\n name: \"downsampled\",\n every: 10m\n}\nfrom(bucket: \"plantbuddy\") \n |> range(start: -10m)\n |> aggregateWindow(every: 10m, fn: mean) \n |> to(bucket: \"downsampled\")"
079825437a264000 2021-05-28T01:30:00.430597345Z trace_id=0adc0ed1a407fd7a is_sampled=true
079825437a264000 2021-05-28T01:30:00.466570704Z Completed(success)
更新图形查询
最后,我可以更新图形所用的查询,以从下采样存储桶中读取。这是一个更简单、更快的查询。
from(bucket: "downsampled")
|> range(start: -48h)
|> filter(fn: (r) => r.user == "{}" )
重启一切,运行速度飞快!
通知
我们将添加的最后一个功能是让plantbuddy.com能够在土壤太干燥时通知用户。使用InfluxDB,您可以使用任务创建状态检查和通知规则,根据您定义的条件将消息发送到您的应用程序。您不应在应用程序中添加轮询逻辑来读取状态;您应该让InfluxDB为您完成这项工作!
虽然使用单个任务实现此功能有更简单的方法,例如执行简单的查询,然后在同一任务中直接使用http.post(),但plantbuddy.com将充分利用整个检查和通知系统。额外的好处特别包括存储在 _monitoring 桶中的状态和通知记录。此外,由于这些数据以标准格式存储,因此可以轻松共享仪表板和查询等工具。
阈值检查
我想设置的第一种任务是阈值检查。这是一种任务,
- 查询感兴趣的数据值
- 检查这些值是否超过某些阈值
- 使用Flux监视库将时序数据写入 _monitoring 桶
- 从Flux脚本创建任务并启动它
查询您要检查的值
Plant Buddy将能够向任何用户提供通知,如果他们的土壤湿度变得太干燥。我将在新文件check.flux中创建相关的Flux查询(很快将成为任务),一个简单的查询可以回溯时间并找到所有(和仅)土壤湿度值
data = from(bucket: "plantbuddy")
|> range(start: -10m)
|> filter(fn: (r) => r._measurement == "soil_moisture")
注意我正在查询plantbuddy桶中的原始数据,而不是下采样数据。
为值定义阈值
接下来,我需要确定要监视的不同阈值。检查系统将使用这些阈值来设置每行数据的状况。阈值检查有4个可能的级别
- ok
- Info
- warn
- crit
为每个创建一个返回定义值是否在该阈值内的布尔值的谓词函数来设置阈值。您不需要在阈值检查中使用所有级别。对于Plant Buddy,土壤湿度可以是“ok”或“crit”,所以我只定义这两个函数
ok = (r) => r.reading > 35
crit = (r) => r.reading <= 35
基本上,如果土壤湿度读数降至35或以下,则为关键。
如上所述,结果的状态将被写入 _monitoring 桶。您还需要提供一个谓词函数来创建要记录的消息。这个消息可以是复杂的,也可以是简单的。我保持简单
messageFn = (r) => "soil moisture at ${string(v:r.reading)} for ${r.user}"
这些谓词函数是monitor.check()函数的参数,该函数生成状态并将它们写入 _monitoring 桶,我们在下面将使用该桶。
使用monitor.check()函数为每条记录生成状态
监控包(monitoring package)中有一个名为monitoring.check()的函数,现在它可以为您计算并记录状态。它将遍历返回数据中的每条记录,计算状态等级(在这个例子中是ok或crit),计算消息字符串,然后与一些其他信息一起将其记录在_monitoring桶中。
首先,我们需要向monitor.check提供一些元数据。需要一个id、名称、类型和标签列表。标签在您有多个通知规则时很有用。由于目前我只计划一个,所以我将标签对象留空。
check = {_check_id: "check1xxxxxxxxxx",
_check_name: "soil moisture check",
_type: "threshold",
tags: {}
现在我的脚本可以继续调用monitor.check。请注意以下几点
- 通过schema.fieldsAsCols()函数进行管道转发是必需的。这是monitor.check函数期望的形状。这也使得编写一些检查更容易,因为您可以更容易地编写结合不同字段值的表达式。
- 由于检查元数据参数命名为“data”,所以我将这个选项称为“check”,而我将检索到的时间序列数据称为“data”。这对我来说更清晰。
我还添加了一个yield(),以便我更容易运行脚本并检查结果。
data
|> v1["fieldsAsCols"]()
|> yield()
|> monitor.check(data: check, messageFn: messageFn, crit: crit, ok: ok)
当所有内容都在一个地方时,更易于理解其工作原理
import "influxdata/influxdb/monitor"
import "influxdata/influxdb/v1"
option task = {name: "soil moisture check", every: 10m, offset: 0s}
data = from(bucket: "plantbuddy")
|> range(start: -10m)
|> filter(fn: (r) => r._measurement == "soil_moisture")
ok = (r) => r.reading > 35
crit = (r) => r.reading <= 35
messageFn = (r) => "soil moisture at ${string(v:r.reading)} for ${r.user}"
check = {_check_id: "check1xxxxxxxxxx",
_check_name: "soil moisture check",
_type: "threshold",
tags: {}
}
data
|> v1["fieldsAsCols"]()
|> yield()
|> monitor.check(data: check, messageFn: messageFn, crit: crit, ok: ok)
现在我可以手动运行我的检查。不出所料,它为过去10分钟内的每条读数创建了一个“ok”级别。
我可以进一步深入,直接查询_monitoring桶
from(bucket: "_monitoring")
|> range(start: -5m)
|> filter(fn: (r) => r._field == "_message")
一切看起来都很正常,所以接下来,我将设置InfluxDB在预定时间运行check.flux中的代码。
创建阈值检查任务
我已经添加了选项将脚本转换为任务
option task = {name: "soil moisture check", every: 10m, offset: 0s}
因此,现在我可以仅使用check.flux文件创建任务
$ influx task create -f check.flux
ID Name Organization ID Organization Status Every Cron
0798ba98b30d5000 soil moisture check f1d35b5f11f06a1d [email protected] active 10m
现在,我有一个每10分钟运行一次的任务,记录状态,所以接下来我需要编写一个通知规则,这是一个调用Plant Buddy端点的另一个任务,以便Plant Buddy可以通知用户。
通知规则
为了编写这个任务,我需要完成几件事情
- 在InfluxDB中添加一个秘密,以便Plant Buddy可以授权任何来自InfluxDB Cloud的通信
- 在plantbuddy.com上添加一个端点,当通知规则的条件满足时将被调用
- 在Flux中创建一个端点变量,指向一个公开可访问的URL(我在防火墙中打了一个洞,以便我的开发服务器可以从InfluxDB Cloud访问)
- 使用监控库搜索任何我想通知用户的状况(特别是如果_level是crit)
- 为符合该条件的每条记录调用端点,发送由检查创建的消息
在InfluxDB Cloud中存储和使用秘密
首先,我想存储Plant Buddy将用来验证任何传入请求来自InfluxDB的令牌。我可以简单地将它添加到Flux脚本中,但使用平台提供的秘密存储比将其保留在源代码中更安全。
$ influx secret update -k pb_secret -v sqps8LCY6z8XuZ2k
Key Organization ID
pb_secret f1d35b5f11f06a1d
然后,我可以使用这段代码检索秘密。我创建了一个名为 notifation_rule.flux 的新文件来存储这个 Flux。这就是将秘密作为字符串检索所涉及的全部内容
import "influxdata/influxdb/secrets"
secrets.get(key: "pb_secret")
向 Plant Buddy 添加一个端点
实际上以某种方式通知用户尚未实现,所以,我将简单地将其代码添加到 app.py 中,以确保系统端到端运行
@app.route("/notify", methods = ['POST'])
def notify():
print("notification received", flush=True)
print(request.data, flush=True)
return {'result': "OK"}, 200
# TODO: check the authorization and actually notify the user
将端点添加到 Flux
现在,我可以在我的 Flux 中引用该端点
endpoint = http.endpoint(url: "http://71.126.178.222:5000/notify")
你可能注意到我尚未提供头部或正文。因为我希望正文包含每个状态的状态和消息,所以我将在实际调用端点时动态添加头部和正文。
查找匹配的状态
此代码使用监视库内置的便利函数回溯 2 分钟,并找到任何设置为 crit 的状态
monitor.from(start: -2m)
|> filter(fn: (r) => r._level == "crit")
在接下来的步骤中,我将实际触发并记录任何匹配的通知。
设置通知规则任务
就像状态检查一样,Flux 中的监视库要求 notify 函数正常工作,需要以特定格式提供元数据。我在此对象中定义了它
rule = {
_notification_rule_id: "notif-rule01xxxx",
_notification_rule_name: "soil moisture crit",
_notification_endpoint_id: "1234567890123456",
_notification_endpoint_name: "soil moisture crit",
}
通知
现在,我可以完成这项工作并调用 monitor.notify() 函数。我只需要提供
- 为通知定义的元数据。当通知被触发时,它用于在 InfluxDB 中记录通知事件发生(以及通知是否成功发送)。
- 提供一个 map() 函数,该函数返回我想要通过 http.post() 发送到我定义的端点的头部和数据。
monitor.from(start: -2m)
|> filter(fn: (r) => r._level == "crit")
|> monitor.notify(
data: rule,
endpoint: endpoint(
mapFn: (r) => ({
headers: {"Authorization":secrets.get(key: "pb_secret")},
data: bytes(v: r._message),
}),
),
)
|> yield(name: "monitor.notify")
当我运行这个时,没有结果。
这是因为没有 crit 状态。然而,我可以合成一个 crit 状态以产生结果
$ influx write --bucket plantbuddy "soil_moisture,user=rick,device_id=01 reading=20i"
然后,我可以重新运行 check.flux 和 notifcation_rule.flux,并看到我的应用程序已收到通知,InfluxDB 记录了通知已发送。
再次,我认为当你看到 notification_rule.flux 的全部内容时,更容易理解它如何协同工作。
import "influxdata/influxdb/monitor"
import "influxdata/influxdb/secrets"
import "http"
option task = {
name: "SM Notification Rule",
every: 1m,
offset: 0s,
}
rule = {
_notification_rule_id: "notif-rule01xxxx",
_notification_rule_name: "soil moisture crit",
_notification_endpoint_id: "1234567890123456",
_notification_endpoint_name: "soil moisture crit",
}
endpoint = http.endpoint(url: "http://71.126.178.222:5000/notify")
monitor.from(start: -2m)
|> filter(fn: (r) => r._level == "crit")
|> monitor.notify(
data: rule,
endpoint: endpoint(
mapFn: (r) => ({
headers: {"Authorization":secrets.get(key: "pb_secret")},
data: bytes(v: r._message),
}),
),
)
|> yield(name: "monitor.notify")
创建通知规则任务
最后,为了完成它,我需要将通知规则作为任务注册到 InfluxDB。像以前一样,我需要创建一个名为“task”的选项,以便任务系统知道如何处理脚本
option task = {
name: "SM Notification Rule",
every: 1m,
offset: 0s,
}
然后,我可以像以前一样简单地创建任务。
$ influx task create -f notification_rule.flux
ID Name Organization ID Organization Status Every Cron
0798c6de048d5000 SM Notification Rule f1d35b5f11f06a1d [email protected] active 1m
运营监控
关于我的 InfluxDB 后端,我现在有
- 数据流入 InfluxDB
- 3 个任务正在运行:降采样任务、阈值检查任务和通知规则任务
我如何监控后端是否正常运行?通过将 运营监控模板 安装到您的帐户中,可以轻松实现。请注意,您需要从免费帐户升级,以便拥有足够的桶配额来安装模板。
安装模板后,例如,我可以使用任务摘要仪表板来确认所有任务都运行良好。我可以继续创建检查和通知,如果出现任何问题,可以提醒我(而不是我的用户)。