feat: Support IntegralDivide function #1428

Open · wants to merge 9 commits into main
Conversation

@wForget (Member) commented Feb 20, 2025

Which issue does this PR close?

Closes #1422.

Rationale for this change

Support the IntegralDivide function (Spark's div operator).

What changes are included in this PR?

Since DataFusion's div operator already conforms to integral-division semantics, we only need to convert IntegralDivide(...) to Cast(Divide(...), LongType) and then convert that expression to native.
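
For illustration, a minimal sketch of that rewrite as a standalone Catalyst transform (a hypothetical helper, not the PR's exact code in QueryPlanSerde.scala; assumes Spark 3.4+'s three-argument IntegralDivide constructor):

    // Hypothetical sketch: rewrite IntegralDivide into Cast(Divide(...), LongType)
    // before the expression is serialized to Comet's native plan.
    import org.apache.spark.sql.catalyst.expressions.{Cast, Divide, Expression, IntegralDivide}
    import org.apache.spark.sql.types.LongType

    def rewriteIntegralDivide(expr: Expression): Expression = expr match {
      // Integral division truncates, which the native divide already does;
      // the Cast to LongType reproduces Spark's `div` result type.
      case IntegralDivide(left, right, evalMode) =>
        Cast(Divide(left, right, evalMode), LongType)
      case other => other
    }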

How are these changes tested?

Added a unit test.

@wForget wForget marked this pull request as draft February 20, 2025 06:32
@codecov-commenter commented Feb 20, 2025

Codecov Report

Attention: Patch coverage is 57.69231% with 11 lines in your changes missing coverage. Please review.

Project coverage is 58.55%. Comparing base (f09f8af) to head (2f3d3a7).
Report is 53 commits behind head on main.

Files with missing lines                                 Patch %   Lines
.../scala/org/apache/comet/serde/QueryPlanSerde.scala    57.69%    6 Missing and 5 partials ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##               main    #1428      +/-   ##
============================================
+ Coverage     56.12%   58.55%   +2.43%     
- Complexity      976     1015      +39     
============================================
  Files           119      122       +3     
  Lines         11743    12249     +506     
  Branches       2251     2304      +53     
============================================
+ Hits           6591     7173     +582     
+ Misses         4012     3919      -93     
- Partials       1140     1157      +17     


@wForget wForget marked this pull request as ready for review February 20, 2025 10:00
@kazuyukitanimura (Contributor) left a comment:

Thank you @wForget

@wForget (Member, Author) commented Feb 26, 2025

Integral divide of decimal types has inconsistent behavior between Spark and Comet.

Test:

  test("test integral divide") {
    withTable("t1", "t2") {
      if (isSpark34Plus) {
        // Decimal support requires Spark 3.4 or later
        sql("create table t2(c1 DECIMAL(38, 37), c2 DECIMAL(38, 37)) using parquet")
        sql("insert into t2 values(6.0096743305738933273387748827369321010, 6.0096763826458053191384497987259478584)")
        checkSparkAnswerAndOperator("select c1 / c2, cast((c1 / c2 ) as long), c1 div c2 from t2 order by c1")
      }
    }
  }

Results do not match:

== Results ==
!== Correct Answer - 1 ==                                                              == Spark Answer - 1 ==
 struct<(c1 / c2):decimal(38,6),CAST((c1 / c2) AS BIGINT):bigint,(c1 div c2):bigint>   struct<(c1 / c2):decimal(38,6),CAST((c1 / c2) AS BIGINT):bigint,(c1 div c2):bigint>
![1.000000,1,0]                                                                        [1.000000,1,1]

This is due to the rounding behavior of decimal_div, which rounds half away from zero:

let res = if div.is_negative() {
    div - &five
} else {
    div + &five
} / &ten;
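
A worked example of that rounding in plain Scala (illustrative numbers only, not Comet's code): decimal_div carries one extra digit of scale and then rounds half away from zero, so a quotient just below one rounds up to 1.000000, while integral division should truncate it to 0.

    // Illustrative BigInt re-enactment of decimal_div's rounding step.
    val div = BigInt(9999997)           // c1 / c2 at scale 7, i.e. 0.9999997
    val (five, ten) = (BigInt(5), BigInt(10))
    // Add/subtract half of the last digit, then drop it: round half away from zero.
    val rounded = (if (div < 0) div - five else div + five) / ten
    // rounded == 1000000, i.e. 1.000000 at scale 6, so CAST(... AS BIGINT) gives 1;
    // Spark's `div` truncates the exact quotient 0.999999... to 0 instead.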

@kazuyukitanimura (Contributor) left a comment on the diff:

@@ -922,13 +956,18 @@ impl PhysicalPlanner {
Ok(DataType::Decimal128(_p2, _s2)),
) => {
let data_type = return_type.map(to_arrow_datatype).unwrap();
let func_name = if options.is_integral_div {
@kazuyukitanimura (Contributor):

I just realized we may be able to reuse the previous case-match here instead.
We needed to treat decimal_div differently because we had to deal with rounding. However, for IntegralDivide we just need to round down?
I.e. instead of 77 digits for the scale, we only need 76 digits, which fits into Decimal256.

It will need a similar calculation to:

|| (op == DataFusionOperator::Modulo
    && max(s1, s2) as u8 + max(p1 - s1 as u8, p2 - s2 as u8)
        > DECIMAL128_MAX_PRECISION)

This way, we would not need the decimal_div change?
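
For concreteness, a back-of-the-envelope version of that digit budget (a hypothetical Scala sketch; max(6, s1 + p2 + 1) is Spark's result-scale rule for decimal division):

    // Digit budget for the DECIMAL(38, 37) inputs from the failing test above.
    val (p1, s1, p2, s2) = (38, 37, 38, 37)
    val DECIMAL256_MAX_PRECISION = 76

    val resultScale = math.max(6, s1 + p2 + 1) // 76 digits of scale
    // Rounding half away from zero needs one extra digit: 77 overflows Decimal256.
    assert(resultScale + 1 > DECIMAL256_MAX_PRECISION)
    // Truncating (rounding down) for IntegralDivide needs only 76, which just fits.
    assert(resultScale <= DECIMAL256_MAX_PRECISION)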

@wForget (Member, Author):

Thanks for your suggestion, it makes sense to me. But I am getting an overflow error; I will continue debugging this tomorrow.

(screenshot of the overflow error)

@kazuyukitanimura (Contributor):

Thanks for trying. If this is too much trouble, we can file an issue ticket and it can be worked on separately.

@wForget (Member, Author):

This seems to be a bug in arrow-rs; I have reported an issue: apache/arrow-rs#7216

@wForget (Member, Author):

> Thanks for trying. If this is too much trouble, we can file an issue ticket and it can be worked on separately.

Could you please continue reviewing this PR, and let us keep the changes to decimal_div?

@kazuyukitanimura (Contributor):

Sure, perhaps it is good to mention the ticket in a comment.
Otherwise, my only comment is:

Do we need to update https://github.com/apache/datafusion-comet/blob/main/spark/src/main/scala/org/apache/spark/sql/comet/DecimalPrecision.scala?

Additionally, the decimal random number tests at https://github.com/apache/datafusion-comet/blob/main/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala#L1793 are another good test to extend.

@wForget (Member, Author):

Sorry, I missed it before.

> Do we need to update https://github.com/apache/datafusion-comet/blob/main/spark/src/main/scala/org/apache/spark/sql/comet/DecimalPrecision.scala?

I guess it may not be necessary, since IntegralDivide always returns a long type.

> Additionally, the decimal random number tests at https://github.com/apache/datafusion-comet/blob/main/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala#L1793 are another good test to extend.

I added the div operator to this test case.

@kazuyukitanimura (Contributor) left a comment:

LGTM pending CI
Thank you @wForget

@wForget (Member, Author) commented Feb 28, 2025

Unrelated failure: org.apache.spark.sql.execution.ui.SQLAppStatusListenerMemoryLeakSuite in spark-sql-sql/core-1/ubuntu-24.04/spark-4.0.0-preview1/java-17

2025-02-28T09:02:37.8103153Z [info] - no memory leak *** FAILED *** (1 second, 674 milliseconds)
2025-02-28T09:02:37.8104534Z [info]   statusStore.listener.get.noLiveData() was false (SQLAppStatusListenerSuite.scala:1123)
2025-02-28T09:02:37.8105808Z [info]   org.scalatest.exceptions.TestFailedException:
2025-02-28T09:02:37.8106955Z [info]   at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
2025-02-28T09:02:37.8108421Z [info]   at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
2025-02-28T09:02:37.8109849Z [info]   at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
2025-02-28T09:02:37.8114038Z [info]   at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
2025-02-28T09:02:37.8116016Z [info]   at org.apache.spark.sql.execution.ui.SQLAppStatusListenerMemoryLeakSuite.$anonfun$new$73(SQLAppStatusListenerSuite.scala:1123)
2025-02-28T09:02:37.8117758Z [info]   at org.apache.spark.sql.catalyst.util.package$.quietly(package.scala:42)
2025-02-28T09:02:37.8119470Z [info]   at org.apache.spark.sql.execution.ui.SQLAppStatusListenerMemoryLeakSuite.$anonfun$new$72(SQLAppStatusListenerSuite.scala:1101)
2025-02-28T09:02:37.8121938Z [info]   at org.apache.spark.LocalSparkContext$.withSpark(LocalSparkContext.scala:65)
2025-02-28T09:02:37.8123770Z [info]   at org.apache.spark.sql.execution.ui.SQLAppStatusListenerMemoryLeakSuite.$anonfun$new$71(SQLAppStatusListenerSuite.scala:1100)
2025-02-28T09:02:37.8125430Z [info]   at org.scalatest.enablers.Timed$$anon$1.timeoutAfter(Timed.scala:127)
2025-02-28T09:02:37.8126602Z [info]   at org.scalatest.concurrent.TimeLimits$.failAfterImpl(TimeLimits.scala:282)
2025-02-28T09:02:37.8127800Z [info]   at org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:231)
2025-02-28T09:02:37.8129012Z [info]   at org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:230)

......
2025-02-28T09:30:07.2309620Z [info] ScalaTest
2025-02-28T09:30:07.2310959Z [info] Run completed in 45 minutes, 48 seconds.
2025-02-28T09:30:07.2313089Z [info] Total number of tests run: 10244
2025-02-28T09:30:07.2313952Z [info] Suites: completed 641, aborted 0
2025-02-28T09:30:07.2314989Z [info] Tests: succeeded 10243, failed 1, canceled 45, ignored 295, pending 0
2025-02-28T09:30:07.2316047Z [info] *** 1 TEST FAILED ***
2025-02-28T09:30:07.2335333Z [error] Failed: Total 10426, Failed 1, Errors 0, Passed 10243, Skipped 182, Ignored 295, Canceled 45
2025-02-28T09:30:07.2395390Z [error] Failed tests:
2025-02-28T09:30:07.2396715Z [error] 	org.apache.spark.sql.execution.ui.SQLAppStatusListenerMemoryLeakSuite
2025-02-28T09:30:08.3033860Z [error] (sql / Test / testOnly) sbt.TestsFailedException: Tests unsuccessful
2025-02-28T09:30:08.3067181Z [error] Total time: 3190 s (53:10), completed Feb 28, 2025, 9:30:08 AM
