使用 InfluxDB Cloud 的 Meetup 通知

导航至

那是一个温暖的星期四下午,在苏格兰,熟悉的 Slack 提示音将我从白日梦中唤醒。是 Thom。嗯,他想要什么?

Slack message from Thom asking if we can publish Slack messages when a user group is coming up<figcaption> Thom 提出了一个简单的问题。</figcaption>

任务

这很简单。Thom 希望为每个用户组向我们的 社区 Slack 发布更新。他希望在每次活动前 1 周和每次活动前 24 小时发布这些通知。现在,当然,我可以使用许多 Slack 库中的一个,并使用一些 Python 来解析 Meetup API 并在 cron 上运行它……但这些是活动,对吧?InfluxDB 存储事件。所以我开始思考……我们可以使用 Telegraf 解析 Meetup API,将事件存储在 InfluxDB 中,然后使用 Flux 任务发送 Slack 警报吗?

是的。答案是肯定的。

收集事件

首先,我们需要从 Meetup.com 获取事件到 InfluxDB Cloud 中。我们非常幸运,因为 Meetup.com 实际上允许在无需身份验证的情况下合理访问其 API。Telegraf 可以处理 HTTP 请求的身份验证,但不必担心它真是太好了。

使用 Telegraf HTTP 输入插件,我们能够通过 HTTP 插件的简单配置从 Meetup.com API 获取所有 meetup 信息。

[[inputs.http]]
  name_override = "meetup-events"
  interval = "1h"

  urls = [
    "https://api.meetup.com/pro/influxdb/events"
  ]

  data_format = "json"

  tag_keys = [
    "event_id",
    "event_venue_city",
    "event_venue_localized_country_name",
    "chapter_name"
  ]

  json_string_fields = ["event_name", "event_link"]

  fieldpass = ["event_duration", "waitlist_count", "yes_rsvp_count"]

  json_time_key = "event_time"
  json_time_format = "unix_ms"

使用上面的配置,Telegraf 将每小时解析 Meetup.com 并将事件存储到 InfluxDB Cloud(省略了输出配置)。是的,我们将每小时写入相同的点;但我们正在使用事件开始的时间戳,并且每个事件的系列键(标签的组合)不会更改——所以我们很棒。

确认数据

默认情况下,当您使用“浏览指标”来挖掘您的数据时……它会假设您的数据在过去。可想而知!为了确认您的数据在那里,您需要更新 GUI 时间选择器——或者切换到脚本编辑器。我将向您展示如何执行这两种操作。

GUI

选择日期下拉菜单并选择“自定义时间范围”。您可以修改原始日期字符串,或多次单击月份选择器上的快进。

脚本编辑器

Flux 内置支持日期、时间和持续时间。这使我们能够编写以下查询,查找 now() 和 4 周 (4w) 时间之间的数据。

不错。

from(bucket: "metrics")
|> range(start: now(), stop: 4w)

发送警报

现在我们已经收集了数据并确认它可用。让我们看看如何设置第一个通知:提前 1 周。

首先,转到“任务”页面。如果您以前从未去过那里,您会看到一个像这样的空白页面。单击“创建任务”。

任务配置

我们将每小时运行此任务,因此我们需要在下面的范围过滤器中记住这一点。我们不想为同一个事件发送两次通知,因此我们需要确保每小时的每个查询都正确地限定了时间范围。

范围过滤器

我们需要为此查询设置一个非常具体的范围。我们只想要计划在 1 周和 1 周加 1 小时之间开始的事件。这个“加 1 小时”很重要。由于我们每小时运行一次任务,因此我们确保仅从 1 小时的窗口中获取事件;避免重复通知。

from(bucket: "metrics")
|> range(start: 1w, stop: 1w1h)

测量过滤器

现在,我们不想查询存储桶中的每个测量值。我们只想要 meetup-events 测量值(在 Telegraf 中配置的名称)。

|> filter(fn: (r) => r._measurement == "meetup-events")

分组和透视

由于数据在 InfluxDB 2 中的存储方式,我们需要几个额外的步骤才能以您期望的格式获取数据。使用我们当前的查询,我们将不会获得“行”式数据;其中每一行都是一个单独的事件。相反,我们将返回需要组装的列式数据。它看起来像这样

哎呀。可怕!为了使其成形,我的第一个想法是按 event_id 进行 group()

from(bucket: "metrics")
|> range(start: 1w, stop: 1w1h)
|> filter(fn: (r) => r._measurement == "meetup-events")
|> group(columns: ["event_id"], mode:"by")

嗯。但不幸的是,这不太正确。现在我们每个事件都有一个,每列都有一个

好的。让我们放弃 group 并使用 pivot

from(bucket: "metrics")
|> range(start: 1w, stop: 1w1h)
|> filter(fn: (r) => r._measurement == "meetup-events")
|> pivot(rowKey:["event_id"], columnKey: ["_field"], valueColumn: "_value")

透视将为每个事件返回一个;每个表都有一行包含所有事件字段。完美。

发送到 Slack

Flux 有 2 种与 Slack 交互的方法。

  1. HTTP 包
  2. Slack 包

HTTP 包允许我们向任何端点发送任意 HTTP 请求。这使我们能够使用 Slack 创建一个 传入 webhook 并将 HTTP 请求发送到该端点。

Slack 包允许我们配置一个 旧版令牌 并将通知直接发送到 Slack API。

后者允许您在“发送时间”指定发送者/频道和一些其他详细信息,而传入 webhook 则预先配置所有这些详细信息。

由于 Slack“旧版令牌”可能会随意弃用,因此我们将在此教程中使用传入 webhook 和 HTTP 包。

为了使用 HTTP 包,我们首先需要导入它。这非常简单:import "http"

目前,HTTP 包的 API 非常简单

import "http"

http.post(
  url: "http://localhost:9999/",
  headers: {x:"a", y:"b", z:"c"},
  data: bytes(v: "body")
)

考虑到这一点,我们可以采用我们一直在构建的 Flux 代码,并开始将其整合到有用的东西中。

注意:我们还使用 json 包来编码 HTTP 有效负载。

import "http"
import "json"

from(bucket: "metrics")
	|> range(start: 1w, stop: 1w1h)
	|> filter(fn: (r) =>
		(r._measurement == "meetup-events"))
	|> pivot(rowKey: ["event_id"], columnKey: ["_field"], valueColumn: "_value")
	|> map(fn: (r) => {
		message = {text: "1 Week Warning! ${r.event_group_name} is meeting this time next week in ${r.event_venue_city}, ${r.event_venue_localized_country_name}, and there's ${string(v: r.event_yes_rsvp_count)} amazing people for you to join and learn with. ${r.event_link}"}
		_ = http.post(url: "https://hooks.slack.com/services/SECRET/SECRET/SECRET", data: json.encode(v: message))

		return r
	})

哦,完 ... 蛋了!

是的,您也发现了吗?我们将硬编码的密钥存储在我们的 Flux 任务中。完蛋了!

幸运的是,InfluxDB 2 提供了密钥 API。遗憾的是,目前还没有闪亮的 GUI;但我们现在可以使用带有 curl 的原始 API。

让我们使用 curl 将我们的 Slack webhook 端点添加到我们的组织

curl -XPATCH \
  https://us-west-2-1.aws.cloud2.influxdata.com/api/v2/orgs/${ORG_ID}/secrets \
  -H 'authorization: Token ${TOKEN}' \
  -H 'Content-type: application/json' \
  --data '{"slackWebhook": "${SLACK_WEBHOOK_ENDPOINT}"}'

您需要

  1. 组织 ID(从您登录 InfluxDB Cloud 时的 URI 中获取)
  2. InfluxDB Cloud 令牌(从令牌页面获取)
  3. Slack Webhook 端点

注意事项

您可以多次使用带有 -XPATCHcurl 来添加新密钥。您可以使用 GET 请求查看密钥,但看不到密钥的值。

curl -XGET \ https://us-west-2-1.aws.cloud2.influxdata.com/api/v2/orgs/${ORG_ID}/secrets \ -H 'authorization: Token ${TOKEN}' \ -H 'Content-type: application/json'

清理

现在我们已经通过 API 创建了一个密钥,让我们将其引入我们的 Flux 任务以清理代码。这是完整的清理后的代码。

import "http"
import "json"
import "influxdata/influxdb/secrets"

webhookUri = secrets.get(key: "slackWebhook")

from(bucket: "metrics")
	|> range(start: 1w, stop: 1w1h)
	|> filter(fn: (r) =>
		(r._measurement == "meetup-events"))
	|> pivot(rowKey: ["event_id"], columnKey: ["_field"], valueColumn: "_value")
	|> map(fn: (r) => {
		message = {text: "1 Week Warning! ${r.event_group_name} is meeting this time next week in ${r.event_venue_city}, ${r.event_venue_localized_country_name}, and there's ${string(v: r.event_yes_rsvp_count)} amazing people for you to join and learn with. ${r.event_link}"}
		_ = http.post(url: webhookUri, data: json.encode(v: message))

		return r
	})

这是 Slack 上的输出。即使我这样说,这项工作也做得很好 ????

最终想法

带有 Flux、任务和密钥的 InfluxDB Cloud 是一个强大的功能组合。我们希望您喜欢本教程,并且我们迫不及待想看看您构建的内容。

祝您度过美好的一天。