1
0

Reading baseCommitTime from the latest file slice as opposed to the tagged record value

This commit is contained in:
Nishith Agarwal
2019-06-14 14:22:48 -07:00
committed by vinoth chandar
parent 129e433641
commit 8e08d498c9

View File

@@ -489,13 +489,13 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends
private HoodieRollbackStat rollback(HoodieIndex hoodieIndex, String partitionPath, String commit, private HoodieRollbackStat rollback(HoodieIndex hoodieIndex, String partitionPath, String commit,
HoodieCommitMetadata commitMetadata, final Map<FileStatus, Boolean> filesToDeletedStatus, HoodieCommitMetadata commitMetadata, final Map<FileStatus, Boolean> filesToDeletedStatus,
Map<FileStatus, Long> filesToNumBlocksRollback, Set<String> deletedFiles) { Map<FileStatus, Long> filesToNumBlocksRollback, Set<String> deletedFiles) {
// The following needs to be done since GlobalIndex at the moment does not store the latest commit time. // wStat.getPrevCommit() might not give the right commit time in the following
// Also, wStat.getPrevCommit() might not give the right commit time in the following
// scenario : If a compaction was scheduled, the new commitTime associated with the requested compaction will be // scenario : If a compaction was scheduled, the new commitTime associated with the requested compaction will be
// used to write the new log files. In this case, the commit time for the log file is the compaction requested time. // used to write the new log files. In this case, the commit time for the log file is the compaction requested time.
Map<String, String> fileIdToBaseCommitTimeForLogMap = // But the index (global) might store the baseCommit of the parquet and not the requested, hence get the
hoodieIndex.isGlobal() ? this.getRTFileSystemView().getLatestFileSlices(partitionPath) // baseCommit always by listing the file slice
.collect(Collectors.toMap(FileSlice::getFileId, FileSlice::getBaseInstantTime)) : null; Map<String, String> fileIdToBaseCommitTimeForLogMap = this.getRTFileSystemView().getLatestFileSlices(partitionPath)
.collect(Collectors.toMap(FileSlice::getFileId, FileSlice::getBaseInstantTime));
commitMetadata.getPartitionToWriteStats().get(partitionPath).stream() commitMetadata.getPartitionToWriteStats().get(partitionPath).stream()
.filter(wStat -> { .filter(wStat -> {
// Filter out stats without prevCommit since they are all inserts // Filter out stats without prevCommit since they are all inserts
@@ -503,10 +503,7 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends
&& !deletedFiles.contains(wStat.getFileId()); && !deletedFiles.contains(wStat.getFileId());
}).forEach(wStat -> { }).forEach(wStat -> {
HoodieLogFormat.Writer writer = null; HoodieLogFormat.Writer writer = null;
String baseCommitTime = wStat.getPrevCommit(); String baseCommitTime = fileIdToBaseCommitTimeForLogMap.get(wStat.getFileId());
if (hoodieIndex.isGlobal()) {
baseCommitTime = fileIdToBaseCommitTimeForLogMap.get(wStat.getFileId());
}
try { try {
writer = HoodieLogFormat.newWriterBuilder().onParentPath( writer = HoodieLogFormat.newWriterBuilder().onParentPath(
new Path(this.getMetaClient().getBasePath(), partitionPath)) new Path(this.getMetaClient().getBasePath(), partitionPath))