使用 Flux 和 SQL 查询 InfluxDB 中的数据

导航至

(更新: InfluxDB 3.0 已远离 Flux 和内置任务引擎。用户可以使用外部工具,如基于 Python 的 Quix,在 InfluxDB 3.0 中创建任务。)

随着 InfluxDB Cloud 的新存储引擎 发布,InfluxDB Cloud 现在支持 SQL。这是因为更新后的 InfluxDB 使用 Apache Arrow DataFusion 项目作为其查询执行引擎的关键构建块。DataFusion 的复杂查询优化支持 InfluxDB Cloud 中几乎无限的数据基数。DataFusion 的另一个好处是它提供了一个兼容 PostgreSQL 的 SQL 实现,这使得 InfluxDB Cloud 现在支持 SQL 查询您的 时序数据,除了 Flux。使用 SQL 在 InfluxDB Cloud 中查询具有以下优点

  • 许多技术利用 PostgreSQL,这意味着在 InfluxDB 中包含 PostgreSQL 线协议提供了与以下数据库(更多)的查询兼容性
    • Postgresql
    • QuestDB
    • CockroachDB
    • CrateDB
    • YugabyteDB
    • Timescale

    要了解更多关于此主题的信息,我鼓励您阅读 PostgreSQL 线协议的世界。此外,您可以使用您选择的任何语言与 PostgreSQL 数据库适配器,如 Psycopg,在 Python 脚本中执行 SQL。

  • 您可以从其他 PostgreSQL 兼容数据库迁移 SQL 查询。

  • 如果您已经熟悉 SQL 并且不想学习 Flux,那么您有查询 InfluxDB 的可行选择。

  • 您想利用性能极高的列式数据库。要了解更多关于新的 InfluxDB 如何使用 Rust、Apache Arrow、DataFusion 和 Parquet 支持无限基数用例的信息,请阅读 InfluxDB IOx 和对开源的承诺 以及 以毫秒延迟查询 Parquet

  • 您可以查询所有数据,快速返回全局最后、第一个、最小和最大值,而无需指定时间范围。这是 Flux 用户长期以来一直要求的功能。

要了解更多关于 InfluxDB 支持的语言及其优势,请查看 与 InfluxDB 一起使用的最佳查询语言

在这篇文章中,我们将学习如何使用等效的 SQL 查询编写基本的 Flux 查询。在这个过程中,我们也将学习如何使用 SQL 执行基本查询。本文旨在帮助用户利用 SQL,无论是熟悉 InfluxDB 的 Flux 用户还是新手。要尝试在 InfluxDB Cloud 中运行 SQL 查询,请在此注册:这里

为了本文,我们将使用 空气传感器样本数据集。您可以在创建存储桶后,在数据探索器中将此样本数据集写入存储桶中 (创建存储桶)

import "influxdata/influxdb/sample"
sample.data(set: "airSensor")
|> to(bucket: "example-bucket")

空气传感器样本数据集包含以下架构

  • 1 个测量值:airSensors

  • 3 个字段:co、湿度和温度

  • 1 个标签:sensor_id

  • 8 个 sensor_id 标签值:TLM0100、TLM0101、TLM0102、TLM0103、TLM0200、TLM0201、TLM0202、TLM0203

为了充分利用 InfluxDB Cloud 中的 SQL,请开启 新的 脚本编辑器。InfluxDB Cloud 中的新 脚本编辑器 具有以下新功能

  • 保存和打开脚本的能力。

  • 创建 SQL 或 Flux 脚本。

  • 使用查询构建器作为无代码解决方案,将代码填充到脚本编辑器中,或者直接使用脚本编辑器。

  • 在表格或图表中查看您的结果。

Data Explorer - view results in the table or graph

基本的 Flux 查询

如果您不是 Flux 用户,请跳过本节。

让我们查询标签 TLM0100 的温度数据。我们的 Flux 查询如下所示

from(bucket: "anais-iox")
    |> range(start: 2022-12-01T19:05:41.000Z, stop: now())
    |> filter(fn: (r) => r._measurement == "airSensors")
    |> filter(fn: (r) => r._field == "temperature")
    |> filter(fn: (r) => r.sensor_id == "TLM0100")

现在让我们看一下结果

Table Results -1

所有结果都如预期返回,除了查询从查询结果中删除了 _start_stop 列。结果表仍然将数据分割成 _field_value 列。接下来,让我们看一下如何使用 SQL 查询相同的数据。

基本的 SQL 查询

让我们查询标签 TLM0100 的温度数据。我们的 SQL 查询如下所示

SELECT "sensor_id", "temperature", "time" FROM "airSensors"
where time >= ('2022-12-01 19:05:41.000')::TIMESTAMP
and time < now()::TIMESTAMP and sensor_id = 'TLM0100'

注意:您也可以使用 SELECT * 同时返回所有列。

我们从测量值中选择我们想要的列。我们还应用了 where 子句来指定时间范围,以及过滤特定的传感器 ID。我们可以使用 UI 中 脚本编辑器 左侧的面板来选择存储桶。

Basic SQL

InfluxDB Cloud 中的新 脚本资源管理器。在为新 SQL 脚本选择后,选择您要查询的存储桶。点击 运行。然后您还可以选择 自定义 以可视化和正确的列。

让我们看一下表格结果

table with results-2

如果您来自 Flux 背景,您会注意到一些令人惊讶的差异。首先,您可以看到返回的数据看起来像 SQL 查询将其转置,或者应用了 fieldsAsCols() 函数。要注意的第二件事是,结果只返回我们查询的列。由于我们没有选择测量值,结果流不包含测量值。此外,查询将 _time 列重命名为 time。最后,请注意查询返回的表和行数(在“表”按钮左侧)与上面的 Flux 查询相匹配。这有助于确认结果预期并了解数据的形状。

使用 iox.from() 函数的基本 Flux 查询

为了使用 Flux 查询 InfluxDB Cloud 的新引擎并返回与 SQL 查询外观相同的输出,请使用 iox.from() 函数。函数 iox.from() 返回的数据就像它是按时间转置的。

import "experimental/iox"
data = iox.from(bucket: "anais-iox", measurement: "airSensors")
|> range(start: 2022-12-01T19:05:41.000Z, stop: now())
|> filter(fn: (r) => r.sensor_id == "TLM0100")
|> yield()

现在让我们看一下结果

table with results-3

默认情况下,iox.from() 函数返回所有字段。为了将结果减少到只包含您希望的字段(例如温度,就像我们在上一节中所做的那样),请使用 keep()drop() 函数。

Flux到SQL查询转换

现在我们已经了解了使用SQL进行查询的一些基础知识,让我们来看看一些功能等效的查询。在本节中,我们将使用 iox.from() 函数。Flux的结果将不同于SQL的结果,因为Flux的结果包含其他字段数据(温度、co、湿度)。此外,SQL和Flux之间的数据形状也有细微差别。SQL查询始终返回一个表,而Flux可以返回多个表。您需要使用其他函数,如 group()union()keep(),以在两个查询之间产生相同的结果。

SQL Flux
1. 上5分钟的温度
SELECT "temperature" FROM "airSensors" 
WHERE time > (NOW() - INTERVAL '5 MINUTES')
import "experimental/iox"
iox.from(bucket: "anais-iox", 
measurement: "airSensors")
|> range(start: -5m, stop: now())
2. 一个传感器的最小、最大、平均温度
SELECT min("temperature"), 
max("temperature"), a
vg("temperature") FROM "airSensors" 
WHERE time >= ('2022-12-01 
19:05:41.000')::TIMESTAMP 
and time < now()::TIMESTAMP and 
sensor_id = 'TLM0100'
import "experimental/iox"
data = iox.from(bucket: "anais-iox", 
measurement: "airSensors")
|> range(start: 2022-12-01T19:05:41.000Z, 
stop: now())
|> filter(fn: (r) => r.sensor_id == "TLM0100")

data
|> min(column: "temperature")
|> yield(name: "min")

data 
|> max(column: "temperature")
|> yield(name: "max")

data 
|> mean(column: "temperature")
|> yield(name: "mean")
3. 每个传感器的最小、最大、平均温度
SELECT min("temperature"), 
max("temperature"), 
avg("temperature"), 
"sensor_id" FROM "airSensors" 
WHERE time >= ('2022-12-01 
19:05:41.000')::TIMESTAMP 
and time < now()::TIMESTAMP GROUP 
BY "sensor_id"
ORDER BY "sensor_id"
import "experimental/iox"
data = iox.from(bucket: "anais-iox", 
measurement: "airSensors")
|> range(start: 2022-12-01T19:05:41.000Z, 
stop: now())

data
|> min(column: "temperature")
|> yield(name: "min")

data 
|> max(column: "temperature")
|> yield(name: "max")

data 
|> mean(column: "temperature")
|> yield(name: "mean")
4. 所有传感器的最小、最大、平均温度
SELECT min("temperature"), 
max("temperature"), 
avg("temperature") FROM "airSensors" 
WHERE time >= ('2022-12-01 
19:05:41.000')::TIMESTAMP 
and time < now()::TIMESTAMP
import "experimental/iox"
data = iox.from(bucket: "anais-iox", 
measurement: "airSensors")
|> range(start: 2022-12-01T19:05:41.000Z, 
stop: now())
|> group()

data
|> min(column: "temperature")
|> yield(name: "min")

data 
|> max(column: "temperature")
|> yield(name: "max")

data 
|> mean(column: "temperature")
|> yield(name: "mean")
5. 对于一个传感器,过滤温度值 > 72.0
SELECT "temperature" FROM 
"airSensors" 
WHERE time >= ('2022-12-01 
19:05:41.000')::TIMESTAMP 
and time < now()::TIMESTAMP and 
sensor_id = 'TLM0100' and 
temperature > 72.0
import "experimental/iox"
iox.from(bucket: "anais-iox", measurement:
"airSensors")
|> range(start: 2022-12-01T19:05:41.000Z, 
stop: now())
|> filter(fn: (r) => r.sensor_id == "TLM0100")
|> filter(fn: (r) => r.temperature > 72.0)
6. 将时间戳截断到秒
SELECT "temperature", 
date_trunc('second', 
time::timestamp) as time  FROM 
"airSensors" 
WHERE time > (NOW() - INTERVAL '5 MINUTES') 
and sensor_id = 'TLM0100'
import "experimental/iox"
import "date"
iox.from(bucket: "anais-iox", 
measurement: "airSensors")
|> range(start: -5m, stop: now())
|> filter(fn: (r) => r.sensor_id 
== "TLM0100")
|> map(fn: (r) => ({r with _time: 
date.truncate(t: r._time, unit: 1s))
7. 计算所有传感器每小时平均温度
SELECT
DATE_BIN(INTERVAL '1' minute, 
time, TIMESTAMP '2022-01-01 
00:00:00Z') AS time,
avg("temperature")  as mean 
FROM "airSensors"
GROUP BY 1
ORDER BY 1 DESC
// The ORDER BY function is optional. 
It’s like adding a sort() function with Flux.
import "experimental/iox"
iox.from(bucket: "anais-iox", 
measurement: "airSensors")
|> range(start: 
2022-12-01T19:05:41.000Z, stop: now())
|> aggregateWindow(every: 1h, 
fn: mean, createEmpty: false)
//The window bounds are different 
between SQL and Flux. 
SQL returns the start of the window. 
Flux returns to the end of the window.

SQL中的聚合窗口

现在我们已经了解了如何执行一些基本的SQL查询,让我们来看看最后一个例子,更详细地了解如何将 aggregateWindow() 函数转换为SQL。该 aggregateWindow() 函数是一个Flux函数,它在时间周期内分组数据,并对这些分组应用函数。我们通常通过时间窗口进行聚合,以执行下采样任务或创建数据物的材料化视图。假设我们想从空气传感器样本数据集中计算一个传感器的1小时间隔的平均值。

我们的Flux查询看起来像这样

import "experimental/iox"
iox.from(bucket: "anais-iox", measurement: "airSensors")
    |> range(start: 2022-12-01T19:05:41.000Z, stop: now())
    |> filter(fn: (r) => r.sensor_id == "TLM0100")
    |> aggregateWindow(every: 1h, fn: mean, createEmpty: false)

功能等效的SQL查询使用 date_bin 函数,看起来像这样

SELECT
  DATE_BIN(INTERVAL '1' hour, time, TIMESTAMP '2022-01-01 00:00:00Z') AS time,
  avg("temperature")  as mean
FROM "airSensors"
GROUP BY 1 
ORDER BY 1

date_bin 函数根据函数提供的第一个参数(步长)的固定周期或间隔创建数据“桶”。第二个参数(源)指定要放入桶中的时间值。第三个参数(原点)有助于确定桶是否会相对于源时间向上或向下舍入。这里有更多关于 如何使用 date_bin 函数 的例子。这里我们按1列分组,这是由 SELECT 子句指定的结果集的第一列。换句话说,通过桶分组来找到相同时间桶中值的平均值。

总结和下一步

要利用InfluxDB Cloud中的SQL查询功能,请在此处注册:这里

如果您想联系InfluxDB背后的开发人员,请加入 InfluxData社区Slack 并查找 #influxdb_iox 频道。要了解更多信息,请在该频道留言。

您对将SQL作为InfluxDB的查询语言感到兴奋吗?或者您打算继续使用Flux?我很乐意通过社区获得您的反馈。请通过我们的 社区网站Slack 频道联系。我希望这篇博客文章能激发您尝试在InfluxDB Cloud中查询SQL。

最后,如果您在InfluxDB上开发一个酷的物联网应用程序,我们很想了解它,所以请确保使用 #InfluxDB 在社交媒体上分享它!您可以直接在我们的社区Slack频道联系我,分享您的想法、担忧或问题。我很乐意收到您的反馈并帮助您解决遇到的问题!或者 分享您的故事 并获得免费的InfluxDB连帽衫。