Python 和 InfluxDB v2.0 入门

导航至

更新InfluxDB 3.0 放弃了 Flux 和内置任务引擎。用户可以使用外部工具,例如基于 Python 的 Quix,在 InfluxDB 3.0 中创建任务。)

Telegraf 拥有 200 多个插件,在数据收集应用方面具有广泛的用途。但是,有时您需要收集自定义数据,或者您可能希望将外部工具集成到您的时间数据分析中。在这种情况下,利用 InfluxDB 的客户端库是有意义的。今天,我们将重点介绍如何将最新的 InfluxDB Python 客户端库 与 InfluxDB v2.0 一起使用。如果您正在运行 InfluxDB v1.x,请查看此教程

InfluxDB Python 客户端库

自 v1.x 以来,InfluxDB Python 客户端经历了一些相当大的改进。它更快——快得多——并且更易于使用。它支持多进程处理,并允许您将查询作为 Pandas DataFrame 返回。WriteAPI 支持同步、异步和批量写入 InfluxDB v2.0。WriteAPI 还支持 4 种不同的写入选项。QueryAPI 还支持多种查询选项,并具有中断查询流的能力。

要求

本教程在 MacOS 系统上执行,该系统通过 Homebrew 安装了 Python 3,并通过 Conda 安装了 Python 3.6 和 Python 3.7。执行多进程处理需要 Python 3.7。我建议设置额外的工具,例如 virtualenvpyenvconda-env,以简化 Python 和客户端安装。否则,完整的需求可以在此处找到。

安装

要安装 InfluxDB Python 客户端库,只需运行

pip install influxdb-client

如果您已经安装了客户端,可以使用以下命令升级它

pip3 install --upgrade influxdb-client

收集身份验证参数

为了使用客户端,您需要收集以下参数

  • Bucket 名称或 ID

请按照此文档创建 Bucket。要查看您的 Bucket,请使用 UI 或执行

influx -t <your-token> -o <your-org> bucket find
  • 令牌

请按照此文档创建令牌。要查看您的令牌,请使用 UI 或执行

influx auth find
  • 组织

查看您的组织,请使用 UI 或执行

influx org find

导入和连接

导入客户端

from influxdb_client import InfluxDBClient

建立连接

client = InfluxDBClient(url="http://localhost:9999", token=token, org=org)

写入

在本教程中,我们将探索所有与 Coyote Creek 水位相关的数据写入方法。我们的模式如下所示

Bucket:“my-bucket” Measurement:“h2o_feet” 标签键:“location” 标签值:“coyote_creek” 字段键:“water_level” 字段值:1

首先,实例化 WriteAPI

write_api = client.write_api()

WriteAPI 还支持 4 种不同的写入选项:Line Protocol String、Line Protocol Bytes、Data Point Structure 和 Dictionary Style。

写入选项一 - Line Protocol String

Line protocol 是 Influx 的摄取格式。当您已经有转换为 Line Protocol 的数据(例如 txt 文件)时,它非常有用。

write_api.write("my-bucket", "my-org", ["h2o_feet,location=coyote_creek water_level=1"])

写入选项二 - Line Protocol Bytes

这是已经使用 UTF-8 编码为字节数组的 Line Protocol。客户端在内部将 Line Protocol 字符串转换为此格式。

write_api.write("my-bucket", "my-org", ["h2o_feet,location=coyote_creek water_level=1".encode()])

写入选项三 - Data Point structure

当在客户端代码中构造 Line Protocol 时,此结构非常有用。Point() 类确保您的数据被正确序列化为 Line Protocol。

write_api.write("my-bucket", "my-org", Point("h2o_feet").tag("location", "coyote_creek").field("water_level", 1).time(1))

写入选项四 - Dictionary Style

这是最具 Python 兼容性的数据表示方式。字典在内部转换回 Point() 结构。

write_api.write("my-bucket", "my-org", [{"measurement": "h2o_feet", "tags": {"location": "coyote_creek"}, "fields": {"water_level": 1}, "time": 1}])

现在我们了解了所有可以写入数据库的方法,让我们探索如何执行这些写入。我们有一些可用的调整选项。我们可以指定同步和异步写入。我们还可以调整写入的批处理。

指定写入选项:同步写入

首先,您需要使用以下命令导入 write_option 方法

from influxdb_client.client.write_api import SYNCHRONOUS

要指定同步写入,只需使用该选项作为参数实例化 WriteAPI。

write_api = client.write_api(write_options=SYNCHRONOUS)

指定写入选项:异步写入

首先,您需要使用以下命令导入 write_option 方法

from influxdb_client.client.write_api import ASYNCHRONOUS

要指定异步写入,只需使用该选项作为参数实例化 WriteAPI。

write_api = client.write_api(write_options=ASYNCHRONOUS)

指定写入选项:批处理

WriteApi 的默认实例使用批处理。您可以指定以下批处理参数

  • batch_size:批处理中要收集的数据点数
  • flush_interval:批处理写入之前的毫秒数
  • jitter_interval:将批处理刷新间隔增加随机量的毫秒数
  • retry_interval:重试不成功写入的毫秒数。当 InfluxDB 服务器未指定“Retry-After”标头时,将使用重试间隔

请查看术语表以了解有关这些参数功能的更多信息。

批处理选项示例

write_api = client.write_api(write_options=WriteOptions(batch_size=500, flush_interval=10_000, jitter_interval=2_000, retry_interval=5_000))

查询数据库

在查询 Influx 时,您有两个选择。您可以选择使用表格结构查询,也可以选择使用流查询。您还可以在返回所需数据后中断流。但是,为了执行查询,我们必须首先实例化 QueryAPI。

首先,实例化 QueryAPI

query_api = client.query_api()

生成 Flux 查询

您必须使用 Flux 查询您的数据。如果您从未编写过 Flux 查询,我建议您探索以下资源

我们刚刚写入的数据点的 Flux 查询如下所示

query = ' from(bucket:"my-bucket")\
|> range(start: -10m)\
|> filter(fn:(r) => r._measurement == "h2o_level")\
|> filter(fn: (r) => r.location == "coyote_creek")\
|> filter(fn:(r) => r._field == "water_level" )'

然后我们可以使用表格结构将我们的数据作为以下内容返回

查询选项一 - 表格结构

result = client.query_api().query(org=my-org, query=query)

results = []
for table in result:
    for record in table.records:
        results.append((record.get_value(), record.get_field()))

print(results)
[(Water_level, 1)]

Flux 对象具有以下访问数据的方法

  • .get_measurement()
  • .get_field()
  • .values.get("<您的标签>")
  • .get_time()
  • .get_start()
  • .get_stop()
  • .get_measurement()

查询选项二 - 流

records = client.query_api.query_stream(org= "my-org", query=query)

for record in records:
    print(f'Temperature in {record["location"]} is {record["_value"]}')

Temperature in coyote_creek is 1.

使用 Python 客户端将点写入和查询到 InfluxDB 的脚本

现在我们了解了我们所有的选项,让我们看看一个完整的脚本可能是什么样子,以便使用 Python 客户端将点写入和查询到 InfluxDB。您可以在此处找到包含此脚本的存储库。

from influxdb_client import InfluxDBClient

org = "my-org"
bucket = "my-bucket"
token = $my-token
query = 'from(bucket: "my-bucket")\
|> range(start: -10m)\
|> filter(fn: (r) => r._measurement == "h2o_level")\
|> filter(fn: (r) => r._field == "water_level")\
|> filter(fn: (r) => r.location == "coyote_creek")'

#establish a connection
client = InfluxDBClient(url="http://localhost:9999", token=token, org=org)

#instantiate the WriteAPI and QueryAPI
write_api = client.write_api()
query_api = client.query_api()

#create and write the point
p = Point("h2o_level").tag("location", "coyote_creek").field("water_level", 1)
write_api.write(bucket=bucket,org=org,record=p)
#return the table and print the result
result = client.query_api().query(org=org, query=query)
results = []
for table in result:
    for record in table.records:
        results.append((record.get_value(), record.get_field()))
print(results)

我希望本教程可以帮助您开始使用 Influx。与往常一样,如果您遇到障碍,请在我们的社区站点Slack 频道上分享它们。我们很乐意获得您的反馈并帮助您解决遇到的任何问题。