从零开始使用 Bytewax 和 InfluxDB

导航到

在本教程中,我们将探讨如何将 Bytewax 与 InfluxDB 无缝集成,以解决一个常见挑战:降采样。无论是处理物联网数据、DevOps 监控还是任何时序指标,降采样(或物化视图)是您管理长期存储时序数据而不丢失关键趋势的关键。Bytewax 是一个开源的 Python 框架,用于构建高度可扩展的数据流来处理任何数据流。InfluxDB 是领先时序数据库和平台。完成本指南后,您将清楚地了解如何利用 Bytewax 构建可扩展的数据管道,为使用 InfluxDB Cloud v3 进行深入分析做好准备。本博客文章的对应仓库可以在这里找到。

要求

要运行此示例,您需要以下内容:

  • InfluxDB Cloud v3 账户:在此处注册免费试用。
  • Bytewax:您可以通过pip install bytewax简单地使用pip安装,或者遵循这篇文档。

您需要为InfluxDB设置以下环境变量(或者在脚本中硬编码它们)

INFLUXDB_TOKEN: Your InfluxDB authentication token.
INFLUXDB_DATABASE: The name of your InfluxDB database.
INFLUXDB_ORG: Your InfluxDB organization name.

您还希望有一些实时数据写入您的InfluxDB实例,这样您就可以在Bytewax数据流中利用它。我建议配置一个Telegraf代理从您的机器中拉取CPU指标。

基本请求

将InfluxDB实例中的数据获取到Bytewax数据流的最简单方法是使用SimplePollingSource类。此脚本利用Bytewax创建一个数据管道,定期轮询InfluxDB数据库以获取数据,并实时处理它。它定义了一个自定义数据源类InfluxDBSource,该类连接到InfluxDB数据库并执行一个查询来检索最后15秒的数据。然后,检索到的数据被传递到Bytewax数据流中,在那里进行处理并输出到控制台。

import bytewax.operators as op
from bytewax.connectors.stdio import StdOutSink
from bytewax.dataflow import Dataflow
from bytewax.inputs import SimplePollingSource
from influxdb_client_3 import InfluxDBClient3
import logging
from datetime import timedelta

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class InfluxDBSource(SimplePollingSource):
    def next_item(self):
        client = InfluxDBClient3(host="your host URL: i.e. us-east-1-1.aws.cloud2.influxdata.com ",
                                 database="your database",
                                 token="your InfluxDB Token")
        query = "SELECT * from cpu WHERE time >= now() - INTERVAL '15 seconds' LIMIT 5"
        data = client.query(query=query, mode="pandas")
        return data

flow = Dataflow("a_simple_example")

stream = op.input("input", flow, InfluxDBSource(timedelta(seconds=15)))

op.output("out", stream, StdOutSink())

要运行此脚本,请使用:python3 -m bytewax.run basic_request.py

现在您已经从InfluxDB实例中获取了时间序列数据,您可以利用Bytewax提供的各种工具和教程来处理时间序列数据

使用Bytewax和InfluxDB开始使用物化视图

您还可以使用这些自定义源和接收器来执行InfluxDB的下采样(或创建物化视图)。此dataflow.py脚本提供了一个示例,它使用自定义接收器和源来每10秒下采样一分钟的数据

  1. 查询InfluxDB并返回一个数据框。

  2. 使用SQL聚合值。

  3. 将下采样后的数据框写回到InfluxDB。

import os
import logging
from datetime import timedelta, datetime, timezone

import pandas as pd
import bytewax.operators as op
from bytewax.dataflow import Dataflow
from influx_connector import InfluxDBSink, InfluxDBSource

TOKEN = os.getenv("INLFUXDB_TOKEN", "your InfluxDB token")
DATABASE = os.getenv("INFLUXDB_DATABASE", "your InfluxDB Database")
ORG = os.getenv("INFLUXDB_ORG", "your org ID")

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Your custom aggregation query
query = """
SELECT
  date_bin(INTERVAL '15 seconds', time, TIMESTAMP '1970-01-01 00:00:00Z') AS time,
  avg("usage_system") AS usage_system_avg
FROM "cpu"
WHERE
  time >= now() - INTERVAL '1 minute'
GROUP BY 1
ORDER BY time DESC
"""

# Dataflow setup for querying and aggregating values
flow = Dataflow("a_simple_query")

# Create InfluxDBSource with the custom query
inp = op.input("inp", flow, InfluxDBSource(

    timedelta(seconds=10),  # Poll every 10 seconds
    "your host URL i.e. https://us-east-1-1.aws.cloud2.influxdata.com",
    DATABASE,
    TOKEN,
    "cpu",  # Measurement name
    ORG,
    datetime.now(timezone.utc) - timedelta(minutes=1),  # Query data from the last minute
    query=query  # Pass the custom query
))

# Inspect the input data
op.inspect("input_query", inp)

# Use the custom sink to write the DataFrame directly back to InfluxDB
op.output("out", inp, InfluxDBSink(
    host="https://us-east-1-1.aws.cloud2.influxdata.com",
    database=DATABASE,
    token=TOKEN,
    org=ORG,
    data_frame_measurement_name="cpu_aggregated",
    # data_frame_tag_columns=['cpu'],  # Specify and columns that are tags if applicable
    data_frame_timestamp_column='time'  # Specify the column that contains timestamps
))

此脚本设置了一个Bytewax数据流,每10秒定期从InfluxDB数据库查询数据,重点关注CPU度量。它检索最后分钟的数据,在15秒间隔内汇总usage_system的平均值,然后将汇总后的数据输出到InfluxDB中的不同度量(cpu_aggregated)。该脚本设计得易于通过环境变量配置InfluxDB凭据,确保安全和灵活的使用。还设置了日志记录以监控数据流的活动。

使用官方的Bytewax InfluxDB连接器

我还鼓励您使用官方的源和接收器Bytewax连接器用于InfluxDB(感谢Zander编写此连接器并将InfluxDB添加到Bytewax丰富的源和接收器选项集合中!!)。上面的存储库包含有关如何使用连接器的文档以及安装说明。您可以按照本博客文章中共享的步骤执行,但不需要包括自定义源和接收器 python脚本,我们已在示例存储库中使用。官方的Bytewax InfluxDB源和接收器还支持许多其他写入类型,而不仅仅是数据框。

结论

一如既往,您可以从InfluxDB v3 Cloud这里开始。在下篇文章中,我们将介绍如何运行项目,深入了解架构和逻辑,并讨论所选堆栈的优缺点。如果您需要帮助,请通过我们的社区网站Slack频道联系我们。如果您也在使用InfluxDB和Bytewax进行数据处理项目,我很乐意听到您的声音!最后,我还建议您联系Bytewax社区;他们非常有帮助。感谢Zander和Laura回答我的所有问题,帮助我完成这个示例,并如此热情地欢迎我!