1
0

Enable hive sync even if there is no compaction commit

This commit is contained in:
Nishith Agarwal
2017-11-30 17:21:34 -08:00
committed by prazanna
parent 9b610f82c7
commit 051f600b7f

View File

@@ -326,27 +326,32 @@ public class HoodieHiveClient {
.getCompactionTimeline().filterCompletedInstants().lastInstant(); .getCompactionTimeline().filterCompletedInstants().lastInstant();
LOG.info("Found the last compaction commit as " + lastCompactionCommit); LOG.info("Found the last compaction commit as " + lastCompactionCommit);
Optional<HoodieInstant> lastDeltaCommitAfterCompaction = Optional.empty(); Optional<HoodieInstant> lastDeltaCommit;
if (lastCompactionCommit.isPresent()) { if (lastCompactionCommit.isPresent()) {
lastDeltaCommitAfterCompaction = metaClient.getActiveTimeline() lastDeltaCommit = metaClient.getActiveTimeline()
.getDeltaCommitTimeline() .getDeltaCommitTimeline()
.filterCompletedInstants() .filterCompletedInstants()
.findInstantsAfter(lastCompactionCommit.get().getTimestamp(), Integer.MAX_VALUE) .findInstantsAfter(lastCompactionCommit.get().getTimestamp(), Integer.MAX_VALUE)
.lastInstant(); .lastInstant();
} else {
lastDeltaCommit = metaClient.getActiveTimeline()
.getDeltaCommitTimeline()
.filterCompletedInstants()
.lastInstant();
} }
LOG.info("Found the last delta commit after last compaction as " LOG.info("Found the last delta commit "
+ lastDeltaCommitAfterCompaction); + lastDeltaCommit);
if (lastDeltaCommitAfterCompaction.isPresent()) { if (lastDeltaCommit.isPresent()) {
HoodieInstant lastDeltaCommit = lastDeltaCommitAfterCompaction.get(); HoodieInstant lastDeltaInstant = lastDeltaCommit.get();
// read from the log file wrote // read from the log file wrote
commitMetadata = HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
.fromBytes(activeTimeline.getInstantDetails(lastDeltaCommit).get()); .fromBytes(activeTimeline.getInstantDetails(lastDeltaInstant).get());
filePath = commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values() filePath = commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values()
.stream().filter(s -> s.contains( .stream().filter(s -> s.contains(
HoodieLogFile.DELTA_EXTENSION)).findAny() HoodieLogFile.DELTA_EXTENSION)).findAny()
.orElseThrow(() -> new IllegalArgumentException( .orElseThrow(() -> new IllegalArgumentException(
"Could not find any data file written for commit " + lastDeltaCommit "Could not find any data file written for commit " + lastDeltaInstant
+ ", could not get schema for dataset " + metaClient.getBasePath())); + ", could not get schema for dataset " + metaClient.getBasePath()));
return readSchemaFromLogFile(lastCompactionCommit, new Path(filePath)); return readSchemaFromLogFile(lastCompactionCommit, new Path(filePath));
} else { } else {