Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HUDI-7466] Add tests to AWSGlueCatalogSyncClient #10897

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ private List<Partition> getPartitionsSegment(Segment segment, String tableName)
.databaseName(databaseName)
.tableName(tableName)
.segment(segment)
.excludeColumnSchema(true)
.nextToken(nextToken)
.build()).get();
partitions.addAll(result.partitions().stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,17 @@
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.config.HoodieAWSConfig;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.sync.common.model.FieldSchema;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.services.glue.model.BatchCreatePartitionRequest;
import software.amazon.awssdk.services.glue.model.Column;
import software.amazon.awssdk.services.glue.model.CreateDatabaseRequest;
import software.amazon.awssdk.services.glue.model.CreatePartitionRequest;
import software.amazon.awssdk.services.glue.model.CreateTableRequest;
import software.amazon.awssdk.services.glue.model.DatabaseInput;
import software.amazon.awssdk.services.glue.model.DeleteDatabaseRequest;
Expand All @@ -48,14 +48,20 @@
import java.io.IOException;
import java.nio.file.Files;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME;

@Disabled("HUDI-7475 The tests do not work. Disabling them to unblock Azure CI")
@Disabled("The tests do not work due to misconfiguration. Disabling them to unblock Azure CI")
public class ITTestGluePartitionPushdown {

private static final String MOTO_ENDPOINT = "http://localhost:5000";
Expand All @@ -67,9 +73,9 @@ public class ITTestGluePartitionPushdown {
private AWSGlueCatalogSyncClient glueSync;
private FileSystem fileSystem;
private Column[] partitionsColumn = {Column.builder().name("part1").type("int").build(), Column.builder().name("part2").type("string").build()};
List<FieldSchema> partitionsFieldSchema = Arrays.asList(new FieldSchema("part1", "int"), new FieldSchema("part2", "string"));

public ITTestGluePartitionPushdown() throws IOException {}
public ITTestGluePartitionPushdown() throws IOException {
}

@BeforeEach
public void setUp() throws Exception {
Expand Down Expand Up @@ -116,11 +122,26 @@ public void teardown() throws Exception {
fileSystem.delete(new Path(tablePath), true);
}

private void createPartitions(String...partitions) throws ExecutionException, InterruptedException {
glueSync.awsGlue.createPartition(CreatePartitionRequest.builder().databaseName(DB_NAME).tableName(TABLE_NAME)
.partitionInput(PartitionInput.builder()
.storageDescriptor(StorageDescriptor.builder().columns(partitionsColumn).build())
.values(partitions).build()).build()).get();
/**
 * Generates random partition value tuples and optionally registers them in the Glue table.
 *
 * <p>Every tuple holds {@code partitionSize} values, each a random integer in
 * {@code [0, 100000)} rendered as a string. Tuples are de-duplicated by content, so the
 * returned list can contain fewer than {@code amount} entries when the random generator
 * happens to repeat a tuple; callers should rely on the size of the returned list.
 *
 * @param amount        number of partition tuples to generate
 * @param partitionSize number of values in each partition tuple
 * @param createInGlue  when {@code true}, the generated partitions are also created in
 *                      {@code DB_NAME.TABLE_NAME} through {@code batchCreatePartition}
 * @return the generated partitions, each rendered as its values joined with {@code "/"}
 * @throws ExecutionException   if a Glue batch-create request fails
 * @throws InterruptedException if the thread is interrupted while waiting for Glue
 */
private List<String> createPartitions(int amount, int partitionSize, boolean createInGlue) throws ExecutionException, InterruptedException {
  // Lists compare by content, so the Set really removes duplicate tuples.
  // A Set<String[]> would not: arrays use identity equals/hashCode.
  Set<List<String>> uniquePartitions = IntStream.range(0, amount)
      .mapToObj(i -> IntStream.range(0, partitionSize)
          .mapToObj(j -> String.valueOf(ThreadLocalRandom.current().nextInt(100000)))
          .collect(Collectors.toList()))
      .collect(Collectors.toSet());
  List<List<String>> partitionsList = new ArrayList<>(uniquePartitions);
  if (createInGlue) {
    // Glue accepts at most 100 partition inputs per BatchCreatePartition request.
    final int maxPartitionsPerRequest = 100;
    for (List<List<String>> batch : CollectionUtils.batches(partitionsList, maxPartitionsPerRequest)) {
      List<PartitionInput> partitionInputs = batch.stream()
          .map(values -> PartitionInput.builder()
              .storageDescriptor(StorageDescriptor.builder().columns(partitionsColumn).build())
              .values(values)
              .build())
          .collect(Collectors.toList());
      glueSync.awsGlue.batchCreatePartition(BatchCreatePartitionRequest.builder()
          .databaseName(DB_NAME)
          .tableName(TABLE_NAME)
          .partitionInputList(partitionInputs)
          .build()).get();
    }
  }
  return partitionsList.stream().map(partition -> String.join("/", partition)).collect(Collectors.toList());
}

@Test
Expand All @@ -131,8 +152,40 @@ public void testEmptyPartitionShouldReturnEmpty() {

@Test
public void testPresentPartitionShouldReturnIt() throws ExecutionException, InterruptedException {
createPartitions("1", "b'ar");
List<String> partitions = createPartitions(1, 2, true);
Assertions.assertEquals(1, glueSync.getPartitionsFromList(TABLE_NAME,
Arrays.asList("1/b'ar", "2/foo", "1/b''ar")).size());
Arrays.asList(partitions.get(0), "2/foo", "1/b''ar")).size());
}

/**
 * Adds a large set of generated partitions through the sync client and checks that
 * listing all partitions of the table reports the same number of entries.
 */
@Test
public void testCreatingManyPartitionsAndThenReadingAllShouldShowAll() throws ExecutionException, InterruptedException {
  // Only generate the partition paths here; the sync client is the one creating them.
  final List<String> generated = createPartitions(2000, 2, false);
  glueSync.addPartitionsToTable(TABLE_NAME, generated);

  final int expectedCount = generated.size();
  final int listedCount = glueSync.getAllPartitions(TABLE_NAME).size();
  Assertions.assertEquals(expectedCount, listedCount);
}

/**
 * Creates partitions directly in Glue, drops them through the sync client in two
 * steps, and checks the remaining partition count after each step.
 */
@Test
public void testDeletingAllPartitionsShouldReturnEmpty() throws ExecutionException, InterruptedException {
  List<String> partitions = createPartitions(2000, 2, true);
  int firstDeleteBatch = 1000;
  glueSync.dropPartitions(TABLE_NAME, partitions.subList(0, firstDeleteBatch));
  // Expected remainder is derived from the same variable that sized the first drop,
  // so the two cannot drift apart.
  Assertions.assertEquals(partitions.size() - firstDeleteBatch, glueSync.getAllPartitions(TABLE_NAME).size());
  glueSync.dropPartitions(TABLE_NAME, partitions.subList(firstDeleteBatch, partitions.size()));
  Assertions.assertEquals(0, glueSync.getAllPartitions(TABLE_NAME).size());
}

/**
 * Looks up a mix of partitions that were created in Glue and partitions that were only
 * generated locally, expecting the result size to equal the number created in Glue;
 * then looks up a 300-element slice of the created partitions and expects 300 results.
 */
@Test
public void testReadFromListAlongWithUnknownShouldReturnKnown() throws ExecutionException, InterruptedException {
  List<String> knownPartitions = createPartitions(1000, 2, true);
  List<String> unknownPartitions = createPartitions(1000, 2, false);

  // Known partitions first, then the ones never registered in Glue.
  List<String> lookup = new ArrayList<>(knownPartitions);
  lookup.addAll(unknownPartitions);
  Assertions.assertEquals(knownPartitions.size(), glueSync.getPartitionsFromList(TABLE_NAME, lookup).size());

  List<String> knownSlice = knownPartitions.subList(0, 300);
  Assertions.assertEquals(300, glueSync.getPartitionsFromList(TABLE_NAME, knownSlice).size());
}
}
Loading