使用 PyTorch 和 InfluxDB 进行时间序列预测
作者: 社区 / 产品, 用例
2022 年 11 月 08 日
导航至
本文由 Smriti Satyan 撰写。向下滚动查看作者简介。
时间序列数据(也称为时间戳数据)是指随时间推移测量的观测值(数据点)的集合。当绘制在图表上时,此类数据的一个轴始终是时间。由于时间是每个可观测实体的一部分,因此时间序列数据可用于各种行业,例如股票市场、天气数据、日志和跟踪。
InfluxDB 是一个开源时间序列数据库 (TSDB) 管理系统,专门用于存储 时间序列 数据,并帮助组织构建实时分析和云应用程序。它是一个全面的平台,支持时间序列数据的收集、监控、分析 和 可视化。
在本文中,您将学习 InfluxDB 的基础知识,包括如何在云中设置它、如何配置客户端以与其通信,以及如何从 InfluxDB Cloud 获取数据。您还将学习如何使用这些数据在 PyTorch 上训练模型,以进行 预测。PyTorch 是一个开源机器学习框架,具有可扩展的分布式训练和优化等功能,可加速工作流程投入生产。
什么是 InfluxDB?
InfluxDB 是一个专为时间序列数据构建的时间序列数据平台。最新版本 InfluxDB 2.0,用 Go 编写,可以用作云服务,称为 InfluxDB Cloud,并具有用于数据处理和可视化的 Web 界面。
InfluxDB 提供以下功能
-
支持多种数据类型
-
数据传输期间没有网络协议问题,无论字段和标签的数量如何
-
能够将元数据与时间序列数据一起编码
-
时间戳的纳秒级精度(对于科学计算和金融至关重要)
注意:字段值是指与字段键关联的数据。这可以是字符串、浮点数、整数或任何其他数据类型。字段值始终与时间戳关联,因为 InfluxDB 是一个时间序列数据库。字段键和字段值对的组合称为字段集。字段集是 InfluxDB 结构中未索引的强制性部分。
在使用中,InfluxDB 帮助 罗尔斯·罗伊斯动力系统 通过提供高效的数据存储、实时性能监控、季节性和趋势识别以及维护需求预测,提高了其制造工厂的 运营效率。这呈指数级地减少了昂贵的发动机故障并加速了增长。
它还帮助德州仪器 监控和改进生产 和质量保证。借助 InfluxDB,德州仪器能够实时识别和排除生产线上的低效率问题,从而提高产品标准。
有关跨多个行业的更多客户用例,请查看 此页面。
时间序列预测入门
现在您已经对 InfluxDB 有了更多了解,您可以设置 InfluxDB 并使其与 Python 客户端 通信并提取数据,以便您可以使用该数据进行预测。
设置 InfluxDB
首先,您需要注册一个 InfluxDB 帐户,该帐户可以无缝地与其他客户端集成。注册 并输入所需信息
创建帐户后,指定提供商和区域,并将公司名称指定为“NA”
查看并同意服务订阅协议,然后选择继续以选择计划
选择计划时,选择 免费 订阅。现在,您已成功创建 InfluxDB 帐户。您将自动进入 InfluxDB 仪表板的入门页面。从这里,您可以探索如何从不同的客户端连接到 InfluxDB、推送数据、生成 Flux 查询 以及可视化时间序列数据
为了与 InfluxDB 客户端 (Python) 通信,需要在 Python 脚本中指定凭据列表。
设置您的 API 令牌和存储桶名称
与客户端通信所需的凭据包括 API 令牌、组织、存储桶名称和 URL。存储桶是一个存储单元,它具有保留期(即数据在存储桶中存储的时间段)。组织是用户的工作空间。
要生成 API 令牌,请转到 UI 中的 API 令牌 选项卡
单击 生成 API 令牌 并选择 完全访问 API 令牌
为 API 令牌提供一个名称(例如“Sample1”),然后复制该值。安全地存储它。如果您丢失了 API 令牌,您将必须创建一个新的令牌,因为您以后无法访问 API 令牌的值
现在,您需要通过从左侧导航栏导航到 存储桶 来创建一个存储桶
然后单击 创建存储桶。如果您希望重命名存储桶,您可以转到 设置 > 重命名。不建议这样做,因为您需要在整个脚本中更新存储桶名称,这可能会导致意外后果,例如 ApiError
或 ConnectionError
,以及无法连接到 InfluxDB Cloud
安装 InfluxDB Python 库
在 InfluxDB Cloud 上设置帐户后,您需要安装 Python 客户端,以便在云和客户端之间建立通信管道。
对于 pip 安装,请使用以下命令
pip install influxdb-client
对于 macOS 安装,请使用以下命令
brew install influxdb-client
您现在已安装必要的依赖项。
配置客户端以与 InfluxDB Cloud 通信
安装客户端后,您需要通过指定凭据列表来配置客户端以与 InfluxDB 通信。为了确认已建立连接,请执行一个简单的 Flux 查询,并检查 InfluxDB 中的数据是否与您刚刚获取的数据匹配
# Fill in the below attributes after creating an account on InfluxDB Cloud
token = "YOUR_TOKEN_HERE"
org = "YOUR_EMAIL_HERE"
bucket = "YOUR_BUCKET_HERE"
# url depends on the region selected during sign up, an example is <url= "https://europe-west1-1.gcp.cloud2.influxdata.com">
url = "YOUR_URL_HERE"
client = InfluxDBClient(url=url, token=token, org=org, debug=True)
Flux 查询执行/其他源代码
results=[] # create an empty list
query = """option v = {timeRangeStart: -30d, timeRangeStop: now()}
from(bucket: "myBucket")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "stallion_data")"""
tables = client.query_api().query(query, org=org)
for table in tables:
for record in table.records:
results.append(
[ record.get_field(),
record.get_value(),
record.get_measurement(),
]
) # append the values fetched into a list
安装 PyTorch
安装 PyTorch 和 PyTorch Forecasting,因为 PyTorch Forecasting 构建在 PyTorch Lightning 之上。
如果您使用的是 Windows 计算机,请运行以下命令
pip install torch -f https://download.pytorch.org/whl/torch_stable.html.
如果您使用的是 macOS,请运行以下命令
pip install pytorch-forecasting
对于 Conda 安装,您可以使用以下命令
conda install pytorch-forecasting pytorch -c pytorch>=1.7 -c conda-forge
探索数据集
在本教程中,您将使用 PyTorch Forecasting 中提供的 Stallion 数据集。此数据展示了各种饮料以美元计价的销售额。您的目标是使用 21,000 个月度记录的历史销售数据,预测未来六个月的销售额。
Stallion 数据包含以下数据集
-
pricesalespromotion.csv 包含价格、销售额和促销费用(美元)
-
historicalvolume.csv 包含销售数据
-
weather.csv 包含月平均最高温度
-
industrysodasales.csv 包含行业级苏打水销量
-
eventcalendar.csv 包含活动详情(体育赛事、嘉年华等)
-
industry_volume.csv 包含行业实际啤酒销量
-
demographics.csv 包含人口统计详情
您可以使用多种不同的方法将数据从客户端加载到 InfluxDB 中,包括摄取 Pandas DataFrame、使用 注释上传 CSV 文件、使用和消耗 InfluxDB API/写入端点,或使用 influx write
命令。
在本例中,由于您使用的是自己的数据集(来自 PyTorch 的 Stallion 数据),因此您需要先将数据(存储为 Pandas DataFrame)推送到 InfluxDB Cloud,然后再获取它。
然后编写一个 Flux 脚本,使用以下代码片段将此数据从 InfluxDB Cloud 查询到您的客户端环境
results = []
with InfluxDBClient(
url="https://us-east-1-1.aws.cloud2.influxdata.com", token=token, org=org
) as client:
query = """option v = {timeRangeStart: -30d, timeRangeStop: now()}
from(bucket: "myBucket")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "stallion_data")"""
tables = client.query_api().query(query, org=org)
for table in tables:
for record in table.records:
# results.append(record)
results.append(
[
record.get_field(),
record.get_value(),
record.get_measurement(),
record.get_time(),
record.values.get("agency"),
record.values.get("sku"),
]
)
# convert the list to a dataframe
influx_df = pd.DataFrame(
results, columns=["_field", "_value", "_measurement", "time", "agency", "sku"]
)
注意:如果您使用的是免费层级,则有一些限制。例如,您不能超过数据时间戳的三十天,并且您不能在 InfluxDB 中写入超过特定数量的行。定价模型 是克服这些限制的一种选择。
以下是您的数据在 InfluxDB UI 中的外观示例
检查和清理数据
任何预测管道中最重要步骤之一是了解数据集,这将使您了解与数据关联的数量、单位和术语。然后,您需要清理数据以消除异常值、缺失值和差异。清理后的数据将更易于处理、构建模型和进行预测。
Stallion 数据相对干净,并且提取了诸如 time_idx
(时间索引,每步递增 1)和 month
(从 date
列中提取)之类的特征。
注意:如果您使用的是不同的数据集,则数据清理步骤将包括删除具有
NaNs
或NaTs
的行。您还可以删除具有缺失信息或冗余行的行。
特征工程
时间序列数据的要求之一是它应该有一列显示时间戳或日期和时间,这是用于预测的重要特征。
在 Stallion 数据集中,time_idx
起着该作用,它被转换为数据帧的索引。添加了诸如 month
和 log_volume
之类的附加特征,这将有助于提高预测的准确性。
由于您可以将 PyTorch Forecasting 用于预测,因此您需要将数据集转换为 TimeSeriesDataSet
,这是一个 PyTorch 数据集,以便于处理、分析、建模和拟合数据
training = TimeSeriesDataSet(
stallion_df[lambda x: x.time_idx <= training_cutoff],
time_idx="time_idx",
target="volume",
group_ids=["agency", "sku"],
min_encoder_length=max_encoder_length
// 2, # Encoder length should be long since it is in the validation set
max_encoder_length=max_encoder_length,
min_prediction_length=1,
max_prediction_length=max_prediction_length,
static_categoricals=["agency", "sku"],
static_reals=["avg_population_2017", "avg_yearly_household_income_2017"],
time_varying_known_categoricals=["special_days", "month"],
variable_groups={
"special_days": special_days
}, # a group of categorical variables is treated as a single variable
time_varying_known_reals=["time_idx", "price_regular", "discount_in_percent"],
time_varying_unknown_categoricals=[],
time_varying_unknown_reals=[
"volume",
"log_volume",
"industry_volume",
"soda_volume",
"avg_max_temp",
"avg_volume_by_agency",
"avg_volume_by_sku",
],
target_normalizer=GroupNormalizer(
groups=["agency", "sku"], transformation="softplus"
), # use softplus and normalize by group
add_relative_time_idx=True,
add_target_scales=True,
add_encoder_length=True,
allow_missing_timesteps=True,
)
对于此数据集,您使用单步模型(即 TemporalFusionTransformer
),这是 Google 最先进的深度学习模型,用于预测时间序列。该网络在基准测试中优于亚马逊的 DeepAR 36–69%
tft = TemporalFusionTransformer.from_dataset(
training,
learning_rate=0.03,
hidden_size=16,
# number of attention heads. Set to 4 for large datasets
attention_head_size=1,
dropout=0.1,
hidden_continuous_size=8, # set to <= hidden_size
output_size=7, # 7 quantiles by default
loss=QuantileLoss(),
# reduce learning rate if no improvement is seen in the validation loss after 'x' epochs
reduce_on_plateau_patience=4,
)
print(f"Number of parameters in network: {tft.size()/1e3:.1f}k")
输出如下
Number of parameters in network: 29.5k
在这里,您使用 TemporalFusionTransformer
构建模型并将 Stallion 数据拟合到该模型。然后,您使用 predict()
对新数据(即未来六个月的数据)生成预测。最终输出 pytorch_forecasting.utils.TupleOutputMixIn.to_network_output.locals.Output
显示了未来六个月数据的 volume
。
以下是包含未来六个月数据的数据帧的示例输出
这是未来六个月 volume
的预测
本文的所有代码都可以在此 GitHub 存储库 中找到。
结论
在本文中,您学习了 InfluxDB 如何有效地存储和处理时间序列数据。您可以看到 InfluxDB 的 UI 是交互式的、易于使用,并且可以创建存储桶、添加数据和生成多种语言的 Flux 查询。
一旦您选择要可视化的数据,InfluxDB 就会自动构建 Flux 查询。它可以构建客户端代码(嵌入 Flux 查询作为字符串),该代码可以在客户端 IDE 中执行。
InfluxDB 还具有可以 安装 并在本地使用的 InfluxDB OSS。
除了 Python 客户端之外,它还可以与许多 其他客户端 集成,您可以使用诸如 Telegraf 之类的插件,Telegraf 是一个开源数据收集代理,可以与数据库、传感器、服务和 第三方 API 通信。
关于作者
Smriti Satyan 是一位机器学习工程师,热衷于撰写所有技术文章。Smriti 喜欢以简单易记的方式传达复杂的技术细节和用途。