Mage 结合 InfluxDB 和半空间树进行异常检测
作者:Anais Dotis-Georgiou / 开发者
2023 年 11 月 10 日
导航至
任何现有的 InfluxDB 用户都会注意到,随着 InfluxDB 3.0 的发布,InfluxDB 经历了转型。与以前版本的 InfluxDB 相比,InfluxDB v3 提供了 45 倍更好的写入吞吐量和 5-25 倍更快的查询速度(有关更多性能基准,请参阅这篇文章)。
使用 InfluxDB 进行 ETL
我们还降低了 2.x 中存在的几个功能的优先级,以专注于与现有工具的互操作性。任务引擎是 InfluxDB v2 中存在的优先级降低的功能之一。虽然任务引擎支持广泛的 Flux 函数集合,但它无法与已经存在的、专门用于数据准备、分析和转换的大量提取、转换、加载 (ETL) 工具竞争。在众多工具中,Mage.ai 就是其中之一。Mage 是 Airflow 的开源替代品。在之前的文章中,我描述了如何使用 Mage 进行简单的降采样任务。本教程假设您已阅读该文章和/或对如何使用 Mage 有基本的了解。在本教程中,我们将学习如何使用 Mage 进行异常检测,并将警报发送到 Slack webhook。具体来说,我们将生成机器数据、生成异常,并使用半空间树检测它们。
要求
要运行本教程,您需要以下内容
- 一个 InfluxDB v3 Cloud 帐户。
- 一个数据库或存储桶(您从中查询数据并向其中写入数据的数据库或存储桶)。
- 一个允许您读取和写入该数据库的身份验证令牌。
- 一个 Docker 安装,用于在容器中运行 InfluxDB。
- 一个包含以下信息的
.env
文件
bash
export INFLUX_HOST=
export INFLUX_TOKEN=
export INFLUX_ORG=
export INFLUX_DATABASE=
export MAGE_SLACK_WEBHOOK_URL=https://hooks.slack.com/services/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
export MAGE_PROJECT_NAME=influx-magic
export MAGE_ENV=dev
您可以按照此快速入门指南了解更多关于如何使用 Mage 的信息。要复制本教程中的示例,请克隆此仓库,更改目录到其中,并运行以下命令
- 获取
env
文件:bash source .env
- 构建镜像:
bash docker compose build
- 启动它们:
bash docker compose up -d magic
注意:本教程使用 InfluxData Slack webhook #notifications-testing 频道。请加入以在那里发送和接收通知,或将 env 文件替换为您选择的 Slack webhook。
数据集
在本示例中,我们将生成机器数据。具体来说,是三台机器(machine1、machine2 和 machine3)的负载、振动、功率和温度数据。要创建异常,请导航到 localhost:5005
并单击一台机器以切换异常创建。生成数据的代码来自此项目,该项目展示了如何使用 Arrow、Docker 和一个名为 Anomaly Detection Tool Kit (ADTK) 的分析库构建一个简单的任务引擎。它使用 Mosquitto 和 Telegraf 将 MQTT 数据写入 InfluxDB Cloud v3。
InfluxDB v3 Cloud UI 中数据浏览器显示的正常机器数据示例。
Mage Pipeline
该 pipeline 包含四个块
- Load_influx_data
- 使用 InfluxDB v3 Python 客户端查询机器数据并返回 Pandas DataFrame。
- Transform_data
- 在 DataFrame 中创建一个新的列
unique_id
。对于 DataFrame 中的每一行,它通过 slugify 提供程序然后附加机器 ID(在连字符后)来构造一个唯一的 ID。 - 更改时间戳日期时间格式。
- 在 DataFrame 中创建一个新的列
- Detect_anomalies
- 初始化一个新的 DataFrame。
- 循环遍历唯一 ID:对于输入 DataFrame 的
unique_id
列中存在的每个唯一 ID (uid),该函数继续进行异常检测。 - 模型创建:此模型采用 HalfSpaceTrees 方法,这是一种在线异常检测方法。
- 准备数据。
- 遍历行并检测异常。
- 合并结果。
- 返回结果。
- Check_anomalies
- 遍历唯一 ID。
- 按唯一 ID 过滤数据。
- 检查异常。
- 准备数据以进行绘图。
- 绘制数据。
模型选择:半空间树
本示例使用半空间树来检测异常。半空间树 (HST) 是一种机器学习算法,用于检测高维数据中的异常。当处理期望异常占据高维空间内特定区域或子空间的数据时,它特别有用。HST 是基于隔离森林方法的一种类型,它们的工作原理是将特征空间划分为区域,并将异常隔离到较小的分区中。
以下是半空间树如何工作以检测异常
- 数据在特征空间中通过超平面进行分区。超平面是随机生成的。
- 添加额外的平面以递归地分割数据集并对其进行分区,直到分区中的点数达到最小阈值。每个额外的平面都会在树中创建一个节点。
- 点在树中的深度定义了异常。或者它们是否存在于较小的分区中。
最终想法
在本教程中,我们每次运行 pipeline 时都会导入所有客户端和模型。但是,如果您希望加载大型模型,则不希望每次运行 pipeline 时都这样做。我鼓励您利用条件块。条件块或“附加块”是与另一个块关联的块。Pipeline 在执行父块之前评估条件,这决定了父块是否被执行。您可以使用它们来实例化转换块的一部分(您将在其中执行异常检测),但不必每次都运行它。
InfluxDB 作为高效管理时间序列数据的出色解决方案脱颖而出。它构建在 Apache 生态系统之上,并利用 DataFusion、Arrow 和 Parquet 等技术来优化数据写入、存储和查询。此外,它还拥有与各种其他工具的兼容性,使您可以利用它们来满足您的特定 ETL 和机器学习需求。此外,Mage 还利用了基于 Apache Arrow 和 Parquet 的 Polars。通过结合这些技术,您可以在时间序列数据上执行高级 ETL 操作。
单击此处开始使用 InfluxDB Cloud 3.0。如果您需要任何帮助,请使用我们的社区站点或 Slack 频道联系我们。我很乐意了解您尝试实现的目标以及您希望 InfluxDB 拥有的功能。