使用 Python 获取和存储时间序列数据

导航至

本文最初发表于 The New Stack,并经许可在此处转载。

在本教程中,我们将学习如何使用 Python 从 OpenWeatherMap API 获取时间序列数据,并将其转换为 Pandas DataFrame。接下来,我们将使用 InfluxDB Python 客户端将数据写入 InfluxDB,一个时间序列数据平台。我们将 JSON 响应从 API 调用转换为 Pandas DataFrame,因为我发现这是将数据写入 InfluxDB 的最简单方法。我们写入 InfluxDB 是因为它是一个专门构建的数据库,旨在处理 时间序列数据 的高摄取需求。

要求

本教程在 macOS 系统上执行,该系统通过 Homebrew 安装了 Python 3。 我建议设置额外的工具,例如 virtualenvpyenvconda-env,以简化 Python 和客户端的安装。否则,完整的要求如下

influxdb-client=1.30.0
pandas=1.4.3
requests>=2.27.1

本教程还假设您已创建 免费层 InfluxDB Cloud 帐户或正在使用 InfluxDB OSS。它还假设您已

最后,本教程要求您在 OpenWeatherMap 上创建一个帐户并创建令牌

请求天气数据

首先,我们需要请求我们的数据。我们将使用 request 库从指定的经度和纬度以及 OpenWeatherMap API 返回每小时天气数据。

# Get time series data from OpenWeatherMap API
params = {'lat':openWeatherMap_lat, 'lon':openWeatherMap_lon, 'exclude': "minutely,daily", 'appid':openWeatherMap_token}
r = requests.get(openWeather_url, params = params).json()
hourly = r['hourly']

将数据转换为 Pandas DataFrame

接下来,将 JSON 数据转换为 Pandas DataFrame。我们还将时间戳从具有秒精度的 Unix 时间戳转换为 datetime 对象。进行此转换是因为 InfluxDB 写入方法要求时间戳采用 datetime 对象格式。接下来我们将使用此方法将我们的数据写入 InfluxDB。我们还删除我们不想写入 InfluxDB 的列。

# Convert data to Pandas DataFrame and convert timestamp to datetime object
df = pd.json_normalize(hourly)
df = df.drop(columns=['weather', 'pop'])
df['dt'] = pd.to_datetime(df['dt'], unit='s')
print(df.head)

将 Pandas DataFrame 写入 InfluxDB

现在实例化 InfluxDB Python 客户端库并将 DataFrame 写入 InfluxDB。我们指定一个 measurement 名称。measurement 包含 bucket 中的数据。您可以将其视为 bucket 之后 InfluxDB 中数据的第二高层级组织级别。

您还可以使用 data_frame__tag_columns 参数指定要转换为标签的列。

由于我们没有将任何列指定为标签,因此我们所有的列都将转换为 InfluxDB 中的字段。标签用于编写关于您的时间序列数据的元数据,并且可以用于更有效地一起查询数据子集。字段是您在 InfluxDB 中存储实际时间序列数据的位置。 此文档 更详细地介绍了 InfluxDB 中的这些数据概念。

# Write data to InfluxDB
with InfluxDBClient(url=url, token=token, org=org) as client:
   df = df
   client.write_api(write_options=SYNCHRONOUS).write(bucket=bucket,record=df,
       data_frame_measurement_name="weather",
       data_frame_timestamp_column="dt")

完整脚本

回顾一下,让我们看一下完整的脚本。我们遵循了以下步骤

  1. 导入库
  2. 收集以下内容
    • InfluxDB bucket
    • InfluxDB org
    • InfluxDB 令牌
    • InfluxDB URL
    • OpenWeatherMap URL
    • OpenWeatherMap 令牌
  3. 构建您的请求。
  4. 将 JSON 响应转换为 Pandas DataFrame。
  5. 删除您不想写入 InfluxDB 的任何列。
  6. 将时间戳列从 unix 时间转换为 Pandas datetime 对象。
  7. 实例化 InfluxDB Python 客户端库。
  8. 写入 DataFrame 并指定 measurement 名称和时间戳列。
import requests
import influxdb_client
import pandas as pd
from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS

bucket = "OpenWeather"
org = "<my_InfluxDB_org>" # or email you used to create your Free Tier InfluxDB Cloud account
token = "<my_InfluxDB_token"
url = "<my_InfluxDB_url>" # for example, https://us-west-2-1.aws.cloud2.influxdata.com/
openWeatherMap_token = "<my_OpenWeatherMap_token>"
openWeatherMap_lat = "33.44"
openWeatherMap_lon = "-94.04"
openWeather_url = "https://api.openweathermap.org/data/2.5/onecall"

# Get time series data from OpenWeatherMap API
params = {'lat':openWeatherMap_lat, 'lon':openWeatherMap_lon, 'exclude': "minutely,daily", 'appid':openWeatherMap_token}
r = requests.get(openWeather_url, params = params).json()
hourly = r['hourly']

# Convert data to Pandas DataFrame and convert timestamp to datetime object
df = pd.json_normalize(hourly)
df = df.drop(columns=['weather', 'pop'])
df['dt'] = pd.to_datetime(df['dt'], unit='s')
print(df.head)

# Write data to InfluxDB
with InfluxDBClient(url=url, token=token, org=org) as client:
   df = df
   client.write_api(write_options=SYNCHRONOUS).write(bucket=bucket,record=df,
       data_frame_measurement_name="weather",
       data_frame_timestamp_column="dt")

查询数据

现在我们已经将数据写入 InfluxDB,我们可以使用 InfluxDB UI 查询我们的数据。导航到数据资源管理器(从左侧导航栏)。使用查询构建器选择您要可视化的数据以及您要 可视化 它的范围,然后点击提交

Data Explorer-Query the Data

我们天气数据的默认物化视图。InfluxDB 自动聚合时间序列数据,以便新用户不会意外查询过多数据并导致超时。

专家提示: 当您使用查询构建器查询数据时,InfluxDB 会自动对您的数据进行降采样。要查询您的原始数据,请导航到 脚本编辑器 以查看底层 Flux 查询。 Flux 是 InfluxDB 的原生查询和脚本语言,可用于 分析创建预测 您的时间序列数据。取消注释或删除包含 aggregateWindow() 函数的行以查看您的原始数据。

data explorer-veiw raw data

导航到 脚本编辑器 并取消注释或删除 aggregateWindow() 函数以查看原始天气数据。

结论

我希望这篇博文能启发您利用 InfluxDB Python 客户端库在 InfluxDB 中获取和存储时间序列数据。如果您想了解更多关于使用 Python 客户端库从 InfluxDB 查询数据的信息,我鼓励您查看 这篇文章。还值得注意的是,您可以使用 Flux 从 OpenWeatherMap API 获取数据,并通过任务将其存储在 InfluxDB 中。如果您正在使用 InfluxDB Cloud,这意味着此 Flux 脚本将被托管并定期执行,因此您可以将可靠的天气数据流获取到您的实例中。要了解更多关于如何在用户定义的计划上使用 Flux 获取天气数据的信息,请阅读 这篇文章