使用 InfluxDB 3 中的全新 Python 处理引擎转换数据
作者:Peter Barnett / 开发者
2025 年 2 月 20 日
导航至
一月初,我们宣布公开发布 InfluxDB 3 Core 和 InfluxDB 3 Enterprise 公共 Alpha 版。最新包含的功能之一是 InfluxDB 3 处理引擎——一个基于 Python 的虚拟机,旨在实现数据转换、增强、降采样、警报等功能,所有这些都在数据库本身内完成。一个月后,我们很高兴发布重大更新,启用与数据交互和转换数据的新方式。
我们构建处理引擎的原因
在 InfluxData,我们专注于三个关键目标:收集、组织和处理时间序列数据。确保这三者无缝协作对于大规模交付可操作的见解至关重要。
多年来,我们开发了 Kapacitor 和 Flux Tasks 等工具,以帮助用户转换和处理他们的数据。InfluxDB 3 以这些基础为构建,通过将轻量级 Python 虚拟机直接嵌入到数据库中——我们的处理引擎——使得在数据库内自动化工作流程、丰富数据和创建自定义处理逻辑比以往任何时候都更加容易。
我们期望用户将这个新引擎用于关键任务,例如数据摄取时的实时转换、运行计划分析以及构建自定义警报模块。
在何处以及何时使用处理引擎
构建处理引擎是为了解决实际问题。我们已经看到早期用户利用它进行异常检测,并在达到阈值时发送 Slack 警报。一位用户将其应用于使用其移动设备坐标的实时天气信息来丰富数据。我们还看到了预测分析、API 集成、报告等的用例。
我们选择使用 Python 是因为它令人难以置信的强大功能和简洁性。您可以设置每周触发器来拉取数据,利用 Pandas 进行分析,使用 Ploty 制作图表,导出为 PDF,上传到 S3,并发送包含报告链接的 Slack 通知。所有这些都在数据库内发生,您可以根据需要轻松地调整和修改它。
我们相信 LLM 的力量,尤其是与 Python 代码结合使用时。我们希望它们继续帮助用户快速构建插件以解决自定义问题,使他们能够每天专注于更高阶的问题。
那么,它是如何工作的呢?
处理引擎由插件和触发器组成。插件是自定义 Python 脚本,可以访问整个 Python 库阵列。要运行插件,必须将其附加到四个预定义的事件触发器之一
- WAL Flush:每当预写日志 (WAL) 刷新到对象存储时执行(默认情况下每秒一次)。使用此功能可以立即评估和转换写入的数据,而无需查询它。
- 计划任务:按定义的计划运行(使用 cron 语法指定)。考虑将此触发器用于诸如定期聚合、数据清理、自动化报告等用例。
- 按需请求:当向
/api/v3/engine
下的自定义端点发出 GET 或 POST 请求时执行。您可以将其用于基于 Webhook 的操作、外部触发器和用户发起的计算。
我们设计的每种触发器类型都与现有工作负载集成,为您提供适用于不同工作负载(例如,实时与批处理)的灵活且可扩展的工具。
使用处理引擎
对于那些感兴趣的人,我们将在这里深入探讨一些细节(我们将在文档中提供更详细的指导)。要开始使用引擎,请在启动服务器时提供 --plugin-dir
参数,以指定用于存储插件的目录。您可以在我们的公共 GitHub 存储库中参考和贡献示例插件。随着更多插件的开发,我们将创建一个更长期的空间,用于发现和利用 InfluxData 和社区构建的插件。
所有插件类型都可以访问共享 API,从而实现与数据库的无缝交互。此 API 提供必要的工具,以简单的用户体验高效地读取、转换和写入数据。让我们分解一下,然后展示如何将它们重新组合在一起。
写入和查询数据
让我们通过一个使用物联网传感器数据监测风速的示例,我们将摄取、理解并采取行动。只有当您可以将丰富/处理后的数据写回系统时,数据转换才有用。这就是 LineBuilder
的用武之地,它允许您构建可以写回数据库的 Line Protocol。
line = LineBuilder("weather")
.tag("location", "us-midwest")
.float64_field("wind_speed", 22.5)
.time_ns(1627680000000000000)
influxdb3_local.write(line)
此代码片段创建一个名为 weather
的新测量,使用位置标记它,并在特定时间戳记录温度值。write
函数将数据提交到数据库。
有时,您需要拉取历史数据来做出决策或生成新指标。query
函数通过允许您从插件内部执行 SQL 查询,使这变得容易。此外,考虑到这些类型的查询通常本质上是分析性的,InfluxDB 的列式存储使其在响应处理引擎时非常高效。
results = influxdb3_local.query("SELECT wind_speed FROM weather WHERE time > now() - INTERVAL '10 minutes'")
# Log the results to "info"
for row in results:
influxdb3_local.info(f"Sensor reading: {row}")
此脚本获取 wind_speed
中最近 10 分钟内的所有记录。然后记录结果以进行进一步处理。无论您需要过滤掉异常值、计算滚动平均值还是标记异常模式,此函数都能实现。
日志消息
良好的可观察性需要了解实时发生的事情。处理引擎提供内置的日志记录函数(info
、warn
、error
)来帮助实现这一点。
influxdb3_local.info("Wind speed normal")
influxdb3_local.warn("Potential issue detected in weather data")
influxdb3_local.error("Critical failure: unknown sensor issue")
此外,这些日志是非短暂性的。它们长期存储在系统表和服务器日志中。使用它们来帮助了解脚本中何时出现问题以及原因,以及标准日志记录以了解随时间变化的状态。
动态参数
每种插件类型都可以从触发器配置接收参数。这些参数使您的插件更具动态性;构建它是为了简化设置阈值、定义过滤器、提供外部 API 凭据等。在下面的示例中,您可以看到在可变情况下的灵活性,例如在风暴中跟踪风速。
def process_writes(influxdb3_local, table_batches, args=None):
threshold = float(args["wind_threshold"])
for table_batch in table_batches:
for row in table_batch["rows"]:
wind_speed = row.get("wind_speed")
if wind_speed > threshold:
influxdb3_local.warn(f"High wind speed detected: {wind_speed} mph at {row['time']}")
通过这种方法,您可以动态调整阈值而无需修改代码。想将阈值更改为 50 英里/小时,因为您知道风暴即将来临,并且您不想要持续的日志记录?只需更新插件配置——无需重新部署脚本或重启数据库。
如何开始使用并分享反馈
我们仍处于起步阶段,并大力投入开发适用于许多不同用例的优质插件。在接下来的几个月中,预计将有更多内置函数、新触发器以及更轻松地查找和共享插件的方式。处理引擎目前在 InfluxDB 3 Core 和 InfluxDB 3 Enterprise 公共 Alpha 版中可用,适用于本机安装(新增!)和 Docker 环境。在我们的文档中了解更多关于入门的信息。