-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-51290][SQL] Enable filling default values in DSv2 writes #50044
base: master
Are you sure you want to change the base?
Conversation
@@ -3534,7 +3534,8 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor | |||
TableOutputResolver.suitableForByNameCheck(v2Write.isByName, | |||
expected = v2Write.table.output, queryOutput = v2Write.query.output) | |||
val projection = TableOutputResolver.resolveOutputColumns( | |||
v2Write.table.name, v2Write.table.output, v2Write.query, v2Write.isByName, conf) | |||
v2Write.table.name, v2Write.table.output, v2Write.query, v2Write.isByName, conf, | |||
supportColDefaultValue = true) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think there is value in validating if the catalog defines SUPPORT_COLUMN_DEFAULT_VALUE
in capabilities during writes. If a connector includes default value metadata in its schema, it should be enough to fill default values. The flag exists for ALTER and CREATE/REPLACE statements.
@@ -718,6 +724,11 @@ private class BufferedRowsReader( | |||
schema: StructType, | |||
row: InternalRow): Any = { | |||
val index = schema.fieldIndex(field.name) | |||
|
|||
if (index >= row.numFields) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is needed for support for adding columns with default values to the end.
@@ -423,8 +423,8 @@ abstract class V2WriteAnalysisSuiteBase extends AnalysisTest { | |||
assertNotResolved(parsedPlan) | |||
assertAnalysisErrorCondition( | |||
parsedPlan, | |||
expectedErrorCondition = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA", | |||
expectedMessageParameters = Map("tableName" -> "`table-name`", "colName" -> "`x`") | |||
expectedErrorCondition = "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_COLUMNS", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is because of spark.sql.defaultColumn.useNullsForMissingDefaultValues
and is aligned with V1 writes.
What changes were proposed in this pull request?
This PR enables filling default values in DSv2 writes.
Why are the changes needed?
These changes are needed for proper support of default values for DSv2 connectors.
Does this PR introduce any user-facing change?
Users will be able to omit columns with default values. There is no impact to existing jobs.
How was this patch tested?
This patch comes with tests.
Was this patch authored or co-authored using generative AI tooling?
No.