TL;DR InfluxDB技术提示:将InfluxQL查询转换为Flux查询

导航至

如果您是InfluxDB 1.x用户,您可能比Flux更熟悉InfluxQL。为了深入理解Flux,了解以下内容非常重要:

  1. 语言的基本知识
  2. 标注CSV,Flux查询的输出格式

但是,即使不研究这些主题,您也可以使用Flux。在本TL;DR中,我们将将常见的InfluxQL查询转换为Flux,并确定两种语言之间的模式,以帮助您更容易地开始使用Flux,如果您来自InfluxQL或SQL背景。

基础

几乎每个用户都会从编写以下InfluxQL查询开始

SELECT "fieldKey1","fieldKey2" FROM "db"."measurement1" WHERE time >= '2022-02-22T20:22:02Z' and time < now() GROUP BY <tagKey1>

这是等效的Flux查询

from(bucket: "db")
|> range(start: "2022-02-22T20:22:02Z", stop: now())
|> filter(fn: (r) => r._measurement == "measurement1")
|> filter(fn: (r) => r._field == "fieldKey1" or r._field == "fieldKey2")
|> yield(name: "my first flux query")

让我们看看这两个查询是如何对应起来的

InfluxQL query

如果您是 Flux 的初学者,您首先会注意到管道前进运算符,|>。Flux 是一种声明式语言,并且被设计用来通过管道前进运算符在函数之间传递数据来转换数据。您可以看到代码从 from() 函数开始,然后将 from() 函数的结果管道传递到 range() 函数。您使用 range() 函数指定时间范围,而不是 WHERE 子句。最后,您不需要按标签分组,因为标签在 Flux 中会自动分组。然而,您可以包括一个多余的 group() 函数来按该标签分组,如下所示。

您使用 yield() 函数来指定 Flux 查询的哪些部分应该返回数据,以及您将这些返回的结果赋予什么名称。如果您像上面一样只查询一个数据流,那么包含 yield() 函数是可选的。如果没有应用 yield() 函数,则默认将结果命名为 “_result”。如果您想同时查询来自多个桶的数据,则必须使用两个 yield 函数,并为每个结果命名不同的名称。这与使用 AS 子句类似。例如

from(bucket: "db1")
// specify start:0 to query from all time. Equivalent to SELECT * from db1. Use just as cautiously.
|> range(start: 0)
|> yield(name: "db1 results")
from(bucket: "db2")
|> range(start: 0)
|> yield(name: "db2 results")

SELECT 语句 → from() 和 filter() 函数

要使用 Flux 选择字段、标签和度量,请使用 from() 和 filter() 函数而不是 SELECT 语句。from() 函数用于指定您要从哪个桶中获取数据。在 Flux 中,桶指的是您的数据库和保留策略。您使用 filter() 函数选择字段、标签和度量。您可以使用 and 语句选择不同的字段、标签和度量,或者您可以使用多个 filter() 函数。以下两个 Flux 查询是等效的

  1. |> filter(fn: (r) => r._measurement == "measurement1")
    |> filter(fn: (r) => r._field == "tagKey1")
    |> filter(fn: (r) => r._field == "fieldKey1")
  2. |> filter(fn: (r) => r._measurement == "measurement1" and r._field == "tagKey1" and r._field == "fieldKey1")

请确保不要不小心使用 and 语句 而不是 or 语句,因为这将返回无结果。以下查询试图返回同时具有名称为 “fieldKey1” 和 “fieldKey2” 的字段。这是不可能的,所以您将返回无结果。 不要 做以下操作

|> filter(fn: (r) => r._field == "fieldKey1" and r._field == "fieldKey1" )

应该 使用 or 语句 来选择多个度量、标签或字段,如下所示

|> filter(fn: (r) => r._field == "fieldKey1" or r._field == "fieldKey1" )

选择多个聚合 → 多个 yield() 函数

如果您熟悉 InfluxQL,那么您可能之前已经选择过多个聚合。也许您想选择不同字段的 min 和 max,如下所示

SELECT max("fieldKey1") AS "max", min("fieldKey2") AS "min" FROM "db"."measurement1" WHERE time >= '2022-02-22T20:22:02Z' and time < now() GROUP BY <tagKey1>

您可以使用 Flux 做同样的事情,但这需要更多的代码行

//querying your common data once and referencing it with a variable
data = from(bucket: "db")
|> range(start: "2022-02-22T20:22:02Z", stop: now()
|> filter(fn: (r) => r._measurement == "measurement1")

data 
|> filter(fn: (r) => r._field == "fieldKey1")
|> max()
|> yield(name: "max")

data
|> filter(fn: (r) => r._field == "fieldKey2")
|> min()
|> yield(name: "min")

让我们看看这两个查询是如何对应起来的

multiple aggregations - two queries map to each other

顺便说一句:学习 Flux 的 InfluxQL 用户告诉我的第一件事之一是 Flux 必然很复杂。我听到这样的话,“过去一行 InfluxQL 代码现在变成了 10 行 Flux”。我在学习 Flux 的时候也有同样的痛点,但现在我更喜欢 Flux 而不是 InfluxQL。“简单”的 InfluxQL 查询在 Flux 中可能看起来更复杂,但与 Flux 提供的数据转换和分析能力相比,这种复杂程度相形见绌。您可以构建自定义函数、旋转数据、连接数据、编写自定义抖动算法、跨数据映射、获取和操作 JSON,以及更多。

在编写Flux时,重要的是在脑海中考虑数据结构、层次和形状。这可能是在Flux和InfluxQL中思维方式的最大的转变。例如,在上面的查询中,您需要考虑查询中的公共分母。在这种情况下,是从“db”和“measurement1”查询数据。将这些数据存储在变量中并引用该变量。现在您可以进一步筛选以找到最大值和最小值的特定字段(或“fieldKey1”和“fieldKey2”,分别)。最后使用yield()函数为每个结果命名,就像您在InfluxQL中使用AS子句同时产出两个结果一样——同时得到max(“fieldKey1”)和min(“fieldkey2”)。

您也可以执行以下查询

//querying your data multiple times 
from(bucket: "db")
|> range(start: "2022-02-22T20:22:02Z", stop: now()
|> filter(fn: (r) => r._measurement == "measurement1")
|> filter(fn: (r) => r._field == "fieldKey1")
|> max()
|> yield(name: "max")

from(bucket: "db")
|> range(start: "2022-02-22T20:22:02Z", stop: now()
|> filter(fn: (r) => r._measurement == "measurement1")
|> filter(fn: (r) => r._field == "fieldKey1")
|> max()
|> yield(name: "max")

使用这个后者的查询,您将得到相同的结果,但这个查询可能不太高效。效率取决于您数据的形式。将基本查询存储在变量中然后引用该变量的优点是这种方法可以防止您执行冗余查询,或者像后者那样多次查询数据。使用变量来存储基本查询对于在基本查询中筛选标签,从而将返回的数据限制在特定数据子集的情况特别理想。如果您在测量中有许多字段,那么查询所有数据并将数据存储在变量中,就像在先前的解决方案中那样,可能不太高效。这是因为通过只筛选一个测量,如果该测量中有数百个字段,您可能需要查询大量数据,而不是只查询两个字段。最佳实践是将基本查询存储在变量中,如果该基本查询只返回数据子集,然后对这些数据进行进一步筛选。

顺便说一句:使用reduce()同时返回多个聚合

您还可以使用reduce()函数同时返回多个聚合。使用reduce()是一个更高级的话题,但在这里提一下这个解决方案是值得的

from(bucket: "db")
|> range(start: "2022-02-22T20:22:02Z", stop: now()
|> filter(fn: (r) => r._measurement == "measurement1")
|> filter(fn: (r) => r._field == "fieldKey1" or r._field == "fieldKey2")
|> reduce(
      identity: {count: 0.0, sum: 0.0, min: 0.0, max: 0.0, mean: 0.0},
      fn: (r, accumulator) => ({
        count: accumulator.count + 1.0,
        sum: r._value + accumulator.sum,
        min: if accumulator.count == 0.0 then r._value else if r._value < accumulator.min then r._value else accumulator.min,
        max: if accumulator.count == 0.0 then r._value else if r._value > accumulator.max then r._value else accumulator.max,
        mean: (r._value + accumulator.sum) / (accumulator.count + 1.0)
      })
    )
|> yield(name: "count_sum_min_max_mean_reduce")

这将为您的数据产生4个额外的列,其中包含两个字段的计数、总和、最小值、最大值和平均值。这种方法唯一的缺点是InfluxDB UI一次只能可视化一个列。如果您想同时可视化这些聚合,您需要使用上面讨论的方法。

WHERE子句 → range()或filter()函数

希望您已经注意到了WHERE子句和range()或filter()函数之间的重叠。您使用range()函数来描述您想要从哪里选择数据的时间范围。filter()函数支持对您的标签、测量、字段或字段值的条件语句。

也许您想选择字段值大于某个值的记录,如下所示

SELECT "fieldKey1" FROM "<db>"."<measurement1>" WHERE "fieldKey1" > 8 and time >= '2022-02-22T20:22:02Z' and time < now()

您可以使用Flux做到这一点,但使用range()和filter()函数分别代替时间和字段值条件

from(bucket: "db")
|> range(start: "2022-02-22T20:22:02Z", stop: now()
|> filter(fn: (r) => r._measurement == "measurement1")
|> filter(fn: (r) => r._field == "fieldKey1")
|> filter(fn: (r) => r._value > 8)

GROUP BY子句 → group()或aggregateWindow()函数

标签在Flux中被自动分组,所以通常不需要像在InfluxQL中那样GROUP BY标签,除非您已经对数据进行了解组。换句话说,以下InfluxQL查询

SELECT "fieldKey1" FROM "db"."measurement1"
WHERE "tagKey1" == 'tagValue1' OR "tagKey1" == 'tagValue2' and time >= '2022-02-22T20:22:02Z' and time < now() GROUP BY
"tagKey1"

…与以下Flux查询等价

from(bucket: "db")
|> range(start: "2022-02-22T20:22:02Z", stop: now()
|> filter(fn: (r) => r._measurement == "measurement1")
|> filter(fn: (r) => r.tagKey1 == "tagValue1" or r.tagKey1 == "tagValue2")
|> filter(fn: (r) => r._field == "fieldKey1")

另外,如果在 InfluxQL 中不使用 GROUP BY 子句,您将需要在 Flux 中使用一个空的 group() 函数来取消分组您的数据。以下 InfluxQL 查询

SELECT "fieldKey1" FROM "db"."measurement1"
WHERE "tagKey1" == 'tagValue1' OR "tagKey1" == 'tagValue2' and time >= '2022-02-22T20:22:02Z' and time < now()

…与以下Flux查询等价

from(bucket: "db")
|> range(start: "2022-02-22T20:22:02Z", stop: now()
|> filter(fn: (r) => r._measurement == "measurement1")
|> filter(fn: (r) => r.tagKey1 == "tagValue1" or r.tagKey1 == "tagValue2")
|> filter(fn: (r) => r._field == "fieldKey1")
|> group()

按时间分组

在 InfluxQL 中按时间分组时,您几乎总是要对数据进行聚合或选择函数操作。最可能的是您会执行类似以下操作

SELECT max("fieldKey1") FROM "db"."measurement1"
WHERE "tagKey1" == 'tagValue1' OR "tagKey1" == 'tagValue2' and time >= '2022-02-22T20:22:02Z' and time < now() GROUP BY time(1d)

您可以使用 Flux 完成同样的操作,但是用 aggregateWindow() 函数代替

from(bucket: "db")
|> range(start: "2022-02-22T20:22:02Z", stop: now()
|> filter(fn: (r) => r._measurement == "measurement1")
|> filter(fn: (r) => r.tagKey1 == "tagValue1" or r.tagKey1 == "tagValue2")
|> filter(fn: (r) => r._field == "fieldKey1")
|> aggregateWindow(
    every: 1d,
    fn: max)

aggregateWindow() 函数允许您在按时间分组后,通过 every 参数指定的持续时间对数据进行选择器(如 min、max、median 等)或聚合函数(如 mean、count、sum 等)操作。您还可以构建任何所需的自定义函数并将其传递给 fn 参数。查看 中级 Flux 用户的前五大难题和优化 Flux 的资源,了解有关如何使用 Flux 创建自定义实用函数以及在 aggregateWindow() 函数中使用它的更多信息。

INTO 子句 → to() 函数

以下 InfluxQL 查询用于将一个查询的结果写入另一个数据库中的另一个测量中

SELECT "fieldKey1" FROM "db"."measurement1" INTO "destination_db"."measurement1"
 WHERE "fieldKey1" > 8 and time >= '2022-02-22T20:22:02Z' and time < now()

您可以用 Flux 完成同样的操作,但用 to() 函数代替

from(bucket: "db")
|> range(start: "2022-02-22T20:22:02Z", stop: now()
|> filter(fn: (r) => r._measurement == "measurement1")
|> filter(fn: (r) => r._field == "fieldKey1")
|> filter(fn: (r) => r._value > 8)
|> to(bucket: "destination_db")

请注意,如果您想移动大量数据,您需要确保以与 InfluxQL 中相同的顺序写入方式执行。如果您想在将数据写入另一个数据库之前重命名测量,您可以使用以下 InfluxQL 操作

SELECT * FROM "db"."measurement1" INTO "destination_db"."new_measurement1"
 WHERE "fieldKey1" > 8 and time >= '2022-02-22T20:22:02Z' and time < now()

您可以在 Flux 中使用 set() 函数完成相同的操作

from(bucket: "db")
|> range(start: "2022-02-22T20:22:02Z", stop: now()
|> filter(fn: (r) => r._measurement == "measurement1")
|> filter(fn: (r) => r._field == "fieldKey1")
|> filter(fn: (r) => r._value > 8)
|> set(key: "_measurement",value: "new_measurement1")
|> to(bucket: "destination_db")

您不能使用 InfluxQL 重命名标签,但您可以使用 Flux。如果您想重命名标签或字段键,您可以使用 set 函数多次,或者可以使用 map() 函数结合条件查询逻辑来 有条件地转换数据

from(bucket: "db")
|> range(start: "2022-02-22T20:22:02Z", stop: now()
|> filter(fn: (r) => r._measurement == "measurement1")
|> filter(fn: (r) => r._field == "fieldKey1")
|> filter(fn: (r) => r._tagKey1 == "tagValue1")
|> filter(fn: (r) => r._tagKey1 == "tagValue2")
|> filter(fn: (r) => r._value > 8)
|> map(
        fn: (r) => ({r with
           _measurement: "new_measurement1",
           tagKey1: if r.tagkey1 == "tagValue1" then "new_tagValue1"  else "new_tagValue2""
     })
    )
|> rename(columns: {tagKey1: "new_tagKey1"})
|> to(bucket: "destination_db")

在此,我们使用 map 函数将测量从“measurement1”重命名为“new_measurement1”,将标签值从“tagValue1”和“tagValue2”重命名为“new_tagValue1”和“new_tagValue2”。最后,我们使用 rename() 函数将标签键从“tagKey1”重命名为“new_tagKey1”。当使用 Flux 而不是 InfluxQL 时,map() 函数是一个非常强大的函数。您可以使用它通过对其应用函数来转换通过管道传输到它的每个数据记录。

一组 InfluxQL 翻译

有时候,通过重复和举例来学习新事物是最好的方法。本节仅列出了 InfluxQL 到 Flux 查询翻译列表,以帮助您了解两种语言之间的相似之处和不同之处。

InfluxQL 查询 Flux 查询
SELECT LAST(fieldKey1) from measurement1 where time < '2021-08-17' from(bucket: "my-bucket") |> range(start: 0, stop: 2021-08-17T12:00:00Z) |> filter(fn: (r) => r._measurement == "measurement1") |> filter(fn: (r) => r._field == "fieldKey1") |> last()
SELECT last("fieldKey1")*60+last("fieldKey2") FROM "measurement1" WHERE time > now() -5m GROUP BY time(1h) fill(null) import "influxdata/influxdb/schema"from(bucket:"my-bucket") |>range(start: -5m, stop: now()) |>filter(fn: (r) => r._measurement == "measurement1") |>aggregateWindow(every:1h fn: last) |> schema.fieldsAsCols() |> map(fn: (r) => ({ r with _value: r.fieldKey1 * 60 + r.fieldKey2}))
SELECT last("fieldKey1") - first("fieldKey2") FROM "measurement1" WHERE time < now() - 1d AND "tagKey1"='measurement1' 基于数据总是增加的假设:from(bucket: "my-bucket") |> range(start: -1d) |> filter(fn: (r) => r["_measurement"] == "measurement1") |> filter(fn: (r) => r["_field"] == "fieldKey1" or r["_field"] == "fieldKey2") |> spread()

或者

data = from(bucket: "my-bucket") |> range(start: -1d) |> filter(fn: (r) => r["_measurement"] == "measurement1") |> filter(fn: (r) => r["_field"] == "fieldKey1" or r["_field"] == "fieldKey2")

last = data |> last() first = data |> first() union(tables: [last, first]) |> sort(columns: ["_time"], desc: true) |> difference()

虽然我不会详细介绍上述表格中Flux列中使用的所有函数,但我希望您能够自然地得出关于Flux脚本可能工作方式的结论。我也鼓励您通过设置免费账户编写和查询一些样本数据来尝试这些函数,并将数据写入和查询您的InfluxDB云实例。

技巧:在上述查询的行之间使用多个yield()语句,以查看每行Flux如何转换您的数据。

结论:成本效益分析

当然,您已经注意到Flux查询通常比InfluxQL查询更长。它们可能一开始看起来更复杂——尽管我希望这篇帖子能帮助您克服Flux采用障碍。然而,Flux比InfluxQL强大得多。使用Flux,您可以

如果您想了解更多关于Flux的信息,我鼓励您阅读Time to Awesome书籍中的以下两个部分:Flux简介查询和数据转换,这些部分将引导您了解Flux语言中最重要和最有用的函数和概念。我还想邀请您查看InfluxDB大学提供的以下免费课程

如果您使用Flux并需要帮助,请在我们社区网站Slack频道中寻求帮助。如果您正在开发基于InfluxDB的酷IoT应用程序,我们非常乐意了解您的情况,所以请确保分享您的经历!此外,请分享您的想法、担忧或问题在评论部分。我们非常乐意获取您的反馈并帮助您解决遇到的问题!