使用Influx CLI和Python、Java客户端库将CSV数据导入InfluxDB
作者:Josh Powers / 产品,用例
2022年10月25日
导航到
随着数十亿设备和应用程序每纳秒产生时间序列数据,InfluxDB是存储和分析这些数据的主要方式。面对各种数据源,InfluxDB为用户提供了多种将数据导入InfluxDB的方式。其中最常见的数据格式是CSV,即逗号分隔值。
本博客文章展示了如何使用InfluxDB CLI和InfluxDB客户端库将CSV数据转换为行协议,并发送到InfluxDB。
什么是CSV和行协议?
CSV是用逗号分隔的值的数据。每行是一个不同的记录,记录由一个或多个字段组成。CSV数据在历史上用于导出数据或记录随时间推移的事件,这使得它非常适合像InfluxDB这样的时间序列数据库。
以下是CSV数据的基本示例。它以一个标题开始,提供了每列的名称,然后是每条记录在单独的行中
name,building,temperature,humidity,time
iot-devices,5a,72.3,34.1,2022-10-01T12:01:00Z
iot-devices,5a,72.1,33.8,2022-10-02T12:01:00Z
iot-devices,5a,72.2,33.7,2022-10-03T12:01:00Z
要将这些数据导入InfluxDB,以下工具展示了如何将这些行转换为行协议。行协议由以下项目组成
-
测量名称:要存储和稍后查询的数据名称
-
字段:将被查询的实际数据
-
标签:可选的字符串字段,用于索引和帮助过滤数据
-
时间戳:可选,但在CSV数据中很常见,用于指定数据记录的收集或产生时间
使用上面的CSV示例,目标状态是将数据转换为以下行协议
iot-devices,building=5a temperature=72.3,humidity=34.1 1664647260000000000
iot-devices,building=5a temperature=72.1,humidity=33.8 1664733660000000000
iot-devices,building=5a temperature=72.2,humidity=33.7 1664820060000000000
在上面的示例中,iot-devices
是测量名称,而building是一个标签。温度和湿度值是我们的字段。最后,时间戳以纳秒精度的UNIX时间戳保存。
Influx CLI
Influx CLI工具提供用于管理和与InfluxDB交互的命令。使用此工具,用户可以设置、配置和与InfluxDB的许多功能交互。从设置新桶和org到查询数据,甚至推送数据,CLI都可以做到。
用户可以使用的一个子命令是write
命令。Write允许用户从带注释的CSV直接将数据注入InfluxDB。
CSV注释
注释可以是CSV文件本身的属性,也可以作为CLI选项提供。它们描述了如何将每个列转换为测量名称、标签、字段或时间戳。
以下是如何将注释添加到我们的示例数据到文件中的示例
#datatype measurement,tag,double,double,dateTime:RFC3339
name,building,temperature,humidity,time
iot-devices,5a,72.3,34.1,2022-10-01T12:01:00Z
iot-devices,5a,72.1,33.8,2022-10-02T12:01:00Z
iot-devices,5a,72.2,33.7,2022-10-03T12:01:00Z
本示例中的数据类型指定如下
-
measurement
:指定要使用哪一列作为测量名称。如果没有列存在,也可以通过CLI指定为标题。 -
tag
:指定哪些列或列被处理为字符串标签数据。这些是可选的,但有助于在InfluxDB中查询和索引数据。 -
double
:用于两个列,指定它们包含双精度浮点数数据类型。 -
dateTime
:指定最后一列包含记录的时间戳,并进一步说明使用的格式是RFC3339。
用户还可以指定字段的附加数据类型。
-
double
-
long
-
unsignedLong
-
boolean
-
string
-
ignored
:如果某一列无意义或不需要,可以使用它,这样就不会将该列包括在最终数据中。
最后,对于时间戳,有内置的解析能力,包括
-
RFC3339(例如
2020-01-01T00:00:00Z
) -
RFC3339Nano(例如
2020-01-01T00:00:00.000000000Z
) -
Unix时间戳(例如
1577836800000000000
)。
如果时间戳不是这些格式之一,那么用户需要自行指定时间戳的格式(例如dateTime:2006-01-02
),作为注释的一部分,使用Go参考时间。
CLI示例
一旦存在注释,就可以使用CLI将数据发送到InfluxDB。下面是发送包含在文件中的数据的示例
influx write --bucket bucketName --format=csv --file /path/to/data.csv
如果CSV本身没有注释,则可以在CLI命令中添加它们
influx write --bucket bucketName --format=csv --file /path/to/data.csv \
--header “#datatype measurement,tag,double,double,dateTime:RFC3339”
最后,如果CSV文件没有与测量名称相关的列,也可以将其包括在标题中
influx write --bucket bucketName --format=csv --file /path/to/data.csv \
--header “#constant measurement,iot-devices”
--header “#datatype tag,double,double,dateTime:RFC3339”
要开始使用Influx CLI工具,请访问文档网站,用户可以找到安装和开始使用它的步骤。查看将CSV数据写入InfluxDB的文档以获取更多详细信息和方法。这包括带有跳过标题行、不同编码和错误处理的示例。此外,请参阅这篇之前的博客文章,了解注释CSV以及如何使用Flux查询直接写入数据。
Influx CLI提供了一个简单快捷的入门方式,但如果有用户文件很大、未注释或需要在推送到InfluxDB之前进行一些预处理,该怎么办?在这些情况下,用户应该查看InfluxDB客户端库。
InfluxDB客户端库
InfluxDB客户端库提供了特定于语言的包,以快速与InfluxDB v2 API交互。这允许用户使用他们选择的编程语言快速、轻松地创建、处理和打包数据,然后将其发送到InfluxDB。库在许多语言中可用,包括Python、JavaScript、Go、C#、Java和其他许多语言。
以下提供了使用Python和Java解析CSV数据并将数据发送到InfluxDB的两个示例。
Python + Pandas
Python 编程语言让很多人能够轻松学习和开始编程。《Pandas》是一个 Python 数据分析库,它可以快速、高效地处理和分析数据。这两个工具结合使用,可以轻松地将数据发送到 InfluxDB,使用 InfluxDB 客户端库。
如果用户有一个非常大的 CSV 文件或多个文件需要推送到 InfluxDB,Pandas 提供了一种快速读取带有标题的 CSV 文件的方法。结合 InfluxDB 客户端库内置的功能来写入 Pandas DataFrames,用户可以分块读取 CSV 文件,然后将这些块发送到 InfluxDB。
以下示例中,用户正在读取一个包含数千行 VIX 股票数据的 CSV 文件。
symbol,open,high,low,close,timestamp
vix,13.290000,13.910000,13.290000,13.570000,135935640000000000
vix,13.870000,13.880000,13.040000,13.310000,135944280000000000
vix,13.640000,14.330000,13.600000,14.320000,135952920000000000
为了避免将整个文件读入内存,用户可以利用 Pandas 的 read_csv
函数,该函数将根据 CSV 标题读取列名并将文件分成每 1,000 行的块。最后,使用 InfluxDB 客户端库将那些 1,000 行的组发送到 InfluxDB,在指定度量、标签和时间戳列之后。
from influxdb_client import InfluxDBClient, WriteOptions
import pandas as pd
with InfluxDBClient.from_env_properties() as client:
for df in pd.read_csv("vix.csv", chunksize=1_000):
with client.write_api() as write_api:
try:
write_api.write(
record=df,
bucket="my-bucket",
data_frame_measurement_name="stocks",
data_frame_tag_columns=["symbol"],
data_frame_timestamp_column="date",
)
except InfluxDBError as e:
print(e)
Java
Java 编程语言在从 Android 设备到企业应用的各种来源中得到广泛应用。Java 用户可以使用 opencsv 来快速开始 CSV 数据解析。
此示例使用了一个简单的 Java 对象(简称 POJO)以及注解来告诉 opencsv 哪些 CSV 列属于哪些对象变量,以及 InfluxDB 客户端库对于类来说测量名称应该是什么,以及哪些变量应该成为标签、字段或时间戳。
@Measurement(name = "stock")
public class StockData {
@Column(tag = true)
@CsvBindByName(column = "symbol")
private String symbol;
@Column
@CsvBindByName(column = "open")
private String open;
@Column
@CsvBindByName(column = "high")
private String high;
@Column
@CsvBindByName(column = "low")
private String low;
@Column
@CsvBindByName(column = "close")
private String close;
@Column(timestamp = true)
@CsvBindByName(column = "timestamp")
private String timestamp;
}
然后用户可以遍历 CSV 文件并为每一行创建一个 StockData
对象。如果需要,这个变量可以在发送到 InfluxDB 之前进行操作。
InfluxDBClient influxDBClient = InfluxDBClientFactory.create();
WriteApi writeApi = influxDBClient.getWriteApi();
FileReader reader = new FileReader("vix.csv");
CsvToBean"StockData" csvToBean = new CsvToBeanBuilder(reader)
.withType(StockData.class)
.build();
Iterator"StockData" stockIterator = csvToBean.iterator();
while (stockIterator.hasNext()) {
StockData data = stockIterator.next();
writeApi.writeMeasurement(WritePrecision.S, data);
}
influxDBClient.close();
今天查看 InfluxDB CLI 和客户端库
本文展示了 InfluxDB 客户端库使用起来多么快捷、方便和灵活。虽然上面的示例只展示了 CSV 数据,但它开始展示了用户向 InfluxDB 发送数据时可以拥有的巨大力量。结合其他 API,用户有更多的选项和潜力。