Avoid repeated open for one single file and simplify object reader API on the sync part #1905
Conversation
) -> Result<Box<dyn Read + Send + Sync>> {
    // A new file descriptor is opened for each chunk reader.
    // This is okay because chunks are usually fairly large.
    let mut file = File::open(&self.file.path)?;
This is the original purpose of this PR. @liukun4515 initially found that the HDFSObjectStore performed many unnecessary opens. The same happens for the local store, where it is even more expensive.
I probably misunderstand something here, and I am sorry I don't quite follow all the comments on this PR.
If the issue you are trying to solve is that File::open is called too often, would it be possible to "memoize" the open here with a mutex inside the FileReader? Something like:
struct LocalFileReader {
    ...
    /// Keep the open file descriptor to avoid reopening it
    cache: Mutex<Option<Box<dyn Read + Send + Sync + Clone>>>,
}

impl LocalFileReader {
    ...
    fn sync_chunk_reader(
        &self,
        start: u64,
        length: usize,
    ) -> Result<Box<dyn Read + Send + Sync>> {
        let mut cache = self.cache.lock();
        if let Some(cached) = cache.as_ref() {
            return Ok(cached.clone());
        }
        *cache = Some(Box::new(File::open(...)?));
        Ok(cache.as_ref().unwrap().clone())
    }
}
Yes, the original problem is about too many opens, and we do solve it in our HDFS object store implementation in a way similar to your suggestion above.
However, as I think more about the ObjectReader API and its use, I believe I've brought one extra layer of abstraction, the "chunk" reader layer, into ObjectReader without benefits. I'd prefer the chunk reader to be Parquet-only, with object readers caring only about seeks and reads.
Therefore the current PR stops creating new readers from ObjectReader and instead reads directly in the ObjectReader itself. And if we want the ability to fetch multiple parts of a parquet file simultaneously, we can utilize the struct PartitionedFile:
/// A single file that should be read, along with its schema, statistics
/// and partition column values that need to be appended to each row.
pub struct PartitionedFile {
/// Path for the file (e.g. URL, filesystem path, etc)
pub file_meta: FileMeta,
/// Values of partition columns to be appended to each row
pub partition_values: Vec<ScalarValue>,
// We may include row group range here for a more fine-grained parallel execution
}
We could have a max_bytes_per_partition configuration during query planning, combine multiple parquet files into one partition (like we do now), or split a large parquet file into many ranges and have each partition handle only several ranges, with the help of apache/arrow-rs#158.
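To make that planning idea concrete, here is a minimal sketch under my own assumptions (not code from this PR; plan_partitions and its (file index, start, length) tuples are hypothetical) of how a planner could pack files and file ranges into partitions under a max_bytes_per_partition budget:

// Hypothetical planning-time helper: pack a list of file sizes into scan
// partitions of at most `max_bytes_per_partition` bytes each. Small files
// get combined into one partition; a large file gets split into several
// (file index, start, length) ranges. A real implementation would align the
// split points to parquet row-group boundaries instead of raw byte offsets.
fn plan_partitions(file_sizes: &[u64], max_bytes_per_partition: u64) -> Vec<Vec<(usize, u64, u64)>> {
    assert!(max_bytes_per_partition > 0);
    let mut partitions = Vec::new();
    let mut current: Vec<(usize, u64, u64)> = Vec::new();
    let mut current_bytes = 0u64;

    for (idx, &size) in file_sizes.iter().enumerate() {
        let mut offset = 0u64;
        while offset < size {
            // Take as much of this file as still fits in the current partition.
            let take = (max_bytes_per_partition - current_bytes).min(size - offset);
            current.push((idx, offset, take));
            offset += take;
            current_bytes += take;
            if current_bytes == max_bytes_per_partition {
                partitions.push(std::mem::take(&mut current));
                current_bytes = 0;
            }
        }
    }
    if !current.is_empty() {
        partitions.push(current);
    }
    partitions
}

For example, with file sizes [10, 300, 20] and a 128-byte budget, the 300-byte file ends up split across three partitions while the two small files get packed alongside its ranges.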
And the last several comments are about how to avoid the Mutex in ObjectReaderWrapper. Since we read a parquet file sequentially within a partition, the Mutex may incur unnecessary overhead. We only have to write it this way to achieve interior mutability because the ChunkReader API in parquet-rs requires it:
pub trait ChunkReader: Length + Send + Sync {
type T: Read + Send;
/// get a serialy readeable slice of the current reader
/// This should fail if the slice exceeds the current bounds
fn get_read(&self, start: u64, length: usize) -> Result<Self::T>; // &self as well as send imposes the need for interior mutability
}
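As a concrete illustration of that interior-mutability requirement, here is a self-contained sketch: the Length/ChunkReader traits below are simplified stand-ins for the parquet-rs ones quoted above (using std::io::Result instead of the crate's error type), and LocalChunkReader is a hypothetical local-file example, not this PR's ObjectReaderWrapper.

use std::fs::File;
use std::io::{Read, Seek, SeekFrom, Take};
use std::sync::Mutex;

// Simplified stand-ins for the parquet-rs traits quoted above.
trait Length {
    fn len(&self) -> u64;
}

trait ChunkReader: Length + Send + Sync {
    type T: Read + Send;
    fn get_read(&self, start: u64, length: usize) -> std::io::Result<Self::T>;
}

// `get_read` takes `&self` and the trait requires `Sync`, so the seekable
// file handle has to live behind some interior mutability, e.g. a Mutex.
struct LocalChunkReader {
    file: Mutex<File>,
    len: u64,
}

impl Length for LocalChunkReader {
    fn len(&self) -> u64 {
        self.len
    }
}

impl ChunkReader for LocalChunkReader {
    type T = Take<File>;

    fn get_read(&self, start: u64, length: usize) -> std::io::Result<Self::T> {
        let mut file = self.file.lock().unwrap();
        // Position the shared handle, then hand back a duplicated descriptor
        // via try_clone instead of re-opening the file for every chunk.
        file.seek(SeekFrom::Start(start))?;
        Ok(file.try_clone()?.take(length as u64))
    }
}

Note that try_clone shares the file cursor with the original handle, which is fine for the sequential access pattern described above but would not be safe for concurrent chunk reads.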
/// Get reader for the entire file
fn sync_reader(&self) -> Result<Box<dyn Read + Send + Sync>> {
    self.sync_chunk_reader(0, self.length() as usize)
}
Hi @yjshen! Do you have a pointer to that discussion so I can get the full context? Also, why do you say that it introduces "irrelevant file format details"? A chunk is a chunk; it can apply to any file format 😄
Hi @rdettai, long time no see! I think "chunk" is a term that comes only from the parquet reader implementation, and in DataFusion we always read chunks from the same file sequentially.
Cc @alamb @matthewmturner @seddonm1 @tustvold @rdettai to gather more input as this PR may affect your work.
The API change you suggest here is not just a simplification; it breaks the way we can currently parallelize the ObjectStore. You are replacing ...
Yes, I'm aware of the parallelizing ability the current API exposes. However, it's hard to express or utilize in the current execution plan: how should I trigger the parallel chunk fetch while maintaining a single-partition serialized read? Instead, we have:

/// A single file that should be read, along with its schema, statistics
/// and partition column values that need to be appended to each row.
pub struct PartitionedFile {
    /// Path for the file (e.g. URL, filesystem path, etc)
    pub file_meta: FileMeta,
    /// Values of partition columns to be appended to each row
    pub partition_values: Vec<ScalarValue>,
    // We may include row group range here for a more fine-grained parallel execution
}

For example, by enabling parquet scan with row-group ranges (apache/arrow-rs#158), we could follow the last comment in PartitionedFile above and fill in real ranges when we want finer-grained fetch and execution. And to control the parallelism of FileScan execution, we could tune a max_bytes_per_partition configuration: each partition would then handle only several ranges, and by controlling max_bytes_per_partition we control the parallelism.
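For illustration, here is a sketch of what that "row group range" comment could grow into; the FileRange type and ranges field are hypothetical, and the FileMeta/ScalarValue definitions below are empty stand-ins so the snippet is self-contained:

// Empty stand-ins for the real DataFusion types shown in the struct above.
pub struct FileMeta;
pub struct ScalarValue;

/// Hypothetical byte range inside a file, ideally aligned to row-group boundaries.
pub struct FileRange {
    pub start: u64,
    pub end: u64,
}

pub struct PartitionedFile {
    pub file_meta: FileMeta,
    pub partition_values: Vec<ScalarValue>,
    /// `None` means scan the whole file; otherwise scan only these ranges.
    pub ranges: Option<Vec<FileRange>>,
}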
Just to be sure I understand you correctly: if we used ... In the case we don't want one ...
I think I may also need to:

impl ChunkReader for ObjectReaderWrapper {
    type T = Self;
    fn get_read(&self, start: u64, length: usize) -> ParquetResult<Self::T>;
}

In order to change ChunkReader to ...
I also think we should try to avoid forcing a Mutex at the API level, to avoid its overheads.
What if we only use Arc and call ...
I'm afraid I've not had time to follow along here, but I have a draft PR that moves to an async parquet reader and doesn't use ChunkReader. So if that part is causing problems, I can maybe look to get that PR in sometime next week?
I cannot do that, because:

pub trait ChunkReader: Length + Send + Sync

I'm not sure I could remove Sync from ChunkReader, but I can try it first.
If @tustvold can help get rid of the ChunkReader in the parquet crate in the short run, then I think that would be the best route forward.
Do we really need RefCell here? Could we just use ...
I'm afraid not. 😬 get_mut takes a mutable reference:

pub fn get_mut(this: &mut Self) -> Option<&mut T>

but that's not the case in ChunkReader, where get_read only receives &self:

pub trait ChunkReader: Length + Send + Sync {
    type T: Read + Send;
    /// get a serialy readeable slice of the current reader
    /// This should fail if the slice exceeds the current bounds
    fn get_read(&self, start: u64, length: usize) -> Result<Self::T>;
}

Even if I remove Sync from ChunkReader, I still need ...
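A minimal sketch of that mismatch (Wrapper here is hypothetical, not code from this PR): get_mut needs a unique, mutable handle, while a ChunkReader-style method only ever receives &self, so interior mutability is the remaining option.

use std::fs::File;
use std::sync::Arc;

// Hypothetical wrapper, only to show the mismatch described above.
struct Wrapper {
    inner: Arc<File>,
}

impl Wrapper {
    // Mirrors ChunkReader::get_read: only `&self` is available here.
    fn get_read(&self) {
        // `Arc::get_mut(&mut self.inner)` would need `&mut self` (and a uniquely
        // owned Arc), so it cannot be used from this method; the line below
        // does not compile as written:
        // let file: &mut File = Arc::get_mut(&mut self.inner).unwrap();
    }
}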
FWIW I don't have a strong opinion on this one way or the other (as we have our own ObjectStore reader implementation in IOx for the time being). Maybe @tustvold has some thoughts.
I intend to pick up #1617 tomorrow and so I might have some more thoughts then, but some initial thoughts:
In short, I think this is working around a slightly funky API exposed by the parquet crate, and to be honest I'd rather fix this API and/or move to the async reader API, which doesn't share the same funkiness.
Thanks @tustvold for the detailed analysis. ❤️ We already have a workaround for the repeated open issue in the HDFS object store, and I'm changing the object reader API here to keep future object reader implementations from unintentionally falling into the repeated-open pitfall. I really like the idea of getting rid of

/// Get reader for a part [start, start + length] in the file
fn sync_chunk_reader(
    &self,
    start: u64,
    length: usize,
) -> Result<Box<dyn Read + Send + Sync>>;

entirely, since it's misuse-prone and only used by the Parquet exec.
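Purely as an illustration of that direction (a sketch under my own assumptions, not necessarily the API this PR or the async work lands on), the sync side could shrink to a single seek-and-read surface, with chunking kept inside the parquet integration:

use std::io::{Read, Seek};

// Hypothetical simplified trait: the object reader itself is the thing you
// seek and read; no per-chunk reader construction is exposed.
pub trait SyncObjectReader: Read + Seek + Send {
    /// Total length of the underlying object in bytes
    fn length(&self) -> u64;
}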
Which issue does this PR close?
Closes #.

Rationale for this change
The chunk semantic introduced in ObjectReader brings irrelevant file format details into object stores and incurs needless complexity.

What changes are included in this PR?
ObjectReader API simplification, and avoiding repeated open for one single file in the LocalObjectStore.

Are there any user-facing changes?
Yes. The ObjectReader API.