使用Java Flight SQL客户端进行InfluxDB Cloud专用集群的下采样

导航至

InfluxDB Cloud专用是一个托管和管理的InfluxDB Cloud集群,专门为单个租户服务。InfluxDB时序平台旨在处理高写入和查询负载,因此您可以使用InfluxDB Cloud专用集群来满足您特定的时序用例。在本教程中,我们将通过Java Flight SQL客户端的过程来读取InfluxDB Cloud专用集群中的数据。Java Flight SQL客户端是Apache Arrow Flight的一部分,Apache Arrow Flight是一个用于构建高性能数据服务的框架。它提供了一种通过gRPC(一个现代高性能RPC框架)高效传输大数据集的方法。您可以使用此repo亲自尝试使用Java Flight SQL客户端查询InfluxDB Cloud专用集群。

要求和设置

本教程假设您已经拥有InfluxDB Cloud专用账户。它还假设您已经安装了Docker

请妥善保存您的集群URL。

您需要创建以下内容

  • 源数据库

  • 目标数据库

  • 具有读/写权限的源令牌

  • 具有读/写权限的目标令牌

在初始设置期间,您需要加载用于下采样的数据。在本教程中,我使用了NOAA空气传感器数据集。然而,其他数据源(如Telegraf)也可以使用。以下是一个简单的Telegraf配置来加载数据。请参阅有关将数据写入InfluxDB Cloud专用的其他方法的相关文档。

代码讲解

  1. 导入所需的类:我们首先从Apache Arrow Flight和其他必要的库中导入所需的类。
    import org.apache.arrow.vector.VectorSchemaRoot;
    import org.apache.arrow.vector.types.pojo.ArrowType;
    
    import java.time.Instant;
    import java.time.LocalDateTime;
    import java.time.ZoneId;
    import java.time.ZoneOffset;
    import java.time.temporal.TemporalField;
    import java.util.concurrent.CountDownLatch;
  2. 定义主类:我们定义一个名为CloudReadWriteExample的类,其中包含一个main方法,我们的代码将在其中执行。
    public class CloudReadWriteExample {
      public static void main(String[] args) {
  3. 设置连接配置:我们期望以下变量已在ENV中设置

    源/目标主机:源主机是Cloud Dedicated集群URL(不包含协议“https://”),目标主机是集群URL(包含协议“https://”)

    源/目标数据库:在本例中,源数据库存储下采样之前的空气传感器数据,目标数据库存储下采样后的数据。

    源/目标令牌:源数据库和目标数据库的令牌。

    查询:在本例中,查询变量包含下采样查询。您可以根据需要替换它。

     //ReadConfigs
        private static final String SOURCE_HOST = System.getenv("SOURCE_URL");
        private static final String SOURCE_HOST = System.getenv("SOURCE_HOST");
        private static final String READ_TOKEN = System.getenv("READ_TOKEN");
        private static final String SOURCE_DATABASE_NAME = System.getenv("SOURCE_DATABASE_NAME");
        private static final String DOWNSAMPLE_QUERY = System.getenv("DOWNSAMPLE_QUERY");
    
        //WriteConfigs
        private static final String TARGET_CLUSTER_URL = System.getenv("TARGET_URL");
        private static final String TARGET_CLUSTER_URL = System.getenv("TARGET_CLUSTER_URL");
        private static final String WRITE_TOKEN = System.getenv("WRITE_TOKEN");
        private static final String TARGET_DATABASE_NAME = System.getenv("TARGET_DATABASE_NAME");
        private static final String TARGET_TABLE_NAME = System.getenv("TARGET_TABLE_NAME");
  4. 创建一个拦截器,在每个请求中注入头部元数据,在本例中是数据库名称。
    FlightClientMiddleware.Factory f = info -> new FlightClientMiddleware() {
                @Override
                public void onBeforeSendingHeaders(CallHeaders outgoingHeaders) {
                    outgoingHeaders.insert("database", SOURCE_DATABASE_NAME);
                }
  5. 使用forGrpcTls方法创建一个“位置”对象,该方法设置与gRPC和传输层安全性(TLS)加密的连接。
    Location location = Location.forGrpcTls(SOURCE_HOST, 443);
  6. 身份验证:我们使用BearerCredentialWriterCredentialCallOption类设置身份验证。将'READ_TOKEN'替换为您的实际云专用(即源数据库)身份验证令牌。
    CredentialCallOption auth = new CredentialCallOption(new BearerCredentialWriter(READ_TOKEN));
  7. 身份验证成功后,execute返回一个包含元数据和端点列表的FlightInfo对象。每个端点包含以下内容:- 可以检索数据的一组地址。- 一个标识要检索的数据的ticket值。
    FlightInfo flightInfo = sqlClient.execute(DOWNSAMPLE_QUERY, auth);
    
         // Extract the Flight ticket from the response.
         Ticket ticket = flightInfo.getEndpoints().get(0).getTicket();
  8. 从端点检索查询的流数据。
     // Pass the ticket to request the Arrow stream data from the endpoint.
            final FlightStream stream = sqlClient.getStream(ticket, auth);
  9. 使用以下代码创建InfluxDB的WriteApi函数
    WriteApi writeApi = influxDBClient.makeWriteApi(WriteOptions.builder()
                            .batchSize(5000)
                            .flushInterval(1000)
  10. 下面的代码遍历流数据,为每一行准备测量,并将准备好的点写入目标数据库。
     while (stream.next()) {
                try {
                    // Get the current vector data from the stream.
                    final VectorSchemaRoot root = stream.getRoot();
                    System.out.println(root.contentToTSVString());
    
                    InfluxDBClient influxDBClient = InfluxDBClientFactory.create(TARGET_CLUSTER_URL, WRITE_TOKEN.toCharArray(), "", TARGET_DATABASE_NAME);
                    CountDownLatch countDownLatch = new CountDownLatch(1);
                    try (WriteApi writeApi = influxDBClient.makeWriteApi(WriteOptions.builder()
                            .batchSize(5000)
                            .flushInterval(1000)
                            .backpressureStrategy(BackpressureOverflowStrategy.DROP_OLDEST)
                            .bufferLimit(10000)
                            .jitterInterval(1000)
                            .retryInterval(5000)
                            .build())) {
                        writeApi.listenEvents(WriteSuccessEvent.class, (value) -> countDownLatch.countDown());
                        writeAsPoints(writeApi, root);
                        writeApi.flush();
                    }
                } catch (Exception e) {
                    // Handle exceptions.
                    System.out.println("Error executing FlightSqlClient: " + e.getMessage());
                }
            }
  11. 下面的代码是一个实用方法,用于从ArrowStream数据中的每一行准备测量。
    private void writeAsPoints(WriteApi writeApi, VectorSchemaRoot root) {
            int fields = root.getSchema().getFields().size();
            for(int i = 0; i < rowCount; i++) {
                Point point = Point.measurement(TARGET_TABLE_NAME);
                for (int j=0;j<fields;j++) {
                    String fieldName = root.getSchema().getFields().get(j).getName();
                    ArrowType fieldType = root.getSchema().getFields().get(j).getType();
                    if (fieldName.equalsIgnoreCase("time") && fieldType instanceof ArrowType.Timestamp) {
                        point.time(((LocalDateTime) root.getFieldVectors().get(j)
                            .getObject(i)).atZone(ZoneId.systemDefault()).toInstant(), WritePrecision.NS);
                    } else {
                        point.addField(fieldName, root.getFieldVectors().get(j).getObject(i).toString());
                    }
                }
                writeApi.writePoint(TARGET_DATABASE_NAME, TARGET_TABLE_NAME, point);
            }
        }

您可以在这里找到完整的脚本。

使用Java Flight SQL查询、执行下采样并将数据写回InfluxDB Cloud Dedicated

要在相应的仓库中运行此示例,请按照以下步骤操作

  1. 克隆仓库并进入它。

  2. 运行docker build -t myimage

  3. 运行docker run myimage

使用Grafana可视化数据

使用Grafana查询和可视化存储在InfluxDB Cloud Dedicated数据库中的数据。InfluxDB Cloud Dedicated支持SQL和InfluxQL查询语言。安装Grafana FlightSQL插件,以使用Flight SQL协议使用SQL查询InfluxDB。

Grafana安装链接提供了安装Grafana和可视化数据的说明。

使用Grafana查询数据

您可以使用Flight SQL连接数据源查询云专用数据库中的数据。

downsampling-influxdb-cloud-dedicated-java-flight-sql-client

资源和结论

查看以下文档。它帮助我构建此示例,并可以帮助您在查询InfluxDB Cloud Dedicated的旅程中。

  1. Arrow Flight的参考文档。

  2. Java客户端的Arrow Flight SQL的参考文档。

  3. InfluxDB Cloud的文档,有关使用Python中的Arrow Flight SQL查询数据

  4. 关于InfluxDB、Flight SQL、Pandas和Jupyter Notebooks教程的博客文章。

  5. 关于使用Flight SQL和AWS Lambda进行下采样的博客文章

我希望这篇博客文章能够激发您探索InfluxDB Cloud Dedicated并利用Flight SQL将大型数据集从InfluxDB传输到您选择的工具中进行数据处理。如果您需要任何帮助,请通过我们的社区站点Slack频道联系。我很乐意了解您试图实现的目标和您希望InfluxDB具有的功能。