Flux:使用 InfluxDB 进行边缘数据复制的关键

导航至

本文最初发表于 The New Stack,经许可在此转载。

EDR 使开发者能够在边缘使用 InfluxDB 的全部功能。开发者还可以在云端为不同目的使用相同的数据。

Flux 是 InfluxDB 时间序列数据库平台的数据脚本和查询语言,支持边缘数据复制 (EDR) 等实用功能。

EDR 允许开发者将数据从 InfluxDB 开源实例中的存储桶自动复制到 InfluxDB Cloud 的实例中。此功能改变了物联网开发者在 InfluxDB 之上构建物联网应用程序的思路,因为它在给定系统的不同元素之间创建了持久且可靠的数据传输。EDR 使开发者能够在边缘利用 InfluxDB 的全部功能,专为边缘而设计。与此同时,它允许开发者在云端为完全不同的目的使用相同的数据或数据的子集。

EDR 可以实现诸如从边缘源收集高保真数据、在边缘清理和转换数据、在云端整合边缘数据以生成边缘整体的全局视图,以及对云端整合的数据进行时间序列分析和警报等任务。

Flux 的优势

作为一种功能齐全的脚本和查询语言,Flux 具有强大的功能。它允许您查询时间序列数据,为创建警报、管理时间序列数据生命周期以及清理、分析和转换数据奠定基础。

Flux 的优势在于它提供了在服务器端执行这些查询和功能的能力,从而提高了性能。事实上,一些 Flux 函数将查询下推到 InfluxDB 平台中,在存储层执行它们,以优先考虑速度和内存优化。

您可以在 此处 了解更多关于 InfluxDB 中的下推的信息。

使用 Flux 查询

一个简单的 Flux 查询如下所示

from(bucket: "bucket1") 
|> range(start: -10m)
|> filter(fn: (r) => r._measurement == "measurement1")
|> filter(fn: (r) => r.tag1 == "tagvalue1")
|> filter(fn: (r) => r._field == "field1")

您可以使用 from() 函数选择要查询数据的存储桶。range() 函数指定数据查询的时间窗口或间隔。最后,filter() 函数允许您指定要查询的数据子集。

InfluxDB 使用 measurements 来分组或组织大型数据子集,使用 tags 来存储关于时间序列数据的元数据,并使用 fields 来存储时间序列数据的实际值。

例如,要查询天气数据,Flux 查询可能如下所示

from(bucket: "weather") 
|> range(start: -10m)
|> filter(fn: (r) => r._measurement == "United States")
|> filter(fn: (r) => r.location == "Austin")
|> filter(fn: (r) => r.temperature == 78.0)

要了解更多关于编写 Flux 查询的信息,有很多资源可以帮助您入门。

管理您的时间序列数据生命周期

Flux 允许您以多种方式转换和聚合时间序列数据。Flux 的一个常见用途是创建任务来降采样数据,以便为您的数据生成物化视图。降采样将高分辨率数据转换为低分辨率摘要。除其他外,降采样减少了 InfluxDB 实例的总体磁盘大小,同时保留了数据的整体形状。

Flux 中的降采样任务可能如下所示

// Task Options
option task = {name: "downsample task example", every: 1w}

// Defines a data source
data = from(bucket: "weather")
// queries data for 1w based off of the task configuration options above
|> range(start: -task.every)
|> filter(fn: (r) => r._measurement == "United States")
|> filter(fn: (r) => r.location == "Austin")
|> filter(fn: (r) => r.temperature == 78.0)

data
// Windows and aggregates the data in to 1h averages
|> aggregateWindow(fn: mean, every: 1h)
// Stores the aggregated data in a new bucket
|> to(bucket: "data-downsampled")

此任务获取高精度温度数据并收集一小时平均值,然后使用 to() 函数将该数据写入新存储桶。

对您的数据发出警报

鉴于最近推出的边缘数据复制,我们希望更深入地探讨如何使用 Flux 来实现从边缘到云端的数据转换。Flux 提供了多种方法来检查您的数据是否满足某些条件,并在满足条件时发出警报。您可以使用 Flux 根据您在边缘使用或收集的数据以及云端整合的数据设置警报。

在 Flux 中创建警报有几种不同的方法。您可以通过 InfluxDB Cloud UI 构建阈值或 ‘deadman’ 检查。此方法会自动为您创建的每个警报生成 Flux 脚本,并使用 InfluxDB 检查和通知系统。此外,您可以编写自己的自定义检查和通知任务。如何检查数据和发出警报取决于您。Flux 在处理数据方面为用户提供了很大的灵活性。您可以使用 Flux 将警报发送到以下通知端点:Slack、Microsoft Teams、PagerDuty 以及许多其他平台

Notification endpoints with Flux

要了解更多关于创建检查和通知的信息,请查看以下资源

  • 警报任务: “Time to Awesome” 的这一节描述了如何编写一个警报任务,该任务检查您的数据并将警报发送到 Slack。
  • 检查和通知: “Time to Awesome” 的这一节详细描述了 InfluxDB 中的检查和通知系统是如何工作的。
  • 在 InfluxDB UI 中创建检查:此文档描述了几种不同的方法来使用 InfluxDB 和 Flux 创建检查。
  • 自定义检查文档:InfluxDB 文档始终是关于 Flux 和 InfluxDB 的所有内容的绝佳资源。

分析、清理和转换您的数据

转换时间序列数据并对其应用统计分析可以让您从中获得有价值的见解。在使用边缘数据复制功能并在云端整合数据之前,您可能还需要清理和准备数据。

Flux 有许多不同的函数用于分析和准备时间序列数据。有用于统计时间序列分析的转换函数、用于动态统计和基本时间序列分析的转换函数,以及用于金融分析的技术动量指标。还有用于 Math 的附加包,用于处理地理时空数据

Flux 还可以转换您的数据。例如,joins (视频)、unionspivots (视频) 会改变数据的形状。

成功使用时间序列数据的另一个关键组成部分是操纵时间戳的能力。有一组函数允许您操纵时间戳并执行各种时间序列转换。

您还可以使用 Flux 编写自定义算法。例如,以下链接演示了用 Flux 编写的两种不同的异常检测算法

  1. 使用中位数绝对偏差进行异常检测
  2. Flux 中机器学习的深入研究:朴素贝叶斯分类

InfluxDB 中边缘数据复制的强大功能

最近,InfluxDB 发布了一项名为边缘数据复制的新功能。EDR 允许您配置 InfluxDB 开源实例中的存储桶,以将数据从该存储桶自动复制到 InfluxDB Cloud 实例中的存储桶。

例如,使用 EDR,您可以在边缘或雾端运行多个 OSS 实例,并在那里收集高精度数据。然后,您可以创建任务来降采样、处理或转换该数据,然后再将其自动复制到 InfluxDB Cloud 的实例。您可以使用此功能构建混合架构,并通过最大限度地减少传输到云端的数据量来节省成本,或者在数据到达云数据存储之前清理数据,从而提高效率。

设置边缘数据复制只需几个步骤

  1. 使用 CLI 和 influx remote command 创建远程连接,如下所示
    influx remote create \
    --name example-remote-name \
    --remote-url https://us-west-2-1.aws.cloud2.influxdata.com \
    --remote-api-token mYsuP3r5Ecr37t0k3n \
    --remote-org-id 00xoXXoxXX00
  2. 添加任何所需的 降采样或转换任务,并将数据写入 OSS 中的新存储桶。
  3. 使用 influx replication command 从您的 OSS 存储桶创建到 InfluxDB Cloud 存储桶的复制流,如下所示
    influx replication create \
    --name example-replication-stream-name \
    --remote-id 00xoXXXo0X0x \
    --local-bucket-id Xxxo00Xx000o \
    --remote-bucket-id 0xXXx00oooXx

底线

我希望本文能启发您探索 InfluxDB、Flux 和边缘数据复制。要了解更多关于这些主题的信息,请查看我们在 Data + AI Summit 会议上的即将到来的演讲,我们在其中详细讨论了这些主题,或者在我们的社区 Slack 频道上与我联系。