diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java index 2570e204e..4003a07de 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java @@ -26,6 +26,7 @@ import org.apache.hudi.common.model.HoodieCleaningPolicy; import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.table.action.clean.CleaningTriggerStrategy; import org.apache.hudi.table.action.compact.CompactionTriggerStrategy; import org.apache.hudi.table.action.compact.strategy.CompactionStrategy; import org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy; @@ -129,6 +130,17 @@ public class HoodieCompactionConfig extends HoodieConfig { .withDocumentation("Controls how compaction scheduling is triggered, by time or num delta commits or combination of both. " + "Valid options: " + Arrays.stream(CompactionTriggerStrategy.values()).map(Enum::name).collect(Collectors.joining(","))); + public static final ConfigProperty CLEAN_TRIGGER_STRATEGY = ConfigProperty + .key("hoodie.clean.trigger.strategy") + .defaultValue(CleaningTriggerStrategy.NUM_COMMITS.name()) + .withDocumentation("Controls how cleaning is scheduled. Valid options: " + + Arrays.stream(CleaningTriggerStrategy.values()).map(Enum::name).collect(Collectors.joining(","))); + + public static final ConfigProperty CLEAN_MAX_COMMITS = ConfigProperty + .key("hoodie.clean.max.commits") + .defaultValue("1") + .withDocumentation("Number of commits after the last clean operation, before scheduling of a new clean is attempted."); + public static final ConfigProperty CLEANER_FILE_VERSIONS_RETAINED = ConfigProperty .key("hoodie.cleaner.fileversions.retained") .defaultValue("3") @@ -583,6 +595,16 @@ public class HoodieCompactionConfig extends HoodieConfig { return this; } + public Builder withCleaningTriggerStrategy(String cleaningTriggerStrategy) { + compactionConfig.setValue(CLEAN_TRIGGER_STRATEGY, cleaningTriggerStrategy); + return this; + } + + public Builder withMaxCommitsBeforeCleaning(int maxCommitsBeforeCleaning) { + compactionConfig.setValue(CLEAN_MAX_COMMITS, String.valueOf(maxCommitsBeforeCleaning)); + return this; + } + public Builder withCleanerPolicy(HoodieCleaningPolicy policy) { compactionConfig.setValue(CLEANER_POLICY, policy.name()); return this; diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index b2e8b999b..4847d9c91 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -61,6 +61,7 @@ import org.apache.hudi.keygen.constant.KeyGeneratorType; import org.apache.hudi.metrics.MetricsReporterType; import org.apache.hudi.metrics.datadog.DatadogHttpClient.ApiSite; import org.apache.hudi.table.RandomFileIdPrefixProvider; +import org.apache.hudi.table.action.clean.CleaningTriggerStrategy; import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilterMode; import org.apache.hudi.table.action.compact.CompactionTriggerStrategy; import org.apache.hudi.table.action.compact.strategy.CompactionStrategy; @@ -1153,6 +1154,18 @@ public class HoodieWriteConfig extends HoodieConfig { return getInt(HoodieCompactionConfig.CLEANER_PARALLELISM_VALUE); } + public int getCleaningMaxCommits() { + return getInt(HoodieCompactionConfig.CLEAN_MAX_COMMITS); + } + + public CleaningTriggerStrategy getCleaningTriggerStrategy() { + return CleaningTriggerStrategy.valueOf(getString(HoodieCompactionConfig.CLEAN_TRIGGER_STRATEGY)); + } + + public boolean isAutoClean() { + return getBoolean(HoodieCompactionConfig.AUTO_CLEAN); + } + public boolean getArchiveMergeEnable() { return getBoolean(HoodieCompactionConfig.ARCHIVE_MERGE_ENABLE); } @@ -1169,10 +1182,6 @@ public class HoodieWriteConfig extends HoodieConfig { return getBoolean(HoodieCompactionConfig.ASYNC_ARCHIVE); } - public boolean isAutoClean() { - return getBoolean(HoodieCompactionConfig.AUTO_CLEAN); - } - public boolean isAsyncClean() { return getBoolean(HoodieCompactionConfig.ASYNC_CLEAN); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java index a64cb8845..86f65cae5 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java @@ -32,6 +32,7 @@ import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.BaseActionExecutor; @@ -58,8 +59,30 @@ public class CleanPlanActionExecutor ext this.extraMetadata = extraMetadata; } - protected Option createCleanerPlan() { - return execute(); + private int getCommitsSinceLastCleaning() { + Option lastCleanInstant = table.getActiveTimeline().getCleanerTimeline().filterCompletedInstants().lastInstant(); + HoodieTimeline commitTimeline = table.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); + + String latestCleanTs; + int numCommits = 0; + if (lastCleanInstant.isPresent()) { + latestCleanTs = lastCleanInstant.get().getTimestamp(); + numCommits = commitTimeline.findInstantsAfter(latestCleanTs).countInstants(); + } else { + numCommits = commitTimeline.countInstants(); + } + + return numCommits; + } + + private boolean needsCleaning(CleaningTriggerStrategy strategy) { + if (strategy == CleaningTriggerStrategy.NUM_COMMITS) { + int numberOfCommits = getCommitsSinceLastCleaning(); + int maxInlineCommitsForNextClean = config.getCleaningMaxCommits(); + return numberOfCommits >= maxInlineCommitsForNextClean; + } else { + throw new HoodieException("Unsupported cleaning trigger strategy: " + config.getCleaningTriggerStrategy()); + } } /** @@ -128,6 +151,9 @@ public class CleanPlanActionExecutor ext @Override public Option execute() { + if (!needsCleaning(config.getCleaningTriggerStrategy())) { + return Option.empty(); + } // Plan a new clean action return requestClean(instantTime); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleaningTriggerStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleaningTriggerStrategy.java new file mode 100644 index 000000000..f1ffad261 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleaningTriggerStrategy.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.action.clean; + +public enum CleaningTriggerStrategy { + // trigger cleaning when reach n commits + NUM_COMMITS +} diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestHoodieConcatHandle.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestHoodieConcatHandle.java index d81b76b0f..4539c6de3 100644 --- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestHoodieConcatHandle.java +++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestHoodieConcatHandle.java @@ -117,7 +117,7 @@ public class TestHoodieConcatHandle extends HoodieJavaClientTestBase { records1.add(new HoodieAvroRecord(new HoodieKey(insertRow2.getRowKey(), insertRow2.getPartitionPath()), insertRow2)); int startInstant = 1; - String firstCommitTime = makeNewCommitTime(startInstant++); + String firstCommitTime = makeNewCommitTime(startInstant++, "%09d"); // First insert writeClient.startCommitWithTime(firstCommitTime); writeClient.insert(records1, firstCommitTime); @@ -145,7 +145,7 @@ public class TestHoodieConcatHandle extends HoodieJavaClientTestBase { records2.add(new HoodieAvroRecord(new HoodieKey(insertRow1.getRowKey(), insertRow1.getPartitionPath()), insertRow1)); records2.add(new HoodieAvroRecord(new HoodieKey(insertRow2.getRowKey(), insertRow2.getPartitionPath()), insertRow2)); - String newCommitTime = makeNewCommitTime(startInstant++); + String newCommitTime = makeNewCommitTime(startInstant++, "%09d"); writeClient.startCommitWithTime(newCommitTime); // Second insert is the same as the _row_key of the first one,test allowDuplicateInserts writeClient.insert(records2, newCommitTime); @@ -183,7 +183,7 @@ public class TestHoodieConcatHandle extends HoodieJavaClientTestBase { HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[]{partitionPath}); int startInstant = 1; - String firstCommitTime = makeNewCommitTime(startInstant++); + String firstCommitTime = makeNewCommitTime(startInstant++, "%09d"); List records1 = dataGenerator.generateInserts(firstCommitTime, 100); // First insert @@ -200,7 +200,7 @@ public class TestHoodieConcatHandle extends HoodieJavaClientTestBase { assertTrue(filter.mightContain(record.getRecordKey())); } - String newCommitTime = makeNewCommitTime(startInstant++); + String newCommitTime = makeNewCommitTime(startInstant++, "%09d"); List records2 = dataGenerator.generateUpdates(newCommitTime, 100); writeClient.startCommitWithTime(newCommitTime); // Second insert is the same as the _row_key of the first one,test allowDuplicateInserts diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestJavaCopyOnWriteActionExecutor.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestJavaCopyOnWriteActionExecutor.java index 793b26703..8f296d510 100644 --- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestJavaCopyOnWriteActionExecutor.java +++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestJavaCopyOnWriteActionExecutor.java @@ -123,7 +123,7 @@ public class TestJavaCopyOnWriteActionExecutor extends HoodieJavaClientTestBase // Prepare the AvroParquetIO HoodieWriteConfig config = makeHoodieClientConfig(); int startInstant = 1; - String firstCommitTime = makeNewCommitTime(startInstant++); + String firstCommitTime = makeNewCommitTime(startInstant++, "%09d"); HoodieJavaWriteClient writeClient = getHoodieWriteClient(config); writeClient.startCommitWithTime(firstCommitTime); metaClient = HoodieTableMetaClient.reload(metaClient); @@ -185,7 +185,7 @@ public class TestJavaCopyOnWriteActionExecutor extends HoodieJavaClientTestBase List updatedRecords = Arrays.asList(updatedRecord1, insertedRecord1); - String newCommitTime = makeNewCommitTime(startInstant++); + String newCommitTime = makeNewCommitTime(startInstant++, "%09d"); metaClient = HoodieTableMetaClient.reload(metaClient); writeClient.startCommitWithTime(newCommitTime); List statuses = writeClient.upsert(updatedRecords, newCommitTime); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java index 552e85af4..56cfe959b 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java @@ -26,7 +26,6 @@ import org.apache.hudi.avro.model.HoodieClusteringGroup; import org.apache.hudi.avro.model.HoodieClusteringPlan; import org.apache.hudi.avro.model.HoodieClusteringStrategy; import org.apache.hudi.avro.model.HoodieCompactionPlan; -import org.apache.hudi.avro.model.HoodieFileStatus; import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata; import org.apache.hudi.avro.model.HoodieRollbackMetadata; import org.apache.hudi.avro.model.HoodieSliceInfo; @@ -88,14 +87,11 @@ import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaRDD; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.Arguments; -import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; import java.io.File; import java.io.IOException; import java.nio.charset.StandardCharsets; -import java.nio.file.Files; import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; @@ -523,7 +519,8 @@ public class TestCleaner extends HoodieClientTestBase { HoodieCleaningPolicy.KEEP_LATEST_COMMITS); // Keep doing some writes and clean inline. Make sure we have expected number of files remaining. - makeIncrementalCommitTimes(8).forEach(newCommitTime -> { + for (int i = 0; i < 8; i++) { + String newCommitTime = makeNewCommitTime(); try { client.startCommitWithTime(newCommitTime); List records = recordUpsertGenWrappedFunction.apply(newCommitTime, 100); @@ -563,7 +560,7 @@ public class TestCleaner extends HoodieClientTestBase { } catch (IOException ioe) { throw new RuntimeException(ioe); } - }); + } } /** @@ -629,16 +626,20 @@ public class TestCleaner extends HoodieClientTestBase { * * @param config HoodieWriteConfig */ - private List runCleaner(HoodieWriteConfig config) throws IOException { - return runCleaner(config, false, 1); + protected List runCleaner(HoodieWriteConfig config) throws IOException { + return runCleaner(config, false, 1, false); } - private List runCleaner(HoodieWriteConfig config, int firstCommitSequence) throws IOException { - return runCleaner(config, false, firstCommitSequence); + protected List runCleanerWithInstantFormat(HoodieWriteConfig config, boolean needInstantInHudiFormat) throws IOException { + return runCleaner(config, false, 1, needInstantInHudiFormat); + } + + protected List runCleaner(HoodieWriteConfig config, int firstCommitSequence, boolean needInstantInHudiFormat) throws IOException { + return runCleaner(config, false, firstCommitSequence, needInstantInHudiFormat); } protected List runCleaner(HoodieWriteConfig config, boolean simulateRetryFailure) throws IOException { - return runCleaner(config, simulateRetryFailure, 1); + return runCleaner(config, simulateRetryFailure, 1, false); } /** @@ -646,9 +647,9 @@ public class TestCleaner extends HoodieClientTestBase { * * @param config HoodieWriteConfig */ - private List runCleaner(HoodieWriteConfig config, boolean simulateRetryFailure, int firstCommitSequence) throws IOException { + protected List runCleaner(HoodieWriteConfig config, boolean simulateRetryFailure, Integer firstCommitSequence, boolean needInstantInHudiFormat) throws IOException { SparkRDDWriteClient writeClient = getHoodieWriteClient(config); - String cleanInstantTs = makeNewCommitTime(firstCommitSequence); + String cleanInstantTs = needInstantInHudiFormat ? makeNewCommitTime(firstCommitSequence, "%014d") : makeNewCommitTime(firstCommitSequence, "%09d"); HoodieCleanMetadata cleanMetadata1 = writeClient.clean(cleanInstantTs); if (null == cleanMetadata1) { @@ -706,116 +707,6 @@ public class TestCleaner extends HoodieClientTestBase { return new ArrayList<>(cleanStatMap.values()); } - /** - * Test Hudi COW Table Cleaner - Keep the latest file versions policy. - */ - @ParameterizedTest - @ValueSource(booleans = {false, true}) - public void testKeepLatestFileVersions(Boolean enableBootstrapSourceClean) throws Exception { - HoodieWriteConfig config = - HoodieWriteConfig.newBuilder().withPath(basePath) - .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build()) - .withCompactionConfig(HoodieCompactionConfig.newBuilder() - .withCleanBootstrapBaseFileEnabled(enableBootstrapSourceClean) - .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(1).build()) - .build(); - - HoodieTableMetadataWriter metadataWriter = SparkHoodieBackedTableMetadataWriter.create(hadoopConf, config, context); - HoodieTestTable testTable = HoodieMetadataTestTable.of(metaClient, metadataWriter); - - final String p0 = "2020/01/01"; - final String p1 = "2020/01/02"; - final Map> bootstrapMapping = enableBootstrapSourceClean - ? generateBootstrapIndexAndSourceData(p0, p1) : null; - - // make 1 commit, with 1 file per partition - final String file1P0C0 = enableBootstrapSourceClean ? bootstrapMapping.get(p0).get(0).getFileId() - : UUID.randomUUID().toString(); - final String file1P1C0 = enableBootstrapSourceClean ? bootstrapMapping.get(p1).get(0).getFileId() - : UUID.randomUUID().toString(); - - Map>> c1PartitionToFilesNameLengthMap = new HashMap<>(); - c1PartitionToFilesNameLengthMap.put(p0, Collections.singletonList(Pair.of(file1P0C0, 100))); - c1PartitionToFilesNameLengthMap.put(p1, Collections.singletonList(Pair.of(file1P1C0, 200))); - testTable.doWriteOperation("00000000000001", WriteOperationType.INSERT, Arrays.asList(p0, p1), - c1PartitionToFilesNameLengthMap, false, false); - - List hoodieCleanStatsOne = runCleaner(config); - assertEquals(0, hoodieCleanStatsOne.size(), "Must not clean any files"); - assertTrue(testTable.baseFileExists(p0, "00000000000001", file1P0C0)); - assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0)); - - // make next commit, with 1 insert & 1 update per partition - final String file2P0C1 = UUID.randomUUID().toString(); - final String file2P1C1 = UUID.randomUUID().toString(); - Map>> c2PartitionToFilesNameLengthMap = new HashMap<>(); - c2PartitionToFilesNameLengthMap.put(p0, Arrays.asList(Pair.of(file1P0C0, 101), Pair.of(file2P0C1, 100))); - c2PartitionToFilesNameLengthMap.put(p1, Arrays.asList(Pair.of(file1P1C0, 201), Pair.of(file2P1C1, 200))); - testTable.doWriteOperation("00000000000002", WriteOperationType.UPSERT, Collections.emptyList(), - c2PartitionToFilesNameLengthMap, false, false); - - // enableBootstrapSourceClean would delete the bootstrap base file at the same time - List hoodieCleanStatsTwo = runCleaner(config, 1); - HoodieCleanStat cleanStat = getCleanStat(hoodieCleanStatsTwo, p0); - assertEquals(enableBootstrapSourceClean ? 2 : 1, cleanStat.getSuccessDeleteFiles().size() - + (cleanStat.getSuccessDeleteBootstrapBaseFiles() == null ? 0 - : cleanStat.getSuccessDeleteBootstrapBaseFiles().size()), "Must clean at least 1 file"); - - if (enableBootstrapSourceClean) { - HoodieFileStatus fstatus = - bootstrapMapping.get(p0).get(0).getBootstrapFileStatus(); - // This ensures full path is recorded in metadata. - assertTrue(cleanStat.getSuccessDeleteBootstrapBaseFiles().contains(fstatus.getPath().getUri()), - "Successful delete files were " + cleanStat.getSuccessDeleteBootstrapBaseFiles() - + " but did not contain " + fstatus.getPath().getUri()); - assertFalse(Files.exists(Paths.get(bootstrapMapping.get( - p0).get(0).getBootstrapFileStatus().getPath().getUri()))); - } - - cleanStat = getCleanStat(hoodieCleanStatsTwo, p1); - assertTrue(testTable.baseFileExists(p0, "00000000000002", file2P0C1)); - assertTrue(testTable.baseFileExists(p1, "00000000000002", file2P1C1)); - assertFalse(testTable.baseFileExists(p0, "00000000000001", file1P0C0)); - assertFalse(testTable.baseFileExists(p1, "00000000000001", file1P1C0)); - assertEquals(enableBootstrapSourceClean ? 2 : 1, cleanStat.getSuccessDeleteFiles().size() - + (cleanStat.getSuccessDeleteBootstrapBaseFiles() == null ? 0 - : cleanStat.getSuccessDeleteBootstrapBaseFiles().size()), "Must clean at least 1 file"); - - if (enableBootstrapSourceClean) { - HoodieFileStatus fstatus = - bootstrapMapping.get(p1).get(0).getBootstrapFileStatus(); - // This ensures full path is recorded in metadata. - assertTrue(cleanStat.getSuccessDeleteBootstrapBaseFiles().contains(fstatus.getPath().getUri()), - "Successful delete files were " + cleanStat.getSuccessDeleteBootstrapBaseFiles() - + " but did not contain " + fstatus.getPath().getUri()); - assertFalse(Files.exists(Paths.get(bootstrapMapping.get( - p1).get(0).getBootstrapFileStatus().getPath().getUri()))); - } - - // make next commit, with 2 updates to existing files, and 1 insert - final String file3P0C2 = UUID.randomUUID().toString(); - Map>> c3PartitionToFilesNameLengthMap = new HashMap<>(); - c3PartitionToFilesNameLengthMap.put(p0, Arrays.asList(Pair.of(file1P0C0, 102), Pair.of(file2P0C1, 101), - Pair.of(file3P0C2, 100))); - testTable.doWriteOperation("00000000000003", WriteOperationType.UPSERT, Collections.emptyList(), - c3PartitionToFilesNameLengthMap, false, false); - - List hoodieCleanStatsThree = runCleaner(config, 3); - assertEquals(2, - getCleanStat(hoodieCleanStatsThree, p0) - .getSuccessDeleteFiles().size(), "Must clean two files"); - assertFalse(testTable.baseFileExists(p0, "00000000000002", file1P0C0)); - assertFalse(testTable.baseFileExists(p0, "00000000000002", file2P0C1)); - assertTrue(testTable.baseFileExists(p0, "00000000000003", file3P0C2)); - - // No cleaning on partially written file, with no commit. - testTable.forCommit("00000000000004").withBaseFilesInPartition(p0, file3P0C2); - - List hoodieCleanStatsFour = runCleaner(config); - assertEquals(0, hoodieCleanStatsFour.size(), "Must not clean any files"); - assertTrue(testTable.baseFileExists(p0, "00000000000003", file3P0C2)); - } - @Test public void testCleanEmptyInstants() throws Exception { HoodieWriteConfig config = @@ -831,16 +722,19 @@ public class TestCleaner extends HoodieClientTestBase { int cleanCount = 20; int startInstant = 1; - for (int i = 0; i < commitCount; i++, startInstant++) { - String commitTime = makeNewCommitTime(startInstant); - HoodieTestTable.of(metaClient).addCommit(commitTime); - } for (int i = 0; i < cleanCount; i++, startInstant++) { - String commitTime = makeNewCommitTime(startInstant); + String commitTime = makeNewCommitTime(startInstant, "%09d"); createCleanMetadata(commitTime + "", false, true); } + int instantClean = startInstant; + + for (int i = 0; i < commitCount; i++, startInstant++) { + String commitTime = makeNewCommitTime(startInstant, "%09d"); + HoodieTestTable.of(metaClient).addCommit(commitTime); + } + List cleanStats = runCleaner(config); HoodieActiveTimeline timeline = metaClient.reloadActiveTimeline(); @@ -852,7 +746,7 @@ public class TestCleaner extends HoodieClientTestBase { assertEquals(--cleanCount, timeline.getTimelineOfActions( CollectionUtils.createSet(HoodieTimeline.CLEAN_ACTION)).filterCompletedInstants().countInstants()); assertTrue(timeline.getTimelineOfActions( - CollectionUtils.createSet(HoodieTimeline.CLEAN_ACTION)).filterInflightsAndRequested().containsInstant(makeNewCommitTime(--startInstant))); + CollectionUtils.createSet(HoodieTimeline.CLEAN_ACTION)).filterInflightsAndRequested().containsInstant(makeNewCommitTime(--instantClean, "%09d"))); cleanStats = runCleaner(config); timeline = metaClient.reloadActiveTimeline(); @@ -865,91 +759,7 @@ public class TestCleaner extends HoodieClientTestBase { assertEquals(--cleanCount, timeline.getTimelineOfActions( CollectionUtils.createSet(HoodieTimeline.CLEAN_ACTION)).filterCompletedInstants().countInstants()); assertTrue(timeline.getTimelineOfActions( - CollectionUtils.createSet(HoodieTimeline.CLEAN_ACTION)).filterInflightsAndRequested().containsInstant(makeNewCommitTime(--startInstant))); - } - - - /** - * Test HoodieTable.clean() Cleaning by versions logic for MOR table with Log files. - */ - @Test - public void testKeepLatestFileVersionsMOR() throws Exception { - - HoodieWriteConfig config = - HoodieWriteConfig.newBuilder().withPath(basePath) - .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build()) - .withCompactionConfig(HoodieCompactionConfig.newBuilder() - .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(1).build()) - .build(); - - HoodieTableMetaClient metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ); - HoodieTestTable testTable = HoodieTestTable.of(metaClient); - String p0 = "2020/01/01"; - - // Make 3 files, one base file and 2 log files associated with base file - String file1P0 = testTable.addDeltaCommit("000").getFileIdsWithBaseFilesInPartitions(p0).get(p0); - testTable.forDeltaCommit("000") - .withLogFile(p0, file1P0, 1) - .withLogFile(p0, file1P0, 2); - - // Make 2 files, one base file and 1 log files associated with base file - testTable.addDeltaCommit("001") - .withBaseFilesInPartition(p0, file1P0) - .withLogFile(p0, file1P0, 3); - - List hoodieCleanStats = runCleaner(config); - assertEquals(3, - getCleanStat(hoodieCleanStats, p0).getSuccessDeleteFiles() - .size(), "Must clean three files, one base and 2 log files"); - assertFalse(testTable.baseFileExists(p0, "000", file1P0)); - assertFalse(testTable.logFilesExist(p0, "000", file1P0, 1, 2)); - assertTrue(testTable.baseFileExists(p0, "001", file1P0)); - assertTrue(testTable.logFileExists(p0, "001", file1P0, 3)); - } - - /** - * Test HoodieTable.clean() Cleaning by commit logic for MOR table with Log files. - */ - @Test - public void testKeepLatestCommitsMOR() throws Exception { - - HoodieWriteConfig config = - HoodieWriteConfig.newBuilder().withPath(basePath) - .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build()) - .withCompactionConfig(HoodieCompactionConfig.newBuilder() - .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(1).build()) - .build(); - - HoodieTableMetaClient metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ); - HoodieTestTable testTable = HoodieTestTable.of(metaClient); - String p0 = "2020/01/01"; - - // Make 3 files, one base file and 2 log files associated with base file - String file1P0 = testTable.addDeltaCommit("000").getFileIdsWithBaseFilesInPartitions(p0).get(p0); - testTable.forDeltaCommit("000") - .withLogFile(p0, file1P0, 1) - .withLogFile(p0, file1P0, 2); - - // Make 2 files, one base file and 1 log files associated with base file - testTable.addDeltaCommit("001") - .withBaseFilesInPartition(p0, file1P0) - .withLogFile(p0, file1P0, 3); - - // Make 2 files, one base file and 1 log files associated with base file - testTable.addDeltaCommit("002") - .withBaseFilesInPartition(p0, file1P0) - .withLogFile(p0, file1P0, 4); - - List hoodieCleanStats = runCleaner(config); - assertEquals(3, - getCleanStat(hoodieCleanStats, p0).getSuccessDeleteFiles() - .size(), "Must clean three files, one base and 2 log files"); - assertFalse(testTable.baseFileExists(p0, "000", file1P0)); - assertFalse(testTable.logFilesExist(p0, "000", file1P0, 1, 2)); - assertTrue(testTable.baseFileExists(p0, "001", file1P0)); - assertTrue(testTable.logFileExists(p0, "001", file1P0, 3)); - assertTrue(testTable.baseFileExists(p0, "002", file1P0)); - assertTrue(testTable.logFileExists(p0, "002", file1P0, 4)); + CollectionUtils.createSet(HoodieTimeline.CLEAN_ACTION)).filterInflightsAndRequested().containsInstant(makeNewCommitTime(--instantClean, "%09d"))); } @Test @@ -987,7 +797,7 @@ public class TestCleaner extends HoodieClientTestBase { metaClient = HoodieTableMetaClient.reload(metaClient); - List hoodieCleanStatsOne = runCleaner(config); + List hoodieCleanStatsOne = runCleanerWithInstantFormat(config, true); assertEquals(0, hoodieCleanStatsOne.size(), "Must not scan any partitions and clean any files"); assertTrue(testTable.baseFileExists(p0, "00000000000001", file1P0C0)); assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0)); @@ -1001,7 +811,7 @@ public class TestCleaner extends HoodieClientTestBase { testTable.addReplaceCommit("00000000000002", Option.of(replaceMetadata.getKey()), Option.empty(), replaceMetadata.getValue()); // run cleaner - List hoodieCleanStatsTwo = runCleaner(config); + List hoodieCleanStatsTwo = runCleanerWithInstantFormat(config, true); assertEquals(0, hoodieCleanStatsTwo.size(), "Must not scan any partitions and clean any files"); assertTrue(testTable.baseFileExists(p0, "00000000000002", file2P0C1)); assertTrue(testTable.baseFileExists(p0, "00000000000001", file1P0C0)); @@ -1015,7 +825,7 @@ public class TestCleaner extends HoodieClientTestBase { testTable.addReplaceCommit("00000000000003", Option.of(replaceMetadata.getKey()), Option.empty(), replaceMetadata.getValue()); // run cleaner - List hoodieCleanStatsThree = runCleaner(config); + List hoodieCleanStatsThree = runCleanerWithInstantFormat(config, true); assertEquals(0, hoodieCleanStatsThree.size(), "Must not scan any partitions and clean any files"); assertTrue(testTable.baseFileExists(p0, "00000000000002", file2P0C1)); assertTrue(testTable.baseFileExists(p0, "00000000000001", file1P0C0)); @@ -1030,7 +840,7 @@ public class TestCleaner extends HoodieClientTestBase { testTable.addReplaceCommit("00000000000004", Option.of(replaceMetadata.getKey()), Option.empty(), replaceMetadata.getValue()); // run cleaner - List hoodieCleanStatsFour = runCleaner(config, 5); + List hoodieCleanStatsFour = runCleaner(config, 5, true); assertTrue(testTable.baseFileExists(p0, "00000000000004", file4P0C3)); assertTrue(testTable.baseFileExists(p0, "00000000000002", file2P0C1)); assertTrue(testTable.baseFileExists(p1, "00000000000003", file3P1C2)); @@ -1045,7 +855,7 @@ public class TestCleaner extends HoodieClientTestBase { replaceMetadata = generateReplaceCommitMetadata("00000000000006", p0, file3P1C2, file4P1C4); testTable.addReplaceCommit("00000000000006", Option.of(replaceMetadata.getKey()), Option.empty(), replaceMetadata.getValue()); - List hoodieCleanStatsFive = runCleaner(config, 7); + List hoodieCleanStatsFive = runCleaner(config, 7, true); assertTrue(testTable.baseFileExists(p0, "00000000000004", file4P0C3)); assertTrue(testTable.baseFileExists(p0, "00000000000002", file2P0C1)); assertTrue(testTable.baseFileExists(p1, "00000000000003", file3P1C2)); @@ -1252,147 +1062,6 @@ public class TestCleaner extends HoodieClientTestBase { } } - protected static Stream argumentsForTestKeepLatestCommits() { - return Stream.of( - Arguments.of(false, false, false), - Arguments.of(true, false, false), - Arguments.of(false, true, false), - Arguments.of(false, false, true) - ); - } - - /** - * Test HoodieTable.clean() Cleaning by commit logic for COW table. - */ - @ParameterizedTest - @MethodSource("argumentsForTestKeepLatestCommits") - public void testKeepLatestCommits(boolean simulateFailureRetry, boolean enableIncrementalClean, boolean enableBootstrapSourceClean) throws Exception { - HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath) - .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build()) - .withCompactionConfig(HoodieCompactionConfig.newBuilder() - .withIncrementalCleaningMode(enableIncrementalClean) - .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER) - .withCleanBootstrapBaseFileEnabled(enableBootstrapSourceClean) - .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(2).build()) - .build(); - - HoodieTestTable testTable = HoodieTestTable.of(metaClient); - String p0 = "2020/01/01"; - String p1 = "2020/01/02"; - Map> bootstrapMapping = enableBootstrapSourceClean ? generateBootstrapIndexAndSourceData(p0, p1) : null; - - // make 1 commit, with 1 file per partition - String file1P0C0 = enableBootstrapSourceClean ? bootstrapMapping.get(p0).get(0).getFileId() - : UUID.randomUUID().toString(); - String file1P1C0 = enableBootstrapSourceClean ? bootstrapMapping.get(p1).get(0).getFileId() - : UUID.randomUUID().toString(); - testTable.addInflightCommit("00000000000001").withBaseFilesInPartition(p0, file1P0C0).withBaseFilesInPartition(p1, file1P1C0); - - HoodieCommitMetadata commitMetadata = generateCommitMetadata("00000000000001", - Collections.unmodifiableMap(new HashMap>() { - { - put(p0, CollectionUtils.createImmutableList(file1P0C0)); - put(p1, CollectionUtils.createImmutableList(file1P1C0)); - } - }) - ); - metaClient.getActiveTimeline().saveAsComplete( - new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "00000000000001"), - Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8))); - - metaClient = HoodieTableMetaClient.reload(metaClient); - - List hoodieCleanStatsOne = runCleaner(config, simulateFailureRetry); - assertEquals(0, hoodieCleanStatsOne.size(), "Must not scan any partitions and clean any files"); - assertTrue(testTable.baseFileExists(p0, "00000000000001", file1P0C0)); - assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0)); - - // make next commit, with 1 insert & 1 update per partition - Map partitionAndFileId002 = testTable.addInflightCommit("00000000000002").getFileIdsWithBaseFilesInPartitions(p0, p1); - String file2P0C1 = partitionAndFileId002.get(p0); - String file2P1C1 = partitionAndFileId002.get(p1); - testTable.forCommit("00000000000002").withBaseFilesInPartition(p0, file1P0C0).withBaseFilesInPartition(p1, file1P1C0); - commitMetadata = generateCommitMetadata("00000000000002", new HashMap>() { - { - put(p0, CollectionUtils.createImmutableList(file1P0C0, file2P0C1)); - put(p1, CollectionUtils.createImmutableList(file1P1C0, file2P1C1)); - } - }); - metaClient.getActiveTimeline().saveAsComplete( - new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "00000000000002"), - Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8))); - List hoodieCleanStatsTwo = runCleaner(config, simulateFailureRetry); - assertEquals(0, hoodieCleanStatsTwo.size(), "Must not scan any partitions and clean any files"); - assertTrue(testTable.baseFileExists(p0, "00000000000002", file2P0C1)); - assertTrue(testTable.baseFileExists(p1, "00000000000002", file2P1C1)); - assertTrue(testTable.baseFileExists(p0, "00000000000001", file1P0C0)); - assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0)); - - // make next commit, with 2 updates to existing files, and 1 insert - String file3P0C2 = testTable.addInflightCommit("00000000000003") - .withBaseFilesInPartition(p0, file1P0C0) - .withBaseFilesInPartition(p0, file2P0C1) - .getFileIdsWithBaseFilesInPartitions(p0).get(p0); - commitMetadata = generateCommitMetadata("00000000000003", - CollectionUtils.createImmutableMap( - p0, CollectionUtils.createImmutableList(file1P0C0, file2P0C1, file3P0C2))); - metaClient.getActiveTimeline().saveAsComplete( - new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "00000000000003"), - Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8))); - - List hoodieCleanStatsThree = runCleaner(config, simulateFailureRetry); - assertEquals(0, hoodieCleanStatsThree.size(), - "Must not clean any file. We have to keep 1 version before the latest commit time to keep"); - assertTrue(testTable.baseFileExists(p0, "00000000000001", file1P0C0)); - - // make next commit, with 2 updates to existing files, and 1 insert - String file4P0C3 = testTable.addInflightCommit("00000000000004") - .withBaseFilesInPartition(p0, file1P0C0) - .withBaseFilesInPartition(p0, file2P0C1) - .getFileIdsWithBaseFilesInPartitions(p0).get(p0); - commitMetadata = generateCommitMetadata("00000000000004", - CollectionUtils.createImmutableMap( - p0, CollectionUtils.createImmutableList(file1P0C0, file2P0C1, file4P0C3))); - metaClient.getActiveTimeline().saveAsComplete( - new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "00000000000004"), - Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8))); - - List hoodieCleanStatsFour = runCleaner(config, simulateFailureRetry); - // enableBootstrapSourceClean would delete the bootstrap base file as the same time - HoodieCleanStat partitionCleanStat = getCleanStat(hoodieCleanStatsFour, p0); - - assertEquals(enableBootstrapSourceClean ? 2 : 1, partitionCleanStat.getSuccessDeleteFiles().size() - + (partitionCleanStat.getSuccessDeleteBootstrapBaseFiles() == null ? 0 - : partitionCleanStat.getSuccessDeleteBootstrapBaseFiles().size()), "Must clean at least one old file"); - assertFalse(testTable.baseFileExists(p0, "00000000000001", file1P0C0)); - assertTrue(testTable.baseFileExists(p0, "00000000000002", file1P0C0)); - assertTrue(testTable.baseFileExists(p0, "00000000000003", file1P0C0)); - assertTrue(testTable.baseFileExists(p0, "00000000000002", file2P0C1)); - assertTrue(testTable.baseFileExists(p0, "00000000000003", file2P0C1)); - assertTrue(testTable.baseFileExists(p0, "00000000000003", file3P0C2)); - assertTrue(testTable.baseFileExists(p0, "00000000000004", file4P0C3)); - if (enableBootstrapSourceClean) { - assertFalse(Files.exists(Paths.get(bootstrapMapping.get( - p0).get(0).getBootstrapFileStatus().getPath().getUri()))); - } - - // No cleaning on partially written file, with no commit. - testTable.forCommit("00000000000005").withBaseFilesInPartition(p0, file3P0C2); - commitMetadata = generateCommitMetadata("00000000000005", - CollectionUtils.createImmutableMap(p0, CollectionUtils.createImmutableList(file3P0C2))); - metaClient.getActiveTimeline().createNewInstant( - new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "00000000000005")); - metaClient.getActiveTimeline().transitionRequestedToInflight( - new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "00000000000005"), - Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8))); - List hoodieCleanStatsFive = runCleaner(config, simulateFailureRetry); - HoodieCleanStat cleanStat = getCleanStat(hoodieCleanStatsFive, p0); - assertNull(cleanStat, "Must not clean any files"); - assertTrue(testTable.baseFileExists(p0, "00000000000002", file1P0C0)); - assertTrue(testTable.baseFileExists(p0, "00000000000002", file2P0C1)); - assertTrue(testTable.baseFileExists(p0, "00000000000005", file3P0C2)); - } - /** * Generate Bootstrap index, bootstrap base file and corresponding metaClient. * @return Partition to BootstrapFileMapping Map @@ -1528,7 +1197,7 @@ public class TestCleaner extends HoodieClientTestBase { .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(1).build()) .build(); - String commitTime = makeNewCommitTime(1); + String commitTime = makeNewCommitTime(1, "%09d"); List cleanerFileNames = Arrays.asList( HoodieTimeline.makeRequestedCleanerFileName(commitTime), HoodieTimeline.makeInflightCleanerFileName(commitTime)); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java index 961523eb6..f44d67e83 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java @@ -18,42 +18,437 @@ package org.apache.hudi.table.functional; +import org.apache.hudi.avro.model.HoodieFileStatus; import org.apache.hudi.common.HoodieCleanStat; import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.model.BootstrapFileMapping; import org.apache.hudi.common.model.HoodieCleaningPolicy; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.testutils.HoodieMetadataTestTable; import org.apache.hudi.common.testutils.HoodieTestTable; +import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.metadata.HoodieTableMetadataWriter; +import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter; import org.apache.hudi.table.TestCleaner; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Paths; import java.time.Instant; import java.time.ZoneId; import java.time.ZonedDateTime; +import java.util.Arrays; import java.util.Collections; import java.util.Date; import java.util.HashMap; -import java.util.Map; import java.util.List; +import java.util.Map; import java.util.UUID; +import java.util.stream.Stream; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.assertNull; +/** + * Tests covering different clean plan policies/strategies. + */ public class TestCleanPlanExecutor extends TestCleaner { + @Test + public void testInvalidCleaningTriggerStrategy() { + HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath) + .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).enable(false).build()) + .withCompactionConfig(HoodieCompactionConfig.newBuilder() + .withIncrementalCleaningMode(true) + .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER) + .withCleanBootstrapBaseFileEnabled(true) + .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(2) + .withCleaningTriggerStrategy("invalid_strategy").build()) + .build(); + Exception e = assertThrows(IllegalArgumentException.class, () -> { + runCleaner(config, true); + }, "should fail when invalid trigger strategy is provided!"); + assertTrue(e.getMessage().contains("No enum constant org.apache.hudi.table.action.clean.CleaningTriggerStrategy.invalid_strategy")); + } + + private static Stream argumentsForTestKeepLatestCommits() { + return Stream.of( + Arguments.of(false, false, false), + Arguments.of(true, false, false), + Arguments.of(false, true, false), + Arguments.of(false, false, true) + ); + } + + /** + * Test HoodieTable.clean() Cleaning by commit logic for COW table. + */ + @ParameterizedTest + @MethodSource("argumentsForTestKeepLatestCommits") + public void testKeepLatestCommits(boolean simulateFailureRetry, boolean enableIncrementalClean, boolean enableBootstrapSourceClean) throws Exception { + HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath) + .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build()) + .withCompactionConfig(HoodieCompactionConfig.newBuilder() + .withIncrementalCleaningMode(enableIncrementalClean) + .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER) + .withCleanBootstrapBaseFileEnabled(enableBootstrapSourceClean) + .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS) + .retainCommits(2) + .withMaxCommitsBeforeCleaning(2).build()) + .build(); + + HoodieTestTable testTable = HoodieTestTable.of(metaClient); + String p0 = "2020/01/01"; + String p1 = "2020/01/02"; + Map> bootstrapMapping = enableBootstrapSourceClean ? generateBootstrapIndexAndSourceData(p0, p1) : null; + + // make 1 commit, with 1 file per partition + String file1P0C0 = enableBootstrapSourceClean ? bootstrapMapping.get(p0).get(0).getFileId() + : UUID.randomUUID().toString(); + String file1P1C0 = enableBootstrapSourceClean ? bootstrapMapping.get(p1).get(0).getFileId() + : UUID.randomUUID().toString(); + testTable.addInflightCommit("00000000000001").withBaseFilesInPartition(p0, file1P0C0).withBaseFilesInPartition(p1, file1P1C0); + + HoodieCommitMetadata commitMetadata = generateCommitMetadata("00000000000001", + Collections.unmodifiableMap(new HashMap>() { + { + put(p0, CollectionUtils.createImmutableList(file1P0C0)); + put(p1, CollectionUtils.createImmutableList(file1P1C0)); + } + }) + ); + metaClient.getActiveTimeline().saveAsComplete( + new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "00000000000001"), + Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8))); + + metaClient = HoodieTableMetaClient.reload(metaClient); + + List hoodieCleanStatsOne = runCleaner(config, simulateFailureRetry, 2, true); + assertEquals(0, hoodieCleanStatsOne.size(), "Must not scan any partitions and clean any files"); + assertTrue(testTable.baseFileExists(p0, "00000000000001", file1P0C0)); + assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0)); + + // make next commit, with 1 insert & 1 update per partition + Map partitionAndFileId002 = testTable.addInflightCommit("00000000000003").getFileIdsWithBaseFilesInPartitions(p0, p1); + String file2P0C1 = partitionAndFileId002.get(p0); + String file2P1C1 = partitionAndFileId002.get(p1); + testTable.forCommit("00000000000003").withBaseFilesInPartition(p0, file1P0C0).withBaseFilesInPartition(p1, file1P1C0); + commitMetadata = generateCommitMetadata("00000000000003", new HashMap>() { + { + put(p0, CollectionUtils.createImmutableList(file1P0C0, file2P0C1)); + put(p1, CollectionUtils.createImmutableList(file1P1C0, file2P1C1)); + } + }); + metaClient.getActiveTimeline().saveAsComplete( + new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "00000000000003"), + Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8))); + List hoodieCleanStatsTwo = runCleaner(config, simulateFailureRetry, 4, true); + assertEquals(0, hoodieCleanStatsTwo.size(), "Must not scan any partitions and clean any files"); + assertTrue(testTable.baseFileExists(p0, "00000000000003", file2P0C1)); + assertTrue(testTable.baseFileExists(p1, "00000000000003", file2P1C1)); + assertTrue(testTable.baseFileExists(p0, "00000000000001", file1P0C0)); + assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0)); + + // make next commit, with 2 updates to existing files, and 1 insert + String file3P0C2 = testTable.addInflightCommit("00000000000005") + .withBaseFilesInPartition(p0, file1P0C0) + .withBaseFilesInPartition(p0, file2P0C1) + .getFileIdsWithBaseFilesInPartitions(p0).get(p0); + commitMetadata = generateCommitMetadata("00000000000003", + CollectionUtils.createImmutableMap( + p0, CollectionUtils.createImmutableList(file1P0C0, file2P0C1, file3P0C2))); + metaClient.getActiveTimeline().saveAsComplete( + new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "00000000000005"), + Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8))); + + List hoodieCleanStatsThree = runCleaner(config, simulateFailureRetry, 6, true); + assertEquals(0, hoodieCleanStatsThree.size(), + "Must not clean any file. We have to keep 1 version before the latest commit time to keep"); + assertTrue(testTable.baseFileExists(p0, "00000000000001", file1P0C0)); + + // make next commit, with 2 updates to existing files, and 1 insert + String file4P0C3 = testTable.addInflightCommit("00000000000007") + .withBaseFilesInPartition(p0, file1P0C0) + .withBaseFilesInPartition(p0, file2P0C1) + .getFileIdsWithBaseFilesInPartitions(p0).get(p0); + commitMetadata = generateCommitMetadata("00000000000004", + CollectionUtils.createImmutableMap( + p0, CollectionUtils.createImmutableList(file1P0C0, file2P0C1, file4P0C3))); + metaClient.getActiveTimeline().saveAsComplete( + new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "00000000000007"), + Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8))); + + List hoodieCleanStatsFour = runCleaner(config, simulateFailureRetry, 8, true); + // enableBootstrapSourceClean would delete the bootstrap base file as the same time + HoodieCleanStat partitionCleanStat = getCleanStat(hoodieCleanStatsFour, p0); + + assertEquals(enableBootstrapSourceClean ? 2 : 1, partitionCleanStat.getSuccessDeleteFiles().size() + + (partitionCleanStat.getSuccessDeleteBootstrapBaseFiles() == null ? 0 + : partitionCleanStat.getSuccessDeleteBootstrapBaseFiles().size()), "Must clean at least one old file"); + assertFalse(testTable.baseFileExists(p0, "00000000000001", file1P0C0)); + assertTrue(testTable.baseFileExists(p0, "00000000000003", file1P0C0)); + assertTrue(testTable.baseFileExists(p0, "00000000000005", file1P0C0)); + assertTrue(testTable.baseFileExists(p0, "00000000000003", file2P0C1)); + assertTrue(testTable.baseFileExists(p0, "00000000000005", file2P0C1)); + assertTrue(testTable.baseFileExists(p0, "00000000000005", file3P0C2)); + assertTrue(testTable.baseFileExists(p0, "00000000000007", file4P0C3)); + if (enableBootstrapSourceClean) { + assertFalse(Files.exists(Paths.get(bootstrapMapping.get( + p0).get(0).getBootstrapFileStatus().getPath().getUri()))); + } + + metaClient = HoodieTableMetaClient.reload(metaClient); + + String file5P0C4 = testTable.addInflightCommit("00000000000009") + .withBaseFilesInPartition(p0, file1P0C0) + .withBaseFilesInPartition(p0, file2P0C1) + .getFileIdsWithBaseFilesInPartitions(p0).get(p0); + commitMetadata = generateCommitMetadata("00000000000009", CollectionUtils.createImmutableMap( + p0, CollectionUtils.createImmutableList(file1P0C0, file2P0C1, file5P0C4))); + metaClient.getActiveTimeline().saveAsComplete( + new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "00000000000009"), + Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8))); + + List hoodieCleanStatsFive = runCleaner(config, simulateFailureRetry, 10, true); + + assertEquals(0, hoodieCleanStatsFive.size(), "Must not clean any files since at least 2 commits are needed from last clean operation before " + + "clean can be scheduled again"); + assertTrue(testTable.baseFileExists(p0, "00000000000003", file1P0C0)); + assertTrue(testTable.baseFileExists(p0, "00000000000005", file1P0C0)); + assertTrue(testTable.baseFileExists(p0, "00000000000003", file2P0C1)); + assertTrue(testTable.baseFileExists(p0, "00000000000005", file2P0C1)); + assertTrue(testTable.baseFileExists(p0, "00000000000005", file3P0C2)); + assertTrue(testTable.baseFileExists(p0, "00000000000007", file4P0C3)); + + // No cleaning on partially written file, with no commit. + testTable.forCommit("00000000000011").withBaseFilesInPartition(p0, file3P0C2); + commitMetadata = generateCommitMetadata("00000000000011", CollectionUtils.createImmutableMap(p0, + CollectionUtils.createImmutableList(file3P0C2))); + metaClient.getActiveTimeline().createNewInstant( + new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "00000000000011")); + metaClient.getActiveTimeline().transitionRequestedToInflight( + new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "00000000000011"), + Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8))); + List hoodieCleanStatsFive2 = runCleaner(config, simulateFailureRetry, 12, true); + HoodieCleanStat cleanStat = getCleanStat(hoodieCleanStatsFive2, p0); + assertNull(cleanStat, "Must not clean any files"); + assertTrue(testTable.baseFileExists(p0, "00000000000005", file3P0C2)); + assertTrue(testTable.baseFileExists(p0, "00000000000007", file4P0C3)); + } + + /** + * Test Hudi COW Table Cleaner - Keep the latest file versions policy. + */ + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testKeepLatestFileVersions(Boolean enableBootstrapSourceClean) throws Exception { + HoodieWriteConfig config = + HoodieWriteConfig.newBuilder().withPath(basePath) + .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build()) + .withCompactionConfig(HoodieCompactionConfig.newBuilder() + .withCleanBootstrapBaseFileEnabled(enableBootstrapSourceClean) + .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(1).build()) + .build(); + + HoodieTableMetadataWriter metadataWriter = SparkHoodieBackedTableMetadataWriter.create(hadoopConf, config, context); + HoodieTestTable testTable = HoodieMetadataTestTable.of(metaClient, metadataWriter); + + final String p0 = "2020/01/01"; + final String p1 = "2020/01/02"; + final Map> bootstrapMapping = enableBootstrapSourceClean + ? generateBootstrapIndexAndSourceData(p0, p1) : null; + + // make 1 commit, with 1 file per partition + final String file1P0C0 = enableBootstrapSourceClean ? bootstrapMapping.get(p0).get(0).getFileId() + : UUID.randomUUID().toString(); + final String file1P1C0 = enableBootstrapSourceClean ? bootstrapMapping.get(p1).get(0).getFileId() + : UUID.randomUUID().toString(); + + Map>> c1PartitionToFilesNameLengthMap = new HashMap<>(); + c1PartitionToFilesNameLengthMap.put(p0, Collections.singletonList(Pair.of(file1P0C0, 100))); + c1PartitionToFilesNameLengthMap.put(p1, Collections.singletonList(Pair.of(file1P1C0, 200))); + testTable.doWriteOperation("00000000000001", WriteOperationType.INSERT, Arrays.asList(p0, p1), + c1PartitionToFilesNameLengthMap, false, false); + + List hoodieCleanStatsOne = runCleanerWithInstantFormat(config, true); + assertEquals(0, hoodieCleanStatsOne.size(), "Must not clean any files"); + assertTrue(testTable.baseFileExists(p0, "00000000000001", file1P0C0)); + assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0)); + + // make next commit, with 1 insert & 1 update per partition + final String file2P0C1 = UUID.randomUUID().toString(); + final String file2P1C1 = UUID.randomUUID().toString(); + Map>> c2PartitionToFilesNameLengthMap = new HashMap<>(); + c2PartitionToFilesNameLengthMap.put(p0, Arrays.asList(Pair.of(file1P0C0, 101), Pair.of(file2P0C1, 100))); + c2PartitionToFilesNameLengthMap.put(p1, Arrays.asList(Pair.of(file1P1C0, 201), Pair.of(file2P1C1, 200))); + testTable.doWriteOperation("00000000000002", WriteOperationType.UPSERT, Collections.emptyList(), + c2PartitionToFilesNameLengthMap, false, false); + + // enableBootstrapSourceClean would delete the bootstrap base file at the same time + List hoodieCleanStatsTwo = runCleaner(config, 1, true); + HoodieCleanStat cleanStat = getCleanStat(hoodieCleanStatsTwo, p0); + assertEquals(enableBootstrapSourceClean ? 2 : 1, cleanStat.getSuccessDeleteFiles().size() + + (cleanStat.getSuccessDeleteBootstrapBaseFiles() == null ? 0 + : cleanStat.getSuccessDeleteBootstrapBaseFiles().size()), "Must clean at least 1 file"); + + if (enableBootstrapSourceClean) { + HoodieFileStatus fstatus = + bootstrapMapping.get(p0).get(0).getBootstrapFileStatus(); + // This ensures full path is recorded in metadata. + assertTrue(cleanStat.getSuccessDeleteBootstrapBaseFiles().contains(fstatus.getPath().getUri()), + "Successful delete files were " + cleanStat.getSuccessDeleteBootstrapBaseFiles() + + " but did not contain " + fstatus.getPath().getUri()); + assertFalse(Files.exists(Paths.get(bootstrapMapping.get( + p0).get(0).getBootstrapFileStatus().getPath().getUri()))); + } + + cleanStat = getCleanStat(hoodieCleanStatsTwo, p1); + assertTrue(testTable.baseFileExists(p0, "00000000000002", file2P0C1)); + assertTrue(testTable.baseFileExists(p1, "00000000000002", file2P1C1)); + assertFalse(testTable.baseFileExists(p0, "00000000000001", file1P0C0)); + assertFalse(testTable.baseFileExists(p1, "00000000000001", file1P1C0)); + assertEquals(enableBootstrapSourceClean ? 2 : 1, cleanStat.getSuccessDeleteFiles().size() + + (cleanStat.getSuccessDeleteBootstrapBaseFiles() == null ? 0 + : cleanStat.getSuccessDeleteBootstrapBaseFiles().size()), "Must clean at least 1 file"); + + if (enableBootstrapSourceClean) { + HoodieFileStatus fstatus = + bootstrapMapping.get(p1).get(0).getBootstrapFileStatus(); + // This ensures full path is recorded in metadata. + assertTrue(cleanStat.getSuccessDeleteBootstrapBaseFiles().contains(fstatus.getPath().getUri()), + "Successful delete files were " + cleanStat.getSuccessDeleteBootstrapBaseFiles() + + " but did not contain " + fstatus.getPath().getUri()); + assertFalse(Files.exists(Paths.get(bootstrapMapping.get( + p1).get(0).getBootstrapFileStatus().getPath().getUri()))); + } + + // make next commit, with 2 updates to existing files, and 1 insert + final String file3P0C2 = UUID.randomUUID().toString(); + Map>> c3PartitionToFilesNameLengthMap = new HashMap<>(); + c3PartitionToFilesNameLengthMap.put(p0, Arrays.asList(Pair.of(file1P0C0, 102), Pair.of(file2P0C1, 101), + Pair.of(file3P0C2, 100))); + testTable.doWriteOperation("00000000000003", WriteOperationType.UPSERT, Collections.emptyList(), + c3PartitionToFilesNameLengthMap, false, false); + + List hoodieCleanStatsThree = runCleaner(config, 3, true); + assertEquals(2, + getCleanStat(hoodieCleanStatsThree, p0) + .getSuccessDeleteFiles().size(), "Must clean two files"); + assertFalse(testTable.baseFileExists(p0, "00000000000002", file1P0C0)); + assertFalse(testTable.baseFileExists(p0, "00000000000002", file2P0C1)); + assertTrue(testTable.baseFileExists(p0, "00000000000003", file3P0C2)); + + // No cleaning on partially written file, with no commit. + testTable.forCommit("00000000000004").withBaseFilesInPartition(p0, file3P0C2); + + List hoodieCleanStatsFour = runCleaner(config); + assertEquals(0, hoodieCleanStatsFour.size(), "Must not clean any files"); + assertTrue(testTable.baseFileExists(p0, "00000000000003", file3P0C2)); + } + + /** + * Test HoodieTable.clean() Cleaning by versions logic for MOR table with Log files. + */ + @Test + public void testKeepLatestFileVersionsMOR() throws Exception { + + HoodieWriteConfig config = + HoodieWriteConfig.newBuilder().withPath(basePath) + .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build()) + .withCompactionConfig(HoodieCompactionConfig.newBuilder() + .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(1).build()) + .build(); + + HoodieTableMetaClient metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ); + HoodieTestTable testTable = HoodieTestTable.of(metaClient); + String p0 = "2020/01/01"; + + // Make 3 files, one base file and 2 log files associated with base file + String file1P0 = testTable.addDeltaCommit("000").getFileIdsWithBaseFilesInPartitions(p0).get(p0); + testTable.forDeltaCommit("000") + .withLogFile(p0, file1P0, 1) + .withLogFile(p0, file1P0, 2); + + // Make 2 files, one base file and 1 log files associated with base file + testTable.addDeltaCommit("001") + .withBaseFilesInPartition(p0, file1P0) + .withLogFile(p0, file1P0, 3); + + List hoodieCleanStats = runCleaner(config); + assertEquals(3, + getCleanStat(hoodieCleanStats, p0).getSuccessDeleteFiles() + .size(), "Must clean three files, one base and 2 log files"); + assertFalse(testTable.baseFileExists(p0, "000", file1P0)); + assertFalse(testTable.logFilesExist(p0, "000", file1P0, 1, 2)); + assertTrue(testTable.baseFileExists(p0, "001", file1P0)); + assertTrue(testTable.logFileExists(p0, "001", file1P0, 3)); + } + + /** + * Test HoodieTable.clean() Cleaning by commit logic for MOR table with Log files. + */ + @Test + public void testKeepLatestCommitsMOR() throws Exception { + + HoodieWriteConfig config = + HoodieWriteConfig.newBuilder().withPath(basePath) + .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build()) + .withCompactionConfig(HoodieCompactionConfig.newBuilder() + .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(1).build()) + .build(); + + HoodieTableMetaClient metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ); + HoodieTestTable testTable = HoodieTestTable.of(metaClient); + String p0 = "2020/01/01"; + + // Make 3 files, one base file and 2 log files associated with base file + String file1P0 = testTable.addDeltaCommit("000").getFileIdsWithBaseFilesInPartitions(p0).get(p0); + testTable.forDeltaCommit("000") + .withLogFile(p0, file1P0, 1) + .withLogFile(p0, file1P0, 2); + + // Make 2 files, one base file and 1 log files associated with base file + testTable.addDeltaCommit("001") + .withBaseFilesInPartition(p0, file1P0) + .withLogFile(p0, file1P0, 3); + + // Make 2 files, one base file and 1 log files associated with base file + testTable.addDeltaCommit("002") + .withBaseFilesInPartition(p0, file1P0) + .withLogFile(p0, file1P0, 4); + + List hoodieCleanStats = runCleaner(config); + assertEquals(3, + getCleanStat(hoodieCleanStats, p0).getSuccessDeleteFiles() + .size(), "Must clean three files, one base and 2 log files"); + assertFalse(testTable.baseFileExists(p0, "000", file1P0)); + assertFalse(testTable.logFilesExist(p0, "000", file1P0, 1, 2)); + assertTrue(testTable.baseFileExists(p0, "001", file1P0)); + assertTrue(testTable.logFileExists(p0, "001", file1P0, 3)); + assertTrue(testTable.baseFileExists(p0, "002", file1P0)); + assertTrue(testTable.logFileExists(p0, "002", file1P0, 4)); + } + /** * Tests cleaning service based on number of hours retained. */ diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java index 6f49c6996..5f9aab84d 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java @@ -141,8 +141,8 @@ public class HoodieTestTable { return new HoodieTestTable(metaClient.getBasePath(), metaClient.getRawFs(), metaClient); } - public static String makeNewCommitTime(int sequence) { - return String.format("%09d", sequence); + public static String makeNewCommitTime(int sequence, String instantFormat) { + return String.format(instantFormat, sequence); } public static String makeNewCommitTime() { @@ -153,14 +153,6 @@ public class HoodieTestTable { return HoodieActiveTimeline.formatDate(Date.from(dateTime)); } - public static List makeIncrementalCommitTimes(int num) { - return makeIncrementalCommitTimes(num, 1); - } - - public static List makeIncrementalCommitTimes(int num, int firstOffsetSeconds) { - return makeIncrementalCommitTimes(num, firstOffsetSeconds, 0); - } - public static List makeIncrementalCommitTimes(int num, int firstOffsetSeconds, int deltaSecs) { final Instant now = Instant.now(); return IntStream.range(0, num)