TL;DR InfluxDB 技术提示: 将 InfluxQL 查询转换为 Flux 查询
作者:Anais Dotis-Georgiou / 用例, 开发者, 产品
2022 年 3 月 31 日
导航至
如果您是 InfluxDB 1.x 用户,您很可能比 Flux 更熟悉 InfluxQL。要深入理解 Flux,重要的是要理解
但是,即使不学习这些主题,您仍然可以使用 Flux。在本 TL;DR 中,我们将把常见的 InfluxQL 查询转换为 Flux,并识别两种语言之间的模式,以帮助您更轻松地开始使用 Flux,如果您来自 InfluxQL 或 SQL 背景。
基本原理
几乎每个用户都会从编写类似以下的 InfluxQL 查询开始
SELECT "fieldKey1","fieldKey2" FROM "db"."measurement1" WHERE time >= '2022-02-22T20:22:02Z' and time < now() GROUP BY <tagKey1>
这是等效的 Flux 查询
from(bucket: "db")
|> range(start: "2022-02-22T20:22:02Z", stop: now())
|> filter(fn: (r) => r._measurement == "measurement1")
|> filter(fn: (r) => r._field == "fieldKey1" or r._field == "fieldKey2")
|> yield(name: "my first flux query")
让我们看看这两个查询是如何相互映射的
如果您是 Flux 的新手,您首先会注意到管道前向运算符 |>
。Flux 是一种声明性语言,旨在通过在函数之间管道前向数据来转换数据,其中每个函数按顺序转换数据。您可以看到代码以 from() 函数开始,然后将该 from() 函数的结果管道前向到 range() 函数。您可以使用 range() 函数而不是 WHERE 子句来指定您的时间范围。最后,您不需要按标签分组,因为标签在 Flux 中会自动分组。但是,您可以包含一个冗余的 group() 函数来按该标签分组,如所示。
yield() 函数是您用来指定 Flux 查询的哪些部分应返回数据以及在哪里为这些返回结果分配名称的函数。如果您只查询一个数据流(如上例),则包含 yield() 函数是可选的。如果没有应用 yield() 函数,结果将默认命名为“_result”。如果您想同时查询来自多个存储桶的数据,则必须使用两个 yield 函数,并将每个结果命名为不同的名称。这类似于使用 AS 子句。例如
from(bucket: "db1")
// specify start:0 to query from all time. Equivalent to SELECT * from db1. Use just as cautiously.
|> range(start: 0)
|> yield(name: "db1 results")
from(bucket: "db2")
|> range(start: 0)
|> yield(name: "db2 results")
SELECT 语句 → from() 和 filter() 函数
要使用 Flux 选择字段、标签和度量,请使用 from() 和 filter() 函数而不是 SELECT 语句。from() 函数是您指定要从中获取数据的存储桶的位置。在 Flux 中,存储桶指的是您的数据库和保留策略。您可以使用 filter() 函数选择字段、标签和度量。您可以使用 and 语句来选择不同的字段、标签和度量,也可以使用多个 filter() 函数。以下两个 Flux 查询是等效的
-
|> filter(fn: (r) => r._measurement == "measurement1") |> filter(fn: (r) => r._field == "tagKey1") |> filter(fn: (r) => r._field == "fieldKey1")
-
|> filter(fn: (r) => r._measurement == "measurement1" and r._field == "tagKey1" and r._field == "fieldKey1")
请确保不要意外地使用 and 语句 而不是 or 语句,因为您将不会返回任何结果。以下查询试图同时返回名为“fieldKey1”和“fieldKey2”的字段。这是不可能的,因此您将不会返回任何结果。不要 执行以下操作
|> filter(fn: (r) => r._field == "fieldKey1" and r._field == "fieldKey1" )
请 使用 or 语句 来选择多个度量、标签或字段,如下所示
|> filter(fn: (r) => r._field == "fieldKey1" or r._field == "fieldKey1" )
SELECT 多个聚合 → 多个 yield() 函数
如果您熟悉 InfluxQL,那么您可能以前选择过多个聚合。也许您想选择不同字段的最小值和最大值,如下所示
SELECT max("fieldKey1") AS "max", min("fieldKey2") AS "min" FROM "db"."measurement1" WHERE time >= '2022-02-22T20:22:02Z' and time < now() GROUP BY <tagKey1>
您可以使用 Flux 执行相同的操作,但这需要更多代码行
//querying your common data once and referencing it with a variable
data = from(bucket: "db")
|> range(start: "2022-02-22T20:22:02Z", stop: now()
|> filter(fn: (r) => r._measurement == "measurement1")
data
|> filter(fn: (r) => r._field == "fieldKey1")
|> max()
|> yield(name: "max")
data
|> filter(fn: (r) => r._field == "fieldKey2")
|> min()
|> yield(name: "min")
让我们看看这两个查询是如何相互映射的
题外话:InfluxQL 用户在学习 Flux 时告诉我的第一件事是 Flux 必然很复杂。我听到诸如“以前一行 InfluxQL 代码现在是 10 行 Flux 代码”之类的话。当我开始学习 Flux 时,我也感受到了同样的痛点,但现在我更喜欢 Flux 而不是 InfluxQL。“简单”的 InfluxQL 查询在 Flux 中可能看起来更复杂,但这种复杂程度与 Flux 提供的海量数据转换和分析能力相比,就显得微不足道了。您可以构建自定义函数、透视数据、连接数据、编写自定义抖动算法、跨数据映射、获取和处理 JSON 等等。
在编写 Flux 时,重要的是开始在脑海中思考数据结构、层次结构和形状。这可能是 Flux 和 InfluxQL 之间思维方式的最大转变。例如,在上面的查询中,您需要考虑查询中的共同点。在本例中,它是从“db”和“measurement1”查询数据。将此数据存储在一个变量中并引用该变量。现在您可以进一步筛选您想要查找最大值和最小值的特定字段(或分别为“fieldKey1”和“fieldKey2”)。最后,使用 yield() 函数来命名每个结果,就像您可能在 InfluxQL 中使用 AS 子句那样,以同时生成两个结果——max(“fieldKey1”) 和 min(“fieldkey2”)。
您也可以执行以下查询
//querying your data multiple times
from(bucket: "db")
|> range(start: "2022-02-22T20:22:02Z", stop: now()
|> filter(fn: (r) => r._measurement == "measurement1")
|> filter(fn: (r) => r._field == "fieldKey1")
|> max()
|> yield(name: "max")
from(bucket: "db")
|> range(start: "2022-02-22T20:22:02Z", stop: now()
|> filter(fn: (r) => r._measurement == "measurement1")
|> filter(fn: (r) => r._field == "fieldKey1")
|> max()
|> yield(name: "max")
您将使用后一个查询返回相同的结果,但此查询可能效率较低。效率取决于数据的形状。将基本查询存储在变量中,然后引用该变量的优势在于,这种方法可以防止您执行冗余查询,或像后者所示那样多次查询数据。当您在基本查询中筛选标签时,使用变量存储基本查询尤其理想,这些标签将返回的数据限制为数据的特定子集。如果您的度量中有很多字段,那么像前一个解决方案那样查询所有数据并将数据首先存储在变量中,可能效率较低。这是因为通过仅筛选一个度量,如果该度量中有数百个字段,而不是仅有两个字段,您可能会查询大量数据。最佳实践是将您的基本查询存储在变量中,如果该基本查询仅返回数据的子集,然后将进一步筛选该基本查询的子集。
题外话:使用 reduce() 同时返回多个聚合
您还可以使用 reduce() 函数同时返回多个聚合。使用 reduce() 更像是一个高级主题,但值得在此处提及此解决方案
from(bucket: "db")
|> range(start: "2022-02-22T20:22:02Z", stop: now()
|> filter(fn: (r) => r._measurement == "measurement1")
|> filter(fn: (r) => r._field == "fieldKey1" or r._field == "fieldKey2")
|> reduce(
identity: {count: 0.0, sum: 0.0, min: 0.0, max: 0.0, mean: 0.0},
fn: (r, accumulator) => ({
count: accumulator.count + 1.0,
sum: r._value + accumulator.sum,
min: if accumulator.count == 0.0 then r._value else if r._value < accumulator.min then r._value else accumulator.min,
max: if accumulator.count == 0.0 then r._value else if r._value > accumulator.max then r._value else accumulator.max,
mean: (r._value + accumulator.sum) / (accumulator.count + 1.0)
})
)
|> yield(name: "count_sum_min_max_mean_reduce")
这将会在您的数据中生成 4 个额外的列,其中包含您的两个字段的计数、总和、最小值、最大值和平均值。此方法的唯一缺点是 InfluxDB UI 一次只能可视化一列。如果您想同时可视化这些聚合,您需要使用上面讨论的方法。
WHERE 子句 → range() 或 filter() 函数
希望您可能已经注意到 WHERE 子句与 range() 或 filter() 函数之间存在一些重叠。您可以使用 range() 函数来描述您要从中选择数据的时间范围。filter() 函数支持对您的标签、度量、字段或字段值进行条件语句。
也许您想选择大于某个值的字段值,如下所示
SELECT "fieldKey1" FROM "<db>"."<measurement1>" WHERE "fieldKey1" > 8 and time >= '2022-02-22T20:22:02Z' and time < now()
您可以使用 Flux 执行相同的操作,但分别使用 range() 和 filter() 函数来处理时间和字段值条件
from(bucket: "db")
|> range(start: "2022-02-22T20:22:02Z", stop: now()
|> filter(fn: (r) => r._measurement == "measurement1")
|> filter(fn: (r) => r._field == "fieldKey1")
|> filter(fn: (r) => r._value > 8)
GROUP BY 子句 → group() 或 aggregateWindow() 函数
标签在 Flux 中会自动分组,因此通常不需要像 InfluxQL 那样使用 GROUP BY 标签,除非您已取消分组您的数据。换句话说,以下 InfluxQL 查询
SELECT "fieldKey1" FROM "db"."measurement1"
WHERE "tagKey1" == 'tagValue1' OR "tagKey1" == 'tagValue2' and time >= '2022-02-22T20:22:02Z' and time < now() GROUP BY
"tagKey1"
...等效于以下 Flux 查询
from(bucket: "db")
|> range(start: "2022-02-22T20:22:02Z", stop: now()
|> filter(fn: (r) => r._measurement == "measurement1")
|> filter(fn: (r) => r.tagKey1 == "tagValue1" or r.tagKey1 == "tagValue2")
|> filter(fn: (r) => r._field == "fieldKey1")
或者,如果您不在 InfluxQL 中使用 GROUP BY 子句,您将必须通过使用空的 group() 函数来取消分组您的数据。以下 InfluxQL 查询
SELECT "fieldKey1" FROM "db"."measurement1"
WHERE "tagKey1" == 'tagValue1' OR "tagKey1" == 'tagValue2' and time >= '2022-02-22T20:22:02Z' and time < now()
...等效于以下 Flux 查询
from(bucket: "db")
|> range(start: "2022-02-22T20:22:02Z", stop: now()
|> filter(fn: (r) => r._measurement == "measurement1")
|> filter(fn: (r) => r.tagKey1 == "tagValue1" or r.tagKey1 == "tagValue2")
|> filter(fn: (r) => r._field == "fieldKey1")
|> group()
按时间分组
当在 InfluxQL 中按时间分组时,您几乎总是对您的数据应用聚合或选择器函数。最有可能的是,您将执行类似以下的操作
SELECT max("fieldKey1") FROM "db"."measurement1"
WHERE "tagKey1" == 'tagValue1' OR "tagKey1" == 'tagValue2' and time >= '2022-02-22T20:22:02Z' and time < now() GROUP BY time(1d)
您可以使用 Flux 执行相同的操作,但使用 aggregateWindow() 函数代替
from(bucket: "db")
|> range(start: "2022-02-22T20:22:02Z", stop: now()
|> filter(fn: (r) => r._measurement == "measurement1")
|> filter(fn: (r) => r.tagKey1 == "tagValue1" or r.tagKey1 == "tagValue2")
|> filter(fn: (r) => r._field == "fieldKey1")
|> aggregateWindow(
every: 1d,
fn: max)
aggregateWindow() 函数允许您在按时间分组后,通过 every 参数指定的持续时间,将选择器(如 min、max、median 等)或聚合函数(mean、count、sum 等)应用于您的数据。您还可以构建您想要的任何自定义函数,并将其传递到 fn 参数中。查看 中级 Flux 用户的前 5 个障碍以及优化 Flux 的资源,以了解有关使用 Flux 创建您自己的自定义实用程序函数以及将其与 aggregateWindow() 函数一起使用的更多信息。
INTO 子句 → to() 函数
以下 InfluxQL 查询用于将一个查询的结果写入另一个数据库中的另一个度量
SELECT "fieldKey1" FROM "db"."measurement1" INTO "destination_db"."measurement1"
WHERE "fieldKey1" > 8 and time >= '2022-02-22T20:22:02Z' and time < now()
您可以使用 Flux 执行相同的操作,但使用 to() 函数代替
from(bucket: "db")
|> range(start: "2022-02-22T20:22:02Z", stop: now()
|> filter(fn: (r) => r._measurement == "measurement1")
|> filter(fn: (r) => r._field == "fieldKey1")
|> filter(fn: (r) => r._value > 8)
|> to(bucket: "destination_db")
请记住,如果您想移动大量数据,您需要确保像使用 InfluxQL 那样使用顺序写入来执行此操作。如果您想在将数据写入另一个数据库之前重命名度量,您可以在 InfluxQL 中执行以下操作
SELECT * FROM "db"."measurement1" INTO "destination_db"."new_measurement1"
WHERE "fieldKey1" > 8 and time >= '2022-02-22T20:22:02Z' and time < now()
您可以使用 Flux 中的 set() 函数执行相同的操作
from(bucket: "db")
|> range(start: "2022-02-22T20:22:02Z", stop: now()
|> filter(fn: (r) => r._measurement == "measurement1")
|> filter(fn: (r) => r._field == "fieldKey1")
|> filter(fn: (r) => r._value > 8)
|> set(key: "_measurement",value: "new_measurement1")
|> to(bucket: "destination_db")
您无法使用 InfluxQL 重命名标签,但可以使用 Flux 重命名。如果您想重命名您的标签或字段键,您可以多次使用 set 函数,或者您可以使用 map() 函数与条件查询逻辑来有条件地转换您的数据
from(bucket: "db")
|> range(start: "2022-02-22T20:22:02Z", stop: now()
|> filter(fn: (r) => r._measurement == "measurement1")
|> filter(fn: (r) => r._field == "fieldKey1")
|> filter(fn: (r) => r._tagKey1 == "tagValue1")
|> filter(fn: (r) => r._tagKey1 == "tagValue2")
|> filter(fn: (r) => r._value > 8)
|> map(
fn: (r) => ({r with
_measurement: "new_measurement1",
tagKey1: if r.tagkey1 == "tagValue1" then "new_tagValue1" else "new_tagValue2""
})
)
|> rename(columns: {tagKey1: "new_tagKey1"})
|> to(bucket: "destination_db")
在这里,我们使用 map 函数将度量从“measurement1”重命名为“new_measurement1”,并将标签值从“tagValue1”和“tagValue2”重命名为“new_tagValue1”和“new_tagValue2”。最后,我们使用 rename() 函数将标签键从“tagKey1”重命名为“new_tagKey1”。当使用 Flux 而不是 InfluxQL 时,map() 函数是一个非常强大的函数。您可以使用它通过将函数应用于数据,来转换管道输入数据的每个记录。
InfluxQL 翻译集合
有时,学习新事物最好的方法就是通过重复和示例。本节只是 InfluxQL 到 Flux 查询翻译的列表,以帮助您理解两种语言之间的异同。
InfluxQL 查询 | Flux 查询 |
SELECT LAST(fieldKey1) from measurement1 where time < '2021-08-17' |
from(bucket: "my-bucket") |> range(start: 0, stop: 2021-08-17T12:00:00Z) |> filter(fn: (r) => r._measurement == "measurement1") |> filter(fn: (r) => r._field == "fieldKey1") |> last() |
SELECT last("fieldKey1")*60+last("fieldKey2") FROM "measurement1" WHERE time > now() -5m GROUP BY time(1h) fill(null) |
import "influxdata/influxdb/schema"from(bucket:"my-bucket") |>range(start: -5m, stop: now()) |>filter(fn: (r) => r._measurement == "measurement1") |>aggregateWindow(every:1h fn: last) |> schema.fieldsAsCols() |> map(fn: (r) => ({ r with _value: r.fieldKey1 * 60 + r.fieldKey2})) |
SELECT last("fieldKey1") - first("fieldKey2") FROM "measurement1" WHERE time < now() - 1d AND "tagKey1"='measurement1' |
假设数据始终在增加: from(bucket: "my-bucket") |> range(start: -1d) |> filter(fn: (r) => r["_measurement"] == "measurement1") |> filter(fn: (r) => r["_field"] == "fieldKey1" or r["_field"] == "fieldKey2") |> spread() 或者 data = from(bucket: "my-bucket") |> range(start: -1d) |> filter(fn: (r) => r["_measurement"] == "measurement1") |> filter(fn: (r) => r["_field"] == "fieldKey1" or r["_field"] == "fieldKey2")
last = data |> last() first = data |> first() union(tables: [last, first]) |> sort(columns: ["_time"], desc: true) |> difference() |
虽然我不会详细介绍上表 Flux 列中使用的所有函数,但我希望您可以自然而然地得出一些关于 Flux 脚本可能如何工作的结论。我也鼓励您通过设置 免费层帐户 并 写入和查询一些示例数据 到您的 InfluxDB Cloud 实例并从中查询,亲自试用这些函数。
专家提示:在上面查询的行之间使用多个 yield() 语句,以查看 Flux 的每一行如何转换您的数据。
结论:成本效益分析
当然,您已经注意到 Flux 查询通常比 InfluxQL 查询更长。它们起初甚至可能看起来更复杂——尽管我希望这篇文章能帮助您克服采用 Flux 的障碍。但是,Flux 比 InfluxQL 强大得多。使用 Flux,您可以
- 执行 joins 和 unions
- 操作时间戳
- 使用条件逻辑进行筛选
- 有条件地转换数据
- 执行 跨度量的数学运算
- 编写 中位数绝对偏差异常检测算法 或 朴素贝叶斯分类器
- 与 TICK 脚本相比,具有更高的灵活性和功能性,以及大大改善的开发者体验
- 使用任务按计划转换数据
- 创建警报和通知
- 以及更多
如果您想了解有关 Flux 的更多信息,我鼓励您阅读《Time to Awesome》这本书的以下两节:Flux 简介 和 查询和数据转换,以指导您了解 Flux 语言中一些最重要和最有用的函数和概念。我还想邀请您查看作为 InfluxDB 大学 一部分提供的以下免费课程
如果您正在使用 Flux 并且需要帮助,请在我们的 社区网站 或 Slack 频道中寻求帮助。如果您正在 InfluxDB 之上开发一个很酷的物联网应用程序,我们很乐意听到您的消息,因此请务必 分享您的故事!此外,请在评论部分分享您的想法、疑虑或问题。我们很乐意获得您的反馈并帮助您解决遇到的任何问题!