客户端库深度解析:Python(第 1 部分)

导航至

InfluxDB-3-Python

使用新的 InfluxDB 3.0 Python CLI 和客户端库

社区客户端库已随着 InfluxDB 3.0 回归。如果您想了解每个客户端库的概览,我强烈建议您查看 Anais 关于其状态的博客

在这个分为两部分的博客系列中,我们将深入探讨新的 Python 客户端库CLI。到最后,您应该对当前的功能、内部工作原理以及我对这两个项目未来的想法有一个很好的理解。我希望这能让您有机会为它们的未来做出贡献并发表您的意见。

在这篇文章(第 1 部分)中,我们将主要关注客户端库,因为它构成了 Python CLI 的基础。

如果您愿意,可以观看此视频教程。

Python 客户端库

那么,让我们从 Python 客户端库开始。范围很简单:构建一个可以写入和查询 InfluxDB 3.0 的库。由于写入端点在 InfluxDB 3.0 中没有改变,我们可以从 V2 库中引入许多功能,例如批量写入、数据解析、点对象等等。然而,在查询方面,我们不得不完全重做它。我们想专注于 Arrow Flight 的功能,并支持基于 SQL 和 InfluxQL 的查询。PyArrow 还为 Pandas 和 Polars 等库打开了更好的生态系统支持,但我稍后会详细介绍这一点。

让我们一起构建一个简单的 Python 应用程序,用于写入和查询 InfluxDB 3.0。

安装

要安装客户端库(我建议首先创建一个 Python 虚拟环境)

$ python3 -m venv ./.venv
$ source .venv/bin/activate
$ pip install –upgrade pip
$ pip install influxdb3-python

这组命令创建了我们的虚拟 Python 环境,激活它,更新了我们的 Python 包安装程序,最后安装了新的客户端库。

创建客户端

在本节中,我们导入新安装的库并建立客户端。我还将讨论一些配置参数及其背后的原因。

让我们创建一个名为 main.py 的文件,其中包含以下代码

from influxdb_client_3 import InfluxDBClient3, Point
import pandas as pd
import numpy as np
import datetime

client = InfluxDBClient3( token="",
    host="eu-central-1-1.aws.cloud2.influxdata.com",
    org="6a841c0c08328fb1",
    database="pokemon-codex")

此示例显示了客户端的最小配置。与之前的客户端一样,它需要以下参数

token 这为客户端提供身份验证,以便从 InfluxDB Cloud ServerlessDedicated 读取和写入数据。注意:如果您希望同时使用这两个功能,则需要一个具有读写身份验证的令牌。
host InfluxDB 主机 — 这应该只是域名,不带协议 (https://)
org Cloud Serverless 仍然需要用户的组织 ID 才能将数据写入 3.0。Dedicated 用户只需使用任意字符串即可。
database 您希望查询和写入数据的数据库。

我建议在每个数据库的基础上创建一个客户端,但如果您只想创建一个客户端,您可以更新 _database 实例变量。

接下来,让我们看一下客户端的高级参数

flight_client_options 这提供了对 flight 查询协议参数的访问权限。您可以在此处找到配置选项。

示例.
write_client_options 这提供了对 V2 写入客户端使用的参数的访问权限,您可以在此处找到这些参数。

示例。
**kwargs 最后,这提供了对 V2 客户端使用的参数的访问权限,您可以在此处找到这些参数。

示例。(gzip 压缩)

让我们继续原来的示例,讨论写入功能。

写入数据

现在我们已经建立了客户端,在本节中,我们将介绍您可以使用的不同方法将数据写入 InfluxDB 3.0。大多数方法对您来说都很熟悉,因为它们遵循与 V2 相同的摄取方法。

让我们从基本点构建开始

# Continued from the Client's example

now = datetime.datetime.now(datetime.timezone.utc)

data = Point("caught").tag("trainer", "ash").tag("id", "0006").tag("num", "1")\
                                             .field("caught", "charizard")\
                                             .field("level", 10).field("attack", 30)\
                                             .field("defense", 40).field("hp", 200)\
                                             .field("speed", 10)\
                                             .field("type1", "fire").field("type2", "flying")\
                                             .time(now)

try:
    client.write(data)
except Exception as e:
    print(f"Error writing point: {e}")

在此示例中,您可以看到我们使用 Point 类的实例构建了我们的行协议,然后将其转换为行协议

Point,trainer=ash,id=0006,num=1 caught="charizard",level=10i,attack=30i,defense=40i,hp=200i,speed=10i,type1="fire",type2="flying" <timestamp>

您也可以将其格式化为点数组

data = []
# Adding first point
data.append(
    Point("caught")
    .tag("trainer", "ash")
    .tag("id", "0006")
    .tag("num", "1")
    .field("caught", "charizard")
    .field("level", 10)
    .field("attack", 30)
    .field("defense", 40)
    .field("hp", 200)
    .field("speed", 10)
    .field("type1", "fire")
    .field("type2", "flying")
    .time(now)
)

# Adding second point
data.append(
    Point("caught")
    .tag("trainer", "ash")
    .tag("id", "0007")
    .tag("num", "2")
    .field("caught", "bulbasaur")
    .field("level", 12)
    .field("attack", 31)
    .field("defense", 31)
    .field("hp", 190)
    .field("speed", 11)
    .field("type1", "grass")
    .field("type2", "poison")
    .time(now)
)

您还可以通过字典编码和结构化数据方法进行写入。我最喜欢的摄取方法之一是通过 Pandas DataFrame。

让我们看一个利用此方法的示例

# Convert the list of dictionaries to a DataFrame
caught_pokemon_df = pd.DataFrame(data).set_index('timestamp')

# Print the DataFrame
print(caught_pokemon_df)

try:
    client.write(caught_pokemon_df, data_frame_measurement_name='caught',
             data_frame_tag_columns=['trainer', 'id', 'num'])
except Exception as e:
    print(f"Error writing point: {e}")

此示例为本次会话创建了一个 Pandas DataFrame,其中包含我们捕获的宝可梦。我们将数据帧的索引设置为捕获宝可梦的时间戳,然后将数据帧以及以下写入参数提供给“write()”函数

data_frame_measurement_name 您希望将 Pandas DataFrame 写入的测量名称。
data_frame_tag_columns 一个字符串列表,其中包含您希望设为标签的列名。
data_frame_timestamp_column 如果您的索引未设置为时间戳,请使用此参数设置时间戳列。

请务必查看此处的完整示例。您还可以此处找到批量处理示例。

从文件写入数据

以前的客户端库的一个广受欢迎的功能是提供更多上传和解析不同文件数据格式的方法。利用 PyArrow 的实用程序,我们现在可以支持上传以下格式的文件

CSV 此处示例。
JSON 此处示例。
Feather 此处示例。
ORC 此处示例。
Parquet 此处示例。

查询数据

现在我们已经将一些数据写入 InfluxDB 3.0,让我们来谈谈如何将数据查询出来。3.0 提供了完全支持的 Apache Arrow Flight 端点,允许用户使用 SQL 或 InfluxQL 进行查询。

让我们首先看一下 SQL 和 InfluxQL 中的基本时间序列查询;

from influxdb_client_3 import InfluxDBClient3
import pandas as pd

client = InfluxDBClient3(
    token="",
    host="eu-central-1-1.aws.cloud2.influxdata.com",
    org="6a841c0c08328fb1",
    database="pokemon-codex")

sql = '''SELECT * FROM caught WHERE trainer = 'ash' AND time >= now() - interval '1 hour' LIMIT 5'''
table = client.query(query=sql, language='sql', mode='all')
print(table)

influxql = '''SELECT * FROM caught WHERE trainer = 'ash' AND time  > now() - 1h LIMIT 5'''
table = client.query(query=influxql, language='influxql', mode='pandas')
print(table)

正如您在此示例中看到的,我们使用了相同的客户端来使用 InfluxQL 和 SQL 进行查询。让我们快速看一下查询参数,了解它们如何塑造我们的返回结果。

query 此参数当前接受 SQL 或 InfluxQL 查询的字符串字面量。我们希望尽快为此添加预处理语句。
language 此参数接受字符串字面量“sql”或“influxql”
mode 目前有 5 种返回模式
1. “all”:这将返回查询的所有数据,格式为 PyArrow 表
2. “pandas”:将所有数据作为 Pandas DataFrame 返回
3. “chunk”:返回 flight reader,以便用户可以以较小的样本大小迭代大型查询(请参阅示例
4. “reader”:尝试将流转换为 RecordBatchReader
5. “schema”:返回查询负载架构

未来展望

罗马不是一天建成的,还有许多生活质量方面的改进和新功能需要添加。下表概述了一些

功能 状态
合并 V2 客户端中的 Write API,以消除外部库依赖项。 进行中
查询的预处理语句 待办
InfluxDB 的 Arrow 表写入器 待办
改进 Polars 支持 待办
集成 delta sharing 待办

亲自试用

我们为 Python 中的 InfluxDB 3.0 构建了一个我希望成为出色的社区驱动客户端库的基础。我的行动号召是,如果您还没有这样做,请试用该库并进行全面测试。我们可能没有意识到很多边缘情况,如果没有社区的帮助,我们就无法找到这些情况。我热切期待问题和功能请求。