Mage.ai for InfluxDB 任务

导航至

任何现有的 InfluxDB 用户都会注意到,InfluxDB 在发布 InfluxDB 3.0 后发生了转变。InfluxDB v3 提供了比之前版本更好的 45 倍写入吞吐量,并且查询速度比之前的 InfluxDB 版本快 5-25 倍(更多性能基准测试,请参阅 这篇文章)。

我们还降低了几个 2.x 版本中存在的特性的优先级,以专注于与现有工具的互操作性。InfluxDB v2 中被降低优先级的特性之一是任务引擎。虽然任务引擎支持广泛的 Flux 函数,但它无法与已经存在的 ETL 工具相竞争,特别是用于数据准备、分析和转换的工具。这些工具中包括 Mage.ai

Mage 是一个开源的数据管道工具,用于转换和集成数据。您可以将其视为 Airflow 的替代品。在本教程中,我们将探讨使用 Mage.ai 在 InfluxDB Cloud v3 中构建时间序列数据的物化视图。

Data Explorer-mage-ai

InfluxDB Cloud UI 中收集下采样数据的示例

Mage.ai 简介

Mage 是一个开源的 ETL 工具。它附带一个易于使用的用户界面,可让您构建用于数据处理、数据转换和机器学习的数据管道。有关 Mage 的架构信息,请参阅以下文档。我对 Mage 的体验很短暂,但这是我目前喜欢它的原因

  • 易用性。UX 直观且灵活。预建脚本使添加代码元素变得容易。一切看起来都可以自定义。

  • Mage 在底层使用 Polars 和 Parquet

  • 它允许您从您的管道中创建模板,因此您可以轻松地共享、复制和操作您的 ETL 管道。

  • 它有一个 AI 功能,可以从纯英文中为您生成管道。

  • 有出色的社区支持。我对 Mage.ai 开发者的互动质量印象深刻,并对他们的响应速度感到感激。

  • 您可以轻松连接到各种数据源。

下面的截图显示了一个创建特定管道模板的示例。创建模板后,您只需点击 新建 并从现有模板加载,即可复制您的管道。

InfluxDB-pipelines

使用Mage的唯一缺点是你需要自己负责部署和管理Mage的部署。幸运的是,[他们的文档](https://docs.mage.ai/production/deploying-to-cloud/aws/setup)提供了如何在AWS、Azure、GCP和Digital Ocean上操作的详细说明,这最终为用户提供更多的控制和灵活性。他们还提供了[ Terraform模板](https://docs.mage.ai/production/deploying-to-cloud/using-terraform)和[ Helm图表](https://docs.mage.ai/production/deploying-to-cloud/using-helm)。

要求

要运行本教程,你需要以下内容

  1. 一个InfluxDB v3 Cloud账户。

    • 一个数据库或存储桶(从该数据库查询数据并将数据写入其中)。在这个演示中,我简单地使用了Telegraf代理将我的机器的本地CPU数据写入。
    • 一个认证令牌,允许你读取和写入该数据库
  2. 一个Docker安装,用于在容器中运行InfluxDB。

你可以按照这个快速入门指南了解更多关于如何使用Mage的信息。要复制本教程中的示例,克隆这个仓库,然后切换到该目录并运行以下命令

docker run -it \
           -p 6789:6789 \
           -e "TOKEN=<your InfluxDB token>" \
           -e "DATABASE=<your InfluxDB database>" \
           -e "HOST=<your InfluxDB host URL i.e. us-east-1-1.aws.cloud2.influxdata.com>" \
           -v "$(pwd):/home/src" \
           mageai/mageai \
           /app/run_app.sh mage start test

然后你可以导航到https://127.0.0.1:6789/来查看你的Mage项目和管道。

对于这个教程,我们使用了环境变量来设置令牌、主机和数据库。但是,你还可以利用Mage的秘密存储。

Mage secret store

启动容器后,导航到文件列表中的requirements.txt文件。点击“安装包”命令来安装InfluxDB v3 Python客户端库。

install the InfluxDB v3 Python Client Library

现在,你应该能够运行管道,甚至可以成功运行触发器。

使用Mage创建物化视图

在本教程中,我们将学习如何创建一个任务,将我们从InfluxDB中的数据降采样。具体来说,我们将构建以下管道

data loader

它包含三个模块

  1. query_influxdb:一个加载数据块,它使用Python InfluxDB v3客户端库查询InfluxDB并返回一个Pandas DataFrame。

  2. aggregate:一个转换块,它创建数据的降采样聚合。

  3. write_influxdb:一个导出数据块,它使用相同的客户端库将降采样数据写回InfluxDB。

此外,请注意,你可以查看作为本项目一部分创建的时间序列可视化。只需单击查询_influxdb模块,然后在右侧导航面板中单击图表图标。

the time series visualizations

创建触发器

你可以创建一个触发器,定期运行此任务。从左侧导航栏转到触发器页面。然后选择调度类型并填写触发器设置。此管道设置为每5分钟运行一次。

Creating-a-trigger

创建触发器后,你可以查看所有管道触发器的状态、日志和块运行情况。

Pipeline triggers

总结

InfluxDB是一个存储所有时间序列数据的好工具。它是基于Apache生态系统编写的,利用了诸如DataFusionArrowParquet等技术,以实现高效的写入、存储和查询。它还与其他许多工具具有互操作性,因此你可以利用它们满足特定的ETL和机器学习需求。Mage还利用了Polars(它是基于Apache Arrow构建的)和Parquet。结合起来,你可以对你的时间序列数据进行复杂的ETL操作。

从这里开始使用 InfluxDB Cloud 3.0。如果您需要任何帮助,请通过我们的 社区网站Slack 频道 联系我们。我很乐意了解您正在尝试实现什么以及您希望 InfluxDB 具有哪些功能。