使用 GraphQL 与 InfluxDB 和 Flux
作者:Sonia Gupta / 用例, 开发者, 产品
2019 年 1 月 17 日
导航至
在之前的文章中,我们向您展示了如何在 Rails 应用程序中进行 Flux 查询。在本文中,我们将扩展这些知识,在 Rails 应用程序中使用 GraphQL,利用 Flux 的强大功能查询 InfluxDB。这允许客户端微调底层 Flux 查询的结果。您将开始了解当您将 GraphQL 与 Flux 配对时可以访问的强大功能。
在我们开始之前,让我们谈谈 GraphQL 以及为什么您会选择它而不是传统的 REST API。我强烈建议您完成 How to GraphQL 上的教程并浏览文档,以更好地了解 GraphQL 是什么以及如何在您的应用程序中实现它。我在这里学到了很多东西。
什么是 GraphQL?
GraphQL 是一种查询语言,允许客户端在单个请求中准确指定它想要的数据,而不是必须发出多个请求来获取相同的数据或处理数据的过度获取或获取不足。它是数据库不可知的,因为它本身不直接查询数据库。后端逻辑仍然像往常一样处理这个问题(在我们的例子中,Flux 查询将完成繁重的工作)。GraphQL 使用解析器来处理传入的查询,然后实现后端逻辑来获取数据。与传统的 REST API 返回预先确定的数据结构相比,它允许客户端在获取数据方面拥有更大的权力。
我们正在构建什么?
我们将构建一个后端,当客户端启动 GraphQL 查询时,它会触发 Flux 查询我们的 InfluxDB 数据库。在本例中,我们是一家火车公司,使用传感器捕获有关我们火车的数据以及其他相关信息,例如室外温度。我们希望我们的客户能够准确选择它想要检索的数据,例如速度和/或轨道温度,而不是过度获取或获取不足数据。因此,我们将为我们的客户提供一些 GraphQL 查询,他们可以自定义这些查询以满足其需求。
火车、飞机和数据点
在上一篇文章中,我使用了名为cpu_load_short
的 measurement,其中包含各种标签和字段。在本文中,我使用火车数据填充了我的 InfluxDB 数据库,名为trains
。measurement 是train_speed
。我的标签是driver
、location
和train
。我的字段是outdoor_temp
、speed
和track_temp
。下面您可以看到三位不同驾驶员的数据样本。
下面是一位驾驶员的数据样本。
您在上一篇文章中看到的用于格式化 Flux 查询的带注释 CSV 响应的方法,在一些重构之后,现在看起来像这样。它执行与以前相同的功能——只是我现在生成的是火车对象而不是 cpu 负载对象
def parse_trains(response)
header, *rows = response.body.split("\r\n").drop(3)
column_names = parse_row(header)
rows.map do |row|
values = parse_row(row)
train_data = column_names.zip(values).to_h
Train.new(train_data)
end
end
def parse_row(raw_row)
raw_row.split(",").drop(3)
end
我还为我的火车对象制作了一个新的 PORO(普通 Ruby 对象)
class Train
attr_reader :time, :measurement, :driver, :location, :train, :outdoor_temp, :speed, :track_temp
def initialize(train)
@time = train["_time"]
@measurement = train["_measurement"]
@driver = train["driver"]
@location = train["location"]
@train = train["train"]
@outdoor_temp = train["outdoor_temp"]
@speed = train["speed"]
@track_temp = train["track_temp"]
end
end
这个 PORO 在我的parse_trains
方法末尾生成,非常方便,因为 GraphQL gem 在解析传入的 GraphQL 查询时,只需获取相应的实例变量值即可。
将 GraphQL 添加到 Rails 项目
我发现 How to GraphQL Rails 教程 对于入门非常有用,我建议按照其中概述的步骤添加graphql
gem 和 GraphiQL 工具到您的项目。
该教程的其余部分使用 Active Record 查询从使用 GraphQL 解析器的传统关系数据库中获取数据。了解其工作原理很有帮助,但是如果我们想从 InfluxDB 实例中获取数据怎么办?我们不能为此使用 Active Record,但是正如我们在上一篇文章中学到的那样,我们可以在 HTTP 调用中使用 Flux 查询来获取该数据(并且当 InfluxDB v2.0 客户端库准备就绪时,我们可以使用这些库)。
对于我的新火车数据点,我现在对我的 InfluxDB 数据库的查询端点的 HTTP 请求看起来像这样
def query_influxdb(time, filter_param = nil)
uri = URI.parse("http://localhost:8086/api/v2/query")
request = Net::HTTP::Post.new(uri)
request.content_type = "application/vnd.flux"
request["Accept"] = "application/csv"
request.body = flux_query(time, filter_param)
req_options = {
use_ssl: uri.scheme == "https",
}
response = Net::HTTP.start(uri.hostname, uri.port, req_options) do |http|
http.request(request)
end
end
这是一个通用的 HTTP 请求,它为每个 GraphQL 查询调用。然后动态生成 Flux 查询并将其传递到其request.body
中。
根据正在解析的 GraphQL 查询,Flux 查询的构造方式如下
def flux_query(time, filter_param = nil)
%Q[
#{flux_from}
|> #{flux_range(time)}
|> #{flux_filter(filter_param)}
|> #{flux_pivot}
|> #{flux_yield}
]
end
def flux_from
"from(bucket:\"trains/autogen\")"
End
# There are two possible options in the range method because for most queries I just
# want a default range of the last 60 days, but in order to search by
# timestamp, I need to set my range to that exact timestamp. Flux
# allows me to do that by choosing an inclusive start time, which is my timestamp,
# and an exclusive stop time, which is one second later.
def flux_range(time)
if time == TIME
"range(start: #{time})"
else
start_time = Time.parse(time)
stop_time = start_time + 1.second
"range(start: #{start_time.rfc3339}, stop: #{stop_time.rfc3339})"
end
end
def flux_filter(filter_param = nil)
if filter_param
"filter(fn: (r) => r.driver == \"#{filter_param}\" and
r._measurement == \"train_speed\")"
else
"filter(fn: (r) => r._measurement == \"train_speed\")"
end
end
def flux_pivot
"pivot(
rowKey:[\"_time\"],
columnKey: [\"_field\"],
valueColumn: \"_value\"
)"
end
def flux_yield
"yield()"
end
定义 GraphQL 类型
在按照 How to GraphQL Rails 教程 中概述的初始设置说明进行操作后,我创建了一个新文件 app/graphql/types/train_type.rb
,并将以下代码放入其中
# defines a new GraphQL type
Types::TrainType = GraphQL::ObjectType.define do
# this type is named 'Train'
name 'Train'
# it has the following fields
field :time, types.String
field :measurement, types.String
field :driver, types.String
field :location, types.String
field :train, types.String
field :outdoor_temp, types.String
field :speed, types.String
field :track_temp, types.String
end
那么这里到底发生了什么?我正在定义一个对象类型。我知道我有一个填充了火车数据点的 InfluxDB 数据库,我的 measurement 是train_speed
,标签为driver
、location
和train
,字段为speed
、outdoor_temp
和track_temp
。我需要我的 GraphQL 对象类型的形状来模拟我的数据点的形状,以便呈现每个字段。因此,每个 GraphQL 字段都对应于我的 InfluxDB 数据点的列。请注意,Flux pivot 函数允许我的 InfluxDB 字段显示为它们自己的单独列,这使数据更具可读性。
除了对象类型之外,GraphQL 中还有根类型。根类型包括查询类型和 mutation 类型。由于我们将查询数据,因此我们需要建立我们的查询类型,我们将在其中放置所有查询的代码。当我们在初始设置中运行 rails generate graphql:install
时,我们生成了一个名为 app/graphql/types/query_type.rb
的模板文件。我们现在将我们的各种查询放入此文件中,这些查询只是定义为字段,类似于对象类型。我的整个文件看起来像这样(为了便于阅读,我将前面提到的 HTTP 调用、解析方法和 Flux 查询生成器方法放在底部作为私有方法)
require 'net/http'
require 'uri'
TIME = "-60d"
Types::QueryType = GraphQL::ObjectType.define do
name "Query"
field :allTrains, !types[Types::TrainType] do
description "Return all train data"
resolve -> (obj, args, ctx) {
parse_trains(query_influxdb(TIME))
}
end
field :trainByDriver, !types[Types::TrainType] do
argument :driver, !types.String
description "Find train stats by driver"
resolve -> (obj, args, ctx) {
parse_trains(query_influxdb(TIME, args[:driver]))
}
end
field :trainByTime, !types[Types::TrainType] do
argument :time, !types.String
description "Find train stats by time"
resolve -> (obj, args, ctx) {
parse_trains(query_influxdb(args[:time]))
}
end
end
private
def parse_trains(response)
header, *rows = response.body.split("\r\n").drop(3)
column_names = parse_row(header)
rows.map do |row|
values = parse_row(row)
train_data = column_names.zip(values).to_h
Train.new(train_data)
end
end
def parse_row(raw_row)
raw_row.split(",").drop(3)
end
def query_influxdb(time, filter_param = nil)
uri = URI.parse("http://localhost:8086/api/v2/query")
request = Net::HTTP::Post.new(uri)
request.content_type = "application/vnd.flux"
request["Accept"] = "application/csv"
request.body = flux_query(time, filter_param)
req_options = {
use_ssl: uri.scheme == "https",
}
response = Net::HTTP.start(uri.hostname, uri.port, req_options) do |http|
http.request(request)
end
end
def flux_query(time, filter_param = nil)
%Q[
#{flux_from}
|> #{flux_range(time)}
|> #{flux_filter(filter_param)}
|> #{flux_pivot}
|> #{flux_yield}
]
end
def flux_from
"from(bucket:\"trains/autogen\")"
end
def flux_range(time)
if time == TIME
"range(start: #{time})"
else
start_time = Time.parse(time)
stop_time = start_time + 1.second
"range(start: #{start_time.rfc3339}, stop: #{stop_time.rfc3339})"
end
end
def flux_filter(filter_param = nil)
if filter_param
"filter(fn: (r) => r.driver == \"#{filter_param}\" and
r._measurement == \"train_speed\")"
else
"filter(fn: (r) => r._measurement == \"train_speed\")"
end
end
def flux_pivot
"pivot(
rowKey:[\"_time\"],
columnKey: [\"_field\"],
valueColumn: \"_value\"
)"
end
def flux_yield
"yield()"
end
allTrains
查询按预期返回所有火车。我的解析器只是返回整个数据集,而在幕后,grapqhl
gem 只是使用点语法来调用方法以访问我在我的 Train PORO 中定义的每个实例变量。
如果我们正确完成了初始设置,我们可以使用 GraphiQL 工具来模拟客户端查询。让我们转到 http://localhost:3000/graphiql
来测试我们的查询。我可以使用 allTrains
查询返回所有字段,如下所示
我也可以只返回我想要的字段的选择
这就是 GraphQL 如此有用的原因。客户端可以决定要返回哪些字段,而不是由传统的 REST API 端点返回预先确定的数据集。在后端,我可以为我的客户端提供许多选项来获取所需的数据,并且这些选项是高度可定制的。
例如,我可以使用 trainByDriver
查询允许我的客户端传入一个参数来返回单个驾驶员的所有火车。在此查询中,转换向下传递到 Flux,以完成仅返回与请求的驾驶员关联的火车数据的工作,而 GraphQL 处理字段选择。仅返回请求字段的相同功能也存在于此查询中。
由于这是我们正在处理的时间序列数据,因此允许我们的客户端使用 trainByTime
查询按单个时间戳搜索是有意义的。正如您在前面讨论的 Flux 查询生成器代码中看到的那样,我们再次将转换数据的工作推送到 Flux 查询中,以检索与该时间戳关联的数据,然后让 GraphQL 仅返回客户端请求的字段。
您可能会想到许多其他对客户端有用的查询,所有这些查询都将具有仅提供客户端专门请求的信息的优势。
总结
GraphQL 是您 InfluxDB 数据库的强大伴侣。它允许客户端准确指定它需要的信息,而不会过度获取或获取不足数据。结合 Flux 查询的结果,您现在有两个控制点:Flux 查询的范围和过滤器功能,以及您向客户端开放的 GraphQL 查询类型。客户端可以简化其代码并避免多余的代码,尤其是在您将转换工作下推到数据库查询而不是要求客户端进行转换时。此外,您不再需要您的客户端更改其命中的端点,因为需求发生变化,因为它正在使用 GraphQL 查询 API 而不是直接访问您的端点。这在后端创建了很大的灵活性,同时使客户端能够自行做出数据决策。
值得注意的是,您可以向前端开发人员公开 Flux 端点,他们可以编写 Flux 而不是 GraphQL 来获得相同的结果。但是,GraphQL 的优势在于您拥有客户端库和其他工具(如资源管理器),这些工具都构建在其之上。此外,如果您最终连接到其他数据库,这些结果可以在单个 API 调用中连接在一起(即 InfluxDB + Postgres)。