在 Rails 中进行 Flux 查询

导航至

现在我们已经发布了 Flux,这是一种查询和脚本语言的组合,您可能迫不及待地想在您的应用程序中开始使用它。我们正在构建多种语言的库,但与此同时,您可以开始使用简单的原始 HTTP 请求进行 Flux 查询。在这篇文章中,我们将逐步介绍如何使用 sandbox 来启动和运行堆栈,然后我们将在 Rails 应用程序中使用 Flux 查询我们的数据库。在后续文章中,我们将向我们的 Rails 应用程序添加 GraphQL,以使客户端能够控制它想要从我们的 Flux 查询中检索的信息。

首先,克隆 sandbox repo。接下来,如果您还没有在您的机器上安装 Docker。最快的方法是使用 Homebrew 运行 homebrew cask install docker 。然后您可以使用 spotlight 启动 Docker。一旦您运行了 docker,您可以 cd 到克隆下来的 repo 的文件路径并运行 ./sandbox 以查看 sandbox 中可用的所有命令。

运行 ./sandbox up 将为您加载 sandbox 中最新的稳定版本软件,并将在您的浏览器中打开两个运行 localhost 的选项卡。其中一个选项卡包含 sandbox 的教程和文档,另一个选项卡是 Chronograf 中大部分有趣的事情发生的地方,Chronograf 是 InfluxData 的数据可视化和查询层。使用 sandbox 最好的部分之一是堆栈的所有四个部分(InfluxDB、Chronograf、Telegraf 和 Kapacitor)都自动连接起来以相互通信。

您可以在 Chronograf 中点击查看已经开箱即用的功能,但您来这里是为了学习如何在 Rails 应用程序中使用 Flux 查询 InfluxDB 数据库,所以让我们开始吧!

我们可以使用 InfluxDB CLI 创建我们打算查询的数据库。sandbox 通过允许我们运行 ./sandbox influxdb 来简化此操作。进入 CLI 后,您可以通过运行 CREATE <database name> 来创建您的数据库。

我们也可以使用 Chronograf 创建我们打算查询的数据库。在左侧导航面板中,选择“Admin”图标,然后选择“InfluxDB”。

现在您应该在 InfluxDB sandbox 实例中看到可用的数据库。单击“Create Database”按钮创建您的数据库。

我建议您按照 入门 文档中的步骤,使用行协议将一些数据种子到其中(您也可以使用 InfluxDB API 将多个点写入您的数据库)。我按照文档创建了“mydb”数据库,并使用几个 cpu 负载值对其进行了播种。

您现在可以导航到 Chronograf 的 Explore 选项卡,实际查看您创建的数据库,以及您插入其中的值。您还可以通过选择 Flux 选项(而不是默认的 InfluxQL)开始使用 Flux 查询您的数据库。

请注意,Flux 在 sandbox 中默认启用,因此您无需更改 influxdb.conf 文件即可启用它。

在下面的屏幕截图中,我使用 Flux 查询我的“cpu_load_short”测量中的所有点。在我的例子中,“cpu_load_short”测量有两个标签“host”和“region”,以及一个字段“value”。我在此测量中只有 31 个点,所以这没什么大不了的,但作为一般规则,您可能不会在 InfluxQL 中运行 SELECT * 或在 Flux 中运行 |> group(by: ["_measurement"]) 。我在我的 Flux 查询中添加了“group”函数,以便我的所有条目在 Chronograf 中以相同的表格视图直观地显示,但如果没有“group”函数,您仍然会获得所有点,并且当您切换“查看原始数据”时可以看到它们。我在这里使用它仅仅是为了说明一个简单的小数据集上的 Flux 查询。

现在到了有趣的部分。让我们在 Rails 应用程序中运行相同的 Flux 查询!Flux 文档为我们提供了一个很好的起点,因为我们只是要发出原始 HTTP 请求(至少在客户端库准备好之前)。

根据 文档,我们可以简单地查询 InfluxDB 的查询端点。我们将向 /api/v2/query 端点发出 POST 请求。我们的请求将将 accept 标头设置为 application/csv,并将 content-type 标头设置为 application/vnd.flux。我们收到的响应将采用带注释的 CSV 格式。文档为我们提供了一个 curl 示例,我们可以将其粘贴到 curl-to-ruby 以转换为 Ruby 的 net/http。让我们使用 Flux 文档中的示例来看看它是如何工作的

它真的就这么简单。因此,要进行我的查询,我只需要将 net/http 和 uri gems 添加到我的 Rails 应用程序,并将上面的 request.body 替换为我的 Flux 查询,并正确格式化以考虑字符串中的多个字符串。

我将 HTTP 请求封装在一个看起来像这样的方法中,基于上面 curl-to-ruby 转换的格式。您可以将类似这样的内容放在您的应用程序中最合适的任何位置。因为我也在我的应用程序中构建 GraphQL,所以我将我的代码放在与我的查询类型相同的文件中,但稍后将其重构为 helper 可能更有意义

def all_cpu_loads
 uri = URI.parse("http://localhost:8086/api/v2/query")
 request = Net::HTTP::Post.new(uri)
 request.content_type = "application/vnd.flux"
 request["Accept"] = "application/csv"
 request.body = "from(bucket:\"mydb/autogen\")
 |> range(start:-30d)
 |> filter(fn: (r) => r._measurement == \"cpu_load_short\")
 |> group(by: [\"_measurement\"])"

 req_options = {
 use_ssl: uri.scheme == "https",
 }

 response = Net::HTTP.start(uri.hostname, uri.port, req_options) do |http|
   http.request(request)
 end
end

看到我的请求正文了吗?我实际上是从我在 Chronograf 中生成的上述查询中剪切并粘贴了它,并简单地修复了字符串格式并添加了实际的时间范围(因为我的应用程序显然不知道 Chronograf 变量 dashboardTime)。

如果我们查看该请求的 response.body,它的带注释的 CSV 格式非常混乱。我们可以使用 PORO(普通旧 Ruby 对象)和一个简单的解析方法来清理它。在我的例子中,我知道我的“cpu_load_short”测量中的每个点(每个“负载”,可以这么说)都有一个时间戳、一个值、一个字段、一个测量(每个点的“cpu_load_short”)、一个主机和一个区域。我希望能够在一个整洁的包中访问每个属性,所以我已将每个点转换为哈希并将其放入一个新的“Load”对象中

def formatted_response(response)
 column_names = response.body.split(",0,")[0].split("\r\n")[3]
 column_names.slice!(",result,table,_start,_stop,") 
 arrayed_column_names = column_names.split(",")

 entries = response.body.split(",0,").drop(1)
 formatted_entries = entries.map do |entry|
   entry.gsub(/\r\n?/, "").split(",").drop(2)
 end
 hashed = formatted_entries.map do |load|
   arrayed_column_names.zip(load).to_h
 end
 hashed.map do |hash|
   Load.new(hash)
 end
end

这为我留下了一个不错的 Load 对象数组,当我需要在以后实现 GraphQL 时,它会派上用场。作为参考,我的 Load 看起来像这样

class Load
   attr_reader :time, :value, :field, :measurement, :host, :region

   def initialize(load)
      @time = load["_time"]
      @value = load["_value"]  
      @field = load["_field"]
      @measurement = load["_measurement"]
      @host = load["host"]
      @region = load["region"]
   end

end

就是这样!在我们的客户端库准备好之前,这就是您如何在 Rails 应用程序中实现任意数量的 Flux 查询并从查询结果中呈现可用对象的方法。当库发布后,我们将创建另一篇文章,向您展示如何使用库函数替换原始 HTTP 请求。在后续文章中,我们将向您展示如何使用 GraphQL 来允许客户端从您的 Flux 查询结果中查询它想要的确切信息。 Fluxing 愉快