TL;DR InfluxDB技术提示:在Flux中使用yield()进行多个聚合

导航至

yield() 函数确定在Flux脚本中应返回哪些表输入。yield() 函数还为数组查询的输出分配一个名称。名称存储在默认注释中。

例如,如果我们查询以下表

_measurement tag1 _field _value _time
Measurement1 tagvalue1 field1 1i 2021-09-17T21:22:52.00Z

没有yield函数

from(bucket: "bucket1")
|> range(start: 2021-08-17T21:22:52.00Z)
|> filter(fn: (r) => r["_measurement"] == "Measurement1" and r["tag1"] == "tagvalue1" and r["_field"] == "field1" )

返回以下带注释的CSV输出。注意默认注释默认设置为 _results

#group,false,false,true,true,false,false,true,true,true
#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string
#default,_results,,,,,,,,
,result,table,_start,_stop,_time,_value,_field,_measurement,tag1
,,0,2021-08-17T21:22:52.452072242Z,2021-08-17T21:23:52.452072242Z,2021-08-17T21:23:39.010094213Z,1,field1,Measurement1,tagvalue1

现在如果我们添加yield()函数

from(bucket: "bucket1")
|> range(start: 2021-08-17T21:22:52.452072242Z)
|> filter(fn: (r) => r["_measurement"] == "Measurement1" and r["tag1"] == "tagvalue1" and r["_field"] == "field1" )
|> yield(name: "myFluxQuery")

返回以下带注释的CSV输出。注意默认注释已更改为 myFluxQuery

#group,false,false,true,true,false,false,true,true,true
#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string
#default,myFluxQuery,,,,,,,,
,result,table,_start,_stop,_time,_value,_field,_measurement,tag1
,,0,2021-08-17T21:22:52.452072242Z,2021-08-17T21:23:52.452072242Z,2021-08-17T21:23:39.010094213Z,1,field1,Measurement1,tagvalue1

通过更改带注释的CSV,yield() 函数允许您将yield() 函数调用的时刻的查询结果作为一个新的表流返回。

yield() 函数非常重要,因为调用多个 yield() 函数允许您从单个 Flux 脚本中同时返回多个表流。

使用多个 yield() 函数返回多个聚合

想象一下,您想要返回单个表 min()max()mean() 的值

_measurement _field _value _time
measurement1 field1 1.0 2021-09-17T21:22:52.00Z
measurement1 field1 2.0 2021-10-17T21:22:52.00Z
measurement1 field2 4.0 2021-11-17T21:22:52.00Z
measurement1 field3 5.0 2021-12-17T21:22:52.00Z

新 Flux 用户,尤其是那些来自 SQL 或 InfluxQL 背景的用户,倾向于运行以下 Flux 查询

from(bucket: "bucket1")
|> range(start: 2021-08-17T21:22:52.452072242Z)
|> filter(fn: (r) => r["_measurement"] == "Measurement1" and r["tag1"] == "tagvalue1" and r["_field"] == "field1" )
|> min()
|> max()
|> mean()

这是因为他们习惯于能够执行 SELECT min("field1"), max("field1"), mean("field1")。然而,上面的 Flux 查询实际上只会返回最小值。Flux 是管道式传递的,所以您必须使用多个 yield() 函数来一起返回最小值、最大值和平均值

from(bucket: "bucket1")
|> range(start: 2021-08-17T21:22:52.452072242Z)
|> filter(fn: (r) => r["_measurement"] == "Measurement1" and r["tag1"] == "tagvalue1" and r["_field"] == "field1" )
|> min()
|> yield(name: "min") 

from(bucket: "bucket1")
|> range(start: 2021-08-17T21:22:52.452072242Z)
|> filter(fn: (r) => r["_measurement"] == "Measurement1" and r["tag1"] == "tagvalue1" and r["_field"] == "field1" )
|> max()
|> yield(name: "max") 

from(bucket: "bucket1")
|> range(start: 2021-08-17T21:22:52.452072242Z)
|> filter(fn: (r) => r["_measurement"] == "Measurement1" and r["tag1"] == "tagvalue1" and r["_field"] == "field1" )
|> mean()
|> yield(name: "mean")

上面的脚本将生成三个表

结果:最小值

_measurement _field _value _time
measurement1 field1 1.0 2021-09-17T21:22:52.00Z

结果:最大值

_measurement _field _value _time
measurement1 field1 5.0 2021-12-17T21:22:52.00Z

结果:平均值

_measurement _field _value
measurement1 field1 3.0

注意:请记住,mean() 函数不会返回时间戳列,因为它是聚合器。与平均值相关联的没有时间戳。

使用变量进行多个聚合

虽然上面的 Flux 查询将生成所有三个转换,但这不是一个高效的查询,因为您正在多次查询整个数据集。相反,将基本查询存储在变量中,如下所示引用

data = from(bucket: "bucket1")
|> range(start: 2021-08-17T21:22:52.452072242Z)
|> filter(fn: (r) => r["_measurement"] == "Measurement1" and r["tag1"] == "tagvalue1" and r["_field"] == "field1" )

data_min = data
|> min()
|> yield(name: "min") 

data_max = data
|> max()
|> yield(name: "max") 

data_mean = data
|> mean()
|> yield(name: "mean")

重要提示:请确保不要将变量命名为与函数名相同的名称,以避免命名冲突。

使用和不使用 yield() 函数

值得注意的是,您不仅在每个查询或查询变量中可以使用 yield() 函数一次。您可以在同一个查询中使用多个 yield() 函数。例如,如果您只想从查询中返回最小值和原始数据,您可以使用 yield() 函数两次

from(bucket: "bucket1")
|> range(start: 2021-08-17T21:22:52.452072242Z)
|> filter(fn: (r) => r["_measurement"] == "Measurement1" and r["tag1"] == "tagvalue1" and r["_field"] == "field1" )
|> yield(name:"raw data")
|> min()
|> yield(name: "min")

此外,您不需要使用 yield() 函数来返回单个查询的结果。以下查询将生成一个表流,因为 InfluxDB 在没有使用 yield() 函数的情况下使用默认的 “_results” 分配返回默认注释的表流

from(bucket: "bucket1")
|> range(start: 2021-08-17T21:22:52.452072242Z)
|> filter(fn: (r) => r["_measurement"] == "Measurement1" and r["tag1"] == "tagvalue1" and r["_field"] == "field1" )
|> min()

但是,如果您在执行两个查询时没有调用多个 yields,您将得到一个错误。以下查询

data = from(bucket: "bucket1")
|> range(start: 2021-08-17T21:22:52.452072242Z)
|> filter(fn: (r) => r["_measurement"] == "Measurement1" and r["tag1"] == "tagvalue1" and r["_field"] == "field1" )

data_min = data
|> min()

data_max = data
|> max()

产生以下错误

error in query specification while starting program: this Flux script returns no streaming data. Consider adding a "yield" or invoking streaming functions directly, without performing an assignment.

此错误发生是因为 InfluxDB 无法返回具有相同默认 “_results” 分配的默认注释的不同表流。

结论

我希望这篇帖子能激发您利用 yield() 函数的灵感。如果您在编写 Flux 时需要帮助,请在我们 社区网站Slack 频道中寻求帮助。如果您在 InfluxDB 上开发酷炫的 IoT 应用程序,我们非常愿意了解它,所以请确保 分享您的故事!另外,请在评论部分分享您的想法、担忧或问题。我们很高兴得到您的反馈并帮助您解决遇到的问题!

进一步阅读

虽然本帖旨在提供关于如何使用 yield() 函数的全面概述,但以下资源也可能对您感兴趣

  1. TL;DR InfluxDB 技术提示 – 如何解读带注释的 CSV:本文介绍了如何解读带注释的 CSV,这是 InfluxDB 的 Flux 查询结果格式。
  2. Flux 初学者常见难题及学习使用 Flux 的资源:本文介绍了 Flux 初学者常见的难题以及如何通过使用 InfluxDB UI、理解带注释的 CSV 等方法来解决这些问题。
  3. 中级 Flux 用户常见难题及优化 Flux 的资源:本文介绍了中级和高级 Flux 用户常见的难题,并提供了关于下推模式、Flux 引擎工作原理等更多详细内容。
  4. TL;DR InfluxDB 技术提示 – 优化 InfluxDB Cloud 中的 Flux 性能:本文介绍了如何使用 Flux Profiler 和 Flux VS Code 扩展来优化您的 Flux 性能。