diff --git a/.github/workflows/bot.yml b/.github/workflows/bot.yml
index 35de0b9087ed..fd3cc67976a1 100644
--- a/.github/workflows/bot.yml
+++ b/.github/workflows/bot.yml
@@ -119,7 +119,7 @@ jobs:
include:
- scalaProfile: "scala-2.12"
sparkProfile: "spark3.2"
- flinkProfile: "flink1.17"
+ flinkProfile: "flink1.18"
steps:
- uses: actions/checkout@v3
@@ -210,6 +210,7 @@ jobs:
- flinkProfile: "flink1.15"
- flinkProfile: "flink1.16"
- flinkProfile: "flink1.17"
+ - flinkProfile: "flink1.18"
steps:
- uses: actions/checkout@v3
- name: Set up JDK 8
@@ -234,7 +235,7 @@ jobs:
env:
SCALA_PROFILE: 'scala-2.12'
FLINK_PROFILE: ${{ matrix.flinkProfile }}
- if: ${{ endsWith(env.FLINK_PROFILE, '1.17') }}
+ if: ${{ endsWith(env.FLINK_PROFILE, '1.18') }}
run: |
mvn clean install -Pintegration-tests -D"$SCALA_PROFILE" -D"$FLINK_PROFILE" -pl hudi-flink-datasource/hudi-flink -am -Davro.version=1.10.0 -DskipTests=true $MVN_ARGS
mvn verify -Pintegration-tests -D"$SCALA_PROFILE" -D"$FLINK_PROFILE" -pl hudi-flink-datasource/hudi-flink $MVN_ARGS
@@ -244,7 +245,7 @@ jobs:
strategy:
matrix:
include:
- - flinkProfile: 'flink1.17'
+ - flinkProfile: 'flink1.18'
sparkProfile: 'spark3.4'
sparkRuntime: 'spark3.4.0'
@@ -272,9 +273,12 @@ jobs:
strategy:
matrix:
include:
- - flinkProfile: 'flink1.17'
+ - flinkProfile: 'flink1.18'
sparkProfile: 'spark3.4'
sparkRuntime: 'spark3.4.0'
+ - flinkProfile: 'flink1.18'
+ sparkProfile: 'spark3.3'
+ sparkRuntime: 'spark3.3.2'
- flinkProfile: 'flink1.17'
sparkProfile: 'spark3.3'
sparkRuntime: 'spark3.3.2'
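
The new `flink1.18` matrix entries reuse the same Maven invocation as the existing integration-test step. As a rough local equivalent (a sketch only: the profile names mirror the workflow env above, and the extra `$MVN_ARGS` of the CI runner are omitted):

```sh
# Rough local equivalent of the CI integration-test step for the new profile.
SCALA_PROFILE="scala-2.12"
FLINK_PROFILE="flink1.18"
mvn clean install -Pintegration-tests -D"$SCALA_PROFILE" -D"$FLINK_PROFILE" \
  -pl hudi-flink-datasource/hudi-flink -am -Davro.version=1.10.0 -DskipTests=true
mvn verify -Pintegration-tests -D"$SCALA_PROFILE" -D"$FLINK_PROFILE" \
  -pl hudi-flink-datasource/hudi-flink
```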
diff --git a/README.md b/README.md
index ff2b95ec5473..20016f689ad3 100644
--- a/README.md
+++ b/README.md
@@ -118,14 +118,15 @@ Starting from versions 0.11, Hudi no longer requires `spark-avro` to be specifie
### Build with different Flink versions
-The default Flink version supported is 1.17. The default Flink 1.17.x version, corresponding to `flink1.17` profile is 1.17.0.
+The default Flink version supported is 1.18. The default Flink 1.18.x version, corresponding to the `flink1.18` profile, is 1.18.0.
Flink has been Scala-free since 1.15.x, so there is no need to specify the Scala version for Flink 1.15.x and above.
Refer to the table below for building with different Flink and Scala versions.
| Maven build options | Expected Flink bundle jar name | Notes |
|:---------------------------|:-------------------------------|:---------------------------------|
-| (empty) | hudi-flink1.17-bundle | For Flink 1.17 (default options) |
-| `-Dflink1.17` | hudi-flink1.17-bundle | For Flink 1.17 (same as default) |
+| (empty) | hudi-flink1.18-bundle | For Flink 1.18 (default options) |
+| `-Dflink1.18` | hudi-flink1.18-bundle | For Flink 1.18 (same as default) |
+| `-Dflink1.17` | hudi-flink1.17-bundle | For Flink 1.17 |
| `-Dflink1.16` | hudi-flink1.16-bundle | For Flink 1.16 |
| `-Dflink1.15` | hudi-flink1.15-bundle | For Flink 1.15 |
| `-Dflink1.14` | hudi-flink1.14-bundle | For Flink 1.14 and Scala 2.12 |
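
As an illustration of the table above, a default build now produces the Flink 1.18 bundle, while the older profiles stay available (a minimal sketch of the build commands, run from the repository root; add your usual extra flags as needed):

```sh
# Default build: produces hudi-flink1.18-bundle (Flink 1.18 is now the default profile)
mvn clean package -DskipTests
# Explicitly pin an older Flink, e.g. 1.17 -> hudi-flink1.17-bundle
mvn clean package -DskipTests -Dflink1.17
```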
diff --git a/azure-pipelines-20230430.yml b/azure-pipelines-20230430.yml
index ee5c016693a5..85d185fbc2c5 100644
--- a/azure-pipelines-20230430.yml
+++ b/azure-pipelines-20230430.yml
@@ -14,7 +14,7 @@
# limitations under the License.
# NOTE:
-# This config file defines how Azure CI runs tests with Spark 2.4 and Flink 1.17 profiles.
+# This config file defines how Azure CI runs tests with the Spark 3.2 and Flink 1.18 profiles.
# PRs will need to keep in sync with master's version to trigger the CI runs.
trigger:
@@ -37,6 +37,7 @@ parameters:
- 'hudi-flink-datasource/hudi-flink1.15.x'
- 'hudi-flink-datasource/hudi-flink1.16.x'
- 'hudi-flink-datasource/hudi-flink1.17.x'
+ - 'hudi-flink-datasource/hudi-flink1.18.x'
- name: job2Modules
type: object
default:
@@ -69,6 +70,7 @@ parameters:
- '!hudi-flink-datasource/hudi-flink1.15.x'
- '!hudi-flink-datasource/hudi-flink1.16.x'
- '!hudi-flink-datasource/hudi-flink1.17.x'
+ - '!hudi-flink-datasource/hudi-flink1.18.x'
- '!hudi-spark-datasource'
- '!hudi-spark-datasource/hudi-spark'
- '!hudi-spark-datasource/hudi-spark3.2.x'
@@ -92,9 +94,10 @@ parameters:
- '!hudi-flink-datasource/hudi-flink1.15.x'
- '!hudi-flink-datasource/hudi-flink1.16.x'
- '!hudi-flink-datasource/hudi-flink1.17.x'
+ - '!hudi-flink-datasource/hudi-flink1.18.x'
variables:
- BUILD_PROFILES: '-Dscala-2.12 -Dspark3.2 -Dflink1.17'
+ BUILD_PROFILES: '-Dscala-2.12 -Dspark3.2 -Dflink1.18'
PLUGIN_OPTS: '-Dcheckstyle.skip=true -Drat.skip=true -Djacoco.skip=true -ntp -B -V -Pwarn-log -Dorg.slf4j.simpleLogger.log.org.apache.maven.plugins.shade=warn -Dorg.slf4j.simpleLogger.log.org.apache.maven.plugins.dependency=warn'
MVN_OPTS_INSTALL: '-Phudi-platform-service -DskipTests $(BUILD_PROFILES) $(PLUGIN_OPTS) -Dmaven.wagon.httpconnectionManager.ttlSeconds=25 -Dmaven.wagon.http.retryHandler.count=5'
MVN_OPTS_TEST: '-fae -Pwarn-log $(BUILD_PROFILES) $(PLUGIN_OPTS)'
diff --git a/docker/hoodie/hadoop/base/pom.xml b/docker/hoodie/hadoop/base/pom.xml
index 8aaa7ba371e6..9695f427bda3 100644
--- a/docker/hoodie/hadoop/base/pom.xml
+++ b/docker/hoodie/hadoop/base/pom.xml
@@ -19,7 +19,7 @@
   <parent>
     <artifactId>hudi-hadoop-docker</artifactId>
     <groupId>org.apache.hudi</groupId>
-    <version>0.14.2-rc1</version>
+    <version>0.14.2-SNAPSHOT</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
   <packaging>pom</packaging>
diff --git a/docker/hoodie/hadoop/base_java11/pom.xml b/docker/hoodie/hadoop/base_java11/pom.xml
index ac6e72c72aeb..71d0b8350d69 100644
--- a/docker/hoodie/hadoop/base_java11/pom.xml
+++ b/docker/hoodie/hadoop/base_java11/pom.xml
@@ -20,7 +20,7 @@
   <parent>
     <artifactId>hudi-hadoop-docker</artifactId>
     <groupId>org.apache.hudi</groupId>
-    <version>0.14.2-rc1</version>
+    <version>0.14.2-SNAPSHOT</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
   <packaging>pom</packaging>
diff --git a/docker/hoodie/hadoop/datanode/pom.xml b/docker/hoodie/hadoop/datanode/pom.xml
index 52b1d050993a..5e33a75e9658 100644
--- a/docker/hoodie/hadoop/datanode/pom.xml
+++ b/docker/hoodie/hadoop/datanode/pom.xml
@@ -19,7 +19,7 @@
   <parent>
     <artifactId>hudi-hadoop-docker</artifactId>
     <groupId>org.apache.hudi</groupId>
-    <version>0.14.2-rc1</version>
+    <version>0.14.2-SNAPSHOT</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
   <packaging>pom</packaging>
diff --git a/docker/hoodie/hadoop/historyserver/pom.xml b/docker/hoodie/hadoop/historyserver/pom.xml
index 9ee4d731aa5c..2ee437944b06 100644
--- a/docker/hoodie/hadoop/historyserver/pom.xml
+++ b/docker/hoodie/hadoop/historyserver/pom.xml
@@ -19,7 +19,7 @@
   <parent>
     <artifactId>hudi-hadoop-docker</artifactId>
     <groupId>org.apache.hudi</groupId>
-    <version>0.14.2-rc1</version>
+    <version>0.14.2-SNAPSHOT</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
   <packaging>pom</packaging>
diff --git a/docker/hoodie/hadoop/hive_base/pom.xml b/docker/hoodie/hadoop/hive_base/pom.xml
index eb5045d65a13..a99220464484 100644
--- a/docker/hoodie/hadoop/hive_base/pom.xml
+++ b/docker/hoodie/hadoop/hive_base/pom.xml
@@ -19,7 +19,7 @@
   <parent>
     <artifactId>hudi-hadoop-docker</artifactId>
     <groupId>org.apache.hudi</groupId>
-    <version>0.14.2-rc1</version>
+    <version>0.14.2-SNAPSHOT</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
   <packaging>pom</packaging>
diff --git a/docker/hoodie/hadoop/namenode/pom.xml b/docker/hoodie/hadoop/namenode/pom.xml
index 292a3409d626..aae8059cd11a 100644
--- a/docker/hoodie/hadoop/namenode/pom.xml
+++ b/docker/hoodie/hadoop/namenode/pom.xml
@@ -19,7 +19,7 @@
   <parent>
     <artifactId>hudi-hadoop-docker</artifactId>
     <groupId>org.apache.hudi</groupId>
-    <version>0.14.2-rc1</version>
+    <version>0.14.2-SNAPSHOT</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
   <packaging>pom</packaging>
diff --git a/docker/hoodie/hadoop/pom.xml b/docker/hoodie/hadoop/pom.xml
index e47d4ec5ecae..eeaaba75b8b1 100644
--- a/docker/hoodie/hadoop/pom.xml
+++ b/docker/hoodie/hadoop/pom.xml
@@ -19,7 +19,7 @@
   <parent>
     <artifactId>hudi</artifactId>
     <groupId>org.apache.hudi</groupId>
-    <version>0.14.2-rc1</version>
+    <version>0.14.2-SNAPSHOT</version>
     <relativePath>../../../pom.xml</relativePath>
   </parent>
   <modelVersion>4.0.0</modelVersion>
diff --git a/docker/hoodie/hadoop/prestobase/pom.xml b/docker/hoodie/hadoop/prestobase/pom.xml
index 2d399195c7bb..68c9f98f5b60 100644
--- a/docker/hoodie/hadoop/prestobase/pom.xml
+++ b/docker/hoodie/hadoop/prestobase/pom.xml
@@ -20,7 +20,7 @@
   <parent>
     <artifactId>hudi-hadoop-docker</artifactId>
     <groupId>org.apache.hudi</groupId>
-    <version>0.14.2-rc1</version>
+    <version>0.14.2-SNAPSHOT</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
   <packaging>pom</packaging>
diff --git a/docker/hoodie/hadoop/spark_base/pom.xml b/docker/hoodie/hadoop/spark_base/pom.xml
index 5514aaa8fd47..07118f1460ba 100644
--- a/docker/hoodie/hadoop/spark_base/pom.xml
+++ b/docker/hoodie/hadoop/spark_base/pom.xml
@@ -19,7 +19,7 @@
   <parent>
     <artifactId>hudi-hadoop-docker</artifactId>
     <groupId>org.apache.hudi</groupId>
-    <version>0.14.2-rc1</version>
+    <version>0.14.2-SNAPSHOT</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
   <packaging>pom</packaging>
diff --git a/docker/hoodie/hadoop/sparkadhoc/pom.xml b/docker/hoodie/hadoop/sparkadhoc/pom.xml
index 976384c66bd0..aef228bab84a 100644
--- a/docker/hoodie/hadoop/sparkadhoc/pom.xml
+++ b/docker/hoodie/hadoop/sparkadhoc/pom.xml
@@ -19,7 +19,7 @@
   <parent>
     <artifactId>hudi-hadoop-docker</artifactId>
     <groupId>org.apache.hudi</groupId>
-    <version>0.14.2-rc1</version>
+    <version>0.14.2-SNAPSHOT</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
   <packaging>pom</packaging>
diff --git a/docker/hoodie/hadoop/sparkmaster/pom.xml b/docker/hoodie/hadoop/sparkmaster/pom.xml
index 16ff5629c7ce..35c8fef9cec1 100644
--- a/docker/hoodie/hadoop/sparkmaster/pom.xml
+++ b/docker/hoodie/hadoop/sparkmaster/pom.xml
@@ -19,7 +19,7 @@
   <parent>
     <artifactId>hudi-hadoop-docker</artifactId>
     <groupId>org.apache.hudi</groupId>
-    <version>0.14.2-rc1</version>
+    <version>0.14.2-SNAPSHOT</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
   <packaging>pom</packaging>
diff --git a/docker/hoodie/hadoop/sparkworker/pom.xml b/docker/hoodie/hadoop/sparkworker/pom.xml
index 95e2b156e4a3..3f7cce36efe1 100644
--- a/docker/hoodie/hadoop/sparkworker/pom.xml
+++ b/docker/hoodie/hadoop/sparkworker/pom.xml
@@ -19,7 +19,7 @@
   <parent>
     <artifactId>hudi-hadoop-docker</artifactId>
     <groupId>org.apache.hudi</groupId>
-    <version>0.14.2-rc1</version>
+    <version>0.14.2-SNAPSHOT</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
   <packaging>pom</packaging>
diff --git a/docker/hoodie/hadoop/trinobase/pom.xml b/docker/hoodie/hadoop/trinobase/pom.xml
index f858d99866a2..2df42f24a168 100644
--- a/docker/hoodie/hadoop/trinobase/pom.xml
+++ b/docker/hoodie/hadoop/trinobase/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <artifactId>hudi-hadoop-docker</artifactId>
     <groupId>org.apache.hudi</groupId>
-    <version>0.14.2-rc1</version>
+    <version>0.14.2-SNAPSHOT</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
   <packaging>pom</packaging>
diff --git a/docker/hoodie/hadoop/trinocoordinator/pom.xml b/docker/hoodie/hadoop/trinocoordinator/pom.xml
index 7e22f0f5b3f0..f0f10507de1a 100644
--- a/docker/hoodie/hadoop/trinocoordinator/pom.xml
+++ b/docker/hoodie/hadoop/trinocoordinator/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <artifactId>hudi-hadoop-docker</artifactId>
     <groupId>org.apache.hudi</groupId>
-    <version>0.14.2-rc1</version>
+    <version>0.14.2-SNAPSHOT</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
   <packaging>pom</packaging>
diff --git a/docker/hoodie/hadoop/trinoworker/pom.xml b/docker/hoodie/hadoop/trinoworker/pom.xml
index 80f3ba5b2e0b..6a514ca1d6f8 100644
--- a/docker/hoodie/hadoop/trinoworker/pom.xml
+++ b/docker/hoodie/hadoop/trinoworker/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <artifactId>hudi-hadoop-docker</artifactId>
     <groupId>org.apache.hudi</groupId>
-    <version>0.14.2-rc1</version>
+    <version>0.14.2-SNAPSHOT</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
   <packaging>pom</packaging>
diff --git a/docker/setup_demo.sh b/docker/setup_demo.sh
index e847f913a5ac..d183086d26c7 100755
--- a/docker/setup_demo.sh
+++ b/docker/setup_demo.sh
@@ -24,13 +24,13 @@ if [ "$HUDI_DEMO_ENV" = "--mac-aarch64" ]; then
COMPOSE_FILE_NAME="docker-compose_hadoop284_hive233_spark244_mac_aarch64.yml"
fi
# restart cluster
-HUDI_WS=${WS_ROOT} docker-compose -f ${SCRIPT_PATH}/compose/${COMPOSE_FILE_NAME} down
+HUDI_WS=${WS_ROOT} docker compose -f ${SCRIPT_PATH}/compose/${COMPOSE_FILE_NAME} down
if [ "$HUDI_DEMO_ENV" != "dev" ]; then
echo "Pulling docker demo images ..."
- HUDI_WS=${WS_ROOT} docker-compose -f ${SCRIPT_PATH}/compose/${COMPOSE_FILE_NAME} pull
+ HUDI_WS=${WS_ROOT} docker compose -f ${SCRIPT_PATH}/compose/${COMPOSE_FILE_NAME} pull
fi
sleep 5
-HUDI_WS=${WS_ROOT} docker-compose -f ${SCRIPT_PATH}/compose/${COMPOSE_FILE_NAME} up -d
+HUDI_WS=${WS_ROOT} docker compose -f ${SCRIPT_PATH}/compose/${COMPOSE_FILE_NAME} up -d
sleep 15
docker exec -it adhoc-1 /bin/bash /var/hoodie/ws/docker/demo/setup_demo_container.sh
diff --git a/docker/stop_demo.sh b/docker/stop_demo.sh
index 32a0e70c3791..25fb8d827349 100755
--- a/docker/stop_demo.sh
+++ b/docker/stop_demo.sh
@@ -25,7 +25,7 @@ if [ "$HUDI_DEMO_ENV" = "--mac-aarch64" ]; then
COMPOSE_FILE_NAME="docker-compose_hadoop284_hive233_spark244_mac_aarch64.yml"
fi
# shut down cluster
-HUDI_WS=${WS_ROOT} docker-compose -f ${SCRIPT_PATH}/compose/${COMPOSE_FILE_NAME} down
+HUDI_WS=${WS_ROOT} docker compose -f ${SCRIPT_PATH}/compose/${COMPOSE_FILE_NAME} down
# remove host mount directory
rm -rf /tmp/hadoop_data
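
The demo scripts above now rely on the Docker Compose v2 plugin (`docker compose ...`) instead of the legacy standalone `docker-compose` binary, so a quick pre-flight check can help (a sketch; assumes a Docker install with the compose plugin and a checkout at the repository root):

```sh
# Confirm the Compose v2 plugin is available before running the demo scripts
docker compose version
# Then bring the demo cluster up and tear it down
./docker/setup_demo.sh
./docker/stop_demo.sh
```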
diff --git a/hudi-aws/pom.xml b/hudi-aws/pom.xml
index 7e95d364a780..3093134d3a09 100644
--- a/hudi-aws/pom.xml
+++ b/hudi-aws/pom.xml
@@ -19,12 +19,12 @@
   <parent>
     <artifactId>hudi</artifactId>
     <groupId>org.apache.hudi</groupId>
-    <version>0.14.2-rc1</version>
+    <version>0.14.2-SNAPSHOT</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>

   <artifactId>hudi-aws</artifactId>
-  <version>0.14.2-rc1</version>
+  <version>0.14.2-SNAPSHOT</version>
   <name>hudi-aws</name>
   <packaging>jar</packaging>
diff --git a/hudi-cli/pom.xml b/hudi-cli/pom.xml
index 0dcbb2bee5e6..fcc9a63a8779 100644
--- a/hudi-cli/pom.xml
+++ b/hudi-cli/pom.xml
@@ -19,7 +19,7 @@
   <parent>
     <artifactId>hudi</artifactId>
     <groupId>org.apache.hudi</groupId>
-    <version>0.14.2-rc1</version>
+    <version>0.14.2-SNAPSHOT</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
diff --git a/hudi-client/hudi-client-common/pom.xml b/hudi-client/hudi-client-common/pom.xml
index f00c5f1c4bf2..7a10ae12c35d 100644
--- a/hudi-client/hudi-client-common/pom.xml
+++ b/hudi-client/hudi-client-common/pom.xml
@@ -20,12 +20,12 @@
   <parent>
     <artifactId>hudi-client</artifactId>
     <groupId>org.apache.hudi</groupId>
-    <version>0.14.2-rc1</version>
+    <version>0.14.2-SNAPSHOT</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>

   <artifactId>hudi-client-common</artifactId>
-  <version>0.14.2-rc1</version>
+  <version>0.14.2-SNAPSHOT</version>
   <name>hudi-client-common</name>
   <packaging>jar</packaging>
diff --git a/hudi-client/hudi-flink-client/pom.xml b/hudi-client/hudi-flink-client/pom.xml
index 4638050460a8..460e723496a2 100644
--- a/hudi-client/hudi-flink-client/pom.xml
+++ b/hudi-client/hudi-flink-client/pom.xml
@@ -20,12 +20,12 @@
   <parent>
     <artifactId>hudi-client</artifactId>
     <groupId>org.apache.hudi</groupId>
-    <version>0.14.2-rc1</version>
+    <version>0.14.2-SNAPSHOT</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>

   <artifactId>hudi-flink-client</artifactId>
-  <version>0.14.2-rc1</version>
+  <version>0.14.2-SNAPSHOT</version>
   <name>hudi-flink-client</name>
   <packaging>jar</packaging>
diff --git a/hudi-client/hudi-java-client/pom.xml b/hudi-client/hudi-java-client/pom.xml
index 6f686bd30373..f66592688669 100644
--- a/hudi-client/hudi-java-client/pom.xml
+++ b/hudi-client/hudi-java-client/pom.xml
@@ -19,12 +19,12 @@
   <parent>
     <artifactId>hudi-client</artifactId>
     <groupId>org.apache.hudi</groupId>
-    <version>0.14.2-rc1</version>
+    <version>0.14.2-SNAPSHOT</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>

   <artifactId>hudi-java-client</artifactId>
-  <version>0.14.2-rc1</version>
+  <version>0.14.2-SNAPSHOT</version>
   <name>hudi-java-client</name>
   <packaging>jar</packaging>
diff --git a/hudi-client/hudi-spark-client/pom.xml b/hudi-client/hudi-spark-client/pom.xml
index af126137c998..fa437494fd9f 100644
--- a/hudi-client/hudi-spark-client/pom.xml
+++ b/hudi-client/hudi-spark-client/pom.xml
@@ -19,12 +19,12 @@
   <parent>
     <artifactId>hudi-client</artifactId>
     <groupId>org.apache.hudi</groupId>
-    <version>0.14.2-rc1</version>
+    <version>0.14.2-SNAPSHOT</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>

   <artifactId>hudi-spark-client</artifactId>
-  <version>0.14.2-rc1</version>
+  <version>0.14.2-SNAPSHOT</version>
   <name>hudi-spark-client</name>
   <packaging>jar</packaging>
diff --git a/hudi-client/pom.xml b/hudi-client/pom.xml
index a73a1fd0124a..6498ddbd8c71 100644
--- a/hudi-client/pom.xml
+++ b/hudi-client/pom.xml
@@ -19,7 +19,7 @@
   <parent>
     <artifactId>hudi</artifactId>
     <groupId>org.apache.hudi</groupId>
-    <version>0.14.2-rc1</version>
+    <version>0.14.2-SNAPSHOT</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
diff --git a/hudi-common/pom.xml b/hudi-common/pom.xml
index c8089ec4567f..1d5cda0f75e8 100644
--- a/hudi-common/pom.xml
+++ b/hudi-common/pom.xml
@@ -20,7 +20,7 @@
   <parent>
     <artifactId>hudi</artifactId>
     <groupId>org.apache.hudi</groupId>
-    <version>0.14.2-rc1</version>
+    <version>0.14.2-SNAPSHOT</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
diff --git a/hudi-examples/hudi-examples-common/pom.xml b/hudi-examples/hudi-examples-common/pom.xml
index 703537629b81..406449f90bfd 100644
--- a/hudi-examples/hudi-examples-common/pom.xml
+++ b/hudi-examples/hudi-examples-common/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <artifactId>hudi-examples</artifactId>
     <groupId>org.apache.hudi</groupId>
-    <version>0.14.2-rc1</version>
+    <version>0.14.2-SNAPSHOT</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
diff --git a/hudi-examples/hudi-examples-flink/pom.xml b/hudi-examples/hudi-examples-flink/pom.xml
index 4a3cb93418a0..e66540201771 100644
--- a/hudi-examples/hudi-examples-flink/pom.xml
+++ b/hudi-examples/hudi-examples-flink/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <artifactId>hudi-examples</artifactId>
     <groupId>org.apache.hudi</groupId>
-    <version>0.14.2-rc1</version>
+    <version>0.14.2-SNAPSHOT</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
diff --git a/hudi-examples/hudi-examples-java/pom.xml b/hudi-examples/hudi-examples-java/pom.xml
index 698564812024..d5ea102f4571 100644
--- a/hudi-examples/hudi-examples-java/pom.xml
+++ b/hudi-examples/hudi-examples-java/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <artifactId>hudi-examples</artifactId>
     <groupId>org.apache.hudi</groupId>
-    <version>0.14.2-rc1</version>
+    <version>0.14.2-SNAPSHOT</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
diff --git a/hudi-examples/hudi-examples-spark/pom.xml b/hudi-examples/hudi-examples-spark/pom.xml
index 7b1b161f3432..c21ec28e0a93 100644
--- a/hudi-examples/hudi-examples-spark/pom.xml
+++ b/hudi-examples/hudi-examples-spark/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <artifactId>hudi-examples</artifactId>
     <groupId>org.apache.hudi</groupId>
-    <version>0.14.2-rc1</version>
+    <version>0.14.2-SNAPSHOT</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
diff --git a/hudi-examples/pom.xml b/hudi-examples/pom.xml
index b87bd506302b..d8553639b026 100644
--- a/hudi-examples/pom.xml
+++ b/hudi-examples/pom.xml
@@ -20,7 +20,7 @@
   <parent>
     <artifactId>hudi</artifactId>
     <groupId>org.apache.hudi</groupId>
-    <version>0.14.2-rc1</version>
+    <version>0.14.2-SNAPSHOT</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
diff --git a/hudi-flink-datasource/hudi-flink/pom.xml b/hudi-flink-datasource/hudi-flink/pom.xml
index 3a479358478d..2f03b55322d4 100644
--- a/hudi-flink-datasource/hudi-flink/pom.xml
+++ b/hudi-flink-datasource/hudi-flink/pom.xml
@@ -22,12 +22,12 @@
   <parent>
     <artifactId>hudi-flink-datasource</artifactId>
     <groupId>org.apache.hudi</groupId>
-    <version>0.14.2-rc1</version>
+    <version>0.14.2-SNAPSHOT</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>

   <artifactId>hudi-flink</artifactId>
-  <version>0.14.2-rc1</version>
+  <version>0.14.2-SNAPSHOT</version>
   <packaging>jar</packaging>
@@ -181,6 +181,7 @@
     <dependency>
       <groupId>org.apache.flink</groupId>
       <artifactId>${flink.connector.kafka.artifactId}</artifactId>
+      <version>${flink.connector.kafka.version}</version>
       <scope>compile</scope>
     </dependency>
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
index 23a7a1fcca71..5ea7a585a0d2 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
@@ -18,6 +18,7 @@
package org.apache.hudi.table.catalog;
+import org.apache.hudi.adapter.HiveCatalogConstants.AlterHiveDatabaseOp;
import org.apache.hudi.avro.AvroSchemaUtils;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.fs.FSUtils;
@@ -47,9 +48,6 @@
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabase;
-import org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabaseOwner;
-import org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveDatabase;
import org.apache.flink.table.catalog.AbstractCatalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogDatabase;
@@ -107,17 +105,20 @@
import java.util.List;
import java.util.Map;
-import static org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabase.ALTER_DATABASE_OP;
-import static org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabaseOwner.DATABASE_OWNER_NAME;
-import static org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabaseOwner.DATABASE_OWNER_TYPE;
-import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;
+import static org.apache.hudi.adapter.HiveCatalogConstants.ALTER_DATABASE_OP;
+import static org.apache.hudi.adapter.HiveCatalogConstants.DATABASE_LOCATION_URI;
+import static org.apache.hudi.adapter.HiveCatalogConstants.DATABASE_OWNER_NAME;
+import static org.apache.hudi.adapter.HiveCatalogConstants.DATABASE_OWNER_TYPE;
+import static org.apache.hudi.adapter.HiveCatalogConstants.ROLE_OWNER;
+import static org.apache.hudi.adapter.HiveCatalogConstants.USER_OWNER;
import static org.apache.hudi.configuration.FlinkOptions.PATH;
import static org.apache.hudi.table.catalog.TableOptionProperties.COMMENT;
import static org.apache.hudi.table.catalog.TableOptionProperties.PK_CONSTRAINT_NAME;
import static org.apache.hudi.table.catalog.TableOptionProperties.SPARK_SOURCE_PROVIDER;
+import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;
/**
* A catalog implementation for Hoodie based on MetaStore.
@@ -219,7 +220,7 @@ public CatalogDatabase getDatabase(String databaseName)
     Map<String, String> properties = new HashMap<>(hiveDatabase.getParameters());
- properties.put(SqlCreateHiveDatabase.DATABASE_LOCATION_URI, hiveDatabase.getLocationUri());
+ properties.put(DATABASE_LOCATION_URI, hiveDatabase.getLocationUri());
return new CatalogDatabaseImpl(properties, hiveDatabase.getDescription());
}
@@ -248,7 +249,7 @@ public void createDatabase(
     Map<String, String> properties = database.getProperties();
- String dbLocationUri = properties.remove(SqlCreateHiveDatabase.DATABASE_LOCATION_URI);
+ String dbLocationUri = properties.remove(DATABASE_LOCATION_URI);
if (dbLocationUri == null && this.catalogPath != null) {
// infer default location uri
dbLocationUri = new Path(this.catalogPath, databaseName).toString();
@@ -318,11 +319,10 @@ private static Database alterDatabase(Database hiveDB, CatalogDatabase newDataba
String opStr = newParams.remove(ALTER_DATABASE_OP);
if (opStr == null) {
// by default is to alter db properties
- opStr = SqlAlterHiveDatabase.AlterHiveDatabaseOp.CHANGE_PROPS.name();
+ opStr = AlterHiveDatabaseOp.CHANGE_PROPS.name();
}
- String newLocation = newParams.remove(SqlCreateHiveDatabase.DATABASE_LOCATION_URI);
- SqlAlterHiveDatabase.AlterHiveDatabaseOp op =
- SqlAlterHiveDatabase.AlterHiveDatabaseOp.valueOf(opStr);
+ String newLocation = newParams.remove(DATABASE_LOCATION_URI);
+ AlterHiveDatabaseOp op = AlterHiveDatabaseOp.valueOf(opStr);
switch (op) {
case CHANGE_PROPS:
hiveDB.setParameters(newParams);
@@ -335,10 +335,10 @@ private static Database alterDatabase(Database hiveDB, CatalogDatabase newDataba
String ownerType = newParams.remove(DATABASE_OWNER_TYPE);
hiveDB.setOwnerName(ownerName);
switch (ownerType) {
- case SqlAlterHiveDatabaseOwner.ROLE_OWNER:
+ case ROLE_OWNER:
hiveDB.setOwnerType(PrincipalType.ROLE);
break;
- case SqlAlterHiveDatabaseOwner.USER_OWNER:
+ case USER_OWNER:
hiveDB.setOwnerType(PrincipalType.USER);
break;
default:
diff --git a/hudi-flink-datasource/hudi-flink1.13.x/pom.xml b/hudi-flink-datasource/hudi-flink1.13.x/pom.xml
index cef3a87e41fe..5581c7a2b40b 100644
--- a/hudi-flink-datasource/hudi-flink1.13.x/pom.xml
+++ b/hudi-flink-datasource/hudi-flink1.13.x/pom.xml
@@ -20,12 +20,12 @@
   <parent>
     <artifactId>hudi-flink-datasource</artifactId>
     <groupId>org.apache.hudi</groupId>
-    <version>0.14.2-rc1</version>
+    <version>0.14.2-SNAPSHOT</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>

   <artifactId>hudi-flink1.13.x</artifactId>
-  <version>0.14.2-rc1</version>
+  <version>0.14.2-SNAPSHOT</version>
   <packaging>jar</packaging>
diff --git a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/adapter/HiveCatalogConstants.java b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/adapter/HiveCatalogConstants.java
new file mode 100644
index 000000000000..94ed3b538879
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/adapter/HiveCatalogConstants.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+import org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabase;
+import org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabaseOwner;
+import org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveDatabase;
+
+/**
+ * Constants for Hive Catalog.
+ */
+public class HiveCatalogConstants {
+
+ // -----------------------------------------------------------------------------------
+ // Constants for ALTER DATABASE
+ // -----------------------------------------------------------------------------------
+ public static final String ALTER_DATABASE_OP = SqlAlterHiveDatabase.ALTER_DATABASE_OP;
+
+ public static final String DATABASE_LOCATION_URI = SqlCreateHiveDatabase.DATABASE_LOCATION_URI;
+
+ public static final String DATABASE_OWNER_NAME = SqlAlterHiveDatabaseOwner.DATABASE_OWNER_NAME;
+
+ public static final String DATABASE_OWNER_TYPE = SqlAlterHiveDatabaseOwner.DATABASE_OWNER_TYPE;
+
+ public static final String ROLE_OWNER = SqlAlterHiveDatabaseOwner.ROLE_OWNER;
+
+ public static final String USER_OWNER = SqlAlterHiveDatabaseOwner.USER_OWNER;
+
+ /** Type of ALTER DATABASE operation. */
+ public enum AlterHiveDatabaseOp {
+ CHANGE_PROPS,
+ CHANGE_LOCATION,
+ CHANGE_OWNER
+ }
+}
diff --git a/hudi-flink-datasource/hudi-flink1.14.x/pom.xml b/hudi-flink-datasource/hudi-flink1.14.x/pom.xml
index fee71f633f3b..6aad1fcdce48 100644
--- a/hudi-flink-datasource/hudi-flink1.14.x/pom.xml
+++ b/hudi-flink-datasource/hudi-flink1.14.x/pom.xml
@@ -20,12 +20,12 @@
   <parent>
     <artifactId>hudi-flink-datasource</artifactId>
     <groupId>org.apache.hudi</groupId>
-    <version>0.14.2-rc1</version>
+    <version>0.14.2-SNAPSHOT</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>

   <artifactId>hudi-flink1.14.x</artifactId>
-  <version>0.14.2-rc1</version>
+  <version>0.14.2-SNAPSHOT</version>
   <packaging>jar</packaging>
diff --git a/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/adapter/HiveCatalogConstants.java b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/adapter/HiveCatalogConstants.java
new file mode 100644
index 000000000000..5d40e7ed1d87
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/adapter/HiveCatalogConstants.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+import org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabase;
+import org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabaseOwner;
+import org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveDatabase;
+
+/**
+ * Constants for Hive Catalog.
+ */
+public class HiveCatalogConstants {
+
+ // -----------------------------------------------------------------------------------
+ // Constants for ALTER DATABASE
+ // -----------------------------------------------------------------------------------
+ public static final String ALTER_DATABASE_OP = SqlAlterHiveDatabase.ALTER_DATABASE_OP;
+
+ public static final String DATABASE_LOCATION_URI = SqlCreateHiveDatabase.DATABASE_LOCATION_URI;
+
+ public static final String DATABASE_OWNER_NAME = SqlAlterHiveDatabaseOwner.DATABASE_OWNER_NAME;
+
+ public static final String DATABASE_OWNER_TYPE = SqlAlterHiveDatabaseOwner.DATABASE_OWNER_TYPE;
+
+ public static final String ROLE_OWNER = SqlAlterHiveDatabaseOwner.ROLE_OWNER;
+
+ public static final String USER_OWNER = SqlAlterHiveDatabaseOwner.USER_OWNER;
+
+ /** Type of ALTER DATABASE operation. */
+ public enum AlterHiveDatabaseOp {
+ CHANGE_PROPS,
+ CHANGE_LOCATION,
+ CHANGE_OWNER
+ }
+}
+
diff --git a/hudi-flink-datasource/hudi-flink1.15.x/pom.xml b/hudi-flink-datasource/hudi-flink1.15.x/pom.xml
index 7b6b1b28bbc0..a4777eaf732a 100644
--- a/hudi-flink-datasource/hudi-flink1.15.x/pom.xml
+++ b/hudi-flink-datasource/hudi-flink1.15.x/pom.xml
@@ -20,12 +20,12 @@
   <parent>
     <artifactId>hudi-flink-datasource</artifactId>
     <groupId>org.apache.hudi</groupId>
-    <version>0.14.2-rc1</version>
+    <version>0.14.2-SNAPSHOT</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>

   <artifactId>hudi-flink1.15.x</artifactId>
-  <version>0.14.2-rc1</version>
+  <version>0.14.2-SNAPSHOT</version>
   <packaging>jar</packaging>
diff --git a/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/adapter/HiveCatalogConstants.java b/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/adapter/HiveCatalogConstants.java
new file mode 100644
index 000000000000..5d40e7ed1d87
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/adapter/HiveCatalogConstants.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+import org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabase;
+import org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabaseOwner;
+import org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveDatabase;
+
+/**
+ * Constants for Hive Catalog.
+ */
+public class HiveCatalogConstants {
+
+ // -----------------------------------------------------------------------------------
+ // Constants for ALTER DATABASE
+ // -----------------------------------------------------------------------------------
+ public static final String ALTER_DATABASE_OP = SqlAlterHiveDatabase.ALTER_DATABASE_OP;
+
+ public static final String DATABASE_LOCATION_URI = SqlCreateHiveDatabase.DATABASE_LOCATION_URI;
+
+ public static final String DATABASE_OWNER_NAME = SqlAlterHiveDatabaseOwner.DATABASE_OWNER_NAME;
+
+ public static final String DATABASE_OWNER_TYPE = SqlAlterHiveDatabaseOwner.DATABASE_OWNER_TYPE;
+
+ public static final String ROLE_OWNER = SqlAlterHiveDatabaseOwner.ROLE_OWNER;
+
+ public static final String USER_OWNER = SqlAlterHiveDatabaseOwner.USER_OWNER;
+
+ /** Type of ALTER DATABASE operation. */
+ public enum AlterHiveDatabaseOp {
+ CHANGE_PROPS,
+ CHANGE_LOCATION,
+ CHANGE_OWNER
+ }
+}
+
diff --git a/hudi-flink-datasource/hudi-flink1.16.x/pom.xml b/hudi-flink-datasource/hudi-flink1.16.x/pom.xml
index 724b01770826..dd18efb45ca5 100644
--- a/hudi-flink-datasource/hudi-flink1.16.x/pom.xml
+++ b/hudi-flink-datasource/hudi-flink1.16.x/pom.xml
@@ -20,12 +20,12 @@
   <parent>
     <artifactId>hudi-flink-datasource</artifactId>
     <groupId>org.apache.hudi</groupId>
-    <version>0.14.2-rc1</version>
+    <version>0.14.2-SNAPSHOT</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>

   <artifactId>hudi-flink1.16.x</artifactId>
-  <version>0.14.2-rc1</version>
+  <version>0.14.2-SNAPSHOT</version>
   <packaging>jar</packaging>
diff --git a/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/adapter/HiveCatalogConstants.java b/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/adapter/HiveCatalogConstants.java
new file mode 100644
index 000000000000..5d40e7ed1d87
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/adapter/HiveCatalogConstants.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+import org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabase;
+import org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabaseOwner;
+import org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveDatabase;
+
+/**
+ * Constants for Hive Catalog.
+ */
+public class HiveCatalogConstants {
+
+ // -----------------------------------------------------------------------------------
+ // Constants for ALTER DATABASE
+ // -----------------------------------------------------------------------------------
+ public static final String ALTER_DATABASE_OP = SqlAlterHiveDatabase.ALTER_DATABASE_OP;
+
+ public static final String DATABASE_LOCATION_URI = SqlCreateHiveDatabase.DATABASE_LOCATION_URI;
+
+ public static final String DATABASE_OWNER_NAME = SqlAlterHiveDatabaseOwner.DATABASE_OWNER_NAME;
+
+ public static final String DATABASE_OWNER_TYPE = SqlAlterHiveDatabaseOwner.DATABASE_OWNER_TYPE;
+
+ public static final String ROLE_OWNER = SqlAlterHiveDatabaseOwner.ROLE_OWNER;
+
+ public static final String USER_OWNER = SqlAlterHiveDatabaseOwner.USER_OWNER;
+
+ /** Type of ALTER DATABASE operation. */
+ public enum AlterHiveDatabaseOp {
+ CHANGE_PROPS,
+ CHANGE_LOCATION,
+ CHANGE_OWNER
+ }
+}
+
diff --git a/hudi-flink-datasource/hudi-flink1.17.x/pom.xml b/hudi-flink-datasource/hudi-flink1.17.x/pom.xml
index 95ccb5389f22..9a87908527aa 100644
--- a/hudi-flink-datasource/hudi-flink1.17.x/pom.xml
+++ b/hudi-flink-datasource/hudi-flink1.17.x/pom.xml
@@ -20,12 +20,12 @@
   <parent>
     <artifactId>hudi-flink-datasource</artifactId>
     <groupId>org.apache.hudi</groupId>
-    <version>0.14.2-rc1</version>
+    <version>0.14.2-SNAPSHOT</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>

   <artifactId>hudi-flink1.17.x</artifactId>
-  <version>0.14.2-rc1</version>
+  <version>0.14.2-SNAPSHOT</version>
   <packaging>jar</packaging>
diff --git a/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/adapter/HiveCatalogConstants.java b/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/adapter/HiveCatalogConstants.java
new file mode 100644
index 000000000000..5d40e7ed1d87
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/adapter/HiveCatalogConstants.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+import org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabase;
+import org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabaseOwner;
+import org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveDatabase;
+
+/**
+ * Constants for Hive Catalog.
+ */
+public class HiveCatalogConstants {
+
+ // -----------------------------------------------------------------------------------
+ // Constants for ALTER DATABASE
+ // -----------------------------------------------------------------------------------
+ public static final String ALTER_DATABASE_OP = SqlAlterHiveDatabase.ALTER_DATABASE_OP;
+
+ public static final String DATABASE_LOCATION_URI = SqlCreateHiveDatabase.DATABASE_LOCATION_URI;
+
+ public static final String DATABASE_OWNER_NAME = SqlAlterHiveDatabaseOwner.DATABASE_OWNER_NAME;
+
+ public static final String DATABASE_OWNER_TYPE = SqlAlterHiveDatabaseOwner.DATABASE_OWNER_TYPE;
+
+ public static final String ROLE_OWNER = SqlAlterHiveDatabaseOwner.ROLE_OWNER;
+
+ public static final String USER_OWNER = SqlAlterHiveDatabaseOwner.USER_OWNER;
+
+ /** Type of ALTER DATABASE operation. */
+ public enum AlterHiveDatabaseOp {
+ CHANGE_PROPS,
+ CHANGE_LOCATION,
+ CHANGE_OWNER
+ }
+}
+
diff --git a/hudi-flink-datasource/hudi-flink1.18.x/pom.xml b/hudi-flink-datasource/hudi-flink1.18.x/pom.xml
new file mode 100644
index 000000000000..dab044bb0821
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink1.18.x/pom.xml
@@ -0,0 +1,168 @@
+
+
+
+
+  <parent>
+    <artifactId>hudi-flink-datasource</artifactId>
+    <groupId>org.apache.hudi</groupId>
+    <version>0.14.2-SNAPSHOT</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>hudi-flink1.18.x</artifactId>
+  <version>0.14.2-SNAPSHOT</version>
+  <packaging>jar</packaging>
+
+  <properties>
+    <main.basedir>${project.parent.parent.basedir}</main.basedir>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-1.2-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-slf4j-impl</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hudi</groupId>
+      <artifactId>hudi-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-connector-hive_2.12</artifactId>
+      <version>${flink1.18.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-table-api-java</artifactId>
+      <version>${flink1.18.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-table-api-java-bridge</artifactId>
+      <version>${flink1.18.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-shaded-guava</artifactId>
+      <version>30.1.1-jre-14.0</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-core</artifactId>
+      <version>${flink1.18.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-streaming-java</artifactId>
+      <version>${flink1.18.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-table-runtime</artifactId>
+      <version>${flink1.18.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-parquet</artifactId>
+      <version>${flink1.18.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-json</artifactId>
+      <version>${flink1.18.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-table-planner_2.12</artifactId>
+      <version>${flink1.18.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-runtime</artifactId>
+      <version>${flink1.18.version}</version>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hudi</groupId>
+      <artifactId>hudi-tests-common</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.jacoco</groupId>
+        <artifactId>jacoco-maven-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+            <phase>test-compile</phase>
+          </execution>
+        </executions>
+        <configuration>
+          <skip>false</skip>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/adapter/AbstractStreamOperatorAdapter.java b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/adapter/AbstractStreamOperatorAdapter.java
new file mode 100644
index 000000000000..d4c6bc3a8f4d
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/adapter/AbstractStreamOperatorAdapter.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+
+/**
+ * Adapter clazz for {@code AbstractStreamOperator}.
+ */
+public abstract class AbstractStreamOperatorAdapter<O> extends AbstractStreamOperator<O> {
+}
diff --git a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/adapter/AbstractStreamOperatorFactoryAdapter.java b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/adapter/AbstractStreamOperatorFactoryAdapter.java
new file mode 100644
index 000000000000..6dcfe71ccfd9
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/adapter/AbstractStreamOperatorFactoryAdapter.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.YieldingOperatorFactory;
+
+/**
+ * Adapter clazz for {@link AbstractStreamOperatorFactory}.
+ */
+public abstract class AbstractStreamOperatorFactoryAdapter<O>
+    extends AbstractStreamOperatorFactory<O> implements YieldingOperatorFactory<O> {
+
+ public MailboxExecutorAdapter getMailboxExecutorAdapter() {
+ return new MailboxExecutorAdapter(getMailboxExecutor());
+ }
+}
diff --git a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/adapter/DataStreamScanProviderAdapter.java b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/adapter/DataStreamScanProviderAdapter.java
new file mode 100644
index 000000000000..a6b5439ea1ff
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/adapter/DataStreamScanProviderAdapter.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.connector.ProviderContext;
+import org.apache.flink.table.connector.source.DataStreamScanProvider;
+import org.apache.flink.table.data.RowData;
+
+/**
+ * Adapter clazz for {@code DataStreamScanProvider}.
+ */
+public interface DataStreamScanProviderAdapter extends DataStreamScanProvider {
+  default DataStream<RowData> produceDataStream(ProviderContext providerContext, StreamExecutionEnvironment streamExecutionEnvironment) {
+ return produceDataStream(streamExecutionEnvironment);
+ }
+}
diff --git a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/adapter/DataStreamSinkProviderAdapter.java b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/adapter/DataStreamSinkProviderAdapter.java
new file mode 100644
index 000000000000..349f60f30acf
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/adapter/DataStreamSinkProviderAdapter.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.table.connector.ProviderContext;
+import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
+import org.apache.flink.table.data.RowData;
+
+/**
+ * Adapter clazz for {@code DataStreamSinkProvider}.
+ */
+public interface DataStreamSinkProviderAdapter extends DataStreamSinkProvider {
+  DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream);
+
+  @Override
+  default DataStreamSink<?> consumeDataStream(ProviderContext providerContext, DataStream<RowData> dataStream) {
+ return consumeDataStream(dataStream);
+ }
+}
diff --git a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/adapter/HiveCatalogConstants.java b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/adapter/HiveCatalogConstants.java
new file mode 100644
index 000000000000..7c1649301607
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/adapter/HiveCatalogConstants.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+import org.apache.flink.table.catalog.hive.util.Constants;
+
+/**
+ * Constants for Hive Catalog.
+ */
+public class HiveCatalogConstants {
+
+ // -----------------------------------------------------------------------------------
+ // Constants for ALTER DATABASE
+ // -----------------------------------------------------------------------------------
+ public static final String ALTER_DATABASE_OP = Constants.ALTER_DATABASE_OP;
+
+ public static final String DATABASE_LOCATION_URI = Constants.DATABASE_LOCATION_URI;
+
+ public static final String DATABASE_OWNER_NAME = Constants.DATABASE_OWNER_NAME;
+
+ public static final String DATABASE_OWNER_TYPE = Constants.DATABASE_OWNER_TYPE;
+
+ public static final String ROLE_OWNER = Constants.ROLE_OWNER;
+
+ public static final String USER_OWNER = Constants.USER_OWNER;
+
+ /** Type of ALTER DATABASE operation. */
+ public enum AlterHiveDatabaseOp {
+ CHANGE_PROPS,
+ CHANGE_LOCATION,
+ CHANGE_OWNER
+ }
+}
diff --git a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/adapter/MailboxExecutorAdapter.java b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/adapter/MailboxExecutorAdapter.java
new file mode 100644
index 000000000000..0c836f3db391
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/adapter/MailboxExecutorAdapter.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+/**
+ * Adapter clazz for {@link MailboxExecutor}.
+ */
+public class MailboxExecutorAdapter {
+ private final MailboxExecutor executor;
+
+ public MailboxExecutorAdapter(MailboxExecutor executor) {
+ this.executor = executor;
+ }
+
+  public void execute(ThrowingRunnable<? extends Exception> command, String description) {
+ this.executor.execute(command, description);
+ }
+}
diff --git a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/adapter/MaskingOutputAdapter.java b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/adapter/MaskingOutputAdapter.java
new file mode 100644
index 000000000000..e84da0d6ec30
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/adapter/MaskingOutputAdapter.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
+import org.apache.flink.util.OutputTag;
+
+/** Adapter class for {@code Output} to handle thread-safety issues with the async compaction/clustering services. */
+public class MaskingOutputAdapter<OUT> implements Output<StreamRecord<OUT>> {
+
+  private final Output<StreamRecord<OUT>> output;
+
+  public MaskingOutputAdapter(Output<StreamRecord<OUT>> output) {
+ this.output = output;
+ }
+
+ @Override
+ public void emitWatermark(Watermark watermark) {
+    // For thread safety, do not propagate the watermark downstream
+ }
+
+ @Override
+ public void emitLatencyMarker(LatencyMarker latencyMarker) {
+    // For thread safety, do not propagate the latency marker downstream
+ }
+
+ @Override
+ public void emitWatermarkStatus(WatermarkStatus watermarkStatus) {
+    // For thread safety, do not propagate the watermark status downstream
+ }
+
+ @Override
+  public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> streamRecord) {
+ this.output.collect(outputTag, streamRecord);
+ }
+
+ @Override
+  public void collect(StreamRecord<OUT> outStreamRecord) {
+ this.output.collect(outStreamRecord);
+ }
+
+ @Override
+ public void close() {
+ this.output.close();
+ }
+}
diff --git a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/adapter/OperatorCoordinatorAdapter.java b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/adapter/OperatorCoordinatorAdapter.java
new file mode 100644
index 000000000000..9c37de17bd1f
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/adapter/OperatorCoordinatorAdapter.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+
+import javax.annotation.Nullable;
+
+/**
+ * Adapter clazz for {@code OperatorCoordinator}.
+ */
+public interface OperatorCoordinatorAdapter extends OperatorCoordinator {
+ void handleEventFromOperator(int i, OperatorEvent operatorEvent) throws Exception;
+
+ @Override
+ default void handleEventFromOperator(int i, int attemptNumber, OperatorEvent operatorEvent) throws Exception {
+ handleEventFromOperator(i, operatorEvent);
+ }
+
+ void subtaskReady(int i, SubtaskGateway subtaskGateway);
+
+ @Override
+ default void executionAttemptReady(int i, int attemptNumber, SubtaskGateway subtaskGateway) {
+ subtaskReady(i, subtaskGateway);
+ }
+
+ @Override
+  default void executionAttemptFailed(int i, int attemptNumber, @Nullable Throwable throwable) {
+    subtaskFailed(i, throwable);
+ }
+
+ void subtaskFailed(int i, @Nullable Throwable throwable);
+}
diff --git a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/adapter/RateLimiterAdapter.java b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/adapter/RateLimiterAdapter.java
new file mode 100644
index 000000000000..865c0c81d4d9
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/adapter/RateLimiterAdapter.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.RateLimiter;
+
+/**
+ * Bridge class for shaded guava clazz {@code RateLimiter}.
+ */
+public class RateLimiterAdapter {
+ private final RateLimiter rateLimiter;
+
+ private RateLimiterAdapter(double permitsPerSecond) {
+ this.rateLimiter = RateLimiter.create(permitsPerSecond);
+ }
+
+ public static RateLimiterAdapter create(double permitsPerSecond) {
+ return new RateLimiterAdapter(permitsPerSecond);
+ }
+
+ public void acquire() {
+ this.rateLimiter.acquire();
+ }
+}
diff --git a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/adapter/SortCodeGeneratorAdapter.java b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/adapter/SortCodeGeneratorAdapter.java
new file mode 100644
index 000000000000..e38a58a0ccfb
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/adapter/SortCodeGeneratorAdapter.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.planner.codegen.sort.SortCodeGenerator;
+import org.apache.flink.table.planner.plan.nodes.exec.spec.SortSpec;
+import org.apache.flink.table.types.logical.RowType;
+
+/**
+ * Adapter clazz for {@code SortCodeGenerator}.
+ */
+public class SortCodeGeneratorAdapter extends SortCodeGenerator {
+ public SortCodeGeneratorAdapter(ReadableConfig tableConfig, RowType input, SortSpec sortSpec) {
+ super(tableConfig, Thread.currentThread().getContextClassLoader(), input, sortSpec);
+ }
+}
diff --git a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelDeleteAdapter.java b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelDeleteAdapter.java
new file mode 100644
index 000000000000..de0019d41bd9
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelDeleteAdapter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+import org.apache.flink.table.connector.RowLevelModificationScanContext;
+import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete;
+
+import javax.annotation.Nullable;
+
+/**
+ * Adapter clazz for {@link org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete}.
+ */
+public interface SupportsRowLevelDeleteAdapter extends SupportsRowLevelDelete {
+ @Override
+ default RowLevelDeleteInfo applyRowLevelDelete(@Nullable RowLevelModificationScanContext context) {
+ return applyRowLevelDelete();
+ }
+
+ RowLevelDeleteInfoAdapter applyRowLevelDelete();
+
+ /**
+ * Adapter clazz for {@link SupportsRowLevelDelete.RowLevelDeleteInfo}.
+ */
+ interface RowLevelDeleteInfoAdapter extends RowLevelDeleteInfo {
+ }
+}
diff --git a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelUpdateAdapter.java b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelUpdateAdapter.java
new file mode 100644
index 000000000000..17c785d48455
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelUpdateAdapter.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.connector.RowLevelModificationScanContext;
+import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+/**
+ * Adapter clazz for {@link org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate}.
+ */
+public interface SupportsRowLevelUpdateAdapter extends SupportsRowLevelUpdate {
+ @Override
+ default RowLevelUpdateInfo applyRowLevelUpdate(List<Column> updatedColumns, @Nullable RowLevelModificationScanContext context) {
+ return applyRowLevelUpdate(updatedColumns);
+ }
+
+ RowLevelUpdateInfoAdapter applyRowLevelUpdate(List<Column> updatedColumns);
+
+ /**
+ * Adapter clazz for {@link SupportsRowLevelUpdate.RowLevelUpdateInfo}.
+ */
+ interface RowLevelUpdateInfoAdapter extends RowLevelUpdateInfo {
+ }
+}
diff --git a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/adapter/Utils.java b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/adapter/Utils.java
new file mode 100644
index 000000000000..659c65973674
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/adapter/Utils.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamSourceContexts;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.runtime.generated.NormalizedKeyComputer;
+import org.apache.flink.table.runtime.generated.RecordComparator;
+import org.apache.flink.table.runtime.operators.sort.BinaryExternalSorter;
+import org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer;
+import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer;
+
+import java.util.Collections;
+
+/**
+ * Adapter utils.
+ */
+public class Utils {
+ public static SourceFunction.SourceContext<RowData> getSourceContext(
+ TimeCharacteristic timeCharacteristic,
+ ProcessingTimeService processingTimeService,
+ StreamTask<?, ?> streamTask,
+ Output<StreamRecord<RowData>> output,
+ long watermarkInterval) {
+ return StreamSourceContexts.getSourceContext(
+ timeCharacteristic,
+ processingTimeService,
+ new Object(), // no actual locking needed
+ output,
+ watermarkInterval,
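+ // trailing args (assumed semantics): idle timeout of -1 disables idleness detection, true emits progressive watermarks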
+ -1,
+ true);
+ }
+
+ public static FactoryUtil.DefaultDynamicTableContext getTableContext(
+ ObjectIdentifier tablePath,
+ ResolvedCatalogTable catalogTable,
+ ReadableConfig conf) {
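+ // empty enrichment options map; the trailing 'false' marks the table as non-temporary (assumed constructor semantics)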
+ return new FactoryUtil.DefaultDynamicTableContext(tablePath, catalogTable,
+ Collections.emptyMap(), conf, Thread.currentThread().getContextClassLoader(), false);
+ }
+
+ public static BinaryExternalSorter getBinaryExternalSorter(
+ final Object owner,
+ MemoryManager memoryManager,
+ long reservedMemorySize,
+ IOManager ioManager,
+ AbstractRowDataSerializer<RowData> inputSerializer,
+ BinaryRowDataSerializer serializer,
+ NormalizedKeyComputer normalizedKeyComputer,
+ RecordComparator comparator,
+ Configuration conf) {
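+ // spill behavior (max file handles, compression, async merge) is taken from the table execution options below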
+ return new BinaryExternalSorter(owner, memoryManager, reservedMemorySize,
+ ioManager, inputSerializer, serializer, normalizedKeyComputer, comparator,
+ conf.get(ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES),
+ conf.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED),
+ (int) conf.get(
+ ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE).getBytes(),
+ conf.get(ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED));
+ }
+}
diff --git a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
new file mode 100644
index 000000000000..1b636c63b2f6
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
@@ -0,0 +1,579 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow;
+
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.table.format.cow.vector.HeapArrayVector;
+import org.apache.hudi.table.format.cow.vector.HeapDecimalVector;
+import org.apache.hudi.table.format.cow.vector.HeapMapColumnVector;
+import org.apache.hudi.table.format.cow.vector.HeapRowColumnVector;
+import org.apache.hudi.table.format.cow.vector.reader.ArrayColumnReader;
+import org.apache.hudi.table.format.cow.vector.reader.EmptyColumnReader;
+import org.apache.hudi.table.format.cow.vector.reader.FixedLenBytesColumnReader;
+import org.apache.hudi.table.format.cow.vector.reader.Int64TimestampColumnReader;
+import org.apache.hudi.table.format.cow.vector.reader.MapColumnReader;
+import org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader;
+import org.apache.hudi.table.format.cow.vector.reader.RowColumnReader;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.vector.reader.BooleanColumnReader;
+import org.apache.flink.formats.parquet.vector.reader.ByteColumnReader;
+import org.apache.flink.formats.parquet.vector.reader.BytesColumnReader;
+import org.apache.flink.formats.parquet.vector.reader.ColumnReader;
+import org.apache.flink.formats.parquet.vector.reader.DoubleColumnReader;
+import org.apache.flink.formats.parquet.vector.reader.FloatColumnReader;
+import org.apache.flink.formats.parquet.vector.reader.IntColumnReader;
+import org.apache.flink.formats.parquet.vector.reader.LongColumnReader;
+import org.apache.flink.formats.parquet.vector.reader.ShortColumnReader;
+import org.apache.flink.formats.parquet.vector.reader.TimestampColumnReader;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.data.columnar.vector.ColumnVector;
+import org.apache.flink.table.data.columnar.vector.VectorizedColumnBatch;
+import org.apache.flink.table.data.columnar.vector.heap.HeapBooleanVector;
+import org.apache.flink.table.data.columnar.vector.heap.HeapByteVector;
+import org.apache.flink.table.data.columnar.vector.heap.HeapBytesVector;
+import org.apache.flink.table.data.columnar.vector.heap.HeapDoubleVector;
+import org.apache.flink.table.data.columnar.vector.heap.HeapFloatVector;
+import org.apache.flink.table.data.columnar.vector.heap.HeapIntVector;
+import org.apache.flink.table.data.columnar.vector.heap.HeapLongVector;
+import org.apache.flink.table.data.columnar.vector.heap.HeapShortVector;
+import org.apache.flink.table.data.columnar.vector.heap.HeapTimestampVector;
+import org.apache.flink.table.data.columnar.vector.writable.WritableColumnVector;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.ParquetRuntimeException;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.filter.UnboundRecordFilter;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.InvalidSchemaException;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.utils.DateTimeUtils.toInternal;
+import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
+import static org.apache.parquet.Preconditions.checkArgument;
+
+/**
+ * Util for generating {@link ParquetColumnarRowSplitReader}.
+ *
+ * <p>NOTE: reference from Flink release 1.11.2 {@code ParquetSplitReaderUtil}, modify to support INT64
+ * based TIMESTAMP_MILLIS as ConvertedType, should remove when Flink supports that.
+ */
+public class ParquetSplitReaderUtil {
+
+ /**
+ * Util for generating partitioned {@link ParquetColumnarRowSplitReader}.
+ */
+ public static ParquetColumnarRowSplitReader genPartColumnarRowReader(
+ boolean utcTimestamp,
+ boolean caseSensitive,
+ Configuration conf,
+ String[] fullFieldNames,
+ DataType[] fullFieldTypes,
+ Map<String, Object> partitionSpec,
+ int[] selectedFields,
+ int batchSize,
+ Path path,
+ long splitStart,
+ long splitLength,
+ FilterPredicate filterPredicate,
+ UnboundRecordFilter recordFilter) throws IOException {
+ List<String> selNonPartNames = Arrays.stream(selectedFields)
+ .mapToObj(i -> fullFieldNames[i])
+ .filter(n -> !partitionSpec.containsKey(n))
+ .collect(Collectors.toList());
+
+ int[] selParquetFields = Arrays.stream(selectedFields)
+ .filter(i -> !partitionSpec.containsKey(fullFieldNames[i]))
+ .toArray();
+
+ ParquetColumnarRowSplitReader.ColumnBatchGenerator gen = readVectors -> {
+ // create and initialize the row batch
+ ColumnVector[] vectors = new ColumnVector[selectedFields.length];
+ for (int i = 0; i < vectors.length; i++) {
+ String name = fullFieldNames[selectedFields[i]];
+ LogicalType type = fullFieldTypes[selectedFields[i]].getLogicalType();
+ vectors[i] = createVector(readVectors, selNonPartNames, name, type, partitionSpec, batchSize);
+ }
+ return new VectorizedColumnBatch(vectors);
+ };
+
+ return new ParquetColumnarRowSplitReader(
+ utcTimestamp,
+ caseSensitive,
+ conf,
+ Arrays.stream(selParquetFields)
+ .mapToObj(i -> fullFieldTypes[i].getLogicalType())
+ .toArray(LogicalType[]::new),
+ selNonPartNames.toArray(new String[0]),
+ gen,
+ batchSize,
+ new org.apache.hadoop.fs.Path(path.toUri()),
+ splitStart,
+ splitLength,
+ filterPredicate,
+ recordFilter);
+ }
+
+ private static ColumnVector createVector(
+ ColumnVector[] readVectors,
+ List<String> selNonPartNames,
+ String name,
+ LogicalType type,
+ Map<String, Object> partitionSpec,
+ int batchSize) {
+ if (partitionSpec.containsKey(name)) {
+ return createVectorFromConstant(type, partitionSpec.get(name), batchSize);
+ }
+ ColumnVector readVector = readVectors[selNonPartNames.indexOf(name)];
+ if (readVector == null) {
+ // when the read vector is null, use a constant null vector instead
+ readVector = createVectorFromConstant(type, null, batchSize);
+ }
+ return readVector;
+ }
+
+ private static ColumnVector createVectorFromConstant(
+ LogicalType type,
+ Object value,
+ int batchSize) {
+ switch (type.getTypeRoot()) {
+ case CHAR:
+ case VARCHAR:
+ case BINARY:
+ case VARBINARY:
+ HeapBytesVector bsv = new HeapBytesVector(batchSize);
+ if (value == null) {
+ bsv.fillWithNulls();
+ } else {
+ bsv.fill(value instanceof byte[]
+ ? (byte[]) value
+ : getUTF8Bytes(value.toString()));
+ }
+ return bsv;
+ case BOOLEAN:
+ HeapBooleanVector bv = new HeapBooleanVector(batchSize);
+ if (value == null) {
+ bv.fillWithNulls();
+ } else {
+ bv.fill((boolean) value);
+ }
+ return bv;
+ case TINYINT:
+ HeapByteVector byteVector = new HeapByteVector(batchSize);
+ if (value == null) {
+ byteVector.fillWithNulls();
+ } else {
+ byteVector.fill(((Number) value).byteValue());
+ }
+ return byteVector;
+ case SMALLINT:
+ HeapShortVector sv = new HeapShortVector(batchSize);
+ if (value == null) {
+ sv.fillWithNulls();
+ } else {
+ sv.fill(((Number) value).shortValue());
+ }
+ return sv;
+ case INTEGER:
+ HeapIntVector iv = new HeapIntVector(batchSize);
+ if (value == null) {
+ iv.fillWithNulls();
+ } else {
+ iv.fill(((Number) value).intValue());
+ }
+ return iv;
+ case BIGINT:
+ HeapLongVector lv = new HeapLongVector(batchSize);
+ if (value == null) {
+ lv.fillWithNulls();
+ } else {
+ lv.fill(((Number) value).longValue());
+ }
+ return lv;
+ case DECIMAL:
+ HeapDecimalVector decv = new HeapDecimalVector(batchSize);
+ if (value == null) {
+ decv.fillWithNulls();
+ } else {
+ DecimalType decimalType = (DecimalType) type;
+ int precision = decimalType.getPrecision();
+ int scale = decimalType.getScale();
+ DecimalData decimal = Preconditions.checkNotNull(
+ DecimalData.fromBigDecimal((BigDecimal) value, precision, scale));
+ decv.fill(decimal.toUnscaledBytes());
+ }
+ return decv;
+ case FLOAT:
+ HeapFloatVector fv = new HeapFloatVector(batchSize);
+ if (value == null) {
+ fv.fillWithNulls();
+ } else {
+ fv.fill(((Number) value).floatValue());
+ }
+ return fv;
+ case DOUBLE:
+ HeapDoubleVector dv = new HeapDoubleVector(batchSize);
+ if (value == null) {
+ dv.fillWithNulls();
+ } else {
+ dv.fill(((Number) value).doubleValue());
+ }
+ return dv;
+ case DATE:
+ if (value instanceof LocalDate) {
+ value = Date.valueOf((LocalDate) value);
+ }
+ return createVectorFromConstant(
+ new IntType(),
+ value == null ? null : toInternal((Date) value),
+ batchSize);
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ HeapTimestampVector tv = new HeapTimestampVector(batchSize);
+ if (value == null) {
+ tv.fillWithNulls();
+ } else {
+ tv.fill(TimestampData.fromLocalDateTime((LocalDateTime) value));
+ }
+ return tv;
+ case ARRAY:
+ HeapArrayVector arrayVector = new HeapArrayVector(batchSize);
+ if (value == null) {
+ arrayVector.fillWithNulls();
+ return arrayVector;
+ } else {
+ throw new UnsupportedOperationException("Unsupported create array with default value.");
+ }
+ case MAP:
+ HeapMapColumnVector mapVector = new HeapMapColumnVector(batchSize, null, null);
+ if (value == null) {
+ mapVector.fillWithNulls();
+ return mapVector;
+ } else {
+ throw new UnsupportedOperationException("Unsupported create map with default value.");
+ }
+ case ROW:
+ HeapRowColumnVector rowVector = new HeapRowColumnVector(batchSize);
+ if (value == null) {
+ rowVector.fillWithNulls();
+ return rowVector;
+ } else {
+ throw new UnsupportedOperationException("Unsupported create row with default value.");
+ }
+ default:
+ throw new UnsupportedOperationException("Unsupported type: " + type);
+ }
+ }
+
+ private static List<ColumnDescriptor> filterDescriptors(int depth, Type type, List<ColumnDescriptor> columns) throws ParquetRuntimeException {
+ List<ColumnDescriptor> filtered = new ArrayList<>();
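+ // keep only descriptors whose path element at this depth matches the current schema node's name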
+ for (ColumnDescriptor descriptor : columns) {
+ if (depth >= descriptor.getPath().length) {
+ throw new InvalidSchemaException("Expect depth " + depth + " for schema: " + descriptor);
+ }
+ if (type.getName().equals(descriptor.getPath()[depth])) {
+ filtered.add(descriptor);
+ }
+ }
+ ValidationUtils.checkState(filtered.size() > 0, "Corrupted Parquet schema");
+ return filtered;
+ }
+
+ public static ColumnReader createColumnReader(
+ boolean utcTimestamp,
+ LogicalType fieldType,
+ Type physicalType,
+ List<ColumnDescriptor> descriptors,
+ PageReadStore pages) throws IOException {
+ return createColumnReader(utcTimestamp, fieldType, physicalType, descriptors,
+ pages, 0);
+ }
+
+ private static ColumnReader createColumnReader(
+ boolean utcTimestamp,
+ LogicalType fieldType,
+ Type physicalType,
+ List<ColumnDescriptor> columns,
+ PageReadStore pages,
+ int depth) throws IOException {
+ List<ColumnDescriptor> descriptors = filterDescriptors(depth, physicalType, columns);
+ ColumnDescriptor descriptor = descriptors.get(0);
+ PageReader pageReader = pages.getPageReader(descriptor);
+ switch (fieldType.getTypeRoot()) {
+ case BOOLEAN:
+ return new BooleanColumnReader(descriptor, pageReader);
+ case TINYINT:
+ return new ByteColumnReader(descriptor, pageReader);
+ case DOUBLE:
+ return new DoubleColumnReader(descriptor, pageReader);
+ case FLOAT:
+ return new FloatColumnReader(descriptor, pageReader);
+ case INTEGER:
+ case DATE:
+ case TIME_WITHOUT_TIME_ZONE:
+ return new IntColumnReader(descriptor, pageReader);
+ case BIGINT:
+ return new LongColumnReader(descriptor, pageReader);
+ case SMALLINT:
+ return new ShortColumnReader(descriptor, pageReader);
+ case CHAR:
+ case VARCHAR:
+ case BINARY:
+ case VARBINARY:
+ return new BytesColumnReader(descriptor, pageReader);
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ switch (descriptor.getPrimitiveType().getPrimitiveTypeName()) {
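+ // INT64 timestamps carry the declared precision; INT96 is the legacy 12-byte timestamp encoding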
+ case INT64:
+ int precision = fieldType instanceof TimestampType
+ ? ((TimestampType) fieldType).getPrecision()
+ : ((LocalZonedTimestampType) fieldType).getPrecision();
+ return new Int64TimestampColumnReader(utcTimestamp, descriptor, pageReader, precision);
+ case INT96:
+ return new TimestampColumnReader(utcTimestamp, descriptor, pageReader);
+ default:
+ throw new AssertionError();
+ }
+ case DECIMAL:
+ switch (descriptor.getPrimitiveType().getPrimitiveTypeName()) {
+ case INT32:
+ return new IntColumnReader(descriptor, pageReader);
+ case INT64:
+ return new LongColumnReader(descriptor, pageReader);
+ case BINARY:
+ return new BytesColumnReader(descriptor, pageReader);
+ case FIXED_LEN_BYTE_ARRAY:
+ return new FixedLenBytesColumnReader(
+ descriptor, pageReader);
+ default:
+ throw new AssertionError();
+ }
+ case ARRAY:
+ return new ArrayColumnReader(
+ descriptor,
+ pageReader,
+ utcTimestamp,
+ descriptor.getPrimitiveType(),
+ fieldType);
+ case MAP:
+ MapType mapType = (MapType) fieldType;
+ ArrayColumnReader keyReader =
+ new ArrayColumnReader(
+ descriptor,
+ pageReader,
+ utcTimestamp,
+ descriptor.getPrimitiveType(),
+ new ArrayType(mapType.getKeyType()));
+ ArrayColumnReader valueReader =
+ new ArrayColumnReader(
+ descriptors.get(1),
+ pages.getPageReader(descriptors.get(1)),
+ utcTimestamp,
+ descriptors.get(1).getPrimitiveType(),
+ new ArrayType(mapType.getValueType()));
+ return new MapColumnReader(keyReader, valueReader, fieldType);
+ case ROW:
+ RowType rowType = (RowType) fieldType;
+ GroupType groupType = physicalType.asGroupType();
+ List<ColumnReader> fieldReaders = new ArrayList<>();
+ for (int i = 0; i < rowType.getFieldCount(); i++) {
+ // schema evolution: read the parquet file with a new extended field name.
+ int fieldIndex = getFieldIndexInPhysicalType(rowType.getFields().get(i).getName(), groupType);
+ if (fieldIndex < 0) {
+ fieldReaders.add(new EmptyColumnReader());
+ } else {
+ fieldReaders.add(
+ createColumnReader(
+ utcTimestamp,
+ rowType.getTypeAt(i),
+ groupType.getType(fieldIndex),
+ descriptors,
+ pages,
+ depth + 1));
+ }
+ }
+ return new RowColumnReader(fieldReaders);
+ default:
+ throw new UnsupportedOperationException(fieldType + " is not supported now.");
+ }
+ }
+
+ public static WritableColumnVector createWritableColumnVector(
+ int batchSize,
+ LogicalType fieldType,
+ Type physicalType,
+ List<ColumnDescriptor> descriptors) {
+ return createWritableColumnVector(batchSize, fieldType, physicalType, descriptors, 0);
+ }
+
+ private static WritableColumnVector createWritableColumnVector(
+ int batchSize,
+ LogicalType fieldType,
+ Type physicalType,
+ List<ColumnDescriptor> columns,
+ int depth) {
+ List<ColumnDescriptor> descriptors = filterDescriptors(depth, physicalType, columns);
+ PrimitiveType primitiveType = descriptors.get(0).getPrimitiveType();
+ PrimitiveType.PrimitiveTypeName typeName = primitiveType.getPrimitiveTypeName();
+ switch (fieldType.getTypeRoot()) {
+ case BOOLEAN:
+ checkArgument(
+ typeName == PrimitiveType.PrimitiveTypeName.BOOLEAN,
+ "Unexpected type: %s", typeName);
+ return new HeapBooleanVector(batchSize);
+ case TINYINT:
+ checkArgument(
+ typeName == PrimitiveType.PrimitiveTypeName.INT32,
+ "Unexpected type: %s", typeName);
+ return new HeapByteVector(batchSize);
+ case DOUBLE:
+ checkArgument(
+ typeName == PrimitiveType.PrimitiveTypeName.DOUBLE,
+ "Unexpected type: %s", typeName);
+ return new HeapDoubleVector(batchSize);
+ case FLOAT:
+ checkArgument(
+ typeName == PrimitiveType.PrimitiveTypeName.FLOAT,
+ "Unexpected type: %s", typeName);
+ return new HeapFloatVector(batchSize);
+ case INTEGER:
+ case DATE:
+ case TIME_WITHOUT_TIME_ZONE:
+ checkArgument(
+ typeName == PrimitiveType.PrimitiveTypeName.INT32,
+ "Unexpected type: %s", typeName);
+ return new HeapIntVector(batchSize);
+ case BIGINT:
+ checkArgument(
+ typeName == PrimitiveType.PrimitiveTypeName.INT64,
+ "Unexpected type: %s", typeName);
+ return new HeapLongVector(batchSize);
+ case SMALLINT:
+ checkArgument(
+ typeName == PrimitiveType.PrimitiveTypeName.INT32,
+ "Unexpected type: %s", typeName);
+ return new HeapShortVector(batchSize);
+ case CHAR:
+ case VARCHAR:
+ case BINARY:
+ case VARBINARY:
+ checkArgument(
+ typeName == PrimitiveType.PrimitiveTypeName.BINARY,
+ "Unexpected type: %s", typeName);
+ return new HeapBytesVector(batchSize);
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ checkArgument(primitiveType.getOriginalType() != OriginalType.TIME_MICROS,
+ "TIME_MICROS original type is not ");
+ return new HeapTimestampVector(batchSize);
+ case DECIMAL:
+ checkArgument(
+ (typeName == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
+ || typeName == PrimitiveType.PrimitiveTypeName.BINARY)
+ && primitiveType.getOriginalType() == OriginalType.DECIMAL,
+ "Unexpected type: %s", typeName);
+ return new HeapDecimalVector(batchSize);
+ case ARRAY:
+ ArrayType arrayType = (ArrayType) fieldType;
+ return new HeapArrayVector(
+ batchSize,
+ createWritableColumnVector(
+ batchSize,
+ arrayType.getElementType(),
+ physicalType,
+ descriptors,
+ depth));
+ case MAP:
+ MapType mapType = (MapType) fieldType;
+ GroupType repeatedType = physicalType.asGroupType().getType(0).asGroupType();
+ // the map column has three level paths.
+ return new HeapMapColumnVector(
+ batchSize,
+ createWritableColumnVector(
+ batchSize,
+ mapType.getKeyType(),
+ repeatedType.getType(0),
+ descriptors,
+ depth + 2),
+ createWritableColumnVector(
+ batchSize,
+ mapType.getValueType(),
+ repeatedType.getType(1),
+ descriptors,
+ depth + 2));
+ case ROW:
+ RowType rowType = (RowType) fieldType;
+ GroupType groupType = physicalType.asGroupType();
+ WritableColumnVector[] columnVectors = new WritableColumnVector[rowType.getFieldCount()];
+ for (int i = 0; i < columnVectors.length; i++) {
+ // schema evolution: read the file with a new extended field name.
+ int fieldIndex = getFieldIndexInPhysicalType(rowType.getFields().get(i).getName(), groupType);
+ if (fieldIndex < 0) {
+ columnVectors[i] = (WritableColumnVector) createVectorFromConstant(rowType.getTypeAt(i), null, batchSize);
+ } else {
+ columnVectors[i] =
+ createWritableColumnVector(
+ batchSize,
+ rowType.getTypeAt(i),
+ groupType.getType(fieldIndex),
+ descriptors,
+ depth + 1);
+ }
+ }
+ return new HeapRowColumnVector(batchSize, columnVectors);
+ default:
+ throw new UnsupportedOperationException(fieldType + " is not supported now.");
+ }
+ }
+
+ /**
+ * Returns the field index with given physical row type {@code groupType} and field name {@code fieldName}.
+ *
+ * @return The physical field index or -1 if the field does not exist
+ */
+ private static int getFieldIndexInPhysicalType(String fieldName, GroupType groupType) {
+ // get index from fileSchema type, else, return -1
+ return groupType.containsField(fieldName) ? groupType.getFieldIndex(fieldName) : -1;
+ }
+}
diff --git a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapArrayVector.java b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapArrayVector.java
new file mode 100644
index 000000000000..7db66d23d6fc
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapArrayVector.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.columnar.ColumnarArrayData;
+import org.apache.flink.table.data.columnar.vector.ArrayColumnVector;
+import org.apache.flink.table.data.columnar.vector.ColumnVector;
+import org.apache.flink.table.data.columnar.vector.heap.AbstractHeapVector;
+import org.apache.flink.table.data.columnar.vector.writable.WritableColumnVector;
+
+/**
+ * This class represents a nullable heap array column vector.
+ */
+public class HeapArrayVector extends AbstractHeapVector
+ implements WritableColumnVector, ArrayColumnVector {
+
+ public long[] offsets;
+ public long[] lengths;
+ public ColumnVector child;
+ private int size;
+
+ public HeapArrayVector(int len) {
+ super(len);
+ offsets = new long[len];
+ lengths = new long[len];
+ }
+
+ public HeapArrayVector(int len, ColumnVector vector) {
+ super(len);
+ offsets = new long[len];
+ lengths = new long[len];
+ this.child = vector;
+ }
+
+ public int getSize() {
+ return size;
+ }
+
+ public void setSize(int size) {
+ this.size = size;
+ }
+
+ public int getLen() {
+ return this.isNull.length;
+ }
+
+ @Override
+ public ArrayData getArray(int i) {
+ long offset = offsets[i];
+ long length = lengths[i];
+ return new ColumnarArrayData(child, (int) offset, (int) length);
+ }
+}
diff --git a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapDecimalVector.java b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapDecimalVector.java
new file mode 100644
index 000000000000..fdc55ac18fc6
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapDecimalVector.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector;
+
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.columnar.vector.DecimalColumnVector;
+import org.apache.flink.table.data.columnar.vector.heap.HeapBytesVector;
+
+/**
+ * This class represents a nullable heap map decimal vector.
+ */
+public class HeapDecimalVector extends HeapBytesVector implements DecimalColumnVector {
+
+ public HeapDecimalVector(int len) {
+ super(len);
+ }
+
+ @Override
+ public DecimalData getDecimal(int i, int precision, int scale) {
+ return DecimalData.fromUnscaledBytes(
+ this.getBytes(i).getBytes(), precision, scale);
+ }
+}
diff --git a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapMapColumnVector.java b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapMapColumnVector.java
new file mode 100644
index 000000000000..a37973716950
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapMapColumnVector.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector;
+
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.columnar.ColumnarMapData;
+import org.apache.flink.table.data.columnar.vector.ColumnVector;
+import org.apache.flink.table.data.columnar.vector.MapColumnVector;
+import org.apache.flink.table.data.columnar.vector.heap.AbstractHeapVector;
+import org.apache.flink.table.data.columnar.vector.writable.WritableColumnVector;
+
+/**
+ * This class represents a nullable heap map column vector.
+ */
+public class HeapMapColumnVector extends AbstractHeapVector
+ implements WritableColumnVector, MapColumnVector {
+
+ private long[] offsets;
+ private long[] lengths;
+ private int size;
+ private ColumnVector keys;
+ private ColumnVector values;
+
+ public HeapMapColumnVector(int len, ColumnVector keys, ColumnVector values) {
+ super(len);
+ size = 0;
+ offsets = new long[len];
+ lengths = new long[len];
+ this.keys = keys;
+ this.values = values;
+ }
+
+ public void setOffsets(long[] offsets) {
+ this.offsets = offsets;
+ }
+
+ public void setLengths(long[] lengths) {
+ this.lengths = lengths;
+ }
+
+ public void setKeys(ColumnVector keys) {
+ this.keys = keys;
+ }
+
+ public void setValues(ColumnVector values) {
+ this.values = values;
+ }
+
+ public int getSize() {
+ return size;
+ }
+
+ public void setSize(int size) {
+ this.size = size;
+ }
+
+ @Override
+ public MapData getMap(int i) {
+ long offset = offsets[i];
+ long length = lengths[i];
+ return new ColumnarMapData(keys, values, (int) offset, (int) length);
+ }
+}
diff --git a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapRowColumnVector.java b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapRowColumnVector.java
new file mode 100644
index 000000000000..ae194e4e6ab0
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapRowColumnVector.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector;
+
+import org.apache.flink.table.data.columnar.ColumnarRowData;
+import org.apache.flink.table.data.columnar.vector.RowColumnVector;
+import org.apache.flink.table.data.columnar.vector.VectorizedColumnBatch;
+import org.apache.flink.table.data.columnar.vector.heap.AbstractHeapVector;
+import org.apache.flink.table.data.columnar.vector.writable.WritableColumnVector;
+
+/**
+ * This class represents a nullable heap row column vector.
+ */
+public class HeapRowColumnVector extends AbstractHeapVector
+ implements WritableColumnVector, RowColumnVector {
+
+ public WritableColumnVector[] vectors;
+
+ public HeapRowColumnVector(int len, WritableColumnVector... vectors) {
+ super(len);
+ this.vectors = vectors;
+ }
+
+ @Override
+ public ColumnarRowData getRow(int i) {
+ ColumnarRowData columnarRowData = new ColumnarRowData(new VectorizedColumnBatch(vectors));
+ columnarRowData.setRowId(i);
+ return columnarRowData;
+ }
+
+ @Override
+ public void reset() {
+ super.reset();
+ for (WritableColumnVector vector : vectors) {
+ vector.reset();
+ }
+ }
+}
diff --git a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/ParquetDecimalVector.java b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/ParquetDecimalVector.java
new file mode 100644
index 000000000000..98b5e6105089
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/ParquetDecimalVector.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector;
+
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.columnar.vector.BytesColumnVector;
+import org.apache.flink.table.data.columnar.vector.ColumnVector;
+import org.apache.flink.table.data.columnar.vector.DecimalColumnVector;
+
+/**
+ * Parquet write decimal as int32 and int64 and binary, this class wrap the real vector to
+ * provide {@link DecimalColumnVector} interface.
+ *
+ * <p>Reference Flink release 1.11.2 {@link org.apache.flink.formats.parquet.vector.ParquetDecimalVector}
+ * because it is not public.
+ */
+public class ParquetDecimalVector implements DecimalColumnVector {
+
+ public final ColumnVector vector;
+
+ public ParquetDecimalVector(ColumnVector vector) {
+ this.vector = vector;
+ }
+
+ @Override
+ public DecimalData getDecimal(int i, int precision, int scale) {
+ return DecimalData.fromUnscaledBytes(
+ ((BytesColumnVector) vector).getBytes(i).getBytes(),
+ precision,
+ scale);
+ }
+
+ @Override
+ public boolean isNullAt(int i) {
+ return vector.isNullAt(i);
+ }
+}
+
diff --git a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/AbstractColumnReader.java b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/AbstractColumnReader.java
new file mode 100644
index 000000000000..a8b733de636a
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/AbstractColumnReader.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector.reader;
+
+import org.apache.flink.formats.parquet.vector.ParquetDictionary;
+import org.apache.flink.formats.parquet.vector.reader.ColumnReader;
+import org.apache.flink.table.data.columnar.vector.writable.WritableColumnVector;
+import org.apache.flink.table.data.columnar.vector.writable.WritableIntVector;
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.bytes.ByteBufferInputStream;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.page.DataPage;
+import org.apache.parquet.column.page.DataPageV1;
+import org.apache.parquet.column.page.DataPageV2;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.schema.PrimitiveType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
+
+/**
+ * Abstract {@link ColumnReader}.
+ * See {@link org.apache.parquet.column.impl.ColumnReaderImpl},
+ * part of the code is referred from Apache Spark and Apache Parquet.
+ *
+ * <p>Note: Reference Flink release 1.11.2 {@link org.apache.flink.formats.parquet.vector.reader.AbstractColumnReader}
+ * because some of the package scope methods.
+ */
+public abstract class AbstractColumnReader<V extends WritableColumnVector>
+ implements ColumnReader<V> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(org.apache.flink.formats.parquet.vector.reader.AbstractColumnReader.class);
+
+ private final PageReader pageReader;
+
+ /**
+ * The dictionary, if this column has dictionary encoding.
+ */
+ protected final Dictionary dictionary;
+
+ /**
+ * Maximum definition level for this column.
+ */
+ protected final int maxDefLevel;
+
+ protected final ColumnDescriptor descriptor;
+
+ /**
+ * Total number of values read.
+ */
+ private long valuesRead;
+
+ /**
+ * value that indicates the end of the current page. That is, if valuesRead ==
+ * endOfPageValueCount, we are at the end of the page.
+ */
+ private long endOfPageValueCount;
+
+ /**
+ * If true, the current page is dictionary encoded.
+ */
+ private boolean isCurrentPageDictionaryEncoded;
+
+ /**
+ * Total values in the current page.
+ */
+ private int pageValueCount;
+
+ /*
+ * Input streams:
+ * 1.Run length encoder to encode every data, so we have run length stream to get
+ * run length information.
+ * 2.Data maybe is real data, maybe is dictionary ids which need be decode to real
+ * data from Dictionary.
+ *
+ * Run length stream ------> Data stream
+ * |
+ * ------> Dictionary ids stream
+ */
+
+ /**
+ * Run length decoder for data and dictionary.
+ */
+ protected RunLengthDecoder runLenDecoder;
+
+ /**
+ * Data input stream.
+ */
+ ByteBufferInputStream dataInputStream;
+
+ /**
+ * Dictionary decoder to wrap dictionary ids input stream.
+ */
+ private RunLengthDecoder dictionaryIdsDecoder;
+
+ public AbstractColumnReader(
+ ColumnDescriptor descriptor,
+ PageReader pageReader) throws IOException {
+ this.descriptor = descriptor;
+ this.pageReader = pageReader;
+ this.maxDefLevel = descriptor.getMaxDefinitionLevel();
+
+ DictionaryPage dictionaryPage = pageReader.readDictionaryPage();
+ if (dictionaryPage != null) {
+ try {
+ this.dictionary = dictionaryPage.getEncoding().initDictionary(descriptor, dictionaryPage);
+ this.isCurrentPageDictionaryEncoded = true;
+ } catch (IOException e) {
+ throw new IOException("could not decode the dictionary for " + descriptor, e);
+ }
+ } else {
+ this.dictionary = null;
+ this.isCurrentPageDictionaryEncoded = false;
+ }
+ /*
+ * Total number of values in this column (in this row group).
+ */
+ long totalValueCount = pageReader.getTotalValueCount();
+ if (totalValueCount == 0) {
+ throw new IOException("totalValueCount == 0");
+ }
+ }
+
+ protected void checkTypeName(PrimitiveType.PrimitiveTypeName expectedName) {
+ PrimitiveType.PrimitiveTypeName actualName = descriptor.getPrimitiveType().getPrimitiveTypeName();
+ Preconditions.checkArgument(
+ actualName == expectedName,
+ "Expected type name: %s, actual type name: %s",
+ expectedName,
+ actualName);
+ }
+
+ /**
+ * Reads `total` values from this columnReader into column.
+ */
+ @Override
+ public final void readToVector(int readNumber, V vector) throws IOException {
+ int rowId = 0;
+ WritableIntVector dictionaryIds = null;
+ if (dictionary != null) {
+ dictionaryIds = vector.reserveDictionaryIds(readNumber);
+ }
+ while (readNumber > 0) {
+ // Compute the number of values we want to read in this page.
+ int leftInPage = (int) (endOfPageValueCount - valuesRead);
+ if (leftInPage == 0) {
+ DataPage page = pageReader.readPage();
+ if (page instanceof DataPageV1) {
+ readPageV1((DataPageV1) page);
+ } else if (page instanceof DataPageV2) {
+ readPageV2((DataPageV2) page);
+ } else {
+ throw new RuntimeException("Unsupported page type: " + page.getClass());
+ }
+ leftInPage = (int) (endOfPageValueCount - valuesRead);
+ }
+ int num = Math.min(readNumber, leftInPage);
+ if (isCurrentPageDictionaryEncoded) {
+ // Read and decode dictionary ids.
+ runLenDecoder.readDictionaryIds(
+ num, dictionaryIds, vector, rowId, maxDefLevel, this.dictionaryIdsDecoder);
+
+ if (vector.hasDictionary() || (rowId == 0 && supportLazyDecode())) {
+ // Column vector supports lazy decoding of dictionary values so just set the dictionary.
+ // We can't do this if rowId != 0 AND the column doesn't have a dictionary (i.e. some
+ // non-dictionary encoded values have already been added).
+ vector.setDictionary(new ParquetDictionary(dictionary));
+ } else {
+ readBatchFromDictionaryIds(rowId, num, vector, dictionaryIds);
+ }
+ } else {
+ if (vector.hasDictionary() && rowId != 0) {
+ // This batch already has dictionary encoded values but this new page is not. The batch
+ // does not support a mix of dictionary and not so we will decode the dictionary.
+ readBatchFromDictionaryIds(0, rowId, vector, vector.getDictionaryIds());
+ }
+ vector.setDictionary(null);
+ readBatch(rowId, num, vector);
+ }
+
+ valuesRead += num;
+ rowId += num;
+ readNumber -= num;
+ }
+ }
+
+ private void readPageV1(DataPageV1 page) throws IOException {
+ this.pageValueCount = page.getValueCount();
+ ValuesReader rlReader = page.getRlEncoding().getValuesReader(descriptor, REPETITION_LEVEL);
+
+ // Initialize the decoders.
+ if (page.getDlEncoding() != Encoding.RLE && descriptor.getMaxDefinitionLevel() != 0) {
+ throw new UnsupportedOperationException("Unsupported encoding: " + page.getDlEncoding());
+ }
+ int bitWidth = BytesUtils.getWidthFromMaxInt(descriptor.getMaxDefinitionLevel());
+ this.runLenDecoder = new RunLengthDecoder(bitWidth);
+ try {
+ BytesInput bytes = page.getBytes();
+ ByteBufferInputStream in = bytes.toInputStream();
+ rlReader.initFromPage(pageValueCount, in);
+ this.runLenDecoder.initFromStream(pageValueCount, in);
+ prepareNewPage(page.getValueEncoding(), in);
+ } catch (IOException e) {
+ throw new IOException("could not read page " + page + " in col " + descriptor, e);
+ }
+ }
+
+ private void readPageV2(DataPageV2 page) throws IOException {
+ this.pageValueCount = page.getValueCount();
+
+ int bitWidth = BytesUtils.getWidthFromMaxInt(descriptor.getMaxDefinitionLevel());
+ // do not read the length from the stream. v2 pages handle dividing the page bytes.
+ this.runLenDecoder = new RunLengthDecoder(bitWidth, false);
+ this.runLenDecoder.initFromStream(
+ this.pageValueCount, page.getDefinitionLevels().toInputStream());
+ try {
+ prepareNewPage(page.getDataEncoding(), page.getData().toInputStream());
+ } catch (IOException e) {
+ throw new IOException("could not read page " + page + " in col " + descriptor, e);
+ }
+ }
+
+ private void prepareNewPage(
+ Encoding dataEncoding,
+ ByteBufferInputStream in) throws IOException {
+ this.endOfPageValueCount = valuesRead + pageValueCount;
+ if (dataEncoding.usesDictionary()) {
+ if (dictionary == null) {
+ throw new IOException("Could not read page in col "
+ + descriptor
+ + " as the dictionary was missing for encoding "
+ + dataEncoding);
+ }
+ @SuppressWarnings("deprecation")
+ Encoding plainDict = Encoding.PLAIN_DICTIONARY; // var to allow warning suppression
+ if (dataEncoding != plainDict && dataEncoding != Encoding.RLE_DICTIONARY) {
+ throw new UnsupportedOperationException("Unsupported encoding: " + dataEncoding);
+ }
+ this.dataInputStream = null;
+ this.dictionaryIdsDecoder = new RunLengthDecoder();
+ try {
+ this.dictionaryIdsDecoder.initFromStream(pageValueCount, in);
+ } catch (IOException e) {
+ throw new IOException("could not read dictionary in col " + descriptor, e);
+ }
+ this.isCurrentPageDictionaryEncoded = true;
+ } else {
+ if (dataEncoding != Encoding.PLAIN) {
+ throw new UnsupportedOperationException("Unsupported encoding: " + dataEncoding);
+ }
+ this.dictionaryIdsDecoder = null;
+ LOG.debug("init from page at offset {} for length {}", in.position(), in.available());
+ this.dataInputStream = in.remainingStream();
+ this.isCurrentPageDictionaryEncoded = false;
+ }
+
+ afterReadPage();
+ }
+
+ final ByteBuffer readDataBuffer(int length) {
+ try {
+ return dataInputStream.slice(length).order(ByteOrder.LITTLE_ENDIAN);
+ } catch (IOException e) {
+ throw new ParquetDecodingException("Failed to read " + length + " bytes", e);
+ }
+ }
+
+ /**
+ * After read a page, we may need some initialization.
+ */
+ protected void afterReadPage() {
+ }
+
+ /**
+ * Support lazy dictionary ids decode. See more in {@link ParquetDictionary}.
+ * If return false, we will decode all the data first.
+ */
+ protected boolean supportLazyDecode() {
+ return true;
+ }
+
+ /**
+ * Read batch from {@link #runLenDecoder} and {@link #dataInputStream}.
+ */
+ protected abstract void readBatch(int rowId, int num, V column);
+
+ /**
+ * Decode dictionary ids to data.
+ * From {@link #runLenDecoder} and {@link #dictionaryIdsDecoder}.
+ */
+ protected abstract void readBatchFromDictionaryIds(
+ int rowId,
+ int num,
+ V column,
+ WritableIntVector dictionaryIds);
+}
+
diff --git a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ArrayColumnReader.java b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ArrayColumnReader.java
new file mode 100644
index 000000000000..6a8a01b74946
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ArrayColumnReader.java
@@ -0,0 +1,473 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector.reader;
+
+import org.apache.hudi.table.format.cow.vector.HeapArrayVector;
+import org.apache.hudi.table.format.cow.vector.ParquetDecimalVector;
+
+import org.apache.flink.formats.parquet.vector.reader.ColumnReader;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.data.columnar.vector.VectorizedColumnBatch;
+import org.apache.flink.table.data.columnar.vector.heap.HeapBooleanVector;
+import org.apache.flink.table.data.columnar.vector.heap.HeapByteVector;
+import org.apache.flink.table.data.columnar.vector.heap.HeapBytesVector;
+import org.apache.flink.table.data.columnar.vector.heap.HeapDoubleVector;
+import org.apache.flink.table.data.columnar.vector.heap.HeapFloatVector;
+import org.apache.flink.table.data.columnar.vector.heap.HeapIntVector;
+import org.apache.flink.table.data.columnar.vector.heap.HeapLongVector;
+import org.apache.flink.table.data.columnar.vector.heap.HeapShortVector;
+import org.apache.flink.table.data.columnar.vector.heap.HeapTimestampVector;
+import org.apache.flink.table.data.columnar.vector.writable.WritableColumnVector;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Array {@link ColumnReader}.
+ */
+public class ArrayColumnReader extends BaseVectorizedColumnReader {
+
+ // The value read in last time
+ private Object lastValue;
+
+ // flag to indicate if there is no data in parquet data page
+ private boolean eof = false;
+
+ // flag to indicate if it's the first time to read parquet data page with this instance
+ boolean isFirstRow = true;
+
+ public ArrayColumnReader(
+ ColumnDescriptor descriptor,
+ PageReader pageReader,
+ boolean isUtcTimestamp,
+ Type type,
+ LogicalType logicalType)
+ throws IOException {
+ super(descriptor, pageReader, isUtcTimestamp, type, logicalType);
+ }
+
+ @Override
+ public void readToVector(int readNumber, WritableColumnVector vector) throws IOException {
+ HeapArrayVector lcv = (HeapArrayVector) vector;
+ // before readBatch, initial the size of offsets & lengths as the default value,
+ // the actual size will be assigned in setChildrenInfo() after reading complete.
+ lcv.offsets = new long[VectorizedColumnBatch.DEFAULT_SIZE];
+ lcv.lengths = new long[VectorizedColumnBatch.DEFAULT_SIZE];
+ // Because the length of ListColumnVector.child can't be known now,
+ // the valueList will save all data for ListColumnVector temporary.
+ List