From 27d2a7510d75163f1d8cb430666662f3bef8bbea Mon Sep 17 00:00:00 2001 From: Ryan Johnson Date: Tue, 11 Feb 2025 15:04:18 -0700 Subject: [PATCH] Expose record boundary information in JSON decoder (#7092) * Expose record boundary information in JSON decoder * fix doc links --- arrow-json/src/reader/mod.rs | 42 +++++++++++++++++++++++++++++++++-- arrow-json/src/reader/tape.rs | 23 ++++++++++++++++++- 2 files changed, 62 insertions(+), 3 deletions(-) diff --git a/arrow-json/src/reader/mod.rs b/arrow-json/src/reader/mod.rs index 704fdbeb95ea..14a8f6809f70 100644 --- a/arrow-json/src/reader/mod.rs +++ b/arrow-json/src/reader/mod.rs @@ -615,11 +615,27 @@ impl Decoder { self.tape_decoder.serialize(rows) } + /// True if the decoder is currently part way through decoding a record. + pub fn has_partial_record(&self) -> bool { + self.tape_decoder.has_partial_row() + } + + /// The number of unflushed records, including the partially decoded record (if any). + pub fn len(&self) -> usize { + self.tape_decoder.num_buffered_rows() + } + + /// True if there are no records to flush, i.e. [`Self::len`] is zero. + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + /// Flushes the currently buffered data to a [`RecordBatch`] /// - /// Returns `Ok(None)` if no buffered data + /// Returns `Ok(None)` if no buffered data, i.e. [`Self::is_empty`] is true. /// - /// Note: if called part way through decoding a record, this will return an error + /// Note: This will return an error if called part way through decoding a record, + /// i.e. [`Self::has_partial_record`] is true. 
pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> { let tape = self.tape_decoder.finish()?; @@ -803,6 +819,20 @@ mod tests { Field::new("e", DataType::Date64, true), ])); + let mut decoder = ReaderBuilder::new(schema.clone()).build_decoder().unwrap(); + assert!(decoder.is_empty()); + assert_eq!(decoder.len(), 0); + assert!(!decoder.has_partial_record()); + assert_eq!(decoder.decode(buf.as_bytes()).unwrap(), 221); + assert!(!decoder.is_empty()); + assert_eq!(decoder.len(), 6); + assert!(!decoder.has_partial_record()); + let batch = decoder.flush().unwrap().unwrap(); + assert_eq!(batch.num_rows(), 6); + assert!(decoder.is_empty()); + assert_eq!(decoder.len(), 0); + assert!(!decoder.has_partial_record()); + let batches = do_read(buf, 1024, false, false, schema); assert_eq!(batches.len(), 1); @@ -2158,6 +2188,14 @@ mod tests { true, )])); + let mut decoder = ReaderBuilder::new(schema.clone()).build_decoder().unwrap(); + let _ = decoder.decode(r#"{"a": { "child":"#.as_bytes()).unwrap(); + assert!(decoder.tape_decoder.has_partial_row()); + assert_eq!(decoder.tape_decoder.num_buffered_rows(), 1); + let _ = decoder.flush().unwrap_err(); + assert!(decoder.tape_decoder.has_partial_row()); + assert_eq!(decoder.tape_decoder.num_buffered_rows(), 1); + let parse_err = |s: &str| { ReaderBuilder::new(schema.clone()) .build(Cursor::new(s.as_bytes())) diff --git a/arrow-json/src/reader/tape.rs b/arrow-json/src/reader/tape.rs index a93567f9fa18..2a3bb610ce82 100644 --- a/arrow-json/src/reader/tape.rs +++ b/arrow-json/src/reader/tape.rs @@ -545,6 +545,17 @@ impl TapeDecoder { Ok(()) } + /// The number of buffered rows, including the partially decoded row (if any). + pub fn num_buffered_rows(&self) -> usize { + self.cur_row + } + + /// True if the decoder is part way through decoding a row. If so, calling [`Self::finish`] + /// would return an error. 
+ pub fn has_partial_row(&self) -> bool { + !self.stack.is_empty() + } + /// Finishes the current [`Tape`] pub fn finish(&self) -> Result<Tape<'_>, ArrowError> { if let Some(b) = self.stack.last() { @@ -726,8 +737,12 @@ mod tests { "#; let mut decoder = TapeDecoder::new(16, 2); decoder.decode(a.as_bytes()).unwrap(); + assert!(!decoder.has_partial_row()); + assert_eq!(decoder.num_buffered_rows(), 7); let finished = decoder.finish().unwrap(); + assert!(!decoder.has_partial_row()); + assert_eq!(decoder.num_buffered_rows(), 7); // didn't call clear() yet assert_eq!( finished.elements, &[ @@ -820,7 +835,11 @@ mod tests { 0, 5, 10, 13, 14, 17, 19, 22, 25, 28, 29, 30, 31, 32, 32, 32, 33, 34, 35, 41, 47, 52, 55, 57, 58, 59, 62, 63, 63, 66, 69, 70, 71, 72, 73, 74, 75, 76, 77 ] - ) + ); + + decoder.clear(); + assert!(!decoder.has_partial_row()); + assert_eq!(decoder.num_buffered_rows(), 0); } #[test] @@ -874,6 +893,8 @@ mod tests { // Test truncation let mut decoder = TapeDecoder::new(16, 2); decoder.decode(b"{\"he").unwrap(); + assert!(decoder.has_partial_row()); + assert_eq!(decoder.num_buffered_rows(), 1); let err = decoder.finish().unwrap_err().to_string(); assert_eq!(err, "Json error: Truncated record whilst reading string");