From 8e08d498c9c822e5d23ef2392863a97ce6fc1a96 Mon Sep 17 00:00:00 2001 From: Nishith Agarwal Date: Fri, 14 Jun 2019 14:22:48 -0700 Subject: [PATCH] Reading baseCommitTime from the latest file slice as opposed to the tagged record value --- .../uber/hoodie/table/HoodieMergeOnReadTable.java | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java index 3c578d49e..bcc219015 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java @@ -489,13 +489,13 @@ public class HoodieMergeOnReadTable extends private HoodieRollbackStat rollback(HoodieIndex hoodieIndex, String partitionPath, String commit, HoodieCommitMetadata commitMetadata, final Map filesToDeletedStatus, Map filesToNumBlocksRollback, Set deletedFiles) { - // The following needs to be done since GlobalIndex at the moment does not store the latest commit time. - // Also, wStat.getPrevCommit() might not give the right commit time in the following + // wStat.getPrevCommit() might not give the right commit time in the following // scenario : If a compaction was scheduled, the new commitTime associated with the requested compaction will be // used to write the new log files. In this case, the commit time for the log file is the compaction requested time. - Map fileIdToBaseCommitTimeForLogMap = - hoodieIndex.isGlobal() ? this.getRTFileSystemView().getLatestFileSlices(partitionPath) - .collect(Collectors.toMap(FileSlice::getFileId, FileSlice::getBaseInstantTime)) : null; + // But the index (global) might store the baseCommit of the parquet and not the requested, hence get the + // baseCommit always by listing the file slice + Map fileIdToBaseCommitTimeForLogMap = this.getRTFileSystemView().getLatestFileSlices(partitionPath) + .collect(Collectors.toMap(FileSlice::getFileId, FileSlice::getBaseInstantTime)); commitMetadata.getPartitionToWriteStats().get(partitionPath).stream() .filter(wStat -> { // Filter out stats without prevCommit since they are all inserts @@ -503,10 +503,7 @@ public class HoodieMergeOnReadTable extends && !deletedFiles.contains(wStat.getFileId()); }).forEach(wStat -> { HoodieLogFormat.Writer writer = null; - String baseCommitTime = wStat.getPrevCommit(); - if (hoodieIndex.isGlobal()) { - baseCommitTime = fileIdToBaseCommitTimeForLogMap.get(wStat.getFileId()); - } + String baseCommitTime = fileIdToBaseCommitTimeForLogMap.get(wStat.getFileId()); try { writer = HoodieLogFormat.newWriterBuilder().onParentPath( new Path(this.getMetaClient().getBasePath(), partitionPath))