InfluxDB:使用 Flux 中的 Pivot 函数重新组织数据

导航至


如果您一直跟随我的 Flux 之旅,在我使用 RailsGraphQL 实现 Flux 查询的过程中,您会知道我在实现 Flux 中的各种函数时使用了列车数据。为了让我的 Rails 项目的 GraphQL 钥匙(它对每个属性调用对象上的一个方法)正常工作,我需要在每个列车数据点创建一个 Ruby 对象,将每个标签和字段分离到它们自己的属性中,如下所示

class Train
   attr_reader :time, :measurement, :driver, :location, :train, :outdoor_temp, :speed, :track_temp

   def initialize(train)
      @time = train["_time"]
      @measurement = train["_measurement"]
      @driver = train["driver"]
      @location = train["location"]
      @train = train["train"]
      @outdoor_temp = train["outdoor_temp"]
      @speed = train["speed"]
      @track_temp = train["track_temp"]
   end

end

如果没有正确的 Flux 函数,当我查询数据库以返回列车数据点时,我的标签将被分离到它们自己的列中,但我的字段将被汇总到一个单一的列中,_field。我的数据不会按照我想要的方式通过基础查询来塑造,如下所示,以及它返回的结果

from(bucket: "trains")
    |> range(start: -60d)
    |> filter(fn: (r) =>
        r._measurement == "train_speed"
    )
    |>group(columns: ["driver"])

我必须编写代码来重新塑造数据吗?这看起来并不正确。那么如果我不能将字段提取到它们自己的列中以便于解析,我如何创建一个干净的 Ruby 对象?pivot 函数允许我做到这一点。

Flux 提供了一个 pivot 函数,它允许您将存在于单个列中的数据重新塑造为多个列,从而确保查询引擎返回的数据以一种更容易使用的方式呈现,实际上并不改变底层数据本身。您通常会满足于一组预定义的列和行,pivot 允许您指定数据的组织方式和显示方式,可以使其更容易阅读和直接使用,而无需编写额外的应用程序代码。让我们看看它是如何工作的。请参阅文档

输出是按照以下方式构建的

  1. 为输入中通过 rowKey 参数识别的每个唯一值创建一个新行。
  2. 新行的一组初始列是行键与分组键的并集,但不包括由 columnKey 和 valueColumn 参数指示的列。
  3. 为输入中通过 columnKey 参数识别的每个唯一值向行中添加一组值列。标签是 valueColumn 字符串和 columnKey 值的连接,使用 _ 作为分隔符。
  4. 对于每个行键 + 列键对,通过 valueColumn 从输入表中确定适当的值。如果没有找到值,则将值设置为 null。

为了探索pivot功能,我使用了InfluxDB v1.7与Flux技术预览版(但该工作流程与v2.0 Alpha版本相同)。首先,我使用写入端点将名为trains的桶中填充了包含列车数据的点。度量值是train_speed。标签包括driverlocationtrain。字段有outdoor_tempspeedtrack_temp。我希望看到所有的字段都分别在不同的列中,而不是汇总在单个列中,这样我就可以像上面提到的那样创建一个干净的Ruby对象。提醒一下,以下是我开始时数据形状的示例,按驾驶员分组,你可以看到我的字段都汇总到了一个名为_field的单个列中

使用以下查询实现了按驾驶员分组的上述表格可视化

from(bucket: "trains")
    |> range(start: -60d)
    |> filter(fn: (r) =>
        r._measurement == "train_speed"
    )
    |>group(columns: ["driver"])

在此查询中,我从trains桶中获取数据,设置了一个60天的范围(在Flux中设置时间范围是必需的),并请求最广泛的filter,即度量值。然后我通过driver对数据进行分组,以查看与每个驾驶员相关的所有可用数据。这是一个非常广泛的过滤器,可能不适合在大数据集上使用,但它有助于从高层次上说明过滤功能。

查询结果使得我的度量值和标签都分别排序到自己的列中,但所有字段都被收集到同一个_field列中。我可以通过查看UI中的原始数据输出来确认这一点,这对于确认是否得到想要的数据形状非常有用。如果我想看到一个表格,其中所有的字段都分别在不同的列中,我可以运行以下查询来实现pivot功能

from(bucket: "trains")
    |> range(start: -60d)
    |> filter(fn: (r) =>
        r._measurement == "train_speed"
    )
    |>group(columns: ["driver"])
    |> pivot(
        rowKey:["_time"],
        columnKey: ["_field"],
        valueColumn: "_value"
      )

此查询返回的结果如下

你可以看到,现在我的每个字段都有自己的列,但我不再看到我的度量值或标签。如果我想在按驾驶员分组的同时,仍然将这些字段分别显示在不同的列中,并保留度量值和标签的行,那么除非那些原始列存在于group函数或rowKey中,否则它们将被删除。因此,我可以通过将每个字段的columnKey值设置为_field,并将我的rowKey设置为包含不仅_time,还包括locationtrain_measurement来实施pivot功能

from(bucket: "trains")
    |> range(start: -60d)
    |> filter(fn: (r) =>
        r._measurement == "train_speed"
    )
    |>group(columns: ["driver"])
    |> pivot(
        rowKey:["_time", "location", "train", "_measurement"],
        columnKey: ["_field"],
        valueColumn: "_value"
      )

此查询返回的结果如下

现在我的字段已经分别在不同的列中,并且我还保留了度量值和标签作为行键。提醒一下,这是我开始时的样子

稍后我们将看到向columnKey数组添加额外值时会发生什么。

但如果我们需要根据特定时间戳存在的值对数据进行排序怎么办?你可以像下面这样修改pivot

from(bucket: "trains")
    |> range(start: -60d)
    |> filter(fn: (r) =>
        r._measurement == "train_speed"
    )
    |>group(columns: ["driver"])
    |> pivot(
        rowKey:["_field", "location", "train", "_measurement"],
        columnKey: ["_time"],
        valueColumn: "_value"
      )

通过将 _time 设置为我的 columnKey,并将 _field 设置为 rowKey,我现在可以生成如下所示的数据

现在,我有一个字段行和一个时间戳列。向右滚动可以让我看到所有的时间戳及其各种行值。

另一个有用的功能是允许每个标签都出现在列名中,与字段一起,这通过允许您具有更具体的列名来提高可读性。如果您有多个特定标签的值,这会很有用,但如果标签的可能值很多,应谨慎使用。您可以运行以下查询

from(bucket: "trains")
    |> range(start: -60d)
    |> filter(fn: (r) =>
        r._measurement == "train_speed"
    )
    |>group(columns: ["driver"])
    |> pivot(
        rowKey:["_time", "location", "_measurement"],
        columnKey: ["_field", "train"],
        valueColumn: "_value"
      )

此查询会产生一个结果,其中驾驶员 Gupta 驾驶了 train atrain b。您会得到包含所有可能列车值(这里只有两个,所以此查询不耗费资源)的单独列,并且通过下划线与字段键隔开

因此,您现在对 Flux 中的 pivot 的工作方式有了一定的了解。这是一个有用的函数,可以以不同于默认约束规定的格式返回数据。您可以以多种不同的方式实现该函数,我发现通过更改 rowKeycolumnKey 数组中的值来观察数据如何以不同方式返回非常有用。我鼓励您也这样做,并享受在 Flux 中尝试所有功能!