使用新的Flux "types" 包

导航至

Flux Query 1

作为一门强类型语言,Flux可以保护您免受许多潜在的运行时错误。但是,如果您不知道查询的数据的列类型,您可能会遇到一些令人烦恼的错误。

假设您有一个桶,它从多个不同的流中接收常规写入,并且您想要编写一个任务,将桶中的度量值下采样到另一个桶中。如果您事先知道,例如,_value列始终是数值类型,则可以运行以下任务而不会出现任何问题

option task = {name: "write-agg", every: 30m, offset: 1s}

from(bucket: "test-bucket")
       |> range(start: -30m)
       |> filter(fn: (r) => r._measurement == "logs")
       |> aggregateWindow(fn: mean, every: 5m)
       |> to(bucket: "test-downsample")

但是,如果您在查询之前不知道数据的模式,您可能会遇到麻烦。如果_query_变成了字符串而不是数字,则此查询可能会失败。

could-not-execute-task

到目前为止,还没有一个通用的解决方案。如果您知道您正在寻找的标签,则对_field_的过滤可能很有用,但也许您不知道这些详细信息,或者可能需要包含或排除的标签列表太长,不适合放入过滤谓词中。

介绍types包。此包引入了isType函数,它使对列类型的过滤变得更容易。我们可以使用它通过导入types包并添加一个新的过滤器来检查r._value的类型来修复我们的原始查询。

import "types"

option task = {name: "write-agg", every: 30m, offset: 1s}

from(bucket: "test-bucket")
       |> range(start: -30m)
       |> filter(fn: (r) => r._measurement == "logs")
       |> filter(fn: (r) => types.isType(v: r._value, type: "float"))
       |> aggregateWindow(fn: mean, every: 5m)
       |> to(bucket: "test-downsample")

现在我们可以确信,任何被传输到aggregateWindow的数据都包含一个类型为float的_value列,从而避免了任何潜在的类型错误。果然,我们的任务成功,并将下采样数据成功写入我们的新桶

completed-task message

我们还可以使用 isType 进行更复杂的过滤。让我们设想一下,从上面的任务中读取的 logs 测量值具有许多不同类型的字段。我们希望聚合所有这些字段,但我们知道某些聚合函数并不适用于所有类型。我们可以使用 isType 来根据我们找到的数据类型决定使用哪个聚合函数。

import "types"

option task = {name: "write-agg", every: 30m, offset: 1s}

from(bucket: "test-bucket")
       |> range(start: -30m)
       |> filter(fn: (r) => r._measurement == "logs")
       |> filter(fn: (r) => {
                 return types.isType(v: r._value, type: "float")
                         or types.isType(v: r._value, type: "int")
                         or types.isType(v: r._value, type: "uint")
       })	
       |> aggregateWindow(fn: mean, every: 5m)
       |> to(bucket: "test-downsample")

from(bucket: "test-bucket")
       |> range(start: -30m)
       |> filter(fn: (r) => r._measurement == "logs")
       |> filter(fn: (r) => {
                return types.isType(v: r._value, type: "string")
                        or types.isType(v: r._value, type: "bool")
       })
      |> aggregateWindow(fn: last, every: 5m)
      |> to(bucket: "test-downsample")

这个新包包含在最新的Flux版本中,对所有云用户都可用。我们鼓励您尝试它并告诉我们您的想法!