开始使用脚本与 InfluxDB
作者:Jason Myers / 产品
2022 年 11 月 09 日
导航至
本文最初发表于 The New Stack,并经许可在此处转载。
将脚本与时间序列数据库结合使用,可帮助开发人员简化应用程序开发、扩展工作负载并构建精益集成。
时间序列数据无处不在,而且这种情况不会改变。时间序列数据的本质意味着时间序列工作负载与其他许多类型的数据不同。鉴于时间序列数据在我们现代互联世界中的普遍性,比以往任何时候都更重要的是确保开发人员拥有管理它的工具。
通过将脚本与时间序列数据库 InfluxDB 结合使用,可以帮助开发人员简化和普及应用程序开发,扩展时间序列工作负载,并构建精益集成以增强系统或应用程序功能。
在 InfluxDB 的上下文中,脚本由 Flux 代码组成。(Flux 是 InfluxDB 的脚本和查询语言。您可以在此处学习如何使用 Flux。)您可以在本地机器上编写 Flux 脚本,然后上传该代码并将其安装在 InfluxDB Cloud 中。一旦进入 InfluxDB Cloud,您就可以通过不同的方式共享和利用这些脚本。
在团队中保存和共享代码
首先,脚本可以是开发团队的重要学习工具。Flux 专家可以编写高级 Flux 应用程序并将其保存为脚本。然后,其他团队成员可以访问该代码并直接使用它来满足他们的需求。访问这些脚本意味着构建 Flux 技能的团队成员拥有宝贵的资源,可以阅读、学习并在自己的工作中重用。
您还可以将脚本与任务结合使用。InfluxDB 任务系统是一个自动化的平台,用于按特定计划运行查询。在任务中安装脚本意味着每次任务按计划运行时,脚本都会执行。这很有帮助,因为您可以在多个任务上安装相同的脚本。当您更新该脚本时,更改会自动应用于所有调用该脚本的任务。如果您有大量任务,则在一个地方进行更改并在整个系统中立即应用它们的能力可以节省大量时间。
动态数据分析
将动态参数传递到脚本中的能力简化了开发和规模管理流程。假设您有一个制造工厂,您需要在建筑物内保持温度,因此您在整个建筑物中设置了温度传感器。您需要对每个传感器运行相同的检查,但该检查的频率对于不同的传感器型号有所不同。以下脚本是一个简单的温度检查操作的开始。该脚本有一些动态变量,它将从调用该脚本的任务中提取。
import "slack"
sendSlackMessage = (text, color) =>
slack.message(
url: "https://hooks.slack.com/services/YOURHOOKHERE",
token: "",
channel: "team-sensors",
text: text,
color: color,
)
from(bucket:"sensors")
|> range (start:-tasks.every)
|> filter (fn(r)=>r.sensor_id==params.sensor)
|> map( fn(r) => {
return {r with sentMessage:
if r.temperature > 80
then string(v: 200 == sendSlackMessage(color: "warning", text: "Temperature on sensor ${r.sensor_id} is WARN: ${r.temperature}") ) }
else if r.temperature > 100
then string(v: 200 == sendSlackMessage(color: "danger", "text": "ALERT: Temperature sensor on ${r.sensor_id} needs immediate attention: ${r.temperature}"))
else "false"
}
)
|> to(bucket: "SensorCheckLog")
我们这里有两个任务示例。第一个任务每五分钟检查一次传感器的温度。该任务调用 check_temp
脚本并传入要检查的传感器 ID。
下一个任务执行相同的操作,主要区别在于 task_10
任务以不同的间隔检查不同的传感器。这次,是每 10 分钟一次。
使用数据和脚本生成价值
动态地将数据插入脚本和任务的能力使管理大规模的任务和设备变得更加容易。以这种方式使用脚本可以让您以任何提供价值的方式快速切片和分析您的数据。
例如,假设您想在两个不同的粒度级别对数据集执行相同的转换。以下脚本计算一个简单的平均值。数据源、数据输出位置和聚合窗口都是动态值。
// script get_downsample
from(bucket:params.from)
|> range(start:-tasks.every)
|> AggregateWindow(period:params.period, fn:mean)
|> to(bucket:params.to)
要在任务中使用此脚本,我们需要调用该脚本并传入参数。在这里,我们从 raw_data
bucket 中选择数据,取五分钟窗口的平均值,然后将聚合数据发送到一个名为 ds_5m
的新 bucket。
以下任务对相同的数据执行相同的操作,但有一些关键差异。我们在这里获得了更粗的平均值粒度,对 15 分钟窗口内的数据进行平均。我们希望将此数据分开进行分析,因此我们将其发送到其自己的 bucket ds_15m
。
如果事实证明基本平均值操作不能满足您的需求,您可以更新脚本并使用更高级的数据分析算法,任务将自动开始使用该算法。同样,这有助于大规模管理数据源和处理。
使用脚本进行更智能的警报
虽然脚本使扩展系统更容易,但它们也可以帮助监控这些不断增长的系统。您可以使用脚本来简化基于时间序列数据的警报。假设我们为站点可靠性工程 (SRE) 团队构建一个策略,以便在出现问题时发出警报。一种方法是在 Flux 脚本中写出您的警报策略。
使用像 InfluxDB 这样的时间序列数据库,这可能包括加载消息传递密钥等项目。(InfluxDB 有一个安全的 vault 用于存储密钥,Flux 有调用在需要时提取这些密钥。)要考虑的另一个策略元素是消除重复警报的过程。与其让相同的警报不断 ping 您的端点,不如将当前警报与之前的警报进行比较,以查看是否存在状态更改。如果没有,则无需发送另一个警报。
在脚本中设置实际端点以及发送警报的时间和位置的部分,是使用动态变量的有用位置。
import "influxdata/influxdb/secrets"
import "slack"
import "pagerduty"
import "array"
slackhook = secrets.get(key: "SLACK_HOOK")
pd_key = secrets.get(key: "PD_KEY")
sendSlackMessage = (text, color) =>
slack.message(
url: slackhook,
token: "",
channel: "your-channel",
text: text,
color: color,
)
pagerduty_endpoint = pagerduty.endpoint()
sendPagerDutyMessage = (
service,
level,
summary,
timestamp,
key,
) =>
pagerduty.sendEvent(
class: "error_alert",
client: "your-company",
clientURL: "http://your.company.com",
dedupKey: key,
eventAction: pagerduty.actionFromLevel(level),
group: service,
routingKey: pd_key,
severity: pagerduty.severityFromLevel(level),
source: "ALERT_SRE",
summary: summary,
timestamp: timestamp,
)
record = if params.level=="crit"
then {pd_sent: sendPagerDutyMessage(
key: params.pagerdutyDedupKey,
level: params.level,
service: "sre-services",
summary: params.message,
timestamp: now(),
).statusCode,
slack_sent: 0,
message: params.message,
_time: now(),
level: prams.level
}
else if params.level=="warn"
then { pd_sent: 0,
slack_sent: sendSlackMessage(text: params.message, color: "warning"),
message: params.message,
_time: now(),
level: prams.level
}
else if params.level=="ok"
then {pd_sent: sendPagerDutyMessage(
key: params.pagerdutyDedupKey,
level: params.level,
service: "sre-services",
summary: params.message,
timestamp: now(),
).statusCode,
slack_sent: sendSlackMessage(text: params.message, color: "ok"),
message: params.message,
_time: now(),
level: prams.level
}
// shouldn't hit this case, is added for syntactical correctness
else {pd_sent: 0,
slack_sent: sendSlackMessage(text: params.message, color: "ok"),
message: params.message,
_time: now(),
level: prams.level}
array.from(rows:[record])
|> set(key: "_measurement", value: "SRE_ALERTS")
|> group(columns: ["level", "pd_sent", "slack_sent", "_measurement"])
|> wideTto(bucket: "AlertSeriesLog")
在上面的示例中,当任务生成关键警报时,脚本会提取自定义消息并将其发送到 PagerDuty 端点。如果是严重问题,可能值得叫醒某人来修复它。警告警报向 Slack 等群组消息平台发送自定义警报消息。如果警报级别返回到 OK,您可能希望为值班 SRE 和更广泛的团队设置警报,让他们知道问题已解决。
将警报历史数据写入 InfluxDB 中的新 bucket 也是一个好主意,因此请将其作为警报策略的一部分。警报历史记录为 SRE 提供有关事件的详细信息,以便他们可以跟踪问题的主要里程碑。警报历史记录也为重复消除过程提供信息。因为我们处理的是时间序列数据,所以很容易回顾之前的警报并确定它是重复的还是新的。
通过可调用脚本扩展可能性
另一个有用的选项是将警报策略脚本安装为可调用脚本。在可调用 URL 后安装 Flux 脚本使用户能够使用经过身份验证的令牌从平台外部访问其时间序列数据和 InfluxDB。
这大大简化了应用程序开发,因为它允许开发人员访问时间序列数据,而无需在其代码中安装额外的库或组件。他们只需调用 URL 端点即可。
因此,为了继续我们的示例,将脚本放在可调用 URL 之后,使支持您的应用程序的外部服务或应用程序可以通过调用脚本 URL 来使用相同的警报策略。这些服务无需担心策略的细节。他们只需要传入警告级别和相应的消息的参数。
结论
无论您计划如何使用时间序列数据,脚本都可以帮助您更有效率和效果地使用它。采用动态参数的脚本有助于更容易地扩展,因为您可以编写脚本来执行复杂的操作,并在整个解决方案中简单地重用它们。您甚至可以将脚本暴露给外部源,并以更少的代码行构建更复杂和强大的解决方案。