Mage.ai 用于 InfluxDB 的任务
作者:Anais Dotis-Georgiou / 产品
2023 年 9 月 22 日
导航至
任何现有的 InfluxDB 用户都会注意到,随着 InfluxDB 3.0 的发布,InfluxDB 经历了转型。与之前的 InfluxDB 版本相比,InfluxDB v3 提供了 45 倍的写入吞吐量,查询速度提高了 5-25 倍(有关更多性能基准,请参阅 这篇文章)。
我们还降低了 2.x 中存在的几个功能的优先级,以专注于与现有工具的互操作性。InfluxDB v2 中被降低优先级的功能之一是任务引擎。虽然任务引擎支持大量的 Flux 函数,但它无法与已经存在的大量 ETL 工具竞争,尤其是在数据准备、分析和转换方面。在众多工具中,Mage.ai 就是其中之一。
Mage 是一款开源数据管道工具,用于转换和集成数据。您可以将其视为 Airflow 的替代品。在本教程中,我们将初步了解如何使用 Mage.ai 在 InfluxDB Cloud v3 中构建时间序列数据的物化视图。
Mage.ai 简介
Mage 是一款 OSS ETL 工具。它配备了易于使用的 UI,使您能够构建用于数据处理、数据转换和机器学习的数据管道。要了解有关 Mage 架构 的更多信息,请查看以下文档。我对 Mage 的使用经验很短,仅仅触及皮毛,但这就是我目前喜欢它的地方
-
易于使用。用户体验直观且灵活。预构建的脚本使添加代码元素变得容易。一切看起来都是可定制的。
-
Mage 在底层使用 Polars 和 Parquet。
-
它允许您从管道中创建模板,因此您可以轻松地共享、复制和操作您的 ETL 管道。
-
它具有 AI 功能,可以从简单的英语为您生成管道。
-
社区支持非常棒。我对与 Mage.ai 开发人员互动的质量和响应速度印象深刻并心存感激。
-
您可以开箱即用地连接到各种数据源。
下面的屏幕截图显示了为特定管道创建模板的示例。创建模板后,您只需单击“+新建”并从现有模板加载即可复制您的管道。
使用 Mage 的唯一缺点是您需要负责部署和管理 Mage 的部署。幸运的是,他们的文档 提供了有关如何在 AWS、Azure、GCP 和 Digital Ocean 上执行此操作的详细说明,这最终为用户提供了更多的控制和灵活性。他们还提供了 Terraform 模板 和 Helm charts。
要求
要运行本教程,您需要以下内容
-
一个 InfluxDB v3 Cloud 帐户。
- 数据库或存储桶(您从中查询数据和写入数据的)。对于本演示,我只是使用 Telegraf 代理从我的机器写入本地 CPU 数据。
- 允许您读取和写入该数据库的身份验证令牌
-
用于在容器中运行 InfluxDB 的 Docker 安装。
您可以按照本快速入门指南了解有关如何使用 Mage 的更多信息。要复制本教程中的示例,请克隆 此 repo,并将目录更改为它并运行以下命令
docker run -it \
-p 6789:6789 \
-e "TOKEN=<your InfluxDB token>" \
-e "DATABASE=<your InfluxDB database>" \
-e "HOST=<your InfluxDB host URL i.e. us-east-1-1.aws.cloud2.influxdata.com>" \
-v "$(pwd):/home/src" \
mageai/mageai \
/app/run_app.sh mage start test
然后,您可以导航到 http://localhost:6789/
以查看您的 Mage 项目和管道。
在本教程中,我们对 Token、Host 和 Database 使用了环境变量。但是,您也可以利用 Mage 的密钥存储。
启动容器后,导航到文件列表中的 requirements.txt
文件。单击“安装软件包”命令以安装 InfluxDB v3 Python 客户端库。
现在,您应该能够运行管道,甚至成功运行触发器。
使用 Mage 创建物化视图
在本教程中,我们将学习如何创建一个任务,该任务对来自 InfluxDB 的数据进行降采样。具体来说,我们将构建以下管道
它包含三个块
-
query_influxdb
:一个加载器数据块,它使用 Python InfluxDB v3 客户端库来查询 InfluxDB 并返回 Pandas DataFrame。 -
aggregate
:一个转换块,用于创建数据的降采样聚合。 -
write_influxdb
:一个导出器数据块,它使用相同的客户端库将降采样数据写回 InfluxDB。
此外,请注意如何查看作为本项目一部分创建的时间序列可视化。只需单击 query_influxdb
块,然后单击右侧导航面板中的图表图标。
创建触发器
您可以创建一个触发器来安排此任务定期运行。从左侧导航栏导航到触发器页面。然后选择“计划类型”并填写触发器设置。此管道设置为每 5 分钟运行一次。
创建触发器后,您可以查看所有管道触发器的状态、日志和块运行
最终想法
InfluxDB 是存储所有时间序列数据的绝佳工具。它构建于 Apache 生态系统之上,并利用 DataFusion、Arrow 和 Parquet 等技术来实现高效的写入、存储和查询。它还与其他许多工具提供互操作性,因此您可以针对特定的 ETL 和机器学习需求利用它们。Mage 还利用 Polars(构建于 Apache Arrow 之上)和 Parquet。结合使用,您可以对时间序列数据执行复杂的 ETL 任务。
从这里开始使用 InfluxDB Cloud 3.0。如果您需要任何帮助,请通过我们的社区网站或 Slack 频道与我们联系。我很乐意了解您尝试实现的目标以及您希望 InfluxDB 拥有的功能。