Apache Arrow 基础知识:使用 Apache Arrow Python 进行编码
作者:Jay Clifford / 用例, 产品
2023 年 1 月 13 日
导航至
到目前为止,您可能已经意识到 InfluxData 一直在忙于构建下一代 InfluxDB 存储引擎。如果您深入挖掘,您将开始发现一些您可能不熟悉的概念
-
Apache Parquet
-
Apache Arrow
-
Arrow Flight
这些开源项目是构成新存储引擎的一些核心构建块。在大多数情况下,您无需担心底层原理。但是,如果您像我一样,想要更实际地了解其中一些项目,那就加入我的探索之旅吧。
我们将深入研究的第一个组件是 Apache Arrow。我的同事 Charles 给出了一个很棒的高级概述,您可以在这里找到。简而言之
“Arrow 以数组形式管理数据,这些数组可以分组在表中,以表示表格数据中的数据列。Arrow 还为各种格式提供支持,以便将这些表格数据导入和导出到磁盘和网络。最常用的格式是 Parquet(您将经常接触到这个概念)。”
出于性能原因,我们的开发人员使用 Rust 编写了 InfluxDB 新的存储引擎代码。我个人喜欢在 Python 中学习新的编码概念,因此我们将使用 pyarrow 客户端库。
基础知识
在 Apache Arrow 中,您有两个主要的数据容器/类:数组和表。 我们稍后将更深入地探讨这些内容,但让我们先编写一个快速代码片段来创建它们
import pyarrow as pa
# Create a array from a list of values
animal = pa.array(["sheep", "cows", "horses", "foxes"], type=pa.string())
count = pa.array([12, 5, 2, 1], type=pa.int8())
year = pa.array([2022, 2022, 2022, 2022], type=pa.int16())
# Create a table from the arrays
table = pa.Table.from_arrays([animal, count, year], names=['animal', 'count', 'year'])
print(table)
在此示例中,您可以看到我们构建了 3 个值数组:animal、count 和 year。我们可以将这些数组组合起来形成表的列。运行此代码的结果如下所示
animal: string
count: int8
year: int16
----
animal: [["sheep","cows","horses","foxes"]]
count: [[12,5,2,1]]
year: [[2022,2022,2022,2022]]
现在我们有了一个可以使用的表,让我们看看我们可以用它做什么。Arrow 的首要功能是提供用于保存和恢复表格数据的功能(最常见的是 Parquet 格式,这将在未来的博客中大量介绍)。
让我们保存并加载我们新创建的表
import pyarrow as pa
import pyarrow.parquet as pq
# Create a array from a list of values
animal = pa.array(["sheep", "cows", "horses", "foxes"], type=pa.string())
count = pa.array([12, 5, 2, 1], type=pa.int8())
year = pa.array([2022, 2022, 2022, 2022], type=pa.int16())
# Create a table from the arrays
table = pa.Table.from_arrays([animal, count, year], names=['animal', 'count', 'year'])
# Save the table to a Parquet file
pq.write_table(table, 'example.parquet')
# Load the table from the Parquet file
table2 = pq.read_table('example.parquet')
print(table2)
最后,为了完成基础知识,让我们尝试一个计算函数 (value_counts)。我们可以将计算函数应用于数组和表,这使我们能够对数据集应用转换。我们将在下一节中更详细地介绍这些内容,但让我们从一个简单的示例开始
import pyarrow as pa
import pyarrow.compute as pc
# Create a array from a list of values
animal = pa.array(["sheep", "cows", "horses", "foxes", "sheep"], type=pa.string())
count = pa.array([12, 5, 2, 1, 10], type=pa.int8())
year = pa.array([2022, 2022, 2022, 2022, 2021], type=pa.int16())
# Create a table from the arrays
table = pa.Table.from_arrays([animal, count, year], names=['animal', 'count', 'year'])
count_y = pc.value_counts(table['animal'])
print(count_y)
如您所见,调用库 pyarrow.compute 为 pc,并使用内置的 count 函数。这使我们能够计算给定数组或表中的值数量。我们选择计算动物的数量,这将产生以下输出
-- child 0 type: string
[
"sheep",
"cows",
"horses",
"foxes"
]
-- child 1 type: int64
[
2,
1,
1,
1
]
一个实际示例
所以我决定跳过向您列出所有数据类型和处理器,并认为我会向您展示一个更实际的示例,说明如何在 InfluxDB 的 TSM 引擎中使用 Apache Arrow。现在,剧透:这不是您与 InfluxDB 新存储引擎交互的方式(查询比这要流畅得多)。这纯粹是为了将大型样本数据集从 InfluxDB 中提取到 PyArrow 中,以便我们可以对其进行实验。InfluxDB 的新存储引擎将允许自动将您的数据导出为 Parquet 文件。
计划如下
-
使用 InfluxDB Python 客户端库的传统方法查询 InfluxDB(使用 to data frame 方法)。
-
使用 Apache Arrow 的内置 Pandas Dataframe 转换方法将我们的数据集转换为 Arrow 表数据结构。
-
然后我们将使用一个新函数将表保存为一系列分区 Parquet 文件到磁盘。
-
最后,第二个脚本将重新加载分区,并对我们的 Arrow 表结构执行一系列基本聚合。
让我们看一下代码
create_parquet.py
from influxdb_client import InfluxDBClient
from pandas import DataFrame as df
import pyarrow.dataset as ds
import pyarrow as pa
with InfluxDBClient(url="http://localhost:8086", token="edge", org="influxdb", debug=False) as client:
query_api = client.query_api()
query = '''
import "influxdata/influxdb/sample"
sample.data(set: "usgs")
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
|> group()
'''
df = query_api.query_data_frame(query=query)
table = pa.Table.from_pandas(df)
print(table)
print("Saving to parquet files...")
# Drop result and table columns
table = table.drop(["result", "table"])
print(table)
# partitioning of your data in smaller chunks
ds.write_dataset(table, "usgs", format="parquet",
partitioning=ds.partitioning(
pa.schema([table.schema.field("_measurement")])
))
因此,您不熟悉的两个函数是:
-
pa.Table.pandas(df) : 这会自动将我们的数据帧转换为 Arrow 表。注意:请注意您的数据类型!有关更多信息,请查阅官方文档。
-
Write_dataset(… partitioning=ds.partitioning(…)) : 此修改后的方法根据我们的“_measurement”列中的值将我们的表分区为 Parquet 文件。这将看起来像一个目录树。此方法有助于将大型数据集分隔为更易于管理的资产。
现在让我们看一下第二个脚本,它处理我们保存的 Parquet 文件
import pyarrow.dataset as ds
# Loading back the partitioned dataset will detect the chunks
usgs = ds.dataset("usgs", format="parquet", partitioning=["_measurement"])
print(usgs.files)
# Convert to a table
usgs = usgs.to_table()
print(usgs)
# Grouped Aggregation example
aggregation = usgs.group_by("_measurement").aggregate([("rms", "mean"), ("rms", "max"), ("rms", "min") ]).to_pandas()
print(aggregation)
在此脚本中,我们部署了几个新函数,如果您使用 Pandas 或其他查询引擎,您可能很熟悉这些函数:group_by 和 aggregate。我们使用这些函数根据度量对我们的数据点进行分组,并为每个组提供数学聚合(平均值、最大值、众数)。这将生成一个基于聚合的新 Arrow 表。然后我们将表转换回数据帧以提高可读性。
结论
我希望这篇博客能让您开始更深入地研究 Apache Arrow,并帮助您了解我们为什么决定投资 Apache Arrow 及其子产品的未来。我还希望它能为您提供基础,开始探索如何从这个框架构建您自己的分析应用程序。InfluxDB 的新存储引擎强调了其对更大生态系统的承诺。例如,允许导出 Parquet 文件使我们有机会在 Rapid Miner 和其他分析平台等平台上分析我们的数据。
我对您的行动号召是查看这里的代码,并发现 Apache Arrow 提供的其他处理器功能。接下来的很多内容都将围绕 Apache Parquet,因此如果有任何使用 Parquet 的产品/平台您希望我们谈论,请告诉我们。欢迎加入我们的 Slack 和论坛。分享您的想法 — 我期待在那里见到您!