Change compound column field name rules #952

Merged 6 commits on Aug 31, 2021

418 changes: 209 additions & 209 deletions datafusion/src/execution/context.rs

Large diffs are not rendered by default.

18 changes: 9 additions & 9 deletions datafusion/src/execution/dataframe_impl.rs
@@ -277,15 +277,15 @@ mod tests {

assert_batches_sorted_eq!(
vec![
"+----+----------------------+--------------------+---------------------+--------------------+------------+---------------------+",
"| c1 | MIN(c12) | MAX(c12) | AVG(c12) | SUM(c12) | COUNT(c12) | COUNT(DISTINCT c12) |",
"+----+----------------------+--------------------+---------------------+--------------------+------------+---------------------+",
"| a | 0.02182578039211991 | 0.9800193410444061 | 0.48754517466109415 | 10.238448667882977 | 21 | 21 |",
"| b | 0.04893135681998029 | 0.9185813970744787 | 0.41040709263815384 | 7.797734760124923 | 19 | 19 |",
"| c | 0.0494924465469434 | 0.991517828651004 | 0.6600456536439784 | 13.860958726523545 | 21 | 21 |",
"| d | 0.061029375346466685 | 0.9748360509016578 | 0.48855379387549824 | 8.793968289758968 | 18 | 18 |",
"| e | 0.01479305307777301 | 0.9965400387585364 | 0.48600669271341534 | 10.206140546981722 | 21 | 21 |",
"+----+----------------------+--------------------+---------------------+--------------------+------------+---------------------+",
"+----+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-------------------------------+----------------------------------------+",
"| c1 | MIN(aggregate_test_100.c12) | MAX(aggregate_test_100.c12) | AVG(aggregate_test_100.c12) | SUM(aggregate_test_100.c12) | COUNT(aggregate_test_100.c12) | COUNT(DISTINCT aggregate_test_100.c12) |",
"+----+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-------------------------------+----------------------------------------+",
"| a | 0.02182578039211991 | 0.9800193410444061 | 0.48754517466109415 | 10.238448667882977 | 21 | 21 |",
"| b | 0.04893135681998029 | 0.9185813970744787 | 0.41040709263815384 | 7.797734760124923 | 19 | 19 |",
"| c | 0.0494924465469434 | 0.991517828651004 | 0.6600456536439784 | 13.860958726523545 | 21 | 21 |",
"| d | 0.061029375346466685 | 0.9748360509016578 | 0.48855379387549824 | 8.793968289758968 | 18 | 18 |",
"| e | 0.01479305307777301 | 0.9965400387585364 | 0.48600669271341534 | 10.206140546981722 | 21 | 21 |",
"+----+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-------------------------------+----------------------------------------+",
],
&df
);
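The test update above is the user-visible effect of this PR: unaliased aggregate expressions over a registered table are now named with the fully qualified column, e.g. `MIN(aggregate_test_100.c12)` rather than `MIN(c12)`. Where short output names are still wanted, an explicit alias restores them. The following is a minimal sketch against the DataFrame API of this era, not part of the PR; the table name and CSV path are illustrative, and newer DataFusion releases make `register_csv` async.

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let mut ctx = ExecutionContext::new();
    // Illustrative path; in newer releases this call is async and needs `.await`.
    ctx.register_csv(
        "aggregate_test_100",
        "testing/data/csv/aggregate_test_100.csv",
        CsvReadOptions::new(),
    )?;

    let df = ctx.table("aggregate_test_100")?;
    // Without `.alias(..)` the output field would now be named
    // `MIN(aggregate_test_100.c12)`; the alias pins it back to a short name.
    let df = df.aggregate(vec![col("c1")], vec![min(col("c12")).alias("min_c12")])?;

    let results = df.collect().await?;
    println!(
        "{}",
        datafusion::arrow::util::pretty::pretty_format_batches(&results)?
    );
    Ok(())
}
```
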
20 changes: 10 additions & 10 deletions datafusion/src/lib.rs
@@ -60,11 +60,11 @@
//! let pretty_results = arrow::util::pretty::pretty_format_batches(&results)?;
//!
//! let expected = vec![
//! "+---+--------+",
//! "| a | MIN(b) |",
//! "+---+--------+",
//! "| 1 | 2 |",
//! "+---+--------+"
//! "+---+--------------------------+",
//! "| a | MIN(tests/example.csv.b) |",
//! "+---+--------------------------+",
//! "| 1 | 2 |",
//! "+---+--------------------------+"
//! ];
//!
//! assert_eq!(pretty_results.trim().lines().collect::<Vec<_>>(), expected);
@@ -95,11 +95,11 @@
//! let pretty_results = arrow::util::pretty::pretty_format_batches(&results)?;
//!
//! let expected = vec![
//! "+---+--------+",
//! "| a | MIN(b) |",
//! "+---+--------+",
//! "| 1 | 2 |",
//! "+---+--------+"
//! "+---+----------------+",
//! "| a | MIN(example.b) |",
//! "+---+----------------+",
//! "| 1 | 2 |",
//! "+---+----------------+"
//! ];
//!
//! assert_eq!(pretty_results.trim().lines().collect::<Vec<_>>(), expected);
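Taken together, the two doc examples show where the qualifier comes from: reading the CSV directly from a path yields `MIN(tests/example.csv.b)`, while registering the file under the name `example` yields `MIN(example.b)`. When the generated name is unwanted in SQL, an `AS` alias overrides it. A hedged sketch reusing the `example` table from the docs; the synchronous `sql`/`register_csv` signatures of this era are an assumption.

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let mut ctx = ExecutionContext::new();
    ctx.register_csv("example", "tests/example.csv", CsvReadOptions::new())?;

    // `AS min_b` fixes the output field name, so the result schema is
    // `a, min_b` instead of `a, MIN(example.b)`.
    let df = ctx.sql("SELECT a, MIN(b) AS min_b FROM example GROUP BY a")?;
    let results = df.collect().await?;

    println!(
        "{}",
        datafusion::arrow::util::pretty::pretty_format_batches(&results)?
    );
    Ok(())
}
```
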
75 changes: 45 additions & 30 deletions datafusion/src/physical_plan/planner.rs
@@ -62,11 +62,10 @@ fn create_function_physical_name(
fun: &str,
distinct: bool,
args: &[Expr],
input_schema: &DFSchema,
) -> Result<String> {
let names: Vec<String> = args
.iter()
.map(|e| physical_name(e, input_schema))
.map(|e| create_physical_name(e, false))
.collect::<Result<_>>()?;

let distinct_str = match distinct {
@@ -76,15 +75,25 @@
Ok(format!("{}({}{})", fun, distinct_str, names.join(",")))
}

fn physical_name(e: &Expr, input_schema: &DFSchema) -> Result<String> {
fn physical_name(e: &Expr) -> Result<String> {
create_physical_name(e, true)
}

fn create_physical_name(e: &Expr, is_first_expr: bool) -> Result<String> {
match e {
Expr::Column(c) => Ok(c.name.clone()),
Expr::Column(c) => {
if is_first_expr {
Ok(c.name.clone())
} else {
Ok(c.flat_name())
}
}
Expr::Alias(_, name) => Ok(name.clone()),
Expr::ScalarVariable(variable_names) => Ok(variable_names.join(".")),
Expr::Literal(value) => Ok(format!("{:?}", value)),
Expr::BinaryExpr { left, op, right } => {
let left = physical_name(left, input_schema)?;
let right = physical_name(right, input_schema)?;
let left = create_physical_name(left, false)?;
let right = create_physical_name(right, false)?;
Ok(format!("{} {:?} {}", left, op, right))
}
Expr::Case {
@@ -106,50 +115,48 @@ fn physical_name(e: &Expr, input_schema: &DFSchema) -> Result<String> {
Ok(name)
}
Expr::Cast { expr, data_type } => {
let expr = physical_name(expr, input_schema)?;
let expr = create_physical_name(expr, false)?;
Ok(format!("CAST({} AS {:?})", expr, data_type))
}
Expr::TryCast { expr, data_type } => {
let expr = physical_name(expr, input_schema)?;
let expr = create_physical_name(expr, false)?;
Ok(format!("TRY_CAST({} AS {:?})", expr, data_type))
}
Expr::Not(expr) => {
let expr = physical_name(expr, input_schema)?;
let expr = create_physical_name(expr, false)?;
Ok(format!("NOT {}", expr))
}
Expr::Negative(expr) => {
let expr = physical_name(expr, input_schema)?;
let expr = create_physical_name(expr, false)?;
Ok(format!("(- {})", expr))
}
Expr::IsNull(expr) => {
let expr = physical_name(expr, input_schema)?;
let expr = create_physical_name(expr, false)?;
Ok(format!("{} IS NULL", expr))
}
Expr::IsNotNull(expr) => {
let expr = physical_name(expr, input_schema)?;
let expr = create_physical_name(expr, false)?;
Ok(format!("{} IS NOT NULL", expr))
}
Expr::ScalarFunction { fun, args, .. } => {
create_function_physical_name(&fun.to_string(), false, args, input_schema)
create_function_physical_name(&fun.to_string(), false, args)
}
Expr::ScalarUDF { fun, args, .. } => {
create_function_physical_name(&fun.name, false, args, input_schema)
create_function_physical_name(&fun.name, false, args)
}
Expr::WindowFunction { fun, args, .. } => {
create_function_physical_name(&fun.to_string(), false, args, input_schema)
create_function_physical_name(&fun.to_string(), false, args)
}
Expr::AggregateFunction {
fun,
distinct,
args,
..
} => {
create_function_physical_name(&fun.to_string(), *distinct, args, input_schema)
}
} => create_function_physical_name(&fun.to_string(), *distinct, args),
Expr::AggregateUDF { fun, args } => {
let mut names = Vec::with_capacity(args.len());
for e in args {
names.push(physical_name(e, input_schema)?);
names.push(create_physical_name(e, false)?);
}
Ok(format!("{}({})", fun.name, names.join(",")))
}
@@ -158,8 +165,8 @@ fn physical_name(e: &Expr, input_schema: &DFSchema) -> Result<String> {
list,
negated,
} => {
let expr = physical_name(expr, input_schema)?;
let list = list.iter().map(|expr| physical_name(expr, input_schema));
let expr = create_physical_name(expr, false)?;
let list = list.iter().map(|expr| create_physical_name(expr, false));
if *negated {
Ok(format!("{} NOT IN ({:?})", expr, list))
} else {
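
The core of the change is the recursive `create_physical_name` above: only the outermost expression keeps the bare column name, while any column referenced deeper in the tree is rendered in its qualified `table.column` form via `flat_name()`. To make that rule concrete, here is a simplified, self-contained model; the toy `Expr` enum and its `Min` variant are hypothetical stand-ins for DataFusion's much larger `Expr` type.

```rust
// Toy model of the new naming rule (not DataFusion's real types).
enum Expr {
    Column { relation: Option<String>, name: String },
    Min(Box<Expr>),
}

fn create_physical_name(e: &Expr, is_first_expr: bool) -> String {
    match e {
        Expr::Column { relation, name } => match (is_first_expr, relation) {
            // Top-level column, or no qualifier known: keep the short name.
            (true, _) | (_, None) => name.clone(),
            // Nested column: qualify it, mirroring `Column::flat_name()`.
            (false, Some(rel)) => format!("{}.{}", rel, name),
        },
        Expr::Min(arg) => format!("MIN({})", create_physical_name(arg, false)),
    }
}

fn main() {
    let c2 = Expr::Column {
        relation: Some("aggregate_test_100".to_string()),
        name: "c2".to_string(),
    };
    // A plain column projection keeps its short name...
    assert_eq!(create_physical_name(&c2, true), "c2");
    // ...but the same column nested inside an aggregate is qualified.
    assert_eq!(
        create_physical_name(&Expr::Min(Box::new(c2)), true),
        "MIN(aggregate_test_100.c2)"
    );
    println!("naming rule holds");
}
```
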
@@ -444,7 +451,7 @@ impl DefaultPhysicalPlanner {
&physical_input_schema,
ctx_state,
),
physical_name(e, logical_input_schema),
physical_name(e),
))
})
.collect::<Result<Vec<_>>>()?;
@@ -545,10 +552,10 @@ impl DefaultPhysicalPlanner {
}
// logical column is not a derived column, safe to pass along to
// physical_name
Err(_) => physical_name(e, input_schema),
Err(_) => physical_name(e),
}
} else {
physical_name(e, input_schema)
physical_name(e)
};

tuple_err((
@@ -1192,7 +1199,7 @@ impl DefaultPhysicalPlanner {
// unpack aliased logical expressions, e.g. "sum(col) over () as total"
let (name, e) = match e {
Expr::Alias(sub_expr, alias) => (alias.clone(), sub_expr.as_ref()),
_ => (physical_name(e, logical_input_schema)?, e),
_ => (physical_name(e)?, e),
};
self.create_window_expr_with_name(
e,
@@ -1271,7 +1278,7 @@ impl DefaultPhysicalPlanner {
// unpack aliased logical expressions, e.g. "sum(col) as total"
let (name, e) = match e {
Expr::Alias(sub_expr, alias) => (alias.clone(), sub_expr.as_ref()),
_ => (physical_name(e, logical_input_schema)?, e),
_ => (physical_name(e)?, e),
};

self.create_aggregate_expr_with_name(
@@ -1629,16 +1636,24 @@ mod tests {
let path = format!("{}/csv/aggregate_test_100.csv", testdata);

let options = CsvReadOptions::new().schema_infer_max_records(100);
let logical_plan = LogicalPlanBuilder::scan_csv(path, options, None)?
.aggregate(vec![col("c1")], vec![sum(col("c2"))])?
.build()?;
let logical_plan = LogicalPlanBuilder::scan_csv_with_name(
path,
options,
None,
"aggregate_test_100",
)?
.aggregate(vec![col("c1")], vec![sum(col("c2"))])?
.build()?;

let execution_plan = plan(&logical_plan)?;
let final_hash_agg = execution_plan
.as_any()
.downcast_ref::<HashAggregateExec>()
.expect("hash aggregate");
assert_eq!("SUM(c2)", final_hash_agg.schema().field(1).name());
assert_eq!(
"SUM(aggregate_test_100.c2)",
final_hash_agg.schema().field(1).name()
);
// we need access to the input to the partial aggregate so that other projects can
// implement serde
assert_eq!("c2", final_hash_agg.input_schema().field(1).name());
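
The updated planner test pins the same behavior end to end: scanning the CSV under the explicit name `aggregate_test_100` makes that name the qualifier, so the final aggregate's output field becomes `SUM(aggregate_test_100.c2)` while the partial aggregate's input schema keeps the bare `c2`. The qualified string is what `Column::flat_name()` produces; below is a small illustrative check, assuming the `Column { relation, name }` fields are public as in this era of the codebase.

```rust
use datafusion::logical_plan::Column;

fn main() {
    // Assumed-public fields; `flat_name()` is the method the planner now calls
    // for nested column references.
    let qualified = Column {
        relation: Some("aggregate_test_100".to_string()),
        name: "c2".to_string(),
    };
    let bare = Column {
        relation: None,
        name: "c2".to_string(),
    };

    assert_eq!(qualified.flat_name(), "aggregate_test_100.c2");
    assert_eq!(bare.flat_name(), "c2");
}
```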