为 InfluxDB 3 Processing Engine 构建您的第一个 Python 插件

导航至

**注意:此博客运行的是 InfluxDB 3 Core CLI,而非 Enterprise。

InfluxDB 3 最引人注目的功能之一是其内置的 Python Processing Engine,这是一个多功能组件,为 InfluxDB 3 Core 增加了强大的实时处理能力。对于那些熟悉 InfluxDB 1.x 中的 Kapacitor 或 2.x 中的 Flux Tasks 的用户来说,Processing Engine 代表了一种更精简、集成和可扩展的数据处理方法。借助直接在数据库中运行 Python 代码的能力,您不再需要外部服务器或复杂的数据管道来处理传入的信息。

Processing Engine 可以在数据到达时、按需或按计划触发操作,使其成为实时转换、规范化、警报、降采样和边缘数据复制的理想选择。在本博客中,我们将构建一个 Python 插件,用于标准化来自不同来源的 IoT 数据。标准化至关重要,因为 IoT 设备通常以不一致的格式生成数据——不同的单位、结构或命名约定——使分析和决策变得复杂。通过在摄取点规范化这些数据,您可以简化下游查询,确保数据集之间的一致性,并提高分析的可靠性。

要求

要遵循本教程,您需要

  • 您的机器上安装了 Docker。
  • 代码编辑器,例如 Visual Studio Code (VS Code) 或您选择的其他集成开发环境 (IDE)。

注意:在本教程中使用 Docker 可确保您可以轻松启动兼容的环境,而无需复杂的设置步骤,从而使您能够专注于构建和测试您的 Python 插件。我们将逐步介绍如何使用 Docker 从头开始创建插件的过程,但您也可以在本地运行本教程,而无需 Docker 命令。

本教程还假定您对 Docker 基础知识有所了解,例如运行容器、管理镜像和使用 Docker Compose。此外,对 InfluxDB 的基本了解,包括 Line Protocol 摄取格式和 InfluxDB 3 Core CLI 也很有帮助。如果您不熟悉这些概念中的任何一个,我们建议您在继续操作之前查看 InfluxDB 文档或 Docker 的入门指南。

流程概述

当处理来自各种设备的 IoT 数据时,单位、字段名称和测量结构的不一致性很常见。不同的传感器可能以华氏度或摄氏度报告温度,以帕斯卡或千帕斯卡报告压力,以及不一致的命名约定,如 humidity_percent 与 humidity。这种可变性使得查询、分析和关联数据变得不必要的复杂,导致报告错误、洞察延迟和维护开销增加。

在数据摄取期间标准化单位和字段名称可确保为下游分析提供一致且可靠的数据集。除了简化分析之外,一致的命名约定对于维护符合行业标准、法规要求和内部数据治理策略至关重要。借助标准化、结构良好的数据,团队可以更自信地生成报告、审核历史记录以及与其他依赖清晰、可预测数据格式的系统集成。

我们的插件将通过在数据摄取时标准化单位和名称来解决此问题,从而确保为下游分析提供一致且可靠的数据集。

以下是该过程的高级概览

  • 创建插件目录:在您的本地 InfluxDB 3 环境中设置一个插件目录,并为其赋予读/写权限。
  • 使用 Docker 启动 InfluxDB 3:拉取 InfluxDB 3 Core Docker 镜像并启动容器,挂载插件目录。
  • 编写 Python 处理脚本:创建一个 Python 脚本,将测量值转换为标准化单位和命名约定。
  • 创建源数据库和目标数据库:使用 CLI 创建用于原始数据和标准化数据的数据库。
  • 测试并启用插件:使用示例数据测试脚本,然后启用插件。
  • 写入数据并验证转换:摄取具有不一致格式的示例 IoT 数据,并查询目标数据库以确认标准化是否成功。

构建您的第一个插件

如概述中所述,首先在以下位置创建一个插件目录

mkdir -p ~/influxdb3/plugins

确保该目录具有必要的读取和写入权限

chmod 755 ~/influxdb3/plugins

接下来,拉取最新的 InfluxDB 3 Enterprise Docker 镜像

docker pull quay.io/influxdb/influxdb3-enterprise:latest

有关运行 Enterprise 或 Core 版本的详细信息,请参阅官方文档,具体取决于您的设置。现在,我们准备使用以下命令启动 InfluxDB 3 容器

docker run -it \
  -v ~/influxdb3/data:/var/lib/influxdb3 \
  -v ~/influxdb3/plugins:/plugins \
  -p 8181:8181 \
  --user root \
  quay.io/influxdb/influxdb3-enterprise:latest serve \
  --node-id my_host \
  --object-store file \
  --data-dir /var/lib/influxdb3 \
  --plugin-dir /plugins

让我们分解一下这个命令

  • -v ~/influxdb3/data:/var/lib/influxdb3:将本地数据目录挂载为数据库的持久存储位置。
  • -v ~/influxdb3/plugins:/plugins:挂载插件目录,我们的 Python 插件将位于该目录中,使其可供 Processing Engine 访问。
  • -p 8181:8181:将端口 8181 从容器映射到主机,允许访问 InfluxDB 3 API。
  • --user root:确保容器以 root 权限运行,Processing Engine 访问插件需要此权限。
  • serve:启动 InfluxDB 3 服务器。
  • --node-id my_host:分配唯一的节点 ID,可以根据您的环境进行自定义。
  • --object-store file:配置数据库以使用本地文件系统进行对象存储。
  • --data-dir /var/lib/influxdb3:指向 InfluxDB 将持久化其数据的目录。
  • --plugin-dir /plugins:指示 InfluxDB 从挂载的插件目录加载任何可用的插件。

注意:如果您在本地运行 InfluxDB 3 Core 或 Enterprise,您需要使用以下命令启动您的 InfluxDB 3 实例并设置插件目录(通过 InfluxDB 3 Core 和 Enterprise 的 CLI 操作文档 了解有关 serve 命令选项的更多信息)

influxdb3 serve --object-store file --data-dir ~/.influxdb3/data --node-id my_host --plugin-dir ~/influxdb3/plugins

通过此设置,Processing Engine 将有权访问您的 Python 插件,并准备好对传入数据应用转换。

现在,我们准备编写 Python 脚本来处理我们的数据标准化。我们将脚本命名为 hello_world.py。它将包含一个名为 process_writes 的函数,该函数执行核心转换逻辑。此函数处理传入的传感器数据批次,标准化字段名称、标签和单位,以确保数据集之间的一致性。它迭代每个表批次,记录关键信息,并跳过与预定义的排除规则匹配的表。对于每一行,它将传感器名称和位置转换为小写,并将空格替换为下划线以保持统一的命名约定。它还通过添加时间戳字段来丰富数据,指示记录何时被处理。最后,该函数将转换后的数据写入名为 unified_sensor_data 的新 InfluxDB 3 数据库,确保所有传感器记录共享一致的结构,以便更轻松地进行查询、分析和遵守数据标准。

import datetime

def process_writes(influxdb3_local, table_batches, args=None):
    # Log the provided arguments
    if args:
        for key, value in args.items():
            influxdb3_local.info(f"{key}: {value}")

   # Process each table batch
    for table_batch in table_batches:
        table_name = table_batch["table_name"]
        influxdb3_local.info(f"Processing table: {table_name}")

      # Skip processing a specific table if needed
        if table_name == "exclude_table":
            continue

      # Analyze each row
        for row in table_batch["rows"]:
            influxdb3_local.info(f"Row: {row}")

            # Standardize sensor names (lowercase, no spaces)
            sensor_name = row.get("sensor", "unknown").lower().replace(" ", "_")
            influxdb3_local.info(f"Standardized sensor name: {sensor_name}")

            # Standardize location and other tags by replacing spaces with underscores
            location = row.get("location", "unknown").lower().replace(" ", "_")

            # Add enriched field (e.g., timestamp)
            line = LineBuilder(table_name)
            line.tag("sensor", sensor_name)
            line.tag("location", location)
            line.float64_field("temperature_c", row.get("temperature", 0))
            line.string_field("processed_at", datetime.datetime.utcnow().isoformat())

            # Write the enriched data to a different database
            influxdb3_local.write_to_db("unified_sensor_data", line)

    influxdb3_local.info("Processing completed")

使用 influxdb3 create database 命令创建源数据库和目标数据库

docker exec {container id} influxdb3 create database my_databasedocker exec {container id} influxdb3 create database unified_sensor_data

测试您的插件

您可以使用 influxdb3 test command 在目标数据库上测试您的插件

docker exec {container id} influxdb3 test wal_plugin \
-d my_database \
--lp="sensor_data,location=living\\ room temperature=22.5 123456789" \
/plugins/hello-world.py

该命令输出一个 JSON 对象,其中包含日志信息、数据库写入和错误。它向我们展示了触发器正在解析来自 sensor_data measurement 的数据,标准化数据,并将转换后的 Line Protocol 写入 unified_sensor_data 数据库,且没有错误

{
  "log_lines": [
    "INFO: Processing table: sensor_data",
    "INFO: Row: {'location': 'living room', 'temperature': 22.5, 'time': 123456789}",
    "INFO: Processing completed"
  ],
  "database_writes": {
    "my_database": [],
    "unified_sensor_data": [
      "sensor_data,sensor=unknown,location=living\\ room temperature_c=22.5,processed_at=\"2025-02-13T21:33:44.117195\""
    ]
  },
  "errors": []
}

创建并启用您的触发器

既然测试成功通过,让我们创建一个触发器并启用它来运行我们的 Python 插件。以下命令运行 influxdb3 create trigger 命令。-d 选项指定将应用触发器的数据库。--plugin-filename="/plugins/hello-world.py" 选项指向将在触发器激活时执行的插件脚本。--trigger-spec="all_tables" 选项指示触发器应应用于指定数据库中的所有表。最后,hello_world_trigger 是分配给触发器的名称。

docker exec {container id} influxdb3 create trigger \
-d my_database \
--plugin-filename="/plugins/hello-world.py" \
--trigger-spec="all_tables"  \
hello_world_trigger

现在我们可以使用 influxdb3 enable trigger 命令启用它

docker exec {container id} influxdb3 enable trigger \
--database my_database  \
hello_world_trigger

验证您的触发器和插件是否正常工作

为了验证我们的触发器是否已正确启用以及我们的插件是否按预期工作,我们可以向 my_database 源数据库写入一行,并查询 unified_sensor_data database 以验证标准化是否按预期工作。使用 influxdb3 write 命令完成前者

docker exec {container id} influxdb3 write \
--database my_database \
"sensor_data,sensor=TempSensor1,location=living\\ room temperature=22.5 123456789"

最后,我们使用 influxdb3 query 命令验证我们的数据是否已转换为我们的标准

docker exec {container id} influxdb3 query \
--database unified_sensor_data \
"SELECT * FROM sensor_data"

输出确认我们的 location 标签值中的空格已正确替换为下划线,添加了 processed_at 字段,我们的 sensor 标签值已转换为小写,并且我们的 temperature 字段键现在包含温度单位。

最终想法

我希望本教程能帮助您开始使用 Docker 在 InfluxDB 3 Core 和 Enterprise 中创建 Python 插件并启用触发器。我鼓励您查看 InfluxData/influxdb3_plugins,因为我们开始在那里添加示例和插件。我还邀请您贡献您在那里创建的任何插件!立即下载 Core 或 Enterprise 开始使用。在 Discord 的 #influxdb3_core 频道、Slack 的 #influxdb3_core 频道或我们的 社区论坛 上与我们的开发团队分享您的反馈。