InfluxDB 3.0 Python客户端更新:添加Polars支持

导航到

自从我们上次发布了关于InfluxDB 3.0 Python客户端的消息以来,已经有一段时间了。让我们来看看有哪些新功能!

Polars DataFrame读取

2023年,数据分析领域出现了一个新秀,那就是Polars。Polars DataFrame库是OG Pandas的替代品。尽管两者都服务于相同的用例,但它们在技术上都是基于不同的技术构建的。


我们收到最多的社区请求之一是提供与Polars更好的兼容性。由于Polars是基于Apache Arrow构建的,我们扩展了mode函数,包括polars。只需查询数据并将模式修改为以下所示polars

import polars as pl
from influxdb_client_3 import InfluxDBClient3

with InfluxDBClient3(
    token="",
    host="eu-central-1-1.aws.cloud2.influxdata.com",
    org="6a841c0c08328fb1") as client:

        sql = 'SELECT * FROM caught LIMIT 100000'
        df = client.query(database="pokemon-codex", query=sql, language='sql', mode='polars')
        print(df, flush=True)

我们在底层客户端代码中调用Polars函数from_arrow()。这会将我们的Arrow表格自动转换为Polars DataFrame。注意:您必须安装Polars DataFrame库才能使用此模式。

摄取过程略有不同。与 V1 和 V2 类似,InfluxDB V3 期望行协议(LP)作为其主要的摄取方法。这意味着我们在客户端库中构建了 LP 的转换器。Polars 提供了一个极其高效的 用户定义函数(UDF)功能,这使得实现这个新的转换器变得简单直接。我们将新的 Polars 数据帧转换器构建到现有的数据帧转换器类中。以下是一个示例

import polars as pl
from influxdb_client_3 import InfluxDBClient3,InfluxDBError,WriteOptions,write_client_options

class BatchingCallback(object):

    def success(self, conf, data: str):
        print(f"Written batch: {conf}, data: {data}")

    def error(self, conf, data: str, exception: InfluxDBError):
        print(f"Cannot write batch: {conf}, data: {data} due: {exception}")

    def retry(self, conf, data: str, exception: InfluxDBError):
        print(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}")

callback = BatchingCallback()

write_options = WriteOptions(batch_size=10000,
                                        flush_interval=10_000,
                                        jitter_interval=2_000,
                                        retry_interval=5_000,
                                        max_retries=10,
                                        max_retry_delay=15_000,
                                        exponential_base=2, max_close_wait=900_000)

wco = write_client_options(success_callback=callback.success,
                          error_callback=callback.error,
                          retry_callback=callback.retry,
                          WriteOptions=write_options 
                        )

client = InfluxDBClient3(
    token="token",
    host="eu-central-1-1.aws.cloud2.influxdata.com",
    org="6a841c0c08328fb1", enable_gzip=True, write_client_options=wco)

pl_df =pl.read_parquet('pokemon_100_000.parquet')

client.write(database="pokemon-codex", 
             record=pl_df, data_frame_measurement_name='caught', 
             data_frame_tag_columns=['trainer', 'id', 'num'], 
             data_frame_timestamp_column='timestamp')

client.close()

在这种情况下,你可以看到它包括了我们在编写 Pandas 数据帧时使用的相同参数。我们在写入 API 中区分数据帧类型并调用正确的转换器。

小贴士:请确保包含 data_frame_timestamp_column= 并指定你的时间戳列。Polars 不像 Pandas 提供索引方法,所以我们无法自动区分哪个是正确的列。

自定义 Arrow Flight 头部

另一个请求的功能是包含自定义的 Arrow Flight 调用选项 以供查询使用。这允许熟悉 Arrow Flight 的用户使用底层配置参数。一个简单的例子可以是为特定的查询增加超时时间

df = client.query(database="pokemon-codex", query=sql, language='sql', mode='polars', timeout=5)

错误和杂项

最后,这里是一个小的变更历史列表

版本 变更
0.3.4 / 0.3.3 将 V2 写入 API 合并到 V3 中,并删除了 V2 客户端库作为依赖项。
0.3.4 / 0.3.3 为集群用户添加了自定义端口声明
0.3.2 修复了 Pandas 作为可选依赖项的问题
0.3.1 添加了 flight 错误的说明
0.3.1 添加了社区和烹饪示例
0.3.0 添加了自定义证书参数。(解决基于 Windows 的 gRPC SSL 问题)

接下来是什么?

我们希望您认为 InfluxDB 3.0 Python 客户端库中添加的新功能很有用。如果您有任何功能请求或错误需要报告,请通过 客户端存储库 打开一个问题。我们始终在寻找社区贡献者为我们的 3.0 客户端库做出贡献。您可以通过 Slack 与我们讨论您的贡献。