Rust对象存储捐赠

导航至

今天,我们很高兴正式宣布,InfluxData已向Apache Arrow项目捐赠了一个通用的对象存储实现

使用这个包,通过简单的运行时配置更改,相同的代码可以轻松地与AWS S3、Azure Blob Storage、Google Cloud Storage、本地文件、内存等交互。

您可以在crates.io上找到最新版本

我们预计这将加速Rust生态系统中的创新步伐。无论您是在构建一个云无关的服务来处理用户上传的视频、图片和文档,一个高性能的分析系统,还是需要访问商品对象存储的其他东西,这个包都可以帮助您,我们迫不及待地想看看人们用它来做什么。

为什么你需要一个对象存储包?

除了为许多基于云的服务提供大量数据存储之外,我们相信分析系统的未来特别涉及到查询存储在对象存储上的数据。

对象存储是一个通用术语,可以松散地描述为“云中的无限FTP服务器”,提供几乎无限的高可用性和耐用性键值存储。与虚拟机和块存储一样,对象存储是所有现代云服务提供商提供的关键商品服务之一。例如包括 S3Microsoft Azure Blob StorageGoogle Cloud StorageMinIOCeph Object GatewayHDFS 等。

为了实现这种几乎无限的扩展,对象存储提供了传统文件系统(如 NTFSext4)功能的一部分。具体来说,它们使用“键”来标识对象,并将任意字节作为值存储

Rust Object Store Donation - Figure 1 图1:对象存储通过字符串键存储任意字节。

与文件系统不同,对象存储通常没有显式的目录概念,最佳实践是使用ASCII的子集作为键。相反,通过使用带前缀的LIST操作实现路径式遍历,并将非法字符序列进行百分编码。

Rust Object Store Donation - Figure 2

图2:对象存储可以列出具有指定前缀的对象,这可以用来将文件分组在一起。在此示例中,请求带有前缀“/pictures/”的对象将返回所有 .jpg 对象,而请求前缀“/parquet/”的对象将返回所有 .parquet 对象。

在对象存储实现和本地文件系统中,一致地列出和遍历对象键中编码的准目录结构是常见的不满来源,因为不仅文件系统的行为与对象存储有很大不同,而且每个对象存储实现都有自己的怪癖。

拥有一个专注于易用、高性能、异步对象存储库,用惯用的Rust编写,可以让你不必担心这些细节,而是专注于你系统的逻辑。底层实现从应用程序代码中抽象出来,并可以在运行时轻松选择,允许同一二进制文件在多个云中运行。

这种灵活性也促进了本地开发,因为它允许对本地文件系统进行测试,甚至对内存存储进行测试,而无需任何额外的二进制文件,如 MinIO,并允许使用熟悉的工具,如 ls、cat 或您选择的文件浏览器。

如何使用它?

以下是一个简单的示例,用于查找远程对象存储中文件的零的数量

let object_store: Arc<dyn ObjectStore> = get_object_store();

    // list all objects in the "parquet" prefix (aka directory)                                                                                                                     
    let path: Path = "parquet".try_into().unwrap();
    let list_stream = object_store
        .list(Some(&path))
        .await
        .expect("Error listing files");

    // List all files in the store                                                                                                                                                  
    list_stream
        .map(|meta| async {
            let meta = meta.expect("Error listing");

            // fetch the bytes from object store                                                                                                                                    
            let stream = object_store
                .get(&meta.location)
                .await
                .unwrap()
                .into_stream();

            // Count the zeros                                                                                                                                                      
            let num_zeros = stream
                .map(|bytes| {
                    let bytes = bytes.unwrap();
                    bytes.iter().filter(|b| **b == 0).count()
                })
                .collect::<Vec<usize>>()
                .await
                .into_iter()
                .sum::<usize>();

            (meta.location.to_string(), num_zeros)
        })
        .collect::<FuturesOrdered<_>>()
        .await
        .collect::<Vec<_>>()
        .await
        .into_iter()
        .for_each(|i| println!("{} has {} zeros", i.0, i.1));
}

这将打印出类似以下内容

test_fixtures/parquet/1.parquet has 174 zeros
test_fixtures/parquet/2.parquet has 53 zeros

按原样编写的代码按分页方式列出文件并并行获取其内容。如果文件数量达到数千,这可能不是很好。然而,我们可以很容易地利用Rust流并改为

.collect::<FuturesOrdered<_>>()

改为

.buffered(10)

现在程序将限制为并行10个GET请求。

object_store crate最酷的部分是相同的代码适用于所有不同的对象存储,唯一改变的是 get_object_store 的定义。

要读取S3

fn get_object_store() -> Arc<dyn ObjectStore> {
    let s3 = AmazonS3Builder::new()
        .with_access_key_id(ACCESS_KEY_ID)
        .with_secret_access_key(SECRET_KEY)
        .with_region(REGION)
        .with_bucket_name(BUCKET_NAME)
        .build()
        .expect("error creating s3");

    Arc::new(s3)
}

要读取Azure

fn get_object_store() -> Arc<dyn ObjectStore> {
    let azure = MicrosoftAzureBuilder::new()
        .with_account(STORAGE_ACCOUNT)
        .with_access_key(ACCESS_KEY)
        .with_container_name(BUCKET_NAME)
        .build()
        .expect("error creating azure");

    Arc::new(azure)
}

要读取GCP

fn get_object_store() -> Arc<dyn ObjectStore> {
    let gcs = GoogleCloudStorageBuilder::new()
        .with_service_account_path(PATH_TO_SERVICE_ACCOUNT_JSON)
        .with_bucket_name(BUCKET_NAME)
        .build()
        .expect("error creating gcs");
    Arc::new(gcs)
}

要读取本地文件系统

fn get_object_store() -> Arc<dyn ObjectStore> {
    let local_fs =
        LocalFileSystem::new_with_prefix(PREFIX)
          .expect("Error creating local file system");
    Arc::new(local_fs)
}

重申,主要好处是您无需为不同的对象存储集成不同的抽象 - 客户端代码始终相同,并且底层使用适当的优化实现。

《object_store》库也具有可扩展性,允许集成其他对象存储系统,同时仍然可以读取本地文件系统中的文件,利用某些系统提供的优化文件访问功能——请参阅GetFileResult

更完整且可工作的示例可以在rust_object_store_demo仓库中找到。

为什么捐赠Apache

Rust的梦想是拥有Python或Ruby的开发生产率,以及C/C++的速度和内存效率。实现这一梦想的部分是确保它能够轻松地与更广泛的技术生态系统集成,在现代分析系统中,这意味着对象存储中的数据。

因此,让Rust程序轻松且高效地读取和写入对象存储(AWS、S3、GCP)中的数据非常重要。有一些库实现了云提供商特定的SDK,例如rusoto_s3Azure_storage;然而,通过相同的接口访问最常用的功能集通常是加速跨云分析系统开发所需。这个库明确不是为了取代完整的云SDK,而是提供一个跨不同底层实现的统一对象存储抽象。

当我们的目标是开发influxdb_iox时,我们恰好有这个需求。InfluxDB和InfluxData Cloud运行在AWS、GCP、Azure和本地环境中,我们也需要IOx也这样做。我们没有找到满足我们需求的现有库,因此InfluxData IOx团队在我们的项目中开发了一个。

这个努力最初是由Rust生态系统传奇人物Carol(Nichols II Goulding)@carols10cents(《Rust Book》的主要作者)实施的,并由Marco NeumannRaphael Taylor-Davies大量扩展,我们在将其集成到DataFusion的过程中进行了这些工作。

IOx使用了Rust、Apache Arrow、Apache Parquet和DataFusion项目,我们也为这些项目做出了大量贡献,而且越来越重要的是,IOx与对象存储的交互需要通过DataFusion进行高效处理。在调查替代方案时,我们发现这需要与对象存储进行更深入的集成。

我们希望这次捐赠将进一步加快在Rust中创建高质量分析系统的速度,并且迫不及待地想看看社区将用它来做什么!我们特别希望与Apache Arrow的对齐能够允许与可以轻松高效读取箭头兼容文件(如parquet、CSV和换行符分隔的JSON)的库进行优雅的集成,这些文件可以从本地或远程对象存储中直接读取。对于希望具有SQL或其他高级查询引擎功能的程序,请查看Apache Arrow DataFusion

您可以在这个GitHub问题以及这个问题中了解更多关于捐赠及其理由的信息。

接下来是什么

在短期内,我们计划与parquet库更好地集成。特别是异步parquet读取器是专门为具有通用object_store crate的开发而设计的。它目前支持投影和行组级别谓词下推,以最小化从对象存储中获取的数据,并且预计在2022年8月22日发布的下一个版本中将支持页和行级别谓词下推。

我们预计将继续改进与Apache Arrow DataFusion的集成,确保它为从对象存储查询数据提供一流的支持,有效地解耦I/O和CPU密集型工作,并充分利用现代多核处理器。

最后,我们正在努力摆脱对rusoto和Rust的Azure SDK等大型SDK的依赖。虽然它们为我们提供了很好的服务,但摆脱它们将大大减少依赖负担,简化实现,并进一步提高各种实现的一致性。

加入社区

我们认为一个繁荣的社区可以推动每个人前进。我们鼓励您查看crate,并帮助我们!在您的项目中尝试它,并告诉我们效果如何,或者在这里找到我们。这里有一份为新成员准备的优秀开放任务列表这里

赞赏

感谢Raphael Taylor-DaviesPaul DixNga TranMarco Neumann审阅了本文件的早期版本,并贡献了许多改进。