长话短说:InfluxDB 技术提示:Join(连接)

导航至

如果您是 InfluxDB 用户,您几乎肯定使用过 join() 函数。join() 函数执行两个表流的内连接。它最常用于跨度量执行数学运算。但是,现在它已被弃用,取而代之的是新的 join.inner() 函数,它是新 join 包的一部分。随着 join 包的添加,Flux 现在能够执行以下类型的连接:

A-visualization-of-different-types-of-joins

来自这篇 文章 的不同连接类型的可视化。

在这个长话短说中,我们将介绍执行不同类型连接的语法。然后,我们将回答社区提出的问题,这些问题以前由于 Flux 中缺乏多种连接类型支持而没有解决方案。

我强烈建议您阅读 这篇文章 以及本文,以全面了解 Flux 中的 joins 包。

Flux 中的连接语法

Flux 中的连接非常强大,因为您可以创建条件和自定义函数,以便基于多个列执行不同类型的连接。我们将使用 array.from() 函数构建的以下数据执行连接。

left = array.from(rows: [
              {_time: 2022-01-01, label: "a", _value: 1},
              {_time: 2022-01-02, label: "b", _value: 2},
              {_time: 2022-01-03, label: "d", _value: 3}
])

right = array.from(rows: [
              {_time: 2022-01-01, id: "a", _value: 0.4, foo: "bar"},
              {_time: 2022-01-02, id: "c", _value: 0.5, foo: "bar"},
              {_time: 2022-01-03, id: "d", _value: 0.6, foo: "bar"}
])

左连接

让我们看一个 左连接 的示例,以熟悉 Flux 中连接的语法和强大功能。

Left-Join

对于此连接,我们假设我们已查询了两个表流,并将这些流命名为 left_tableright_table。join 函数的 leftright 参数分别指定左输入流和右输入流。这两个表都有一个 _time 列,其中包含相同的时间戳。左表还有一个 label 列,右表有一个 id 列。当满足以下两个条件时,我们想要执行左连接:

  1. 两个表中 _time 列中的值匹配
  2. 两个表中 label 列和 id 列中的值匹配。

这些条件由 join 函数中的 on 参数指定。on 参数必须是单个布尔表达式,用于比较左表流和右表流的属性。

最后,我们使用 as 参数指定我们希望作为连接一部分返回的内容。as 参数是一个用户定义的函数,它必须接受左流和右流并返回一条记录。我们决定要包含以下列:

  1. join_time 列:来自左表流的 _time 列。
  2. join_label 列:来自左表流的 label 列。
  3. v_left:来自左表流的 _value 列。
  4. v_right:来自左表流的 _value 列,其中满足 on 参数指定的条件。
  5. foo:来自右表流的 foo 列。

但是,您可能不想显式命名每个列或返回如上图所示的每个列。相反,您可以使用 with 子句

join.left(
    left: left_table,
    right: right_table,
    on: (l, r) => l._time == r._time and l.label == r.id,
    as: (l, r) => ({l with v_right: r._value, foo: r.foo}),
)

此查询将返回与上述表相同的表,但列名略有不同:_time、label、_value、v_right、foo。此语法的优点是它提供的语法简洁性。缺点是,如果您想要特定的列名,则可能需要重命名列。此外,当您连接分组数据时,这种方法变得尤为重要。with 运算符确保您不会删除组键中的任何列。总的来说,我推荐这种方法。

右连接

类似地,右连接_time 列以及 label 列和 id 列上的连接将如下所示:

Right-Join

请注意,join_label 列如何更改为包含来自右侧 id 列的值。类似地,v_leftv_right 和 foo 列中的值也已更改以反映右连接。对于 join_label 列和 join_time 列,我们必须分别从右侧 id 列和 _time 列中提取。

但是,您可能不想显式命名每个列或返回每个列。请注意,您也可以使用 with 子句

join.left(
    left: left_table,
    right: right_table,
    on: (l, r) => l._time == r._time and l.label == r.id,
    as: (l, r) => ({r with l_right: l._value, foo: r.foo}),
)

此查询将返回与上述表相同的表,但列名略有不同:_timelabel_valuel_rightfoo。此语法的优点是它提供的语法简洁性,而缺点是,如果您想要特定的列名,则可能需要重命名列。此外,当您连接分组数据时,这种方法变得尤为重要。with 运算符确保您不会删除组键中的任何列。总的来说,我推荐这种方法。

全连接

让我们看一下更复杂的示例,即定义一个函数 as 作为 全连接 的一部分。

Full-Join

我们使用 as 参数指定我们希望作为连接一部分返回的内容。由于我们想要执行全外连接,因此我们使用 条件逻辑exists 运算符 从右表和左表中提取时间戳和标签值。最后,我们返回我们想要的列,包括 time、label 以及 left 和 right 值。

内连接

按照其他连接示例,让我们对 _time 列以及 label 列和 id 列上的数据执行 内连接

Inner Join

新的内连接的工作方式与旧的 join 类似。例如,以下两个查询是等效的:

// use an alias to distinguish between the universe join function and the functions that are a part of the join package
import j "join"
// new join 
j.inner(
    left: left_table,
    right: right_table,
    on: (l, r) => l._time == r._time,
    as: (l, r) => ({l with v_right: r._value, foo: r.foo, id: r.id}),
)
|> yield(name: "new")
// deprecated join (the table output is identical aside from different column names)
join(tables: {left_table: left_table, left_table: right_table}, on: ["_time"])
|> yield(name: "deprecated")

产生以下表:

_time _value v_right foo id label
time1 1 0.4 bar a a
time2 2 0.5 bar c b
time3 3 0.6 bar d d

但是,新的 join 包的一个强大组件是能够像之前演示的那样在多个列上执行连接。相比之下,如果您尝试在 _time 列和 _id 列上都进行连接,则将无法返回结果。换句话说,以下查询将不会返回结果:

join(tables: {left_table: left_table, left_table: right_table}, on: ["_time","id"])

这确实提出了一个有趣的用例。用户可能并不总是希望在多个列上进行连接。为了简化此用例的语法,Flux 团队创建了 join.time() 函数,以便专门在 _time 列上进行连接。

按时间连接

join.time() 函数始终仅在 _time 列上连接表。这样,您不必为 on 参数提供值。相反,您只需提供连接 method 类型和 as 参数的函数。例如,以下 join 函数是等效的:

// use an alias to distinguish between the universe join function and the functions that are a part of the join package
import j "join"
// inner join on _time 
j.inner(
    left: left_table,
    right: right_table,
    on: (l, r) => l._time == r._time,
    as: (l, r) => ({l with v_right: r._value, foo: r.foo, id: r.id}),
)
|> yield(name: "inner join on _time")
// time join 
j.time(left: left_table, right: right_table, as: (l, r) => ({l with v_right: r._value, foo: r.foo, id: r.id}), method: "inner")
// deprecated join (the table output is identical aside from different column names)
join(tables: {left_table: left_table, left_table: right_table}, on: ["_time"])
|> yield(name: "deprecated")

此结果与正上方的表相同:

_time _value v_right foo id label
time1 1 0.4 bar a a
time2 2 0.5 bar c b
time3 3 0.6 bar d d

使用空列注入和有条件地连接空值的示例

join 包的功能之一是在多个列上执行多种不同连接的能力。同样重要的是指定您自己的自定义 as 函数,并使其尽可能复杂。您甚至可以预测您尝试连接在一起的序列的变化,以适应缺少列的情况。例如,假设我们想要定期连接数据作为任务的一部分。但是,来自一个表的某一列的存在是稀疏的。我们可以结合使用空值注入和条件逻辑来仍然执行连接。此解决方案是为了规避类似 runtime error @31:12-39:3: left: table is missing label "column name" 的错误。让我们看下面的示例:

table1 有三列,其中列 foo 和 bar 有一些空值:

on foo bar
1 1
2 3

请注意 table2 有一个额外的列 yeet,而 table1 缺少此列:

on foo bar yeet
1 0 4 42
2 2 5 42

假设我们要执行左连接。我们需要使用条件逻辑作为函数参数的一部分,以指定在 table1 的 foo 和 bar 列中存在空值的情况下我们希望返回什么。我们还需要在 map() 函数中使用 debug.null() 函数来创建一个包含空值的空列,以使连接不会失败。

on foo bar yeet
1 1 4 42
2 2 3 42

使用以下 Flux 代码自行尝试:

import "experimental/array"
import "array"
import "join"
import "internal/debug"

table1 = array.from(rows: [{pivot: 1, baz: "foo"}, {pivot: 3, baz: "bar"}])
|> pivot(rowKey:["pivot"], columnKey: ["baz"], valueColumn: "pivot")
|> map(fn: (r) => ({ r with on: 1 }))
|> cumulativeSum(columns: ["on"])
// ignore the code before this line, this is just to create a good example. 
// use the debug.null function to create a new column with null values so the join doesn’t fail. 
|> map(fn: (r) => ({ r with  yeet: debug.null(type: "string") }))
|> yield(name: "table1")

table2 = array.from(rows: [{on: 1, foo: 0, bar: 4, yeet: "42"}, {on: 2, foo: 2, bar: 5, yeet: "42"}])
|> yield(name: "table2")

joined = join.left(left: table1, right: table2, on: (l, r) => l.on == r.on, as: (l, r) => {
  return {
    foo: if exists l.foo then int(v: l.foo) else int(v: r.bar),
    yeet: if exists l.yeet then int(v: l.yeet) else int(v: r.yeet),
    bar: if exists l.bar then int(v: l.bar) else int(v: r.bar),
  }
})

joined |> yield(name: "joined")

使用新的 Join 包解决旧问题

现在我们了解了新的 Join 包的语法,让我们回答一些旧问题,这些问题在没有添加新的连接类型的情况下无法解决。

问题和解决方案 1

用户想要比较两个标签键在不同时间戳下的不同字段值。他们有一个字段,price,和两个标签值,Bookie 1 和 Bookie 2。此数据在第一个表中显示。但是,请注意,用户透视了字段键和字段值,以更轻松地表示他们的数据(就像 schema.fieldsAsCol() 函数已经应用一样)。接下来,他们想要透视他们的数据以获得第二个表。最后,他们想要过滤他们的数据并执行外连接。回填数据为用户提供了第三个表中的所需结果。

_time 价格 庄家
18:50 20 Bookie1
18:55 18 Bookie2
19:00 21 Bookie1
19:05 20 Bookie2
_time Bookie1 Bookie2
18:50 20
18:55 18
19:00 21
19:05 20
_time Bookie1 Bookie2
18:50 20 0
18:55 20 18
19:00 21 18
19:05 21 20

让我们看一下我们可以用来实现此目的的 Flux 脚本:

import "array"
import "join"

//create the data. We use arbitrary timestamps. 
data =
    array.from(
        rows: [
              {_time: 2022-01-01T00:00:00Z, Price: 20, Bookmaker: "Bookie1"},
              {_time: 2022-02-01T00:00:00Z, Price: 18, Bookmaker: "Bookie2"},
              {_time: 2022-03-01T00:00:00Z, Price: 21, Bookmaker: "Bookie1"},
              {_time: 2022-04-01T00:00:00Z, Price: 20, Bookmaker: "Bookie2"},
        ],
    )

// Pivot the data to get the second table 
data
  |> pivot(rowKey:["_time"], columnKey: ["Bookmaker"], valueColumn: "Price")
  |> fill(column: "Bookie1", usePrevious: true)
  |> fill(column: "Bookie2", usePrevious: true)
  |> yield(name: "solution before joins")

// filter for the right values to prepare for the join. This filter could also be performed before the pivot and we could pivot for each Bookmaker tag value. 

left = data 
|> filter(fn: (r) => r.Bookmaker == "Bookie1")

right = data 
|> filter(fn: (r) => r.Bookmaker == "Bookie2")

// join the left and right tables together and fill the missing timestamps. 
join.full(
    left: left,
    right: right,
    on: (l, r) => l._time == r._time,
    as: (l, r) => {
        time = if exists l._time then l._time else r._time
        return {_time: time, Bookie1: l.Price, Bookie2: r.Price}
    },
)
  |> fill(column: "Bookie1", usePrevious: true)
  |> fill(column: "Bookie2", usePrevious: true)
  |> yield(name: "solution after joins")

但是,Flux 如此强大的部分原因是您可以通过多种方式解决问题。您不必使用连接来解决此问题。相反,您可以使用 debug.null() 函数执行以下查询来解决问题 1:

import "array"
import "internal/debug"

data =
    array.from(
        rows: [
              {_time: 2022-01-01T00:00:00Z, Price: 20, Bookmaker: "Bookie1"},
              {_time: 2022-02-01T00:00:00Z, Price: 18, Bookmaker: "Bookie2"},
              {_time: 2022-03-01T00:00:00Z, Price: 21, Bookmaker: "Bookie1"},
              {_time: 2022-04-01T00:00:00Z, Price: 20, Bookmaker: "Bookie2"},
        ],
    )
data 
|> map(
        fn: (r) => {
            bookie1Value = if r.Bookmaker == "Bookie1" then r.Price else debug.null(type: "int")
            bookie2Value = if r.Bookmaker == "Bookie2" then r.Price else debug.null(type: "int")

            return {_time: r._time, bookie1: bookie1Value, bookie2: bookie2Value}
        },
    )
    |> fill(column: "bookie1", usePrevious: true)
    |> fill(column: "bookie2", usePrevious: true)

此解决方案是首选解决方案,但之前的解决方案有助于我们理解连接的工作原理,并突显了 Flux 的多功能性。

问题和解决方案 2

此用户有一个事件标签。他们想要过滤在事件发生时间和之后 10 秒之间写入的数据,以便每次记录事件时都进行过滤。

Problem-and-Solution-2

import "array"
import "join"
import "experimental"

// our event data or StartData
startData = 
    array.from( 
        rows: [{_time: 2022-01-01T00:00:00Z, _value: 2, event: "start"},
               {_time: 2022-01-01T00:00:15Z, _value: 2, event: "start"}
        ]
)
// we add a stop column, “myStop” with a timestamp 10 seconds after each event timestamp.  
  |> map(fn: (r) => ({ r with myStart: r._time }))
  |> map(fn: (r) => ({ r with myStop: experimental.addDuration(d: 10s, to: r._time) }))

// the data we want to filter for based on the startData start and stop times. In this example we expect to filter out all the data where the timestamps are outside of 0-10s and 15-25s, or where the value is equal to 0. 
data =
    array.from(
        rows: [
              {_time: 2022-01-01T00:00:03Z, _value: 1},
              {_time: 2022-01-01T00:00:05Z, _value: 1},
              {_time: 2022-01-01T00:00:08Z, _value: 1},
              {_time: 2022-01-01T00:00:12Z, _value: 0},
              {_time: 2022-01-01T00:00:17Z, _value: 1},
              {_time: 2022-01-01T00:00:18Z, _value: 1},
              {_time: 2022-01-01T00:00:20Z, _value: 1},
              {_time: 2022-01-01T00:00:23Z, _value: 1},
              {_time: 2022-01-01T00:00:27Z, _value: 0},
              {_time: 2022-01-01T00:00:30Z, _value: 0}
        ],
    )

// perform a full join
join.full(
    left: startData,
    right: data,
    on: (l, r) => l._time == r._time,
    as: (l, r) => {
        time = if exists l._time then l._time else r._time
        value = if exists l._value then l._value else r._value

        return {_time: time, value: value, myStart: l.myStart, myStop: l.myStop}
    },
)
|> fill(column: "myStart", usePrevious: true)
// finally filter for data that’s 10 seconds after each event. 
|> filter(fn: (r) => r._time >= r.myStart and r._time <= r.myStop)

请注意,此解决方案存在一些错误。如果您目前正在尝试解决此类问题,请参与此 issue #5127

问题和解决方案 3

此用户想要识别其数据中的不规则季节性。

Problem-and-Solution-3

为了解决他们的问题,我们将使用 NOAA 样本数据集中的前 400 个点,并将数据分组为不同的周期。以下是原始数据的外观:

groupped data into separate periods

然后我们使用 Flux 将数据分组为周期,并生成以下结果:

use Flux to group the data into periods

这是一般方法:

  • 计算导数 (der)。
  • der 结果移动一个时间戳 (der_shift) 并将它们连接在一起 local_min_max
  • 过滤斜率符号发生变化时。
  • 使用 elapse() 函数来考虑导数实际等于 0 的情况,或确保斜率符号的变化发生在局部最小值或最大值处。
  • 向那些斜率符号更改的时间戳添加索引。
  • 执行外连接并填充先前值,以便我们可以确定原始数据中的哪些值属于哪个“周期”。
  • 按标签分组以可视化我们的不同“周期”。
import "math"
import outer "join" 
// data is written 6 minutes apart 
data = from(bucket: "noaa")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "h2o_feet")
  |> filter(fn: (r) => r["_field"] == "water_level")
  |> filter(fn: (r) => r["location"] == "coyote_creek")
  |> keep(columns: ["_time", "_value"])  
  |> limit(n: 400)
//   |> yield(name: "raw")

// find the derivatives
der = data 
// unit for the time in between timestamps
|> derivative(unit: 6m)
|> keep(columns: ["_time", "_value"])
// |> yield(name: "der")

// shift the derivatives by one interval 
der_shift = der
|> timeShift(duration: 6m)
|> keep(columns: ["_time", "_value"])
// |> yield(name: "der_shift")

// join the derivative and shifted derivative data together to find where the derivative changes from positive to negative between the original and shifted derivatives 
local_max_min = join(tables: {der: der, der_shift: der_shift} , on: ["_time"])
// filter to find where the slope changes
|> filter(fn: (r) => r._value_der >= 0.0 and r._value_der_shift <= 0.0 or r._value_der <= 0.0 and r._value_der_shift >= 0.0 )
// to account for when the derivative does == 0 and define local minima/maxima points from either side of crossing from pos to neg or neg to pos
// also helps us make sure that local maxima and minima have adequate time between them. That they’re true local maxima and minima. 
|> elapsed(unit: 6m)
|> filter(fn: (r) => r.elapsed > 1)
// |> yield(name: "local maxima/minima total of 5 points here")
// use the map function to assign a numerical value to group on later after we join with the original data 
|> map(fn: (r) => ({r with period_group: 1.0}))
|> cumulativeSum()
|> keep(columns: ["_time", "period_group"])
// join the local maxima and minima with the original data and join based off of the period group 
outer.time(left: data, right: local_max_min, as: (l, r) => ({l with label: r.period_group}), method: "full" )
|> fill(column: "label", usePrevious: true)
|> group(columns: ["label"], mode:"by")
// |> yield(name: "_result")

结论

以上所有问题均来自论坛上的问题。请随时在此处查看原始讨论:

最后,还值得注意 joins 包的以下问题:

  1. #5135: join.tables() 方法:“full” 与空表一起返回 “table is missing label _value”
  2. #5127: 外连接和填充 + usePrevious 时的 Panic

如果您遇到任何这些错误,我鼓励您在问题上发表评论,以帮助团队更好地了解问题并快速解决它。

我希望这篇博文能启发您利用 Flux 中的新 join 包来处理延迟到达的数据。如果您正在寻找解决需要连接的问题,我鼓励您尝试使用其中一些函数。如果您无法解决问题,请使用我们的 社区站点Slack 频道与我们联系。我很乐意了解您尝试实现的目标以及您希望 InfluxDB 中的任务系统具备哪些功能。最后,如果您正在 InfluxDB 之上开发酷炫的物联网应用程序,我们很乐意听到,因此请务必在社交媒体上使用 #InfluxDB 分享它!此外,欢迎随时在我们的社区 Slack 频道中直接与我联系,分享您的想法、疑虑或问题。我很高兴收到您的反馈并帮助您解决遇到的任何问题!