diff --git a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
index 2050e4f60ffe..fe397fc0e1ff 100644
--- a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
+++ b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
@@ -170,6 +170,7 @@ private List getPartitionsSegment(Segment segment, String tableName)
             .databaseName(databaseName)
             .tableName(tableName)
             .segment(segment)
+            .excludeColumnSchema(true)
             .nextToken(nextToken)
             .build()).get();
         partitions.addAll(result.partitions().stream()
diff --git a/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestGluePartitionPushdown.java b/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestGluePartitionPushdown.java
index 9601482b65af..31b804862f0c 100644
--- a/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestGluePartitionPushdown.java
+++ b/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestGluePartitionPushdown.java
@@ -25,17 +25,17 @@
 import org.apache.hudi.common.model.HoodieAvroPayload;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.config.HoodieAWSConfig;
 import org.apache.hudi.hive.HiveSyncConfig;
-import org.apache.hudi.sync.common.model.FieldSchema;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.services.glue.model.BatchCreatePartitionRequest;
 import software.amazon.awssdk.services.glue.model.Column;
 import software.amazon.awssdk.services.glue.model.CreateDatabaseRequest;
-import software.amazon.awssdk.services.glue.model.CreatePartitionRequest;
 import software.amazon.awssdk.services.glue.model.CreateTableRequest;
 import software.amazon.awssdk.services.glue.model.DatabaseInput;
 import software.amazon.awssdk.services.glue.model.DeleteDatabaseRequest;
@@ -48,14 +48,20 @@
 import java.io.IOException;
 import java.nio.file.Files;
 import java.time.Instant;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
 
 import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH;
 import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME;
 
-@Disabled("HUDI-7475 The tests do not work. Disabling them to unblock Azure CI")
+@Disabled("The tests do not work due to misconfiguration. Disabling them to unblock Azure CI")
 public class ITTestGluePartitionPushdown {
 
   private static final String MOTO_ENDPOINT = "http://localhost:5000";
@@ -67,9 +73,9 @@ public class ITTestGluePartitionPushdown {
   private AWSGlueCatalogSyncClient glueSync;
   private FileSystem fileSystem;
   private Column[] partitionsColumn = {Column.builder().name("part1").type("int").build(), Column.builder().name("part2").type("string").build()};
-  List<FieldSchema> partitionsFieldSchema = Arrays.asList(new FieldSchema("part1", "int"), new FieldSchema("part2", "string"));
 
-  public ITTestGluePartitionPushdown() throws IOException {}
+  public ITTestGluePartitionPushdown() throws IOException {
+  }
 
   @BeforeEach
   public void setUp() throws Exception {
@@ -116,11 +122,36 @@ public void teardown() throws Exception {
     fileSystem.delete(new Path(tablePath), true);
   }
 
-  private void createPartitions(String...partitions) throws ExecutionException, InterruptedException {
-    glueSync.awsGlue.createPartition(CreatePartitionRequest.builder().databaseName(DB_NAME).tableName(TABLE_NAME)
-        .partitionInput(PartitionInput.builder()
-            .storageDescriptor(StorageDescriptor.builder().columns(partitionsColumn).build())
-            .values(partitions).build()).build()).get();
+  /**
+   * Generates random partition value tuples and optionally registers them in Glue.
+   *
+   * @param amount        number of tuples to generate; duplicate tuples are collapsed, so fewer may be returned
+   * @param partitionSize number of values in each partition tuple
+   * @param createInGlue  whether to create the generated partitions in the Glue table
+   * @return the generated partitions, each rendered as its values joined with "/"
+   */
+  private List<String> createPartitions(int amount, int partitionSize, boolean createInGlue) throws ExecutionException, InterruptedException {
+    // Lists (unlike arrays) have value-based equals/hashCode, so the Set really drops duplicate tuples.
+    Set<List<String>> partitions = IntStream.range(0, amount)
+        .mapToObj(i -> IntStream.range(0, partitionSize)
+            .mapToObj(j -> String.valueOf(ThreadLocalRandom.current().nextInt(100000)))
+            .collect(Collectors.toList()))
+        .collect(Collectors.toSet());
+    List<List<String>> partitionsList = new ArrayList<>(partitions);
+    if (createInGlue) {
+      // Glue's BatchCreatePartition accepts at most 100 partitions per request.
+      for (List<List<String>> inp : CollectionUtils.batches(partitionsList, 100)) {
+        glueSync.awsGlue.batchCreatePartition(BatchCreatePartitionRequest.builder().databaseName(DB_NAME).tableName(TABLE_NAME)
+            .partitionInputList(
+                inp.stream().map(
+                    inp1 -> PartitionInput
+                        .builder()
+                        .storageDescriptor(StorageDescriptor.builder().columns(partitionsColumn).build())
+                        .values(inp1).build()).collect(Collectors.toList()))
+            .build()).get();
+      }
+    }
+    return partitionsList.stream().map(partition -> String.join("/", partition)).collect(Collectors.toList());
   }
 
   @Test
@@ -131,8 +162,40 @@ public void testEmptyPartitionShouldReturnEmpty() {
 
   @Test
   public void testPresentPartitionShouldReturnIt() throws ExecutionException, InterruptedException {
-    createPartitions("1", "b'ar");
+    List<String> partitions = createPartitions(1, 2, true);
     Assertions.assertEquals(1, glueSync.getPartitionsFromList(TABLE_NAME,
-        Arrays.asList("1/b'ar", "2/foo", "1/b''ar")).size());
+        Arrays.asList(partitions.get(0), "2/foo", "1/b''ar")).size());
+  }
+
+  @Test
+  public void testCreatingManyPartitionsAndThenReadingAllShouldShowAll() throws ExecutionException, InterruptedException {
+    List<String> partitions = createPartitions(2000, 2, false);
+    glueSync.addPartitionsToTable(TABLE_NAME, partitions);
+    Assertions.assertEquals(partitions.size(), glueSync.getAllPartitions(TABLE_NAME).size());
+  }
+
+  @Test
+  public void testDeletingAllPartitionsShouldReturnEmpty() throws ExecutionException, InterruptedException {
+    List<String> partitions = createPartitions(2000, 2, true);
+    int firstDeleteBatch = 1000;
+    glueSync.dropPartitions(TABLE_NAME, partitions.subList(0, firstDeleteBatch));
+    Assertions.assertEquals(partitions.size() - firstDeleteBatch, glueSync.getAllPartitions(TABLE_NAME).size());
+    glueSync.dropPartitions(TABLE_NAME, partitions.subList(firstDeleteBatch, partitions.size()));
+    Assertions.assertEquals(0, glueSync.getAllPartitions(TABLE_NAME).size());
+  }
+
+  @Test
+  public void testReadFromListAlongWithUnknownShouldReturnKnown() throws ExecutionException, InterruptedException {
+    List<String> existingPartitions = createPartitions(1000, 2, true);
+    List<String> fakePartitions = createPartitions(1000, 2, false);
+    Assertions.assertEquals(existingPartitions.size(), glueSync.getPartitionsFromList(
+        TABLE_NAME,
+        Stream.concat(existingPartitions.stream(), fakePartitions.stream())
+            .collect(Collectors.toList())
+    ).size());
+    Assertions.assertEquals(300, glueSync.getPartitionsFromList(
+        TABLE_NAME,
+        existingPartitions.subList(0, 300)
+    ).size());
   }
 }