TL;DR InfluxDB 技术提示 — 从子查询到 Flux!

导航至

在这篇文章中,我们将 InfluxDB 1.x 版本中使用的 InfluxQL 子查询转换为 Flux,这是一种数据脚本和函数式查询语言,适用于 InfluxDB 1.8 及更高版本,包括 OSS 或 Cloud 版本。这里翻译的子查询来自这篇博客。这篇博客假设您对 Flux 有基本的了解。如果您完全不熟悉 Flux,我建议您查看以下文档和博客

如何将以下 InfluxQL 子查询转换为 Flux?

子查询 1:对函数的函数执行函数

以下子查询计算每个地铁 标签 的乘客 平均值,并且仅返回 最大 平均值

> SELECT MAX("mean") FROM (SELECT MEAN("passengers") FROM "schedule" GROUP BY "subway")

Flux 翻译 1:对函数的函数执行函数

选择您要从中查询数据的 存储桶 和您要查询的 范围筛选 感兴趣的 字段。计算特定列的平均值。Flux 根据组键将数据分组到表中。您不需要按 subway 标签 键分组,因为它默认是 组键。现在将所有平均值分组到一个表中,以查找平均值的总体最大值。

from(bucket: "schedule")
  |> range(start: -10d)
  |> filter(fn: (r) => r["_field"] == "passengers") 
  |> mean(column: "_value")
  |> group()
  |> max()

在 InfluxDB 1.7 及更早版本中,InfluxQL 查询仅返回单个未组织的数据集合。在 InfluxDB 1.8 及更高版本中,数据以 带注释的 CSV 形式组织。这种数据格式包含元数据,可帮助您执行更复杂的查询。 Flux 允许您以有组织的组或 的形式返回数据。表具有 组键,即表中每行具有相同值的列列表。组键值(true 或 false)分配给表中的每一列。例如,列名可能是“subway”,代表我们的地铁标签键。如果表中“subway”列的每一行都相同,则组键值设置为 true。这表明两种可能性之一:1) 您只有一个标签值,并且该列的所有行都相同;或者 2) 您使用了 group() 函数。如果后者为真,则您的数据已分组到较小的表中,例如,在一个表中,“subway”列的所有行都具有值“station A”,而在另一个表中,所有行都具有值“station B”。

子查询 2:对另一个查询的结果执行额外分析

子查询的另一个用途是对数学运算的结果执行函数。以下查询计算每 passengerspilled_coffees 数量,并返回这些商的 平均值

> SELECT MEAN("spills_per_person") FROM (SELECT "spilled_coffee"/"passengers" AS "spills_per_person" FROM "schedule" GROUP BY "subway")

此用例等同于在函数内执行数学运算。

Flux 翻译 2:对另一个查询的结果执行额外分析

Flux 查询与第一个 Flux 翻译类似,不同之处在于这次我们筛选了两个字段值,spilled_coffeespassenger。接下来,我们 pivot 我们的表,以便我们可以根据字段键分隔我们的字段值。现在,我们使用 map() 函数来执行跨行的数学运算,以计算每 passengerspilled_coffees 数量。使用 group() 函数将 mean() 函数应用于 map() 函数的所有结果。

from(bucket: "schedule")
  |> range(start: -10d)
  |> filter(fn: (r) => r["_field"] == "spilled_coffee" or r["_field"] == "passengers")  
  |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
  |> map(fn: (r) => ({ r with spills_per_person: r.spilled_coffee/ r.passengers }))
  |> group()
  |> mean(column: "spills_per_person")

或者,使用 fieldAsCol() 函数代替 pivot,如下所示

import "influxdata/influxdb/v1"
  from(bucket: "schedule")
  |> range(start: -10d)
  |> filter(fn: (r) => r["_field"] == "spilled_coffee" or r["_field"] == "passengers")  
  |> v1.fieldsAsCols()
  |> map(fn: (r) => ({ r with spills_per_person: r.spilled_coffee/ r.passengers }))
  |> group()
  |> mean(column: "spills_per_person")

请注意,这是对 这篇博客 中查询的直接翻译,我们假设所有字段都属于同一 measurement。在以前版本的 InfluxDB(1.8 及更早版本)中,您无法跨 measurement 执行数学运算。使用 InfluxDB 1.8+ 和 Flux,您可以跨 measurement 执行 join 和数学运算。用于在单独 measurement 中查找字段的类似查询如下所示

httpd = from(bucket:"telegraf")
|> range(start: -1hr)
|> filter(fn:(r) => r._measurement == "influxdb_httpd" and r._field == "writeReq")

write = from(bucket:"telegraf")
|> range(start: -1hr)
|> filter(fn:(r) => r._measurement == "influxdb_write" and r._field == "pointReq")

avg_batch_size = join(tables:{httpd:httpd, write:write}, on:["_time"])

|> map(fn:(r) => ({_value: r._value_write / r._value_httpd}))
|> mean()
|> yield()

首先,我们筛选来自两个 measurement “influxdb_httpd” 和 “influxdb_write” 的数据。然后,我们在时间戳上 join 这两个表。最后,我们使用 map() 函数执行除法。

子查询 3:对另一个查询的结果设置特定条件

执行一个函数,并且仅返回符合特定条件的结果。此用例类似于 SQLHAVING 子句。以下查询计算十分钟间隔的 最小 乘客数,并且仅返回那些大于 15 的最小值

> SELECT "min_pass" FROM (SELECT MIN("passengers") AS "min_pass"  FROM "schedule" WHERE time >= '2017-01-25T18:00:00Z' AND time <= '2017-01-25T18:15:00Z' GROUP BY time(10m)) WHERE "min_pass" > 15

Flux 翻译 3:对另一个查询的结果设置特定条件

此 Flux 查询利用 aggregateWindow() 函数轻松计算每 10 分钟数据的最小值。应用筛选器以查找高于 15 passengers 的最小 passengers 数。

from(bucket: "schedule")
  |> range(start: 2017-01-25T18:00:00Z, stop: 2017-01-25T18:15:00Z)
  |> filter(fn: (r) => r["_field"] == "passengers")
  |> aggregateWindow(
       every: 10m,
       fn: min)
  |> filter(fn: (r) => r["_value"] > 15)

还有什么?

我希望本教程能够帮助您理解如何使用 Flux。与往常一样,如果您遇到障碍,请在我们的 社区站点Slack 频道上分享。我们很乐意获得您的反馈并帮助您解决遇到的任何问题。