使用InfluxDB云服务获取Meetup通知
作者:David Flanagan / 产品,开发者
2019年9月19日
导航至
那是一个温暖的星期四下午,在苏格兰,Slack的熟悉铃声将我从沉思中唤醒。是汤姆。嗯,他想干什么?
<figcaption> 汤姆提出了一个简单的问题。</figcaption>
任务
任务很简单。汤姆想为我们社区Slack工作空间的每个用户小组发布更新。他希望这些通知在活动前一周和活动前24小时发布。当然,我本可以使用许多Slack库和Python解析Meetup API并在cron上运行它……但这些是事件,对吧?InfluxDB存储事件。所以我开始思考……我们能否使用Telegraf解析Meetup API,将事件存储在InfluxDB中,然后使用Flux任务发送Slack警报?
是的。答案是肯定的。
收集事件
首先,我们需要将Meetup.com上的事件获取到InfluxDB云中。在这里我们非常幸运,因为Meetup.com实际上允许未经身份验证即可合理访问他们的API。Telegraf可以处理HTTP请求的身份验证,但不必担心这个问题真是太好了。
使用Telegraf HTTP输入插件,我们能够通过简单的配置HTTP插件从Meetup.com API获取所有Meetup信息。
[[inputs.http]]
name_override = "meetup-events"
interval = "1h"
urls = [
"https://api.meetup.com/pro/influxdb/events"
]
data_format = "json"
tag_keys = [
"event_id",
"event_venue_city",
"event_venue_localized_country_name",
"chapter_name"
]
json_string_fields = ["event_name", "event_link"]
fieldpass = ["event_duration", "waitlist_count", "yes_rsvp_count"]
json_time_key = "event_time"
json_time_format = "unix_ms"
上述配置下,Telegraf将每小时解析Meetup.com并存储事件到InfluxDB云(输出配置省略)。是的,我们将每小时写入相同的数据点;但我们使用事件开始的时间戳和每个事件(标签的组合)的系列键不会改变——所以我们没问题。
确认数据
默认情况下,当您使用“探索指标”挖掘数据时……它假定您的数据是过去的。想想看!为了确认您的数据存在,您需要更新GUI时间选择器——或者切换到脚本编辑器。我会向您展示如何做这两件事。
GUI
选择日期下拉菜单,选择“自定义时间范围”。您可以选择修改原始日期字符串,或者在月份选择器上快速向前滚动多次。
脚本编辑器
Flux内置了对日期、时间和持续时间的支持。这允许我们编写下面的查询,查找从now()
到4周(4w
)的数据。
很好。
from(bucket: "metrics")
|> range(start: now(), stop: 4w)
发送警报
现在我们已经收集了数据并确认了其可用性。让我们看看如何设置第一个通知:提前一周。
首先,转到任务页面。如果您以前从未去过那里,您将看到一个空页面,就像这样。点击“创建任务”。
任务配置
我们将每小时运行此任务,因此在下面的范围过滤器中需要记住这一点。我们不想为事件发送两次通知,因此需要确保每小时每个查询的时间范围正确。
范围过滤器
我们需要为这个查询一个非常具体的范围。我们只想获取预定在1周和1周加1小时之间开始的事件。这个“加1小时”很重要。因为我们每小时运行一次任务,我们确保只获取1小时窗口的事件;避免重复通知。
from(bucket: "metrics")
|> range(start: 1w, stop: 1w1h)
测量过滤器
现在,我们不想查询我们的桶中的每个测量值。我们只想获取meetup-events
测量值(在Telegraf中配置的名称)。
|> filter(fn: (r) => r._measurement == "meetup-events")
分组和旋转
由于数据在InfluxDB 2中的存储方式,我们需要一些额外步骤来获取您期望的格式。根据我们的当前查询,我们不会得到基于“行”的数据;其中每一行是一个单独的事件。相反,我们将得到需要组装的列式数据。它看起来像这样
哎呀。太可怕了!为了使数据符合要求,我的第一个想法是按event_id
分组。
from(bucket: "metrics")
|> range(start: 1w, stop: 1w1h)
|> filter(fn: (r) => r._measurement == "meetup-events")
|> group(columns: ["event_id"], mode:"by")
嗯。但不幸的是,这并不完全正确。现在我们有一个每个事件一个表,每列一个行。
from(bucket: "metrics")
|> range(start: 1w, stop: 1w1h)
|> filter(fn: (r) => r._measurement == "meetup-events")
|> pivot(rowKey:["event_id"], columnKey: ["_field"], valueColumn: "_value")
旋转将为每个事件返回一个表;每个表都有一个包含所有事件字段的单行。完美。
发送到Slack
Flux有两种与Slack交互的方法。
- HTTP包
- Slack包
HTTP包允许我们向任何端点发送任意HTTP请求。这允许我们使用Slack创建一个入站webhook并将HTTP请求发送到端点。
Slack包允许我们提供旧令牌并将通知直接发送到Slack API。
后者允许您在“发送时间”指定发送者/频道和其他一些细节,而入站webhook则在预设置所有这些细节。
由于Slack的“旧令牌”可能会随意弃用,我们将使用入站webhook和HTTP包来展示本教程。
为了使用HTTP包,我们首先需要导入它。这很简单:import "http"
。
目前HTTP包的API相当简单
import "http"
http.post(
url: "https://127.0.0.1:9999/",
headers: {x:"a", y:"b", z:"c"},
data: bytes(v: "body")
)
因此,基于这个想法,我们可以将我们一直在构建的Flux代码组合在一起,使其变得有用。
注意:我们还在使用json
包来编码HTTP有效负载。
import "http"
import "json"
from(bucket: "metrics")
|> range(start: 1w, stop: 1w1h)
|> filter(fn: (r) =>
(r._measurement == "meetup-events"))
|> pivot(rowKey: ["event_id"], columnKey: ["_field"], valueColumn: "_value")
|> map(fn: (r) => {
message = {text: "1 Week Warning! ${r.event_group_name} is meeting this time next week in ${r.event_venue_city}, ${r.event_venue_localized_country_name}, and there's ${string(v: r.event_yes_rsvp_count)} amazing people for you to join and learn with. ${r.event_link}"}
_ = http.post(url: "https://hooks.slack.com/services/SECRET/SECRET/SECRET", data: json.encode(v: message))
return r
})
哦,糟了...
是的,你也发现了吗?我们在Flux任务中存储了一个硬编码的秘密。糟糕!
幸运的是,InfluxDB 2提供了一个秘密API。遗憾的是,目前还没有闪亮的GUI;但我们可以使用原始API和curl
来使用它。
让我们用curl
将我们的Slack webhook端点添加到我们的组织中
curl -XPATCH \
https://us-west-2-1.aws.cloud2.influxdata.com/api/v2/orgs/${ORG_ID}/secrets \
-H 'authorization: Token ${TOKEN}' \
-H 'Content-type: application/json' \
--data '{"slackWebhook": "${SLACK_WEBHOOK_ENDPOINT}"}'
你需要
- 组织ID(在登录InfluxDB Cloud时从URI获取)
- InfluxDB Cloud令牌(从令牌页面获取)
- Slack Webhook端点
注意事项
您可以使用curl
配合-XPATCH
多次添加新的机密。您可以通过GET
请求查看您的机密的键,但不能查看值。
curl -XGET \ https://us-west-2-1.aws.cloud2.influxdata.com/api/v2/orgs/${ORG_ID}/secrets \ -H 'authorization: Token ${TOKEN}' \ -H 'Content-type: application/json'
清理工作
现在我们通过API创建了一个机密,让我们将其带入我们的Flux任务中清理代码。以下是清理后的完整代码。
import "http"
import "json"
import "influxdata/influxdb/secrets"
webhookUri = secrets.get(key: "slackWebhook")
from(bucket: "metrics")
|> range(start: 1w, stop: 1w1h)
|> filter(fn: (r) =>
(r._measurement == "meetup-events"))
|> pivot(rowKey: ["event_id"], columnKey: ["_field"], valueColumn: "_value")
|> map(fn: (r) => {
message = {text: "1 Week Warning! ${r.event_group_name} is meeting this time next week in ${r.event_venue_city}, ${r.event_venue_localized_country_name}, and there's ${string(v: r.event_yes_rsvp_count)} amazing people for you to join and learn with. ${r.event_link}"}
_ = http.post(url: webhookUri, data: json.encode(v: message))
return r
})
最后思考
InfluxDB Cloud结合Flux、任务和机密具有强大的功能。我们希望您喜欢这个教程,并迫不及待地想看到您能构建什么。
祝您玩得开心。