客户端库深入探讨:Python(第1部分)

导航到

InfluxDB-3-Python

使用新的 InfluxDB 3.0 Python CLI 和客户端库

社区客户端库随着 InfluxDB 3.0 回归。如果您想了解每个客户端库的概述,我强烈建议查看 Anais 的博客,了解其状态。

在这个两篇系列博客中,我们将深入探讨新的 Python 客户端库CLI。到那时,您应该对当前的功能、内部工作原理以及我对这两个项目的未来想法有很好的了解。从那里,我希望它给您提供机会为它们做出贡献,并在其未来发表意见。

在本篇(第1部分)中,我们将主要关注客户端库,因为它构成了 Python CLI 的基础。

如果您喜欢,也可以观看这个教程的视频形式。

Python 客户端库

因此,让我们从 Python 客户端库开始。其范围很简单:构建一个可以写入和查询 InfluxDB 3.0 的库。由于 InfluxDB 3.0 中的写入端点没有变化,我们可以将 V2 库中的许多功能(如批量写入、数据解析、点对象等)向前推进。然而,在查询方面,我们必须完全重做。我们想专注于 Arrow Flight 的功能,并支持基于 SQL 和 InfluxQL 的查询。 PyArrow 还为 Pandas 和 Polars 等库提供了更好的生态系统支持,但我稍后会详细介绍。

让我们一起构建一个简单的 Python 应用程序,该程序可以写入和查询 InfluxDB 3.0。

安装

安装客户端库(我建议首先创建一个 Python 虚拟环境)

$ python3 -m venv ./.venv
$ source .venv/bin/activate
$ pip install –upgrade pip
$ pip install influxdb3-python

这些命令创建我们的虚拟 Python 环境,激活它,更新我们的 Python 包安装程序,最后安装新的客户端库。

创建客户端

在本节中,我们导入我们新安装的库并建立客户端。我还讨论了一些配置参数及其背后的原因。

让我们创建一个名为 main.py 的文件,其中包含以下代码

from influxdb_client_3 import InfluxDBClient3, Point
import pandas as pd
import numpy as np
import datetime

client = InfluxDBClient3( token="",
    host="eu-central-1-1.aws.cloud2.influxdata.com",
    org="6a841c0c08328fb1",
    database="pokemon-codex")

此示例显示了客户端的最小配置。与以前的客户端一样,它需要以下参数

token 这为客户提供了从 InfluxDB Cloud 无服务器专用 读取和写入的认证。注意:如果您想使用这两个功能,则需要具有读写认证的令牌。
host InfluxDB 主机——这应该是域名,不包含协议(https://)
org 无服务器云仍然需要用户的组织 ID 来写入 3.0 的数据。专用用户可以使用任意字符串。
database 您希望查询和写入的数据库。

我建议为每个数据库创建一个客户端,尽管如果您只想创建一个客户端,可以更新 _database 实例变量。

接下来,让我们看看客户端的高级参数

flight_client_options 这提供了对飞行查询协议参数的访问。您可以在这里找到配置选项。

示例.
write_client_options 这提供了对V2写入客户端使用的参数的访问,您可以在这里找到。

示例。
**kwargs 最后,这提供了对V2客户端使用的参数的访问,您可以在这里找到。

示例。(gzip压缩)

让我们继续我们的原始示例,讨论写入功能。

写入数据

既然我们已经建立了客户端,在本节中,我们将探讨您可以将数据写入InfluxDB 3.0的不同方法。大多数方法您应该都很熟悉,因为它们遵循与V2相同的摄取方法。

让我们从基本的点构建开始

# Continued from the Client's example

now = datetime.datetime.now(datetime.timezone.utc)

data = Point("caught").tag("trainer", "ash").tag("id", "0006").tag("num", "1")\
                                             .field("caught", "charizard")\
                                             .field("level", 10).field("attack", 30)\
                                             .field("defense", 40).field("hp", 200)\
                                             .field("speed", 10)\
                                             .field("type1", "fire").field("type2", "flying")\
                                             .time(now)

try:
    client.write(data)
except Exception as e:
    print(f"Error writing point: {e}")

在这个例子中,您可以看到我们使用Point类的实例构建我们的行协议,然后将其转换为行协议

Point,trainer=ash,id=0006,num=1 caught="charizard",level=10i,attack=30i,defense=40i,hp=200i,speed=10i,type1="fire",type2="flying" <timestamp>

您也可以将其格式化为点的数组

data = []
# Adding first point
data.append(
    Point("caught")
    .tag("trainer", "ash")
    .tag("id", "0006")
    .tag("num", "1")
    .field("caught", "charizard")
    .field("level", 10)
    .field("attack", 30)
    .field("defense", 40)
    .field("hp", 200)
    .field("speed", 10)
    .field("type1", "fire")
    .field("type2", "flying")
    .time(now)
)

# Adding second point
data.append(
    Point("caught")
    .tag("trainer", "ash")
    .tag("id", "0007")
    .tag("num", "2")
    .field("caught", "bulbasaur")
    .field("level", 12)
    .field("attack", 31)
    .field("defense", 31)
    .field("hp", 190)
    .field("speed", 11)
    .field("type1", "grass")
    .field("type2", "poison")
    .time(now)
)

您也可以通过字典编码和结构化数据方法进行写入。我最喜欢的摄取方法是通过Pandas DataFrame。

让我们看看一个利用此方法的示例

# Convert the list of dictionaries to a DataFrame
caught_pokemon_df = pd.DataFrame(data).set_index('timestamp')

# Print the DataFrame
print(caught_pokemon_df)

try:
    client.write(caught_pokemon_df, data_frame_measurement_name='caught',
             data_frame_tag_columns=['trainer', 'id', 'num'])
except Exception as e:
    print(f"Error writing point: {e}")

此示例创建了一个包含本次会话捕获的宝可梦的Pandas DataFrame。我们将dataframe的索引设置为宝可梦捕获的时间戳,然后向“write()”函数提供dataframe以及以下写入参数

data_frame_measurement_name 您希望将Pandas DataFrame写入的度量名称。
data_frame_tag_columns 包含您希望设置为标签的列名的字符串列表。
data_frame_timestamp_column 如果您没有将索引设置为时间戳,请使用此参数设置时间戳列。

请务必查看完整的示例这里。您还可以在这里找到批处理示例这里

从文件写入数据

之前客户端库中一个经常要求的功能是提供更多上传和解析不同文件数据格式的方法。利用PyArrow的实用工具,我们现在可以支持以下格式的文件上传

CSV 示例在这里。
JSON 示例在这里。
Feather 示例在这里。
ORC 示例在这里。
Parquet 示例在这里。

查询数据

现在我们已经将一些数据写入InfluxDB 3.0,让我们谈谈如何查询它。3.0提供了一个完全支持的Apache Arrow Flight端点,允许用户使用SQL或InfluxQL进行查询。

让我们首先看看SQL和InfluxQL中的基本时间序列查询

from influxdb_client_3 import InfluxDBClient3
import pandas as pd

client = InfluxDBClient3(
    token="",
    host="eu-central-1-1.aws.cloud2.influxdata.com",
    org="6a841c0c08328fb1",
    database="pokemon-codex")

sql = '''SELECT * FROM caught WHERE trainer = 'ash' AND time >= now() - interval '1 hour' LIMIT 5'''
table = client.query(query=sql, language='sql', mode='all')
print(table)

influxql = '''SELECT * FROM caught WHERE trainer = 'ash' AND time  > now() - 1h LIMIT 5'''
table = client.query(query=influxql, language='influxql', mode='pandas')
print(table)

如您在此示例中看到的那样,我们使用了相同的客户端进行InfluxQL和SQL的查询。让我们快速看看查询参数,看看它们是如何塑造我们的返回结果的。

query 此参数目前接受您的SQL或InfluxQL查询的字符串字面量。我们希望很快添加预定义语句到这个功能。
language 此参数接受字符串字面量“sql”或“influxql”
mode 目前有5种返回模式
1. ‘all’:这将以PyArrow Table的形式返回查询的所有数据
2. ‘pandas’:将所有数据作为Pandas DataFrame返回
3. ‘chunk’:返回一个飞行读取器,用户可以以较小的样本大小迭代大型查询(查看示例
4. ‘reader’:尝试将流转换为RecordBatchReader
5. ‘schema’:返回查询有效载荷模式

未来期望

罗马不是一天建成的,还有很多生活质量的提升和新功能要添加。以下是一个概述的表格

功能 状态
将V2客户端的写API合并以消除外部库依赖。 进行中
查询的预处理语句 待办事项
InfluxDB的箭头表写入器 待办事项
改进Polars支持 待办事项
集成delta共享 待办事项

亲自尝试

我们为InfluxDB 3.0在Python中构建了一个希望成为优秀的社区驱动的客户端库的基础。我的行动呼吁是,如果您还没有这样做,请尝试这个库并对其进行测试。可能有许多边缘情况我们可能没有意识到,没有社区的帮助我们不会发现这些问题。我期待着问题和功能请求。