Flux 中的外连接
作者:Sean Brickley / 产品, 用例, 开发者
2022 年 8 月 3 日
导航至
连接是任何查询语言中常见的转换,为了使 Flux 成为对用户越来越有价值的工具,InfluxData 查询团队的工程师创建并持续维护了两个独立的连接函数。虽然这些解决方案满足了我们用户的一些需求,但它们都缺少一个关键功能:支持外连接。
这是我们社区成员最频繁的要求之一,也可能是使用 Flux 代替其他查询语言最显著的缺点之一。
因此,查询团队很高兴地宣布,如果您是众多等待 Flux 中外连接支持的社区成员之一,那么等待已经结束!Flux 有一个新的 join
包,它在性能和与其他查询语言的功能对等方面都优于之前的两个连接函数,包括(请击鼓...)对外连接的支持!
在您的 Flux 脚本中使用 join 包
通过在脚本的开头添加以下行来导入 join 包
import "join"
join
包提供了六个新函数。我将在我的代码示例中使用 tables()
函数,因为它最灵活。其他函数使用相同的底层实现——它们只是为了用户方便和代码可读性设置了一些默认参数。因此,如果您已经知道查询需要哪种类型的连接,则可以使用以下函数之一来节省一些输入
定义您的输入
首先,定义您的左侧和右侧输入表流。这些可以来自任何来源,但要记住的关键是,在考虑要连接哪些行时,join 将仅比较具有相同组键的表。因此,如果您的两个输入按不同的列分组,或者它们的表不共享任何组键,您可能无法获得预期的结果。
将您的左侧和右侧输入作为参数分别传递给 left
和 right
参数。join
包中的所有函数都要求您显式设置 right
的参数。但是,left
参数可以省略,而支持管道转发的数据。以下两个代码块在功能上是相同的。
import "join"
tbl_1 = from(...) |> ...
tbl_2 = from(...) |> ...
join.tables(
left: tbl_1,
right: tbl_2,
...
)
import "join"
tbl_1 = from(...) |> ...
tbl_2 = from(...) |> ...
tbl_1 |> join.tables(
right: tbl_2,
...
)
“on
”函数
接下来,我们需要为 on 参数定义一个函数。join 转换使用此函数来比较来自左侧和右侧输入且具有相同组键的记录。如果函数评估为 true
,则要比较的两个记录将在最终输出表中连接。
此函数接受两个参数,一个左侧记录 (1
) 和一个右侧记录 (r
),并且函数体需要遵循相当严格的格式:一个布尔表达式,由一个或多个相等比较 (==
) 组成,比较 1
的属性和 r
的属性,并通过 and
运算符链接在一起。
import "join"
tbl_1 = from(...) |> ...
tbl_2 = from(...) |> ...
join.tables(
left: tbl_1,
right: tbl_2,
on: (l, r) => l._time == r._time and r.id == l.label
...
)
连接方法和“as
”函数
还剩下两个参数需要定义,重要的是要了解它们如何协同工作。
method
(顾名思义)确定用于连接两个表的方法:inner
、left
、right
或 full
。
as
函数定义两个匹配行的连接输出。与 on
函数类似,它以左侧和右侧记录(分别为 1
和 r
)作为其参数。它的工作是返回一个由 1
和 r
的组件属性构建的新记录。返回的记录将包含在 join 的最终输出中。
请注意,as
函数的返回值需要保持输入的组键。如果您定义的输出记录修改或排除了组列的值,则 join 将返回错误。
如何构建输出记录的详细信息因所使用的连接方法而略有不同。内连接将跳过任何不匹配的记录,从而允许我们定义一个相当简单的 as
函数
import "join"
tbl_1 = from(...) |> ...
tbl_2 = from(...) |> ...
join.tables(
method: "inner"
left: tbl_1,
right: tbl_2,
on: (l, r) => l._time == r._time and r.id == l.label
as: (l, r) => ({l with r_val: r._value})
)
但是,如果它执行任何类型的外连接(即 left
、right
或 full
),我们需要更仔细地考虑如何构建输出记录。
当 join
在一个输入中找到一个在另一个输入中没有匹配记录的记录时,它将检查连接方法,并在适当的情况下替换为默认记录。默认记录具有与缺失记录相同的架构,但仅填充组键列。所有其他值都为 null。
因此,在定义外连接中 as
函数的返回值时,我们需要特别注意从哪里提取其属性。
左连接将为左侧输入中的每个记录生成一个或多个输出行。因此,可以保证 l 参数不是默认记录。我们对 r
没有相同的保证,因此,如果我们使用 r
中的任何非组键值构建输出记录,我们应该预料到这些值有时会为 null。
右连接的情况正好相反。在右连接中,可以保证 r
不是默认记录,而 1
没有这样的保证。因此,如果我们想确保某些值在输出记录中不会为 null,我们应该从 r
中提取这些值。
全外连接有点棘手。在全外连接中,1
或 r
都可能是默认记录。我们唯一的保证是它们永远不会同时都是默认记录。其中一个将具有输出记录所需的非 null 值,但我们不能确定是哪一个,因此我们必须检查。
import "array"
import "array"
import "join"
right =
array.from(
rows: [
{_time: 2022-06-01T00:00:00Z, _value: 1, id: "a", key: 1},
{_time: 2022-06-01T00:00:00Z, _value: 2, id: "b", key: 1},
{_time: 2022-06-01T00:00:00Z, _value: 3, id: "d", key: 1},
],
)
|> group(columns: ["key"])
left =
array.from(
rows: [
{_time: 2022-06-01T00:00:00Z, _value: 12.34, label: "a", key: 1},
{_time: 2022-06-01T00:00:00Z, _value: 56.78, label: "c", key: 1},
{_time: 2022-06-01T00:00:00Z, _value: 90.12, label: "d", key: 1},
],
)
|> group(columns: ["key"])
join.tables(
method: "full",
left: left,
right: right,
on: (l, r) => l._time == r._time and l.label == r.id,
as: (l, r) => {
time = if exists l._time then l._time else r._time
label = if exists l.label then l.label else r.id
return {
label: label,
v_left: l._value,
key: r.key,
v_right: r._value,
_time: time,
}
},
)
在本示例中,我们使用 exists
关键字来查看 1
或 r
是否具有 _time
和 label
属性所需的非 null 值。我们仍然直接从 1
和 r
中提取 v_left
和 v_right
的值,因为我们预计这些值有时在最终输出中为 null。我们还直接从 r
中提取 key
属性的值,因为它属于组键,因此保证为非 null。我们可以同样轻松地使用 l.key 代替。
运行上面的示例脚本会产生以下输出
正如我们所预期的那样,全外连接为两个输入中的每个记录生成一个连接行,即使是那些在另一张表中没有匹配项的记录也是如此。
以上是关于如何使用新的 Flux join 包的基础知识。我们查询团队鼓励您试用并给我们您的反馈!
请记住,与任何开源项目一样,Flux 的维护和开发是一项持续的努力,join
包也不例外。如果您发现 join 包中的某个函数未按预期运行,或者不能满足您的需求,请通过在我们的社区论坛上发帖或在 GitHub 上的 Flux 存储库中打开 issue 来告知我们。
1 按组键连接使我们能够在许多情况下显着减少 join 转换需要进行的行比较次数,从而提高性能。
2 您可能想知道为什么新的 join 包使用具有如此严格执行的指南的谓词函数,而不是仅仅使用列名列表。主要原因是,也许具有讽刺意味的是,这种方法提供了最大的灵活性。我们不仅可以比较左右表中名称不同的列,而且将来我们或许可以利用此函数来允许更复杂的连接谓词。
目前,强制执行此格式使 Flux 能够确保所有连接都是等值连接,这意味着判断是否应连接两行的唯一标准是两组值是否相等。因此,Flux 查询计划器能够将此函数转换为一系列列名对,然后 join 转换使用这些列名对来比较左右表中的记录,从而节省编译时间。
将来,我们或许可以放宽对谓词函数的限制,以允许更复杂的比较。对于使用列名列表作为其谓词的 join 版本,这样做将更加困难。
3 ...假设这些值在输入表中不为 null