结合 Kapacitor 和连续查询

导航至

Kapacitor 可以执行与 InfluxDB 中的连续查询相同的工作。今天,我们将探讨使用 Kapacitor 而不是连续查询的原因、Kapacitor 与连续查询的对比,以及使用 Kapacitor 进行 CQ 类型工作负载的基础知识。

什么是 Kapacitor?它是 TICK 堆栈中的“K”。Kapacitor 是 InfluxDB 的原生数据处理引擎。它可以处理来自 InfluxDB 的流数据和批处理数据。Kapacitor 允许您插入自己的自定义逻辑或用户定义的函数来处理具有动态阈值的警报、匹配指标以查找模式或计算统计异常。

一个例子

首先,让我们采用一个简单的 CQ,并将其重写为 Kapacitor TICKscript。

这是一个 CQ,它计算 `cpu.usage_idle` 每 5 分钟的平均值,并将其存储在新的 measurement `mean_cpu_idle` 中。

CREATE CONTINUOUS QUERY cpu_idle_mean ON telegraf BEGIN SELECT mean("usage_idle") as usage_idle INTO mean_cpu_idle FROM cpu GROUP BY time(5m),* END

要使用 Kapacitor 执行相同的操作,这是一个流式 TICKscript。

stream
    |from()
        .database('telegraf')
        .measurement('cpu')
        .groupBy(*)
    |window()
        .period(5m)
        .every(5m)
        .align()
    |mean('usage_idle')
        .as('usage_idle')
    |influxDBOut()
        .database('telegratelegraff')
        .retentionPolicy('default')
        .measurement('mean_cpu_idle')
        .precision('s')

同样的事情也可以在 Kapacitor 中作为批处理任务完成。

batch
    |query('SELECT mean(usage_idle) as usage_idle FROM "telegraf"."default".cpu')
        .period(5m)
        .every(5m)
        .groupBy(*)
    |influxDBOut()
        .database('telegraf')
        .retentionPolicy('default')
        .measurement('mean_cpu_idle')
        .precision('s')

这三种方法都将产生相同的结果。

问题

在这一点上,我们应该回答几个问题

  • 我们何时应该使用 Kapacitor 而不是 CQ?
  • 我们何时应该在 Kapacitor 中使用流任务而不是批处理任务?

我们何时应该使用 Kapacitor 而不是 CQ?

有几个理由使用 Kapacitor 而不是 CQ。

  • 您正在执行大量的 CQ,并且想要隔离工作负载。通过使用 Kapacitor 执行聚合,InfluxDB 的性能概况可以保持更稳定,并与 Kapacitor 的性能概况隔离。
  • 您需要做的不仅仅是执行查询,例如,您可能只想存储来自聚合的异常值,而不是全部。Kapacitor 可以使用数据做更多的事情,因此您在转换数据方面具有更大的灵活性。

在某些用例中,使用 CQ 几乎总是合理的。

  • 为保留策略执行降采样。这是 CQ 设计的目的,并且做得很好。如果您不需要,则无需在您的基础设施中添加另一个移动部件(即 Kapacitor)。保持简单。
  • 您只有少量的 CQ,再次保持简单,除非您需要,否则不要在您的设置中添加更多移动部件。

我们何时应该在 Kapacitor 中使用流任务而不是批处理任务?

基本上,答案归结为两件事:可用 RAM 和使用的时间段。

流任务必须将所有数据保存在 RAM 中指定的期间。如果此期间对于可用 RAM 来说太长,那么您首先需要将数据存储在 InfluxDB 中,然后使用批处理任务进行查询。流任务确实有一个轻微的优势,因为它可以监视数据流,因此它通过数据上的时间戳来理解时间。因此,对于给定点是否会进入窗口,不存在竞争条件。如果您使用的是批处理任务,则某个点仍然可能迟到并在窗口中遗漏。

另一个例子:创建连续查询以跨保留策略进行降采样

CREATE CONTINUOUS QUERY cpu_idle_median ON telegraf BEGIN SELECT median("usage_idle") as usage_idle INTO "telegraf"."sampled_5m"."median_cpu_idle" FROM "telegraf"."default"."cpu" GROUP BY time(5m),* END

流式 TICKscript

stream
    |from()
        .database('telegraf')
        .retentionPolicy('default')
        .measurement('cpu')
        .groupBy(*)
    |window()
        .period(5m)
        .every(5m)
        .align()
    |median('usage_idle')
        .as('usage_idle')
    |influxDBOut()
        .database('telegraf')
        .retentionPolicy('sampled_5m')
        .measurement('median_cpu_idle')
        .precision('s')

和批处理 TICKscript

batch
    |query('SELECT median(usage_idle) as usage_idle FROM "telegraf"."default"."cpu"')
        .period(5m)
        .every(5m)
        .groupBy(*)
    |influxDBOut()
        .database('telegraf')
        .retentionPolicy('sampled_5m')
        .measurement('median_cpu_idle')
        .precision('s')

总结

Kapacitor 是一个强大的工具,如果您需要更多功能,请使用它。如果不需要,请继续使用 CQ,直到您需要为止。有关更多信息和从 InfluxQL 查询编写 TICKscript 的帮助,请查看 Kapacitor 中的 InfluxQL 节点上的文档。InfluxDB 查询语言中可用的每个函数在 Kapacitor 中都可用,因此您可以将任何查询转换为 Kapacitor TICKscript。

下一步是什么

  • 下载 TICK 堆栈!
  • 需要帮助从 0.8x 或 0.9x 迁移到 0.11?我们随时为您提供帮助!请发送邮件至 [email protected] 以开始您的迁移项目。
  • 想要提升您的 InfluxDB 知识水平?查看我们免费且经济实惠的虚拟公开培训。