使用 Kapacitor 丰富您的数据

导航至

有时,我们看到了社区围绕基于业务周期查询 InfluxDB 的请求,例如典型的业务日或拆分为班次周期。考虑以下请求:我如何仅针对周一至周五早上8点至下午5点的工作时间,对整个8月份的数据进行汇总?InfluxQL 目前没有基于时间过滤的任何功能。我们只能

SELECT * FROM "mymeasurement" WHERE time >= '2017-08-01 08:00:00.000000' and time <= '2017-08-31 17:00:00.000000';

那么我们如何实现这一点呢?提供的 Telegraf 插件通常只是发送一个时间戳值,并且还能够发送一些与配置的插件相关的指标和标签的静态标签。解决方案是使用 Kapacitor 作为“预处理程序”来“装饰”或丰富您的数据,以包含一个表示您想要查询的时间段的计算值。

出于本文目的,我们在本地主机上运行 Telegraf、InfluxDB 和 Kapacitor,但在完整环境中,这些将在不同的主机上运行。

第一步是配置 Telegraf 将数据写入 Kapacitor 而不是直接写入 InfluxDB。在 telegraf.conf 文件的 [[outputs.influxdb]] 部分中,有 3 个关键设置需要考虑

[[outputs.influxdb]]
urls = [https://127.0.0.1:9092]
database = "kap_telegraf"
retention_policy = "autogen"

urls 参数必须指向端口 9092(Kapacitor 的默认监听端口),而不是 InfluxDB 的端口 8086。数据库参数应指向一个不存在的数据库(可以忽略 Telegraf 关于数据库未找到的警告)。retention_policy 参数应设置为“autogen”或您之前在实例中创建的特定保留策略。

注意:将 retention_policy 设置为“”(默认)与设置为“autogen”不同,后者是初始化 InfluxDB 时指定的默认保留策略。

telegraf.conf 中的所有其他设置都可以根据您的实例正常配置。

下一步是创建一个 TICKscript,该脚本将处理来自 Telegraf 的数据。在此示例中,我们感兴趣的是创建一个包含布尔值的标签,如果数据点在我们上面描述的工作时间内,则该值为真。

stream
  |from()
   .database('kap_telegraf')
  |eval(lambda: if(((weekday("time") >= 1 AND weekday("time") <= 5) AND (hour("time") >= 8 AND (hour("time")*100+minute("time")) <= 1700)), 'true', 'false'))
     .as('business_hours')
     .tags('business_hours')
     .keep()
  |delete()
     .field('business_hours')
  |influxDBOut()
    .database('telegraf')
    .retentionPolicy('autogen')
    .tag('kapacitor_augmented','true')

在这段TICKscript中,我们从上面在telegraf.conf文件中配置的不存在的数据库“kap_telegraf”进行流式传输。对于stream()节点中的.from()方法,只需要数据库匹配即可。然后,我们将控制权传递给一个eval()节点,该节点将评估点是否到达我们设计的窗口中。在这种情况下,我们使用weekday()hour()minute()函数(如https://docs.influxdb.org.cn/kapacitor/v1.3/tick/expr/#time-functions所述)来评估“时间”值。条件的第一部分评估星期几是否在周一(1)和周五(5)之间。条件的第二部分评估小时值,注意我们要在17点(下午5点)的“00”刻停止。为此,我们将小时数乘以100,并将minute()函数的结果加到一起,以与结束时间1700进行比较。如果time值在这个范围内,eval()节点将返回true,作为名为business_hours的字段。然而,由于我们想查询这个值,我们应该将其作为标签,因此我们链式调用.tags()方法将值转换为名为business_hours的标签。

重要提示:eval()节点将消除流中的所有其他字段和标签,因此我们想要使用.keep()来保留这些值。

在此阶段,我们有一个名为business_hours的字段和标签,包含eval()节点的输出。我们应该通过调用指定要删除的字段的delete()节点来将其从流中过滤出来。

最后,我们将流控制传递给一个指定要写入的目标数据库和保留策略的influxDBOut()节点。我们为这个例子添加了一个额外的静态标签kapacitor-augmented。所有其他数据,如度量名称,都通过传递,并且只需提供新信息。在这种情况下,我们将写入telegraf数据库和autogen保留策略。

一旦创建了TICKscript(如下面的businesshours.tick所引用),我们必须告诉Kapacitor我们想运行它。对于这个例子,我们将使用Kapacitor CLI来配置任务。

$ kapacitor define business_hours -type stream -dbrp kap_telegraf.autogen -tick business_hours.tick

此命令将任务定义为名为business_hours的流类型,监听写入kap_telegraf.autogen的操作,这是在Telegraf中配置的数据库名称和保留策略名称。一旦我们成功创建任务,我们需要通过Kapacitor启用它以进行处理。

$ kapacitor enable business_hours

要显示状态,我们可以要求Kapacitor列出当前任务

$ kapacitor list tasks
ID             Type      Status    Executing Databases and Retention Policies
business_hours stream    enabled   true      ["kap_telegraf"."autogen"]

一旦数据开始通过Kapacitor流向InfluxDB,你就可以在第一个指定的查询中添加你的条件AND business_hours='true'

SELECT * FROM "mymeasurement" WHERE time >= '2017-08-01 08:00:00.000000' and time <= '2017-08-31 17:00:00.000000' AND business_hours='true';

总之,我们展示了如何使用Kapacitor将来自Telegraf的数据丰富到InfluxDB的一个简单示例。这种方法可以用来添加其他类型的数据,可能无需创建定制的Telegraf插件来满足你的业务需求。