如何将 Starlark 与 Telegraf 结合使用

导航至

我们的 Telegraf Starlark 处理器插件Telegraf 1.15 中令人兴奋的新处理器,它使您能够灵活地使用 Starlark 语言 在 Telegraf 中执行各种操作。

starlark telegraf logos

您可能会问,什么是 Starlark?Starlark(以前称为 Skylark)是一种旨在用作配置语言的语言。Starlark 是 Python 的一种方言。与 Python 类似,它是一种动态类型语言,具有高级数据类型、具有词法作用域的一流函数和垃圾回收。独立的 Starlark 线程并行执行,因此 Starlark 工作负载在并行机器上可以很好地扩展。Starlark 是一种小型且简单的语言,具有熟悉且高度可读的语法。

Telegraf Starlark 处理器为每个匹配的指标调用一个 Starlark 函数,从而允许自定义的程序化指标处理。由于 Starlark 语言是 Python 的一种方言,因此对于那些有 Python 语言经验的人来说,它会很熟悉。但是,两者之间存在重大差异,现有的 Python 代码不太可能未经修改即可工作。此外,执行环境是沙箱环境,无法访问文件系统、网络、系统资源。

即使存在这些限制,Starlark 处理器仍具有以下功能

  • 数学运算
  • 字符串运算
  • 重命名标签
  • 逻辑运算

使用 Starlark 处理器,您可以在将数据发送到 InfluxDB 之前,在 Telegraf 中对数据执行实时聚合(最小值最大值)。这有助于通过将处理分配到运行 Telegraf 的服务器来减少 InfluxDB 实例上的负载。

由于我们收到了很多关于 Telegraf 中数学功能的需求,我们很高兴地报告这个插件让您能够执行算术函数。一位社区成员提出了一个有用的物联网示例,他希望根据从 Telegraf Modbus 输入插件 读取的电流和电压字段在本地计算功率。以下是处理器中执行此操作的 Starlark 函数

[[processors.starlark]]
 # Reads the Starlark script embedded
 source = '''
def apply(metric):
 # Get the field called current and put into variable I
 I = metric.fields['current']
 # Get the field called voltage and put into variable V
 V = metric.fields['voltage']
 # Create a new field, power, which is I times V
 metric.fields['power'] = I * V
 # Return power as part of your Telegraf metrics stream
 return metric
'''

您也可以将上述内容放入一个文件中,为其指定“.star”文件扩展名,并从您的 Telegraf 配置文件中调用它,如下所示

[[processors.starlark]]
 # File containing a Starlark script.
 script = "/usr/local/bin/myscript.star"

如何在 Telegraf 数据上运行多个 Starlark 函数

每个处理器只能运行一个 Starlark 源或脚本。因此,如果您想将多个 Starlark 函数包含到您的数据中,只需确保您使用多个处理器并包含 namepass = ["measurement_name"] ,以便所有处理器都对您定位的特定数据集进行更改。以下是一个示例,如下所示

[[inputs.cpu]]
[[processors.starlark]]
namepass = ['cpu']
 # First Starlark script 
 source = '''
def apply(metric)
 return metric
'''
[[processors.starlark]]
namepass = ['cpu']
 # Second Starlark script 
 source = '''
def apply(metric)
 return metric
'''

以下是 Starlark 处理器在操作中的更多示例。

starlark examples

图片来源:Roman Mager via Unsplash

如何在 Telegraf 中计算百分比比率

使用此 Starlark 代码计算两个整数的比率,然后乘以 100 得到百分比。该比率将作为 Telegraf 指标流的一部分返回。

[[processors.starlark]]
 source = '''
# Compute the ratio of two integer fields.
def apply(metric):
 used = float(metric.fields['used'])
 total = float(metric.fields['total'])
 # Create a new field called usage.
 metric.fields['usage'] = (used / total) * 100
 # Return usage as part of your Telegraf metrics stream
 return metric
'''

如何使用 Starlark 在 Telegraf 中缩放指标

一个常见的需求是获取以字节为单位的指标,并将它们缩放到千字节、兆字节、千兆字节或太字节。以下是如何使用 Telegraf Starlark 处理器将字节转换为千字节

[[processors.starlark]]
 source = '''
# Convert bytes to kilobytes
def apply(metric):
	# k stands for key, v for value
	for k, v in metric.fields.items():
 	 if type(v) == "int":
	 # 1000 bytes in a kilobyte
 	 metric.fields[k] = v / 1000
	return metric
'''

以下是如何使用 Telegraf Starlark 处理器将字节转换为兆字节

[[processors.starlark]]
 source = '''
# Convert bytes to megabytes
def apply(metric):
	# k stands for key, v for value
	for k, v in metric.fields.items():
 	 if type(v) == "int":
	 # 1000000 bytes in a megabyte
 	 metric.fields[k] = v / 1000000
	return metric
'''

以下是如何使用 Telegraf Starlark 处理器将字节转换为千兆字节

[[processors.starlark]]
 source = '''
# Convert bytes to gigabytes
def apply(metric):
	# k stands for key, v for value
	for k, v in metric.fields.items():
 	 if type(v) == "int":
	 # 1000000000 bytes in a gigabyte
 	 metric.fields[k] = v / 1000000000
	return metric
'''

(如果您更喜欢使用以 2 为基数的值来转换您的存储,那么您说的是 mebibytesgibibytes 等。本文 解释了它们之间的区别以及如何计算它们。)

最后一个例子:有时缩放指标以确保它们在图表中正确显示很有帮助。以下是如何使用 Telegraf Starlark 处理器将数字放大 10 倍

[[processors.starlark]]
 source = '''
# Multiply any float fields by 10
def apply(metric):
	# k stands for key, v for value
	for k, v in metric.fields.items():
 	 if type(v) == "float":
 	 metric.fields[k] = v * 10
	return metric
'''

使用 Starlark 在 Telegraf 中转换值

假设您需要在 Telegraf 遥测中将一个值转换为另一个值。例如,F5 网络设备将其状态作为整数返回,值为 0 到 5,其中值 1 表示健康,其余值是各种不健康状态的变体。假设我想将其折叠为二元状态——仅为 1 或 0——其中 1 表示健康,0 表示不健康,如下表所示

原始值 新值
0 0
1 1
2 0
3 0
4 0
5 0

请注意,如果值为 0 或 1,我们不需要更改任何内容;我们只需要在值为 2 到 5 时进行更改。以下是我如何使用 Starlark 在 Telegraf 中进行这些更改

[[processors.starlark]]
 namepass = ['measurementname']
 source = '''
def apply(metric):
	# v stands for value
	v = metric.fields.get('status')
	# If no value, do nothing:
	if v == None:
 	return metric
	# When v is 2, 3, 4, or 5:
	if 1 < v and v < 6:
 		metric.fields['status'] = 0
	return metric
'''

如何使用 Flux 计算健康设备百分比

让我们继续这个网络监控示例。在本地执行这种转换简化了上游数据的处理。既然已经发生了这种转换,我们可以执行更简单的数学计算,例如 sum(status)/count(status)*100,以获得健康设备的百分比。

使用 Flux,查询如下所示

// First, create a variable called data
data = from(bucket: "device_data")
 |> range(start: -10m)
 |> filter(fn: (r) => r._measurement == "network_devices" and r._field == "status")

// Create a sum of the data across one minute windows
sum = data
 |> aggregateWindow(every: 1m, fn: sum)
 |> map(fn: (r) => ({ r with _field: "sum" })

// Create a count of the data across one minute windows
count = data
 |> aggregateWindow(every: 1m fn: count)
 |> map(fn: (r) => ({ r with _field: "count" })

// Create a union of the data, using the sum and the counts to calculate percent of healthy devices
union(tables: [sum,count])
 |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
 |> map(fn: (r) => ({ r with
 _field: "percent_healthy",
 _value: (float(v: r.sum) / float(v: r.count)) * 100.0
 }))

在一个过程中,这个 Flux 脚本收集数据,然后使用它来计算总和和计数,同时将这些值存储在单独的变量中。然后通过两个变量的并集组合数据,并计算 percent_healthy 结果,在过去 10 分钟内的 1 分钟间隔内生成单个值。

如何使用 Starlark 在 Telegraf 中重命名标签

如果您需要在 行协议 中重命名标签,以下是如何使用 Starlark

[[processors.starlark]]
 source = '''
# Rename any tags using the mapping in the renames dict.
renames = {
	'lower': 'min',
	'upper': 'max',
}
def apply(metric):
	# k stands for key, v for value
	for k, v in metric.tags.items():
 	if k in renames:
 	metric.tags[renames[k]] = v
 	metric.tags.pop(k)
	return metric
'''

如何将条件逻辑与 Telegraf 和 Starlark 结合使用

Starlark 还支持仅在满足某些逻辑条件时才更改指标。例如,假设您想将任何值在 1 到 6 之间的指标设置为零。以下是如何使用 Telegraf Starlark 处理器执行此操作

[[processors.starlark]]
 source = '''
# Set any 'status' field between 1 and 6 to a value of 0
def apply(metric):
	# v stands for value
	v = metric.fields.get('status')
	if v == None:
	 # If there is no status field
 	 return metric
	if 1 < v and v < 6:
	 # If status is greater than 1 and less than 6, set status to zero
 	 metric.fields['status'] = 0
	# Return your modified metric to the Telegraf metrics stream
	return metric
'''

如何根据数字条件删除指标

以下是如何在指标满足特定数字条件(例如检测到 500 错误)时将其存储在 InfluxDB 中

def apply(metric):
    if metric.fields["value"] <= 10:
      return metric // keep
    else:
      return None // drop

Starlark 和 Telegraf 演示

如果您想观看如何使用 Visual Studio Code 为 Telegraf 编写 Starlark 脚本的演示,请查看此演示视频

结论

如您所见,Starlark 处理器有很多用途。有关更多信息,如果您想了解它如何增强您的数据收集,请查看 示例和用途列表。此外,如果您想了解更多关于 Starlark 的信息,请查看 Starlark 规范,以了解更多关于语法和可用于此处理器的可用函数的详细信息。

有您自己认为其他人会觉得有用的 Starlark 函数吗?请在我们的 公共 InfluxData Slack 频道InfluxDB 社区站点 中分享它们。如果您需要帮助,请随时在那里发布您遇到的任何问题。祝您使用愉快!