[HUDI-7351] Implement partition pushdown for glue (#10604)
parisni authored Feb 4, 2024
1 parent 274aa47 commit ff0e67f
Showing 14 changed files with 461 additions and 41 deletions.
16 changes: 16 additions & 0 deletions hudi-aws/pom.xml
@@ -31,6 +31,7 @@

<properties>
<dynamodb-local.version>1.15.0</dynamodb-local.version>
<moto.version>latest</moto.version>
</properties>

<dependencies>
@@ -261,6 +262,21 @@
</wait>
</run>
</image>
<image>
<name>motoserver/moto:${moto.version}</name>
<alias>it-aws</alias>
<run>
<ports>
<port>${moto.port}:${moto.port}</port>
</ports>
<wait>
<http>
<url>${moto.endpoint}/moto-api/</url>
</http>
<time>10000</time>
</wait>
</run>
</image>
</images>
</configuration>
</execution>
hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
@@ -18,6 +18,7 @@

package org.apache.hudi.aws.sync;

import org.apache.hudi.aws.sync.util.GluePartitionFilterGenerator;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.util.CollectionUtils;
@@ -28,7 +29,9 @@
import org.apache.hudi.sync.common.model.FieldSchema;
import org.apache.hudi.sync.common.model.Partition;

import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.glue.GlueAsyncClient;
import software.amazon.awssdk.services.glue.GlueAsyncClientBuilder;
import software.amazon.awssdk.services.glue.model.AlreadyExistsException;
import software.amazon.awssdk.services.glue.model.BatchCreatePartitionRequest;
import software.amazon.awssdk.services.glue.model.BatchCreatePartitionResponse;
@@ -66,6 +69,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.URI;
import java.net.URISyntaxException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
@@ -84,6 +89,8 @@
import static org.apache.hudi.config.GlueCatalogSyncClientConfig.GLUE_METADATA_FILE_LISTING;
import static org.apache.hudi.config.GlueCatalogSyncClientConfig.META_SYNC_PARTITION_INDEX_FIELDS;
import static org.apache.hudi.config.GlueCatalogSyncClientConfig.META_SYNC_PARTITION_INDEX_FIELDS_ENABLE;
import static org.apache.hudi.config.HoodieAWSConfig.AWS_GLUE_ENDPOINT;
import static org.apache.hudi.config.HoodieAWSConfig.AWS_GLUE_REGION;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_CREATE_MANAGED_TABLE;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE;
import static org.apache.hudi.hive.util.HiveSchemaUtil.getPartitionKeyType;
@@ -104,7 +111,7 @@ public class AWSGlueCatalogSyncClient extends HoodieSyncClient {
private static final Logger LOG = LoggerFactory.getLogger(AWSGlueCatalogSyncClient.class);
private static final int MAX_PARTITIONS_PER_REQUEST = 100;
private static final int MAX_DELETE_PARTITIONS_PER_REQUEST = 25;
private final GlueAsyncClient awsGlue;
protected final GlueAsyncClient awsGlue;
private static final String GLUE_PARTITION_INDEX_ENABLE = "partition_filtering.enabled";
private static final int PARTITION_INDEX_MAX_NUMBER = 3;
/**
@@ -119,9 +126,17 @@ public class AWSGlueCatalogSyncClient extends HoodieSyncClient {

public AWSGlueCatalogSyncClient(HiveSyncConfig config) {
super(config);
this.awsGlue = GlueAsyncClient.builder()
.credentialsProvider(HoodieAWSCredentialsProviderFactory.getAwsCredentialsProvider(config.getProps()))
.build();
try {
GlueAsyncClientBuilder awsGlueBuilder = GlueAsyncClient.builder()
.credentialsProvider(HoodieAWSCredentialsProviderFactory.getAwsCredentialsProvider(config.getProps()));
awsGlueBuilder = config.getString(AWS_GLUE_ENDPOINT) == null ? awsGlueBuilder :
awsGlueBuilder.endpointOverride(new URI(config.getString(AWS_GLUE_ENDPOINT)));
awsGlueBuilder = config.getString(AWS_GLUE_REGION) == null ? awsGlueBuilder :
awsGlueBuilder.region(Region.of(config.getString(AWS_GLUE_REGION)));
this.awsGlue = awsGlueBuilder.build();
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
this.databaseName = config.getStringOrDefault(META_SYNC_DATABASE_NAME);
this.skipTableArchive = config.getBooleanOrDefault(GlueCatalogSyncClientConfig.GLUE_SKIP_TABLE_ARCHIVE);
this.enableMetadataTable = Boolean.toString(config.getBoolean(GLUE_METADATA_FILE_LISTING)).toUpperCase();
@@ -130,25 +145,42 @@ public AWSGlueCatalogSyncClient(HiveSyncConfig config) {
@Override
public List<Partition> getAllPartitions(String tableName) {
try {
List<Partition> partitions = new ArrayList<>();
String nextToken = null;
do {
GetPartitionsResponse result = awsGlue.getPartitions(GetPartitionsRequest.builder()
.databaseName(databaseName)
.tableName(tableName)
.nextToken(nextToken)
.build()).get();
partitions.addAll(result.partitions().stream()
.map(p -> new Partition(p.values(), p.storageDescriptor().location()))
.collect(Collectors.toList()));
nextToken = result.nextToken();
} while (nextToken != null);
return partitions;
return getPartitions(GetPartitionsRequest.builder()
.databaseName(databaseName)
.tableName(tableName));
} catch (Exception e) {
throw new HoodieGlueSyncException("Failed to get all partitions for table " + tableId(databaseName, tableName), e);
}
}

@Override
public List<Partition> getPartitionsByFilter(String tableName, String filter) {
try {
return getPartitions(GetPartitionsRequest.builder()
.databaseName(databaseName)
.tableName(tableName)
.expression(filter));
} catch (Exception e) {
throw new HoodieGlueSyncException("Failed to get partitions for table " + tableId(databaseName, tableName) + " from expression: " + filter, e);
}
}

private List<Partition> getPartitions(GetPartitionsRequest.Builder partitionRequestBuilder) throws InterruptedException, ExecutionException {
List<Partition> partitions = new ArrayList<>();
String nextToken = null;
do {
GetPartitionsResponse result = awsGlue.getPartitions(partitionRequestBuilder
.excludeColumnSchema(true)
.nextToken(nextToken)
.build()).get();
partitions.addAll(result.partitions().stream()
.map(p -> new Partition(p.values(), p.storageDescriptor().location()))
.collect(Collectors.toList()));
nextToken = result.nextToken();
} while (nextToken != null);
return partitions;
}

@Override
public void addPartitionsToTable(String tableName, List<String> partitionsToAdd) {
if (partitionsToAdd.isEmpty()) {
@@ -700,6 +732,11 @@ public void deleteLastReplicatedTimeStamp(String tableName) {
throw new UnsupportedOperationException("Not supported: `deleteLastReplicatedTimeStamp`");
}

@Override
public String generatePushDownFilter(List<String> writtenPartitions, List<FieldSchema> partitionFields) {
return new GluePartitionFilterGenerator().generatePushDownFilter(writtenPartitions, partitionFields, (HiveSyncConfig) config);
}

private List<Column> getColumnsFromSchema(Map<String, String> mapSchema) {
List<Column> cols = new ArrayList<>();
for (String key : mapSchema.keySet()) {
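
Taken together, the endpoint/region overrides in the constructor and the new filter methods can be exercised end to end. The following is a minimal, hedged sketch (not part of this commit), assuming a Glue-compatible endpoint such as a local moto server on http://localhost:5000, an already-initialized Hudi table at the base path, and a Glue table tbl_name in database db_name, mirroring the integration test added below.

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.aws.sync.AWSGlueCatalogSyncClient;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.config.HoodieAWSConfig;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.sync.common.model.FieldSchema;
import org.apache.hudi.sync.common.model.Partition;

import java.util.Arrays;
import java.util.List;

import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME;

public class GluePushdownSketch {
  public static void main(String[] args) throws Exception {
    TypedProperties props = new TypedProperties();
    props.setProperty(HoodieAWSConfig.AWS_ACCESS_KEY.key(), "dummy");
    props.setProperty(HoodieAWSConfig.AWS_SECRET_KEY.key(), "dummy");
    props.setProperty(HoodieAWSConfig.AWS_SESSION_TOKEN.key(), "dummy");
    // New configs: point the Glue client at a non-default endpoint and region.
    props.setProperty(HoodieAWSConfig.AWS_GLUE_ENDPOINT.key(), "http://localhost:5000");
    props.setProperty(HoodieAWSConfig.AWS_GLUE_REGION.key(), "eu-west-1");
    props.setProperty(META_SYNC_BASE_PATH.key(), "file:///tmp/tbl_name"); // assumed: an initialized Hudi table
    props.setProperty(META_SYNC_DATABASE_NAME.key(), "db_name");

    AWSGlueCatalogSyncClient client = new AWSGlueCatalogSyncClient(new HiveSyncConfig(props, new Configuration()));

    // Build a pushdown expression from the partitions written by the last sync and fetch
    // only the matching Glue partitions, instead of paging through getAllPartitions.
    List<FieldSchema> partitionFields = Arrays.asList(
        new FieldSchema("part1", "int"), new FieldSchema("part2", "string"));
    String filter = client.generatePushDownFilter(Arrays.asList("1/bar"), partitionFields);
    List<Partition> matched = client.getPartitionsByFilter("tbl_name", filter);
    System.out.println("matched partitions: " + matched.size());
  }
}
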
hudi-aws/src/main/java/org/apache/hudi/aws/sync/util/GlueFilterGenVisitor.java
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.aws.sync.util;

import org.apache.hudi.hive.util.FilterGenVisitor;

public class GlueFilterGenVisitor extends FilterGenVisitor {

@Override
protected String quoteStringLiteral(String value) {
// Glue uses jSQLParser.
// https://jsqlparser.github.io/JSqlParser/usage.html#define-the-parser-features
return "'" + (value.contains("'") ? value.replaceAll("'", "''") : value) + "'";
}

}
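
The quoting rule above is easiest to see with a concrete value. A minimal standalone sketch (not part of this commit) that repeats the same logic:

public class QuoteDemo {
  // Same rule as GlueFilterGenVisitor#quoteStringLiteral: double any embedded single
  // quote, then wrap the value in single quotes, the literal form jSQLParser accepts.
  static String quoteStringLiteral(String value) {
    return "'" + (value.contains("'") ? value.replaceAll("'", "''") : value) + "'";
  }

  public static void main(String[] args) {
    System.out.println(quoteStringLiteral("bar"));  // prints 'bar'
    System.out.println(quoteStringLiteral("b'ar")); // prints 'b''ar'
  }
}
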
hudi-aws/src/main/java/org/apache/hudi/aws/sync/util/GluePartitionFilterGenerator.java
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.aws.sync.util;

import org.apache.hudi.expression.Expression;
import org.apache.hudi.hive.util.PartitionFilterGenerator;

public class GluePartitionFilterGenerator extends PartitionFilterGenerator {

protected String generateFilterString(Expression filter) {
return filter.accept(new GlueFilterGenVisitor());
}
}
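
As a hedged illustration (not part of this commit), the generator consumes the partition values written during a sync plus the partition field schema and produces the expression that getPartitionsByFilter forwards to Glue; the HiveSyncConfig here is assumed to be the same one the sync client was built with.

import org.apache.hudi.aws.sync.util.GluePartitionFilterGenerator;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.sync.common.model.FieldSchema;

import java.util.Arrays;
import java.util.List;

public class PushdownFilterSketch {
  public static String buildFilter(HiveSyncConfig config) {
    List<FieldSchema> partitionFields = Arrays.asList(
        new FieldSchema("part1", "int"),
        new FieldSchema("part2", "string"));
    // Written partitions use Hudi's "value1/value2" path layout, as in the integration test.
    List<String> writtenPartitions = Arrays.asList("1/bar", "2/b'az");
    // Values containing a single quote are escaped by GlueFilterGenVisitor before being
    // embedded in the resulting Glue filter expression.
    return new GluePartitionFilterGenerator()
        .generatePushDownFilter(writtenPartitions, partitionFields, config);
  }
}
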
14 changes: 14 additions & 0 deletions hudi-aws/src/main/java/org/apache/hudi/config/HoodieAWSConfig.java
@@ -76,6 +76,20 @@ public class HoodieAWSConfig extends HoodieConfig {
.sinceVersion("0.13.2")
.withDocumentation("AWS Role ARN to assume");

public static final ConfigProperty<String> AWS_GLUE_ENDPOINT = ConfigProperty
.key("hoodie.aws.glue.endpoint")
.noDefaultValue()
.markAdvanced()
.sinceVersion("0.14.2")
.withDocumentation("AWS Glue endpoint");

public static final ConfigProperty<String> AWS_GLUE_REGION = ConfigProperty
.key("hoodie.aws.glue.region")
.noDefaultValue()
.markAdvanced()
.sinceVersion("0.14.2")
.withDocumentation("AWS Glue region");

private HoodieAWSConfig() {
super();
}
hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestGluePartitionPushdown.java
@@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.aws.sync;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.config.HoodieAWSConfig;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.sync.common.model.FieldSchema;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.services.glue.model.Column;
import software.amazon.awssdk.services.glue.model.CreateDatabaseRequest;
import software.amazon.awssdk.services.glue.model.CreatePartitionRequest;
import software.amazon.awssdk.services.glue.model.CreateTableRequest;
import software.amazon.awssdk.services.glue.model.DatabaseInput;
import software.amazon.awssdk.services.glue.model.DeleteDatabaseRequest;
import software.amazon.awssdk.services.glue.model.DeleteTableRequest;
import software.amazon.awssdk.services.glue.model.PartitionInput;
import software.amazon.awssdk.services.glue.model.SerDeInfo;
import software.amazon.awssdk.services.glue.model.StorageDescriptor;
import software.amazon.awssdk.services.glue.model.TableInput;

import java.io.IOException;
import java.nio.file.Files;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;

import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME;

public class ITTestGluePartitionPushdown {

private static final String MOTO_ENDPOINT = "http://localhost:5000";
private static final String DB_NAME = "db_name";
private static final String TABLE_NAME = "tbl_name";
private String basePath = Files.createTempDirectory("hivesynctest" + Instant.now().toEpochMilli()).toUri().toString();
private String tablePath = basePath + "/" + TABLE_NAME;
private TypedProperties hiveSyncProps;
private AWSGlueCatalogSyncClient glueSync;
private FileSystem fileSystem;
private Column[] partitionsColumn = {Column.builder().name("part1").type("int").build(), Column.builder().name("part2").type("string").build()};
List<FieldSchema> partitionsFieldSchema = Arrays.asList(new FieldSchema("part1", "int"), new FieldSchema("part2", "string"));

public ITTestGluePartitionPushdown() throws IOException {}

@BeforeEach
public void setUp() throws Exception {
hiveSyncProps = new TypedProperties();
hiveSyncProps.setProperty(HoodieAWSConfig.AWS_ACCESS_KEY.key(), "dummy");
hiveSyncProps.setProperty(HoodieAWSConfig.AWS_SECRET_KEY.key(), "dummy");
hiveSyncProps.setProperty(HoodieAWSConfig.AWS_SESSION_TOKEN.key(), "dummy");
hiveSyncProps.setProperty(HoodieAWSConfig.AWS_GLUE_ENDPOINT.key(), MOTO_ENDPOINT);
hiveSyncProps.setProperty(HoodieAWSConfig.AWS_GLUE_REGION.key(), "eu-west-1");
hiveSyncProps.setProperty(META_SYNC_BASE_PATH.key(), tablePath);
hiveSyncProps.setProperty(META_SYNC_DATABASE_NAME.key(), DB_NAME);

HiveSyncConfig hiveSyncConfig = new HiveSyncConfig(hiveSyncProps, new Configuration());
fileSystem = hiveSyncConfig.getHadoopFileSystem();
fileSystem.mkdirs(new Path(tablePath));
Configuration configuration = new Configuration();
HoodieTableMetaClient.withPropertyBuilder()
.setTableType(HoodieTableType.COPY_ON_WRITE)
.setTableName(TABLE_NAME)
.setPayloadClass(HoodieAvroPayload.class)
.initTable(configuration, tablePath);

glueSync = new AWSGlueCatalogSyncClient(new HiveSyncConfig(hiveSyncProps));
glueSync.awsGlue.createDatabase(CreateDatabaseRequest.builder().databaseInput(DatabaseInput.builder().name(DB_NAME).build()).build()).get();

glueSync.awsGlue.createTable(CreateTableRequest.builder().databaseName(DB_NAME)
.tableInput(TableInput.builder().name(TABLE_NAME).partitionKeys(
partitionsColumn)
.storageDescriptor(
StorageDescriptor.builder()
.serdeInfo(SerDeInfo.builder().serializationLibrary("").build())
.location(tablePath)
.columns(
Column.builder().name("col1").type("string").build()
)
.build())
.build()).build()).get();
}

@AfterEach
public void teardown() throws Exception {
glueSync.awsGlue.deleteTable(DeleteTableRequest.builder().databaseName(DB_NAME).name(TABLE_NAME).build()).get();
glueSync.awsGlue.deleteDatabase(DeleteDatabaseRequest.builder().name(DB_NAME).build()).get();
fileSystem.delete(new Path(tablePath), true);
}

@Test
public void testEmptyPartitionShouldReturnEmpty() {
Assertions.assertEquals(0, glueSync.getPartitionsByFilter(TABLE_NAME,
glueSync.generatePushDownFilter(Arrays.asList("1/bar"), partitionsFieldSchema)).size());
}

@Test
public void testPresentPartitionShouldReturnIt() throws ExecutionException, InterruptedException {
glueSync.awsGlue.createPartition(CreatePartitionRequest.builder().databaseName(DB_NAME).tableName(TABLE_NAME)
.partitionInput(PartitionInput.builder()
.storageDescriptor(StorageDescriptor.builder().columns(partitionsColumn).build())
.values("1", "b'ar").build()).build()).get();

Assertions.assertEquals(1, glueSync.getPartitionsByFilter(TABLE_NAME,
glueSync.generatePushDownFilter(Arrays.asList("1/b'ar", "2/foo", "1/b''ar"), partitionsFieldSchema)).size());
}
}
(Diffs for the remaining changed files in this commit are not shown here.)
