使用InfluxDB云服务获取Meetup通知

导航至

那是一个温暖的星期四下午,在苏格兰,Slack的熟悉铃声将我从沉思中唤醒。是汤姆。嗯,他想干什么?

汤姆在Slack上询问是否可以在用户小组即将到来时发布消息<figcaption> 汤姆提出了一个简单的问题。</figcaption>

任务

任务很简单。汤姆想为我们社区Slack工作空间的每个用户小组发布更新。他希望这些通知在活动前一周和活动前24小时发布。当然,我本可以使用许多Slack库和Python解析Meetup API并在cron上运行它……但这些是事件,对吧?InfluxDB存储事件。所以我开始思考……我们能否使用Telegraf解析Meetup API,将事件存储在InfluxDB中,然后使用Flux任务发送Slack警报?

是的。答案是肯定的。

收集事件

首先,我们需要将Meetup.com上的事件获取到InfluxDB云中。在这里我们非常幸运,因为Meetup.com实际上允许未经身份验证即可合理访问他们的API。Telegraf可以处理HTTP请求的身份验证,但不必担心这个问题真是太好了。

使用Telegraf HTTP输入插件,我们能够通过简单的配置HTTP插件从Meetup.com API获取所有Meetup信息。

[[inputs.http]]
  name_override = "meetup-events"
  interval = "1h"

  urls = [
    "https://api.meetup.com/pro/influxdb/events"
  ]

  data_format = "json"

  tag_keys = [
    "event_id",
    "event_venue_city",
    "event_venue_localized_country_name",
    "chapter_name"
  ]

  json_string_fields = ["event_name", "event_link"]

  fieldpass = ["event_duration", "waitlist_count", "yes_rsvp_count"]

  json_time_key = "event_time"
  json_time_format = "unix_ms"

上述配置下,Telegraf将每小时解析Meetup.com并存储事件到InfluxDB云(输出配置省略)。是的,我们将每小时写入相同的数据点;但我们使用事件开始的时间戳和每个事件(标签的组合)的系列键不会改变——所以我们没问题。

确认数据

默认情况下,当您使用“探索指标”挖掘数据时……它假定您的数据是过去的。想想看!为了确认您的数据存在,您需要更新GUI时间选择器——或者切换到脚本编辑器。我会向您展示如何做这两件事。

GUI

选择日期下拉菜单,选择“自定义时间范围”。您可以选择修改原始日期字符串,或者在月份选择器上快速向前滚动多次。

脚本编辑器

Flux内置了对日期、时间和持续时间的支持。这允许我们编写下面的查询,查找从now()到4周(4w)的数据。

很好。

from(bucket: "metrics")
|> range(start: now(), stop: 4w)

发送警报

现在我们已经收集了数据并确认了其可用性。让我们看看如何设置第一个通知:提前一周。

首先,转到任务页面。如果您以前从未去过那里,您将看到一个空页面,就像这样。点击“创建任务”。

任务配置

我们将每小时运行此任务,因此在下面的范围过滤器中需要记住这一点。我们不想为事件发送两次通知,因此需要确保每小时每个查询的时间范围正确。

范围过滤器

我们需要为这个查询一个非常具体的范围。我们只想获取预定在1周和1周加1小时之间开始的事件。这个“加1小时”很重要。因为我们每小时运行一次任务,我们确保只获取1小时窗口的事件;避免重复通知。

from(bucket: "metrics")
|> range(start: 1w, stop: 1w1h)

测量过滤器

现在,我们不想查询我们的桶中的每个测量值。我们只想获取meetup-events 测量值(在Telegraf中配置的名称)。

|> filter(fn: (r) => r._measurement == "meetup-events")

分组和旋转

由于数据在InfluxDB 2中的存储方式,我们需要一些额外步骤来获取您期望的格式。根据我们的当前查询,我们不会得到基于“行”的数据;其中每一行是一个单独的事件。相反,我们将得到需要组装的列式数据。它看起来像这样

哎呀。太可怕了!为了使数据符合要求,我的第一个想法是event_id分组。

from(bucket: "metrics")
|> range(start: 1w, stop: 1w1h)
|> filter(fn: (r) => r._measurement == "meetup-events")
|> group(columns: ["event_id"], mode:"by")

嗯。但不幸的是,这并不完全正确。现在我们有一个每个事件一个表,每列一个

好的。让我们放弃分组并使用旋转

from(bucket: "metrics")
|> range(start: 1w, stop: 1w1h)
|> filter(fn: (r) => r._measurement == "meetup-events")
|> pivot(rowKey:["event_id"], columnKey: ["_field"], valueColumn: "_value")

旋转将为每个事件返回一个;每个表都有一个包含所有事件字段的单行。完美。

发送到Slack

Flux有两种与Slack交互的方法。

  1. HTTP包
  2. Slack包

HTTP包允许我们向任何端点发送任意HTTP请求。这允许我们使用Slack创建一个入站webhook并将HTTP请求发送到端点。

Slack包允许我们提供旧令牌并将通知直接发送到Slack API。

后者允许您在“发送时间”指定发送者/频道和其他一些细节,而入站webhook则在预设置所有这些细节。

由于Slack的“旧令牌”可能会随意弃用,我们将使用入站webhook和HTTP包来展示本教程。

为了使用HTTP包,我们首先需要导入它。这很简单:import "http"

目前HTTP包的API相当简单

import "http"

http.post(
  url: "https://127.0.0.1:9999/",
  headers: {x:"a", y:"b", z:"c"},
  data: bytes(v: "body")
)

因此,基于这个想法,我们可以将我们一直在构建的Flux代码组合在一起,使其变得有用。

注意:我们还在使用json 包来编码HTTP有效负载。

import "http"
import "json"

from(bucket: "metrics")
	|> range(start: 1w, stop: 1w1h)
	|> filter(fn: (r) =>
		(r._measurement == "meetup-events"))
	|> pivot(rowKey: ["event_id"], columnKey: ["_field"], valueColumn: "_value")
	|> map(fn: (r) => {
		message = {text: "1 Week Warning! ${r.event_group_name} is meeting this time next week in ${r.event_venue_city}, ${r.event_venue_localized_country_name}, and there's ${string(v: r.event_yes_rsvp_count)} amazing people for you to join and learn with. ${r.event_link}"}
		_ = http.post(url: "https://hooks.slack.com/services/SECRET/SECRET/SECRET", data: json.encode(v: message))

		return r
	})

哦,糟了...

是的,你也发现了吗?我们在Flux任务中存储了一个硬编码的秘密。糟糕!

幸运的是,InfluxDB 2提供了一个秘密API。遗憾的是,目前还没有闪亮的GUI;但我们可以使用原始API和curl 来使用它。

让我们用curl将我们的Slack webhook端点添加到我们的组织中

curl -XPATCH \
  https://us-west-2-1.aws.cloud2.influxdata.com/api/v2/orgs/${ORG_ID}/secrets \
  -H 'authorization: Token ${TOKEN}' \
  -H 'Content-type: application/json' \
  --data '{"slackWebhook": "${SLACK_WEBHOOK_ENDPOINT}"}'

你需要

  1. 组织ID(在登录InfluxDB Cloud时从URI获取)
  2. InfluxDB Cloud令牌(从令牌页面获取)
  3. Slack Webhook端点

注意事项

您可以使用curl配合-XPATCH多次添加新的机密。您可以通过GET请求查看您的机密的键,但不能查看值。

curl -XGET \ https://us-west-2-1.aws.cloud2.influxdata.com/api/v2/orgs/${ORG_ID}/secrets \ -H 'authorization: Token ${TOKEN}' \ -H 'Content-type: application/json'

清理工作

现在我们通过API创建了一个机密,让我们将其带入我们的Flux任务中清理代码。以下是清理后的完整代码。

import "http"
import "json"
import "influxdata/influxdb/secrets"

webhookUri = secrets.get(key: "slackWebhook")

from(bucket: "metrics")
	|> range(start: 1w, stop: 1w1h)
	|> filter(fn: (r) =>
		(r._measurement == "meetup-events"))
	|> pivot(rowKey: ["event_id"], columnKey: ["_field"], valueColumn: "_value")
	|> map(fn: (r) => {
		message = {text: "1 Week Warning! ${r.event_group_name} is meeting this time next week in ${r.event_venue_city}, ${r.event_venue_localized_country_name}, and there's ${string(v: r.event_yes_rsvp_count)} amazing people for you to join and learn with. ${r.event_link}"}
		_ = http.post(url: webhookUri, data: json.encode(v: message))

		return r
	})

这是Slack上的输出。做得好,即使我这么说????

最后思考

InfluxDB Cloud结合Flux、任务和机密具有强大的功能。我们希望您喜欢这个教程,并迫不及待地想看到您能构建什么。

祝您玩得开心。