快速入门:Telegraf 的 Starlark Processor 插件

导航至

本文由 Phill West 博士撰写。向下滚动查看作者的照片和简介。

在支付抵押贷款之后,能源成本通常是最大的家庭支出。就我而言,安装太阳能电池板是一个简单的决定,但我希望使用历史数据进行深入的分析。部署监控传感器很简单;收集和处理原始数据成为主要挑战。

Telegraf 和 InfluxDB 是管理时间序列数据的理想选择。尽管我之前没有任何经验,但Telegraf 的 Docker 实例很快就启动了。随着数据收集工作的进行,重点转移到了处理上。

本文介绍了如何构建自定义(Starlark Processor)插件,该插件将电表读数转换为更易于分析和可视化的内容。

什么是 Starlark?

尽管有大量的Telegraf 插件,但在某些情况下,收集的数据需要自定义处理。Starlark 插件是 Telegraf 的编程语言接口,允许对传入的数据流进行高级数学和数据操作。Telegraf 提供了处理器和聚合器版本,在如何以及在何处应用转换方面提供了相当大的灵活性。

Starlark 受到 Python 的强烈影响,几乎是该语言的子集。Starlark 旨在表达配置。它的程序是短暂的,没有外部副作用。

Starlark Processor 用例

在这个太阳能监控示例中,电表跟踪太阳能电池板的总产量和站点总消耗量,以及大量的健康/状态信息。Telegraf 通过 [[inputs.http]] 插件从这个端点获取数据,该插件从 JSON 数组中解析生产和消耗电表读数。每个提取的指标都具有相同的度量名称“site-foo”。


[[inputs.http]]
  urls = [ "energy monitoring URL" ]
  data_format = "json_v2"

  [[inputs.http.json_v2]]
    measurement_name = "site-foo"

  # Parse Production & Consumption Data
    [[inputs.http.json_v2.object]]
      path = "location in JSON array"
      included_keys = ["production", "consumption"]
      tags = ["subtype"]
      [inputs.http.json_v2.object.renames]
        subtype = "power meter"

电表记录千瓦时 (KWh) 的累积值。为了确定此时此刻的功率水平,必须计算当前测量值和上一个测量值之间的差值。但是先前的电表读数已经通过了这个插件,不再可用。幸运的是,Starlark Processor 插件可以轻松地执行此任务。

[[processors.starlark]]

  ## Source of the Starlark script.
  source = '''
state = {
  "last_prod": None,
  "last_cons": None
}

def apply(metric):
  for k, v in metric.tags.items():
    if k == "power meter":
      if v == "Cumulative Production":
        # Load from the shared state the metric assigned to the key "last_prod"
        S = state["last_prod"]
        # Using deepcopy, store the current metric into the shared state and assign it to the key "last_prod"
        state["last_prod"] = deepcopy(metric)
        metric.tags["power meter"] = "production"
        # Compute and add field for delta power
        if S != None:
          metric.fields["delta_power"] = metric.fields["total power"] - S.fields["total power"]

      elif v == "Cumulative Consumption":
        S = state["last_cons"]
        state["last_cons"] = deepcopy(metric)
        metric.tags["power meter"] = "consumption"
        # Compute and add fields for delta powers
        if S != None:
          metric.fields["delta_power"] = metric.fields["total power"] - S.fields["total power"]

      else:
        # ERROR - This condition should never occur
        print ("ERROR: Unrecognized value for Power Meter")

  return metric
'''

处理器通过使用 deepcopy 函数存储先前的测量值来“记住”它。一旦收到并解析了下一个测量值,它的下一站就是 Starlark 处理器。计算差值并将其作为名为“delta power”的新字段应用于当前测量值。原始数据未更改,但生成的测量值现在包含一个新字段,该字段是此时间戳的“瞬时”功率。Deepcopy 将当前测量值设置为新的先前测量值,并且该过程重复。

Telegraf 与 Starlark 的接口是独一无二的,因为它公开了其他插件中不可用的附加功能。由于 Starlark 是一种语言,因此可以进行复杂的计算、逻辑和/或数据操作。测量值可以在流经 Telegraf 的插件链时进行精确的手术式修改。

语法

  • 缩进非常重要,并且可能很棘手。特别是,从 Telegraf TOML 到 Starlark 代码的转换是向外缩进的 - 与正常的缩进约定相反。从那时起,Starlark 语言语法控制缩进要求。

  • 用于 Starlark 的 VS Code 扩展程序对于语法检查和调试很有帮助。

  • Starlark 源代码用 3 个单引号 (ASCII 0x27) 括起来。尽管在 Starlark 代码主体中允许使用单引号字符串,但首选双引号字符串以提高可读性并避免与括起整个代码块的单引号分隔符混淆。

  ## Source of the Starlark script.
  source = '''
state = {

Starlark 源代码最初是向外缩进的。

结论

这个项目的实现比预期的要快,这很大程度上归功于可用的丰富资源。Starlark 语言规范对于掌握基本语言概念、语法和示例用法至关重要。Starlark 处理器聚合器 GitHub 文档包含几个有用的示例。在这个 视频 - 如何在 Telegraf 中使用 Starlark 中,Samantha Wang 演示了几个从 GitHub 存储库中提取的示例。社区论坛是获得针对特定问题的个性化帮助的绝佳场所。

关于作者

author

Phill West 博士是一位退休的航空航天经理。他的论文重点是控制系统的计算机辅助设计,这是 1980 年代新兴的技术。他对离网生活有着浓厚的兴趣,现在他追求减少环境影响和促进可持续未来的技术。