InfluxDB 3.0 Python 客户端更新:增加 Polars 支持

导航至

自从我们发布关于 InfluxDB 3.0 Python 客户端的文章以来,已经有一段时间了。让我们来看看有哪些新内容!

Polars Dataframe 摄取

2023 年,一个新的数据分析领域成员 Polars 开始流行。Polars Data Frame 库是原始 OG Pandas 的替代数据帧包。虽然两者都服务于相同的用例,但它们在根本上是基于不同的技术构建的。


我们收到的最常见的社区请求之一是提供与 Polars 的更高兼容性。由于 Polars 构建于 Apache Arrow 之上,我们扩展了 mode 函数以包含 polars。只需查询数据并将模式修改为 polars,如下所示

import polars as pl
from influxdb_client_3 import InfluxDBClient3

with InfluxDBClient3(
    token="",
    host="eu-central-1-1.aws.cloud2.influxdata.com",
    org="6a841c0c08328fb1") as client:

        sql = 'SELECT * FROM caught LIMIT 100000'
        df = client.query(database="pokemon-codex", query=sql, language='sql', mode='polars')
        print(df, flush=True)

我们在底层客户端代码中调用 Polars 函数 from_arrow()。这会自动将我们的 Arrow 表转换为 Polars Dataframe。注意:您必须安装 Polars Dataframe 库才能使用此模式。

摄取的情况略有不同。与 V1 和 V2 一样,InfluxDB V3 期望使用行协议 (LP) 作为其主要摄取方法。这意味着我们在客户端库中构建到 LP 的转换器。Polars 提供了一个极其高效的 UDF 功能,这使得创建这个新的转换器变得非常容易实现。我们将新的 Polars 数据帧转换器构建到预先存在的数据帧转换器类中。这是一个例子

import polars as pl
from influxdb_client_3 import InfluxDBClient3,InfluxDBError,WriteOptions,write_client_options

class BatchingCallback(object):

    def success(self, conf, data: str):
        print(f"Written batch: {conf}, data: {data}")

    def error(self, conf, data: str, exception: InfluxDBError):
        print(f"Cannot write batch: {conf}, data: {data} due: {exception}")

    def retry(self, conf, data: str, exception: InfluxDBError):
        print(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}")

callback = BatchingCallback()

write_options = WriteOptions(batch_size=10000,
                                        flush_interval=10_000,
                                        jitter_interval=2_000,
                                        retry_interval=5_000,
                                        max_retries=10,
                                        max_retry_delay=15_000,
                                        exponential_base=2, max_close_wait=900_000)

wco = write_client_options(success_callback=callback.success,
                          error_callback=callback.error,
                          retry_callback=callback.retry,
                          WriteOptions=write_options 
                        )

client = InfluxDBClient3(
    token="token",
    host="eu-central-1-1.aws.cloud2.influxdata.com",
    org="6a841c0c08328fb1", enable_gzip=True, write_client_options=wco)

pl_df =pl.read_parquet('pokemon_100_000.parquet')

client.write(database="pokemon-codex", 
             record=pl_df, data_frame_measurement_name='caught', 
             data_frame_tag_columns=['trainer', 'id', 'num'], 
             data_frame_timestamp_column='timestamp')

client.close()

在这种情况下,您可以看到它包含了我们在写入 Pandas 数据帧时会使用的相同参数。我们区分数据帧类型并在写入 API 中调用正确的转换器。

小贴士:请务必包含 data_frame_timestamp_column= 并指定您的时间戳列。Polars 不像 Pandas 那样提供索引方法,因此我们无法自动区分哪个是正确的列。

自定义 Arrow Flight 标头

另一个被请求的功能是在查询中包含自定义 Arrow Flight Call Options。这允许熟悉 Arrow Flight 的用户使用底层的配置参数。一个简单的例子可以是增加特定查询的超时时间

df = client.query(database="pokemon-codex", query=sql, language='sql', mode='polars', timeout=5)

错误和杂项

最后,这是一个小的变更历史列表

版本 变更
0.3.4 / 0.3.3 将 V2 Write API 合并到 V3 中,并移除了 V2 客户端库作为依赖项。
0.3.4 / 0.3.3 为集群用户添加了自定义端口声明
0.3.2 修复了 Pandas 作为可选依赖项的问题
0.3.1 添加了 flight errors readme 文件
0.3.1 添加了社区和 cookbook 示例
0.3.0 添加了自定义证书参数。(用于修复基于 Windows 的 gRPC SSL 问题)

下一步是什么?

我们希望您发现添加到 InfluxDB 3.0 Python 客户端库中的新功能很有用。如果您有任何功能请求或错误要报告,请随时通过客户端仓库提交 issue。我们一直在为我们的 3.0 客户端库寻找社区贡献者。您始终可以在 Slack 上与我们讨论您的贡献。