InfluxDB v2.0的降采样
作者:Anais Dotis-Georgiou / 产品,用例,开发者
2020年10月23日
导航到
(更新:InfluxDB 3.0已移除Flux和内置任务引擎。希望从2.x迁移到3.0的用户可以使用外部工具,如基于Python的Quix,在InfluxDB 3.0中创建任务。)
降采样是在时间窗口内对高分辨率时序数据进行聚合,并将低分辨率的聚合数据存储到新桶中的过程。例如,假设您有一个监测温度的物联网应用程序。您的温度传感器可能会收集温度数据。这些数据以每分钟间隔收集。在白天这对您来说非常有用。没有必要以这种分辨率保留历史温度数据,因此您执行降采样任务,仅保留以每小时精度计算的平均温度。这种降采样有助于在不牺牲不必要的磁盘空间的情况下保留数据的整体形状。降采样还可以通过减少作为查询一部分评估的数据量来加快对大时间范围的查询。
降采样的好处:磁盘使用量和查询性能
降采样是常见的时序数据库任务,通常是时序数据库用户的需求。降采样允许您在收集数据的过程中降低总体磁盘使用量。降采样还可以提高InfluxDB OSS或Cloud的查询性能。最后,InfluxDB每秒可以处理数百万个数据点。保留如此大量的数据不仅会引发存储问题,而且难以处理。降采样允许您消除高精度时序数据的噪声,从而更快地分析和从历史时序数据中提取价值。
InfluxDB 2.x中的降采样功能升级——从连续查询到降采样任务
随着InfluxDB从1.x升级到2.x,降采样功能也得到了提升。在InfluxDB 1.x中,用户通常使用连续查询在相对较小的范围内实现降采样。连续查询是通过Influx CLI工具创建、维护和更改的。连续查询也是一个独立的功能,但随着您想要降采样的数据量增加,它会对InfluxDB本身的读/写性能产生负面影响。在较大规模的数据处理中,建议将工作卸载到Kapacitor。
在InfluxDB 2.x中,您可以通过下采样任务来执行下采样。任务允许您执行各种数据处理作业。InfluxDB 2.x中的所有任务,包括下采样和查询,都使用一种语言编写 – Flux。您可以通过CLI和UI创建任务。在2.x中,通过UI管理任务是最容易执行数据处理作业的方法。能够使用一个工具和语言执行所有的数据转换也有助于您更好地组织和可视化您的时序管道。相比之下,在1.x的TICK堆栈中,当与Kapacitor、InfluxQL和连续查询语法一起工作时,您必须熟悉TICKscripts以创建相同的数据处理作业。
不同分辨率数据的保留策略
为了成功创建下采样任务,您需要有一个源桶和一个目标桶,每个桶有不同的保留策略。
- 源桶是您写入高精度数据的地方。
- 目标桶是您写入聚合数据的地方。
目标桶的保留策略应比源桶更长。
一个完整的下采样任务示例
您可以使用API或UI创建任务。创建任务的最简单方法是使用InfluxDB UI。在左侧导航栏的“任务”菜单下,点击“创建任务”。您可以创建新任务、导入任务,或从您已导入的现有模板中导入任务(请阅读此文档了解如何导入现有模板)。
创建任务和调试任务的推荐工作流程
想象一下,我需要在系统数据上执行下采样任务。为了演示目的,我将每分钟运行任务,并以10秒的间隔聚合数据。开始创建任务的最好方法是使用查询构建器或脚本编辑器创建最适合您需求的聚合。我创建了以下聚合
from(bucket: "my-bucket")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) =>
(r["_measurement"] == "cpu"))
|> filter(fn: (r) =>
(r["_field"] == "usage_user"))
|> aggregateWindow(every: 10s, fn: mean, createEmpty: false)
// Use the to() function to validate that the results look correct. This is optional.
|> to(bucket: "downsampled", org: "my-org")
我们应该只需将Flux脚本保存为任务从“数据资源管理器”中保存。要从“数据资源管理器”保存查询作为任务,请点击左上角的“另存为”按钮。确保您已经创建了一个“下采样”桶,这样您就可以从下拉菜单中选择它。
然而,我们发现用户遇到了复制粘贴问题。我们将描述此问题以突出如何调试任务。但将查询导出为任务是推荐的工作流程。现在,我们的查询看起来很棒,我将复制并粘贴该查询到我的任务中。看起来像这样
option task = {name: "Downsampling CPU", every: 1m}
data = from(bucket: "my-bucket")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) =>
(r["_measurement"] == "cpu"))
|> filter(fn: (r) =>
(r["_field"] == "usage_user"))
data
|> aggregateWindow(every: 10s, fn: mean, createEmpty: false)
|> to(bucket: "downsampled", org: "my-org")
您可以手动编写任务名称和间隔设置,或者可以使用UI指定这些配置选项。使用UI会自动生成我们任务的第一行。接下来,我将从脚本编辑器复制并粘贴我的代码到任务中。保存任务后,注意任务状态已失败。
要查看日志,请单击任务右侧的齿轮图标。选择“查看任务运行”。请注意,这里您还有将任务导出为JSON对象选项。这允许您轻松与他人共享任务,这在调试中很有用。
“查看任务运行”按钮会将您带到处理引擎尝试执行任务的所有时间的列表。
如果您点击 查看日志,您可以看到导致任务失败的错误。在这里,我们看到与任务运行失败相关的错误:“错误耗尽结果迭代器;Err:错误 @4:18-4:19:未定义标识符 v:错误 @4:18-4:19:未定义标识符 v”。
这个错误源于直接从脚本编辑器复制粘贴我们的Flux代码。我们需要将时间范围从引用仪表板时间的变量更改为实际时间。我们的任务结果如下所示
option task = {name: "Downsampling CPU", every: 1m}
data = from(bucket: "my-bucket")
|> range(start: -task.every)
|> filter(fn: (r) =>
(r["_measurement"] == "cpu"))
|> filter(fn: (r) =>
(r["_field"] == "usage_user"))
data
|> aggregateWindow(every: 10s, fn: mean, createEmpty: false)
|> to(bucket: "downsampled", org: "my-org")
现在在保存任务后,您可以在任务旁边看到一个绿色的箭头,这表示任务正在成功运行。您现在可以查看“下采样”桶中的数据。
下采样常见问题解答:Flux中的多个聚合
在深入了解Flux中的多个聚合之前,让我们先解决最明显的下采样问题:“我如何开始进行下采样?” 这里 是一个专门针对下采样Telegraf指标的InfluxDB模板。使用 此模板 是开始下采样的最简单方法。
此外,我注意到许多用户询问:“我如何下采样我的数据,以在每个聚合间隔从我的数据中检索平均值和值计数?” 这个关于执行多个聚合投影的问题通常来自比Flux更熟悉InfluxQL的InfluxDB 1.x用户。例如,他们习惯于能够执行以下查询
select mean(temp) as temp_mean, count(temp) as temp_count from my_measurement
并将其转换为以下连续查询
CREATE CONTINUOUS QUERY "cq_basic" ON "my-bucket"
BEGIN
select mean(temp) as temp_mean, count(temp) as temp_count from my_measurement
GROUP BY time(36h), "mytag"
END
此查询和相应的CQ将同时返回两个聚合。在InfluxDB 2.x中,在UI中通过在多个标签页中编写两个单独的查询来可视化多个聚合是最简单的方法。
例如,让我们假设我们已经 构建了一个表格 并使用 array.from() 和 to() 函数将以下数据写入InfluxDB Cloud。
import "experimental/array"
array.from(rows: [{_measurement: "my_measurement", mytag: "my_tag", _field: "temp", _value: 21.0, _time: 2020-01-01T00:00:00Z,},
{_measurement: "my_measurement", mytag: "my_tag", _field: "temp", _value: 23.5, _time: 2020-01-02T00:00:00Z}, {_measurement: "my_measurement", mytag: "my_tag", _field: "temp", _value: 20.5, _time: 2020-01-03T00:00:00Z}])
|>to(bucket: "my-bucket")
然后我们可以在两个标签页中查询我们的数据。
这两个查询是
- 平均值,蓝色线
from(bucket: "my-bucket")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "my_measurement")
|> aggregateWindow(every: 36h, fn: mean, createEmpty: false)
|> yield(name: "mean")
- 计数,粉色线
from(bucket: "my-bucket")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "my_measurement")
|> aggregateWindow(every: 36h, fn: count, createEmpty: false)
|> yield(name: "count")
每个查询都存储在UI中的两个标签页中,即 查询 1 和 查询 2。橙色圆圈中的 眼睛 图标表示该标签页中的查询是否正在可视化。现在我们知道了如何同时可视化多个聚合,让我们了解如何将上面的CQ转换为下采样任务。
执行等效下采样任务的最简单方法是创建每个聚合一个任务,如上节所述。然而,创建多个任务意味着多次查询数据。开发者应该在创建下采样任务时始终旨在减少查询引擎的负载。
当开发者打算在大量数据上执行多个数据转换时,他们可以通过隔离所有任务中查询的共同部分来减少查询引擎的负载。这部分可以转换为其自己的任务,然后在此之上执行剩余的转换。具体来说,如果您正在过滤字段的一部分或使用正则表达式进行过滤,您绝对希望使用顶级任务执行此昂贵的查询,以避免每个后续聚合的重复查询工作。高效的任务创建也与 数据布局和模式设计 密切相关。
然而,构建一个好的下采样任务并不仅仅取决于查询引擎的优化考虑。开发者不仅需要考虑如何最有效地编写大量任务,还必须确保结果数据能够高效地被消费。
- 例如,您可能想利用set()函数在下采样任务中为您的数据添加标签。这样,您可以将数据写入相同的桶和测量。这使用户能够轻松查询下采样数据。
- 例如,您可以将第一个下采样任务附加一个标签,指定下采样任务的聚合类型。现在用户可以轻松通过筛选“agg_type”标签键来比较下采样数据。
option task = {name: "Downsampling CPU", every: 1m}
data = from(bucket: "my-bucket")
|> range(start: -2h)
|> filter(fn: (r) =>
(r["_measurement"] == "cpu"))
|> filter(fn: (r) =>
(r["_field"] == "usage_user"))
data
|> aggregateWindow(every: 10s, fn: mean, createEmpty: false)
|> set(key: "agg_type",value: "mean_cpu")
|> to(bucket: "downsampled", org: "my-org", tagColumns: ["agg_type"])
最后,我们准备好编写CQ的Flux翻译。以下Flux任务与上面的CQ最为相似
option task = {name: "Downsampling CPU", every: 1m}
data = from(bucket: "my-bucket")
|> range(start: -task.every)
|> filter(fn: (r) => r._measurement == "my_measurement")
data
|> mean()
|> set(key: "agg_type",value: "mean_temp")
|> to(bucket: "downsampled", org: "my-org", tagColumns: ["agg_type"])
data
|> count()
|> set(key: "agg_type",value: "count_temp")
|> to(bucket: "downsampled", org: "my-org", tagColumns: ["agg_type"])
在此示例中,我们没有将初始过滤拆分为单独的任务,因为我们正在查询一个测量和一个字段,“temp”。由于顶级查询返回的数据与我们的桶数据相同,创建一个将此过滤写入新桶的任务将是冗余且浪费的。然而,现在我们可以在同一个任务内执行聚合。我们还已将聚合函数从aggregateWindow()更改为mean()和count()。使用mean()和count()进行聚合使我们能够查询和聚合更短的时间段或更小的数据集,从而提高查询效率。此外,缩短任务执行间隔的另一个权衡是下采样数据将更加及时。短任务执行间隔的缺点是它们对迟到数据的鲁棒性较差。
顺便一提:Flux灵活
Flux是一种强大的查询语言。您可以利用Flux编写更复杂的查询,同时执行聚合。虽然上述下采样方法可能更高效(这实际上取决于您的模式),但了解在聚合数据时您有多种选择是有价值的。
- 您可以进行一个union
data = from(bucket: "my-bucket")
|> range(start: 2019-12-31T00:00:00Z, stop: 2020-01-04T00:00:00Z)
|> filter(fn: (r) => r._measurement == "my_measurement")
|> filter(fn: (r) => r._field == "temp")
temp_mean = data
|> mean()
|> set(key: "agg_type", value: "mean")
temp_count = data
|> count()
|> toFloat()
|> set(key: "agg_type", value: "count")
union(tables: [temp_mean, temp_count])
|> group(columns: ["agg_type"], mode:"by")
|> yield()
此Flux查询允许您同时可视化两个聚合,因为每个聚合都位于一个单独的表中其自己的“_value”列中。以下是图形视图中的输出
以下是表格视图中的输出
- 您可以对join执行操作
data = from(bucket: "my-bucket")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r._measurement == "my_measurement")
|> filter(fn: (r) => r._field== "temp")
mean = data |> aggregateWindow(every: 36h, fn: mean, createEmpty: false)
count = data |> aggregateWindow(every: 36h, fn: count, createEmpty: false)
join(tables: {mean: mean, count: count}, on: ["_time", "_field", "_measurement", "_start", "_stop"], method: "inner")
- 或者,您可以使用reduce()函数来创建自定义聚合。不过,reduce()函数一开始可能有点棘手。有关使用reduce()函数的技巧,请参阅这篇博客。以下Flux脚本使用reduce()函数同时为您执行两个聚合
from(bucket: "my-bucket")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r._measurement == "my_measurement")
|> filter(fn: (r) => r._field == "temp")
|> reduce(
fn: (r, accumulator) => ({
count: accumulator.count + 1,
sum: r._value + accumulator.sum,
mean: (accumulator.sum + r._value)/(float(v: accumulator.count + 1))
}), identity: {sum:0.0, count:0, mean:0.0 })
|> drop(columns: ["sum"])
Flux脚本编号2和3(上面)在一张表中创建了两个包含所有聚合的列。您无法立即可视化这两个输出。您可以使用自定义菜单来选择要可视化的列。在这里,我们在图形视图中可视化计数
下面的图像是“表格”视图的输出。您可以看到创建了两个列,“计数”和“平均值”,并且它们位于同一张表中。
选择适合您的查询很大程度上取决于您的模式。UI中的查询选项卡包含查询运行时间,这可以帮助您为您的模式选择最佳查询。
持续查询翻译的资源
希望这篇InfluxDB技术技巧博客文章能激发您利用InfluxDB 2.x的降采样任务。如果您决定从1.x升级到2.x并需要帮助将连续查询转换为任务,请向我们寻求帮助!在评论部分、我们的社区网站或我们的Slack频道中分享您的想法、担忧或问题。我们很高兴收到您的反馈并帮助您解决遇到的问题!