学习 Flux (#fluxlang) 就像学习 API 一样简单

导航至

Flux (#fluxlang) 是我们正在创建的新数据脚本语言,旨在使查询和分析时间序列和其他类型的数据变得快速而简单。Flux 将能够处理来自 InfluxDB、Prometheus、关系数据库、CSV 文件、S3 以及任何其他类型的 API 或数据源的数据。当我第一次开始谈论 Flux(以前称为 IFQL)时,我们听到的一种担忧是,要求用户学习一种新语言会带来过于陡峭的学习曲线。但是,我认为学习 Flux 的难度并不比学习任何新的 API 更大,甚至可能比使用 SQL 更容易,即使您已经“了解”后者。

在跳到具体的 API 内容之前,我将介绍该语言的基础知识。让我们从一些代码开始介绍 Flux。考虑以下脚本

// Here are some basic parts of the language. This is a comment

// you can assign variables, like a string
s = "this is a string"

// or an int64
i = 1

// or a float64
f = 2.0


// or an object
o = {foo: "bar", asdf: 2.0, jkl: s}
// now access one of those properties
foo = o.foo
asdf = o["asdf"]

// you can also create an object with a shorthand notation
// where key will be the same as variable names
o = {s, i, f}
// equivalent to o = {s: s, i: i, f: f}


// here's an array
a = ["a", "b", "c"]

// here's a duration
d = 2h10m5s
// they're actually represented as seconds, days, and months.
// this is because those units can vary with time zones, Gregorian 
// calendar and all that. Here's one with months and days
d = 1mo7d

// here's a time
t = 2018-08-28T10:20:00Z

// define a function that takes argument named n. Flux only has
// named arguments (no positional ones)
square = (n) => {
  // the standard math operators are in the language
  return n * n
}
// call that function
num = square(n: 23)

// or if a function is one statement you can exclude the braces
square = (n) => n * n

// Now let's do a query. The functions in this query work
// with a stream of data. The stream is made up of tables
// that have columns and records (like in CSV).
// Conceptually, functions are applied to each table in
// the stream

// start with getting data from the InfluxDB server telegraf DB
from(host: "https://localhost:9070", bucket:"telegraf/default")
    // here's the pipe-forward operator. It says to send the
    // output of the previous function to the next one (range)
    // range will filter by time. You can also pass start as
    // a time, so functions have polymorphic arguments
    |> range(start:-1h)

    // now filter specific data. Here we pass an anonymous function.
    // note that brackets and a return statement aren't required if
    // the function is a single statement. Also note that we have
    // comparison and boolean operators
    |> filter(fn: (r) => r._measurement == "cpu" and r.host == "serverA")

    // now group all records from all rows into a table for
    // each region and service that we have. This converts 
    // however many tables we have into one table for each
    // unique region, service pair in the data.
    |> group(keys: ["region", "service"])

    // now compute some aggregates in 10 minute buckets of time.
    // each of these aggregates will get applied to each table.
    // Note that we're passing in an array of function pointers.
    // That is min, max, and mean are all functions that are
    // pipe-forwardable and can be applied to streams.
    |> applyWindow(aggregates: [min, max, mean], every: 10m)

    // And we can iterate over the rows in a table and add or 
    // modify the returned result with the map function. This
    // will add a new column in every table called spread.
    |> map(fn: (r) => {return {spread: r.max - r.min}})

    // Return output of the function chain as a stream called
    // "result". This isn't necessary if only one function
    // chain in the script.
    |> yield(name:"result")

// Finally, here's how we can define a function
// that is pipe-forwardable:
// Filter the stream to a specific set of measurements. This
// can be used with other filter functions and they'll combine.
filterMeasurements = (names, tables=<-) {
  return tables |> filter(fn: (r) => r._measurement in names)
}
// Note that tables=<- defines a parameter called tables that
// can be passed in as a function argument, or pipe-forwarded
// from another function.
// Now we can call to it through a pipe-forward operation
from(bucket:"telegraf/default") |> range(start: 2018-08-27)
    |> filterMeasurements(names: ["cpu", "mem"])
    |> yield(name:"cpu_mem")

// Or we can pass the tables input as an argument like this
filterMeasurements(names: ["cpu", "mem"], tables: (
    from(bucket:"telegraf/default")
        |> range(start:2018-07-27)
    ))
  |> yield(name:"cpu_mem")

这个简短的脚本介绍了该语言的主要语法结构,并展示了从 InfluxDB 查询数据的示例。该语言还有更多内容,但这足以在 Flux 中完成许多事情。其余的学习曲线是纯粹的 API。也就是说,了解存在哪些函数、它们的参数、它们的作用以及它们返回的内容。

实际上,学习 Flux 主要是在学习 API 以完成任务。如果我们选择使用 Lua、Javascript 或带有我们自己定义的函数的 SQL,情况也是如此。对于任何有少量 Javascript 经验的人来说,该语言的语法元素都非常熟悉。其中最奇怪的是管道转发运算符,但这很快就会习惯。

学习 API 意味着学习什么类型的数据输入函数,参数是什么,函数做什么,以及它输出什么。无论用户使用什么语言,这都很重要。具有包含所有这些信息的构建器的用户界面将在很大程度上帮助新用户入门该语言,而他们甚至不必学习这些结构。

从概念上讲,Flux 中的函数可以分为四个主要边界。首先,我们有输入函数,它们从某些源(如 InfluxDB、CSV 文件、Prometheus 或任何地方)读取数据。接下来,我们有函数,它们要么组合结果流中的表,要么将它们分开。group、join 和 window 函数符合此角色。然后,我们有应用于流中每个表的函数,如聚合、选择器和排序。最后,还有输出函数,用于将结果发送到某些数据同步。Yield 将这些结果返回给用户,同时还将有其他输出,用于将结果发送到 InfluxDB、Kafka、S3、文件等。

如果您读到这里,那么您现在对 Flux 的了解程度几乎与您需要的一样,只要您与 API 参考配对,就可以完成大多数查询和数据任务。对于不到 1,000 字的散文和代码来说,还不错。

现在,冒着激怒一群 SQL 爱好者的风险,让我们重新审视一下学习 Flux 与当您已经了解 SQL 时使用 SQL 的想法。在我认识的开发者群体中,他们拥有的 SQL 知识水平差异很大。通常,这取决于他们实际需要编写 SQL 语句的频率。在许多情况下,开发者每天都在使用关系数据库,而根本不编写任何 SQL,因为有像 ActiveRecord 这样的 ORM。他们只是偶尔才编写 SQL,这意味着他们的知识很快就会萎缩。

就我个人而言,我分别在 2001 年和 2008 年学习了 SQL,我可以肯定地说,我忘记的 SQL 比我现在知道的还要多。任何比基本的 SELECT、INNER JOIN、WHERE 和 HAVING 语句更复杂的内容都需要查阅文档和参考资料。因此,当我实际完成任务时,我已经“了解”SQL 这一事实对我几乎没有帮助。我很冒昧,但我认为绝大多数程序员都处于相同的境地。

对于更复杂的时间序列和分析查询,您几乎肯定需要查找 WINDOW存储程序 以及我可能忘记的其他一些内容。再加上 SQL 的语法看起来不像任何其他语言,并且经常看起来像 Yoda 查询数据,即使您已经被介绍过它,您也会得到一些具有真正学习曲线的东西。除非您定期编写查询,否则语法可能需要查阅文档。

我们对 Flux 的目标之一是,即使对于该语言的新手来说,它也应该是可读和可理解的。我们希望开发者在使用该语言本身时尽可能减少认知负荷,以便他们可以将精力集中在思考他们想用数据做什么上。使该语言看起来像许多其他流行的语言是一个特定的设计目标。我们已经在 公开场合 进行了迭代,但我们始终有兴趣听到更多反馈。

我们将在未来几个月内发布更适合所有人使用的早期版本的 Flux。我们将把它包含在 InfluxDB 1.7 的开源版本、InfluxDB 2.0 的 alpha 版本以及可以像 Ruby 或 Python 解释器一样运行的单独的 flux 可执行文件中。同时,您可以查看 Flux 特定问题Flux 语言规范、更多关于 我们构建 Flux 的动机,或 我在六月份伦敦 InfluxDays 大会上做的 Flux 演讲