使用 Python 和地理时空分析入门

导航至

本文最初发布在 The New Stack,并在此处获得许可转载。

处理地理时空数据可能很困难。除了通常与时间序列分析相关联的挑战,例如需要实时访问的大量数据外,使用经纬度通常需要使用三角学,因为您必须考虑地球的曲率。这很费计算。它可能会增加成本并减慢程序。幸运的是,InfluxDB 的地理时空包旨在处理这些问题。

该包使用 S2 几何形状 来实现这一点。S2 系统将地球划分为单元格,以帮助计算机更快地计算位置。与许多其他投影不同,它基于球体而不是平面,因此没有间隙或重叠区域。您可以选择不同的级别来调整每个单元格的大小。使用此系统,计算机可以检查两点之间的单元格数量,以估计它们之间的距离。

在许多情况下,您可以从S2计算中获得足够精确的估计,并且比三角学更快。在需要精确答案的情况下,使用S2几何仍然可以加快速度,因为计算机可以先得到粗略估计,然后仅进行真正需要的昂贵计算。

以示例入门

在这个示例中,我们将计算指定区域和不同时间窗口内海洋的平均表面温度以及标准差。我们将在Jupyter笔记本中使用InfluxDB的Python客户端库。有关这些主题的其他博客文章更多内容

我通过Anaconda使用Jupyter笔记本,所以我的第一步是在命令窗口中输入以下内容来在Anaconda中安装InfluxDB。

conda install -c conda-forge influxdb

然后,按照提示,我必须使用以下内容更新anaconda

conda update -n base -c defaults conda

本示例中使用的文件是NetCDF格式,因此我还必须安装以下内容来读取它

conda install -c conda-forge netcdf4

NetCDF文件是科学数据的一种常见格式。本示例中使用的数据是Roemmich-Gilson Argo温度气候学,可通过页面“2004-2018 RG Argo Temperature Climatology”的第二链接从这里获取。这些数据来自数千个浮标在海洋中不规则的时期和位置进行的测量,这些测量值被平均到一个1°网格的网格产品中。

在您的Jupyter笔记本中,首先运行以下命令以导入本示例所需的各个包。

import matplotlib.pyplot as plt
import numpy as np
import datetime
import pandas as pd
import influxdb_client, os, time
from influxdb_client import InfluxDBClient, Point, WritePrecision, WriteOptions
from influxdb_client.client.write_api import SYNCHRONOUS
import netCDF4 as nc

清理数据

运行以下命令来读取文件。

file_name = '/filepath/RG_ArgoClim_Temperature_2019.nc'
data_structure = nc.Dataset(file_name)

如果您运行

print(data_structure.variables.keys())

那么以下应该是输出。

dict_keys(['LONGITUDE', 'LATITUDE', 'PRESSURE', 'TIME', 'ARGO_TEMPERATURE_MEAN', 'ARGO_TEMPERATURE_ANOMALY', 'BATHYMETRY_MASK', 'MAPPING_MASK'])

这些都是文件内的各种数组,存储数据。我们感兴趣的是 LONGITUDELATITUDETIMEARGO_TEMPERATURE_MEANARGO_TEMPERATURE_ANOMALY。要从文件中读取数据,请运行

lon = data_structure.variables['LONGITUDE']
lat = data_structure.variables['LATITUDE']
time = data_structure.variables['TIME']
temp_mean = data_structure.variables['ARGO_TEMPERATURE_MEAN']
temp_anom = data_structure.variables['ARGO_TEMPERATURE_ANOMALY']

要查看每个数组的维度,请运行

print(lon.shape)
print(lat.shape)
print(time.shape)
print(temp_mean.shape)
print(temp_anom.shape)

您应该看到

(360,)
(145,)
(180,)
(58, 145, 360)
(180, 58, 145, 360)

这里有360个经度值,145个纬度值和180个时间值。还有58个压力值。由于我们只对表面海洋温度感兴趣,即最低压力,我们将对数组进行子集化以获取第一个压力索引。

您还会注意到,平均值没有时间维度。该数据集将温度分离为整个时间序列的平均值和与平均值的月度差异,称为异常值。为了得到每个月的实际温度,我们只需将平均值和异常值相加。

如果您尝试显示数组中的一个随机点的值,您应该看到

temp_anom[0,0,0,0]

masked_array(data=1.082,
             mask=False,
       fill_value=1e+20,
            dtype=float32)

目前,每个值都作为NetCDF文件内的一个数组存在。运行以下命令以获取简单的值数组

lon = lon[:]
lat = lat[:]
time = time[:]
temp_mean = temp_mean[:]
temp_anom = temp_anom[:]

temp_anom[0,0,0,0]

1.082

时间单位是从数据集开始以来的月份数。为了创建一个可理解的日期时间向量,请运行以下。

time_pass=pd.date_range(start='1/1/2004', periods=180, freq='MS')

要创建只包含表面温度的数组,请使用以下代码将第一个压力索引处的平均值和异常值数组相加。

ocean_surface_temp=np.empty((180,145,360))
for itime in range(180):
ocean_surface_temp[itime,:,:]=temp_mean[0,:,:]+temp_anom[itime,0,:,:]

将数据写入InfluxDB Cloud

现在我们想要的数据已经在一个格式良好的数组中,我们可以开始将其发送到InfluxDB云。为了节省存储空间,在这个例子中,我们不会上传全部数据,而是上传大西洋上一个10度×10度的方块10年的数据。我选择的坐标是15.5°N到25.5°N和34.5°W到44.5°W,即纬度索引80到90,经度索引295到305。

data from Atlantic Ocean

为了最有效地将数据发送到InfluxDB,我们将创建一个数据点的数组。我们将每个点命名为ocean_temperature,并将此名称设置为它的测量值。

要使用InfluxDB中的地理时间包,您需要以纬度和经度作为字段发送您的数据,因此我们的每个点将具有纬度、经度和温度字段。在InfluxDB中,如果在同一时间戳有两个字段值,则上传的下一个值将覆盖上一个值。这对我们来说是个问题,因为我们的数据在同一时间有大量的纬度、经度和温度值。

防止数据被覆盖的简单方法是为每个点提供一个位置标记。在我们的数据中,同时有许多测量值,但没有两个在同一位置和时间。您可以在不希望覆盖的点上同时开发其他唯一的标记,如此处所述。

points_to_send = []
for itime in range(120):
    for ilat in range(80, 90):
        for ilon in range(295, 305):
            p = Point("ocean_temperature")
            p.tag("location", str(lat[ilat]) + str(lon[ilon]))
            p.field("lat", lat[ilat])
            p.field('lon', lon[ilon])
            p.field('temp', ocean_surface_temp[itime,ilat,ilon])
            p.time(time_pass[itime])
            points_to_send.append(p)

然后在InfluxDB UI中,从“加载数据”侧边栏创建一个桶和令牌,如这些截图所示。

Creat Bucket

Create API Token

您的组织是账户的电子邮件,url是您的云账户的url。

token=token
org = org
url = url
bucket = bucket

在这里,我们将批量大小设置为5,000,因为这样可以更有效地执行,如此处所述。

with InfluxDBClient(url=url, token=token, org=org) as client:
    with client.write_api(write_options=WriteOptions(batch_size=5000)) as write_api:
        write_api.write(bucket=bucket, record=points_to_send)

查询数据

现在数据已经在InfluxDB中,我们可以查询它。我将使用几个查询来逐步展示每个命令的作用。首先,设置查询API。

client = influxdb_client.InfluxDBClient(url=url, token=token, org=org)
query_api = client.query_api()

在您的Jupyter笔记本中,每个查询都是一串Flux代码,然后您使用查询API调用它。在这些所有情况下,我们都有很多选项来处理查询结果。对于前几个,我将打印出前几个结果以确保它们正常工作,然后我们将以图表结束。

此查询简单地收集了所有纬度、经度和温度字段。

query1='from(bucket: "sample_geo")\
  |> range(start: 2003-12-31, stop: 2020-01-01)\
  |> filter(fn: (r) => r["_measurement"] == "ocean_temperature")\
  |> filter(fn: (r) => r["_field"] == "lat" or r["_field"] == "temp" or r["_field"] == "lon")\
  |> yield(name: "all points")'

此查询返回纬度。

query2='from(bucket: "sample_geo")\
  |> range(start: 2003-12-31, stop: 2020-01-01)\
  |> filter(fn: (r) => r["_measurement"] == "ocean_temperature")\
  |> filter(fn: (r) => r["_field"] == "lat")\
  |> yield(name: "lat")'

要执行这两个查询并打印出点的数量和前几个结果,您可以运行以下命令,更改传递的查询。

result = client.query_api().query(org=org, query=query2)
results = []
for table in result:
    for record in table.records:
        results.append((record.get_value(), record.get_field()))
print(len(results))
print(results[0:10])

现在我们将开始使用地理时间包。函数geo.shapeData重新格式化数据,并为每个点分配一个S2单元格ID。您指定您的纬度和经度字段名称,“lat”和“lon”在此情况下,以及您想要的S2单元格级别。在这种情况下,我选择了10,这对应于平均1.27平方公里的单元格。您可以在此处了解单元格级别。

接下来,我们将使用函数geo.filterRows来选择我们想要计算平均温度的区域。我选择以20.5°N和39.5°W为中心,半径为150公里的圆形区域,但您可以选择任何类型的方块、圆形或多边形,如此处所述。

默认情况下,数据按s2_cell_id分组,因此要计算整个区域的运行平均值,我们需要运行分组函数,并告诉它不按任何内容分组,以便将该区域的所有数据分组在一起。然后您可以使用aggregateWindow函数来计算您选择的任何时间窗口内的运行平均值和标准偏差。

将所有这些放在一起,下面的代码每三个月和每年计算并绘制这个圆的平均值,每三个月计算标准偏差,我将它放在下面的图表中的误差线上。

query3='import "experimental/geo"\
from(bucket: "sample_geo")\
    |> range(start: 2003-12-31, stop: 2020-01-01)\
    |> filter(fn: (r) => r["_measurement"] == "ocean_temperature")\
    |> geo.shapeData(latField: "lat", lonField: "lon", level: 13)\
    |> geo.filterRows(region: {lat: 20.5, lon: -39.5, radius: 150.0}, strict: true)\
    |> group()\
    |> aggregateWindow(column: "temp",every: 3mo, fn: mean, createEmpty: false)\
    |> yield(name: "running mean")\
    '

query4='import "experimental/geo"\
from(bucket: "sample_geo")\
    |> range(start: 2003-12-31, stop: 2020-01-01)\
    |> filter(fn: (r) => r["_measurement"] == "ocean_temperature")\
    |> geo.shapeData(latField: "lat", lonField: "lon", level: 13)\
    |> geo.filterRows(region: {lat: 20.5, lon: -39.5, radius: 150.0}, strict: true)\
    |> group()\
    |> aggregateWindow(column: "temp",every: 3mo, fn: stddev, createEmpty: false)\
    |> yield(name: "standard deviation")\
    '

query5='import "experimental/geo"\
from(bucket: "sample_geo")\
    |> range(start: 2003-12-31, stop: 2020-01-01)\
    |> filter(fn: (r) => r["_measurement"] == "ocean_temperature")\
    |> geo.shapeData(latField: "lat", lonField: "lon", level: 13)\
    |> geo.filterRows(region: {lat: 20.5, lon: -39.5, radius: 150.0}, strict: true)\
    |> group()\
    |> aggregateWindow(column: "temp",every: 12mo, fn: mean, createEmpty: false)\
    |> yield(name: "running mean")\
    '

result = client.query_api().query(org=org, query=query3)
results_mean = []
results_time = []
for table in result:
    for record in table.records:
        results_mean.append((record["temp"]))
        results_time.append((record["_time"]))

result = client.query_api().query(org=org, query=query4)
results_stddev = []
for table in result:
    for record in table.records:
        results_stddev.append((record["temp"]))
result = client.query_api().query(org=org, query=query5)

results_mean_annual = []
results_time_annual = []
for table in result:
    for record in table.records:
        results_mean_annual.append((record["temp"]))
        results_time_annual.append((record["_time"]))

plt.rcParams["figure.figsize"] = (10,7)
plt.errorbar(results_time,results_mean,results_stddev)
plt.plot(results_time_annual,results_mean_annual)
plt.xlabel("Time")
plt.ylabel("Degrees C")
plt.title("Average Ocean Surface Temperature")

Geo-temporal graph

其他资源

在Python中使用InfluxDB可以使地理时间分析更高效。我希望这个例子可以激发您对使用这个包可以进行的计算的灵感。这仅仅是冰山一角。您还可以使用此包来计算距离、查找交叉点、查找某些区域是否包含特定点等。它还可以在具有更多点的更复杂数据集中节省更多的计算。

为时序数据构建的平台与S2单元系统的组合非常强大。有关更多信息,您可以在我们的文档中阅读有关Flux地理时间包的内容这里,并观看我们关于此主题的“Meet the Developers”迷你系列这里