Skip to content

Commit

Permalink
MDT validator can configure whether log truncation applies
Browse files Browse the repository at this point in the history
bump up unit test coverage
  • Loading branch information
Davis-Zhang-Onehouse committed Feb 24, 2025
1 parent 2ffe8cf commit 4a35671
Show file tree
Hide file tree
Showing 4 changed files with 212 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,10 @@ public static <T> String toStringWithThreshold(List<T> objectList, int lengthThr
if (objectList == null || objectList.isEmpty()) {
return "";
}
// For non-positive value, we will not do any truncation.
if (lengthThreshold <= 0) {
return objectList.toString();
}
StringBuilder sb = new StringBuilder();

for (Object obj : objectList) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,8 @@ void testToStringWithThreshold() {
toStringWithThreshold(Collections.singletonList(str1), 2));
assertEquals("string_...",
toStringWithThreshold(Collections.singletonList(str1), str1.length() - 3));
assertEquals("[string_value1]",
toStringWithThreshold(Collections.singletonList(str1), 0));
assertEquals(str1,
toStringWithThreshold(Collections.singletonList(str1), str1.length()));
assertEquals(str1,
Expand All @@ -253,6 +255,8 @@ void testToStringWithThreshold() {
toStringWithThreshold(stringList, str1.length() + str2.length() + str3.length() - 3));
assertEquals("string_value1,string_value2,string_value3",
toStringWithThreshold(stringList, str1.length() + str2.length() + str3.length() + 2));
assertEquals("[string_value1, string_value2, string_value3]",
toStringWithThreshold(stringList, - 1));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,6 @@ public class HoodieMetadataTableValidator implements Serializable {

private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(HoodieMetadataTableValidator.class);
static final int LOG_DETAIL_MAX_LENGTH = 100_000;

// Spark context
private transient JavaSparkContext jsc;
Expand Down Expand Up @@ -403,6 +402,9 @@ public static class Config implements Serializable {
@Parameter(names = {"--help", "-h"}, help = true)
public Boolean help = false;

@Parameter(names = {"--log-detail-max-length"}, description = "Maximum length for log details in validation messages. To avoid truncation, input a non-positive value.")
public int logDetailMaxLength = 100_000;

@Override
public String toString() {
return "MetadataTableValidatorConfig {\n"
Expand All @@ -429,6 +431,7 @@ public String toString() {
+ " --spark-master " + sparkMaster + ", \n"
+ " --spark-memory " + sparkMemory + ", \n"
+ " --assumeDatePartitioning-memory " + assumeDatePartitioning + ", \n"
+ " --log-detail-max-length " + logDetailMaxLength + ", \n"
+ " --props " + propsFilePath + ", \n"
+ " --hoodie-conf " + configs
+ "\n}";
Expand Down Expand Up @@ -465,6 +468,7 @@ public boolean equals(Object o) {
&& Objects.equals(sparkMaster, config.sparkMaster)
&& Objects.equals(sparkMemory, config.sparkMemory)
&& Objects.equals(assumeDatePartitioning, config.assumeDatePartitioning)
&& Objects.equals(logDetailMaxLength, config.logDetailMaxLength)
&& Objects.equals(propsFilePath, config.propsFilePath)
&& Objects.equals(configs, config.configs);
}
Expand All @@ -476,7 +480,7 @@ public int hashCode() {
validateSecondaryIndex, validateRecordIndexCount, validateRecordIndexContent, numRecordIndexErrorSamples,
viewStorageTypeForFSListing, viewStorageTypeForMetadata,
minValidateIntervalSeconds, parallelism, recordIndexParallelism, ignoreFailed,
sparkMaster, sparkMemory, assumeDatePartitioning, propsFilePath, configs, help);
sparkMaster, sparkMemory, assumeDatePartitioning, logDetailMaxLength, propsFilePath, configs, help);
}
}

Expand Down Expand Up @@ -776,12 +780,12 @@ List<String> validatePartitions(HoodieSparkEngineContext engineContext, StorageP
if (misMatch.get()) {
String message = "Compare Partitions Failed! " + " Additional "
+ additionalFromFS.size() + " partitions from FS, but missing from MDT : \""
+ toStringWithThreshold(additionalFromFS, LOG_DETAIL_MAX_LENGTH)
+ toStringWithThreshold(additionalFromFS, cfg.logDetailMaxLength)
+ "\" and additional " + actualAdditionalPartitionsInMDT.size()
+ "partitions from MDT, but missing from FS listing : \""
+ toStringWithThreshold(actualAdditionalPartitionsInMDT, LOG_DETAIL_MAX_LENGTH)
+ " partitions from MDT, but missing from FS listing : \""
+ toStringWithThreshold(actualAdditionalPartitionsInMDT, cfg.logDetailMaxLength)
+ "\".\n All " + allPartitionPathsFromFS.size() + " partitions from FS listing "
+ toStringWithThreshold(allPartitionPathsFromFS, LOG_DETAIL_MAX_LENGTH);
+ toStringWithThreshold(allPartitionPathsFromFS, cfg.logDetailMaxLength);
LOG.error(message);
throw new HoodieValidationException(message);
}
Expand Down Expand Up @@ -1401,9 +1405,9 @@ <T> void validate(
+ "MDT-based listing (%s): %s.",
label,
infoListFromFS.size(),
toStringWithThreshold(infoListFromFS, LOG_DETAIL_MAX_LENGTH),
toStringWithThreshold(infoListFromFS, cfg.logDetailMaxLength),
infoListFromMetadataTable.size(),
toStringWithThreshold(infoListFromMetadataTable, LOG_DETAIL_MAX_LENGTH));
toStringWithThreshold(infoListFromMetadataTable, cfg.logDetailMaxLength));
mismatch = true;
} else {
for (int i = 0; i < infoListFromMetadataTable.size(); i++) {
Expand Down Expand Up @@ -1440,9 +1444,9 @@ void validateFileSlices(
+ "metadata table. File system-based listing (%s file slices): %s; "
+ "MDT-based listing (%s file slices): %s.",
fileSliceListFromFS.size(),
toStringWithThreshold(fileSliceListFromFS, LOG_DETAIL_MAX_LENGTH),
toStringWithThreshold(fileSliceListFromFS, cfg.logDetailMaxLength),
fileSliceListFromMetadataTable.size(),
toStringWithThreshold(fileSliceListFromMetadataTable, LOG_DETAIL_MAX_LENGTH));
toStringWithThreshold(fileSliceListFromMetadataTable, cfg.logDetailMaxLength));
mismatch = true;
} else if (!fileSliceListFromMetadataTable.equals(fileSliceListFromFS)) {
// In-memory cache for the set of committed files of commits of interest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.hadoop.fs.HoodieWrapperFileSystem;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathInfo;
Expand Down Expand Up @@ -105,7 +106,6 @@
import static org.apache.hudi.common.testutils.SchemaTestUtil.getSimpleSchema;
import static org.apache.hudi.common.util.StringUtils.toStringWithThreshold;
import static org.apache.hudi.common.util.TestStringUtils.generateRandomString;
import static org.apache.hudi.utilities.HoodieMetadataTableValidator.LOG_DETAIL_MAX_LENGTH;
import static org.apache.spark.sql.types.DataTypes.IntegerType;
import static org.apache.spark.sql.types.DataTypes.StringType;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
Expand All @@ -123,6 +123,8 @@ private static Stream<Arguments> lastNFileSlicesTestArgs() {
return Stream.of(-1, 1, 3, 4, 5).flatMap(i -> Stream.of(Arguments.of(i, true), Arguments.of(i, false)));
}

private final int logDetailMaxLength = new HoodieMetadataTableValidator.Config().logDetailMaxLength;

private static Stream<Arguments> viewStorageArgs() {
return Stream.of(
Arguments.of(null, null, false),
Expand Down Expand Up @@ -900,10 +902,10 @@ private <T> void runValidateAndVerify(HoodieMetadataTableValidator validator,
List<T> listMdt, List<T> listFs, T newItem) {
assertEquals(
oversizeList,
toStringWithThreshold(listMdt, Integer.MAX_VALUE).length() > LOG_DETAIL_MAX_LENGTH);
toStringWithThreshold(listMdt, Integer.MAX_VALUE).length() > logDetailMaxLength);
assertEquals(
oversizeList,
toStringWithThreshold(listFs, Integer.MAX_VALUE).length() > LOG_DETAIL_MAX_LENGTH);
toStringWithThreshold(listFs, Integer.MAX_VALUE).length() > logDetailMaxLength);
// Equal case
assertDoesNotThrow(() ->
validator.validate(listMdt, listFs, partition, label));
Expand All @@ -919,8 +921,8 @@ private <T> void runValidateAndVerify(HoodieMetadataTableValidator validator,
+ "the metadata table. File system-based listing (%s): %s; "
+ "MDT-based listing (%s): %s.",
label, partition, basePath, label, listFs.size(),
toStringWithThreshold(listFs, LOG_DETAIL_MAX_LENGTH),
listMdt.size(), toStringWithThreshold(listMdt, LOG_DETAIL_MAX_LENGTH)),
toStringWithThreshold(listFs, logDetailMaxLength),
listMdt.size(), toStringWithThreshold(listMdt, logDetailMaxLength)),
exception.getMessage());
listFs.remove(listFs.size() - 1);
// Item mismatch
Expand Down Expand Up @@ -968,10 +970,10 @@ void testValidateFileSlices(boolean oversizeList) {
TimelineUtils.generateInstantTime(true, timeGenerator)).getLeft());
assertEquals(
oversizeList,
toStringWithThreshold(listMdt, Integer.MAX_VALUE).length() > LOG_DETAIL_MAX_LENGTH);
toStringWithThreshold(listMdt, Integer.MAX_VALUE).length() > logDetailMaxLength);
assertEquals(
oversizeList,
toStringWithThreshold(listFs, Integer.MAX_VALUE).length() > LOG_DETAIL_MAX_LENGTH);
toStringWithThreshold(listFs, Integer.MAX_VALUE).length() > logDetailMaxLength);
Exception exception = assertThrows(
HoodieValidationException.class,
() -> validator.validateFileSlices(listMdt, listFs, partition, metaClient, label));
Expand All @@ -982,8 +984,8 @@ void testValidateFileSlices(boolean oversizeList) {
+ "metadata table. File system-based listing (%s file slices): %s; "
+ "MDT-based listing (%s file slices): %s.",
label, partition, basePath, listFs.size(),
toStringWithThreshold(listFs, LOG_DETAIL_MAX_LENGTH),
listMdt.size(), toStringWithThreshold(listMdt, LOG_DETAIL_MAX_LENGTH)),
toStringWithThreshold(listFs, logDetailMaxLength),
listMdt.size(), toStringWithThreshold(listMdt, logDetailMaxLength)),
exception.getMessage());
listFs.remove(listFs.size() - 1);
// Item mismatch
Expand Down Expand Up @@ -1364,4 +1366,183 @@ protected Dataset<Row> makeUpdateDf(String instantTime, Integer n) {
throw new RuntimeException(e);
}
}

@Test
void testLogDetailMaxLength() {
  // Write a MOR table so the validator has a real base path to point at.
  Map<String, String> options = new HashMap<>();
  options.put(DataSourceWriteOptions.TABLE_NAME().key(), "test_table");
  options.put("hoodie.table.name", "test_table");
  options.put(DataSourceWriteOptions.TABLE_TYPE().key(), "MERGE_ON_READ");
  options.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), "_row_key");
  options.put(DataSourceWriteOptions.PRECOMBINE_FIELD().key(), "timestamp");
  options.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), "partition_path");

  // A large dataset keeps the validation messages long enough to exercise truncation.
  Dataset<Row> insertDf = makeInsertDf("000", 1000).cache();
  insertDf.write().format("hudi").options(options)
      .option(DataSourceWriteOptions.OPERATION().key(), WriteOperationType.BULK_INSERT.value())
      .mode(SaveMode.Overwrite)
      .save(basePath);

  HoodieMetadataTableValidator.Config config = new HoodieMetadataTableValidator.Config();
  config.basePath = "file:" + basePath;
  config.validateLatestFileSlices = true;
  config.validateAllFileGroups = true;

  // Build two file-slice lists that differ by exactly one extra FS-side entry,
  // so validateFileSlices always fails and surfaces its (possibly truncated) message.
  TimeGenerator timeGenerator = TimeGenerators
      .getTimeGenerator(HoodieTimeGeneratorConfig.defaultConfig(basePath),
          HadoopFSUtils.getStorageConf(jsc.hadoopConfiguration()));
  Pair<List<FileSlice>, List<FileSlice>> sliceLists = generateTwoEqualFileSliceList(500, timeGenerator);
  List<FileSlice> mdtSlices = sliceLists.getLeft();
  List<FileSlice> fsSlices = new ArrayList<>(sliceLists.getRight());
  fsSlices.add(generateRandomFileSlice(TimelineUtils.generateInstantTime(true, timeGenerator),
      TimelineUtils.generateInstantTime(true, timeGenerator),
      TimelineUtils.generateInstantTime(true, timeGenerator)).getLeft());

  // Default threshold (100,000 chars). The message includes 3 parts: the truncated
  // MDT file-slice list, the truncated FS file-slice list, and the fixed framing
  // text (budgeted at <= 1000 chars).
  MockHoodieMetadataTableValidator defaultValidator = new MockHoodieMetadataTableValidator(jsc, config);
  Exception defaultError = assertThrows(
      HoodieValidationException.class,
      () -> defaultValidator.validateFileSlices(mdtSlices, fsSlices, "partition", metaClient, "test"));
  assertTrue(defaultError.getMessage().length() <= 100_000 * 2 + 1000);

  // Custom small threshold: message shrinks accordingly.
  config.logDetailMaxLength = 1000;
  MockHoodieMetadataTableValidator smallValidator = new MockHoodieMetadataTableValidator(jsc, config);
  Exception smallError = assertThrows(
      HoodieValidationException.class,
      () -> smallValidator.validateFileSlices(mdtSlices, fsSlices, "partition", metaClient, "test"));
  assertTrue(smallError.getMessage().length() <= 1000 * 2 + 1000);

  // Custom large threshold: the same budget rule holds at 200,000 chars.
  config.logDetailMaxLength = 200_000;
  MockHoodieMetadataTableValidator largeValidator = new MockHoodieMetadataTableValidator(jsc, config);
  Exception largeError = assertThrows(
      HoodieValidationException.class,
      () -> largeValidator.validateFileSlices(mdtSlices, fsSlices, "partition", metaClient, "test"));
  assertTrue(largeError.getMessage().length() <= 200_000 * 2 + 1000);
}

@Test
void testValidatePartitionsTruncation() throws IOException {
  // A tiny threshold guarantees the partition lists in the error message get truncated.
  HoodieMetadataTableValidator.Config config = new HoodieMetadataTableValidator.Config();
  config.basePath = basePath;
  config.logDetailMaxLength = 100;

  MockHoodieMetadataTableValidator validator = new MockHoodieMetadataTableValidator(jsc, config);
  HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
  HoodieTableMetaClient metaClient = mock(HoodieTableMetaClient.class);
  HoodieStorage storage = mock(HoodieStorage.class);

  // Disjoint random partition lists, each long enough to exceed the 100-char threshold.
  List<String> mdtPartitions = new ArrayList<>();
  for (int i = 0; i < 20; i++) {
    mdtPartitions.add("partition_" + generateRandomString(20));
  }
  List<String> fsPartitions = new ArrayList<>();
  for (int i = 0; i < 15; i++) {
    fsPartitions.add("partition_" + generateRandomString(20));
  }

  // Every MDT partition "exists" on storage so none are filtered out of the comparison.
  when(metaClient.getStorage()).thenReturn(storage);
  for (String partition : mdtPartitions) {
    when(storage.exists(new StoragePath(basePath + "/" + partition))).thenReturn(true);
  }

  // Minimal timeline stubs required by validatePartitions.
  HoodieTimeline commitsTimeline = mock(HoodieTimeline.class);
  HoodieTimeline completedTimeline = mock(HoodieTimeline.class);
  when(metaClient.getCommitsTimeline()).thenReturn(commitsTimeline);
  when(commitsTimeline.filterCompletedInstants()).thenReturn(completedTimeline);

  validator.setMetadataPartitionsToReturn(mdtPartitions);
  validator.setFsPartitionsToReturn(fsPartitions);

  // Mismatched partitions must raise a validation error with a truncated message.
  HoodieValidationException exception = assertThrows(HoodieValidationException.class,
      () -> validator.validatePartitions(engineContext, new StoragePath(basePath), metaClient));
  String message = exception.getMessage();

  // Truncation indicator, and a length budget of two truncated lists + framing text.
  assertTrue(message.contains("..."));
  assertTrue(message.length() <= config.logDetailMaxLength * 2 + 1000);

  // The partition counts must still be reported in full despite truncation.
  assertTrue(message.contains(String.format("Additional %d partitions from FS, but missing from MDT : ",
      fsPartitions.size())));
  assertTrue(message.contains(String.format("additional %d partitions from MDT, but missing from FS listing :",
      mdtPartitions.size())));
}

@Test
void testValidateFileSlicesTruncation() {
  // A tiny threshold guarantees the file-slice listings in the error message get truncated.
  HoodieMetadataTableValidator.Config config = new HoodieMetadataTableValidator.Config();
  config.basePath = basePath;
  config.logDetailMaxLength = 100;

  MockHoodieMetadataTableValidator validator = new MockHoodieMetadataTableValidator(jsc, config);

  String partition = "partition_" + generateRandomString(10);
  List<FileSlice> mdtSlices = new ArrayList<>();
  List<FileSlice> fsSlices = new ArrayList<>();

  // Build 20 slices on the MDT side but copy only the first 15 to the FS side,
  // so the two listings never match and validateFileSlices must throw.
  // NOTE: the previously created TimeGenerator local was unused (instant times
  // come from metaClient.createNewInstantTime()), so it has been removed.
  for (int i = 0; i < 20; i++) {
    String fileId = UUID.randomUUID().toString();
    String baseInstantTime = metaClient.createNewInstantTime();

    // Each slice carries one base file and two log files.
    HoodieBaseFile baseFile = new HoodieBaseFile(FSUtils.makeBaseFileName(
        baseInstantTime, "1-0-1", fileId, HoodieFileFormat.PARQUET.getFileExtension()));
    FileSlice slice = new FileSlice(new HoodieFileGroupId(partition, fileId), baseInstantTime);
    slice.setBaseFile(baseFile);
    slice.addLogFile(new HoodieLogFile(
        FSUtils.makeLogFileName(fileId, HoodieLogFile.DELTA_EXTENSION, baseInstantTime, 1, "1-0-1")));
    slice.addLogFile(new HoodieLogFile(
        FSUtils.makeLogFileName(fileId, HoodieLogFile.DELTA_EXTENSION, baseInstantTime, 2, "1-0-1")));
    mdtSlices.add(slice);

    if (i < 15) {
      fsSlices.add(new FileSlice(slice));
    }
  }

  // Mismatched slices must raise a validation error with a truncated message.
  HoodieValidationException exception = assertThrows(
      HoodieValidationException.class,
      () -> validator.validateFileSlices(mdtSlices, fsSlices, partition, metaClient, "test"));
  String message = exception.getMessage();

  // Truncation indicator, and a length budget of two truncated lists + framing text.
  assertTrue(message.contains("..."));
  assertTrue(message.length() <= config.logDetailMaxLength * 2 + 1000);

  // The file-slice counts must still be reported in full despite truncation.
  assertTrue(message.contains(String.format(
      "Number of file slices based on the file system does not match that based on the metadata table. File system-based listing (%d file slices)",
      fsSlices.size())));
  assertTrue(message.contains(String.format("MDT-based listing (%d file slices)",
      mdtSlices.size())));
}
}

0 comments on commit 4a35671

Please sign in to comment.