使用 Flux 将数据写入 SQL 数据存储
作者:Nate Isley / 使用案例, 开发者, 产品
2019 年 8 月 28 日
导航至
正如在 多数据源 Flux 中讨论的那样,Flux 现在可以使用 SQL.from()
从 RDBM 中获取数据,并使用它进行更深入的分析。在许多情况下,数据分析的结果自然适合时间序列存储。在其他情况下,结果更适合在不同的数据存储上加以利用。为了实现 Flux 通用分析引擎的愿景,数据必须流入和流出 InfluxDB。
为此,InfluxDB 2.0 Alpha 15 引入了 SQL.to()。SQL.to() 镜像了 Flux 从 SQL 存储中提取数据的能力,从而能够写入 MySQL、MariaDB 和 Postgres。下面,我概述了如何使用 SQL.to()
,并通过一个示例演示如何计数事件并将它们存储到 Postgres 中。
表格让世界运转
一旦我 理解了 Flux 结果实际上只是数据表格,我就想知道 Flux 结果表格与 RDBM 表格 有多接近。事实证明它们足够接近,Flux 可以利用它们的相似性使 SQL 写入变得异常优雅。
一旦您塑造并命名您的数据以符合目标表格定义,您只需将您的数据管道传输到 SQL.to()
。让我们为一个新的计费系统构建一个使用案例来演示这个新功能。
鱼缸健康供应商 - 一种新的商业模式
我最喜欢的虚构 鱼缸健康供应商 注意到支持不同客户的成本差异很大。在仔细研究他们的数据后,他们了解到成本几乎完全由他们的自动化水系统执行的纠正措施的数量驱动。他们决定调整他们的订阅模式,改为根据每周纠正事件的数量向客户收费的模式。
幸运的是,每当部署到他们客户鱼缸中的设备采取纠正措施时,例如调整 pH 值或硝酸盐水平,这些设备都会向 InfluxDB 发送一个事件。事件类似于传感器数据流,只是事件是按需发出的,而不是每隔 X 秒发出一次。这些按需事件也称为“不规则时间序列”,因为它们可能在任何时间发生。
事件的具体格式和内容可以是您想要的任何形式,但让我们假设事件看起来像下面的两个示例(以 行协议 显示)
# Adjustment events - note timestamp will be added by InfluxDB at arrival time
water_sensor,type=adjustment,adjust_what=nitrates,change=up,device_id=TLM0101 triggering_value=5
water_sensor,type=adjustment,adjust_what=pH,change=down,device_id=TLM0102 triggering_value=9
为什么要使用 SQL 数据存储?
事件已经存在于 InfluxDB 中。为什么要引入 SQL 数据存储?时间序列数据库的几个特性,通常来说,并不使其成为计费系统的理想基础。
InfluxDB 旨在灵活地适应经常变化的大量数据,这可能是由于端点更改或数据可变性,因为客户端可以执行诸如写入时架构之类的操作。通常,用户会非常快速地导入数千万个数据点,然后在几周后对其中大部分进行降采样或删除。当您拥有数百万个不同的传感器或数千台您不断升级的服务器时,这种灵活性非常棒。
相比之下,任何试图创建计费系统的人都希望保留多年的稳定客户计费数据。虽然大规模捕获原始数据对于存储在像 InfluxDB 这样的时间序列数据库中是有意义的,但聚合和处理后的数据代表了一种“成品”,将其与其他客户数据(例如账单地址和电话号码)一起存储可能更有意义。为稳定、不频繁的表格更改而设计的关系数据库可能是计费系统的更好基础。
快点,开始计算数字!
为了收集新订阅计费模型的数据,我们的鱼缸供应商决定创建一个简单的 每周任务,以计算每个设备的调整次数,并将它们保存到他们的 Postgres 数据库中。(注意:这种朴素的场景无法扩展——在设备数量达到某个较大值时,Postgres 和 InfluxDB 都会成为瓶颈。更全面的方法可能是为每个客户创建一个任务。)
存储调整计数的第一个步骤是在 Postgres 中创建一个表格。这是我用来创建一个简单示例表格的 Postgres SQL 命令
CREATE TABLE adjustment_billing
(billing_record SERIAL PRIMARY KEY, device_id VARCHAR NOT NULL,
adjustment_actions integer, created_on TIMESTAMP);
有了用于存储结果的表格,供应商开始将 Flux 脚本的各个部分组合在一起。我们首先从 tank-health bucket 中获取过去一周的数据,仅过滤调整事件,并对数据进行分组
import "sql"
adjustmentActions = from(bucket: "tank-health")
|> range(start: -1w)
|> filter(fn: (r) => r._measurement == "water_sensor" and r.type == "adjustment")
|> group(columns: ["type", "device_id"], mode:"by")
按类型和设备 ID 分组的结果是每个设备的表格。为了获得每个设备(表格)的调整计数,我们将结果管道传输到计数 change 列中具有值的行数
// Count the number of changes for each device
|> count(column: "change")
请注意,此时,仅保留分组和计数调出的列。(从技术上讲,_start 和 _stop 时间存在,但它们在下一步中被删除)。
此结果与我们创建的 SQL 计费表格的格式非常接近,但它并不完全一致,并且需要进行最终调整。
// Adjust the table to match the SQL destination schema/format
|> rename(columns: {change: "adjustment_actions"})
|> keep(columns: ["device_id", "adjustment_actions"])
现在,数据列与计费表格完美匹配,并且 Flux 脚本可以简单地存储结果。
// Store result in Postgres
|> sql.to(driverName: "postgres", dataSourceName: "postgresql://localhost?sslmode=disable", table: "adjustment_billing")
在保存任务并设置为每周运行后,每个设备的调整事件计数将存储在 Postgres 中。如果您在任务运行后使用“SELECT * from adjustment_billing”直接查询 Postgres,您可以看到所有 Flux 工作的成果。
立即开始使用 Flux
希望这个使用 Flux 将数据写入 InfluxDB 之外的数据存储的示例说明了多数据源 Flux 的强大功能和灵活性。我很高兴探索多数据源 Flux 在未来几个月内肯定会开启的所有不同可能性。
加入我一起探索 Flux,方法是注册 InfluxDB Cloud 2 或下载 最新的 OSS alpha。与往常一样,如果您对 Flux 有疑问或功能请求,请访问 Flux 的社区论坛 并告诉我们!