使用Java Flight SQL客户端查询InfluxDB Cloud
作者:Anais Dotis-Georgiou / 产品
2023年5月12日
导航至
InfluxDB Cloud 3.0 是建立在Apache生态系统之上的通用时序数据库。您可以使用Apache Arrow 生态系统的 Arrow Flight SQL 接口查询InfluxDB Cloud,该接口为处理时序数据提供了 SQL 支持。在本教程中,我们将使用Java通过Flight SQL查询InfluxDB Cloud的过程进行演示。Java Flight SQL客户端是Apache Arrow Flight的一部分,Apache Arrow Flight是一个构建高性能数据服务的框架。它提供了一种通过gRPC(一种现代高性能RPC框架)高效传输大数据集的方法。您可以通过此 仓库 尝试使用Java Flight SQL客户端查询 InfluxDB Cloud。
要求和设置
本教程假设您已经拥有一个免费的InfluxDB Cloud账户。同时,也假设您的机器上已安装并运行了 Docker。
最后,您需要创建或获取以下InfluxDB资源
- 一个桶
- 一个令牌
- 您的组织(通常是您注册账户时使用的邮箱地址)
您还需要将数据写入您的InfluxDB账户。最简单的方法是通过UI手动写入一些行协议。导航到 加载数据 > 存储桶 > +添加数据 > 行协议 > 手动输入,选择您想要写入的存储桶,并将数据点写入InfluxDB。例如,您可以写入 measurementName,tagKey=tagValue fieldKey=1.0
。或者,如果您想查看真实的行协议数据,请尝试NOAA空气质量数据集。您还可以查看将数据写入InfluxDB Cloud 3.0的文档,了解其他将数据写入InfluxDB Cloud的方法。
代码分析
让我们将代码分解成更小的部分,以便了解发生了什么。
- 导入所需的类:我们首先从Apache Arrow Flight和其他必要的库中导入所需的类。
import io.grpc.CallOptions; import org.apache.arrow.flight.*; import org.apache.arrow.flight.auth2.BearerCredentialWriter; import org.apache.arrow.flight.grpc.CredentialCallOption; import org.apache.arrow.flight.sql.FlightSqlClient; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.VectorSchemaRoot; import io.grpc.Metadata; import java.net.URI;
- 定义主类:我们定义一个JavaExample类,其中包含一个
main
方法,我们的代码将在其中执行。public class JavaExample { public static void main(String[] args) {...} }
- 设置连接:我们定义了
host
变量,该变量应该是InfluxDB实例URL(不包括协议部分“https://”)。String host = "<host without https:// i.e. us-east-1-1.aws.cloud2.influxdata.com>";
- 接下来,我们定义我们想要执行的查询
String query = "SELECT *";
- 现在,我们使用
forGrpcTls
方法创建一个Location对象,该方法设置与gRPC和TLS(传输层安全性)加密的连接。Location location = Location.forGrpcTls(host, 443);
- 身份验证:我们使用BearerCredentialWriter和CredentialCallOption类设置身份验证。用您的实际InfluxDB身份验证令牌替换'your token'。
CredentialCallOption auth = new CredentialCallOption(new BearerCredentialWriter("your token"));
- 初始化FlightClient:首先,使用RootAllocator创建一个BufferAllocator来管理内存分配
BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
- 我们创建了一个自定义拦截器,它在每个请求中注入数据库头。用您的InfluxDB存储桶名称替换'your bucket'。
FlightClientMiddleware.Factory f = info -> new FlightClientMiddleware() { @Override public void onBeforeSendingHeaders(CallHeaders outgoingHeaders) { outgoingHeaders.insert("database", "your bucket"); } @Override public void onHeadersReceived(CallHeaders incomingHeaders) { } @Override public void onCallCompleted(CallStatus status) { } };
- 现在,我们使用分配器、位置和自定义拦截器构建FlightClient
FlightClient client = FlightClient.builder(allocator, location).intercept(f) .build();
- 通过将FlightClient包装在FlightSQLClient构造函数中创建SQL客户端。
FlightSqlClient sqlClient = new FlightSqlClient(client);
- 使用
execute
方法和传递身份验证和查询在服务器上执行查询。flightInfo
是一个包含元数据和结果数据位置的对象。使用getStream()
获取数据。然后将输出转换为字符串并使用contentToTSVString()
打印它FlightInfo flightInfo = sqlClient.execute(query, auth); final FlightStream stream = sqlClient.getStream(flightInfo.getEndpoints().get(0).getTicket(), auth); while (stream.next()) { try { final VectorSchemaRoot root = stream.getRoot(); System.out.println(root.contentToTSVString()); } catch (Exception e) { // handle the exception here, e.g. print error message System.out.println("Error executing FlightSqlClient: " + e.getMessage()); } }
您可以在这里找到完整的脚本。
使用Java Flight SQL查询InfluxDB Cloud
要运行相应存储库中的示例,请按照以下步骤操作
- 克隆存储库并进入。
- 运行
docker build -t myimage
- 运行
docker run myimage
资源和结论
请查看以下文档。它帮助我构建了这个示例,也可以帮助您在查询InfluxDB Cloud的旅程中。
- Arrow Flight参考文档
- Java客户端的Arrow Flight SQL参考文档
- InfluxDB、Flight SQL、Pandas和Jupyter Notebooks教程博客
- TL;DR InfluxDB技术技巧:使用Flight SQL和AWS Lambda进行降采样博客
我希望这篇博客文章能够激发您探索 InfluxDB Cloud,并利用 Flight SQL 将大量数据集从 InfluxDB 转移出来,使用您选择的工具进行数据处理。如果您需要任何帮助,请通过我们的社区网站或Slack频道联系。我很乐意了解您想要实现的目标以及您希望 InfluxDB 中的任务系统具备哪些功能。