Mage.ai 用于 InfluxDB 的任务

导航至

任何现有的 InfluxDB 用户都会注意到,随着 InfluxDB 3.0 的发布,InfluxDB 经历了转型。与之前的 InfluxDB 版本相比,InfluxDB v3 提供了 45 倍的写入吞吐量,查询速度提高了 5-25 倍(有关更多性能基准,请参阅 这篇文章)。

我们还降低了 2.x 中存在的几个功能的优先级,以专注于与现有工具的互操作性。InfluxDB v2 中被降低优先级的功能之一是任务引擎。虽然任务引擎支持大量的 Flux 函数,但它无法与已经存在的大量 ETL 工具竞争,尤其是在数据准备、分析和转换方面。在众多工具中,Mage.ai 就是其中之一。

Mage 是一款开源数据管道工具,用于转换和集成数据。您可以将其视为 Airflow 的替代品。在本教程中,我们将初步了解如何使用 Mage.ai 在 InfluxDB Cloud v3 中构建时间序列数据的物化视图。

Data Explorer-mage-ai

在 InfluxDB Cloud UI 中收集降采样数据的示例

Mage.ai 简介

Mage 是一款 OSS ETL 工具。它配备了易于使用的 UI,使您能够构建用于数据处理、数据转换和机器学习的数据管道。要了解有关 Mage 架构 的更多信息,请查看以下文档。我对 Mage 的使用经验很短,仅仅触及皮毛,但这就是我目前喜欢它的地方

  • 易于使用。用户体验直观且灵活。预构建的脚本使添加代码元素变得容易。一切看起来都是可定制的。

  • Mage 在底层使用 Polars 和 Parquet

  • 它允许您从管道中创建模板,因此您可以轻松地共享、复制和操作您的 ETL 管道。

  • 它具有 AI 功能,可以从简单的英语为您生成管道

  • 社区支持非常棒。我对与 Mage.ai 开发人员互动的质量和响应速度印象深刻并心存感激。

  • 您可以开箱即用地连接到各种数据源。

下面的屏幕截图显示了为特定管道创建模板的示例。创建模板后,您只需单击“+新建”并从现有模板加载即可复制您的管道。

InfluxDB-pipelines

使用 Mage 的唯一缺点是您需要负责部署和管理 Mage 的部署。幸运的是,他们的文档 提供了有关如何在 AWS、Azure、GCP 和 Digital Ocean 上执行此操作的详细说明,这最终为用户提供了更多的控制和灵活性。他们还提供了 Terraform 模板Helm charts

要求

要运行本教程,您需要以下内容

  1. 一个 InfluxDB v3 Cloud 帐户。

    • 数据库或存储桶(您从中查询数据和写入数据的)。对于本演示,我只是使用 Telegraf 代理从我的机器写入本地 CPU 数据。
    • 允许您读取和写入该数据库的身份验证令牌
  2. 用于在容器中运行 InfluxDB 的 Docker 安装。

您可以按照本快速入门指南了解有关如何使用 Mage 的更多信息。要复制本教程中的示例,请克隆 此 repo,并将目录更改为它并运行以下命令

docker run -it \
           -p 6789:6789 \
           -e "TOKEN=<your InfluxDB token>" \
           -e "DATABASE=<your InfluxDB database>" \
           -e "HOST=<your InfluxDB host URL i.e. us-east-1-1.aws.cloud2.influxdata.com>" \
           -v "$(pwd):/home/src" \
           mageai/mageai \
           /app/run_app.sh mage start test

然后,您可以导航到 http://localhost:6789/ 以查看您的 Mage 项目和管道。

在本教程中,我们对 Token、Host 和 Database 使用了环境变量。但是,您也可以利用 Mage 的密钥存储。

Mage secret store

启动容器后,导航到文件列表中的 requirements.txt 文件。单击“安装软件包”命令以安装 InfluxDB v3 Python 客户端库。

install the InfluxDB v3 Python Client Library

现在,您应该能够运行管道,甚至成功运行触发器。

使用 Mage 创建物化视图

在本教程中,我们将学习如何创建一个任务,该任务对来自 InfluxDB 的数据进行降采样。具体来说,我们将构建以下管道

data loader

它包含三个块

  1. query_influxdb:一个加载器数据块,它使用 Python InfluxDB v3 客户端库来查询 InfluxDB 并返回 Pandas DataFrame。

  2. aggregate:一个转换块,用于创建数据的降采样聚合。

  3. write_influxdb:一个导出器数据块,它使用相同的客户端库将降采样数据写回 InfluxDB。

此外,请注意如何查看作为本项目一部分创建的时间序列可视化。只需单击 query_influxdb 块,然后单击右侧导航面板中的图表图标。

the time series visualizations

创建触发器

您可以创建一个触发器来安排此任务定期运行。从左侧导航栏导航到触发器页面。然后选择“计划类型”并填写触发器设置。此管道设置为每 5 分钟运行一次。

Creating-a-trigger

创建触发器后,您可以查看所有管道触发器的状态、日志和块运行

Pipeline triggers

最终想法

InfluxDB 是存储所有时间序列数据的绝佳工具。它构建于 Apache 生态系统之上,并利用 DataFusionArrowParquet 等技术来实现高效的写入、存储和查询。它还与其他许多工具提供互操作性,因此您可以针对特定的 ETL 和机器学习需求利用它们。Mage 还利用 Polars(构建于 Apache Arrow 之上)和 Parquet。结合使用,您可以对时间序列数据执行复杂的 ETL 任务。

这里开始使用 InfluxDB Cloud 3.0。如果您需要任何帮助,请通过我们的社区网站Slack 频道与我们联系。我很乐意了解您尝试实现的目标以及您希望 InfluxDB 拥有的功能。