Quix社区插件为InfluxDB:构建您自己的流式任务引擎

导航到

随着我们为InfluxDB 3.0 OSS制定的计划,我自己和DevRel团队的其他成员一直在积极寻找将逻辑整合到InfluxDB未来的生态系统平台。这些平台之一就是Quix!

Quix是一个专为使用Python创建、启动和监督事件流应用程序的综合解决方案。如果您想实时筛选时间序列或事件数据以进行即时决策,Quix就是您的首选。我们发现它是一个出色的InfluxDB 3.0替代任务引擎。如果您想查看我们使用Quix创建异常检测管道的初始项目,请查看这篇博客

在此基础上,我们非常高兴地宣布推出两个Quix的InfluxDB社区插件。在这篇博客文章中,我们将逐一介绍每个插件的功能,并提供一个您可以尝试的用例示例。

Quix-Community-Plugins-for-InfluxDB-OG

插件

社区贡献包括两个插件

  1. InfluxDB源:此插件允许用户在用户定义的间隔内使用Apache Arrow Flight SQL查询InfluxDB 3.0,将数据解析为pandas DataFrame,并将其发布到Quix流式主题。
  2. InfluxDB目标:此插件允许用户从Quix流式主题中获取DataFrame,并在将数据写入InfluxDB实例(与InfluxDB 2.x和3.x兼容)之前定义其结构(测量和标签)。

quix-influxdb-image18

您可以通过左侧菜单导航到“代码示例”并搜索“InfluxDB”来在Quix平台中找到这些插件。让我们分别查看每个插件的配置。

InfluxDB源

InfluxDB源是一个基于Python的插件,因此在Quix环境中可以根据您的需求高度自定义。您可以在他们的云编辑器中简单编辑部署前的Python脚本。让我们先从“开箱即用”的插件概述开始。

quix-influxdb-image19

InfluxDB源插件需要定义以下环境变量才能运行

  • 输出:这是将接收流的输出主题(默认:influxdb,必需:是)
  • 任务间隔:运行查询的间隔。必须在InfluxDB表示法内;1s、1m、1h、1d、1w、1mo、1y(默认:5m,必需:是)
  • INFLUXDB_HOST:InfluxDB实例的主机地址(默认:eu-central-1-1.aws.cloud2.influxdata.com,必需:是)
  • INFLUXDB_TOKEN:访问InfluxDB的认证令牌(默认:<TOKEN>,必需:是)
  • INFLUXDB_ORG:InfluxDB中的组织名称(默认:<ORG>,非必需:否)
  • INFLUXDB_DATABASE:InfluxDB中存储数据的数据库名称(默认:<DATABASE>,必需:是)
  • INFLUXDB_MEASUREMENT_NAME:要从中读取数据的InfluxDB测量(默认:<INSERT MEASUREMENT>,必需:是)

一旦设置了这些参数,脚本将按照以下方式运行

quix-influxdb-image1

如图所示,此插件在关闭前循环。根据提供的间隔,脚本在重复查询之前会暂停。该间隔还用于形成查询,因为我们只会根据插件暂停的时间查询最新的数据。请注意,此插件使用DataFrame格式推送结果数据。

InfluxDB目标

InfluxDB 目标插件 – 与源插件一样,完全是基于 Python 的 – 允许您自定义插件如何将数据写入 InfluxDB(我们将在这个演示中使用这个功能)。现在,让我们先从另一个“开箱即用”插件的概述开始。

quix-influxdb-image5

InfluxDB 目标插件需要您定义以下环境变量才能运行

  • input:这是输入主题(默认:detection-result,必需:是)
  • INFLUXDB_HOST:InfluxDB实例的主机地址(默认:eu-central-1-1.aws.cloud2.influxdata.com,必需:是)
  • INFLUXDB_TOKEN:访问InfluxDB的认证令牌(默认:<TOKEN>,必需:是)
  • INFLUXDB_ORG:InfluxDB中的组织名称(默认:<ORG>,非必需:否)
  • INFLUXDB_DATABASE:数据应存储的 InfluxDB 数据库名称。(默认:<DATABASE>,必需:是)
  • INFLUXDB_TAG_COLUMNS:写入 InfluxDB 数据时要使用的列作为标签。(默认:['tag1', 'tag2'],必需:否)
  • INFLUXDB_MEASUREMENT_NAME:要写入数据的 InfluxDB 度量名称。如果未指定,则使用输入主题的名称。(默认:<INSERT MEASUREMENT>,必需:否)

一旦设置了这些参数,脚本将按照以下方式运行

quix-influxdb-image7

如图所示,在启动时,脚本会加载指定的环境变量并初始化 InfluxDB 客户端。然后,它等待选定的主题将数据流式传输到插件。流式数据作为 pandas DataFrame 接收,因此在写入 InfluxDB 之前,我们需要应用一些基本转换(重命名时间列,将时间设置为索引)。然后,我们根据环境变量中指定的架构将 DataFrame 写入 InfluxDB。

演示

现在我们已经讨论了每个插件及其相应的设计,让我们将它们应用到工业物联网用例中。

Quix-Community-Plugins-for-InfluxDB-OG

在这个例子中,我们有三个在生产线上工作的机器,它们生成传感器数据并写入 MQTT 代理(对于这个用例,我们使用 HiveMQ)。每个机器的有效负载都在一个如下所示的 JSON 结构中。

{"metadata": {"machineID": "machine1", "barcode": "31856669", "provider": "Miller-Phillips"}, "data": [{"temperature": 40}, {"load": 100}, {"power": 204}, {"vibration": 90}]}

我们首先将 Quix MQTT 客户端连接到 HiveMQ 代理。

MQTT 客户端 -> Quix -> InfluxDB

首先,我找到了 MQTT 插件,它连接到代理并将数据写入 Quix 流(右侧的那个)。

quix-influxdb-image15

在检查代码时,我意识到我需要进行一些小的代码更改,因为我连接到一个不需要 TLS 验证的代理。我需要删除这些行

# we'll be using tls
mqtt_client.tls_set(tls_version = mqtt.client.ssl.PROTOCOL_TLS)
mqtt_client.username_pw_set(os.environ["mqtt_username"], os.environ["mqtt_password"])

接下来,我使用提供的环境变量建立与代理的连接。

quix-influxdb-image6

然后点击 新建部署。配置我们的资源限制(默认即可)并点击 部署。这样,我们的第一阶段就完成了。

quix-influxdb-image9

接下来,我们需要将此数据写入 InfluxDB。为此,我们使用新的目标插件进行一些修改。

我们遵循类似的过程来选择 InfluxDB 3.0 目标插件并生成一个项目。现在我们有一个小问题需要克服。目前,目标插件仅支持摄入 Quix DataFrames。在我们的情况下,我们正在写入 JSON 格式的 Event 数据。因此,我们需要编写一个小型的转换函数来处理事件数据,您可以看到这里

def on_event_data_received_handler(stream_consumer: qx.StreamConsumer,data: qx.EventData):
    with data:
        jsondata = json.loads(data.value)
        metadata = jsondata['metadata']
        data_points = jsondata['data']
        fields = {k: v for d in data_points for k, v in d.items()}
        timestamp = str(data.timestamp)

        point = {"measurement": measurement_name, "tags" : metadata, "fields": fields, "time": timestamp}

        print(point)
        client.write(record=point)

def on_stream_received_handler(stream_consumer: qx.StreamConsumer):

    # subscribe to new DataFrames being received
    # if you aren't familiar with DataFrames there are other callbacks available
    # refer to the docs here: https://docs.quix.io/sdk/subscribe.html
    stream_consumer.timeseries.on_dataframe_received = on_dataframe_received_handler

    stream_consumer.events.on_data_received = on_event_data_received_handler

您将使用的代码的大部分已经存在。我们添加的主要元素是 on_event_data_received_handler。现在我们已经这样做,我们就像为 MQTT 连接器那样定义我们的环境变量。

quix-influxdb-image16

关于这两个环境变量的说明

  1. 您可以将 InfluxDB_token 修改为安全的环境变量以保护您的令牌。
  2. 在这个例子中,我们没有使用 InfluxDB_tag,因为我们使用JSON负载中的元数据作为标签。

点击 新部署。配置我们的资源限制(默认设置即可)并点击 部署

quix-influxdb-image17

我们现在将原始机器数据写入InfluxDB中选择好的测量值中。接下来,让我们看看如何利用Quix作为基于任务的引擎来转换我们存储的原始数据。

InfluxDB -> Quix(转换)-> InfluxDB

我们将利用社区插件开箱即用的配置来组合一个转换任务。这个转换任务很简单

  1. 查询最后1分钟的数据
  2. 添加一个新的列,检查该间隔内的振动是否超过了用户定义的阈值。这个列将根据结果包含true或false。
  3. 将数据写回到InfluxDB中的一个新表中

让我们从查询InfluxDB中的数据开始。这次我们将使用我们的InfluxDB 3.0源插件。像我们之前的例子一样,我们从 代码示例 库中搜索并选择它,然后创建一个项目。

我们不需要为此插件修改任何代码。只需简单地定义我们的环境变量

quix-influxdb-image14

然后我们点击 新部署。配置我们的资源限制(默认设置即可)并点击 部署

quix-influxdb-image13

我们的查询数据现在正直接作为DataFrame写入InfluxDB的主题。我们现在可以部署并创建一系列转换插件来重塑我们的数据。对于这个例子,我们将保持简单

  1. 摄取DataFrame
  2. 在振动列中使用基本条件逻辑检查是否超过我们预定义的阈值
  3. 创建新的布尔列并将DataFrame写入一个新主题读取器以进行摄取。

代码如下

import quixstreams as qx
import os
import pandas as pd

client = qx.QuixStreamingClient()

topic_consumer = client.get_topic_consumer(os.environ["input"], consumer_group = "empty-transformation")
topic_producer = client.get_topic_producer(os.environ["output"])

def on_dataframe_received_handler(stream_consumer: qx.StreamConsumer, df: pd.DataFrame):

        vibration_limit = int(os.environ["vibration_limit"])
        df['over_limit'] = df['vibration'] > vibration_limit

        stream_producer = topic_producer.get_or_create_stream(stream_id = stream_consumer.stream_id)
        stream_producer.timeseries.buffer.publish(df)

def on_event_data_received_handler(stream_consumer: qx.StreamConsumer, data: qx.EventData):
    print(data)
    # handle your event data here

def on_stream_received_handler(stream_consumer: qx.StreamConsumer):
    stream_consumer.events.on_data_received = on_event_data_received_handler # register the event data callback
    stream_consumer.timeseries.on_dataframe_received = on_dataframe_received_handler

topic_consumer.on_stream_received = on_stream_received_handler

print("Listening to streams. Press CTRL-C to exit.")

qx.App.run()

基于这个基本的转换插件,我们有三个环境变量需要定义。

quix-influxdb-image11

然后我们点击 新部署。配置我们的资源限制(默认设置即可)并点击 部署

quix-influxdb-image3

我们的最后一步是将数据写回到InfluxDB。对于这个任务,我们部署了另一个InfluxDB目标插件实例。这次,因为我们从转换主题中摄取DataFrame,所以我们只需要定义环境变量。

quix-influxdb-image12

关于这两个环境变量的说明

  1. InfluxDB_measurement 将我们的转换数据写入一个名为 transformed 的新表。InfluxDB会在需要时创建这个表。
  2. 我们提供了一个字符串数组,用于定义我们希望作为标签的列:['machineID', 'barcode', 'provider']

然后我们点击 新部署。配置我们的资源限制(默认设置即可)并点击 部署

quix-influxdb-image8

结论

这就是我们成功部署了针对InfluxDB的事件流管道和任务引擎!从宏观的角度来看,它看起来是这样的。

quix-influxdb-image10

(感谢Quix的出色用户界面)

以下是我们的原始机器数据和转换数据在InfluxDB中的样子。

quix-influxdb-image20 (原始机器数据)

quix-influxdb-image4 (转换后的机器数据)

总的来说,我们利用了Quix平台和新的InfluxDB社区插件,从三个MQTT主题中摄取“实时”原始机器数据,将这些数据存储在InfluxDB中,然后从存储的数据中提取新的价值。 那么你接下来可以做什么呢?

我希望你能够将这个例子应用到自己的需求中,并使用Quix平台进行扩展。使用Quix的主要好处是我们可以有效地扩展源、转换和目标的数量以满足我们的需求。一个很好的例子是扩展我们的任务引擎

  1. 下采样脚本
  2. 异常检测算法
  3. 检查和警报脚本

每个脚本都将订阅我们的InfluxDB主题,并与彼此并行工作,相比传统的需要重新查询数据的方法,更有效地利用事件流管道中的数据。

您可以在此处找到类似项目的源代码。如果您有任何问题或想进一步讨论InfluxDB或Quix,欢迎加入我们的Slack频道