如何在 Flux 中旋转您的数据:处理列式数据

导航到


关系型数据库无疑是数据库中最常见的类型,作为软件开发者,可以说它们是我们大多数人的入门数据库,并且可能仍然在日常使用中。它们共同的特点是数据结构。然而,InfluxDB 的数据结构略有不同。

RDBMS 默认表结构

在关系型数据库中,您有一组表,每个表都有其自己的固定列布局,并且您通过行来读取和写入数据。每一行都有一个列,它包含每个字段的值,并且您会同时获得所有这些值,从而得到一个看起来像这样的表

时间 字段1 字段2 主机
<数据时间戳> 123 789 myhost1
<数据时间戳> 456 101 myhost2

InfluxDB 结果数据

InfluxDB 不遵循这种模式。它以所谓的列式格式返回数据,这意味着您将一次获得一列数据,而不是一次一行数据。这对于习惯了从关系型数据库中使用的行格式开发的开发者来说,可能会引起一些困惑,但在处理时间序列数据时却有很多好处。

当您运行像这样的基本 Flux 查询时

from(bucket: "telegraf")
  |> range(start: -1d)

您将得到如下所示的数据返回

_measurement* _field* _value _start* _stop* _time host*
0 mydata 字段1 123 <查询开始时间> <查询结束时间> <数据时间戳> myhost1
0 mydata 字段1 456 <查询开始时间> <查询结束时间> <数据时间戳> myhost2
1 mydata 字段2 789 <查询开始时间> <查询结束时间> <数据时间戳> myhost1
1 mydata 字段2 101 <查询开始时间> <查询结束时间> <数据时间戳> myhost2

您首先会注意到,表的列标题并不是您的字段名称,就像您在查询 SQL 数据库时那样。相反,您的查询将返回这些相同的列,并且查询结果中的每个记录(如下面的查询中的 r 所表示)将代表该时间点单个字段的值。

这就是为什么您的 Flux 查询过滤看起来像这样

|> filter(fn: (r) => r["_measurement"] == "mydata")
|> filter(fn: (r) => r["_field"] == "field1")

这种数据结构允许在 InfluxDB 最常见的使用模式中实现更快的查询和聚合。因为 InfluxDB 一次处理一列的值,所以它可以在仅对该字段进行计算,而不是同时进行多个计算。

在列式和行式数据布局之间切换

然而,有时候你可能希望数据看起来更像传统数据库查询的结果。为此,你需要执行“旋转”操作。数据旋转是将字段从单独的行转换为单独的列的过程。

Flux 提供了一个执行此操作的 pivot 函数。

|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")

此函数将翻转数据,使每一行代表一个时间点(rowKey:["_time"]),字段将成为列(columnKey: ["_field"]),每一列的值将是字段的值(valueColumn: "_value")。结果如下所示

_measurement* _start* _stop* _time host* 字段1 字段2
0 mydata <查询开始时间> <查询结束时间> <数据时间戳> myhost1 123 789
0 mydata <查询开始时间> <查询结束时间> <数据时间戳> myhost2 456 101

使用 fieldsAsCols 而不是

这是人们最常希望旋转数据的方式,因为它将数据转换成与传统数据库相同的工作形状。事实上,我们提供了 schema 包中的快捷函数,专门用于该用例。

在将 schema 导入到你的 Flux 脚本后,上述代码行可以替换为:

Flux
  |> fieldsAsCols()

其他旋转方式

标签作为列

尽管这是最常用的旋转方式,但字段并不是唯一可以翻转成列标题的东西。假设你想比较的不是不同的字段,而是来自不同来源的相同字段的值。例如,如果你想在同一时间比较不同服务器主机读取的值。

在我们的上一个例子中,我们的数据有一个名为 host 的标签,其中包含此值。

_measurement* _field* _value _start* _stop* _time host*
0 mydata 字段1 123 <查询开始时间> <查询结束时间> <数据时间戳> myhost1
0 mydata 字段1 456 <查询开始时间> <查询结束时间> <数据时间戳> myhost2
1 mydata 字段2 789 <查询开始时间> <查询结束时间> <数据时间戳> myhost1
1 mydata 字段2 101 <查询开始时间> <查询结束时间> <数据时间戳> myhost2

如果我们想将所有主机中每个字段的值合并到一个记录中,我们可以使用以下方法

|> group()
  |> pivot(rowKey:["_time", "_field"], columnKey: ["host"], valueColumn: "_value")

第一行重新组合我们的数据,因为 InfluxDB 默认会将来自不同主机(或任何标签值差异)的值放入不同的结果表中。由于 pivot 在一次处理一个结果表,我们必须先调用 group 将来自两个主机的值放入一个表中,然后我们可以围绕 host 值旋转这个合并后的表。

此查询将给我们一个如下所示的表结构

_field _start _stop _time myhost1 myhost2
0 字段1 <查询开始时间> <查询结束时间> <数据时间戳> 123 456
0 字段2 <查询开始时间> <查询结束时间> <数据时间戳> 789 101

合并后的列

您还可以使用 pivot 将字段名与标签值合并。基于上面的例子,假设你想在一个单独的行中看到每个主机的 field1field2 的值?

我们可以通过稍微修改之前的功能调用来做到这一点

|> group()
  |> pivot(rowKey:["_time"], columnKey: ["host", "_field"], valueColumn: "_value")

通过将 _field 数据从 rowKey 移动到 columnKey,以及 host 数据,我们得到一个将它们合并到唯一列中的表。

_start _stop _time myhost1_field1 myhost1_field2 myhost2_field1 myhost2_field2
0 <查询开始时间> <查询结束时间> <数据时间戳> 123 789 456 101

进一步阅读