TL;DR InfluxDB 技术技巧:从 TICKscripts 到 Flux Tasks
作者:Anais Dotis-Georgiou / 产品,用例,开发者
2022年4月26日
导航至
(更新: InfluxDB 3.0 已从 Flux 和内置任务引擎中移除。用户可以使用外部工具,如基于 Python 的 Quix,在 InfluxDB 3.0 中创建任务。)
如果您是 InfluxDB 1.x 的用户,您可能也是 Kapacitor 的用户。如果是这样,您也对 Kapacitor 的数据处理和转换语言——InfluxDB 的批处理和流处理工具——TICKscripts 很熟悉。Kapacitor 是一款非常好的工具,但它大部分是黑盒,因此使用和实施 TICKscripts 来执行数据处理任务、检查和通知可能会是一个具有挑战性的开发人员体验。幸运的是,InfluxDB OSS 2.x 和 InfluxDB Cloud 允许您在任务中执行许多这些转换和警报。任务是在用户定义的计划上执行的 Flux 脚本。Flux 是 InfluxDB 中的查询和数据转换语言。以下是使用 Flux 而不是 Kapacitor 进行数据转换的一些优势:
- 您不再需要维护和学习两种查询语言(InfluxQL 和 TICKscripts)来创建警报、执行连续查询(现在是下采样任务)以及执行数据转换。相反,您可以使用 Flux 来完成这项工作。
- 您不再在黑盒中工作。您可以使用 Flux 来可视化每个数据转换对您数据的影响。在按计划运行任务之前,您可以验证您的任务是否产生您预期的数据。
- Flux 比 TICKscripts 提供更多的函数。您还可以旋转数据、操作时间戳、编写自定义函数等。
但是,尝试用 Flux 任务替换 Kapacitor 工作负载有以下缺点:
- 无法执行 实时流处理
- 无法执行 基于套接字的 UDF
然而,InfluxDB Cloud和OSS 2.x与Kapacitor向后兼容,因此您可以将现有的Kapacitor工作负载始终迁移到InfluxDB Cloud或2.x。我们将在本帖末尾进行升级Kapacitor工作负载到Flux任务的成本效益分析。
本博客的目的是通过模式识别帮助您将TICKscripts转换为Flux。我们将翻译Kapacitor存储库中的示例目录中的示例。我鼓励您查看以下存储库,该存储库包含来自该示例目录的TICKscripts和Flux任务翻译。
本博客假设您对Flux的工作原理有基本了解。如果您是Flux的新手,我建议您首先阅读以下两篇博客
要深入理解Flux,理解以下内容非常重要
计算连接序列上的错误百分比的示例
在我们的第一个转换中,让我们将error_percent TICKscript转换为Flux任务。在这个TICKscript中,我们从两个不同的度量(错误和视图)查询数据,计算两个度量的值之和,将它们连接起来,计算错误百分比(错误总和 - 视图总和)/ 错误总和),然后将数据写入新的度量。让我们看看这两个脚本是如何相互映射的。
TICKscript在左边;Flux脚本转换在右边。
首先,您需要定义您的任务配置选项。记住,Flux任务目前仅适用于数据批次。指定every选项以确定任务运行的间隔。包括一个偏移量以避免读写冲突。offset选项延迟任务执行,但不会影响查询的数据范围。
every()属性相当于任务配置选项中的every选项。period()属性相当于我们在range()函数中指定的持续时间,或我们想要查询的数据量。使用aggregateWindow()函数对数据进行窗口化,并同时应用求和,而不是在TICKscript的InfluxQL部分中选择求和,然后应用.groupBy属性。使用fill()函数用0填充空的时间窗口。
我们对来自两个度量(错误和视图)的数据遵循此程序,并在Flux中将结果存储在变量中。然后使用join()函数将数据连接起来。现在我们可以使用map()函数执行百分比计算,并将其存储在名为“_value”的新列中。最后,我们使用set()函数代替as()属性和measurement()属性来设置度量的名称和新计算出的百分比。最后,使用to()函数而不是InfluxDBOut()节点将数据写入我们的新桶/数据库。
带有标准差的通用批量警报示例 – TICKscript
在 Kapacitor 中,一个通用的批量警报示例可能看起来像这样:
// Step 1: Define thresholds
var info = {info_level}
var warn = {warn_level}
var crit = {crit_level}
var infoSig = 1
var warnSig = 2
var critSig = 3
var period = 10s
var every = 10s
// Step 2: Query for data.
var data = batch
|query('''{InfluxQL_Query}''')
.period(period)
.every(every)
.groupBy('host')
// Step 3: Define the conditions to trigger the alert.
var alert = data
|eval(lambda: sigma("stat"))
.as('sigma')
.keep()
|alert()
.id('{{ index .Tags "host"}}/{alert_metric}')
.message('{{ .ID }}:{{ index .Fields "stat" }}')
.info(lambda: "stat" > info OR "sigma" > infoSig)
.warn(lambda: "stat" > warn OR "sigma" > warnSig)
.crit(lambda: "stat" > crit OR "sigma" > critSig)
// Step 4: Alert.
alert
.log('/tmp/{alert_name}_log.txt')
- 这个通用的批量警报 TICKscript 示例是一个阈值警报。它包含以下步骤:定义我们想要的原始值的标准差阈值。
- 查询我们的数据。
- 定义触发警报的条件。
- 警报。日志节点记录了所有的警报。在 Flux 中,这个警报端点不可用,但几乎每个事件处理器都有。请参阅完整列表这里。
- 在下一节中,我们将探讨 Flux 中的通用批量警报示例。我们将详细讨论一种方法,但重要的是要知道,在编写 Flux 警报时,你比 Kapacitor 警报有更多的灵活性。然而,更大的灵活性和功能需要用户自己做出一些决策。
带有标准差的通用批量警报示例 – Flux
让我们看看等效的 Flux 任务:
// Step 1: import Flux packages
import "influxdata/influxdb/monitor"
import "influxdata/influxdb/schema"
import "math"
// Step 2: define your task options.
// Always include an offset to avoid read and write conflicts. Period and every are defined by the every parameter.
option task = {
name: "generic",
every: 10s,
offset: 2s,
}
// Step 3: Define your thresholds.
infoVal = <info_level>
warnVal = <warn_level>
critVal = <crit_level>
infoSig = 1.0
warnSig = 2.0
critSig = 3.0
// Step 4: Query for data.
Data is grouped by tags or host by default so no need to groupBy('host') as with line 28 in generic_batch_example.tick
data = from(bucket: "<bucket>")
|> range(start: -task.every)
|> filter(fn: (r) => r._measurement == "<measurement>")
|> filter(fn: (r) => r.host == "hostValue1" or r.host == "hostValue2")
|> filter(fn: (r) => r._field == "stat")
// Step 5: Calculate the mean and standard deviation instead of .sigma and extract the scalar value.
// Calculate mean from sample and extract the value with findRecord()
mean_val = (data
|> mean(column: "_value")
// Insert yield() statements to visualize how your data is being transformed.
// |> yield(name: "mean_val")
|> findRecord(fn: (key) => true, idx: 0))._value
// Calculate standard deviation from sample and extract the value with findRecord()
stddev_val = (data
|> stddev()
// Insert yield() statements to visualize how your data is being transformed.
// |> yield(name: "stddev")
|> findRecord(fn: (key) => true, idx: 0))._value
// Step 6: Create a custom message to alert on data
alert = (level, type, eventValue) => {
slack.message(
// Will send alerts to the #notifications-testing channel in the InfluxData Slack Community
url: "https://hooks.slack.com/services/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 ",
text: "An alert \"${string(v: type)}\" event has occurred! The number of field values= \"${string(v: eventValue)}\".",
color: "warning",
)
return level
}
data
// Step 7: Map across values and return the number of stddev to the level as well as a custom slack message defined in the alert() function.
|> map(
fn: (r) => ({r with
level: if r._value < mean_val + math.abs(x: stddev_val) and r._value > mean_val - math.abs(x: stddev_val) or r._value > infoVal then
alert(level: 1, type: info, eventValue: r._value)
else if r._value < mean_val + math.abs(x: stddev_val) * float(v: 2) and r.airTemperature > mean_val - math.abs(x: stddev_val) * float(v: 2) or r._value > okVal then
alert(level: 2, type: ok, eventValue: r._value)
else if r._value < mean_val + math.abs(x: stddev_val) * float(v: 3) and r.airTemperature > mean_val - math.abs(x: stddev_val) * float(v: 3) or r._value > warnVal then
alert(level: 3, type: warn, eventValue: r._value)
else
alert(level: 4, type: crit, eventValue: r._value)
)
// Use the to() function to write the level created by the map() function if you desire. This is not shown.
这个通用的批量警报 Flux 示例是一个阈值警报。它包含以下步骤:
- 导入 Flux 包。Flux 代码组织成包含一个或多个函数的包。
- 定义你的任务选项。
- 定义你的阈值。
- 查询数据。
- 使用 findRecord() 从样本计算标准差并提取标量值。
- 创建一个用于警报数据的自定义消息。此函数返回标准差并调用 slack.message() 函数,该函数向 Slack 频道发送一条消息。警报() 函数还包括一个类型参数,用于标识警报的严重性。
- 第 6 步:映射每个时间戳的每个值,并将每个值与平均值的标准差数返回到名为 "level" 的新列。map() 函数评估每个值是否在平均值加减 1、2 或 3 个标准差之内,并调用自定义的 alert() 函数。这样,我们对每个值都发出警报。这不是一个实用的警报。在现实中,你可能只有在以下条件满足时才会调用自定义的 alert() 函数:
- 值超过警告或临界阈值。
- 值比平均值高出 3 或 4 个标准差。
重要的是要认识到关于 Flux 中带有标准差的通用批量警报示例的一些关键属性:
- 你可以映射数据并创建包含我们警报元数据的新列。在这个例子中,我们使用条件逻辑创建了一个描述我们的数据异常程度的 level 列。你可以添加任何你想要的额外列。
- 你可以定义自定义函数。
- 这仅仅是使用 Flux 任务执行此类逻辑的无数种方法之一。
让我们更详细地讨论第 3 点。Flux 还包含一个 监控包。此包包含与常用 TICKscript 节点(如 StateDurationNode 和 StateChangesOnly)等效的 Flux 方法,但它还包含 monitor.check() 和 monitor.notify() 函数。这些函数将大量有关你的检查和通知的元数据写入 _monitoring 桶。一些用户更喜欢在创建警报任务时使用这些函数,以便对警报有更多的可见性。使用监控包有两个主要方法:
- 使用 2 个 Flux 任务生成警报
-
- 第一个任务检查你的数据并评估你的数据是否满足警报条件。
- 第二个任务会发送警报消息或通知。
这是在 InfluxDB UI 中创建任务的方法。了解更多关于这种方法的信息 请点击此处。
- 使用 1 个 Flux 任务。将检查和通知工作合并到一个任务中。请参阅此方法的示例 请点击此处。
我们将在未来的文章中讨论今天我们学到的三种通用批量警报方法的优缺点。你现在应该得到的要点是 Flux 非常灵活。
成本效益分析
这种成本效益分析非常复杂。如果你正在进行大量的基于流的 Kapacitor 任务,我不建议切换到 InfluxDB Cloud 或 InfluxDB OSS 2.x。是的,存在向后兼容性,你可以在升级到最新版本的 InfluxDB 的同时运行这些 Kapacitor 任务。然而,在这种情况下的大多数用户更喜欢等待流功能可用在 Flux 任务中。主要的例外是当用户发现 Flux 允许他们以 Kapacitor 中不可能的方式处理和转换数据时。有时,对更高级分析的需求使 Kapacitor 用户越过了迁移实例并设置向后兼容性来处理他们的流任务的障碍。最后,一些用户认识到采用 InfluxDB 的最新版本并学习 Flux 将帮助他们确保他们的时序数据管道在未来不会被维护和训练 3 种不同的语言——InfluxQL、连续查询和 TICKscripts。
让我们通过突出每种工具的优缺点来稍作总结。
- 执行流处理的能力
- 创建基于套接字的 UDF 来使 Kapacitor 可扩展到其他语言
Kapacitor 和 InfluxDB 1.x 的缺点包括
- 用户体验具有挑战性。Kapacitor 完全是一个黑盒。它更难使用,也更难培训人员。
- 1.x 用户必须学习 InfluxQL、连续查询和 TICKscripts 来维护他们的时序数据。
使用 Flux 的优点包括
- 更多功能,包括以下能力
- 更易于开发者使用。
- 在每行 Flux 代码之后插入 yield() 语句,以查看每个函数是如何影响和转换您的数据的。使用 yield() 函数就像使用打印语句一样。
- 编写有关您的检查或转换工作的元数据的能力。
- 维护一种语言——Flux,用于您所有的时序管理,比维护和培训 InfluxQL、连续查询和 TICKscripts 更容易。
进一步阅读
本博客假设您已经对 Flux 有了一些了解,但如果您还没有,您可能对以下资源感兴趣
- 检查和通知:本节介绍了《从入门到精通InfluxDB》一书中关于InfluxDB中的检查和通知系统的工作原理以及UI如何生成检查和通知。
- 警报任务:这部分介绍了Flux中的另一个基本警报任务示例。
- 使用Flux进行查询:如果您是Flux的新手,这是一个非常好的资源。
- InfluxDB基础教程:这是一门针对新接触InfluxDB Cloud或InfluxDB 2.x用户的免费InfluxDB University课程。在这门课程中,您将了解InfluxDB的所有组件,包括UI、Telegraf和Flux。
- 入门Flux:另一门免费的InfluxDB University课程。
- TL;DR InfluxDB技术技巧:将InfluxQL查询转换为Flux查询:这是一个非常适合熟悉InfluxQL的Flux新手的好资源。