TL;DR InfluxDB 技术小贴士 —— 从子查询到 Flux!

导航到

在这篇博文中,我们将 InfluxDB 1.x 版本中使用 InfluxQL 的子查询翻译成 Flux,这是 InfluxDB 1.8 及更高版本中的一种数据脚本和函数查询语言,无论是在开源版(OSS)还是云版(Cloud)。这里翻译的子查询来自 这篇博客。本博客假设您已经对 Flux 有基本的了解。如果您对 Flux 完全陌生,我建议您查看以下文档和博客

如何将以下 InfluxQL 子查询翻译成 Flux?

子查询 1:对函数进行函数操作

以下子查询计算了每个地铁 标签 乘客的平均值,并返回最大平均值

> SELECT MAX("mean") FROM (SELECT MEAN("passengers") FROM "schedule" GROUP BY "subway")

Flux 翻译 1:对函数进行函数操作

选择您要查询数据的 和您要查询的 范围。使用 过滤 选择感兴趣的 字段。计算特定列的平均值。Flux 根据分组键将数据分组到表格中。您不需要按照 subway 标签 键进行分组,因为它默认是一个 分组键。现在将所有平均值组合到一个表格中,以找到整体最大平均值。

from(bucket: "schedule")
  |> range(start: -10d)
  |> filter(fn: (r) => r["_field"] == "passengers") 
  |> mean(column: "_value")
  |> group()
  |> max()

在InfluxDB 1.7及更早版本中,InfluxQL查询返回的是一组无组织的数据。从InfluxDB 1.8版本开始,数据以标注CSV的形式组织。这种数据格式包含元数据,可以帮助您执行更复杂的查询。Flux允许您以组织良好的组或表格的形式返回数据。表格有分组键,即表中每一行都拥有相同值的列列表。分组键值(true或false)被分配给表中的每一列。例如,列名可能是“subway”,代表我们的地铁标签键。如果表中“subway”列的每一行都是相同的,那么分组键值设置为true。这表明两种可能性之一:1)您只有一个标签值,并且该列的所有行都是相同的;2)您使用了group()函数。如果后者是正确的,那么您的数据已经被分组到更小的表格中,例如,在一个表格中,“subway”列的所有行都有值“station A”,而在另一个表格中,所有行都有值“station B”。

子查询2:对另一个查询的结果执行进一步分析

子查询的另一种用途是在数学运算的结果上执行函数。以下查询计算每个passengerspilled_coffees数量,并返回这些商数的平均值。

> SELECT MEAN("spills_per_person") FROM (SELECT "spilled_coffee"/"passengers" AS "spills_per_person" FROM "schedule" GROUP BY "subway")

这种用例等同于在函数内部执行数学运算。

Flux翻译2:对另一个查询的结果执行进一步分析

Flux查询与第一个Flux翻译类似,但这次我们筛选两个字段值,spilled_coffeespassenger。接下来,我们pivot我们的表格,以便我们可以根据字段键分离字段值。现在我们使用map()函数在行之间执行数学运算,以计算每个passengerspilled_coffees数量。使用group()函数应用mean()函数,对所有map()函数的结果进行聚合。

from(bucket: "schedule")
  |> range(start: -10d)
  |> filter(fn: (r) => r["_field"] == "spilled_coffee" or r["_field"] == "passengers")  
  |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
  |> map(fn: (r) => ({ r with spills_per_person: r.spilled_coffee/ r.passengers }))
  |> group()
  |> mean(column: "spills_per_person")

或者,使用fieldAsCol()函数代替pivot,如下所示

import "influxdata/influxdb/v1"
  from(bucket: "schedule")
  |> range(start: -10d)
  |> filter(fn: (r) => r["_field"] == "spilled_coffee" or r["_field"] == "passengers")  
  |> v1.fieldsAsCols()
  |> map(fn: (r) => ({ r with spills_per_person: r.spilled_coffee/ r.passengers }))
  |> group()
  |> mean(column: "spills_per_person")

请注意,这是直接翻译自这篇博客中找到的查询,我们假设所有字段都属于相同的测量。在InfluxDB的先前版本(1.8及之前)中,您无法执行跨测量的数学运算。从InfluxDB 1.8+和Flux开始,您可以执行连接和跨测量的数学运算。对于分别测量的字段,类似的查询可能如下所示

httpd = from(bucket:"telegraf")
|> range(start: -1hr)
|> filter(fn:(r) => r._measurement == "influxdb_httpd" and r._field == "writeReq")

write = from(bucket:"telegraf")
|> range(start: -1hr)
|> filter(fn:(r) => r._measurement == "influxdb_write" and r._field == "pointReq")

avg_batch_size = join(tables:{httpd:httpd, write:write}, on:["_time"])

|> map(fn:(r) => ({_value: r._value_write / r._value_httpd}))
|> mean()
|> yield()

首先,我们筛选来自两个测量值,“influxdb_httpd”和“influxdb_write”的数据。然后我们在时间戳上连接两个表格。最后,我们使用map()函数执行除法。

子查询3:对另一个查询的结果设置特定条件

执行一个函数,并仅返回满足特定条件的那些结果。这个用例类似于SQL的HAVING子句。以下查询计算每十分钟间隔的乘客最小数量,并仅返回大于15的最小值。

> SELECT "min_pass" FROM (SELECT MIN("passengers") AS "min_pass"  FROM "schedule" WHERE time >= '2017-01-25T18:00:00Z' AND time <= '2017-01-25T18:15:00Z' GROUP BY time(10m)) WHERE "min_pass" > 15

Flux翻译3:对另一个查询的结果设置特定条件

此Flux查询利用了aggregateWindow()函数,轻松地计算每10分钟数据的最小值。应用过滤器查找大于15的乘客数量的最小值。

from(bucket: "schedule")
  |> range(start: 2017-01-25T18:00:00Z, stop: 2017-01-25T18:15:00Z)
  |> filter(fn: (r) => r["_field"] == "passengers")
  |> aggregateWindow(
       every: 10m,
       fn: min)
  |> filter(fn: (r) => r["_value"] > 15)

还有其他什么吗?

希望这个教程能帮助您了解如何使用Flux。一如既往,如果您遇到困难,请在我们社区网站Slack频道上分享。我们非常乐意收到您的反馈,并帮助您解决遇到的问题。