[Hudi-3376] Add an option to skip under deletion files for HoodieMetadataTableValidator (#4994)
Co-authored-by: yuezhang <yuezhang@freewheel.tv>
This commit is contained in:
@@ -19,6 +19,7 @@
|
||||
package org.apache.hudi.utilities;
|
||||
|
||||
import org.apache.hudi.async.HoodieAsyncService;
|
||||
import org.apache.hudi.avro.model.HoodieCleanerPlan;
|
||||
import org.apache.hudi.client.common.HoodieSparkEngineContext;
|
||||
import org.apache.hudi.common.bloom.BloomFilter;
|
||||
import org.apache.hudi.common.config.HoodieMetadataConfig;
|
||||
@@ -29,16 +30,21 @@ import org.apache.hudi.common.model.BaseFile;
|
||||
import org.apache.hudi.common.model.FileSlice;
|
||||
import org.apache.hudi.common.model.HoodieBaseFile;
|
||||
import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
|
||||
import org.apache.hudi.common.model.HoodieFileFormat;
|
||||
import org.apache.hudi.common.model.HoodieFileGroup;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.table.TableSchemaResolver;
|
||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||
import org.apache.hudi.common.table.view.FileSystemViewManager;
|
||||
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
|
||||
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
|
||||
import org.apache.hudi.common.util.CleanerUtils;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.common.util.ParquetUtils;
|
||||
import org.apache.hudi.common.util.collection.Pair;
|
||||
import org.apache.hudi.exception.HoodieException;
|
||||
import org.apache.hudi.exception.HoodieIOException;
|
||||
import org.apache.hudi.exception.HoodieValidationException;
|
||||
import org.apache.hudi.io.storage.HoodieFileReader;
|
||||
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
|
||||
@@ -63,6 +69,7 @@ import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
@@ -173,6 +180,9 @@ public class HoodieMetadataTableValidator implements Serializable {
|
||||
+ "Can use --min-validate-interval-seconds to control validation frequency", required = false)
|
||||
public boolean continuous = false;
|
||||
|
||||
@Parameter(names = {"--skip-data-files-for-cleaning"}, description = "Skip to compare the data files which are under deletion by cleaner", required = false)
|
||||
public boolean skipDataFilesForCleaning = false;
|
||||
|
||||
@Parameter(names = {"--validate-latest-file-slices"}, description = "Validate latest file slices for all partitions.", required = false)
|
||||
public boolean validateLatestFileSlices = false;
|
||||
|
||||
@@ -230,6 +240,7 @@ public class HoodieMetadataTableValidator implements Serializable {
|
||||
+ " --validate-all-column-stats " + validateAllColumnStats + ", \n"
|
||||
+ " --validate-bloom-filters " + validateBloomFilters + ", \n"
|
||||
+ " --continuous " + continuous + ", \n"
|
||||
+ " --skip-data-files-for-cleaning " + skipDataFilesForCleaning + ", \n"
|
||||
+ " --ignore-failed " + ignoreFailed + ", \n"
|
||||
+ " --min-validate-interval-seconds " + minValidateIntervalSeconds + ", \n"
|
||||
+ " --parallelism " + parallelism + ", \n"
|
||||
@@ -252,6 +263,7 @@ public class HoodieMetadataTableValidator implements Serializable {
|
||||
Config config = (Config) o;
|
||||
return basePath.equals(config.basePath)
|
||||
&& Objects.equals(continuous, config.continuous)
|
||||
&& Objects.equals(skipDataFilesForCleaning, config.skipDataFilesForCleaning)
|
||||
&& Objects.equals(validateLatestFileSlices, config.validateLatestFileSlices)
|
||||
&& Objects.equals(validateLatestBaseFiles, config.validateLatestBaseFiles)
|
||||
&& Objects.equals(validateAllFileGroups, config.validateAllFileGroups)
|
||||
@@ -269,7 +281,7 @@ public class HoodieMetadataTableValidator implements Serializable {
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(basePath, continuous, validateLatestFileSlices, validateLatestBaseFiles,
|
||||
return Objects.hash(basePath, continuous, skipDataFilesForCleaning, validateLatestFileSlices, validateLatestBaseFiles,
|
||||
validateAllFileGroups, validateAllColumnStats, validateBloomFilters, minValidateIntervalSeconds,
|
||||
parallelism, ignoreFailed, sparkMaster, sparkMemory, assumeDatePartitioning, propsFilePath, configs, help);
|
||||
}
|
||||
@@ -345,6 +357,33 @@ public class HoodieMetadataTableValidator implements Serializable {
|
||||
boolean finalResult = true;
|
||||
metaClient.reloadActiveTimeline();
|
||||
String basePath = metaClient.getBasePath();
|
||||
Set<String> baseFilesForCleaning = Collections.emptySet();
|
||||
|
||||
if (cfg.skipDataFilesForCleaning) {
|
||||
HoodieTimeline inflightCleaningTimeline = metaClient.getActiveTimeline().getCleanerTimeline().filterInflights();
|
||||
|
||||
baseFilesForCleaning = inflightCleaningTimeline.getInstants().flatMap(instant -> {
|
||||
try {
|
||||
// convert inflight instant to requested and get clean plan
|
||||
instant = new HoodieInstant(HoodieInstant.State.REQUESTED, instant.getAction(), instant.getTimestamp());
|
||||
HoodieCleanerPlan cleanerPlan = CleanerUtils.getCleanerPlan(metaClient, instant);
|
||||
|
||||
return cleanerPlan.getFilePathsToBeDeletedPerPartition().values().stream().flatMap(cleanerFileInfoList -> {
|
||||
return cleanerFileInfoList.stream().map(fileInfo -> {
|
||||
return new Path(fileInfo.getFilePath()).getName();
|
||||
});
|
||||
});
|
||||
|
||||
} catch (IOException e) {
|
||||
throw new HoodieIOException("Error reading cleaner metadata for " + instant);
|
||||
}
|
||||
// only take care of base files here.
|
||||
}).filter(path -> {
|
||||
String fileExtension = FSUtils.getFileExtension(path);
|
||||
return HoodieFileFormat.BASE_FILE_EXTENSIONS.contains(fileExtension);
|
||||
}).collect(Collectors.toSet());
|
||||
}
|
||||
|
||||
HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
|
||||
List<String> allPartitions = validatePartitions(engineContext, basePath);
|
||||
HoodieMetadataValidationContext metadataTableBasedContext =
|
||||
@@ -352,9 +391,10 @@ public class HoodieMetadataTableValidator implements Serializable {
|
||||
HoodieMetadataValidationContext fsBasedContext =
|
||||
new HoodieMetadataValidationContext(engineContext, cfg, metaClient, false);
|
||||
|
||||
Set<String> finalBaseFilesForCleaning = baseFilesForCleaning;
|
||||
List<Boolean> result = engineContext.parallelize(allPartitions, allPartitions.size()).map(partitionPath -> {
|
||||
try {
|
||||
validateFilesInPartition(metadataTableBasedContext, fsBasedContext, partitionPath);
|
||||
validateFilesInPartition(metadataTableBasedContext, fsBasedContext, partitionPath, finalBaseFilesForCleaning);
|
||||
LOG.info("Metadata table validation succeeded for " + partitionPath);
|
||||
return true;
|
||||
} catch (HoodieValidationException e) {
|
||||
@@ -410,42 +450,64 @@ public class HoodieMetadataTableValidator implements Serializable {
|
||||
* @param metadataTableBasedContext Validation context containing information based on metadata table
|
||||
* @param fsBasedContext Validation context containing information based on the file system
|
||||
* @param partitionPath Partition path String
|
||||
* @param baseDataFilesForCleaning Base files for un-complete cleaner action
|
||||
*/
|
||||
private void validateFilesInPartition(
|
||||
HoodieMetadataValidationContext metadataTableBasedContext,
|
||||
HoodieMetadataValidationContext fsBasedContext, String partitionPath) {
|
||||
HoodieMetadataValidationContext fsBasedContext, String partitionPath,
|
||||
Set<String> baseDataFilesForCleaning) {
|
||||
if (cfg.validateLatestFileSlices) {
|
||||
validateLatestFileSlices(metadataTableBasedContext, fsBasedContext, partitionPath);
|
||||
validateLatestFileSlices(metadataTableBasedContext, fsBasedContext, partitionPath, baseDataFilesForCleaning);
|
||||
}
|
||||
|
||||
if (cfg.validateLatestBaseFiles) {
|
||||
validateLatestBaseFiles(metadataTableBasedContext, fsBasedContext, partitionPath);
|
||||
validateLatestBaseFiles(metadataTableBasedContext, fsBasedContext, partitionPath, baseDataFilesForCleaning);
|
||||
}
|
||||
|
||||
if (cfg.validateAllFileGroups) {
|
||||
validateAllFileGroups(metadataTableBasedContext, fsBasedContext, partitionPath);
|
||||
validateAllFileGroups(metadataTableBasedContext, fsBasedContext, partitionPath, baseDataFilesForCleaning);
|
||||
}
|
||||
|
||||
if (cfg.validateAllColumnStats) {
|
||||
validateAllColumnStats(metadataTableBasedContext, fsBasedContext, partitionPath);
|
||||
validateAllColumnStats(metadataTableBasedContext, fsBasedContext, partitionPath, baseDataFilesForCleaning);
|
||||
}
|
||||
|
||||
if (cfg.validateBloomFilters) {
|
||||
validateBloomFilters(metadataTableBasedContext, fsBasedContext, partitionPath);
|
||||
validateBloomFilters(metadataTableBasedContext, fsBasedContext, partitionPath, baseDataFilesForCleaning);
|
||||
}
|
||||
}
|
||||
|
||||
private void validateAllFileGroups(
|
||||
HoodieMetadataValidationContext metadataTableBasedContext,
|
||||
HoodieMetadataValidationContext fsBasedContext, String partitionPath) {
|
||||
List<FileSlice> allFileSlicesFromMeta = metadataTableBasedContext
|
||||
.getSortedAllFileGroupList(partitionPath).stream()
|
||||
.flatMap(HoodieFileGroup::getAllFileSlices).sorted(new FileSliceComparator())
|
||||
.collect(Collectors.toList());
|
||||
List<FileSlice> allFileSlicesFromFS = fsBasedContext
|
||||
.getSortedAllFileGroupList(partitionPath).stream()
|
||||
.flatMap(HoodieFileGroup::getAllFileSlices).sorted(new FileSliceComparator())
|
||||
.collect(Collectors.toList());
|
||||
HoodieMetadataValidationContext fsBasedContext,
|
||||
String partitionPath,
|
||||
Set<String> baseDataFilesForCleaning) {
|
||||
|
||||
List<FileSlice> allFileSlicesFromMeta;
|
||||
List<FileSlice> allFileSlicesFromFS;
|
||||
|
||||
if (!baseDataFilesForCleaning.isEmpty()) {
|
||||
List<FileSlice> fileSlicesFromMeta = metadataTableBasedContext
|
||||
.getSortedAllFileGroupList(partitionPath).stream()
|
||||
.flatMap(HoodieFileGroup::getAllFileSlices).sorted(new FileSliceComparator())
|
||||
.collect(Collectors.toList());
|
||||
List<FileSlice> fileSlicesFromFS = fsBasedContext
|
||||
.getSortedAllFileGroupList(partitionPath).stream()
|
||||
.flatMap(HoodieFileGroup::getAllFileSlices).sorted(new FileSliceComparator())
|
||||
.collect(Collectors.toList());
|
||||
|
||||
allFileSlicesFromMeta = filterFileSliceBasedOnInflightCleaning(fileSlicesFromMeta, baseDataFilesForCleaning);
|
||||
allFileSlicesFromFS = filterFileSliceBasedOnInflightCleaning(fileSlicesFromFS, baseDataFilesForCleaning);
|
||||
} else {
|
||||
allFileSlicesFromMeta = metadataTableBasedContext
|
||||
.getSortedAllFileGroupList(partitionPath).stream()
|
||||
.flatMap(HoodieFileGroup::getAllFileSlices).sorted(new FileSliceComparator())
|
||||
.collect(Collectors.toList());
|
||||
allFileSlicesFromFS = fsBasedContext
|
||||
.getSortedAllFileGroupList(partitionPath).stream()
|
||||
.flatMap(HoodieFileGroup::getAllFileSlices).sorted(new FileSliceComparator())
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
LOG.debug("All file slices from metadata: " + allFileSlicesFromMeta + ". For partitions " + partitionPath);
|
||||
LOG.debug("All file slices from direct listing: " + allFileSlicesFromFS + ". For partitions " + partitionPath);
|
||||
@@ -459,10 +521,20 @@ public class HoodieMetadataTableValidator implements Serializable {
|
||||
*/
|
||||
private void validateLatestBaseFiles(
|
||||
HoodieMetadataValidationContext metadataTableBasedContext,
|
||||
HoodieMetadataValidationContext fsBasedContext, String partitionPath) {
|
||||
HoodieMetadataValidationContext fsBasedContext,
|
||||
String partitionPath,
|
||||
Set<String> baseDataFilesForCleaning) {
|
||||
|
||||
List<HoodieBaseFile> latestFilesFromMetadata = metadataTableBasedContext.getSortedLatestBaseFileList(partitionPath);
|
||||
List<HoodieBaseFile> latestFilesFromFS = fsBasedContext.getSortedLatestBaseFileList(partitionPath);
|
||||
List<HoodieBaseFile> latestFilesFromMetadata;
|
||||
List<HoodieBaseFile> latestFilesFromFS;
|
||||
|
||||
if (!baseDataFilesForCleaning.isEmpty()) {
|
||||
latestFilesFromMetadata = filterBaseFileBasedOnInflightCleaning(metadataTableBasedContext.getSortedLatestBaseFileList(partitionPath), baseDataFilesForCleaning);
|
||||
latestFilesFromFS = filterBaseFileBasedOnInflightCleaning(fsBasedContext.getSortedLatestBaseFileList(partitionPath), baseDataFilesForCleaning);
|
||||
} else {
|
||||
latestFilesFromMetadata = metadataTableBasedContext.getSortedLatestBaseFileList(partitionPath);
|
||||
latestFilesFromFS = fsBasedContext.getSortedLatestBaseFileList(partitionPath);
|
||||
}
|
||||
|
||||
LOG.debug("Latest base file from metadata: " + latestFilesFromMetadata + ". For partitions " + partitionPath);
|
||||
LOG.debug("Latest base file from direct listing: " + latestFilesFromFS + ". For partitions " + partitionPath);
|
||||
@@ -483,10 +555,19 @@ public class HoodieMetadataTableValidator implements Serializable {
|
||||
*/
|
||||
private void validateLatestFileSlices(
|
||||
HoodieMetadataValidationContext metadataTableBasedContext,
|
||||
HoodieMetadataValidationContext fsBasedContext, String partitionPath) {
|
||||
HoodieMetadataValidationContext fsBasedContext,
|
||||
String partitionPath,
|
||||
Set<String> baseDataFilesForCleaning) {
|
||||
List<FileSlice> latestFileSlicesFromMetadataTable;
|
||||
List<FileSlice> latestFileSlicesFromFS;
|
||||
|
||||
List<FileSlice> latestFileSlicesFromMetadataTable = metadataTableBasedContext.getSortedLatestFileSliceList(partitionPath);
|
||||
List<FileSlice> latestFileSlicesFromFS = fsBasedContext.getSortedLatestFileSliceList(partitionPath);
|
||||
if (!baseDataFilesForCleaning.isEmpty()) {
|
||||
latestFileSlicesFromMetadataTable = filterFileSliceBasedOnInflightCleaning(metadataTableBasedContext.getSortedLatestFileSliceList(partitionPath), baseDataFilesForCleaning);
|
||||
latestFileSlicesFromFS = filterFileSliceBasedOnInflightCleaning(fsBasedContext.getSortedLatestFileSliceList(partitionPath), baseDataFilesForCleaning);
|
||||
} else {
|
||||
latestFileSlicesFromMetadataTable = metadataTableBasedContext.getSortedLatestFileSliceList(partitionPath);
|
||||
latestFileSlicesFromFS = fsBasedContext.getSortedLatestFileSliceList(partitionPath);
|
||||
}
|
||||
|
||||
LOG.debug("Latest file list from metadata: " + latestFileSlicesFromMetadataTable + ". For partition " + partitionPath);
|
||||
LOG.debug("Latest file list from direct listing: " + latestFileSlicesFromFS + ". For partition " + partitionPath);
|
||||
@@ -495,11 +576,31 @@ public class HoodieMetadataTableValidator implements Serializable {
|
||||
LOG.info("Validation of getLatestFileSlices succeeded for partition " + partitionPath);
|
||||
}
|
||||
|
||||
private List<FileSlice> filterFileSliceBasedOnInflightCleaning(List<FileSlice> sortedLatestFileSliceList, Set<String> baseDataFilesForCleaning) {
|
||||
return sortedLatestFileSliceList.stream()
|
||||
.filter(fileSlice -> {
|
||||
if (!fileSlice.getBaseFile().isPresent()) {
|
||||
return true;
|
||||
} else {
|
||||
return !baseDataFilesForCleaning.contains(fileSlice.getBaseFile().get().getFileName());
|
||||
}
|
||||
}).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
private List<HoodieBaseFile> filterBaseFileBasedOnInflightCleaning(List<HoodieBaseFile> sortedBaseFileList, Set<String> baseDataFilesForCleaning) {
|
||||
return sortedBaseFileList.stream()
|
||||
.filter(baseFile -> {
|
||||
return !baseDataFilesForCleaning.contains(baseFile.getFileName());
|
||||
}).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
private void validateAllColumnStats(
|
||||
HoodieMetadataValidationContext metadataTableBasedContext,
|
||||
HoodieMetadataValidationContext fsBasedContext, String partitionPath) {
|
||||
List<String> latestBaseFilenameList = fsBasedContext.getSortedLatestBaseFileList(partitionPath)
|
||||
.stream().map(BaseFile::getFileName).collect(Collectors.toList());
|
||||
HoodieMetadataValidationContext fsBasedContext,
|
||||
String partitionPath,
|
||||
Set<String> baseDataFilesForCleaning) {
|
||||
|
||||
List<String> latestBaseFilenameList = getLatestBaseFileNames(fsBasedContext, partitionPath, baseDataFilesForCleaning);
|
||||
List<HoodieColumnRangeMetadata<String>> metadataBasedColStats = metadataTableBasedContext
|
||||
.getSortedColumnStatsList(partitionPath, latestBaseFilenameList);
|
||||
List<HoodieColumnRangeMetadata<String>> fsBasedColStats = fsBasedContext
|
||||
@@ -512,9 +613,11 @@ public class HoodieMetadataTableValidator implements Serializable {
|
||||
|
||||
private void validateBloomFilters(
|
||||
HoodieMetadataValidationContext metadataTableBasedContext,
|
||||
HoodieMetadataValidationContext fsBasedContext, String partitionPath) {
|
||||
List<String> latestBaseFilenameList = fsBasedContext.getSortedLatestBaseFileList(partitionPath)
|
||||
.stream().map(BaseFile::getFileName).collect(Collectors.toList());
|
||||
HoodieMetadataValidationContext fsBasedContext,
|
||||
String partitionPath,
|
||||
Set<String> baseDataFilesForCleaning) {
|
||||
|
||||
List<String> latestBaseFilenameList = getLatestBaseFileNames(fsBasedContext, partitionPath, baseDataFilesForCleaning);
|
||||
List<BloomFilterData> metadataBasedBloomFilters = metadataTableBasedContext
|
||||
.getSortedBloomFilterList(partitionPath, latestBaseFilenameList);
|
||||
List<BloomFilterData> fsBasedBloomFilters = fsBasedContext
|
||||
@@ -525,6 +628,19 @@ public class HoodieMetadataTableValidator implements Serializable {
|
||||
LOG.info("Validation of bloom filters succeeded for partition " + partitionPath);
|
||||
}
|
||||
|
||||
private List<String> getLatestBaseFileNames(HoodieMetadataValidationContext fsBasedContext, String partitionPath, Set<String> baseDataFilesForCleaning) {
|
||||
List<String> latestBaseFilenameList;
|
||||
if (!baseDataFilesForCleaning.isEmpty()) {
|
||||
List<HoodieBaseFile> sortedLatestBaseFileList = fsBasedContext.getSortedLatestBaseFileList(partitionPath);
|
||||
latestBaseFilenameList = filterBaseFileBasedOnInflightCleaning(sortedLatestBaseFileList, baseDataFilesForCleaning)
|
||||
.stream().map(BaseFile::getFileName).collect(Collectors.toList());
|
||||
} else {
|
||||
latestBaseFilenameList = fsBasedContext.getSortedLatestBaseFileList(partitionPath)
|
||||
.stream().map(BaseFile::getFileName).collect(Collectors.toList());
|
||||
}
|
||||
return latestBaseFilenameList;
|
||||
}
|
||||
|
||||
private <T> void validate(
|
||||
List<T> infoListFromMetadataTable, List<T> infoListFromFS, String partitionPath, String label) {
|
||||
if (infoListFromMetadataTable.size() != infoListFromFS.size()
|
||||
|
||||
Reference in New Issue
Block a user