第 6 部分:如何在 Google Cloud Platform 上使用 TICK Stack 创建 IoT 项目
作者:Todd Persen / 产品, 用例
2016 年 3 月 3 日
导航至
第 6 部分:使用 Kapacitor 设置警报
在这一部分中,我们将了解 TICK Stack 中的“K”——Kapacitor。Kapacitor 是一个流和批处理引擎,既是数据处理器又是警报引擎。在我们的例子中,我们将专门以下列方式使用它
- 定义一个警报,监控温度数据并检查它是否超过 30 摄氏度的阈值。
- 如果报告的温度高于该值,我们希望将此记录记录在一个文件中,并在我们的 Slack 频道中发出通知。
Kapacitor 的功能远不止于此,它配备了一个复杂的引擎来检测数据模式,并将其导入到它开箱即用的多个通道中。在我们的例子中,将高温记录在一个文件中并通过 Slack 发出通知只是它可以做的几个集成。
那么,让我们开始设置 Kapacitor 并查看我们的温度警报是否生效。
安装 Kapacitor
我们将在与 InfluxDB 实例相同的实例上运行 Kapacitor。此实例在 Google Cloud 上运行,因此安装此软件的最佳方法是通过 SSH 连接到该实例。
要将 Kapacitor 设置到我们的 VM 实例(运行 InfluxDB 的实例)中,我们需要 SSH 连接到该实例。请按照以下步骤操作
- 登录 Google Developers Console 并选择您的项目。
- 从顶部的滑动菜单中,转到 Compute --> Compute Engine --> VM Instances
- 您将看到您的 VM 实例已列出。
- 查找行尾的 SSH 按钮。
- 单击该按钮并等待 SSH 会话初始化并为您设置。如果一切顺利,您应该会看到另一个浏览器窗口,将您传送到 VM 实例,如下所示
接下来是安装 Kapacitor,由于这是我们在创建 VM 时选择的 Debian Linux,我们可以按照官方文档站点上给出的 安装 Kapacitor 的步骤进行操作。
wget https://s3.amazonaws.com/kapacitor/kapacitor_0.10.1-1_amd64.deb
sudo dpkg -i kapacitor_0.10.1-1_amd64.deb
成功安装后,您理想情况下将拥有两个我们将要使用的应用程序
- `kapacitord`:这是 Kapacitor 守护程序,需要运行以处理进入 InfluxDB 的数据。
- `kapacitor`:这是我们将用来与 kapacitord 守护程序通信并设置我们的任务等的 CLI。
生成默认配置文件
Kapacitor 是一个功能强大的产品,具有多个配置选项,这使得创建初始配置文件具有挑战性。因此,为了简化操作,我们可以借助 kapacitord 应用程序来帮助我们生成默认配置文件。
我们继续生成一个默认配置文件:kapacitor.conf
,如下所示
$ kapacitord config > kapacitor.conf
配置文件 (kapacitor.conf) 具有多个配置部分,包括与 InfluxDB 的连接、可以配置的各种通道等等。
以下是 kapacitor.conf
文件中一些感兴趣的配置部分
- `[http]`:这是 kapacitord 将公开的 API 端点,Kapacitor 客户端将与之通信。
- `[influxdb]`:在启动时,Kapacitor 默认会为 InfluxDB 数据库设置多个订阅。此部分具有用于连接到 InfluxDB 实例的各种配置属性。您会注意到它是一个 localhost url,因为 InfluxDB 实例在同一实例上运行。
- `[logging]`:此部分具有默认日志记录级别。如果需要,可以通过 Kapacitor 客户端更改此级别。
- `[slack]`:我们在教程中感兴趣的部分是通过 Slack 获得通知。各种属性包括我们想要将消息发布到的 Slack 中的频道、Slack 团队的传入 Webhook URL 等。当我们设置 Slack 传入 Webhook 集成时,我们将在本教程稍后部分详细介绍。
启动 Kapacitor 服务
我们目前不对 kapacitor.conf
文件进行任何更改。我们只需启动 Kapacitor 服务,如下所示
$ kapacitord -config kapacitor.conf
这将启动 Kapacitor 服务,您将在控制台日志记录的末尾注意到已设置了一堆订阅,包括我们感兴趣的 temperature_db 数据库上的订阅。
Kapacitor 客户端
Kapacitor CLI (kapacitor) 是您将用来与 Kapacitor 守护程序通信的客户端应用程序。您可以使用客户端不仅配置警报、启用/禁用它们,还可以检查它们的状态等等。
检查 Kapacitor 是否设置了任何任务的命令之一是通过 lists 命令。我们可以像下面这样触发它
$ kapacitor list tasks
Name Type Enabled Executing Databases and Retention Policies
这表明当前未配置任何任务。
通过 TICKscript 创建高温警报任务
接下来我们要做的任务是设置任务,以检测来自站点 S1 的温度是否高于 30 摄氏度。任务脚本是用一种名为 TICKscript 的 DSL 编写的。
我们的高温警报的 TICKscript 如下所示
stream
.from()
.database('temperature_db')
.measurement('temperature')
.where(lambda:"Station" == 'S1')
.alert()
.message('{{index .Tags "Station" }} has high temperature : {{ index .Fields "value" }}')
.warn(lambda:"value" >= 30)
.log('/tmp/high_temp.log')
请注意,读取以下给出的脚本非常直观
- 我们正在查看在流模式下工作,这意味着 Kapacitor 正在订阅来自 InfluxDB 的实时数据馈送 v/s 批处理模式,其中 Kapacior 以批处理方式查询 InfluxDB。
- 然后,我们通过 database() 指定哪个 InfluxDB 数据库。这将监控进入我们的 temperature_db 数据库的数据流。
- 为标签站点指定了一个过滤器。我们感兴趣的值是“S1”站点。
- 对于上述标准,只有当测量值大于 30 时,我们才希望收到警报。
- 如果值大于 30,那么我们希望此时将该数据记录在一个临时文件中(稍后我们将看到 Slack 集成)。
- 我们想要捕获的消息(即自定义消息)也被指定。例如,“S1 温度过高:30.5”。
我们将上述 TICKscript 保存在 temperature_alert.tick
文件中。
配置高温警报任务
下一步是使用 Kapacitor 客户端定义此任务,并使其可用于 Kapacitor。我们通过 define 命令执行此操作,如下所示
$ kapacitor define \
-name temp_alert \
-type stream \
-tick temperature_alert.tick \
-dbrp temperature_db.default
请注意以下参数
- 我们将我们的任务命名为 temp_alert。
- 我们指定我们要以流模式使用它。
- 我们指定 TICKscript 文件:temperature_alert.tick。
- 数据库保留策略从 temperature_db 数据库中选择为默认策略(无限持续时间,复制因子设置为集群中节点的数量)。
我们现在可以查看 Kapacitor 服务知道的任务,如下所示
$ kapacitor list tasks
Name Type Enabled Executing Databases and Retention Policies
temp_alert stream false false ["temperature_db"."default"]
您可以看到 Enabled 和 Executing 属性设置为 false。
干运行:温度警报
在开发警报系统时,您面临的挑战之一是在投入生产之前对其进行测试。Kapacitor 的一个很棒的功能是基于数据快照/记录对警报进行干运行。
步骤非常简单,在高层次上,我们必须执行以下操作
- 确保未启用警报 (temp_alert)。我们在上一节中验证了这一点。
- 我们要求 Kapacitor 记录在给定时间间隔(例如 20 秒或 30 秒)内进入 InfluxDB 的数据流。在记录此数据时,我们确保传入的某些数据满足触发警报的条件,正如我们所期望的那样。在我们的例子中,如果温度高于或等于 30,那么它应该记录数据。
- Kapacitor 记录在上面定义的时间间隔内的数据,并为我们提供一个记录 ID。
- 然后,我们重放该数据,并告诉 Kapacitor 在我们定义的警报 (temp_alert) 中运行它。
- 我们通过检查我们的日志文件 (/tmp/high_temp.log) 中是否有任何条目来检查与警报关联的 TICKscript 是否工作正常。
- 如果测试运行良好,我们将启用该任务。
让我们开始吧。我们已经使我们的 temp_alert
未启用,即 Enabled
属性的值为 false,正如我们在 kapacitor list tasks
命令中看到的那样。
下一步是要求 Kapacitor 开始记录我们警报的数据。我们要求它以流模式记录数据(其他选项是 batch 和 query)。我们将持续时间指定为 30 秒,并指定任务名称 (temp_alert)
。
kapacitor record stream -name temp_alert -duration 30s
这将使 Kapacitor 使用指定的任务中的数据库和保留策略记录 30 秒的实时数据流。如果您的数据正在流式传输,请给它总共 30 秒的时间来记录它。或者,您也可以使用 Influx 客户端生成 INSERT 语句。
只需确保从第一个 INSERT 到最后一个 INSERT 的时间间隔等于或大于指定的持续时间(30 秒),无论您是通过手动 INSERT 语句发送数据,还是数据正在流式传输。
上述命令将完成,并将输出一个记录 ID,如下例所示
fbd79eaa-50c5-4591-bbb0-e76f354ef074
您可以使用以下命令检查记录是否在 Kapacitor 中可用
kapacitor list recordings <recording-id>
示例输出如下所示
ID Type Size Created
fbd79eaa-50c5-4591-bbb0-e76f354ef074 stream 159 B 17 Feb 16 22:18 IST
大于零的大小表示数据已记录。现在,我们所需要做的就是针对我们指定的警报重放记录的数据。提供 -fast 参数是为了尽可能快地重放数据,而不是等待数据记录的整个持续时间(在我们的例子中为 30 秒)
kapacitor replay -id $rid -name temp_alert -fast
其中 $rid
是一个变量,包含 Recording Id
的值。
我在记录阶段中使用的数据包含一些记录的温度值超过 30 摄氏度,这正是我期望警报被触发并在 /tmp/high_temp.log
文件中写入记录的原因。
在检查文件 /tmp/high_temp.log
的条目时,我们确实注意到了如下所示的条目
$ cat /tmp/high_temp.log
{"id":"temperature:nil","message":"S1 has high temperature : 31",
}
{"id":"temperature:nil","message":"S1 has high temperature : 32",
}
{"id":"temperature:nil","message":"S1 has high temperature : 31",
}
启用任务
现在我们已经验证了我们的警报工作正常,我们需要使其生效。这意味着我们需要启用任务,如下所示
$ kapacitor enable temp_alert
您现在可以通过 show
命令检查您的任务的详细信息,如下所示
$ kapacitor show temp_alert
这将打印出任务的详细信息以及任务的 TICKscript,如下所示
Name: temp_alert
Error:
Type: stream
Enabled: true
Executing: true
Databases Retention Policies: ["temperature_db"."default"]
TICKscript:
stream
.from()
.database('temperature_db')
.measurement('temperature')
.where(lambda:"Station" == 'S1')
.alert()
.message('{{index .Tags "Station" }} has high temperature : {{ index .Fields "value" }}')
.warn(lambda:"value" >= 30)
.log('/tmp/high_temp.log')
DOT:
digraph temp_alert {
stream0 -> stream1 [label="0"];
stream1 -> alert2 [label="0"];
}
请注意,Enabled
和 Executing
属性现在为 true。
高温警报生效
如果温度值正在传入,则将执行任务,并将记录写入日志文件。下面显示了 /tmp/high_temp.log
文件中的特定记录
{"id":"temperature:nil","message":"S1 has high temperature : 30","time":"2016-01-22T06:37:58.83553813Z","level":"WARNING","data":{"series":[{"name":"temperature","tags":{"Station":"S1"},"columns":["time","value"],"values":[["2016-01-22T06:37:58.83553813Z",30]]}]}}
请注意,message 属性包含消息以及其他标签、值和时间戳。
这证实我们的高温警报任务已正确设置并且工作正常。接下来要做的是设置 Slack 频道通知。
Slack 传入 Hook 集成
Slack API 提供了多种机制供外部应用程序与其集成。其中之一是传入 Webhook 集成。通过这种集成机制,外部应用程序可以将消息发布到 Slack 团队中的特定频道或用户。
Kapacitor 支持通过此机制将消息发布到您的 Slack 团队,因此我们所需要做的就是向 Kapacitor 配置提供详细信息,在我们的 TICKscript 中指定 slack 通知,我们就完成了所有设置。
启用 Slack 频道
第一步是在您的 Slack 团队内部启用此集成。为此,我们将假设您已登录到您的 Slack 团队并且您是管理员。
转到 Slack 应用目录 并单击制作自定义集成,如下所示
这将显示您可以为您的团队构建的自定义集成列表,我们将选择传入 WebHooks,如下所示
我们希望消息发布到 #general 频道,因此我们选择该频道并单击添加传入 WebHooks 集成。
这将完成 WebHooks 设置,并将您引导到您刚刚设置的集成的详细信息页面。这将包含您需要记下的Webhook URL。Kapacitor 只需要拥有此信息,以便它可以将 JSON Payload 数据发布到 Slack,而 Slack 又会将其传递到您的 #general 频道。
在 Kapacitor 配置文件中配置 Slack 频道
接下来我们需要做的是返回到我们的 Kapacitor 服务正在使用的 kapacitor.conf
文件。
在该文件中,您将找到 [slack]
配置部分,我们将按如下方式填写该部分
[slack]
enabled = true
url = "https://hooks.slack.com/services/<rest of Webhook URL>"
channel = "#general"
global = false
请注意,我们从上一节获得的 Webhook URL 设置为 url 属性。我们还启用此通道,指定要发布到的 channel (#general)
,并将 global 设置为 false,因为我们希望在我们的 TICKscript 中显式启用 Slack 集成。
保存此文件并再次重启 Kapacitor 服务。
您应该看到启动控制台中的最后几行,如下所示
[udp:temperature_db.default] 2016/01/22 06:46:53 I! Started listening on UDP: 127.0.0.1:35958
[influxdb] 2016/01/22 06:46:53 I! started UDP listener for temperature_db default
[task_master] 2016/01/22 06:46:53 I! Started task: temp_alert
请注意,已为我们的 temperature_db 数据库启动了侦听器,并且我们的任务也已启动。
将 Slack 频道添加到 TICKscript
我们尚未修改我们的 TICKscript,它仅将高温记录到一个文件中。我们现在将添加 Slack 频道。
在编辑器中打开 temperature_alert.tick
文件,并添加如下突出显示的附加行
stream
.from()
.database('temperature_db')
.measurement('temperature')
.where(lambda:"Station" == 'S1')
.alert()
.message('{{index .Tags "Station" }} has high temperature : {{ index .Fields "value" }}')
.warn(lambda:"value" >= 30)
.slack()
.log('/tmp/high_temp.log')
保存 temperature_alert.tick
文件。
重新加载任务
由于我们更改了脚本,我们现在将再次重新加载任务。为此,您必须再次定义任务(使用相同的名称),如下所示。define
命令将自动重新加载已启用的任务
$ kapacitor define -name temp_alert -tick temperature_alert.tick
Slack 频道通知
我们现在都已设置好接收 Slack 通知。如果温度数据正在流式传输,并且温度值高于 30 摄氏度,您将在 Slack 中看到通知。下面显示了我们的 general 中的示例记录
这结束了 Kapacitor 集成到我们的 IoT 传感器应用程序中。