使用 Kapacitor 丰富您的数据
作者:Alan Caldera / 产品,开发者
2017年10月01日
导航至
有时,我们看到了社区围绕基于业务周期查询 InfluxDB 的请求,例如典型的业务日或拆分为班次周期。考虑以下请求:我如何仅针对周一至周五早上8点至下午5点的工作时间,对整个8月份的数据进行汇总?InfluxQL 目前没有基于时间过滤的任何功能。我们只能
SELECT * FROM "mymeasurement" WHERE time >= '2017-08-01 08:00:00.000000' and time <= '2017-08-31 17:00:00.000000';
那么我们如何实现这一点呢?提供的 Telegraf 插件通常只是发送一个时间戳值,并且还能够发送一些与配置的插件相关的指标和标签的静态标签。解决方案是使用 Kapacitor 作为“预处理程序”来“装饰”或丰富您的数据,以包含一个表示您想要查询的时间段的计算值。
出于本文目的,我们在本地主机上运行 Telegraf、InfluxDB 和 Kapacitor,但在完整环境中,这些将在不同的主机上运行。
第一步是配置 Telegraf 将数据写入 Kapacitor 而不是直接写入 InfluxDB。在 telegraf.conf
文件的 [[outputs.influxdb]]
部分中,有 3 个关键设置需要考虑
[[outputs.influxdb]]
urls = [https://127.0.0.1:9092]
database = "kap_telegraf"
retention_policy = "autogen"
urls 参数必须指向端口 9092(Kapacitor 的默认监听端口),而不是 InfluxDB 的端口 8086。数据库参数应指向一个不存在的数据库(可以忽略 Telegraf 关于数据库未找到的警告)。retention_policy
参数应设置为“autogen”或您之前在实例中创建的特定保留策略。
注意:将 retention_policy
设置为“”(默认)与设置为“autogen”不同,后者是初始化 InfluxDB 时指定的默认保留策略。
telegraf.conf
中的所有其他设置都可以根据您的实例正常配置。
下一步是创建一个 TICKscript,该脚本将处理来自 Telegraf 的数据。在此示例中,我们感兴趣的是创建一个包含布尔值的标签,如果数据点在我们上面描述的工作时间内,则该值为真。
stream
|from()
.database('kap_telegraf')
|eval(lambda: if(((weekday("time") >= 1 AND weekday("time") <= 5) AND (hour("time") >= 8 AND (hour("time")*100+minute("time")) <= 1700)), 'true', 'false'))
.as('business_hours')
.tags('business_hours')
.keep()
|delete()
.field('business_hours')
|influxDBOut()
.database('telegraf')
.retentionPolicy('autogen')
.tag('kapacitor_augmented','true')
在这段TICKscript中,我们从上面在telegraf.conf
文件中配置的不存在的数据库“kap_telegraf”进行流式传输。对于stream()
节点中的.from()
方法,只需要数据库匹配即可。然后,我们将控制权传递给一个eval()
节点,该节点将评估点是否到达我们设计的窗口中。在这种情况下,我们使用weekday()
、hour()
和minute()
函数(如https://docs.influxdb.org.cn/kapacitor/v1.3/tick/expr/#time-functions所述)来评估“时间”值。条件的第一部分评估星期几是否在周一(1)和周五(5)之间。条件的第二部分评估小时值,注意我们要在17点(下午5点)的“00”刻停止。为此,我们将小时数乘以100,并将minute()
函数的结果加到一起,以与结束时间1700进行比较。如果time
值在这个范围内,eval()
节点将返回true,作为名为business_hours
的字段。然而,由于我们想查询这个值,我们应该将其作为标签,因此我们链式调用.tags()
方法将值转换为名为business_hours
的标签。
重要提示:eval()
节点将消除流中的所有其他字段和标签,因此我们想要使用.keep()
来保留这些值。
在此阶段,我们有一个名为business_hours
的字段和标签,包含eval()
节点的输出。我们应该通过调用指定要删除的字段的delete()
节点来将其从流中过滤出来。
最后,我们将流控制传递给一个指定要写入的目标数据库和保留策略的influxDBOut()
节点。我们为这个例子添加了一个额外的静态标签kapacitor-augmented
。所有其他数据,如度量名称,都通过传递,并且只需提供新信息。在这种情况下,我们将写入telegraf
数据库和autogen
保留策略。
一旦创建了TICKscript(如下面的businesshours.tick
所引用),我们必须告诉Kapacitor我们想运行它。对于这个例子,我们将使用Kapacitor CLI来配置任务。
$ kapacitor define business_hours -type stream -dbrp kap_telegraf.autogen -tick business_hours.tick
此命令将任务定义为名为business_hours
的流类型,监听写入kap_telegraf.autogen
的操作,这是在Telegraf中配置的数据库名称和保留策略名称。一旦我们成功创建任务,我们需要通过Kapacitor启用它以进行处理。
$ kapacitor enable business_hours
要显示状态,我们可以要求Kapacitor列出当前任务
$ kapacitor list tasks
ID Type Status Executing Databases and Retention Policies
business_hours stream enabled true ["kap_telegraf"."autogen"]
一旦数据开始通过Kapacitor流向InfluxDB,你就可以在第一个指定的查询中添加你的条件AND business_hours='true'
SELECT * FROM "mymeasurement" WHERE time >= '2017-08-01 08:00:00.000000' and time <= '2017-08-31 17:00:00.000000' AND business_hours='true';
总之,我们展示了如何使用Kapacitor将来自Telegraf的数据丰富到InfluxDB的一个简单示例。这种方法可以用来添加其他类型的数据,可能无需创建定制的Telegraf插件来满足你的业务需求。