
Avoid repeated open for one single file and simplify object reader API on the sync part #1905

Closed
wants to merge 6 commits into from

Conversation

yjshen
Member

@yjshen yjshen commented Mar 2, 2022

Which issue does this PR close?

Closes #.

Rationale for this change

  • The extra chunk semantics introduced in ObjectReader expose irrelevant file format details to object stores and incur needless complexity.
  • Repeatedly opening a single file brings unnecessary overhead.

What changes are included in this PR?

  • ObjectReader API simplification.
  • Avoid repeatedly opening a single file in the LocalObjectStore.

Are there any user-facing changes?

Yes. The ObjectReader API.

@github-actions github-actions bot added the ballista and datafusion labels Mar 2, 2022
@yjshen yjshen changed the title Avoid expensive open for one single file and simplify object reader API on the sync part Avoid repeated open for one single file and simplify object reader API on the sync part Mar 2, 2022
) -> Result<Box<dyn Read + Send + Sync>> {
// A new file descriptor is opened for each chunk reader.
// This is okay because chunks are usually fairly large.
let mut file = File::open(&self.file.path)?;
Member Author

This is the original purpose of this PR. @liukun4515 initially found that the HDFSObjectStore performs many unnecessary opens. The same happens here for the local store, only the opens are more expensive.

Contributor

I probably misunderstand something here and I am sorry I don't quite follow all the comments on this PR.

If the issue you are trying to solve is that File::open is called too often, would it be possible to "memoize" the open here with a mutex inside of the FileReader?

Something like

struct LocalFileReader {
    // ...
    /// Keep the open file descriptor to avoid reopening it
    cache: Mutex<Option<File>>,
}

impl LocalFileReader {
    // ...
    fn sync_chunk_reader(
        &self,
        start: u64,
        length: usize,
    ) -> Result<Box<dyn Read + Send + Sync>> {
        let mut cache = self.cache.lock().unwrap();
        if cache.is_none() {
            // Open the file once and keep the descriptor around
            *cache = Some(File::open(&self.file.path)?);
        }
        // Hand out a clone of the cached descriptor instead of reopening
        // (note: the clones share the underlying file offset)
        Ok(Box::new(cache.as_ref().unwrap().try_clone()?))
    }
}

Member Author

Yes, the original problem is about too many opens, and we do solve it in our HDFS object store implementation in a way similar to your suggestion above.

However, as I think more about the ObjectReader API and its use, I believe I've brought an extra layer of abstraction, the "chunk" reader layer, into ObjectReader without benefit. I'd prefer the chunk reader to be Parquet-specific only; object readers should only care about seeks and reads.

Therefore the current PR stops creating new readers from ObjectReader and instead reads directly in the ObjectReader itself. And if we want the ability to fetch multiple parts of a parquet file simultaneously, we can utilize the PartitionedFile struct.

/// A single file that should be read, along with its schema, statistics
/// and partition column values that need to be appended to each row.
pub struct PartitionedFile {
    /// Path for the file (e.g. URL, filesystem path, etc)
    pub file_meta: FileMeta,
    /// Values of partition columns to be appended to each row
    pub partition_values: Vec<ScalarValue>,
    // We may include row group range here for a more fine-grained parallel execution
}

We could have a max_bytes_per_partition configuration during query planning: combine multiple parquet files into one partition (like we do now), or split a large parquet file into many ranges and have each partition handle only several of them, with the help of apache/arrow-rs#158.

The last several comments are about how to avoid the Mutex in ObjectReaderWrapper. Since we read a parquet file sequentially within a partition, the Mutex may incur unnecessary overhead. We just have to write it this way to achieve interior mutability, because the ChunkReader API in parquet-rs requires it:

pub trait ChunkReader: Length + Send + Sync {
    type T: Read + Send;
    /// Get a serially readable slice of the current reader
    /// This should fail if the slice exceeds the current bounds
    fn get_read(&self, start: u64, length: usize) -> Result<Self::T>; // &self together with Send imposes the need for interior mutability
}
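
To make that constraint concrete, here is a minimal standalone sketch (not the PR's actual ObjectReaderWrapper; the type and field names are invented, and it assumes the parquet version with the (start, length) signature quoted above) showing why a &self-only get_read pushes implementations toward a Mutex:

use std::io::{Read, Result as IoResult};
use std::sync::{Arc, Mutex};

use parquet::errors::Result as ParquetResult;
use parquet::file::reader::{ChunkReader, Length};

/// Hypothetical wrapper around any sequential reader. Reading needs `&mut self`,
/// but `get_read` only hands us `&self`, so the mutable state hides in a Mutex.
struct SequentialReaderWrapper<R: Read + Send> {
    inner: Arc<Mutex<R>>,
    len: u64,
}

impl<R: Read + Send> Read for SequentialReaderWrapper<R> {
    fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> {
        // Lock on every read: harmless for the sequential access DataFusion does
        // within a partition, but pure overhead compared to owning the reader.
        self.inner.lock().unwrap().read(buf)
    }
}

impl<R: Read + Send> Length for SequentialReaderWrapper<R> {
    fn len(&self) -> u64 {
        self.len
    }
}

impl<R: Read + Send> ChunkReader for SequentialReaderWrapper<R> {
    type T = Self;

    // Seeking to `start` is omitted in this sketch; the point is the `&self`
    // receiver, which together with the Send + Sync supertraits forces
    // interior mutability onto the implementation.
    fn get_read(&self, _start: u64, _length: usize) -> ParquetResult<Self::T> {
        Ok(SequentialReaderWrapper {
            inner: Arc::clone(&self.inner),
            len: self.len,
        })
    }
}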

/// Get reader for the entire file
fn sync_reader(&self) -> Result<Box<dyn Read + Send + Sync>> {
    self.sync_chunk_reader(0, self.length() as usize)
}
@yjshen yjshen (Member Author) Mar 2, 2022

After discussions with @houqp and @richox, we agreed that the extra chunk semantics introduced in ObjectReader expose irrelevant file format details to object stores and incur needless complexity. Hence the API simplification.

@rdettai rdettai (Contributor) Mar 3, 2022

Hi @yjshen! Do you have a pointer to that discussion so I can get the full context? Also, why do you say that it introduces "irrelevant file format details"? A chunk is a chunk; it can apply to any file format 😄

Member Author

Hi @rdettai, long time no see! I think "chunk" is only a term from the parquet reader implementation, and in DataFusion we always consume chunks from the same file sequentially.

@yjshen
Member Author

yjshen commented Mar 2, 2022

Cc @alamb @matthewmturner @seddonm1 @tustvold @rdettai to gather more input as this PR may affect your work.

@rdettai
Contributor

rdettai commented Mar 3, 2022

The API change you suggest here is not just a simplification; it breaks the way we can currently parallelize the ObjectStore. You are replacing fn sync_reader(&self) -> Result<Box<dyn Read + Send + Sync>>, which takes an immutable reference and can thus be called in parallel from different threads, with fn set_limit(&mut self, limit: usize), which requires an exclusive reference and a Mutex lock that blocks the entire ObjectReader on each operation.

@yjshen
Member Author

yjshen commented Mar 3, 2022

Yes, I'm aware of the parallelization ability the current API exposes. However, it's hard to express or utilize in the current execution plan: how should I trigger parallel chunk fetches while maintaining a serialized read within a single partition? Instead, we have the PartitionedFile abstraction, which can be extended with file-slicing ability.

/// A single file that should be read, along with its schema, statistics
/// and partition column values that need to be appended to each row.
pub struct PartitionedFile {
    /// Path for the file (e.g. URL, filesystem path, etc)
    pub file_meta: FileMeta,
    /// Values of partition columns to be appended to each row
    pub partition_values: Vec<ScalarValue>,
    // We may include row group range here for a more fine-grained parallel execution
}

For example, by enabling parquet scans at row-group granularity (apache/arrow-rs#158), we could fill in the commented-out row group range of the PartitionedFile above with real ranges when we want finer-grained fetch and execution. And to control the parallelism of FileScan execution, we could tune a max_bytes_per_partition configuration and partition all input files into Vec<Vec<PartitionedFile>>.

Each Vec<PartitionedFile> would sum up to roughly the max_bytes_per_partition size, whether it comes from many individual parquet files or from one big slice of one big parquet file.

By controlling max_bytes_per_partition, we could still achieve the parallel fetch of file chunks you mentioned if users choose a smaller input data size per partition, or avoid unexpectedly reopening files once per row group and per column.
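
As a rough illustration of that planning step, here is a hedged sketch (not DataFusion's planner code; the helper name and the assumption that each file or slice carries a known byte size, e.g. from its FileMeta, are mine) of packing inputs into partitions under a max_bytes_per_partition budget:

/// Hypothetical helper: greedily pack files (or file slices) into partitions
/// of at most `max_bytes_per_partition` bytes, generic over the file type.
fn repartition_by_size<T>(
    files: Vec<(T, u64)>, // (file or slice, size in bytes)
    max_bytes_per_partition: u64,
) -> Vec<Vec<T>> {
    let mut partitions: Vec<Vec<T>> = Vec::new();
    let mut current: Vec<T> = Vec::new();
    let mut current_bytes = 0u64;

    for (file, size) in files {
        // Start a new partition once the current one would overflow the budget.
        if !current.is_empty() && current_bytes + size > max_bytes_per_partition {
            partitions.push(std::mem::take(&mut current));
            current_bytes = 0;
        }
        current.push(file);
        current_bytes += size;
    }
    if !current.is_empty() {
        partitions.push(current);
    }
    partitions
}

A workload of many small files ends up with several files per partition, while one huge file pre-sliced into row-group ranges gets spread across partitions, which is where the parallel fetch comes back.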

@rdettai
Contributor

rdettai commented Mar 3, 2022

Just to be sure I understand you correctly: if we used PartitionedFile for the file slicing, you would obtain parallelism by creating multiple ObjectReader instances?

In the case where we don't want one ObjectReader instance to be distributed across threads, file_reader() should stop returning Arc<dyn ObjectReader> and instead return an owned object (Box<dyn ObjectReader>). This would allow you to cut out the mutex.
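
For illustration, a simplified sketch of the ownership difference (these are stand-in traits, not DataFusion's actual ObjectStore/ObjectReader definitions):

use std::io::Result as IoResult;
use std::sync::Arc;

// Stand-in reader trait: mutating operations (limits, cached handles, seek
// position) naturally want `&mut self`.
trait Reader {
    fn set_limit(&mut self, limit: usize);
    fn read_chunk(&mut self, buf: &mut [u8]) -> IoResult<usize>;
}

trait Store {
    // Shared handle: callers only ever see `&dyn Reader`, so any mutation has
    // to go through interior mutability (e.g. a Mutex) inside the reader.
    fn file_reader_shared(&self, path: &str) -> IoResult<Arc<dyn Reader>>;

    // Owned handle: the caller gets exclusive access and can call `&mut self`
    // methods directly, with no lock at all.
    fn file_reader_owned(&self, path: &str) -> IoResult<Box<dyn Reader>>;
}

With the owned Box the lock on the read path disappears, at the cost of no longer sharing a single reader across threads.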

@yjshen
Member Author

yjshen commented Mar 3, 2022

I think I may also need to tweak ChunkReader in parquet a little bit, since it also imposes immutability:

impl ChunkReader for ObjectReaderWrapper {
    type T = Self;

    fn get_read(&self, start: u64, length: usize) -> ParquetResult<Self::T>

In order to change ChunkReader to take &mut self and cut out the Mutex, it seems I would need to change the full code path of parquet reading, since it all shares an immutable interface.

@houqp houqp added the api change label Mar 5, 2022
@houqp
Member

houqp commented Mar 5, 2022

I also think we should try to avoid forcing a mutex at the API level, to avoid its overheads.

In order to change ChunkReader to take &mut self and cut out the Mutex, it seems I would need to change the full code path of parquet reading, since it all shares an immutable interface.

What if we only use Arc and call get_mut? This is similar to what's happening with FileSource in the parquet crate. In the long run, though, I feel it might be worth looking into getting rid of the ChunkReader abstraction in the parquet crate entirely and replacing it with Read + Seek.
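
As a sketch of that direction (plain std, no parquet types), a Read + Seek bound is already enough to serve an arbitrary byte range:

use std::io::{Read, Result as IoResult, Seek, SeekFrom};

/// Fetch `length` bytes starting at `start` from any `Read + Seek` source,
/// with no dedicated ChunkReader abstraction needed.
fn read_range<R: Read + Seek>(reader: &mut R, start: u64, length: usize) -> IoResult<Vec<u8>> {
    reader.seek(SeekFrom::Start(start))?;
    let mut buf = vec![0u8; length];
    reader.read_exact(&mut buf)?;
    Ok(buf)
}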

@tustvold
Contributor

tustvold commented Mar 5, 2022

I'm afraid I've not had time to follow along here, but I have a draft PR that moves to an async parquet reader and doesn't use ChunkReader. So if that part is causing problems, I can maybe look to get that PR in sometime next week?

@yjshen
Member Author

yjshen commented Mar 5, 2022

I cannot use Arc<RefCell<dyn ObjectReader>> since RefCell is not Sync, which is imposed by ChunkReader:

pub trait ChunkReader: Length + Send + Sync 
343 | impl ChunkReader for ObjectReaderWrapper {
    |      ^^^^^^^^^^^ `RefCell<(dyn ObjectReader + 'static)>` cannot be shared between threads safely
    |
    = help: the trait `Sync` is not implemented for `RefCell<(dyn ObjectReader + 'static)>`
    = note: required because of the requirements on the impl of `Sync` for `Arc<RefCell<(dyn ObjectReader + 'static)>>`

I'm not sure I could remove Sync from ChunkReader, but I can try it first.

@houqp
Member

houqp commented Mar 5, 2022

If @tustvold can help get rid of the ChunkReader in the parquet crate in the short run, then I think that would be the best route forward.

I cannot use Arc<RefCell> since RefCell is not Sync, which is imposed by ChunkReader:

Do we really need RefCell here? Could we just use Arc::get_mut instead? But if you can get rid of the Sync requirement, that's even better :P

@yjshen
Member Author

yjshen commented Mar 5, 2022

I'm afraid not. 😬
Arc::get_mut needs a mutable reference to the Arc itself:

    pub fn get_mut(this: &mut Self) -> Option<&mut T> {

but that's not the case in ChunkReader:

pub trait ChunkReader: Length + Send + Sync {
    type T: Read + Send;
    /// Get a serially readable slice of the current reader
    /// This should fail if the slice exceeds the current bounds
    fn get_read(&self, start: u64, length: usize) -> Result<Self::T>;
}

get_read takes &self, which is immutable.

Even if I remove Sync from ChunkReader, I still need Arc<Mutex<dyn ObjectReader>>, which is required for Send.
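
For reference, a standalone sketch (plain std, nothing DataFusion-specific) of the two restrictions that make Arc::get_mut a poor fit here:

use std::sync::Arc;

fn main() {
    let mut reader = Arc::new(String::from("stand-in for an object reader"));

    // `Arc::get_mut` takes `&mut Arc<T>`, so it cannot be called through the
    // `&self` that `get_read` receives.
    if let Some(inner) = Arc::get_mut(&mut reader) {
        inner.push_str(" (mutated)");
    }

    // And even with `&mut`, it returns None as soon as another clone of the
    // Arc exists, which is the normal situation for a shared reader.
    let shared = Arc::clone(&reader);
    assert!(Arc::get_mut(&mut reader).is_none());
    drop(shared);
}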

@alamb
Contributor

alamb commented Mar 7, 2022

FWIW I don't have a strong opinion on this one way or the other (as we have our own ObjectStore reader implementation in IOx for the time being). Maybe @tustvold has some thoughts

@tustvold
Contributor

tustvold commented Mar 8, 2022

I intend to pick up #1617 tomorrow, so I might have more to say then, but here are some initial thoughts:

In short, I think this is working around a slightly funky API exposed by the parquet crate, and to be honest I'd rather fix this API and/or move to the async reader API, which doesn't share the same funkiness.

@yjshen
Member Author

yjshen commented Mar 9, 2022

Thanks @tustvold for the detailed analysis. ❤️

We already have a workaround for the repeated-open issue in the HDFS object store, and I'm changing the object reader API here so that future object reader implementations don't fall into the pitfall of repeated opens unintentionally.

I really like the idea of getting rid of the ChunkReader APIs and using an async parquet exec. I expect we could also achieve file-handle reuse for the async reading path on top of tokio async IO. And I think we could remove this API:

    /// Get reader for a part [start, start + length] in the file
    fn sync_chunk_reader(
        &self,
        start: u64,
        length: usize,
    ) -> Result<Box<dyn Read + Send + Sync>>;

entirely, since it's misuse-prone and only used by the Parquet exec.

@yjshen yjshen closed this Mar 9, 2022
@yjshen yjshen deleted the object_store branch March 9, 2022 08:49