diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
index cb46cec82426..f50a884bcd28 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
@@ -615,6 +615,17 @@ public void testSimpleBucketIndexPartitionerConfig() {
     assertEquals("org.apache.hudi.table.action.commit.UpsertPartitioner", overwritePartitioner.getString(HoodieLayoutConfig.LAYOUT_PARTITIONER_CLASS_NAME));
   }
 
+  @Test
+  void testBloomIndexFileIdKeySortPartitionerConfig() {
+    Properties props = new Properties();
+    props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "uuid");
+    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp")
+        .withIndexConfig(HoodieIndexConfig.newBuilder().fromProperties(props).withIndexType(HoodieIndex.IndexType.BLOOM)
+            .bloomIndexFileGroupIdKeySortPartitioner(true).build())
+        .build();
+    assertTrue(writeConfig.useBloomIndexFileGroupIdKeySortPartitioner());
+  }
+
   @Test
   public void testAutoAdjustCleanPolicyForNonBlockingConcurrencyControl() {
     TypedProperties props = new TypedProperties();
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java
index 8fa51f02aa81..99be1bc511f8 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java
@@ -171,10 +171,10 @@ public HoodiePairData<HoodieKey, HoodieRecordLocation> findMatchingFilesForRecordKeys(
           .map(Tuple2::_1)
           .mapPartitions(new HoodieSparkBloomIndexCheckFunction(hoodieTable, config), true);
     } else if (config.useBloomIndexFileGroupIdKeySortPartitioner()) {
-      keyLookupResultRDD = fileComparisonsRDD.map(fileGroupAndRecordKey -> fileGroupAndRecordKey)
-          .sortBy(fileGroupAndRecordKey -> fileGroupAndRecordKey._1
-              + "+" + fileGroupAndRecordKey._2, true, targetParallelism
-          ).mapPartitions(new HoodieSparkBloomIndexCheckFunction(hoodieTable, config), true);
+      keyLookupResultRDD = fileComparisonsRDD.mapToPair(fileGroupAndRecordKey -> new Tuple2<>(fileGroupAndRecordKey, false))
+          .sortByKey(new FileGroupIdAndRecordKeyComparator(), true, targetParallelism)
+          .map(Tuple2::_1)
+          .mapPartitions(new HoodieSparkBloomIndexCheckFunction(hoodieTable, config), true);
     } else {
       keyLookupResultRDD = fileComparisonsRDD.sortByKey(true, targetParallelism)
           .mapPartitions(new HoodieSparkBloomIndexCheckFunction(hoodieTable, config), true);
@@ -197,6 +197,17 @@ public int compare(Tuple2<HoodieFileGroupId, String> o1, Tuple2<HoodieFileGroupId, String> o2) {
     }
   }
 
+  static class FileGroupIdAndRecordKeyComparator implements Comparator<Tuple2<HoodieFileGroupId, String>>, Serializable {
+    @Override
+    public int compare(Tuple2<HoodieFileGroupId, String> o1, Tuple2<HoodieFileGroupId, String> o2) {
+      int fileGroupIdComparison = o1._1.compareTo(o2._1);
+      if (fileGroupIdComparison != 0) {
+        return fileGroupIdComparison;
+      }
+      return o1._2.compareTo(o2._2);
+    }
+  }
+
   /**
    * Compute the estimated number of bloom filter comparisons to be performed on each file group.
    */
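
Note (not part of the patch): the new FileGroupIdAndRecordKeyComparator orders the lookup pairs by file group id first and record key second, so all candidate keys for one file group land next to each other in the sorted partitions before HoodieSparkBloomIndexCheckFunction runs, instead of relying on the old string concatenation of the two fields. The following is a minimal, self-contained sketch of that composite ordering using plain String stand-ins for HoodieFileGroupId and the record key; the class and field names here are hypothetical and only illustrate the comparison logic.

import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

public class FileGroupKeySortSketch {
  public static void main(String[] args) {
    // Each entry mimics a (file group id, record key) pair from fileComparisonsRDD.
    List<Map.Entry<String, String>> pairs = new ArrayList<>();
    pairs.add(new SimpleEntry<>("fg-2", "uuid-9"));
    pairs.add(new SimpleEntry<>("fg-1", "uuid-5"));
    pairs.add(new SimpleEntry<>("fg-2", "uuid-1"));
    pairs.add(new SimpleEntry<>("fg-1", "uuid-3"));

    // Same ordering the comparator in the patch implements: file group id first, then record key.
    Comparator<Map.Entry<String, String>> byFileGroupThenKey =
        Comparator.<Map.Entry<String, String>, String>comparing(Map.Entry::getKey)
            .thenComparing(Map.Entry::getValue);

    pairs.sort(byFileGroupThenKey);

    // Prints fg-1/uuid-3, fg-1/uuid-5, fg-2/uuid-1, fg-2/uuid-9 --
    // keys belonging to the same file group are contiguous after the sort.
    pairs.forEach(p -> System.out.println(p.getKey() + "/" + p.getValue()));
  }
}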