diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml
index 4f370894ace1..caef0b7a7eb7 100644
--- a/parquet/Cargo.toml
+++ b/parquet/Cargo.toml
@@ -47,7 +47,7 @@ clap = { version = "3", optional = true, features = ["derive", "env"] }
 serde_json = { version = "1.0", features = ["preserve_order"], optional = true }
 rand = "0.8"
 futures = { version = "0.3", optional = true }
-tokio = { version = "1.0", optional = true, default-features = false, features = ["macros", "fs", "rt", "io-util"] }
+tokio = { version = "1.0", optional = true, default-features = false, features = ["macros", "rt", "sync"] }
 
 [dev-dependencies]
 criterion = "0.3"
diff --git a/parquet/src/arrow/async_reader.rs b/parquet/src/arrow/async_reader.rs
index b8fafec1e7ce..6ba7e0decbe7 100644
--- a/parquet/src/arrow/async_reader.rs
+++ b/parquet/src/arrow/async_reader.rs
@@ -25,7 +25,7 @@
 //! use arrow::record_batch::RecordBatch;
 //! use arrow::util::pretty::pretty_format_batches;
 //! use futures::TryStreamExt;
-//! use tokio::fs::File;
+//! use std::fs::File;
 //!
 //! use parquet::arrow::ParquetRecordBatchStreamBuilder;
 //!
@@ -41,7 +41,7 @@
 //!
 //! let testdata = arrow::util::test_util::parquet_test_data();
 //! let path = format!("{}/alltypes_plain.parquet", testdata);
-//! let file = tokio::fs::File::open(path).await.unwrap();
+//! let file = File::open(path).unwrap();
 //!
 //! let builder = ParquetRecordBatchStreamBuilder::new(file)
 //!     .await
 //!
@@ -74,33 +74,26 @@
 //! # }
 //! ```
 
-use std::collections::VecDeque;
 use std::fmt::Formatter;
-use std::io::{Cursor, SeekFrom};
 use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{Context, Poll};
 
-use byteorder::{ByteOrder, LittleEndian};
-use futures::future::{BoxFuture, FutureExt};
+use futures::future::FutureExt;
+use futures::ready;
 use futures::stream::Stream;
-use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
+use tokio::sync::mpsc;
+use tokio::task::JoinHandle;
 
 use arrow::datatypes::SchemaRef;
 use arrow::record_batch::RecordBatch;
 
-use crate::arrow::array_reader::{build_array_reader, RowGroupCollection};
-use crate::arrow::arrow_reader::ParquetRecordBatchReader;
 use crate::arrow::schema::parquet_to_arrow_schema;
-use crate::basic::Compression;
-use crate::column::page::{PageIterator, PageReader};
+use crate::arrow::{ArrowReader, ParquetFileArrowReader};
 use crate::errors::{ParquetError, Result};
-use crate::file::footer::parse_metadata_buffer;
 use crate::file::metadata::ParquetMetaData;
-use crate::file::reader::SerializedPageReader;
-use crate::file::PARQUET_MAGIC;
-use crate::schema::types::{ColumnDescPtr, SchemaDescPtr};
-use crate::util::memory::ByteBufferPtr;
+use crate::file::reader::{ChunkReader, FileReader, SerializedFileReader};
+use crate::file::serialized_reader::{ReadOptions, ReadOptionsBuilder};
 
 /// A builder used to construct a [`ParquetRecordBatchStream`] for a parquet file
 ///
@@ -108,48 +101,56 @@ use crate::util::memory::ByteBufferPtr;
 /// to use this information to select what specific columns, row groups, etc...
 /// they wish to be read by the resulting stream
 ///
-pub struct ParquetRecordBatchStreamBuilder<T> {
-    input: T,
-
-    metadata: Arc<ParquetMetaData>,
-
-    schema: SchemaRef,
-
+pub struct ParquetRecordBatchStreamBuilder<T: ChunkReader> {
     batch_size: usize,
 
-    row_groups: Option<Vec<usize>>,
+    reader: SerializedFileReader<T>,
+
+    file_schema: SchemaRef,
 
     projection: Option<Vec<usize>>,
 }
 
-impl<T: AsyncRead + AsyncSeek + Unpin + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
+impl<T: ChunkReader + 'static> ParquetRecordBatchStreamBuilder<T> {
     /// Create a new [`ParquetRecordBatchStreamBuilder`] with the provided parquet file
-    pub async fn new(mut input: T) -> Result<Self> {
-        let metadata = Arc::new(read_footer(&mut input).await?);
+    pub async fn new(input: T) -> Result<Self> {
+        Self::new_with_options(input, ReadOptionsBuilder::new().build()).await
+    }
+
+    pub async fn new_with_options(input: T, options: ReadOptions) -> Result<Self> {
+        let maybe_reader = tokio::task::spawn_blocking(move || {
+            SerializedFileReader::new_with_options(input, options)
+        })
+        .await;
+
+        let reader = match maybe_reader {
+            Ok(maybe_reader) => maybe_reader?,
+            Err(e) => {
+                return Err(general_err!("error joining file creation task: {}", e))
+            }
+        };
 
-        let schema = Arc::new(parquet_to_arrow_schema(
-            metadata.file_metadata().schema_descr(),
-            metadata.file_metadata().key_value_metadata(),
-        )?);
+        let file_schema = parquet_to_arrow_schema(
+            reader.metadata().file_metadata().schema_descr(),
+            reader.metadata().file_metadata().key_value_metadata(),
+        )?;
 
         Ok(Self {
-            input,
-            metadata,
-            schema,
+            reader,
             batch_size: 1024,
-            row_groups: None,
+            file_schema: Arc::new(file_schema),
            projection: None,
        })
    }
 
     /// Returns a reference to the [`ParquetMetaData`] for this parquet file
-    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
-        &self.metadata
+    pub fn metadata(&self) -> &ParquetMetaData {
+        self.reader.metadata()
     }
 
     /// Returns the arrow [`SchemaRef`] for this parquet file
     pub fn schema(&self) -> &SchemaRef {
-        &self.schema
+        &self.file_schema
     }
 
     /// Set the size of [`RecordBatch`] to produce
@@ -157,14 +158,6 @@ impl ParquetRecordBatchStreamBuilder {
         Self { batch_size, ..self }
     }
 
-    /// Only read data from the provided row group indexes
-    pub fn with_row_groups(self, row_groups: Vec<usize>) -> Self {
-        Self {
-            row_groups: Some(row_groups),
-            ..self
-        }
-    }
-
     /// Only read data from the provided column indexes
     pub fn with_projection(self, projection: Vec<usize>) -> Self {
         Self {
@@ -174,310 +167,83 @@ impl ParquetRecordBatchStreamBuilder {
     }
 
     /// Build a new [`ParquetRecordBatchStream`]
-    pub fn build(self) -> Result<ParquetRecordBatchStream<T>> {
-        let num_columns = self.schema.fields().len();
-        let num_row_groups = self.metadata.row_groups().len();
-
-        let columns = match self.projection {
+    pub fn build(self) -> Result<ParquetRecordBatchStream> {
+        let (columns, schema) = match self.projection {
             Some(projection) => {
-                if let Some(col) = projection.iter().find(|x| **x >= num_columns) {
-                    return Err(general_err!(
-                        "column projection {} outside bounds of schema 0..{}",
-                        col,
-                        num_columns
-                    ));
-                }
-                projection
+                let schema = self.file_schema.project(&projection)?;
+                (projection, Arc::new(schema))
             }
-            None => (0..num_columns).collect::<Vec<_>>(),
+            None => (
+                (0..self.file_schema.fields().len()).collect::<Vec<_>>(),
+                self.file_schema.clone(),
+            ),
         };
 
-        let row_groups = match self.row_groups {
-            Some(row_groups) => {
-                if let Some(col) = row_groups.iter().find(|x| **x >= num_row_groups) {
-                    return Err(general_err!(
-                        "row group {} out of bounds 0..{}",
-                        col,
-                        num_row_groups
-                    ));
+        let (sender, receiver) = mpsc::channel(1);
+        let handle = tokio::task::spawn_blocking(move || {
+            let file_reader = Arc::new(self.reader);
+            let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
+            let reader =
+                arrow_reader.get_record_reader_by_columns(columns, self.batch_size)?;
+
+            for result in reader {
+                if let Err(_) = sender.blocking_send(result.map_err(Into::into)) {
+                    // Receiver hung up
+                    break;
                 }
-                row_groups.into()
             }
-            None => (0..self.metadata.row_groups().len()).collect(),
-        };
+
+            Ok(())
+        });
 
         Ok(ParquetRecordBatchStream {
-            row_groups,
-            columns: columns.into(),
-            batch_size: self.batch_size,
-            metadata: self.metadata,
-            schema: self.schema,
-            input: Some(self.input),
-            state: StreamState::Init,
+            receiver,
+            schema,
+            handle,
         })
     }
 }
 
-enum StreamState<T> {
-    /// At the start of a new row group, or the end of the parquet stream
-    Init,
-    /// Decoding a batch
-    Decoding(ParquetRecordBatchReader),
-    /// Reading data from input
-    Reading(BoxFuture<'static, Result<(T, InMemoryRowGroup)>>),
-    /// Error
-    Error,
-}
-
-impl<T> std::fmt::Debug for StreamState<T> {
-    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-        match self {
-            StreamState::Init => write!(f, "StreamState::Init"),
-            StreamState::Decoding(_) => write!(f, "StreamState::Decoding"),
-            StreamState::Reading(_) => write!(f, "StreamState::Reading"),
-            StreamState::Error => write!(f, "StreamState::Error"),
-        }
-    }
-}
-
 /// An asynchronous [`Stream`] of [`RecordBatch`] for a parquet file
-pub struct ParquetRecordBatchStream<T> {
-    metadata: Arc<ParquetMetaData>,
-
+pub struct ParquetRecordBatchStream {
     schema: SchemaRef,
 
-    batch_size: usize,
-
-    columns: Arc<[usize]>,
-
-    row_groups: VecDeque<usize>,
-
-    /// This is an option so it can be moved into a future
-    input: Option<T>,
+    receiver: mpsc::Receiver<Result<RecordBatch>>,
 
-    state: StreamState<T>,
+    handle: JoinHandle<Result<()>>,
 }
 
-impl<T> std::fmt::Debug for ParquetRecordBatchStream<T> {
+impl std::fmt::Debug for ParquetRecordBatchStream {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
         f.debug_struct("ParquetRecordBatchStream")
-            .field("metadata", &self.metadata)
             .field("schema", &self.schema)
-            .field("batch_size", &self.batch_size)
-            .field("columns", &self.columns)
-            .field("state", &self.state)
             .finish()
     }
 }
 
-impl<T> ParquetRecordBatchStream<T> {
+impl ParquetRecordBatchStream {
     /// Returns the [`SchemaRef`] for this parquet file
     pub fn schema(&self) -> &SchemaRef {
         &self.schema
     }
 }
 
-impl<T: AsyncRead + AsyncSeek + Unpin + Send + 'static> Stream
-    for ParquetRecordBatchStream<T>
-{
+impl Stream for ParquetRecordBatchStream {
     type Item = Result<RecordBatch>;
 
     fn poll_next(
         mut self: Pin<&mut Self>,
         cx: &mut Context<'_>,
     ) -> Poll<Option<Self::Item>> {
-        loop {
-            match &mut self.state {
-                StreamState::Decoding(batch_reader) => match batch_reader.next() {
-                    Some(Ok(batch)) => return Poll::Ready(Some(Ok(batch))),
-                    Some(Err(e)) => {
-                        self.state = StreamState::Error;
-                        return Poll::Ready(Some(Err(ParquetError::ArrowError(
-                            e.to_string(),
-                        ))));
-                    }
-                    None => self.state = StreamState::Init,
-                },
-                StreamState::Init => {
-                    let row_group_idx = match self.row_groups.pop_front() {
-                        Some(idx) => idx,
-                        None => return Poll::Ready(None),
-                    };
-
-                    let metadata = self.metadata.clone();
-                    let mut input = match self.input.take() {
-                        Some(input) => input,
-                        None => {
-                            self.state = StreamState::Error;
-                            return Poll::Ready(Some(Err(general_err!(
-                                "input stream lost"
-                            ))));
-                        }
-                    };
-
-                    let columns = Arc::clone(&self.columns);
-
-                    self.state = StreamState::Reading(
-                        async move {
-                            let row_group_metadata = metadata.row_group(row_group_idx);
-                            let mut column_chunks =
-                                vec![None; row_group_metadata.columns().len()];
-
-                            for column_idx in columns.iter() {
-                                let column = row_group_metadata.column(*column_idx);
-                                let (start, length) = column.byte_range();
-                                let end = start + length;
-
-                                input.seek(SeekFrom::Start(start)).await?;
-
-                                let mut buffer = vec![0_u8; (end - start) as usize];
-                                input.read_exact(buffer.as_mut_slice()).await?;
-
-                                column_chunks[*column_idx] = Some(InMemoryColumnChunk {
-                                    num_values: column.num_values(),
-                                    compression: column.compression(),
-                                    physical_type: column.column_type(),
-                                    data: ByteBufferPtr::new(buffer),
-                                });
-                            }
-
-                            Ok((
-                                input,
-                                InMemoryRowGroup {
-                                    schema: metadata.file_metadata().schema_descr_ptr(),
-                                    column_chunks,
-                                },
-                            ))
-                        }
-                        .boxed(),
-                    )
-                }
-                StreamState::Reading(f) => {
-                    let result = futures::ready!(f.poll_unpin(cx));
-                    self.state = StreamState::Init;
-
-                    let row_group: Box<dyn RowGroupCollection> = match result {
-                        Ok((input, row_group)) => {
-                            self.input = Some(input);
-                            Box::new(row_group)
-                        }
-                        Err(e) => {
-                            self.state = StreamState::Error;
-                            return Poll::Ready(Some(Err(e)));
-                        }
-                    };
-
-                    let parquet_schema = self.metadata.file_metadata().schema_descr_ptr();
-
-                    let array_reader = build_array_reader(
-                        parquet_schema,
-                        self.schema.clone(),
-                        self.columns.iter().cloned(),
-                        row_group,
-                    )?;
-
-                    let batch_reader =
-                        ParquetRecordBatchReader::try_new(self.batch_size, array_reader)
-                            .expect("reader");
-
-                    self.state = StreamState::Decoding(batch_reader)
-                }
-                StreamState::Error => return Poll::Pending,
-            }
+        match ready!(self.receiver.poll_recv(cx)) {
+            Some(r) => Poll::Ready(Some(r)),
+            None => match ready!(self.handle.poll_unpin(cx)) {
+                Ok(r) => Poll::Ready(r.err().map(Err)),
+                Err(e) => Poll::Ready(Some(Err(general_err!(
+                    "error joining ParquetRecordBatchStream background task: {}",
+                    e
+                )))),
+            },
         }
     }
 }
-
-async fn read_footer<T: AsyncRead + AsyncSeek + Unpin>(
-    input: &mut T,
-) -> Result<ParquetMetaData> {
-    input.seek(SeekFrom::End(-8)).await?;
-
-    let mut buf = [0_u8; 8];
-    input.read_exact(&mut buf).await?;
-
-    if buf[4..] != PARQUET_MAGIC {
-        return Err(general_err!("Invalid Parquet file. Corrupt footer"));
-    }
-
-    let metadata_len = LittleEndian::read_i32(&buf[..4]) as i64;
-    if metadata_len < 0 {
-        return Err(general_err!(
-            "Invalid Parquet file. Metadata length is less than zero ({})",
-            metadata_len
-        ));
-    }
-
-    input.seek(SeekFrom::End(-8 - metadata_len)).await?;
-
-    let mut buf = Vec::with_capacity(metadata_len as usize + 8);
-    input.read_to_end(&mut buf).await?;
-
-    parse_metadata_buffer(&mut Cursor::new(buf))
-}
-
-struct InMemoryRowGroup {
-    schema: SchemaDescPtr,
-    column_chunks: Vec<Option<InMemoryColumnChunk>>,
-}
-
-impl RowGroupCollection for InMemoryRowGroup {
-    fn schema(&self) -> Result<SchemaDescPtr> {
-        Ok(self.schema.clone())
-    }
-
-    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
-        let page_reader = self.column_chunks[i].as_ref().unwrap().pages();
-
-        Ok(Box::new(ColumnChunkIterator {
-            schema: self.schema.clone(),
-            column_schema: self.schema.columns()[i].clone(),
-            reader: Some(page_reader),
-        }))
-    }
-}
-
-#[derive(Clone)]
-struct InMemoryColumnChunk {
-    num_values: i64,
-    compression: Compression,
-    physical_type: crate::basic::Type,
-    data: ByteBufferPtr,
-}
-
-impl InMemoryColumnChunk {
-    fn pages(&self) -> Result<Box<dyn PageReader>> {
-        let page_reader = SerializedPageReader::new(
-            Cursor::new(self.data.clone()),
-            self.num_values,
-            self.compression,
-            self.physical_type,
-        )?;
-
-        Ok(Box::new(page_reader))
-    }
-}
-
-struct ColumnChunkIterator {
-    schema: SchemaDescPtr,
-    column_schema: ColumnDescPtr,
-    reader: Option<Result<Box<dyn PageReader>>>,
-}
-
-impl Iterator for ColumnChunkIterator {
-    type Item = Result<Box<dyn PageReader>>;
-
-    fn next(&mut self) -> Option<Self::Item> {
-        self.reader.take()
-    }
-}
-
-impl PageIterator for ColumnChunkIterator {
-    fn schema(&mut self) -> Result<SchemaDescPtr> {
-        Ok(self.schema.clone())
-    }
-
-    fn column_schema(&mut self) -> Result<ColumnDescPtr> {
-        Ok(self.column_schema.clone())
-    }
-}
diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs
index 4c10d26fabfd..1a4059e06b6f 100644
--- a/parquet/src/file/serialized_reader.rs
+++ b/parquet/src/file/serialized_reader.rs
@@ -131,7 +131,7 @@ pub struct SerializedFileReader {
 /// For the predicates that are added to the builder,
 /// they will be chained using 'AND' to filter the row groups.
 pub struct ReadOptionsBuilder {
-    predicates: Vec<Box<dyn FnMut(&RowGroupMetaData, usize) -> bool>>,
+    predicates: Vec<Box<dyn FnMut(&RowGroupMetaData, usize) -> bool + Send>>,
 }
 
 impl ReadOptionsBuilder {
@@ -144,7 +144,7 @@ impl ReadOptionsBuilder {
     /// Filter only row groups that match the predicate criteria
     pub fn with_predicate(
         mut self,
-        predicate: Box<dyn FnMut(&RowGroupMetaData, usize) -> bool>,
+        predicate: Box<dyn FnMut(&RowGroupMetaData, usize) -> bool + Send>,
     ) -> Self {
         self.predicates.push(predicate);
         self
@@ -175,7 +175,7 @@ impl ReadOptionsBuilder {
 /// Currently, only predicates on row group metadata are supported.
 /// All predicates will be chained using 'AND' to filter the row groups.
 pub struct ReadOptions {
-    predicates: Vec<Box<dyn FnMut(&RowGroupMetaData, usize) -> bool>>,
+    predicates: Vec<Box<dyn FnMut(&RowGroupMetaData, usize) -> bool + Send>>,
 }
 
 impl<R: 'static + ChunkReader> SerializedFileReader<R> {
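For context, a minimal sketch of how the reworked stream would be consumed after this patch, mirroring the updated module docs above. The file path and projected column indices are placeholders, and the error handling is illustrative only, not part of the change.

```rust
use std::fs::File;

use futures::TryStreamExt;
use parquet::arrow::ParquetRecordBatchStreamBuilder;

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // With this patch the input is a blocking ChunkReader rather than an
    // AsyncRead + AsyncSeek, so a plain std::fs::File works as input.
    let file = File::open("data.parquet")?; // placeholder path

    let stream = ParquetRecordBatchStreamBuilder::new(file)
        .await? // metadata parsing runs on a spawn_blocking task
        .with_projection(vec![0, 1]) // placeholder column indices
        .build()?;

    println!("schema: {:?}", stream.schema());

    // Batches are decoded on a background blocking task and forwarded over a
    // bounded tokio mpsc channel; the Stream impl simply polls the receiver.
    let batches: Vec<_> = stream.try_collect().await?;
    println!("read {} record batches", batches.len());
    Ok(())
}
```

Note that row-group selection (`with_row_groups`) is removed by this patch; filtering is instead expressed through `ReadOptions` predicates, which now must be `Send` so the reader can be driven from a blocking task.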