Flux中的外连接
作者 Sean Brickley / 产品,用例,开发者
2022年8月3日
导航到
连接是任何查询语言中的常见转换,为了使Flux成为用户越来越有价值的工具,InfluxData的查询团队创建了两个独立的连接函数,并继续维护它们。尽管这些解决方案满足了一些用户的需求,但它们都缺少一个关键功能:对外连接的支持。
这是我们社区成员最频繁提出的要求之一,也许也是使用Flux代替某些其他查询语言的最明显缺点之一。
这就是为什么查询团队很高兴地宣布,如果您是等待Flux外连接支持的许多社区成员之一,等待结束了!Flux有一个新的join
包,在性能和与其他查询语言的功能兼容性方面都优于之前的两个连接,包括(掌声…)对外连接的支持!
在您的Flux脚本中使用连接包
通过在脚本开头添加以下行来导入连接包
import "join"
join
包含六个新函数。在代码示例中,我将使用 tables()
函数,因为它是最灵活的。其他函数使用相同的底层实现——它们只是为了用户方便和代码可读性而设置了默认参数。因此,如果您已经知道您查询需要哪种类型的连接,您可以使用以下之一来节省输入
定义您的输入
首先,定义您的左侧和右侧输入表流。这些可以来自任何来源,但关键是要记住,连接在考虑要连接的行时,只会比较具有相同分组键的表。因此,如果您的两个输入在不同的列上分组,或者它们的表没有共享任何分组键,您可能不会得到您期望的结果。
将左侧和右侧输入分别作为参数传递给 left
和 right
。join
包中的所有函数都需要您显式设置 right
的参数。然而,left
参数可以省略,以便于管道转发数据。以下两个代码块在功能上是相同的。
import "join"
tbl_1 = from(...) |> ...
tbl_2 = from(...) |> ...
join.tables(
left: tbl_1,
right: tbl_2,
...
)
import "join"
tbl_1 = from(...) |> ...
tbl_2 = from(...) |> ...
tbl_1 |> join.tables(
right: tbl_2,
...
)
‘on’ 函数
接下来,我们需要为 on 参数定义一个函数。连接转换使用此函数来比较左侧和右侧输入中具有相同分组键的记录。如果该函数计算结果为 true
,则比较的两个记录将在最终输出表中连接。
此函数接受两个参数,一个左侧记录(1
)和一个右侧记录(r
),并且函数体需要遵循相当严格的格式:一个布尔表达式,由一个或多个相等比较(==
)组成,这些比较是一个或多个 1
的属性和 r
的属性之间的比较,由 and
运算符连接在一起。
import "join"
tbl_1 = from(...) |> ...
tbl_2 = from(...) |> ...
join.tables(
left: tbl_1,
right: tbl_2,
on: (l, r) => l._time == r._time and r.id == l.label
...
)
连接方法和 ‘as’
函数
还有两个参数需要定义,并且了解它们如何协同工作非常重要。
method
(正如其名称所示)确定用于连接两个表的方法:inner
、left
、right
或 full
。
as
函数定义了两个匹配行的连接输出。与 on
函数类似,它接受一个左侧记录(1
)和一个右侧记录(分别为 r
)作为其参数。其任务是返回一个由 1
和 r
的组件属性构建的新记录。返回的记录将包含在连接的最终输出中。
请注意,as
函数的返回值需要保持输入的分组键。如果您定义了一个输出记录,该记录修改或排除了分组列的值,连接将返回错误。
构建输出记录的详细信息根据使用的连接方法略有不同。内部连接会跳过任何不匹配的记录,使我们能够定义一个相当简单的 as
函数
import "join"
tbl_1 = from(...) |> ...
tbl_2 = from(...) |> ...
join.tables(
method: "inner"
left: tbl_1,
right: tbl_2,
on: (l, r) => l._time == r._time and r.id == l.label
as: (l, r) => ({l with r_val: r._value})
)
然而,如果它正在执行任何类型的外部连接(即 left
、right
或 full
),我们需要更仔细地考虑我们如何构建我们的输出记录。
当 join
在一个输入中找到一个记录,在另一个输入中没有匹配的记录时,它会检查连接方法,并在适当的情况下替换一个默认记录。默认记录具有与缺失记录相同的模式,但只有组键列被填充。所有其他值都是 null。
因此,当在外部连接中定义我们的 as
函数的返回值时,我们需要特别注意我们从哪里获取其属性。
左连接将为左输入中的每个记录产生一个或多个输出行。因此,l 参数保证不是一个默认记录。我们对于 r
没有这样的保证,所以如果我们用来自 r
的任何非组键值来构建我们的输出记录,我们应该预期这些值有时将是 null。
对于右连接,情况正好相反。在右连接中,r
保证不是一个默认记录,而 1
没有这样的保证。因此,如果我们想确保输出记录中的某些值不会为 null,我们应该从 r
中获取这些值。
全外部连接稍微复杂一些。在全外部连接中,1
或 r
有可能是默认记录。我们唯一可以保证的是,它们永远不会同时都是默认记录。其中之一将包含我们输出记录所需的非空值,但我们不能确定是哪一个,所以我们必须检查。
import "array"
import "array"
import "join"
right =
array.from(
rows: [
{_time: 2022-06-01T00:00:00Z, _value: 1, id: "a", key: 1},
{_time: 2022-06-01T00:00:00Z, _value: 2, id: "b", key: 1},
{_time: 2022-06-01T00:00:00Z, _value: 3, id: "d", key: 1},
],
)
|> group(columns: ["key"])
left =
array.from(
rows: [
{_time: 2022-06-01T00:00:00Z, _value: 12.34, label: "a", key: 1},
{_time: 2022-06-01T00:00:00Z, _value: 56.78, label: "c", key: 1},
{_time: 2022-06-01T00:00:00Z, _value: 90.12, label: "d", key: 1},
],
)
|> group(columns: ["key"])
join.tables(
method: "full",
left: left,
right: right,
on: (l, r) => l._time == r._time and l.label == r.id,
as: (l, r) => {
time = if exists l._time then l._time else r._time
label = if exists l.label then l.label else r.id
return {
label: label,
v_left: l._value,
key: r.key,
v_right: r._value,
_time: time,
}
},
)
在这个例子中,我们使用 exists
关键字来检查 1
或 r
是否具有我们需要的非空值来表示 _time
和 label
属性。我们仍然直接从 1
和 r
中提取 v_left
和 v_right
的值,因为我们预计这些值在最终输出中有时将是 null。我们还直接从 r
中提取 key
属性的值,因为它属于组键,因此保证不为 null。我们也可以用 l.key 来代替。
运行上面的示例脚本会产生以下输出
正如我们所预期的,全外部连接为两个输入中的每个记录都产生了连接行,即使那些在另一张表中没有匹配的记录。
这些都是使用新 Flux 连接包的基本方法。我们查询团队鼓励您尝试使用它,并给我们反馈!
请注意,与任何开源项目一样,Flux的维护和开发是一个持续的努力,而join
包也不例外。如果您发现join包中的某个函数表现不符合预期,或者不符合您的需求,请通过在我们的社区论坛发帖或在GitHub上的Flux存储库中创建一个问题来告诉我们。
1 通过按组键连接,我们可以在许多情况下显著减少连接转换需要进行的行比较次数,从而提高性能。
2 您可能想知道为什么新版的join包使用具有严格约束的谓词函数,而不是仅仅使用列名列表。主要原因可能是,出人意料的是,这种方法提供了最大的灵活性。我们不仅可以比较左右表中的不同名称的列,而且我们可能能够利用这个函数在未来允许更复杂的连接谓词。
目前,强制执行这种格式使Flux能够确保所有连接都是等值连接,这意味着两个行是否应该连接的唯一标准是两组值的相等性。因此,Flux查询规划器能够将此函数转换为一组列名对,然后连接转换使用这些列名对来比较左右表中的记录,从而节省编译时间。
未来,我们可能会放宽对谓词函数的限制,以允许更复杂的比较。对于使用列名列表作为谓词的连接版本,这样做将会更加困难。
3 …假设这些值在输入表中不是null