Skip to content

Commit

Permalink
Minor: Improve documentation for AggregateUDFImpl::accumulator and `A…
Browse files Browse the repository at this point in the history
…ccumulatorArgs` (#9920)

* Minor: Improve documentation for AggregateUDFImpl::accumulator and `AccumulatorArgs`

* Add test and helper functions

* Improve docs and examples

* Fix CI

* Remove checks for ORDER BY and IGNORE NULLS
  • Loading branch information
alamb authored Apr 5, 2024
1 parent 308ebc5 commit 2dad904
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 12 deletions.
3 changes: 0 additions & 3 deletions datafusion/core/tests/user_defined/user_defined_aggregates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,6 @@ impl Accumulator for TimeSum {
let arr = arr.as_primitive::<TimestampNanosecondType>();

for v in arr.values().iter() {
println!("Adding {v}");
self.sum += v;
}
Ok(())
Expand All @@ -538,7 +537,6 @@ impl Accumulator for TimeSum {
}

fn evaluate(&mut self) -> Result<ScalarValue> {
println!("Evaluating to {}", self.sum);
Ok(ScalarValue::TimestampNanosecond(Some(self.sum), None))
}

Expand All @@ -558,7 +556,6 @@ impl Accumulator for TimeSum {
let arr = arr.as_primitive::<TimestampNanosecondType>();

for v in arr.values().iter() {
println!("Retracting {v}");
self.sum -= v;
}
Ok(())
Expand Down
29 changes: 22 additions & 7 deletions datafusion/expr/src/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,33 @@ pub type ScalarFunctionImplementation =
pub type ReturnTypeFunction =
Arc<dyn Fn(&[DataType]) -> Result<Arc<DataType>> + Send + Sync>;

/// Arguments passed to create an accumulator
/// [`AccumulatorArgs`] contains information about how an aggregate
/// function was called, including the types of its arguments and any optional
/// ordering expressions.
pub struct AccumulatorArgs<'a> {
// default arguments
/// the return type of the function
/// The return type of the aggregate function.
pub data_type: &'a DataType,
/// the schema of the input arguments
/// The schema of the input arguments
pub schema: &'a Schema,
/// whether to ignore nulls
/// Whether to ignore nulls.
///
/// SQL allows the user to specify `IGNORE NULLS`, for example:
///
/// ```sql
/// SELECT FIRST_VALUE(column1) IGNORE NULLS FROM t;
/// ```
pub ignore_nulls: bool,

// ordering arguments
/// the expressions of `order by`, if no ordering is required, this will be an empty slice
/// The expressions in the `ORDER BY` clause passed to this aggregator.
///
/// SQL allows the user to specify the ordering of arguments to the
/// aggregate using an `ORDER BY`. For example:
///
/// ```sql
/// SELECT FIRST_VALUE(column1 ORDER BY column2) FROM t;
/// ```
///
/// If no `ORDER BY` is specified, `sort_exprs`` will be empty.
pub sort_exprs: &'a [Expr],
}

Expand Down
12 changes: 10 additions & 2 deletions datafusion/expr/src/udaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,8 @@ where
/// See [`advanced_udaf.rs`] for a full example with complete implementation and
/// [`AggregateUDF`] for other available options.
///
///
/// [`advanced_udaf.rs`]: https://github.com/apache/arrow-datafusion/blob/main/datafusion-examples/examples/advanced_udaf.rs
///
/// # Basic Example
/// ```
/// # use std::any::Any;
Expand Down Expand Up @@ -282,7 +282,8 @@ pub trait AggregateUDFImpl: Debug + Send + Sync {
/// Return a new [`Accumulator`] that aggregates values for a specific
/// group during query execution.
///
/// `acc_args`: the arguments to the accumulator. See [`AccumulatorArgs`] for more details.
/// acc_args: [`AccumulatorArgs`] contains information about how the
/// aggregate function was called.
fn accumulator(&self, acc_args: AccumulatorArgs) -> Result<Box<dyn Accumulator>>;

/// Return the fields used to store the intermediate state of this accumulator.
Expand Down Expand Up @@ -325,6 +326,13 @@ pub trait AggregateUDFImpl: Debug + Send + Sync {
/// If the aggregate expression has a specialized
/// [`GroupsAccumulator`] implementation. If this returns true,
/// `[Self::create_groups_accumulator]` will be called.
///
/// # Notes
///
/// Even if this function returns true, DataFusion will still use
/// `Self::accumulator` for certain queries, such as when this aggregate is
/// used as a window function or when there no GROUP BY columns in the
/// query.
fn groups_accumulator_supported(&self) -> bool {
false
}
Expand Down

0 comments on commit 2dad904

Please sign in to comment.