Arrow-csv reader cannot produce a RecordBatch even if the necessary bytes are available #3674
Comments
I'm confused by this: what happens if there are fewer than batch_size rows available, or more? This feels like it just slightly changes the buffering behaviour, which isn't really guaranteed. Not saying we can't change this, but I'd like to understand the issue better. It almost feels like the deadlock is the fault of an overly restrictive test?
I intentionally wrote the test like this to make the problem clearer. I think waiting for the next byte before producing a RecordBatch, even when we already have the necessary bytes, is avoidable. We use this test pattern for testing stream pipelines. We test "if I give a batch, can I get a batch as output?", but testing with actual files becomes "if I give batch_size + 1 rows, can I get a batch as output?".
Deadlock doesn't seem to be the right word. However, the current behavior can result in unnecessarily long latencies if data arrives in chunks aligned with batch boundaries: for example, with batch_size = 64, a producer that writes exactly 64 rows and then pauses will not see its batch until further bytes arrive. This is an edge case, but when it happens, it becomes a problem in streaming use cases. Thankfully, it is easily avoidable.
I wonder if the latency can be reduced by calling https://docs.rs/arrow-csv/32.0.0/arrow_csv/reader/struct.Decoder.html#method.flush. Perhaps something like:

let mut next = move || {
    loop {
        // Force a flush to produce a RecordBatch if we have fed the entire
        // input that is available and are sure the data has only complete rows.
        if check_have_read_to_boundary() {
            if let Some(batch) = decoder.flush()? {
                return Ok(Some(batch));
            }
        }
        let buf = reader.fill_buf()?;
        let decoded = decoder.decode(buf)?;
        if decoded == 0 {
            break;
        }
        reader.consume(decoded);
    }
    decoder.flush()
};
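For illustration, one simplistic shape the hypothetical check_have_read_to_boundary() helper could take (it is not part of arrow-csv, it is shown here taking the buffered bytes explicitly, and a real version would also have to account for newlines inside quoted fields):

// Assumed, simplistic boundary check: treat the buffered input as ending
// on a complete row when its last byte is a newline. A quoted field that
// contains a newline would defeat this check, so it is only a sketch.
fn check_have_read_to_boundary(buffered: &[u8]) -> bool {
    buffered.last() == Some(&b'\n')
}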
Would you be able to test out #3677 and see if it meets your requirements? If so, I can polish it up with some tests, etc.
Describe the bug
The bug in the arrow-csv reader of the arrow-rs library affects reading CSV files in a FIFO environment, where no EOF is received until the file is closed. It occurs because the code that reads the buffer waits for additional bytes even when it already holds enough data for a full batch.

For example, if the batch size is set to 64 and 64 rows are provided to the reader, the decoder has enough data to create a RecordBatch. However, when the loop iterates a second time, the code waits for additional bytes at self.reader.fill_buf()?, causing a deadlock. This prevents tests written for streaming purposes from working, even though this was supported before PR #3604.
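A simplified paraphrase of the read loop in question (not the exact arrow-rs source) shows where the reader blocks:

// Paraphrased sketch of the buffered read loop, not the actual arrow-csv
// source. Even when `decoder` already holds batch_size complete rows,
// nothing flushes them; the next iteration calls fill_buf(), which on an
// open FIFO blocks until more bytes arrive or the writer closes the file.
loop {
    let buf = self.reader.fill_buf()?; // blocks here on an open FIFO
    let decoded = self.decoder.decode(buf)?;
    if decoded == 0 {
        break; // reached only at EOF
    }
    self.reader.consume(decoded);
}
self.decoder.flush()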
To Reproduce
1. Add nix = "0.26.2" into dev-dependencies.
2. Put the reproducing test into arrow-csv/src/reader/records.rs, or any convenient place in arrow-csv; a sketch of the idea follows below.
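The reproducing test itself is not shown in this issue. As an illustration only, a minimal sketch of the kind of FIFO test described could look like the following; it assumes the era's ReaderBuilder::new().with_schema(...) builder API plus nix::unistd::mkfifo, and every name in it is illustrative rather than the reporter's actual code:

use std::fs::File;
use std::io::Write;
use std::sync::Arc;
use std::thread;
use std::time::Duration;

use arrow_csv::ReaderBuilder;
use arrow_schema::{DataType, Field, Schema};
use nix::sys::stat::Mode;
use nix::unistd::mkfifo;

#[test]
fn fifo_reader_emits_batch_without_eof() {
    // Create a FIFO; reads from it never see EOF until the writer closes it.
    let path = std::env::temp_dir().join("arrow_csv_fifo_repro.csv");
    let _ = std::fs::remove_file(&path);
    mkfifo(&path, Mode::S_IRWXU).unwrap();

    let batch_size = 64;

    // Writer side: send exactly batch_size complete rows, then keep the
    // write end open so the reader never observes EOF.
    let writer_path = path.clone();
    let writer = thread::spawn(move || {
        let mut fifo = File::create(writer_path).unwrap();
        for i in 0..batch_size {
            writeln!(fifo, "{i}").unwrap();
        }
        thread::sleep(Duration::from_secs(5));
    });

    let schema = Arc::new(Schema::new(vec![Field::new(
        "a",
        DataType::Int64,
        false,
    )]));
    let mut reader = ReaderBuilder::new()
        .with_schema(schema)
        .with_batch_size(batch_size)
        .build(File::open(&path).unwrap())
        .unwrap();

    // With the behaviour described above, this call blocks in fill_buf()
    // even though one full batch of 64 rows has already been written.
    let batch = reader.next().unwrap().unwrap();
    assert_eq!(batch.num_rows(), batch_size);

    writer.join().unwrap();
}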
Expected behavior
The reader should produce a RecordBatch and finish.
Additional context
NA
cc @alamb @tustvold