TL;DR InfluxDB 技术提示:Flux 中使用 yield() 的多重聚合

导航至

yield() 函数确定应在 Flux 脚本中返回哪些表输入。 yield() 函数还为 Flux 查询的输出分配名称。该名称存储在默认注释中。

例如,如果我们查询下表

_measurement tag1 _field _value _time
Measurement1 tagvalue1 field1 1i 2021-09-17T21:22:52.00Z

没有 yield 函数

from(bucket: "bucket1")
|> range(start: 2021-08-17T21:22:52.00Z)
|> filter(fn: (r) => r["_measurement"] == "Measurement1" and r["tag1"] == "tagvalue1" and r["_field"] == "field1" )

将返回以下带注释的 CSV 输出。请注意,默认注释默认设置为 _results

#group,false,false,true,true,false,false,true,true,true
#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string
#default,_results,,,,,,,,
,result,table,_start,_stop,_time,_value,_field,_measurement,tag1
,,0,2021-08-17T21:22:52.452072242Z,2021-08-17T21:23:52.452072242Z,2021-08-17T21:23:39.010094213Z,1,field1,Measurement1,tagvalue1

现在,如果我们添加 yield() 函数

from(bucket: "bucket1")
|> range(start: 2021-08-17T21:22:52.452072242Z)
|> filter(fn: (r) => r["_measurement"] == "Measurement1" and r["tag1"] == "tagvalue1" and r["_field"] == "field1" )
|> yield(name: "myFluxQuery")

将返回以下带注释的 CSV 输出。请注意,默认注释已更改为 myFluxQuery

#group,false,false,true,true,false,false,true,true,true
#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string
#default,myFluxQuery,,,,,,,,
,result,table,_start,_stop,_time,_value,_field,_measurement,tag1
,,0,2021-08-17T21:22:52.452072242Z,2021-08-17T21:23:52.452072242Z,2021-08-17T21:23:39.010094213Z,1,field1,Measurement1,tagvalue1

通过更改带注释的 CSV,yield() 函数允许您在调用 yield() 函数的时间点将查询结果作为新的表流返回。

yield() 函数非常重要,因为调用多个 yield() 函数允许您从单个 Flux 脚本同时返回多个表流。

使用多个 yield() 函数返回多个聚合

假设您要返回单个表的 min()max()mean()

_measurement _field _value _time
measurement1 field1 1.0 2021-09-17T21:22:52.00Z
measurement1 field1 2.0 2021-10-17T21:22:52.00Z
measurement1 field2 4.0 2021-11-17T21:22:52.00Z
measurement1 field3 5.0 2021-12-17T21:22:52.00Z

Flux 新用户,尤其是来自 SQL 或 InfluxQL 背景的用户,倾向于运行以下 Flux 查询

from(bucket: "bucket1")
|> range(start: 2021-08-17T21:22:52.452072242Z)
|> filter(fn: (r) => r["_measurement"] == "Measurement1" and r["tag1"] == "tagvalue1" and r["_field"] == "field1" )
|> min()
|> max()
|> mean()

这是因为他们习惯于能够执行 SELECT min("field1"), max("field1"), mean("field1")。但是,上面的 Flux 查询实际上只会返回最小值。 Flux 是管道转发的,因此您必须使用多个 yield() 函数来一起返回最小值、最大值和平均值

from(bucket: "bucket1")
|> range(start: 2021-08-17T21:22:52.452072242Z)
|> filter(fn: (r) => r["_measurement"] == "Measurement1" and r["tag1"] == "tagvalue1" and r["_field"] == "field1" )
|> min()
|> yield(name: "min") 

from(bucket: "bucket1")
|> range(start: 2021-08-17T21:22:52.452072242Z)
|> filter(fn: (r) => r["_measurement"] == "Measurement1" and r["tag1"] == "tagvalue1" and r["_field"] == "field1" )
|> max()
|> yield(name: "max") 

from(bucket: "bucket1")
|> range(start: 2021-08-17T21:22:52.452072242Z)
|> filter(fn: (r) => r["_measurement"] == "Measurement1" and r["tag1"] == "tagvalue1" and r["_field"] == "field1" )
|> mean()
|> yield(name: "mean")

上面的脚本将产生三个表

结果:最小值

_measurement _field _value _time
measurement1 field1 1.0 2021-09-17T21:22:52.00Z

结果:最大值

_measurement _field _value _time
measurement1 field1 5.0 2021-12-17T21:22:52.00Z

结果:平均值

_measurement _field _value
measurement1 field1 3.0

题外话: 请记住,mean() 函数不返回时间戳列,因为它是一个聚合器。平均值没有关联的时间戳。

使用变量执行多个聚合

虽然上面的 Flux 查询将产生所有三个转换,但这不是一个有效的查询,因为您多次查询整个数据集。相反,将基本查询存储在变量中并像这样引用它

data = from(bucket: "bucket1")
|> range(start: 2021-08-17T21:22:52.452072242Z)
|> filter(fn: (r) => r["_measurement"] == "Measurement1" and r["tag1"] == "tagvalue1" and r["_field"] == "field1" )

data_min = data
|> min()
|> yield(name: "min") 

data_max = data
|> max()
|> yield(name: "max") 

data_mean = data
|> mean()
|> yield(name: "mean")

重要提示: 确保不要将变量命名为与函数名称相同,以避免命名冲突。

使用和不使用 yield() 函数

值得注意的是,您不限于每个查询或查询变量使用一次 yield() 函数。您可以在同一查询中多次使用 yield() 函数。例如,如果您只想返回查询中的最小值和原始数据,您可以使用 yield() 函数两次

from(bucket: "bucket1")
|> range(start: 2021-08-17T21:22:52.452072242Z)
|> filter(fn: (r) => r["_measurement"] == "Measurement1" and r["tag1"] == "tagvalue1" and r["_field"] == "field1" )
|> yield(name:"raw data")
|> min()
|> yield(name: "min")

此外,您无需使用 yield() 函数即可返回单个查询的结果。以下查询将产生表流,因为 InfluxDB 在没有 yield() 函数的情况下,为默认注释返回带有默认“_results”分配的表流

from(bucket: "bucket1")
|> range(start: 2021-08-17T21:22:52.452072242Z)
|> filter(fn: (r) => r["_measurement"] == "Measurement1" and r["tag1"] == "tagvalue1" and r["_field"] == "field1" )
|> min()

但是,如果您在进行两个查询时没有调用多个 yield,您将收到错误。以下查询

data = from(bucket: "bucket1")
|> range(start: 2021-08-17T21:22:52.452072242Z)
|> filter(fn: (r) => r["_measurement"] == "Measurement1" and r["tag1"] == "tagvalue1" and r["_field"] == "field1" )

data_min = data
|> min()

data_max = data
|> max()

产生以下错误

error in query specification while starting program: this Flux script returns no streaming data. Consider adding a "yield" or invoking streaming functions directly, without performing an assignment.

发生此错误是因为 InfluxDB 无法为默认注释返回具有相同默认“_results”分配的不同表流。

结论

我希望这篇文章能够启发您利用 yield() 函数。如果您正在编写 Flux 并需要帮助,请在我们的社区网站Slack 频道中提问。如果您正在 InfluxDB 之上开发酷炫的物联网应用程序,我们很乐意听到您的消息,因此请务必分享您的故事!此外,请在评论区分享您的想法、疑虑或问题。我们很乐意获得您的反馈并帮助您解决遇到的任何问题!

延伸阅读

虽然这篇文章旨在全面概述如何使用 yield() 函数,但以下资源也可能让您感兴趣

  1. TL;DR InfluxDB 技术提示 – 如何解释带注释的 CSV:这篇文章描述了如何解释带注释的 CSV,这是 InfluxDB 的 Flux 查询结果格式。
  2. Flux 初学者面临的 5 大障碍以及学习使用 Flux 的资源:这篇文章描述了 Flux 初学者常遇到的障碍,以及如何通过使用 InfluxDB UI、理解带注释的 CSV 等来解决这些障碍。
  3. Flux 中级用户面临的 5 大障碍以及优化 Flux 的资源:这篇文章描述了 Flux 中级和高级用户常遇到的障碍,同时更详细地介绍了下推模式、Flux 引擎的工作原理等。
  4. TL;DR InfluxDB 技术提示 – 优化 InfluxDB Cloud 中的 Flux 性能:这篇文章描述了如何使用 Flux Profiler 和 Flux VS Code 扩展来优化您的 Flux 性能。