TL;DR InfluxDB技术提示:在Flux中使用yield()进行多个聚合
作者:Anais Dotis-Georgiou / 产品,用例,开发者
2021年9月27日
导航至
yield() 函数确定在Flux脚本中应返回哪些表输入。yield() 函数还为数组查询的输出分配一个名称。名称存储在默认注释中。
例如,如果我们查询以下表
_measurement | tag1 | _field | _value | _time |
Measurement1 | tagvalue1 | field1 | 1i | 2021-09-17T21:22:52.00Z |
没有yield函数
from(bucket: "bucket1")
|> range(start: 2021-08-17T21:22:52.00Z)
|> filter(fn: (r) => r["_measurement"] == "Measurement1" and r["tag1"] == "tagvalue1" and r["_field"] == "field1" )
返回以下带注释的CSV输出。注意默认注释默认设置为 _results
。
#group,false,false,true,true,false,false,true,true,true
#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string
#default,_results,,,,,,,,
,result,table,_start,_stop,_time,_value,_field,_measurement,tag1
,,0,2021-08-17T21:22:52.452072242Z,2021-08-17T21:23:52.452072242Z,2021-08-17T21:23:39.010094213Z,1,field1,Measurement1,tagvalue1
现在如果我们添加yield()函数
from(bucket: "bucket1")
|> range(start: 2021-08-17T21:22:52.452072242Z)
|> filter(fn: (r) => r["_measurement"] == "Measurement1" and r["tag1"] == "tagvalue1" and r["_field"] == "field1" )
|> yield(name: "myFluxQuery")
返回以下带注释的CSV输出。注意默认注释已更改为 myFluxQuery
。
#group,false,false,true,true,false,false,true,true,true
#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string
#default,myFluxQuery,,,,,,,,
,result,table,_start,_stop,_time,_value,_field,_measurement,tag1
,,0,2021-08-17T21:22:52.452072242Z,2021-08-17T21:23:52.452072242Z,2021-08-17T21:23:39.010094213Z,1,field1,Measurement1,tagvalue1
通过更改带注释的CSV,yield() 函数允许您将yield() 函数调用的时刻的查询结果作为一个新的表流返回。
yield() 函数非常重要,因为调用多个 yield() 函数允许您从单个 Flux 脚本中同时返回多个表流。
使用多个 yield() 函数返回多个聚合
想象一下,您想要返回单个表 min()、max() 和 mean() 的值
_measurement | _field | _value | _time |
measurement1 | field1 | 1.0 | 2021-09-17T21:22:52.00Z |
measurement1 | field1 | 2.0 | 2021-10-17T21:22:52.00Z |
measurement1 | field2 | 4.0 | 2021-11-17T21:22:52.00Z |
measurement1 | field3 | 5.0 | 2021-12-17T21:22:52.00Z |
新 Flux 用户,尤其是那些来自 SQL 或 InfluxQL 背景的用户,倾向于运行以下 Flux 查询
from(bucket: "bucket1")
|> range(start: 2021-08-17T21:22:52.452072242Z)
|> filter(fn: (r) => r["_measurement"] == "Measurement1" and r["tag1"] == "tagvalue1" and r["_field"] == "field1" )
|> min()
|> max()
|> mean()
这是因为他们习惯于能够执行 SELECT min("field1"), max("field1"), mean("field1")
。然而,上面的 Flux 查询实际上只会返回最小值。Flux 是管道式传递的,所以您必须使用多个 yield() 函数来一起返回最小值、最大值和平均值
from(bucket: "bucket1")
|> range(start: 2021-08-17T21:22:52.452072242Z)
|> filter(fn: (r) => r["_measurement"] == "Measurement1" and r["tag1"] == "tagvalue1" and r["_field"] == "field1" )
|> min()
|> yield(name: "min")
from(bucket: "bucket1")
|> range(start: 2021-08-17T21:22:52.452072242Z)
|> filter(fn: (r) => r["_measurement"] == "Measurement1" and r["tag1"] == "tagvalue1" and r["_field"] == "field1" )
|> max()
|> yield(name: "max")
from(bucket: "bucket1")
|> range(start: 2021-08-17T21:22:52.452072242Z)
|> filter(fn: (r) => r["_measurement"] == "Measurement1" and r["tag1"] == "tagvalue1" and r["_field"] == "field1" )
|> mean()
|> yield(name: "mean")
上面的脚本将生成三个表
结果:最小值
_measurement | _field | _value | _time |
measurement1 | field1 | 1.0 | 2021-09-17T21:22:52.00Z |
结果:最大值
_measurement | _field | _value | _time |
measurement1 | field1 | 5.0 | 2021-12-17T21:22:52.00Z |
结果:平均值
_measurement | _field | _value |
measurement1 | field1 | 3.0 |
注意:请记住,mean() 函数不会返回时间戳列,因为它是聚合器。与平均值相关联的没有时间戳。
使用变量进行多个聚合
虽然上面的 Flux 查询将生成所有三个转换,但这不是一个高效的查询,因为您正在多次查询整个数据集。相反,将基本查询存储在变量中,如下所示引用
data = from(bucket: "bucket1")
|> range(start: 2021-08-17T21:22:52.452072242Z)
|> filter(fn: (r) => r["_measurement"] == "Measurement1" and r["tag1"] == "tagvalue1" and r["_field"] == "field1" )
data_min = data
|> min()
|> yield(name: "min")
data_max = data
|> max()
|> yield(name: "max")
data_mean = data
|> mean()
|> yield(name: "mean")
重要提示:请确保不要将变量命名为与函数名相同的名称,以避免命名冲突。
使用和不使用 yield() 函数
值得注意的是,您不仅在每个查询或查询变量中可以使用 yield() 函数一次。您可以在同一个查询中使用多个 yield() 函数。例如,如果您只想从查询中返回最小值和原始数据,您可以使用 yield() 函数两次
from(bucket: "bucket1")
|> range(start: 2021-08-17T21:22:52.452072242Z)
|> filter(fn: (r) => r["_measurement"] == "Measurement1" and r["tag1"] == "tagvalue1" and r["_field"] == "field1" )
|> yield(name:"raw data")
|> min()
|> yield(name: "min")
此外,您不需要使用 yield() 函数来返回单个查询的结果。以下查询将生成一个表流,因为 InfluxDB 在没有使用 yield() 函数的情况下使用默认的 “_results” 分配返回默认注释的表流
from(bucket: "bucket1")
|> range(start: 2021-08-17T21:22:52.452072242Z)
|> filter(fn: (r) => r["_measurement"] == "Measurement1" and r["tag1"] == "tagvalue1" and r["_field"] == "field1" )
|> min()
但是,如果您在执行两个查询时没有调用多个 yields,您将得到一个错误。以下查询
data = from(bucket: "bucket1")
|> range(start: 2021-08-17T21:22:52.452072242Z)
|> filter(fn: (r) => r["_measurement"] == "Measurement1" and r["tag1"] == "tagvalue1" and r["_field"] == "field1" )
data_min = data
|> min()
data_max = data
|> max()
产生以下错误
error in query specification while starting program: this Flux script returns no streaming data. Consider adding a "yield" or invoking streaming functions directly, without performing an assignment.
此错误发生是因为 InfluxDB 无法返回具有相同默认 “_results” 分配的默认注释的不同表流。
结论
我希望这篇帖子能激发您利用 yield() 函数的灵感。如果您在编写 Flux 时需要帮助,请在我们 社区网站 或 Slack 频道中寻求帮助。如果您在 InfluxDB 上开发酷炫的 IoT 应用程序,我们非常愿意了解它,所以请确保 分享您的故事!另外,请在评论部分分享您的想法、担忧或问题。我们很高兴得到您的反馈并帮助您解决遇到的问题!
进一步阅读
虽然本帖旨在提供关于如何使用 yield() 函数的全面概述,但以下资源也可能对您感兴趣
- TL;DR InfluxDB 技术提示 – 如何解读带注释的 CSV:本文介绍了如何解读带注释的 CSV,这是 InfluxDB 的 Flux 查询结果格式。
- Flux 初学者常见难题及学习使用 Flux 的资源:本文介绍了 Flux 初学者常见的难题以及如何通过使用 InfluxDB UI、理解带注释的 CSV 等方法来解决这些问题。
- 中级 Flux 用户常见难题及优化 Flux 的资源:本文介绍了中级和高级 Flux 用户常见的难题,并提供了关于下推模式、Flux 引擎工作原理等更多详细内容。
- TL;DR InfluxDB 技术提示 – 优化 InfluxDB Cloud 中的 Flux 性能:本文介绍了如何使用 Flux Profiler 和 Flux VS Code 扩展来优化您的 Flux 性能。