Skip to content

Commit

Permalink
Merge remote-tracking branch 'apache/main' into adam/reduce-string-al…
Browse files Browse the repository at this point in the history
…locations-no-cow
  • Loading branch information
alamb committed May 11, 2024
2 parents 1ead25c + 1eff714 commit 63a1656
Show file tree
Hide file tree
Showing 34 changed files with 854 additions and 362 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ Here are links to some important information
- [Rust Getting Started](https://datafusion.apache.org/user-guide/example-usage.html)
- [Rust DataFrame API](https://datafusion.apache.org/user-guide/dataframe.html)
- [Rust API docs](https://docs.rs/datafusion/latest/datafusion)
- [Rust Examples](https://github.com/apache/datafusion/tree/master/datafusion-examples)
- [Rust Examples](https://github.com/apache/datafusion/tree/main/datafusion-examples)
- [Python DataFrame API](https://arrow.apache.org/datafusion-python/)
- [Architecture](https://docs.rs/datafusion/latest/datafusion/index.html#architecture)

Expand Down
8 changes: 0 additions & 8 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,9 +276,6 @@ fn create_physical_name(e: &Expr, is_first_expr: bool) -> Result<String> {
.collect::<Result<Vec<_>>>()?;
Ok(format!("{}({})", fun.name(), names.join(",")))
}
AggregateFunctionDefinition::Name(_) => {
internal_err!("Aggregate function `Expr` with name should be resolved.")
}
},
Expr::GroupingSet(grouping_set) => match grouping_set {
GroupingSet::Rollup(exprs) => Ok(format!(
Expand Down Expand Up @@ -1947,11 +1944,6 @@ pub fn create_aggregate_expr_with_name_and_maybe_filter(
)?;
(agg_expr, filter, physical_sort_exprs)
}
AggregateFunctionDefinition::Name(_) => {
return internal_err!(
"Aggregate function name should have been resolved"
)
}
};
Ok((agg_expr, filter, order_by))
}
Expand Down
7 changes: 1 addition & 6 deletions datafusion/expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -515,9 +515,6 @@ pub enum AggregateFunctionDefinition {
BuiltIn(aggregate_function::AggregateFunction),
/// Resolved to a user defined aggregate function
UDF(Arc<crate::AggregateUDF>),
/// A aggregation function constructed with name. This variant can not be executed directly
/// and instead must be resolved to one of the other variants prior to physical planning.
Name(Arc<str>),
}

impl AggregateFunctionDefinition {
Expand All @@ -526,7 +523,6 @@ impl AggregateFunctionDefinition {
match self {
AggregateFunctionDefinition::BuiltIn(fun) => fun.name(),
AggregateFunctionDefinition::UDF(udf) => udf.name(),
AggregateFunctionDefinition::Name(func_name) => func_name.as_ref(),
}
}
}
Expand Down Expand Up @@ -1873,8 +1869,7 @@ fn write_name<W: Write>(w: &mut W, e: &Expr) -> Result<()> {
null_treatment,
}) => {
match func_def {
AggregateFunctionDefinition::BuiltIn(..)
| AggregateFunctionDefinition::Name(..) => {
AggregateFunctionDefinition::BuiltIn(..) => {
write_function_name(w, func_def.name(), *distinct, args)?;
}
AggregateFunctionDefinition::UDF(fun) => {
Expand Down
10 changes: 4 additions & 6 deletions datafusion/expr/src/expr_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::expr::{
};
use crate::field_util::GetFieldAccessSchema;
use crate::type_coercion::binary::get_result_type;
use crate::type_coercion::functions::data_types;
use crate::type_coercion::functions::data_types_with_scalar_udf;
use crate::{utils, LogicalPlan, Projection, Subquery};
use arrow::compute::can_cast_types;
use arrow::datatypes::{DataType, Field};
Expand Down Expand Up @@ -139,9 +139,10 @@ impl ExprSchemable for Expr {
.map(|e| e.get_type(schema))
.collect::<Result<Vec<_>>>()?;
// verify that function is invoked with correct number and type of arguments as defined in `TypeSignature`
data_types(&arg_data_types, func.signature()).map_err(|_| {
data_types_with_scalar_udf(&arg_data_types, func).map_err(|err| {
plan_datafusion_err!(
"{}",
"{} and {}",
err,
utils::generate_signature_error_msg(
func.name(),
func.signature().clone(),
Expand Down Expand Up @@ -173,9 +174,6 @@ impl ExprSchemable for Expr {
AggregateFunctionDefinition::UDF(fun) => {
Ok(fun.return_type(&data_types)?)
}
AggregateFunctionDefinition::Name(_) => {
internal_err!("Function `Expr` with name should be resolved.")
}
}
}
Expr::Not(_)
Expand Down
5 changes: 5 additions & 0 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,11 @@ impl LogicalPlan {
/// expressions. For example [`LogicalPlan::Filter`] schema is always the
/// same as its input schema.
///
/// This is useful after modifying a plans `Expr`s (or input plans) via
/// methods such as [Self::map_children] and [Self::map_expressions]. Unlike
/// [Self::with_new_exprs], this method does not require a new set of
/// expressions or inputs plans.
///
/// # Return value
/// Returns an error if there is some issue recomputing the schema.
///
Expand Down
23 changes: 10 additions & 13 deletions datafusion/expr/src/signature.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,15 +91,12 @@ pub enum TypeSignature {
/// # Examples
/// A function such as `concat` is `Variadic(vec![DataType::Utf8, DataType::LargeUtf8])`
Variadic(Vec<DataType>),
/// One or more arguments of an arbitrary but equal type.
/// DataFusion attempts to coerce all argument types to match the first argument's type
/// The acceptable signature and coercions rules to coerce arguments to this
/// signature are special for this function. If this signature is specified,
/// Datafusion will call [`ScalarUDFImpl::coerce_types`] to prepare argument types.
///
/// # Examples
/// Given types in signature should be coercible to the same final type.
/// A function such as `make_array` is `VariadicEqual`.
///
/// `make_array(i32, i64) -> make_array(i64, i64)`
VariadicEqual,
/// [`ScalarUDFImpl::coerce_types`]: crate::udf::ScalarUDFImpl::coerce_types
UserDefined,
/// One or more arguments with arbitrary types
VariadicAny,
/// Fixed number of arguments of an arbitrary but equal type out of a list of valid types.
Expand Down Expand Up @@ -190,8 +187,8 @@ impl TypeSignature {
.collect::<Vec<&str>>()
.join(", ")]
}
TypeSignature::VariadicEqual => {
vec!["CoercibleT, .., CoercibleT".to_string()]
TypeSignature::UserDefined => {
vec!["UserDefined".to_string()]
}
TypeSignature::VariadicAny => vec!["Any, .., Any".to_string()],
TypeSignature::OneOf(sigs) => {
Expand Down Expand Up @@ -255,10 +252,10 @@ impl Signature {
volatility,
}
}
/// An arbitrary number of arguments of the same type.
pub fn variadic_equal(volatility: Volatility) -> Self {
/// User-defined coercion rules for the function.
pub fn user_defined(volatility: Volatility) -> Self {
Self {
type_signature: TypeSignature::VariadicEqual,
type_signature: TypeSignature::UserDefined,
volatility,
}
}
Expand Down
5 changes: 1 addition & 4 deletions datafusion/expr/src/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::{Expr, GetFieldAccess};
use datafusion_common::tree_node::{
Transformed, TreeNode, TreeNodeIterator, TreeNodeRecursion,
};
use datafusion_common::{internal_err, map_until_stop_and_collect, Result};
use datafusion_common::{map_until_stop_and_collect, Result};

impl TreeNode for Expr {
fn apply_children<F: FnMut(&Self) -> Result<TreeNodeRecursion>>(
Expand Down Expand Up @@ -348,9 +348,6 @@ impl TreeNode for Expr {
null_treatment,
)))
}
AggregateFunctionDefinition::Name(_) => {
internal_err!("Function `Expr` with name should be resolved.")
}
},
)?,
Expr::GroupingSet(grouping_set) => match grouping_set {
Expand Down
176 changes: 150 additions & 26 deletions datafusion/expr/src/type_coercion/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,114 @@ use std::sync::Arc;
use crate::signature::{
ArrayFunctionSignature, FIXED_SIZE_LIST_WILDCARD, TIMEZONE_WILDCARD,
};
use crate::{Signature, TypeSignature};
use crate::{AggregateUDF, ScalarUDF, Signature, TypeSignature};
use arrow::{
compute::can_cast_types,
datatypes::{DataType, TimeUnit},
};
use datafusion_common::utils::{coerced_fixed_size_list_to_list, list_ndims};
use datafusion_common::{internal_datafusion_err, internal_err, plan_err, Result};
use datafusion_common::{
exec_err, internal_datafusion_err, internal_err, plan_err, Result,
};

use super::binary::{comparison_binary_numeric_coercion, comparison_coercion};

/// Performs type coercion for scalar function arguments.
///
/// Returns the data types to which each argument must be coerced to
/// match `signature`.
///
/// For more details on coercion in general, please see the
/// [`type_coercion`](crate::type_coercion) module.
pub fn data_types_with_scalar_udf(
current_types: &[DataType],
func: &ScalarUDF,
) -> Result<Vec<DataType>> {
let signature = func.signature();

if current_types.is_empty() {
if signature.type_signature.supports_zero_argument() {
return Ok(vec![]);
} else {
return plan_err!(
"[data_types_with_scalar_udf] signature {:?} does not support zero arguments.",
&signature.type_signature
);
}
}

let valid_types =
get_valid_types_with_scalar_udf(&signature.type_signature, current_types, func)?;

if valid_types
.iter()
.any(|data_type| data_type == current_types)
{
return Ok(current_types.to_vec());
}

// Try and coerce the argument types to match the signature, returning the
// coerced types from the first matching signature.
for valid_types in valid_types {
if let Some(types) = maybe_data_types(&valid_types, current_types) {
return Ok(types);
}
}

// none possible -> Error
plan_err!(
"[data_types_with_scalar_udf] Coercion from {:?} to the signature {:?} failed.",
current_types,
&signature.type_signature
)
}

pub fn data_types_with_aggregate_udf(
current_types: &[DataType],
func: &AggregateUDF,
) -> Result<Vec<DataType>> {
let signature = func.signature();

if current_types.is_empty() {
if signature.type_signature.supports_zero_argument() {
return Ok(vec![]);
} else {
return plan_err!(
"[data_types_with_aggregate_udf] Coercion from {:?} to the signature {:?} failed.",
current_types,
&signature.type_signature
);
}
}

let valid_types = get_valid_types_with_aggregate_udf(
&signature.type_signature,
current_types,
func,
)?;
if valid_types
.iter()
.any(|data_type| data_type == current_types)
{
return Ok(current_types.to_vec());
}

// Try and coerce the argument types to match the signature, returning the
// coerced types from the first matching signature.
for valid_types in valid_types {
if let Some(types) = maybe_data_types(&valid_types, current_types) {
return Ok(types);
}
}

// none possible -> Error
plan_err!(
"[data_types_with_aggregate_udf] Coercion from {:?} to the signature {:?} failed.",
current_types,
&signature.type_signature
)
}

/// Performs type coercion for function arguments.
///
/// Returns the data types to which each argument must be coerced to
Expand All @@ -46,7 +144,7 @@ pub fn data_types(
return Ok(vec![]);
} else {
return plan_err!(
"Coercion from {:?} to the signature {:?} failed.",
"[data_types] Coercion from {:?} to the signature {:?} failed.",
current_types,
&signature.type_signature
);
Expand All @@ -72,12 +170,56 @@ pub fn data_types(

// none possible -> Error
plan_err!(
"Coercion from {:?} to the signature {:?} failed.",
"[data_types] Coercion from {:?} to the signature {:?} failed.",
current_types,
&signature.type_signature
)
}

fn get_valid_types_with_scalar_udf(
signature: &TypeSignature,
current_types: &[DataType],
func: &ScalarUDF,
) -> Result<Vec<Vec<DataType>>> {
let valid_types = match signature {
TypeSignature::UserDefined => match func.coerce_types(current_types) {
Ok(coerced_types) => vec![coerced_types],
Err(e) => return exec_err!("User-defined coercion failed with {:?}", e),
},
TypeSignature::OneOf(signatures) => signatures
.iter()
.filter_map(|t| get_valid_types_with_scalar_udf(t, current_types, func).ok())
.flatten()
.collect::<Vec<_>>(),
_ => get_valid_types(signature, current_types)?,
};

Ok(valid_types)
}

fn get_valid_types_with_aggregate_udf(
signature: &TypeSignature,
current_types: &[DataType],
func: &AggregateUDF,
) -> Result<Vec<Vec<DataType>>> {
let valid_types = match signature {
TypeSignature::UserDefined => match func.coerce_types(current_types) {
Ok(coerced_types) => vec![coerced_types],
Err(e) => return exec_err!("User-defined coercion failed with {:?}", e),
},
TypeSignature::OneOf(signatures) => signatures
.iter()
.filter_map(|t| {
get_valid_types_with_aggregate_udf(t, current_types, func).ok()
})
.flatten()
.collect::<Vec<_>>(),
_ => get_valid_types(signature, current_types)?,
};

Ok(valid_types)
}

/// Returns a Vec of all possible valid argument types for the given signature.
fn get_valid_types(
signature: &TypeSignature,
Expand Down Expand Up @@ -184,32 +326,14 @@ fn get_valid_types(
.iter()
.map(|valid_type| (0..*number).map(|_| valid_type.clone()).collect())
.collect(),
TypeSignature::VariadicEqual => {
let new_type = current_types.iter().skip(1).try_fold(
current_types.first().unwrap().clone(),
|acc, x| {
// The coerced types found by `comparison_coercion` are not guaranteed to be
// coercible for the arguments. `comparison_coercion` returns more loose
// types that can be coerced to both `acc` and `x` for comparison purpose.
// See `maybe_data_types` for the actual coercion.
let coerced_type = comparison_coercion(&acc, x);
if let Some(coerced_type) = coerced_type {
Ok(coerced_type)
} else {
internal_err!("Coercion from {acc:?} to {x:?} failed.")
}
},
);

match new_type {
Ok(new_type) => vec![vec![new_type; current_types.len()]],
Err(e) => return Err(e),
}
TypeSignature::UserDefined => {
return internal_err!(
"User-defined signature should be handled by function-specific coerce_types."
)
}
TypeSignature::VariadicAny => {
vec![current_types.to_vec()]
}

TypeSignature::Exact(valid_types) => vec![valid_types.clone()],
TypeSignature::ArraySignature(ref function_signature) => match function_signature
{
Expand Down
4 changes: 4 additions & 0 deletions datafusion/expr/src/udaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,10 @@ impl AggregateUDF {
pub fn create_groups_accumulator(&self) -> Result<Box<dyn GroupsAccumulator>> {
self.inner.create_groups_accumulator()
}

pub fn coerce_types(&self, _args: &[DataType]) -> Result<Vec<DataType>> {
not_impl_err!("coerce_types not implemented for {:?} yet", self.name())
}
}

impl<F> From<F> for AggregateUDF
Expand Down
Loading

0 comments on commit 63a1656

Please sign in to comment.