Skip to content

Commit

Permalink
chore: replace usage of deprecated `CsvExec::new` with `CsvExec::builder`
Browse files Browse the repository at this point in the history
  • Loading branch information
connec committed Jul 24, 2024
1 parent 0c51fdb commit debfd19
Show file tree
Hide file tree
Showing 7 changed files with 258 additions and 224 deletions.
35 changes: 19 additions & 16 deletions datafusion/core/src/datasource/file_format/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,22 +344,25 @@ impl FileFormat for CsvFormat {
conf: FileScanConfig,
_filters: Option<&Arc<dyn PhysicalExpr>>,
) -> Result<Arc<dyn ExecutionPlan>> {
let exec = CsvExec::new(
conf,
// If format options does not specify whether there is a header,
// we consult configuration options.
self.options
.has_header
.unwrap_or(state.config_options().catalog.has_header),
self.options.delimiter,
self.options.quote,
self.options.escape,
self.options.comment,
self.options
.newlines_in_values
.unwrap_or(state.config_options().catalog.newlines_in_values),
self.options.compression.into(),
);
// Consult configuration options for default values
let has_header = self
.options
.has_header
.unwrap_or(state.config_options().catalog.has_header);
let newlines_in_values = self
.options
.newlines_in_values
.unwrap_or(state.config_options().catalog.newlines_in_values);

let exec = CsvExec::builder(conf)
.with_has_header(has_header)
.with_delimeter(self.options.delimiter)
.with_quote(self.options.quote)
.with_escape(self.options.escape)
.with_comment(self.options.comment)
.with_newlines_in_values(newlines_in_values)
.with_file_compression_type(self.options.compression.into())
.build();
Ok(Arc::new(exec))
}

Expand Down
114 changes: 54 additions & 60 deletions datafusion/core/src/datasource/physical_plan/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -747,16 +747,15 @@ mod tests {
let mut config = partitioned_csv_config(file_schema, file_groups);
config.projection = Some(vec![0, 2, 4]);

let csv = CsvExec::new(
config,
true,
b',',
b'"',
None,
None,
false,
file_compression_type.to_owned(),
);
let csv = CsvExec::builder(config)
.with_has_header(true)
.with_delimeter(b',')
.with_quote(b'"')
.with_escape(None)
.with_comment(None)
.with_newlines_in_values(false)
.with_file_compression_type(file_compression_type)
.build();
assert_eq!(13, csv.base_config.file_schema.fields().len());
assert_eq!(3, csv.schema().fields().len());

Expand Down Expand Up @@ -817,16 +816,15 @@ mod tests {
let mut config = partitioned_csv_config(file_schema, file_groups);
config.projection = Some(vec![4, 0, 2]);

let csv = CsvExec::new(
config,
true,
b',',
b'"',
None,
None,
false,
file_compression_type.to_owned(),
);
let csv = CsvExec::builder(config)
.with_has_header(true)
.with_delimeter(b',')
.with_quote(b'"')
.with_escape(None)
.with_comment(None)
.with_newlines_in_values(false)
.with_file_compression_type(file_compression_type.to_owned())
.build();
assert_eq!(13, csv.base_config.file_schema.fields().len());
assert_eq!(3, csv.schema().fields().len());

Expand Down Expand Up @@ -887,16 +885,15 @@ mod tests {
let mut config = partitioned_csv_config(file_schema, file_groups);
config.limit = Some(5);

let csv = CsvExec::new(
config,
true,
b',',
b'"',
None,
None,
false,
file_compression_type.to_owned(),
);
let csv = CsvExec::builder(config)
.with_has_header(true)
.with_delimeter(b',')
.with_quote(b'"')
.with_escape(None)
.with_comment(None)
.with_newlines_in_values(false)
.with_file_compression_type(file_compression_type.to_owned())
.build();
assert_eq!(13, csv.base_config.file_schema.fields().len());
assert_eq!(13, csv.schema().fields().len());

Expand Down Expand Up @@ -954,16 +951,15 @@ mod tests {
let mut config = partitioned_csv_config(file_schema, file_groups);
config.limit = Some(5);

let csv = CsvExec::new(
config,
true,
b',',
b'"',
None,
None,
false,
file_compression_type.to_owned(),
);
let csv = CsvExec::builder(config)
.with_has_header(true)
.with_delimeter(b',')
.with_quote(b'"')
.with_escape(None)
.with_comment(None)
.with_newlines_in_values(false)
.with_file_compression_type(file_compression_type.to_owned())
.build();
assert_eq!(14, csv.base_config.file_schema.fields().len());
assert_eq!(14, csv.schema().fields().len());

Expand Down Expand Up @@ -1020,16 +1016,15 @@ mod tests {

// we don't have `/date=xx/` in the path but that is ok because
// partitions are resolved during scan anyway
let csv = CsvExec::new(
config,
true,
b',',
b'"',
None,
None,
false,
file_compression_type.to_owned(),
);
let csv = CsvExec::builder(config)
.with_has_header(true)
.with_delimeter(b',')
.with_quote(b'"')
.with_escape(None)
.with_comment(None)
.with_newlines_in_values(false)
.with_file_compression_type(file_compression_type.to_owned())
.build();
assert_eq!(13, csv.base_config.file_schema.fields().len());
assert_eq!(2, csv.schema().fields().len());

Expand Down Expand Up @@ -1116,16 +1111,15 @@ mod tests {
.unwrap();

let config = partitioned_csv_config(file_schema, file_groups);
let csv = CsvExec::new(
config,
true,
b',',
b'"',
None,
None,
false,
file_compression_type.to_owned(),
);
let csv = CsvExec::builder(config)
.with_has_header(true)
.with_delimeter(b',')
.with_quote(b'"')
.with_escape(None)
.with_comment(None)
.with_newlines_in_values(false)
.with_file_compression_type(file_compression_type.to_owned())
.build();

let it = csv.execute(0, task_ctx).unwrap();
let batches: Vec<_> = it.try_collect().await.unwrap();
Expand Down
89 changes: 49 additions & 40 deletions datafusion/core/src/physical_optimizer/enforce_distribution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1463,18 +1463,21 @@ pub(crate) mod tests {
}

/// Builds a test `CsvExec` over a single 100-byte partitioned file `x`
/// with the given output ordering(s).
///
/// Uses `CsvExec::builder` (the replacement for the deprecated
/// `CsvExec::new`); the original pasted block contained both the old and
/// the new bodies concatenated (diff residue) — only the builder form is kept.
fn csv_exec_with_sort(output_ordering: Vec<Vec<PhysicalSortExpr>>) -> Arc<CsvExec> {
    Arc::new(
        CsvExec::builder(
            FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(), schema())
                .with_file(PartitionedFile::new("x".to_string(), 100))
                .with_output_ordering(output_ordering),
        )
        .with_has_header(false)
        // NOTE(review): `with_delimeter` is the upstream builder's (misspelled)
        // method name — it must not be "corrected" here or the call won't resolve.
        .with_delimeter(b',')
        .with_quote(b'"')
        .with_escape(None)
        .with_comment(None)
        .with_newlines_in_values(false)
        .with_file_compression_type(FileCompressionType::UNCOMPRESSED)
        .build(),
    )
}

fn csv_exec_multiple() -> Arc<CsvExec> {
Expand All @@ -1485,21 +1488,24 @@ pub(crate) mod tests {
/// Builds a test `CsvExec` with two file groups (`x` and `y`, 100 bytes each,
/// i.e. two output partitions) and the given output ordering(s).
///
/// Uses `CsvExec::builder` (the replacement for the deprecated
/// `CsvExec::new`); the original pasted block contained both the old and
/// the new bodies concatenated (diff residue) — only the builder form is kept.
fn csv_exec_multiple_sorted(
    output_ordering: Vec<Vec<PhysicalSortExpr>>,
) -> Arc<CsvExec> {
    Arc::new(
        CsvExec::builder(
            FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(), schema())
                .with_file_groups(vec![
                    vec![PartitionedFile::new("x".to_string(), 100)],
                    vec![PartitionedFile::new("y".to_string(), 100)],
                ])
                .with_output_ordering(output_ordering),
        )
        .with_has_header(false)
        // NOTE(review): `with_delimeter` is the upstream builder's (misspelled)
        // method name — it must not be "corrected" here or the call won't resolve.
        .with_delimeter(b',')
        .with_quote(b'"')
        .with_escape(None)
        .with_comment(None)
        .with_newlines_in_values(false)
        .with_file_compression_type(FileCompressionType::UNCOMPRESSED)
        .build(),
    )
}

fn projection_exec_with_alias(
Expand Down Expand Up @@ -3761,20 +3767,23 @@ pub(crate) mod tests {
};

let plan = aggregate_exec_with_alias(
Arc::new(CsvExec::new(
FileScanConfig::new(
ObjectStoreUrl::parse("test:///").unwrap(),
schema(),
Arc::new(
CsvExec::builder(
FileScanConfig::new(
ObjectStoreUrl::parse("test:///").unwrap(),
schema(),
)
.with_file(PartitionedFile::new("x".to_string(), 100)),
)
.with_file(PartitionedFile::new("x".to_string(), 100)),
false,
b',',
b'"',
None,
None,
false,
compression_type,
)),
.with_has_header(false)
.with_delimeter(b',')
.with_quote(b'"')
.with_escape(None)
.with_comment(None)
.with_newlines_in_values(false)
.with_file_compression_type(compression_type)
.build(),
),
vec![("a".to_string(), "a".to_string())],
);
assert_optimized!(expected, plan, true, false, 2, true, 10, false);
Expand Down
Loading

0 comments on commit debfd19

Please sign in to comment.