使用Java Flight SQL客户端查询InfluxDB Cloud

导航至

InfluxDB Cloud 3.0 是建立在Apache生态系统之上的通用时序数据库。您可以使用Apache Arrow 生态系统的 Arrow Flight SQL 接口查询InfluxDB Cloud,该接口为处理时序数据提供了 SQL 支持。在本教程中,我们将使用Java通过Flight SQL查询InfluxDB Cloud的过程进行演示。Java Flight SQL客户端是Apache Arrow Flight的一部分,Apache Arrow Flight是一个构建高性能数据服务的框架。它提供了一种通过gRPC(一种现代高性能RPC框架)高效传输大数据集的方法。您可以通过此 仓库 尝试使用Java Flight SQL客户端查询 InfluxDB Cloud

要求和设置

本教程假设您已经拥有一个免费的InfluxDB Cloud账户。同时,也假设您的机器上已安装并运行了 Docker

最后,您需要创建或获取以下InfluxDB资源

  • 一个桶
  • 一个令牌
  • 您的组织(通常是您注册账户时使用的邮箱地址)

您还需要将数据写入您的InfluxDB账户。最简单的方法是通过UI手动写入一些行协议。导航到 加载数据 > 存储桶 > +添加数据 > 行协议 > 手动输入,选择您想要写入的存储桶,并将数据点写入InfluxDB。例如,您可以写入 measurementName,tagKey=tagValue fieldKey=1.0。或者,如果您想查看真实的行协议数据,请尝试NOAA空气质量数据集。您还可以查看将数据写入InfluxDB Cloud 3.0的文档,了解其他将数据写入InfluxDB Cloud的方法。

代码分析

让我们将代码分解成更小的部分,以便了解发生了什么。

  1. 导入所需的类:我们首先从Apache Arrow Flight和其他必要的库中导入所需的类。
    import io.grpc.CallOptions;
    import org.apache.arrow.flight.*; 
    import org.apache.arrow.flight.auth2.BearerCredentialWriter; 
    import org.apache.arrow.flight.grpc.CredentialCallOption; 
    import org.apache.arrow.flight.sql.FlightSqlClient;
    import org.apache.arrow.memory.BufferAllocator; 
    import org.apache.arrow.memory.RootAllocator; 
    import org.apache.arrow.vector.VectorSchemaRoot; 
    import io.grpc.Metadata;
    import java.net.URI;
    
  2. 定义主类:我们定义一个JavaExample类,其中包含一个main方法,我们的代码将在其中执行。
    public class JavaExample { public static void main(String[] args) {...} }
    
  3. 设置连接:我们定义了host变量,该变量应该是InfluxDB实例URL(不包括协议部分“https://”)。
    String host = "<host without https:// i.e. us-east-1-1.aws.cloud2.influxdata.com>";
    
  4. 接下来,我们定义我们想要执行的查询
    String query = "SELECT *";
    
  5. 现在,我们使用forGrpcTls方法创建一个Location对象,该方法设置与gRPC和TLS(传输层安全性)加密的连接。
    Location location = Location.forGrpcTls(host, 443);
    
  6. 身份验证:我们使用BearerCredentialWriter和CredentialCallOption类设置身份验证。用您的实际InfluxDB身份验证令牌替换'your token'。
    CredentialCallOption auth = new CredentialCallOption(new BearerCredentialWriter("your token"));
  7. 初始化FlightClient:首先,使用RootAllocator创建一个BufferAllocator来管理内存分配
    BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
  8. 我们创建了一个自定义拦截器,它在每个请求中注入数据库头。用您的InfluxDB存储桶名称替换'your bucket'。
    FlightClientMiddleware.Factory f = info -> new FlightClientMiddleware() { 
    @Override public void onBeforeSendingHeaders(CallHeaders outgoingHeaders) { outgoingHeaders.insert("database", "your bucket"); } 
    @Override public void onHeadersReceived(CallHeaders incomingHeaders) { } 
    @Override public void onCallCompleted(CallStatus status) { } };
  9. 现在,我们使用分配器、位置和自定义拦截器构建FlightClient
    FlightClient client = FlightClient.builder(allocator, location).intercept(f)
    .build();
  10. 通过将FlightClient包装在FlightSQLClient构造函数中创建SQL客户端。
    FlightSqlClient sqlClient = new FlightSqlClient(client);
  11. 使用execute方法和传递身份验证和查询在服务器上执行查询。flightInfo是一个包含元数据和结果数据位置的对象。使用getStream()获取数据。然后将输出转换为字符串并使用contentToTSVString()打印它
    FlightInfo flightInfo = sqlClient.execute(query, auth);
    final FlightStream stream = sqlClient.getStream(flightInfo.getEndpoints().get(0).getTicket(), auth);
            while (stream.next()) {
                try {
                    final VectorSchemaRoot root = stream.getRoot();
                    System.out.println(root.contentToTSVString());
                } catch (Exception e) {
                    // handle the exception here, e.g. print error message
                    System.out.println("Error executing FlightSqlClient: " + e.getMessage());
                }
            }

您可以在这里找到完整的脚本。

使用Java Flight SQL查询InfluxDB Cloud

要运行相应存储库中的示例,请按照以下步骤操作

  1. 克隆存储库并进入。
  2. 运行docker build -t myimage
  3. 运行docker run myimage

资源和结论

请查看以下文档。它帮助我构建了这个示例,也可以帮助您在查询InfluxDB Cloud的旅程中。

  1. Arrow Flight参考文档
  2. Java客户端的Arrow Flight SQL参考文档
  3. InfluxDB、Flight SQL、Pandas和Jupyter Notebooks教程博客
  4. TL;DR InfluxDB技术技巧:使用Flight SQL和AWS Lambda进行降采样博客

我希望这篇博客文章能够激发您探索 InfluxDB Cloud,并利用 Flight SQL 将大量数据集从 InfluxDB 转移出来,使用您选择的工具进行数据处理。如果您需要任何帮助,请通过我们的社区网站Slack频道联系。我很乐意了解您想要实现的目标以及您希望 InfluxDB 中的任务系统具备哪些功能。