[HUDI-3058] Simplify Precommit file system view (#4570)
This commit is contained in:
@@ -22,7 +22,6 @@ import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hudi.common.model.HoodieBaseFile;
|
||||
import org.apache.hudi.common.model.HoodieWriteStat;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
@@ -75,16 +74,12 @@ public class HoodieTablePreCommitFileSystemView {
|
||||
new HoodieBaseFile(new Path(tableMetaClient.getBasePath(), writeStat.getPath()).toString())));
|
||||
|
||||
Stream<HoodieBaseFile> committedBaseFiles = this.completedCommitsFileSystemView.getLatestBaseFiles(partitionStr);
|
||||
Stream<HoodieBaseFile> baseFilesForCommittedFileIds = committedBaseFiles
|
||||
// Remove files replaced by current inflight commit
|
||||
.filter(baseFile -> !replacedFileIdsForPartition.contains(baseFile.getFileId()))
|
||||
// if there is new version of file created by inflight commit, use that file instead of committed version
|
||||
.map(baseFile -> {
|
||||
HoodieBaseFile fileIdNewVersionExists = newFilesWrittenForPartition.remove(baseFile.getFileId());
|
||||
return Option.ofNullable(fileIdNewVersionExists).orElse(baseFile);
|
||||
});
|
||||
|
||||
Stream<HoodieBaseFile> baseFilesWithNewFileIds = newFilesWrittenForPartition.values().stream();
|
||||
return Stream.concat(baseFilesForCommittedFileIds, baseFilesWithNewFileIds);
|
||||
Map<String, HoodieBaseFile> allFileIds = committedBaseFiles
|
||||
// Remove files replaced by current inflight commit
|
||||
.filter(baseFile -> !replacedFileIdsForPartition.contains(baseFile.getFileId()))
|
||||
.collect(Collectors.toMap(HoodieBaseFile::getFileId, baseFile -> baseFile));
|
||||
|
||||
allFileIds.putAll(newFilesWrittenForPartition);
|
||||
return allFileIds.values().stream();
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user