结合 Kapacitor 和连续查询

导航至

Kapacitor 可以执行与 InfluxDB 中的连续查询相同的工作。今天,我们将探讨使用一个而非另一个的理由,即 Kapacitor 与连续查询的比较,以及使用 Kapacitor 进行 CQ 类型工作负载的基本方法。

什么是 Kapacitor?它是 TICK 栈中的“K”。Kapacitor 是 InfluxDB 的本地数据处理引擎。它可以从 InfluxDB 处理流式和批量数据。Kapacitor 允许您插入自己的自定义逻辑或用户定义的函数来处理具有动态阈值的警报、匹配指标模式或计算统计异常。

一个示例

首先,让我们将一个简单的 CQ 重新编写为 Kapacitor 的 TICKscript。

这是一个每 5 分钟计算一次 cpu.usage_idle 的平均值并将其存储在新测量 mean_cpu_idle 中的连续查询。

CREATE CONTINUOUS QUERY cpu_idle_mean ON telegraf BEGIN SELECT mean("usage_idle") as usage_idle INTO mean_cpu_idle FROM cpu GROUP BY time(5m),* END

使用 Kapacitor 执行相同操作,这里是一个流式 TICKscript。

stream
    |from()
        .database('telegraf')
        .measurement('cpu')
        .groupBy(*)
    |window()
        .period(5m)
        .every(5m)
        .align()
    |mean('usage_idle')
        .as('usage_idle')
    |influxDBOut()
        .database('telegratelegraff')
        .retentionPolicy('default')
        .measurement('mean_cpu_idle')
        .precision('s')

同样,也可以在 Kapacitor 中将其作为批量任务执行。

batch
    |query('SELECT mean(usage_idle) as usage_idle FROM "telegraf"."default".cpu')
        .period(5m)
        .every(5m)
        .groupBy(*)
    |influxDBOut()
        .database('telegraf')
        .retentionPolicy('default')
        .measurement('mean_cpu_idle')
        .precision('s')

这三种方法都会产生相同的结果。

问题

在这个阶段,我们应该回答几个问题

  • 何时应使用 Kapacitor 而非连续查询?
  • 何时应使用流任务与 Kapacitor 中的批量任务?

何时应使用 Kapacitor 而非连续查询?

有一些原因使用 Kapacitor 而非连续查询。

  • 您正在执行大量连续查询,并希望隔离工作负载。通过使用 Kapacitor 来执行聚合,InfluxDB 的性能配置文件可以保持更稳定,并从 Kapacitor 的隔离。
  • 您需要做的不仅仅是执行查询,例如,可能您只想存储聚合中的异常值,而不是全部。Kapacitor 可以比连续查询更有效地处理数据,因此您在转换数据方面有更大的灵活性。

有一些用例,使用连续查询几乎总是合理的。

  • 执行保留策略的下采样。这是连续查询设计用于并做得很好的。如果您不需要它,就没有必要在您的基础设施中添加另一个移动部件(即 Kapacitor)。保持简单。
  • 您只有少数几个连续查询,再次保持简单,除非您需要,否则不要向您的设置中添加更多移动部件。

何时应使用流任务与 Kapacitor 中的批量任务?

基本上,答案归结为两个因素:可用的 RAM 和使用的时间段。

流任务需要在指定的时间内将所有数据保持在RAM中。如果这个时间段对于可用的RAM来说太长,那么您首先需要将数据存储到InfluxDB中,然后使用批处理任务进行查询。流任务有一个微小的优势,即因为它正在监控数据流,所以它通过数据的时间戳来理解时间。因此,不存在某个点是否进入窗口的竞争条件。如果您正在使用批处理任务,则仍然有可能某个点到达较晚并且在一个窗口中被遗漏。

另一个例子:创建一个连续查询以跨保留策略进行下采样

CREATE CONTINUOUS QUERY cpu_idle_median ON telegraf BEGIN SELECT median("usage_idle") as usage_idle INTO "telegraf"."sampled_5m"."median_cpu_idle" FROM "telegraf"."default"."cpu" GROUP BY time(5m),* END

流TICK脚本

stream
    |from()
        .database('telegraf')
        .retentionPolicy('default')
        .measurement('cpu')
        .groupBy(*)
    |window()
        .period(5m)
        .every(5m)
        .align()
    |median('usage_idle')
        .as('usage_idle')
    |influxDBOut()
        .database('telegraf')
        .retentionPolicy('sampled_5m')
        .measurement('median_cpu_idle')
        .precision('s')

以及批处理TICK脚本

batch
    |query('SELECT median(usage_idle) as usage_idle FROM "telegraf"."default"."cpu"')
        .period(5m)
        .every(5m)
        .groupBy(*)
    |influxDBOut()
        .database('telegraf')
        .retentionPolicy('sampled_5m')
        .measurement('median_cpu_idle')
        .precision('s')

总结

Kapacitor是一个强大的工具,如果您需要更多的功能,请使用它。如果不使用,请继续使用CQs,直到您需要为止。有关如何从InfluxQL查询编写TICK脚本的信息和帮助,请参阅Kapacitor中InfluxQL节点的这些文档。InfluxDB查询语言中的每个函数在Kapacitor中都是可用的,因此您可以将其转换为任何查询的Kapacitor TICK脚本。

接下来是什么

  • 下载 TICK堆栈!
  • 需要从0.8x或0.9x迁移到0.11的帮助?我们在这里为您提供帮助!请通过[email protected]联系我们,开始您的迁移项目。
  • 想要提升您的InfluxDB知识?请查看我们的免费和价格合理的虚拟公开培训。