Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use schema_name to create the physical_name #11977

Merged
merged 1 commit into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 3 additions & 10 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,7 @@ use datafusion_common::{
};
use datafusion_expr::dml::CopyTo;
use datafusion_expr::expr::{
self, create_function_physical_name, physical_name, AggregateFunction, Alias,
GroupingSet, WindowFunction,
self, physical_name, AggregateFunction, Alias, GroupingSet, WindowFunction,
};
use datafusion_expr::expr_rewriter::unnormalize_cols;
use datafusion_expr::logical_plan::builder::wrap_projection_for_join_if_necessary;
Expand Down Expand Up @@ -1563,12 +1562,7 @@ pub fn create_aggregate_expr_with_name_and_maybe_filter(
let name = if let Some(name) = name {
name
} else {
create_function_physical_name(
func.name(),
*distinct,
args,
order_by.as_ref(),
)?
physical_name(e)?
};

let physical_args =
Expand All @@ -1582,8 +1576,7 @@ pub fn create_aggregate_expr_with_name_and_maybe_filter(
None => None,
};

let ignore_nulls = null_treatment
.unwrap_or(sqlparser::ast::NullTreatment::RespectNulls)
let ignore_nulls = null_treatment.unwrap_or(NullTreatment::RespectNulls)
== NullTreatment::IgnoreNulls;

let (agg_expr, filter, order_by) = {
Expand Down
272 changes: 12 additions & 260 deletions datafusion/expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ use datafusion_common::tree_node::{
Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
};
use datafusion_common::{
internal_err, not_impl_err, plan_err, Column, DFSchema, Result, ScalarValue,
TableReference,
plan_err, Column, DFSchema, Result, ScalarValue, TableReference,
};
use sqlparser::ast::{
display_comma_separated, ExceptSelectItem, ExcludeSelectItem, IlikeSelectItem,
Expand Down Expand Up @@ -1082,7 +1081,7 @@ impl Expr {
/// For example, for a projection (e.g. `SELECT <expr>`) the resulting arrow
/// [`Schema`] will have a field with this name.
///
/// Note that the resulting string is subtlety different than the `Display`
/// Note that the resulting string is subtly different from the `Display`
/// representation for certain `Expr`. Some differences:
///
/// 1. [`Expr::Alias`], which shows only the alias itself
Expand All @@ -1104,6 +1103,7 @@ impl Expr {
}

/// Returns a full and complete string representation of this expression.
#[deprecated(note = "use format! instead")]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

pub fn canonical_name(&self) -> String {
    // The canonical name is exactly what this expression's `Display`
    // implementation renders.
    self.to_string()
}
Expand Down Expand Up @@ -2386,263 +2386,13 @@ fn fmt_function(
write!(f, "{}({}{})", fun, distinct_str, args.join(", "))
}

/// Builds the physical name of a function call.
///
/// The result has the shape `fun(DISTINCT arg1,arg2)`; the `DISTINCT ` prefix
/// is emitted only when `distinct` is true, and argument names are joined with
/// a bare comma. When `order_by` is provided, ` ORDER BY [..]` is appended with
/// the ordering expressions rendered by `expr_vec_fmt!`.
///
/// # Errors
///
/// Returns the first error produced by `create_physical_name` while naming
/// the arguments.
pub fn create_function_physical_name(
    fun: &str,
    distinct: bool,
    args: &[Expr],
    order_by: Option<&Vec<Expr>>,
) -> Result<String> {
    // Arguments are never the top-level expression, hence the `false` flag.
    let mut arg_names: Vec<String> = Vec::with_capacity(args.len());
    for arg in args {
        arg_names.push(create_physical_name(arg, false)?);
    }

    let distinct_prefix = if distinct { "DISTINCT " } else { "" };
    let base_name = format!("{}({}{})", fun, distinct_prefix, arg_names.join(","));

    let full_name = match order_by {
        Some(sort_exprs) => {
            format!("{} ORDER BY [{}]", base_name, expr_vec_fmt!(sort_exprs))
        }
        None => base_name,
    };
    Ok(full_name)
}

/// Returns the physical name of `e`, treating it as the top-level expression.
///
/// Delegates to `create_physical_name` with the first-expression flag set, so
/// a top-level column is reported by its bare name.
pub fn physical_name(e: &Expr) -> Result<String> {
    let is_first_expr = true;
    create_physical_name(e, is_first_expr)
}

fn create_physical_name(e: &Expr, is_first_expr: bool) -> Result<String> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@viirya this was the flag, I remembered it incorrectly: is_first_expr

match e {
Expr::Unnest(_) => {
internal_err!(
"Expr::Unnest should have been converted to LogicalPlan::Unnest"
)
}
Expr::Column(c) => {
if is_first_expr {
Ok(c.name.clone())
} else {
Ok(c.flat_name())
}
}
Expr::Alias(Alias { name, .. }) => Ok(name.clone()),
Expr::ScalarVariable(_, variable_names) => Ok(variable_names.join(".")),
Expr::Literal(value) => Ok(format!("{value:?}")),
Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
let left = create_physical_name(left, false)?;
let right = create_physical_name(right, false)?;
Ok(format!("{left} {op} {right}"))
}
Expr::Case(case) => {
let mut name = "CASE ".to_string();
if let Some(e) = &case.expr {
let _ = write!(name, "{} ", create_physical_name(e, false)?);
}
for (w, t) in &case.when_then_expr {
let _ = write!(
name,
"WHEN {} THEN {} ",
create_physical_name(w, false)?,
create_physical_name(t, false)?
);
}
if let Some(e) = &case.else_expr {
let _ = write!(name, "ELSE {} ", create_physical_name(e, false)?);
}
name += "END";
Ok(name)
}
Expr::Cast(Cast { expr, .. }) => {
// CAST does not change the expression name
create_physical_name(expr, false)
}
Expr::TryCast(TryCast { expr, .. }) => {
// CAST does not change the expression name
create_physical_name(expr, false)
}
Expr::Not(expr) => {
let expr = create_physical_name(expr, false)?;
Ok(format!("NOT {expr}"))
}
Expr::Negative(expr) => {
let expr = create_physical_name(expr, false)?;
Ok(format!("(- {expr})"))
}
Expr::IsNull(expr) => {
let expr = create_physical_name(expr, false)?;
Ok(format!("{expr} IS NULL"))
}
Expr::IsNotNull(expr) => {
let expr = create_physical_name(expr, false)?;
Ok(format!("{expr} IS NOT NULL"))
}
Expr::IsTrue(expr) => {
let expr = create_physical_name(expr, false)?;
Ok(format!("{expr} IS TRUE"))
}
Expr::IsFalse(expr) => {
let expr = create_physical_name(expr, false)?;
Ok(format!("{expr} IS FALSE"))
}
Expr::IsUnknown(expr) => {
let expr = create_physical_name(expr, false)?;
Ok(format!("{expr} IS UNKNOWN"))
}
Expr::IsNotTrue(expr) => {
let expr = create_physical_name(expr, false)?;
Ok(format!("{expr} IS NOT TRUE"))
}
Expr::IsNotFalse(expr) => {
let expr = create_physical_name(expr, false)?;
Ok(format!("{expr} IS NOT FALSE"))
}
Expr::IsNotUnknown(expr) => {
let expr = create_physical_name(expr, false)?;
Ok(format!("{expr} IS NOT UNKNOWN"))
}
Expr::ScalarFunction(fun) => fun.func.schema_name(&fun.args),
Expr::WindowFunction(WindowFunction {
fun,
args,
order_by,
..
}) => {
create_function_physical_name(&fun.to_string(), false, args, Some(order_by))
}
Expr::AggregateFunction(AggregateFunction {
func,
distinct,
args,
filter: _,
order_by,
null_treatment: _,
}) => {
create_function_physical_name(func.name(), *distinct, args, order_by.as_ref())
}
Expr::GroupingSet(grouping_set) => match grouping_set {
GroupingSet::Rollup(exprs) => Ok(format!(
"ROLLUP ({})",
exprs
.iter()
.map(|e| create_physical_name(e, false))
.collect::<Result<Vec<_>>>()?
.join(", ")
)),
GroupingSet::Cube(exprs) => Ok(format!(
"CUBE ({})",
exprs
.iter()
.map(|e| create_physical_name(e, false))
.collect::<Result<Vec<_>>>()?
.join(", ")
)),
GroupingSet::GroupingSets(lists_of_exprs) => {
let mut strings = vec![];
for exprs in lists_of_exprs {
let exprs_str = exprs
.iter()
.map(|e| create_physical_name(e, false))
.collect::<Result<Vec<_>>>()?
.join(", ");
strings.push(format!("({exprs_str})"));
}
Ok(format!("GROUPING SETS ({})", strings.join(", ")))
}
},

Expr::InList(InList {
expr,
list,
negated,
}) => {
let expr = create_physical_name(expr, false)?;
let list = list.iter().map(|expr| create_physical_name(expr, false));
if *negated {
Ok(format!("{expr} NOT IN ({list:?})"))
} else {
Ok(format!("{expr} IN ({list:?})"))
}
}
Expr::Exists { .. } => {
not_impl_err!("EXISTS is not yet supported in the physical plan")
}
Expr::InSubquery(_) => {
not_impl_err!("IN subquery is not yet supported in the physical plan")
}
Expr::ScalarSubquery(_) => {
not_impl_err!("Scalar subqueries are not yet supported in the physical plan")
}
Expr::Between(Between {
expr,
negated,
low,
high,
}) => {
let expr = create_physical_name(expr, false)?;
let low = create_physical_name(low, false)?;
let high = create_physical_name(high, false)?;
if *negated {
Ok(format!("{expr} NOT BETWEEN {low} AND {high}"))
} else {
Ok(format!("{expr} BETWEEN {low} AND {high}"))
}
}
Expr::Like(Like {
negated,
expr,
pattern,
escape_char,
case_insensitive,
}) => {
let expr = create_physical_name(expr, false)?;
let pattern = create_physical_name(pattern, false)?;
let op_name = if *case_insensitive { "ILIKE" } else { "LIKE" };
let escape = if let Some(char) = escape_char {
format!("CHAR '{char}'")
} else {
"".to_string()
};
if *negated {
Ok(format!("{expr} NOT {op_name} {pattern}{escape}"))
} else {
Ok(format!("{expr} {op_name} {pattern}{escape}"))
}
}
Expr::SimilarTo(Like {
negated,
expr,
pattern,
escape_char,
case_insensitive: _,
}) => {
let expr = create_physical_name(expr, false)?;
let pattern = create_physical_name(pattern, false)?;
let escape = if let Some(char) = escape_char {
format!("CHAR '{char}'")
} else {
"".to_string()
};
if *negated {
Ok(format!("{expr} NOT SIMILAR TO {pattern}{escape}"))
} else {
Ok(format!("{expr} SIMILAR TO {pattern}{escape}"))
}
}
Expr::Sort { .. } => {
internal_err!("Create physical name does not support sort expression")
}
Expr::Wildcard { qualifier, options } => match qualifier {
Some(qualifier) => Ok(format!("{}.*{}", qualifier, options)),
None => Ok(format!("*{}", options)),
},
Expr::Placeholder(_) => {
internal_err!("Create physical name does not support placeholder")
}
Expr::OuterReferenceColumn(_, _) => {
internal_err!("Create physical name does not support OuterReferenceColumn")
}
/// The name of the column (field) that this `Expr` will produce in the physical plan.
/// The difference from [Expr::schema_name] is that top-level columns are unqualified.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be better if we could provide an example, like the one for schema_name, to show the difference.

pub fn physical_name(expr: &Expr) -> Result<String> {
if let Expr::Column(col) = expr {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😍

Ok(col.name.clone())
} else {
Ok(expr.schema_name().to_string())
}
}

Expand All @@ -2658,6 +2408,7 @@ mod test {
use std::any::Any;

#[test]
#[allow(deprecated)]
fn format_case_when() -> Result<()> {
let expr = case(col("a"))
.when(lit(1), lit(true))
Expand All @@ -2670,6 +2421,7 @@ mod test {
}

#[test]
#[allow(deprecated)]
fn format_cast() -> Result<()> {
let expr = Expr::Cast(Cast {
expr: Box::new(Expr::Literal(ScalarValue::Float32(Some(1.23)))),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::{internal_err, not_impl_err, Result};
use datafusion_expr::expr::create_function_physical_name;
use datafusion_expr::AggregateUDF;
use datafusion_expr::ReversedUDAF;
use datafusion_expr_common::accumulator::Accumulator;
Expand Down Expand Up @@ -110,8 +109,7 @@ impl AggregateExprBuilder {

let data_type = fun.return_type(&input_exprs_types)?;
let name = match alias {
// TODO: Ideally, we should build the name from physical expressions
None => create_function_physical_name(fun.name(), is_distinct, &[], None)?,
None => return internal_err!("alias should be provided"),
Some(alias) => alias,
};

Expand Down
1 change: 1 addition & 0 deletions datafusion/physical-plan/src/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2179,6 +2179,7 @@ mod tests {
.map(|order_by_expr| {
let ordering_req = order_by_expr.unwrap_or_default();
AggregateExprBuilder::new(array_agg_udaf(), vec![Arc::clone(col_a)])
.alias("a")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this change needed?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the issue: we are not able to get the correct name from the args, so the alias is a workaround.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need it? Is there a situation where we would not provide the alias to the physical expression?
I made this change because the alias generated previously was incorrect (it didn't use the arguments).

.order_by(ordering_req.to_vec())
.schema(Arc::clone(&test_schema))
.build()
Expand Down