Fix broken sync json parsing and harmonize file reading #373

Merged · 4 commits · Oct 7, 2024

111 changes: 107 additions & 4 deletions kernel/src/engine/arrow_utils.rs
@@ -1,21 +1,26 @@
//! Some utilities for working with arrow data types

use std::{collections::HashSet, sync::Arc};
use std::collections::HashSet;
use std::io::{BufRead, BufReader};
use std::sync::Arc;

use crate::{
engine::arrow_data::ArrowEngineData,
schema::{DataType, PrimitiveType, Schema, SchemaRef, StructField, StructType},
utils::require,
DeltaResult, Error,
DeltaResult, EngineData, Error,
};

use arrow_array::{
cast::AsArray, new_null_array, Array as ArrowArray, GenericListArray, OffsetSizeTrait,
StructArray,
RecordBatch, StringArray, StructArray,
};
use arrow_json::ReaderBuilder;
use arrow_schema::{
DataType as ArrowDataType, Field as ArrowField, FieldRef as ArrowFieldRef, Fields,
SchemaRef as ArrowSchemaRef,
};
use arrow_select::concat::concat_batches;
use itertools::Itertools;
use parquet::{arrow::ProjectionMask, schema::types::SchemaDescriptor};
use tracing::debug;
@@ -757,6 +762,57 @@ fn reorder_list<O: OffsetSizeTrait>(
}
}

/// Arrow lacks the functionality to json-parse a string column into a struct column -- even though the
/// JSON file reader does exactly the same thing. This function is a hack to work around that gap.
pub(crate) fn parse_json(
json_strings: Box<dyn EngineData>,
schema: SchemaRef,
) -> DeltaResult<Box<dyn EngineData>> {
let json_strings: RecordBatch = ArrowEngineData::try_from_engine_data(json_strings)?.into();
let json_strings = json_strings
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.ok_or_else(|| {
Error::generic("Expected json_strings to be a StringArray, found something else")
})?;
let schema: ArrowSchemaRef = Arc::new(schema.as_ref().try_into()?);
let result = parse_json_impl(json_strings, schema)?;
Ok(Box::new(ArrowEngineData::new(result)))
}

// Raw arrow implementation of the json parsing. Separate from the public function for testing.
//
// NOTE: This code is really inefficient because arrow lacks the native capability to perform robust
// StringArray -> StructArray JSON parsing. See https://github.com/apache/arrow-rs/issues/6522. If
// that shortcoming gets fixed upstream, this method can simplify or hopefully even disappear.
fn parse_json_impl(json_strings: &StringArray, schema: ArrowSchemaRef) -> DeltaResult<RecordBatch> {
if json_strings.is_empty() {
return Ok(RecordBatch::new_empty(schema));
}

// Use batch size of 1 to force one record per string input
let mut decoder = ReaderBuilder::new(schema.clone())
.with_batch_size(1)
.build_decoder()?;
let parse_one = |json_string: Option<&str>| -> DeltaResult<RecordBatch> {
let mut reader = BufReader::new(json_string.unwrap_or("{}").as_bytes());
let buf = reader.fill_buf()?;
let read = buf.len();
require!(
decoder.decode(buf)? == read,
Error::missing_data("Incomplete JSON string")
);
let Some(batch) = decoder.flush()? else {
return Err(Error::missing_data("Expected data"));
};
require!(batch.num_rows() == 1, Error::generic("Expected one row"));
Ok(batch)
};
let output: Vec<_> = json_strings.iter().map(parse_one).try_collect()?;
Ok(concat_batches(&schema, output.iter())?)
}

#[cfg(test)]
mod tests {
use std::sync::Arc;
@@ -775,7 +831,7 @@ mod tests {

use crate::schema::{ArrayType, DataType, MapType, StructField, StructType};

use super::{get_requested_indices, reorder_struct_array, ReorderIndex};
use super::*;

fn nested_parquet_schema() -> ArrowSchemaRef {
Arc::new(ArrowSchema::new(vec![
@@ -795,6 +851,53 @@
]))
}

#[test]
fn test_json_parsing() {
let requested_schema = Arc::new(ArrowSchema::new(vec![
ArrowField::new("a", ArrowDataType::Int32, true),
ArrowField::new("b", ArrowDataType::Utf8, true),
ArrowField::new("c", ArrowDataType::Int32, true),
]));
let input: Vec<&str> = vec![];
let result = parse_json_impl(&input.into(), requested_schema.clone()).unwrap();
assert_eq!(result.num_rows(), 0);

let input: Vec<Option<&str>> = vec![Some("")];
let result = parse_json_impl(&input.into(), requested_schema.clone());
result.expect_err("empty string");

let input: Vec<Option<&str>> = vec![Some(" \n\t")];
let result = parse_json_impl(&input.into(), requested_schema.clone());
result.expect_err("empty string");

let input: Vec<Option<&str>> = vec![Some(r#""a""#)];
let result = parse_json_impl(&input.into(), requested_schema.clone());
result.expect_err("invalid string");

let input: Vec<Option<&str>> = vec![Some(r#"{ "a": 1"#)];
let result = parse_json_impl(&input.into(), requested_schema.clone());
result.expect_err("incomplete object");

let input: Vec<Option<&str>> = vec![Some("{}{}")];
let result = parse_json_impl(&input.into(), requested_schema.clone());
result.expect_err("multiple objects (complete)");

let input: Vec<Option<&str>> = vec![Some(r#"{} { "a": 1"#)];
let result = parse_json_impl(&input.into(), requested_schema.clone());
result.expect_err("multiple objects (partial)");

let input: Vec<Option<&str>> = vec![Some(r#"{ "a": 1"#), Some(r#", "b"}"#)];
let result = parse_json_impl(&input.into(), requested_schema.clone());
result.expect_err("split object");

let input: Vec<Option<&str>> = vec![None, Some(r#"{"a": 1, "b": "2", "c": 3}"#), None];
let result = parse_json_impl(&input.into(), requested_schema.clone()).unwrap();
assert_eq!(result.num_rows(), 3);
assert_eq!(result.column(0).null_count(), 2);
assert_eq!(result.column(1).null_count(), 2);
assert_eq!(result.column(2).null_count(), 2);
}

#[test]
fn simple_mask_indices() {
let requested_schema = Arc::new(StructType::new(vec![
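
The doc comment and NOTE above describe parse_json as a stopgap for arrow-rs's missing string-to-struct JSON parsing (apache/arrow-rs#6522). Below is a minimal sketch of how the new helper might be called from inside the kernel crate (it is pub(crate)); the column name, stats field, and function name are illustrative and not part of this change:

```rust
use std::sync::Arc;

use arrow_array::{ArrayRef, RecordBatch, StringArray};
use arrow_schema::{DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema};

use crate::engine::arrow_data::ArrowEngineData;
use crate::engine::arrow_utils::parse_json;
use crate::schema::{DataType, PrimitiveType, StructField, StructType};
use crate::DeltaResult;

fn parse_stats_example() -> DeltaResult<()> {
    // One JSON document per input row; a None row parses to an all-null row.
    let json_strings = StringArray::from(vec![Some(r#"{"numRecords": 3}"#), None]);
    let column: ArrayRef = Arc::new(json_strings);

    // Wrap the strings as single-column EngineData, the shape parse_json expects.
    let input_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
        "json",
        ArrowDataType::Utf8,
        true,
    )]));
    let batch = RecordBatch::try_new(input_schema, vec![column])?;

    // Kernel schema describing the struct shape the JSON should be parsed into.
    let stats_schema = Arc::new(StructType::new(vec![StructField::new(
        "numRecords",
        DataType::Primitive(PrimitiveType::Long),
        true,
    )]));

    // The output has one row per input string, so row alignment with the input is kept.
    let _parsed = parse_json(Box::new(ArrowEngineData::new(batch)), stats_schema)?;
    Ok(())
}
```

This is the same path both JSON handlers now take, so the default and sync engines agree on null handling.
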
56 changes: 5 additions & 51 deletions kernel/src/engine/default/json.rs
@@ -5,19 +5,16 @@ use std::ops::Range;
use std::sync::Arc;
use std::task::{ready, Poll};

use arrow_array::{new_null_array, Array, RecordBatch, StringArray, StructArray};
use arrow_json::ReaderBuilder;
use arrow_schema::SchemaRef as ArrowSchemaRef;
use arrow_select::concat::concat_batches;
use bytes::{Buf, Bytes};
use futures::{StreamExt, TryStreamExt};
use itertools::Itertools;
use object_store::path::Path;
use object_store::{DynObjectStore, GetResultPayload};

use super::executor::TaskExecutor;
use super::file_stream::{FileOpenFuture, FileOpener, FileStream};
use crate::engine::arrow_data::ArrowEngineData;
use crate::engine::arrow_utils::parse_json as arrow_parse_json;
use crate::schema::SchemaRef;
use crate::{
DeltaResult, EngineData, Error, Expression, FileDataReadResultIterator, FileMeta, JsonHandler,
@@ -62,57 +59,13 @@ impl<E: TaskExecutor> DefaultJsonHandler<E> {
}
}

fn hack_parse(
stats_schema: &ArrowSchemaRef,
json_string: Option<&str>,
) -> DeltaResult<RecordBatch> {
match json_string {
Some(s) => Ok(ReaderBuilder::new(stats_schema.clone())
.build(BufReader::new(s.as_bytes()))?
.next()
.transpose()?
.ok_or(Error::missing_data("Expected data"))?),
None => Ok(RecordBatch::try_new(
stats_schema.clone(),
stats_schema
.fields
.iter()
.map(|field| new_null_array(field.data_type(), 1))
.collect(),
)?),
}
}

impl<E: TaskExecutor> JsonHandler for DefaultJsonHandler<E> {
fn parse_json(
&self,
json_strings: Box<dyn EngineData>,
output_schema: SchemaRef,
) -> DeltaResult<Box<dyn EngineData>> {
let json_strings: RecordBatch = ArrowEngineData::try_from_engine_data(json_strings)?.into();
// TODO(nick): this is pretty terrible
let struct_array: StructArray = json_strings.into();
let json_strings = struct_array
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.ok_or_else(|| {
Error::generic("Expected json_strings to be a StringArray, found something else")
})?;
let output_schema: ArrowSchemaRef = Arc::new(output_schema.as_ref().try_into()?);
if json_strings.is_empty() {
return Ok(Box::new(ArrowEngineData::new(RecordBatch::new_empty(
output_schema,
))));
}
let output: Vec<_> = json_strings
.iter()
.map(|json_string| hack_parse(&output_schema, json_string))
.try_collect()?;
Ok(Box::new(ArrowEngineData::new(concat_batches(
&output_schema,
output.iter(),
)?)))
arrow_parse_json(json_strings, output_schema)
}

fn read_json_files(
@@ -220,14 +173,15 @@ impl FileOpener for JsonOpener {
mod tests {
use std::path::PathBuf;

use arrow::array::AsArray;
use arrow::array::{AsArray, RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema as ArrowSchema};
use itertools::Itertools;
use object_store::{local::LocalFileSystem, ObjectStore};

use super::*;
use crate::{
actions::get_log_schema, engine::default::executor::tokio::TokioBackgroundExecutor,
actions::get_log_schema, engine::arrow_data::ArrowEngineData,
engine::default::executor::tokio::TokioBackgroundExecutor,
};

fn string_array_to_engine_data(string_array: StringArray) -> Box<dyn EngineData> {
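
With hack_parse removed, DefaultJsonHandler::parse_json reduces to the one-line delegation above. A hedged, test-style sketch of calling it through the JsonHandler trait, written as if it lived in this file's test module (it leans on the module's existing `use super::*;` and the string_array_to_engine_data helper; the schema, JSON strings, and constructor arguments are assumptions based on the surrounding tests):

```rust
use std::sync::Arc;

use arrow_array::StringArray;
use object_store::local::LocalFileSystem;

use crate::engine::default::executor::tokio::TokioBackgroundExecutor;
use crate::schema::{DataType, PrimitiveType, StructField, StructType};

fn parse_json_via_handler() -> crate::DeltaResult<()> {
    // Handler construction mirrors this file's existing tests: an object store
    // plus a task executor.
    let store = Arc::new(LocalFileSystem::new());
    let handler = DefaultJsonHandler::new(store, Arc::new(TokioBackgroundExecutor::new()));

    let json_strings = StringArray::from(vec![Some(r#"{"a": 1}"#), None]);
    let output_schema = Arc::new(StructType::new(vec![StructField::new(
        "a",
        DataType::Primitive(PrimitiveType::Long),
        true,
    )]));

    // Delegates to engine::arrow_utils::parse_json; SyncJsonHandler::parse_json
    // now takes the same path, so both return two rows here (the None row is all nulls).
    let _parsed = handler.parse_json(string_array_to_engine_data(json_strings), output_schema)?;
    Ok(())
}
```
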
89 changes: 17 additions & 72 deletions kernel/src/engine/sync/json.rs
@@ -1,39 +1,26 @@
use std::{
fs::File,
io::{BufReader, Cursor},
sync::Arc,
};
use std::{fs::File, io::BufReader};

use crate::{
schema::SchemaRef, utils::require, DeltaResult, EngineData, Error, Expression,
FileDataReadResultIterator, FileMeta, JsonHandler,
engine::arrow_utils::parse_json as arrow_parse_json, schema::SchemaRef, DeltaResult,
EngineData, Expression, FileDataReadResultIterator, FileMeta, JsonHandler,
};
use arrow_array::{cast::AsArray, RecordBatch};
use arrow_json::ReaderBuilder;
use arrow_schema::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef};
use arrow_select::concat::concat_batches;
use itertools::Itertools;
use tracing::debug;
use url::Url;
use arrow_schema::SchemaRef as ArrowSchemaRef;

use super::read_files;
use crate::engine::arrow_data::ArrowEngineData;

pub(crate) struct SyncJsonHandler;

fn try_create_from_json(schema: SchemaRef, location: Url) -> DeltaResult<ArrowEngineData> {
let arrow_schema: ArrowSchema = (&*schema).try_into()?;
debug!("Reading {:#?} with schema: {:#?}", location, arrow_schema);
let file = File::open(
location
.to_file_path()
.map_err(|_| Error::generic("can only read local files"))?,
)?;
let mut json =
arrow_json::ReaderBuilder::new(Arc::new(arrow_schema)).build(BufReader::new(file))?;
let data = json
.next()
.ok_or(Error::generic("No data found reading json file"))?;
Ok(ArrowEngineData::new(data?))
fn try_create_from_json(
file: File,
_schema: SchemaRef,
arrow_schema: ArrowSchemaRef,
_predicate: Option<&Expression>,
) -> DeltaResult<impl Iterator<Item = DeltaResult<ArrowEngineData>>> {
let json = arrow_json::ReaderBuilder::new(arrow_schema)
.build(BufReader::new(file))?
.map(|data| Ok(ArrowEngineData::new(data?)));
Ok(json)
}

impl JsonHandler for SyncJsonHandler {
@@ -43,56 +30,14 @@ impl JsonHandler for SyncJsonHandler {
schema: SchemaRef,
predicate: Option<Expression>,
) -> DeltaResult<FileDataReadResultIterator> {
debug!("Reading json files: {files:#?} with predicate {predicate:#?}");
if files.is_empty() {
return Ok(Box::new(std::iter::empty()));
}
let res: Vec<_> = files
.iter()
.map(|file| {
try_create_from_json(schema.clone(), file.location.clone())
.map(|d| Box::new(d) as _)
})
.collect();
Ok(Box::new(res.into_iter()))
read_files(files, schema, predicate, try_create_from_json)
}

fn parse_json(
&self,
json_strings: Box<dyn EngineData>,
output_schema: SchemaRef,
) -> DeltaResult<Box<dyn EngineData>> {
// TODO: This is taken from the default engine as it's the same. We should share an
// implementation at some point
let json_strings: RecordBatch = ArrowEngineData::try_from_engine_data(json_strings)?.into();
require!(
json_strings.num_columns() == 1,
Error::missing_column("Expected single column")
);
let json_strings =
json_strings
.column(0)
.as_string_opt::<i32>()
.ok_or(Error::unexpected_column_type(
"Expected column to be String",
))?;

let data: Vec<_> = json_strings
.into_iter()
.filter_map(|d| {
d.map(|dd| {
let mut data = dd.as_bytes().to_vec();
data.extend("\n".as_bytes());
data
})
})
.flatten()
.collect();

let schema: ArrowSchemaRef = Arc::new(output_schema.as_ref().try_into()?);
let batches: Vec<_> = ReaderBuilder::new(schema.clone())
.build(Cursor::new(data))?
.try_collect()?;
Ok(Box::new(ArrowEngineData::new(concat_batches(&schema, &batches)?)) as _)
arrow_parse_json(json_strings, output_schema)
}
}
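
For context on the "broken sync json parsing" in the title: the removed implementation above filtered out null entries before newline-joining everything into a single NDJSON buffer, so null rows vanished and the output stopped lining up row-for-row with the input. A small standalone repro of that old behavior (field name and schema are illustrative):

```rust
use std::io::Cursor;
use std::sync::Arc;

use arrow_json::ReaderBuilder;
use arrow_schema::{DataType, Field, Schema};
use itertools::Itertools;

fn old_behavior_drops_null_rows() -> Result<(), Box<dyn std::error::Error>> {
    let inputs: Vec<Option<&str>> = vec![None, Some(r#"{"a": 1}"#), None];

    // Old approach: drop the Nones, then newline-join the rest into one NDJSON buffer.
    let data: Vec<u8> = inputs
        .into_iter()
        .filter_map(|s| s.map(|s| format!("{s}\n").into_bytes()))
        .flatten()
        .collect();

    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));
    let batches: Vec<_> = ReaderBuilder::new(schema)
        .build(Cursor::new(data))?
        .try_collect()?;

    // Only one row survives out of three inputs; the shared parse_json in
    // arrow_utils keeps all three (nulls included), preserving row alignment.
    let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
    assert_eq!(total_rows, 1);
    Ok(())
}
```

Routing both handlers through arrow_utils::parse_json removes this divergence, and read_json_files now goes through the shared read_files helper instead of its own per-file loop.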