BIRCH 结合 InfluxDB 进行异常检测

导航至

在本教程中,我们将使用来自 BIRCH(平衡迭代减少和使用层次聚类)算法,该算法来自 scikit-learn,并结合 ADTK(异常检测工具包)软件包来检测 CPU 行为异常。我们将使用 InfluxDB 2.0 Python 客户端来查询 InfluxDB 2.0 中的数据,并将其作为 Pandas DataFrame 返回。 

本教程假设您已在本地机器上安装并配置了 InfluxDB 和 Telegraf,以收集 CPU 统计信息。为了轻松收集本地机器上的系统统计信息,请安装 InfluxDB自动配置 Telegraf 以添加 System 插件。

我们建议在本博客中使用 Python 3.6+ 的虚拟环境运行代码。此项目的 requirements.txt 如下所示:

adtk==0.6.2
pandas==0.23.4
sklearn==0.23.1

BIRCH 简介

BIRCH(平衡迭代减少和使用层次聚类)是一种无监督聚类算法,针对大型数据集的高性能进行了优化。它还擅长减少数据集中的噪声,以找到有意义的模式并生成准确的模型。它类似于更流行的 k-means 聚类算法。

ADTK 和 scikit-learn 简介

ADTK (异常检测工具包) 是一个用于时序数据无监督异常检测的 Python 包。根据文档,“此软件包提供了一组通用的检测器、转换器和聚合器,具有统一的 API,以及将它们连接到模型中的管道类。它还提供了一些函数来处理和可视化时间序列和异常事件。” 我非常喜欢这个软件包,因为它易于使用、文档完善,并且模块高效且轻量级。

使用 BIRCH 进行时序异常检测的步骤

步骤一:导入依赖项

import pandas as pd
from sklearn.cluster import DBSCAN
from influxdb_client import InfluxDBClient
from adtk.detector import MinClusterDetector
from sklearn.cluster import Birch
from adtk.visualization import plot

步骤二:使用 InfluxDB 2.0 Python 客户端查询数据并返回 DataFrame

  1. 收集授权和查询参数(令牌组织存储桶),并将它们存储在变量中。
  2. 创建一个 Flux 查询,从本地机器收集 CPU 数据。Flux 查询使用 pivot() 和 drop() 将我们的数据转换为正确的形状。当然,这种数据转换也可以使用 Pandas 执行。
  3. 将这些变量传递到客户端对象中,并实例化客户端。
  4. 使用 query_data_frame() 方法将我们的数据作为 Pandas DataFrame 返回。
token = <your token>
org = <your organization>
client = InfluxDBClient(url="http://localhost:9999", token=token, org=org)
query = '''from(bucket: "your-bucket")
  |> range(start: 2020-06-18T18:00:00Z ,  stop: 2020-06-20T02:00:00Z)
  |> filter(fn: (r) => r["_measurement"] == "cpu")
  |> filter(fn: (r) => r["_field"] == "usage_system")
  |> pivot(rowKey:["_time"], columnKey: ["cpu"], valueColumn: "_value")
  |> drop(columns:["_start", "_stop", "host", "_field", "_measurement"])'''

query_api = client.query_api()
df = query_api.query_data_frame(query)
df.head()

步骤三:转换和准备数据

  • 为了准备 DataFrame `df`,以便 ADTK MinCluserDetector 函数使用,请执行剩余的数据转换
    • 将 time 列转换为 datetime 对象并将其设为索引。
    • 删除任何无关的列。
  • 使用 head() 返回我们在步骤二中创建的 DataFrame 的前五行。
df["_time"] = pd.to_datetime(df["_time"].astype(str))
df = df.drop(columns=["result", "table"])
df = df.set_index("_time")
df.head()

步骤四:使用 ADTK MinClusterDetector 函数将 sklearn 的 BIRCH 算法应用于我们的 dataframe

  • 使用我们期望的 scikit-learn 异常检测算法 BIRCH 实例化 MinClusterDetector 函数,并指定聚类数量 n_clusters=10
  • 根据文档,MinClusterDetector “函数将多元时间序列视为高维空间中的独立点,将它们划分为聚类,并将最小聚类中的值识别为异常值。这可能有助于捕获高维空间中的异常值”。

请注意: 

    • 您可以将任何 scikit-learn 聚类算法作为模型类型传递到 MiniClusterDetector 函数中,只要它具有 fitprediction 方法。
    • 聚类数量值是为本教程随意选择的。通过目视检查各种 n_clusters 值来检查 ADTK BIRCH 模型的成功率。该模型似乎在 n_clusters=10 时成功拟合数据并标记异常。
    • 由于 BIRCH 是一种无监督学习技术,因此优化聚类数量需要分析聚类数量的变化如何影响异常检测的准确性。此分析需要获取标记数据集,以衡量我们的模型针对聚类数量变化的准确性。我没有标记的异常 CPU 统计数据集,因此我没有进行此分析。但是,确定聚类大小对于有效使用聚类算法并且相当简单至关重要。要了解有关聚类数量选择的更多信息,请查看本文
  • 使用 fit_detect() 函数检测异常并将序列存储到变量中。
  • 使用 ADTK 的 plot 函数绘制 DataFrame 以及异常。指定绘图属性。
min_cluster_detector = MinClusterDetector(Birch(n_clusters=10))
anomalies = min_cluster_detector.fit_detect(df)
plot(df, anomaly=anomalies, ts_linewidth=1, ts_markersize=3, anomaly_color='red', anomaly_alpha=0.3, curve_group='all')

time series anomalies<figcaption> 将 BIRCH 应用于 DataFrame 以对跨 CPU 的时间序列异常进行建模</figcaption>

MinClusterDetector 函数和 BIRCH 的应用检测到 06-18-20 和 06-19-18 附近的异常。让我们放大一些异常。

plot(df[500:525], anomaly=anomalies[500:525], ts_linewidth=2, ts_markersize=4, anomaly_color='red', anomaly_alpha=2.0, curve_group='all')

anomalies detected<figcaption> 更仔细地查看 BIRCH 和 ADTK 检测到的异常</figcaption>

Cpu0 的行为与其他处理器不同。具体而言,当其余处理器表现出负趋势时,cpu0(橙色)表现出正趋势。

关于将 BIRCH 异常检测与 InfluxDB 结合的结论

虽然本教程重点介绍了 MinClusterDetector 函数的使用,但 ADTK 软件包具有多种有效且轻量级的异常检测函数。我鼓励您查看它们并找到适合您的时间序列用例的函数。当然,您可能想知道:“如何将 ADTK 软件包应用于我在 InfluxDB 中以连续函数形式存在的时间序列数据?”

您的选择包括

我希望本教程能够启发您将 ADTK 和异常检测集成到您的 InfluxDB 驱动的解决方案中。您可以在评论区、我们的社区站点或我们的 Slack 频道中分享您的想法、疑虑或问题。我们很乐意获得您的反馈并帮助您解决遇到的任何问题!