使用时间序列数据库进行预测分析
作者 Anais Dotis-Georgiou / 用例, 产品
2023年9月5日
导航至
本文最初发表在 The New Stack 上,经许可在此重新发布。
以高速和高量处理带时间戳的数据,时间序列数据库特别适合异常检测和预测性维护。
预测分析利用大数据、统计算法和机器学习技术,根据历史数据预测未来结果。从金融和医疗保健到零售和营销,各个行业都使用预测分析。在其众多应用中,预测维护和异常检测是两个重要的应用。预测维护利用预测分析预测设备故障,以便及时维护并预防意外停机。
同样,异常检测利用同样的预测能力来识别数据中的异常,这可能表明存在问题,如金融交易中的欺诈或网络安全中的入侵。这些预测分析的共同应用帮助组织保持积极主动和了解情况,为提高效率、降低风险和改善决策铺平道路。
时间序列数据库为执行预测分析提供关键功能。专门用于处理按时间顺序索引的数据点,它允许以高速和高容量存储、检索和处理带时间戳的数据。这些功能使时间序列数据库特别适合于异常检测和预测维护等任务。
InfluxDB 3.0 是一个时间序列数据库和平台,用于存储所有类型的时间序列数据,包括指标、事件、日志和跟踪。
在这篇文章中,我们将探讨如何将 InfluxDB Cloud、Quix 和 Hugging Face 结合起来进行预测维护,包括预测分析和预测。 Quix 是一个平台,允许您部署用于分析和机器学习的流式管道。 Hugging Face 是一个机器学习平台,使用户能够训练、构建、托管和部署开源机器学习模型和数据集。
数据集
本文中我们将使用的数据集来自此 repo,具体是 这个脚本。它包含各种机器ID的温度、负载和振动等机器数据。这是一个伪造的数据集,以便在需要时诱导异常来测试异常检测。以下是 influxdb-query 服务输出的数据。
Quix 异常检测和预测管道
Quix 允许我们部署用于分析和机器学习的流式管道。下面的图片展示了我们的管道
工作区管道包含以下服务
-
一个源项目:这个服务项目负责使用 InfluxQL(查询语言)查询 InfluxDB Cloud,并将输出转换为 Pandas DataFrame,以便转换块可以消费它。
-
两个转换:这两个服务项目负责在数据中找到异常并生成预测。它们并行处理来自源服务的源数据。
- 事件检测寻找异常。
- 预测转换生成预测。
-
两个写入项目:这些服务项目负责使用 InfluxDB 3.0 Python 客户端库 将数据写回 InfluxDB Cloud。有两个写入实例,因为我们正在写入两个不同的 InfluxDB Cloud 实例。然而,如果您希望将所有数据写入同一实例,可以选择并行写入数据。
Quix之所以易于使用,是因为您可以通过选择各种常见服务来添加新的服务(服务按用户定义的日程运行)或新的作业(仅运行一次)。它们包含所有必要的模板,以流式传输数据并将输入传递到正确的输出。此外,您还可以轻松地在项目之间传输Pandas DataFrame,从而消除任何数据转换工作。
源项目
此块每60秒运行一次。它使用InfluxDB 3.0 Python客户端库查询以下代码过去5分钟的机器数据。
def get_data():
# Query InfluxDB Cloud 3.0
while run:
try:
query = "SHOW TAG VALUES WITH KEY = \"machineID\""
table = client.query(query=query, language="influxql")
machines = table["value"].to_pylist()
for machine in machines:
# this will end up loop when all data has been sent
table = client.query(query=f"SELECT vibration, machineID FROM machine_data WHERE time >= now() - 5m AND machineID = '{machine}'", language="influxql")
df = table.to_pandas()
print(df)
if df.empty:
break
# If there are rows to write to the stream at this time
stream_producer.timeseries.buffer.publish(df)
sleep(int(os.environ["task_interval"]))
except:
print("query failed")
sleep(int(os.environ["task_interval"]))
您配置与InfluxDB Cloud的连接。
事件检测项目
在这个教程中,我们使用了Keras Autoencoders来创建和训练一个异常检测模型。自编码器是一种用于学习输入数据有效编码的人工神经网络。在异常检测中,自编码器在正常数据上训练,并学习尽可能准确地再现它。当呈现新数据时,自编码器会尝试使用从正常数据中学到的模式来重建它。如果重建误差(原始输入与自编码器输出的差异)显著较高,则模型将新数据点分类为异常,因为它与正常数据有显著差异。
我们使用这个Jupyter笔记本在推送到Hugging Face之前训练数据模型。接下来,我们将模型导入到Quix事件检测项目中。该模型是变量,因此您可以轻松地在Hugging Face中交换模型。这使您可以将模型调优和训练工作流程与管道部署分离。
from huggingface_hub import from_pretrained_keras
# Quix injects credentials automatically to the client.
# Alternatively, you can always pass an SDK token manually as an argument.
client = qx.QuixStreamingClient()
print("Opening input and output topics")
consumer_topic = client.get_topic_consumer(os.environ["input"], "default-consumer-group")
producer_topic = client.get_topic_producer(os.environ["output"])
model = from_pretrained_keras(os.environ["model"])
jayclifford345/vibration-autoencoder
。预测转换项目
我们从“代码示例”中的示例Starter转换项目构建了这个项目。它使用statsmodels中的Holt Winters创建快速预测。
df = df.set_index('timestamp')
data = df.drop(columns=['iox::measurement', 'machineID'])
fit = Holt(data,damped_trend=True,initialization_method="estimated").fit(optimized=True)
fcast = fit.forecast(10).rename("Multiplicative damped trend")
fcast = fcast.reset_index().rename(columns={'index': 'timestamp'})
编写项目
编写项目(influxdb-write和influxdb-write-2)将异常和预测写入两个不同的InfluxDB实例。这个设计选择是任意的。它仅展示了这种架构作为一个选项。InfluxDB 3.0 Python客户端库将DataFrame写入这两个实例。
import influxdb_client_3 as InfluxDBClient3
client = qx.QuixStreamingClient()
# get the topic consumer for a specific consumer group
topic_consumer = client.get_topic_consumer(topic_id_or_name = os.environ["input"],
consumer_group = "empty-destination")
client = InfluxDBClient3.InfluxDBClient3(token=os.environ["INFLUX_TOKEN"],
host=os.environ["INFLUX_HOST"],
org=os.environ["INFLUX_ORG"],
database=os.environ["INFLUX_DATABASE"])
def on_dataframe_received_handler(stream_consumer: qx.StreamConsumer, df: pd.DataFrame):
# do something with the data here
df = df.rename(columns={'timestamp': 'time'})
df = df.set_index('time')
client.write(df, data_frame_measurement_name='mlresult', data_frame_tag_columns=['machineID'])
print("Write successful")
最后的想法
将我们的异常和预测写入InfluxDB后,我们可以使用像Grafana这样的工具来可视化数据并创建对其采取行动的警报。例如,如果我们得到太多的异常,我们可能会决定需要诊断系统中的问题、更换传感器或机器、协助预测性维护或重新设计制造流程。对振动预测的警报可以防止制造中断或优化运营。
InfluxDB Cloud是一个很好的工具,用于存储所有的时间序列数据。它是基于Apache生态系统编写的,并利用DataFusion、Arrow和Parquet等技术,以实现非常高效的写入、存储和查询。它支持SQL查询,这使得开发人员可以专注于在InfluxDB上构建类似这种预测性维护管道的解决方案。它还提供与其他许多工具的互操作性,以便您可以根据特定的预测分析需求利用它们。从这里开始使用InfluxDB Cloud 3.0。