在InfluxDB和Flux中使用GraphQL

导航至

在之前的文章《在Rails应用程序中执行Flux查询》中,我们向您展示了如何在Rails应用程序中执行Flux查询。在这篇文章中,我们将扩展这一知识,以在Rails应用程序中使用GraphQL,该应用程序利用Flux查询InfluxDB的强大功能。这允许客户端精细调整底层Flux查询的结果。您将开始理解当将GraphQL与Flux结合时可以访问的强大功能。

在我们开始之前,让我们谈谈GraphQL以及为什么您会选择它而不是传统的REST API。我强烈建议您在How to GraphQL上完成教程并查阅文档,以更好地了解GraphQL是什么以及如何在您的应用程序中实现它。我在那里学到了很多。

什么是GraphQL?

GraphQL是一种查询语言,允许客户端在单个请求中指定它想要的确切数据,而不是必须多次请求以获取相同的数据或处理数据过度获取或不足获取的问题。它在数据库方面是透明的,因为它本身不直接查询数据库。后端逻辑仍然负责处理(在我们的案例中,Flux查询将承担重负荷)。GraphQL使用解析器来处理传入的查询,然后实现后端逻辑以获取数据。它允许客户端比使用返回预定义数据结构的传统REST API具有更多控制数据获取的能力。

我们正在构建什么?

我们将构建一个后端,当客户端发起GraphQL查询时,它将触发Flux查询我们的InfluxDB数据库。在这种情况下,我们是一家使用传感器捕获有关我们的火车以及其他相关信息(如户外温度)的火车公司。我们希望我们的客户端能够选择它想要检索的确切数据,例如速度和/或轨道温度,而不是过度或不足获取数据。因此,我们将向我们的客户端提供一些可以定制以满足其需求的GraphQL查询。

火车、飞机和数据点

在上次文章中,我使用了名为cpu_load_short的测量值,并带有各种标签和字段。在这篇文章中,我已经在我的 InfluxDB 数据库中种下了名为trains的火车数据。测量值是train_speed。我的标签是driverlocationtrain。我的字段是outdoor_tempspeedtrack_temp。下面你可以看到三位不同司机的数据样本。

下面是一个司机的数据样本。

将注释的 CSV 响应格式化为我们之前在那篇文章中看到的 Flux 查询的方法,在经过一些重构后现在看起来是这样的。它执行与之前相同的功能——除了我现在生成的是一个火车对象而不是 CPU 负载对象。

def parse_trains(response)
 header, *rows = response.body.split("\r\n").drop(3)
 column_names = parse_row(header)
 rows.map do |row|
   values = parse_row(row)
   train_data = column_names.zip(values).to_h
   Train.new(train_data)
 end
end

def parse_row(raw_row)
 raw_row.split(",").drop(3)
end

我还为我的火车对象创建了一个新的 PORO(普通的 Ruby 对象)。

class Train
   attr_reader :time, :measurement, :driver, :location, :train, :outdoor_temp, :speed, :track_temp

   def initialize(train)
      @time = train["_time"]
      @measurement = train["_measurement"]
      @driver = train["driver"]
      @location = train["location"]
      @train = train["train"]
      @outdoor_temp = train["outdoor_temp"]
      @speed = train["speed"]
      @track_temp = train["track_temp"]
   end

end

这个 PORO 在我的parse_trains方法结束时生成,非常有用,因为 GraphQL 晶石(gem)在解析传入的 GraphQL 查询时只需简单地获取相应的实例变量值。

将 GraphQL 添加到 Rails 项目中

我发现How to GraphQLRails 教程非常有用,以开始使用,我建议按照该教程中概述的步骤将graphql 晶石和 GraphiQL 工具添加到您的项目中。

该教程的其余部分使用 Active Record 查询通过 GraphQL 解析器从传统的关联数据库中获取数据。了解其工作原理很有帮助,但如果我们想从 InfluxDB 实例获取数据怎么办?我们不能使用 Active Record,但正如我们在上次文章中所学,我们可以在 HTTP 调用中使用 Flux 查询来获取这些数据(当 InfluxDB v2.0 客户端库准备好时,我们可以使用这些库)。

对于我的新火车数据点,我向我的 InfluxDB 数据库的查询端点发出的 HTTP 请求现在看起来像这样

def query_influxdb(time, filter_param = nil)
 uri = URI.parse("https://127.0.0.1:8086/api/v2/query")
 request = Net::HTTP::Post.new(uri)
 request.content_type = "application/vnd.flux"
 request["Accept"] = "application/csv"
 request.body = flux_query(time, filter_param)

 req_options = {
 use_ssl: uri.scheme == "https",
 }
 response = Net::HTTP.start(uri.hostname, uri.port, req_options) do |http|
   http.request(request)
 end
end

这是一个通用的 HTTP 请求,它被调用以执行每个 GraphQL 查询。然后动态生成一个 Flux 查询并将其传递给其request.body

根据正在解决的 GraphQL 查询,Flux 查询的构造方式如下

def flux_query(time, filter_param = nil)
 %Q[
   #{flux_from}
   |> #{flux_range(time)}
   |> #{flux_filter(filter_param)}
   |> #{flux_pivot}
   |> #{flux_yield}
 ]
end

def flux_from
 "from(bucket:\"trains/autogen\")"
End

# There are two possible options in the range method because for most queries I just
# want a default range of the last 60 days, but in order to search by
# timestamp, I need to set my range to that exact timestamp. Flux
# allows me to do that by choosing an inclusive start time, which is my timestamp,
# and an exclusive stop time, which is one second later.
def flux_range(time)
 if time == TIME
   "range(start: #{time})"
 else
   start_time = Time.parse(time)
   stop_time = start_time + 1.second
   "range(start: #{start_time.rfc3339}, stop: #{stop_time.rfc3339})"
 end
end

def flux_filter(filter_param = nil)
 if filter_param
   "filter(fn: (r) => r.driver == \"#{filter_param}\" and
                      r._measurement == \"train_speed\")"
 else
   "filter(fn: (r) => r._measurement == \"train_speed\")"
 end
end

def flux_pivot
 "pivot(
       rowKey:[\"_time\"],
       columnKey: [\"_field\"],
       valueColumn: \"_value\"
     )"
end

def flux_yield
 "yield()"
end

定义 GraphQL 类型

在遵循How to GraphQLRails 教程中概述的初始设置说明后,我创建了一个新的文件app/graphql/types/train_type.rb,并将以下代码放入其中

# defines a new GraphQL type

Types::TrainType = GraphQL::ObjectType.define do
   # this type is named 'Train'
   name 'Train'

   # it has the following fields
   field :time, types.String
   field :measurement, types.String
   field :driver, types.String
   field :location, types.String
   field :train, types.String
   field :outdoor_temp, types.String
   field :speed, types.String
   field :track_temp, types.String

end

这里到底发生了什么?我在定义一个对象类型。我知道我的 InfluxDB 数据库中充满了火车数据点,测量值是train_speed,标签为driverlocationtrain,字段为speedoutdoor_temptrack_temp。我需要我的 GraphQL 对象类型的形状来模仿我的数据点的形状,以便渲染每个字段。因此,每个 GraphQL 字段都对应于我的 InfluxDB 数据点的列。请注意,Flux 透视函数允许我将 InfluxDB 字段作为它们自己的单独列出现,这使得数据更易于阅读。

除了对象类型外,GraphQL 还包含根类型。根类型包括查询类型和突变类型。由于我们将要查询数据,我们需要建立我们的查询类型,并将所有查询的代码放置在其中。当我们运行 rails generate graphql:install 以进行初始设置时,我们生成了一个名为 app/graphql/types/query_type.rb 的模板文件。我们现在将各种查询放入此文件,这些查询只是定义为字段,类似于对象类型。我的整个文件如下(为了便于阅读,我将上述HTTP调用、解析方法和Flux查询生成方法放在了底部作为私有方法)

require 'net/http'
require 'uri'

TIME = "-60d"

Types::QueryType = GraphQL::ObjectType.define do
 name "Query"

   field :allTrains, !types[Types::TrainType] do
     description "Return all train data"
     resolve -> (obj, args, ctx) {
       parse_trains(query_influxdb(TIME))
     }
   end

   field :trainByDriver, !types[Types::TrainType] do
     argument :driver, !types.String
     description "Find train stats by driver"
     resolve -> (obj, args, ctx) {
       parse_trains(query_influxdb(TIME, args[:driver]))
     }
   end

   field :trainByTime, !types[Types::TrainType] do
     argument :time, !types.String
     description "Find train stats by time"
     resolve -> (obj, args, ctx) {
       parse_trains(query_influxdb(args[:time]))
     }
   end

end

private

def parse_trains(response)
 header, *rows = response.body.split("\r\n").drop(3)
 column_names = parse_row(header)
 rows.map do |row|
   values = parse_row(row)
   train_data = column_names.zip(values).to_h
   Train.new(train_data)
 end
end

def parse_row(raw_row)
 raw_row.split(",").drop(3)
end

def query_influxdb(time, filter_param = nil)
 uri = URI.parse("https://127.0.0.1:8086/api/v2/query")
 request = Net::HTTP::Post.new(uri)
 request.content_type = "application/vnd.flux"
 request["Accept"] = "application/csv"
 request.body = flux_query(time, filter_param)

 req_options = {
 use_ssl: uri.scheme == "https",
 }
 response = Net::HTTP.start(uri.hostname, uri.port, req_options) do |http|
   http.request(request)
 end
end

def flux_query(time, filter_param = nil)
 %Q[
   #{flux_from}
   |> #{flux_range(time)}
   |> #{flux_filter(filter_param)}
   |> #{flux_pivot}
   |> #{flux_yield}
 ]
end

def flux_from
 "from(bucket:\"trains/autogen\")"
end

def flux_range(time)
 if time == TIME
   "range(start: #{time})"
 else
   start_time = Time.parse(time)
   stop_time = start_time + 1.second
   "range(start: #{start_time.rfc3339}, stop: #{stop_time.rfc3339})"
 end
end

def flux_filter(filter_param = nil)
 if filter_param
   "filter(fn: (r) => r.driver == \"#{filter_param}\" and
                      r._measurement == \"train_speed\")"
 else
   "filter(fn: (r) => r._measurement == \"train_speed\")"
 end
end

def flux_pivot
 "pivot(
       rowKey:[\"_time\"],
       columnKey: [\"_field\"],
       valueColumn: \"_value\"
     )"
end

def flux_yield
 "yield()"
end

预期的 allTrains 查询返回所有火车。我的解析器只是返回整个数据集,在幕后,grapqhl 珠宝简单地使用点语法调用方法来访问我在我的 Train PORO 中定义的每个实例变量。

如果我们正确完成了初始设置,我们可以使用 GraphiQL 工具来模拟客户端查询。让我们前往 https://127.0.0.1:3000/graphiql 来测试我们的查询。我可以通过 allTrains 查询返回所有字段,如下所示

我也可以只返回我想要的字段

这就是为什么 GraphQL 如此有用的原因。客户端可以决定它想要返回哪些字段,而不是像传统的 REST API 端点那样返回预先确定的数据集。在后端,我可以为客户端提供一系列选项来获取所需的数据,并且这些选项可以高度自定义。

例如,我可以允许客户端传递一个参数,使用 trainByDriver 查询返回单个司机的所有火车。在这个查询中,转换被传递到 Flux 去完成返回与请求的司机相关联的火车数据的工作,而 GraphQL 负责字段选择。这个查询也具有只返回请求的字段的功能。

由于我们处理的是时间序列数据,允许我们的客户端使用 trainByTime 查询通过单个时间戳进行搜索是有意义的。正如我们在前面讨论的 Flux 查询生成器代码中所看到的,我们再次将转换数据为 Flux 查询的工作推到检索与该时间戳相关联的数据,然后让 GraphQL 返回客户端请求的字段。

你可能可以想出其他对客户端有用的查询,所有这些查询都将提供仅提供客户端明确请求的信息的好处。

总结

GraphQL 是 InfluxDB 数据库的强大伴侣。它允许客户端精确指定所需的信息,避免了数据的过度检索或不足检索。结合 Flux 查询的结果,你现在拥有两个控制点:Flux 查询的范围和筛选函数,以及你向客户端开放的 GraphQL 查询类型。客户端可以简化其代码并避免不必要的代码,尤其是当您将转换工作推送到数据库查询而不是要求客户端进行转换时。此外,由于客户端使用 GraphQL 而不是直接访问您的端点来查询 API,因此不再需要客户端更改其触发的端点,以适应需求的变化。这为后端提供了许多灵活性,同时赋予客户端自主做出数据决策的能力。

值得注意的是,您可以为前端开发者暴露一个 Flux 端点,他们可以编写 Flux 而不是 GraphQL 来获取相同的结果。然而,GraphQL 的优势在于,您拥有建立在它之上的客户端库和其他工具(如探索器)。此外,如果您最终连接到其他数据库,这些结果可以在单个 API 调用中合并(即 InfluxDB + Postgres)。