使用 Influx CLI 以及 Python 和 Java 客户端库将 CSV 数据导入 InfluxDB

导航至

随着数十亿的设备和应用程序每纳秒产生时间序列数据,InfluxDB 是存储和分析这些数据的领先方式。凭借数据源的巨大多样性,InfluxDB 为用户提供了多种将数据导入 InfluxDB 的方法。这种数据最常见的数据格式之一是 CSV,即逗号分隔值。

这篇博文演示了如何获取 CSV 数据,将其转换为行协议,并使用 InfluxDB CLI 和 InfluxDB 客户端库将其发送到 InfluxDB。

什么是 CSV 和行协议?

CSV 是由逗号分隔值的数据。每一行都是不同的记录,其中记录由一个或多个字段组成。CSV 数据在历史上用于导出数据或记录随时间发生的事件,这使其非常适合像 InfluxDB 这样的时间序列数据库。

这是一个 CSV 数据的基本示例。它以提供每列名称的标题开头,然后是每行中的每个记录

name,building,temperature,humidity,time
iot-devices,5a,72.3,34.1,2022-10-01T12:01:00Z
iot-devices,5a,72.1,33.8,2022-10-02T12:01:00Z
iot-devices,5a,72.2,33.7,2022-10-03T12:01:00Z

为了将这些数据导入 InfluxDB,以下工具展示了如何将这些行转换为行协议。行协议包括以下项

  • 测量名称:要存储和稍后查询的数据的名称

  • 字段:将要查询的实际数据

  • 标签:可选的字符串字段,用于索引和帮助过滤数据

  • 时间戳:可选,但在 CSV 数据中非常常见,用于指定数据记录何时收集或生成

使用上面的 CSV 示例,目标状态是将数据转换为类似以下行协议的内容

iot-devices,building=5a temperature=72.3,humidity=34.1 1664647260000000000
iot-devices,building=5a temperature=72.1,humidity=33.8 1664733660000000000
iot-devices,building=5a temperature=72.2,humidity=33.7 1664820060000000000

在上面,iot-devices 是测量名称,building 是标签。温度和湿度值是我们的字段。最后,时间戳以纳秒精度 UNIX 时间戳保存。

Influx CLI

Influx CLI 工具提供了管理和与 InfluxDB 交互的命令。使用此工具,用户可以设置、配置和与 InfluxDB 的许多功能进行交互。从设置新的存储桶和组织到查询数据,甚至推送数据,CLI 都可以做到。

用户可以使用的子命令之一是 write 命令。Write 允许用户直接从 带注释的 CSV 将数据注入到 InfluxDB 中。

CSV 注释

注释,无论是在 CSV 文件本身中还是作为 CLI 选项提供,都是 CSV 文件中列的属性。它们描述了如何将每列转换为测量名称、标签、字段或时间戳。

以下演示了如何将注释添加到我们的示例数据文件中

#datatype measurement,tag,double,double,dateTime:RFC3339
name,building,temperature,humidity,time
iot-devices,5a,72.3,34.1,2022-10-01T12:01:00Z
iot-devices,5a,72.1,33.8,2022-10-02T12:01:00Z
iot-devices,5a,72.2,33.7,2022-10-03T12:01:00Z

本示例中的数据类型指定如下

  • measurement:指示要用作测量名称的列。如果不存在列,也可以通过 CLI 将其指定为标头。

  • tag:指定要作为字符串标签数据处理的列。这些是可选的,但有助于查询和索引 InfluxDB 中的数据。

  • double:用于两列以指定它们包含双精度数据类型。

  • dateTime:指定最后一列包含记录的时间戳,并进一步说明使用的格式是 RFC3339。

用户还可以为字段指定其他数据类型

  • double

  • long

  • unsignedLong

  • boolean

  • string

  • ignored:如果列无用或不需要,则使用此选项,这将不会将该列包含在最终数据中

最后,对于时间戳,内置了以下格式的解析功能

  • RFC3339(例如 2020-01-01T00:00:00Z

  • RFC3339Nano(例如 2020-01-01T00:00:00.000000000Z

  • Unix 时间戳(例如 1577836800000000000)。

如果时间戳不是这些格式之一,则用户需要使用 Go 参考时间 将时间戳的格式指定为注释的一部分(例如 dateTime:2006-01-02)。

CLI 示例

注释存在后,就可以使用 CLI 将数据发送到 InfluxDB。以下是发送文件中包含的数据的示例

influx write --bucket bucketName --format=csv --file /path/to/data.csv

如果 CSV 本身没有注释,则用户可以将它们作为 CLI 命令的一部分添加

influx write --bucket bucketName --format=csv --file /path/to/data.csv \
    --header “#datatype measurement,tag,double,double,dateTime:RFC3339”

最后,如果 CSV 文件中没有与测量名称相关的列,也可以将其作为标头包含在内

influx write --bucket bucketName --format=csv --file /path/to/data.csv \
    --header “#constant measurement,iot-devices”
    --header “#datatype tag,double,double,dateTime:RFC3339”

要开始使用 Influx CLI 工具,请访问文档站点,用户可以在其中找到安装和入门步骤。查看将 CSV 数据写入 InfluxDB文档以获取更多详细信息和示例。这包括带有跳过标头行、不同编码和错误处理的示例。此外,请参阅这篇之前的博文,以了解有关带注释的 CSV 以及如何使用 Flux 查询直接写入数据的更多信息。

Influx CLI 提供了一种简单快速的入门方法,但如果用户的文件更大、未注释或需要在推送到 InfluxDB 之前对其进行一些预处理,该怎么办?在这些情况下,用户应该查看 InfluxDB 客户端库。

InfluxDB 客户端库

InfluxDB 客户端库提供了特定于语言的软件包,可以快速与 InfluxDB v2 API 交互。这允许用户使用他们选择的编程语言快速轻松地创建、处理和打包数据,然后将其发送到 InfluxDB。这些库以多种语言提供,包括 PythonJavaScript、Go、C#、Java 和许多其他语言。

以下提供了两个使用 Python 和 Java 解析 CSV 数据,然后将该数据发送到 InfluxDB 的示例。

Python + Pandas

Python 编程语言使许多人能够轻松学习和开始编程。Pandas 是一个 Python 数据分析库,它是一个快速而强大的数据分析和操作工具。两者结合在一起,构成了一个强大的组合,可以轻松处理数据,并使用 InfluxDB 客户端库将其发送到 InfluxDB。

如果用户有非常大的 CSV 文件或他们想要推送到 InfluxDB 的文件,Pandas 提供了一种使用标头快速读取 CSV 文件的简便方法。结合 InfluxDB 客户端库写入 Pandas DataFrames 的内置功能,用户可以分块读取 CSV,然后将这些块发送到 InfluxDB。

在以下示例中,用户正在读取包含数千行 VIX 股票数据的 CSV

symbol,open,high,low,close,timestamp
vix,13.290000,13.910000,13.290000,13.570000,135935640000000000
vix,13.870000,13.880000,13.040000,13.310000,135944280000000000
vix,13.640000,14.330000,13.600000,14.320000,135952920000000000

为了避免将整个文件读入内存,用户可以利用 Pandas 的 read_csv 函数,该函数将根据 CSV 标头读取列名,并将文件分块为 1,000 行的块。最后,使用 InfluxDB 客户端库在指定测量、标签和时间戳列后,将这些 1,000 行的组发送到 InfluxDB

from influxdb_client import InfluxDBClient, WriteOptions
import pandas as pd

with InfluxDBClient.from_env_properties() as client:
    for df in pd.read_csv("vix.csv", chunksize=1_000):
        with client.write_api() as write_api:
            try:
                write_api.write(
                    record=df,
                    bucket="my-bucket",
                    data_frame_measurement_name="stocks",
                    data_frame_tag_columns=["symbol"],
                    data_frame_timestamp_column="date",
                )
            except InfluxDBError as e:
                print(e)

Java

Java 编程语言广泛应用于从 Android 设备到企业应用程序的各种来源。Java 用户可以查看 opencsv 以快速开始 CSV 数据解析。

此示例使用了普通的旧 Java 对象(又名 POJO)以及注释,以告知 opencsv 哪些 CSV 列属于哪个对象变量,以及 InfluxDB 客户端库该类的测量名称应该是什么,以及哪些变量应该成为标签、字段或时间戳。

@Measurement(name = "stock")
public class StockData {
    @Column(tag = true)
    @CsvBindByName(column = "symbol")
    private String symbol;

    @Column
    @CsvBindByName(column = "open")
    private String open;

    @Column
    @CsvBindByName(column = "high")
    private String high;

    @Column
    @CsvBindByName(column = "low")
    private String low;

    @Column
    @CsvBindByName(column = "close")
    private String close;

    @Column(timestamp = true)
    @CsvBindByName(column = "timestamp")
    private String timestamp;
}

然后,用户可以迭代 CSV 文件并为每一行创建一个 StockData 对象。如果需要,可以在将此变量发送到 InfluxDB 之前对其进行操作

InfluxDBClient influxDBClient = InfluxDBClientFactory.create();
WriteApi writeApi = influxDBClient.getWriteApi();

FileReader reader = new FileReader("vix.csv");
CsvToBean"StockData" csvToBean = new CsvToBeanBuilder(reader)
	.withType(StockData.class)
	.build();

Iterator"StockData" stockIterator = csvToBean.iterator();
while (stockIterator.hasNext()) {
	StockData data = stockIterator.next();
	writeApi.writeMeasurement(WritePrecision.S, data);
}

influxDBClient.close();

立即查看 InfluxDB CLI 和客户端库

这篇文章展示了 InfluxDB 客户端库的使用是多么快速、简单和灵活。虽然上面仅演示了 CSV 数据,但它开始演示用户在将数据发送到 InfluxDB 时可以拥有的强大功能。结合其他 API,用户拥有更多选择和潜力。

考虑一下您可以在哪里使用 InfluxDB、Influx CLI 和 客户端库,并立即试用它们!