使用 Kapacitor 丰富您的数据

导航至

我们不时收到社区关于基于业务期间(例如典型工作日或细分为轮班期)查询 InfluxDB 的请求。考虑以下请求:如何仅针对工作时间(定义为周一至周五上午 08:00 至下午 05:00)汇总整个八月份的数据? InfluxQL 目前没有任何用于基于时间进行过滤的函数。我们仅限于

SELECT * FROM "mymeasurement" WHERE time >= '2017-08-01 08:00:00.000000' and time <= '2017-08-31 17:00:00.000000';

那么我们如何实现这一点呢?提供的 Telegraf 插件通常只发送时间戳值,并且除了与配置的插件关联的指标和标签外,还能够发送一些静态标签。解决方案是使用 Kapacitor 作为“预处理器”来“装饰”或丰富您的数据,并使用表示您希望查询的时间段的计算值。

就本文而言,我们在 localhost 上运行 Telegraf、InfluxDB 和 Kapacitor,但在成熟的环境中,这些将在不同的主机上运行。

第一步是将 Telegraf 配置为写入 Kapacitor 而不是直接写入 InfluxDB。在 telegraf.conf 文件的 [[outputs.influxdb]] 部分中,有 3 个关键设置需要考虑

[[outputs.influxdb]]
urls = [http://localhost:9092]
database = "kap_telegraf"
retention_policy = "autogen"

urls 参数必须指向端口 9092(Kapacitor 的默认监听端口)而不是 InfluxDB 的端口 8086。database 参数应指向不存在的数据库(您可以忽略 Telegraf 关于未找到数据库的警告)。retention_policy 参数应设置为“autogen”或您之前在实例中创建的特定保留策略。

注意:将 retention_policy 设置为“”(默认)与“autogen”不同,后者在初始化 InfluxDB 时被指定为默认保留策略。

telegraf.conf 中的所有其他设置都可以为您的实例正常配置。

下一步是创建一个 TICKscript,它将处理来自 Telegraf 的数据。在此示例中,我们有兴趣创建一个标签,如果数据点在我们上面描述的工作时间内,该标签将包含真值或假值。

stream
  |from()
   .database('kap_telegraf')
  |eval(lambda: if(((weekday("time") >= 1 AND weekday("time") <= 5) AND (hour("time") >= 8 AND (hour("time")*100+minute("time")) <= 1700)), 'true', 'false'))
     .as('business_hours')
     .tags('business_hours')
     .keep()
  |delete()
     .field('business_hours')
  |influxDBOut()
    .database('telegraf')
    .retentionPolicy('autogen')
    .tag('kapacitor_augmented','true')

在此 TICKscript 中,我们正在从我们上面在 telegraf.conf 文件中配置的不存在的数据库“kap_telegraf”流式传输数据。stream() 节点的 .from() 方法只需要数据库匹配即可。然后,我们将控制权传递给 eval() 节点,该节点将评估点是否已到达我们设计的窗口中。在本例中,我们使用 https://docs.influxdb.org.cn/kapacitor/v1.3/tick/expr/#time-functions 中描述的 weekday()hour()minute() 函数来评估“time”值。条件的第一部分评估一周中的日期是否在周一 (1) 和周五 (5) 之间。条件的第二部分评估小时值,注意考虑到我们希望在小时 17(下午 5 点)的“00”标记处停止。为此,我们将小时乘以 100,并将 minute() 函数的结果添加到与结束时间 1700 进行比较。如果 time 值在此范围内,则 eval() 节点返回 true 作为名为 business_hours 的字段。但是,由于我们想查询此值,我们应该将其作为标签,因此我们链接一个 .tags() 方法将值更改为名为 business_hours 的标签。

重要提示:eval() 节点将从流中删除所有其他字段和标签,因此我们希望指定 .keep() 以从流中保留这些值。

此时,我们有一个名为 business_hours 的字段和一个标签,其中包含 eval() 节点的输出。我们应该通过调用指定要通过 .fields() 方法删除的字段的 delete() 节点,将其从流中过滤掉。

最后,我们将流中的控制权传递给 influxDBOut() 节点,该节点指定要写入的目标数据库和保留策略。我们为此示例添加了一个额外的静态标签,称为 kapacitor-augmented。所有其他数据(如 measurement 名称)都将传递,并且只需要提供新信息。在本例中,我们将写入 telegraf 数据库和 autogen 保留策略。

创建 TICKscript(下面称为 businesshours.tick)后,我们必须告诉 Kapacitor 我们要运行它。对于此示例,我们将使用 Kapacitor CLI 来配置任务。

$ kapacitor define business_hours -type stream -dbrp kap_telegraf.autogen -tick business_hours.tick

此命令将名为 business_hours 的任务定义为流类型,侦听写入到 kap_telegraf.autogen 的数据,这是上面在 Telegraf 中配置的数据库名称和保留策略名称。成功创建任务后,我们需要启用它以供 Kapacitor 处理。

$ kapacitor enable business_hours

要显示状态,我们可以要求 Kapacitor 列出当前任务

$ kapacitor list tasks
ID             Type      Status    Executing Databases and Retention Policies
business_hours stream    enabled   true      ["kap_telegraf"."autogen"]

一旦数据开始通过 Kapacitor 流向 InfluxDB,您就可以将条件 AND business_hours='true' 添加到我们指定的第一个查询中

SELECT * FROM "mymeasurement" WHERE time >= '2017-08-01 08:00:00.000000' and time <= '2017-08-31 17:00:00.000000' AND business_hours='true';

总而言之,我们展示了一个简单的示例,说明如何使用 Kapacitor 来丰富从 Telegraf 流入 InfluxDB 的数据。此方法可用于添加其他类型的数据,从而可能无需创建自定义 Telegraf 插件来满足您的业务需求。