ARROW-11822: [Rust][Datafusion] Support case sensitive for function #9600

Closed
wants to merge 44 commits into from
Changes from 34 commits
44 commits
e45e355
Merge pull request #9 from apache/master
wqc200 Feb 27, 2021
fadef3f
Make some functions public
wqc200 Feb 27, 2021
97fb5cd
Support case sensitive
wqc200 Feb 28, 2021
122ab74
remove to_string()
wqc200 Feb 28, 2021
6793160
remove unused variable fun
wqc200 Feb 28, 2021
33a1a9e
equality checks against false can be replaced by a negation
wqc200 Feb 28, 2021
33a1b95
format code
wqc200 Feb 28, 2021
ea8ca2a
format code
wqc200 Feb 28, 2021
a2e5eac
format code
wqc200 Feb 28, 2021
f897970
fix test
wqc200 Feb 28, 2021
b135fe8
Update rust/datafusion/src/sql/planner.rs
wqc200 Mar 2, 2021
2092439
Update rust/datafusion/src/execution/context.rs
wqc200 Mar 2, 2021
fb1a41d
Update rust/datafusion/src/execution/context.rs
wqc200 Mar 2, 2021
474cb61
remove get_config to config
wqc200 Mar 2, 2021
b2e49cf
Merge branch 'master' of https://github.com/wqc200/arrow
wqc200 Mar 2, 2021
af1568f
fix bug about input_name
wqc200 Mar 2, 2021
8fa5b29
make config with arc
wqc200 Mar 3, 2021
8902378
add arc with config
wqc200 Mar 4, 2021
6578bfb
add arc with config
wqc200 Mar 6, 2021
7fe6ae8
add arc with config
wqc200 Mar 6, 2021
72fb896
add arc with config
wqc200 Mar 6, 2021
edfcdb7
Merge pull request #10 from apache/master
wqc200 Mar 6, 2021
bb10ef3
rollback for arc
wqc200 Mar 6, 2021
2025c02
rollback for arc
wqc200 Mar 6, 2021
09926df
Merge pull request #11 from apache/master
wqc200 Mar 8, 2021
49c1029
The style of case display
wqc200 Mar 9, 2021
942cad6
add documentation for case style enum
wqc200 Mar 9, 2021
fc88704
format code
wqc200 Mar 9, 2021
6cd8519
Merge pull request #12 from apache/master
wqc200 Mar 10, 2021
edff233
Merge pull request #13 from apache/master
wqc200 Mar 13, 2021
dba97ca
Resolve some conversation
wqc200 Mar 13, 2021
2bc66d6
Set the input name to uppercase
wqc200 Mar 13, 2021
8c8fa50
Set the input name to uppercase
wqc200 Mar 13, 2021
ca63ae9
Set the input name to uppercase
wqc200 Mar 13, 2021
f09f10d
Merge pull request #14 from apache/master
wqc200 Mar 20, 2021
acf2c0d
Merge pull request #15 from apache/master
wqc200 Mar 22, 2021
53d3492
merge from the official branch
wqc200 Mar 25, 2021
910aba4
Merge branch 'apache-master'
wqc200 Mar 25, 2021
93f57de
Merge pull request #17 from apache/master
wqc200 Mar 26, 2021
4624955
merge from offical branch
wqc200 Mar 31, 2021
cd336f1
Merge branch 'apache-master'
wqc200 Mar 31, 2021
c8608dd
Merge pull request #19 from apache/master
wqc200 Apr 4, 2021
167e501
Merge branch 'master' into master
wqc200 Apr 6, 2021
c12c243
Merge pull request #20 from apache/master
wqc200 Apr 14, 2021
39 changes: 39 additions & 0 deletions rust/datafusion/src/execution/context.rs
@@ -498,13 +498,34 @@ impl QueryPlanner for DefaultQueryPlanner {
}
}

+ /// The style of case display
+ #[derive(Clone, PartialEq)]
+ pub enum CaseStyle {
+ /// Function names in the output are displayed as they were provided:
+ /// `mD5("a")` will appear in the results as `mD5("a")`. This
+ /// mimics MySQL behavior.
+ PreserveCase,
+ /// Function names in the output are displayed in lower case:
+ /// `mD5("a")` will appear in the results as `md5("a")`. This
+ /// mimics PostgreSQL behavior.
+ LikePostgreSQL,
+ }

/// Configuration options for execution context
#[derive(Clone)]
pub struct ExecutionConfig {
/// Number of concurrent threads for query execution.
pub concurrency: usize,
/// Default batch size when reading data sources
pub batch_size: usize,
+ /// Whether to use case-sensitive matching for function names.
+ /// If `false`, both `SELECT COUNT(*) FROM t;` and `SELECT count(*) FROM t;`
+ /// can be used to compute the `COUNT` aggregate. If `true`, only
+ /// `SELECT count(*) FROM t;` can be used.
+ /// Defaults to `true`.
+ pub case_sensitive: bool,
+ /// Style used to display function names. Defaults to `CaseStyle::LikePostgreSQL`.
+ pub case_style: CaseStyle,
/// Responsible for optimizing a logical plan
optimizers: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
/// Responsible for planning `LogicalPlan`s, and `ExecutionPlan`
@@ -517,6 +538,8 @@ impl ExecutionConfig {
Self {
concurrency: num_cpus::get(),
batch_size: 32768,
+ case_sensitive: true,
+ case_style: CaseStyle::LikePostgreSQL,
optimizers: vec![
Arc::new(ConstantFolding::new()),
Arc::new(ProjectionPushDown::new()),
@@ -544,6 +567,18 @@ impl ExecutionConfig {
self
}

+ /// Customize whether function name matching is case sensitive
+ pub fn with_case_sensitive(mut self, cs: bool) -> Self {
+ self.case_sensitive = cs;
+ self
+ }
+
+ /// Customize case style
+ pub fn with_case_style(mut self, cs: CaseStyle) -> Self {
+ self.case_style = cs;
+ self
+ }

/// Replace the default query planner
pub fn with_query_planner(
mut self,
@@ -593,6 +628,10 @@ impl ContextProvider for ExecutionContextState {
fn get_aggregate_meta(&self, name: &str) -> Option<Arc<AggregateUDF>> {
self.aggregate_functions.get(name).cloned()
}

+ fn get_config(&self) -> ExecutionConfig {
+ self.config.clone()
+ }
}

impl FunctionRegistry for ExecutionContextState {
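
The `case_sensitive` flag added in this file can be illustrated with a small sketch. The planner changes that consume the flag are not shown on this page, so the lookup below is an assumption about how such a flag might be applied: `find_function` and the `HashMap` registry are hypothetical, and `ExecutionConfig` is a reduced stand-in that models only the new field and its builder method.

```rust
use std::collections::HashMap;

/// Simplified stand-in for `ExecutionConfig`; only the new flag is modeled.
#[derive(Clone, Debug)]
pub struct ExecutionConfig {
    pub case_sensitive: bool,
}

impl ExecutionConfig {
    /// Mirrors the default in the diff: matching is case sensitive.
    pub fn new() -> Self {
        Self { case_sensitive: true }
    }

    /// Builder-style setter with the same shape as `with_case_sensitive` in the diff.
    pub fn with_case_sensitive(mut self, cs: bool) -> Self {
        self.case_sensitive = cs;
        self
    }
}

/// Hypothetical lookup (not from the PR): the registry stores lower-case
/// names, and the queried name is lowered first only when matching is
/// case insensitive.
pub fn find_function<'a>(
    registry: &'a HashMap<String, u32>,
    name: &str,
    config: &ExecutionConfig,
) -> Option<&'a u32> {
    if config.case_sensitive {
        registry.get(name)
    } else {
        let lowered = name.to_lowercase();
        registry.get(lowered.as_str())
    }
}
```

Under these assumptions, a registry containing only `count` resolves `COUNT` when the config is built with `with_case_sensitive(false)` and rejects it under the default.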
66 changes: 51 additions & 15 deletions rust/datafusion/src/logical_plan/expr.rs
@@ -160,20 +160,26 @@ pub enum Expr {
},
/// Represents the call of a built-in scalar function with a set of arguments.
ScalarFunction {
+ /// The input name of the function
+ input_name: String,
Contributor

Also, I looked through the diff and I couldn't figure out what input_name is actually used for. Is this new field needed?

Contributor Author

This is mainly used to display the function name as the user entered it.
Without input_name, the output would show md5(Utf8("a")) instead of mD5(Utf8("a")).

For example:

mysql> select mD5('a');
+----------------------------------+
| mD5(Utf8("a"))                   |
+----------------------------------+
| 0cc175b9c0f1b6a831c399e269772661 |
+----------------------------------+
1 row in set (0.01 sec)

Contributor

FWIW, here is what Postgres does for the same query (it uses the lowercase, canonical function name):

alamb=# select mD5('a');
               md5                
----------------------------------
 0cc175b9c0f1b6a831c399e269772661
(1 row)

Contributor Author

Yes, I fixed it to use lowercase, as in PostgreSQL. I added an enum with two variants: one is LikeMySQL, the other is LikePostgreSQL.

select mD5('a');
md5
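
The two behaviors discussed in this thread can be made concrete with a minimal sketch of how a display name might be derived from the name the user typed. `display_name` is a hypothetical helper and not a function from this PR; the enum is a local copy that uses the variant names from the diff, where the MySQL-like variant is called `PreserveCase`.

```rust
/// Local stand-in for the `CaseStyle` enum added in this PR.
#[derive(Clone, Debug, PartialEq)]
pub enum CaseStyle {
    /// Keep the function name exactly as the user typed it (MySQL-like).
    PreserveCase,
    /// Show the function name in lower case (PostgreSQL-like).
    LikePostgreSQL,
}

/// Hypothetical helper: choose the function name shown in the output column.
pub fn display_name(input_name: &str, style: &CaseStyle) -> String {
    match style {
        CaseStyle::PreserveCase => input_name.to_string(),
        CaseStyle::LikePostgreSQL => input_name.to_lowercase(),
    }
}
```

With this sketch, `mD5` stays `mD5` under `PreserveCase` and becomes `md5` under `LikePostgreSQL`, matching the two query outputs quoted above.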

/// The function
fun: functions::BuiltinScalarFunction,
/// List of expressions to feed to the functions as arguments
args: Vec<Expr>,
},
/// Represents the call of a user-defined scalar function with arguments.
ScalarUDF {
+ /// The input name of the function
+ input_name: String,
/// The function
fun: Arc<ScalarUDF>,
/// List of expressions to feed to the functions as arguments
args: Vec<Expr>,
},
/// Represents the call of an aggregate built-in function with arguments.
AggregateFunction {
+ /// The input name of the function
+ input_name: String,
/// Name of the function
fun: aggregates::AggregateFunction,
/// List of expressions to feed to the functions as arguments
@@ -183,6 +189,8 @@ pub enum Expr {
},
/// aggregate function
AggregateUDF {
+ /// The input name of the function
+ input_name: String,
/// The function
fun: Arc<AggregateUDF>,
/// List of expressions to feed to the functions as arguments
@@ -220,14 +228,14 @@ impl Expr {
Expr::Literal(l) => Ok(l.get_datatype()),
Expr::Case { when_then_expr, .. } => when_then_expr[0].1.get_type(schema),
Expr::Cast { data_type, .. } => Ok(data_type.clone()),
- Expr::ScalarUDF { fun, args } => {
+ Expr::ScalarUDF { fun, args, .. } => {
let data_types = args
.iter()
.map(|e| e.get_type(schema))
.collect::<Result<Vec<_>>>()?;
Ok((fun.return_type)(&data_types)?.as_ref().clone())
}
- Expr::ScalarFunction { fun, args } => {
+ Expr::ScalarFunction { fun, args, .. } => {
let data_types = args
.iter()
.map(|e| e.get_type(schema))
@@ -680,24 +688,42 @@ impl Expr {
asc,
nulls_first,
},
- Expr::ScalarFunction { args, fun } => Expr::ScalarFunction {
+ Expr::ScalarFunction {
+ input_name,
+ args,
+ fun,
+ } => Expr::ScalarFunction {
+ input_name,
args: rewrite_vec(args, rewriter)?,
fun,
},
- Expr::ScalarUDF { args, fun } => Expr::ScalarUDF {
+ Expr::ScalarUDF {
+ input_name,
+ args,
+ fun,
+ ..
+ } => Expr::ScalarUDF {
+ input_name,
args: rewrite_vec(args, rewriter)?,
fun,
},
Expr::AggregateFunction {
+ input_name,
args,
fun,
distinct,
} => Expr::AggregateFunction {
+ input_name,
args: rewrite_vec(args, rewriter)?,
fun,
distinct,
},
- Expr::AggregateUDF { args, fun } => Expr::AggregateUDF {
+ Expr::AggregateUDF {
+ input_name,
+ args,
+ fun,
+ } => Expr::AggregateUDF {
+ input_name,
args: rewrite_vec(args, rewriter)?,
fun,
},
@@ -928,6 +954,7 @@ pub fn col(name: &str) -> Expr {
/// Create an expression to represent the min() aggregate function
pub fn min(expr: Expr) -> Expr {
Expr::AggregateFunction {
+ input_name: aggregates::AggregateFunction::Min.to_string(),
fun: aggregates::AggregateFunction::Min,
distinct: false,
args: vec![expr],
@@ -937,6 +964,7 @@ pub fn min(expr: Expr) -> Expr {
/// Create an expression to represent the max() aggregate function
pub fn max(expr: Expr) -> Expr {
Expr::AggregateFunction {
+ input_name: aggregates::AggregateFunction::Max.to_string(),
fun: aggregates::AggregateFunction::Max,
distinct: false,
args: vec![expr],
@@ -946,6 +974,7 @@ pub fn max(expr: Expr) -> Expr {
/// Create an expression to represent the sum() aggregate function
pub fn sum(expr: Expr) -> Expr {
Expr::AggregateFunction {
+ input_name: aggregates::AggregateFunction::Sum.to_string(),
fun: aggregates::AggregateFunction::Sum,
distinct: false,
args: vec![expr],
@@ -955,6 +984,7 @@ pub fn sum(expr: Expr) -> Expr {
/// Create an expression to represent the avg() aggregate function
pub fn avg(expr: Expr) -> Expr {
Expr::AggregateFunction {
+ input_name: aggregates::AggregateFunction::Avg.to_string(),
fun: aggregates::AggregateFunction::Avg,
distinct: false,
args: vec![expr],
@@ -964,6 +994,7 @@ pub fn avg(expr: Expr) -> Expr {
/// Create an expression to represent the count() aggregate function
pub fn count(expr: Expr) -> Expr {
Expr::AggregateFunction {
+ input_name: aggregates::AggregateFunction::Count.to_string(),
fun: aggregates::AggregateFunction::Count,
distinct: false,
args: vec![expr],
@@ -973,6 +1004,7 @@ pub fn count(expr: Expr) -> Expr {
/// Create an expression to represent the count(distinct) aggregate function
pub fn count_distinct(expr: Expr) -> Expr {
Expr::AggregateFunction {
+ input_name: aggregates::AggregateFunction::Count.to_string(),
fun: aggregates::AggregateFunction::Count,
distinct: true,
args: vec![expr],
@@ -1046,6 +1078,7 @@ macro_rules! unary_scalar_expr {
#[allow(missing_docs)]
pub fn $FUNC(e: Expr) -> Expr {
Expr::ScalarFunction {
+ input_name: functions::BuiltinScalarFunction::$ENUM.to_string(),
fun: functions::BuiltinScalarFunction::$ENUM,
args: vec![e],
}
@@ -1113,6 +1146,7 @@ unary_scalar_expr!(Upper, upper);
/// returns an array of fixed size with each argument on it.
pub fn array(args: Vec<Expr>) -> Expr {
Expr::ScalarFunction {
+ input_name: functions::BuiltinScalarFunction::Array.to_string(),
fun: functions::BuiltinScalarFunction::Array,
args,
}
@@ -1330,24 +1364,26 @@ fn create_name(e: &Expr, input_schema: &DFSchema) -> Result<String> {
let expr = create_name(expr, input_schema)?;
Ok(format!("{} IS NOT NULL", expr))
}
- Expr::ScalarFunction { fun, args, .. } => {
- create_function_name(&fun.to_string(), false, args, input_schema)
- }
- Expr::ScalarUDF { fun, args, .. } => {
- create_function_name(&fun.name, false, args, input_schema)
- }
+ Expr::ScalarFunction {
+ input_name, args, ..
+ } => create_function_name(input_name, false, args, input_schema),
+ Expr::ScalarUDF {
+ input_name, args, ..
+ } => create_function_name(input_name, false, args, input_schema),
Expr::AggregateFunction {
- fun,
+ input_name,
distinct,
args,
..
- } => create_function_name(&fun.to_string(), *distinct, args, input_schema),
- Expr::AggregateUDF { fun, args } => {
+ } => create_function_name(input_name, *distinct, args, input_schema),
+ Expr::AggregateUDF {
+ input_name, args, ..
+ } => {
let mut names = Vec::with_capacity(args.len());
for e in args {
names.push(create_name(e, input_schema)?);
}
- Ok(format!("{}({})", fun.name, names.join(",")))
+ Ok(format!("{}({})", input_name, names.join(",")))
}
Expr::InList {
expr,
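
The hunk above switches the generated column name from the canonical function name to `input_name`. As a rough sketch of the effect, assuming the name has the `name(args)` shape visible in the `AggregateUDF` branch: `function_display_name` is a hypothetical, simplified stand-in for `create_function_name` that takes pre-rendered argument names instead of `Expr`s and a schema, and the `DISTINCT ` prefix format is an assumption.

```rust
/// Simplified, hypothetical stand-in for `create_function_name`.
/// Argument names are already rendered to strings here; the function in the
/// PR takes expressions and a schema instead.
pub fn function_display_name(input_name: &str, distinct: bool, arg_names: &[&str]) -> String {
    // Assumed formatting for the DISTINCT marker; not confirmed by the diff.
    let distinct_str = if distinct { "DISTINCT " } else { "" };
    format!("{}({}{})", input_name, distinct_str, arg_names.join(","))
}
```

Because the first argument is now whatever the user typed, a call written as `mD5('a')` would yield a column named `mD5(Utf8("a"))` in this sketch, where the old code would have produced the canonical `md5(...)`.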
23 changes: 19 additions & 4 deletions rust/datafusion/src/optimizer/utils.rs
@@ -295,20 +295,35 @@ pub fn rewrite_expression(expr: &Expr, expressions: &[Expr]) -> Result<Expr> {
}),
Expr::IsNull(_) => Ok(Expr::IsNull(Box::new(expressions[0].clone()))),
Expr::IsNotNull(_) => Ok(Expr::IsNotNull(Box::new(expressions[0].clone()))),
- Expr::ScalarFunction { fun, .. } => Ok(Expr::ScalarFunction {
+ Expr::ScalarFunction {
+ input_name, fun, ..
+ } => Ok(Expr::ScalarFunction {
+ input_name: input_name.to_string(),
fun: fun.clone(),
args: expressions.to_vec(),
}),
- Expr::ScalarUDF { fun, .. } => Ok(Expr::ScalarUDF {
+ Expr::ScalarUDF {
+ input_name, fun, ..
+ } => Ok(Expr::ScalarUDF {
+ input_name: input_name.clone(),
fun: fun.clone(),
args: expressions.to_vec(),
}),
- Expr::AggregateFunction { fun, distinct, .. } => Ok(Expr::AggregateFunction {
+ Expr::AggregateFunction {
+ input_name,
+ fun,
+ distinct,
+ ..
+ } => Ok(Expr::AggregateFunction {
+ input_name: input_name.clone(),
fun: fun.clone(),
args: expressions.to_vec(),
distinct: *distinct,
}),
- Expr::AggregateUDF { fun, .. } => Ok(Expr::AggregateUDF {
+ Expr::AggregateUDF {
+ input_name, fun, ..
+ } => Ok(Expr::AggregateUDF {
+ input_name: input_name.clone(),
fun: fun.clone(),
args: expressions.to_vec(),
}),
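
The pattern in this hunk, rebuilding a function expression with new arguments while carrying `input_name` across, can be sketched in isolation. The `Expr` below is a two-variant stand-in with `String` in place of the real function types, and `with_new_args` is a hypothetical name for what `rewrite_expression` does for a single variant.

```rust
/// Tiny stand-in for the `Expr` enum; only two variants are modeled.
#[derive(Clone, Debug, PartialEq)]
pub enum Expr {
    Column(String),
    ScalarFunction {
        input_name: String,
        fun: String,
        args: Vec<Expr>,
    },
}

/// Rebuild `expr` with new arguments, keeping every other field as it was.
pub fn with_new_args(expr: &Expr, new_args: &[Expr]) -> Expr {
    match expr {
        Expr::ScalarFunction { input_name, fun, .. } => Expr::ScalarFunction {
            // Copy the user's spelling so it survives the rewrite.
            input_name: input_name.clone(),
            fun: fun.clone(),
            args: new_args.to_vec(),
        },
        other => other.clone(),
    }
}
```

If a rewrite rebuilt the variant without copying `input_name`, the user's original spelling would be lost after optimization, which is presumably why each arm in the hunk forwards the field.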
4 changes: 2 additions & 2 deletions rust/datafusion/src/physical_plan/planner.rs
@@ -554,14 +554,14 @@ impl DefaultPhysicalPlanner {
Expr::IsNotNull(expr) => expressions::is_not_null(
self.create_physical_expr(expr, input_schema, ctx_state)?,
),
- Expr::ScalarFunction { fun, args } => {
+ Expr::ScalarFunction { fun, args, .. } => {
let physical_args = args
.iter()
.map(|e| self.create_physical_expr(e, input_schema, ctx_state))
.collect::<Result<Vec<_>>>()?;
functions::create_physical_expr(fun, &physical_args, input_schema)
}
- Expr::ScalarUDF { fun, args } => {
+ Expr::ScalarUDF { fun, args, .. } => {
let mut physical_args = vec![];
for e in args {
physical_args.push(self.create_physical_expr(
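
The only change in this file is the added `..` in two match arms. A short sketch of why that is needed once a variant gains a field: the `Expr` here is a one-variant stand-in with simplified field types, and `describe` is a hypothetical function that, like the physical planner, does not care about `input_name`.

```rust
/// Stand-in variant carrying the new `input_name` field from this PR.
pub enum Expr {
    ScalarFunction {
        input_name: String,
        fun: String,
        args: Vec<String>,
    },
}

/// Only `fun` and `args` matter here; `..` skips the remaining fields.
/// Writing the pattern as `{ fun, args }` without `..` would no longer
/// compile once `input_name` exists on the variant.
pub fn describe(expr: &Expr) -> String {
    match expr {
        Expr::ScalarFunction { fun, args, .. } => {
            format!("{} with {} arg(s)", fun, args.len())
        }
    }
}
```

Using `..` also means further fields could be added to the variant later without touching call sites that ignore them.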
1 change: 1 addition & 0 deletions rust/datafusion/src/physical_plan/udaf.rs
@@ -93,6 +93,7 @@ impl AggregateUDF {
/// This utility allows using the UDAF without requiring access to the registry.
pub fn call(&self, args: Vec<Expr>) -> Expr {
Expr::AggregateUDF {
+ input_name: self.name.to_string(),
fun: Arc::new(self.clone()),
args,
}
1 change: 1 addition & 0 deletions rust/datafusion/src/physical_plan/udf.rs
@@ -82,6 +82,7 @@ impl ScalarUDF {
/// This utility allows using the UDF without requiring access to the registry.
pub fn call(&self, args: Vec<Expr>) -> Expr {
Expr::ScalarUDF {
+ input_name: self.name.to_string(),
fun: Arc::new(self.clone()),
args,
}