使用脚本与InfluxDB入门

导航到

本文最初发布于The New Stack,在此经许可转载。

使用时序数据库脚本可以帮助开发者简化应用程序开发,扩展工作负载,构建精简的集成。

时序数据无处不在,这种现实不会改变。时序数据本身的特性意味着时序工作负载与其他许多类型的数据有很大不同。鉴于时序数据在现代互联世界中的普遍性,确保开发者拥有管理它的工具比以往任何时候都更加重要。

通过使用时序数据库脚本,InfluxDB帮助开发者简化并民主化应用程序开发,扩展时序工作负载,构建精简的集成,以增强系统或应用程序功能。

在InfluxDB的背景下,脚本由Flux代码组成。(Flux是InfluxDB的脚本和查询语言。您可以在此处学习如何使用Flux。)您可以在本地计算机上编写Flux脚本,然后将该代码上传并安装到InfluxDB Cloud中。一旦在InfluxDB Cloud中,您可以通过不同的方式共享和利用这些脚本。

在团队中保存和分享代码

首先,脚本可以成为开发团队的优秀学习工具。Flux 专家可以编写高级 Flux 应用程序并将它们保存为脚本。其他团队成员可以访问该代码,并直接根据他们的需求使用它。拥有这些脚本的访问权限意味着正在构建 Flux 技能的团队成员拥有宝贵的资源来阅读、学习和在自己的工作中重用。

您还可以将脚本与任务结合使用。InfluxDB 的 任务系统 是一个自动化平台,用于在特定时间表上运行查询。在任务中安装脚本意味着脚本将在任务按其时间表运行时执行。这很有用,因为您可以在多个任务上安装相同的脚本。当您更新该脚本时,更改将自动应用于所有调用该脚本的任务。如果您有很多任务,能够在一个地方进行更改并立即在整个系统中应用,这将节省大量时间。

动态数据分析

将动态参数传递到脚本中简化了开发和规模管理流程。假设您有一个需要维护建筑物内温度的制造设施,因此您在建筑物各处设置了温度传感器。您需要为每个传感器运行相同的检查,但不同型号传感器的检查频率不同。以下脚本是简单温度检查操作的起点。该脚本包含一些将从调用脚本的任务中提取的动态变量。

import "slack"

sendSlackMessage = (text, color) =>
   slack.message(
       url: "https://hooks.slack.com/services/YOURHOOKHERE",
       token: "",
       channel: "team-sensors",
       text: text,
       color: color,
   )

from(bucket:"sensors")
 |> range (start:-tasks.every)
 |> filter (fn(r)=>r.sensor_id==params.sensor)
 |> map( fn(r) => {
           return {r with sentMessage:
             if r.temperature > 80
             then string(v: 200 == sendSlackMessage(color: "warning", text: "Temperature on sensor ${r.sensor_id} is WARN: ${r.temperature}") ) }
             else if r.temperature > 100
               then string(v: 200 == sendSlackMessage(color: "danger", "text": "ALERT: Temperature sensor on ${r.sensor_id} needs immediate attention: ${r.temperature}"))
             else "false"
       }
   )
 |> to(bucket: "SensorCheckLog")

这里有两个任务示例。第一个每五分钟检查传感器的温度。任务调用 check_temp 脚本,并传入要检查的传感器 ID。

Influx script list

下一个任务执行相同操作,主要区别在于 task_10 任务检查不同的传感器,并在不同的间隔进行。这次是每 10 分钟。

influx task - create name

利用数据和脚本创造价值

能够动态地将数据插入脚本和任务中,使得管理大规模的任务和设备更加容易。以这种方式使用脚本可以让您快速以任何提供价值的方式对数据进行切片和切块。

例如,假设您想在两个不同的粒度级别对数据集执行相同的转换。以下脚本计算一个简单的平均值。数据源、数据输出位置和聚合窗口都是动态值。

// script get_downsample
from(bucket:params.from)
|> range(start:-tasks.every)
|> AggregateWindow(period:params.period, fn:mean)
|> to(bucket:params.to)

要在任务中使用此脚本,我们需要调用脚本并传入参数。这里,我们正在从 raw_data 存储桶中选择数据,在五分钟窗口内取平均值,然后将聚合数据发送到名为 ds_5m 的新存储桶。

Influx script list-2

以下任务对相同数据执行相同操作,但有一些关键区别。我们在这里获取更粗的粒度平均值,在 15 分钟窗口内平均数据。我们希望将此数据保持分离以供分析,因此我们将将其发送到自己的存储桶 ds_15m

create name

如果基本平均操作无法满足您的需求,您可以在脚本中更新更高级的数据分析算法,任务将自动开始使用它。再次强调,这有助于管理数据源和大规模处理。

使用脚本实现更智能的警报

虽然脚本使得系统扩展变得容易,但它们同样可以帮助监控这些增长中的系统。您可以使用脚本根据时间序列数据进行警报管理。例如,我们为当出现问题时通知站点可靠性工程(SRE)团队制定一个策略。实现这一目标的一种方法是将您的警报策略编写在Flux脚本中。

使用InfluxDB等时间序列数据库,这可能包括加载消息密钥等项。(InfluxDB有一个安全保险库来存储密钥,Flux在需要时可以调用提取这些密钥。)另一个需要考虑的政策元素是消除重复警报的过程。而不是让相同的警报不断ping您的端点,比较当前警报与上一个警报以查看是否存在状态变化。如果没有,就没有必要发送另一个警报。

脚本中设置实际端点以及何时发送警报的准则的部分是使用动态变量的有用地方。

import "influxdata/influxdb/secrets"
import "slack"
import "pagerduty"
import "array"

slackhook =  secrets.get(key: "SLACK_HOOK")
pd_key = secrets.get(key: "PD_KEY")

sendSlackMessage = (text, color) =>
   slack.message(
       url: slackhook,
       token: "",
       channel: "your-channel",
       text: text,
       color: color,
   )

pagerduty_endpoint = pagerduty.endpoint()
sendPagerDutyMessage = (
   service,
   level,
   summary,
   timestamp,
   key,
) =>
   pagerduty.sendEvent(
       class: "error_alert",
       client: "your-company",
       clientURL: "http://your.company.com",
       dedupKey: key,
       eventAction: pagerduty.actionFromLevel(level),
       group: service,
       routingKey: pd_key,
       severity: pagerduty.severityFromLevel(level),
       source: "ALERT_SRE",
       summary: summary,
       timestamp: timestamp,
   )

record = if params.level=="crit"
then {pd_sent: sendPagerDutyMessage(
                                   key: params.pagerdutyDedupKey,
                                   level: params.level,
                                   service: "sre-services",
                                   summary: params.message,
                                   timestamp: now(),
                               ).statusCode,
     slack_sent: 0,
     message: params.message,
     _time: now(),
     level: prams.level
}

else if params.level=="warn"
then {    pd_sent: 0,
     slack_sent: sendSlackMessage(text: params.message, color: "warning"),
     message: params.message,
     _time: now(),
     level: prams.level
}
else if params.level=="ok"
then {pd_sent: sendPagerDutyMessage(
                                   key: params.pagerdutyDedupKey,
                                   level: params.level,
                                   service: "sre-services",
                                   summary: params.message,
                                   timestamp: now(),
                               ).statusCode,
     slack_sent: sendSlackMessage(text: params.message, color: "ok"),
     message: params.message,
     _time: now(),
     level: prams.level
}
// shouldn't hit this case, is added for syntactical correctness
else {pd_sent: 0,
     slack_sent: sendSlackMessage(text: params.message, color: "ok"),
     message: params.message,
     _time: now(),
     level: prams.level}

array.from(rows:[record])
   |> set(key: "_measurement", value: "SRE_ALERTS")
   |> group(columns: ["level", "pd_sent", "slack_sent", "_measurement"])
   |> wideTto(bucket: "AlertSeriesLog")

在上面的示例中,当任务生成关键警报时,脚本会拉入自定义消息并将其发送到PagerDuty端点。如果是一个关键问题,可能值得唤醒某人修复它。警告警报会将自定义警报消息发送到类似Slack的群组消息平台。如果警报级别返回正常,您可能想要设置警报通知到值班SRE和更广泛的团队,让他们知道问题已解决。

将警报历史数据写入InfluxDB的新桶中也是一个好主意,因此请将其作为您的警报策略的一部分。警报历史为SRE提供了关于事件的详细信息,使他们能够跟踪问题的重大里程碑。警报历史还通知重复消除过程。因为我们处理的是时间序列数据,所以很容易回顾上一个警报并确定它是否是重复的或新的。

使用可调用脚本的扩展可能性

另一个有用的选项是将警报策略脚本安装为可调用脚本。将Flux脚本安装在可调用URL后面使用户能够使用认证令牌从平台外部访问其时间序列数据和InfluxDB。

这极大地简化了应用程序开发,因为它允许开发人员在不将额外的库或组件安装到其代码中的情况下访问时间序列数据。他们只需调用URL端点即可。

因此,继续我们的示例,将脚本放在可调用URL后面允许支持您的应用程序的外部服务或应用程序通过调用脚本URL使用相同的警报策略。这些服务不需要担心策略的细节。他们只需要传递警告级别和相应消息的参数。

结论

无论您计划如何使用时间序列数据,脚本都可以帮助您更高效、更有效地使用它。接受动态参数的脚本可以简化扩展,因为您可以为复杂的操作编写脚本,并在整个解决方案中重复使用它们。您甚至可以将脚本公开给外部来源,用更少的代码构建更复杂、更健壮的解决方案。