From 9b78523d62d83253068fcb545c6b60be516be5b4 Mon Sep 17 00:00:00 2001 From: Balaji Varadarajan Date: Thu, 31 May 2018 14:16:19 -0700 Subject: [PATCH] Ensure Cleaner and Archiver do not delete file-slices and workload marked for compaction --- .../com/uber/hoodie/HoodieWriteClient.java | 88 +++++- .../com/uber/hoodie/io/HoodieCleanHelper.java | 70 +++-- .../hoodie/io/HoodieCommitArchiveLog.java | 77 ++++- .../java/com/uber/hoodie/TestCleaner.java | 276 ++++++++++++++++-- .../com/uber/hoodie/TestClientRollback.java | 2 +- .../common/HoodieTestDataGenerator.java | 9 + .../hoodie/io/TestHoodieCommitArchiveLog.java | 164 +++++++++++ .../common/model/CompactionOperation.java | 2 +- .../table/view/HoodieTableFileSystemView.java | 32 +- .../hoodie/common/model/HoodieTestUtils.java | 22 ++ 10 files changed, 666 insertions(+), 76 deletions(-) diff --git a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java index 061fa908e..8ff78619e 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java @@ -36,6 +36,7 @@ import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.TableFileSystemView; import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline; import com.uber.hoodie.common.table.timeline.HoodieInstant; +import com.uber.hoodie.common.table.timeline.HoodieInstant.State; import com.uber.hoodie.common.util.AvroUtils; import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.config.HoodieCompactionConfig; @@ -65,6 +66,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.stream.Collectors; import org.apache.hadoop.fs.FileSystem; import org.apache.log4j.LogManager; @@ -91,6 +93,7 @@ public class HoodieWriteClient implements Seriali private final transient FileSystem fs; 
private final transient JavaSparkContext jsc; private final HoodieWriteConfig config; + private final boolean rollbackInFlight; private final transient HoodieMetrics metrics; private final transient HoodieIndex index; private transient Timer.Context writeContext = null; @@ -122,10 +125,7 @@ public class HoodieWriteClient implements Seriali this.config = clientConfig; this.index = index; this.metrics = new HoodieMetrics(config, config.getTableName()); - - if (rollbackInFlight) { - rollbackInflightCommits(); - } + this.rollbackInFlight = rollbackInFlight; } public static SparkConf registerClasses(SparkConf conf) { @@ -681,6 +681,42 @@ public class HoodieWriteClient implements Seriali logger.info("Savepoint " + savepointTime + " deleted"); } + /** + * Delete a compaction request that is pending. + * + * NOTE - This is an Admin operation. + * With async compaction, this is expected to be called only after async compaction and ingestion are shut down. + * Otherwise, the async compactor could fail with errors. + * + * @param compactionTime - instant time of the pending compaction to delete + * @throws IllegalArgumentException if no pending compaction exists for the given time + */ + private void deletePendingCompaction(String compactionTime) { + HoodieTable table = HoodieTable.getHoodieTable( + new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, jsc); + HoodieActiveTimeline activeTimeline = table.getActiveTimeline(); + + HoodieInstant compactionRequestedInstant = + new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, compactionTime); + HoodieInstant compactionInflightInstant = + new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionTime); + boolean isCompactionInstantInRequestedState = table.getActiveTimeline().filterPendingCompactionTimeline() + .containsInstant(compactionRequestedInstant); + boolean isCompactionInstantInInflightState = table.getActiveTimeline().filterPendingCompactionTimeline() + .containsInstant(compactionInflightInstant); + + if (isCompactionInstantInRequestedState) { + 
activeTimeline.deleteCompactionRequested(compactionRequestedInstant); + } else if (isCompactionInstantInInflightState) { + activeTimeline.revertCompactionInflightToRequested(compactionInflightInstant); + activeTimeline.deleteCompactionRequested(compactionRequestedInstant); + } else { + logger.error("No compaction present " + compactionTime); + throw new IllegalArgumentException("No compaction present " + compactionTime); + } + logger.info("Compaction " + compactionTime + " deleted"); + } + /** * Rollback the state to the savepoint. WARNING: This rollsback recent commits and deleted data * files. Queries accessing the files will mostly fail. This should be done during a downtime. @@ -692,6 +728,11 @@ public class HoodieWriteClient implements Seriali HoodieTable table = HoodieTable.getHoodieTable( new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, jsc); HoodieActiveTimeline activeTimeline = table.getActiveTimeline(); + + // Rollback to savepoint is expected to be a manual operation and no concurrent ingestion or compaction is expected + // to be running. Rollback to savepoint also removes any pending compaction actions that are generated after + // savepoint time. 
Allowing pending compaction to be retained is not safe as those workloads could be referencing + // file-slices that will be rolled-back as part of this operation HoodieTimeline commitTimeline = table.getMetaClient().getCommitsAndCompactionTimeline(); HoodieInstant savePoint = new HoodieInstant(false, HoodieTimeline.SAVEPOINT_ACTION, @@ -740,8 +781,10 @@ public class HoodieWriteClient implements Seriali // Create a Hoodie table which encapsulated the commits and files visible HoodieTable table = HoodieTable.getHoodieTable( new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, jsc); - HoodieActiveTimeline activeTimeline = table.getActiveTimeline(); - HoodieTimeline inflightTimeline = table.getInflightCommitTimeline(); + Set pendingCompactions = + table.getActiveTimeline().filterPendingCompactionTimeline().getInstants() + .map(HoodieInstant::getTimestamp).collect(Collectors.toSet()); + HoodieTimeline inflightCommitTimeline = table.getInflightCommitTimeline(); HoodieTimeline commitTimeline = table.getCompletedCommitTimeline(); // Check if any of the commits is a savepoint - do not allow rollback on those commits @@ -755,37 +798,45 @@ public class HoodieWriteClient implements Seriali } }); + List pendingCompactionToRollback = + commits.stream().filter(c -> pendingCompactions.contains(c)).collect(Collectors.toList()); + List commitsToRollback = + commits.stream().filter(c -> !pendingCompactions.contains(c)).collect(Collectors.toList()); + try { - if (commitTimeline.empty() && inflightTimeline.empty()) { + if (commitTimeline.empty() && inflightCommitTimeline.empty()) { // nothing to rollback - logger.info("No commits to rollback " + commits); + logger.info("No commits to rollback " + commitsToRollback); } // Make sure only the last n commits are being rolled back // If there is a commit in-between or after that is not rolled back, then abort - String lastCommit = commits.get(commits.size() - 1); + String lastCommit = 
commitsToRollback.get(commitsToRollback.size() - 1); if (!commitTimeline.empty() && !commitTimeline .findInstantsAfter(lastCommit, Integer.MAX_VALUE).empty()) { throw new HoodieRollbackException( "Found commits after time :" + lastCommit + ", please rollback greater commits first"); } - List inflights = inflightTimeline.getInstants().map(HoodieInstant::getTimestamp) + List inflights = inflightCommitTimeline.getInstants().map(HoodieInstant::getTimestamp) .collect(Collectors.toList()); if (!inflights.isEmpty() && inflights.indexOf(lastCommit) != inflights.size() - 1) { throw new HoodieRollbackException("Found in-flight commits after time :" + lastCommit + ", please rollback greater commits first"); } - List stats = table.rollback(jsc, commits); + // Remove interleaving pending compactions before rolling back commits + pendingCompactionToRollback.stream().forEach(this::deletePendingCompaction); + + List stats = table.rollback(jsc, commitsToRollback); // cleanup index entries - commits.stream().forEach(s -> { + commitsToRollback.stream().forEach(s -> { if (!index.rollbackCommit(s)) { throw new HoodieRollbackException("Rollback index changes failed, for time :" + s); } }); - logger.info("Index rolled back for commits " + commits); + logger.info("Index rolled back for commits " + commitsToRollback); Optional durationInMs = Optional.empty(); if (context != null) { @@ -795,11 +846,11 @@ public class HoodieWriteClient implements Seriali metrics.updateRollbackMetrics(durationInMs.get(), numFilesDeleted); } HoodieRollbackMetadata rollbackMetadata = AvroUtils - .convertRollbackMetadata(startRollbackTime, durationInMs, commits, stats); + .convertRollbackMetadata(startRollbackTime, durationInMs, commitsToRollback, stats); table.getActiveTimeline().saveAsComplete( new HoodieInstant(true, HoodieTimeline.ROLLBACK_ACTION, startRollbackTime), AvroUtils.serializeRollbackMetadata(rollbackMetadata)); - logger.info("Commits " + commits + " rollback is complete"); + logger.info("Commits " 
+ commitsToRollback + " rollback is complete"); if (!table.getActiveTimeline().getCleanerTimeline().empty()) { logger.info("Cleaning up older rollback meta files"); @@ -810,7 +861,7 @@ public class HoodieWriteClient implements Seriali } } catch (IOException e) { throw new HoodieRollbackException( - "Failed to rollback " + config.getBasePath() + " commits " + commits, e); + "Failed to rollback " + config.getBasePath() + " commits " + commitsToRollback, e); } } @@ -890,6 +941,10 @@ public class HoodieWriteClient implements Seriali } public void startCommitWithTime(String commitTime) { + if (rollbackInFlight) { + // Only rollback inflight commit/delta-commits. Do not touch compaction commits + rollbackInflightCommits(); + } logger.info("Generate a new commit time " + commitTime); HoodieTable table = HoodieTable.getHoodieTable( new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, jsc); @@ -1061,6 +1116,7 @@ public class HoodieWriteClient implements Seriali } private HoodieTable getTableAndInitCtx() { + // Create a Hoodie table which encapsulated the commits and files visible // Create a Hoodie table which encapsulated the commits and files visible HoodieTable table = HoodieTable.getHoodieTable( new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, jsc); diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCleanHelper.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCleanHelper.java index 2f7006fe4..e7864d496 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCleanHelper.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCleanHelper.java @@ -16,6 +16,7 @@ package com.uber.hoodie.io; +import com.uber.hoodie.common.model.CompactionOperation; import com.uber.hoodie.common.model.FileSlice; import com.uber.hoodie.common.model.HoodieCleaningPolicy; import com.uber.hoodie.common.model.HoodieDataFile; @@ -25,14 +26,17 @@ import 
com.uber.hoodie.common.model.HoodieTableType; import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.TableFileSystemView; import com.uber.hoodie.common.table.timeline.HoodieInstant; +import com.uber.hoodie.common.table.view.HoodieTableFileSystemView; import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.table.HoodieTable; import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; +import org.apache.commons.lang3.tuple.Pair; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -48,6 +52,7 @@ public class HoodieCleanHelper> { private final TableFileSystemView fileSystemView; private final HoodieTimeline commitTimeline; + private final Map fileIdToPendingCompactionOperations; private HoodieTable hoodieTable; private HoodieWriteConfig config; @@ -56,9 +61,12 @@ public class HoodieCleanHelper> { this.fileSystemView = hoodieTable.getCompletedFileSystemView(); this.commitTimeline = hoodieTable.getCompletedCommitTimeline(); this.config = config; + this.fileIdToPendingCompactionOperations = + ((HoodieTableFileSystemView)hoodieTable.getRTFileSystemView()).getFileIdToPendingCompaction().entrySet() + .stream().map(entry -> Pair.of(entry.getKey(), entry.getValue().getValue())) + .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); } - /** * Selects the older versions of files for cleaning, such that it bounds the number of versions of * each file. 
This policy is useful, if you are simply interested in querying the table, and you @@ -81,8 +89,8 @@ public class HoodieCleanHelper> { while (fileSliceIterator.hasNext() && keepVersions > 0) { // Skip this most recent version FileSlice nextSlice = fileSliceIterator.next(); - HoodieDataFile dataFile = nextSlice.getDataFile().get(); - if (savepointedFiles.contains(dataFile.getFileName())) { + Optional dataFile = nextSlice.getDataFile(); + if (dataFile.isPresent() && savepointedFiles.contains(dataFile.get().getFileName())) { // do not clean up a savepoint data file continue; } @@ -91,12 +99,16 @@ public class HoodieCleanHelper> { // Delete the remaining files while (fileSliceIterator.hasNext()) { FileSlice nextSlice = fileSliceIterator.next(); - HoodieDataFile dataFile = nextSlice.getDataFile().get(); - deletePaths.add(dataFile.getFileStatus().getPath().toString()); - if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) { - // If merge on read, then clean the log files for the commits as well - deletePaths.addAll(nextSlice.getLogFiles().map(file -> file.getPath().toString()) - .collect(Collectors.toList())); + if (!isFileSliceNeededForPendingCompaction(nextSlice)) { + if (nextSlice.getDataFile().isPresent()) { + HoodieDataFile dataFile = nextSlice.getDataFile().get(); + deletePaths.add(dataFile.getFileStatus().getPath().toString()); + } + if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) { + // If merge on read, then clean the log files for the commits as well + deletePaths.addAll(nextSlice.getLogFiles().map(file -> file.getPath().toString()) + .collect(Collectors.toList())); + } } } } @@ -133,17 +145,21 @@ public class HoodieCleanHelper> { .collect(Collectors.toList()); for (HoodieFileGroup fileGroup : fileGroups) { List fileSliceList = fileGroup.getAllFileSlices().collect(Collectors.toList()); - HoodieDataFile dataFile = fileSliceList.get(0).getDataFile().get(); - String lastVersion = 
dataFile.getCommitTime(); + + if (fileSliceList.isEmpty()) { + continue; + } + + String lastVersion = fileSliceList.get(0).getBaseInstantTime(); String lastVersionBeforeEarliestCommitToRetain = getLatestVersionBeforeCommit(fileSliceList, earliestCommitToRetain); // Ensure there are more than 1 version of the file (we only clean old files from updates) // i.e always spare the last commit. for (FileSlice aSlice : fileSliceList) { - HoodieDataFile aFile = aSlice.getDataFile().get(); - String fileCommitTime = aFile.getCommitTime(); - if (savepointedFiles.contains(aFile.getFileName())) { + Optional aFile = aSlice.getDataFile(); + String fileCommitTime = aSlice.getBaseInstantTime(); + if (aFile.isPresent() && savepointedFiles.contains(aFile.get().getFileName())) { // do not clean up a savepoint data file continue; } @@ -159,11 +175,14 @@ public class HoodieCleanHelper> { } // Always keep the last commit - if (HoodieTimeline + if (!isFileSliceNeededForPendingCompaction(aSlice) + && HoodieTimeline .compareTimestamps(earliestCommitToRetain.getTimestamp(), fileCommitTime, HoodieTimeline.GREATER)) { // this is a commit, that should be cleaned. 
- deletePaths.add(aFile.getFileStatus().getPath().toString()); + if (aFile.isPresent()) { + deletePaths.add(aFile.get().getFileStatus().getPath().toString()); + } if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) { // If merge on read, then clean the log files for the commits as well deletePaths.addAll(aSlice.getLogFiles().map(file -> file.getPath().toString()) @@ -183,7 +202,7 @@ public class HoodieCleanHelper> { private String getLatestVersionBeforeCommit(List fileSliceList, HoodieInstant commitTime) { for (FileSlice file : fileSliceList) { - String fileCommitTime = file.getDataFile().get().getCommitTime(); + String fileCommitTime = file.getBaseInstantTime(); if (HoodieTimeline .compareTimestamps(commitTime.getTimestamp(), fileCommitTime, HoodieTimeline.GREATER)) { // fileList is sorted on the reverse, so the first commit we find <= commitTime is the @@ -226,4 +245,19 @@ public class HoodieCleanHelper> { } return earliestCommitToRetain; } -} + + /** + * Determine if the file slice needs to be preserved for a pending compaction + * @param fileSlice File Slice + * @return true if file slice needs to be preserved, false otherwise. 
+ */ + private boolean isFileSliceNeededForPendingCompaction(FileSlice fileSlice) { + CompactionOperation op = fileIdToPendingCompactionOperations.get(fileSlice.getFileId()); + if (null != op) { + // If file slice's instant time is newer or same as that of operation, do not clean + return HoodieTimeline.compareTimestamps(fileSlice.getBaseInstantTime(), op.getBaseInstantTime(), + HoodieTimeline.GREATER_OR_EQUAL); + } + return false; + } +} \ No newline at end of file diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCommitArchiveLog.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCommitArchiveLog.java index bf78467ff..bf5febd1a 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCommitArchiveLog.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCommitArchiveLog.java @@ -16,6 +16,10 @@ package com.uber.hoodie.io; +import static com.uber.hoodie.common.table.HoodieTimeline.COMMIT_ACTION; +import static com.uber.hoodie.common.table.HoodieTimeline.DELTA_COMMIT_ACTION; +import static com.uber.hoodie.common.table.HoodieTimeline.LESSER_OR_EQUAL; + import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Maps; @@ -32,6 +36,7 @@ import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.log.HoodieLogFormat; import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock; import com.uber.hoodie.common.table.log.block.HoodieLogBlock; +import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline; import com.uber.hoodie.common.table.timeline.HoodieArchivedTimeline; import com.uber.hoodie.common.table.timeline.HoodieInstant; import com.uber.hoodie.common.util.AvroUtils; @@ -42,6 +47,7 @@ import com.uber.hoodie.exception.HoodieIOException; import com.uber.hoodie.table.HoodieTable; import java.io.IOException; import java.util.ArrayList; +import java.util.Comparator; import java.util.List; import 
java.util.Map; import java.util.Optional; @@ -100,7 +106,7 @@ public class HoodieCommitArchiveLog { /** * Check if commits need to be archived. If yes, archive commits. */ - public boolean archiveIfRequired(final JavaSparkContext jsc) { + public boolean archiveIfRequired(final JavaSparkContext jsc) throws IOException { try { List instantsToArchive = getInstantsToArchive(jsc).collect(Collectors.toList()); boolean success = true; @@ -144,23 +150,34 @@ public class HoodieCommitArchiveLog { //TODO (na) : Add a way to return actions associated with a timeline and then merge/unify // with logic above to avoid Stream.concats HoodieTimeline commitTimeline = table.getCompletedCommitTimeline(); + Optional oldestPendingCompactionInstant = + table.getActiveTimeline().filterPendingCompactionTimeline().firstInstant(); + // We cannot have any holes in the commit timeline. We cannot archive any commits which are // made after the first savepoint present. Optional firstSavepoint = table.getCompletedSavepointTimeline().firstInstant(); if (!commitTimeline.empty() && commitTimeline.countInstants() > maxCommitsToKeep) { // Actually do the commits - instants = Stream.concat(instants, commitTimeline.getInstants().filter(s -> { - // if no savepoint present, then dont filter - return !(firstSavepoint.isPresent() && HoodieTimeline - .compareTimestamps(firstSavepoint.get().getTimestamp(), s.getTimestamp(), - HoodieTimeline.LESSER_OR_EQUAL)); - }).limit(commitTimeline.countInstants() - minCommitsToKeep)); + instants = Stream.concat(instants, commitTimeline.getInstants() + .filter(s -> { + // if no savepoint present, then dont filter + return !(firstSavepoint.isPresent() && HoodieTimeline + .compareTimestamps(firstSavepoint.get().getTimestamp(), s.getTimestamp(), + HoodieTimeline.LESSER_OR_EQUAL)); + }) + .filter(s -> { + // Ensure commits >= oldest pending compaction commit is retained + return oldestPendingCompactionInstant.map(instant -> { + return 
HoodieTimeline.compareTimestamps(instant.getTimestamp(), s.getTimestamp(), HoodieTimeline.GREATER); + }).orElse(true); + }) + .limit(commitTimeline.countInstants() - minCommitsToKeep)); } return instants; } - private boolean deleteArchivedInstants(List archivedInstants) { + private boolean deleteArchivedInstants(List archivedInstants) throws IOException { log.info("Deleting instants " + archivedInstants); boolean success = true; for (HoodieInstant archivedInstant : archivedInstants) { @@ -174,6 +191,48 @@ public class HoodieCommitArchiveLog { throw new HoodieIOException("Failed to delete archived instant " + archivedInstant, e); } } + + // Remove older meta-data from auxiliary path too + Optional latestCommitted = + archivedInstants.stream() + .filter(i -> { + return i.isCompleted() + && (i.getAction().equals(COMMIT_ACTION) || (i.getAction().equals(DELTA_COMMIT_ACTION))); + }) + .sorted(Comparator.comparing(HoodieInstant::getTimestamp).reversed()).findFirst(); + if (latestCommitted.isPresent()) { + success &= deleteAllInstantsOlderorEqualsInAuxMetaFolder(latestCommitted.get()); + } + return success; + } + + /** + * Remove instants older than or equal to the threshold instant from the auxiliary meta folder + * + * @param thresholdInstant instants with a timestamp lesser than or equal to this instant's are deleted + * @return true if all eligible files were deleted successfully + * @throws IOException in case of error + */ + private boolean deleteAllInstantsOlderorEqualsInAuxMetaFolder(HoodieInstant thresholdInstant) + throws IOException { + List instants = + HoodieTableMetaClient.scanHoodieInstantsFromFileSystem(metaClient.getFs(), + new Path(metaClient.getMetaAuxiliaryPath()), + HoodieActiveTimeline.VALID_EXTENSIONS_IN_ACTIVE_TIMELINE); + + List instantsToBeDeleted = + instants.stream().filter(instant1 -> HoodieTimeline.compareTimestamps(instant1.getTimestamp(), + thresholdInstant.getTimestamp(), LESSER_OR_EQUAL)).collect(Collectors.toList()); + + boolean success = true; + for (HoodieInstant deleteInstant : instantsToBeDeleted) { + log.info("Deleting instant " + 
deleteInstant + " in auxiliary meta path " + metaClient.getMetaAuxiliaryPath()); + Path metaFile = new Path(metaClient.getMetaAuxiliaryPath(), deleteInstant.getFileName()); + if (metaClient.getFs().exists(metaFile)) { + success &= metaClient.getFs().delete(metaFile, false); + log.info("Deleted instant file in auxiliary metapath : " + metaFile); + } + } return success; } @@ -212,7 +271,7 @@ public class HoodieCommitArchiveLog { archivedMetaWrapper.setActionType(ActionType.clean.name()); break; } - case HoodieTimeline.COMMIT_ACTION: { + case COMMIT_ACTION: { HoodieCommitMetadata commitMetadata = HoodieCommitMetadata .fromBytes(commitTimeline.getInstantDetails(hoodieInstant).get()); archivedMetaWrapper.setHoodieCommitMetadata(commitMetadataConverter(commitMetadata)); diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestCleaner.java b/hoodie-client/src/test/java/com/uber/hoodie/TestCleaner.java index a05cbc7b6..2408333be 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/TestCleaner.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/TestCleaner.java @@ -19,12 +19,16 @@ package com.uber.hoodie; import static com.uber.hoodie.common.HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH; import static com.uber.hoodie.common.HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH; import static com.uber.hoodie.common.HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH; +import static com.uber.hoodie.common.table.HoodieTimeline.COMPACTION_ACTION; +import static com.uber.hoodie.common.table.HoodieTimeline.GREATER; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import com.google.common.collect.Iterables; +import com.uber.hoodie.avro.model.HoodieCompactionPlan; import com.uber.hoodie.common.HoodieCleanStat; +import com.uber.hoodie.common.model.FileSlice; import com.uber.hoodie.common.model.HoodieCleaningPolicy; import com.uber.hoodie.common.model.HoodieCommitMetadata; import 
com.uber.hoodie.common.model.HoodieDataFile; @@ -38,12 +42,16 @@ import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.TableFileSystemView; import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline; import com.uber.hoodie.common.table.timeline.HoodieInstant; +import com.uber.hoodie.common.table.timeline.HoodieInstant.State; +import com.uber.hoodie.common.util.AvroUtils; +import com.uber.hoodie.common.util.CompactionUtils; import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.config.HoodieCompactionConfig; import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.index.HoodieIndex; import com.uber.hoodie.table.HoodieTable; import java.io.IOException; +import java.nio.file.Paths; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -53,7 +61,10 @@ import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.TreeSet; +import java.util.function.Predicate; import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -61,6 +72,7 @@ import org.apache.spark.api.java.JavaRDD; import org.apache.spark.scheduler.SparkListener; import org.apache.spark.scheduler.SparkListenerTaskEnd; import org.apache.spark.util.AccumulatorV2; +import org.junit.Assert; import org.junit.Test; import scala.Option; import scala.collection.Iterator; @@ -82,7 +94,7 @@ public class TestCleaner extends TestHoodieClientBase { * @param insertFn Insertion API for testing * @throws Exception in case of error */ - private void insertFirstBigBatchForClientCleanerTest( + private String insertFirstBigBatchForClientCleanerTest( HoodieWriteConfig cfg, HoodieWriteClient client, Function2, String, Integer> recordGenFunction, @@ -118,6 +130,7 @@ public class TestCleaner extends TestHoodieClientBase { HoodieIndex index = 
HoodieIndex.createIndex(cfg, jsc); List taggedRecords = index.tagLocation(jsc.parallelize(records, 1), jsc, table).collect(); checkTaggedRecords(taggedRecords, newCommitTime); + return newCommitTime; } /** @@ -185,21 +198,51 @@ public class TestCleaner extends TestHoodieClientBase { insertFirstBigBatchForClientCleanerTest(cfg, client, recordInsertGenWrappedFunction, insertFn); + Map selectedFileIdForCompaction = new HashMap<>(); + Map compactionFileIdToLatestFileSlice = new HashMap<>(); + HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); + HoodieTable table = HoodieTable.getHoodieTable(metadata, getConfig(), jsc); + for (String partitionPath : dataGen.getPartitionPaths()) { + TableFileSystemView fsView = table.getFileSystemView(); + Optional added = fsView.getAllFileGroups(partitionPath).findFirst() + .map(fg -> { + selectedFileIdForCompaction.put(fg.getId(), partitionPath); + fg.getLatestFileSlice().map(fs -> compactionFileIdToLatestFileSlice.put(fg.getId(), fs)); + return true; + }); + if (added.isPresent()) { + // Select only one file-group for compaction + break; + } + } + + // Create workload with selected file-slices + List> partitionFileSlicePairs = compactionFileIdToLatestFileSlice.entrySet().stream() + .map(e -> Pair.of(selectedFileIdForCompaction.get(e.getKey()), e.getValue())).collect(Collectors.toList()); + HoodieCompactionPlan compactionPlan = + CompactionUtils.buildFromFileSlices(partitionFileSlicePairs, Optional.empty(), Optional.empty()); + List instantTimes = HoodieTestUtils.monotonicIncreasingCommitTimestamps(9, 1); + String compactionTime = instantTimes.get(0); + table.getActiveTimeline().saveToCompactionRequested( + new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, compactionTime), + AvroUtils.serializeCompactionPlan(compactionPlan)); + + instantTimes = instantTimes.subList(1, instantTimes.size()); // Keep doing some writes and clean inline. 
Make sure we have expected number of files // remaining. - HoodieTestUtils.monotonicIncreasingCommitTimestamps(8, 1).stream().forEach(newCommitTime -> { + for (String newInstantTime : instantTimes) { try { - client.startCommitWithTime(newCommitTime); - List records = recordUpsertGenWrappedFunction.apply(newCommitTime, 100); + client.startCommitWithTime(newInstantTime); + List records = recordUpsertGenWrappedFunction.apply(newInstantTime, 100); List statuses = - upsertFn.apply(client, jsc.parallelize(records, 1), newCommitTime).collect(); + upsertFn.apply(client, jsc.parallelize(records, 1), newInstantTime).collect(); // Verify there are no errors assertNoWriteErrors(statuses); - HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); - HoodieTable table = HoodieTable.getHoodieTable(metadata, getConfig(), jsc); - HoodieTimeline timeline = metadata.getCommitsTimeline(); + metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); + table = HoodieTable.getHoodieTable(metadata, getConfig(), jsc); + HoodieTimeline timeline = table.getMetaClient().getCommitsTimeline(); TableFileSystemView fsView = table.getFileSystemView(); // Need to ensure the following @@ -221,25 +264,39 @@ public class TestCleaner extends TestHoodieClientBase { List fileGroups = fsView.getAllFileGroups(partitionPath).collect(Collectors.toList()); for (HoodieFileGroup fileGroup : fileGroups) { - // No file has no more than max versions - String fileId = fileGroup.getId(); - List dataFiles = fileGroup.getAllDataFiles().collect(Collectors.toList()); + if (selectedFileIdForCompaction.containsKey(fileGroup.getId())) { + // Ensure latest file-slice selected for compaction is retained + String oldestCommitRetained = + fileGroup.getAllDataFiles().map(HoodieDataFile::getCommitTime).sorted().findFirst().get(); - assertTrue("fileId " + fileId + " has more than " + maxVersions + " versions", - dataFiles.size() <= maxVersions); + Optional 
dataFileForCompactionPresent = + fileGroup.getAllDataFiles().filter(df -> { + return compactionFileIdToLatestFileSlice.get(fileGroup.getId()) + .getBaseInstantTime().equals(df.getCommitTime()); + }).findAny(); + Assert.assertTrue("Data File selected for compaction is retained", + dataFileForCompactionPresent.isPresent()); + } else { + // file has no more than max versions + String fileId = fileGroup.getId(); + List dataFiles = fileGroup.getAllDataFiles().collect(Collectors.toList()); - // Each file, has the latest N versions (i.e cleaning gets rid of older versions) - List commitedVersions = new ArrayList<>(fileIdToVersions.get(fileId)); - for (int i = 0; i < dataFiles.size(); i++) { - assertEquals("File " + fileId + " does not have latest versions on commits" + commitedVersions, - Iterables.get(dataFiles, i).getCommitTime(), commitedVersions.get(commitedVersions.size() - 1 - i)); + assertTrue("fileId " + fileId + " has more than " + maxVersions + " versions", + dataFiles.size() <= maxVersions); + + // Each file, has the latest N versions (i.e cleaning gets rid of older versions) + List commitedVersions = new ArrayList<>(fileIdToVersions.get(fileId)); + for (int i = 0; i < dataFiles.size(); i++) { + assertEquals("File " + fileId + " does not have latest versions on commits" + commitedVersions, + Iterables.get(dataFiles, i).getCommitTime(), commitedVersions.get(commitedVersions.size() - 1 - i)); + } } } } } catch (IOException ioe) { throw new RuntimeException(ioe); } - }); + } } /** @@ -679,6 +736,168 @@ public class TestCleaner extends TestHoodieClientBase { stageOneShuffleReadTaskRecordsCountMap.values().stream().filter(a -> a > 10 && a < 100).count() == 3); } + + /** + * Test Keep Latest Commits when there are pending compactions + */ + @Test + public void testKeepLatestCommitsWithPendingCompactions() throws IOException { + HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).withAssumeDatePartitioning(true) + 
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withCleanerPolicy( + HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(2).build()).build(); + // Deletions: + // . FileId Parquet Logs Total Retained Commits + // FileId7 5 10 15 009, 011 + // FileId6 5 10 15 009 + // FileId5 3 6 9 005 + // FileId4 2 4 6 003 + // FileId3 1 2 3 001 + // FileId2 0 0 0 000 + // FileId1 0 0 0 000 + testPendingCompactions(config, 48, 18); + } + + /** + * Test Keep Latest Versions when there are pending compactions + */ + @Test + public void testKeepLatestVersionsWithPendingCompactions() throws IOException { + HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).withAssumeDatePartitioning(true) + .withCompactionConfig(HoodieCompactionConfig.newBuilder().withCleanerPolicy( + HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(2).build()).build(); + // Deletions: + // . FileId Parquet Logs Total Retained Commits + // FileId7 5 10 15 009, 011 + // FileId6 4 8 12 007, 009 + // FileId5 2 4 6 003, 005 + // FileId4 1 2 3 001, 003 + // FileId3 0 0 0 000, 001 + // FileId2 0 0 0 000 + // FileId1 0 0 0 000 + testPendingCompactions(config, 36, 9); + } + + /** + * Common test method for validating pending compactions + * + * @param config Hoodie Write Config + * @param expNumFilesDeleted Expected total number of files deleted by the cleaner + */ + public void testPendingCompactions(HoodieWriteConfig config, int expNumFilesDeleted, + int expNumFilesUnderCompactionDeleted) throws IOException { + HoodieTableMetaClient metaClient = HoodieTestUtils.initTableType(jsc.hadoopConfiguration(), basePath, + HoodieTableType.MERGE_ON_READ); + String[] instants = new String[]{"000", "001", "003", "005", "007", "009", "011", "013"}; + String[] compactionInstants = new String[]{"002", "004", "006", "008", "010"}; + Map expFileIdToPendingCompaction = new HashMap<>(); + Map fileIdToLatestInstantBeforeCompaction = new HashMap<>(); + Map> compactionInstantsToFileSlices = new HashMap<>(); + + for
(String instant : instants) { + HoodieTestUtils.createCommitFiles(basePath, instant); + } + + // Generate 7 file-groups. First one has only one slice and no pending compaction. File groups (2 - 5) have + // multiple versions with pending compaction. File groups (6 - 7) have multiple file-slices but are not under + // compaction + // FileIds 2-5 will be under compaction + int maxNumFileIds = 7; + String[] fileIds = new String[] + {"fileId1", "fileId2", "fileId3", "fileId4", "fileId5", "fileId6", "fileId7"}; + int maxNumFileIdsForCompaction = 4; + for (int i = 0; i < maxNumFileIds; i++) { + final String fileId = HoodieTestUtils.createDataFile(basePath, DEFAULT_FIRST_PARTITION_PATH, instants[0], + fileIds[i]); + HoodieTestUtils.createNewLogFile(fs, basePath, DEFAULT_FIRST_PARTITION_PATH, instants[0], + fileId, Optional.empty()); + HoodieTestUtils.createNewLogFile(fs, basePath, DEFAULT_FIRST_PARTITION_PATH, instants[0], + fileId, Optional.of(2)); + fileIdToLatestInstantBeforeCompaction.put(fileId, instants[0]); + for (int j = 1; j <= i; j++) { + if (j == i && j <= maxNumFileIdsForCompaction) { + expFileIdToPendingCompaction.put(fileId, compactionInstants[j]); + HoodieTable table = HoodieTable.getHoodieTable( + new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, + jsc); + FileSlice slice = table.getRTFileSystemView().getLatestFileSlices(DEFAULT_FIRST_PARTITION_PATH) + .filter(fs -> fs.getFileId().equals(fileId)).findFirst().get(); + List slices = new ArrayList<>(); + if (compactionInstantsToFileSlices.containsKey(compactionInstants[j])) { + slices = compactionInstantsToFileSlices.get(compactionInstants[j]); + } + slices.add(slice); + compactionInstantsToFileSlices.put(compactionInstants[j], slices); + // Add log-files to simulate delta-commits after pending compaction + HoodieTestUtils.createNewLogFile(fs, basePath, DEFAULT_FIRST_PARTITION_PATH, compactionInstants[j], + fileId, Optional.empty()); + HoodieTestUtils.createNewLogFile(fs,
basePath, DEFAULT_FIRST_PARTITION_PATH, compactionInstants[j], + fileId, Optional.of(2)); + } else { + HoodieTestUtils.createDataFile(basePath, DEFAULT_FIRST_PARTITION_PATH, instants[j], fileId); + HoodieTestUtils.createNewLogFile(fs, basePath, DEFAULT_FIRST_PARTITION_PATH, instants[j], fileId, + Optional.empty()); + HoodieTestUtils.createNewLogFile(fs, basePath, DEFAULT_FIRST_PARTITION_PATH, instants[j], fileId, + Optional.of(2)); + fileIdToLatestInstantBeforeCompaction.put(fileId, instants[j]); + } + } + } + + // Setup pending compaction plans + for (String instant : compactionInstants) { + List fileSliceList = compactionInstantsToFileSlices.get(instant); + if (null != fileSliceList) { + HoodieTestUtils.createCompactionRequest(metaClient, instant, + fileSliceList.stream().map(fs -> Pair.of(DEFAULT_FIRST_PARTITION_PATH, fs)).collect(Collectors.toList())); + } + } + + // Clean now + HoodieTable table = HoodieTable.getHoodieTable( + new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, + jsc); + List hoodieCleanStats = table.clean(jsc); + + // Test for safety + final HoodieTable hoodieTable = HoodieTable.getHoodieTable( + new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, + jsc); + + expFileIdToPendingCompaction.entrySet().stream().forEach(entry -> { + String fileId = entry.getKey(); + String baseInstantForCompaction = fileIdToLatestInstantBeforeCompaction.get(fileId); + Optional fileSliceForCompaction = + hoodieTable.getRTFileSystemView().getLatestFileSlicesBeforeOrOn(DEFAULT_FIRST_PARTITION_PATH, + baseInstantForCompaction).filter(fs -> fs.getFileId().equals(fileId)).findFirst(); + Assert.assertTrue("Base Instant for Compaction must be preserved", fileSliceForCompaction.isPresent()); + Assert.assertTrue("FileSlice has data-file", fileSliceForCompaction.get().getDataFile().isPresent()); + Assert.assertEquals("FileSlice has log-files", 2, + fileSliceForCompaction.get().getLogFiles().count()); 
+ }); + + // Test for progress (Did we clean some files ?) + long numFilesUnderCompactionDeleted = + hoodieCleanStats.stream().flatMap(cleanStat -> { + return convertPathToFileIdWithCommitTime(metaClient, cleanStat.getDeletePathPatterns()).map( + fileIdWithCommitTime -> { + if (expFileIdToPendingCompaction.containsKey(fileIdWithCommitTime.getKey())) { + Assert.assertTrue("Deleted instant time must be less than pending compaction", + HoodieTimeline.compareTimestamps( + fileIdToLatestInstantBeforeCompaction.get(fileIdWithCommitTime.getKey()), + fileIdWithCommitTime.getValue(), GREATER)); + return true; + } + return false; + }); + }).filter(x -> x).count(); + long numDeleted = hoodieCleanStats.stream() + .flatMap(cleanStat -> cleanStat.getDeletePathPatterns().stream()).count(); + // Tighter check for regression + Assert.assertEquals("Correct number of files deleted", expNumFilesDeleted, numDeleted); + Assert.assertEquals("Correct number of files under compaction deleted", + expNumFilesUnderCompactionDeleted, numFilesUnderCompactionDeleted); + } + /** * Utility method to create temporary data files * @@ -703,4 +922,23 @@ public class TestCleaner extends TestHoodieClientBase { private int getTotalTempFiles() throws IOException { return fs.listStatus(new Path(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME)).length; } + + private Stream> convertPathToFileIdWithCommitTime( + final HoodieTableMetaClient metaClient, List paths) { + Predicate roFilePredicate = path -> + path.contains(metaClient.getTableConfig().getROFileFormat().getFileExtension()); + Predicate rtFilePredicate = path -> + path.contains(metaClient.getTableConfig().getRTFileFormat().getFileExtension()); + Stream> stream1 = paths.stream().filter(roFilePredicate) + .map(fullPath -> { + String fileName = Paths.get(fullPath).getFileName().toString(); + return Pair.of(FSUtils.getFileId(fileName), FSUtils.getCommitTime(fileName)); + }); + Stream> stream2 = paths.stream().filter(rtFilePredicate) + .map(path -> { + 
return Pair.of(FSUtils.getFileIdFromLogPath(new Path(path)), + FSUtils.getBaseCommitTimeFromLogPath(new Path(path))); + }); + return Stream.concat(stream1, stream2); + } } diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestClientRollback.java b/hoodie-client/src/test/java/com/uber/hoodie/TestClientRollback.java index 68e3e9f0b..2c8860219 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/TestClientRollback.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/TestClientRollback.java @@ -303,7 +303,7 @@ public class TestClientRollback extends TestHoodieClientBase { && HoodieTestUtils.doesDataFileExist(basePath, "2016/05/06", commitTime1, file13)); // Turn auto rollback on - new HoodieWriteClient(jsc, config, true); + new HoodieWriteClient(jsc, config, true).startCommit(); assertTrue(HoodieTestUtils.doesCommitExist(basePath, commitTime1)); assertFalse(HoodieTestUtils.doesInflightExist(basePath, commitTime2)); assertFalse(HoodieTestUtils.doesInflightExist(basePath, commitTime3)); diff --git a/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieTestDataGenerator.java b/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieTestDataGenerator.java index 1d3f6a7ee..a36faef1c 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieTestDataGenerator.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieTestDataGenerator.java @@ -138,6 +138,15 @@ public class HoodieTestDataGenerator { } } + public static void createCompactionRequestedFile(String basePath, String commitTime, Configuration configuration) + throws IOException { + Path commitFile = new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + + HoodieTimeline.makeRequestedCompactionFileName(commitTime)); + FileSystem fs = FSUtils.getFs(basePath, configuration); + FSDataOutputStream os = fs.create(commitFile, true); + os.close(); + } + public static void createCompactionAuxiliaryMetadata(String basePath, HoodieInstant instant, Configuration 
configuration) throws IOException { Path commitFile = new Path( diff --git a/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCommitArchiveLog.java b/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCommitArchiveLog.java index 57d285688..6f0369180 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCommitArchiveLog.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCommitArchiveLog.java @@ -16,7 +16,9 @@ package com.uber.hoodie.io; +import static com.uber.hoodie.common.table.HoodieTimeline.COMPACTION_ACTION; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import com.google.common.collect.Sets; @@ -30,7 +32,9 @@ import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.log.HoodieLogFormat; import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock; +import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline; import com.uber.hoodie.common.table.timeline.HoodieInstant; +import com.uber.hoodie.common.table.timeline.HoodieInstant.State; import com.uber.hoodie.config.HoodieCompactionConfig; import com.uber.hoodie.config.HoodieWriteConfig; import java.io.File; @@ -133,11 +137,47 @@ public class TestHoodieCommitArchiveLog { HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 4).build()) .forTable("test-trip-table").build(); HoodieTestUtils.init(hadoopConf, basePath); + // Requested Compaction + HoodieTestDataGenerator.createCompactionAuxiliaryMetadata(basePath, + new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, "100"), dfs.getConf()); + // Inflight Compaction + HoodieTestDataGenerator.createCompactionAuxiliaryMetadata(basePath, + new HoodieInstant(State.INFLIGHT, COMPACTION_ACTION, "100"), dfs.getConf()); HoodieTestDataGenerator.createCommitFile(basePath, "100", dfs.getConf()); + // Requested Compaction + 
HoodieTestDataGenerator.createCompactionAuxiliaryMetadata(basePath, + new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, "101"), dfs.getConf()); + // Inflight Compaction + HoodieTestDataGenerator.createCompactionAuxiliaryMetadata(basePath, + new HoodieInstant(State.INFLIGHT, COMPACTION_ACTION, "101"), dfs.getConf()); HoodieTestDataGenerator.createCommitFile(basePath, "101", dfs.getConf()); + // Requested Compaction + HoodieTestDataGenerator.createCompactionAuxiliaryMetadata(basePath, + new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, "102"), dfs.getConf()); + // Inflight Compaction + HoodieTestDataGenerator.createCompactionAuxiliaryMetadata(basePath, + new HoodieInstant(State.INFLIGHT, COMPACTION_ACTION, "102"), dfs.getConf()); HoodieTestDataGenerator.createCommitFile(basePath, "102", dfs.getConf()); + // Requested Compaction + HoodieTestDataGenerator.createCompactionAuxiliaryMetadata(basePath, + new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, "103"), dfs.getConf()); + // Inflight Compaction + HoodieTestDataGenerator.createCompactionAuxiliaryMetadata(basePath, + new HoodieInstant(State.INFLIGHT, COMPACTION_ACTION, "103"), dfs.getConf()); HoodieTestDataGenerator.createCommitFile(basePath, "103", dfs.getConf()); + // Requested Compaction + HoodieTestDataGenerator.createCompactionAuxiliaryMetadata(basePath, + new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, "104"), dfs.getConf()); + // Inflight Compaction + HoodieTestDataGenerator.createCompactionAuxiliaryMetadata(basePath, + new HoodieInstant(State.INFLIGHT, COMPACTION_ACTION, "104"), dfs.getConf()); HoodieTestDataGenerator.createCommitFile(basePath, "104", dfs.getConf()); + // Requested Compaction + HoodieTestDataGenerator.createCompactionAuxiliaryMetadata(basePath, + new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, "105"), dfs.getConf()); + // Inflight Compaction + HoodieTestDataGenerator.createCompactionAuxiliaryMetadata(basePath, + new HoodieInstant(State.INFLIGHT, COMPACTION_ACTION, 
"105"), dfs.getConf()); HoodieTestDataGenerator.createCommitFile(basePath, "105", dfs.getConf()); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(dfs.getConf(), basePath); @@ -172,6 +212,37 @@ public class TestHoodieCommitArchiveLog { timeline = metaClient.getActiveTimeline().reload().getAllCommitsTimeline().filterCompletedInstants(); originalCommits.removeAll(timeline.getInstants().collect(Collectors.toList())); + // Check compaction instants + List instants = + HoodieTableMetaClient.scanHoodieInstantsFromFileSystem(metaClient.getFs(), + new Path(metaClient.getMetaAuxiliaryPath()), + HoodieActiveTimeline.VALID_EXTENSIONS_IN_ACTIVE_TIMELINE); + assertEquals("Should delete all compaction instants < 104", 4, instants.size()); + assertFalse("Requested Compaction must be absent for 100", instants.contains( + new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, "100"))); + assertFalse("Inflight Compaction must be absent for 100", instants.contains( + new HoodieInstant(State.INFLIGHT, COMPACTION_ACTION, "100"))); + assertFalse("Requested Compaction must be absent for 101", instants.contains( + new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, "101"))); + assertFalse("Inflight Compaction must be absent for 101", instants.contains( + new HoodieInstant(State.INFLIGHT, COMPACTION_ACTION, "101"))); + assertFalse("Requested Compaction must be absent for 102", instants.contains( + new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, "102"))); + assertFalse("Inflight Compaction must be absent for 102", instants.contains( + new HoodieInstant(State.INFLIGHT, COMPACTION_ACTION, "102"))); + assertFalse("Requested Compaction must be absent for 103", instants.contains( + new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, "103"))); + assertFalse("Inflight Compaction must be absent for 103", instants.contains( + new HoodieInstant(State.INFLIGHT, COMPACTION_ACTION, "103"))); + assertTrue("Requested Compaction must be present for 104", instants.contains( + new 
HoodieInstant(State.REQUESTED, COMPACTION_ACTION, "104"))); + assertTrue("Inflight Compaction must be present for 104", instants.contains( + new HoodieInstant(State.INFLIGHT, COMPACTION_ACTION, "104"))); + assertTrue("Requested Compaction must be present for 105", instants.contains( + new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, "105"))); + assertTrue("Inflight Compaction must be present for 105", instants.contains( + new HoodieInstant(State.INFLIGHT, COMPACTION_ACTION, "105"))); + //read the file HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(dfs, new HoodieLogFile(new Path(basePath + "/.hoodie/.commits_.archive.1")), @@ -210,9 +281,33 @@ public class TestHoodieCommitArchiveLog { HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 5).build()).build(); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(dfs.getConf(), basePath); HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient); + // Requested Compaction + HoodieTestDataGenerator.createCompactionAuxiliaryMetadata(basePath, + new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, "100"), dfs.getConf()); + // Inflight Compaction + HoodieTestDataGenerator.createCompactionAuxiliaryMetadata(basePath, + new HoodieInstant(State.INFLIGHT, COMPACTION_ACTION, "100"), dfs.getConf()); HoodieTestDataGenerator.createCommitFile(basePath, "100", dfs.getConf()); + // Requested Compaction + HoodieTestDataGenerator.createCompactionAuxiliaryMetadata(basePath, + new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, "101"), dfs.getConf()); + // Inflight Compaction + HoodieTestDataGenerator.createCompactionAuxiliaryMetadata(basePath, + new HoodieInstant(State.INFLIGHT, COMPACTION_ACTION, "101"), dfs.getConf()); HoodieTestDataGenerator.createCommitFile(basePath, "101", dfs.getConf()); + // Requested Compaction + HoodieTestDataGenerator.createCompactionAuxiliaryMetadata(basePath, + new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, "102"), dfs.getConf()); + // Inflight 
Compaction + HoodieTestDataGenerator.createCompactionAuxiliaryMetadata(basePath, + new HoodieInstant(State.INFLIGHT, COMPACTION_ACTION, "102"), dfs.getConf()); HoodieTestDataGenerator.createCommitFile(basePath, "102", dfs.getConf()); + // Requested Compaction + HoodieTestDataGenerator.createCompactionAuxiliaryMetadata(basePath, + new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, "103"), dfs.getConf()); + // Inflight Compaction + HoodieTestDataGenerator.createCompactionAuxiliaryMetadata(basePath, + new HoodieInstant(State.INFLIGHT, COMPACTION_ACTION, "103"), dfs.getConf()); HoodieTestDataGenerator.createCommitFile(basePath, "103", dfs.getConf()); HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); @@ -221,6 +316,28 @@ public class TestHoodieCommitArchiveLog { assertTrue(result); timeline = metaClient.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants(); assertEquals("Should not archive commits when maxCommitsToKeep is 5", 4, timeline.countInstants()); + + List instants = + HoodieTableMetaClient.scanHoodieInstantsFromFileSystem(metaClient.getFs(), + new Path(metaClient.getMetaAuxiliaryPath()), + HoodieActiveTimeline.VALID_EXTENSIONS_IN_ACTIVE_TIMELINE); + assertEquals("Should not delete any aux compaction files when maxCommitsToKeep is 5", 8, instants.size()); + assertTrue("Requested Compaction must be present for 100", instants.contains( + new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, "100"))); + assertTrue("Inflight Compaction must be present for 100", instants.contains( + new HoodieInstant(State.INFLIGHT, COMPACTION_ACTION, "100"))); + assertTrue("Requested Compaction must be present for 101", instants.contains( + new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, "101"))); + assertTrue("Inflight Compaction must be present for 101", instants.contains( + new HoodieInstant(State.INFLIGHT, COMPACTION_ACTION, "101"))); + assertTrue("Requested Compaction must be present for 102", 
instants.contains( + new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, "102"))); + assertTrue("Inflight Compaction must be present for 102", instants.contains( + new HoodieInstant(State.INFLIGHT, COMPACTION_ACTION, "102"))); + assertTrue("Requested Compaction must be present for 103", instants.contains( + new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, "103"))); + assertTrue("Inflight Compaction must be present for 103", instants.contains( + new HoodieInstant(State.INFLIGHT, COMPACTION_ACTION, "103"))); } @Test @@ -281,6 +398,53 @@ public class TestHoodieCommitArchiveLog { timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "103"))); } + @Test + public void testArchiveCommitCompactionNoHole() throws IOException { + HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath) + .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2) + .forTable("test-trip-table").withCompactionConfig( + HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 5).build()).build(); + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(dfs.getConf(), basePath); + HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient); + HoodieTestDataGenerator.createCommitFile(basePath, "100", dfs.getConf()); + HoodieTestDataGenerator.createCompactionRequestedFile(basePath, "101", dfs.getConf()); + HoodieTestDataGenerator.createCompactionAuxiliaryMetadata(basePath, + new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, "101"), dfs.getConf()); + HoodieTestDataGenerator.createCommitFile(basePath, "102", dfs.getConf()); + HoodieTestDataGenerator.createCommitFile(basePath, "103", dfs.getConf()); + HoodieTestDataGenerator.createCompactionRequestedFile(basePath, "104", dfs.getConf()); + HoodieTestDataGenerator.createCompactionAuxiliaryMetadata(basePath, + new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, "104"), dfs.getConf()); + HoodieTestDataGenerator.createCommitFile(basePath, "105", 
dfs.getConf()); + HoodieTestDataGenerator.createCommitFile(basePath, "106", dfs.getConf()); + HoodieTestDataGenerator.createCommitFile(basePath, "107", dfs.getConf()); + + HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsAndCompactionTimeline(); + assertEquals("Loaded 6 commits and the count should match", 8, timeline.countInstants()); + boolean result = archiveLog.archiveIfRequired(jsc); + assertTrue(result); + timeline = metaClient.getActiveTimeline().reload().getCommitsAndCompactionTimeline(); + assertFalse("Instants before oldest pending compaction can be removed", + timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "100"))); + assertEquals( + "Since we have a pending compaction at 101, we should never archive any commit " + + "after 101 (we only " + "archive 100)", 7, timeline.countInstants()); + assertTrue("Requested Compaction must still be present", + timeline.containsInstant(new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, "101"))); + assertTrue("Instants greater than oldest pending compaction must be present", + timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "102"))); + assertTrue("Instants greater than oldest pending compaction must be present", + timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "103"))); + assertTrue("Instants greater than oldest pending compaction must be present", + timeline.containsInstant(new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, "104"))); + assertTrue("Instants greater than oldest pending compaction must be present", + timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "105"))); + assertTrue("Instants greater than oldest pending compaction must be present", + timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "106"))); + assertTrue("Instants greater than oldest pending compaction must be present", + 
timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "107"))); + } + private void verifyInflightInstants(HoodieTableMetaClient metaClient, int expectedTotalInstants) { HoodieTimeline timeline = metaClient.getActiveTimeline().reload() .getTimelineOfActions(Sets.newHashSet(HoodieTimeline.CLEAN_ACTION)).filterInflights(); diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/model/CompactionOperation.java b/hoodie-common/src/main/java/com/uber/hoodie/common/model/CompactionOperation.java index 9f23c4a12..f7f886e2a 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/model/CompactionOperation.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/model/CompactionOperation.java @@ -106,7 +106,7 @@ public class CompactionOperation implements Serializable { op.dataFilePath = Optional.ofNullable(operation.getDataFilePath()); op.deltaFilePaths = new ArrayList<>(operation.getDeltaFilePaths()); op.fileId = operation.getFileId(); - op.metrics = new HashMap<>(operation.getMetrics()); + op.metrics = operation.getMetrics() == null ? 
new HashMap<>() : new HashMap<>(operation.getMetrics()); op.partitionPath = operation.getPartitionPath(); return op; } diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemView.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemView.java index e1ef48a35..8aa585728 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemView.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemView.java @@ -17,6 +17,7 @@ package com.uber.hoodie.common.table.view; import com.google.common.collect.ImmutableMap; +import com.uber.hoodie.common.model.CompactionOperation; import com.uber.hoodie.common.model.FileSlice; import com.uber.hoodie.common.model.HoodieDataFile; import com.uber.hoodie.common.model.HoodieFileGroup; @@ -74,7 +75,7 @@ public class HoodieTableFileSystemView implements TableFileSystemView, /** * File Id to pending compaction instant time */ - private final Map fileIdToPendingCompactionInstantTime; + private final Map> fileIdToPendingCompaction; /** * Create a file system view, as of the given timeline @@ -89,9 +90,10 @@ public class HoodieTableFileSystemView implements TableFileSystemView, // Build fileId to Pending Compaction Instants List pendingCompactionInstants = metaClient.getActiveTimeline().filterPendingCompactionTimeline().getInstants().collect(Collectors.toList()); - this.fileIdToPendingCompactionInstantTime = ImmutableMap.copyOf( + this.fileIdToPendingCompaction = ImmutableMap.copyOf( CompactionUtils.getAllPendingCompactionOperations(metaClient).entrySet().stream().map(entry -> { - return Pair.of(entry.getKey(), entry.getValue().getKey()); + return Pair.of(entry.getKey(), Pair.of(entry.getValue().getKey(), + CompactionOperation.convertFromAvroRecordInstance(entry.getValue().getValue()))); }).collect(Collectors.toMap(Pair::getKey, Pair::getValue))); } @@ -155,10 +157,10 @@ public class 
HoodieTableFileSystemView implements TableFileSystemView, if (logFiles.containsKey(pair)) { logFiles.get(pair).forEach(logFile -> group.addLogFile(logFile)); } - if (fileIdToPendingCompactionInstantTime.containsKey(fileId)) { + if (fileIdToPendingCompaction.containsKey(fileId)) { // If there is no delta-commit after compaction request, this step would ensure a new file-slice appears // so that any new ingestion uses the correct base-instant - group.addNewFileSliceAtInstant(fileIdToPendingCompactionInstantTime.get(fileId)); + group.addNewFileSliceAtInstant(fileIdToPendingCompaction.get(fileId).getKey()); } fileGroups.add(group); }); @@ -196,8 +198,9 @@ public class HoodieTableFileSystemView implements TableFileSystemView, * @param dataFile Data File */ private boolean isDataFileDueToPendingCompaction(HoodieDataFile dataFile) { - String compactionInstantTime = fileIdToPendingCompactionInstantTime.get(dataFile.getFileId()); - if ((null != compactionInstantTime) && dataFile.getCommitTime().equals(compactionInstantTime)) { + Pair compactionWithInstantTime = fileIdToPendingCompaction.get(dataFile.getFileId()); + if ((null != compactionWithInstantTime) && (null != compactionWithInstantTime.getLeft()) + && dataFile.getCommitTime().equals(compactionWithInstantTime.getKey())) { return true; } return false; @@ -277,7 +280,7 @@ public class HoodieTableFileSystemView implements TableFileSystemView, FileSlice fileSlice = fileGroup.getLatestFileSlice().get(); // if the file-group is under compaction, pick the latest before compaction instant time. 
if (isFileSliceAfterPendingCompaction(fileSlice)) { - String compactionInstantTime = fileIdToPendingCompactionInstantTime.get(fileSlice.getFileId()); + String compactionInstantTime = fileIdToPendingCompaction.get(fileSlice.getFileId()).getLeft(); return fileGroup.getLatestFileSliceBefore(compactionInstantTime); } return Optional.of(fileSlice); @@ -292,8 +295,9 @@ public class HoodieTableFileSystemView implements TableFileSystemView, * @return */ private boolean isFileSliceAfterPendingCompaction(FileSlice fileSlice) { - String compactionInstantTime = fileIdToPendingCompactionInstantTime.get(fileSlice.getFileId()); - if ((null != compactionInstantTime) && fileSlice.getBaseInstantTime().equals(compactionInstantTime)) { + Pair compactionWithInstantTime = fileIdToPendingCompaction.get(fileSlice.getFileId()); + if ((null != compactionWithInstantTime) + && fileSlice.getBaseInstantTime().equals(compactionWithInstantTime.getKey())) { return true; } return false; @@ -352,8 +356,8 @@ public class HoodieTableFileSystemView implements TableFileSystemView, */ private FileSlice getMergedFileSlice(HoodieFileGroup fileGroup, FileSlice fileSlice) { // if the file-group is under construction, pick the latest before compaction instant time. 
- if (fileIdToPendingCompactionInstantTime.containsKey(fileSlice.getFileId())) { - String compactionInstantTime = fileIdToPendingCompactionInstantTime.get(fileSlice.getFileId()); + if (fileIdToPendingCompaction.containsKey(fileSlice.getFileId())) { + String compactionInstantTime = fileIdToPendingCompaction.get(fileSlice.getFileId()).getKey(); if (fileSlice.getBaseInstantTime().equals(compactionInstantTime)) { Optional prevFileSlice = fileGroup.getLatestFileSliceBefore(compactionInstantTime); if (prevFileSlice.isPresent()) { @@ -416,4 +420,8 @@ public class HoodieTableFileSystemView implements TableFileSystemView, "Failed to list data files in partition " + partitionPathStr, e); } } + + public Map> getFileIdToPendingCompaction() { + return fileIdToPendingCompaction; + } } diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java b/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java index 0fbb46cd2..475cc4f80 100644 --- a/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java +++ b/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java @@ -26,6 +26,7 @@ import com.esotericsoftware.kryo.serializers.JavaSerializer; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.uber.hoodie.avro.model.HoodieCleanMetadata; +import com.uber.hoodie.avro.model.HoodieCompactionPlan; import com.uber.hoodie.common.HoodieCleanStat; import com.uber.hoodie.common.model.HoodieWriteStat.RuntimeStats; import com.uber.hoodie.common.table.HoodieTableConfig; @@ -36,7 +37,10 @@ import com.uber.hoodie.common.table.log.HoodieLogFormat.Writer; import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock; import com.uber.hoodie.common.table.log.block.HoodieLogBlock; import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline; +import com.uber.hoodie.common.table.timeline.HoodieInstant; +import com.uber.hoodie.common.table.timeline.HoodieInstant.State; 
import com.uber.hoodie.common.util.AvroUtils; +import com.uber.hoodie.common.util.CompactionUtils; import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.common.util.HoodieAvroUtils; import java.io.ByteArrayInputStream; @@ -61,6 +65,7 @@ import java.util.stream.Stream; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; +import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; @@ -113,6 +118,14 @@ public class HoodieTestUtils { } } + public static final void createDeltaCommitFiles(String basePath, String... commitTimes) throws IOException { + for (String commitTime : commitTimes) { + new File( + basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + HoodieTimeline.makeDeltaFileName(commitTime)) + .createNewFile(); + } + } + public static final void createInflightCommitFiles(String basePath, String... commitTimes) throws IOException { for (String commitTime : commitTimes) { new File(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + HoodieTimeline.makeInflightCommitFileName( @@ -177,6 +190,15 @@ public class HoodieTestUtils { } } + public static final void createCompactionRequest(HoodieTableMetaClient metaClient, String instant, + List> fileSliceList) throws IOException { + HoodieCompactionPlan plan = CompactionUtils.buildFromFileSlices(fileSliceList, Optional.empty(), Optional.empty()); + HoodieInstant compactionInstant = + new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, instant); + metaClient.getActiveTimeline().saveToCompactionRequested(compactionInstant, + AvroUtils.serializeCompactionPlan(plan)); + } + public static final String getDataFilePath(String basePath, String partitionPath, String commitTime, String fileID) throws IOException { return basePath + "/" + partitionPath + "/" + FSUtils