InfluxDB 3.0 Python客户端更新:添加Polars支持
作者:Jay Clifford / 开发者
2024年1月26日
导航到
自从我们上次发布了关于InfluxDB 3.0 Python客户端的消息以来,已经有一段时间了。让我们来看看有哪些新功能!
Polars DataFrame读取
2023年,数据分析领域出现了一个新秀,那就是Polars。Polars DataFrame库是OG Pandas的替代品。尽管两者都服务于相同的用例,但它们在技术上都是基于不同的技术构建的。
我们收到最多的社区请求之一是提供与Polars更好的兼容性。由于Polars是基于Apache Arrow构建的,我们扩展了mode函数,包括polars
。只需查询数据并将模式修改为以下所示polars
import polars as pl
from influxdb_client_3 import InfluxDBClient3
with InfluxDBClient3(
token="",
host="eu-central-1-1.aws.cloud2.influxdata.com",
org="6a841c0c08328fb1") as client:
sql = 'SELECT * FROM caught LIMIT 100000'
df = client.query(database="pokemon-codex", query=sql, language='sql', mode='polars')
print(df, flush=True)
我们在底层客户端代码中调用Polars函数from_arrow()
。这会将我们的Arrow表格自动转换为Polars DataFrame。注意:您必须安装Polars DataFrame库才能使用此模式。
摄取过程略有不同。与 V1 和 V2 类似,InfluxDB V3 期望行协议(LP)作为其主要的摄取方法。这意味着我们在客户端库中构建了 LP 的转换器。Polars 提供了一个极其高效的 用户定义函数(UDF)功能,这使得实现这个新的转换器变得简单直接。我们将新的 Polars 数据帧转换器构建到现有的数据帧转换器类中。以下是一个示例
import polars as pl
from influxdb_client_3 import InfluxDBClient3,InfluxDBError,WriteOptions,write_client_options
class BatchingCallback(object):
def success(self, conf, data: str):
print(f"Written batch: {conf}, data: {data}")
def error(self, conf, data: str, exception: InfluxDBError):
print(f"Cannot write batch: {conf}, data: {data} due: {exception}")
def retry(self, conf, data: str, exception: InfluxDBError):
print(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}")
callback = BatchingCallback()
write_options = WriteOptions(batch_size=10000,
flush_interval=10_000,
jitter_interval=2_000,
retry_interval=5_000,
max_retries=10,
max_retry_delay=15_000,
exponential_base=2, max_close_wait=900_000)
wco = write_client_options(success_callback=callback.success,
error_callback=callback.error,
retry_callback=callback.retry,
WriteOptions=write_options
)
client = InfluxDBClient3(
token="token",
host="eu-central-1-1.aws.cloud2.influxdata.com",
org="6a841c0c08328fb1", enable_gzip=True, write_client_options=wco)
pl_df =pl.read_parquet('pokemon_100_000.parquet')
client.write(database="pokemon-codex",
record=pl_df, data_frame_measurement_name='caught',
data_frame_tag_columns=['trainer', 'id', 'num'],
data_frame_timestamp_column='timestamp')
client.close()
在这种情况下,你可以看到它包括了我们在编写 Pandas 数据帧时使用的相同参数。我们在写入 API 中区分数据帧类型并调用正确的转换器。
小贴士:请确保包含 data_frame_timestamp_column=
并指定你的时间戳列。Polars 不像 Pandas 提供索引方法,所以我们无法自动区分哪个是正确的列。
自定义 Arrow Flight 头部
另一个请求的功能是包含自定义的 Arrow Flight 调用选项 以供查询使用。这允许熟悉 Arrow Flight 的用户使用底层配置参数。一个简单的例子可以是为特定的查询增加超时时间
df = client.query(database="pokemon-codex", query=sql, language='sql', mode='polars', timeout=5)
错误和杂项
最后,这里是一个小的变更历史列表
版本 | 变更 |
0.3.4 / 0.3.3 | 将 V2 写入 API 合并到 V3 中,并删除了 V2 客户端库作为依赖项。 |
0.3.4 / 0.3.3 | 为集群用户添加了自定义端口声明 |
0.3.2 | 修复了 Pandas 作为可选依赖项的问题 |
0.3.1 | 添加了 flight 错误的说明 |
0.3.1 | 添加了社区和烹饪示例 |
0.3.0 | 添加了自定义证书参数。(解决基于 Windows 的 gRPC SSL 问题) |
接下来是什么?
我们希望您认为 InfluxDB 3.0 Python 客户端库中添加的新功能很有用。如果您有任何功能请求或错误需要报告,请通过 客户端存储库 打开一个问题。我们始终在寻找社区贡献者为我们的 3.0 客户端库做出贡献。您可以通过 Slack 与我们讨论您的贡献。