Apache Arrow基础知识:使用Apache Arrow Python进行编码

导航到

到如今,你可能已经了解到InfluxData一直在忙于构建下一代InfluxDB存储引擎。如果你深入了解,你将开始发现一些可能对你来说陌生的概念

  • Apache Parquet

  • Apache Arrow

  • Arrow Flight

这些开源项目是新存储引擎的核心构建块之一。大部分情况下,你不需要担心底层的实现。尽管如果你像我一样,想要更实际地了解一些项目,那么就和我一起探索这些项目的奥秘吧。

我们将要深入了解的第一个组件是Apache Arrow。我的同事Charles提供了一个很好的高级概述,你可以在这里找到这里。简而言之

“Arrow管理数组中的数据,可以将数组分组到表中,以表示表格数据的列。Arrow还提供对各种格式的支持,以便将这些表格数据存储到磁盘和网络中。最常用的格式是Parquet(您将相当多地接触到这个概念)。”

出于性能考虑,我们的开发人员使用Rust编写了InfluxDB的新存储引擎。我个人喜欢用Python学习新的编程概念,因此我们将利用pyarrow客户端库。

基础知识

在Apache Arrow中,您有两个主要的数据容器/类:数组和表。我们将在后面更深入地探讨这些是什么,但让我们先写一段代码来创建每个

import pyarrow as pa

# Create a array from a list of values
animal = pa.array(["sheep", "cows", "horses", "foxes"], type=pa.string())
count = pa.array([12, 5, 2, 1], type=pa.int8())
year = pa.array([2022, 2022, 2022, 2022], type=pa.int16())

# Create a table from the arrays
table = pa.Table.from_arrays([animal, count, year], names=['animal', 'count', 'year'])
print(table)

所以在这个例子中,您可以看到我们构建了3个值数组:动物、计数和年份。我们可以将这些数组组合成表的列。运行此代码的结果如下

animal: string
count: int8
year: int16
----
animal: [["sheep","cows","horses","foxes"]]
count: [[12,5,2,1]]
year: [[2022,2022,2022,2022]]

所以现在我们有了可以工作的表,让我们看看我们能用它做什么。Arrow的第一个主要特性是提供保存和恢复表格数据的设施(通常保存为Parquet格式,这将在未来的博客中占很大比重)。

让我们保存和加载数据表

import pyarrow as pa
import pyarrow.parquet as pq

# Create a array from a list of values
animal = pa.array(["sheep", "cows", "horses", "foxes"], type=pa.string())
count = pa.array([12, 5, 2, 1], type=pa.int8())
year = pa.array([2022, 2022, 2022, 2022], type=pa.int16())

# Create a table from the arrays
table = pa.Table.from_arrays([animal, count, year], names=['animal', 'count', 'year'])

# Save the table to a Parquet file
pq.write_table(table, 'example.parquet')

# Load the table from the Parquet file
table2 = pq.read_table('example.parquet')
print(table2)

最后,为了完成基础知识,让我们尝试一个计算函数(value_counts)。我们可以将计算函数应用于数组和表,然后允许我们对数据集进行转换。我们将在下一节中更详细地介绍这些内容,但让我们从一个简单的例子开始

import pyarrow as pa
import pyarrow.compute as pc

# Create a array from a list of values
animal = pa.array(["sheep", "cows", "horses", "foxes", "sheep"], type=pa.string())
count = pa.array([12, 5, 2, 1, 10], type=pa.int8())
year = pa.array([2022, 2022, 2022, 2022, 2021], type=pa.int16())

# Create a table from the arrays
table = pa.Table.from_arrays([animal, count, year], names=['animal', 'count', 'year'])

count_y = pc.value_counts(table['animal'])
print(count_y)

如您所见,调用库pyarrow.compute为pc,并使用内置的计数函数。这允许我们在给定的数组或表中计算值数。我们选择计算动物的数量,产生以下输出

-- child 0 type: string
  [
    "sheep",
    "cows",
    "horses",
    "foxes"
  ]
-- child 1 type: int64
  [
    2,
    1,
    1,
    1
  ]

一个实际例子

所以我决定跳过列出所有数据类型和处理器,我想给你展示一个更现实的例子,即使用Apache Arrow与InfluxDB的TSM引擎。**现在,剧透一下:这不是与InfluxDB新存储引擎交互的方式(查询要比那个更简洁)。这纯粹是为了从InfluxDB中提取大量样本数据到PyArrow,以便我们可以对其进行实验。InfluxDB的新存储引擎将允许自动将数据导出为Parquet文件**。

计划如下

  • 使用InfluxDB Python客户端库的常规方法查询InfluxDB(使用转换为数据框方法)。

  • 使用Apache Arrow内置的Pandas数据框转换方法将我们的数据集转换为Arrow表数据结构。

  • 然后我们将使用一个新功能将表保存为一系列分区Parquet文件到磁盘。

  • 最后,第二个脚本将重新加载分区并对我们保存的Arrow表结构执行一系列基本聚合操作。

让我们看看代码

create_parquet.py

from influxdb_client import InfluxDBClient
from pandas import DataFrame as df
import pyarrow.dataset as ds
import pyarrow as pa

with InfluxDBClient(url="https://127.0.0.1:8086", token="edge", org="influxdb", debug=False) as client:
    query_api = client.query_api()
    query = '''
    import "influxdata/influxdb/sample"
    sample.data(set: "usgs")
        |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
        |> group() 
    '''

df = query_api.query_data_frame(query=query)

table = pa.Table.from_pandas(df)
print(table)
print("Saving to parquet files...")

# Drop result and table columns
table = table.drop(["result", "table"])
print(table)

# partitioning of your data in smaller chunks
ds.write_dataset(table, "usgs", format="parquet",
                 partitioning=ds.partitioning(
                    pa.schema([table.schema.field("_measurement")])
                ))

您可能不熟悉的两个函数是;

  • pa.Table.pandas(df) : 这个函数自动将我们的数据框转换为Arrow表。注意:注意您的数据类型!请参阅官方文档获取更多信息。

  • Write_dataset(… partitioning=ds.partitioning(…)) : 这个修改后的方法根据我们‘_ measurement’列中的值将我们的表分区为Parquet文件。这看起来像一棵目录树。此方法有助于将大型数据集分离成更易管理的资产。

现在让我们看看第二个脚本,它将处理我们保存的Parquet文件

import pyarrow.dataset as ds

# Loading back the partitioned dataset will detect the chunks                
usgs = ds.dataset("usgs", format="parquet", partitioning=["_measurement"])
print(usgs.files)

# Convert to a table
usgs = usgs.to_table()
print(usgs)

# Grouped Aggregation example
aggregation = usgs.group_by("_measurement").aggregate([("rms", "mean"), ("rms", "max"), ("rms", "min") ]).to_pandas()
print(aggregation)

在这段脚本中,我们部署了几个您可能在使用 Pandas 或其他查询引擎时熟悉的函数:group_byaggregate。我们使用这些函数根据测量值对数据点进行分组,并为每个组提供数学汇总(平均值、最大值、众数)。这基于汇总生成一个新的 Arrow 表格。然后,我们将表格转换回数据框以提高可读性。

结论

我希望这篇博客能够帮助您深入了解 Apache Arrow,并帮助您理解我们为何决定投资 Apache Arrow 及其子产品。我还希望它为您提供了从该框架构建自己的分析应用程序的基础。InfluxDB 的新存储引擎强调了其对更大生态系统的承诺。例如,允许导出 Parquet 文件为我们提供了在 Rapid Miner 和其他分析平台分析数据的机会。

我的行动号召是,您可以查看这里的代码,并发现 Apache Arrow 提供的一些其他处理器功能。接下来,大部分内容都将围绕 Apache Parquet 展开,因此,如果您想让我们讨论使用 Parquet 的任何产品/平台,请告诉我们。加入我们的 Slack论坛。分享您的想法——我期待在那里见到您!