diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java b/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java index 9f14e6c6c..815b41dc5 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java @@ -39,7 +39,6 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieIOException; - import org.apache.hudi.exception.HoodieSavepointException; import org.apache.hudi.table.HoodieTable; import org.apache.log4j.LogManager; @@ -48,6 +47,7 @@ import org.apache.log4j.Logger; import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -111,36 +111,77 @@ public class CleanPlanner> implements Serializa * @throws IOException when underlying file-system throws this exception */ public List getPartitionPathsToClean(Option newInstantToRetain) throws IOException { - if (config.incrementalCleanerModeEnabled() && newInstantToRetain.isPresent() - && (HoodieCleaningPolicy.KEEP_LATEST_COMMITS == config.getCleanerPolicy())) { - Option lastClean = - hoodieTable.getCleanTimeline().filterCompletedInstants().lastInstant(); + switch (config.getCleanerPolicy()) { + case KEEP_LATEST_COMMITS: + return getPartitionPathsForCleanByCommits(newInstantToRetain); + case KEEP_LATEST_FILE_VERSIONS: + return getPartitionPathsForFullCleaning(); + default: + throw new IllegalStateException("Unknown Cleaner Policy"); + } + } + + /** + * Return partition paths for cleaning by commits mode. 
+ * @param instantToRetain Earliest Instant to retain + * @return list of partitions + * @throws IOException + */ + private List getPartitionPathsForCleanByCommits(Option instantToRetain) throws IOException { + if (!instantToRetain.isPresent()) { + LOG.info("No earliest commit to retain. No need to scan partitions !!"); + return Collections.emptyList(); + } + + if (config.incrementalCleanerModeEnabled()) { + Option lastClean = hoodieTable.getCleanTimeline().filterCompletedInstants().lastInstant(); if (lastClean.isPresent()) { HoodieCleanMetadata cleanMetadata = TimelineMetadataUtils .deserializeHoodieCleanMetadata(hoodieTable.getActiveTimeline().getInstantDetails(lastClean.get()).get()); if ((cleanMetadata.getEarliestCommitToRetain() != null) && (cleanMetadata.getEarliestCommitToRetain().length() > 0)) { - LOG.warn("Incremental Cleaning mode is enabled. Looking up partition-paths that have since changed " - + "since last cleaned at " + cleanMetadata.getEarliestCommitToRetain() - + ". New Instant to retain : " + newInstantToRetain); - return hoodieTable.getCompletedCommitsTimeline().getInstants() - .filter(instant -> - HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.GREATER_THAN_OR_EQUALS, cleanMetadata.getEarliestCommitToRetain()) - && HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.LESSER_THAN, newInstantToRetain.get().getTimestamp()) - ).flatMap(instant -> { - try { - HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(hoodieTable.getActiveTimeline().getInstantDetails(instant).get(), HoodieCommitMetadata.class); - return commitMetadata.getPartitionToWriteStats().keySet().stream(); - } catch (IOException e) { - throw new HoodieIOException(e.getMessage(), e); - } - }).distinct().collect(Collectors.toList()); + return getPartitionPathsForIncrementalCleaning(cleanMetadata, instantToRetain); } } } - // Otherwise go to brute force mode of scanning all partitions - return 
FSUtils.getAllPartitionPaths(hoodieTable.getMetaClient().getFs(), - hoodieTable.getMetaClient().getBasePath(), config.shouldAssumeDatePartitioning()); + return getPartitionPathsForFullCleaning(); + } + + /** + * Use Incremental Mode for finding partition paths. + * @param cleanMetadata + * @param newInstantToRetain + * @return + */ + private List getPartitionPathsForIncrementalCleaning(HoodieCleanMetadata cleanMetadata, + Option newInstantToRetain) { + LOG.warn("Incremental Cleaning mode is enabled. Looking up partition-paths that have since changed " + + "since last cleaned at " + cleanMetadata.getEarliestCommitToRetain() + + ". New Instant to retain : " + newInstantToRetain); + return hoodieTable.getCompletedCommitsTimeline().getInstants().filter( + instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.GREATER_THAN_OR_EQUALS, + cleanMetadata.getEarliestCommitToRetain()) && HoodieTimeline.compareTimestamps(instant.getTimestamp(), + HoodieTimeline.LESSER_THAN, newInstantToRetain.get().getTimestamp())).flatMap(instant -> { + try { + HoodieCommitMetadata commitMetadata = HoodieCommitMetadata + .fromBytes(hoodieTable.getActiveTimeline().getInstantDetails(instant).get(), + HoodieCommitMetadata.class); + return commitMetadata.getPartitionToWriteStats().keySet().stream(); + } catch (IOException e) { + throw new HoodieIOException(e.getMessage(), e); + } + }).distinct().collect(Collectors.toList()); + } + + /** + * Scan and list all partitions for cleaning. + * @return all partition paths for the dataset. 
+ * @throws IOException + */ + private List getPartitionPathsForFullCleaning() throws IOException { + // Go to brute force mode of scanning all partitions + return FSUtils.getAllPartitionPaths(hoodieTable.getMetaClient().getFs(), hoodieTable.getMetaClient().getBasePath(), + config.shouldAssumeDatePartitioning()); } /** diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java index ad0589059..296530bd1 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java @@ -104,7 +104,8 @@ public class TestCleaner extends TestHoodieClientBase { */ private void insertFirstBigBatchForClientCleanerTest(HoodieWriteConfig cfg, HoodieWriteClient client, Function2, String, Integer> recordGenFunction, - Function3, HoodieWriteClient, JavaRDD, String> insertFn) throws Exception { + Function3, HoodieWriteClient, JavaRDD, String> insertFn, + HoodieCleaningPolicy cleaningPolicy) throws Exception { /* * do a big insert (this is basically same as insert part of upsert, just adding it here so we can catch breakages @@ -127,11 +128,15 @@ public class TestCleaner extends TestHoodieClientBase { HoodieTable table = HoodieTable.create(metaClient, client.getConfig(), jsc); assertFalse(table.getCompletedCommitsTimeline().empty()); - String instantTime = table.getCompletedCommitsTimeline().getInstants().findFirst().get().getTimestamp(); - assertFalse(table.getCompletedCleanTimeline().empty()); - assertEquals(instantTime, - table.getCompletedCleanTimeline().getInstants().findFirst().get().getTimestamp(), - "The clean instant should be the same as the commit instant"); + if (cleaningPolicy.equals(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)) { + // We no longer write empty cleaner plans when there are not enough commits present + assertTrue(table.getCompletedCleanTimeline().empty()); + } else { + String instantTime = 
table.getCompletedCommitsTimeline().getInstants().findFirst().get().getTimestamp(); + assertFalse(table.getCompletedCleanTimeline().empty()); + assertEquals(instantTime, table.getCompletedCleanTimeline().getInstants().findFirst().get().getTimestamp(), + "The clean instant should be the same as the commit instant"); + } HoodieIndex index = HoodieIndex.createIndex(cfg, jsc); List taggedRecords = index.tagLocation(jsc.parallelize(records, 1), jsc, table).collect(); @@ -201,7 +206,8 @@ public class TestCleaner extends TestHoodieClientBase { final Function2, String, Integer> recordUpsertGenWrappedFunction = generateWrapRecordsFn(isPreppedAPI, cfg, dataGen::generateUniqueUpdates); - insertFirstBigBatchForClientCleanerTest(cfg, client, recordInsertGenWrappedFunction, insertFn); + insertFirstBigBatchForClientCleanerTest(cfg, client, recordInsertGenWrappedFunction, insertFn, + HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS); Map compactionFileIdToLatestFileSlice = new HashMap<>(); metaClient = HoodieTableMetaClient.reload(metaClient); @@ -349,7 +355,7 @@ public class TestCleaner extends TestHoodieClientBase { int maxCommits = 3; // keep upto 3 commits from the past HoodieWriteConfig cfg = getConfigBuilder() .withCompactionConfig(HoodieCompactionConfig.newBuilder() - .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainCommits(maxCommits).build()) + .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(maxCommits).build()) .withParallelism(1, 1).withBulkInsertParallelism(1).withFinalizeWriteParallelism(1) .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build()) .build(); @@ -361,7 +367,8 @@ public class TestCleaner extends TestHoodieClientBase { final Function2, String, Integer> recordUpsertGenWrappedFunction = generateWrapRecordsFn(isPreppedAPI, cfg, dataGen::generateUniqueUpdates); - insertFirstBigBatchForClientCleanerTest(cfg, client, recordInsertGenWrappedFunction, insertFn); + 
insertFirstBigBatchForClientCleanerTest(cfg, client, recordInsertGenWrappedFunction, insertFn, + HoodieCleaningPolicy.KEEP_LATEST_COMMITS); // Keep doing some writes and clean inline. Make sure we have expected number of files remaining. HoodieTestUtils.monotonicIncreasingCommitTimestamps(8, 1).forEach(newCommitTime -> { @@ -376,7 +383,9 @@ public class TestCleaner extends TestHoodieClientBase { metaClient = HoodieTableMetaClient.reload(metaClient); HoodieTable table1 = HoodieTable.create(metaClient, cfg, jsc); HoodieTimeline activeTimeline = table1.getCompletedCommitsTimeline(); - Option earliestRetainedCommit = activeTimeline.nthFromLastInstant(maxCommits - 1); + // NOTE: See CleanPlanner#getFilesToCleanKeepingLatestCommits. We explicitly keep one commit before earliest + // commit + Option earliestRetainedCommit = activeTimeline.nthFromLastInstant(maxCommits); Set acceptableCommits = activeTimeline.getInstants().collect(Collectors.toSet()); if (earliestRetainedCommit.isPresent()) { acceptableCommits @@ -751,12 +760,7 @@ public class TestCleaner extends TestHoodieClientBase { metaClient = HoodieTableMetaClient.reload(metaClient); List hoodieCleanStatsOne = runCleaner(config, simulateFailureRetry); - assertEquals(0, - getCleanStat(hoodieCleanStatsOne, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles() - .size(), "Must not clean any files"); - assertEquals(0, - getCleanStat(hoodieCleanStatsOne, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH).getSuccessDeleteFiles() - .size(), "Must not clean any files"); + assertEquals(0, hoodieCleanStatsOne.size(), "Must not scan any partitions and clean any files"); assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000", file1P0C0)); assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "000", @@ -786,12 +790,7 @@ public class TestCleaner extends TestHoodieClientBase { new 
HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "001"), Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8))); List hoodieCleanStatsTwo = runCleaner(config, simulateFailureRetry); - assertEquals(0, - getCleanStat(hoodieCleanStatsTwo, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles() - .size(), "Must not clean any files"); - assertEquals(0, - getCleanStat(hoodieCleanStatsTwo, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH).getSuccessDeleteFiles() - .size(), "Must not clean any files"); + assertEquals(0, hoodieCleanStatsTwo.size(), "Must not scan any partitions and clean any files"); assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001", file2P0C1)); assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "001", diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieSnapshotExporter.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieSnapshotExporter.java index fda192f96..087a283b7 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieSnapshotExporter.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieSnapshotExporter.java @@ -139,9 +139,6 @@ public class TestHoodieSnapshotExporter extends HoodieClientTestHarness { new HoodieSnapshotExporter().export(jsc, cfg); // Check results - assertTrue(dfs.exists(new Path(targetPath + "/.hoodie/" + COMMIT_TIME + ".clean"))); - assertTrue(dfs.exists(new Path(targetPath + "/.hoodie/" + COMMIT_TIME + ".clean.inflight"))); - assertTrue(dfs.exists(new Path(targetPath + "/.hoodie/" + COMMIT_TIME + ".clean.requested"))); assertTrue(dfs.exists(new Path(targetPath + "/.hoodie/" + COMMIT_TIME + ".commit"))); assertTrue(dfs.exists(new Path(targetPath + "/.hoodie/" + COMMIT_TIME + ".commit.requested"))); assertTrue(dfs.exists(new Path(targetPath + "/.hoodie/" + 
COMMIT_TIME + ".inflight")));