Quix for InfluxDB

强大的性能和简单的集成,由 InfluxData 构建的开源数据连接器 Telegraf 提供支持。

50 亿+

Telegraf 下载量

#1

时间序列数据库
来源:DB Engines

10 亿+

InfluxDB 下载量

2,800+

贡献者

目录

强大的性能,无限的扩展能力

收集、组织和处理海量高速数据。当您将任何数据视为时间序列数据时,它会更有价值。借助 InfluxDB,这是 #1 的时间序列平台,旨在与 Telegraf 一起扩展。

查看入门方法

为什么使用 Quix for InfluxDB?

Quix for InfluxDB 允许您构建实时数据管道,以预处理数据和触发操作——所有这些都在 Python 中完成。如果您已经了解 Python,那么它是 Kapacitor 和 Flux 的完美替代品,因为它可以执行相同的常见任务,例如数据集成、丰富、降采样和警报。您可以使用 Quix 处理任何需要检测事件并在外部系统中触发操作的用例,例如异常检测和预测性维护。

Quix 由 Quix Streams 和 Quix Cloud 组成。Quix Streams 是一个开源 Python 客户端库,具有流式 DataFrame 和有状态操作,例如窗口函数和自定义聚合。它提供了对 Apache Kafka 的高级抽象,使 Python 开发人员能够以弹性和持久性保证处理流式数据。由于是开源的,您的业务逻辑没有供应商锁定。Quix Streams 可以在安装了 Python 3.8+ 的任何地方运行。

Quix Cloud 是一个完全托管的平台,用于在流式 ETL 管道中运行 Quix Streams 任务。您可以使用云 IDE 和可视化 DAG 编辑器构建管道,并通过单击一下部署。Quix Cloud 提供托管的 Apache Kafka、Kubernetes、监控和 CI/CD。您可以将其用作无服务器平台,也可以将其部署到您的本地基础设施以构建混合边缘到云数据平台。

如何使用此集成

要使用此集成,您需要运行 Apache Kafka — Quix 使用 Kafka 作为其数据骨干来存储消息、警报和中间计算/聚合。您可以自带 Kafka 集群,也可以使用 Quix Cloud 上的托管 Kafka。

您可以在 Quix 网站上注册免费试用帐户

拥有帐户后,创建一个项目创建一个初始环境,然后创建一个应用程序。导航到代码示例并找到输入InfluxDB 3.0 Source 连接器

定义以下环境变量

  • output:将接收流的输出 Kafka 主题的名称。如果该主题不存在,则会在首次运行时自动创建。(默认值:influxdb,必需:True
  • task_interval:运行查询的间隔。必须采用 InfluxDB 表示法;1 秒、1 分钟、1 小时、1 天、1 周、1 个月、1 年(默认值:5 分钟,必需:True
  • INFLUXDB_HOST:InfluxDB 实例的主机地址。(默认值:eu-central-1-1.aws.cloud2.influxdata.com,必需:True
  • NFLUXDB_TOKEN:用于访问 InfluxDB 的身份验证令牌。(默认值:TOKEN,必需:True
  • INFLUXDB_ORG:InfluxDB 中的组织名称。(默认值:ORG,必需:False
  • INFLUXDB_DATABASE:存储数据的 InfluxDB 中的数据库名称。(默认值:DATABASE,必需:True
  • INFLUXDB_MEASUREMENT_NAME:从中读取数据的 InfluxDB 测量名称。如果未指定,将使用输出主题的名称,(必需:False

以下代码在 Quix IDE 中运行,您在创建第一个应用程序后会看到它(您也可以通过从此代码使用 Quix 示例库创建应用程序在本地工作)。

# Import utility modules
import os
import random
import json
import logging
from time import sleep

# Import vendor-specific libraries
from quixstreams import Application
import influxdb_client_3 as InfluxDBClient3

# Initialize logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Create a Quix Application
app = Application(consumer_group="influxdb_sample", auto_create_topics=True)

# Define the topic using the "output" environment variable, along with how its
# data should be serialized
topic = app.topic(
    name=os.environ["output"],
    key_serializer="string",
    value_serializer="json"
)

influxdb3_client = InfluxDBClient3.InfluxDBClient3(
    token=os.environ["INFLUXDB_TOKEN"],
    host=os.environ["INFLUXDB_HOST"],
    org=os.environ["INFLUXDB_ORG"],
    database=os.environ["INFLUXDB_DATABASE"]
)

measurement_name = os.getenv("INFLUXDB_MEASUREMENT_NAME", os.environ["output"])
interval = os.getenv("task_interval", "5m")

# Global variable to control the main loop's execution
run = True

# InfluxDB interval-to-seconds conversion dictionary
UNIT_SECONDS = {
    "s": 1,
    "m": 60,
    "h": 3600,
    "d": 86400,
    "w": 604800,
    "y": 31536000,
}

# Helper function to convert time intervals (like 1h, 2m) into seconds for easier processing.
# This function is useful for determining the frequency of certain operations.
def interval_to_seconds(interval: str) -> int:
    try:
        return int(interval[:-1]) * UNIT_SECONDS[interval[-1]]
    except ValueError as e:
        if "invalid literal" in str(e):
            raise ValueError(
                "interval format is {int}{unit} i.e. '10h'; "
                f"valid units: {list(UNIT_SECONDS.keys())}")
    except KeyError:
        raise ValueError(
            f"Unknown interval unit: {interval[-1]}; "
            f"valid units: {list(UNIT_SECONDS.keys())}")

interval_seconds = interval_to_seconds(interval)

# Function to fetch data from InfluxDB and send it to Quix
# It runs in a continuous loop, periodically fetching data based on the interval.
def get_data():
    # Run in a loop until the main thread is terminated
    while run:
        try:
            query_definition = f'SELECT * FROM "{measurement_name}" WHERE time >= now() - {interval}'
            logger.info(f"Sending query {query_definition}")
            # Query InfluxDB 3.0 using influxql or sql
            table = influxdb3_client.query(
                query=query_definition,
                mode="pandas",
                language="influxql"
            )
            table = table.drop(columns=["iox::measurement"])

            # If there are rows to write to the stream at this time
            if not table.empty:
                # Convert to JSON for JSON-to-bytes serializer
                json_result = table.to_json(orient='records', date_format='iso')
                yield json_result
                logger.info("query success")
            else:
                logger.info("No new data to publish.")

            # Wait for the next interval
            sleep(interval_seconds)

        except Exception as e:
            logger.error(f"query failed; error: {e}")
            sleep(1)

def main():
    """
    Read data from the Query and publish it to Kafka
    """

    # Create a pre-configured Producer object.
    # Producer is already setup to use Quix brokers.
    # It will also ensure that the topics exist before producing to them if
    # Application is initialized with "auto_create_topics=True".

    with app.get_producer() as producer:
        for res in get_data():
            # Parse the JSON string into a Python object
            records = json.loads(res)
            for index, obj in enumerate(records):
                # Generate a unique message_key for each row
                message_key = f"INFLUX_DATA_{str(random.randint(1, 100)).zfill(3)}_{index}"
                logger.info(f"Produced message with key:{message_key}, value:{obj}")

                # Serialize data for kafka producing
                serialized = topic.serialize(key=message_key, value=obj)

                # publish the data to the topic
                producer.produce(
                    topic=topic.name,
                    key=serialized.key,
                    value=serialized.value,
                )

if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        logger.info("Stop requested...")
        run = False
    finally:
        logger.info("Application has been stopped")

您可以在Quix 文档中找到有关如何运行和部署应用程序的更多详细信息。

一旦数据在 Kafka 主题中可用,您可以使用另一个服务来消费和处理数据(您可以移植常见的 InfluxDB 任务,例如降采样)。当您准备好将数据写回 InfluxDB 时,请使用相应的输出InfluxDB 3.0 Sink 连接器

有关灵感,请参阅Quix 模板库,其中包含准备运行的参考用例/数据管道以及完整的源代码。您可能对在 InfluxDB v2 和 v3 之间同步数据以及使用 InfluxDB 进行预测性维护感兴趣。

注意:此处涵盖的连接器适用于 InfluxDB 3.0,但在 Quix Samples GitHub 存储库中还有一个输入InfluxDB 2.0 Source 连接器可用。

有关更多信息,请查看文档。

项目 URL   文档

强大的性能,无限的扩展能力

收集、组织和处理海量高速数据。当您将任何数据视为时间序列数据时,它会更有价值。借助 InfluxDB,这是 #1 的时间序列平台,旨在与 Telegraf 一起扩展。

查看入门方法

相关集成