Skip to content

Commit

Permalink
Use comparator for sort
Browse files Browse the repository at this point in the history
  • Loading branch information
Vamsi committed Feb 24, 2025
1 parent a3ddda6 commit 3a5e401
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,17 @@ public void testSimpleBucketIndexPartitionerConfig() {
assertEquals("org.apache.hudi.table.action.commit.UpsertPartitioner", overwritePartitioner.getString(HoodieLayoutConfig.LAYOUT_PARTITIONER_CLASS_NAME));
}

/**
 * Checks that turning on the bloom index file-group-id/key sort partitioner through the
 * index config builder is reported as enabled by the write config built from it.
 */
@Test
void testBloomIndexFileIdKeySortPartitionerConfig() {
  // The index config is seeded from properties that only carry the record key field.
  Properties indexProps = new Properties();
  indexProps.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "uuid");

  // BLOOM index with the file-group-id/key sort partitioner explicitly switched on.
  HoodieIndexConfig indexConfig = HoodieIndexConfig.newBuilder()
      .fromProperties(indexProps)
      .withIndexType(HoodieIndex.IndexType.BLOOM)
      .bloomIndexFileGroupIdKeySortPartitioner(true)
      .build();

  HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
      .withPath("/tmp")
      .withIndexConfig(indexConfig)
      .build();

  assertTrue(writeConfig.useBloomIndexFileGroupIdKeySortPartitioner());
}

@Test
public void testAutoAdjustCleanPolicyForNonBlockingConcurrencyControl() {
TypedProperties props = new TypedProperties();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,10 +171,10 @@ public HoodiePairData<HoodieKey, HoodieRecordLocation> findMatchingFilesForRecor
.map(Tuple2::_1)
.mapPartitions(new HoodieSparkBloomIndexCheckFunction(hoodieTable, config), true);
} else if (config.useBloomIndexFileGroupIdKeySortPartitioner()) {
keyLookupResultRDD = fileComparisonsRDD.map(fileGroupAndRecordKey -> fileGroupAndRecordKey)
.sortBy(fileGroupAndRecordKey -> fileGroupAndRecordKey._1
+ "+" + fileGroupAndRecordKey._2, true, targetParallelism
).mapPartitions(new HoodieSparkBloomIndexCheckFunction(hoodieTable, config), true);
keyLookupResultRDD = fileComparisonsRDD.mapToPair(fileGroupAndRecordKey -> new Tuple2<>(fileGroupAndRecordKey, false))
.sortByKey(new FileGroupIdAndRecordKeyComparator(), true, targetParallelism)
.map(Tuple2::_1)
.mapPartitions(new HoodieSparkBloomIndexCheckFunction(hoodieTable, config), true);
} else {
keyLookupResultRDD = fileComparisonsRDD.sortByKey(true, targetParallelism)
.mapPartitions(new HoodieSparkBloomIndexCheckFunction(hoodieTable, config), true);
Expand All @@ -197,6 +197,17 @@ public int compare(Tuple2<HoodieFileGroupId, String> o1, Tuple2<HoodieFileGroupI
}
}

/**
 * Orders (file group id, record key) pairs by file group id first and, when the file group
 * ids compare as equal, by record key.
 *
 * <p>Implements {@link Serializable} in addition to {@link Comparator}; it is handed to
 * {@code sortByKey} in this file (presumably so it can be shipped with the job - confirm).
 */
private static class FileGroupIdAndRecordKeyComparator implements Comparator<Tuple2<HoodieFileGroupId, String>>, Serializable {
  @Override
  public int compare(Tuple2<HoodieFileGroupId, String> o1, Tuple2<HoodieFileGroupId, String> o2) {
    final int byFileGroupId = o1._1.compareTo(o2._1);
    // The record key is only consulted as a tie-breaker for equal file group ids.
    return byFileGroupId == 0 ? o1._2.compareTo(o2._2) : byFileGroupId;
  }
}

/**
* Compute the estimated number of bloom filter comparisons to be performed on each file group.
*/
Expand Down

0 comments on commit 3a5e401

Please sign in to comment.