为什么对时间序列数据使用 K-Means 聚类?(第三部分)

导航至

在本系列的第一部分中,我概述了如何使用不同的统计函数和 K-Means 聚类进行时间序列数据的异常检测。在第二部分中,我分享了一些代码,展示了如何将 K-means 应用于时间序列数据,以及 K-means 的一些缺点。在这篇文章中,我将分享

  1. 我如何使用 K-Means 和 InfluxDB 以及 InfluxDB Python 客户端库来检测 EKG 数据中的异常
  2. 我如何使用 Chronograf 对异常发出警报

您可以在这个 repo 中找到我使用的代码和数据集。我借鉴了 Amid Fish 的教程中的代码。它非常棒,我建议您查看一下。

我如何使用 K-Means 和 InfluxDB 以及 InfluxDB Python 客户端库来检测 EKG 数据中的异常

如果您阅读过第二部分,那么您就知道这些是我使用 K-means 进行异常检测的步骤

  1. 分段 – 将您的时间序列数据分割成带有水平平移的小段的过程。
  2. 窗口化 – 将您的分段数据乘以窗口函数以在窗口前后截断数据集的操作。窗口化得名于它的功能:它允许您只看到窗口范围内的数据,因为之前和之后(或窗口之外)的所有内容都乘以零。窗口化允许您无缝地将重建的数据拼接在一起。
  3. 聚类 – 将相似的窗口化段分组并在集群中找到质心的任务。质心位于集群的中心。在数学上,它由集群中所有点的算术平均位置定义。
  4. 重建 – 重建时间序列数据的过程。本质上,您是将正常的时序数据与最接近的质心(预测的质心)匹配,并将这些质心拼接在一起以生成重建的数据。
  5. 正常误差 – 重建的目的是计算与时间序列预测输出相关的正常误差。
  6. 异常检测 – 既然您知道重建的正常误差是多少,您现在就可以将其用作异常检测的阈值。任何高于该正常误差的重建误差都可以被认为是异常。

在上一篇文章中,我分享了如何执行分段、窗口化和聚类来创建重建。对于这篇文章,我将重点介绍如何使用 Python CL 来执行异常检测。但是,在我们深入研究异常检测之前,让我们花一点时间进行一些数据探索。首先,我使用 CL 查询我的正常 EKG 数据并将其转换为 DataFrame。

client = InfluxDBClient(host='localhost', port=8086)
norm = client.query('SELECT "signal_value" FROM "norm_ekg"."autogen"."EKG" limit 8192')
norm_points = [p for p in norm.get_points()]
norm_df = pd.DataFrame(norm_points)
norm_df.head()

接下来,我删除时间戳并将“signal_value”转换为数组。请记住,只有当时间序列数据是规则的时(即 ti 和 ti+1 之间的间隔始终相同),使用 K-Means 进行时间序列数据异常检测才是可行的。这就是为什么我可以排除以下任何分析的时间戳。

ekg_data = np.array(norm_df["signal_value"])

在我们继续分段之前,我们需要绘制我们的正常 EKG 数据并进行一些数据探索

n_samples_to_plot = 300
plt.plot(ekg_data[0:n_samples_to_plot])
plt.xlabel("Sample number")
plt.ylabel("Signal value")
plt.show()

要执行分段,您必须首先决定您希望您的段有多长。如果您查看数据,您可以看到三个重复的形状。在“Sample number” 30、110、180 和 260 附近,我们看到了一个陡峭的峰值,称为 QRX 波群。在每个 QRX 波群之前,都有一个小驼峰。这被称为 P 波。紧随 QRX 波群之后,我们有 T 波。它是周期最大的第二高峰。我们要确保我们的段长度足够长,以封装这些波中的每一个。由于 T 波具有最长的周期,我们将该周期设置为等于段长度,其中 seqment_len= 32。

带有标记特征的正常 EKG 信号

然后使用此分段功能对 EKG 数据进行分段

def sliding_chunker(data, window_len, slide_len):
    """ Segmentation """
    chunks = []
    for pos in range(0, len(data), slide_len):
        chunk = np.copy(data[int(pos):int(pos+window_len)])
        if len(chunk) != window_len:
            continue
        chunks.append(chunk)

    return chunks

我将段存储在名为 test_segments 的数组列表中

slide_len = int(segment_len/2)
test_segments = sliding_chunker(
    ekg_data,
    window_len=segment_len,
    slide_len=slide_len
)
len(test_segments)

接下来,我们执行上一篇文章中描述的重建,并确定正常 EKG 数据的最大重建误差为 8.8。

reconstruction = np.zeros(len(ekg_data))
slide_len = segment_len/2

for segment_n, segment in enumerate(test_segments):
    # don't modify the data in segments
    segment = np.copy(segment)
    segment = segment * window
    nearest_centroid_idx = clusterer.predict(segment.reshape(1,-1))[0]
    centroids = clusterer.cluster_centers_
    nearest_centroid = np.copy(centroids[nearest_centroid_idx])

    # overlay our reconstructed segments with an overlap of half a segment
    pos = segment_n * slide_len
    reconstruction[int(pos):int(pos+segment_len)] += nearest_centroid

n_plot_samples = 300

error = reconstruction[0:n_plot_samples] - ekg_data[0:n_plot_samples]
error_98th_percentile = np.percentile(error, 98)
print("Maximum reconstruction error was %.1f" % error.max())
print("98th percentile of reconstruction error was %.1f" % error_98th_percentile)

plt.plot(ekg_data[0:n_plot_samples], label="Original EKG")
plt.plot(reconstruction[0:n_plot_samples], label="Reconstructed EKG")
plt.plot(error[0:n_plot_samples], label="Reconstruction Error")
plt.legend()
plt.show()

现在我准备好开始异常检测了。首先,我使用 Python 客户端查询我的异常数据。尽管数据是历史数据,但此脚本旨在模拟实时异常检测。我以 32 秒的间隔查询数据,就好像我正在从数据流中收集数据一样。接下来,我像以前一样创建重建,并计算每个段的最大重建误差。最后,我将这些误差写入名为“error_ekg”的新数据库。

while True: 
    end = start + timedelta(seconds=window_time)
    query = 'SELECT "signal_value" FROM "anomaly_ekg"."autogen"."EKG" WHERE time > \'' + str(start) + '\' and time < \'' + str(end) + '\'' 
    client = InfluxDBClient(host='localhost', port=8086)
    anomaly_stream = client.query(query)
    anomaly_pnts = [p for p in anomaly_stream.get_points()]
    df_anomaly = pd.DataFrame(anomaly_pnts)
    anomalous = np.array(df_anomaly["signal_value"])

    windowed_segment = anomalous * window
    nearest_centroid_idx = clusterer.predict(windowed_segment.reshape(1,-1))[0]
    nearest_centroid = np.copy(centroids[nearest_centroid_idx])

    error = nearest_centroid[0:n_plot_samples] - windowed_segment[0:n_plot_samples]
    max_error = error.max()

    write_time =  start + timedelta(seconds=slide_time)
    client.switch_database("error_ekg")
    json_body = [
        {
            "measurement": "ERROR",
            "tags": {
                "error": "max_error",
            },
            "time": write_time,
            "fields": {
                "max_error": max_error
            }
        }]
    client.write_points(json_body)
    print("QUERY:" + query)
    print("MAX ERROR:" + str(max_error))
    start = start + timedelta(seconds=slide_time) 
    time.sleep(32)

我得到这个输出

现在我可以将最大误差写入数据库,我准备好使用 Kapacitor 设置阈值,并在任何误差超过我的正常最大重建误差 8.8 时发出警报。

我如何使用 Chronograf 对异常发出警报

为了使用 Kapacitor(InfluxData 的数据处理框架),我需要编写一个 TICKscript(Kapacitor 的 DSL)来对异常发出警报。由于我是 Kapacitor 的新用户,我选择使用 Chronograf 来帮助我管理警报并为我自动生成 TICKscript。我真幸运!

首先,我导航到“管理任务”页面…

接下来,选择“构建警报规则”…

现在我可以开始构建我的警报规则了。我命名我的警报,选择警报类型…

…并选择我要发出警报的字段值以及阈值条件。最后,我指定要将这些警报发送到哪里…

…并配置连接。

如果我回到“管理任务”页面,我现在可以查看自动生成的 TICKscript。默认情况下,Kapacitor 将这些警报写入“chronograf”数据库。如果我想更改输出数据库,我可以简单地更改第 25 行。

var outputDB = 'chronograf'

就是这样!当我运行 while 循环并将误差发送到数据库时,每当我的误差大于 8.8 时,Kapacitor 都会通过 Slack 通知我。

如果我们看一下我的仪表板,您可以看到在包含异常的段中,我的误差大于 8.8,并且 Kapacitor 能够检测到它。

左侧单元格: 洋红色线代表每 32 个点的最大重建误差。在 13:57:20 左右发生的异常处,它开始超过 8.8。

右侧单元格: 我使用查询“SELECT max(“value”) AS “max_value” FROM “chronograf”.”autogen”.”alerts” limit 1”显示异常的最大误差

我希望这篇以及之前的博文能帮助您进行异常检测之旅。如果您发现任何令人困惑的地方,或者随时向我寻求帮助,请告诉我。您可以访问 InfluxData 社区站点 或在 Twitter 上 @InfluxDB 给我们发消息。

最后,为了保持一致性,我也想用一个脑筋急转弯来结束这篇博文。这里有一些变色龙送给您。