InfluxDB Python 客户端库:WriteAPI 深度解析

导航至

InfluxDB 是一个开源时序数据库。专为处理来自物联网设备到企业应用程序产生的大量时间戳数据而构建。由于 InfluxDB 的数据源可能存在于许多不同的情况和场景中,因此提供不同的数据进入 InfluxDB 的方法至关重要。

InfluxDB 客户端库 是特定于语言的软件包,与 InfluxDB v2 API 集成。这些库为用户提供了发送、查询和管理 InfluxDB 的强大方法。查看此TL;DR,以获得客户端库的优秀概述。这些库以多种语言提供,包括 PythonJavaScriptGoC#Java 以及许多其他语言。

这篇文章将引导用户了解如何获取 Python 客户端库和 API 结构,并演示如何使用 Python 连接、写入和准备数据!Python 因其易学易用性而受到开发人员的广泛增长和采用。

入门

下载

InfluxDB Python 客户端库可直接从 PyPI 获取,以便使用 pip 轻松安装或作为项目中的依赖项

pip install influxdb-client

InfluxDB Python 客户端库支持 InfluxDB Cloud、InfluxDB 2.x 和 InfluxDB 1.8。它经过构建和测试,支持 Python 3.6 及更高版本。

请注意,对 InfluxDB 1.8 的支持仅限于 API 的子集,并且需要一些差异;这些将在本文后面部分指出。

软件包扩展

客户端库有意保持小尺寸和少依赖项。但是,还有其他软件包扩展可用,用户可以使用它们来拉取其他依赖项并启用一些附加功能

  • influxdb-client[ciso]:使用 ciso8601 日期时间解析器。它利用 C 绑定,从而更快地处理日期时间对象,但代价是需要使用 C 绑定。

  • influxdb-client[async]:顾名思义,如果用户的工具使用 async 和 await Python 命令,则允许使用异步请求并从中受益。

  • influxdb-client[extras]:添加了使用 Pandas DataFrames 的能力。Pandas 库是一种常用的数据分析工具。这些额外的依赖项尺寸较大且并非总是需要;因此,它作为单独的额外软件包包含在内。

API & 文档

客户端库 API 和文档可在 Read the Docs 上获取。

源代码

如果用户想要从源代码构建或使用该库,则可以在 GitHub 上获取

git clone https://github.com/influxdata/influxdb-client-python

API 概述

在高层面上,API 由客户端组成,提供对 InfluxDB 为特定实例公开的各种 API 的访问。

InfluxDBClient 用于处理身份验证参数并连接到 InfluxDB。有几种不同的方法可以指定参数,以下部分将演示这些方法。

InfluxDB 基础知识

连接后,有三个 API 处理与 InfluxDB 的基本交互

  • WriteApi:将时序数据写入 InfluxDB

  • QueryApi:使用 Flux 查询 InfluxDB,Flux 是 InfluxDB 的函数式数据脚本语言

  • DeleteApi:删除 InfluxDB 中的时序数据

任务和脚本

用户还可以使用客户端库来创建任务、可调用脚本和标签

  • TasksApi:使用任务(计划的 Flux 查询)来输入数据流,然后相应地分析、修改和处理数据

  • InvokableScriptsApi:创建自定义 InfluxDB API 端点,用于查询、处理和塑造数据。要了解有关可调用脚本如何增强用户能力,请查看此 TL;DR 以获取更多详细信息!

InfluxDB 管理

最后,用户可以通过最后一组 API 直接管理他们的实例

  • BucketsApi:创建、管理和删除存储桶

  • OrganizationApi:创建、管理和删除组织

  • UsersApi:创建、管理和删除用户

  • LabelsApi:向 InfluxDB UI 中的仪表板、任务和其他项目添加可视化元数据

另请查看 InfluxData Meet the Developer 视频,以获取有关使用这些 API 的更多指导步骤!

InfluxDBClient 设置

用户首先需要创建一个客户端才能访问各种 API。客户端需要连接信息,其中包括以下内容

  1. URL: InfluxDB 实例的 URL(例如,http://192.168.100.10:8086),其中包含主机名或 IP 地址和端口。另请注意,如果在服务器上设置了安全 HTTP,则用户将需要使用 https:// 协议。

  2. 访问令牌: 用于向 InfluxDB 验证身份的访问令牌。如果使用 InfluxDB 1.8,则使用用户名和密码代替令牌。使用 username:password 格式设置令牌参数。

  3. Org: 令牌有权访问的组织。在 InfluxDB 1.8 中,没有组织的概念。org 参数将被忽略,可以留空。

以上连接信息可以通过文件、环境或代码指定。

通过配置文件

用户可以使用配置文件指定令牌,而不是在代码中硬编码令牌,并限制用户对配置文件的访问。

该文件可以使用 tomlini 格式。以下是两者的示例

# TOML-based config
[influx2]
url = "http://localhost:8086"
org = "my-org"
token = "my-token"
; ini-based config
[influx2]
url = http://localhost:8086
org = my-org
token = my-token

用户还可以指定其他配置详细信息,例如超时、代理设置和应用于数据的全局标签。查看整个配置设置列表,包括新数据的默认标签。

然后在代码中,用户可以加载文件并创建一个客户端,如下所示

from influxdb_client import InfluxDBClient

with InfluxDBClient.from_config_file("config.toml") as client:
    # use the client to access the necessary APIs
    # for example, write data using the write_api
    with client.write_api() as writer:
        writer.write(bucket="testing", record="sensor temp=23.3")

通过环境变量

用户可以导出或设置以下任何环境变量

INFLUXDB_V2_URL="http://localhost:8086"
INFLUXDB_V2_ORG="my-org"
INFLUXDB_V2_TOKEN="my-token"

请参阅文档以获取 已识别环境变量 的完整列表,包括设置新数据的默认标签。

然后在代码中,用户可以创建一个客户端,如下所示

from influxdb_client import InfluxDBClient

with InfluxDBClient.from_env_properties() as client:
    # use the client to access the necessary APIs
    # for example, write data using the write_api
    with client.write_api() as writer:
        writer.write(bucket="testing", record="sensor temp=23.3")

通过代码

客户端库用户还可以在代码中提供必要的信息。不建议使用此方法,因为它会导致硬编码的令牌存在于代码中。 虽然它很容易上手,但将凭据放在配置文件中是首选选项。

from influxdb_client import InfluxDBClient

url = "http://localhost:8086"
token = "my-token"
org = "my-org"

with InfluxDBClient(url, token) as client:
    # use the client to access the necessary APIs
    # for example, write data using the write_api
    with client.write_api() as writer:
        writer.write(bucket="testing", org=org, record="sensor temp=23.3")

请注意,使用文件或环境变量设置的配置指定了一个组织。该组织是查询、写入和删除 API 的默认组织。用户还可以在进行查询、写入或删除时指定不同的组织来覆盖设置值。

文档列出了创建客户端时 可能的其他参数

使用 WriteApi 写入数据

创建客户端后,用户就可以使用各种 API 了。以下将演示写入查询 API 以将数据发送到 InfluxDB。

批处理

默认情况下,客户端将尝试每秒以 1,000 个批次发送数据

from influxdb_client import InfluxDBClient

with InfluxDBClient.from_config_file("config.toml") as client:
    with client.write_api() as writer:
        writer.write(bucket="testing", record="sensor temp=23.3")

如果遇到错误,客户端会在五秒后重试,并对其他错误使用指数退避,重试之间最多 125 秒。重试尝试五次或最多等待 180 秒,以先发生者为准。

用户可以通过在创建 write_api 对象时设置 write_options 值来自由修改这些设置。基于时间的选项以毫秒为单位。

from influxdb_client import InfluxDBClient, WriteOptions

options = WriteOptions(
    batch_size=500,
    flush_interval=10_000,
    jitter_interval=2_000,
    retry_interval=5_000,
    max_retries=5,
    max_retry_delay=30_000,
    exponential_base=2
)

with InfluxDBClient.from_config_file("config.toml") as client:
    with client.write_api(write_options=options) as writer:
        writer.write(bucket="testing", record="sensor temp=23.3")

同步

虽然这不是写入数据的默认方法,但同步写入是建议的数据写入方法。此方法使捕获错误并对其做出响应变得更容易。此外,用户仍然可以将他们的数据分解为批次,无论是手动分解还是使用像 Rx 这样的库来获得与批处理写入类似的行为。

from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS

with InfluxDBClient.from_config_file("config.toml") as client:
    with client.write_api(write_options=SYNCHRONOUS) as writer:
        writer.write(bucket="testing", record="sensor temp=23.3")

异步

如果用户不希望在将数据发送到 InfluxDB 时阻止其应用程序,则可以使用异步客户端和写入 API。请记住,使用异步需要使用 influxdb-client[async] 软件包扩展和特殊的异步客户端,以及对不同 API 的访问

import asyncio

from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync

async def main():
    async with InfluxDBClientAsync(
        url="http://localhost:8086", token="my-token", org="my-org"
    ) as client:
        await client.write_api().write(bucket="my-bucket", record="sensor temp=23.3")

if __name__ == "__main__":
    asyncio.run(main())

准备数据的不同方法

InfluxDB 使用 行协议格式,该格式由测量名称和字段以及可选的标签和时间戳组成。客户端库允许以几种不同的方式指定数据,用户可以使用最适合导入数据格式的选项!

字符串

第一个选项是包含行协议格式的字符串。如果用户直接从文件读取 Influx 行协议,或者想要在使用 Python 数据时构建数据字符串,则此选项演示了一种选项。

records = """
cpu,core=0 temp=25.3 1657729063
cpu,core=0 temp=25.4 1657729078
cpu,core=0 temp=25.2 1657729093
"""

换行符必须分隔行协议中的每个条目。没有 \n 的同一行上的条目将导致解析数据时出错。

字典

第二个选项使用字典,该字典指定行协议格式的各个部分。对于正在解析文件并同时构建其数据点的用户来说,此选项可能是最佳选择。

records = [
    {
 "measurement": "cpu",
    	 "tags": {"core": "0"},
    	 "fields": {"temp": 25.3},
    	 "time": 1657729063
    },
    {
    	 "measurement": "cpu",
    	 "tags": {"core": "0"},
    	 "fields": {"temp": 25.4},
    	 "time": 1657729078
    },
    {
 "measurement": "cpu",
    	 "tags": {"core": "0"},
    	 "fields": {"temp": 25.2},
    	 "time": 1657729093
    },
]

Point 辅助类

客户端库有一个 Point 类,允许用户轻松构建测量。此类帮助用户将数据格式化为行协议的各个部分,从而确保正确序列化的数据。标签和字段是可重复的,允许一次添加多个标签和字段。

from influxdb_client import Point

records = [
	Point("cpu").tag("core", "0").field("temp", 25.3).time(1657729063),
	Point("cpu").tag("core", "0").field("temp", 25.4).time(1657729078),
	Point("cpu").tag("core", "0").field("temp", 25.2).time(1657729093),
]

Pandas DataFrame

当安装了 influxdb-client-python[extras] 扩展软件包时,用户可以直接传入 Pandas DataFrames。用户可以直接传入数据帧,并指定要用作标签的列和测量名称。

import pandas as pd

from influxdb_client import InfluxDBClient

records = pd.DataFrame(
    data=[
        ["0", 25.3, 1657729063],
    	  ["0", 25.4, 1657729078],
    	  ["0", 25.2, 1657729093],
    ],
    columns=["core", "temp", "timestamp"],
)

with InfluxDBClient.from_config_file("config.toml") as client:
    with client.write_api() as writer:
        writer.write(
        	bucket="testing",
        	record=records,
        	data_frame_measurement_name="cpu",
        	data_frame_tag_columns=["core"],
        )

请注意,创建 Pandas DataFrames 的方法有很多,这只是一个示例。有关更多示例,请参阅 Pandas DataFrame 文档

数据类

利用 Python 数据类 的用户可以直接传入它们,然后在传递数据时指定要用于标签、字段和时间戳的属性。数据类最初在 Python 3.7 中通过 PEP 557 提供。

from dataclasses import dataclass

from influxdb_client import InfluxDBClient

@dataclass
class CPU:
    core: str
    temp: float
    timestamp: int

records = [
    CPU("0", 25.3, 1657729063),
    CPU("0", 25.4, 1657729078),
    CPU("0", 25.2, 1657729093),
]

with InfluxDBClient.from_config_file("config.toml") as client:
    with client.write_api() as writer:
        writer.write(
            bucket="testing",
            record=records,
            record_measurement_name="cpu",
            record_tag_keys=["core"],
            record_field_keys=["temp"],
            record_time_key="timestamp",
    	  )

命名元组

最后,命名元组 为元组中的每个位置分配含义,从而允许代码更具可读性、自我记录性。用户可以直接传入命名元组,然后指定应将哪个元组字段名称用作标签、字段和时间戳。

from collections import namedtuple

from influxdb_client import InfluxDBClient

class CPU:
	def __init__(self, core, temp, timestamp):
    	self.core = core
    	self.temp = temp
    	self.timestamp = timestamp

record = namedtuple("CPU", ["core", "temp", "timestamp"])

records = [
	record("0", 25.3, 1657729063),
	record("0", 25.4, 1657729078),
	record("0", 25.2, 1657729093),
]

with InfluxDBClient.from_config_file("config.toml") as client:
	with client.write_api() as writer:
    	writer.write(
        	bucket="testing",
        	record=records,
        	record_measurement_name="cpu",
        	record_tag_keys=["core"],
        	record_field_keys=["temp"],
        	record_time_key="timestamp",
    	)

立即查看 Python 客户端库

这篇文章展示了 Python InfluxDB 客户端 库是多么快速、简单和灵活易用。虽然上面只演示了写入 API,但它开始演示用户在与 InfluxDB 交互时可以拥有的强大功能。结合其他 API,用户拥有更多选择和潜力。

考虑一下您可以在哪里使用 InfluxDB客户端库,并立即试用一下!