使用Flux将数据写入SQL数据存储

导航至

正如在多数据源Flux中所讨论的,Flux现在可以通过SQL.from()从RDBMs中提取数据,并用于执行更深入的分析。在许多情况下,数据分析的结果自然适合时序存储。在其他情况下,结果更适合在不同的数据存储中利用。为了实现Flux的通用分析引擎愿景,数据必须流入和流出InfluxDB。

为此,InfluxDB 2.0 Alpha 15引入了SQL.to()。SQL.to()通过启用写入MySQL、MariaDB和Postgres来复制Flux从SQL存储中提取数据的能力。以下,我将概述使用SQL.to()并介绍一个将事件计数并存储到Postgres的示例。

表格使世界运转

一旦我了解到Flux结果实际上是数据表,我就想知道Flux结果表与RDBMs表有多接近。结果是它们足够接近,Flux可以利用它们的相似性使SQL写入变得极为优雅。

一旦您将数据塑形并命名以符合目标表定义,您只需将数据通过管道发送到SQL.to()。让我们通过构建一个新的计费系统的用例来演示这个新功能。

水族箱健康供应商 - 一种新的商业模式

我最喜欢的虚构水族箱健康供应商注意到支持不同客户成本的大幅差异。经过仔细研究他们的数据,他们发现成本几乎完全由他们的自动化水系统执行的纠正操作数量驱动。他们决定调整订阅模式,改为按每周纠正事件的数量向客户收费。

幸运的是,每次部署到客户鱼缸中的设备采取纠正措施,如调整pH值或硝酸盐水平时,设备都会向InfluxDB发送事件。事件类似于传感器数据的流,只不过它们是按需发送,而不是每X秒发送一次。这些按需事件也称为“不规则时间序列”,因为它们可以随时发生。

事件的特定格式和内容可以是任何你想要的内容,但让我们假设这些事件看起来像下面这两个示例(显示在行协议中)

# Adjustment events - note timestamp will be added by InfluxDB at arrival time
water_sensor,type=adjustment,adjust_what=nitrates,change=up,device_id=TLM0101 triggering_value=5
water_sensor,type=adjustment,adjust_what=pH,change=down,device_id=TLM0102 triggering_value=9

为什么要使用SQL数据存储?

事件已经在InfluxDB中。为什么引入SQL数据存储?时间序列数据库的几个特性,从一般意义上讲,并不适合作为计费系统的基础。

InfluxDB旨在灵活地适应经常变化的大量数据,无论是由于端点变化还是数据变化,因为客户端可以在写入时执行像模式这样的操作。通常,用户会迅速输入数以千万计的数据点,但几周后就会下采样或删除大部分数据。当您有数百万个不同的传感器或数千个服务器需要不断升级时,这种灵活性是非常好的。

相比之下,任何试图创建计费系统的人都想保留多年的稳定客户计费数据。虽然在大规模捕获原始数据时,将它们存储在InfluxDB等时间序列数据库中是有意义的,但汇总和经过处理的数据则代表了一个“成品”,可能更适合与其他客户数据一起存储,例如计费地址和电话号码。为稳定、不常更改的表而设计的数据库可能是计费系统的更好基础。

快点,开始处理数字吧!

为了收集新订阅计费模型的数据,我们的鱼缸供应商决定创建一个简单的每周任务,计算每个设备的调整次数,并将它们保存到他们的Postgres数据库中。(注意:这个简单场景不会扩展——在设备数量很大时,Postgres和InfluxDB都会出现瓶颈。可能需要一个更全面的方案,比如为每个客户创建一个任务。)

存储调整次数的第一步是在Postgres中创建一个表。以下是创建简单示例表所使用的Postgres SQL命令

CREATE TABLE adjustment_billing 
     (billing_record SERIAL PRIMARY KEY,  device_id VARCHAR NOT NULL, 
             adjustment_actions integer,  created_on TIMESTAMP);

有了存储结果的表,供应商开始组装Flux脚本的各个部分。我们首先从tank-health存储桶中获取上周的数据,仅过滤调整事件,并对数据进行分组

import "sql"

adjustmentActions = from(bucket: "tank-health")
  |> range(start: -1w)
  |> filter(fn: (r) => r._measurement == "water_sensor" and r.type == "adjustment")
  |> group(columns: ["type", "device_id"], mode:"by")

Write data out to SQL data stores with Flux

按类型和设备ID分组的结果是每个设备的表格。为了得到每个设备(表格)的调整次数,我们将结果管道转发以计算change列中有值的行数

// Count the number of changes for each device
  |> count(column: "change")

Write data out to SQL data stores with Flux - table 2

请注意,在此阶段,仅保留了分组中指出的列和计数。(技术上,_start和_stop时间存在,但在下一步中被删除。)

这个结果与创建的SQL计费表格式非常接近,但并不完全匹配,需要进行最终调整。

// Adjust the table to match the SQL destination schema/format
  |> rename(columns: {change: "adjustment_actions"})
  |> keep(columns: ["device_id", "adjustment_actions"])

Write data out to SQL data stores - with - Flux - table3

现在,数据列与计费表完美匹配,Flux脚本可以简单地存储结果。

// Store result in Postgres
|> sql.to(driverName: "postgres", dataSourceName: "postgresql://127.0.0.1?sslmode=disable", table: "adjustment_billing")

任务保存并设置为每周运行后,每个设备的调整事件计数将存储在Postgres中。如果任务运行后直接使用“SELECT * from adjustment_billing”查询Postgres,你就可以看到所有Flux工作的成果。

今天就开始使用Flux吧

希望这个使用Flux将数据写入InfluxDB以外的数据存储的示例能够说明多数据源Flux的强大和灵活性。我期待着在接下来的几个月内探索多数据源Flux将带来的所有不同可能性。

加入我一起探索Flux,注册InfluxDB 云2 或下载最新的 开源alpha版本。一如既往,如果您对Flux有任何问题或功能请求,请前往 Flux社区论坛 并告诉我们!