如何在 Flux 中旋转您的数据:处理列式数据
作者:Michael Hall / 产品,用例,开发者
2021年10月12日
导航到
关系型数据库无疑是数据库中最常见的类型,作为软件开发者,可以说它们是我们大多数人的入门数据库,并且可能仍然在日常使用中。它们共同的特点是数据结构。然而,InfluxDB 的数据结构略有不同。
RDBMS 默认表结构
在关系型数据库中,您有一组表,每个表都有其自己的固定列布局,并且您通过行来读取和写入数据。每一行都有一个列,它包含每个字段的值,并且您会同时获得所有这些值,从而得到一个看起来像这样的表
时间 | 字段1 | 字段2 | 主机 |
<数据时间戳> | 123 | 789 | myhost1 |
<数据时间戳> | 456 | 101 | myhost2 |
InfluxDB 结果数据
InfluxDB 不遵循这种模式。它以所谓的列式格式返回数据,这意味着您将一次获得一列数据,而不是一次一行数据。这对于习惯了从关系型数据库中使用的行格式开发的开发者来说,可能会引起一些困惑,但在处理时间序列数据时却有很多好处。
当您运行像这样的基本 Flux 查询时
from(bucket: "telegraf")
|> range(start: -1d)
您将得到如下所示的数据返回
表 | _measurement* | _field* | _value | _start* | _stop* | _time | host* |
0 | mydata | 字段1 | 123 | <查询开始时间> | <查询结束时间> | <数据时间戳> | myhost1 |
0 | mydata | 字段1 | 456 | <查询开始时间> | <查询结束时间> | <数据时间戳> | myhost2 |
1 | mydata | 字段2 | 789 | <查询开始时间> | <查询结束时间> | <数据时间戳> | myhost1 |
1 | mydata | 字段2 | 101 | <查询开始时间> | <查询结束时间> | <数据时间戳> | myhost2 |
您首先会注意到,表的列标题并不是您的字段名称,就像您在查询 SQL 数据库时那样。相反,您的查询将返回这些相同的列,并且查询结果中的每个记录(如下面的查询中的 r
所表示)将代表该时间点单个字段的值。
这就是为什么您的 Flux 查询过滤看起来像这样
|> filter(fn: (r) => r["_measurement"] == "mydata")
|> filter(fn: (r) => r["_field"] == "field1")
这种数据结构允许在 InfluxDB 最常见的使用模式中实现更快的查询和聚合。因为 InfluxDB 一次处理一列的值,所以它可以在仅对该字段进行计算,而不是同时进行多个计算。
在列式和行式数据布局之间切换
然而,有时候你可能希望数据看起来更像传统数据库查询的结果。为此,你需要执行“旋转”操作。数据旋转是将字段从单独的行转换为单独的列的过程。
Flux 提供了一个执行此操作的 pivot
函数。
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
此函数将翻转数据,使每一行代表一个时间点(rowKey:["_time"]
),字段将成为列(columnKey: ["_field"]
),每一列的值将是字段的值(valueColumn: "_value"
)。结果如下所示
表 | _measurement* | _start* | _stop* | _time | host* | 字段1 | 字段2 |
0 | mydata | <查询开始时间> | <查询结束时间> | <数据时间戳> | myhost1 | 123 | 789 |
0 | mydata | <查询开始时间> | <查询结束时间> | <数据时间戳> | myhost2 | 456 | 101 |
使用 fieldsAsCols 而不是
这是人们最常希望旋转数据的方式,因为它将数据转换成与传统数据库相同的工作形状。事实上,我们提供了 schema
包中的快捷函数,专门用于该用例。
在将 schema
导入到你的 Flux 脚本后,上述代码行可以替换为:
Flux
|> fieldsAsCols()
其他旋转方式
标签作为列
尽管这是最常用的旋转方式,但字段并不是唯一可以翻转成列标题的东西。假设你想比较的不是不同的字段,而是来自不同来源的相同字段的值。例如,如果你想在同一时间比较不同服务器主机读取的值。
在我们的上一个例子中,我们的数据有一个名为 host
的标签,其中包含此值。
表 | _measurement* | _field* | _value | _start* | _stop* | _time | host* |
0 | mydata | 字段1 | 123 | <查询开始时间> | <查询结束时间> | <数据时间戳> | myhost1 |
0 | mydata | 字段1 | 456 | <查询开始时间> | <查询结束时间> | <数据时间戳> | myhost2 |
1 | mydata | 字段2 | 789 | <查询开始时间> | <查询结束时间> | <数据时间戳> | myhost1 |
1 | mydata | 字段2 | 101 | <查询开始时间> | <查询结束时间> | <数据时间戳> | myhost2 |
如果我们想将所有主机中每个字段的值合并到一个记录中,我们可以使用以下方法
|> group()
|> pivot(rowKey:["_time", "_field"], columnKey: ["host"], valueColumn: "_value")
第一行重新组合我们的数据,因为 InfluxDB 默认会将来自不同主机(或任何标签值差异)的值放入不同的结果表中。由于 pivot
在一次处理一个结果表,我们必须先调用 group
将来自两个主机的值放入一个表中,然后我们可以围绕 host
值旋转这个合并后的表。
此查询将给我们一个如下所示的表结构
表 | _field | _start | _stop | _time | myhost1 | myhost2 |
0 | 字段1 | <查询开始时间> | <查询结束时间> | <数据时间戳> | 123 | 456 |
0 | 字段2 | <查询开始时间> | <查询结束时间> | <数据时间戳> | 789 | 101 |
合并后的列
您还可以使用 pivot
将字段名与标签值合并。基于上面的例子,假设你想在一个单独的行中看到每个主机的 field1
和 field2
的值?
我们可以通过稍微修改之前的功能调用来做到这一点
|> group()
|> pivot(rowKey:["_time"], columnKey: ["host", "_field"], valueColumn: "_value")
通过将 _field
数据从 rowKey
移动到 columnKey
,以及 host
数据,我们得到一个将它们合并到唯一列中的表。
表 | _start | _stop | _time | myhost1_field1 | myhost1_field2 | myhost2_field1 | myhost2_field2 |
0 | <查询开始时间> | <查询结束时间> | <数据时间戳> | 123 | 789 | 456 | 101 |
进一步阅读
- pivot() 函数:pivot() 函数的参考文档。
- schema.fieldsAsCol() 函数:来自 schema 包的 fieldsAsCol() 函数的参考文档。
- group() 函数:group() 函数的参考文档。
- TL;DR InfluxDB 技术技巧:Flux 中的使用多个 yield():本文介绍了如何使用多个 yield()。
- TL;DR InfluxDB 技术技巧 – 在标签或字段之间聚合和取消分组:本文介绍了如何使用 group() 合并结果表。