从零开始使用 Bytewax 和 InfluxDB
作者:Anais Dotis-Georgiou / 开发者
2024年10月15日
导航到
在本教程中,我们将探讨如何将 Bytewax 与 InfluxDB 无缝集成,以解决一个常见挑战:降采样。无论是处理物联网数据、DevOps 监控还是任何时序指标,降采样(或物化视图)是您管理长期存储时序数据而不丢失关键趋势的关键。Bytewax 是一个开源的 Python 框架,用于构建高度可扩展的数据流来处理任何数据流。InfluxDB 是领先时序数据库和平台。完成本指南后,您将清楚地了解如何利用 Bytewax 构建可扩展的数据管道,为使用 InfluxDB Cloud v3 进行深入分析做好准备。本博客文章的对应仓库可以在这里找到。
要求
要运行此示例,您需要以下内容:
- InfluxDB Cloud v3 账户:在此处注册免费试用。
- Bytewax:您可以通过
pip install bytewax
简单地使用pip安装,或者遵循这篇文档。
您需要为InfluxDB设置以下环境变量(或者在脚本中硬编码它们)
INFLUXDB_TOKEN: Your InfluxDB authentication token.
INFLUXDB_DATABASE: The name of your InfluxDB database.
INFLUXDB_ORG: Your InfluxDB organization name.
您还希望有一些实时数据写入您的InfluxDB实例,这样您就可以在Bytewax数据流中利用它。我建议配置一个Telegraf代理从您的机器中拉取CPU指标。
基本请求
将InfluxDB实例中的数据获取到Bytewax数据流的最简单方法是使用SimplePollingSource类。此脚本利用Bytewax创建一个数据管道,定期轮询InfluxDB数据库以获取数据,并实时处理它。它定义了一个自定义数据源类InfluxDBSource,该类连接到InfluxDB数据库并执行一个查询来检索最后15秒的数据。然后,检索到的数据被传递到Bytewax数据流中,在那里进行处理并输出到控制台。
import bytewax.operators as op
from bytewax.connectors.stdio import StdOutSink
from bytewax.dataflow import Dataflow
from bytewax.inputs import SimplePollingSource
from influxdb_client_3 import InfluxDBClient3
import logging
from datetime import timedelta
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class InfluxDBSource(SimplePollingSource):
def next_item(self):
client = InfluxDBClient3(host="your host URL: i.e. us-east-1-1.aws.cloud2.influxdata.com ",
database="your database",
token="your InfluxDB Token")
query = "SELECT * from cpu WHERE time >= now() - INTERVAL '15 seconds' LIMIT 5"
data = client.query(query=query, mode="pandas")
return data
flow = Dataflow("a_simple_example")
stream = op.input("input", flow, InfluxDBSource(timedelta(seconds=15)))
op.output("out", stream, StdOutSink())
要运行此脚本,请使用:python3 -m bytewax.run basic_request.py
现在您已经从InfluxDB实例中获取了时间序列数据,您可以利用Bytewax提供的各种工具和教程来处理时间序列数据
使用Bytewax和InfluxDB开始使用物化视图
您还可以使用这些自定义源和接收器来执行InfluxDB的下采样(或创建物化视图)。此dataflow.py脚本提供了一个示例,它使用自定义接收器和源来每10秒下采样一分钟的数据
-
查询InfluxDB并返回一个数据框。
-
使用SQL聚合值。
-
将下采样后的数据框写回到InfluxDB。
import os
import logging
from datetime import timedelta, datetime, timezone
import pandas as pd
import bytewax.operators as op
from bytewax.dataflow import Dataflow
from influx_connector import InfluxDBSink, InfluxDBSource
TOKEN = os.getenv("INLFUXDB_TOKEN", "your InfluxDB token")
DATABASE = os.getenv("INFLUXDB_DATABASE", "your InfluxDB Database")
ORG = os.getenv("INFLUXDB_ORG", "your org ID")
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Your custom aggregation query
query = """
SELECT
date_bin(INTERVAL '15 seconds', time, TIMESTAMP '1970-01-01 00:00:00Z') AS time,
avg("usage_system") AS usage_system_avg
FROM "cpu"
WHERE
time >= now() - INTERVAL '1 minute'
GROUP BY 1
ORDER BY time DESC
"""
# Dataflow setup for querying and aggregating values
flow = Dataflow("a_simple_query")
# Create InfluxDBSource with the custom query
inp = op.input("inp", flow, InfluxDBSource(
timedelta(seconds=10), # Poll every 10 seconds
"your host URL i.e. https://us-east-1-1.aws.cloud2.influxdata.com",
DATABASE,
TOKEN,
"cpu", # Measurement name
ORG,
datetime.now(timezone.utc) - timedelta(minutes=1), # Query data from the last minute
query=query # Pass the custom query
))
# Inspect the input data
op.inspect("input_query", inp)
# Use the custom sink to write the DataFrame directly back to InfluxDB
op.output("out", inp, InfluxDBSink(
host="https://us-east-1-1.aws.cloud2.influxdata.com",
database=DATABASE,
token=TOKEN,
org=ORG,
data_frame_measurement_name="cpu_aggregated",
# data_frame_tag_columns=['cpu'], # Specify and columns that are tags if applicable
data_frame_timestamp_column='time' # Specify the column that contains timestamps
))
此脚本设置了一个Bytewax数据流,每10秒定期从InfluxDB数据库查询数据,重点关注CPU度量。它检索最后分钟的数据,在15秒间隔内汇总usage_system
的平均值,然后将汇总后的数据输出到InfluxDB中的不同度量(cpu_aggregated
)。该脚本设计得易于通过环境变量配置InfluxDB凭据,确保安全和灵活的使用。还设置了日志记录以监控数据流的活动。
使用官方的Bytewax InfluxDB连接器
我还鼓励您使用官方的源和接收器Bytewax连接器用于InfluxDB(感谢Zander编写此连接器并将InfluxDB添加到Bytewax丰富的源和接收器选项集合中!!)。上面的存储库包含有关如何使用连接器的文档以及安装说明。您可以按照本博客文章中共享的步骤执行,但不需要包括自定义源和接收器 python脚本,我们已在示例存储库中使用。官方的Bytewax InfluxDB源和接收器还支持许多其他写入类型,而不仅仅是数据框。
结论
一如既往,您可以从InfluxDB v3 Cloud这里开始。在下篇文章中,我们将介绍如何运行项目,深入了解架构和逻辑,并讨论所选堆栈的优缺点。如果您需要帮助,请通过我们的社区网站或Slack频道联系我们。如果您也在使用InfluxDB和Bytewax进行数据处理项目,我很乐意听到您的声音!最后,我还建议您联系Bytewax社区;他们非常有帮助。感谢Zander和Laura回答我的所有问题,帮助我完成这个示例,并如此热情地欢迎我!