为什么对时间序列数据使用 K-Means 聚类?(第三部分)
作者:Anais Dotis-Georgiou / 产品、 用例、 开发者
2018 年 11 月 16 日
导航至
在本系列的第一部分中,我概述了如何使用不同的统计函数和 K-Means 聚类进行时间序列数据的异常检测。在第二部分中,我分享了一些代码,展示了如何将 K-means 应用于时间序列数据,以及 K-means 的一些缺点。在这篇文章中,我将分享
- 我如何使用 K-Means 和 InfluxDB 以及 InfluxDB Python 客户端库来检测 EKG 数据中的异常
- 我如何使用 Chronograf 对异常发出警报
您可以在这个 repo 中找到我使用的代码和数据集。我借鉴了 Amid Fish 的教程中的代码。它非常棒,我建议您查看一下。
我如何使用 K-Means 和 InfluxDB 以及 InfluxDB Python 客户端库来检测 EKG 数据中的异常
如果您阅读过第二部分,那么您就知道这些是我使用 K-means 进行异常检测的步骤
- 分段 – 将您的时间序列数据分割成带有水平平移的小段的过程。
- 窗口化 – 将您的分段数据乘以窗口函数以在窗口前后截断数据集的操作。窗口化得名于它的功能:它允许您只看到窗口范围内的数据,因为之前和之后(或窗口之外)的所有内容都乘以零。窗口化允许您无缝地将重建的数据拼接在一起。
- 聚类 – 将相似的窗口化段分组并在集群中找到质心的任务。质心位于集群的中心。在数学上,它由集群中所有点的算术平均位置定义。
- 重建 – 重建时间序列数据的过程。本质上,您是将正常的时序数据与最接近的质心(预测的质心)匹配,并将这些质心拼接在一起以生成重建的数据。
- 正常误差 – 重建的目的是计算与时间序列预测输出相关的正常误差。
- 异常检测 – 既然您知道重建的正常误差是多少,您现在就可以将其用作异常检测的阈值。任何高于该正常误差的重建误差都可以被认为是异常。
在上一篇文章中,我分享了如何执行分段、窗口化和聚类来创建重建。对于这篇文章,我将重点介绍如何使用 Python CL 来执行异常检测。但是,在我们深入研究异常检测之前,让我们花一点时间进行一些数据探索。首先,我使用 CL 查询我的正常 EKG 数据并将其转换为 DataFrame。
client = InfluxDBClient(host='localhost', port=8086)
norm = client.query('SELECT "signal_value" FROM "norm_ekg"."autogen"."EKG" limit 8192')
norm_points = [p for p in norm.get_points()]
norm_df = pd.DataFrame(norm_points)
norm_df.head()
接下来,我删除时间戳并将“signal_value”转换为数组。请记住,只有当时间序列数据是规则的时(即 ti 和 ti+1 之间的间隔始终相同),使用 K-Means 进行时间序列数据异常检测才是可行的。这就是为什么我可以排除以下任何分析的时间戳。
ekg_data = np.array(norm_df["signal_value"])
在我们继续分段之前,我们需要绘制我们的正常 EKG 数据并进行一些数据探索
n_samples_to_plot = 300
plt.plot(ekg_data[0:n_samples_to_plot])
plt.xlabel("Sample number")
plt.ylabel("Signal value")
plt.show()
要执行分段,您必须首先决定您希望您的段有多长。如果您查看数据,您可以看到三个重复的形状。在“Sample number” 30、110、180 和 260 附近,我们看到了一个陡峭的峰值,称为 QRX 波群。在每个 QRX 波群之前,都有一个小驼峰。这被称为 P 波。紧随 QRX 波群之后,我们有 T 波。它是周期最大的第二高峰。我们要确保我们的段长度足够长,以封装这些波中的每一个。由于 T 波具有最长的周期,我们将该周期设置为等于段长度,其中 seqment_len= 32。
然后使用此分段功能对 EKG 数据进行分段
def sliding_chunker(data, window_len, slide_len):
""" Segmentation """
chunks = []
for pos in range(0, len(data), slide_len):
chunk = np.copy(data[int(pos):int(pos+window_len)])
if len(chunk) != window_len:
continue
chunks.append(chunk)
return chunks
我将段存储在名为 test_segments 的数组列表中
slide_len = int(segment_len/2)
test_segments = sliding_chunker(
ekg_data,
window_len=segment_len,
slide_len=slide_len
)
len(test_segments)
接下来,我们执行上一篇文章中描述的重建,并确定正常 EKG 数据的最大重建误差为 8.8。
reconstruction = np.zeros(len(ekg_data))
slide_len = segment_len/2
for segment_n, segment in enumerate(test_segments):
# don't modify the data in segments
segment = np.copy(segment)
segment = segment * window
nearest_centroid_idx = clusterer.predict(segment.reshape(1,-1))[0]
centroids = clusterer.cluster_centers_
nearest_centroid = np.copy(centroids[nearest_centroid_idx])
# overlay our reconstructed segments with an overlap of half a segment
pos = segment_n * slide_len
reconstruction[int(pos):int(pos+segment_len)] += nearest_centroid
n_plot_samples = 300
error = reconstruction[0:n_plot_samples] - ekg_data[0:n_plot_samples]
error_98th_percentile = np.percentile(error, 98)
print("Maximum reconstruction error was %.1f" % error.max())
print("98th percentile of reconstruction error was %.1f" % error_98th_percentile)
plt.plot(ekg_data[0:n_plot_samples], label="Original EKG")
plt.plot(reconstruction[0:n_plot_samples], label="Reconstructed EKG")
plt.plot(error[0:n_plot_samples], label="Reconstruction Error")
plt.legend()
plt.show()
现在我准备好开始异常检测了。首先,我使用 Python 客户端查询我的异常数据。尽管数据是历史数据,但此脚本旨在模拟实时异常检测。我以 32 秒的间隔查询数据,就好像我正在从数据流中收集数据一样。接下来,我像以前一样创建重建,并计算每个段的最大重建误差。最后,我将这些误差写入名为“error_ekg”的新数据库。
while True:
end = start + timedelta(seconds=window_time)
query = 'SELECT "signal_value" FROM "anomaly_ekg"."autogen"."EKG" WHERE time > \'' + str(start) + '\' and time < \'' + str(end) + '\''
client = InfluxDBClient(host='localhost', port=8086)
anomaly_stream = client.query(query)
anomaly_pnts = [p for p in anomaly_stream.get_points()]
df_anomaly = pd.DataFrame(anomaly_pnts)
anomalous = np.array(df_anomaly["signal_value"])
windowed_segment = anomalous * window
nearest_centroid_idx = clusterer.predict(windowed_segment.reshape(1,-1))[0]
nearest_centroid = np.copy(centroids[nearest_centroid_idx])
error = nearest_centroid[0:n_plot_samples] - windowed_segment[0:n_plot_samples]
max_error = error.max()
write_time = start + timedelta(seconds=slide_time)
client.switch_database("error_ekg")
json_body = [
{
"measurement": "ERROR",
"tags": {
"error": "max_error",
},
"time": write_time,
"fields": {
"max_error": max_error
}
}]
client.write_points(json_body)
print("QUERY:" + query)
print("MAX ERROR:" + str(max_error))
start = start + timedelta(seconds=slide_time)
time.sleep(32)
我得到这个输出
现在我可以将最大误差写入数据库,我准备好使用 Kapacitor 设置阈值,并在任何误差超过我的正常最大重建误差 8.8 时发出警报。
我如何使用 Chronograf 对异常发出警报
为了使用 Kapacitor(InfluxData 的数据处理框架),我需要编写一个 TICKscript(Kapacitor 的 DSL)来对异常发出警报。由于我是 Kapacitor 的新用户,我选择使用 Chronograf 来帮助我管理警报并为我自动生成 TICKscript。我真幸运!
首先,我导航到“管理任务”页面…
接下来,选择“构建警报规则”…
现在我可以开始构建我的警报规则了。我命名我的警报,选择警报类型…
…并选择我要发出警报的字段值以及阈值条件。最后,我指定要将这些警报发送到哪里…
…并配置连接。
如果我回到“管理任务”页面,我现在可以查看自动生成的 TICKscript。默认情况下,Kapacitor 将这些警报写入“chronograf”数据库。如果我想更改输出数据库,我可以简单地更改第 25 行。
var outputDB = 'chronograf'
就是这样!当我运行 while 循环并将误差发送到数据库时,每当我的误差大于 8.8 时,Kapacitor 都会通过 Slack 通知我。
如果我们看一下我的仪表板,您可以看到在包含异常的段中,我的误差大于 8.8,并且 Kapacitor 能够检测到它。
左侧单元格: 洋红色线代表每 32 个点的最大重建误差。在 13:57:20 左右发生的异常处,它开始超过 8.8。
右侧单元格: 我使用查询“SELECT max(“value”) AS “max_value” FROM “chronograf”.”autogen”.”alerts” limit 1”显示异常的最大误差
我希望这篇以及之前的博文能帮助您进行异常检测之旅。如果您发现任何令人困惑的地方,或者随时向我寻求帮助,请告诉我。您可以访问 InfluxData 社区站点 或在 Twitter 上 @InfluxDB 给我们发消息。
最后,为了保持一致性,我也想用一个脑筋急转弯来结束这篇博文。这里有一些变色龙送给您。