使用Java Flight SQL客户端进行InfluxDB Cloud专用集群的下采样
作者:Subashini Sukumar / 产品
2023年6月9日
导航至
InfluxDB Cloud专用是一个托管和管理的InfluxDB Cloud集群,专门为单个租户服务。InfluxDB时序平台旨在处理高写入和查询负载,因此您可以使用InfluxDB Cloud专用集群来满足您特定的时序用例。在本教程中,我们将通过Java Flight SQL客户端的过程来读取InfluxDB Cloud专用集群中的数据。Java Flight SQL客户端是Apache Arrow Flight的一部分,Apache Arrow Flight是一个用于构建高性能数据服务的框架。它提供了一种通过gRPC(一个现代高性能RPC框架)高效传输大数据集的方法。您可以使用此repo亲自尝试使用Java Flight SQL客户端查询InfluxDB Cloud专用集群。
要求和设置
本教程假设您已经拥有InfluxDB Cloud专用账户。它还假设您已经安装了Docker。
请妥善保存您的集群URL。
您需要创建以下内容
-
源数据库
-
目标数据库
-
具有读/写权限的源令牌
-
具有读/写权限的目标令牌
在初始设置期间,您需要加载用于下采样的数据。在本教程中,我使用了NOAA空气传感器数据集。然而,其他数据源(如Telegraf)也可以使用。以下是一个简单的Telegraf配置来加载数据。请参阅有关将数据写入InfluxDB Cloud专用的其他方法的相关文档。
代码讲解
- 导入所需的类:我们首先从Apache Arrow Flight和其他必要的库中导入所需的类。
import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.types.pojo.ArrowType; import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneId; import java.time.ZoneOffset; import java.time.temporal.TemporalField; import java.util.concurrent.CountDownLatch;
- 定义主类:我们定义一个名为CloudReadWriteExample的类,其中包含一个
main
方法,我们的代码将在其中执行。public class CloudReadWriteExample { public static void main(String[] args) {
- 设置连接配置:我们期望以下变量已在ENV中设置
源/目标主机:源主机是Cloud Dedicated集群URL(不包含协议“https://”),目标主机是集群URL(包含协议“https://”)
源/目标数据库:在本例中,源数据库存储下采样之前的空气传感器数据,目标数据库存储下采样后的数据。
源/目标令牌:源数据库和目标数据库的令牌。
查询:在本例中,查询变量包含下采样查询。您可以根据需要替换它。
//ReadConfigs private static final String SOURCE_HOST = System.getenv("SOURCE_URL"); private static final String SOURCE_HOST = System.getenv("SOURCE_HOST"); private static final String READ_TOKEN = System.getenv("READ_TOKEN"); private static final String SOURCE_DATABASE_NAME = System.getenv("SOURCE_DATABASE_NAME"); private static final String DOWNSAMPLE_QUERY = System.getenv("DOWNSAMPLE_QUERY"); //WriteConfigs private static final String TARGET_CLUSTER_URL = System.getenv("TARGET_URL"); private static final String TARGET_CLUSTER_URL = System.getenv("TARGET_CLUSTER_URL"); private static final String WRITE_TOKEN = System.getenv("WRITE_TOKEN"); private static final String TARGET_DATABASE_NAME = System.getenv("TARGET_DATABASE_NAME"); private static final String TARGET_TABLE_NAME = System.getenv("TARGET_TABLE_NAME");
- 创建一个拦截器,在每个请求中注入头部元数据,在本例中是数据库名称。
FlightClientMiddleware.Factory f = info -> new FlightClientMiddleware() { @Override public void onBeforeSendingHeaders(CallHeaders outgoingHeaders) { outgoingHeaders.insert("database", SOURCE_DATABASE_NAME); }
- 使用
forGrpcTls
方法创建一个“位置”对象,该方法设置与gRPC和传输层安全性(TLS)加密的连接。Location location = Location.forGrpcTls(SOURCE_HOST, 443);
- 身份验证:我们使用
BearerCredentialWriter
和CredentialCallOption
类设置身份验证。将'READ_TOKEN'替换为您的实际云专用(即源数据库)身份验证令牌。CredentialCallOption auth = new CredentialCallOption(new BearerCredentialWriter(READ_TOKEN));
- 身份验证成功后,
execute
返回一个包含元数据和端点列表的FlightInfo
对象。每个端点包含以下内容:- 可以检索数据的一组地址。- 一个标识要检索的数据的ticket
值。FlightInfo flightInfo = sqlClient.execute(DOWNSAMPLE_QUERY, auth); // Extract the Flight ticket from the response. Ticket ticket = flightInfo.getEndpoints().get(0).getTicket();
- 从端点检索查询的流数据。
// Pass the ticket to request the Arrow stream data from the endpoint. final FlightStream stream = sqlClient.getStream(ticket, auth);
- 使用以下代码创建InfluxDB的
WriteApi
函数WriteApi writeApi = influxDBClient.makeWriteApi(WriteOptions.builder() .batchSize(5000) .flushInterval(1000)
- 下面的代码遍历流数据,为每一行准备测量,并将准备好的点写入目标数据库。
while (stream.next()) { try { // Get the current vector data from the stream. final VectorSchemaRoot root = stream.getRoot(); System.out.println(root.contentToTSVString()); InfluxDBClient influxDBClient = InfluxDBClientFactory.create(TARGET_CLUSTER_URL, WRITE_TOKEN.toCharArray(), "", TARGET_DATABASE_NAME); CountDownLatch countDownLatch = new CountDownLatch(1); try (WriteApi writeApi = influxDBClient.makeWriteApi(WriteOptions.builder() .batchSize(5000) .flushInterval(1000) .backpressureStrategy(BackpressureOverflowStrategy.DROP_OLDEST) .bufferLimit(10000) .jitterInterval(1000) .retryInterval(5000) .build())) { writeApi.listenEvents(WriteSuccessEvent.class, (value) -> countDownLatch.countDown()); writeAsPoints(writeApi, root); writeApi.flush(); } } catch (Exception e) { // Handle exceptions. System.out.println("Error executing FlightSqlClient: " + e.getMessage()); } }
- 下面的代码是一个实用方法,用于从
ArrowStream
数据中的每一行准备测量。private void writeAsPoints(WriteApi writeApi, VectorSchemaRoot root) { int fields = root.getSchema().getFields().size(); for(int i = 0; i < rowCount; i++) { Point point = Point.measurement(TARGET_TABLE_NAME); for (int j=0;j<fields;j++) { String fieldName = root.getSchema().getFields().get(j).getName(); ArrowType fieldType = root.getSchema().getFields().get(j).getType(); if (fieldName.equalsIgnoreCase("time") && fieldType instanceof ArrowType.Timestamp) { point.time(((LocalDateTime) root.getFieldVectors().get(j) .getObject(i)).atZone(ZoneId.systemDefault()).toInstant(), WritePrecision.NS); } else { point.addField(fieldName, root.getFieldVectors().get(j).getObject(i).toString()); } } writeApi.writePoint(TARGET_DATABASE_NAME, TARGET_TABLE_NAME, point); } }
您可以在这里找到完整的脚本。
使用Java Flight SQL查询、执行下采样并将数据写回InfluxDB Cloud Dedicated
要在相应的仓库中运行此示例,请按照以下步骤操作
-
克隆仓库并进入它。
-
运行
docker build -t myimage
-
运行
docker run myimage
使用Grafana可视化数据
使用Grafana查询和可视化存储在InfluxDB Cloud Dedicated数据库中的数据。InfluxDB Cloud Dedicated支持SQL和InfluxQL查询语言。安装Grafana FlightSQL插件,以使用Flight SQL协议使用SQL查询InfluxDB。
此Grafana安装链接提供了安装Grafana和可视化数据的说明。
使用Grafana查询数据
您可以使用Flight SQL连接数据源查询云专用数据库中的数据。
资源和结论
查看以下文档。它帮助我构建此示例,并可以帮助您在查询InfluxDB Cloud Dedicated的旅程中。
-
Arrow Flight的参考文档。
-
Java客户端的Arrow Flight SQL的参考文档。
-
InfluxDB Cloud的文档,有关使用Python中的Arrow Flight SQL查询数据。
我希望这篇博客文章能够激发您探索InfluxDB Cloud Dedicated并利用Flight SQL将大型数据集从InfluxDB传输到您选择的工具中进行数据处理。如果您需要任何帮助,请通过我们的社区站点或Slack频道联系。我很乐意了解您试图实现的目标和您希望InfluxDB具有的功能。