TL;DR InfluxDB 技术提示:从 TICKscripts 到 Flux 任务

导航至

更新InfluxDB 3.0 放弃了 Flux 和内置任务引擎。用户可以使用外部工具,例如基于 Python 的 Quix,在 InfluxDB 3.0 中创建任务。)

如果您是 InfluxDB 1.x 用户,您可能也是 Kapacitor 用户。如果是这样,您也一定熟悉 TICKscripts,Kapacitor 的数据处理和转换语言,Kapacitor 是 InfluxDB 的批处理和流处理程序。Kapacitor 是一个很棒的工具,但它在很大程度上是一个黑匣子,因此使用和实施 TICKscripts 来执行数据处理任务、检查和通知可能对开发者来说是一项具有挑战性的体验。幸运的是,InfluxDB OSS 2.x 和 InfluxDB Cloud 允许您在任务中执行许多此类转换和警报。任务是 Flux 脚本,在用户定义的计划上执行。Flux 既是 InfluxDB 中的查询语言,也是数据转换语言。以下是使用 Flux 而不是 Kapacitor 进行数据转换工作的一些优势

  • 您不再需要维护和学习两种查询语言(InfluxQL 和 TICKscripts)来创建警报、执行持续查询(现在是降采样任务)以及执行数据转换。相反,您可以使用 Flux 完成这项工作。
  • 您不再在黑匣子内工作。您可以使用 Flux 可视化每个数据转换对您的数据的影响。您可以在按计划运行任务之前验证您的任务是否正在生成您期望的数据。
  • 与 TICKscripts 相比,您可以使用 Flux 访问更多功能。您还可以透视数据、操作时间戳、编写自定义函数等等。

然而,尝试用 Flux 任务替换您的 Kapacitor 工作负载具有以下缺点

但是,InfluxDB Cloud 和 OSS 2.x 向后兼容 Kapacitor,因此您始终可以将现有的 Kapacitor 工作负载移植到 InfluxDB Cloud 或 2.x。我们将在本文末尾对将 Kapacitor 工作负载升级到 Flux 任务进行成本效益分析。

本博客的目的是通过模式识别帮助您将 TICKscripts 转换为 Flux。我们将翻译 Kapacitor 仓库中的示例目录中的示例。我鼓励您查看以下仓库,其中包含该示例目录中的 TICKscripts 和 Flux 任务翻译。

本博客假设您对 Flux 的工作原理有基本的了解。如果您是 Flux 的新手,我建议您先阅读以下两篇博客

  1. TL;DR InfluxDB 技术提示:将 InfluxQL 查询转换为 Flux 查询
  2. TL;DR InfluxDB 技术提示 – 从子查询到 Flux!

要深入了解 Flux,重要的是要理解

  1. 语言的基础知识
  2. 带注释的 CSV,Flux 查询的输出格式

计算连接序列上的错误百分比示例

对于我们的第一个翻译,让我们将 error_percent TICKscript 转换为 Flux 任务。在此 TICKscript 中,我们从两个不同的测量(errors 和 views)中查询数据,查找来自这两个测量的值的总和,将它们连接起来,计算错误百分比((错误总和 - 视图总和)/ 错误总和),然后将数据写入新的测量。让我们看看这两个脚本如何相互映射。

Example calculating error percent on Joined series

TICKscript 在左侧;Flux 脚本转换在右侧。

首先,您需要定义您的任务配置选项。请记住,Flux 任务当前仅对批处理数据进行操作。指定 every选项以确定任务运行的间隔。包括偏移量以避免读取和写入冲突。offset 选项延迟任务执行,但对正在查询的数据范围没有影响。

every() 属性等效于任务配置选项中的 every 选项。period() 属性等效于我们在 range() 函数中指定的时间长度,或者我们要查询多少数据。使用 aggregateWindow() 函数来窗口化我们的数据并同时应用总和,而不是在 TICKscript 的 InfluxQL 部分中选择总和,然后应用 .groupBy 属性。使用 fill() 函数用 0 填充空的时间窗口。

我们对来自两个测量的数据(错误和视图)都遵循此过程,并将结果存储在 Flux 中的变量中。然后使用 join() 函数将数据连接在一起。现在我们可以使用 map() 函数来执行百分比计算并将其存储在新列“_value”中。最后,我们使用 set() 函数代替 as() 属性和 measurement() 属性来设置测量的名称和新计算的百分比。最后,使用 to() 函数而不是 InfluxDBOut() 节点将数据写入我们的新存储桶/数据库。

带有标准差的通用批处理警报示例 – TICKscript

Kapacitor 中的通用批处理警报示例可能如下所示

// Step 1: Define thresholds
var info = {info_level} 
var warn = {warn_level}
var crit = {crit_level}
var infoSig = 1
var warnSig = 2
var critSig = 3
var period = 10s
var every = 10s

// Step 2: Query for data. 
var data = batch
  |query('''{InfluxQL_Query}''')
    .period(period)
    .every(every)
    .groupBy('host')

// Step 3: Define the conditions to trigger the alert.
var alert = data
  |eval(lambda: sigma("stat"))
    .as('sigma')
    .keep()
  |alert()
    .id('{{ index .Tags "host"}}/{alert_metric}')
    .message('{{ .ID }}:{{ index .Fields "stat" }}')
    .info(lambda: "stat" > info OR "sigma" > infoSig)
    .warn(lambda: "stat" > warn OR "sigma" > warnSig)
    .crit(lambda: "stat" > crit OR "sigma" > critSig)

// Step 4: Alert.
alert
  .log('/tmp/{alert_name}_log.txt')
  1. 此通用批处理警报 TICKscript 示例是一个阈值警报。它包含以下步骤:定义我们想要的原始值的阈值以及标准差阈值。
  2. 查询我们的数据。
  3. 定义触发警报的条件。
  4. 警报。log 节点记录您的所有警报。此警报端点在 Flux 中不可用,但几乎每个事件处理程序都可用。请参阅此处的完整列表。
  5. 在下一节中,我们将查看 Flux 中的通用批处理警报示例。我们将详细讨论一种方法,但重要的是要知道,在编写 Flux 警报时,您比使用 Kapacitor 警报时拥有更大的灵活性。但是,更大的灵活性和功能要求用户就最适合自己的选项做出一些决定。

带有标准差的通用批处理警报示例 – Flux

让我们看一下等效的 Flux 任务

// Step 1: import Flux packages
import "influxdata/influxdb/monitor"
import "influxdata/influxdb/schema"
import "math"

// Step 2: define your task options. 
// Always include an offset to avoid read and write conflicts. Period and every are defined by the every parameter.
option task = {
name: "generic",
every: 10s,
offset: 2s,
}

// Step 3: Define your thresholds.
infoVal = <info_level>
warnVal = <warn_level>
critVal = <crit_level>
infoSig = 1.0
warnSig = 2.0
critSig = 3.0

// Step 4: Query for data.
Data is grouped by tags or host by default so no need to groupBy('host') as with line 28 in generic_batch_example.tick
data = from(bucket: "<bucket>")
   |> range(start: -task.every)
   |> filter(fn: (r) => r._measurement == "<measurement>")
   |> filter(fn: (r) => r.host == "hostValue1" or r.host == "hostValue2")
   |> filter(fn: (r) => r._field == "stat")

// Step 5: Calculate the mean and standard deviation instead of .sigma and extract the scalar value. 

// Calculate mean from sample and extract the value with findRecord()
mean_val = (data
   |> mean(column: "_value")
   // Insert yield() statements to visualize how your data is being transformed. 
   // |> yield(name: "mean_val")
   |> findRecord(fn: (key) => true, idx: 0))._value

// Calculate standard deviation from sample and extract the value with findRecord()
stddev_val = (data
   |> stddev()
   // Insert yield() statements to visualize how your data is being transformed. 
   // |> yield(name: "stddev")
   |> findRecord(fn: (key) => true, idx: 0))._value

// Step 6: Create a custom message to alert on data
alert = (level, type, eventValue)  => {
slack.message(
      // Will send alerts to the #notifications-testing channel in the InfluxData Slack Community
      url: "https://hooks.slack.com/services/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 ",
      text: "An alert \"${string(v: type)}\" event has occurred! The number of field values= \"${string(v: eventValue)}\".",
      color: "warning",
      )
      return level
      }
data
   // Step 7: Map across values and return the number of stddev to the level as well as a custom slack message defined in the alert() function.
   |> map(
       fn: (r) => ({r with
level: if r._value < mean_val + math.abs(x: stddev_val) and r._value > mean_val - math.abs(x: stddev_val) or r._value > infoVal then
             alert(level: 1, type: info, eventValue: r._value)
           else if r._value < mean_val + math.abs(x: stddev_val) * float(v: 2) and r.airTemperature > mean_val - math.abs(x: stddev_val) * float(v: 2) or r._value > okVal then
             alert(level: 2, type: ok, eventValue: r._value)
           else if r._value < mean_val + math.abs(x: stddev_val) * float(v: 3) and r.airTemperature > mean_val - math.abs(x: stddev_val) * float(v: 3) or r._value > warnVal then
             alert(level: 3, type: warn, eventValue: r._value)
           else
              alert(level: 4, type: crit, eventValue: r._value)
)

   // Use the to() function to write the level created by the map() function if you desire. This is not shown.

此通用批处理警报 Flux 示例是一个阈值警报。它包含以下步骤

  1. 导入 Flux 包。Flux 代码被组织成包含一个或多个函数的包。
  2. 定义您的任务选项。
  3. 定义您的阈值。
  4. 查询数据。
  5. 从样本计算标准差,并使用 findRecord() 提取标量值。
  6. 创建自定义消息以警报数据。此函数返回标准差并调用 slack.message() 函数,该函数向 Slack 频道发送一条消息。alert() 函数还包括一个 type 参数,用于标识警报的严重性。
  7. 步骤 6:跨每个时间戳的每个值进行映射,并返回每个值与平均值的标准差偏离程度,到一个名为“level”的新列。map() 函数评估每个值是否在距平均值 1、2 或 3 个标准差之内,并调用自定义 alert() 函数。这样,我们就会对每个值发出警报。这不是一个实用的警报。实际上,您可能只会在满足以下条件时调用自定义 alert() 函数
    • 该值超过警告或严重阈值。
    • 该值与平均值的偏差超过 3 或 4 个标准差。

重要的是要认识到关于 Flux 中带有标准差的通用批处理警报示例的一些关键属性

  1. 您可以跨数据进行映射并创建包含关于我们警报的元数据的新列。在此示例中,我们创建一个级别列,该列使用条件逻辑描述了我们数据的异常程度。您可以添加带有您想要的任何元数据的其他列。
  2. 您可以定义自定义函数。
  3. 这只是您可以使用 Flux 任务执行此类逻辑的无数种方式中的一个示例。

让我们更详细地讨论第 3 点。Flux 还包含一个 监控包。此包包含的方法是常用 TICKscript 节点(如 StateDurationNode 和 StateChangesOnly)的 Flux 等效项,但它也包含 monitor.check() 和 monitor.notify() 函数。这些函数将大量关于您的检查和通知的元数据写入 _monitoring 存储桶。一些用户在创建警报任务时更喜欢使用这些函数,以便更深入地了解他们的警报。使用监控包时有两种主要方法

  1. 使用 2 个 Flux 任务来生成警报
    • 第一个任务检查您的数据并评估您的数据是否满足警报条件。
    • 第二个任务发送警报消息或通知。

这是在 InfluxDB UI 中创建任务的方式。在此处了解有关此方法的更多信息。

  1. 使用 1 个 Flux 任务。将检查和通知工作合并到一个任务中。在此处查看此方法的示例。

我们将在以后的文章中讨论我们今天了解到的三种通用批处理警报方法的优缺点。您现在应该掌握的主要要点是 Flux 非常灵活。

成本效益分析

此成本效益分析很复杂。如果您正在执行大量基于流的 Kapacitor 任务,我不建议切换到 InfluxDB Cloud 或 InfluxDB OSS 2.x。是的,存在向后兼容性,您可以在升级到最新版本的 InfluxDB 的同时运行这些 Kapacitor 任务。但是,在这种情况下,大多数用户更喜欢等到 Flux 任务中提供流功能。主要例外是当用户发现 Flux 允许他们以 Kapacitor 中不可能的方式处理和转换数据时。有时,对更高级分析的需求使用户克服了必须迁移他们的实例并设置向后兼容性来处理他们的流任务的障碍。最后,一些用户认识到,采用最新版本的 InfluxDB 并学习 Flux 将有助于他们面向未来地保护他们的时间序列数据管道,避免维护和培训 3 种不同的语言——InfluxQL、持续查询、TICKscripts。

让我们花一点时间回顾一下,重点介绍每种工具的优点和缺点。使用 Kapacitor 和 InfluxDB 1.x 的优点包括

  • 执行流处理的能力
  • 创建基于套接字的 UDF 以使 Kapacitor 可扩展到其他语言的能力

Kapacitor 和 InfluxDB 1.x 的缺点包括

  • 具有挑战性的用户体验。Kapacitor 非常像一个黑匣子。它更难使用,也更难培训人们使用。
  • 1.x 用户必须学习 InfluxQL、持续查询和 TICKscripts 才能维护其时间序列数据。

使用 Flux 的优点包括

延伸阅读

本博客假设您已经有一些 Flux 经验,但如果您没有,您可能会对以下资源感兴趣

  • 检查和通知:《Time to Awesome》一书的这一部分描述了 InfluxDB 中的检查和通知系统的工作原理,以及 UI 如何生成检查和通知。
  • 警报任务:这涵盖了 Flux 中基本警报任务的另一个示例。
  • 使用 Flux 查询:如果您是 Flux 的新手,这是一个很棒的资源。
  • InfluxDB 基础知识:这个免费的 InfluxDB 大学课程对于 InfluxDB Cloud 或 InfluxDB 2.x 的新手来说是一个很棒的资源。在本课程中,您将全面了解 InfluxDB 的所有组件,包括 UI、Telegraf 和 Flux。
  • Flux 入门:另一个免费的 InfluxDB 大学课程。
  • TL;DR InfluxDB 技术提示:将 InfluxQL 查询转换为 Flux 查询:对于熟悉 InfluxQL 的 Flux 初学者来说,这是一个很棒的资源。