第6部分:如何在Google Cloud Platform上使用TICK Stack创建一个物联网项目
作者:Todd Persen / 产品,用例
2016年3月3日
导航至
第6部分:使用Kapacitor设置警报
- 定义一个警报,用于监控温度数据并检查是否超过30摄氏度的阈值。
安装Kapacitor
要将 Kapacitor 部署到我们的虚拟机实例(运行 InfluxDB 的实例),我们需要通过 SSH 连接到该实例。请按照以下步骤操作:
- 登录到 Google 开发者控制台并选择您的项目。
- 从顶部滑动菜单中,转到计算 --> 计算引擎 --> 虚拟机实例
- 您将看到您的虚拟机实例列表。
- 请注意行末的 SSH 按钮。
- 点击它,等待 SSH 会话初始化并为您设置。如果一切顺利,您应该会看到一个新浏览器窗口,它会带您转到以下虚拟机实例
接下来是安装 Kapacitor,由于我们在创建虚拟机时选择了 Debian Linux,我们可以按照官方文档站点上给出的步骤安装 Kapacitor。
wget https://s3.amazonaws.com/kapacitor/kapacitor_0.10.1-1_amd64.deb
sudo dpkg -i kapacitor_0.10.1-1_amd64.deb
安装成功后,您将理想地拥有两个我们将要使用的应用程序
- `kapacitord`:这是 Kapacitor 守护进程,它需要运行以处理进入 InfluxDB 的数据。
- `kapacitor`:这是 CLI,我们将使用它来与 kapacitord 守护进程通信并设置我们的任务等。
生成默认配置文件
Kapacitor 是一个功能强大的产品,具有多个配置选项,这使得创建初始配置文件具有挑战性。因此,为了使事情变得简单,我们可以借助 kapacitord 应用程序来帮助我们生成默认配置文件。
我们继续生成默认配置文件:以下所示的 kapacitor.conf
$ kapacitord config > kapacitor.conf
配置文件(kapacitor.conf)包含多个配置部分,包括连接到 InfluxDB、可配置的各种通道等。
以下是 kapacitor.conf
文件中一些有趣的配置部分
- `[http]`:这是 kapacitord 将暴露的 API 端点,Kapacitor 客户端将与它通信。
- `[influxdb]`:在启动时,Kapacitor 默认设置多个 InfluxDB 数据库订阅。此部分包含连接到 InfluxDB 实例的各种配置属性。您会注意到它是一个 localhost URL,因为 InfluxDB 实例在同一实例上运行。
- `[logging]`:此部分包含默认日志级别。如果需要,可以通过 Kapacitor 客户端更改它。
- `[slack]`:我们感兴趣的教程部分是使用 Slack 通知。各种属性包括我们想要发布消息的 Slack 频道、Slack 团队的 incoming Webhook URL 等。我们将在本教程稍后设置 Slack Incoming Webhook 集成时再详细介绍。
启动 Kapacitor 服务
我们目前不对我们的 kapacitor.conf
文件进行任何更改。我们只需像下面这样启动 Kapacitor 服务
$ kapacitord -config kapacitor.conf
这启动了 Kapacitor 服务,您会注意到控制台日志的末尾会设置大量订阅,包括我们感兴趣的 temperature_db 数据库上的订阅。
Kapacitor 客户端
Kapacitor CLI(kapacitor)是您将用于与 Kapacitor 守护进程通信的客户端应用程序。您可以使用客户端不仅配置警报、启用/禁用它们,还可以检查它们的状态等。
检查为 Kapacitor 设置了任何任务的命令之一是使用 lists 命令。我们可以像下面这样执行它
$ kapacitor list tasks
Name Type Enabled Executing Databases and Retention Policies
这显示目前没有配置任何任务。
通过 TICKscript 创建高温警报任务
接下来我们要做的任务是设置一个任务来检测温度是否高于30摄氏度,从站点S1开始。任务脚本是用一种名为TICKscript的领域特定语言编写的。
以下是我们高温警报的TICKscript
stream
.from()
.database('temperature_db')
.measurement('temperature')
.where(lambda:"Station" == 'S1')
.alert()
.message('{{index .Tags "Station" }} has high temperature : {{ index .Fields "value" }}')
.warn(lambda:"value" >= 30)
.log('/tmp/high_temp.log')
请注意,按照下面的方式读取脚本是非常直观的
- 我们正在工作在流模式,这意味着Kapacitor正在订阅来自InfluxDB的实时数据流,而不是批量模式,在批量模式下,Kapacitor会批量查询InfluxDB。
- 然后我们指定要使用的InfluxDB数据库,通过database()函数。这将监控进入我们的temperature_db数据库的数据流。
- 为标签Station指定了一个过滤器。我们感兴趣的值是“S1”站点。
- 对于上述标准,我们希望在测量值大于30时发出警报。
- 如果值大于30,那么我们希望在这一点上将其记录在临时文件中(我们将在稍后看到Slack集成)。
- 我们还想捕捉的消息(即自定义消息)也进行了指定。例如:“S1温度过高:30.5”。
我们将上述TICKscript保存到temperature_alert.tick
文件中。
配置高温警报任务
下一步是使用Kapacitor客户端定义此任务并将其提供给Kapacitor。我们通过以下定义命令来完成此操作
$ kapacitor define \
-name temp_alert \
-type stream \
-tick temperature_alert.tick \
-dbrp temperature_db.default
请注意以下参数
- 我们将任务命名为temp_alert。
- 我们指定我们想以流模式使用它。
- 我们指定TICKscript文件:temperature_alert.tick。
- 数据库保留策略是从temperature_db数据库选择的默认策略(无限持续时间,复制因子设置为集群中的节点数)。
现在我们可以查看Kapacitor服务知道的任务,如下所示
$ kapacitor list tasks
Name Type Enabled Executing Databases and Retention Policies
temp_alert stream false false ["temperature_db"."default"]
您可以看到Enabled和Executing属性都被设置为false。
干运行:温度警报
在开发警报系统时面临的一个挑战是在投入生产之前对其进行测试。Kapacitor的一个很棒的功能是基于数据快照/记录进行警报的干运行。
步骤很简单,从高层次来看,我们必须做以下事情
- 确保警报(temp_alert)未启用。我们已在前面部分进行了验证。
- 我们要求Kapacitor记录从InfluxDB进入的数据流,持续一定的时间间隔(例如20秒或30秒)。在记录这些数据的同时,我们确保一些进入的数据满足触发警报的条件,正如我们所期望的。在我们的情况下,如果温度高于或等于30,则应记录数据。
- Kapacitor在上述定义的时间间隔内记录数据,并给我们一个记录ID。
- 然后我们重新播放这些数据,并告诉Kapacitor运行我们定义的警报(temp_alert)。
- 我们通过检查我们的日志文件(/tmp/high_temp.log)中是否有条目来检查与我们警报相关的TICKscript是否运行正常。
- 如果测试运行正常,我们将启用任务。
让我们开始吧。我们已经有了未启用的temp_alert
,即Enabled
属性的值为false,正如我们在kapacitor list tasks
命令中看到的。
下一步是要求Kapacitor开始记录我们警报的数据。我们要求它以流模式记录数据(其他选项是批量和查询)。我们指定持续时间为30秒,并指定任务名称(temp_alert)
。
kapacitor record stream -name temp_alert -duration 30s
这将使 Kapacitor 使用指定任务中指定的数据库和保留策略记录数据流 30 秒。如果您的数据正在流式传输,给它总共 30 秒来记录。或者,您也可以使用 Influx 客户端生成 INSERT 语句。
只需确保从第一个 INSERT 到最后一个 INSERT 的时间间隔等于或大于指定的持续时间(30 秒),无论您是通过手动 INSERT 语句发送数据,还是数据正在流式传输。
上述命令将完成,并输出一个记录 ID,例如下面所示
fbd79eaa-50c5-4591-bbb0-e76f354ef074
您可以使用以下命令检查 Kapacitor 中是否可用的记录
kapacitor list recordings <recording-id>
下面是输出示例
ID Type Size Created
fbd79eaa-50c5-4591-bbb0-e76f354ef074 stream 159 B 17 Feb 16 22:18 IST
大于零的大小表示已记录数据。现在,我们只需回放记录的数据以针对我们指定的警报即可。-fast 参数用于尽可能快地回放数据,而不必等待记录数据所花费的整个持续时间(在我们的例子中是 30 秒)
kapacitor replay -id $rid -name temp_alert -fast
其中 $rid
是包含 记录 ID
值的变量。
我在记录阶段使用的数据中,某些记录的温度值超过 30 摄氏度,这正是我期望警报被触发并将记录写入 /tmp/high_temp.log
文件的情况。
检查 /tmp/high_temp.log
文件中的条目,我们确实注意到以下条目
$ cat /tmp/high_temp.log
{"id":"temperature:nil","message":"S1 has high temperature : 31",
}
{"id":"temperature:nil","message":"S1 has high temperature : 32",
}
{"id":"temperature:nil","message":"S1 has high temperature : 31",
}
启用任务
现在我们已经验证了警报正常工作,我们需要将其投入使用。这意味着我们需要像下面这样启用任务
$ kapacitor enable temp_alert
您现在可以通过以下 show
命令检查任务的详细信息
$ kapacitor show temp_alert
这将打印出关于任务的详细信息,以及以下给出的任务 TICKscript
Name: temp_alert
Error:
Type: stream
Enabled: true
Executing: true
Databases Retention Policies: ["temperature_db"."default"]
TICKscript:
stream
.from()
.database('temperature_db')
.measurement('temperature')
.where(lambda:"Station" == 'S1')
.alert()
.message('{{index .Tags "Station" }} has high temperature : {{ index .Fields "value" }}')
.warn(lambda:"value" >= 30)
.log('/tmp/high_temp.log')
DOT:
digraph temp_alert {
stream0 -> stream1 [label="0"];
stream1 -> alert2 [label="0"];
}
注意,现在 Enabled
和 Executing
属性都为 true。
高温警报正在行动
如果温度值正在传入,任务将被执行,并将记录写入日志文件。下面是 /tmp/high_temp.log
文件中特定记录的示例
{"id":"temperature:nil","message":"S1 has high temperature : 30","time":"2016-01-22T06:37:58.83553813Z","level":"WARNING","data":{"series":[{"name":"temperature","tags":{"Station":"S1"},"columns":["time","value"],"values":[["2016-01-22T06:37:58.83553813Z",30]]}]}}
注意,消息属性包含消息以及其他标记、值和时间戳。
这证实了我们的高温警报任务已经正确设置并正常运行。下一步是设置 Slack 通道通知。
Slack 入站钩子集成
Slack API 为外部应用程序提供了多种与其集成的机制。其中之一是入站 Webhooks 集成。通过这种集成机制,外部应用程序可以向 Slack 团队中的特定通道或用户发送消息。
Kapacitor 支持通过此机制向您的 Slack 团队发送消息,因此我们只需向 Kapacitor 配置提供详细信息,在我们的 TICKscript 中指定 Slack 通知,我们就可以完成了。
启用 Slack 通道
第一步是在您的 Slack 团队内部启用此集成。为此,我们将假设您已登录到您的 Slack 团队,并且您是管理员。
转到 Slack 应用目录 并点击下面的 创建自定义集成
这将显示您可以为您团队构建的自定义集成列表,我们将选择下面的 入站 Webhooks
我们希望消息发布到 #general 通道,因此我们选择该通道并点击 添加传入 WebHooks 集成。
这完成了 WebHooks 设置,并将带您进入刚刚设置的集成详情页面。这将包含您需要记录的 Webhook URL。Kapacitor 只需要这个信息,以便它能将 JSON 有效负载数据发布到 Slack,Slack 再将其传递到您的 #general 通道。
在 Kapacitor 配置文件中配置 Slack 通道
接下来我们需要做的是回到 Kapacitor 服务使用的 kapacitor.conf
文件。
在该文件中,您将找到 [slack]
配置部分,并按以下方式填写:
[slack]
enabled = true
url = "https://hooks.slack.com/services/<rest of Webhook URL>"
channel = "#general"
global = false
请注意,从上一部分获得的 Webhook URL 设置为 url 属性。我们还启用了此通道,指定了要发布的 channel (#general)
,并将全局设置为 false,因为我们希望在 TICKscript 中显式启用 Slack 集成。
保存此文件,并再次重新启动 Kapacitor 服务。
您应该在启动控制台的最后几行看到以下内容
[udp:temperature_db.default] 2016/01/22 06:46:53 I! Started listening on UDP: 127.0.0.1:35958
[influxdb] 2016/01/22 06:46:53 I! started UDP listener for temperature_db default
[task_master] 2016/01/22 06:46:53 I! Started task: temp_alert
请注意,已为我们的 temperature_db 数据库启动了监听器,并且我们的任务也已启动。
将 Slack 通道添加到 TICKscript
我们尚未修改我们的 TICKscript,它只记录了高温到一个文件。现在我们将添加 Slack 通道。
在编辑器中打开 temperature_alert.tick
文件,并添加以下突出显示的附加行
stream
.from()
.database('temperature_db')
.measurement('temperature')
.where(lambda:"Station" == 'S1')
.alert()
.message('{{index .Tags "Station" }} has high temperature : {{ index .Fields "value" }}')
.warn(lambda:"value" >= 30)
.slack()
.log('/tmp/high_temp.log')
保存 temperature_alert.tick
文件。
重新加载任务
我们现在将重新加载任务,因为我们已更改了脚本。为此,您必须再次定义任务(使用相同的名称),如下所示。define 命令将自动重新加载已启用的任务
$ kapacitor define -name temp_alert -tick temperature_alert.tick
Slack 通道通知
我们现在已准备好接收 Slack 通知。如果温度数据正在流入,并且温度值超过 30 摄氏度,您将在 Slack 中看到一个通知。下面是我们 general 通道中的示例记录
这标志着 Kapacitor 与我们的物联网传感器应用的集成完成。