使用 Flux 对时序数据进行分组、塑形和分析

导航到

Flux 是一种从头开始设计用于时序分析的编程语言。传统上,对大型动态时序数据集进行分组、塑形和执行数学运算非常繁琐。Flux 的目标是使这些数据集的处理变得更加优雅。

在 InfluxDB 2 OSS Alpha 中添加了几个令人兴奋的新 Flux 功能,这促使我全身心投入实践以了解它们在实际中的应用。我的初次尝试很有希望,但某些地方还不够完美。

我对 Flux 的关键洞察

当我研究如何跨多个数据存储进行数据处理时,我很快意识到我遗漏了一些基本的东西。我得到了一些意想不到的答案,并且许多图表看起来都不对。

为了找到答案,我联系了 Flux 团队的 Adam Anthony,他一步一步地向我解释了一个更复杂的用例。经过几步操作后,我恍然大悟——我必须密切注意的关键部分是表格结果。

表格作为数据

您遇到的第一个几个 Flux 示例看起来非常简单。这是一个本地 Telegraf 代理收集的系统指标查询的典型示例

from(bucket:"my-bucket")
  |> range(start: -10m)
  |> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage_system" and r.cpu != "cpu-total" )

这个 Flux 查询从我的-bucket 中提取了 Telegraf 代理在过去十分钟内写入的所有数据,并过滤掉 cpu 总值以外的 cpu 系统使用率指标。

Flux time series data - dashboard

当我第一次在 InfluxDB Cloud 2 的 UI 中看到这个查询的结果时,我立刻得出结论,Flux 只简单地返回了四个数组——每个数组都是一串数字。结果证明,我的这种初步印象非常错误,这也成为了我在尝试解决更复杂场景时的失误之源。

在 Adam 的课程之后,我意识到使用和理解 Flux 取决于理解 Flux 结果的每一个部分。Flux 结果是表格,有时结果是一个相当庞大的表格集。我了解到,在分析 Flux 时,最有用的工具不是绘图工具,而是 UI 中的“查看原始数据”切换按钮。

Flux time series data dashboard - Flux results

原始视图正是如此——Flux 的完整原始输出。当我第一次查看原始视图时,我以为 UI 可能添加了诸如标签、时间戳或扩展描述等内容。事实上,UI 对结果没有任何添加——这里的每个标签、行和列都是直接来自 Flux 的响应。

深入响应中,最容易忽视的是表格列——本例中的第 3 列。上面的 UI 已滚动以显示两个表格(表格 0 和表格 1)的数据。我们可以看到上面的系统 cpu Flux 查询并没有返回四个数组,而是返回了四个表格——每个表格对应 Telegraf 监控的四个 CPU。

在操作、分析和塑造数据的过程中,如果你始终牢记每个中间计算和结果都是一个表格,你将发现自己离所寻求的答案更近。下面的最终示例通过关注表格来展示一个更复杂的使用案例,以演示如何逐步找到答案。

再次回顾鱼缸健康供应商

让我们继续一个案例,一个鱼缸健康供应商,该供应商将物联网元数据存储在 PostgreSQL 中,将传感器数据存储在 InfluxDB 中。在那个例子中,供应商希望他们的客户成功代表了解所有客户的鱼缸的 pH 平衡。在连接之后,表格结果看起来像这样

time series data - sensor data InfluxDB

这个结果对客户成功代表来说足够简单,易于阅读和理解,但如果添加几十个水箱和传感器,代表们就会在数据中迷失。为了使事情更简单,管理层意识到代表们真正需要了解的是 pH 值高于 8 的鱼缸的位置。

幸运的是,他们使用 Flux,可以使用内置的管道前进操作符简单地推送上述初始表格结果到额外的计算中,以找到最高的平均 pH 值。

//  group by location so that we can analyze each location independently of the others. (creates 3 tables out of 1)
|> group(columns: ["customer_location"])

Flux table results

// find the mean value inside of each table
 |> mean(column: "_value")

Flux table results mean value

// grouping by no columns puts all of the tables together into one table
|> group(columns: [])

// filter out all rows of data below a value of 8
|> filter(fn: (r) => r._value >  8)

Flux - filter rows

// sort the list to ensure the highest priority fish tanks are at the top of the list
|> sort(columns: ["_value"], desc: true)

Flux - sort columns

在处理和过滤所有数据后,客户成功代表应该有一份可管理的鱼缸列表可以关注。在这个例子中,列表顶部是 Lobby 鱼缸,看起来它需要一些认真的关注。

Flux 的未来充满希望

Flux 正在步入正轨,许多强大的功能,如条件和多数据存储等,正在发布。现在是你如果正在处理任何时间序列分析用例的话,跳入并深入了解 Flux 的好时机。你可以通过注册 InfluxDB Cloud 2 或下载最新的 OSS alpha 来轻松开始。

一如既往,如果您对 Flux 有任何问题或功能请求,请访问Flux 的社区论坛并告诉我们。