How to Use Starlark with Telegraf
By
Al Sargent /
Use Cases, Product, Developer
Aug 04, 2020
Our Telegraf Starlark Processor Plugin is an exciting new processor in Telegraf 1.15 that gives you the flexibility of performing various operations in Telegraf using the Starlark language.
What is Starlark, you ask? Starlark (formerly known as Skylark) is a language intended for use as a configuration language. Starlark is a dialect of Python. Like Python, it is a dynamically typed language with high-level data types, first-class functions with lexical scope, and garbage collection. Independent Starlark threads execute in parallel, so Starlark workloads scale well on parallel machines. Starlark is a small and simple language with a familiar and highly readable syntax.
The Telegraf Starlark Processor calls a Starlark function for each matched metric, allowing for custom programmatic metric processing. Since the Starlark language is a dialect of Python, it will be familiar to those who have experience with the Python language. However, there are major differences, and existing Python code is unlikely to work unmodified. In addition, the execution environment is sandboxed and cannot access the file system, the network, or system resources.
Even with these restrictions, the Starlark processor has the following capabilities:
- Math operations
- String operations
- Renaming tags
- Logic operations
Using the Starlark processor, you can perform real-time aggregates (min, max) on your data in Telegraf before sending it on to InfluxDB. This helps reduce the load on your InfluxDB instance by distributing processing to your servers running Telegraf.
Since we’ve had a lot of requests for math capabilities within Telegraf, we’re glad to report that this plugin lets you perform arithmetic functions. A useful IoT example was requested by a community member who wanted to calculate power locally from the current and voltage fields read from the Telegraf Modbus Input Plugin. Here’s the Starlark function within the processor to do that:
[[processors.starlark]]
# Reads the Starlark script embedded
source = '''
def apply(metric):
    # Get the field called current and put into variable I
    I = metric.fields['current']
    # Get the field called voltage and put into variable V
    V = metric.fields['voltage']
    # Create a new field, power, which is I times V
    metric.fields['power'] = I * V
    # Return power as part of your Telegraf metrics stream
    return metric
'''
You can also put the above into a file, give it a “.star” file extension, and call it from your Telegraf configuration file as follows:
[[processors.starlark]]
# File containing a Starlark script.
script = "/usr/local/bin/myscript.star"
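The power calculation reads both fields with metric.fields['...'], which assumes every matched metric carries a current and a voltage field. As a minimal sketch of a more defensive variant (one that could be saved in a .star file as described above), the version below uses metric.fields.get() and skips the calculation when either field is absent. The helper name power_or_none is hypothetical, and the code sticks to the subset of syntax that Starlark and Python share:

```python
# Hypothetical helper: multiply current by voltage,
# or return None if either reading is missing.
def power_or_none(current, voltage):
    if current == None or voltage == None:
        return None
    return current * voltage

def apply(metric):
    # .get() returns None for an absent field instead of raising an error
    power = power_or_none(metric.fields.get('current'), metric.fields.get('voltage'))
    if power != None:
        # Only add the power field when both inputs were present
        metric.fields['power'] = power
    return metric
```

Metrics that lack either field pass through unchanged, which may or may not be what you want; dropping them by returning None is another option.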
How to run multiple Starlark functions on Telegraf data
Only one Starlark source or script can be run per processor. So, if you would like to apply multiple Starlark functions to your data, use multiple processors and include namepass = ["measurement_name"] in each one so that all the processors make changes to the specific data set you're targeting. Here's an example of what that looks like:
[[inputs.cpu]]
[[processors.starlark]]
namepass = ['cpu']
# First Starlark script
source = '''
def apply(metric):
    return metric
'''
[[processors.starlark]]
namepass = ['cpu']
# Second Starlark script
source = '''
def apply(metric):
    return metric
'''
Here are more examples of the Starlark Processor in action.
How to calculate a percentage ratio in Telegraf
Use this Starlark code to calculate the ratio of two integers, then multiply by 100 to get a percentage. The ratio will get returned as part of the Telegraf metric stream.
[[processors.starlark]]
source = '''
# Compute the ratio of two integer fields.
def apply(metric):
    used = float(metric.fields['used'])
    total = float(metric.fields['total'])
    # Create a new field called usage.
    metric.fields['usage'] = (used / total) * 100
    # Return usage as part of your Telegraf metrics stream
    return metric
'''
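The ratio calculation divides by total, so a metric whose total is zero would trigger a division-by-zero error rather than produce a usage value. A minimal sketch of a guarded version follows, assuming you would rather leave such metrics untouched; the helper name percent is hypothetical, and the code uses only syntax common to Starlark and Python:

```python
# Hypothetical helper: used/total as a percentage,
# or None when the ratio is undefined.
def percent(used, total):
    if total == 0:
        return None
    return (float(used) / float(total)) * 100

def apply(metric):
    used = metric.fields.get('used')
    total = metric.fields.get('total')
    # Pass the metric through unchanged if either field is missing
    if used == None or total == None:
        return metric
    usage = percent(used, total)
    if usage != None:
        metric.fields['usage'] = usage
    return metric
```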
How to scale metrics in Telegraf using Starlark
A common need is to take metrics that are in bytes, and scale them to kilobytes, megabytes, gigabytes, or terabytes. Here’s how to use Telegraf Starlark Processor to convert bytes to kilobytes:
[[processors.starlark]]
source = '''
# Convert bytes to kilobytes
def apply(metric):
    # k stands for key, v for value
    for k, v in metric.fields.items():
        if type(v) == "int":
            # 1000 bytes in a kilobyte
            metric.fields[k] = v / 1000
    return metric
'''
Here’s how to use Telegraf Starlark Processor to convert bytes to megabytes:
[[processors.starlark]]
source = '''
# Convert bytes to megabytes
def apply(metric):
    # k stands for key, v for value
    for k, v in metric.fields.items():
        if type(v) == "int":
            # 1000000 bytes in a megabyte
            metric.fields[k] = v / 1000000
    return metric
'''
And here’s how to use Telegraf Starlark Processor to convert bytes to gigabytes:
[[processors.starlark]]
source = '''
# Convert bytes to gigabytes
def apply(metric):
    # k stands for key, v for value
    for k, v in metric.fields.items():
        if type(v) == "int":
            # 1000000000 bytes in a gigabyte
            metric.fields[k] = v / 1000000000
    return metric
'''
(If you prefer to use base-2 values to convert your storage, then you’re talking mebibytes, gibibytes, etc. This article explains the difference and how to calculate them.)
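For the base-2 case, the same loop can be reused with a different divisor. The sketch below is one way to do it, converting integer byte fields to mebibytes; the constant names KIB, MIB, and GIB and the helper scale are hypothetical, chosen for illustration. Note that type(v) == "int" relies on Starlark's type() returning a string, as in the examples above:

```python
# Base-2 divisors (hypothetical constant names)
KIB = 1024
MIB = 1024 * 1024
GIB = 1024 * 1024 * 1024

# Hypothetical helper: divide a byte count by the chosen unit size
def scale(value, divisor):
    return float(value) / divisor

def apply(metric):
    # k stands for key, v for value
    for k, v in metric.fields.items():
        if type(v) == "int":
            # 1048576 bytes in a mebibyte
            metric.fields[k] = scale(v, MIB)
    return metric
```

Swapping MIB for KIB or GIB in apply gives kibibytes or gibibytes instead.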
One last example: sometimes it’s helpful to scale metrics to ensure that they display correctly in a chart. Here’s how you can use the Telegraf Starlark Processor to scale up a number by 10:
[[processors.starlark]]
source = '''
# Multiply any float fields by 10
def apply(metric):
    # k stands for key, v for value
    for k, v in metric.fields.items():
        if type(v) == "float":
            metric.fields[k] = v * 10
    return metric
'''
Transform values in Telegraf using Starlark
Suppose you need to transform one value to another in your Telegraf telemetry. For instance, F5 network devices return their status as an integer with values 0 through 5, where the value of 1 means healthy and the rest are variations of unhealthy status. Suppose I want to collapse this to a binary state — just one or zero — where one means healthy and zero means unhealthy, as shown in this table:
Original value | New value |
0 | 0 |
1 | 1 |
2 | 0 |
3 | 0 |
4 | 0 |
5 | 0 |
Note that we don’t need to change anything if the values are zero or one; we only need to make changes if the values are 2 through 5. Here’s how I’d make those changes in Telegraf using Starlark:
[[processors.starlark]]
namepass = ['measurementname']
source = '''
def apply(metric):
    # v stands for value
    v = metric.fields.get('status')
    # If no value, do nothing:
    if v == None:
        return metric
    # When v is 2, 3, 4, or 5:
    if 1 < v and v < 6:
        metric.fields['status'] = 0
    return metric
'''
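An alternative way to express the table is to list the healthy values explicitly and map everything else to zero. The sketch below does that; the names HEALTHY_VALUES and to_binary are hypothetical. Be aware of one behavioral difference from the range check above: this version also maps unexpected values (for example 7 or -1) to 0, which is an assumption about what you want rather than something the table specifies:

```python
# Status values considered healthy (hypothetical name; per the table, only 1)
HEALTHY_VALUES = [1]

# Hypothetical helper: collapse a status code to 1 (healthy) or 0 (unhealthy)
def to_binary(status):
    if status in HEALTHY_VALUES:
        return 1
    return 0

def apply(metric):
    # v stands for value
    v = metric.fields.get('status')
    # If no value, do nothing
    if v == None:
        return metric
    metric.fields['status'] = to_binary(v)
    return metric
```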
How to calculate the percent of healthy devices using Flux
Let’s continue with this network monitoring example. Performing this kind of transformation locally, in Telegraf, simplifies working with the data downstream. Because every status value is now either one or zero, we can use a much simpler calculation, sum(status)/count(status)*100, to get the percentage of healthy devices.
Using Flux, the query looks like this:
// First, create a variable called data
data = from(bucket: "device_data")
    |> range(start: -10m)
    |> filter(fn: (r) => r._measurement == "network_devices" and r._field == "status")
// Create a sum of the data across one minute windows
sum = data
    |> aggregateWindow(every: 1m, fn: sum)
    |> map(fn: (r) => ({ r with _field: "sum" }))
// Create a count of the data across one minute windows
count = data
    |> aggregateWindow(every: 1m, fn: count)
    |> map(fn: (r) => ({ r with _field: "count" }))
// Create a union of the data, using the sum and the counts to calculate percent of healthy devices
union(tables: [sum, count])
    |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
    |> map(fn: (r) => ({ r with
        _field: "percent_healthy",
        _value: (float(v: r.sum) / float(v: r.count)) * 100.0
    }))
In one pass, this Flux script collects the data and then uses it to calculate both the sum and the count, storing those values in separate variables. The two variables are then combined via a union, and the percent_healthy result is calculated, producing one value per one-minute window over the past 10 minutes.
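To sanity-check the arithmetic, here is a small sketch of the same sum/count*100 calculation for a single window, written in Python rather than Flux. The function name percent_healthy mirrors the field name in the query, and the sample status lists in the usage note are made up for illustration:

```python
# Percentage of healthy devices in one window, given binary status values
# (1 = healthy, 0 = unhealthy). Returns None for an empty window.
def percent_healthy(statuses):
    total = 0
    count = 0
    for s in statuses:
        total += s
        count += 1
    if count == 0:
        return None
    return (float(total) / float(count)) * 100.0
```

For example, a window containing the statuses [1, 0, 1, 1] has a sum of 3 and a count of 4, so percent_healthy returns 75.0.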
How to rename tags in Telegraf using Starlark
If you need to rename tags in your line protocol, here’s how you’d use Starlark:
[[processors.starlark]]
source = '''
# Rename any tags using the mapping in the renames dict.
renames = {
    'lower': 'min',
    'upper': 'max',
}
def apply(metric):
    # k stands for key, v for value
    for k, v in metric.tags.items():
        if k in renames:
            metric.tags[renames[k]] = v
            metric.tags.pop(k)
    return metric
'''
How to use conditional logic with Telegraf and Starlark
Starlark also supports changing metrics only if certain logical conditions are met. For example, suppose you wanted to set the status field to zero whenever its value is greater than 1 and less than 6. Here’s how you’d do that with the Telegraf Starlark Processor:
[[processors.starlark]]
source = '''
# Set any 'status' field greater than 1 and less than 6 to a value of 0
def apply(metric):
    # v stands for value
    v = metric.fields.get('status')
    if v == None:
        # If there is no status field
        return metric
    if 1 < v and v < 6:
        # If status is greater than 1 and less than 6, set status to zero
        metric.fields['status'] = 0
    # Return your modified metric to the Telegraf metrics stream
    return metric
'''
How to drop metrics based off a numeric condition
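Here’s how to store metrics in InfluxDB only if they meet a particular numeric condition (the same pattern could be used for something like detecting a 500 error). In this function, a metric is kept when its value field is 10 or less; returning None drops everything else: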
def apply(metric):
    if metric.fields["value"] <= 10:
        return metric  # keep
    else:
        return None  # drop
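The function above indexes metric.fields["value"] directly, so it assumes every metric has a value field. The sketch below is one way to handle metrics without that field. It assumes such metrics should be dropped, which is a choice rather than a requirement, and the names LIMIT and should_keep are hypothetical:

```python
# Threshold for keeping a metric (hypothetical name)
LIMIT = 10

# Hypothetical helper: decide whether a reading passes the threshold.
# A missing reading (None) is treated as "do not keep".
def should_keep(value, limit):
    if value == None:
        return False
    return value <= limit

def apply(metric):
    if should_keep(metric.fields.get("value"), LIMIT):
        return metric  # keep
    return None  # drop
```

To keep metrics that lack the field instead, return True from should_keep when value is None.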
Demo of Starlark and Telegraf
If you’d like to see a demo of how to write Starlark scripts for Telegraf using Visual Studio Code, check out this demo video:
Conclusion
As you can see, there’s a lot you can do with the Starlark Processor. For more, check out the list of examples and uses for the Starlark Processor if you would like to see how it could enhance your data collection. Also if you would like to read more about Starlark, take a look at the Starlark specifications for more details about the syntax and available functions you can use with this processor.
Got your own Starlark functions that you think others would find useful? Please share them in our public InfluxData Slack channel and InfluxDB community site. And feel free to post any questions you have there if you need help. Enjoy!