From 2b60641d17f60fc0d90ad470d6dead95805495fa Mon Sep 17 00:00:00 2001
From: YueZhang <69956021+zhangyue19921010@users.noreply.github.com>
Date: Thu, 31 Mar 2022 05:23:37 +0800
Subject: [PATCH] [HUDI-3635] Fix HoodieMetadataTableValidator around comparison of partition path listing (#5100)

Co-authored-by: yuezhang
---
 .../common/model/HoodiePartitionMetadata.java | 28 ++++++++++++++++++++++++++++
 .../HoodieMetadataTableValidator.java         | 16 ++++++++++++++++
 2 files changed, 44 insertions(+)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java
index faad46653..3a19c187f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.common.model;
 
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieException;
 
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -135,6 +136,33 @@ public class HoodiePartitionMetadata {
     }
   }
 
+  /**
+   * Reads the commit time stored under {@code COMMIT_TIME_KEY} for this partition.
+   *
+   * <p>If {@code props} does not yet contain the key, the partition metadata is loaded with
+   * {@code readFromFS()} before the lookup.
+   *
+   * @return the stored commit time, or {@code Option.empty()} when the key is still absent after
+   *         loading or when loading fails with an {@link IOException}
+   */
+  public Option<String> readPartitionCreatedCommitTime() {
+    try {
+      if (!props.containsKey(COMMIT_TIME_KEY)) {
+        readFromFS();
+      }
+      // getProperty may return null if the key is still missing after loading; report that as
+      // "absent" rather than passing null to Option.of.
+      String commitTime = props.getProperty(COMMIT_TIME_KEY);
+      if (commitTime == null) {
+        return Option.empty();
+      }
+      return Option.of(commitTime);
+    } catch (IOException ioe) {
+      LOG.warn("Error fetch Hoodie partition metadata for " + partitionPath, ioe);
+      return Option.empty();
+    }
+  }
+
   // methods related to partition meta data
   public static boolean hasPartitionMetadata(FileSystem fs, Path partitionPath) {
     try {
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index 16866558c..991edc530 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -32,6 +32,7 @@ import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieFileGroup;
+import org.apache.hudi.common.model.HoodiePartitionMetadata;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -443,6 +444,21 @@ public class HoodieMetadataTableValidator implements Serializable {
   private List<String> validatePartitions(HoodieSparkEngineContext engineContext, String basePath) {
     // compare partitions
     List<String> allPartitionPathsFromFS = FSUtils.getAllPartitionPaths(engineContext, basePath, false, cfg.assumeDatePartitioning);
+    HoodieTimeline completedTimeline = metaClient.getActiveTimeline().filterCompletedInstants();
+
+    // ignore partitions created by uncommitted ingestion.
+    allPartitionPathsFromFS = allPartitionPathsFromFS.stream().parallel().filter(part -> {
+      HoodiePartitionMetadata hoodiePartitionMetadata = new HoodiePartitionMetadata(metaClient.getFs(), new Path(basePath, part));
+
+      Option<String> instantOption = hoodiePartitionMetadata.readPartitionCreatedCommitTime();
+      if (instantOption.isPresent()) {
+        String instantTime = instantOption.get();
+        return completedTimeline.containsOrBeforeTimelineStarts(instantTime);
+      } else {
+        return false;
+      }
+    }).collect(Collectors.toList());
+
     List<String> allPartitionPathsMeta = FSUtils.getAllPartitionPaths(engineContext, basePath, true, cfg.assumeDatePartitioning);
 
     Collections.sort(allPartitionPathsFromFS);