将 Python 客户端库与 InfluxDB v3 Core 结合使用

导航至

期待已久的 InfluxDB 3 Core 终于发布了,它引入了一种管理时间序列数据的强大新方法。InfluxDB 3 Core 是一个用于时间序列和事件数据的开源最新数据引擎。目前,它在 MIT/Apache 2 许可下以公开 Alpha 版本发布。在本文中,我们将深入探讨如何使用 Python 客户端库查询和写入数据,通过清晰、实践的示例释放 InfluxDB v3 Core 的全部潜力。让我们开始了解这个颠覆性的版本提供了什么!要深入了解 InfluxDB v3 Python 客户端库,我建议阅读这个分为两部分的博客系列:客户端库深入剖析:Python(第 1 部分)第 2 部分,其中详细解释了数据对象和文件类型支持。

要求

我建议在安装 Python 客户端库之前创建一个 Python 虚拟环境

$ python3 -m venv ./.venv
$ source .venv/bin/activate
$ pip install –upgrade pip
$ pip install influxdb3-python

您还需要安装 InfluxDB v3 Core。请按照此处的安装说明进行操作。

创建 InfluxDB v3 资源和 CLI 导览

在我们使用 Python 客户端库读取数据到 InfluxDB v3 Core 之前,我们首先需要创建一个令牌。InfluxDB v3 Core CLI 提供了一种管理身份验证设置的简单方法。使用 influxdb3 CLI,您可以启动服务器、创建令牌,并通过查询和写入与数据交互。例如,您可以使用 influxdb3 create 命令创建令牌,并使用 influxdb3 writeinfluxdb3 query 命令来管理和浏览您的数据。

首先,我们需要使用以下命令生成一个新令牌

influxdb3 create token

您应该看到以下输出

Token: apiv3_xxx
Hashed Token: zzz
Start the server with `influxdb3 serve --bearer-token zzz`
HTTP requests require the following header: "Authorization: Bearer apiv3_xxx"
This will grant you access to every HTTP endpoint or deny it otherwise

Hashed Token 是明文 Token 的加密表示。通过将 Hashed Token 传递到服务器,您可以避免在命令行、日志或配置文件中暴露明文令牌。因此,当客户端在 HTTP 请求中发送明文承载令牌时,服务器会对接收到的令牌进行哈希处理,并将哈希结果与您在启动时提供的哈希令牌进行比较。这确保了服务器可以安全地验证明文令牌,而无需直接存储或处理它。

现在,您可以选择使用承载令牌来服务 influxdb3

influxdb3 serve --host-id=local01 --object-store memory --bearer-token zzz

或者,您可以服务实例并将对象存储在本地文件系统中

influxdb3 serve --host-id=local01 --object-store file --data-dir ~/.influxdb3 --bearer-token zzz

Parquet 文件充当 InfluxDB 3.0 的持久化数据格式,使对象存储成为长期数据保留的首选解决方案。这种方法显着降低了存储成本,同时保持了出色的性能。--object-store 选项允许用户指定他们希望将 Parquet 文件写入的位置。您可以选择将这些文件写入内存、本地文件系统、Amazon S3、Azure Blob Storage、Google Cloud Storage 或任何云存储。

接下来,我们可以创建一个数据库并写入数据,命令如下

influxdb3 write --dbname airSensors --file test_data

在本示例中,test_data 是一个包含 Line Protocol 数据的文件,这是 InfluxDB 的摄取格式。您可以在此处找到一系列 Line Protocol 实时数据集。例如,您可以使用一些 空气传感器数据(或 air-sensor-data.lp

airSensors,sensor_id=TLM0100 temperature=71.24021491535241,humidity=35.0752743309533,co=0.5098629816173851 1732669098000000000
airSensors,sensor_id=TLM0101 temperature=71.84309523593232,humidity=34.934199682459,co=0.5034259382294339 1732669098000000000
airSensors,sensor_id=TLM0102 temperature=71.95391915782443,humidity=34.92433120092046,co=0.5175197455105179 1732669098000000000

使用 influxdb3 write 命令写入数据后,您应该看到以下确认信息

success

现在,我们可以使用以下命令成功查询数据

influxdb3 query --dbname=airSensors "SELECT * FROM airSensors LIMIT 10"

使用 cURL 进行测试

当然,有时在使用客户端 SDK 之前,通过执行 cURL 请求来测试您的令牌会很有帮助。例如,您也可以使用以下命令将数据写入 test_db 数据库

curl \
"http://127.0.0.1:8181/api/v2/write?bucket=test_db&precision=s" \
--header "Authorization: Bearer apiv3_xxx" \
--data-binary 'home,room=kitchen temp=72 1732669098'

重要提示

  1. 默认情况下,InfluxDB Core 在端口 8181 上运行。
  2. 数据库在写入时通过 CLI、API 或客户端创建。

InfluxDB v3 Core 和 Python 客户端库

现在我们已经确认可以成功创建存储桶、令牌并使用 cURL 请求进行写入,接下来让我们使用 InfluxDB v3 Python 客户端库也来写入数据。write 方法支持写入几种不同的数据对象,包括 Points、Pandas DataFrames 和 Polars DataFrames。在本教程中,我们将重点介绍写入和返回 Pandas DataFrame。有关使用其他数据对象的说明,请参阅此示例目录或客户端库深入剖析:Python(第 1 部分)

from influxdb_client_3 import InfluxDBClient3
import pandas as pd
import numpy as np

# Initialize Client with the correct authorization scheme
client = InfluxDBClient3(
    host="http://127.0.0.1:8181",
    token="apiv3_xxx",
    org="",
    database="test",
    auth_scheme="Bearer"
)

print("Connection Successful!")

# Create a dataframe
df = pd.DataFrame({'a': [1, 2, 3], 'b': [4, 5, 6]})

# Create a range of datetime values
dates = pd.date_range(start='2025-01-01', end='2025-01-02', freq='1H')

# Create a DataFrame with random data and datetime index
df = pd.DataFrame(
    np.random.randn(
        len(dates),
        3),
    index=dates,
    columns=[
        'Column 1',
        'Column 2',
        'Column 3'])
df['tagkey'] = 'Hello World'

print(df)

# Write the DataFrame to InfluxDB
try:
    client.write(df, data_frame_measurement_name='table', data_frame_tag_columns=['tagkey'])
    print("DataFrame successfully written to InfluxDB!")
except Exception as e:
    print(f"Failed to write to InfluxDB: {e}")

# Query the DataFrame from InfluxDB
try:
    query = '''SELECT * from "table"'''
    table = client.query(query=query, language="sql", mode="pandas")
    print("SQL Query Results:")
    print(table)

except Exception as e:
    print(f"SQL Query failed: {e}")

此代码初始化一个 InfluxDBClient3 以连接到 InfluxDB Core 实例,并定义一个包含随机生成数据的 Pandas DataFrame。该脚本将此 DataFrame 写入数据库,然后使用 SQL 查询从 test 数据库的 table 中检索所有条目,并将结果打印为 Pandas DataFrame。try-except 块处理写入和查询操作期间的潜在错误。有关如何使用 Python 客户端库写入其他类型的数据(包括 Parquet、CSV、JSON、Polars)以及使用 InfluxQL 进行查询的更多详细信息,请参阅这些示例

最终想法

最后但并非最不重要的一点是,如果您想停止运行 influxdb3 服务器,您可以先使用以下命令返回 PID 来终止该进程

pgrep influxdb3

然后使用

kill <PID> 

InfluxDB v3 CLI 仍在开发中。未来,用户应该能够创建数据库,而无需向其写入数据。这应该有助于使用 InfuxDB v3 Core,尽管使用 write 命令并验证您的写入是否也成功是有优势的。此外,当未来在 InfluxDB v3 Core 中添加额外支持时,必须对 InfluxDB v3 Python 客户端库进行更新。但就目前而言,我希望这篇博文能帮助您开始使用 InfluxDB v3 Core。

要开始使用,请在此处下载 InfluxDB 3 Core。请在 InfluxDB Core 的 Discord 上分享您的第一印象、想法和反馈。在 InfluxDB Core 的 Alpha 版本发布期间,您的经验和意见对我们非常重要。如果您需要帮助,请在我们的社区站点Slack 频道上联系我们。如果您也在使用 InfluxDB 进行数据处理项目,我很乐意收到您的来信!