InfluxDB、Flight SQL、Pandas 和 Jupyter Notebook 教程
作者:Anais Dotis-Georgiou / 产品
2023年3月29日
导航到
InfluxDB Cloud,由 IOx 驱动,是一个基于 Apache 生态系统构建的通用时序数据库。您可以使用 Apache Arrow Flight SQL 接口查询 InfluxDB Cloud,该接口提供了对时序数据的 SQL 支持。在本教程中,我们将指导您如何使用 Flight SQL 查询 InfluxDB Cloud,利用 Pandas 和 Jupyter Notebook 探索和分析结果数据,并创建交互式图表和可视化。Pandas 是一个开源的数据结构库,可以轻松地处理和分析数据。Jupyter Notebook 是“创建和分享计算文档的原始网络应用。它提供了一个简单、流畅、以文档为中心的体验”。无论您处理的是传感器数据、金融数据还是任何其他类型的时序数据,本教程都为您提供了使用 InfluxDB Cloud、Flight SQL、Pandas 和 Jupyter Notebook 获取洞察并基于数据做出明智决策的坚实基础。您可以尝试使用以下 笔记本。
要求
本教程假设您满足以下要求
-
Python 3.6
-
InfluxDB v2 Python 客户端库。使用
pip3 install 'influxdb-client[ciso]`
安装它。 -
Flight SQL Python 库。使用
pip install git+https://github.com/influxdata/flightsql-dbapi.git
或bash pip3 install flightsql-dbapi
-
Pandas 库。使用
pip3 install flightsql-dbapi
-
Jupyter Notebook。使用
pip3 install notebook
您也可以使用以下 requirements.txt 安装所有以下依赖项
pandas==0.23.4
influxdb_client==1.30.0
flightsql-dbapi==0.0.1
notebook ==6.4.12
工作流程
- 导入依赖项。
- 收集认证凭证,包括
- 实例化 Flight SQL 客户端。
- 执行 SQL 查询。在之前的文章中,我们介绍了如何使用
Date_Bin()
函数进行降采样。在本教程中,我们将使用 Pandas。 - 创建一个读取器对象以消费结果。
- 将所有数据读入
pyarrow.Table
对象。 - 将数据转换为 Pandas DataFrame。
- 使用 Pandas 降采样数据。
- 使用 matplotlib 和 seaborn 创建数据的可视化。
- 实例化 InfluxDB v2 Python 客户端库,并将 Pandas DataFrame 写回 InfluxDB。
步骤 1-7:模板代码,使用 Flight SQL 查询 InfluxDB Cloud,并将输出转换为 Pandas DataFrame
步骤 1-4基本上是模板代码。在这个例子中,我们使用 SQL 从该存储桶中选择所有数据。我们查询所有数据,因为我们的数据有缺失。使用 Pandas 和 Python 的一个优点是能够轻松地清理数据并利用库进行自定义可视化。以下部分展示了如何使用 Seaborn 可视化有缺失的时间序列。Seaborn 是一个基于 matplotlib
的 Python 数据可视化库,另一个流行的 Python 可视化库。
# 1. Import dependencies.
from flightsql import FlightSQLClient
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS
# 2. Gather authentication credentials.
token = "xxx"
url = "https://us-east-1-1.aws.cloud2.influxdata.com/"
org = "xxx"
bucket = "anais-iox"
# 3. Instantiate the FlightSQL Client
client = FlightSQLClient(host='us-east-1-1.aws.cloud2.influxdata.com',
token=token,
metadata={'bucket-name': 'anais-iox'},
features={'metadata-reflection': 'true'})
# 4. Execute a query against InfluxDB's Flight SQL endpoint. Here we are querying for all of our data.
query = client.execute("SELECT * FROM \"airSensors\"")
# 5. Create reader to consume result
reader = client.do_get(query.endpoints[0].ticket)
# 6. Read all data into a pyarrow.Table
Table = reader.read_all()
# 7. Convert to Pandas DataFrame
df = Table.to_pandas()
df = df.sort_values(by="time")
df.head(20)
得到的 DataFrame 看起来像这样。我们使用 head()
函数包含 20 个值,只是为了确保它为每个传感器返回多个时间点。
现在我们想使用 Pandas 对我们的数据进行下采样。
步骤 8:使用 Pandas 进行下采样
这里的目的是找到 temperature
、co
和 humidity
字段在 10 分钟间隔内的平均值。使用 groupby() 函数按 sensor_id
标签(或列)对数据框进行分组。然后我们使用 resample() 和 mean() 函数进行下采样,并在间隔上应用平均值。
df_mean = df.groupby(by=["sensor_id"]).resample('10min', on='time').mean().dropna()
# create a copy of the downsampled data so we can write it back to InfluxDB Cloud powered by IOx.
df_write = df_mean.reset_index()
df_mean
得到的 DataFrame 看起来像这样
步骤 9:使用 matplotlib 和 seaborn 创建数据可视化
首先,我们需要使用 matplotlib 创建一个可视化。我们将专注于可视化所有传感器的下采样温度值。为了绘制所有传感器,我们需要首先对数据进行转换。以下代码创建了后续的图表
df_mean.reset_index().pivot(index='time', columns='sensor_id', values='temperature').plot(rot=90)
这个图表不太好。时间戳没有分钟精度,图例位置不佳,并且我们的数据集中有一个大的缺口,这导致了那些直线。让我们使用 Pandas 清理我们的数据,并利用 Seaborn 创建一个更令人愉悦的可视化。
# Convert the time object to a string to handle the gap.
df_mean.reset_index('sensor_id', inplace=True)
df_mean.index = df_mean.index.map(str)
# Make a plot with seaborn instead to make the visualization stronger by moving the labels, pivoting the x-axis values, and removing the gap.
sns.set_style('darkgrid')
sns.set(rc={'figure.figsize':(14,8)})
ax = sns.lineplot(data=df_mean.reset_index(), x ='time', y = 'temperature',
hue='sensor_id', palette='viridis',
legend='full', lw=3)
plt.legend(bbox_to_anchor=(1, 1))
plt.ylabel('Temperature')
plt.xlabel('Time')
plt.xticks(rotation=45)
plt.show()
结果是以下图表:我们消除了数据中的缺口,并创建了一个更高精度的 y 轴。
步骤 10:将下采样数据写回 InfluxDB Cloud
最后,我们使用 InfluxDB v2 Python 客户端和 .write
方法将下采样数据写回 InfluxDB Cloud,并指定要写回 InfluxDB 的 DataFrame。
# write data back to InfluxDB Cloud powered by IOx
client = InfluxDBClient(url=url, token=token, org=org)
client.write_api(write_options=SYNCHRONOUS).write(bucket=bucket,
record=df_write,
data_frame_measurement_name="mean_downsampled",
data_frame_timestamp_column='time',
data_frame_tag_columns=['sensor_id'])
使用 InfluxDB Cloud UI 中的 SQL 查询来验证我们成功地将下采样数据写回 InfluxDB。
结论
使用 Flight SQL 查询 InfluxDB Cloud 允许开发者利用 Flight SQL 提供的性能优势,以及大量用于分析、清理和可视化的工具。我鼓励您通过将它们与包含在 此存储库 中的预测和异常检测示例相结合来扩展本文中讨论的方法,这是一个 Jupyter notebooks 的集合,强调如何使用各种流行的 Python 库进行时间序列数据分析应用。最后,在此处找到与本教程相关的笔记本 这里。
要利用 InfluxDB IOx 的所有进步,请在此处注册 这里。如果您想联系 InfluxDB IOx 开发者,请加入 InfluxData Community Slack 并查找 #influxdb_iox 频道。
希望这篇博客文章能够激发您探索InfluxDB Cloud,并利用Flight SQL将大量数据集从InfluxDB传输出来,使用您选择的工具进行数据处理。如果您需要任何帮助,请通过我们的社区网站或Slack频道联系。我很乐意了解您正在尝试实现什么,以及您希望InfluxDB的任务系统拥有哪些功能。