使用Influx CLI和Python、Java客户端库将CSV数据导入InfluxDB

导航到

随着数十亿设备和应用程序每纳秒产生时间序列数据,InfluxDB是存储和分析这些数据的主要方式。面对各种数据源,InfluxDB为用户提供了多种将数据导入InfluxDB的方式。其中最常见的数据格式是CSV,即逗号分隔值。

本博客文章展示了如何使用InfluxDB CLI和InfluxDB客户端库将CSV数据转换为行协议,并发送到InfluxDB。

什么是CSV和行协议?

CSV是用逗号分隔的值的数据。每行是一个不同的记录,记录由一个或多个字段组成。CSV数据在历史上用于导出数据或记录随时间推移的事件,这使得它非常适合像InfluxDB这样的时间序列数据库。

以下是CSV数据的基本示例。它以一个标题开始,提供了每列的名称,然后是每条记录在单独的行中

name,building,temperature,humidity,time
iot-devices,5a,72.3,34.1,2022-10-01T12:01:00Z
iot-devices,5a,72.1,33.8,2022-10-02T12:01:00Z
iot-devices,5a,72.2,33.7,2022-10-03T12:01:00Z

要将这些数据导入InfluxDB,以下工具展示了如何将这些行转换为行协议。行协议由以下项目组成

  • 测量名称:要存储和稍后查询的数据名称

  • 字段:将被查询的实际数据

  • 标签:可选的字符串字段,用于索引和帮助过滤数据

  • 时间戳:可选,但在CSV数据中很常见,用于指定数据记录的收集或产生时间

使用上面的CSV示例,目标状态是将数据转换为以下行协议

iot-devices,building=5a temperature=72.3,humidity=34.1 1664647260000000000
iot-devices,building=5a temperature=72.1,humidity=33.8 1664733660000000000
iot-devices,building=5a temperature=72.2,humidity=33.7 1664820060000000000

在上面的示例中,iot-devices是测量名称,而building是一个标签。温度和湿度值是我们的字段。最后,时间戳以纳秒精度的UNIX时间戳保存。

Influx CLI

Influx CLI工具提供用于管理和与InfluxDB交互的命令。使用此工具,用户可以设置、配置和与InfluxDB的许多功能交互。从设置新桶和org到查询数据,甚至推送数据,CLI都可以做到。

用户可以使用的一个子命令是write命令。Write允许用户从带注释的CSV直接将数据注入InfluxDB。

CSV注释

注释可以是CSV文件本身的属性,也可以作为CLI选项提供。它们描述了如何将每个列转换为测量名称、标签、字段或时间戳。

以下是如何将注释添加到我们的示例数据到文件中的示例

#datatype measurement,tag,double,double,dateTime:RFC3339
name,building,temperature,humidity,time
iot-devices,5a,72.3,34.1,2022-10-01T12:01:00Z
iot-devices,5a,72.1,33.8,2022-10-02T12:01:00Z
iot-devices,5a,72.2,33.7,2022-10-03T12:01:00Z

本示例中的数据类型指定如下

  • measurement:指定要使用哪一列作为测量名称。如果没有列存在,也可以通过CLI指定为标题。

  • tag:指定哪些列或列被处理为字符串标签数据。这些是可选的,但有助于在InfluxDB中查询和索引数据。

  • double:用于两个列,指定它们包含双精度浮点数数据类型。

  • dateTime:指定最后一列包含记录的时间戳,并进一步说明使用的格式是RFC3339。

用户还可以指定字段的附加数据类型

  • double

  • long

  • unsignedLong

  • boolean

  • string

  • ignored:如果某一列无意义或不需要,可以使用它,这样就不会将该列包括在最终数据中。

最后,对于时间戳,有内置的解析能力,包括

  • RFC3339(例如2020-01-01T00:00:00Z

  • RFC3339Nano(例如2020-01-01T00:00:00.000000000Z

  • Unix时间戳(例如1577836800000000000)。

如果时间戳不是这些格式之一,那么用户需要自行指定时间戳的格式(例如dateTime:2006-01-02),作为注释的一部分,使用Go参考时间

CLI示例

一旦存在注释,就可以使用CLI将数据发送到InfluxDB。下面是发送包含在文件中的数据的示例

influx write --bucket bucketName --format=csv --file /path/to/data.csv

如果CSV本身没有注释,则可以在CLI命令中添加它们

influx write --bucket bucketName --format=csv --file /path/to/data.csv \
    --header “#datatype measurement,tag,double,double,dateTime:RFC3339”

最后,如果CSV文件没有与测量名称相关的列,也可以将其包括在标题中

influx write --bucket bucketName --format=csv --file /path/to/data.csv \
    --header “#constant measurement,iot-devices”
    --header “#datatype tag,double,double,dateTime:RFC3339”

要开始使用Influx CLI工具,请访问文档网站,用户可以找到安装和开始使用它的步骤。查看将CSV数据写入InfluxDB的文档以获取更多详细信息和方法。这包括带有跳过标题行、不同编码和错误处理的示例。此外,请参阅这篇之前的博客文章,了解注释CSV以及如何使用Flux查询直接写入数据。

Influx CLI提供了一个简单快捷的入门方式,但如果有用户文件很大、未注释或需要在推送到InfluxDB之前进行一些预处理,该怎么办?在这些情况下,用户应该查看InfluxDB客户端库。

InfluxDB客户端库

InfluxDB客户端库提供了特定于语言的包,以快速与InfluxDB v2 API交互。这允许用户使用他们选择的编程语言快速、轻松地创建、处理和打包数据,然后将其发送到InfluxDB。库在许多语言中可用,包括PythonJavaScript、Go、C#、Java和其他许多语言。

以下提供了使用Python和Java解析CSV数据并将数据发送到InfluxDB的两个示例。

Python + Pandas

Python 编程语言让很多人能够轻松学习和开始编程。《Pandas》是一个 Python 数据分析库,它可以快速、高效地处理和分析数据。这两个工具结合使用,可以轻松地将数据发送到 InfluxDB,使用 InfluxDB 客户端库。

如果用户有一个非常大的 CSV 文件或多个文件需要推送到 InfluxDB,Pandas 提供了一种快速读取带有标题的 CSV 文件的方法。结合 InfluxDB 客户端库内置的功能来写入 Pandas DataFrames,用户可以分块读取 CSV 文件,然后将这些块发送到 InfluxDB。

以下示例中,用户正在读取一个包含数千行 VIX 股票数据的 CSV 文件。

symbol,open,high,low,close,timestamp
vix,13.290000,13.910000,13.290000,13.570000,135935640000000000
vix,13.870000,13.880000,13.040000,13.310000,135944280000000000
vix,13.640000,14.330000,13.600000,14.320000,135952920000000000

为了避免将整个文件读入内存,用户可以利用 Pandas 的 read_csv 函数,该函数将根据 CSV 标题读取列名并将文件分成每 1,000 行的块。最后,使用 InfluxDB 客户端库将那些 1,000 行的组发送到 InfluxDB,在指定度量、标签和时间戳列之后。

from influxdb_client import InfluxDBClient, WriteOptions
import pandas as pd

with InfluxDBClient.from_env_properties() as client:
    for df in pd.read_csv("vix.csv", chunksize=1_000):
        with client.write_api() as write_api:
            try:
                write_api.write(
                    record=df,
                    bucket="my-bucket",
                    data_frame_measurement_name="stocks",
                    data_frame_tag_columns=["symbol"],
                    data_frame_timestamp_column="date",
                )
            except InfluxDBError as e:
                print(e)

Java

Java 编程语言在从 Android 设备到企业应用的各种来源中得到广泛应用。Java 用户可以使用 opencsv 来快速开始 CSV 数据解析。

此示例使用了一个简单的 Java 对象(简称 POJO)以及注解来告诉 opencsv 哪些 CSV 列属于哪些对象变量,以及 InfluxDB 客户端库对于类来说测量名称应该是什么,以及哪些变量应该成为标签、字段或时间戳。

@Measurement(name = "stock")
public class StockData {
    @Column(tag = true)
    @CsvBindByName(column = "symbol")
    private String symbol;

    @Column
    @CsvBindByName(column = "open")
    private String open;

    @Column
    @CsvBindByName(column = "high")
    private String high;

    @Column
    @CsvBindByName(column = "low")
    private String low;

    @Column
    @CsvBindByName(column = "close")
    private String close;

    @Column(timestamp = true)
    @CsvBindByName(column = "timestamp")
    private String timestamp;
}

然后用户可以遍历 CSV 文件并为每一行创建一个 StockData 对象。如果需要,这个变量可以在发送到 InfluxDB 之前进行操作。

InfluxDBClient influxDBClient = InfluxDBClientFactory.create();
WriteApi writeApi = influxDBClient.getWriteApi();

FileReader reader = new FileReader("vix.csv");
CsvToBean"StockData" csvToBean = new CsvToBeanBuilder(reader)
	.withType(StockData.class)
	.build();

Iterator"StockData" stockIterator = csvToBean.iterator();
while (stockIterator.hasNext()) {
	StockData data = stockIterator.next();
	writeApi.writeMeasurement(WritePrecision.S, data);
}

influxDBClient.close();

今天查看 InfluxDB CLI 和客户端库

本文展示了 InfluxDB 客户端库使用起来多么快捷、方便和灵活。虽然上面的示例只展示了 CSV 数据,但它开始展示了用户向 InfluxDB 发送数据时可以拥有的巨大力量。结合其他 API,用户有更多的选项和潜力。

考虑您可能在哪里可以使用 InfluxDB、Influx CLI 以及 客户端库,并今天尝试它们!