InfluxDB:使用 Flux 中的 Pivot 函数重新组织数据
作者 Sonia Gupta / 用例,开发者,产品
2019年2月5日
导航至
如果您一直跟随我的 Flux 之旅,在我使用 Rails 和 GraphQL 实现 Flux 查询的过程中,您会知道我在实现 Flux 中的各种函数时使用了列车数据。为了让我的 Rails 项目的 GraphQL 钥匙(它对每个属性调用对象上的一个方法)正常工作,我需要在每个列车数据点创建一个 Ruby 对象,将每个标签和字段分离到它们自己的属性中,如下所示
class Train
attr_reader :time, :measurement, :driver, :location, :train, :outdoor_temp, :speed, :track_temp
def initialize(train)
@time = train["_time"]
@measurement = train["_measurement"]
@driver = train["driver"]
@location = train["location"]
@train = train["train"]
@outdoor_temp = train["outdoor_temp"]
@speed = train["speed"]
@track_temp = train["track_temp"]
end
end
如果没有正确的 Flux 函数,当我查询数据库以返回列车数据点时,我的标签将被分离到它们自己的列中,但我的字段将被汇总到一个单一的列中,_field
。我的数据不会按照我想要的方式通过基础查询来塑造,如下所示,以及它返回的结果
from(bucket: "trains")
|> range(start: -60d)
|> filter(fn: (r) =>
r._measurement == "train_speed"
)
|>group(columns: ["driver"])
我必须编写代码来重新塑造数据吗?这看起来并不正确。那么如果我不能将字段提取到它们自己的列中以便于解析,我如何创建一个干净的 Ruby 对象?pivot 函数允许我做到这一点。
Flux 提供了一个 pivot 函数,它允许您将存在于单个列中的数据重新塑造为多个列,从而确保查询引擎返回的数据以一种更容易使用的方式呈现,实际上并不改变底层数据本身。您通常会满足于一组预定义的列和行,pivot 允许您指定数据的组织方式和显示方式,可以使其更容易阅读和直接使用,而无需编写额外的应用程序代码。让我们看看它是如何工作的。请参阅文档
输出是按照以下方式构建的
- 为输入中通过 rowKey 参数识别的每个唯一值创建一个新行。
- 新行的一组初始列是行键与分组键的并集,但不包括由 columnKey 和 valueColumn 参数指示的列。
- 为输入中通过 columnKey 参数识别的每个唯一值向行中添加一组值列。标签是 valueColumn 字符串和 columnKey 值的连接,使用 _ 作为分隔符。
- 对于每个行键 + 列键对,通过 valueColumn 从输入表中确定适当的值。如果没有找到值,则将值设置为 null。
为了探索pivot功能,我使用了InfluxDB v1.7与Flux技术预览版(但该工作流程与v2.0 Alpha版本相同)。首先,我使用写入端点将名为trains
的桶中填充了包含列车数据的点。度量值是train_speed
。标签包括driver
、location
和train
。字段有outdoor_temp
、speed
和track_temp
。我希望看到所有的字段都分别在不同的列中,而不是汇总在单个列中,这样我就可以像上面提到的那样创建一个干净的Ruby对象。提醒一下,以下是我开始时数据形状的示例,按驾驶员分组,你可以看到我的字段都汇总到了一个名为_field
的单个列中
使用以下查询实现了按驾驶员分组的上述表格可视化
from(bucket: "trains")
|> range(start: -60d)
|> filter(fn: (r) =>
r._measurement == "train_speed"
)
|>group(columns: ["driver"])
在此查询中,我从trains
桶中获取数据,设置了一个60天的范围(在Flux中设置时间范围是必需的),并请求最广泛的filter
,即度量值。然后我通过driver
对数据进行分组,以查看与每个驾驶员相关的所有可用数据。这是一个非常广泛的过滤器,可能不适合在大数据集上使用,但它有助于从高层次上说明过滤功能。
查询结果使得我的度量值和标签都分别排序到自己的列中,但所有字段都被收集到同一个_field
列中。我可以通过查看UI中的原始数据输出来确认这一点,这对于确认是否得到想要的数据形状非常有用。如果我想看到一个表格,其中所有的字段都分别在不同的列中,我可以运行以下查询来实现pivot功能
from(bucket: "trains")
|> range(start: -60d)
|> filter(fn: (r) =>
r._measurement == "train_speed"
)
|>group(columns: ["driver"])
|> pivot(
rowKey:["_time"],
columnKey: ["_field"],
valueColumn: "_value"
)
此查询返回的结果如下
你可以看到,现在我的每个字段都有自己的列,但我不再看到我的度量值或标签。如果我想在按驾驶员分组的同时,仍然将这些字段分别显示在不同的列中,并保留度量值和标签的行,那么除非那些原始列存在于group
函数或rowKey
中,否则它们将被删除。因此,我可以通过将每个字段的columnKey
值设置为_field
,并将我的rowKey
设置为包含不仅_time
,还包括location
、train
和_measurement
来实施pivot功能
from(bucket: "trains")
|> range(start: -60d)
|> filter(fn: (r) =>
r._measurement == "train_speed"
)
|>group(columns: ["driver"])
|> pivot(
rowKey:["_time", "location", "train", "_measurement"],
columnKey: ["_field"],
valueColumn: "_value"
)
此查询返回的结果如下
现在我的字段已经分别在不同的列中,并且我还保留了度量值和标签作为行键。提醒一下,这是我开始时的样子
稍后我们将看到向columnKey
数组添加额外值时会发生什么。
但如果我们需要根据特定时间戳存在的值对数据进行排序怎么办?你可以像下面这样修改pivot
from(bucket: "trains")
|> range(start: -60d)
|> filter(fn: (r) =>
r._measurement == "train_speed"
)
|>group(columns: ["driver"])
|> pivot(
rowKey:["_field", "location", "train", "_measurement"],
columnKey: ["_time"],
valueColumn: "_value"
)
通过将 _time
设置为我的 columnKey
,并将 _field
设置为 rowKey
,我现在可以生成如下所示的数据
现在,我有一个字段行和一个时间戳列。向右滚动可以让我看到所有的时间戳及其各种行值。
另一个有用的功能是允许每个标签都出现在列名中,与字段一起,这通过允许您具有更具体的列名来提高可读性。如果您有多个特定标签的值,这会很有用,但如果标签的可能值很多,应谨慎使用。您可以运行以下查询
from(bucket: "trains")
|> range(start: -60d)
|> filter(fn: (r) =>
r._measurement == "train_speed"
)
|>group(columns: ["driver"])
|> pivot(
rowKey:["_time", "location", "_measurement"],
columnKey: ["_field", "train"],
valueColumn: "_value"
)
此查询会产生一个结果,其中驾驶员 Gupta 驾驶了 train a
和 train b
。您会得到包含所有可能列车值(这里只有两个,所以此查询不耗费资源)的单独列,并且通过下划线与字段键隔开
因此,您现在对 Flux 中的 pivot 的工作方式有了一定的了解。这是一个有用的函数,可以以不同于默认约束规定的格式返回数据。您可以以多种不同的方式实现该函数,我发现通过更改 rowKey
和 columnKey
数组中的值来观察数据如何以不同方式返回非常有用。我鼓励您也这样做,并享受在 Flux 中尝试所有功能!