使用 Python 获取和存储时序数据

导航至

本文最初发布于 The New Stack,在此经授权重新发布。

在本教程中,我们将学习如何使用 Python 从 OpenWeatherMap API 获取时序数据并将其转换为 Pandas DataFrame。接下来,我们将使用 InfluxDB Python 客户端 将数据写入 InfluxDB,一个时序数据平台。我们将把 API 调用的 JSON 响应转换为 Pandas DataFrame,因为这对我来说是写入 InfluxDB 最简单的方式。我们写入 InfluxDB 是因为它是一个专门构建的数据库,旨在处理大量时序数据的摄入需求。

要求

本教程是在 macOS 系统上执行的,通过 Homebrew 安装了 Python 3。我建议设置其他工具,如 Python 3,以简化 Python 和客户端的安装。否则,完整的要求如下

influxdb-client=1.30.0
pandas=1.4.3
requests>=2.27.1

本教程还假定您已经创建了一个 免费层 InfluxDB Cloud 账户或正在使用 InfluxDB OSS。它还假定您已经

最后,本教程要求您已创建 OpenWeatherMap 账户并已创建[令牌](https://home.openweathermap.org/api_keys)。

请求天气数据

首先,我们需要请求我们的数据。我们将使用请求库通过 OpenWeatherMap API 从指定的经纬度返回每小时天气数据。

# Get time series data from OpenWeatherMap API
params = {'lat':openWeatherMap_lat, 'lon':openWeatherMap_lon, 'exclude': "minutely,daily", 'appid':openWeatherMap_token}
r = requests.get(openWeather_url, params = params).json()
hourly = r['hourly']

将数据转换为 Pandas DataFrame

接下来,将JSON数据转换为Pandas DataFrame。我们还将Unix时间戳(精确到秒)转换为datetime对象。进行这种转换是因为InfluxDB的写入方法要求时间戳为datetime对象格式。我们将使用此方法将数据写入InfluxDB。我们还将删除不希望写入InfluxDB的列。

# Convert data to Pandas DataFrame and convert timestamp to datetime object
df = pd.json_normalize(hourly)
df = df.drop(columns=['weather', 'pop'])
df['dt'] = pd.to_datetime(df['dt'], unit='s')
print(df.head)

将Pandas DataFrame写入InfluxDB

现在实例化InfluxDB Python客户端库并将DataFrame写入InfluxDB。我们指定了一个测量名称。测量包含一个bucket中的数据。你可以将其视为InfluxDB中bucket之后的第二个数据组织层次。

您还可以使用data_frame__tag_columns参数指定哪些列要转换为标签。

由于我们没有指定任何列作为标签,所以所有列都将转换为InfluxDB中的字段。标签用于写入有关您的时序数据的元数据,并且可以用于更有效地查询数据子集。字段是您在InfluxDB中存储实际时序数据的地方。《[InfluxDB文档](https://docs.influxdb.org.cn/influxdb/cloud/reference/key-concepts/)》更详细地介绍了InfluxDB中的这些数据概念。

# Write data to InfluxDB
with InfluxDBClient(url=url, token=token, org=org) as client:
   df = df
   client.write_api(write_options=SYNCHRONOUS).write(bucket=bucket,record=df,
       data_frame_measurement_name="weather",
       data_frame_timestamp_column="dt")

完整脚本

为了复习,让我们一起查看完整的脚本。我们遵循以下步骤

  1. 导入库
  2. 收集以下内容
    • InfluxDB bucket
    • InfluxDB org
    • InfluxDB token
    • InfluxDB URL
    • OpenWeatherMap URL
    • OpenWeatherMap token
  3. 构建您的请求。
  4. 将JSON响应转换为Pandas DataFrame。
  5. 删除您不希望写入InfluxDB的任何列。
  6. 将时间戳列从Unix时间转换为Pandas datetime对象。
  7. 实例化InfluxDB Python客户端库。
  8. 写入DataFrame并指定测量名称和时间戳列。
import requests
import influxdb_client
import pandas as pd
from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS

bucket = "OpenWeather"
org = "<my_InfluxDB_org>" # or email you used to create your Free Tier InfluxDB Cloud account
token = "<my_InfluxDB_token"
url = "<my_InfluxDB_url>" # for example, https://us-west-2-1.aws.cloud2.influxdata.com/
openWeatherMap_token = "<my_OpenWeatherMap_token>"
openWeatherMap_lat = "33.44"
openWeatherMap_lon = "-94.04"
openWeather_url = "https://api.openweathermap.org/data/2.5/onecall"

# Get time series data from OpenWeatherMap API
params = {'lat':openWeatherMap_lat, 'lon':openWeatherMap_lon, 'exclude': "minutely,daily", 'appid':openWeatherMap_token}
r = requests.get(openWeather_url, params = params).json()
hourly = r['hourly']

# Convert data to Pandas DataFrame and convert timestamp to datetime object
df = pd.json_normalize(hourly)
df = df.drop(columns=['weather', 'pop'])
df['dt'] = pd.to_datetime(df['dt'], unit='s')
print(df.head)

# Write data to InfluxDB
with InfluxDBClient(url=url, token=token, org=org) as client:
   df = df
   client.write_api(write_options=SYNCHRONOUS).write(bucket=bucket,record=df,
       data_frame_measurement_name="weather",
       data_frame_timestamp_column="dt")

查询数据

现在我们已经将数据写入InfluxDB,我们可以使用InfluxDB UI查询我们的数据。导航到数据探索器(从左侧导航栏)。使用查询构建器选择您想要可视化的数据以及您想要可视化的时间范围,然后点击提交

Data Explorer-Query the Data

我们的天气数据的默认物化视图。InfluxDB自动对时序数据进行聚合,以便新用户不会意外查询过多数据并超时。

提示:InfluxDB在您使用查询构建器查询数据时会自动对您的数据进行下采样。要查询原始数据,请导航到脚本编辑器以查看底层Flux查询。Flux是InfluxDB的本地查询和脚本语言,可以用于分析创建预测您的时序数据。取消注释或删除带有aggregateWindow()函数的行以查看原始数据。

data explorer-veiw raw data

导航到脚本编辑器并取消注释或删除aggregateWindow()函数以查看原始天气数据。

结论

我希望这篇博客文章能够激发您使用InfluxDB Python客户端库从InfluxDB获取和存储时序数据的兴趣。如果您想了解更多关于如何使用Python客户端库从InfluxDB查询数据的信息,我鼓励您查看这篇文章。还值得注意的是,您可以使用Flux任务从OpenWeatherMap API获取数据并将其与InfluxDB一起存储。如果您使用的是InfluxDB Cloud,这意味着这个Flux脚本将被定期托管和执行,因此您可以确保将可靠的天气数据流输入到您的实例中。想了解如何使用Flux按用户定义的时间表获取天气数据,请阅读这篇文章