[HUDI-3635] Fix HoodieMetadataTableValidator around comparison of partition path listing (#5100)
Co-authored-by: yuezhang <yuezhang@freewheel.tv>
This commit is contained in:
@@ -18,6 +18,7 @@
|
|||||||
|
|
||||||
package org.apache.hudi.common.model;
|
package org.apache.hudi.common.model;
|
||||||
|
|
||||||
|
import org.apache.hudi.common.util.Option;
|
||||||
import org.apache.hudi.exception.HoodieException;
|
import org.apache.hudi.exception.HoodieException;
|
||||||
|
|
||||||
import org.apache.hadoop.fs.FSDataInputStream;
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
@@ -135,6 +136,23 @@ public class HoodiePartitionMetadata {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Read out the COMMIT_TIME_KEY metadata for this partition.
|
||||||
|
*/
|
||||||
|
public Option<String> readPartitionCreatedCommitTime() {
|
||||||
|
try {
|
||||||
|
if (props.containsKey(COMMIT_TIME_KEY)) {
|
||||||
|
return Option.of(props.getProperty(COMMIT_TIME_KEY));
|
||||||
|
} else {
|
||||||
|
readFromFS();
|
||||||
|
return Option.of(props.getProperty(COMMIT_TIME_KEY));
|
||||||
|
}
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
LOG.warn("Error fetch Hoodie partition metadata for " + partitionPath, ioe);
|
||||||
|
return Option.empty();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// methods related to partition meta data
|
// methods related to partition meta data
|
||||||
public static boolean hasPartitionMetadata(FileSystem fs, Path partitionPath) {
|
public static boolean hasPartitionMetadata(FileSystem fs, Path partitionPath) {
|
||||||
try {
|
try {
|
||||||
|
|||||||
@@ -32,6 +32,7 @@ import org.apache.hudi.common.model.HoodieBaseFile;
|
|||||||
import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
|
import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
|
||||||
import org.apache.hudi.common.model.HoodieFileFormat;
|
import org.apache.hudi.common.model.HoodieFileFormat;
|
||||||
import org.apache.hudi.common.model.HoodieFileGroup;
|
import org.apache.hudi.common.model.HoodieFileGroup;
|
||||||
|
import org.apache.hudi.common.model.HoodiePartitionMetadata;
|
||||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||||
import org.apache.hudi.common.table.TableSchemaResolver;
|
import org.apache.hudi.common.table.TableSchemaResolver;
|
||||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||||
@@ -443,6 +444,21 @@ public class HoodieMetadataTableValidator implements Serializable {
|
|||||||
private List<String> validatePartitions(HoodieSparkEngineContext engineContext, String basePath) {
|
private List<String> validatePartitions(HoodieSparkEngineContext engineContext, String basePath) {
|
||||||
// compare partitions
|
// compare partitions
|
||||||
List<String> allPartitionPathsFromFS = FSUtils.getAllPartitionPaths(engineContext, basePath, false, cfg.assumeDatePartitioning);
|
List<String> allPartitionPathsFromFS = FSUtils.getAllPartitionPaths(engineContext, basePath, false, cfg.assumeDatePartitioning);
|
||||||
|
HoodieTimeline completedTimeline = metaClient.getActiveTimeline().filterCompletedInstants();
|
||||||
|
|
||||||
|
// ignore partitions created by uncommitted ingestion.
|
||||||
|
allPartitionPathsFromFS = allPartitionPathsFromFS.stream().parallel().filter(part -> {
|
||||||
|
HoodiePartitionMetadata hoodiePartitionMetadata = new HoodiePartitionMetadata(metaClient.getFs(), new Path(basePath, part));
|
||||||
|
|
||||||
|
Option<String> instantOption = hoodiePartitionMetadata.readPartitionCreatedCommitTime();
|
||||||
|
if (instantOption.isPresent()) {
|
||||||
|
String instantTime = instantOption.get();
|
||||||
|
return completedTimeline.containsOrBeforeTimelineStarts(instantTime);
|
||||||
|
} else {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}).collect(Collectors.toList());
|
||||||
|
|
||||||
List<String> allPartitionPathsMeta = FSUtils.getAllPartitionPaths(engineContext, basePath, true, cfg.assumeDatePartitioning);
|
List<String> allPartitionPathsMeta = FSUtils.getAllPartitionPaths(engineContext, basePath, true, cfg.assumeDatePartitioning);
|
||||||
|
|
||||||
Collections.sort(allPartitionPathsFromFS);
|
Collections.sort(allPartitionPathsFromFS);
|
||||||
|
|||||||
Reference in New Issue
Block a user