Apache Arrow 基础知识:使用 Apache Arrow Python 进行编码

导航至

到目前为止,您可能已经意识到 InfluxData 一直在忙于构建下一代 InfluxDB 存储引擎。如果您深入挖掘,您将开始发现一些您可能不熟悉的概念

  • Apache Parquet

  • Apache Arrow

  • Arrow Flight

这些开源项目是构成新存储引擎的一些核心构建块。在大多数情况下,您无需担心底层原理。但是,如果您像我一样,想要更实际地了解其中一些项目,那就加入我的探索之旅吧。

我们将深入研究的第一个组件是 Apache Arrow。我的同事 Charles 给出了一个很棒的高级概述,您可以在这里找到。简而言之

“Arrow 以数组形式管理数据,这些数组可以分组在表中,以表示表格数据中的数据列。Arrow 还为各种格式提供支持,以便将这些表格数据导入和导出到磁盘和网络。最常用的格式是 Parquet(您将经常接触到这个概念)。”

出于性能原因,我们的开发人员使用 Rust 编写了 InfluxDB 新的存储引擎代码。我个人喜欢在 Python 中学习新的编码概念,因此我们将使用 pyarrow 客户端库。

基础知识

在 Apache Arrow 中,您有两个主要的数据容器/类:数组和表。 我们稍后将更深入地探讨这些内容,但让我们先编写一个快速代码片段来创建它们

import pyarrow as pa

# Create a array from a list of values
animal = pa.array(["sheep", "cows", "horses", "foxes"], type=pa.string())
count = pa.array([12, 5, 2, 1], type=pa.int8())
year = pa.array([2022, 2022, 2022, 2022], type=pa.int16())

# Create a table from the arrays
table = pa.Table.from_arrays([animal, count, year], names=['animal', 'count', 'year'])
print(table)

在此示例中,您可以看到我们构建了 3 个值数组:animal、count 和 year。我们可以将这些数组组合起来形成表的列。运行此代码的结果如下所示

animal: string
count: int8
year: int16
----
animal: [["sheep","cows","horses","foxes"]]
count: [[12,5,2,1]]
year: [[2022,2022,2022,2022]]

现在我们有了一个可以使用的表,让我们看看我们可以用它做什么。Arrow 的首要功能是提供用于保存和恢复表格数据的功能(最常见的是 Parquet 格式,这将在未来的博客中大量介绍)。

让我们保存并加载我们新创建的表

import pyarrow as pa
import pyarrow.parquet as pq

# Create a array from a list of values
animal = pa.array(["sheep", "cows", "horses", "foxes"], type=pa.string())
count = pa.array([12, 5, 2, 1], type=pa.int8())
year = pa.array([2022, 2022, 2022, 2022], type=pa.int16())

# Create a table from the arrays
table = pa.Table.from_arrays([animal, count, year], names=['animal', 'count', 'year'])

# Save the table to a Parquet file
pq.write_table(table, 'example.parquet')

# Load the table from the Parquet file
table2 = pq.read_table('example.parquet')
print(table2)

最后,为了完成基础知识,让我们尝试一个计算函数 (value_counts)。我们可以将计算函数应用于数组和表,这使我们能够对数据集应用转换。我们将在下一节中更详细地介绍这些内容,但让我们从一个简单的示例开始

import pyarrow as pa
import pyarrow.compute as pc

# Create a array from a list of values
animal = pa.array(["sheep", "cows", "horses", "foxes", "sheep"], type=pa.string())
count = pa.array([12, 5, 2, 1, 10], type=pa.int8())
year = pa.array([2022, 2022, 2022, 2022, 2021], type=pa.int16())

# Create a table from the arrays
table = pa.Table.from_arrays([animal, count, year], names=['animal', 'count', 'year'])

count_y = pc.value_counts(table['animal'])
print(count_y)

如您所见,调用库 pyarrow.compute 为 pc,并使用内置的 count 函数。这使我们能够计算给定数组或表中的值数量。我们选择计算动物的数量,这将产生以下输出

-- child 0 type: string
  [
    "sheep",
    "cows",
    "horses",
    "foxes"
  ]
-- child 1 type: int64
  [
    2,
    1,
    1,
    1
  ]

一个实际示例

所以我决定跳过向您列出所有数据类型和处理器,并认为我会向您展示一个更实际的示例,说明如何在 InfluxDB 的 TSM 引擎中使用 Apache Arrow。现在,剧透:这不是您与 InfluxDB 新存储引擎交互的方式(查询比这要流畅得多)。这纯粹是为了将大型样本数据集从 InfluxDB 中提取到 PyArrow 中,以便我们可以对其进行实验。InfluxDB 的新存储引擎将允许自动将您的数据导出为 Parquet 文件。

计划如下

  • 使用 InfluxDB Python 客户端库的传统方法查询 InfluxDB(使用 to data frame 方法)。

  • 使用 Apache Arrow 的内置 Pandas Dataframe 转换方法将我们的数据集转换为 Arrow 表数据结构。

  • 然后我们将使用一个新函数将表保存为一系列分区 Parquet 文件到磁盘。

  • 最后,第二个脚本将重新加载分区,并对我们的 Arrow 表结构执行一系列基本聚合。

让我们看一下代码

create_parquet.py

from influxdb_client import InfluxDBClient
from pandas import DataFrame as df
import pyarrow.dataset as ds
import pyarrow as pa

with InfluxDBClient(url="http://localhost:8086", token="edge", org="influxdb", debug=False) as client:
    query_api = client.query_api()
    query = '''
    import "influxdata/influxdb/sample"
    sample.data(set: "usgs")
        |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
        |> group() 
    '''

df = query_api.query_data_frame(query=query)

table = pa.Table.from_pandas(df)
print(table)
print("Saving to parquet files...")

# Drop result and table columns
table = table.drop(["result", "table"])
print(table)

# partitioning of your data in smaller chunks
ds.write_dataset(table, "usgs", format="parquet",
                 partitioning=ds.partitioning(
                    pa.schema([table.schema.field("_measurement")])
                ))

因此,您不熟悉的两个函数是:

  • pa.Table.pandas(df) : 这会自动将我们的数据帧转换为 Arrow 表。注意:请注意您的数据类型!有关更多信息,请查阅官方文档

  • Write_dataset(… partitioning=ds.partitioning(…)) : 此修改后的方法根据我们的“_measurement”列中的值将我们的表分区为 Parquet 文件。这将看起来像一个目录树。此方法有助于将大型数据集分隔为更易于管理的资产。

现在让我们看一下第二个脚本,它处理我们保存的 Parquet 文件

import pyarrow.dataset as ds

# Loading back the partitioned dataset will detect the chunks                
usgs = ds.dataset("usgs", format="parquet", partitioning=["_measurement"])
print(usgs.files)

# Convert to a table
usgs = usgs.to_table()
print(usgs)

# Grouped Aggregation example
aggregation = usgs.group_by("_measurement").aggregate([("rms", "mean"), ("rms", "max"), ("rms", "min") ]).to_pandas()
print(aggregation)

在此脚本中,我们部署了几个新函数,如果您使用 Pandas 或其他查询引擎,您可能很熟悉这些函数:group_byaggregate。我们使用这些函数根据度量对我们的数据点进行分组,并为每个组提供数学聚合(平均值、最大值、众数)。这将生成一个基于聚合的新 Arrow 表。然后我们将表转换回数据帧以提高可读性。

结论

我希望这篇博客能让您开始更深入地研究 Apache Arrow,并帮助您了解我们为什么决定投资 Apache Arrow 及其子产品的未来。我还希望它能为您提供基础,开始探索如何从这个框架构建您自己的分析应用程序。InfluxDB 的新存储引擎强调了其对更大生态系统的承诺。例如,允许导出 Parquet 文件使我们有机会在 Rapid Miner 和其他分析平台等平台上分析我们的数据。

我对您的行动号召是查看这里的代码,并发现 Apache Arrow 提供的其他处理器功能。接下来的很多内容都将围绕 Apache Parquet,因此如果有任何使用 Parquet 的产品/平台您希望我们谈论,请告诉我们。欢迎加入我们的 Slack论坛。分享您的想法 — 我期待在那里见到您!