InfluxDB:使用 Flux 中的 Pivot 函数重组数据

导航至


如果您一直在关注我的 Flux 之旅,因为我使用 RailsGraphQL 实施 Flux 查询,您就会知道我在 Flux 中实施各种功能时一直在使用火车数据。为了使我的 Rails 项目中的 GraphQL gem 正常工作(它为每个属性调用对象上的一个方法),我需要为每个火车数据点创建一个 Ruby 对象,该对象将其每个标签和字段分隔到它们自己的属性中,如下所示

class Train
   attr_reader :time, :measurement, :driver, :location, :train, :outdoor_temp, :speed, :track_temp

   def initialize(train)
      @time = train["_time"]
      @measurement = train["_measurement"]
      @driver = train["driver"]
      @location = train["location"]
      @train = train["train"]
      @outdoor_temp = train["outdoor_temp"]
      @speed = train["speed"]
      @track_temp = train["track_temp"]
   end

end

如果没有正确的 Flux 函数,当我查询我的数据库以返回火车数据点时,我的标签将被分隔到它们自己的列中,但我的字段将被聚合到单个列 _field 中。我的数据不会以我想要的形状与基本查询一起呈现,如下面的查询和它返回的结果所示

from(bucket: "trains")
    |> range(start: -60d)
    |> filter(fn: (r) =>
        r._measurement == "train_speed"
    )
    |>group(columns: ["driver"])

我是否必须编写代码来重塑数据?这似乎不对。那么,如果我无法将字段提取到它们自己的列中以便于解析,我该如何创建一个干净的 Ruby 对象呢?pivot 函数允许我做到这一点。

Flux 提供了一个 pivot 函数,它允许您将存在于单列中的数据重塑为多列,从而确保从查询引擎返回的数据以一种更易于使用的方式呈现,而无需实际更改底层数据本身。通常您会被限定为一组预定的列和行,而 pivot 允许您指定数据的组织和显示方式,并且可以使其更易于阅读和直接使用,而无需编写额外的应用程序代码。让我们看看它是如何工作的。来自 文档

输出构造如下

  1. 为 rowKey 参数在输入中标识的每个唯一值创建一个新行。
  2. 新行的初始列集是 row key 与 group key 的并集,但不包括 columnKey 和 valueColumn 参数指示的列。
  3. 为 columnKey 参数在输入中标识的每个唯一值向行添加一组值列。标签是 valueColumn 字符串和 columnKey 值使用 _ 作为分隔符的连接。
  4. 对于每个 row key + column key 对,从输入表中通过 valueColumn 确定适当的值。如果未找到值,则将值设置为 null。

为了探索 pivot 函数,我使用了带有 Flux 技术预览的 InfluxDB v1.7(但工作流程与 v2.0 Alpha 版本相同)。首先,我使用 write endpoint 播种了一个名为 trains 的 bucket,其中包含火车数据的数据点。measurement 是 train_speed。标签是 driverlocationtrain。字段是 outdoor_tempspeedtrack_temp。我希望看到我的所有字段都分解到它们自己的列中,而不是聚合到单个列中,这样我就可以像上面提到的那样创建一个干净的 Ruby 对象。作为提醒,这里是我开始使用的数据形状的示例,按 driver 分组,您可以看到我的字段都聚合到单个列 _field

上面按 driver 分组的表格可视化是使用以下查询实现的

from(bucket: "trains")
    |> range(start: -60d)
    |> filter(fn: (r) =>
        r._measurement == "train_speed"
    )
    |>group(columns: ["driver"])

在这个查询中,我从我的 trains bucket 中获取数据,设置最近 60 天的范围(在 Flux 中需要设置时间 range),并请求最广泛可能的 filter,即 measurement。然后我通过 driver group 我的数据,以查看与每个 driver 关联的所有可用数据。这几乎是您可以创建的最广泛的过滤器,并且在大型数据集上使用可能不明智,但它有助于从高层次说明过滤的作用。

作为此查询的结果,我的 measurement 和标签都排序到它们自己的列中,但我的所有字段都收集在同一个 _field 列中。我可以通过查看 UI 中的原始数据输出来确认这一点,这对于确认我是否获得了我想要的数据形状很有用。如果我想要一个表格,其中看到所有这些字段都分隔到它们自己的列中怎么办?我可以运行以下查询来实现 pivot 函数

from(bucket: "trains")
    |> range(start: -60d)
    |> filter(fn: (r) =>
        r._measurement == "train_speed"
    )
    |>group(columns: ["driver"])
    |> pivot(
        rowKey:["_time"],
        columnKey: ["_field"],
        valueColumn: "_value"
      )

此查询返回如下结果

您可以看到我的每个字段现在都有自己的列,但我不再看到我的 measurement 或标签。如果我想要看到这些字段分隔到它们自己的列中,同时仍然按 driver 分组并保留 measurement 和标签的行怎么办?除非这些原始列存在于 group 函数或 rowKey 中,否则它们将被删除。因此,我可以实现 pivot 函数,以确保每个字段都有自己的列,方法是将其 columnKey 值设置为 _field,并将我的 rowKey 设置为不仅包含 _time,还包含 locationtrain_measurement

from(bucket: "trains")
    |> range(start: -60d)
    |> filter(fn: (r) =>
        r._measurement == "train_speed"
    )
    |>group(columns: ["driver"])
    |> pivot(
        rowKey:["_time", "location", "train", "_measurement"],
        columnKey: ["_field"],
        valueColumn: "_value"
      )

此查询返回如下结果

我现在已将我的字段分解到它们自己的列中,并且还保留了我的 measurement 和标签作为行键。作为提醒,这是我开始使用的

稍后我们将看到当我们向 columnKey 数组添加其他值时会发生什么。

但是,如果我需要根据特定时间戳存在的值对数据进行排序怎么办?您可以修改 pivot,如下所示

from(bucket: "trains")
    |> range(start: -60d)
    |> filter(fn: (r) =>
        r._measurement == "train_speed"
    )
    |>group(columns: ["driver"])
    |> pivot(
        rowKey:["_field", "location", "train", "_measurement"],
        columnKey: ["_time"],
        valueColumn: "_value"
      )

通过将 _time 设置为我的 columnKey,并将 _field 设置为 rowKey,我现在能够生成如下所示的数据

我现在为我的每个字段都有一行,为每个时间戳都有一列。滚动到右侧允许我查看所有时间戳及其各种行值。

另一个有用的 pivot 允许您将每个标签都显示在列名中以及字段一起,这通过允许您拥有更具体的列名来提高可读性。如果您对特定标签有多个值,这可能会派上用场,但如果您对标签有很多不同的可能值,则应谨慎使用。您可以运行以下查询

from(bucket: "trains")
    |> range(start: -60d)
    |> filter(fn: (r) =>
        r._measurement == "train_speed"
    )
    |>group(columns: ["driver"])
    |> pivot(
        rowKey:["_time", "location", "_measurement"],
        columnKey: ["_field", "train"],
        valueColumn: "_value"
      )

此查询生成的结果是 driver Gupta 同时驾驶了 train atrain b。您将获得单独的列,其中包含所有可能的 train 值(只有两个,因此此查询资源消耗不大)包含在列名中,并用下划线与字段键分隔

所以现在您对 Flux 中的 pivot 工作原理有了一个概念。这是一个有用的函数,用于返回与默认约束规定的格式不同的数据。您可以通过多种不同的方式实现该函数,我发现试验更改 rowKeycolumnKey 数组中的值以查看数据如何以不同方式返回很有用。我鼓励您也这样做,并享受使用 Flux 中提供的所有功能!