使用Python中的DataFusion查询Arrow表格
作者:Anais Dotis-Georgiou / 开发者
2023年10月23日
导航至
InfluxDB v3 允许用户以每秒4.3百万点的速度写入数据。然而,如此高的数据摄入速度如果没有查询能力也是毫无意义的。 Apache DataFusion 是一个“可扩展的查询执行框架,使用Rust编写,以Apache Arrow作为其内存格式。”它使查询响应速度比之前未使用Apache生态系统的InfluxDB版本快5-25倍。 Apache Arrow 是一个定义内存中列式数据的框架。对于查询,InfluxDB v3利用了SQL DataFusion API,DataFusion还提供Python DataFrame API。在本教程中,我们将学习如何
- 使用InfluxDB v3 Python客户端库以pyarrow表格格式查询和获取数据。
- 将pyarrow表格转换为Pandas并进行一些转换。
- 将表格保存为Parquet文件。
- 将Parquet文件加载回Arrow表格。
- 创建DataFusion实例并使用SQL查询Parquet数据
Pandas 2.0
在Arrow中,Table与Pandas DataFrame等价。如果您关注了Pandas 2.0的新闻,您知道它支持Arrow和NumPy作为后端。但是,您必须通过指定dtype_backend
参数来显式选择Pandas 2.0,以便使用它。以下是Pandas 1.0和Pandas 2.0的性能比较。
您可以使用InfluxDB v3查询和写入Pandas DataFrame。但是,与Arrow表格和DataFusion一起工作可以提供性能优势。例如,PyArrow表格使用所有核心,而Pandas只使用一个。此外,对于有SQL背景的用户来说,使用SQL查询Arrow表格比使用Pandas进行转换要容易得多。
使用DataFusion和SQL查询、转换和转换Arrow表格
首先,我们将使用InfluxDB v3 Python客户端库查询和以pyarrow表格格式获取数据。在导入您的依赖项后,实例化客户端并查询您的数据。
from influxdb_client_3 import InfluxDBClient3
import pandas as pd
import datafusion
import pyarrow
client = InfluxDBClient3(token="[your authentication token]",
host="us-east-1-1.aws.cloud2.influxdata.com",
database="demo")
query = "SELECT * FROM \"cpu\" WHERE time >= now() - INTERVAL '2 days' AND \"cpu\" IN ('cpu-total')"
table = client.query(query=query)
print(table)
# Alternatively use the pandas mode to return a Pandas DataFrame directly.
# pd = client.query(query=query, mode="pandas")
# pd.head()
接下来,我们将使用DataFusion将Arrow表转换为Pandas DataFrame,并进行一些转换。为了专注于本例中数据类型之间的转换,我们将使用Pandas删除一些多余的列。然而,这正是您可能利用Pandas的全部力量来进行所有数据准备和分析的地方。
df = table.to_pandas()
pd = df[['cpu', 'usage_system', 'time']]
pd.head()
现在,我们使用以下代码将DataFrame保存为Parquet文件
pd.to_parquet(path="./cpu.parquet")
将Parquet文件重新加载到Arrow表,并将其转换回Pandas DataFrame
table = pyarrow.parquet.read_pandas("cpu.parquet")
table
最后,创建一个DataFusion实例,并使用SQL查询Parquet数据。
# create a context
ctx = datafusion.SessionContext()
# Register table with context
ctx.register_parquet("cpu", "cpu.parquet")
# Query with SQL
df = ctx.sql(
"select 'usage_system' from 'cpu'"
)
df.show()
# To convert back into a Pandas DataFrame
# pandas_df = df.to_pandas()
虽然您目前需要通过Arrow查询Parquet文件,但已有计划允许用户直接从InfluxDB查询Parquet文件。这将使用户能够更好地利用InfluxDB v3中的DataFusion和SQL。
结语
InfluxDB是一个优秀的工具,用于存储所有的时间序列数据。它基于Apache生态系统编写,利用了DataFusion、Arrow和Parquet等技术,以实现高效的写入、存储和查询。我希望这个教程能帮助您熟悉如何在这些不同数据类型之间进行转换,以及如何使用SQL查询Parquet文件。从这里开始使用InfluxDB Cloud 3.0。如果您需要任何帮助,请通过我们的社区网站或Slack频道联系。我很乐意了解您想实现的目标以及您希望InfluxDB拥有的功能。