feat(rust): Scan and read datasets from remote object stores (parquet only) #11256

Closed
wants to merge 9 commits
3 changes: 3 additions & 0 deletions crates/polars-error/src/lib.rs
@@ -67,6 +67,8 @@ pub enum PolarsError {
StringCacheMismatch(ErrString),
#[error("field not found: {0}")]
StructFieldNotFound(ErrString),
#[error("generic error: {0}")]
Generic(Box<dyn Error + Send + Sync>),
}

impl From<ArrowError> for PolarsError {
@@ -113,6 +115,7 @@ impl PolarsError {
ShapeMismatch(msg) => ShapeMismatch(func(msg).into()),
StringCacheMismatch(msg) => StringCacheMismatch(func(msg).into()),
StructFieldNotFound(msg) => StructFieldNotFound(func(msg).into()),
Generic(err) => ComputeError(func(&format!("IO: {err}")).into()),
}
}
}
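For context, the new `Generic` variant lets callers box any `std::error::Error` into a `PolarsError`. A minimal sketch of how a third-party error might be wrapped; the helper below is hypothetical and not part of this diff:

use polars_error::PolarsError;

// Hypothetical helper: boxes any error type satisfying the variant's
// bounds into PolarsError::Generic.
fn to_generic_err<E>(err: E) -> PolarsError
where
    E: std::error::Error + Send + Sync + 'static,
{
    PolarsError::Generic(Box::new(err))
}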
30 changes: 23 additions & 7 deletions crates/polars-io/Cargo.toml
@@ -18,13 +18,15 @@ polars-utils = { workspace = true }

ahash = { workspace = true }
arrow = { workspace = true }
async-trait = { version = "0.1.59", optional = true }
async-trait = { version = "0.1.59" }
bytes = { version = "1.3" }
chrono = { workspace = true, optional = true }
chrono-tz = { workspace = true, optional = true }
fast-float = { version = "0.2", optional = true }
flate2 = { version = "1", optional = true, default-features = false }
futures = { workspace = true, optional = true }
futures = { workspace = true }
glob = { version = "0.3" }
itertools = { version = "0" }
itoa = { workspace = true, optional = true }
lexical = { version = "6", optional = true, default-features = false, features = ["std", "parse-integers"] }
lexical-core = { workspace = true, optional = true }
@@ -33,16 +35,30 @@ memmap = { package = "memmap2", version = "0.7" }
num-traits = { workspace = true }
object_store = { workspace = true, optional = true }
once_cell = { workspace = true }
percent-encoding = { version = "2.3" }
rayon = { workspace = true }
regex = { workspace = true }
ryu = { workspace = true, optional = true }
serde = { workspace = true, features = ["derive"], optional = true }
serde_json = { version = "1", default-features = false, features = ["alloc", "raw_value"], optional = true }
simd-json = { workspace = true, optional = true }
simdutf8 = { workspace = true, optional = true }
tokio = { version = "1.26", features = ["net"], optional = true }
tokio-util = { version = "0.7.8", features = ["io", "io-util"], optional = true }
url = { workspace = true, optional = true }
tokio = { version = "1.26", features = ["net", "io-util", "rt-multi-thread"] }
tokio-util = { version = "0.7.8", features = ["io", "io-util"] }
url = { workspace = true }

[dependencies.opendal]
version = "0.40"
default-features = false
features = [
"services-azblob",
"services-azdls",
"services-gcs",
"services-s3",
"services-http",
"services-fs",
"services-webhdfs",
]

[target.'cfg(not(target_family = "wasm"))'.dependencies]
home = "0.5.4"
@@ -90,8 +106,8 @@ dtype-decimal = ["polars-core/dtype-decimal"]
fmt = ["polars-core/fmt"]
lazy = []
parquet = ["polars-core/parquet", "arrow/io_parquet", "arrow/io_parquet_compression"]
async = ["async-trait", "futures", "tokio", "tokio-util", "arrow/io_ipc_write_async", "polars-error/regex"]
cloud = ["object_store", "async", "polars-error/object_store", "url"]
async = ["arrow/io_ipc_write_async", "polars-error/regex"]
cloud = ["object_store", "async", "polars-error/object_store"]
aws = ["object_store/aws", "cloud"]
azure = ["object_store/azure", "cloud"]
gcp = ["object_store/gcp", "cloud"]
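The opendal dependency added above is what drives the remote-store access. A minimal sketch of constructing an Operator against the S3 service with opendal 0.40; the bucket and region values are placeholders (in the PR, real values would come from cloud_opts):

use opendal::services::S3;
use opendal::Operator;

fn s3_operator() -> opendal::Result<Operator> {
    // Placeholder configuration for illustration only.
    let mut builder = S3::default();
    builder.bucket("my-bucket");
    builder.region("us-east-1");
    Ok(Operator::new(builder)?.finish())
}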
2 changes: 1 addition & 1 deletion crates/polars-io/src/csv/utils.rs
@@ -72,7 +72,7 @@ static BOOLEAN_RE: Lazy<Regex> = Lazy::new(|| {
});

/// Infer the data type of a record
fn infer_field_schema(string: &str, try_parse_dates: bool) -> DataType {
pub(crate) fn infer_field_schema(string: &str, try_parse_dates: bool) -> DataType {
// When quoting is enabled in the reader, these quotes aren't escaped;
// we default to Utf8 for them.
if string.starts_with('"') {
91 changes: 91 additions & 0 deletions crates/polars-io/src/input/file_format/mod.rs
@@ -0,0 +1,91 @@
use std::fmt::Debug;
use std::sync::Arc;

use async_trait::async_trait;
use futures::{StreamExt, TryStreamExt};
use opendal::Operator;
use polars_core::prelude::{DataFrame, PlHashMap, Schema};
use polars_error::PolarsResult;

use crate::input::file_listing::ObjectListingUrl;
use crate::input::try_blocking_io;
use crate::predicates::PhysicalIoExpr;

#[cfg(feature = "parquet")]
pub mod parquet;

pub type ObjectInfo = (String, Schema, (Option<usize>, usize));

pub trait FileFormatOptions {}

/// The number of objects to read in parallel when inferring schema
const SCHEMA_INFERENCE_CONCURRENCY: usize = 32;

pub type DynFileFormat = dyn FileFormat;

#[async_trait]
pub trait FileFormat: std::fmt::Display + Send + Sync + Debug + 'static {
/// Instantiates a default instance of this format.
fn create() -> Self;

/// Uses a size hint obtained from the reader to produce:
/// - the known row count, if available
/// - an estimated row count derived from reader hints
fn calculate_rows_count(
&self,
reader_size_hint: (usize, Option<usize>),
) -> (Option<usize>, usize);

/// Globs object info (path, schema, size_hint) for a given set of options and
/// a base [ObjectListingUrl]. The operator used to connect to the remote
/// store is inferred internally.
///
/// This is a sync API, but internally it runs one async task per object and
/// blocks until all tasks have completed.
fn glob_object_info(
&self,
listing_url: ObjectListingUrl,
cloud_opts: PlHashMap<String, String>,
exclude_empty: bool,
recursive: bool,
) -> PolarsResult<Vec<ObjectInfo>> {
try_blocking_io(async {
let url = listing_url.clone();
let operator = url
.infer_operator(cloud_opts)
.expect("failed to create an operator for remote store");

let objects = url
.glob_object_list(&operator, "", exclude_empty, recursive)
.await
.expect("failed to glob objects from remote store");

futures::stream::iter(objects)
.map(|(path, _)| async { self.get_object_info(&operator, path).await })
.buffer_unordered(SCHEMA_INFERENCE_CONCURRENCY)
.try_collect::<Vec<_>>()
.await
.expect("failed to get info for one or more objects")
})
}

/// Fetches the metadata of the object at the provided `path` and returns the
/// result as object info (path, schema, size_hint).
///
/// The [Schema] is inferred from the format specific metadata.
async fn get_object_info(&self, operator: &Operator, path: String) -> PolarsResult<ObjectInfo>;

fn finish_read(
&self,
_n_rows: Option<usize>,
_columns: Option<Vec<String>>,
_predicate: Option<Arc<dyn PhysicalIoExpr>>,
_projection: Option<Vec<usize>>,
) -> PolarsResult<DataFrame> {
todo!()
}

fn get_batches(&self) -> PolarsResult<Vec<DataFrame>> {
todo!()
}
}
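The default glob_object_info above fans out one async metadata fetch per object and caps the number of in-flight tasks with buffer_unordered. The same pattern in isolation, as a standalone sketch (not polars code; the body is a stand-in for a per-object metadata request):

use futures::{stream, StreamExt, TryStreamExt};

const CONCURRENCY: usize = 32;

// Runs one async task per path with at most CONCURRENCY in flight,
// collecting the results and failing fast on the first error.
async fn fetch_all(paths: Vec<String>) -> Result<Vec<usize>, std::io::Error> {
    stream::iter(paths)
        .map(|p| async move { Ok::<usize, std::io::Error>(p.len()) })
        .buffer_unordered(CONCURRENCY)
        .try_collect()
        .await
}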
74 changes: 74 additions & 0 deletions crates/polars-io/src/input/file_format/parquet.rs
@@ -0,0 +1,74 @@
use std::fmt::{Display, Formatter};

use arrow::io::parquet::read::FileMetaData;
use async_trait::async_trait;
use futures::Stream;
use opendal::Operator;
use polars_core::schema::Schema;
use polars_error::{to_compute_err, PolarsResult};

use crate::file_format::ObjectInfo;
use crate::input::file_format::FileFormat;

#[derive(Debug)]
pub struct ParquetFormat {}

impl ParquetFormat {
/// Reads the metadata of the Parquet file at location `path`, along with the
/// reader's size hint
async fn fetch_metadata_async(
&self,
operator: &Operator,
path: impl AsRef<str>,
) -> PolarsResult<(FileMetaData, (usize, Option<usize>))> {
let mut reader = operator
.reader(path.as_ref())
.await
.map_err(to_compute_err)?;

let metadata = arrow::io::parquet::read::read_metadata_async(&mut reader).await?;
Ok((metadata, reader.size_hint()))
}

/// Infers the schema of the Parquet file from its [FileMetaData]
fn infer_schema(&self, metadata: FileMetaData) -> PolarsResult<Schema> {
let arrow_schema =
arrow::io::parquet::read::infer_schema(&metadata).map_err(to_compute_err)?;
let polars_schema = Schema::from_iter(arrow_schema.fields.iter());

Ok(polars_schema)
}
}

impl Display for ParquetFormat {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "ParquetFormat()")
}
}

#[async_trait]
impl FileFormat for ParquetFormat {
/// Construct a new Format with no local overrides
fn create() -> Self {
Self {}
}

/// Uses a size hint obtained from the reader to produce:
/// - the known row count, if available
/// - an estimated row count derived from reader hints
fn calculate_rows_count(
&self,
reader_size_hint: (usize, Option<usize>),
) -> (Option<usize>, usize) {
let (estimated, known) = reader_size_hint;
(known, estimated)
}

async fn get_object_info(&self, operator: &Operator, path: String) -> PolarsResult<ObjectInfo> {
let (metadata, _) = self.fetch_metadata_async(operator, path.clone()).await?;
let num_rows = metadata.num_rows;
let size_hint = self.calculate_rows_count((num_rows, Some(num_rows)));
let polars_schema = self.infer_schema(metadata)?;

Ok((path, polars_schema, size_hint))
}
}
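A usage sketch of the new format over the fs service that the Cargo.toml above enables. Module paths are assumed from the file layout in this PR, and the root and file path are placeholders:

use opendal::services::Fs;
use opendal::Operator;
use polars_error::{to_compute_err, PolarsResult};
use polars_io::input::file_format::parquet::ParquetFormat;
use polars_io::input::file_format::FileFormat;

// Hypothetical driver: infers the schema and row-count hint of one local
// parquet file through the same code path a remote store would use.
async fn describe_one() -> PolarsResult<()> {
    let mut builder = Fs::default();
    builder.root("/tmp/dataset");
    let operator = Operator::new(builder).map_err(to_compute_err)?.finish();

    let format = ParquetFormat::create();
    let (path, schema, size_hint) = format
        .get_object_info(&operator, "part-0.parquet".to_string())
        .await?;
    println!("{path}: {} columns, size hint {:?}", schema.len(), size_hint);
    Ok(())
}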