使用 InfluxDB v2.0 进行降采样
作者:Anais Dotis-Georgiou / 产品, 使用场景, 开发者
2020年10月23日
导航至
(**更新**:InfluxDB 3.0 放弃了 Flux 和内置任务引擎。希望从 2.x 迁移到 3.0 的用户可以使用外部工具,例如基于 Python 的 Quix,在 InfluxDB 3.0 中创建任务。)
降采样是指在时间窗口内聚合高分辨率时间序列,然后将较低分辨率的聚合存储到新存储桶的过程。例如,假设您有一个监控温度的物联网应用程序。您的温度传感器可能会收集温度数据。此数据以分钟为间隔收集。它实际上只在白天对您有用。没有必要以该分辨率保留历史温度数据,因此您执行降采样任务以仅保留小时精度的平均温度。这种降采样有助于保留数据的整体形状,而不会浪费不必要的磁盘空间。降采样还有助于加快大时间范围的查询速度,因为它减少了作为查询一部分评估的数据量。
降采样的好处:磁盘使用率和查询性能
降采样是常见的时间序列数据库任务,通常是时间序列数据库用户的要求。降采样允许您随着时间的推移收集数据,从而减少总体磁盘使用量。降采样还可以提高 InfluxDB OSS 或 Cloud 的查询性能。最后,InfluxDB 每秒可以摄取数十万个数据点。保留如此大量的数据不仅会产生存储问题,而且也很难处理。降采样允许您消除高精度时间序列的噪音,从而使您能够更快地分析和从历史时间序列中获取价值。
InfluxDB 2.x 中的降采样得到升级 – 从连续查询到降采样任务
随着 InfluxDB 从 1.x 升级到 2.x,降采样功能也随之升级。在 InfluxDB 1.x 中,用户通常使用连续查询以相对较小的规模实现降采样。连续查询完全通过 Influx CLI 工具创建、维护和更改。连续查询也是一个独立的功能,但是随着您要降采样的数据量增加,它会对 InfluxDB 本身的读取和写入性能产生负面影响。在大规模情况下,建议将工作卸载到 Kapacitor。
在 InfluxDB 2.x 中,您可以使用降采样任务执行降采样。任务使您能够执行各种数据处理作业。所有任务,包括降采样,以及 InfluxDB 2.x 中的查询都使用一种语言编写 – Flux。您可以通过 CLI 和 UI 创建任务。在 2.x 中通过 UI 管理任务是使用 InfluxDB 执行数据处理作业的最简单方法。使用一种工具和语言执行所有数据转换的能力还有助于您更好地组织和可视化时间序列管道。相比之下,在 1.x TICK 堆栈中,您必须熟悉使用 Kapacitor 时的 TICKscripts、InfluxQL 和连续查询语法才能创建相同的数据处理作业。
不同分辨率数据的保留策略
为了成功创建降采样任务,您需要有一个源存储桶和一个目标存储桶,每个存储桶都有不同的保留策略。
- *源*存储桶是您写入高精度数据的地方。
- *目标*存储桶是您写入聚合数据的地方。
目标存储桶应具有比源存储桶更长的保留策略。
完整的降采样任务示例
您可以使用 API 或 UI 创建任务。创建任务的最简单方法是通过 InfluxDB UI。在左侧导航栏的 **Tasks** 菜单下,单击 **Create Task**。您可以创建一个新任务、导入任务或从您已经导入的现有模板导入任务(请阅读此文档以了解如何导入现有模板)。
任务创建和调试任务的推荐工作流程
假设我需要在我的系统数据上执行降采样任务。为了演示的目的,我将每分钟运行一次任务,并以 10 秒的间隔聚合我的数据。创建任务的最佳方法是在 Query Builder 或 Script Editor 中创建最适合您需求的聚合。我创建了以下聚合
from(bucket: "my-bucket")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) =>
(r["_measurement"] == "cpu"))
|> filter(fn: (r) =>
(r["_field"] == "usage_user"))
|> aggregateWindow(every: 10s, fn: mean, createEmpty: false)
// Use the to() function to validate that the results look correct. This is optional.
|> to(bucket: "downsampled", org: "my-org")
我们应该只从 **Data Explorer** 将 Flux 脚本另存为任务。要从 **Data Explorer** 将查询另存为任务,请单击左上角的 **Save As** 按钮。确保您已创建“downsampled”存储桶,以便可以从下拉列表中选择它。
但是,我们看到用户遇到复制粘贴问题。我们将描述这个问题,以突出显示如何调试任务。然而,将查询导出为任务是推荐的工作流程。现在我们的查询看起来很棒,我将该查询复制并粘贴到我的任务中。它看起来像这样
option task = {name: "Downsampling CPU", every: 1m}
data = from(bucket: "my-bucket")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) =>
(r["_measurement"] == "cpu"))
|> filter(fn: (r) =>
(r["_field"] == "usage_user"))
data
|> aggregateWindow(every: 10s, fn: mean, createEmpty: false)
|> to(bucket: "downsampled", org: "my-org")
您可以手动编写任务名称和间隔设置,也可以使用 UI 指定这些配置选项。使用 UI 会自动生成任务的第一行。接下来,我将代码从 Script Editor 复制并粘贴到任务中。保存任务后,请注意任务状态已失败。
要查看日志,请单击任务右侧的齿轮图标。选择 **View Task Run**。请注意,您还可以在此处选择将任务导出为 JSON 对象。这使您可以轻松地与他人共享任务,这对于调试非常有用。
**View Task Runs** 按钮会将您定向到处理引擎尝试执行任务的所有时间的列表。
如果您单击 **View Logs**,则可以看到导致任务失败的错误。在这里,我们看到了与 Task Run 失败相关的错误,“Error exhausting result iterator; Err: error @4:18-4:19: undefined identifier v: error @4:18-4:19: undefined identifier v”。
此错误源于直接从 Script Editor 复制粘贴我们的 Flux 代码。我们需要将时间范围从引用仪表板时间的变量更改为实际时间。我们生成的任务如下所示
option task = {name: "Downsampling CPU", every: 1m}
data = from(bucket: "my-bucket")
|> range(start: -task.every)
|> filter(fn: (r) =>
(r["_measurement"] == "cpu"))
|> filter(fn: (r) =>
(r["_field"] == "usage_user"))
data
|> aggregateWindow(every: 10s, fn: mean, createEmpty: false)
|> to(bucket: "downsampled", org: "my-org")
现在,在保存任务后,您可以看到任务旁边的绿色箭头,表示它正在成功运行。您现在可以在“downsampled”存储桶中查看数据。
降采样常见问题解答:使用 Flux 进行多次聚合
在深入探讨使用 Flux 进行多次聚合之前,让我们解决最明显的降采样问题:“我如何开始使用降采样?”。 这里有一个专门用于降采样 Telegraf 指标的 InfluxDB 模板。使用此模板是开始使用降采样的最简单方法。
此外,我看到许多用户问:“如何降采样我的数据以在每个聚合间隔从我的数据中检索平均值和值计数?”。关于执行多个聚合投影的这个问题经常来自 InfluxDB 1.x 用户,他们更熟悉 InfluxQL 而不是 Flux。例如,他们习惯于能够执行以下查询
select mean(temp) as temp_mean, count(temp) as temp_count from my_measurement
并将其转换为以下连续查询
CREATE CONTINUOUS QUERY "cq_basic" ON "my-bucket"
BEGIN
select mean(temp) as temp_mean, count(temp) as temp_count from my_measurement
GROUP BY time(36h), "mytag"
END
此查询和相应的 CQ 将同时返回两个聚合。在 InfluxDB 2.x 中,可视化多个聚合的最简单方法是在 UI 中的多个选项卡中编写两个单独的查询。
例如,让我们假设我们已经构建了一个表,并使用 array.from() 和 to() 函数将以下数据写入 InfluxDB Cloud。
import "experimental/array"
array.from(rows: [{_measurement: "my_measurement", mytag: "my_tag", _field: "temp", _value: 21.0, _time: 2020-01-01T00:00:00Z,},
{_measurement: "my_measurement", mytag: "my_tag", _field: "temp", _value: 23.5, _time: 2020-01-02T00:00:00Z}, {_measurement: "my_measurement", mytag: "my_tag", _field: "temp", _value: 20.5, _time: 2020-01-03T00:00:00Z}])
|>to(bucket: "my-bucket")
然后我们可以在两个选项卡中查询我们的数据。
两个查询是
- 平均值,蓝线
from(bucket: "my-bucket")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "my_measurement")
|> aggregateWindow(every: 36h, fn: mean, createEmpty: false)
|> yield(name: "mean")
- 计数,粉线
from(bucket: "my-bucket")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "my_measurement")
|> aggregateWindow(every: 36h, fn: count, createEmpty: false)
|> yield(name: "count")
每个查询都存储在 UI 的两个选项卡中,即 **Query 1** 和 **Query 2**。橙色圆圈中的 **eye** 图标指示是否可视化该选项卡中的查询。现在我们知道了如何同时可视化多个聚合,让我们学习如何将上面的 CQ 转换为降采样任务。
执行等效降采样任务的最简单方法是为每个聚合创建一个任务,如上节所述。但是,创建多个任务意味着多次查询数据。开发人员在创建降采样任务时应始终旨在减少查询引擎的工作负载。
当开发人员打算对大量数据子集执行多个数据转换时,他们可以通过隔离所有任务中查询的公共部分来减少查询引擎的工作负载。然后,可以将此部分转换为其自己的任务,在其之上执行剩余的转换。具体来说,如果您正在过滤一部分字段或使用正则表达式进行过滤,您肯定希望使用顶级任务执行该成本高昂的查询,以避免为每个后续聚合重复查询工作。高效的任务创建也与数据布局和模式设计密切相关。
然而,构建良好的降采样任务并不止于查询引擎优化方面的考虑。开发人员不仅必须考虑如何最有效地编写大量任务,还必须确保构建结果数据以实现高效消费。
- 例如,您可能希望利用 set() 函数在降采样任务中向您的数据添加标签。这样,您可以将数据写入同一存储桶和测量。这使用户可以轻松查询降采样数据。
- 例如,您可以附加第一个降采样任务以包含一个标签,该标签指定降采样任务的聚合类型。现在,用户可以通过过滤“agg_type”标签键轻松比较降采样数据。
option task = {name: "Downsampling CPU", every: 1m}
data = from(bucket: "my-bucket")
|> range(start: -2h)
|> filter(fn: (r) =>
(r["_measurement"] == "cpu"))
|> filter(fn: (r) =>
(r["_field"] == "usage_user"))
data
|> aggregateWindow(every: 10s, fn: mean, createEmpty: false)
|> set(key: "agg_type",value: "mean_cpu")
|> to(bucket: "downsampled", org: "my-org", tagColumns: ["agg_type"])
最后,我们准备好编写 CQ 的 Flux 翻译。以下 Flux 任务最接近上述 CQ
option task = {name: "Downsampling CPU", every: 1m}
data = from(bucket: "my-bucket")
|> range(start: -task.every)
|> filter(fn: (r) => r._measurement == "my_measurement")
data
|> mean()
|> set(key: "agg_type",value: "mean_temp")
|> to(bucket: "downsampled", org: "my-org", tagColumns: ["agg_type"])
data
|> count()
|> set(key: "agg_type",value: "count_temp")
|> to(bucket: "downsampled", org: "my-org", tagColumns: ["agg_type"])
在此示例中,我们没有将初始过滤拆分为单独的任务,因为我们正在查询一个测量和一个字段“temp”。由于顶级查询返回的数据与我们的存储桶的数据相同,因此创建将此过滤写入新存储桶的任务将是冗余和浪费的。但是,现在我们可以在同一任务中执行两个聚合。我们还将聚合函数从 aggregateWindow() 更改为 mean() 和 count()。使用 mean() 和 count() 执行聚合使我们能够查询和聚合较短的周期或较小的数据集,从而提高查询效率。此外,缩短任务执行间隔的另一个权衡是降采样数据将更加最新。短任务执行间隔的一个缺点是它们对延迟到达的数据的鲁棒性较差。
题外话:Flux 的灵活性
Flux 是一种强大的查询语言。您可以利用 Flux 编写更复杂的查询来同时执行聚合。虽然上面的降采样方法可能更有效(这实际上取决于您的模式),但了解您在聚合数据时有多种选择是很有价值的。
- 您可以执行 union
data = from(bucket: "my-bucket")
|> range(start: 2019-12-31T00:00:00Z, stop: 2020-01-04T00:00:00Z)
|> filter(fn: (r) => r._measurement == "my_measurement")
|> filter(fn: (r) => r._field == "temp")
temp_mean = data
|> mean()
|> set(key: "agg_type", value: "mean")
temp_count = data
|> count()
|> toFloat()
|> set(key: "agg_type", value: "count")
union(tables: [temp_mean, temp_count])
|> group(columns: ["agg_type"], mode:"by")
|> yield()
此 Flux 查询允许您同时可视化您的两个聚合,因为每个聚合都在单独表中的自己的“_value”列中。这是 **Graph** 视图中的输出
这是 **Table** 视图中的输出
- 您可以执行 join
data = from(bucket: "my-bucket")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r._measurement == "my_measurement")
|> filter(fn: (r) => r._field== "temp")
mean = data |> aggregateWindow(every: 36h, fn: mean, createEmpty: false)
count = data |> aggregateWindow(every: 36h, fn: count, createEmpty: false)
join(tables: {mean: mean, count: count}, on: ["_time", "_field", "_measurement", "_start", "_stop"], method: "inner")
- 或者您可以使用 reduce() 函数创建自定义聚合。但是,最初使用 reduce() 函数可能会很棘手。要了解有关使用 reduce() 函数的技巧,请参阅此博客。以下 Flux 脚本使用 reduce() 函数同时为您执行两个聚合
from(bucket: "my-bucket")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r._measurement == "my_measurement")
|> filter(fn: (r) => r._field == "temp")
|> reduce(
fn: (r, accumulator) => ({
count: accumulator.count + 1,
sum: r._value + accumulator.sum,
mean: (accumulator.sum + r._value)/(float(v: accumulator.count + 1))
}), identity: {sum:0.0, count:0, mean:0.0 })
|> drop(columns: ["sum"])
Flux 脚本 2 和 3(上文)在一个表中创建了两列,其中包含两个聚合。您无法立即可视化两个输出。您可以使用 **Customize** 菜单选择要可视化的列。在这里,我们可视化 **Graph** 视图中的计数
下图是在 表格 视图中的输出。您可以看到创建了“count”和“mean”两列,并且它们在同一个表格中。
为您选择合适的查询在很大程度上取决于您的模式。UI 中的查询选项卡包含查询运行时间,这可以帮助您为您的模式选择最佳查询。
持续查询转换的资源
希望这篇 InfluxDB 技术提示博客文章能启发您利用 InfluxDB 2.x 的降采样任务。如果您决定从 1.x 升级到 2.x,并且在将您的持续查询转换为任务时需要帮助,请向我们寻求帮助!请在评论区、我们的 社区网站 或我们的 Slack 频道分享您的想法、疑虑或问题。我们很乐意获得您的反馈并帮助您解决遇到的任何问题!