1
0

[HUDI-3421] Pending clustering may break AbstractTableFileSystemView#getxxBaseFile() (#4810)

This commit is contained in:
YueZhang
2022-02-25 19:16:27 +08:00
committed by GitHub
parent a4ee7463ae
commit 742810070b
4 changed files with 258 additions and 12 deletions

View File

@@ -380,6 +380,19 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
&& baseFile.getCommitTime().equals(compactionWithInstantTime.get().getKey());
}
/**
 * Checks whether the given base file was written by a replace instant that is still pending.
 *
 * <p>With async clustering, it is possible to see partial/complete base-files due to inflight-clustering. Callers
 * use this check to ignore those base-files.
 *
 * @param baseFile base file to check
 * @return {@code true} if the commit time of {@code baseFile} equals the timestamp of any instant in the pending
 *         replace timeline of the active timeline, {@code false} otherwise (including when there is no pending
 *         replace instant at all)
 */
protected boolean isBaseFileDueToPendingClustering(HoodieBaseFile baseFile) {
  // Short-circuit on the first matching instant instead of materializing every pending timestamp into a list on
  // each call; this predicate is evaluated once per base file inside stream filters.
  return metaClient.getActiveTimeline().filterPendingReplaceTimeline().getInstants()
      .map(HoodieInstant::getTimestamp)
      // Objects.equals keeps the null-tolerant comparison semantics that List#contains provided.
      .anyMatch(pendingTimestamp -> java.util.Objects.equals(baseFile.getCommitTime(), pendingTimestamp));
}
/**
* Returns true if the file-group is under pending-compaction and the file-slice's baseInstant matches compaction
* Instant.
@@ -492,7 +505,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
.map(fileGroup -> Option.fromJavaOptional(fileGroup.getAllBaseFiles()
.filter(baseFile -> HoodieTimeline.compareTimestamps(baseFile.getCommitTime(), HoodieTimeline.LESSER_THAN_OR_EQUALS, maxCommitTime
))
.filter(df -> !isBaseFileDueToPendingCompaction(df)).findFirst()))
.filter(df -> !isBaseFileDueToPendingCompaction(df) && !isBaseFileDueToPendingClustering(df)).findFirst()))
.filter(Option::isPresent).map(Option::get)
.map(df -> addBootstrapBaseFileIfPresent(new HoodieFileGroupId(partitionPath, df.getFileId()), df));
} finally {
@@ -511,7 +524,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
} else {
return fetchHoodieFileGroup(partitionPath, fileId).map(fileGroup -> fileGroup.getAllBaseFiles()
.filter(baseFile -> HoodieTimeline.compareTimestamps(baseFile.getCommitTime(), HoodieTimeline.EQUALS,
instantTime)).filter(df -> !isBaseFileDueToPendingCompaction(df)).findFirst().orElse(null))
instantTime)).filter(df -> !isBaseFileDueToPendingCompaction(df) && !isBaseFileDueToPendingClustering(df)).findFirst().orElse(null))
.map(df -> addBootstrapBaseFileIfPresent(new HoodieFileGroupId(partitionPath, fileId), df));
}
} finally {
@@ -547,7 +560,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
.filter(fileGroup -> !isFileGroupReplacedBeforeAny(fileGroup.getFileGroupId(), commitsToReturn))
.map(fileGroup -> Pair.of(fileGroup.getFileGroupId(), Option.fromJavaOptional(
fileGroup.getAllBaseFiles().filter(baseFile -> commitsToReturn.contains(baseFile.getCommitTime())
&& !isBaseFileDueToPendingCompaction(baseFile)).findFirst()))).filter(p -> p.getValue().isPresent())
&& !isBaseFileDueToPendingCompaction(baseFile) && !isBaseFileDueToPendingClustering(baseFile)).findFirst()))).filter(p -> p.getValue().isPresent())
.map(p -> addBootstrapBaseFileIfPresent(p.getKey(), p.getValue().get()));
} finally {
readLock.unlock();
@@ -563,7 +576,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
return fetchAllBaseFiles(partitionPath)
.filter(df -> !isFileGroupReplaced(partitionPath, df.getFileId()))
.filter(df -> visibleCommitsAndCompactionTimeline.containsOrBeforeTimelineStarts(df.getCommitTime()))
.filter(df -> !isBaseFileDueToPendingCompaction(df))
.filter(df -> !isBaseFileDueToPendingCompaction(df) && !isBaseFileDueToPendingClustering(df))
.map(df -> addBootstrapBaseFileIfPresent(new HoodieFileGroupId(partitionPath, df.getFileId()), df));
} finally {
readLock.unlock();
@@ -953,7 +966,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
protected Option<HoodieBaseFile> getLatestBaseFile(HoodieFileGroup fileGroup) {
return Option
.fromJavaOptional(fileGroup.getAllBaseFiles().filter(df -> !isBaseFileDueToPendingCompaction(df)).findFirst());
.fromJavaOptional(fileGroup.getAllBaseFiles().filter(df -> !isBaseFileDueToPendingCompaction(df) && !isBaseFileDueToPendingClustering(df)).findFirst());
}
/**