使用 Flux 和 SQL 在 InfluxDB 中查询数据
作者:Anais Dotis-Georgiou / 产品
2022 年 12 月 19 日
导航至
(更新:InfluxDB 3.0 放弃了 Flux 和内置任务引擎。用户可以使用外部工具,例如基于 Python 的 Quix,在 InfluxDB 3.0 中创建任务。)
随着 InfluxDB 新存储引擎 的发布,InfluxDB Cloud 现在支持 SQL。这是因为更新后的 InfluxDB 使用 Apache Arrow DataFusion 项目作为其查询执行引擎的关键构建块。DataFusion 的复杂查询优化支持 InfluxDB Cloud 中接近无限基数的数据。DataFusion 的另一个好处是它提供了一个与 PostgreSQL 兼容的 SQL 实现,这就是 InfluxDB Cloud 现在除了 Flux 之外,还支持 SQL 来查询您的 时间序列数据 的原因。在 InfluxDB Cloud 中使用 SQL 查询具有以下优势
- 许多技术利用 PostgreSQL,这意味着在 InfluxDB 中加入 PostgreSQL 线协议可以实现与以下数据库(以及更多)的查询兼容性
- Postgresql
- QuestDB
- CockroachDB
- CrateDB
- YugabyteDB
- Timescale
要了解有关此主题的更多信息,我鼓励您阅读 PostgreSQL 线兼容性的世界。此外,您可以将 PostgreSQL 数据库适配器与您选择的语言一起使用,例如 Psycopg,以在 Python 脚本中执行 SQL。
-
您可以从另一个 PostgreSQL 兼容数据库迁移 SQL 查询。
-
如果您已经熟悉 SQL 并且不想学习 Flux,那么您有一个可行的选项来查询 InfluxDB。
-
您想利用性能极高的列式数据库。要了解有关新 InfluxDB 如何使用 Rust、Apache Arrow、DataFusion 和 Parquet 来支持无限基数用例的更多信息,请阅读 InfluxDB IOx 和对开源的承诺 和 以毫秒延迟查询 Parquet。
- 您可以查询所有数据以快速返回全局最后、第一、最小和最大值,而无需指定时间范围。此功能一直是 Flux 用户长期以来要求的功能。
要了解有关 InfluxDB 支持的语言及其优点的更多信息,请查看 与 InfluxDB 一起使用的最佳查询语言。
在这篇文章中,我们将学习如何编写基本的 Flux 查询以及等效的 SQL 查询。在此过程中,我们还将学习如何使用 SQL 执行基本查询。这篇文章旨在帮助用户利用 SQL,无论他们是熟悉 InfluxDB 的 Flux 用户还是新手。要在 InfluxDB Cloud 中试用 SQL 查询,请在此处注册 here。
在这篇文章中,我们将使用 空气传感器示例数据集。您可以在 创建存储桶 后,将此示例数据集写入数据浏览器中的存储桶
import "influxdata/influxdb/sample"
sample.data(set: "airSensor")
|> to(bucket: "example-bucket")
空气传感器示例数据集包含以下架构
-
1 个测量:airSensors
-
3 个字段:co、湿度和温度
-
1 个标签:sensor_id
-
8 个 sensor_id 标签值:TLM0100、TLM0101、TLM0102、TLM0103、TLM0200、TLM0201、TLM0202、TLM0203
为了在 InfluxDB Cloud 中利用 SQL,请打开 新的 脚本编辑器。InfluxDB Cloud 中的新 脚本编辑器 具有以下新功能
-
保存和打开脚本的功能。
-
创建 SQL 或 Flux 脚本。
-
使用查询构建器作为无代码解决方案,该解决方案使用代码填充脚本编辑器,或直接使用脚本编辑器。
-
在表格或图形中查看结果。
基本 Flux 查询
如果您不是 Flux 用户,请跳过此部分。
让我们查询来自一个标签 TLM0100 的温度数据。我们的 Flux 查询如下所示
from(bucket: "anais-iox")
|> range(start: 2022-12-01T19:05:41.000Z, stop: now())
|> filter(fn: (r) => r._measurement == "airSensors")
|> filter(fn: (r) => r._field == "temperature")
|> filter(fn: (r) => r.sensor_id == "TLM0100")
现在让我们看一下结果
一切都按预期返回,只是查询从查询结果中删除了 _start
和 _stop
列。结果表仍然将数据拆分为 _field
和 _value
列。接下来,让我们看一下如何使用 SQL 查询相同的数据。
基本 SQL 查询
让我们查询来自一个标签 TLM0100 的温度数据。我们的 SQL 查询如下所示
SELECT "sensor_id", "temperature", "time" FROM "airSensors"
where time >= ('2022-12-01 19:05:41.000')::TIMESTAMP
and time < now()::TIMESTAMP and sensor_id = 'TLM0100'
注意:您也可以使用 SELECT *
同时返回所有列。
我们 select
从测量中选择我们想要的列。我们还应用 where
子句来指定时间范围以及过滤特定的传感器 ID。我们可以使用 UI 中 脚本编辑器 左侧的面板来选择存储桶。
让我们看一下表格结果
如果您来自 Flux 背景,您会注意到一些令人惊讶的差异。首先,您可以看到返回的数据看起来像 SQL 查询对其进行了透视,或者应用了 fieldsAsCols()
函数。第二件要注意的事情是,结果只返回我们查询的列。由于我们没有选择测量,因此结果流不包含测量。此外,查询将 _time
列重命名为 time
。最后,请注意查询返回的表格和行数(表格按钮的左侧)与上面的 Flux 查询匹配。这有助于确认结果预期并了解数据的形状。
带有 iox.from() 函数的基本 Flux 查询
要使用 Flux 查询新的 InfluxDB Cloud 引擎并返回看起来与 SQL 查询相同的输出,请使用 iox.from() 函数。iox.from()
函数返回的数据就像它在时间上被透视一样。
import "experimental/iox"
data = iox.from(bucket: "anais-iox", measurement: "airSensors")
|> range(start: 2022-12-01T19:05:41.000Z, stop: now())
|> filter(fn: (r) => r.sensor_id == "TLM0100")
|> yield()
现在让我们看一下结果
iox.from()
函数默认返回所有字段。要减少结果以仅包含您想要的字段(即温度,就像我们在上一节中所做的那样),请使用 keep() 或 drop() 函数。
Flux 到 SQL 查询转换
现在我们了解了一些使用 SQL 查询背后的基础知识,让我们看一下一些功能等效的查询。在本节中,我们将使用 iox.from()
函数。Flux 结果与 SQL 结果的不同之处在于,Flux 结果包含其他字段数据(温度、co、湿度)。此外,SQL 和 Flux 之间的数据形状略有不同。SQL 查询始终返回一个表,而 Flux 可以返回多个表。您需要使用其他函数,例如 group()
、union()
或 keep()
,以在两个查询之间产生相同的结果。
SQL | Flux |
---|---|
1. 过去 5 分钟的温度 | |
SELECT "temperature" FROM "airSensors" WHERE time > (NOW() - INTERVAL '5 MINUTES') |
import "experimental/iox" iox.from(bucket: "anais-iox", measurement: "airSensors") |> range(start: -5m, stop: now()) |
2. 一个 传感器的最小、最大、平均温度 | |
SELECT min("temperature"), max("temperature"), a vg("temperature") FROM "airSensors" WHERE time >= ('2022-12-01 19:05:41.000')::TIMESTAMP and time < now()::TIMESTAMP and sensor_id = 'TLM0100' |
import "experimental/iox" data = iox.from(bucket: "anais-iox", measurement: "airSensors") |> range(start: 2022-12-01T19:05:41.000Z, stop: now()) |> filter(fn: (r) => r.sensor_id == "TLM0100") data |> min(column: "temperature") |> yield(name: "min") data |> max(column: "temperature") |> yield(name: "max") data |> mean(column: "temperature") |> yield(name: "mean") |
3. 每个传感器的最小、最大、平均温度 | |
SELECT min("temperature"), max("temperature"), avg("temperature"), "sensor_id" FROM "airSensors" WHERE time >= ('2022-12-01 19:05:41.000')::TIMESTAMP and time < now()::TIMESTAMP GROUP BY "sensor_id" ORDER BY "sensor_id" |
import "experimental/iox" data = iox.from(bucket: "anais-iox", measurement: "airSensors") |> range(start: 2022-12-01T19:05:41.000Z, stop: now()) data |> min(column: "temperature") |> yield(name: "min") data |> max(column: "temperature") |> yield(name: "max") data |> mean(column: "temperature") |> yield(name: "mean") |
4. 所有传感器的最小、最大、平均温度 | |
SELECT min("temperature"), max("temperature"), avg("temperature") FROM "airSensors" WHERE time >= ('2022-12-01 19:05:41.000')::TIMESTAMP and time < now()::TIMESTAMP |
import "experimental/iox" data = iox.from(bucket: "anais-iox", measurement: "airSensors") |> range(start: 2022-12-01T19:05:41.000Z, stop: now()) |> group() data |> min(column: "temperature") |> yield(name: "min") data |> max(column: "temperature") |> yield(name: "max") data |> mean(column: "temperature") |> yield(name: "mean") |
5. 过滤一个传感器温度值 > 72.0 的数据 | |
SELECT "temperature" FROM "airSensors" WHERE time >= ('2022-12-01 19:05:41.000')::TIMESTAMP and time < now()::TIMESTAMP and sensor_id = 'TLM0100' and temperature > 72.0 |
import "experimental/iox" iox.from(bucket: "anais-iox", measurement: "airSensors") |> range(start: 2022-12-01T19:05:41.000Z, stop: now()) |> filter(fn: (r) => r.sensor_id == "TLM0100") |> filter(fn: (r) => r.temperature > 72.0) |
6. 将时间戳截断到秒 | |
SELECT "temperature", date_trunc('second', time::timestamp) as time FROM "airSensors" WHERE time > (NOW() - INTERVAL '5 MINUTES') and sensor_id = 'TLM0100' |
import "experimental/iox" import "date" iox.from(bucket: "anais-iox", measurement: "airSensors") |> range(start: -5m, stop: now()) |> filter(fn: (r) => r.sensor_id == "TLM0100") |> map(fn: (r) => ({r with _time: date.truncate(t: r._time, unit: 1s)) |
7. 计算所有传感器每小时的平均温度 | |
SELECT DATE_BIN(INTERVAL '1' minute, time, TIMESTAMP '2022-01-01 00:00:00Z') AS time, avg("temperature") as mean FROM "airSensors" GROUP BY 1 ORDER BY 1 DESC // The ORDER BY function is optional. It’s like adding a sort() function with Flux. |
import "experimental/iox" iox.from(bucket: "anais-iox", measurement: "airSensors") |> range(start: 2022-12-01T19:05:41.000Z, stop: now()) |> aggregateWindow(every: 1h, fn: mean, createEmpty: false) //The window bounds are different between SQL and Flux. SQL returns the start of the window. Flux returns to the end of the window. |
SQL 中的聚合窗口
现在我们了解了如何执行一些基本的 SQL 查询,让我们看一下最后一个示例,即如何更详细地将 aggregateWindow()
函数转换为 SQL。aggregateWindow() 函数是一个 Flux 函数,它按时间段对数据进行分组,并将一个函数应用于这些组。我们经常按时间窗口聚合以执行降采样任务或创建数据的物化视图。假设我们要计算空气传感器示例数据集中一个传感器 1 小时间隔的平均值。
我们的 Flux 查询如下所示
import "experimental/iox"
iox.from(bucket: "anais-iox", measurement: "airSensors")
|> range(start: 2022-12-01T19:05:41.000Z, stop: now())
|> filter(fn: (r) => r.sensor_id == "TLM0100")
|> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
功能等效的 SQL 查询采用 date_bin 函数,如下所示
SELECT
DATE_BIN(INTERVAL '1' hour, time, TIMESTAMP '2022-01-01 00:00:00Z') AS time,
avg("temperature") as mean
FROM "airSensors"
GROUP BY 1
ORDER BY 1
date_bin
函数根据作为函数的第一个参数(步幅)提供的固定周期或间隔创建数据的时间“箱”。第二个参数(源)指定放置在箱中的时间值。第三个参数(原点)有助于确定箱相对于您的源时间是向上舍入还是向下舍入。以下是关于 如何使用 date_bin 函数 的更多示例。这里我们按 1 列分组,这是您的结果集的第一列,由 SELECT
子句指定。换句话说,按箱分组以查找同一时间箱中值的平均值。
总结和后续步骤
要在 InfluxDB Cloud 中利用 SQL 查询,请在此处注册 here。
如果您想联系这个新数据引擎背后的 InfluxDB 开发人员,请加入 InfluxData 社区 Slack 并查找 #influxdb_iox 频道。要了解更多信息,请在该频道留言。
InfluxDB 将 SQL 作为查询语言是否让您感到兴奋?或者您计划继续使用 Flux 吗?我很乐意通过社区获得您的反馈。请使用我们的 社区站点 或 Slack 频道与我们联系。我希望这篇博客文章能激励您尝试在 InfluxDB Cloud 中查询 SQL。
最后,如果您正在 InfluxDB 之上开发一个很酷的物联网应用程序,我们很乐意听到它,所以请务必在社交媒体上使用 #InfluxDB 分享它!您可以在我们的社区 Slack 频道中直接与我联系,分享您的想法、疑虑或问题。我很乐意获得您的反馈并帮助您解决遇到的任何问题!或者 分享您的故事 并获得免费的 InfluxDB 连帽衫。