Flux窗口和聚合

导航到

今天,我们要谈论查询。具体来说,我们要谈论Flux查询,这是InfluxData正在开发的新语言。您可以阅读我们为什么决定编写Flux,并查看Flux的技术预览

如果您是InfluxDB用户,您可能正在使用InfluxQL编写查询,并且您可以继续编写,只要您喜欢。然而,如果您正在寻找替代方案或者遇到了InfluxQL的一些边界,那么现在是开始学习Flux的时候了。

我的最佳作品

在过去的一年里,我一直在学习InfluxQL,所以我并不一定想再给我的心理负担增加另一个DSL。毕竟,我们以开发者的幸福而自豪,所以我想确保学习Flux是a)简单和b)值得花费时间去学习的。

我将通过学习如何窗口和聚合数据来测试这一点,这些数据我已经知道如何在InfluxQL中做了(我们将在最后介绍完整的InfluxQL查询)。

数据

因为我非常时尚,所以我将使用一个将加密货币交易所信息路由到InfluxDB的脚本。像大多数人一样,我想了解加密货币的趋势,以便我知道是否应该后悔没有购买。

问题

我们可以获取关于加密货币的大量信息,但我只关心几件事情

  1. 卖家要求什么?
  2. 买家愿意支付多少?
  3. 交易在什么价格发生?

本质上,我想知道实际交易价格与预期相差多少。如果交易价格远低于卖家支付的金额,那么整个加密货币的荒唐行为就结束了,我可以继续前进。如果交易价格远高于买家愿意支付的金额,那么当然我理解了加密货币狂热——我一直说加密货币是未来。

我们需要找出表示我们想要的结果的最佳方式。让我们用Flux来整理数据。

解决方案

以下是我正在收集的数据样本,我在Chronograf的“原始数据”视图中收集这些数据。

关于这些数据,重要的是什么?最上面一行显示数据类型,这总是很重要的,但在这种原始数据视图中进行调试尤其有用。第二行“group”,表示哪些数据是分组键,这是一个列表,其中每一行都具有相同的值(稍后会有更多介绍)。

还有一些需要理解的Flux特定列。在表模式中,有四个通用列:“_time”(记录的时间戳)、“_start”(所有记录的包含下限时间边界)、“_stop”(所有记录的排除上限时间边界)和“_value”(记录的值)。在我们的例子中,起始时间是48小时前,结束时间是现在减去48小时。

其他列代表我为加密货币数据定义的模式:“exchange”和“symbol”是标签,“_field”和“_measurement”代表该记录的具体字段和测量值。

为了开始编写查询,我阅读了这篇Flux指南,它解释了存储桶管道前进运算符的概念。本质上,每个Flux查询都需要三样东西:数据源、时间范围和数据过滤器。

根据指南,我开始编写这个查询

from(bucket: "crypto/autogen")
  |> range(start: -48h)
  |> filter(fn: (r) => r._measurement == "prices" and (r._field == "last" or r._field == "lowestAsk" or r._field == "highestBid"))

第一行是我的数据源,第二行是时间范围,接下来的一切都是数据过滤器。

这个查询要求我的数据库“crypto”提供与测量“prices”相关的特定字段:过去48小时内最后成功的交易价格(“last”)、卖家愿意接受的最低价(“lowestAsk”)和买家愿意支付的最高价(“highestBid”)。

这是一个好的开始,但结果太多了。结果看起来与上面的样本相同——但Chronograf显示了一个警告:“响应被截断到前103K行。”

在数据可视化方面,我们不想做不必要的麻烦。市场上有很多加密货币,即使将数据限制在48小时内,仍然太多以至于无法解析,更不用说可视化。说实话,我不关心所有的货币——尽管我喜欢狗狗币的想法,但比特币是趋势。

让我们看看只针对比特币的相同数据。

from(bucket: "crypto/autogen")
  |> range(start: -48h)
  |> filter(fn: (r) => r._measurement == "prices" and (r._field == "last" or r._field == "lowestAsk" or r._field == "highestBid"))
  |> filter(fn: (r) => r.symbol == "USDT_BTC")

这些结果更容易管理。现在我们可以更容易地检查结果。浏览原始数据,我们可以看到这个查询返回了三个不同的表(0-2):每个系列一个。

表0代表符号(比特币)、交易所(Poloniex)、测量(prices)和字段(highestBid)的唯一组合。

表1代表符号(比特币)、交易所(Poloniex)、测量(prices)和字段(last)的唯一组合。

表2代表符号(比特币)、交易所(Poloniex)、测量(prices)和字段(lowestAsk)的唯一组合。

在这种情况下,只有字段在变化,这限制了系列的数量。以下是Chronograf中折线图的结果。

数据窗口化

我们不一定需要看到结果中每个数据点的表示,特别是如果我们对趋势和随时间的变化感兴趣。在这种情况下,我们想要窗口化和汇总我们的数据,这意味着我们想要按某个时间窗口分割我们的输出,然后找出该窗口的平均值。首先,我跳到了官方Flux文档中的 window() 函数,以添加Flux查询的最后一行。

from(bucket: "crypto/autogen")
  |> range(start: -48h)
  |> filter(fn: (r) => r._measurement == "prices" and (r._field == "last" or r._field == "lowestAsk" or r._field == "highestBid"))
  |> filter(fn: (r) => r.symbol == "USDT_BTC")
  |> window(every: 1h)

让我们看看这对原始数据有什么改变。

首先,尽管在上面的屏幕截图中我们看不到,但“_start”和“_stop”列已改变以反映时间边界。如果我将鼠标悬停在“_start”和“_stop”上,我会看到以下时间戳。

2019-01-03T18:00:00Z                   2019-01-03T19:00:00Z

在我们之前的时间是从48小时前开始的,现在停止(不是现在,但你知道,now()),我们的起始和停止时间已改变,以反映我们查询中指定的窗口时间。

按表列顺序,我们有表0-23。如果你认为这个数字应该更高,你是正确的,但我们由于故障而丢失了一些数据。每个表代表每个唯一系列的一个小时窗口(就像上面表格下列出的组合一样)。

现在看看Chronograf中我们的折线图有什么不同。

图中的每种颜色代表一个不同的时间窗口;在我们的情况下,每种颜色代表一个一小时的窗口。这种特定的可视化帮助我理解了Flux的输出。Flux返回基于时间窗口的数据表——尽管我们在视觉上并排看到它们,不同颜色区域的图中的数据点完全位于单独的表中,因为它们是不同的时间序列。记住,图中的每个时间序列都得到自己的唯一颜色,并且颜色突出了每个序列的开始和结束时间。

数据汇总

当与窗口结合使用时,Flux中的聚合函数略有不同。当我们将如mean() 这样的函数应用于当前查询时,该函数应用于每个1小时窗口,将每个返回的表减少到包含平均值的一行。返回的表的数量保持不变,但表中只包含一个包含聚合值的行。

from(bucket: "crypto/autogen")
  |> range(start: -48h)
  |> filter(fn: (r) => r._measurement == "prices" and (r._field == "last" or r._field == "lowestAsk" or r._field == "highestBid"))
  |> filter(fn: (r) => r.symbol == "USDT_BTC")
  |> window(every: 1h)
  |> mean()

我们的原始数据视图显示,每个表是每小时每个字段的单行数据,这是好的,因为这意味着mean()做了我们期望的事情。我们表中的列有所不同:我们不再有一个“_time”列,因为此数据摘要没有个别的时间戳。

我必须研究的一个区域是mean()的默认值。当我们没有指定我们正在平均什么时,它默认为“_value”。每个Flux表都包括一个“_value”列,并且当我们执行如mean这样的聚合时,我们转换那里列出的值。

现在我们已经有了所有需要的平均值,我们想要我们的数据以更少的表返回。在Flux中,我们可以使用group() 函数取消窗口化数据,以通过使用相关值进行组合。

from(bucket: "crypto/autogen")
  |> range(start: -48h)
  |> filter(fn: (r) => r._measurement == "prices" and (r._field == "last" or r._field == "lowestAsk" or r._field == "highestBid"))
  |> filter(fn: (r) => r.symbol == "USDT_BTC")
  |> window(every: 1h)
  |> mean()
  |> group(columns: ["_time", "_start", "_stop, "_value"], mode: "except")

我们回到了3个表(0-2),这些表给出了完成交易价格的小时平均值、卖家的最低要价和买家的最高出价。

调用group()函数以几种重要的方式改变了我们的数据。记得我提到我们会回到group keys吗?这就是它!通过分组数据,我们定义了group keys。当我们按照“_field”和“_measurement”(排除了“_time”、“_start”、“_stop”和“_value”后的列)进行分组时,我们定义的group key为[_field, _measurement]。在上面的截图中,组行中“_field”和“_measurement”都列出了true。在实践中,这意味着我们查询的结果输出了一个表格,针对“_field”和“_measurement”的唯一组合。

这种对数据进行分组和定义group keys的过程表示了我们的数据去窗口化。

关于group()的注意事项自社区开始使用Flux以来,group()函数的语法已发生变化。原始的group()语法如下

|> group(by: ["_field", "_measurement"])

这是我们的Chronograf可视化中重新分组的数据看起来像什么。

这个Chronograf可视化以简单的方式让我们看到过去两天比特币市场的趋势。如果我有多一些数据,我就可以进行完整的历史分析,真正地了解我是否错过了我的加密机会。

更好的查询

我们编写的Flux查询正好符合我的需求,但它有点长。当我进一步调查时,我发现了一个辅助函数,它可以隐藏一些更复杂的逻辑。

from(bucket: "crypto/autogen")
  |> range(start: -48h)
  |> filter(fn: (r) => r._measurement == "prices" and (r._field == "last" or r._field == "lowestAsk" or r._field == "highestBid"))
  |> filter(fn: (r) => r.symbol == "USDT_BTC")
  |> aggregateWindow(every: 1h, fn:mean)

aggregateWindow()函数包括对数据的窗口化和去窗口化,这样我们在编写查询时就不必担心数据的格式。我喜欢一个好的辅助函数,尤其是当它的功能和语法都很清晰的时候。返回的数据看起来完全一样,而且我们节省了一些代码。

总结

我们已经编写了我们想要的Flux查询 - 现在让我们将其与用InfluxQL编写的相同查询进行比较。

SELECT 
mean("highestBid") AS "mean_highestBid", 
mean("last") AS "mean_last", 
mean("lowestAsk") AS "mean_lowestAsk" 
FROM 
"crypto"."autogen"."prices" 
WHERE 
time > now() - 48h 
AND 
"symbol"='USDT_BTC' 
GROUP BY 
time(1h) 
FILL(none)

这个InfluxQL查询仍然可读,尤其是如果你熟悉SQL。鉴于我们的查询并不复杂,Flux和InfluxQL中的查询最终非常相似。我特别喜欢在Flux中使用相对时间范围,因为它既简洁又易于阅读。

Flux - 喜欢的

可读性是我选择编程语言的重要部分,Flux在这方面得到了A。我喜欢能够解析查询,即使我不是Flux专家。Flux的函数特性使我能够根据需要不断转换返回的数据,这对于我们处理的大型数据集来说特别方便。

此外,因为技术预览已经发布,我可以在社区网站上询问其他用户遇到的问题和解决方案。

Flux - 不喜欢的

对我来说,只有一个不喜欢的,那就是理解返回数据的形状。我花了一些时间才明白返回数据的表格是如何形成的,为什么它们以表格的形式返回。话虽如此,我认为这并不是Flux的缺陷,而是我事先阅读的概念概述不够。理解表结构对于编写好的Flux查询至关重要,因此我们需要确保有更多的资源来解释这些概念。

结论

我们做到了!我们编写了一个有用的Flux查询,可以让我们比较比特币的询问价和售价。好消息是,我认为我没有任何加密货币资金也还好。可能吧。除非它的价值再次上涨。虽然我们可能没有对加密货币得出任何有意义的结论,但使用Flux探索我的数据让我能够轻松地进行有意义的比较。

我和我的想象中的财富