快速入门:Telegraf 的 Starlark 处理器插件

导航至

本文由 Dr. Phill West 撰写。向下滚动查看作者的图片和简介。

在支付了抵押贷款后,能源成本通常是家庭最大的开支。在我的情况下,安装太阳能电池板是一个简单的决定,但我想要使用历史数据进行深入分析。部署监控传感器很简单;收集和处理原始数据成为了主要挑战。

Telegraf 和 InfluxDB 是管理 时序数据 的理想选择。尽管我之前没有经验,但 Telegraf 的 Docker 实例 很快就部署完成了。数据收集工作完成后,重点转向了数据处理。

本文描述了构建一个自定义(Starlark 处理器)插件,该插件可以将电力表读数转换为更适合分析和可视化的形式。

什么是 Starlark?

虽然 Telegraf 有大量 插件,但在某些情况下,收集到的数据需要定制处理。《Starlark 插件》是 Telegraf 的编程语言接口,它允许对传入的数据流进行高级数学和数据操作。Telegraf 提供了处理器和聚合器版本,提供了相当大的灵活性,可以决定如何以及在何处应用转换。

Starlark 强烈受到 Python 的影响,几乎是该语言的子集。Starlark 用于表达配置。其程序是短暂的,没有外部副作用。

Starlark 处理器用例

在这个太阳能监测示例中,电力表跟踪总太阳能电池板产量和总站点消耗,以及大量健康/状态信息。Telegraf 通过 [[inputs.http]] 插件从该端点获取数据,该插件从 JSON 数组中解析产量和消耗表读数。每个提取的指标具有相同的测量名称,“site-foo”。


[[inputs.http]]
  urls = [ "energy monitoring URL" ]
  data_format = "json_v2"

  [[inputs.http.json_v2]]
    measurement_name = "site-foo"

  # Parse Production & Consumption Data
    [[inputs.http.json_v2.object]]
      path = "location in JSON array"
      included_keys = ["production", "consumption"]
      tags = ["subtype"]
      [inputs.http.json_v2.object.renames]
        subtype = "power meter"

功率计以千瓦时(KWh)为单位记录累积值。为了确定此刻的功率水平,必须计算当前测量值与前一次测量值之间的差异。但是,之前的读数已经通过此插件处理,并且不再可用。幸运的是,Starlark处理器插件可以轻松完成这项任务。

[[processors.starlark]]

  ## Source of the Starlark script.
  source = '''
state = {
  "last_prod": None,
  "last_cons": None
}

def apply(metric):
  for k, v in metric.tags.items():
    if k == "power meter":
      if v == "Cumulative Production":
        # Load from the shared state the metric assigned to the key "last_prod"
        S = state["last_prod"]
        # Using deepcopy, store the current metric into the shared state and assign it to the key "last_prod"
        state["last_prod"] = deepcopy(metric)
        metric.tags["power meter"] = "production"
        # Compute and add field for delta power
        if S != None:
          metric.fields["delta_power"] = metric.fields["total power"] - S.fields["total power"]

      elif v == "Cumulative Consumption":
        S = state["last_cons"]
        state["last_cons"] = deepcopy(metric)
        metric.tags["power meter"] = "consumption"
        # Compute and add fields for delta powers
        if S != None:
          metric.fields["delta_power"] = metric.fields["total power"] - S.fields["total power"]

      else:
        # ERROR - This condition should never occur
        print ("ERROR: Unrecognized value for Power Meter")

  return metric
'''

处理器通过使用deepcopy函数来存储它“记住”之前的测量值。一旦接收到并解析了下一个测量值,它的下一个目的地就是Starlark处理器。计算差异并将其作为名为“delta power”的新字段应用于当前测量值。原始数据未更改,但现在的测量结果包含了一个新的字段,即该时间戳的“瞬时”功率。Deepcopy将当前测量值设为新的前一个测量值,然后重复此过程。

Telegraf与Starlark的接口独特,因为它暴露了其他插件中不可用的额外功能。由于Starlark是一种语言,因此可以进行复杂的计算、逻辑和/或数据处理。测量值可以在通过Telegraf插件链的过程中进行精确修改。

语法

  • 缩进非常重要,有时可能很棘手。特别是,从Telegraf TOML到Starlark代码的缩进转换——与常规缩进约定相反。从那个点起,Starlark语言语法控制缩进要求。

  • VS Code扩展程序对于语法检查和调试Starlark很有帮助。

  • Starlark源代码由3个单引号(ASCII 0x27)包围。尽管在Starlark代码体内部允许使用单引号字符串,但为了提高可读性和避免与包围整个代码块的单引号分隔符混淆,首选使用双引号字符串。

  ## Source of the Starlark script.
  source = '''
state = {

Starlark源代码最初是缩进的。

结论

由于可用的资源丰富,这个项目在预期时间内实现。Starlark语言规范至关重要,有助于掌握基本语言概念、语法和示例用法。Starlark处理器和聚合器GitHub文档中包含许多有用的示例。在此视频 - 如何在Telegraf中使用Starlark中,Samantha Wang展示了从GitHub存储库中取出的几个示例。社区论坛是获得针对特定问题的个性化帮助的好地方。

关于作者

author

Phill West博士是一位退休的航空航天经理。他的论文专注于控制系统的计算机辅助设计,这是20世纪80年代的一项新兴技术。他对离网生活有深厚的兴趣,现在致力于研究减少环境影响并促进可持续未来的技术。