使用 Kapacitor 丰富您的数据
作者:Alan Caldera / 产品, 开发者
2017 年 10 月 01 日
导航至
我们不时收到社区关于基于业务期间(例如典型工作日或细分为轮班期)查询 InfluxDB 的请求。考虑以下请求:如何仅针对工作时间(定义为周一至周五上午 08:00 至下午 05:00)汇总整个八月份的数据? InfluxQL 目前没有任何用于基于时间进行过滤的函数。我们仅限于
SELECT * FROM "mymeasurement" WHERE time >= '2017-08-01 08:00:00.000000' and time <= '2017-08-31 17:00:00.000000';
那么我们如何实现这一点呢?提供的 Telegraf 插件通常只发送时间戳值,并且除了与配置的插件关联的指标和标签外,还能够发送一些静态标签。解决方案是使用 Kapacitor 作为“预处理器”来“装饰”或丰富您的数据,并使用表示您希望查询的时间段的计算值。
就本文而言,我们在 localhost
上运行 Telegraf、InfluxDB 和 Kapacitor,但在成熟的环境中,这些将在不同的主机上运行。
第一步是将 Telegraf 配置为写入 Kapacitor 而不是直接写入 InfluxDB。在 telegraf.conf
文件的 [[outputs.influxdb]]
部分中,有 3 个关键设置需要考虑
[[outputs.influxdb]]
urls = [http://localhost:9092]
database = "kap_telegraf"
retention_policy = "autogen"
urls 参数必须指向端口 9092(Kapacitor 的默认监听端口)而不是 InfluxDB 的端口 8086。database 参数应指向不存在的数据库(您可以忽略 Telegraf 关于未找到数据库的警告)。retention_policy
参数应设置为“autogen”或您之前在实例中创建的特定保留策略。
注意:将 retention_policy
设置为“”(默认)与“autogen”不同,后者在初始化 InfluxDB 时被指定为默认保留策略。
telegraf.conf
中的所有其他设置都可以为您的实例正常配置。
下一步是创建一个 TICKscript,它将处理来自 Telegraf 的数据。在此示例中,我们有兴趣创建一个标签,如果数据点在我们上面描述的工作时间内,该标签将包含真值或假值。
stream
|from()
.database('kap_telegraf')
|eval(lambda: if(((weekday("time") >= 1 AND weekday("time") <= 5) AND (hour("time") >= 8 AND (hour("time")*100+minute("time")) <= 1700)), 'true', 'false'))
.as('business_hours')
.tags('business_hours')
.keep()
|delete()
.field('business_hours')
|influxDBOut()
.database('telegraf')
.retentionPolicy('autogen')
.tag('kapacitor_augmented','true')
在此 TICKscript 中,我们正在从我们上面在 telegraf.conf
文件中配置的不存在的数据库“kap_telegraf”流式传输数据。stream()
节点的 .from()
方法只需要数据库匹配即可。然后,我们将控制权传递给 eval()
节点,该节点将评估点是否已到达我们设计的窗口中。在本例中,我们使用 https://docs.influxdb.org.cn/kapacitor/v1.3/tick/expr/#time-functions 中描述的 weekday()
、hour()
和 minute()
函数来评估“time”值。条件的第一部分评估一周中的日期是否在周一 (1) 和周五 (5) 之间。条件的第二部分评估小时值,注意考虑到我们希望在小时 17(下午 5 点)的“00”标记处停止。为此,我们将小时乘以 100,并将 minute()
函数的结果添加到与结束时间 1700 进行比较。如果 time
值在此范围内,则 eval()
节点返回 true 作为名为 business_hours
的字段。但是,由于我们想查询此值,我们应该将其作为标签,因此我们链接一个 .tags()
方法将值更改为名为 business_hours
的标签。
重要提示:eval()
节点将从流中删除所有其他字段和标签,因此我们希望指定 .keep()
以从流中保留这些值。
此时,我们有一个名为 business_hours
的字段和一个标签,其中包含 eval()
节点的输出。我们应该通过调用指定要通过 .fields()
方法删除的字段的 delete()
节点,将其从流中过滤掉。
最后,我们将流中的控制权传递给 influxDBOut()
节点,该节点指定要写入的目标数据库和保留策略。我们为此示例添加了一个额外的静态标签,称为 kapacitor-augmented
。所有其他数据(如 measurement 名称)都将传递,并且只需要提供新信息。在本例中,我们将写入 telegraf
数据库和 autogen
保留策略。
创建 TICKscript(下面称为 businesshours.tick
)后,我们必须告诉 Kapacitor 我们要运行它。对于此示例,我们将使用 Kapacitor CLI 来配置任务。
$ kapacitor define business_hours -type stream -dbrp kap_telegraf.autogen -tick business_hours.tick
此命令将名为 business_hours
的任务定义为流类型,侦听写入到 kap_telegraf.autogen
的数据,这是上面在 Telegraf 中配置的数据库名称和保留策略名称。成功创建任务后,我们需要启用它以供 Kapacitor 处理。
$ kapacitor enable business_hours
要显示状态,我们可以要求 Kapacitor 列出当前任务
$ kapacitor list tasks
ID Type Status Executing Databases and Retention Policies
business_hours stream enabled true ["kap_telegraf"."autogen"]
一旦数据开始通过 Kapacitor 流向 InfluxDB,您就可以将条件 AND business_hours='true'
添加到我们指定的第一个查询中
SELECT * FROM "mymeasurement" WHERE time >= '2017-08-01 08:00:00.000000' and time <= '2017-08-31 17:00:00.000000' AND business_hours='true';
总而言之,我们展示了一个简单的示例,说明如何使用 Kapacitor 来丰富从 Telegraf 流入 InfluxDB 的数据。此方法可用于添加其他类型的数据,从而可能无需创建自定义 Telegraf 插件来满足您的业务需求。