Enabling global index for MOR (merge-on-read) tables
This commit is contained in:
committed by
vinoth chandar
parent
dfc0c61eb7
commit
23d53763c4
@@ -39,6 +39,7 @@ import com.uber.hoodie.config.HoodieWriteConfig;
|
|||||||
import com.uber.hoodie.exception.HoodieCompactionException;
|
import com.uber.hoodie.exception.HoodieCompactionException;
|
||||||
import com.uber.hoodie.exception.HoodieRollbackException;
|
import com.uber.hoodie.exception.HoodieRollbackException;
|
||||||
import com.uber.hoodie.exception.HoodieUpsertException;
|
import com.uber.hoodie.exception.HoodieUpsertException;
|
||||||
|
import com.uber.hoodie.index.HoodieIndex;
|
||||||
import com.uber.hoodie.io.HoodieAppendHandle;
|
import com.uber.hoodie.io.HoodieAppendHandle;
|
||||||
import com.uber.hoodie.io.compact.HoodieRealtimeTableCompactor;
|
import com.uber.hoodie.io.compact.HoodieRealtimeTableCompactor;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
@@ -154,11 +155,10 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends
|
|||||||
// Atomically un-publish all non-inflight commits
|
// Atomically un-publish all non-inflight commits
|
||||||
commitsAndCompactions.entrySet().stream().map(entry -> entry.getValue())
|
commitsAndCompactions.entrySet().stream().map(entry -> entry.getValue())
|
||||||
.filter(i -> !i.isInflight()).forEach(this.getActiveTimeline()::revertToInflight);
|
.filter(i -> !i.isInflight()).forEach(this.getActiveTimeline()::revertToInflight);
|
||||||
|
|
||||||
logger.info("Unpublished " + commits);
|
logger.info("Unpublished " + commits);
|
||||||
|
|
||||||
Long startTime = System.currentTimeMillis();
|
Long startTime = System.currentTimeMillis();
|
||||||
|
// TODO (NA) : remove this once HoodieIndex is a member of HoodieTable
|
||||||
|
HoodieIndex hoodieIndex = HoodieIndex.createIndex(config, jsc);
|
||||||
List<HoodieRollbackStat> allRollbackStats = jsc.parallelize(FSUtils
|
List<HoodieRollbackStat> allRollbackStats = jsc.parallelize(FSUtils
|
||||||
.getAllPartitionPaths(this.metaClient.getFs(), this.getMetaClient().getBasePath(),
|
.getAllPartitionPaths(this.metaClient.getFs(), this.getMetaClient().getBasePath(),
|
||||||
config.shouldAssumeDatePartitioning()))
|
config.shouldAssumeDatePartitioning()))
|
||||||
@@ -195,17 +195,24 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends
|
|||||||
|
|
||||||
// append rollback blocks for updates
|
// append rollback blocks for updates
|
||||||
if (commitMetadata.getPartitionToWriteStats().containsKey(partitionPath)) {
|
if (commitMetadata.getPartitionToWriteStats().containsKey(partitionPath)) {
|
||||||
|
// A global index does not currently store the latest commit time, so when the index is global we look up each file's latest base commit time from the latest file slices of this partition
|
||||||
|
Map<String, String> fileIdToLatestCommitTimeMap =
|
||||||
|
hoodieIndex.isGlobal() ? this.getRTFileSystemView().getLatestFileSlices(partitionPath)
|
||||||
|
.collect(Collectors.toMap(FileSlice::getFileId, FileSlice::getBaseCommitTime)) : null;
|
||||||
commitMetadata.getPartitionToWriteStats().get(partitionPath).stream()
|
commitMetadata.getPartitionToWriteStats().get(partitionPath).stream()
|
||||||
.filter(wStat -> {
|
.filter(wStat -> {
|
||||||
return wStat != null
|
return wStat != null && wStat.getPrevCommit() != HoodieWriteStat.NULL_COMMIT
|
||||||
&& wStat.getPrevCommit() != HoodieWriteStat.NULL_COMMIT
|
|
||||||
&& wStat.getPrevCommit() != null;
|
&& wStat.getPrevCommit() != null;
|
||||||
}).forEach(wStat -> {
|
}).forEach(wStat -> {
|
||||||
HoodieLogFormat.Writer writer = null;
|
HoodieLogFormat.Writer writer = null;
|
||||||
|
String baseCommitTime = wStat.getPrevCommit();
|
||||||
|
if (hoodieIndex.isGlobal()) {
|
||||||
|
baseCommitTime = fileIdToLatestCommitTimeMap.get(wStat.getFileId());
|
||||||
|
}
|
||||||
try {
|
try {
|
||||||
writer = HoodieLogFormat.newWriterBuilder().onParentPath(
|
writer = HoodieLogFormat.newWriterBuilder().onParentPath(
|
||||||
new Path(this.getMetaClient().getBasePath(), partitionPath))
|
new Path(this.getMetaClient().getBasePath(), partitionPath))
|
||||||
.withFileId(wStat.getFileId()).overBaseCommit(wStat.getPrevCommit())
|
.withFileId(wStat.getFileId()).overBaseCommit(baseCommitTime)
|
||||||
.withFs(this.metaClient.getFs())
|
.withFs(this.metaClient.getFs())
|
||||||
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
|
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
|
||||||
Long numRollbackBlocks = 0L;
|
Long numRollbackBlocks = 0L;
|
||||||
|
|||||||
Reference in New Issue
Block a user