From 23d53763c4db744fe936e2e609fe6abaf7bd9a82 Mon Sep 17 00:00:00 2001 From: Nishith Agarwal Date: Wed, 2 May 2018 00:52:37 -0700 Subject: [PATCH] enabling global index for MOR --- .../hoodie/table/HoodieMergeOnReadTable.java | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java index 79239ada8..857402564 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java @@ -39,6 +39,7 @@ import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.exception.HoodieCompactionException; import com.uber.hoodie.exception.HoodieRollbackException; import com.uber.hoodie.exception.HoodieUpsertException; +import com.uber.hoodie.index.HoodieIndex; import com.uber.hoodie.io.HoodieAppendHandle; import com.uber.hoodie.io.compact.HoodieRealtimeTableCompactor; import java.io.IOException; @@ -154,11 +155,10 @@ public class HoodieMergeOnReadTable extends // Atomically un-publish all non-inflight commits commitsAndCompactions.entrySet().stream().map(entry -> entry.getValue()) .filter(i -> !i.isInflight()).forEach(this.getActiveTimeline()::revertToInflight); - logger.info("Unpublished " + commits); - Long startTime = System.currentTimeMillis(); - + // TODO (NA) : remove this once HoodieIndex is a member of HoodieTable + HoodieIndex hoodieIndex = HoodieIndex.createIndex(config, jsc); List allRollbackStats = jsc.parallelize(FSUtils .getAllPartitionPaths(this.metaClient.getFs(), this.getMetaClient().getBasePath(), config.shouldAssumeDatePartitioning())) @@ -195,17 +195,24 @@ public class HoodieMergeOnReadTable extends // append rollback blocks for updates if (commitMetadata.getPartitionToWriteStats().containsKey(partitionPath)) { + // This needs to be done since GlobalIndex at the moment does not store the latest commit time + Map fileIdToLatestCommitTimeMap = + hoodieIndex.isGlobal() ? this.getRTFileSystemView().getLatestFileSlices(partitionPath) + .collect(Collectors.toMap(FileSlice::getFileId, FileSlice::getBaseCommitTime)) : null; commitMetadata.getPartitionToWriteStats().get(partitionPath).stream() .filter(wStat -> { - return wStat != null - && wStat.getPrevCommit() != HoodieWriteStat.NULL_COMMIT + return wStat != null && wStat.getPrevCommit() != HoodieWriteStat.NULL_COMMIT && wStat.getPrevCommit() != null; }).forEach(wStat -> { HoodieLogFormat.Writer writer = null; + String baseCommitTime = wStat.getPrevCommit(); + if (hoodieIndex.isGlobal()) { + baseCommitTime = fileIdToLatestCommitTimeMap.get(wStat.getFileId()); + } try { writer = HoodieLogFormat.newWriterBuilder().onParentPath( new Path(this.getMetaClient().getBasePath(), partitionPath)) - .withFileId(wStat.getFileId()).overBaseCommit(wStat.getPrevCommit()) + .withFileId(wStat.getFileId()).overBaseCommit(baseCommitTime) .withFs(this.metaClient.getFs()) .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build(); Long numRollbackBlocks = 0L;