为什么使用K-Means进行时间序列数据分析?(第三部分)
由Anais Dotis-Georgiou / 产品,用例,开发者
2018年11月16日
导航至
在本系列的第一部分中,我概述了如何使用不同的统计函数和K-Means聚类进行时间序列数据的异常检测。在第二部分中,我分享了一些代码,展示了如何将K-means应用于时间序列数据以及K-means的一些缺点。在这篇文章中,我将分享
- 我是如何使用K-Means和InfluxDB Python客户端库检测EKG数据中的异常
- 我是如何使用Chronograf对异常进行警报
您可以在这个repo中找到我使用的代码和数据集。我从Amid Fish的教程中借用了一些代码。它相当不错,我推荐您查看。
如何使用K-Means和InfluxDB Python客户端库检测EKG数据中的异常
如果您阅读了第二部分,那么您知道我使用以下步骤进行K-means异常检测
- 分割 – 将您的时序数据分割成小段,进行水平平移。
- 窗口化 – 通过窗口化函数乘以分割后的数据,在窗口前后截断数据集。窗口化函数的名称来源于其功能:它允许您仅查看窗口范围内的数据,因为窗口前后(或窗口外的)数据乘以零。窗口化允许您无缝拼接重建的数据。
- 聚类 – 将相似的窗口化段分组,并找到聚类中的质心。质心位于聚类的中心。从数学上来说,它由聚类中所有点的算术平均位置定义。
- 重建 – 重建您的时序数据的过程。本质上,您是将正常时序数据与最近的质心(预测质心)相匹配,并将这些质心拼接起来以产生重建数据。
- 普通错误 – 重构的目的是计算与您时间序列预测输出相关的普通误差。
- 异常检测 – 由于您已知重构的普通误差,现在可以使用它作为异常检测的阈值。任何高于该普通误差的重构误差都可以被认为是异常。
在前一篇文章中,我分享了如何进行分割、窗口和聚类以创建重构。对于这篇文章,我将重点介绍我是如何使用Python CL 进行异常检测的。然而,在我们深入异常检测之前,让我们花点时间进行数据探索。首先,我使用CL查询我的正常心电图数据并将其转换为DataFrame。
client = InfluxDBClient(host='localhost', port=8086)
norm = client.query('SELECT "signal_value" FROM "norm_ekg"."autogen"."EKG" limit 8192')
norm_points = [p for p in norm.get_points()]
norm_df = pd.DataFrame(norm_points)
norm_df.head()
接下来,我删除了时间戳并将“signal_value”转换为数组。请记住,如果时间序列数据是规则的(即,ti和ti+1之间的间隔始终相同),则使用K-Means进行时间序列数据的异常检测才是可行的。这就是为什么我可以排除任何以下分析中的时间戳。
ekg_data = np.array(norm_df["signal_value"])
在我们进行分割之前,我们需要绘制正常心电图数据并做一些数据探索。
n_samples_to_plot = 300
plt.plot(ekg_data[0:n_samples_to_plot])
plt.xlabel("Sample number")
plt.ylabel("Signal value")
plt.show()
要执行分割,您必须首先决定您想要您的段多长。如果您查看数据,您可以看到三种重复的形状。在“样本号”30、110、180和260附近,我们看到一个陡峭的峰值,称为QRX复合波。在每个QRX复合波之前,有一个小隆起。这被称为P波。QRX复合波紧接着是T波。它是第二高的峰值,周期最大。我们想要确保我们的段长度足够长,可以包含所有这些波。由于T波有最长的周期,我们将该周期设置为段长度,即seqment_len=32。
然后使用此分割函数对心电图数据进行分割
def sliding_chunker(data, window_len, slide_len):
""" Segmentation """
chunks = []
for pos in range(0, len(data), slide_len):
chunk = np.copy(data[int(pos):int(pos+window_len)])
if len(chunk) != window_len:
continue
chunks.append(chunk)
return chunks
我将段存储在一个名为test_segments的数组列表中
slide_len = int(segment_len/2)
test_segments = sliding_chunker(
ekg_data,
window_len=segment_len,
slide_len=slide_len
)
len(test_segments)
接下来,我们按照前一篇文章中描述的方法执行重构,并确定正常心电图数据的最大重构误差为8.8。
reconstruction = np.zeros(len(ekg_data))
slide_len = segment_len/2
for segment_n, segment in enumerate(test_segments):
# don't modify the data in segments
segment = np.copy(segment)
segment = segment * window
nearest_centroid_idx = clusterer.predict(segment.reshape(1,-1))[0]
centroids = clusterer.cluster_centers_
nearest_centroid = np.copy(centroids[nearest_centroid_idx])
# overlay our reconstructed segments with an overlap of half a segment
pos = segment_n * slide_len
reconstruction[int(pos):int(pos+segment_len)] += nearest_centroid
n_plot_samples = 300
error = reconstruction[0:n_plot_samples] - ekg_data[0:n_plot_samples]
error_98th_percentile = np.percentile(error, 98)
print("Maximum reconstruction error was %.1f" % error.max())
print("98th percentile of reconstruction error was %.1f" % error_98th_percentile)
plt.plot(ekg_data[0:n_plot_samples], label="Original EKG")
plt.plot(reconstruction[0:n_plot_samples], label="Reconstructed EKG")
plt.plot(error[0:n_plot_samples], label="Reconstruction Error")
plt.legend()
plt.show()
现在我开始进行异常检测。首先,我使用Python客户端查询异常数据。尽管数据是历史的,但此脚本是用于模拟实时异常检测。我以32秒的间隔查询数据,就像我从数据流中收集数据一样。接下来,我像以前一样创建重构,并为每个段计算最大重构误差。最后,我将这些错误写入一个名为“error_ekg”的新数据库。
while True:
end = start + timedelta(seconds=window_time)
query = 'SELECT "signal_value" FROM "anomaly_ekg"."autogen"."EKG" WHERE time > \'' + str(start) + '\' and time < \'' + str(end) + '\''
client = InfluxDBClient(host='localhost', port=8086)
anomaly_stream = client.query(query)
anomaly_pnts = [p for p in anomaly_stream.get_points()]
df_anomaly = pd.DataFrame(anomaly_pnts)
anomalous = np.array(df_anomaly["signal_value"])
windowed_segment = anomalous * window
nearest_centroid_idx = clusterer.predict(windowed_segment.reshape(1,-1))[0]
nearest_centroid = np.copy(centroids[nearest_centroid_idx])
error = nearest_centroid[0:n_plot_samples] - windowed_segment[0:n_plot_samples]
max_error = error.max()
write_time = start + timedelta(seconds=slide_time)
client.switch_database("error_ekg")
json_body = [
{
"measurement": "ERROR",
"tags": {
"error": "max_error",
},
"time": write_time,
"fields": {
"max_error": max_error
}
}]
client.write_points(json_body)
print("QUERY:" + query)
print("MAX ERROR:" + str(max_error))
start = start + timedelta(seconds=slide_time)
time.sleep(32)
我得到了这个输出
现在,我能够将最大错误写入数据库,我准备好使用Kapacitor设置阈值并在任何超过正常最大重构误差8.8的错误上发出警报。
我是如何使用Chronograf发出异常警报的
为了使用InfluxData的数据处理框架Kapacitor,我需要编写一个TICKscript,这是Kapacitor的DSL,以发出异常警报。由于我是Kapacitor的新用户,我选择使用Chronograf来帮助我管理警报并自动为我生成TICKscript。真是太幸运了!
首先,我导航到“管理任务”页面…
然后选择“构建警报规则”…
现在我可以开始构建我的警报规则。我为警报命名,选择警报类型…
…并选择要发出警报的字段值和阈值条件。最后,我指定要将这些警报发送到何处…
…并配置连接。
如果返回“管理任务”页面,现在可以查看自动生成的TICKscripts。默认情况下,Kapacitor将这些警报写入“chronograf”数据库。如果想更改输出数据库,只需更改第25行即可。
var outputDB = 'chronograf'
就这么多!当运行while循环并向数据库发送错误时,如果错误大于8.8,Kapacitor会随时通知我。
看看我的仪表盘,可以看到包含异常的段中有一个大于8.8的错误,而Kapacitor能够检测到它。
左侧单元格: 深紫色线代表每32个点的一个最大重建误差。它开始超过8.8,发生在大约13:57:20的异常处。
右侧单元格: 我使用查询“SELECT max(“value”) AS “max_value” FROM “chronograf”.”autogen”.”alerts” limit 1”显示异常的最大错误。
希望这篇和之前的博客能帮助你在异常检测之旅中。如果您觉得有任何地方不清楚或者需要帮助,请随时告诉我。您可以访问InfluxData 社区网站 或通过@InfluxDB发推文。
最后,为了保持一致性,我也想用一个小憩结束这篇博客。这里有一些变色龙供您欣赏。