[HUDI-3637] Exclude uncommitted log files from metadata table validation (#5234)
This commit is contained in:
@@ -32,15 +32,19 @@ import org.apache.hudi.common.model.HoodieBaseFile;
|
||||
import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
|
||||
import org.apache.hudi.common.model.HoodieFileFormat;
|
||||
import org.apache.hudi.common.model.HoodieFileGroup;
|
||||
import org.apache.hudi.common.model.HoodieLogFile;
|
||||
import org.apache.hudi.common.model.HoodiePartitionMetadata;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.table.TableSchemaResolver;
|
||||
import org.apache.hudi.common.table.log.HoodieLogFormat;
|
||||
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
|
||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||
import org.apache.hudi.common.table.view.FileSystemViewManager;
|
||||
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
|
||||
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
|
||||
import org.apache.hudi.common.util.CleanerUtils;
|
||||
import org.apache.hudi.common.util.FileIOUtils;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.common.util.ParquetUtils;
|
||||
import org.apache.hudi.common.util.StringUtils;
|
||||
@@ -57,10 +61,13 @@ import org.apache.hudi.utilities.util.BloomFilterData;
|
||||
import com.beust.jcommander.JCommander;
|
||||
import com.beust.jcommander.Parameter;
|
||||
import jline.internal.Log;
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.generic.IndexedRecord;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.apache.parquet.avro.AvroSchemaConverter;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
|
||||
@@ -70,6 +77,7 @@ import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
@@ -78,6 +86,8 @@ import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.INSTANT_TIME;
|
||||
|
||||
/**
|
||||
* A validator with spark-submit to compare information, such as partitions, file listing, index, etc.,
|
||||
* between metadata table and filesystem.
|
||||
@@ -578,9 +588,9 @@ public class HoodieMetadataTableValidator implements Serializable {
|
||||
|
||||
LOG.debug("All file slices from metadata: " + allFileSlicesFromMeta + ". For partitions " + partitionPath);
|
||||
LOG.debug("All file slices from direct listing: " + allFileSlicesFromFS + ". For partitions " + partitionPath);
|
||||
validate(allFileSlicesFromMeta, allFileSlicesFromFS, partitionPath, "file slices");
|
||||
|
||||
LOG.info("Validation of all file groups succeeded for partition " + partitionPath);
|
||||
validateFileSlices(
|
||||
allFileSlicesFromMeta, allFileSlicesFromFS, partitionPath,
|
||||
fsBasedContext.getMetaClient(), "all file groups");
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -605,16 +615,8 @@ public class HoodieMetadataTableValidator implements Serializable {
|
||||
|
||||
LOG.debug("Latest base file from metadata: " + latestFilesFromMetadata + ". For partitions " + partitionPath);
|
||||
LOG.debug("Latest base file from direct listing: " + latestFilesFromFS + ". For partitions " + partitionPath);
|
||||
if (latestFilesFromMetadata.size() != latestFilesFromFS.size()
|
||||
|| !latestFilesFromMetadata.equals(latestFilesFromFS)) {
|
||||
String message = "Validation of metadata get latest base file for partition " + partitionPath + " failed. "
|
||||
+ "Latest base file from metadata: " + latestFilesFromMetadata
|
||||
+ "Latest base file from direct listing: " + latestFilesFromFS;
|
||||
LOG.error(message);
|
||||
throw new HoodieValidationException(message);
|
||||
} else {
|
||||
LOG.info("Validation of getLatestBaseFiles succeeded for partition " + partitionPath);
|
||||
}
|
||||
|
||||
validate(latestFilesFromMetadata, latestFilesFromFS, partitionPath, "latest base files");
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -639,8 +641,9 @@ public class HoodieMetadataTableValidator implements Serializable {
|
||||
LOG.debug("Latest file list from metadata: " + latestFileSlicesFromMetadataTable + ". For partition " + partitionPath);
|
||||
LOG.debug("Latest file list from direct listing: " + latestFileSlicesFromFS + ". For partition " + partitionPath);
|
||||
|
||||
validate(latestFileSlicesFromMetadataTable, latestFileSlicesFromFS, partitionPath, "file slices");
|
||||
LOG.info("Validation of getLatestFileSlices succeeded for partition " + partitionPath);
|
||||
validateFileSlices(
|
||||
latestFileSlicesFromMetadataTable, latestFileSlicesFromFS, partitionPath,
|
||||
fsBasedContext.getMetaClient(), "latest file slices");
|
||||
}
|
||||
|
||||
private List<FileSlice> filterFileSliceBasedOnInflightCleaning(List<FileSlice> sortedLatestFileSliceList, Set<String> baseDataFilesForCleaning) {
|
||||
@@ -675,8 +678,6 @@ public class HoodieMetadataTableValidator implements Serializable {
|
||||
.getSortedColumnStatsList(partitionPath, latestBaseFilenameList);
|
||||
|
||||
validate(metadataBasedColStats, fsBasedColStats, partitionPath, "column stats");
|
||||
|
||||
LOG.info("Validation of column stats succeeded for partition " + partitionPath);
|
||||
}
|
||||
|
||||
private void validateBloomFilters(
|
||||
@@ -692,8 +693,6 @@ public class HoodieMetadataTableValidator implements Serializable {
|
||||
.getSortedBloomFilterList(partitionPath, latestBaseFilenameList);
|
||||
|
||||
validate(metadataBasedBloomFilters, fsBasedBloomFilters, partitionPath, "bloom filters");
|
||||
|
||||
LOG.info("Validation of bloom filters succeeded for partition " + partitionPath);
|
||||
}
|
||||
|
||||
private List<String> getLatestBaseFileNames(HoodieMetadataValidationContext fsBasedContext, String partitionPath, Set<String> baseDataFilesForCleaning) {
|
||||
@@ -723,6 +722,121 @@ public class HoodieMetadataTableValidator implements Serializable {
|
||||
}
|
||||
}
|
||||
|
||||
private void validateFileSlices(
|
||||
List<FileSlice> fileSliceListFromMetadataTable, List<FileSlice> fileSliceListFromFS,
|
||||
String partitionPath, HoodieTableMetaClient metaClient, String label) {
|
||||
boolean mismatch = false;
|
||||
if (fileSliceListFromMetadataTable.size() != fileSliceListFromFS.size()) {
|
||||
mismatch = true;
|
||||
} else if (!fileSliceListFromMetadataTable.equals(fileSliceListFromFS)) {
|
||||
for (int i = 0; i < fileSliceListFromMetadataTable.size(); i++) {
|
||||
FileSlice fileSlice1 = fileSliceListFromMetadataTable.get(i);
|
||||
FileSlice fileSlice2 = fileSliceListFromFS.get(i);
|
||||
if (!Objects.equals(fileSlice1.getFileGroupId(), fileSlice2.getFileGroupId())
|
||||
|| !Objects.equals(fileSlice1.getBaseInstantTime(), fileSlice2.getBaseInstantTime())
|
||||
|| !Objects.equals(fileSlice1.getBaseFile(), fileSlice2.getBaseFile())) {
|
||||
mismatch = true;
|
||||
break;
|
||||
}
|
||||
if (!areFileSliceCommittedLogFilesMatching(fileSlice1, fileSlice2, metaClient)) {
|
||||
mismatch = true;
|
||||
break;
|
||||
} else {
|
||||
LOG.warn(String.format("There are uncommitted log files in the latest file slices "
|
||||
+ "but the committed log files match: %s %s", fileSlice1, fileSlice2));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (mismatch) {
|
||||
String message = String.format("Validation of %s for partition %s failed."
|
||||
+ "\n%s from metadata: %s\n%s from file system and base files: %s",
|
||||
label, partitionPath, label, fileSliceListFromMetadataTable, label, fileSliceListFromFS);
|
||||
LOG.error(message);
|
||||
throw new HoodieValidationException(message);
|
||||
} else {
|
||||
LOG.info(String.format("Validation of %s succeeded for partition %s", label, partitionPath));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Compares committed log files from two file slices.
|
||||
*
|
||||
* @param fs1 File slice 1
|
||||
* @param fs2 File slice 2
|
||||
* @param metaClient {@link HoodieTableMetaClient} instance
|
||||
* @return {@code true} if matching; {@code false} otherwise.
|
||||
*/
|
||||
private boolean areFileSliceCommittedLogFilesMatching(
|
||||
FileSlice fs1, FileSlice fs2, HoodieTableMetaClient metaClient) {
|
||||
Set<String> fs1LogPathSet =
|
||||
fs1.getLogFiles().map(f -> f.getPath().toString()).collect(Collectors.toSet());
|
||||
Set<String> fs2LogPathSet =
|
||||
fs2.getLogFiles().map(f -> f.getPath().toString()).collect(Collectors.toSet());
|
||||
Set<String> commonLogPathSet = new HashSet<>(fs1LogPathSet);
|
||||
commonLogPathSet.retainAll(fs2LogPathSet);
|
||||
// Only keep log file paths that differ
|
||||
fs1LogPathSet.removeAll(commonLogPathSet);
|
||||
fs2LogPathSet.removeAll(commonLogPathSet);
|
||||
// Check if the remaining log files are uncommitted. If there is any log file
|
||||
// that is committed, the committed log files of two file slices are different
|
||||
FileSystem fileSystem = metaClient.getFs();
|
||||
HoodieTimeline commitsTimeline = metaClient.getCommitsTimeline();
|
||||
if (hasCommittedLogFiles(fileSystem, fs1LogPathSet, commitsTimeline)) {
|
||||
LOG.error("The first file slice has committed log files that cause mismatching: "
|
||||
+ fs1);
|
||||
return false;
|
||||
}
|
||||
if (hasCommittedLogFiles(fileSystem, fs2LogPathSet, commitsTimeline)) {
|
||||
LOG.error("The second file slice has committed log files that cause mismatching: "
|
||||
+ fs2);
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
private boolean hasCommittedLogFiles(
|
||||
FileSystem fs, Set<String> logFilePathSet, HoodieTimeline commitsTimeline) {
|
||||
if (logFilePathSet.isEmpty()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
AvroSchemaConverter converter = new AvroSchemaConverter();
|
||||
HoodieTimeline completedInstantsTimeline = commitsTimeline.filterCompletedInstants();
|
||||
HoodieTimeline inflightInstantsTimeline = commitsTimeline.filterInflights();
|
||||
|
||||
for (String logFilePathStr : logFilePathSet) {
|
||||
HoodieLogFormat.Reader reader = null;
|
||||
try {
|
||||
Schema readerSchema =
|
||||
converter.convert(Objects.requireNonNull(
|
||||
TableSchemaResolver.readSchemaFromLogFile(fs, new Path(logFilePathStr))));
|
||||
reader =
|
||||
HoodieLogFormat.newReader(fs, new HoodieLogFile(new Path(logFilePathStr)), readerSchema);
|
||||
// read the avro blocks
|
||||
if (reader.hasNext()) {
|
||||
HoodieLogBlock block = reader.next();
|
||||
final String instantTime = block.getLogBlockHeader().get(INSTANT_TIME);
|
||||
if (!completedInstantsTimeline.containsOrBeforeTimelineStarts(instantTime)
|
||||
|| inflightInstantsTimeline.containsInstant(instantTime)) {
|
||||
// hit an uncommitted block possibly from a failed write
|
||||
LOG.warn("Log file is uncommitted: " + logFilePathStr);
|
||||
} else {
|
||||
LOG.warn("Log file is committed: " + logFilePathStr);
|
||||
return true;
|
||||
}
|
||||
} else {
|
||||
LOG.warn("There is no log block in " + logFilePathStr);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
throw new HoodieValidationException("Validation failed due to IOException", e);
|
||||
} finally {
|
||||
FileIOUtils.closeQuietly(reader);
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
public class AsyncMetadataTableValidateService extends HoodieAsyncService {
|
||||
private final transient ExecutorService executor = Executors.newSingleThreadExecutor();
|
||||
|
||||
@@ -824,6 +938,10 @@ public class HoodieMetadataTableValidator implements Serializable {
|
||||
}
|
||||
}
|
||||
|
||||
public HoodieTableMetaClient getMetaClient() {
|
||||
return metaClient;
|
||||
}
|
||||
|
||||
public List<HoodieBaseFile> getSortedLatestBaseFileList(String partitionPath) {
|
||||
return fileSystemView.getLatestBaseFiles(partitionPath)
|
||||
.sorted(new HoodieBaseFileComparator()).collect(Collectors.toList());
|
||||
|
||||
Reference in New Issue
Block a user