使用 TensorFlow 和 InfluxDB 进行时间序列预测

导航至

本文最初发表于 The New Stack,并经许可在此处转载。

您可能熟悉机器学习 (ML) 和深度学习 (DL) 技术的实时示例,例如面部识别、光学字符识别 OCR、Python 语言翻译器自然语言搜索 (NLS)。

但现在,DL 和 ML 正在努力以惊人的准确度预测股票市场、天气和信用卡欺诈等。您可能已经注意到,这些类别都与时间相关,并且属于时间序列数据类别。

时间序列数据是指随时间变化或可以用时间表示的一组值。使用时间序列数据时,您始终处理大量数据,这些数据需要长期存储。由于可扩展性问题,在关系数据库中大规模存储时间序列数据可能很困难。这就是时间序列数据库 (TSDB) 的用武之地。

TSDB 专为存储时间序列数据而设计。InfluxDB 是一种广泛使用的 TSDB,可跟踪随时间推移的测量和事件,并根据聚合时间存储它们。

但是,在使用 InfluxDB 存储数据后,您的工作还没有完成。数据只有在您可以分析并使用它来改进业务时才有用。例如,您可以使用时间序列数据来预测未来天气模式,这个过程称为时间序列预测

在本文中,您将学习如何使用来自 InfluxDB 的数据在 TensorFlow 中训练模型并进行预测。

什么是 InfluxDB?

InfluxDB 的数据模型专门用于时间序列数据,因此没有任何限制,例如用较新的值替换旧值或更高的访问时间。它使用标签并使字段保持未索引状态,这会自动确保您拥有良好的数据库性能。InfluxDB 功能多样,支持多种数据类型,并允许用户根据需要创建其他字段和标签。

InfluxDB 可用于多种时间敏感的场景,包括以下内容

  • 股票市场:关系数据库一次批量存储许多条目中的数据,而股票价格需要一次存储一行,因为您没有第二天的价格。如果您使用 InfluxDB 随时间存储股票市场数据,它可以为您提供速度和效率。一旦存储了这些数据,您就可以应用不同的时间序列算法,例如自回归积分滑动平均模型 (ARIMA) 或神经网络方法来预测未来几天的股价。(请注意:这并非 100% 准确,但它可以让您了解市场可能走向何方。)
  • 健康监控:智能手表和手机等设备用于跟踪您的健康状况。在这些设备的后端,ML/DL 算法应用于数据以进行必要的预测。如果在用户的健康状况中发现任何异常,他们的医生会收到有关更改的通知。
  • 天气数据:多个传感器可以存储特定时期的天气数据。如果存储在 InfluxDB 中,则可以将数据与不同的算法一起使用以帮助进行预测。但是,并非所有存储在数据库中的数据都可以用于进行预测,因为它会过于详尽。在这种情况下,会选择特定的时间范围,例如两个月的数据,然后使用不同的统计方法(如 ARIMA、ARIMAX 或 SARIMAX)来预测天气。

使用 TensorFlow 和 InfluxDB 进行时间序列预测

如前所述,您将连接到 InfluxDB 并安装 InfluxDB Python 库。然后,您将使用数据集并从数据构建模型以进行一些预测。您需要做的第一件事是设置您的 InfluxDB 帐户。

本教程假设您使用的是 macOS 界面,但如果您使用的是 Windows 或 Unix,说明也类似。

设置 InfluxDB

要在 macOS 中安装 InfluxDB,您可以使用 Homebrew

$ brew update
$ brew install influxdb influxdb-cli

或者,您可以从 安装 InfluxDB 页面手动下载它,或者注册一个免费的 InfluxDB Cloud 帐户以开始使用,无需本地安装。

注意:如果您遇到打开文件过多错误,请按照这些步骤修复它。

安装完成后,在终端中使用以下命令启动 InfluxDB

influxd

首次启动 InfluxDB 时,您需要为其提供一些必需的值,否则您将无法使用它。您可以使用命令行终端或 localhost GUI 进行设置。要通过终端配置 InfluxDB,请使用 setup 命令

$ influx setup

对于初始设置,将需要以下详细信息

  • 用户名:可以为您的帐户选择任何用户名。
  • 密码:必须为同一用户名创建密码才能访问数据库。
  • 组织名称:数据库操作需要组织名称。
  • 存储桶名称: 您可以为一个组织拥有任意数量的存储桶,但在初始设置中,您需要创建至少一个存储桶。
  • 保留期:这是您的存储桶将存储数据然后自动删除数据的期限。如果您选择永不或将其留空,它将无限期存储数据。

如果您想使用 GUI 配置 InfluxDB,您需要访问localhost:8086localhost:8086。进入后,它会要求您提供前面提到的所有必需详细信息。

要在其他平台上设置 InfluxDB,您可以参考安装 InfluxDB页面以获取更多信息。

完成初始设置并创建帐户后,您需要登录到 localhost:8086localhost:8086,您应该会看到以下内容

influx-getting-started

在本教程中,一旦您连接到数据库,您只需要关注数据组件。但是,如果您愿意,可以探索仪表板上可用的所有选项。

现在,单击左侧边栏上的数据图标,您将看到如下所示的屏幕

influxdb-load-data

InfluxDB 加载数据

为了使用 Python TensorFlow 库,您需要查看存储桶令牌部分。存储桶类似于关系数据库中的数据库名称,令牌是唯一的密钥,只能用于不同的数据库操作。

在本例中,您已经创建了一个初始存储桶。要生成唯一令牌,请导航到令牌部分并单击生成令牌按钮。您可以使用两种不同类型的令牌

  1. 读/写令牌: 此令牌仅允许您读取和写入您选择的不同存储桶

generate-read-write-token

  1. 完全访问令牌: 此令牌使您可以完全控制数据库。您可以对 InfluxDB 中存在的任何存储桶执行读取、写入、更新和删除操作。在本教程中,您只需要使用此令牌,因为它比读/写令牌更灵活

generate-token

现在您已经设置了 InfluxDB,是时候使用您的数据实现时间序列预测了。Python 3.9 与 Jupyter Notebook 一起用于开发。

安装 InfluxDB Python 库

要安装 InfluxDB Python 库,您可以使用终端或 Jupyter Notebook 中的 Python 包管理器 (pip)

## install using terminal
$ pip install influxdb-client

## install using jupyter notebook
! pip install influxdb-client

安装 TensorFlow

TensorFlow 是一个非常强大的库,您可以使用它来实现任何类型的神经网络,例如人工神经网络 (ANN)]、卷积神经网络 (CNN) 或循环神经网络 (RNN)。要使用它,您需要导入您要使用的不同层和模型,编译它们,然后运行它们以获得训练后的模型。要使用 pip 安装 TensorFlow,请运行以下命令

## install using terminal
$ pip install tensorflow

## install using jupyter-notebook
! pip install tensorflow

探索数据集

在本教程中,您将使用一个流行的数据集,其中包含特定时期内识别出的太阳黑子。您将在本教程中看到的代码引用自文章“使用 TensorFlow 和深度混合学习进行时间序列预测”。您可以从此 GitHub 存储库下载它。

下载后,您将看到数据具有以下字段

  1. 日期:记录黑子的日期。
  2. 月平均太阳黑子总数:随时间记录的太阳黑子的平均数量。

要在 Python 中读取数据集并检查数据的前几行,请使用以下代码

(代码)

dataset

将数据集连接到 InfluxDB

在将数据连接到 InfluxDB 之前,您需要使用以下代码通过 InfluxDB 2.0 Python 客户端连接到您的存储桶

## import dependencies
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS

## You can generate a Token from the "Tokens Tab" in the UI
token = "your-token"
org = "my-org"
bucket = "Testing"

## connect to influxdb
client = InfluxDBClient(url="http://localhost:8086", token=token, org = org)

前两行显示了使用 InfluxDB 的重要导入。然后,您定义基本的连接详细信息,例如令牌、组织名称和要存储数据的存储桶名称。最后,您调用 InfluxDBClient 函数以连接到 InfluxDB。这里提到了 localhost URL,因为您要连接到本地 InfluxDB 客户端。

如果您想连接到 InfluxDB Cloud 实例,则需要指定相应的 URL。您可以在InfluxDB Cloud 区域页面上找到不同的端点。

插入数据

现在您已经连接到 InfluxDB,您需要在存储数据之前执行一项任务。InfluxDB 假定您数据集中的索引将是时间戳值;如果不是,它会将列假定为不同的事件,并将其存储在当前时间戳(当前日期和时间)。您可以使用以下代码将时间戳列作为索引

# convert Date column to datetime
data['Date'] = pd.to_datetime(data['Date'])

## create date as index
data.set_index(data['Date'], drop = True, inplace = True)
data.drop('Date', axis = 1, inplace = True)

如果您有兴趣了解有关将 Pandas 与 InfluxDB 结合使用的更多信息,请查看此Pandas 和 InfluxDB 教程

reindexed data

重新索引的数据

要使用 Python 在 InfluxDB 中插入数据,您需要创建一个 Write API 对象

## create object of write API
write_api = client.write_api(write_options=SYNCHRONOUS)

此处,SYNCHRONOUS 指定您将一次存储多行数据。现在,您只需要将数据帧传递给您的 write_api 对象

## write data to influxdb
response = write_api.write(bucket, record = data, data_frame_measurement_name='sunspot',
                    	data_frame_tag_columns=['sunspot'])

在前面的代码中,data_frame_tag_columns 用于存储列的元数据信息,而 data_frame_measurement_name 类似于关系数据库中的表名。如果来自 write API 的响应为 None,则您的数据已成功存储;否则,您将收到错误消息。

注意:根据您的系统功能,您可能会遇到打开文件过多错误。在这种情况下,您需要一次存储一些固定数量的行,而不是一次传递整个数据集

i_lower = 0
## iterate over 25 rows at once and store it in influxdb
for i in range(0, len(data), 25):
	response = write_api.write(bucket, record = data[i_lower:i], data_frame_measurement_name='sunspot',
                    	data_frame_tag_columns=['sunspot'])

	i_lower = i
	print('%d rows inserted'%(i))

读取数据

现在您已将数据存储在 InfluxDB 数据库中,您可以读取该数据。要从 InfluxDB 查询数据,您需要创建一个 read API 对象。Flux 是从数据库查询数据的最简单方法。您只需要指定要查询的期间

## query data
query_api = client.query_api()
tables = query_api.query('from(bucket:"Testing") |> range(start: -275y)')

此处,期间定义为 -275y,因为您要选择过去 275 年的所有条目。

查询数据后,您可以迭代每一行以从该数据创建数据帧

## iterate over queried data
time, sunspot = [], []
for table in tables:
	for row in table.records:
    	time.append(row.values.get('_time'))
    	sunspot.append(row.values.get('_value'))

## create dataframe
data = pd.DataFrame({'Date':time, 'Monthly Mean Total Sunspot Number': sunspot})

检查和清理数据集

现在您有了数据,您需要从 Date 列中检索和清理数据

## convert datetime to only date
data['Date'] = data['Date'].dt.date

要可视化太阳黑子数据与 Date 列的关系,您可以使用以下代码

## import plotting dependency
import matplotlib.pyplot as plt 
## plot the data
data.plot()
plt.show()

plot 函数绘制所有太阳黑子随时间变化的图

time-series-plot

特征工程

目前,您只有一个列(太阳黑子)作为特征,这不可能进行预测。为了进行必要的预测,您需要将最近六十个条目作为输入,并且您需要将该数据拆分为训练集和测试集

# Convert the data values to numpy for better and faster processing
time_index = np.array(data['Date'])
data = np.array(data['Monthly Mean Total Sunspot Number'])   

# ratio to split the data
SPLIT_RATIO = 0.8

# Dividing into train-test split
split_index = int(SPLIT_RATIO * data.shape[0])   

# Train-Test Split
train_data = data[:split_index]
train_time = time_index[:split_index]  
test_data = data[split_index:]
test_time = time_index[split_index:]

创建训练集和测试集后,您需要创建一个函数来准备输入特征。您无需担心在数据上计算不同的内容来创建输入特征,因为 TensorFlow 会为您执行此操作。您只需要定义窗口大小(您要作为输入特征的条目数)

## required parameters
WINDOW_SIZE = 60
BATCH_SIZE = 32
SHUFFLE_BUFFER = 1000

## function to create the input features
def ts_data_generator(data, window_size, batch_size, shuffle_buffer):
	'''
	Utility function for time series data generation in batches
	'''
	ts_data = tf.data.Dataset.from_tensor_slices(data)
	ts_data = ts_data.window(window_size + 1, shift=1, drop_remainder=True)
	ts_data = ts_data.flat_map(lambda window: window.batch(window_size + 1))
	ts_data = ts_data.shuffle(shuffle_buffer).map(lambda window: (window[:-1], window[-1]))
	ts_data = ts_data.batch(batch_size).prefetch(1)
	return ts_data# Expanding data into tensors

tensor_train_data = tf.expand_dims(train_data, axis=-1)
tensor_test_data = tf.expand_dims(test_data, axis=-1)

## generate input and output features for training and testing set
tensor_train_dataset = ts_data_generator(tensor_train_data, WINDOW_SIZE, BATCH_SIZE, SHUFFLE_BUFFER)
tensor_test_dataset = ts_data_generator(tensor_test_data, WINDOW_SIZE, BATCH_SIZE, SHUFFLE_BUFFER)

在前面的函数中,数据被转换为 TensorFlow 数据集以进行更快的处理。然后调用 window 函数来创建不同的输入特征(最近六十个输入条目作为特征)。之后,使用 flat_map 函数通过展平数据来保留时间序列的顺序。最后,您对数据进行洗牌并创建批次以训练 DL 模型。

构建模型并进行预测

现在数据已准备好供您对其执行时间序列分析。要进行预测,您需要创建一个基于 DL 的模型。在本例中,您将使用 CNN长短期记忆网络 (LSTM) 模型的组合


## combination of 1D CNN and LSTM
model = tf.keras.models.Sequential([tf.keras.layers.Conv1D(filters=32, kernel_size=5,strides=1, padding="causal",activation="relu",                  	 
                        	input_shape=[None, 1]),
                        	tf.keras.layers.LSTM(64, return_sequences=True),
                        	tf.keras.layers.LSTM(64, return_sequences=True),  
                        	tf.keras.layers.Dense(30, activation="relu"),   
                        	tf.keras.layers.Dense(10, activation="relu"),  
                        	tf.keras.layers.Dense(1)])

在前面的代码中,使用 1-D CNN 层、几个 LSTM 层和一些 Dense 层初始化顺序模型。顺序类准备神经网络层的级联管道,以便您定义的每一层都将添加到前一层。input_shape 参数使用 None, 1 初始化,指示提供给模型的输入的形状。在本例中,提供展平的输入,模型的形状确定为 1。

定义模型后,您需要编译它并在训练数据上训练它

## compile neural network model
optimizer = tf.keras.optimizers.SGD(lr=1e-3, momentum=0.9)
model.compile(loss=tf.keras.losses.Huber(),
          	optimizer=optimizer,
          	metrics=["mae"])
## training neural network model
history = model.fit(tensor_train_dataset, epochs=200, validation_data=tensor_test_dataset)

一些参数用于编译模型,包括优化器算法,该算法调整网络神经元的权重和学习率,从而减少损失并提高整体准确性。基于其进行权重修改的过程的损失是使用损失函数计算的。然后,指标用于估计模型的整体准确性。同时,使用 mae 是因为您要预测数值数据。训练开始后,您应该会看到您的模型正在运行

model running

模型的最后一行在训练数据上训练了 200 个 epoch。您可以使用以下代码检查模型的训练损失和验证损失

import matplotlib.pyplot as plt
# summarize history for loss
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('model loss')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(['train', 'test'], loc='upper left')
plt.show()

由于模型如果不能进行预测就毫无用处,因此您需要为要进行预测的日期之前的六十天提供值

def model_forecast(model, data, window_size):
	ds = tf.data.Dataset.from_tensor_slices(data)
	ds = ds.window(window_size, shift=1, drop_remainder=True)
	ds = ds.flat_map(lambda w: w.batch(window_size))
	ds = ds.batch(32).prefetch(1)
	forecast = model.predict(ds)
	return forecast

rnn_forecast = model_forecast(model, data[..., np.newaxis], WINDOW_SIZE)
rnn_forecast = rnn_forecast[split_index - WINDOW_SIZE:-1, -1, 0]
# Overall Error
error = tf.keras.metrics.mean_absolute_error(test_data, rnn_forecast).numpy()
print(error)

这里使用了 model_forecast 函数,它类似于 ts_data_generator,只是它用于进行预测。函数之后的接下来的两行用于对测试数据进行预测。同时,最后一行用于显示预测中的误差(实际测试数据与模型预测之间的差异)。

现在,您的训练损失和验证损失图应如下所示

占位符 占位符

要检查预测的准确性,您可以使用以下代码绘制比较图

import matplotlib.pyplot as plt
# summarize history for loss
plt.plot(test_data)
plt.plot(rnn_forecast)
plt.title('model loss')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(['test', 'predictions'], loc='upper left')
plt.show()

您的预测图将如下所示

predictions-graph

如您所见,预测效果足够好,它们能够保留时间序列的模式,即使有时它们与实际结果相差甚远。

本教程的完整代码在此 GitHub 存储库中提供。

结论

在本文中,您了解了什么是时间序列数据,以及如何将其有效地存储在专门为此设计的时间序列数据库中,例如 InfluxDB

其他资源

关于作者

Gourav-Singh-Bais

Gourav Singh Bais 是 ValueMomentum Inc. 的应用机器学习工程师。他擅长开发机器学习/深度学习管道、重新训练系统以及将数据科学原型转换为生产级解决方案。他在同一领域工作了 3 年,并为许多客户提供服务,包括财富 500 强公司,这使他有机会撰写关于他的经验和技能的文章,这些经验和技能可以为机器学习社区做出贡献。