为什么使用K-Means进行时间序列数据分析?(第三部分)

导航至

在本系列的第一部分中,我概述了如何使用不同的统计函数和K-Means聚类进行时间序列数据的异常检测。在第二部分中,我分享了一些代码,展示了如何将K-means应用于时间序列数据以及K-means的一些缺点。在这篇文章中,我将分享

  1. 我是如何使用K-Means和InfluxDB Python客户端库检测EKG数据中的异常
  2. 我是如何使用Chronograf对异常进行警报

您可以在这个repo中找到我使用的代码和数据集。我从Amid Fish的教程中借用了一些代码。它相当不错,我推荐您查看。

如何使用K-Means和InfluxDB Python客户端库检测EKG数据中的异常

如果您阅读了第二部分,那么您知道我使用以下步骤进行K-means异常检测

  1. 分割 – 将您的时序数据分割成小段,进行水平平移。
  2. 窗口化 – 通过窗口化函数乘以分割后的数据,在窗口前后截断数据集。窗口化函数的名称来源于其功能:它允许您仅查看窗口范围内的数据,因为窗口前后(或窗口外的)数据乘以零。窗口化允许您无缝拼接重建的数据。
  3. 聚类 – 将相似的窗口化段分组,并找到聚类中的质心。质心位于聚类的中心。从数学上来说,它由聚类中所有点的算术平均位置定义。
  4. 重建 – 重建您的时序数据的过程。本质上,您是将正常时序数据与最近的质心(预测质心)相匹配,并将这些质心拼接起来以产生重建数据。
  5. 普通错误 – 重构的目的是计算与您时间序列预测输出相关的普通误差。
  6. 异常检测 – 由于您已知重构的普通误差,现在可以使用它作为异常检测的阈值。任何高于该普通误差的重构误差都可以被认为是异常。

在前一篇文章中,我分享了如何进行分割、窗口和聚类以创建重构。对于这篇文章,我将重点介绍我是如何使用Python CL 进行异常检测的。然而,在我们深入异常检测之前,让我们花点时间进行数据探索。首先,我使用CL查询我的正常心电图数据并将其转换为DataFrame。

client = InfluxDBClient(host='localhost', port=8086)
norm = client.query('SELECT "signal_value" FROM "norm_ekg"."autogen"."EKG" limit 8192')
norm_points = [p for p in norm.get_points()]
norm_df = pd.DataFrame(norm_points)
norm_df.head()

接下来,我删除了时间戳并将“signal_value”转换为数组。请记住,如果时间序列数据是规则的(即,ti和ti+1之间的间隔始终相同),则使用K-Means进行时间序列数据的异常检测才是可行的。这就是为什么我可以排除任何以下分析中的时间戳。

ekg_data = np.array(norm_df["signal_value"])

在我们进行分割之前,我们需要绘制正常心电图数据并做一些数据探索。

n_samples_to_plot = 300
plt.plot(ekg_data[0:n_samples_to_plot])
plt.xlabel("Sample number")
plt.ylabel("Signal value")
plt.show()

要执行分割,您必须首先决定您想要您的段多长。如果您查看数据,您可以看到三种重复的形状。在“样本号”30、110、180和260附近,我们看到一个陡峭的峰值,称为QRX复合波。在每个QRX复合波之前,有一个小隆起。这被称为P波。QRX复合波紧接着是T波。它是第二高的峰值,周期最大。我们想要确保我们的段长度足够长,可以包含所有这些波。由于T波有最长的周期,我们将该周期设置为段长度,即seqment_len=32。

带有标记特征的正常心电图信号

然后使用此分割函数对心电图数据进行分割

def sliding_chunker(data, window_len, slide_len):
    """ Segmentation """
    chunks = []
    for pos in range(0, len(data), slide_len):
        chunk = np.copy(data[int(pos):int(pos+window_len)])
        if len(chunk) != window_len:
            continue
        chunks.append(chunk)

    return chunks

我将段存储在一个名为test_segments的数组列表中

slide_len = int(segment_len/2)
test_segments = sliding_chunker(
    ekg_data,
    window_len=segment_len,
    slide_len=slide_len
)
len(test_segments)

接下来,我们按照前一篇文章中描述的方法执行重构,并确定正常心电图数据的最大重构误差为8.8。

reconstruction = np.zeros(len(ekg_data))
slide_len = segment_len/2

for segment_n, segment in enumerate(test_segments):
    # don't modify the data in segments
    segment = np.copy(segment)
    segment = segment * window
    nearest_centroid_idx = clusterer.predict(segment.reshape(1,-1))[0]
    centroids = clusterer.cluster_centers_
    nearest_centroid = np.copy(centroids[nearest_centroid_idx])

    # overlay our reconstructed segments with an overlap of half a segment
    pos = segment_n * slide_len
    reconstruction[int(pos):int(pos+segment_len)] += nearest_centroid

n_plot_samples = 300

error = reconstruction[0:n_plot_samples] - ekg_data[0:n_plot_samples]
error_98th_percentile = np.percentile(error, 98)
print("Maximum reconstruction error was %.1f" % error.max())
print("98th percentile of reconstruction error was %.1f" % error_98th_percentile)

plt.plot(ekg_data[0:n_plot_samples], label="Original EKG")
plt.plot(reconstruction[0:n_plot_samples], label="Reconstructed EKG")
plt.plot(error[0:n_plot_samples], label="Reconstruction Error")
plt.legend()
plt.show()

现在我开始进行异常检测。首先,我使用Python客户端查询异常数据。尽管数据是历史的,但此脚本是用于模拟实时异常检测。我以32秒的间隔查询数据,就像我从数据流中收集数据一样。接下来,我像以前一样创建重构,并为每个段计算最大重构误差。最后,我将这些错误写入一个名为“error_ekg”的新数据库。

while True: 
    end = start + timedelta(seconds=window_time)
    query = 'SELECT "signal_value" FROM "anomaly_ekg"."autogen"."EKG" WHERE time > \'' + str(start) + '\' and time < \'' + str(end) + '\'' 
    client = InfluxDBClient(host='localhost', port=8086)
    anomaly_stream = client.query(query)
    anomaly_pnts = [p for p in anomaly_stream.get_points()]
    df_anomaly = pd.DataFrame(anomaly_pnts)
    anomalous = np.array(df_anomaly["signal_value"])

    windowed_segment = anomalous * window
    nearest_centroid_idx = clusterer.predict(windowed_segment.reshape(1,-1))[0]
    nearest_centroid = np.copy(centroids[nearest_centroid_idx])

    error = nearest_centroid[0:n_plot_samples] - windowed_segment[0:n_plot_samples]
    max_error = error.max()

    write_time =  start + timedelta(seconds=slide_time)
    client.switch_database("error_ekg")
    json_body = [
        {
            "measurement": "ERROR",
            "tags": {
                "error": "max_error",
            },
            "time": write_time,
            "fields": {
                "max_error": max_error
            }
        }]
    client.write_points(json_body)
    print("QUERY:" + query)
    print("MAX ERROR:" + str(max_error))
    start = start + timedelta(seconds=slide_time) 
    time.sleep(32)

我得到了这个输出

现在,我能够将最大错误写入数据库,我准备好使用Kapacitor设置阈值并在任何超过正常最大重构误差8.8的错误上发出警报。

我是如何使用Chronograf发出异常警报的

为了使用InfluxData的数据处理框架Kapacitor,我需要编写一个TICKscript,这是Kapacitor的DSL,以发出异常警报。由于我是Kapacitor的新用户,我选择使用Chronograf来帮助我管理警报并自动为我生成TICKscript。真是太幸运了!

首先,我导航到“管理任务”页面…

然后选择“构建警报规则”…

现在我可以开始构建我的警报规则。我为警报命名,选择警报类型…

…并选择要发出警报的字段值和阈值条件。最后,我指定要将这些警报发送到何处…

…并配置连接。

如果返回“管理任务”页面,现在可以查看自动生成的TICKscripts。默认情况下,Kapacitor将这些警报写入“chronograf”数据库。如果想更改输出数据库,只需更改第25行即可。

var outputDB = 'chronograf'

就这么多!当运行while循环并向数据库发送错误时,如果错误大于8.8,Kapacitor会随时通知我。

看看我的仪表盘,可以看到包含异常的段中有一个大于8.8的错误,而Kapacitor能够检测到它。

左侧单元格: 深紫色线代表每32个点的一个最大重建误差。它开始超过8.8,发生在大约13:57:20的异常处。

右侧单元格: 我使用查询“SELECT max(“value”) AS “max_value” FROM “chronograf”.”autogen”.”alerts” limit 1”显示异常的最大错误。

希望这篇和之前的博客能帮助你在异常检测之旅中。如果您觉得有任何地方不清楚或者需要帮助,请随时告诉我。您可以访问InfluxData 社区网站 或通过@InfluxDB发推文。

最后,为了保持一致性,我也想用一个小憩结束这篇博客。这里有一些变色龙供您欣赏。