TL;DR InfluxDB 技术提示:Flux 中使用 yield() 的多重聚合
作者:Anais Dotis-Georgiou / 产品, 用例, 开发者
2021 年 9 月 27 日
导航至
yield() 函数确定应在 Flux 脚本中返回哪些表输入。 yield() 函数还为 Flux 查询的输出分配名称。该名称存储在默认注释中。
例如,如果我们查询下表
_measurement | tag1 | _field | _value | _time |
Measurement1 | tagvalue1 | field1 | 1i | 2021-09-17T21:22:52.00Z |
没有 yield 函数
from(bucket: "bucket1")
|> range(start: 2021-08-17T21:22:52.00Z)
|> filter(fn: (r) => r["_measurement"] == "Measurement1" and r["tag1"] == "tagvalue1" and r["_field"] == "field1" )
将返回以下带注释的 CSV 输出。请注意,默认注释默认设置为 _results
。
#group,false,false,true,true,false,false,true,true,true
#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string
#default,_results,,,,,,,,
,result,table,_start,_stop,_time,_value,_field,_measurement,tag1
,,0,2021-08-17T21:22:52.452072242Z,2021-08-17T21:23:52.452072242Z,2021-08-17T21:23:39.010094213Z,1,field1,Measurement1,tagvalue1
现在,如果我们添加 yield() 函数
from(bucket: "bucket1")
|> range(start: 2021-08-17T21:22:52.452072242Z)
|> filter(fn: (r) => r["_measurement"] == "Measurement1" and r["tag1"] == "tagvalue1" and r["_field"] == "field1" )
|> yield(name: "myFluxQuery")
将返回以下带注释的 CSV 输出。请注意,默认注释已更改为 myFluxQuery
。
#group,false,false,true,true,false,false,true,true,true
#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string
#default,myFluxQuery,,,,,,,,
,result,table,_start,_stop,_time,_value,_field,_measurement,tag1
,,0,2021-08-17T21:22:52.452072242Z,2021-08-17T21:23:52.452072242Z,2021-08-17T21:23:39.010094213Z,1,field1,Measurement1,tagvalue1
通过更改带注释的 CSV,yield() 函数允许您在调用 yield() 函数的时间点将查询结果作为新的表流返回。
yield() 函数非常重要,因为调用多个 yield() 函数允许您从单个 Flux 脚本同时返回多个表流。
使用多个 yield() 函数返回多个聚合
假设您要返回单个表的 min()、max() 和 mean() 值
_measurement | _field | _value | _time |
measurement1 | field1 | 1.0 | 2021-09-17T21:22:52.00Z |
measurement1 | field1 | 2.0 | 2021-10-17T21:22:52.00Z |
measurement1 | field2 | 4.0 | 2021-11-17T21:22:52.00Z |
measurement1 | field3 | 5.0 | 2021-12-17T21:22:52.00Z |
Flux 新用户,尤其是来自 SQL 或 InfluxQL 背景的用户,倾向于运行以下 Flux 查询
from(bucket: "bucket1")
|> range(start: 2021-08-17T21:22:52.452072242Z)
|> filter(fn: (r) => r["_measurement"] == "Measurement1" and r["tag1"] == "tagvalue1" and r["_field"] == "field1" )
|> min()
|> max()
|> mean()
这是因为他们习惯于能够执行 SELECT min("field1"), max("field1"), mean("field1")
。但是,上面的 Flux 查询实际上只会返回最小值。 Flux 是管道转发的,因此您必须使用多个 yield() 函数来一起返回最小值、最大值和平均值
from(bucket: "bucket1")
|> range(start: 2021-08-17T21:22:52.452072242Z)
|> filter(fn: (r) => r["_measurement"] == "Measurement1" and r["tag1"] == "tagvalue1" and r["_field"] == "field1" )
|> min()
|> yield(name: "min")
from(bucket: "bucket1")
|> range(start: 2021-08-17T21:22:52.452072242Z)
|> filter(fn: (r) => r["_measurement"] == "Measurement1" and r["tag1"] == "tagvalue1" and r["_field"] == "field1" )
|> max()
|> yield(name: "max")
from(bucket: "bucket1")
|> range(start: 2021-08-17T21:22:52.452072242Z)
|> filter(fn: (r) => r["_measurement"] == "Measurement1" and r["tag1"] == "tagvalue1" and r["_field"] == "field1" )
|> mean()
|> yield(name: "mean")
上面的脚本将产生三个表
结果:最小值
_measurement | _field | _value | _time |
measurement1 | field1 | 1.0 | 2021-09-17T21:22:52.00Z |
结果:最大值
_measurement | _field | _value | _time |
measurement1 | field1 | 5.0 | 2021-12-17T21:22:52.00Z |
结果:平均值
_measurement | _field | _value |
measurement1 | field1 | 3.0 |
题外话: 请记住,mean() 函数不返回时间戳列,因为它是一个聚合器。平均值没有关联的时间戳。
使用变量执行多个聚合
虽然上面的 Flux 查询将产生所有三个转换,但这不是一个有效的查询,因为您多次查询整个数据集。相反,将基本查询存储在变量中并像这样引用它
data = from(bucket: "bucket1")
|> range(start: 2021-08-17T21:22:52.452072242Z)
|> filter(fn: (r) => r["_measurement"] == "Measurement1" and r["tag1"] == "tagvalue1" and r["_field"] == "field1" )
data_min = data
|> min()
|> yield(name: "min")
data_max = data
|> max()
|> yield(name: "max")
data_mean = data
|> mean()
|> yield(name: "mean")
重要提示: 确保不要将变量命名为与函数名称相同,以避免命名冲突。
使用和不使用 yield() 函数
值得注意的是,您不限于每个查询或查询变量使用一次 yield() 函数。您可以在同一查询中多次使用 yield() 函数。例如,如果您只想返回查询中的最小值和原始数据,您可以使用 yield() 函数两次
from(bucket: "bucket1")
|> range(start: 2021-08-17T21:22:52.452072242Z)
|> filter(fn: (r) => r["_measurement"] == "Measurement1" and r["tag1"] == "tagvalue1" and r["_field"] == "field1" )
|> yield(name:"raw data")
|> min()
|> yield(name: "min")
此外,您无需使用 yield() 函数即可返回单个查询的结果。以下查询将产生表流,因为 InfluxDB 在没有 yield() 函数的情况下,为默认注释返回带有默认“_results”分配的表流
from(bucket: "bucket1")
|> range(start: 2021-08-17T21:22:52.452072242Z)
|> filter(fn: (r) => r["_measurement"] == "Measurement1" and r["tag1"] == "tagvalue1" and r["_field"] == "field1" )
|> min()
但是,如果您在进行两个查询时没有调用多个 yield,您将收到错误。以下查询
data = from(bucket: "bucket1")
|> range(start: 2021-08-17T21:22:52.452072242Z)
|> filter(fn: (r) => r["_measurement"] == "Measurement1" and r["tag1"] == "tagvalue1" and r["_field"] == "field1" )
data_min = data
|> min()
data_max = data
|> max()
产生以下错误
error in query specification while starting program: this Flux script returns no streaming data. Consider adding a "yield" or invoking streaming functions directly, without performing an assignment.
发生此错误是因为 InfluxDB 无法为默认注释返回具有相同默认“_results”分配的不同表流。
结论
我希望这篇文章能够启发您利用 yield() 函数。如果您正在编写 Flux 并需要帮助,请在我们的社区网站或 Slack 频道中提问。如果您正在 InfluxDB 之上开发酷炫的物联网应用程序,我们很乐意听到您的消息,因此请务必分享您的故事!此外,请在评论区分享您的想法、疑虑或问题。我们很乐意获得您的反馈并帮助您解决遇到的任何问题!
延伸阅读
虽然这篇文章旨在全面概述如何使用 yield() 函数,但以下资源也可能让您感兴趣
- TL;DR InfluxDB 技术提示 – 如何解释带注释的 CSV:这篇文章描述了如何解释带注释的 CSV,这是 InfluxDB 的 Flux 查询结果格式。
- Flux 初学者面临的 5 大障碍以及学习使用 Flux 的资源:这篇文章描述了 Flux 初学者常遇到的障碍,以及如何通过使用 InfluxDB UI、理解带注释的 CSV 等来解决这些障碍。
- Flux 中级用户面临的 5 大障碍以及优化 Flux 的资源:这篇文章描述了 Flux 中级和高级用户常遇到的障碍,同时更详细地介绍了下推模式、Flux 引擎的工作原理等。
- TL;DR InfluxDB 技术提示 – 优化 InfluxDB Cloud 中的 Flux 性能:这篇文章描述了如何使用 Flux Profiler 和 Flux VS Code 扩展来优化您的 Flux 性能。