目录
强大的性能,无限的扩展能力
收集、组织和处理海量高速数据。当您将任何数据视为时间序列数据时,它会更有价值。借助 InfluxDB,这是 #1 的时间序列平台,旨在与 Telegraf 一起扩展。
查看入门方法
为什么使用 Quix for InfluxDB?
Quix for InfluxDB 允许您构建实时数据管道,以预处理数据和触发操作——所有这些都在 Python 中完成。如果您已经了解 Python,那么它是 Kapacitor 和 Flux 的完美替代品,因为它可以执行相同的常见任务,例如数据集成、丰富、降采样和警报。您可以使用 Quix 处理任何需要检测事件并在外部系统中触发操作的用例,例如异常检测和预测性维护。
Quix 由 Quix Streams 和 Quix Cloud 组成。Quix Streams 是一个开源 Python 客户端库,具有流式 DataFrame 和有状态操作,例如窗口函数和自定义聚合。它提供了对 Apache Kafka 的高级抽象,使 Python 开发人员能够以弹性和持久性保证处理流式数据。由于是开源的,您的业务逻辑没有供应商锁定。Quix Streams 可以在安装了 Python 3.8+ 的任何地方运行。
Quix Cloud 是一个完全托管的平台,用于在流式 ETL 管道中运行 Quix Streams 任务。您可以使用云 IDE 和可视化 DAG 编辑器构建管道,并通过单击一下部署。Quix Cloud 提供托管的 Apache Kafka、Kubernetes、监控和 CI/CD。您可以将其用作无服务器平台,也可以将其部署到您的本地基础设施以构建混合边缘到云数据平台。
如何使用此集成
要使用此集成,您需要运行 Apache Kafka — Quix 使用 Kafka 作为其数据骨干来存储消息、警报和中间计算/聚合。您可以自带 Kafka 集群,也可以使用 Quix Cloud 上的托管 Kafka。
您可以在 Quix 网站上注册免费试用帐户。
拥有帐户后,创建一个项目,创建一个初始环境,然后创建一个应用程序。导航到代码示例并找到输入InfluxDB 3.0 Source 连接器。
定义以下环境变量
- output:将接收流的输出 Kafka 主题的名称。如果该主题不存在,则会在首次运行时自动创建。(默认值:influxdb,必需:True)
- task_interval:运行查询的间隔。必须采用 InfluxDB 表示法;1 秒、1 分钟、1 小时、1 天、1 周、1 个月、1 年(默认值:5 分钟,必需:True)
- INFLUXDB_HOST:InfluxDB 实例的主机地址。(默认值:eu-central-1-1.aws.cloud2.influxdata.com,必需:True)
- NFLUXDB_TOKEN:用于访问 InfluxDB 的身份验证令牌。(默认值:
TOKEN
,必需:True) - INFLUXDB_ORG:InfluxDB 中的组织名称。(默认值:
ORG
,必需:False) - INFLUXDB_DATABASE:存储数据的 InfluxDB 中的数据库名称。(默认值:
DATABASE
,必需:True) - INFLUXDB_MEASUREMENT_NAME:从中读取数据的 InfluxDB 测量名称。如果未指定,将使用输出主题的名称,(必需:False)
以下代码在 Quix IDE 中运行,您在创建第一个应用程序后会看到它(您也可以通过从此代码使用 Quix 示例库创建应用程序在本地工作)。
# Import utility modules
import os
import random
import json
import logging
from time import sleep
# Import vendor-specific libraries
from quixstreams import Application
import influxdb_client_3 as InfluxDBClient3
# Initialize logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Create a Quix Application
app = Application(consumer_group="influxdb_sample", auto_create_topics=True)
# Define the topic using the "output" environment variable, along with how its
# data should be serialized
topic = app.topic(
name=os.environ["output"],
key_serializer="string",
value_serializer="json"
)
influxdb3_client = InfluxDBClient3.InfluxDBClient3(
token=os.environ["INFLUXDB_TOKEN"],
host=os.environ["INFLUXDB_HOST"],
org=os.environ["INFLUXDB_ORG"],
database=os.environ["INFLUXDB_DATABASE"]
)
measurement_name = os.getenv("INFLUXDB_MEASUREMENT_NAME", os.environ["output"])
interval = os.getenv("task_interval", "5m")
# Global variable to control the main loop's execution
run = True
# InfluxDB interval-to-seconds conversion dictionary
UNIT_SECONDS = {
"s": 1,
"m": 60,
"h": 3600,
"d": 86400,
"w": 604800,
"y": 31536000,
}
# Helper function to convert time intervals (like 1h, 2m) into seconds for easier processing.
# This function is useful for determining the frequency of certain operations.
def interval_to_seconds(interval: str) -> int:
try:
return int(interval[:-1]) * UNIT_SECONDS[interval[-1]]
except ValueError as e:
if "invalid literal" in str(e):
raise ValueError(
"interval format is {int}{unit} i.e. '10h'; "
f"valid units: {list(UNIT_SECONDS.keys())}")
except KeyError:
raise ValueError(
f"Unknown interval unit: {interval[-1]}; "
f"valid units: {list(UNIT_SECONDS.keys())}")
interval_seconds = interval_to_seconds(interval)
# Function to fetch data from InfluxDB and send it to Quix
# It runs in a continuous loop, periodically fetching data based on the interval.
def get_data():
# Run in a loop until the main thread is terminated
while run:
try:
query_definition = f'SELECT * FROM "{measurement_name}" WHERE time >= now() - {interval}'
logger.info(f"Sending query {query_definition}")
# Query InfluxDB 3.0 using influxql or sql
table = influxdb3_client.query(
query=query_definition,
mode="pandas",
language="influxql"
)
table = table.drop(columns=["iox::measurement"])
# If there are rows to write to the stream at this time
if not table.empty:
# Convert to JSON for JSON-to-bytes serializer
json_result = table.to_json(orient='records', date_format='iso')
yield json_result
logger.info("query success")
else:
logger.info("No new data to publish.")
# Wait for the next interval
sleep(interval_seconds)
except Exception as e:
logger.error(f"query failed; error: {e}")
sleep(1)
def main():
"""
Read data from the Query and publish it to Kafka
"""
# Create a pre-configured Producer object.
# Producer is already setup to use Quix brokers.
# It will also ensure that the topics exist before producing to them if
# Application is initialized with "auto_create_topics=True".
with app.get_producer() as producer:
for res in get_data():
# Parse the JSON string into a Python object
records = json.loads(res)
for index, obj in enumerate(records):
# Generate a unique message_key for each row
message_key = f"INFLUX_DATA_{str(random.randint(1, 100)).zfill(3)}_{index}"
logger.info(f"Produced message with key:{message_key}, value:{obj}")
# Serialize data for kafka producing
serialized = topic.serialize(key=message_key, value=obj)
# publish the data to the topic
producer.produce(
topic=topic.name,
key=serialized.key,
value=serialized.value,
)
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
logger.info("Stop requested...")
run = False
finally:
logger.info("Application has been stopped")
您可以在Quix 文档中找到有关如何运行和部署应用程序的更多详细信息。
一旦数据在 Kafka 主题中可用,您可以使用另一个服务来消费和处理数据(您可以移植常见的 InfluxDB 任务,例如降采样)。当您准备好将数据写回 InfluxDB 时,请使用相应的输出InfluxDB 3.0 Sink 连接器。
有关灵感,请参阅Quix 模板库,其中包含准备运行的参考用例/数据管道以及完整的源代码。您可能对在 InfluxDB v2 和 v3 之间同步数据以及使用 InfluxDB 进行预测性维护感兴趣。
注意:此处涵盖的连接器适用于 InfluxDB 3.0,但在 Quix Samples GitHub 存储库中还有一个输入InfluxDB 2.0 Source 连接器可用。
强大的性能,无限的扩展能力
收集、组织和处理海量高速数据。当您将任何数据视为时间序列数据时,它会更有价值。借助 InfluxDB,这是 #1 的时间序列平台,旨在与 Telegraf 一起扩展。
查看入门方法