Java和InfluxDB入门

导航到

本文由Reshma Sathe撰写。向下滚动查看作者的图片和简介。

时间序列数据正变得越来越重要,从物联网设备的传感器到金融处理。从这些来源收集的数据可以帮助进行销售预测,并在市场营销和财务规划方面做出明智的决策。在本文中,您将了解目前最有效的时间序列数据库之一——InfluxDB,并探讨如何使用Java与InfluxDB一起工作。

时间序列基础

根据统计学家的说法,时间序列是一组通过时间重复测量获得的数据项的观察结果,这些数据项定义明确。这些数据帮助我们分析和跟踪数据随时间的变化。时间序列数据的一些来源包括物联网设备传感器、自动驾驶汽车、温度变化、股市价格,甚至每日上升和下降的新冠肺炎病例。

为了了解为什么时间序列数据很重要,考虑以下示例

传统上,组织使用平均每日温度(MDT)指标来跟踪特定地点一段时间内温度的变化。对于特定地点,白天和晚上的温度或导致温度变化的环境因素可能差异很大,但MDT可能只略有变化。因此,MDT无法准确反映一天中温度的变化。相反,使用时间序列数据可以提供如何每小时以及在不同条件下温度变化的确切画面。它提供了环境因素,如降水、云层覆盖、风速等导致温度变化的一个清晰画面。所有这些信息都有助于组织更好地在其位置建模和优化能源利用。

时间序列数据在提供深入而富有意义的见解和建立模式方面非常宝贵,但建立模式所需的数据量很大,需要快速可靠的数据存储和检索进行分析。这就是时间序列数据库(TSDB)的作用所在。

时间序列数据库(TSDB)

时间序列数据库专为处理时间序列数据而设计。虽然可以使用“普通”数据库处理时间序列数据,但TSDB因其可扩展性和易用性而更受欢迎。

  • 可扩展性:时间序列数据倾向于呈指数增长。大多数非TSDB未优化以处理这种规模的变化,其性能下降,严重影响了分析速度和整体应用速度。然而,TSDB已针对基于时间戳的数据进行了优化,从而允许对时间序列数据进行闪电般的插入和检索查询。
  • 易用性:由于TSDB已针对处理时间序列数据进行了优化,因此它们包含许多内置功能,如时间聚合、连续查询、灵活的保留策略等。TSDB简化了趋势分析,并具有更多选项。

以下是开发者青睐时间序列数据库(TSDB)的主要原因。事实上,根据DB Engines的数据,TSDB是目前数据库中增长最快的部分。

DB-Engines ranking by database model - last 24 months

为什么选择InfluxDB?

InfluxDB是由InfluxData公司开发的时间序列数据库。它支持数据的收集、存储、监控和可视化,并提供时间序列数据警报。InfluxDB支持微秒和纳秒级精度,使其成为科学和金融分析的理想选择。

它拥有出色的文档,包含所有功能的语法和示例。InfluxDB 2.0的最新版本提供了对整个Telegraf、InfluxDB、Chronograf和Kapacitor(TICK)堆栈的支持。

InfluxDB附带Flux语言,与SQL不同,它是一种函数式语言,使其更加冗长、可读、可组合且易于测试。其创始人Paul Dix给出了这样的原因来解释其创建

“我不想生活在一个人类能想出的最佳数据处理语言是在70年代发明的世界里。”

由于InfluxDB支持TICK堆栈,UI附带“警报”和“仪表板”功能。还有一个集成的查询构建器,可以减少开发者的工作量。

InfluxDB客户端库

InfluxDB支持与Java、Kotlin、Scala、Python等许多编程语言连接。要专门连接Java,请使用InfluxDB客户端Java库。此客户端库取代了早期的InfluxDB Java库。完整文档可在此处找到。

influxdb-client-java需要Java版本大于8.0和InfluxDB版本2.0。在您可以使用客户端之前,您需要安装InfluxDB实例。设置InfluxDB的说明可以在其文档中找到。

假设InfluxDB已安装在您的系统上并设置完成,将InfluxDB客户端包含到Java应用程序中最简单的方法是使用Maven。Maven依赖项如下

<dependency>

<groupId>com.influxdb</groupId>

<artifactId>influxdb-client-java</artifactId>

<version>3.3.0</version>

</dependency>

最新的Maven依赖项可在Maven仓库中找到。

InfluxDB设置并运行实例后,您可以在UI上创建用户、桶和组织。或者,您也可以从客户端使用管理API

示例应用程序

为了更好地了解如何使用Influx Java客户端,请查看以下示例应用程序。该示例应用程序有两个文件

  1. App.java是主应用程序,执行所有调用。
  2. InfluxDBConnection.java执行所有InfluxDB调用,并将令牌、桶和组织作为类成员。

应用程序代码可在GitHub上找到。

建立连接

要使用Influx客户端创建连接,请确保您的系统上正在运行InfluxDB实例。

在Java应用程序中,使用InfluxDBClientFactory类的create()方法建立连接。您需要为此方法提供URL、组织名称、桶和令牌。

要生成令牌,请访问您的InfluxDB实例的UI并使用您的用户ID登录。为了更好地了解如何使用InfluxDB UI,请参阅文档中的通过UI设置InfluxDB部分。

然后导航到数据表并点击令牌。在这里,您可以为您自己的用户生成令牌。生成的令牌用于连接到InfluxDB实例。

IiInfluxDB - generate token

InfluxDB建议至少有一个提供管理员权限的“全部访问”令牌。

InfluxDB - admin token

在示例应用程序中,您可以在InfluxDBConnectionClass中按如下方式建立连接

public InfluxDBClient buildConnection(String url, String token, String bucket, String org) {

setToken(token);

setBucket(bucket);

setOrg(org);

setUrl(url);

return InfluxDBClientFactory.create(getUrl(), getToken().toCharArray(), getOrg(), getBucket());

}

插入数据

InfluxDB客户端可以是同步阻塞API或异步阻塞API。对于同步阻塞API,InfluxDB客户端提供WriteApiBlocking API。

使用WriteApiBlocking,您可以执行以下操作

  • 单点数据插入
  • 多点插入
  • 使用POJO插入

要初始化WriteApiBlocking,请使用以下命令

WriteApiBlocking writeApi = influxDBClient.getWriteApiBlocking();

接下来,您可以创建一个单点数据并将其插入到InfluxDB

Point point = Point.measurement("sensor").addTag("sensor_id", "TLM0100").addField("location", "Main Lobby")

.addField("model_number", "TLM89092A")

.time(Instant.now(), WritePrecision.MS);

writeApi.writePoint(point);

您也可以构建多个点的列表并将其插入到InfluxDB

WriteApiBlocking writeApi = influxDBClient.getWriteApiBlocking();

Point point1 = Point.measurement("sensor").addTag("sensor_id", "TLM0103")

.addField("location", "Mechanical Room").addField("model_number", "TLM90012Z")

.time(Instant.now(), WritePrecision.MS);

Point point2 = Point.measurement("sensor").addTag("sensor_id", "TLM0200")

.addField("location", "Conference Room").addField("model_number", "TLM89092B")

.time(Instant.now(), WritePrecision.MS);

Point point3 = Point.measurement("sensor").addTag("sensor_id", "TLM0201").addField("location", "Room 390")

.addField("model_number", "TLM89102B")

.time(Instant.now(), WritePrecision.MS);

List<Point> listPoint = new ArrayList<Point>();

listPoint.add(point1);

listPoint.add(point2);

listPoint.add(point3);

writeApi.writePoints(listPoint);

最后,您还可以使用POJO插入数据

WriteApiBlocking writeApi = influxDBClient.getWriteApiBlocking();

Sensor sensor = new Sensor();

sensor.sensor_id = "TLM0101";

sensor.location = "Room 101";

sensor.model_number = "TLM89092A";

sensor.last_inspected = Instant.now();

writeApi.writeMeasurement(WritePrecision.MS, sensor);

flag = true;

度量是POJO,需要将其定义为类

@Measurement(name = "sensor")

private static class Sensor {

@Column(tag = true)

String sensor_id;

@Column

String location;

@Column

String model_number;

@Column(timestamp = true)

Instant last_inspected;

}

查询数据

InfluxDB提供Flux用于查询数据。要查询数据库,请使用Query API和查询方法检索数据。在示例应用程序中,您将根据传感器ID检索传感器数据。

查询数据的示例还有很多;所有这些可能性都在查询API文档中进行了讨论。

如果您想获取所有记录,则范围应从0开始。在Flux中,具有特定传感器ID的查询如下所示

String flux = String.format( "from(bucket:\"%s\") |> range(start:0) |> filter(fn: (r) => r[\"_measurement\"] == \"sensor\") |> filter(fn: (r) => r[\"sensor_id\"] == \"TLM0100\"or r[\"sensor_id\"] == \"TLM0101\" or r[\"sensor_id\"] == \"TLM0103\" or r[\"sensor_id\"] == \"TLM0200\") |> sort() |> yield(name: \"sort\")", getBucket());

要触发上述Flux查询,influxdb-client提供包含query 方法的QueryApi

QueryApi queryApi = influxDBClient.getQueryApi();

List<FluxTable> tables = queryApi.query(flux);

for (FluxTable fluxTable : tables) {

List<FluxRecord> records = fluxTable.getRecords();

for (FluxRecord fluxRecord : records) {

System.out.println(fluxRecord.getValueByKey("sensor_id"));

}

}

删除数据点

要从InfluxDB桶中删除数据,请使用DeleteAPI。一个绑定条件是删除查询应至少有一个时间戳。如果您没有提及谓词,InfluxDB将删除测量(表)中的所有数据。

要删除特定数据,请使用以下查询

DeleteApi deleteApi = influxDBClient.getDeleteApi();

try {

OffsetDateTime start = OffsetDateTime.now().minus(72, ChronoUnit.HOURS);

OffsetDateTime stop = OffsetDateTime.now();

String predicate = "_measurement=\"sensor\" AND sensor_id = \"TLM0201\"";

deleteApi.delete(start, stop, predicate,getBucket(), getOrg());

flag = true;

}

结论

如您所见,时间序列数据对于更好的分析和获取有意义的见解至关重要。随着物联网设备、自动驾驶汽车和其他更高效系统的出现,对时间序列数据的需求和范围正在增加。

像InfluxDB这样的专门设计和优化的时间序列数据库非常适合处理时间序列数据增长的数量和速度,而不是“传统”数据库。InfluxDB以优化的方式管理时间序列数据,并提供闪电般的检索速度。其新的查询语言Flux使查询变得简单。Flux是一种类似JavaScript的独立语言,其最实用的功能之一是它可以使用第三方API与不同的数据源和工具集成。因此,您可以轻松地将InfluxDB与第三方分析工具、数据源等连接起来。

根据您应用程序的需求,您可以轻松地将InfluxDB集成到您的应用程序中,无论您的应用程序是用Java、Python、Kotlin、Scala还是许多其他编程语言编写的。

在这篇文章中,您学习了如何使用Java开始使用InfluxDB,并回顾了一些标准操作,例如使用InfluxDB客户端插入数据、查询数据等。

您可以在应用程序中添加许多功能,例如创建和管理桶、为数据库应用程序设置健康检查、与Telegraf连接等。有关详细信息,请参阅文档,其中提供了优秀的示例,是探索这些出色功能的最佳地点。

关于作者

Reshma Sathe

Reshma是伊利诺伊大学香槟分校计算机科学硕士学位的毕业生,热爱学习新事物。她曾担任软件工程师,参与过从生产支持到编程和软件开发的各种项目。她目前正在进行Java、Python、Angular、React和其他前端和后端技术的自主项目。