社区亮点:Supralog 如何使用 InfluxDB OSS 构建在线增量机器学习管道以进行容量规划
作者:社区 / 产品, 用例, 开发者
2020 年 7 月 2 日
导航至
本文由 Supralog 数据科学家实习生 Gregory Scafarto 与 InfluxData 的 DevRel Anais Dotis-Georgiou 合作撰写。
在 InfluxData,我们为我们出色的 InfluxDB 社区感到自豪。我们感谢您的所有贡献和反馈。无论是 Telegraf 插件、社区模板、出色的 InfluxDB 项目,还是第三方 Flux 包,您的贡献都持续给我们留下深刻印象并让我们感到谦卑。今天,让我们聚焦来自 Supralog 的 Gregory Scafarto。
Gregory Scafarto 构建并维护了一个使用 Kapacitor、Python 和 InfluxDB 的在线容量规划管道。Gregory Scafarto 位于法国,并且即将完成他在 ENSEIRB-MATMECA 完成计算机系统网络和电信硕士学位的最后一年。他还作为 Supralog 的数据科学家实习生,从事预测性维护和容量规划解决方案的研究。
Supralog 如何使用 Kapacitor 进行容量规划
容量规划是时间序列监控中最重要的一种分析类型。容量规划工具帮助评估组织所需的生产能力。具体在 IT 领域,容量规划涉及估计存储、业务需求和成本估算,以交付约定的服务级别目标。
设置
Supralog 决定使用 InfluxDB 的 TICK Stack 有很多原因。它性能高且易于使用,但一个堆栈元素脱颖而出:Kapacitor,一个警报和实时流数据处理引擎,可与 Python 配合使用并处理我们的时间序列预测。我们使用 Kapacitor 和 InfluxDB 执行在线机器学习,以生成大量的 CPU、内存和磁盘使用率预测。我们计划扩展此架构以生成网站流量、电子邮件流量和其他应用程序性能指标的预测。
我们使用 InfluxDB-Python 客户端 写入和查询数据,并使用 Python 进行数据准备和预测。我们的 Python 脚本还计算残差,即预测值与实际值之间的差异。Kapacitor 会针对这些预测残差发出警报,以便将持续学习应用于我们的机器学习 (ML) 模型。如果预测残差过大且模型精度漂移,则会在新的数据集上重新训练模型,以优化模型的精度。Grafana 仅用于可视化目的。
注意:为了将本博客限制在上述架构的关键组件上,本教程不会详细介绍所有代码。请查看 Gregory 的配套代码库 此处。
FB Prophet 在大规模容量规划中的缺点
我们的预测架构随着时间的推移而演变。起初,预测是使用 Facebook Prophet 完成的,这是一个“在 R 和 Python 中实现的预测程序”,它具有高级 API,允许用户非常快速地进行预测。但是,我们最终从我们的管道中移除了 Prophet,因为它对于我们的用例存在一些缺点
- 它有平滑曲线的趋势。
- 它没有“在线训练”。相反,每次模型漂移时,都必须在完整的 数据周期上训练模型。这个缺点非常昂贵,因为它需要大量的 CPU 和 RAM 资源,尤其是在我们进行数百个预测的情况下。
数据准备和数据验证
数据准备是将原始数据转换为对后续分析或机器学习应用有价值的形式的过程。数据验证是确保我们的数据已经过适当的数据准备的过程。
第一步:数据准备
第一步是从数据库中提取数据,使用 InfluxDB-Python 客户端并将其转换为 Pandas DataFrame。接下来,我们执行一些时间序列数据分析,将我们的数据分成两个独立的组,以确定应将哪些预测方法应用于数据。具体来说,我们使用时间序列分解来分组我们的数据。我们选择将我们的时间序列分为两个不同的组
- 第一组:具有趋势但没有季节性的数据。第一组数据预测是使用经典模型进行的。具体来说,第一组数据预测是使用线性回归进行的。
- 第二组:具有趋势和季节性的数据。第二组数据预测将采用机器学习模型。具体来说,第二组数据预测是使用长短期记忆网络 (LSTM) 进行的。
在第一个示例中,此时间序列表示主机的磁盘使用率百分比
我们可以很容易地看到数据似乎是分段线性的——它由直线段组成——但我们需要确定它属于哪个组。我们编写了一个可以自动检测时间序列性质的测试。此测试首先计算自相关函数。自相关是时间序列与其自身在先前时间点的相关性的度量。自相关图可以帮助我们确定时间序列是否具有季节性。要了解有关自相关、它告诉我们什么以及它对 ML 模型的影响的更多信息,请查看此博客。绘制上述数据的自相关性为我们提供了如下信号
第二步:数据验证
通过目视检查,该图验证了我们的数据没有表现出季节性,这使其归为第一组。但是,我们需要一种可量化的方法来得出此结论,以便我们可以自动确定数据是否具有季节性。我们采用自相关函数的 皮尔逊相关系数 (R),以便自动化时间序列数据的分组,并确定我们的时间序列是否将由线性模型或 ML 模型很好地建模。皮尔逊相关系数是衡量两个变量之间线性相关程度的指标。它由以下数学公式定义
其中,
R=±1 表示强线性相关,其中 R=0 表示无线性相关。
使用 InfluxDB 和 Python 的线性回归预测模型
既然我们已经确定我们的数据将需要线性预测模型,我们就必须决定将哪些数据输入到我们的模型中。我们不能使用所有数据来构建线性模型,因为斜率随时间变化。使用所有数据会掩盖斜率的最近和相关变化。解决此问题的一种方法是确定最后一次有意义的斜率变化,并且仅选择包含最近重大变化的时间序列部分。这确保了我们的预测将基于最相关和最有意义的趋势。
为了找到那些有用的斜率变化,我们将取序列的绝对导数,然后找到显着的导数峰值。我们通过一个简单的阈值来定义显着的导数峰值,该阈值在 signal.find_peaks 函数中从 SciPy 使用,如下面的代码所示。为了减少信号噪声并确保仅选择相对显着的峰值,使用 while 循环将返回的峰值数量限制为前 10% 的最大峰值。在每个循环中,我们将峰值阈值的高度增加百分之一,以便丢弃嘈杂的较小峰值。我们迭代直到获得前 10% 的峰值。
这是我们在信号中进行此类操作的结果示例
这是我们线性模型训练的代码,Make_Linear_Model.py,完成 while 循环以消除较小的相对不重要的峰值。一旦我们分析了导数峰值,我们就在这些峰值存在时将线性回归应用于我们的时间序列,并生成线性预测。
import numpy as np
import matplotlib.pyplot as plt
from sklearn.linear_model import LinearRegression
from numpy import diff
from scipy.signal import find_peaks
def train_linear_model(df,severity):
'''
Determine the last linear part of data to consider and then train the model of this subsequence of data.
Parameters
----------
df : dataframe
data.
severity : int
how insensitive will the model be to small changes
Returns
-------
model : linear_model
linear model trained on the last trend of the signal.
r_sq : int
Pearson's coefficient (caracterize how well the model represent the data).
df_l : dataframe
last linear part of the data.
'''
dx = 1
dy = diff(df["y"])/dx
i=1
mini=np.amin(df["y"])
maxi=np.amax(df["y"])
j=0
nb_peaks = len(df["y"])+1
while nb_peaks > len(df["y"])/10 :
peaks, _ = find_peaks(abs(dy), height=(severity+j)*(maxi-mini)/100)
nb_peaks=len(peaks)
j=j+1
plt.plot(peaks, abs(dy[peaks]), "x")
plt.plot(abs(dy))
plt.plot(df["y"])
plt.title("Slope changes")
plt.show()
i=peaks[-1]+1
df_l=df["y"][i+1 :]
x=np.linspace(1,len(df_l)-1,len(df_l))
model = LinearRegression()
model.fit(x.reshape(-1,1),np.asarray(df_l).reshape(-1,1))
r_sq=model.score(x.reshape(-1,1), np.asarray(df_l).reshape(-1,1))
return model,r_sq,df_l
使用 InfluxDB 和 Python 为我们的机器学习预测模型准备数据
现在让我们关注第二组,具有季节性的非线性数据,它采用机器学习进行预测。第二组包含来自各种主机的可用 CPU 百分比数据。以下是我们的原始数据在一个主机上的样子
首先,我们执行时间序列分解,并将我们的序列分成三个时间序列分量:趋势、季节性和残差或“噪声”。我们执行分解是因为我们在单独的 LSTM 网络中处理每个分量。我们使用来自 Statsmodels 的 seasonal_decompose 函数,Statsmodels 是一个“Python 模块,它提供了用于估计许多不同统计模型的类和函数”,以执行分解。此 Statsmodels 季节性分解函数要求用户指定周期。我们选择了一个 24*7 +1 的周期(如果数据以 1 小时频率采样,则为一周),因为数据具有每周季节性周期。确保您的季节性周期准确非常重要。如果您使用不正确的周期,则分解效果不佳,并且我们的模型变得不准确。这也是使用自相关图验证存在和季节性周期如此重要的原因。
我们还使用了 Statsmodels 季节性分解函数的加法方法,以使最终时间序列的重建变得容易。由于我们为分解选择了加法方法,因此我们的最终预测是通过将各个分量预测相加来重新生成时间序列而完成的。通过使用分解方法,由于 Statsmodels 季节性分解函数使用的滤波器(它使用大小为 13 的移动平均滤波器)造成的边缘效应,我们损失了 12 个点。因此,预测点在时间上向后移动了 12 个时间步,以便时间准确。
LSTM 简要说明
在我们深入研究如何使用 ML 框架 Keras 和 LSTM 进行增量学习之前,让我们花一点时间简要介绍一下 LSTM。本节仅浅尝辄止地介绍 LSTM。如果您有兴趣深入了解 LSTM,此博客 提供了关于 LSTM 工作原理的出色解释。
我们使用三个 LSTM 来输出我们时间序列的每个分解分量的预测。我们使用 LSTM(长短期记忆)网络来建模我们的季节性时间序列,因为它们专门用于学习长时间序列模式中的依赖关系。以下内部属性使 LSTM 成为非常强大的时间序列模型
- LSTM 接受数据序列作为输入。数据序列可以只是一个值数组。这减少了使用 LSTM 所需的数据准备量。
- LSTM 数据处理是双向的,并且它具有记忆状态,这有助于避免 梯度消失问题,这是一个常见的深度神经网络问题。换句话说,LSTM 擅长快速训练。
下图表示 LSTM 单元并总结了网络功能
LSTM 网络单元由三个主要组件或门组成
- 输入门 决定了时间序列的哪些值和分量在通过单元时会被更新。
- 遗忘门 确定了网络“记忆”中哪些值被遗忘或持续存在。此遗忘门使 LSTM 擅长长期保留学习时间序列模式。这是 LSTM 的一个特性部分。
- 输出门 确定要传递到下一个单元的信息,并最终输出预测。
使用 Keras 进行 InfluxDB 的时间序列预测
现在我们准备好看看代码了,这段代码可以对我们的三个时间序列分量进行预测。我们使用了 Keras,这是一个开源神经网络库,是 TensorFlow 的封装器,用于应用 LSTM。我们决定使用 Keras,因为它能够轻松实现增量学习。增量学习是一种机器学习技术,其中不断将新数据馈送到模型中,以进一步训练模型并有效地提高模型精度。例如,我们只需要从 InfluxDB 实例查询一周的数据来重新训练我们的模型。与传统的机器学习方法相比,增量学习更高效、资源节约且成本更低,传统方法需要查询我们所有的历史数据才能重新训练我们的模型。这是我们的 Making_ML_Model.py 代码,它使用 Keras 实现了增量学习 LSTM 网络。
from keras.models import Sequential,save_model
from keras.layers import Dense,LSTM,Dropout
def make_models(nb_layers,loss,,nb_features,optimizer,look_back) :
'''
Create an LSTM model depending on the parameters selected by the user
Parameters
----------
nb_layers : int
nb of layers of the lstm model.
loss : str
loss of the model.
metric : str
metric to evaluate the model.
nb_features : int
size of the ouput (one for regression).
optimizer : str
gradient descend's optimizer.
look_back : int
windows to be process by the network to make the prediction
trend : bool
Distinguish trend signal from others (more complicated to modelise).
Returns
-------
model : Sequential object
model
'''
model=Sequential()
model.add(LSTM(nb_layers,return_sequences=True,activation='relu',input_shape=(nb_features,.look_back)))
model.add(Dropout(0.2))
model.add(LSTM(nb_layers))
model.add(Dropout(0.2))
model.add(Dense(int(nb_layers/2),activation='relu'))
model.add(Dense(1))
model.compile(loss=loss,optimizer=optimizer,metrics=["mse"])
print("model_made")
return model
要了解有关此代码如何工作的更多信息,并使用 LSTM 生成单变量时间序列预测,我建议查看这篇博客。
评估我们的机器学习模型
绘制模型损失图始终有助于评估训练是否成功。我们可以查看模型针对三个分量(趋势、季节性和残差)的训练损失。训练损失是在训练期间对不良预测的惩罚度量。损失越高,预测越差。
换句话说,模型使用数据集的次数越多,模型改进的机会就越多。
当训练损失达到饱和最小值或停止减少时,模型“已学习了所有可以学习的内容”,并且尽可能准确。从上面的图表中,我们看到我们可以进行更多轮次以提高预测的准确性,因为损失看起来尚未完成减少。但是,增加轮次会增加训练的持续时间。我们需要在准确性提高和训练持续时间之间取得折衷。
残差有助于确定模型置信区间
由于季节性分解和残差本身的性质,时间序列残差或噪声往往会对模型产生最大的不确定性。但是,由于残差通常具有高斯分布,因此我们可以使用以下公式轻松计算出 95% 的置信区间
计算噪声的预测区间为我们提供了一种很好的方法来衡量整个模型的最小不确定性,因为它贡献了最多的误差。但是,值得注意的是,通过这种方法衡量不确定性仅对短期预测有效,但对于较长期的预测会降低准确性。使用这种方法是因为它可以很好地近似不确定性,并且比计算最终模型真实不确定性的方法(如Uber 的时间序列深度置信预测中所述)在计算上更有效。
我们的复合 LSTM 的预测,即各个预测的总和
在对趋势、季节性和残差进行预测后,将这些预测加在一起。以下是我们未来一周的预测结果,其中包含置信区间
左侧的图是放大的预测部分,因此我们可以更好地可视化我们的预测与真实值的拟合程度——看起来非常好!最后,既然我们有了预测,我们就使用客户端将其写入我们的 InfluxDB 实例。这是我们的 Grafana 仪表板上的输出
使用 Kapacitor 对残差发出警报并使用 exec() 节点执行增量学习
我们快要结束了!剩下要做的就是
- 创建 Kapacitor 的警报以监控残差的误差
- 创建在线增量训练方法脚本,以正确执行我们的重新训练脚本
我们使用 Kapacitor 对度量进行数学运算并触发警报。具体来说,我们将使用 Kapacitor 监控我们的预测残差,并在模型漂移时重新启动训练。Kapacitor 能够做很多事情,但今天我们将重点介绍 exec() 节点,如果满足警报条件,该节点能够启动脚本。这是我们 TICK 脚本的核心。这是我们的 TICK 脚本
import os
path_to_kap = os.environ['kapacitor']
path_to_script = os.environ['script']
class Alert():
def __init__(self,host,measurement) :
self.host=host
self.measurement=measurement
self.texte=""
def create(self,message,form):
self.form=form
'''
Create the tick alert
Note : NEED TO DEFINE the path of the script, which will be launched when an alert is triggered, as a variable environnement
Parameters
----------
message : str
Message to be shown as an alert on slack etc ; need to be written with kapacitor syntax
form : str
ex ,host=GSCA,device=device1 group by conditions with comma to separate + start with a comma
Returns
-------
None.
'''
where_condition=""
where=[[element.split("=") for element in form[1 :].split(",")][i][0]for i in range(len(form[1 :].split(",")))]
for ele in where :
where_condition=where_condition+ele+"="+ele +" AND "
texte=""
cond=["var "+(form[1 :].replace(","," AND").split("AND")[i]).replace("=","='")+"'" for i in range(len(form[1 :].replace(","," AND").split("AND")))]
for element in cond :
texte=texte+element+"\n"
texte = texte +"\n\n"+ """var realtime = batch
|query('SELECT mean(yhat) as real_value FROM "telegraf"."autogen".real_"""+self.measurement+""" WHERE """+where_condition[: -5]+"""')
.period(6h)
.every(6h)
.align()
|last('real_value')
.as('real_value')
var predicted = batch
|query('SELECT mean(yhat) as prediction FROM "telegraf"."autogen".pred_"""+self.measurement+""" WHERE """+where_condition[: -5]+"""')
.period(6h)
.every(6h)
.align()
|last('prediction')
.as('prediction')
var joined_data = realtime
|join(predicted)
.as('realtime', 'predicted')
.tolerance(20m)
var performance_error = joined_data
|eval(lambda: abs("realtime.real_value" - "predicted.prediction"))
.as('performance_error')
|alert()
.crit(lambda: "performance_error" > 10 )
.message('""" +message+"""')
.slack()
.exec('"""+path_to_script+"""', '"""+self.host+"'"+""", '"""+str(form[1 :])+"'"+""")
self.texte=texte
def save_alert(self):
self.form=self.form[1 :].replace("=",".")
self.form=self.form.replace(",","_")
self.form=self.form.replace(":","")
self.path=r"Alertes/alerte_"+self.host+"_"+self.form+".tick" #path to modifie as you want
with open(self.path,"w") as f :
f.write(self.texte)
f.close()
def define_alert(self):
self.form=self.form.replace("=",".")
self.form=self.form.replace(",","_")
self.form=self.form.replace(":","")
cmd_define_alert=path_to_kap+" define "+"alerte_"+self.host+"_"+self.form+" -type batch -tick "+self.path+" -dbrp telegraf.autogen"
print(cmd_define_alert)
os.system('cmd /c '+cmd_define_alert)
def enable_alert(self):
self.form=self.form.replace("=",".")
self.form=self.form.replace(",","_")
self.form=self.form.replace(":","")
cmd_enable_alert=path_to_kap+" enable "+"alerte_"+self.host+"_"+self.form
os.system('cmd /c '+cmd_enable_alert)
def launch(self):
self.define_alert()
self.enable_alert()
Alert()
类创建一个与我们创建的模型相适应的警报,以便监控我们的预测并在模型漂移时重新启动训练。create()
函数将我们的预测度量与我们的实际数据连接起来,计算预测残差或模型漂移,在发生重新训练事件时向 Slack 发送警报,并使用 exec()
节点运行我们的中间脚本。请注意,Kapacitor 和中间脚本的路径被编写为环境变量,因为在我们的例子中,脚本将被投入到 Docker 容器的生产环境中。
现在让我们解释一下中间脚本的作用,但首先,请记住我们的 Python Keras 脚本需要导入非平凡的模块和库。在项目开始时,在项目根目录下创建了一个虚拟环境:python -m venv name_venv
。我们的中间脚本激活了这个虚拟环境,其中包含多个软件包(包括 pip)和目录(包括包含 activate.bat
的 Script
)。中间脚本包含一行
对于 Windows
%activate_project% && cd %ml_project_directory% && python Retrain.py %1 %2 %3
其中 %activate_project%
是 venv directory
中 \Scripts\activate.bat
的路径
而 %ml_project_directory%
是包含添加到您的路径变量中的 Retrain.py
目录的项目根目录的路径。
对于 Linux 或 MacOS
%activate_project && cd %ml_project_directory && python Retrain.py %1 %2 %3
其中 %activate_project
是 venv directory
中 \Scripts\bin\activate.bat
的路径,%ml_project_directory
是包含添加到您的路径变量中的 Retrain.py
目录的项目根目录的路径。
中间脚本是一个 .bat
文件,它激活虚拟环境,然后运行重新训练 python 代码 Retrain.py
。 Retrain.py
重用了第一个训练文件中的函数,但模型从它们停止的地方继续进行。它们通过应用最新的 HDF5 文件来实现这一点,这些文件存储了每个模型(即 trend.h5、seasonality.h5 和 residuals.h5)的权重,用于在重新训练期间预测每个序列。这些文件存储在预测序列的相应目录中。例如,假设我们正在预测 CPU。那么在上面的脚本中
%1
一个字符串,表示感兴趣的主机host1
%2
一个字符串,表示度量cpu
%3
一个字符串,表示行协议中的序列键host=host1,cpu=cpu-total
。它表示下面脚本中Existing_Predictor()
类中的形式。
对于每个序列,相应的目录具有以下名称:primaryTag_measurement_seriesKey
。对于上面的示例,我们模型在该序列的权重目录将是 host1_cpu_host1_cpu-total
。
这是包含 load_models()
函数的脚本。 load_modules()
函数拆分形式 %3
,并指定将从中加载权重的目录。
class Existing_Predictor(Predictor):
def __init__(self,df,host,measurement,look_back,nb_epochs,nb_batch,form,freq_period) :
Predictor.__init__(self)
self.df=df
self.host=host
self.measurement=measurement
self.form=form
self.look_back=look_back
trend_x, trend_y,seasonal_x,seasonal_y,residual_x,residual_y=self.prepare_data(df,look_back,freq_period)
model_trend,model_seasonal,model_residual=self.load_models()
model_trend=self.train_model(model_trend,trend_x,trend_y,nb_epochs,nb_batch,"trend")
model_seasonal=self.train_model(model_seasonal,seasonal_x,seasonal_y,nb_epochs,nb_batch,"seasonal")
model_residual=self.train_model(model_residual,residual_x,residual_y,nb_epochs,nb_batch,"residual")
self.model_trend=model_trend
self.model_seasonal=model_seasonal
self.model_residual=model_residual
def load_models(self):
file=""
for element in self.form :
value=element.split("=")
file=file+'_'+value[1]
file=file[1 :].replace(":","")
model_trend=load_model("Modeles/"+file+"_"+self.measurement+"/"+"trend"+".h5")
model_seasonal=load_model("Modeles/"+file+"_"+self.measurement+"/"+"seasonal"+".h5")
model_residual=load_model("Modeles/"+file+"_"+self.measurement+"/"+"residual"+".h5")
return model_trend,model_seasonal,model_residual
此时,中间脚本已运行,并反过来成功地重新训练了我们的模型。 恭喜,您已经创建了一个自主模块,该模块根据指标的性质进行预测,并使用 95% 的置信区间进行框架化。
如前所述,本博客中仅共享了本项目中使用的一小部分代码。 请查看这个仓库,以了解完成此 ML 方法的 prepare_data()
、train_model()
和 save_model()
方法。
使用 InfluxDB 2 进行在线机器学习以及来自 InfluxData 的最终想法
以下部分是 InfluxData 的 DevRel Anais Dotis-Georgiou 的注释。
显然,Gregory 是一位数据科学忍者,我非常感谢他分享他的故事。但是,如果您查看他的代码和方法,您会知道他正在使用 InfluxDB 1.x 和 Kapacitor。Gregory 从 1.x TICK Stack 开始了他的旅程。现在刚入门的你们可能会问,“我如何在 InfluxDB 2.0 中实现这一点?” 以下是如何使用 InfluxDB 2.0 创建类似的预测管道的方法
- 使用新的 Python 客户端。这使您可以直接查询和写入 pandas DataFrames。它也与 1.x 兼容,以防您对使用目前处于 beta 阶段的 2.0 感到不舒服。(我们在 InfluxDays 上最近宣布,我们计划在秋季正式发布 InfluxDB 2.0 OSS,而 InfluxDB Cloud 现在已正式发布!)。
- 使用 Flux 和 joins 计算预测残差。我还将使用 pearsonr() 函数计算皮尔逊相关系数。
- 使用 to() 函数将残差写入新的存储桶或度量。我将使用任务定期执行此 Flux 脚本。
- 使用 警报和 http.post() 函数来触发重新训练和运行脚本。或者,您也可以考虑使用 Node-RED,这是一种“用于将硬件设备连接在一起的编程工具”,以触发重新训练。您可能还有兴趣阅读这篇博客,它与提出的架构方法有一些相似之处。它描述了如何使用 Node-RED 来持续部署 Telegraf 配置。
无论您使用的是哪个版本或版本的 InfluxDB,我都希望这个故事能激发您将机器学习集成到您基于 InfluxDB 的解决方案中。再次感谢 Gregory 分享他的故事,并感谢 InfluxData 社区的所有贡献。与往常一样,请在评论区、我们的社区网站或我们的Slack 频道中分享您的想法、疑虑或问题。我们很乐意获得您的反馈并帮助您解决遇到的任何问题!
此外,由于您对本文的主题感兴趣,请观看用于预测和异常检测的无代码 ML 网络研讨会。它描述了使用 InfluxDB 进行容器化、无代码 ML 方法的 POC。您还可以与演示者 Dean Sheehan 分享您的想法。