使用 Java Flight SQL 客户端查询 InfluxDB Cloud

导航至

InfluxDB Cloud 3.0 是一个基于 Apache 生态系统构建的多功能时间序列数据库。您可以使用 Apache Arrow Flight SQL 接口查询 InfluxDB Cloud,该接口为处理时间序列数据提供了 SQL 支持。在本教程中,我们将逐步介绍使用 Java Flight SQL 客户端查询 InfluxDB Cloud 的过程。Java Flight SQL 客户端是 Apache Arrow Flight 的一部分,Apache Arrow Flight 是一个用于构建高性能数据服务的框架。它提供了一种通过 gRPC(一种现代高性能 RPC 框架)高效传输大型数据集的方法。通过此仓库,亲自尝试使用 Java Flight SQL 客户端查询 InfluxDB Cloud

要求和设置

本教程假设您已经拥有一个免费的 InfluxDB Cloud 账户。它还假设您的机器上正在运行 Docker

最后,您需要创建或获取以下 InfluxDB 资源

  • 一个存储桶
  • 一个令牌
  • 您的组织(通常是您注册帐户时使用的电子邮件)

您还需要将数据写入您的 InfluxDB 帐户。最简单的方法是通过 UI 手动编写一些行协议。导航到加载数据 > 存储桶 > +添加数据 > 行协议 > 手动输入,选择您要写入的存储桶,然后向 InfluxDB 写入一个点。例如,您可以写入 measurementName,tagKey=tagValue fieldKey=1.0。或者,如果您想要真实世界的行协议数据,请尝试 NOAA 空气传感器数据集。您还可以查看以下关于将数据写入 InfluxDB Cloud 3.0 的文档,了解将数据写入 InfluxDB Cloud 的其他方法。

代码演练

让我们将代码分解成更小的部分,以了解正在发生的事情。

  1. 导入所需的类:我们首先从 Apache Arrow Flight 和其他必要的库中导入所需的类。
    import io.grpc.CallOptions;
    import org.apache.arrow.flight.*; 
    import org.apache.arrow.flight.auth2.BearerCredentialWriter; 
    import org.apache.arrow.flight.grpc.CredentialCallOption; 
    import org.apache.arrow.flight.sql.FlightSqlClient;
    import org.apache.arrow.memory.BufferAllocator; 
    import org.apache.arrow.memory.RootAllocator; 
    import org.apache.arrow.vector.VectorSchemaRoot; 
    import io.grpc.Metadata;
    import java.net.URI;
    
  2. 定义主类:我们定义一个 JavaExample 类,其中包含一个 main 方法,我们的代码将在其中执行。
    public class JavaExample { public static void main(String[] args) {...} }
    
  3. 设置连接:我们定义 host 变量,它应该是 InfluxDB 实例 URL,不包含协议 (“https://”) 部分。
    String host = "<host without https:// i.e. us-east-1-1.aws.cloud2.influxdata.com>";
    
  4. 接下来,我们定义要执行的查询
    String query = "SELECT *";
    
  5. 现在,我们使用 forGrpcTls 方法创建一个 Location 对象,该方法设置了与 gRPC 和 TLS(传输层安全)加密的连接。
    Location location = Location.forGrpcTls(host, 443);
    
  6. 身份验证:我们使用 BearerCredentialWriter 和 CredentialCallOption 类设置身份验证。将“your token”替换为您的实际 InfluxDB 身份验证令牌。
    CredentialCallOption auth = new CredentialCallOption(new BearerCredentialWriter("your token"));
  7. 初始化 FlightClient:首先,使用 RootAllocator 创建一个 BufferAllocator 来管理内存分配
    BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
  8. 我们创建一个自定义拦截器,它在每个请求上注入数据库标头。将“your bucket”替换为您的 InfluxDB 存储桶的名称。
    FlightClientMiddleware.Factory f = info -> new FlightClientMiddleware() { 
    @Override public void onBeforeSendingHeaders(CallHeaders outgoingHeaders) { outgoingHeaders.insert("database", "your bucket"); } 
    @Override public void onHeadersReceived(CallHeaders incomingHeaders) { } 
    @Override public void onCallCompleted(CallStatus status) { } };
  9. 现在,我们使用分配器、位置和自定义拦截器构建 FlightClient
    FlightClient client = FlightClient.builder(allocator, location).intercept(f)
    .build();
  10. 通过将 FlightClient 包装在 FlightSQLClient 构造函数中来创建 SQL 客户端。
    FlightSqlClient sqlClient = new FlightSqlClient(client);
  11. 使用 execute 方法并传入 auth 和 query 以在服务器上执行查询。flightInfo 是一个包含元数据和结果数据位置的对象。使用 getStream() 获取数据。然后将输出转换为字符串并使用 contentToTSVString() 打印它
    FlightInfo flightInfo = sqlClient.execute(query, auth);
    final FlightStream stream = sqlClient.getStream(flightInfo.getEndpoints().get(0).getTicket(), auth);
            while (stream.next()) {
                try {
                    final VectorSchemaRoot root = stream.getRoot();
                    System.out.println(root.contentToTSVString());
                } catch (Exception e) {
                    // handle the exception here, e.g. print error message
                    System.out.println("Error executing FlightSqlClient: " + e.getMessage());
                }
            }

您可以在此处找到完整脚本。

使用 Java Flight SQL 查询 InfluxDB Cloud

要在相应的 repo 中运行示例,请按照以下步骤操作

  1. 克隆 repo 并 cd 进入它。
  2. 运行 docker build -t myimage
  3. 运行 docker run myimage

资源和结论

请查看以下文档。它帮助我构建了这个示例,并且可以帮助您查询 InfluxDB Cloud 的旅程

  1. Arrow Flight 的参考文档
  2. Java 客户端的 Arrow Flight SQL 参考文档
  3. 关于 InfluxDB、Flight SQL、Pandas 和 Jupyter Notebooks 教程的博客文章
  4. 关于 TL;DR InfluxDB 技术提示的博客文章:使用 Flight SQL 和 AWS Lambda 进行降采样

我希望这篇博客文章能激励您探索 InfluxDB Cloud,并利用 Flight SQL 从 InfluxDB 传输大型数据集,以便使用您选择的工具进行数据处理。如果您需要任何帮助,请通过我们的社区网站Slack 频道与我们联系。我很乐意了解您尝试实现的目标以及您希望 InfluxDB 中的任务系统具备哪些功能。