如何在 Telegraf 中使用 Starlark
作者:Al Sargent / 用例,产品,开发者
2020年8月4日
导航到
我们的 Telegraf Starlark 处理器插件 是 Telegraf 1.15 中一个令人兴奋的新处理器,它允许您使用 Starlark 语言 在 Telegraf 中执行各种操作。
那么什么是 Starlark 呢?Starlark(之前称为 Skylark)是一种旨在用作配置语言的编程语言。Starlark 是 Python 的一种方言。像 Python 一样,它是一种动态类型语言,具有高级数据类型、具有词法作用域的一等函数以及垃圾回收。独立的 Starlark 线程可以并行执行,因此 Starlark 工作负载在并行机器上扩展良好。Starlark 是一种小巧且简单的语言,具有熟悉且易于阅读的语法。
Telegraf Starlark 处理器为每个匹配的度量调用 Starlark 函数,允许进行定制的程序化度量处理。由于 Starlark 语言是 Python 的一种方言,因此对于有 Python 语言经验的用户来说,它将是熟悉的。但是,它们之间存在重大差异,并且现有的 Python 代码可能无法直接使用。此外,执行环境被沙箱化,无法访问文件系统、网络和系统资源。
即使有这些限制,Starlark 处理器还具有以下功能
- 数学运算
- 字符串操作
- 重命名标签
- 逻辑操作
使用 Starlark 处理器,您可以在将数据发送到 InfluxDB 之前在 Telegraf 中对您的数据进行实时聚合(最小值,最大值)。这有助于通过将处理分配到运行 Telegraf 的服务器来减轻您的 InfluxDB 实例的负载。
由于我们收到了很多在 Telegraf 中进行数学功能请求,我们很高兴地报告说,此插件允许您执行算术函数。社区成员请求了一个有用的物联网示例,他们希望从读取自 Telegraf Modbus 输入插件 的电流和电压字段中本地计算功率。这里是处理器中执行此操作的 Starlark 函数
[[processors.starlark]]
# Reads the Starlark script embedded
source = '''
def apply(metric):
# Get the field called current and put into variable I
I = metric.fields['current']
# Get the field called voltage and put into variable V
V = metric.fields['voltage']
# Create a new field, power, which is I times V
metric.fields['power'] = I * V
# Return power as part of your Telegraf metrics stream
return metric
'''
您还可以将上述内容放入文件中,给它一个“.star”文件扩展名,然后按照以下方式从您的 Telegraf 配置文件中调用它
[[processors.starlark]]
# File containing a Starlark script.
script = "/usr/local/bin/myscript.star"
如何在 Telegraf 数据上运行多个 Starlark 函数
每个处理器只能运行一个 Starlark 源或脚本。因此,如果您想将多个 Starlark 函数包含到数据中,只需确保您使用多个处理器并包含 namepass = ["measurement_name"]
,这样所有的处理器都将更改您针对的特定数据集。以下是一个示例
[[inputs.cpu]]
[[processors.starlark]]
namepass = ['cpu']
# First Starlark script
source = '''
def apply(metric)
return metric
'''
[[processors.starlark]]
namepass = ['cpu']
# Second Starlark script
source = '''
def apply(metric)
return metric
'''
以下是 Starlark 处理器在行动中的更多示例。
如何在Telegraf中计算百分比比率
使用此Starlark代码计算两个整数的比率,然后乘以100得到百分比。该比率将作为Telegraf指标流的一部分返回。
[[processors.starlark]]
source = '''
# Compute the ratio of two integer fields.
def apply(metric):
used = float(metric.fields['used'])
total = float(metric.fields['total'])
# Create a new field called usage.
metric.fields['usage'] = (used / total) * 100
# Return usage as part of your Telegraf metrics stream
return metric
'''
如何在Telegraf中使用Starlark缩放指标
一个常见的需求是将以字节为单位的指标缩放到千字节、兆字节、吉字节或太字节。以下是使用Telegraf Starlark处理器将字节转换为千字节的方法
[[processors.starlark]]
source = '''
# Convert bytes to kilobytes
def apply(metric):
# k stands for key, v for value
for k, v in metric.fields.items():
if type(v) == "int":
# 1000 bytes in a kilobyte
metric.fields[k] = v / 1000
return metric
'''
以下是使用Telegraf Starlark处理器将字节转换为兆字节的方法
[[processors.starlark]]
source = '''
# Convert bytes to megabytes
def apply(metric):
# k stands for key, v for value
for k, v in metric.fields.items():
if type(v) == "int":
# 1000000 bytes in a megabyte
metric.fields[k] = v / 1000000
return metric
'''
以下是使用Telegraf Starlark处理器将字节转换为吉字节的方法
[[processors.starlark]]
source = '''
# Convert bytes to gigabytes
def apply(metric):
# k stands for key, v for value
for k, v in metric.fields.items():
if type(v) == "int":
# 1000000000 bytes in a gigabyte
metric.fields[k] = v / 1000000000
return metric
'''
(如果您更喜欢使用基于2的值来转换存储,那么您将讨论梅比字节、吉比字节等。这篇文章解释了这些单位的区别以及如何计算它们。)
最后一个例子:有时将指标缩放以确保它们在图表中正确显示是有帮助的。以下是您如何使用Telegraf Starlark处理器将数字乘以10进行缩放
[[processors.starlark]]
source = '''
# Multiply any float fields by 10
def apply(metric):
# k stands for key, v for value
for k, v in metric.fields.items():
if type(v) == "float":
metric.fields[k] = v * 10
return metric
'''
在Telegraf中使用Starlark转换值
假设您需要将Telegraf遥测中的某个值转换为另一个值。例如,F5网络设备返回的状态是一个整数,取值范围是0到5,其中1表示健康,其余表示不健康状态的不同变体。假设我想将其缩减为二进制状态——仅有一个或零——其中1表示健康,0表示不健康,如下表所示
原始值 | 新值 |
0 | 0 |
1 | 1 |
2 | 0 |
3 | 0 |
4 | 0 |
5 | 0 |
请注意,如果值是0或1,我们不需要做任何更改;如果值是2到5,我们才需要做更改。以下是我在Telegraf中使用Starlark进行这些更改的方法
[[processors.starlark]]
namepass = ['measurementname']
source = '''
def apply(metric):
# v stands for value
v = metric.fields.get('status')
# If no value, do nothing:
if v == None:
return metric
# When v is 2, 3, 4, or 5:
if 1 < v and v < 6:
metric.fields['status'] = 0
return metric
'''
如何使用Flux计算健康设备的百分比
让我们继续这个网络监控示例。在本地执行此类转换可以简化处理上游数据的工作。现在这种转换已经发生,我们可以执行像sum(status)/count(status)*100这样更简单的数学计算,以获得健康设备的百分比。
使用Flux,查询看起来像这样
// First, create a variable called data
data = from(bucket: "device_data")
|> range(start: -10m)
|> filter(fn: (r) => r._measurement == "network_devices" and r._field == "status")
// Create a sum of the data across one minute windows
sum = data
|> aggregateWindow(every: 1m, fn: sum)
|> map(fn: (r) => ({ r with _field: "sum" })
// Create a count of the data across one minute windows
count = data
|> aggregateWindow(every: 1m fn: count)
|> map(fn: (r) => ({ r with _field: "count" })
// Create a union of the data, using the sum and the counts to calculate percent of healthy devices
union(tables: [sum,count])
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
|> map(fn: (r) => ({ r with
_field: "percent_healthy",
_value: (float(v: r.sum) / float(v: r.count)) * 100.0
}))
此Flux脚本在一个循环中收集数据,然后使用它来计算总和和计数,同时将那些值存储在单独的变量中。然后通过两个变量的联合将数据合并,并计算percent_healthy结果,在过去的10分钟内以每分钟1个间隔产生一个值。
如何使用Starlark在Telegraf中重命名标签
如果您需要重命名行协议中的标签,以下是您如何使用Starlark的方法
[[processors.starlark]]
source = '''
# Rename any tags using the mapping in the renames dict.
renames = {
'lower': 'min',
'upper': 'max',
}
def apply(metric):
# k stands for key, v for value
for k, v in metric.tags.items():
if k in renames:
metric.tags[renames[k]] = v
metric.tags.pop(k)
return metric
'''
如何在Telegraf和Starlark中使用条件逻辑
Starlark还支持仅在满足某些逻辑条件时更改指标。例如,假设您想要将任何值在1到6之间的指标设置为0。以下是您如何使用Telegraf Starlark处理器完成此操作的示例
[[processors.starlark]]
source = '''
# Set any 'status' field between 1 and 6 to a value of 0
def apply(metric):
# v stands for value
v = metric.fields.get('status')
if v == None:
# If there is no status field
return metric
if 1 < v and v < 6:
# If status is greater than 1 and less than 6, set status to zero
metric.fields['status'] = 0
# Return your modified metric to the Telegraf metrics stream
return metric
'''
如何根据数值条件删除指标
以下是如果指标满足特定数值条件(例如检测到500错误)时,如何将它们存储在InfluxDB中的方法
def apply(metric):
if metric.fields["value"] <= 10:
return metric // keep
else:
return None // drop
Starlark和Telegraf的演示
如果您想查看如何使用Visual Studio Code编写Telegraf Starlark脚本的演示,请观看此演示视频
结论
如您所见,Starlark 处理器有很多用途。更多示例和用途,请查看示例列表,如果您想了解它如何增强您的数据收集。另外,如果您想了解更多关于 Starlark 的信息,请查看Starlark 规范,以获取有关此处理器可使用的语法和函数的详细信息。
您有自己的 Starlark 函数,认为其他人也会觉得有用吗?请在我们的公共 InfluxData Slack 频道和InfluxDB 社区网站上分享。如果您需要帮助,也可以在那里提出任何问题。祝您使用愉快!