使用 Flux 和 SQL 查询 InfluxDB 中的数据
作者:Anais Dotis-Georgiou / 产品
2022年12月19日
导航至
(更新: InfluxDB 3.0 已远离 Flux 和内置任务引擎。用户可以使用外部工具,如基于 Python 的 Quix,在 InfluxDB 3.0 中创建任务。)
随着 InfluxDB Cloud 的新存储引擎 发布,InfluxDB Cloud 现在支持 SQL。这是因为更新后的 InfluxDB 使用 Apache Arrow DataFusion 项目作为其查询执行引擎的关键构建块。DataFusion 的复杂查询优化支持 InfluxDB Cloud 中几乎无限的数据基数。DataFusion 的另一个好处是它提供了一个兼容 PostgreSQL 的 SQL 实现,这使得 InfluxDB Cloud 现在支持 SQL 查询您的 时序数据,除了 Flux。使用 SQL 在 InfluxDB Cloud 中查询具有以下优点
- 许多技术利用 PostgreSQL,这意味着在 InfluxDB 中包含 PostgreSQL 线协议提供了与以下数据库(更多)的查询兼容性
- Postgresql
- QuestDB
- CockroachDB
- CrateDB
- YugabyteDB
- Timescale
要了解更多关于此主题的信息,我鼓励您阅读 PostgreSQL 线协议的世界。此外,您可以使用您选择的任何语言与 PostgreSQL 数据库适配器,如 Psycopg,在 Python 脚本中执行 SQL。
-
您可以从其他 PostgreSQL 兼容数据库迁移 SQL 查询。
-
如果您已经熟悉 SQL 并且不想学习 Flux,那么您有查询 InfluxDB 的可行选择。
-
您想利用性能极高的列式数据库。要了解更多关于新的 InfluxDB 如何使用 Rust、Apache Arrow、DataFusion 和 Parquet 支持无限基数用例的信息,请阅读 InfluxDB IOx 和对开源的承诺 以及 以毫秒延迟查询 Parquet。
- 您可以查询所有数据,快速返回全局最后、第一个、最小和最大值,而无需指定时间范围。这是 Flux 用户长期以来一直要求的功能。
要了解更多关于 InfluxDB 支持的语言及其优势,请查看 与 InfluxDB 一起使用的最佳查询语言。
在这篇文章中,我们将学习如何使用等效的 SQL 查询编写基本的 Flux 查询。在这个过程中,我们也将学习如何使用 SQL 执行基本查询。本文旨在帮助用户利用 SQL,无论是熟悉 InfluxDB 的 Flux 用户还是新手。要尝试在 InfluxDB Cloud 中运行 SQL 查询,请在此注册:这里。
为了本文,我们将使用 空气传感器样本数据集。您可以在创建存储桶后,在数据探索器中将此样本数据集写入存储桶中 (创建存储桶)
import "influxdata/influxdb/sample"
sample.data(set: "airSensor")
|> to(bucket: "example-bucket")
空气传感器样本数据集包含以下架构
-
1 个测量值:airSensors
-
3 个字段:co、湿度和温度
-
1 个标签:sensor_id
-
8 个 sensor_id 标签值:TLM0100、TLM0101、TLM0102、TLM0103、TLM0200、TLM0201、TLM0202、TLM0203
为了充分利用 InfluxDB Cloud 中的 SQL,请开启 新的 脚本编辑器。InfluxDB Cloud 中的新 脚本编辑器 具有以下新功能
-
保存和打开脚本的能力。
-
创建 SQL 或 Flux 脚本。
-
使用查询构建器作为无代码解决方案,将代码填充到脚本编辑器中,或者直接使用脚本编辑器。
-
在表格或图表中查看您的结果。
基本的 Flux 查询
如果您不是 Flux 用户,请跳过本节。
让我们查询标签 TLM0100 的温度数据。我们的 Flux 查询如下所示
from(bucket: "anais-iox")
|> range(start: 2022-12-01T19:05:41.000Z, stop: now())
|> filter(fn: (r) => r._measurement == "airSensors")
|> filter(fn: (r) => r._field == "temperature")
|> filter(fn: (r) => r.sensor_id == "TLM0100")
现在让我们看一下结果
所有结果都如预期返回,除了查询从查询结果中删除了 _start
和 _stop
列。结果表仍然将数据分割成 _field
和 _value
列。接下来,让我们看一下如何使用 SQL 查询相同的数据。
基本的 SQL 查询
让我们查询标签 TLM0100 的温度数据。我们的 SQL 查询如下所示
SELECT "sensor_id", "temperature", "time" FROM "airSensors"
where time >= ('2022-12-01 19:05:41.000')::TIMESTAMP
and time < now()::TIMESTAMP and sensor_id = 'TLM0100'
注意:您也可以使用 SELECT *
同时返回所有列。
我们从测量值中选择我们想要的列。我们还应用了 where
子句来指定时间范围,以及过滤特定的传感器 ID。我们可以使用 UI 中 脚本编辑器 左侧的面板来选择存储桶。
让我们看一下表格结果
如果您来自 Flux 背景,您会注意到一些令人惊讶的差异。首先,您可以看到返回的数据看起来像 SQL 查询将其转置,或者应用了 fieldsAsCols()
函数。要注意的第二件事是,结果只返回我们查询的列。由于我们没有选择测量值,结果流不包含测量值。此外,查询将 _time
列重命名为 time
。最后,请注意查询返回的表和行数(在“表”按钮左侧)与上面的 Flux 查询相匹配。这有助于确认结果预期并了解数据的形状。
使用 iox.from() 函数的基本 Flux 查询
为了使用 Flux 查询 InfluxDB Cloud 的新引擎并返回与 SQL 查询外观相同的输出,请使用 iox.from() 函数。函数 iox.from()
返回的数据就像它是按时间转置的。
import "experimental/iox"
data = iox.from(bucket: "anais-iox", measurement: "airSensors")
|> range(start: 2022-12-01T19:05:41.000Z, stop: now())
|> filter(fn: (r) => r.sensor_id == "TLM0100")
|> yield()
现在让我们看一下结果
默认情况下,iox.from()
函数返回所有字段。为了将结果减少到只包含您希望的字段(例如温度,就像我们在上一节中所做的那样),请使用 keep() 或 drop() 函数。
Flux到SQL查询转换
现在我们已经了解了使用SQL进行查询的一些基础知识,让我们来看看一些功能等效的查询。在本节中,我们将使用 iox.from()
函数。Flux的结果将不同于SQL的结果,因为Flux的结果包含其他字段数据(温度、co、湿度)。此外,SQL和Flux之间的数据形状也有细微差别。SQL查询始终返回一个表,而Flux可以返回多个表。您需要使用其他函数,如 group()
、union()
或 keep()
,以在两个查询之间产生相同的结果。
SQL | Flux |
---|---|
1. 上5分钟的温度 | |
SELECT "temperature" FROM "airSensors" WHERE time > (NOW() - INTERVAL '5 MINUTES') |
import "experimental/iox" iox.from(bucket: "anais-iox", measurement: "airSensors") |> range(start: -5m, stop: now()) |
2. 一个传感器的最小、最大、平均温度 | |
SELECT min("temperature"), max("temperature"), a vg("temperature") FROM "airSensors" WHERE time >= ('2022-12-01 19:05:41.000')::TIMESTAMP and time < now()::TIMESTAMP and sensor_id = 'TLM0100' |
import "experimental/iox" data = iox.from(bucket: "anais-iox", measurement: "airSensors") |> range(start: 2022-12-01T19:05:41.000Z, stop: now()) |> filter(fn: (r) => r.sensor_id == "TLM0100") data |> min(column: "temperature") |> yield(name: "min") data |> max(column: "temperature") |> yield(name: "max") data |> mean(column: "temperature") |> yield(name: "mean") |
3. 每个传感器的最小、最大、平均温度 | |
SELECT min("temperature"), max("temperature"), avg("temperature"), "sensor_id" FROM "airSensors" WHERE time >= ('2022-12-01 19:05:41.000')::TIMESTAMP and time < now()::TIMESTAMP GROUP BY "sensor_id" ORDER BY "sensor_id" |
import "experimental/iox" data = iox.from(bucket: "anais-iox", measurement: "airSensors") |> range(start: 2022-12-01T19:05:41.000Z, stop: now()) data |> min(column: "temperature") |> yield(name: "min") data |> max(column: "temperature") |> yield(name: "max") data |> mean(column: "temperature") |> yield(name: "mean") |
4. 所有传感器的最小、最大、平均温度 | |
SELECT min("temperature"), max("temperature"), avg("temperature") FROM "airSensors" WHERE time >= ('2022-12-01 19:05:41.000')::TIMESTAMP and time < now()::TIMESTAMP |
import "experimental/iox" data = iox.from(bucket: "anais-iox", measurement: "airSensors") |> range(start: 2022-12-01T19:05:41.000Z, stop: now()) |> group() data |> min(column: "temperature") |> yield(name: "min") data |> max(column: "temperature") |> yield(name: "max") data |> mean(column: "temperature") |> yield(name: "mean") |
5. 对于一个传感器,过滤温度值 > 72.0 | |
SELECT "temperature" FROM "airSensors" WHERE time >= ('2022-12-01 19:05:41.000')::TIMESTAMP and time < now()::TIMESTAMP and sensor_id = 'TLM0100' and temperature > 72.0 |
import "experimental/iox" iox.from(bucket: "anais-iox", measurement: "airSensors") |> range(start: 2022-12-01T19:05:41.000Z, stop: now()) |> filter(fn: (r) => r.sensor_id == "TLM0100") |> filter(fn: (r) => r.temperature > 72.0) |
6. 将时间戳截断到秒 | |
SELECT "temperature", date_trunc('second', time::timestamp) as time FROM "airSensors" WHERE time > (NOW() - INTERVAL '5 MINUTES') and sensor_id = 'TLM0100' |
import "experimental/iox" import "date" iox.from(bucket: "anais-iox", measurement: "airSensors") |> range(start: -5m, stop: now()) |> filter(fn: (r) => r.sensor_id == "TLM0100") |> map(fn: (r) => ({r with _time: date.truncate(t: r._time, unit: 1s)) |
7. 计算所有传感器每小时平均温度 | |
SELECT DATE_BIN(INTERVAL '1' minute, time, TIMESTAMP '2022-01-01 00:00:00Z') AS time, avg("temperature") as mean FROM "airSensors" GROUP BY 1 ORDER BY 1 DESC // The ORDER BY function is optional. It’s like adding a sort() function with Flux. |
import "experimental/iox" iox.from(bucket: "anais-iox", measurement: "airSensors") |> range(start: 2022-12-01T19:05:41.000Z, stop: now()) |> aggregateWindow(every: 1h, fn: mean, createEmpty: false) //The window bounds are different between SQL and Flux. SQL returns the start of the window. Flux returns to the end of the window. |
SQL中的聚合窗口
现在我们已经了解了如何执行一些基本的SQL查询,让我们来看看最后一个例子,更详细地了解如何将 aggregateWindow()
函数转换为SQL。该 aggregateWindow() 函数是一个Flux函数,它在时间周期内分组数据,并对这些分组应用函数。我们通常通过时间窗口进行聚合,以执行下采样任务或创建数据物的材料化视图。假设我们想从空气传感器样本数据集中计算一个传感器的1小时间隔的平均值。
我们的Flux查询看起来像这样
import "experimental/iox"
iox.from(bucket: "anais-iox", measurement: "airSensors")
|> range(start: 2022-12-01T19:05:41.000Z, stop: now())
|> filter(fn: (r) => r.sensor_id == "TLM0100")
|> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
功能等效的SQL查询使用 date_bin 函数,看起来像这样
SELECT
DATE_BIN(INTERVAL '1' hour, time, TIMESTAMP '2022-01-01 00:00:00Z') AS time,
avg("temperature") as mean
FROM "airSensors"
GROUP BY 1
ORDER BY 1
date_bin
函数根据函数提供的第一个参数(步长)的固定周期或间隔创建数据“桶”。第二个参数(源)指定要放入桶中的时间值。第三个参数(原点)有助于确定桶是否会相对于源时间向上或向下舍入。这里有更多关于 如何使用 date_bin 函数 的例子。这里我们按1列分组,这是由 SELECT
子句指定的结果集的第一列。换句话说,通过桶分组来找到相同时间桶中值的平均值。
总结和下一步
要利用InfluxDB Cloud中的SQL查询功能,请在此处注册:这里。
如果您想联系InfluxDB背后的开发人员,请加入 InfluxData社区Slack 并查找 #influxdb_iox 频道。要了解更多信息,请在该频道留言。
您对将SQL作为InfluxDB的查询语言感到兴奋吗?或者您打算继续使用Flux?我很乐意通过社区获得您的反馈。请通过我们的 社区网站 或 Slack 频道联系。我希望这篇博客文章能激发您尝试在InfluxDB Cloud中查询SQL。
最后,如果您在InfluxDB上开发一个酷的物联网应用程序,我们很想了解它,所以请确保使用 #InfluxDB 在社交媒体上分享它!您可以直接在我们的社区Slack频道联系我,分享您的想法、担忧或问题。我很乐意收到您的反馈并帮助您解决遇到的问题!或者 分享您的故事 并获得免费的InfluxDB连帽衫。