使用 C++ Flight SQL 客户端查询 InfluxDB Cloud
作者:Anais Dotis-Georgiou / 开发者
2023年6月26日
导航至
InfluxDB Cloud 3.0 是一个基于 Apache 生态系统构建的多功能时序数据库。您可以使用 Apache Arrow Flight SQL 接口查询 InfluxDB Cloud,该接口为处理时序数据提供 SQL 支持。在本教程中,我们将逐步介绍如何使用 C++ 通过 Flight SQL 查询 InfluxDB Cloud。C++ Flight SQL 客户端是 Apache Arrow Flight 的一部分,Apache Arrow Flight 是一个用于构建高性能数据服务的框架。它提供了一种通过 gRPC(一种现代高性能 RPC 框架)高效传输大型数据集的方法。通过此代码仓库,亲自尝试使用 C++ Flight SQL 客户端查询 InfluxDB Cloud。
要求和设置
本教程假设您已经拥有一个免费的 InfluxDB Cloud 账户,并且您的机器上运行着 Docker。
最后,您需要创建或获取以下 InfluxDB 资源
- 一个 Bucket
- 一个 Token
- 您的组织 (通常是您注册账户时使用的电子邮件)
您还需要将数据写入您的 InfluxDB 账户。最简单的方法是通过 UI 手动写入一些 Line Protocol。导航到 Load Data > Buckets > +Add Data > Line Protocol > Enter Manually,选择您要写入的 Bucket,然后向 InfluxDB 写入一个数据点。例如,您可以写入 measurementName,tagKey=tagValue fieldKey=1.0
。或者,如果您想要真实的 Line Protocol 数据,请尝试 NOAA 空气传感器数据集。您还可以查看以下关于写入数据到 InfluxDB Cloud 3.0 的文档,了解其他将数据写入 InfluxDB Cloud 的方法。
代码演练
让我们将代码分解成更小的部分,以了解正在发生的事情。
- 导入所需的类:我们首先从 Apache Arrow Flight 和其他必要的库中导入所需的类。
#include <cstdlib> #include <iostream> #include <arrow/flight/client.h> #include <arrow/flight/sql/client.h> #include <arrow/table.h> #include <gflags/gflags.h> #include <arrow/flight/client_auth.h> #include <grpcpp/grpcpp.h> #include <grpcpp/support/channel_arguments.h> #include <arrow/flight/api.h> #include <arrow/flight/client_middleware.h> namespace flight = arrow::flight; namespace flightsql = arrow::flight::sql;
- 收集您的身份验证凭据以及您要执行的查询
const char* myhost = std::getenv("HOST"); const char* mytoken = std::getenv("TOKEN"); const char* mydatabase = std::getenv("DATABASE_NAME"); DEFINE_string(host, myhost , "The host of the Flight SQL server."); DEFINE_string(token, mytoken, "The token for InfluxDB"); DEFINE_string(database, mydatabase, "The database to query from"); DEFINE_int32(port, 443, "The port of the Flight SQL server."); DEFINE_string(query, "SELECT * FROM 'measurementName'", "The query to execute.");
- 定义主类:我们定义一个 C++ 类,其中包含一个
Main
方法,我们的代码将在其中执行。同时连接到 Arrow 客户端。arrow::Status Main() { ARROW_ASSIGN_OR_RAISE(auto location, flight::Location::ForGrpcTls(FLAGS_host, FLAGS_port)); std::cout << "Connecting to " << location.ToString() << std::endl;
- 设置 Flight SQL 客户端,并为
Bearer
token 和database
添加标头。// Set up the Flight SQL client std::unique_ptr<flight::FlightClient> flight_client; ARROW_ASSIGN_OR_RAISE(flight_client, flight::FlightClient::Connect(location)); std::unique_ptr<flightsql::FlightSqlClient> client( new flightsql::FlightSqlClient(std::move(flight_client))); flight::FlightCallOptions call_options; call_options.headers.emplace_back("authorization", "Bearer " + FLAGS_token); call_options.headers.emplace_back("database", FLAGS_database);
- 接下来,使用 Execute 方法并将标头和查询传递给客户端,以执行查询并获取结果。
// Execute the query, getting a FlightInfo describing how to fetch the results std::cout << "Executing query: '" << FLAGS_query << "'" << std::endl; ARROW_ASSIGN_OR_RAISE(std::unique_ptr<flight::FlightInfo> flight_info, client->Execute(call_options, FLAGS_query));
- 最后,将结果提取到流中,将其读取到 Arrow Table 中,并将其转换为字符串。
ARROW_ASSIGN_OR_RAISE(auto stream, client->DoGet(call_options, endpoint.ticket)); // Read all results into an Arrow Table, though we can iteratively process record // batches as they arrive as well ARROW_ASSIGN_OR_RAISE(auto table, stream->ToTable()); std::cout << "Read one chunk:" << std::endl; std::cout << table->ToString() << std::endl;
您可以在这里找到完整的脚本。
使用 C++ Flight SQL 查询 InfluxDB Cloud
要运行示例,请首先克隆相应的代码仓库,导航到该目录,并按照 README.md 中概述的以下步骤操作或
- 在您的终端中,设置以下环境变量。
# Set environment variables export INFLUX_DATABASE=<your bucket> && \ export INFLUX_HOST=<your host url i.e. us-east-1-1.aws.cloud2.influxdata.com> && \ export INFLUX_TOKEN=<your token>
- 运行以下命令以使用 shell 脚本构建名为
ccflight
的镜像sh ./influxdb-build.sh build
- 要启动应用程序,请在您的终端中运行
docker run <IMAGE_NAME>
docker run ccflight
推荐资源
查看以下文档。它帮助我构建了这个示例,并且可以在您查询 InfluxDB Cloud 的旅程中为您提供帮助
- Arrow Flight 的参考文档
- InfluxDB Cloud 文档,关于在 Python 中使用 Arrow Flight SQL 查询数据
- 关于 InfluxDB、Flight SQL、Pandas 和 Jupyter Notebooks 教程的博客文章
- 关于 TL;DR InfluxDB 技术提示:使用 Flight SQL 和 AWS Lambda 进行降采样的博客文章
我希望这篇博客文章能够启发您探索 InfluxDB Cloud,并利用 Flight SQL 从 InfluxDB 传输大型数据集,以便使用您选择的工具进行数据处理。如果您需要任何帮助,请使用我们的社区网站或 Slack 频道与我们联系。我很乐意了解您尝试实现的目标以及您希望 InfluxDB 拥有的功能。