从 InfluxDB 开始 - 新工作,新通勤

导航至

我在 2019 年 9 月中旬加入 InfluxData,担任产品经理。任何新工作的一个部分就是适应新的通勤方式。InfluxData 的旧金山总部位于城市中一个相当繁忙的地区,像我这样住在湾区另一边的奥克兰的人有很多不同的通勤方式。我想尝试一下湾区共享单车项目 Bay Wheels(Lyft 所有),因为我在旧金山和奥克兰都可以使用自行车。

检索 Bay Wheels 系统数据

Bay Wheels(Lyft 所有)向公众共享其实时自行车站数据。我特别使用了 站点状态信息和 站点元数据作为我的数据集。需要注意的是,数据每 5 分钟报告一次(ttl = 300s),如链接 JSON 文件末尾所示。

配置 Telegraf HTTP 输入插件

我的第一个 InfluxDB 设置是通过遵循 入门指南(感谢文档团队!)来收集 InfluxDB 2.0 预览版实例上的本地机器指标。为了将 Bay Wheels 系统数据读入 Telegraf,我使用了 Telegraf HTTP 输入插件。我以前不是一个开发者,所以 Telegraf 输入插件的简单性使得使用它变得非常简单,并开始收集数据。我主要的添加是“interval = 300s”,以便每 5 分钟读取一次 Bay Wheels JSON 文件中指示的数据。

[[inputs.http]]
  interval = "300s"
  ## One or more URLs from which to read formatted metrics
  urls = [
    "https://gbfs.baywheels.com/gbfs/en/station_information.json"
  ]
  name_override = "baywheels_meta"
  tagexclude = ["url"]
  ## HTTP method
  method = "GET"
  ## Optional HTTP headers
  headers = {"User-Agent" = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.131 Safari/537.36"}
  ## Amount of time allowed to complete the HTTP request
  timeout = "5s"
  ## Data format to consume.
  ## Each data format has its own unique set of configuration options, read
  ## more about them here:
  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
  data_format = "json"
  ## Query is a GJSON path that specifies a specific chunk of JSON to be
  ## parsed, if not specified the whole document will be parsed.
  ##
  ## GJSON query paths are described here:
  ##   https://github.com/tidwall/gjson#path-syntax
  json_query = "data.stations"
  ## Tag keys is an array of keys that should be added as tags.
  tag_keys = [
    "short_name",
    "station_id",
    "name",
    "region_id",
    "lat",
    "lon"
  ]
  ## String fields is an array of keys that should be added as string fields.
  json_string_fields = ["rental_url","external_id"]
  ## Name key is the key to use as the measurement name.
  json_name_key = ""
  ## Time key is the key containing the time that should be used to create the
  ## metric.
  json_time_key = ""
  ## Time format is the time layout that should be used to interprete the json_time_key.
  ## The time must be `unix`, `unix_ms`, `unix_us`, `unix_ns`, or a time in the
  ## "reference time".  To define a different format, arrange the values from
  ## the "reference time" in the example to match the format you will be
  ## using.  For more information on the "reference time", visit
  ## https://golang.ac.cn/pkg/time/#Time.Format
  ##   ex: json_time_format = "Mon Jan 2 15:04:05 -0700 MST 2006"
  ##       json_time_format = "2006-01-02T15:04:05Z07:00"
  ##       json_time_format = "01/02/2006 15:04:05"
  ##       json_time_format = "unix"
  ##       json_time_format = "unix_ms"
  json_time_format = ""
  ## Timezone allows you to provide an override for timestamps that
  ## don't already include an offset
  ## e.g. 04/06/2016 12:41:45
  ##
  ## Default: "" which renders UTC
  ## Options are as follows:
  ##   1. Local               -- interpret based on machine localtime
  ##   2. "America/New_York"  -- Unix TZ values like those found in https://en.wikipedia.org/wiki/List_of_tz_database_time_zones
  ##   3. UTC                 -- or blank/unspecified, will return timestamp in UTC
  json_timezone = ""

使用任务与 Flux 进行数据丰富

我理想地希望用实际的站点名称绘制站点信息图表,并按地区和城市进行过滤/查询。为了做到这一点,我需要通过连接我在 InfluxDB 中创建的两个度量(从站点状态和元数据)来丰富数据。使用 Flux,我还可以计算一个新的使用度量,以查看站点中当前有多少比例的自行车在使用。通过通过数据探索器创建任务,我能够非常快速地通过 Flux 查询启动一个任务。任务是定期运行的 Flux 脚本,用于修改或分析数据流并将其输出到新的数据源——在我的情况下,每分钟将报告的数据从我的两个 Bay Wheels 表连接到新的“baywheels_enriched”源。

使用Flux和数据探索器,可以轻松创建InfluxDB任务。

“baywheels_enriched”任务的Flux查询

import "math"
metadata = from(bucket: "telegraf")
    |> range(start: -10m)
    |> filter(fn: (r) =>
        (r._measurement == "baywheels_meta"))
    |> last()
    |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
    |> group()
    |> unique(column: "station_id")
live_data = from(bucket: "telegraf")
    |> range(start: -10m)
    |> filter(fn: (r) =>
        (r._measurement == "baywheels"))
    |> filter(fn: (r) =>
        (r._field == "num_bikes_available"))
    |> last()
join(tables: {live: live_data, meta: metadata}, on: ["station_id"], method: "inner")
    |> drop(columns: ["host_meta", "_time_meta", "_stop_meta", "_start_meta", "_stop_live", "_start_live", "_measurement_meta", "host_live"])
    |> rename(columns: {_measurement_live: "_measurement", _time_live: "_time"})
    |> set(key: "_measurement", value: "baywheels_enriched")
    |> map(fn: (r) =>
        ({r with used_perc: math.round(x: (1.0 - r._value / r.capacity) * 100.0)}))

现在有了我的新“baywheels_enriched”表,我可以基于Bay Wheels站点的元数据进行过滤和查询。我可以直接通过站点名称、位置进行搜索,或者在下面的示例中,选择旧金山湾区的特定区域内的所有站点。

在我的可视化中,我现在可以看到来自站点状态和元数据表的所有站点信息。

设置警报和监控

我最初的想法是将自行车信息导入InfluxDB,以便在经常获取自行车的站点自行车数量不足时设置警报系统。我将Slack设置为通知的端点,这样每当这些站点中的任何一个站点自行车数量不足时,我都会收到Slack消息。我可以为关键、警告、信息和正常设置阈值,并确定何时接收消息。以下是由Katy撰写的一篇优秀的博客,展示了如何在云2.0中逐步设置检查和警报。我选择在任何这些检查中接收Slack消息(以下示例显示19街站大约在下午5点时处于良好水平)。

所以下次19街BART站点自行车数量不足时,我会知道尽快走出办公室去抢最后一辆自行车(或者知道去其他站点……) :P

你可能会想知道我为什么加入InfluxData……

在我加入之前,我在大型企业中从事了大部分的电信和移动分析工作。我对InfluxData非常感兴趣,因为他们引领了时间序列数据库和开源领域。我知道这个职位在技术上是一个巨大的挑战,并且作为一个产品经理,我可以快速成长。InfluxData也是我第一次为员工人数少于40,000人的公司工作(InfluxData仅占0.5%)!所以我知道这将提供一种令人兴奋的环境。