使用FB Prophet和InfluxDB进行预测

导航到

我认为很多人会立即将“时间序列”这个词与“预测”联系起来。毕竟,谁不想能够预测未来呢?虽然我们还没有做到这一点,但我们可以用置信区间来生成预测。在本教程中,我们将学习如何使用Prophet和InfluxDB进行单变量时间序列预测

什么是Prophet?

来自Prophet GitHub网站:“Prophet是一种基于加性模型的预测时间序列数据的方法,其中非线性趋势通过年度、周和日季节性以及节假日效应进行拟合。它最适合具有强烈季节性效应和多个季节历史数据的时间序列。Prophet对缺失数据和趋势变化具有很强的鲁棒性,并且通常能够很好地处理异常值。Prophet是由Facebook核心数据科学团队发布的开源软件。它可在CRAN和PyPI上下载。”

Prophet的优点

根据Prophet 文档,Prophet 的设计目标是

  • 准确且快速 - 模型在Stan中拟合,"一个先进的统计建模和高性能统计计算平台"。
  • 全自动 - Facebook 理解时间序列预测是一个小众领域。数据科学家可能没有接受过广泛的相关培训,并且可能觉得其他预测方法难以调整或不够灵活。Prophet 应该易于使用。"Prophet 对异常值、缺失数据和时间序列中的剧烈变化有很强的鲁棒性"。
  • 可调整 - Facebook 假设数据科学家具有深厚的领域专业知识。Prophet 通过允许数据科学家应用他们的专业知识并轻松调整预测来旨在考虑潜在的特殊特征。

Prophet 的工作原理

这里不会过多详细说明,但基本上 Prophet 的工作原理与Holt-Winters或三重指数平滑相似。Prophet 结合季节性、趋势和假日。Prophet 可以用以下公式表示

y(t)= g(t) + s(t) + h(t) + ?t

其中

g(t):用于模拟时间序列非周期性变化的分段线性或逻辑增长曲线 s(t):周期性变化(例如每周/年度季节性) h(t):假日的影响(由用户提供)具有不规则的时间表 ?t:误差项用于处理模型无法适应的任何异常变化

假设和考虑事项

本教程假设您已阅读并遵循了前一篇名为“使用 InfluxDB Python 客户端入门”的文章中的步骤,并且您已安装 InfluxDB 和客户端。本教程的配套仓库可以在此处找到。本教程还假设您知道如何使用 Telegraf 将数据写入 InfluxDB。最后,请注意,本教程的大部分代码取自 Prophet 的快速入门示例笔记本。本教程的目的是:1)演示 Prophet 和 InfluxDB 的集成;2)使您熟悉 Prophet。

查询 InfluxDB,转换为 DataFrame,创建预测

导入依赖

import pandas as pd
import time
from datetime import datetime
from fbprophet import Prophet

定义认证参数并连接到客户端

from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
token = $my-token
bucket = $my-bucket
org = $my-org
client = InfluxDBClient(url="https://127.0.0.1:9999", token=token, org=org)
query_api = client.query_api()
write_api = client.write_api(write_options=SYNCHRONOUS)

创建一个 Flux 查询

query = 'from(bucket:"fbprophet")' \
        ' |> range(start:2007-12-10T15:00:00Z, stop:2016-01-20T15:00:00Z)'\
        ' |> filter(fn: (r) => r._measurement == "views")' \
        ' |> filter(fn: (r) => r._field == "y")'

查询 InfluxDB 并返回结果

result = client.query_api().query(org=org, query=query)

将结果转换为列表

raw = []
for table in result:
    for record in table.records:
        raw.append((record.get_value(), record.get_time()))
print(raw[0:5])

forecasting prophet influxdb

将原始数据转换为 DataFrame

print()
print("=== influxdb query into dataframe ===")
print()
df=pd.DataFrame(raw, columns=['y','ds'], index=None)
df['ds'] = df['ds'].values.astype('<M8[D]')
df.head()

influxdb query dataframe

通过实例化一个新的 Prophet 对象并将历史 DataFrame 传递给该对象来拟合模型

m = Prophet()
m.fit(df)

使用辅助方法 Prophet.make_future_dataframe 为预测准备您的 DataFrame

#365 specifies the number of time series points you'd like to forecast onto 
future = m.make_future_dataframe(periods=365)

进行预测

#The predict method will assign each row in future a predicted value (yhat). The upper (yhat_upper) and lower (yhat_lower) confidence intervals are also included as a part of the forecast. Columns for components and uncertainty intervals are also included in the forecast, although they aren't displayed here. 

forecast = m.predict(future)
forecast[['ds', 'yhat', 'yhat_lower', 'yhat_upper']].tail()

forecasting prophet influxdb prediction yhat

将 DataFrame 转换为 Line Protocol

现在我们可以将预测转换为 Line Protocol 并将其写入实例。

向我们的 DataFrame 添加一个度量列

forecast['measurement'] = "views"

将 DataFrame 转换为 Line Protocol

cp = forecast[['ds', 'yhat', 'yhat_lower', 'yhat_upper','measurement']].copy()
lines = [str(cp["measurement"][d]) 
         + ",type=forecast" 
         + " " 
         + "yhat=" + str(cp["yhat"][d]) + ","
         + "yhat_lower=" + str(cp["yhat_lower"][d]) + ","
         + "yhat_upper=" + str(cp["yhat_upper"][d])
         + " " + str(int(time.mktime(cp['ds'][d].timetuple()))) + "000000000" for d in range(len(cp))]

将行写入实例

from influxdb_client import InfluxDBClient, Point, WriteOptions
from influxdb_client.client.write_api import SYNCHRONOUS

_write_client = client.write_api(write_options=WriteOptions(batch_size=1000, 
                                                            flush_interval=10_000,
                                                            jitter_interval=2_000,
                                                            retry_interval=5_000))

_write_client.write(bucket, org, lines)

关闭客户端

_write_client.__del__()
client.__del__()

使用 InfluxDB UI 可视化我们的预测

最后,我们可以使用 UI 快速可视化我们所做的预测。我们只需选择要显示的桶、度量列和字段值来构造一个 Flux 查询。点击“提交”即可构建这个美丽的可视化

visualizing forecast InfluxDB UI

使用 Prophet 和 InfluxDB 进行预测。原始数据(粉色),预测(紫色),yhat_upper(蓝色),yhat_lower(橙色)。

使用 InfluxDB 和 Prophet 赢得预测战斗

数据收集只是战斗的一半。另一半是能够轻松进行数据分析。FB Prophet旨在使时间序列预测简单快捷。InfluxDB通过提供多种语言的客户端库来补充这一努力,以便您可以轻松集成优质工具。在本教程中,我决定编写脚本将查询结果转换为DataFrame。然而,2.0版本的InfluxDB Python客户端具有Pandas功能。我建议您查看这篇博客,了解如何直接从您的查询中返回DataFrame。

我还处理了一个相对较小的数据集,但2.0版本的InfluxDB Python客户端具有多进程能力,允许您在本地机器上大约20秒内写入超过400万个点。如果您正在寻找在Python中进行时间序列分析项目,InfluxDB Python客户端是一个不错的选择。当然,如果您遇到障碍,请在我们社区网站Slack频道上分享。我们很高兴得到您的反馈,并帮助您解决遇到的问题。