Arrow-csv reader cannot produce RecordBatch even if the bytes are necessary #3674

Closed
metesynnada opened this issue Feb 8, 2023 · 6 comments · Fixed by #3677
Labels: arrow (Changes to the arrow crate), bug

Comments

@metesynnada
Contributor

metesynnada commented Feb 8, 2023

Describe the bug

The arrow-csv reader in arrow-rs mishandles CSV data read through a FIFO, where no EOF arrives until the writer closes the file. The read loop always waits for additional bytes from the underlying reader, even when it has already buffered enough rows to fill a batch of the configured size.

For example, if the batch size is set to 64 and exactly 64 rows are provided to the reader, the decoder has enough data to create a RecordBatch. However, on the second iteration of the loop the code blocks waiting for additional bytes at self.reader.fill_buf()?, causing a deadlock. This prevents streaming tests from working, even though they were supported before PR #3604.

impl<R: BufRead> BufReader<R> {
    fn read(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
        loop {
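            // NOTE: even when the decoder already holds a full batch, this call blocks
            // until more input arrives, which on an open FIFO may never happen.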
            let buf = self.reader.fill_buf()?;
            let decoded = self.decoder.decode(buf)?;
            if decoded == 0 {
                break;
            }
            self.reader.consume(decoded);
        }

        self.decoder.flush()
    }
}

To Reproduce

  • Add nix = "0.26.2" to dev-dependencies.
  • Copy the code below into arrow-csv/src/reader/records.rs (or any other convenient place in arrow-csv) and run the test.
#[cfg(test)]
mod pr {
    use crate::ReaderBuilder;
    use arrow_array::RecordBatch;
    use arrow_schema::{ArrowError, DataType, Field, Schema, SchemaRef};
    use nix::sys::stat;
    use nix::unistd;
    use std::fs::{File, OpenOptions};
    use std::io::BufRead;
    use std::io::BufReader as StdBufReader;
    use std::io::Write;
    use std::path::Path;
    use std::path::PathBuf;
    use std::sync::{Arc, Mutex};
    use std::thread;
    use std::time::{Duration, Instant};
    use tempfile::TempDir;

    fn create_fifo_file(
        tmp_dir: &TempDir,
        file_name: &str,
    ) -> Result<PathBuf, ArrowError> {
        let file_path = tmp_dir.path().join(file_name);
        if let Err(e) = unistd::mkfifo(&file_path, stat::Mode::S_IRWXU) {
            Err(ArrowError::CsvError(e.to_string()))
        } else {
            Ok(file_path)
        }
    }

    fn write_to_fifo(mut file: &File, line: &str) -> Result<usize, ArrowError> {
        file.write(line.as_bytes()).or_else(|e| {
            // Broken pipe error
            if e.raw_os_error().unwrap() == 32 {
                thread::sleep(Duration::from_millis(100));
                return Ok(0);
            }
            Err(ArrowError::CsvError(e.to_string()))
        })
    }

    fn read_from_csv<R: BufRead>(
        mut reader: R,
        schema: SchemaRef,
        batch_size: usize,
    ) -> Result<impl Iterator<Item = Result<RecordBatch, ArrowError>>, ArrowError> {
        let mut decoder = ReaderBuilder::new()
            .with_schema(schema)
            .with_batch_size(batch_size)
            .build_decoder();
        let mut next = move || {
            loop {
                // Deadlock happens here: we keep waiting for more bytes even though
                // the decoder may already hold enough rows to produce the first batch.
                let buf = reader.fill_buf()?;
                let decoded = decoder.decode(buf)?;
                if decoded == 0 {
                    break;
                }
                reader.consume(decoded);
            }
            decoder.flush()
        };
        Ok(std::iter::from_fn(move || next().transpose()))
    }

    const TEST_BATCH_SIZE: usize = 50;

    #[test]
    fn csv_reader_env() -> Result<(), ArrowError> {
        // We use a lock to wait for a batch creation
        let waiting = Arc::new(Mutex::new(true));
        let waiting_thread = waiting.clone();
        let tmp_dir = TempDir::new()?;
        let fifo_path = create_fifo_file(&tmp_dir, "fifo_file.csv")?;
        let fifo_path_thread = fifo_path.clone();
        let joinable_iterator = (0..TEST_BATCH_SIZE).map(|_| "a".to_string());
        let fifo_writer = thread::spawn(move || {
            let first_file = OpenOptions::new()
                .write(true)
                .open(fifo_path_thread)
                .unwrap();
            for (cnt, string_col) in joinable_iterator.enumerate() {
                let line = format!("{string_col},{cnt}\n").to_owned();
                write_to_fifo(&first_file, &line).unwrap();
            }
            // Keep the writer side open so the FIFO does not deliver an EOF yet.
            while *waiting_thread.lock().unwrap() {
                thread::sleep(Duration::from_millis(200));
            }
        });
        let schema = Arc::new(Schema::new(vec![
            Field::new("a1", DataType::Utf8, false),
            Field::new("a2", DataType::UInt32, false),
        ]));

        let file = File::open(fifo_path).unwrap();
        let reader = StdBufReader::new(file);

        let mut read = read_from_csv(reader, schema.clone(), TEST_BATCH_SIZE)?;

        while let Some(Ok(batch)) = read.next() {
            // Once we get a batch, clear the flag so the writer thread can finish.
            *waiting.lock().unwrap() = false;
            println!("Got a record batch with {} rows", batch.num_rows());
        }
        }
        fifo_writer.join().unwrap();
        Ok(())
    }
}

Expected behavior

  • For the reproduction code: produce the RecordBatch and finish.
  • For the algorithm: it should be able to produce a RecordBatch as soon as the necessary bytes have been received (see the sketch below).
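
A minimal sketch of that expectation, written against the same Decoder API and imports used in the reproducer above (the test name and the exact rows/batch size are illustrative, not from the issue): feeding the decoder exactly batch_size complete rows is enough for flush() to emit a RecordBatch, with no further reads required.

    #[test]
    fn decoder_emits_batch_without_extra_bytes() -> Result<(), ArrowError> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("a1", DataType::Utf8, false),
            Field::new("a2", DataType::UInt32, false),
        ]));
        let mut decoder = ReaderBuilder::new()
            .with_schema(schema)
            .with_batch_size(2)
            .build_decoder();

        // Exactly two complete rows, i.e. one full batch and nothing more.
        decoder.decode(b"a,1\nb,2\n")?;

        // flush() already has everything it needs; no extra fill_buf() should be required.
        let batch = decoder.flush()?.expect("a full batch should be buffered");
        assert_eq!(batch.num_rows(), 2);
        Ok(())
    }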

Additional context
NA

cc @alamb @tustvold

@metesynnada metesynnada added the bug label Feb 8, 2023
@tustvold
Contributor

tustvold commented Feb 8, 2023

the code waits for additional bytes at self.reader.fill_buf()?, causing a deadlock

I'm confused by this. What happens if there are fewer than batch_size rows available, or more? This feels like it just slightly changes the buffering behaviour, which isn't really guaranteed. I'm not saying we can't change this, but I'd like to understand the issue better. It almost feels like the deadlock is the fault of an overly restrictive test?

@metesynnada
Contributor Author

metesynnada commented Feb 8, 2023

I intentionally made the test like this to make the issue clearer.

I think waiting for the next byte before producing a RecordBatch, even when we already have the necessary bytes, is avoidable.

We use this test pattern for testing stream pipelines. We test "If I give a batch, can I get a batch as output?", but when testing through actual files it becomes "If I give batch_size + 1 rows, can I get a batch as output?".

@metesynnada metesynnada changed the title Deadlock in arrow-csv reader for FIFO file reading Arrow-csv reader cannot produce RecordBatch even if the bytes are necessary Feb 8, 2023
@ozankabak

Deadlock doesn't seem to be the right word. However, the current behavior can result in unnecessarily long latencies if data comes in chunks aligned with batch boundaries. This is an edge case, but when it happens, it becomes a problem in streaming use cases. Thankfully, it is easily avoidable.

@alamb
Contributor

alamb commented Feb 8, 2023

I wonder if the latency can be reduced by calling flush() on the underlying Decoder when the driver program knows (somehow) that it has received the end of a record and is not in the middle of decoding.

https://docs.rs/arrow-csv/32.0.0/arrow_csv/reader/struct.Decoder.html#method.flush

Perhaps something like

        let mut next = move || {
            loop {
                // Force a flush to produce a RecordBatch once we have fed the decoder all the
                // input that is currently available and know it ends on a record boundary
                // (check_have_read_to_boundary() is a hypothetical helper for that check).
                if check_have_read_to_boundary() {
                    if let Some(batch) = decoder.flush()? {
                        return Ok(Some(batch));
                    }
                }
                let buf = reader.fill_buf()?;
                let decoded = decoder.decode(buf)?;
                if decoded == 0 {
                    break;
                }
                reader.consume(decoded);
            }
            decoder.flush()
        };
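
The change that eventually merged (#3677, see below) takes a related approach: rather than relying on an external boundary check, the decoder itself reports how much room is left in the current batch (the Decoder::capacity method named in the merge commit below). A rough sketch of the buffered read loop this enables, assuming capacity() returns the number of additional rows the decoder can buffer, i.e. 0 once a full batch is pending:

        let mut next = move || {
            loop {
                let buf = reader.fill_buf()?;
                let decoded = decoder.decode(buf)?;
                reader.consume(decoded);
                // Stop as soon as the input is exhausted or the decoder holds a full
                // batch, so flush() can run without waiting on fill_buf() again.
                if decoded == 0 || decoder.capacity() == 0 {
                    break;
                }
            }
            decoder.flush()
        };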

tustvold added a commit to tustvold/arrow-rs that referenced this issue Feb 8, 2023
@tustvold
Contributor

tustvold commented Feb 8, 2023

Would you be able to test out #3677 and see if it meets your requirements? If so, I can polish it up with some tests, etc.

tustvold added a commit to tustvold/arrow-rs that referenced this issue Feb 8, 2023
@metesynnada
Contributor Author

@tustvold thank you for your effort. #3677 meets our requirements.

tustvold added a commit that referenced this issue Feb 10, 2023
* Add CSV Decoder::capacity (#3674)

* Add test

* Remove unnecessary extern

* Add docs
@tustvold tustvold added the arrow (Changes to the arrow crate) label Feb 27, 2023