使用时间序列数据库进行预测分析

导航至

本文最初发表在 The New Stack 上,经许可在此重新发布。

以高速和高量处理带时间戳的数据,时间序列数据库特别适合异常检测和预测性维护。

预测分析利用大数据、统计算法和机器学习技术,根据历史数据预测未来结果。从金融和医疗保健到零售和营销,各个行业都使用预测分析。在其众多应用中,预测维护和异常检测是两个重要的应用。预测维护利用预测分析预测设备故障,以便及时维护并预防意外停机。

同样,异常检测利用同样的预测能力来识别数据中的异常,这可能表明存在问题,如金融交易中的欺诈或网络安全中的入侵。这些预测分析的共同应用帮助组织保持积极主动和了解情况,为提高效率、降低风险和改善决策铺平道路。

时间序列数据库为执行预测分析提供关键功能。专门用于处理按时间顺序索引的数据点,它允许以高速和高容量存储、检索和处理带时间戳的数据。这些功能使时间序列数据库特别适合于异常检测和预测维护等任务。

InfluxDB 3.0 是一个时间序列数据库和平台,用于存储所有类型的时间序列数据,包括指标、事件、日志和跟踪。

在这篇文章中,我们将探讨如何将 InfluxDB Cloud、Quix 和 Hugging Face 结合起来进行预测维护,包括预测分析和预测。 Quix 是一个平台,允许您部署用于分析和机器学习的流式管道。 Hugging Face 是一个机器学习平台,使用户能够训练、构建、托管和部署开源机器学习模型和数据集。

数据集

本文中我们将使用的数据集来自此 repo,具体是 这个脚本。它包含各种机器ID的温度、负载和振动等机器数据。这是一个伪造的数据集,以便在需要时诱导异常来测试异常检测。以下是 influxdb-query 服务输出的数据。

the dataset - InfluxDB-query

Quix 异常检测和预测管道

Quix 允许我们部署用于分析和机器学习的流式管道。下面的图片展示了我们的管道

Prediction Pipeline

工作区管道包含以下服务

  1. 一个源项目:这个服务项目负责使用 InfluxQL(查询语言)查询 InfluxDB Cloud,并将输出转换为 Pandas DataFrame,以便转换块可以消费它。

  2. 两个转换:这两个服务项目负责在数据中找到异常并生成预测。它们并行处理来自源服务的源数据。

    • 事件检测寻找异常。
    • 预测转换生成预测。
  3. 两个写入项目:这些服务项目负责使用 InfluxDB 3.0 Python 客户端库 将数据写回 InfluxDB Cloud。有两个写入实例,因为我们正在写入两个不同的 InfluxDB Cloud 实例。然而,如果您希望将所有数据写入同一实例,可以选择并行写入数据。

Quix之所以易于使用,是因为您可以通过选择各种常见服务来添加新的服务(服务按用户定义的日程运行)或新的作业(仅运行一次)。它们包含所有必要的模板,以流式传输数据并将输入传递到正确的输出。此外,您还可以轻松地在项目之间传输Pandas DataFrame,从而消除任何数据转换工作。

code samples

一旦选择了一个新的转换服务,您可以从各种示例脚本中选择添加到您的管道中。

源项目

此块每60秒运行一次。它使用InfluxDB 3.0 Python客户端库查询以下代码过去5分钟的机器数据。

def get_data():

    # Query InfluxDB Cloud 3.0 
    while run:
        try:
            query = "SHOW TAG VALUES WITH KEY = \"machineID\""
            table = client.query(query=query, language="influxql")
            machines = table["value"].to_pylist()

            for machine in machines:
                # this will end up loop when all data has been sent
                table = client.query(query=f"SELECT vibration, machineID FROM machine_data WHERE time >= now() - 5m AND machineID = '{machine}'", language="influxql")
                df = table.to_pandas()
                print(df)
                if df.empty:
                    break

                # If there are rows to write to the stream at this time
                stream_producer.timeseries.buffer.publish(df)
                sleep(int(os.environ["task_interval"]))
        except:
            print("query failed")
            sleep(int(os.environ["task_interval"]))

您配置与InfluxDB Cloud的连接。

事件检测项目

在这个教程中,我们使用了Keras Autoencoders来创建和训练一个异常检测模型。自编码器是一种用于学习输入数据有效编码的人工神经网络。在异常检测中,自编码器在正常数据上训练,并学习尽可能准确地再现它。当呈现新数据时,自编码器会尝试使用从正常数据中学到的模式来重建它。如果重建误差(原始输入与自编码器输出的差异)显著较高,则模型将新数据点分类为异常,因为它与正常数据有显著差异。

我们使用这个Jupyter笔记本推送到Hugging Face之前训练数据模型。接下来,我们将模型导入到Quix事件检测项目中。该模型是变量,因此您可以轻松地在Hugging Face中交换模型。这使您可以将模型调优和训练工作流程与管道部署分离。

from huggingface_hub import from_pretrained_keras

# Quix injects credentials automatically to the client.
# Alternatively, you can always pass an SDK token manually as an argument.
client = qx.QuixStreamingClient()

print("Opening input and output topics")
consumer_topic = client.get_topic_consumer(os.environ["input"], "default-consumer-group")
producer_topic = client.get_topic_producer(os.environ["output"])
model = from_pretrained_keras(os.environ["model"])

An example of editing the model variable

编辑模型变量的示例,通过指定模型变量轻松地从Hugging Face拉取不同的训练模型。选择的模型是jayclifford345/vibration-autoencoder

预测转换项目

我们从“代码示例”中的示例Starter转换项目构建了这个项目。它使用statsmodels中的Holt Winters创建快速预测。

df = df.set_index('timestamp')
    data = df.drop(columns=['iox::measurement', 'machineID'])
    fit = Holt(data,damped_trend=True,initialization_method="estimated").fit(optimized=True)
    fcast = fit.forecast(10).rename("Multiplicative damped trend")
    fcast = fcast.reset_index().rename(columns={'index': 'timestamp'})

编写项目

编写项目(influxdb-write和influxdb-write-2)将异常和预测写入两个不同的InfluxDB实例。这个设计选择是任意的。它仅展示了这种架构作为一个选项。InfluxDB 3.0 Python客户端库将DataFrame写入这两个实例。

import influxdb_client_3 as InfluxDBClient3

client = qx.QuixStreamingClient()

# get the topic consumer for a specific consumer group
topic_consumer = client.get_topic_consumer(topic_id_or_name = os.environ["input"],
                                           consumer_group = "empty-destination")

client = InfluxDBClient3.InfluxDBClient3(token=os.environ["INFLUX_TOKEN"],
                         host=os.environ["INFLUX_HOST"],
                         org=os.environ["INFLUX_ORG"],
                         database=os.environ["INFLUX_DATABASE"])

def on_dataframe_received_handler(stream_consumer: qx.StreamConsumer, df: pd.DataFrame):
    # do something with the data here
    df = df.rename(columns={'timestamp': 'time'})
    df = df.set_index('time')
    client.write(df, data_frame_measurement_name='mlresult', data_frame_tag_columns=['machineID']) 
    print("Write successful")

最后的想法

将我们的异常和预测写入InfluxDB后,我们可以使用像Grafana这样的工具来可视化数据并创建对其采取行动的警报。例如,如果我们得到太多的异常,我们可能会决定需要诊断系统中的问题、更换传感器或机器、协助预测性维护或重新设计制造流程。对振动预测的警报可以防止制造中断或优化运营。

InfluxDB Cloud是一个很好的工具,用于存储所有的时间序列数据。它是基于Apache生态系统编写的,并利用DataFusion、Arrow和Parquet等技术,以实现非常高效的写入、存储和查询。它支持SQL查询,这使得开发人员可以专注于在InfluxDB上构建类似这种预测性维护管道的解决方案。它还提供与其他许多工具的互操作性,以便您可以根据特定的预测分析需求利用它们。从这里开始使用InfluxDB Cloud 3.0