将 Python 客户端库与 InfluxDB v3 Core 结合使用
作者:Anais Dotis-Georgiou / 开发者
2025 年 1 月 15 日
导航至
期待已久的 InfluxDB 3 Core 终于发布了,它引入了一种管理时间序列数据的强大新方法。InfluxDB 3 Core 是一个用于时间序列和事件数据的开源最新数据引擎。目前,它在 MIT/Apache 2 许可下以公开 Alpha 版本发布。在本文中,我们将深入探讨如何使用 Python 客户端库查询和写入数据,通过清晰、实践的示例释放 InfluxDB v3 Core 的全部潜力。让我们开始了解这个颠覆性的版本提供了什么!要深入了解 InfluxDB v3 Python 客户端库,我建议阅读这个分为两部分的博客系列:客户端库深入剖析:Python(第 1 部分) 和 第 2 部分,其中详细解释了数据对象和文件类型支持。
要求
我建议在安装 Python 客户端库之前创建一个 Python 虚拟环境
$ python3 -m venv ./.venv
$ source .venv/bin/activate
$ pip install –upgrade pip
$ pip install influxdb3-python
您还需要安装 InfluxDB v3 Core。请按照此处的安装说明进行操作。
创建 InfluxDB v3 资源和 CLI 导览
在我们使用 Python 客户端库读取数据到 InfluxDB v3 Core 之前,我们首先需要创建一个令牌。InfluxDB v3 Core CLI 提供了一种管理身份验证设置的简单方法。使用 influxdb3
CLI,您可以启动服务器、创建令牌,并通过查询和写入与数据交互。例如,您可以使用 influxdb3 create
命令创建令牌,并使用 influxdb3 write
和 influxdb3 query
命令来管理和浏览您的数据。
首先,我们需要使用以下命令生成一个新令牌
influxdb3 create token
您应该看到以下输出
Token: apiv3_xxx
Hashed Token: zzz
Start the server with `influxdb3 serve --bearer-token zzz`
HTTP requests require the following header: "Authorization: Bearer apiv3_xxx"
This will grant you access to every HTTP endpoint or deny it otherwise
Hashed Token
是明文 Token
的加密表示。通过将 Hashed Token
传递到服务器,您可以避免在命令行、日志或配置文件中暴露明文令牌。因此,当客户端在 HTTP 请求中发送明文承载令牌时,服务器会对接收到的令牌进行哈希处理,并将哈希结果与您在启动时提供的哈希令牌进行比较。这确保了服务器可以安全地验证明文令牌,而无需直接存储或处理它。
现在,您可以选择使用承载令牌来服务 influxdb3
influxdb3 serve --host-id=local01 --object-store memory --bearer-token zzz
或者,您可以服务实例并将对象存储在本地文件系统中
influxdb3 serve --host-id=local01 --object-store file --data-dir ~/.influxdb3 --bearer-token zzz
Parquet 文件充当 InfluxDB 3.0 的持久化数据格式,使对象存储成为长期数据保留的首选解决方案。这种方法显着降低了存储成本,同时保持了出色的性能。--object-store
选项允许用户指定他们希望将 Parquet 文件写入的位置。您可以选择将这些文件写入内存、本地文件系统、Amazon S3、Azure Blob Storage、Google Cloud Storage 或任何云存储。
接下来,我们可以创建一个数据库并写入数据,命令如下
influxdb3 write --dbname airSensors --file test_data
在本示例中,test_data
是一个包含 Line Protocol 数据的文件,这是 InfluxDB 的摄取格式。您可以在此处找到一系列 Line Protocol 实时数据集。例如,您可以使用一些 空气传感器数据(或 air-sensor-data.lp
)
airSensors,sensor_id=TLM0100 temperature=71.24021491535241,humidity=35.0752743309533,co=0.5098629816173851 1732669098000000000
airSensors,sensor_id=TLM0101 temperature=71.84309523593232,humidity=34.934199682459,co=0.5034259382294339 1732669098000000000
airSensors,sensor_id=TLM0102 temperature=71.95391915782443,humidity=34.92433120092046,co=0.5175197455105179 1732669098000000000
使用 influxdb3 write
命令写入数据后,您应该看到以下确认信息
success
现在,我们可以使用以下命令成功查询数据
influxdb3 query --dbname=airSensors "SELECT * FROM airSensors LIMIT 10"
使用 cURL 进行测试
当然,有时在使用客户端 SDK 之前,通过执行 cURL 请求来测试您的令牌会很有帮助。例如,您也可以使用以下命令将数据写入 test_db
数据库
curl \
"http://127.0.0.1:8181/api/v2/write?bucket=test_db&precision=s" \
--header "Authorization: Bearer apiv3_xxx" \
--data-binary 'home,room=kitchen temp=72 1732669098'
重要提示
- 默认情况下,InfluxDB Core 在端口 8181 上运行。
- 数据库在写入时通过 CLI、API 或客户端创建。
InfluxDB v3 Core 和 Python 客户端库
现在我们已经确认可以成功创建存储桶、令牌并使用 cURL 请求进行写入,接下来让我们使用 InfluxDB v3 Python 客户端库也来写入数据。write 方法支持写入几种不同的数据对象,包括 Points、Pandas DataFrames 和 Polars DataFrames。在本教程中,我们将重点介绍写入和返回 Pandas DataFrame。有关使用其他数据对象的说明,请参阅此示例目录或客户端库深入剖析:Python(第 1 部分)。
from influxdb_client_3 import InfluxDBClient3
import pandas as pd
import numpy as np
# Initialize Client with the correct authorization scheme
client = InfluxDBClient3(
host="http://127.0.0.1:8181",
token="apiv3_xxx",
org="",
database="test",
auth_scheme="Bearer"
)
print("Connection Successful!")
# Create a dataframe
df = pd.DataFrame({'a': [1, 2, 3], 'b': [4, 5, 6]})
# Create a range of datetime values
dates = pd.date_range(start='2025-01-01', end='2025-01-02', freq='1H')
# Create a DataFrame with random data and datetime index
df = pd.DataFrame(
np.random.randn(
len(dates),
3),
index=dates,
columns=[
'Column 1',
'Column 2',
'Column 3'])
df['tagkey'] = 'Hello World'
print(df)
# Write the DataFrame to InfluxDB
try:
client.write(df, data_frame_measurement_name='table', data_frame_tag_columns=['tagkey'])
print("DataFrame successfully written to InfluxDB!")
except Exception as e:
print(f"Failed to write to InfluxDB: {e}")
# Query the DataFrame from InfluxDB
try:
query = '''SELECT * from "table"'''
table = client.query(query=query, language="sql", mode="pandas")
print("SQL Query Results:")
print(table)
except Exception as e:
print(f"SQL Query failed: {e}")
此代码初始化一个 InfluxDBClient3 以连接到 InfluxDB Core 实例,并定义一个包含随机生成数据的 Pandas DataFrame。该脚本将此 DataFrame 写入数据库,然后使用 SQL 查询从 test
数据库的 table
中检索所有条目,并将结果打印为 Pandas DataFrame。try-except 块处理写入和查询操作期间的潜在错误。有关如何使用 Python 客户端库写入其他类型的数据(包括 Parquet、CSV、JSON、Polars)以及使用 InfluxQL 进行查询的更多详细信息,请参阅这些示例。
最终想法
最后但并非最不重要的一点是,如果您想停止运行 influxdb3 服务器,您可以先使用以下命令返回 PID 来终止该进程
pgrep influxdb3
然后使用
kill <PID>
InfluxDB v3 CLI 仍在开发中。未来,用户应该能够创建数据库,而无需向其写入数据。这应该有助于使用 InfuxDB v3 Core,尽管使用 write 命令并验证您的写入是否也成功是有优势的。此外,当未来在 InfluxDB v3 Core 中添加额外支持时,必须对 InfluxDB v3 Python 客户端库进行更新。但就目前而言,我希望这篇博文能帮助您开始使用 InfluxDB v3 Core。
要开始使用,请在此处下载 InfluxDB 3 Core。请在 InfluxDB Core 的 Discord 上分享您的第一印象、想法和反馈。在 InfluxDB Core 的 Alpha 版本发布期间,您的经验和意见对我们非常重要。如果您需要帮助,请在我们的社区站点或 Slack 频道上联系我们。如果您也在使用 InfluxDB 进行数据处理项目,我很乐意收到您的来信!