使用 FB Prophet 和 InfluxDB 进行预测

导航至

我认为很多人立即将“时间序列”这个词与“预测”联系起来。毕竟,谁不想能够预测未来呢?虽然我们还不能完全做到这一点,但我们可以生成具有置信区间的预测。在本教程中,我们将学习如何使用 Prophet 和 InfluxDB 进行单变量时间序列预测。

什么是 Prophet?

来自 Prophet GitHub 站点:“Prophet 是一种用于预测时间序列数据的程序,它基于加法模型,其中非线性趋势与年度、每周和每日季节性以及节假日效应相拟合。它最适用于具有强烈季节性效应和多个季节历史数据的时间序列。Prophet 对缺失数据和趋势变化具有鲁棒性,并且通常可以很好地处理异常值。Prophet 是 Facebook 核心数据科学团队发布的开源软件。它可以在 CRAN 和 PyPI 上下载。”

Prophet 的优势

根据 Prophet 文档,Prophet 的创建目的是为了

  • 准确且快速 - 模型在 Stan 中拟合,“Stan 是一个用于统计建模和高性能统计计算的先进平台”。
  • 完全自动化 - Facebook 了解到时间序列预测是小众的。数据科学家可能没有接受过广泛的培训,并且发现其他预测方法太难调整或过于不灵活。Prophet 应该易于使用。“Prophet 对异常值、缺失数据和时间序列的剧烈变化具有鲁棒性。”
  • 可调 - Facebook 假设数据科学家具有深厚的领域专业知识。Prophet 旨在通过允许数据科学家应用他们的专业知识并轻松调整预测来解释潜在的特殊特征。

Prophet 的工作原理

我们不会在这里详细介绍,但本质上 Prophet 的工作原理类似于 Holt-Winters 或三重指数平滑。Prophet 结合了季节性、趋势和节假日。Prophet 由以下公式表示

y(t)= g(t) + s(t) + h(t) + ?t

其中

g(t):用于建模时间序列中非周期性变化的逐段线性或逻辑增长曲线 s(t):周期性变化(例如,每周/每年的季节性) h(t):节假日的影响(用户提供)具有不规则的时间表 ?t:误差项解释模型未考虑的任何异常变化

假设和注意事项

本教程假设您已阅读并遵循上一篇文章“InfluxDB Python 客户端入门”中的步骤,并且您已安装 InfluxDB 和客户端。本教程随附的存储库可以在此处找到。本教程还假设您知道如何使用 Telegraf 将数据写入 InfluxDB。最后,请注意,此代码的大部分取自 Prophet 的快速入门示例笔记本。本教程仅旨在:1) 演示 Prophet 和 InfluxDB 的集成,以及 2) 使您熟悉 Prophet。

查询 InfluxDB、转换为 DataFrame、创建预测

导入依赖项

import pandas as pd
import time
from datetime import datetime
from fbprophet import Prophet

定义身份验证参数并连接到客户端

from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
token = $my-token
bucket = $my-bucket
org = $my-org
client = InfluxDBClient(url="http://localhost:9999", token=token, org=org)
query_api = client.query_api()
write_api = client.write_api(write_options=SYNCHRONOUS)

创建 Flux 查询

query = 'from(bucket:"fbprophet")' \
        ' |> range(start:2007-12-10T15:00:00Z, stop:2016-01-20T15:00:00Z)'\
        ' |> filter(fn: (r) => r._measurement == "views")' \
        ' |> filter(fn: (r) => r._field == "y")'

查询 InfluxDB 并返回结果

result = client.query_api().query(org=org, query=query)

将结果转换为列表

raw = []
for table in result:
    for record in table.records:
        raw.append((record.get_value(), record.get_time()))
print(raw[0:5])

forecasting prophet influxdb

将原始数据转换为 DataFrame

print()
print("=== influxdb query into dataframe ===")
print()
df=pd.DataFrame(raw, columns=['y','ds'], index=None)
df['ds'] = df['ds'].values.astype('<M8[D]')
df.head()

influxdb query dataframe

通过实例化一个新的 Prophet 对象并传入历史 DataFrame 来拟合模型

m = Prophet()
m.fit(df)

使用辅助方法 Prophet.make_future_dataframe 准备您的 dataframe 以进行预测

#365 specifies the number of time series points you'd like to forecast onto 
future = m.make_future_dataframe(periods=365)

进行预测

#The predict method will assign each row in future a predicted value (yhat). The upper (yhat_upper) and lower (yhat_lower) confidence intervals are also included as a part of the forecast. Columns for components and uncertainty intervals are also included in the forecast, although they aren't displayed here. 

forecast = m.predict(future)
forecast[['ds', 'yhat', 'yhat_lower', 'yhat_upper']].tail()

forecasting prophet influxdb prediction yhat

将 DataFrame 转换为 Line Protocol

现在我们准备好将我们的预测转换为 Line Protocol 并将其写入我们的实例。

向我们的 DataFrame 添加一个 measurement 列

forecast['measurement'] = "views"

将 DataFrame 转换为 Line Protocol

cp = forecast[['ds', 'yhat', 'yhat_lower', 'yhat_upper','measurement']].copy()
lines = [str(cp["measurement"][d]) 
         + ",type=forecast" 
         + " " 
         + "yhat=" + str(cp["yhat"][d]) + ","
         + "yhat_lower=" + str(cp["yhat_lower"][d]) + ","
         + "yhat_upper=" + str(cp["yhat_upper"][d])
         + " " + str(int(time.mktime(cp['ds'][d].timetuple()))) + "000000000" for d in range(len(cp))]

将行写入您的实例

from influxdb_client import InfluxDBClient, Point, WriteOptions
from influxdb_client.client.write_api import SYNCHRONOUS

_write_client = client.write_api(write_options=WriteOptions(batch_size=1000, 
                                                            flush_interval=10_000,
                                                            jitter_interval=2_000,
                                                            retry_interval=5_000))

_write_client.write(bucket, org, lines)

关闭客户端

_write_client.__del__()
client.__del__()

使用 InfluxDB UI 可视化我们的预测

最后,我们可以使用 UI 快速可视化我们所做的预测。我们只需选择我们要显示的 bucket、measurement 和 field 值来构建 Flux 查询。点击“提交”即可构建这个漂亮的可视化效果

visualizing forecast InfluxDB UI

使用 Prophet 和 InfluxDB 进行预测。原始数据(粉色)、预测(紫色)、yhat_upper(蓝色)、yhat_lower(橙色)。

使用 InfluxDB 和 Prophet 征服预测之战

数据收集只是战斗的一半。另一半是能够轻松地执行数据分析。FB Prophet 旨在使时间序列预测变得简单快捷。InfluxDB 通过提供多种语言的客户端库来补充这一努力,因此您可以轻松集成优秀的工具。对于本教程,我决定编写脚本将查询结果转换为 DataFrame。但是,2.0 InfluxDB Python 客户端具有 Pandas 功能。我建议查看这篇博客,了解如何直接从查询返回 DataFrame。

我还处理了一个相对较小的数据集,但是 2.0 InfluxDB Python 客户端具有多进程功能,允许您在本地机器上大约 20 秒内写入超过 400 万个点。如果您希望在 Python 中处理时间序列分析项目,InfluxDB Python 客户端是一个不错的选择。与往常一样,如果您遇到任何障碍,请在我们的社区网站或 Slack 频道上分享。我们很乐意获得您的反馈并帮助您解决遇到的任何问题。