TL;DR InfluxDB 技术提示:连接操作

导航至

如果您是 InfluxDB 用户,您几乎肯定使用过 join() 函数。该函数执行两个表流的内连接。它最常用于执行度量之间的数学运算。然而,现在它已被废弃,转而使用新的 join.inner() 函数,该函数是新的 join 包 的一部分。随着 join 包的添加,Flux 现在能够执行以下类型的连接

A-visualization-of-different-types-of-joins

文章 中展示了不同类型的连接可视化。

在本 TL;DR 中,我们将介绍执行不同类型连接的语法。然后我们将回答社区提出的问题,这些问题之前因为没有多类型连接支持而无法解决。

我强烈推荐您阅读此篇文章,以及此篇,以便完全理解 Flux 中的连接包。

Flux 中的连接语法

Flux 中的连接非常强大,因为您可以根据多个列创建条件和自定义函数来执行不同类型的连接。我们将使用 array.from() 函数构造以下数据来执行连接。

left = array.from(rows: [
              {_time: 2022-01-01, label: "a", _value: 1},
              {_time: 2022-01-02, label: "b", _value: 2},
              {_time: 2022-01-03, label: "d", _value: 3}
])

right = array.from(rows: [
              {_time: 2022-01-01, id: "a", _value: 0.4, foo: "bar"},
              {_time: 2022-01-02, id: "c", _value: 0.5, foo: "bar"},
              {_time: 2022-01-03, id: "d", _value: 0.6, foo: "bar"}
])

左连接

让我们来看一个 左连接 的示例,以熟悉语法和 Flux 中连接的强大功能。

Left-Join

对于此连接,我们假设我们已经查询了两个表流,并分别将这些流命名为 left_tableright_table。连接函数的 leftright 参数分别指定左和右输入流。每个表都有一个包含相同时间戳的 _time 列。左表还有一个 label 列,右表有一个 id 列。当以下两个条件满足时,我们想要执行左连接

  1. _time 列的值在这两个表中匹配
  2. label 和 id 列的值在这两个表中匹配。

这些条件通过连接函数中的on参数指定。on参数必须是一个比较左右表流属性的单一布尔表达式。

最后,我们通过as参数指定我们想要返回的内容。as参数是一个用户定义的函数,它必须接受一个左流和一个右流并返回一个记录。我们决定要包含以下列

  1. join_time列:来自左表流的_time列。
  2. join_label列:来自左表流的label列。
  3. v_left:来自左表流的_value列。
  4. v_right:满足由on参数指定的条件的左表流的_value列。
  5. foo:来自右表流的foo列。

然而,你可能不想显式命名每个列或像上图所示返回每个列。相反,你可以使用with子句

join.left(
    left: left_table,
    right: right_table,
    on: (l, r) => l._time == r._time and l.label == r.id,
    as: (l, r) => ({l with v_right: r._value, foo: r.foo}),
)

此查询将返回与上述相同的表,但列名略有不同:_time, label, _value, v_right, foo。这种语法的优点是提供了语法简短性。缺点是,如果你想要特定的列名,你可能需要重命名列。此外,当你在连接分组数据时,这种方法变得尤为重要。with运算符确保你不丢失任何组键列。总的来说,我推荐这种方法。

右连接

类似地,对_time列、labelid列的右连接将如下所示

Right-Join

注意join_label列已更改为包含来自右表流的id列的值。类似地,v_leftv_right和foo列的值也发生了变化,以反映右连接。对于join_labeljoin_time列,我们必须从右表流的id列和_time列中提取。

然而,你可能不想显式命名每个列或返回每个列。请注意,你也可以使用with子句

join.left(
    left: left_table,
    right: right_table,
    on: (l, r) => l._time == r._time and l.label == r.id,
    as: (l, r) => ({r with l_right: l._value, foo: r.foo}),
)

此查询将返回与上述相同的表,但列名略有不同:_timelabel_valuel_rightfoo。这种语法的优点是提供了语法简短性,缺点是,如果你想要特定的列名,你可能需要重命名列。此外,当你在连接分组数据时,这种方法变得尤为重要。with运算符确保你不丢失任何组键列。总的来说,我推荐这种方法。

全连接

让我们看看一个更复杂的例子,在as参数中使用函数全连接定义函数。

Full-Join

我们在与as参数的连接中指定我们想要返回的内容。由于我们想要执行全外连接,我们使用条件逻辑存在运算符从右侧和左侧的表中抽取时间戳和标签值。最后,我们返回我们想要的列,包括时间、标签和左右值。

内连接

遵循其他连接示例,让我们在_time列以及labelid列上的数据上执行内连接

Inner Join

新的内连接与旧的内连接工作方式类似。例如,以下两个查询将是等效的

// use an alias to distinguish between the universe join function and the functions that are a part of the join package
import j "join"
// new join 
j.inner(
    left: left_table,
    right: right_table,
    on: (l, r) => l._time == r._time,
    as: (l, r) => ({l with v_right: r._value, foo: r.foo, id: r.id}),
)
|> yield(name: "new")
// deprecated join (the table output is identical aside from different column names)
join(tables: {left_table: left_table, left_table: right_table}, on: ["_time"])
|> yield(name: "deprecated")

生成以下表格

_time _value v_right foo id label
time1 1 0.4 bar c c
time2 2 0.5 bar b a
time3 3 0.6 bar d d

然而,新连接包的一个强大组件是能够在多个列上执行连接,如前所述。相比之下,如果您尝试在_time列和_id列上执行连接,您将无法返回结果。换句话说,以下查询将不会返回结果

join(tables: {left_table: left_table, left_table: right_table}, on: ["_time","id"])

这确实带来了一些有趣的用例。用户可能并不总是需要在多个列上执行连接。为了简化此用例的语法,Flux团队创建了join.time()函数,仅使用_time列进行连接。

按时间连接

join.time()函数始终在_time列上独家连接表。这样,您不需要为on参数提供值。相反,您只需提供连接method类型和as参数的函数。例如,以下连接函数将是等效的

// use an alias to distinguish between the universe join function and the functions that are a part of the join package
import j "join"
// inner join on _time 
j.inner(
    left: left_table,
    right: right_table,
    on: (l, r) => l._time == r._time,
    as: (l, r) => ({l with v_right: r._value, foo: r.foo, id: r.id}),
)
|> yield(name: "inner join on _time")
// time join 
j.time(left: left_table, right: right_table, as: (l, r) => ({l with v_right: r._value, foo: r.foo, id: r.id}), method: "inner")
// deprecated join (the table output is identical aside from different column names)
join(tables: {left_table: left_table, left_table: right_table}, on: ["_time"])
|> yield(name: "deprecated")

此结果与上面的表格相同

_time _value v_right foo id label
time1 1 0.4 bar c c
time2 2 0.5 bar b a
time3 3 0.6 bar d d

带有null列注入和条件连接null值的示例

连接包的一部分力量在于能够在多个列上执行多种不同的连接。同样重要的是能够指定您自己的自定义as函数,并使其尽可能复杂。您甚至可以预测您尝试连接的系列中的变化,以适应您有缺失列的实例。例如,想象我们想要定期将数据作为任务的一部分进行连接。然而,一个表中的一个列的存在是稀疏的。我们可以使用null值注入和条件逻辑的组合来执行连接。这个解决方案是为了避免像runtime error @31:12-39:3: left: table is missing label "column name"这样的错误。让我们看一下以下示例

table1有三个列,其中foo和bar列有一些null值

on foo bar
1 1
2 3

注意table2有一个额外的列yeet,而table1中没有

on foo bar yeet
1 0 4 42
2 2 5 42

想象我们想要执行左连接。我们需要在函数参数中使用条件逻辑来指定在table1的foo和bar列中存在null值时的返回内容。我们还需要在map()函数中使用debug.null()函数来创建一个包含null值的空列,以便连接不会失败。

on foo bar yeet
1 1 4 42
2 2 3 42

用以下Flux代码亲自试一试

import "experimental/array"
import "array"
import "join"
import "internal/debug"

table1 = array.from(rows: [{pivot: 1, baz: "foo"}, {pivot: 3, baz: "bar"}])
|> pivot(rowKey:["pivot"], columnKey: ["baz"], valueColumn: "pivot")
|> map(fn: (r) => ({ r with on: 1 }))
|> cumulativeSum(columns: ["on"])
// ignore the code before this line, this is just to create a good example. 
// use the debug.null function to create a new column with null values so the join doesn’t fail. 
|> map(fn: (r) => ({ r with  yeet: debug.null(type: "string") }))
|> yield(name: "table1")

table2 = array.from(rows: [{on: 1, foo: 0, bar: 4, yeet: "42"}, {on: 2, foo: 2, bar: 5, yeet: "42"}])
|> yield(name: "table2")

joined = join.left(left: table1, right: table2, on: (l, r) => l.on == r.on, as: (l, r) => {
  return {
    foo: if exists l.foo then int(v: l.foo) else int(v: r.bar),
    yeet: if exists l.yeet then int(v: l.yeet) else int(v: r.yeet),
    bar: if exists l.bar then int(v: l.bar) else int(v: r.bar),
  }
})

joined |> yield(name: "joined")

使用新 Join 包解决旧问题

现在我们了解了新 Join 包的语法,让我们来回答一些以前无法解决的老问题。

问题和解决方案 1

用户希望在不同时间戳下比较两个标签键的不同字段值。他们有一个字段,价格,以及两个标签值,Bookie 1 和 Bookie 2。这些数据在第一个表中显示。但是请注意,用户已经将字段键和字段值旋转,以便更容易地表示他们的数据(就像已经应用了 schema.fieldsAsCol() 函数一样)。接下来,他们想旋转他们的数据以获得第二个表。最后,他们想过滤数据并执行外连接。数据回填在第三个表中给出了用户期望的结果。

_time 价格 博彩公司
18:50 20 Bookie1
18:55 18 Bookie2
19:00 21 Bookie1
19:05 20 Bookie2
_time Bookie1 Bookie2
18:50 20
18:55 18
19:00 21
19:05 20
_time Bookie1 Bookie2
18:50 20 0
18:55 20 18
19:00 21 18
19:05 21 20

让我们看看可以用来实现这一点的 Flux 脚本

import "array"
import "join"

//create the data. We use arbitrary timestamps. 
data =
    array.from(
        rows: [
              {_time: 2022-01-01T00:00:00Z, Price: 20, Bookmaker: "Bookie1"},
              {_time: 2022-02-01T00:00:00Z, Price: 18, Bookmaker: "Bookie2"},
              {_time: 2022-03-01T00:00:00Z, Price: 21, Bookmaker: "Bookie1"},
              {_time: 2022-04-01T00:00:00Z, Price: 20, Bookmaker: "Bookie2"},
        ],
    )

// Pivot the data to get the second table 
data
  |> pivot(rowKey:["_time"], columnKey: ["Bookmaker"], valueColumn: "Price")
  |> fill(column: "Bookie1", usePrevious: true)
  |> fill(column: "Bookie2", usePrevious: true)
  |> yield(name: "solution before joins")

// filter for the right values to prepare for the join. This filter could also be performed before the pivot and we could pivot for each Bookmaker tag value. 

left = data 
|> filter(fn: (r) => r.Bookmaker == "Bookie1")

right = data 
|> filter(fn: (r) => r.Bookmaker == "Bookie2")

// join the left and right tables together and fill the missing timestamps. 
join.full(
    left: left,
    right: right,
    on: (l, r) => l._time == r._time,
    as: (l, r) => {
        time = if exists l._time then l._time else r._time
        return {_time: time, Bookie1: l.Price, Bookie2: r.Price}
    },
)
  |> fill(column: "Bookie1", usePrevious: true)
  |> fill(column: "Bookie2", usePrevious: true)
  |> yield(name: "solution after joins")

然而,使 Flux 如此强大的部分在于您可以以多种方式解决问题。您不必使用连接来解决这个问题。您可以使用 debug.null() 函数执行以下查询来解决问题 1

import "array"
import "internal/debug"

data =
    array.from(
        rows: [
              {_time: 2022-01-01T00:00:00Z, Price: 20, Bookmaker: "Bookie1"},
              {_time: 2022-02-01T00:00:00Z, Price: 18, Bookmaker: "Bookie2"},
              {_time: 2022-03-01T00:00:00Z, Price: 21, Bookmaker: "Bookie1"},
              {_time: 2022-04-01T00:00:00Z, Price: 20, Bookmaker: "Bookie2"},
        ],
    )
data 
|> map(
        fn: (r) => {
            bookie1Value = if r.Bookmaker == "Bookie1" then r.Price else debug.null(type: "int")
            bookie2Value = if r.Bookmaker == "Bookie2" then r.Price else debug.null(type: "int")

            return {_time: r._time, bookie1: bookie1Value, bookie2: bookie2Value}
        },
    )
    |> fill(column: "bookie1", usePrevious: true)
    |> fill(column: "bookie2", usePrevious: true)

这是一个首选的解决方案,但以前的解决方案帮助我们理解连接是如何工作的,并突出了 Flux 的多功能性。

问题和解决方案 2

这位用户有一个事件标签。他们希望筛选出在事件发生时间和每次事件记录后 10 秒之间写入的数据。

Problem-and-Solution-2

import "array"
import "join"
import "experimental"

// our event data or StartData
startData = 
    array.from( 
        rows: [{_time: 2022-01-01T00:00:00Z, _value: 2, event: "start"},
               {_time: 2022-01-01T00:00:15Z, _value: 2, event: "start"}
        ]
)
// we add a stop column, “myStop” with a timestamp 10 seconds after each event timestamp.  
  |> map(fn: (r) => ({ r with myStart: r._time }))
  |> map(fn: (r) => ({ r with myStop: experimental.addDuration(d: 10s, to: r._time) }))

// the data we want to filter for based on the startData start and stop times. In this example we expect to filter out all the data where the timestamps are outside of 0-10s and 15-25s, or where the value is equal to 0. 
data =
    array.from(
        rows: [
              {_time: 2022-01-01T00:00:03Z, _value: 1},
              {_time: 2022-01-01T00:00:05Z, _value: 1},
              {_time: 2022-01-01T00:00:08Z, _value: 1},
              {_time: 2022-01-01T00:00:12Z, _value: 0},
              {_time: 2022-01-01T00:00:17Z, _value: 1},
              {_time: 2022-01-01T00:00:18Z, _value: 1},
              {_time: 2022-01-01T00:00:20Z, _value: 1},
              {_time: 2022-01-01T00:00:23Z, _value: 1},
              {_time: 2022-01-01T00:00:27Z, _value: 0},
              {_time: 2022-01-01T00:00:30Z, _value: 0}
        ],
    )

// perform a full join
join.full(
    left: startData,
    right: data,
    on: (l, r) => l._time == r._time,
    as: (l, r) => {
        time = if exists l._time then l._time else r._time
        value = if exists l._value then l._value else r._value

        return {_time: time, value: value, myStart: l.myStart, myStop: l.myStop}
    },
)
|> fill(column: "myStart", usePrevious: true)
// finally filter for data that’s 10 seconds after each event. 
|> filter(fn: (r) => r._time >= r.myStart and r._time <= r.myStop)

请注意,这个解决方案有一些错误。如果您目前正在尝试解决此类问题,请参与此 问题 #5127

问题和解决方案 3

这位用户希望识别数据中的不规则季节性。

Problem-and-Solution-3

为了解决这个问题,我们将使用来自 NOAA 样本数据集的前 400 个点,并将数据分组到不同的时间段中。以下是原始数据的外观

groupped data into separate periods

然后我们使用 Flux 将数据分组到时间段并生成以下结果

use Flux to group the data into periods

以下是一般方法

  • 计算导数(der)。
  • der 结果向前移动一个时间戳(der_shift)并将它们连接起来 local_min_max
  • 筛选出斜率符号改变的时刻。
  • 使用 elapse() 函数来处理导数实际上等于 0 的情况,或确保斜率符号的改变发生在局部最小值或最大值。
  • 在发生斜率符号改变的时间戳上添加索引。
  • 执行外连接并填充,这样我们就可以确定原始数据中的哪些值属于哪个“时间段”。
  • 按标签分组以可视化我们的不同“时间段”。
import "math"
import outer "join" 
// data is written 6 minutes apart 
data = from(bucket: "noaa")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "h2o_feet")
  |> filter(fn: (r) => r["_field"] == "water_level")
  |> filter(fn: (r) => r["location"] == "coyote_creek")
  |> keep(columns: ["_time", "_value"])  
  |> limit(n: 400)
//   |> yield(name: "raw")

// find the derivatives
der = data 
// unit for the time in between timestamps
|> derivative(unit: 6m)
|> keep(columns: ["_time", "_value"])
// |> yield(name: "der")

// shift the derivatives by one interval 
der_shift = der
|> timeShift(duration: 6m)
|> keep(columns: ["_time", "_value"])
// |> yield(name: "der_shift")

// join the derivative and shifted derivative data together to find where the derivative changes from positive to negative between the original and shifted derivatives 
local_max_min = join(tables: {der: der, der_shift: der_shift} , on: ["_time"])
// filter to find where the slope changes
|> filter(fn: (r) => r._value_der >= 0.0 and r._value_der_shift <= 0.0 or r._value_der <= 0.0 and r._value_der_shift >= 0.0 )
// to account for when the derivative does == 0 and define local minima/maxima points from either side of crossing from pos to neg or neg to pos
// also helps us make sure that local maxima and minima have adequate time between them. That they’re true local maxima and minima. 
|> elapsed(unit: 6m)
|> filter(fn: (r) => r.elapsed > 1)
// |> yield(name: "local maxima/minima total of 5 points here")
// use the map function to assign a numerical value to group on later after we join with the original data 
|> map(fn: (r) => ({r with period_group: 1.0}))
|> cumulativeSum()
|> keep(columns: ["_time", "period_group"])
// join the local maxima and minima with the original data and join based off of the period group 
outer.time(left: data, right: local_max_min, as: (l, r) => ({l with label: r.period_group}), method: "full" )
|> fill(column: "label", usePrevious: true)
|> group(columns: ["label"], mode:"by")
// |> yield(name: "_result")

结论

上述所有问题都来自论坛上的问题。您可以在这里自由查看原始讨论

最后,也值得注意以下与连接包相关的问题

  1. #5135: join.tables() 方法:“full”与空表返回“table is missing label _value”
  2. #5127: 外连接和填充 + 使用Previous时恐慌

如果您遇到这些错误中的任何一种,我鼓励您对问题进行评论,以帮助团队更好地了解问题并快速解决。

我希望这篇博客文章能激发您利用Flux中的新连接包来处理迟到数据的优势。如果您正在寻找需要连接操作的问题,我鼓励您尝试这些函数中的一些。如果您无法解决问题,请通过我们的社区网站Slack频道寻求帮助。我很乐意了解您试图实现的目标以及您希望InfluxDB的任务系统拥有的功能。最后,如果您正在开发基于InfluxDB的酷IoT应用程序,我们很想了解它,请确保通过#InfluxDB在社交媒体上分享它!此外,请随时通过我们社区Slack频道直接联系我,分享您的想法、担忧或问题。我很乐意获得您的反馈,并帮助您解决遇到的问题!