Bytewax 和 InfluxDB 入门
作者:Anais Dotis-Georgiou / 开发者
2024 年 10 月 15 日
导航至
在本教程中,我们将探讨 Bytewax 如何与 InfluxDB 无缝集成,以应对一个常见的挑战:降采样。无论您处理的是物联网数据、DevOps 监控还是任何时间序列指标,降采样(或实体化视图)都是您管理时间序列数据以进行长期存储而又不丢失基本趋势的关键。Bytewax 是一个开源 Python 框架,用于构建高度可扩展的数据流以处理任何数据流。InfluxDB 是领先的时间序列数据库和平台。在本指南结束时,您将清楚地了解如何利用 Bytewax 构建可扩展的数据管道,为使用 InfluxDB Cloud v3 进行深入分析做好准备。本博文的对应代码库可以在这里找到。
要求
运行此示例,您需要以下内容
- InfluxDB Cloud v3 账户:在此处注册免费试用。
- Bytewax:您可以使用
pip install bytewax
简单地进行 pip 安装,或遵循 此文档。
您需要为 InfluxDB 设置以下环境变量(或在脚本中硬编码)
INFLUXDB_TOKEN: Your InfluxDB authentication token.
INFLUXDB_DATABASE: The name of your InfluxDB database.
INFLUXDB_ORG: Your InfluxDB organization name.
您还需要将一些实时数据写入您的 InfluxDB 实例,以便您可以在 Bytewax 数据流中利用这些数据。我建议配置一个 Telegraf 代理,以从您的机器中提取 CPU 指标。
一个基本请求
将数据从您的 InfluxDB 实例导入 Bytewax Dataflow 的最简单方法是使用 SimplePollingSource 类。此脚本利用 Bytewax 创建一个数据管道,该管道定期轮询 InfluxDB 数据库以获取数据并实时处理数据。它定义了一个自定义数据源类 InfluxDBSource,该类连接到 InfluxDB 数据库并执行查询以检索过去 15 秒的数据。然后,检索到的数据被传递到 Bytewax 数据流中,在其中进行处理并输出到控制台。
import bytewax.operators as op
from bytewax.connectors.stdio import StdOutSink
from bytewax.dataflow import Dataflow
from bytewax.inputs import SimplePollingSource
from influxdb_client_3 import InfluxDBClient3
import logging
from datetime import timedelta
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class InfluxDBSource(SimplePollingSource):
def next_item(self):
client = InfluxDBClient3(host="your host URL: i.e. us-east-1-1.aws.cloud2.influxdata.com ",
database="your database",
token="your InfluxDB Token")
query = "SELECT * from cpu WHERE time >= now() - INTERVAL '15 seconds' LIMIT 5"
data = client.query(query=query, mode="pandas")
return data
flow = Dataflow("a_simple_example")
stream = op.input("input", flow, InfluxDBSource(timedelta(seconds=15)))
op.output("out", stream, StdOutSink())
要运行此脚本,请使用:python3 -m bytewax.run basic_request.py
现在您已经从 InfluxDB 实例获取了时间序列数据,您可以利用 Bytewax 为时间序列数据提供的各种工具和教程
Bytewax 和 InfluxDB 实体化视图入门
您还可以使用这些用于 InfluxDB 的自定义 Source 和 Sink,通过 Bytewax 执行降采样(或创建实体化视图)。此 dataflow.py 脚本提供了一个示例,该示例使用自定义 sink 和 source,通过以下方式每十秒对一分钟的数据进行降采样:
-
查询 InfluxBD 并返回数据帧。
-
使用 SQL 聚合值。
-
将降采样的数据帧写回 InfluxDB。
import os
import logging
from datetime import timedelta, datetime, timezone
import pandas as pd
import bytewax.operators as op
from bytewax.dataflow import Dataflow
from influx_connector import InfluxDBSink, InfluxDBSource
TOKEN = os.getenv("INLFUXDB_TOKEN", "your InfluxDB token")
DATABASE = os.getenv("INFLUXDB_DATABASE", "your InfluxDB Database")
ORG = os.getenv("INFLUXDB_ORG", "your org ID")
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Your custom aggregation query
query = """
SELECT
date_bin(INTERVAL '15 seconds', time, TIMESTAMP '1970-01-01 00:00:00Z') AS time,
avg("usage_system") AS usage_system_avg
FROM "cpu"
WHERE
time >= now() - INTERVAL '1 minute'
GROUP BY 1
ORDER BY time DESC
"""
# Dataflow setup for querying and aggregating values
flow = Dataflow("a_simple_query")
# Create InfluxDBSource with the custom query
inp = op.input("inp", flow, InfluxDBSource(
timedelta(seconds=10), # Poll every 10 seconds
"your host URL i.e. https://us-east-1-1.aws.cloud2.influxdata.com",
DATABASE,
TOKEN,
"cpu", # Measurement name
ORG,
datetime.now(timezone.utc) - timedelta(minutes=1), # Query data from the last minute
query=query # Pass the custom query
))
# Inspect the input data
op.inspect("input_query", inp)
# Use the custom sink to write the DataFrame directly back to InfluxDB
op.output("out", inp, InfluxDBSink(
host="https://us-east-1-1.aws.cloud2.influxdata.com",
database=DATABASE,
token=TOKEN,
org=ORG,
data_frame_measurement_name="cpu_aggregated",
# data_frame_tag_columns=['cpu'], # Specify and columns that are tags if applicable
data_frame_timestamp_column='time' # Specify the column that contains timestamps
))
此脚本设置了一个 Bytewax 数据流,以每十秒定期从 InfluxDB 数据库查询数据,重点关注 CPU 指标。它检索过去一分钟的数据,聚合 15 秒间隔内的平均 usage_system
值,然后将聚合数据输出回 InfluxDB 中的不同指标 (cpu_aggregated
)。该脚本旨在通过环境变量轻松配置 InfluxDB 凭据,确保安全灵活的使用。确保安全灵活的使用。还设置了日志记录以监控数据流的活动。
使用官方 Bytewax InfluxDB 连接器
我也想鼓励您使用 InfluxDB 的官方 source 和 sink Bytewax 连接器(感谢 Zander 编写此连接器并将 InfluxDB 添加到 Bytewax 大量的 source 和 sink 选项中!!)。上面的代码库包含有关如何使用连接器以及安装说明的文档。您按照本博文中分享的完全相同的步骤操作,但您不需要包含我们在本博文的示例代码库中使用的自定义 Source 和 Sink python 脚本。官方 Bytewax InfluxDB source 和 sink 还支持许多其他写入类型,而非数据帧。
结论
与往常一样,请在此处开始使用 InfluxDB v3 Cloud。在下一篇文章中,我们将介绍如何运行该项目,深入探讨架构和逻辑,并讨论所选堆栈的一些优点和缺点。如果您需要帮助,请在我们的社区网站或 Slack 频道上联系我们。如果您也在使用 InfluxDB 和 Bytewax 进行数据处理项目,我很乐意听到您的消息!最后,我还建议与 Bytewax 社区建立联系;他们非常乐于助人。感谢 Zander 和 Laura,感谢他们回答我的所有问题,帮助我完成这个示例,以及如此热情地欢迎我!