使用PyTorch和InfluxDB进行时间序列预测

导航至

本文由Smriti Satyan撰写。向下滚动以查看作者简介。

时间序列数据(也称为时间戳数据)是指在一定时间内测量的观察值(数据点)的集合。当在图表上绘制时,此类数据的坐标轴之一始终是时间。因为时间是每个可观测实体的组成部分,所以时间序列数据可以应用于各种行业,如股市、气象数据、日志和跟踪。

InfluxDB 是一个开源的时间序列数据库(TSDB)管理系统,专注于存储 时间序列 数据,并帮助组织构建实时分析和云应用。它是一个全面的支持时间序列数据收集、监控、分析可视化的平台。

在这篇文章中,您将学习 InfluxDB 的基础知识,包括如何在云中设置它,如何配置客户端以与它通信,以及如何从 InfluxDB Cloud 获取数据。您还将了解如何使用这些数据在 PyTorch 上训练模型以进行 预测。PyTorch 是一个开源的机器学习框架,具有可扩展的分布式训练和优化功能,可加速工作流程进入生产。

什么是 InfluxDB?

InfluxDB 是一个专为时间序列数据构建的时间序列数据平台。最新版本 InfluxDB 2.0 使用 Go 编写,可作为名为 InfluxDB Cloud 的云服务使用,并具有数据处理和可视化的网页界面。

InfluxDB 提供以下功能

  • 支持多种数据类型

  • 在数据传输过程中无网络协议问题,无论字段和标签的数量如何

  • 能够对时间序列数据进行元数据编码

  • 时间戳精度达到纳秒级(对于科学计算和金融至关重要)

注意:字段值是指与字段键关联的数据。这可能包括字符串、浮点数、整数或其他任何数据类型。字段值始终与时间戳相关联,因为 InfluxDB 是一个时间序列数据库。字段键和字段值对的组合称为 字段集。字段集是 InfluxDB 结构中的一个非索引的强制性组成部分。

在实际应用中,InfluxDB 已经帮助 劳斯莱斯动力系统 通过提供高效的数据存储、实时性能监控、识别季节性和趋势以及预测维护需求,提高了其制造工厂的运营效率。这极大地减少了昂贵的发动机故障,并加速了增长。

它还帮助德州仪器 监控和改进生产 和质量保证。使用 InfluxDB,德州仪器能够实时识别和排除生产线上效率低下的问题,从而提高产品质量标准。

有关多个行业的更多客户用例,请查看 此页面

开始时间序列预测

现在您对 InfluxDB 了解更多,您可以设置 InfluxDB,使其与 Python 客户端 通信,并拉取数据,以便您可以使用这些数据进行预测。

设置 InfluxDB

首先,您需要创建一个 InfluxDB 账户,该账户可用于与其他客户端无缝集成。请注册并填写所需信息

Sign up

创建账户后,指定一个 提供商区域,并将 公司名称 指定为“NA”

Specify-Region-and-Provider

查看并同意服务订阅协议,然后选择 继续 以选择计划

Service subscription agreement

在选择计划时,请选择 免费 订阅。现在,您已成功创建了一个 InfluxDB 账户。您将自动跳转到 InfluxDB 仪表板的 入门 页面。从这里,您可以了解如何从不同的客户端连接到 InfluxDB、推送数据、生成 Flux 查询 以及可视化时间序列数据

InfluxDB dashboard

为了与 InfluxDB 客户端(Python)通信,需要在 Python 脚本中指定一组凭证。

设置您的 API 令牌和桶名称

与客户端通信所需的凭证包括 API 令牌、组织、桶名称和 URL。一个 是一个具有保留期(即数据在桶中存储的时间段)的存储单元。一个 组织 是用户的工作空间。

要生成 API 令牌,请转到 UI 中的 API 令牌 选项卡

API Tokens tab

点击 生成 API 令牌 并选择 所有访问 API 令牌

Generate token

为 API 令牌提供名称(例如,“Sample1”)并复制其值。请安全存储。如果您丢失了 API 令牌,您将不得不创建一个新的,因为您以后无法访问 API 令牌的值

All Access Token

现在,您需要创建一个桶,方法是导航到左侧导航栏上的

Data Explorer

然后点击 创建桶。如果您想重命名桶,您可以转到 设置 > 重命名。这不太推荐,因为您将需要在脚本中更新桶名称,这可能会导致意外的后果,例如 ApiErrorConnectionError,以及无法连接到 InfluxDB Cloud

Create a bucket

安装 InfluxDB Python 库

在 InfluxDB Cloud 上设置账户后,您需要安装 Python 客户端,以便在云和客户端之间建立通信通道。

对于 pip 安装,请使用以下命令:

pip install influxdb-client

对于 macOS 安装,请使用以下命令:

brew install influxdb-client

您已安装了所需的依赖项。

配置客户端以与 InfluxDB Cloud 通信

安装客户端后,您需要通过指定一组凭证来配置它以与 InfluxDB 通信。为了确认已建立连接,请执行一个简单的 Flux 查询,并检查 InfluxDB 中的数据是否与您刚刚获取的数据匹配

# Fill in the below attributes after creating an account on InfluxDB Cloud

token = "YOUR_TOKEN_HERE"

org = "YOUR_EMAIL_HERE"

bucket = "YOUR_BUCKET_HERE"

# url depends on the region selected during sign up, an example is <url= "https://europe-west1-1.gcp.cloud2.influxdata.com">

url = "YOUR_URL_HERE"

client = InfluxDBClient(url=url, token=token, org=org, debug=True)

Flux 查询执行/其他源代码

results=[] # create an empty list

query = """option v = {timeRangeStart: -30d, timeRangeStop: now()}

from(bucket: "myBucket")

|> range(start: v.timeRangeStart, stop: v.timeRangeStop)

|> filter(fn: (r) => r["_measurement"] == "stallion_data")"""

tables = client.query_api().query(query, org=org)

for table in tables:

for record in table.records:

results.append(

[ record.get_field(),

record.get_value(),

record.get_measurement(),

]

) # append the values fetched into a list

安装 PyTorch

由于 PyTorch Forecasting 是在 PyTorch Lightning 的基础上构建的,因此需要安装 PyTorch 和 PyTorch Forecasting

如果您在 Windows 机器上,请运行以下命令:

pip install torch -f https://download.pytorch.org/whl/torch_stable.html.

如果您在 macOS 上,请运行此命令:

pip install pytorch-forecasting

对于 Conda 安装,您可以使用以下命令:

conda install pytorch-forecasting pytorch -c pytorch>=1.7 -c conda-forge

探索数据集

在本教程中,您将使用 PyTorch Forecasting 中现有的 Stallion 数据集。这些数据显示了各种饮料(以美元计)的销售情况。您的目标将是使用 21,000 个月的销售历史数据来 预测 未来六个月的销售情况。

Stallion 数据包含以下数据集:

  • pricesalespromotion.csv 包含价格、销售和促销(以美元计)

  • historicalvolume.csv 包含销售数据

  • weather.csv 包含月平均最高温度

  • industrysodasales.csv 包含行业级汽水销售

  • eventcalendar.csv 包含活动详情(体育、嘉年华等)

  • industry_volume.csv 包含行业实际啤酒容量

  • demographics.csv 包含人口统计细节

您可以使用多种方法将客户端数据加载到InfluxDB中,包括导入Pandas DataFrame,使用注释上传CSV文件,使用和消费InfluxDB API/write端点,或使用influx write命令。

在这个例子中,由于您使用自己的数据集(PyTorch的Stallion数据),您需要首先将数据(存储为Pandas DataFrame)推送到InfluxDB Cloud,然后再获取它。

然后编写一个Flux脚本,使用以下代码片段从InfluxDB Cloud查询数据到您的客户端环境

results = []

with InfluxDBClient(

url="https://us-east-1-1.aws.cloud2.influxdata.com", token=token, org=org

) as client:

query = """option v = {timeRangeStart: -30d, timeRangeStop: now()}

from(bucket: "myBucket")

|> range(start: v.timeRangeStart, stop: v.timeRangeStop)

|> filter(fn: (r) => r["_measurement"] == "stallion_data")"""

tables = client.query_api().query(query, org=org)

for table in tables:

for record in table.records:

# results.append(record)

results.append(

[

record.get_field(),

record.get_value(),

record.get_measurement(),

record.get_time(),

record.values.get("agency"),

record.values.get("sku"),

]

)

# convert the list to a dataframe

influx_df = pd.DataFrame(

results, columns=["_field", "_value", "_measurement", "time", "agency", "sku"]

)

注意:如果您使用的是免费套餐,将有一些限制。例如,您不能超过数据的时间戳三十天,并且在InfluxDB中不能写入超过特定数量的行。付费模式是克服这些限制的选项。

以下是您的数据在InfluxDB UI中应呈现的示例

Sample data in the bucket

Sample table

检查和清理数据

在任何预测管道中,最重要的步骤之一是了解数据集,这将给您一个关于数据的数量、单位和术语的概念。然后您需要清理数据以消除异常值、缺失值和不一致之处。这种清理后的数据将更容易处理、建模和预测。

Stallion数据相对较干净,并提取了像time_idx(每一步递增的索引)和month(从date列提取)这样的特征。

注意:如果您使用不同的数据集,数据清理步骤将涉及删除包含NaNsNaTs的行。您还可以删除包含缺失信息或冗余的行。

特征工程

时间序列数据的一个要求是它应该有一个显示时间戳或日期和时间的列,这对于预测目的来说是一个重要特征。

在Stallion数据集中,time_idx扮演了这一角色,它被转换为DataFrame的索引。还添加了如monthlog_volume等附加特征,这将有助于提高预测的准确性。

由于您可以使用PyTorch Forecasting进行预测,您需要将数据集转换为PyTorch数据集TimeSeriesDataSet,以简化处理、分析、建模和数据拟合。

training = TimeSeriesDataSet(

stallion_df[lambda x: x.time_idx <= training_cutoff],

time_idx="time_idx",

target="volume",

group_ids=["agency", "sku"],

min_encoder_length=max_encoder_length

// 2, # Encoder length should be long since it is in the validation set

max_encoder_length=max_encoder_length,

min_prediction_length=1,

max_prediction_length=max_prediction_length,

static_categoricals=["agency", "sku"],

static_reals=["avg_population_2017", "avg_yearly_household_income_2017"],

time_varying_known_categoricals=["special_days", "month"],

variable_groups={

"special_days": special_days

}, # a group of categorical variables is treated as a single variable

time_varying_known_reals=["time_idx", "price_regular", "discount_in_percent"],

time_varying_unknown_categoricals=[],

time_varying_unknown_reals=[

"volume",

"log_volume",

"industry_volume",

"soda_volume",

"avg_max_temp",

"avg_volume_by_agency",

"avg_volume_by_sku",

],

target_normalizer=GroupNormalizer(

groups=["agency", "sku"], transformation="softplus"

), # use softplus and normalize by group

add_relative_time_idx=True,

add_target_scales=True,

add_encoder_length=True,

allow_missing_timesteps=True,

)

对于这个数据集,您使用单步模型(即TemporalFusionTransformer),这是Google最先进的深度学习模型,用于预测时间序列。这个网络在基准测试中比Amazon的DeepAR表现好了36-69个百分点。

tft = TemporalFusionTransformer.from_dataset(

training,

learning_rate=0.03,

hidden_size=16,

# number of attention heads. Set to 4 for large datasets

attention_head_size=1,

dropout=0.1,

hidden_continuous_size=8, # set to <= hidden_size

output_size=7, # 7 quantiles by default

loss=QuantileLoss(),

# reduce learning rate if no improvement is seen in the validation loss after 'x' epochs

reduce_on_plateau_patience=4,

)

print(f"Number of parameters in network: {tft.size()/1e3:.1f}k")

输出如下

Number of parameters in network: 29.5k

在这里,您使用TemporalFusionTransformer构建模型,并将Stallion数据拟合到该模型。然后您使用predict()在新的数据上生成预测(即下六个月的数据)。最终输出pytorch_forecasting.utils.TupleOutputMixIn.to_network_output.locals.Output显示了下六个月数据的volume

以下是包含下六个月数据的DataFrame的示例输出

Sample-output-for-the-dataframe

以下是下六个月volume的预测

volume prediction for the six months

本文中所有代码都可以在这个GitHub仓库中找到。

结论

在这篇文章中,您学习了如何高效地使用InfluxDB存储和处理时序数据。您可以看到,InfluxDB的UI是交互式的,易于使用,可以创建桶、添加数据,并支持多种语言生成Flux查询。

选择要可视化的数据后,InfluxDB会自动构建Flux查询。它可以构建客户端代码(将Flux查询作为字符串嵌入),该代码可以在客户端IDE中执行。

InfluxDB还提供InfluxDB OSS,它可以本地安装和使用。

除了Python客户端,它还可以与其他客户端集成,您可以使用像Telegraf这样的插件,Telegraf是一个开源的数据收集代理,可以与数据库、传感器、服务和第三方API通信。

关于作者

Smriti Satyan是一位对技术写作充满热情的机器学习工程师。Smriti喜欢以简单和易于记忆的方式传达复杂的技术细节和用法。