使用 Java Flight SQL 客户端将数据下采样到 InfluxDB Cloud Dedicated

导航至

InfluxDB Cloud Dedicated 是一种托管式 InfluxDB Cloud 集群,专用于单个租户。InfluxDB 时间序列平台旨在处理高写入和查询负载,因此您可以为特定的时间序列用例使用和利用 InfluxDB Cloud Dedicated。在本教程中,我们将逐步介绍如何使用 Java Flight SQL 客户端从 InfluxDB Cloud Dedicated 读取数据。Java Flight SQL 客户端是 Apache Arrow Flight 的一部分,Apache Arrow Flight 是一个用于构建高性能数据服务的框架。它提供了一种通过 gRPC(一种现代高性能 RPC 框架)高效传输大型数据集的方法。通过这个 repo,亲自尝试使用 Java Flight SQL 客户端查询 InfluxDB Cloud Dedicated。

要求和设置

本教程假设您已经拥有 InfluxDB Cloud Dedicated 帐户。它还假设您已安装 Docker

请妥善保管您的集群 URL。

您需要创建以下内容

  • 源数据库

  • 目标数据库

  • 具有读/写权限的源令牌

  • 具有读/写权限的目标令牌

在初始设置期间,您需要加载用于下采样的数据。为了本教程的目的,我使用了 NOAA 空气传感器数据集。但是,也可以使用其他数据源,例如 Telegraf。这是一个简单的 Telegraf 配置来加载数据。有关 将数据写入 InfluxDB Cloud Dedicated 的其他数据写入方法,请查看以下文档。

代码演练

  1. 导入所需的类:我们首先从 Apache Arrow Flight 和其他必要的库导入所需的类。
    import org.apache.arrow.vector.VectorSchemaRoot;
    import org.apache.arrow.vector.types.pojo.ArrowType;
    
    import java.time.Instant;
    import java.time.LocalDateTime;
    import java.time.ZoneId;
    import java.time.ZoneOffset;
    import java.time.temporal.TemporalField;
    import java.util.concurrent.CountDownLatch;
  2. 定义主类:我们定义了一个 CloudReadWriteExample 类,其中包含一个 main 方法,我们的代码将在其中执行。
    public class CloudReadWriteExample {
      public static void main(String[] args) {
  3. 设置连接配置:我们期望以下变量在 ENV 中设置

    源/目标主机:源主机是不带协议(“https://”)的 Cloud Dedicated 集群 URL,目标主机是协议(“https://”)的集群 URL

    源/目标数据库:在本例中,源数据库存储下采样前的空气传感器数据,目标数据库存储下采样后的数据。

    源/目标令牌:分别用于源数据库和目标数据库的令牌。

    查询:在本例中,query 变量包含下采样查询。您可以根据您的需求将其替换为任何查询。

     //ReadConfigs
        private static final String SOURCE_HOST = System.getenv("SOURCE_URL");
        private static final String SOURCE_HOST = System.getenv("SOURCE_HOST");
        private static final String READ_TOKEN = System.getenv("READ_TOKEN");
        private static final String SOURCE_DATABASE_NAME = System.getenv("SOURCE_DATABASE_NAME");
        private static final String DOWNSAMPLE_QUERY = System.getenv("DOWNSAMPLE_QUERY");
    
        //WriteConfigs
        private static final String TARGET_CLUSTER_URL = System.getenv("TARGET_URL");
        private static final String TARGET_CLUSTER_URL = System.getenv("TARGET_CLUSTER_URL");
        private static final String WRITE_TOKEN = System.getenv("WRITE_TOKEN");
        private static final String TARGET_DATABASE_NAME = System.getenv("TARGET_DATABASE_NAME");
        private static final String TARGET_TABLE_NAME = System.getenv("TARGET_TABLE_NAME");
  4. 创建一个拦截器,在每个请求中注入标头元数据,在本例中为数据库名称。
    FlightClientMiddleware.Factory f = info -> new FlightClientMiddleware() {
                @Override
                public void onBeforeSendingHeaders(CallHeaders outgoingHeaders) {
                    outgoingHeaders.insert("database", SOURCE_DATABASE_NAME);
                }
  5. 使用 forGrpcTls 方法创建一个“Location”对象,该方法使用 gRPC 和传输层安全 (TLS) 加密建立连接。
    Location location = Location.forGrpcTls(SOURCE_HOST, 443);
  6. 身份验证:我们使用 BearerCredentialWriterCredentialCallOption 类设置身份验证。将“READ_TOKEN”替换为您的实际 Cloud Dedicated(即源数据库)身份验证令牌。
    CredentialCallOption auth = new CredentialCallOption(new BearerCredentialWriter(READ_TOKEN));
  7. 身份验证成功后,execute 返回一个 FlightInfo 对象,其中包含元数据和端点列表。每个端点包含以下内容: - 您可以在其中检索数据的地址列表。- 一个 ticket 值,用于标识要检索的数据。
    FlightInfo flightInfo = sqlClient.execute(DOWNSAMPLE_QUERY, auth);
    
         // Extract the Flight ticket from the response.
         Ticket ticket = flightInfo.getEndpoints().get(0).getTicket();
  8. 从端点检索查询的流数据。
     // Pass the ticket to request the Arrow stream data from the endpoint.
            final FlightStream stream = sqlClient.getStream(ticket, auth);
  9. 使用以下代码创建 InfluxDB WriteApi 函数
    WriteApi writeApi = influxDBClient.makeWriteApi(WriteOptions.builder()
                            .batchSize(5000)
                            .flushInterval(1000)
  10. 以下代码迭代流数据,为每一行准备度量,并将准备好的点写入目标数据库。
     while (stream.next()) {
                try {
                    // Get the current vector data from the stream.
                    final VectorSchemaRoot root = stream.getRoot();
                    System.out.println(root.contentToTSVString());
    
                    InfluxDBClient influxDBClient = InfluxDBClientFactory.create(TARGET_CLUSTER_URL, WRITE_TOKEN.toCharArray(), "", TARGET_DATABASE_NAME);
                    CountDownLatch countDownLatch = new CountDownLatch(1);
                    try (WriteApi writeApi = influxDBClient.makeWriteApi(WriteOptions.builder()
                            .batchSize(5000)
                            .flushInterval(1000)
                            .backpressureStrategy(BackpressureOverflowStrategy.DROP_OLDEST)
                            .bufferLimit(10000)
                            .jitterInterval(1000)
                            .retryInterval(5000)
                            .build())) {
                        writeApi.listenEvents(WriteSuccessEvent.class, (value) -> countDownLatch.countDown());
                        writeAsPoints(writeApi, root);
                        writeApi.flush();
                    }
                } catch (Exception e) {
                    // Handle exceptions.
                    System.out.println("Error executing FlightSqlClient: " + e.getMessage());
                }
            }
  11. 以下代码是一个实用方法,用于从 ArrowStream 数据中的每一行准备度量。
    private void writeAsPoints(WriteApi writeApi, VectorSchemaRoot root) {
            int fields = root.getSchema().getFields().size();
            for(int i = 0; i < rowCount; i++) {
                Point point = Point.measurement(TARGET_TABLE_NAME);
                for (int j=0;j<fields;j++) {
                    String fieldName = root.getSchema().getFields().get(j).getName();
                    ArrowType fieldType = root.getSchema().getFields().get(j).getType();
                    if (fieldName.equalsIgnoreCase("time") && fieldType instanceof ArrowType.Timestamp) {
                        point.time(((LocalDateTime) root.getFieldVectors().get(j)
                            .getObject(i)).atZone(ZoneId.systemDefault()).toInstant(), WritePrecision.NS);
                    } else {
                        point.addField(fieldName, root.getFieldVectors().get(j).getObject(i).toString());
                    }
                }
                writeApi.writePoint(TARGET_DATABASE_NAME, TARGET_TABLE_NAME, point);
            }
        }

您可以在此处找到完整脚本。

使用 Java Flight SQL 查询、执行下采样并将数据写回 InfluxDB Cloud Dedicated

要运行相应 repo 中的示例,请按照以下步骤操作

  1. 克隆 repo 并 cd 进入它。

  2. 运行 docker build -t myimage

  3. 运行 docker run myimage

使用 Grafana 可视化数据

使用 Grafana 查询和可视化存储在 InfluxDB Cloud Dedicated 数据库中的数据。InfluxDB Cloud Dedicated 支持 SQL 和 InfluxQL 查询语言。安装 Grafana FlightSQL 插件,以使用 Flight SQL 协议通过 SQL 查询 InfluxDB。

Grafana 安装链接提供了有关如何安装 Grafana 和可视化数据的说明。

使用 Grafana 查询数据

您可以使用 Flight SQL 连接数据源来查询 Cloud Dedicated 数据库中的数据。

downsampling-influxdb-cloud-dedicated-java-flight-sql-client

资源和结论

请查看以下文档。它帮助我构建了这个示例,并且可以帮助您开始查询 InfluxDB Cloud Dedicated 之旅

  1. Arrow Flight 的参考文档。

  2. Java 客户端的 Arrow Flight SQL 的参考文档。

  3. InfluxDB Cloud 文档,关于 在 Python 中使用 Arrow Flight SQL 查询数据

  4. 一篇关于 InfluxDB、Flight SQL、Pandas 和 Jupyter Notebooks 教程的博文。

  5. 一篇关于 使用 Flight SQL 和 AWS Lambda 进行下采样的博文。

我希望这篇博文能启发您探索 InfluxDB Cloud Dedicated,并利用 Flight SQL 从 InfluxDB 传输大型数据集,以便使用您选择的工具进行数据处理。如果您需要任何帮助,请使用我们的社区网站Slack 频道联系我们。我很乐意了解您尝试实现的目标以及您希望 InfluxDB 拥有的功能。