从 InfluxDB 开始 - 新工作,新通勤

导航至

我于 2019 年 9 月中旬作为产品经理加入 InfluxData。任何新工作的一部分都是熟悉新的通勤方式。InfluxData 的旧金山总部位于城市中非常繁忙的地区,对于像我这样住在海湾对岸奥克兰的人来说,有很多不同的通勤方式。我想测试一下湾区的共享单车项目 Bay Wheels (Lyft 旗下),因为我可以在旧金山和奥克兰使用自行车。

检索 Bay Wheels 系统数据

Bay Wheels(Lyft 旗下)向公众共享其实时自行车站点数据。我特别使用了站点状态信息站点元数据作为我的数据集。需要注意的是,如链接的 JSON 文件末尾所示,数据每 5 分钟报告一次 (ttl = 300s)。

配置 Telegraf HTTP 输入插件

我的第一个 InfluxDB 设置是通过遵循《入门指南》(感谢文档团队!)在 InfluxDB 2.0 alpha 实例上拉取本地机器指标。为了将 Bay Wheels 系统数据读入 Telegraf,我使用了Telegraf HTTP 输入插件。我从历史上来说不是开发者,因此 Telegraf 输入插件的简单性使其非常易于使用并开始收集数据。我主要添加的是 “interval = 300s”,以便按照 Bay Wheels JSON 文件中指示的每 5 分钟读取一次数据。

[[inputs.http]]
  interval = "300s"
  ## One or more URLs from which to read formatted metrics
  urls = [
    "https://gbfs.baywheels.com/gbfs/en/station_information.json"
  ]
  name_override = "baywheels_meta"
  tagexclude = ["url"]
  ## HTTP method
  method = "GET"
  ## Optional HTTP headers
  headers = {"User-Agent" = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.131 Safari/537.36"}
  ## Amount of time allowed to complete the HTTP request
  timeout = "5s"
  ## Data format to consume.
  ## Each data format has its own unique set of configuration options, read
  ## more about them here:
  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
  data_format = "json"
  ## Query is a GJSON path that specifies a specific chunk of JSON to be
  ## parsed, if not specified the whole document will be parsed.
  ##
  ## GJSON query paths are described here:
  ##   https://github.com/tidwall/gjson#path-syntax
  json_query = "data.stations"
  ## Tag keys is an array of keys that should be added as tags.
  tag_keys = [
    "short_name",
    "station_id",
    "name",
    "region_id",
    "lat",
    "lon"
  ]
  ## String fields is an array of keys that should be added as string fields.
  json_string_fields = ["rental_url","external_id"]
  ## Name key is the key to use as the measurement name.
  json_name_key = ""
  ## Time key is the key containing the time that should be used to create the
  ## metric.
  json_time_key = ""
  ## Time format is the time layout that should be used to interprete the json_time_key.
  ## The time must be `unix`, `unix_ms`, `unix_us`, `unix_ns`, or a time in the
  ## "reference time".  To define a different format, arrange the values from
  ## the "reference time" in the example to match the format you will be
  ## using.  For more information on the "reference time", visit
  ## https://golang.ac.cn/pkg/time/#Time.Format
  ##   ex: json_time_format = "Mon Jan 2 15:04:05 -0700 MST 2006"
  ##       json_time_format = "2006-01-02T15:04:05Z07:00"
  ##       json_time_format = "01/02/2006 15:04:05"
  ##       json_time_format = "unix"
  ##       json_time_format = "unix_ms"
  json_time_format = ""
  ## Timezone allows you to provide an override for timestamps that
  ## don't already include an offset
  ## e.g. 04/06/2016 12:41:45
  ##
  ## Default: "" which renders UTC
  ## Options are as follows:
  ##   1. Local               -- interpret based on machine localtime
  ##   2. "America/New_York"  -- Unix TZ values like those found in https://en.wikipedia.org/wiki/List_of_tz_database_time_zones
  ##   3. UTC                 -- or blank/unspecified, will return timestamp in UTC
  json_timezone = ""

使用任务和 Flux 进行数据增强

我理想情况下希望使用实际站点名称绘制站点信息图表,并按区域和城市进行筛选/查询。为了做到这一点,我需要通过连接我在 InfluxDB 中创建的两个测量(来自站点状态和元数据)来增强数据。借助 Flux,我还可以计算新的使用率指标,以查看站点中当前正在使用的自行车百分比。通过数据浏览器创建一个任务,我能够通过我的 Flux 查询快速启动并运行一个任务。任务是定期运行的 Flux 脚本,用于修改或分析数据流并将其输出到新的数据源——在我的例子中,是从我的两个 Bay Wheels 表格中获取报告的数据,并将它们每分钟连接到一个新的 “baywheels_enriched” 源。

可以使用 Flux 通过数据浏览器轻松创建 InfluxDB 任务

“baywheels_enriched” 任务的 Flux 查询

import "math"
metadata = from(bucket: "telegraf")
    |> range(start: -10m)
    |> filter(fn: (r) =>
        (r._measurement == "baywheels_meta"))
    |> last()
    |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
    |> group()
    |> unique(column: "station_id")
live_data = from(bucket: "telegraf")
    |> range(start: -10m)
    |> filter(fn: (r) =>
        (r._measurement == "baywheels"))
    |> filter(fn: (r) =>
        (r._field == "num_bikes_available"))
    |> last()
join(tables: {live: live_data, meta: metadata}, on: ["station_id"], method: "inner")
    |> drop(columns: ["host_meta", "_time_meta", "_stop_meta", "_start_meta", "_stop_live", "_start_live", "_measurement_meta", "host_live"])
    |> rename(columns: {_measurement_live: "_measurement", _time_live: "_time"})
    |> set(key: "_measurement", value: "baywheels_enriched")
    |> map(fn: (r) =>
        ({r with used_perc: math.round(x: (1.0 - r._value / r.capacity) * 100.0)}))

现在有了我的新 “baywheels_enriched” 表格,我可以根据 Bay Wheels 站点元数据进行筛选和查询。我可以直接按站点名称、位置搜索,或者在下面的示例中选择湾区特定区域中的所有站点。

在我的可视化中,我现在可以看到来自站点状态和元数据表格的所有站点信息。

设置警报和监控

我最初将自行车信息导入 InfluxDB 的想法是设置一个警报系统,当我要经常取车的站点自行车数量不足时发出警报。我将 Slack 设置为通知的端点,这样每当这些站点之一自行车数量不足或没有自行车时,我都会收到 Slack 消息。我可以为 critical、warning、info 和 okay 设置阈值,并确定何时收到消息。Katy 的这篇很棒的博客逐步展示了如何在 Cloud 2.0 中设置检查和警报。我选择在任何这些检查时都收到 Slack 消息(下面的示例显示了 19th Street 站点的良好水平,大约在下午 5 点)。

因此,下次 19th Street BART 站点的自行车数量不足时,我就知道要尽快冲出办公室去抢最后一辆(或者知道去另一个站点...):P

如果您想知道我为什么加入 InfluxData...

在加入之前,我的大部分职业生涯都在主要公司的电信和移动分析领域度过。InfluxData 因其在时序数据库和开源领域的领先地位而真正吸引了我。我知道这个角色在技术上将是一个巨大的挑战,也是一个我可以作为产品经理快速成长的地方。InfluxData 也是我第一次在员工人数少于 40,000 人的公司工作(InfluxData 仅占 0.5%!),所以我知道这将提供一个令人兴奋的环境。