From 3bd8fc1c3eb1d647d4f181b20fd54c6860cf24dc Mon Sep 17 00:00:00 2001 From: satishkotha Date: Mon, 7 Feb 2022 12:16:50 -0800 Subject: [PATCH] [HUDI-3058] Simplify Precommit file system view (#4570) --- .../HoodieTablePreCommitFileSystemView.java | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTablePreCommitFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTablePreCommitFileSystemView.java index 76f7e3ca5..7401617a6 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTablePreCommitFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTablePreCommitFileSystemView.java @@ -22,7 +22,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.util.Option; import java.util.Collections; import java.util.List; @@ -75,16 +74,12 @@ public class HoodieTablePreCommitFileSystemView { new HoodieBaseFile(new Path(tableMetaClient.getBasePath(), writeStat.getPath()).toString()))); Stream committedBaseFiles = this.completedCommitsFileSystemView.getLatestBaseFiles(partitionStr); - Stream baseFilesForCommittedFileIds = committedBaseFiles - // Remove files replaced by current inflight commit - .filter(baseFile -> !replacedFileIdsForPartition.contains(baseFile.getFileId())) - // if there is new version of file created by inflight commit, use that file instead of committed version - .map(baseFile -> { - HoodieBaseFile fileIdNewVersionExists = newFilesWrittenForPartition.remove(baseFile.getFileId()); - return Option.ofNullable(fileIdNewVersionExists).orElse(baseFile); - }); - - Stream baseFilesWithNewFileIds = newFilesWrittenForPartition.values().stream(); - return Stream.concat(baseFilesForCommittedFileIds, baseFilesWithNewFileIds); + Map allFileIds = committedBaseFiles + // Remove files replaced by current inflight commit + .filter(baseFile -> !replacedFileIdsForPartition.contains(baseFile.getFileId())) + .collect(Collectors.toMap(HoodieBaseFile::getFileId, baseFile -> baseFile)); + + allFileIds.putAll(newFilesWrittenForPartition); + return allFileIds.values().stream(); } }