InfluxDB、Flight SQL、Pandas 和 Jupyter Notebooks 教程

导航至

InfluxDB Cloud 由 IOx 提供支持,是一个通用的时间序列数据库,构建于 Apache 生态系统之上。您可以使用 Apache Arrow Flight SQL 接口查询 InfluxDB Cloud,该接口提供 SQL 支持,用于处理时间序列数据。在本教程中,我们将逐步介绍使用 Flight SQL 查询 InfluxDB Cloud 的过程,使用 Pandas 和 Jupyter Notebooks 探索和分析结果数据,并创建交互式绘图和可视化。Pandas 是一个 OSS 数据结构,使数据处理和分析变得容易。Jupyter Notebook 是“用于创建和共享计算文档的原始 Web 应用程序。它提供了一个简单、精简、以文档为中心的体验”。无论您是处理传感器数据、金融数据还是任何其他类型的时间序列数据,本教程都为您使用 InfluxDB Cloud、Flight SQL、Pandas 和 Jupyter Notebooks 获得见解并根据您的数据做出明智的决策奠定了坚实的基础。使用以下 notebook 亲自试用一下。

要求

本教程假定您满足以下要求

  • Python 3.6

  • InfluxDB v2 Python 客户端库。使用 pip3 install 'influxdb-client[ciso]' 安装它。

  • Flight SQL Python 库。使用 pip install git+https://github.com/influxdata/flightsql-dbapi.gitbash pip3 install flightsql-dbapi

  • Pandas 库。使用 pip3 install flightsql-dbapi

  • Jupyter Notebooks。使用 pip3 install notebook

您也可以使用以下 requirements.txt 安装所有以下依赖项

pandas==0.23.4
influxdb_client==1.30.0
flightsql-dbapi==0.0.1
notebook ==6.4.12

工作流程

  1. 导入依赖项。
  2. 收集身份验证凭据,包括
  3. 实例化 Flight SQL 客户端。
  4. 执行 SQL 查询。在之前的文章中,我们描述了如何使用 Date_Bin() 函数执行降采样。在本教程中,我们将改用 Pandas。
  5. 创建一个读取器对象来消费结果。
  6. 将所有数据读取到 `pyarrow.Table` 对象中。
  7. 将数据转换为 Pandas DataFrame。
  8. 使用 Pandas 降采样数据。
  9. 使用 matplotlib 和 seaborn 创建数据可视化。
  10. 实例化 InfluxDB v2 Python 客户端库,并将 Pandas DataFrame 写回 InfluxDB。

步骤 1-7:样板代码、使用 Flight SQL 查询 InfluxDB Cloud 以及将输出转换为 Pandas DataFrame

步骤 1-4 基本上是样板代码。在本示例中,我们使用 SQL 从该 bucket 中选择所有数据。我们查询所有数据是因为我们的数据中存在空白。使用 Pandas 和 Python 的一个优势是可以轻松清理数据并利用库进行自定义可视化。以下部分演示了我们如何使用 Seaborn 可视化带有空白的时间序列。Seaborn 是一个基于 `matplotlib` 构建的 Python 数据可视化库,`matplotlib` 是另一个流行的 Python 可视化库。

# 1. Import dependencies. 
from flightsql import FlightSQLClient
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS

# 2. Gather authentication credentials. 
token = "xxx"
url = "https://us-east-1-1.aws.cloud2.influxdata.com/"
org = "xxx"
bucket = "anais-iox"

# 3. Instantiate the FlightSQL Client
client = FlightSQLClient(host='us-east-1-1.aws.cloud2.influxdata.com',
                        token=token,
                        metadata={'bucket-name': 'anais-iox'},
                        features={'metadata-reflection': 'true'})

# 4. Execute a query against InfluxDB's Flight SQL endpoint. Here we are querying for all of our data.                         
query = client.execute("SELECT * FROM \"airSensors\"")

# 5. Create reader to consume result
reader = client.do_get(query.endpoints[0].ticket)

# 6. Read all data into a pyarrow.Table
Table = reader.read_all()

# 7. Convert to Pandas DataFrame
df = Table.to_pandas()
df = df.sort_values(by="time")
df.head(20)

生成的 DataFrame 如下所示。我们使用 `head()` 函数包含 20 个值,只是为了确保它为每个传感器返回多个时间点。

pandas-downsampling

现在我们想使用 Pandas 降采样我们的数据。

步骤 8:使用 Pandas 降采样

这里的目标是找到我们的 `temperature`、`co` 和 `humidity` 字段在 10 分钟间隔内的平均值。使用 `groupby()` 函数按 `sensor_id` 标签(或列)对我们的 dataframe 进行分组。然后我们分别使用 `resample()` 和 `mean()` 函数来降采样并在间隔内应用平均值。

df_mean = df.groupby(by=["sensor_id"]).resample('10min', on='time').mean().dropna() 
# create a copy of the downsampled data so we can write it back to InfluxDB Cloud powered by IOx. 
df_write = df_mean.reset_index()
df_mean

生成的 DataFrame 如下所示

pandas-downsampling-functions

步骤 9:使用 matplotlib 和 seaborn 创建数据可视化

首先,我们需要使用 matplotlib 创建可视化。我们将重点可视化所有传感器的降采样温度值。为了绘制所有传感器的图形,我们需要先透视我们的数据。以下代码创建了后续图形

df_mean.reset_index().pivot(index='time', columns='sensor_id', values='temperature').plot(rot=90)

data-visualization-matplotlib-seaborn

此图表效果不佳。时间戳没有分钟精度,图例位置不佳,并且我们的数据集中还有一个很大的空白,导致了那些直线。让我们使用 Pandas 清理我们的数据并利用 Seaborn 创建更令人愉悦的可视化效果。

# Convert the time object to a string to handle the gap. 
df_mean.reset_index('sensor_id', inplace=True)
df_mean.index = df_mean.index.map(str)

# Make a plot with seaborn instead to make the visualization stronger by moving the labels, pivoting the x-axis values, and removing the gap. 
sns.set_style('darkgrid')
sns.set(rc={'figure.figsize':(14,8)})

ax = sns.lineplot(data=df_mean.reset_index(), x ='time', y = 'temperature',
                  hue='sensor_id', palette='viridis',
                  legend='full', lw=3)

plt.legend(bbox_to_anchor=(1, 1))
plt.ylabel('Temperature')
plt.xlabel('Time')
plt.xticks(rotation=45)
plt.show()

结果是以下图表:我们消除了数据中的空白并创建了更高精度的 y 轴。

pandas-visualization

步骤 10:将降采样数据写回 InfluxDB Cloud

最后,我们使用 InfluxDB v2 Python 客户端,使用 `.write` 方法并指定我们要写回 InfluxDB 的 DataFrame,将我们的降采样数据写回 InfluxDB Cloud。

# write data back to InfluxDB Cloud powered by IOx
client = InfluxDBClient(url=url, token=token, org=org)
client.write_api(write_options=SYNCHRONOUS).write(bucket=bucket,
               record=df_write,
               data_frame_measurement_name="mean_downsampled",
               data_frame_timestamp_column='time',
               data_frame_tag_columns=['sensor_id'])

在 InfluxDB Cloud UI 中使用 SQL 查询来验证我们是否成功地将降采样数据写回 InfluxDB。

sql-influxdb-cloud

结论

使用 Flight SQL 查询 InfluxDB Cloud 使开发者能够利用 Flight SQL 提供的性能优势,以及用于分析、清理和可视化数据的各种工具。我鼓励您扩展本文中讨论的方法,将其与此 repo 中包含的预测和异常检测示例相结合,该 repo 是一系列 Jupyter Notebooks,重点介绍了如何为时间序列数据分析应用程序使用各种流行的 Python 库。最后,在此处找到与本教程相关的 notebook。

要利用 InfluxDB IOx 的所有进步,请在此处注册。如果您想联系 InfluxDB IOx 开发者,请加入 InfluxData Community Slack 并查找 #influxdb_iox 频道。

我希望这篇博文能启发您探索 InfluxDB Cloud,并利用 Flight SQL 从 InfluxDB 传输大型数据集,以便使用您选择的工具进行数据处理。如果您需要任何帮助,请使用我们的社区网站Slack 频道联系我们。我很想了解您尝试实现的目标以及您希望 InfluxDB 中的任务系统具备哪些功能。