JSON 与 Telegraf 和 Starlark 结合使用 InfluxDB

导航至

数据平台——或具有 API 集以便灵活处理数据的数据库——对于那些严重依赖于能够随着时间的推移更改获取数据和处理数据方式的人来说,是典型的支柱。一个好的数据平台将为您提供必要的工具,以收集您需要的见解来解决实际问题。该平台还应该希望让您在执行此操作时不会感到糟糕!

在这篇博文中,我将讨论 JSON 上下文中的数据(指标)采集,并使用 Telegraf 来完成它,方法是将 JSON 转换为 行协议

JSON 数据格式在指标领域中无处不在,这使其值得至少撰写一篇博文。如果您有 JSON 格式的时序数据,并希望将其放入一个可以理解所有数据的平台中,那么这篇文章就是为您而写的……所以让我们开始吧。

[剧透警告] 我们将把这个 JSON

{
   "stats": [{
       "fields": {
           "count": "1"
       },
       "tags": [{
           "field": "group",
           "value": "Engineering"
       }, {
           "field": "state",
           "value": "New"
       }]
   }, {
       "fields": {
           "count": "34"
       },
       "tags": [{
           "field": "group",
           "value": "Engineering"
       }, {
           "field": "state",
           "value": "In Progress"
       }]
   }, {
       "fields": {
           "count": "1"
       },
       "tags": [{
           "field": "group",
           "value": "Engineering"
       }, {
           "field": "state",
           "value": "On Hold"
       }]
   }, {
       "fields": {
           "count": "95"
       },
       "tags": [{
           "field": "group",
           "value": "Engineering"
       }, {
           "field": "state",
           "value": "Closed"
       }]
   }, {
       "fields": {
           "count": "1"
       },
       "tags": [{
           "field": "group",
           "value": "Engineering"
       }, {
           "field": "state",
           "value": "Canceled"
       }]
   }]
}

……变成这个行协议

stats,group=engineering,state=new count=1i 1614800220766085000
stats,group=engineering,state=in_progress count=34i 1614800220766115000
stats,group=engineering,state=on_hold count=1i 1614800220766121000
stats,group=engineering,state=closed count=95i 1614800220766126000
stats,group=engineering,state=canceled count=1i 1614800220766130000

注意:有关 Telegraf 的一些背景信息及其带来的价值可以在以下资源中找到

Telegraf 有许多不同的输入插件,可让您收集 JSON 数据,包括

一旦您使用上述一个或多个插件收集了 JSON 数据,Telegraf 就有两个插件可让您转换它

这篇文章是关于选项 #2,Starlark 处理器插件,但 JSON 解析器的要点是

  • 它对于简单(通常是“扁平”)JSON 非常有用。
  • 它采用参数来确定哪些键将成为度量、标签、字段和您的时间戳。
  • 它实现了 GJSON,用于深入研究您需要的 JSON 部分。
  • 以上都不是对更复杂的(嵌套)JSON 有用。

现在开始介绍 Starlark。Starlark(规范 在此)是 Python 的一种方言。它不是 Python,但看起来并且在很大程度上像 Python 一样运行。它在 Telegraf 中实现为嵌入式脚本语言。查看一些预先编写的示例此处

在 Telegraf 中嵌入一种像 Python 这样普及的语言的方言,降低了入门门槛。在 Telegraf 中使用 Starlark 与单独的 Python 脚本/进程相比,用户可以访问

  • 内置于 Telegraf 的客户端行为框架
  • 对其他 Telegraf 插件的本地访问
  • 访问 Telegraf 的 Metric 对象
    • 无需解析 行协议
    • 无需生成自己的行协议

以下是单个 Telegraf 实例的假设拓扑,该实例配置了 Starlark 处理器来解析来自两个不同来源的 JSON

Hypothetical topology of a single Telegraf instance configured with a Starlark processor to parse JSON from two different sources

此拓扑使用以下软件组件

现在让我们深入了解一下配置的样子

您可能已经注意到上图中的“值解析器”进程。这是额外的配置位,用于避免对通过您选择的输入/输出馈送的 JSON 进行任何默认解析。它只是将整个未触及的 JSON“blob”放入名为 value 的字段中。这个 value 字段将包含 Starlark 需要的所有信息……我们最终将删除最初包含它的 Telegraf 度量。

Telegraf 中 HTTP 输入插件和值解析器的配置

[[inputs.http]]
   urls = ["<URL>"]
   data_format = "value"
   data_type = "string"

前两行启用 HTTP 插件并设置要抓取的端点。接下来的(最后)两行调用值解析器并告诉它将整个 blob 作为字符串传递。我们就完成了。

Starlark 处理器的配置

[[processors.starlark]]
    namepass = ["file"]
    script = "/path/to/<script>.star"

就是这样。完整的 Telegraf 配置示例此处

在下面的视频中,我演示了编写 Starlark 代码来解析本文档前面部分的 JSON 的示例,只是这次 JSON 来自文件(而不是 HTTP 端点)。以下是 <script>.star 中的最终代码

load("json.star", "json")
 
def apply(metric):
   j = json.decode(metric.fields.get("value"))
   metrics = []
   for group in j["stats"]:  # Array of JSON elements to become Metrics
       new_metric = Metric("stats")
       new_metric.fields["count"] = int(group["fields"]["count"]) # Set "count" Field
	# Set `group` and `state` Tags
       new_metric.tags["group"] = group["tags"][0]["value"].lower().replace(" ", "_")
       new_metric.tags["state"] = group["tags"][1]["value"].lower().replace(" ", "_")
       metrics.append(new_metric)
  
   return metrics

再次,最终输出

stats,group=engineering,state=new count=1i 1614800220766085000
stats,group=engineering,state=in_progress count=34i 1614800220766115000
stats,group=engineering,state=on_hold count=1i 1614800220766121000
stats,group=engineering,state=closed count=95i 1614800220766126000
stats,group=engineering,state=canceled count=1i 1614800220766130000

让我们分解上面显示的行协议

  • 度量名称是 stats
  • 有两个标签:groupstateGroup 现在是小写——只是最佳实践)
  • group 标签的值可以是 engineeringEngineering 现在是小写)
  • state 标签的值可以是 newin_progresscanceledon_holdclosed(现在是小写,空格替换为下划线)
  • 有一个字段 count,它是一个整数
  • 行协议中每行末尾的时间戳采用 Unix 时间格式(自 1970 年 1 月 1 日以来的秒数)
    • 请注意,我们可以传递 JSON (事件时间) 提供的时间戳。如果不存在,则 Telegraf 将提供一个时间戳 (处理时间)

就这样——Telegraf 使用 Starlark 将 JSON 转换为行协议。如果您想查看更多关于如何使用 Telegraf Starklark 插件的代码示例,请查看这篇博文

而且,如果您有时序数据以 JSON 格式公开/发出,并且想要将其转换为有用的图形和警报,请注册一个免费 InfluxDB Cloud 帐户开始使用。注册后,请随时在乐于助人的 InfluxDB 社区社区 Slack 频道中提出任何问题。尽情享受吧!