使用 C++ Flight SQL 客户端查询 InfluxDB Cloud

导航至

InfluxDB Cloud 3.0 是一个基于 Apache 生态系统构建的多功能时序数据库。您可以使用 Apache Arrow Flight SQL 接口查询 InfluxDB Cloud,该接口为处理时序数据提供 SQL 支持。在本教程中,我们将逐步介绍如何使用 C++ 通过 Flight SQL 查询 InfluxDB Cloud。C++ Flight SQL 客户端是 Apache Arrow Flight 的一部分,Apache Arrow Flight 是一个用于构建高性能数据服务的框架。它提供了一种通过 gRPC(一种现代高性能 RPC 框架)高效传输大型数据集的方法。通过此代码仓库,亲自尝试使用 C++ Flight SQL 客户端查询 InfluxDB Cloud

要求和设置

本教程假设您已经拥有一个免费的 InfluxDB Cloud 账户,并且您的机器上运行着 Docker

最后,您需要创建或获取以下 InfluxDB 资源

  • 一个 Bucket
  • 一个 Token
  • 您的组织 (通常是您注册账户时使用的电子邮件)

您还需要将数据写入您的 InfluxDB 账户。最简单的方法是通过 UI 手动写入一些 Line Protocol。导航到 Load Data > Buckets > +Add Data > Line Protocol > Enter Manually,选择您要写入的 Bucket,然后向 InfluxDB 写入一个数据点。例如,您可以写入 measurementName,tagKey=tagValue fieldKey=1.0。或者,如果您想要真实的 Line Protocol 数据,请尝试 NOAA 空气传感器数据集。您还可以查看以下关于写入数据到 InfluxDB Cloud 3.0 的文档,了解其他将数据写入 InfluxDB Cloud 的方法。

代码演练

让我们将代码分解成更小的部分,以了解正在发生的事情。

  1. 导入所需的类:我们首先从 Apache Arrow Flight 和其他必要的库中导入所需的类。
    #include <cstdlib>
    #include <iostream>
    
    #include <arrow/flight/client.h>
    #include <arrow/flight/sql/client.h>
    #include <arrow/table.h>
    #include <gflags/gflags.h>
    #include <arrow/flight/client_auth.h>
    #include <grpcpp/grpcpp.h>
    #include <grpcpp/support/channel_arguments.h>
    #include <arrow/flight/api.h>
    #include <arrow/flight/client_middleware.h>
    
    namespace flight = arrow::flight;
    namespace flightsql = arrow::flight::sql;
  2. 收集您的身份验证凭据以及您要执行的查询
    const char* myhost = std::getenv("HOST");
    const char* mytoken = std::getenv("TOKEN");
    const char* mydatabase = std::getenv("DATABASE_NAME");
    
    DEFINE_string(host, myhost , "The host of the Flight SQL server.");
    DEFINE_string(token, mytoken, "The token for InfluxDB");
    DEFINE_string(database, mydatabase, "The database to query from");
    DEFINE_int32(port, 443, "The port of the Flight SQL server.");
    DEFINE_string(query, "SELECT * FROM 'measurementName'", "The query to execute.");
  3. 定义主类:我们定义一个 C++ 类,其中包含一个 Main 方法,我们的代码将在其中执行。同时连接到 Arrow 客户端。
    arrow::Status Main() {
    ARROW_ASSIGN_OR_RAISE(auto location,
                            flight::Location::ForGrpcTls(FLAGS_host, FLAGS_port));
      std::cout << "Connecting to " << location.ToString() << std::endl;
  4. 设置 Flight SQL 客户端,并为 Bearer token 和 database 添加标头。
    // Set up the Flight SQL client
    std::unique_ptr<flight::FlightClient> flight_client;
    ARROW_ASSIGN_OR_RAISE(flight_client, flight::FlightClient::Connect(location));
    std::unique_ptr<flightsql::FlightSqlClient> client(
    new flightsql::FlightSqlClient(std::move(flight_client)));
    
    flight::FlightCallOptions call_options;
    call_options.headers.emplace_back("authorization", "Bearer " + FLAGS_token);
    call_options.headers.emplace_back("database", FLAGS_database);
  5. 接下来,使用 Execute 方法并将标头和查询传递给客户端,以执行查询并获取结果。
    // Execute the query, getting a FlightInfo describing how to fetch the results
    std::cout << "Executing query: '" << FLAGS_query << "'" << std::endl;
    ARROW_ASSIGN_OR_RAISE(std::unique_ptr<flight::FlightInfo> flight_info,
    client->Execute(call_options, FLAGS_query));
  6. 最后,将结果提取到流中,将其读取到 Arrow Table 中,并将其转换为字符串。
    ARROW_ASSIGN_OR_RAISE(auto stream, client->DoGet(call_options, endpoint.ticket));
    // Read all results into an Arrow Table, though we can iteratively process record
    // batches as they arrive as well
    ARROW_ASSIGN_OR_RAISE(auto table, stream->ToTable());
    std::cout << "Read one chunk:" << std::endl;
    std::cout << table->ToString() << std::endl;

您可以在这里找到完整的脚本。

使用 C++ Flight SQL 查询 InfluxDB Cloud

要运行示例,请首先克隆相应的代码仓库,导航到该目录,并按照 README.md 中概述的以下步骤操作或

  1. 在您的终端中,设置以下环境变量。
    # Set environment variables
    
    export INFLUX_DATABASE=<your bucket> && \
    export INFLUX_HOST=<your host url i.e. us-east-1-1.aws.cloud2.influxdata.com> && \
    export INFLUX_TOKEN=<your token>
  2. 运行以下命令以使用 shell 脚本构建名为 ccflight 的镜像
    sh ./influxdb-build.sh build
  3. 要启动应用程序,请在您的终端中运行 docker run <IMAGE_NAME>
    docker run ccflight

查看以下文档。它帮助我构建了这个示例,并且可以在您查询 InfluxDB Cloud 的旅程中为您提供帮助

  1. Arrow Flight 的参考文档
  2. InfluxDB Cloud 文档,关于在 Python 中使用 Arrow Flight SQL 查询数据
  3. 关于 InfluxDB、Flight SQL、Pandas 和 Jupyter Notebooks 教程的博客文章
  4. 关于 TL;DR InfluxDB 技术提示:使用 Flight SQL 和 AWS Lambda 进行降采样的博客文章

我希望这篇博客文章能够启发您探索 InfluxDB Cloud,并利用 Flight SQL 从 InfluxDB 传输大型数据集,以便使用您选择的工具进行数据处理。如果您需要任何帮助,请使用我们的社区网站Slack 频道与我们联系。我很乐意了解您尝试实现的目标以及您希望 InfluxDB 拥有的功能。