InfluxDB Python客户端库:深入了解WriteAPI

导航到

InfluxDB是一个开源的时间序列数据库。它被构建来处理从物联网设备到企业应用产生的海量时间戳数据。由于InfluxDB的数据来源可以存在于许多不同的情况和场景中,因此提供不同的方法将数据导入InfluxDB是至关重要的。

InfluxDB的客户端库是与InfluxDB v2 API集成的特定语言的包。这些库为用户提供了一种强大的方法来发送、查询和管理InfluxDB。查看这个TL;DR,了解客户端库的概述。这些库支持多种语言,包括PythonJavaScriptGoC#Java等。

本文将引导用户获取Python客户端库和API结构,并演示如何使用Python进行连接、写入和准备数据!由于Python易于学习和使用,因此它受到了开发者的广泛欢迎和采用。

入门

下载

InfluxDB Python客户端库可以直接从PyPI下载,使用pip进行轻松安装,或在项目中作为依赖项

pip install influxdb-client

InfluxDB Python客户端库支持InfluxDB Cloud、InfluxDB 2.x和InfluxDB 1.8。它被构建和测试以支持Python 3.6及更高版本。

请注意,InfluxDB 1.8的支持仅限于API的子集,并需要一些差异;这些差异将在本文中进一步说明。

包额外内容

客户端库有意保持小巧的大小和依赖性。但是,用户可以使用额外的包额外内容来引入其他依赖项并启用一些附加功能

  • influxdb-client[ciso]:使用ciso8601日期时间解析器。它利用C绑定,这导致在处理日期时间对象时速度更快,但需要使用C绑定。

  • influxdb-client[async]:顾名思义,它允许在用户工具使用async和await Python命令时使用和受益于客户端库的异步请求。

  • influxdb-client[extras]:添加了使用Pandas 数据框的能力。Pandas库是一个常用的数据分析工具。这些额外的依赖项很大,并不总是需要;因此,它被包括为单独的额外包。

API与文档

客户端库API和文档可在Read the Docs上找到。

源代码

如果用户想要从源代码构建或使用库,它可在GitHub上找到。

git clone https://github.com/influxdata/influxdb-client-python

API概述

在较高层次上,API由一个客户端组成,提供对InfluxDB特定实例公开的各种API的访问。

使用 InfluxDBClient 来处理身份验证参数并连接到 InfluxDB。可以通过多种方式指定参数,下文将演示这些方法。

InfluxDB 基础知识

连接后,有三个 API 处理与 InfluxDB 的基本交互

  • WriteApi:将时序数据写入 InfluxDB

  • QueryApi:使用 Flux(InfluxDB 的函数式数据处理脚本语言)查询 InfluxDB

  • DeleteApi:从 InfluxDB 删除时序数据

任务与脚本

用户还可以使用客户端库来创建任务、可调用的脚本和标签

  • TasksApi:使用任务(计划好的 Flux 查询)输入数据流,然后根据需要进行数据分析和修改

  • InvokableScriptsApi:创建自定义 InfluxDB API 端点,用于查询、处理和塑造数据。想了解更多关于可调用脚本如何增强用户功能的信息,请查看这个 TL;DR 以获取更多细节!

InfluxDB 管理

最后,用户可以通过最后的 API 集直接管理他们的实例

  • BucketsApi:创建、管理、删除存储桶

  • OrganizationApi:创建、管理、删除组织

  • UsersApi:创建、管理、删除用户

  • LabelsApi:向 InfluxDB UI 中的仪表板、任务和其他项目添加视觉元数据

还可以查看 InfluxData Meet the Developer 视频,以获取更多关于使用这些 API 的指导步骤!

InfluxDBClient 配置

用户首先需要创建一个客户端以获取访问各种 API 的权限。客户端需要连接信息,包括以下内容

  1. URL: InfluxDB 实例的 URL(例如,http://192.168.100.10:8086),包括主机名或 IP 地址和端口。请注意,如果服务器上设置了安全 HTTP,则用户需要使用 https:// 协议。

  2. 访问令牌: 认证 InfluxDB 的访问令牌。如果使用 InfluxDB 1.8,则使用用户名和密码而不是令牌。使用 username:password 格式设置令牌参数。

  3. Org: 令牌可以访问的组织。在 InfluxDB 1.8 中,没有组织的概念。org 参数被忽略,可以留空。

上述连接信息可以通过文件、环境或代码进行指定。

通过配置文件

用户可以使用配置文件指定令牌,而不是在代码中硬编码令牌,并限制用户对配置文件的访问。

文件可以使用 tomlini 格式。下面是两种格式的示例

# TOML-based config
[influx2]
url = "https://127.0.0.1:8086"
org = "my-org"
token = "my-token"
; ini-based config
[influx2]
url = https://127.0.0.1:8086
org = my-org
token = my-token

用户还可以指定其他配置细节,如超时、代理设置和应用于数据的全局标签。查看完整的配置设置列表,包括新数据的默认标签。

然后在代码中,用户可以加载文件并创建客户端,如下所示

from influxdb_client import InfluxDBClient

with InfluxDBClient.from_config_file("config.toml") as client:
    # use the client to access the necessary APIs
    # for example, write data using the write_api
    with client.write_api() as writer:
        writer.write(bucket="testing", record="sensor temp=23.3")

通过环境变量

用户可以导出或设置以下任一环境变量

INFLUXDB_V2_URL="https://127.0.0.1:8086"
INFLUXDB_V2_ORG="my-org"
INFLUXDB_V2_TOKEN="my-token"

请参阅文档以获取完整的环境变量列表,包括为新数据设置默认标签的列表:受认可的环境变量

然后在代码中,用户可以创建客户端如下:

from influxdb_client import InfluxDBClient

with InfluxDBClient.from_env_properties() as client:
    # use the client to access the necessary APIs
    # for example, write data using the write_api
    with client.write_api() as writer:
        writer.write(bucket="testing", record="sensor temp=23.3")

通过代码

客户端库用户也可以在代码中提供必要的信息。此方法不建议使用,因为它会导致代码中存在硬编码的令牌。虽然开始使用很简单,但将凭据放在配置文件中是首选选项。

from influxdb_client import InfluxDBClient

url = "https://127.0.0.1:8086"
token = "my-token"
org = "my-org"

with InfluxDBClient(url, token) as client:
    # use the client to access the necessary APIs
    # for example, write data using the write_api
    with client.write_api() as writer:
        writer.write(bucket="testing", org=org, record="sensor temp=23.3")

请注意,使用文件或环境变量指定的配置设置了组织。该组织是查询、写入和删除API的默认组织。用户也可以指定不同的组织以覆盖查询、写入或删除时设置的值。

文档列出了创建客户端时可能的额外参数

使用WriteApi写入数据

创建客户端后,用户可以使用各种API。以下将演示写入查询API将数据发送到InfluxDB。

批量处理

默认情况下,客户端将尝试每秒发送1,000条数据的批量

from influxdb_client import InfluxDBClient

with InfluxDBClient.from_config_file("config.toml") as client:
    with client.write_api() as writer:
        writer.write(bucket="testing", record="sensor temp=23.3")

如果遇到错误,客户端将在五秒后重试,并在重试之间使用指数退避策略,最长可达125秒。重试尝试五次或等待180秒,以先到者为准。

用户可以在创建写入_api对象时通过设置write_options值来修改这些设置。基于时间的选项以毫秒为单位。

from influxdb_client import InfluxDBClient, WriteOptions

options = WriteOptions(
    batch_size=500,
    flush_interval=10_000,
    jitter_interval=2_000,
    retry_interval=5_000,
    max_retries=5,
    max_retry_delay=30_000,
    exponential_base=2
)

with InfluxDBClient.from_config_file("config.toml") as client:
    with client.write_api(write_options=options) as writer:
        writer.write(bucket="testing", record="sensor temp=23.3")

同步

虽然这不是写入数据的默认方法,但同步写入是建议的写入方法。这种方法更容易捕获错误并作出响应。此外,用户仍然可以手动或使用Rx等库将数据分成批量,以获得与批量写入类似的行为。

from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS

with InfluxDBClient.from_config_file("config.toml") as client:
    with client.write_api(write_options=SYNCHRONOUS) as writer:
        writer.write(bucket="testing", record="sensor temp=23.3")

异步

如果用户不希望应用程序在将数据发送到InfluxDB时被阻塞,则可以使用异步客户端和写入API。请注意,使用异步需要包含在influxdb-client[async]包中的额外依赖项以及特殊的异步客户端,该客户端可以访问不同的API。

import asyncio

from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync

async def main():
    async with InfluxDBClientAsync(
        url="https://127.0.0.1:8086", token="my-token", org="my-org"
    ) as client:
        await client.write_api().write(bucket="my-bucket", record="sensor temp=23.3")

if __name__ == "__main__":
    asyncio.run(main())

准备数据的不同方法

InfluxDB使用由测量名称、字段以及可选的标签和时间戳组成的行协议格式。客户端库允许以几种不同的方式指定数据,用户可以使用最适合导入数据格式的选项!

字符串

第一个选项是包含行协议格式的字符串。这演示了如果用户直接从文件读取influx行协议或在使用Python数据时构建数据字符串的选项之一。

records = """
cpu,core=0 temp=25.3 1657729063
cpu,core=0 temp=25.4 1657729078
cpu,core=0 temp=25.2 1657729093
"""

新行字符必须分隔行协议中的每个条目。如果条目没有使用\n分隔而位于同一行,则解析数据时将导致错误。

字典

第二个选项使用字典,该字典指定了行协议格式的各个部分。此选项可能最适合同时解析文件并构建数据点的用户。

records = [
    {
 "measurement": "cpu",
    	 "tags": {"core": "0"},
    	 "fields": {"temp": 25.3},
    	 "time": 1657729063
    },
    {
    	 "measurement": "cpu",
    	 "tags": {"core": "0"},
    	 "fields": {"temp": 25.4},
    	 "time": 1657729078
    },
    {
 "measurement": "cpu",
    	 "tags": {"core": "0"},
    	 "fields": {"temp": 25.2},
    	 "time": 1657729093
    },
]

点辅助类

客户端库有一个Point类,允许用户轻松地构建测量值。此类帮助用户将数据格式化为行协议的各个部分,确保正确序列化数据。标签和字段是可重复的,允许一次性添加许多标签和字段。

from influxdb_client import Point

records = [
	Point("cpu").tag("core", "0").field("temp", 25.3).time(1657729063),
	Point("cpu").tag("core", "0").field("temp", 25.4).time(1657729078),
	Point("cpu").tag("core", "0").field("temp", 25.2).time(1657729093),
]

Pandas DataFrame

如果安装了influxdb-client-python[extras]扩展包,则可以直接传递Pandas DataFrame。用户可以直接传递一个数据框并指定用作标签的列和测量名称。

import pandas as pd

from influxdb_client import InfluxDBClient

records = pd.DataFrame(
    data=[
        ["0", 25.3, 1657729063],
    	  ["0", 25.4, 1657729078],
    	  ["0", 25.2, 1657729093],
    ],
    columns=["core", "temp", "timestamp"],
)

with InfluxDBClient.from_config_file("config.toml") as client:
    with client.write_api() as writer:
        writer.write(
        	bucket="testing",
        	record=records,
        	data_frame_measurement_name="cpu",
        	data_frame_tag_columns=["core"],
        )

请注意,创建Pandas DataFrame的方法有很多,这只是其中一种示例。请参考Pandas的DataFrame文档获取更多示例。

数据类

利用Python的数据类的用户可以直接传入数据类,然后指定在传入数据时使用哪些属性作为标签、字段和时间戳。数据类首次在Python 3.7中通过PEP 557提供。

from dataclasses import dataclass

from influxdb_client import InfluxDBClient

@dataclass
class CPU:
    core: str
    temp: float
    timestamp: int

records = [
    CPU("0", 25.3, 1657729063),
    CPU("0", 25.4, 1657729078),
    CPU("0", 25.2, 1657729093),
]

with InfluxDBClient.from_config_file("config.toml") as client:
    with client.write_api() as writer:
        writer.write(
            bucket="testing",
            record=records,
            record_measurement_name="cpu",
            record_tag_keys=["core"],
            record_field_keys=["temp"],
            record_time_key="timestamp",
    	  )

命名元组

最后,命名元组为元组中的每个位置赋予了意义,使代码更易于阅读,具有自文档特性。用户可以直接传入命名元组,然后指定哪个元组字段名应用作标签、字段和时间戳。

from collections import namedtuple

from influxdb_client import InfluxDBClient

class CPU:
	def __init__(self, core, temp, timestamp):
    	self.core = core
    	self.temp = temp
    	self.timestamp = timestamp

record = namedtuple("CPU", ["core", "temp", "timestamp"])

records = [
	record("0", 25.3, 1657729063),
	record("0", 25.4, 1657729078),
	record("0", 25.2, 1657729093),
]

with InfluxDBClient.from_config_file("config.toml") as client:
	with client.write_api() as writer:
    	writer.write(
        	bucket="testing",
        	record=records,
        	record_measurement_name="cpu",
        	record_tag_keys=["core"],
        	record_field_keys=["temp"],
        	record_time_key="timestamp",
    	)

今天就来了解一下Python客户端库

本篇帖子展示了Python InfluxDB客户端库的使用是多么快速、简单和灵活。虽然上述内容仅演示了写入API,但它开始展示了用户与InfluxDB交互时的强大功能。结合其他API,用户拥有更多选项和潜力。

考虑您可能在哪里可以使用InfluxDB客户端库,并今天尝试一下!