diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
index 08f377407..934d91a27 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.config.DefaultHoodieConfig;
 import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
 import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
 import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
 import org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy;
 
@@ -46,6 +47,9 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
   public static final String INLINE_COMPACT_PROP = "hoodie.compact.inline";
   // Run a compaction every N delta commits
   public static final String INLINE_COMPACT_NUM_DELTA_COMMITS_PROP = "hoodie.compact.inline.max.delta.commits";
+  // Run a compaction when time elapsed > N seconds since last compaction
+  public static final String INLINE_COMPACT_TIME_DELTA_SECONDS_PROP = "hoodie.compact.inline.max.delta.seconds";
+  public static final String INLINE_COMPACT_TRIGGER_STRATEGY_PROP = "hoodie.compact.inline.trigger.strategy";
   public static final String CLEANER_FILE_VERSIONS_RETAINED_PROP = "hoodie.cleaner.fileversions.retained";
   public static final String CLEANER_COMMITS_RETAINED_PROP = "hoodie.cleaner.commits.retained";
   public static final String CLEANER_INCREMENTAL_MODE = "hoodie.cleaner.incremental.mode";
@@ -109,6 +113,8 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
   private static final String DEFAULT_INLINE_COMPACT = "false";
   private static final String DEFAULT_INCREMENTAL_CLEANER = "true";
   private static final String DEFAULT_INLINE_COMPACT_NUM_DELTA_COMMITS = "5";
+  private static final String DEFAULT_INLINE_COMPACT_TIME_DELTA_SECONDS = String.valueOf(60 * 60);
+  private static final String DEFAULT_INLINE_COMPACT_TRIGGER_STRATEGY = CompactionTriggerStrategy.NUM_COMMITS.name();
   private static final String DEFAULT_CLEANER_FILE_VERSIONS_RETAINED = "3";
   private static final String DEFAULT_CLEANER_COMMITS_RETAINED = "10";
   private static final String DEFAULT_MAX_COMMITS_TO_KEEP = "30";
@@ -164,6 +170,11 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
       return this;
     }
 
+    public Builder withInlineCompactionTriggerStrategy(CompactionTriggerStrategy compactionTriggerStrategy) {
+      props.setProperty(INLINE_COMPACT_TRIGGER_STRATEGY_PROP, compactionTriggerStrategy.name());
+      return this;
+    }
+
     public Builder withCleanerPolicy(HoodieCleaningPolicy policy) {
       props.setProperty(CLEANER_POLICY_PROP, policy.name());
       return this;
     }
@@ -235,6 +246,11 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
       return this;
     }
 
+    public Builder withMaxDeltaSecondsBeforeCompaction(int maxDeltaSecondsBeforeCompaction) {
+      props.setProperty(INLINE_COMPACT_TIME_DELTA_SECONDS_PROP, String.valueOf(maxDeltaSecondsBeforeCompaction));
+      return this;
+    }
+
     public Builder withCompactionLazyBlockReadEnabled(Boolean compactionLazyBlockReadEnabled) {
       props.setProperty(COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, String.valueOf(compactionLazyBlockReadEnabled));
       return this;
     }
@@ -271,6 +287,10 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
           DEFAULT_INLINE_COMPACT);
       setDefaultOnCondition(props, !props.containsKey(INLINE_COMPACT_NUM_DELTA_COMMITS_PROP),
           INLINE_COMPACT_NUM_DELTA_COMMITS_PROP, DEFAULT_INLINE_COMPACT_NUM_DELTA_COMMITS);
+      setDefaultOnCondition(props, !props.containsKey(INLINE_COMPACT_TIME_DELTA_SECONDS_PROP),
+          INLINE_COMPACT_TIME_DELTA_SECONDS_PROP, DEFAULT_INLINE_COMPACT_TIME_DELTA_SECONDS);
+      setDefaultOnCondition(props, !props.containsKey(INLINE_COMPACT_TRIGGER_STRATEGY_PROP),
+          INLINE_COMPACT_TRIGGER_STRATEGY_PROP, DEFAULT_INLINE_COMPACT_TRIGGER_STRATEGY);
       setDefaultOnCondition(props, !props.containsKey(CLEANER_POLICY_PROP), CLEANER_POLICY_PROP,
           DEFAULT_CLEANER_POLICY);
       setDefaultOnCondition(props, !props.containsKey(CLEANER_FILE_VERSIONS_RETAINED_PROP),
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index cadb2d126..e3c1ef681 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -34,6 +34,7 @@ import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.keygen.SimpleAvroKeyGenerator;
 import org.apache.hudi.metrics.MetricsReporterType;
 import org.apache.hudi.metrics.datadog.DatadogHttpClient.ApiSite;
+import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
 import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
 
 import org.apache.hadoop.hbase.io.compress.Compression;
@@ -405,10 +406,18 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
     return Boolean.parseBoolean(props.getProperty(HoodieCompactionConfig.INLINE_COMPACT_PROP));
   }
 
+  public CompactionTriggerStrategy getInlineCompactTriggerStrategy() {
+    return CompactionTriggerStrategy.valueOf(props.getProperty(HoodieCompactionConfig.INLINE_COMPACT_TRIGGER_STRATEGY_PROP));
+  }
+
   public int getInlineCompactDeltaCommitMax() {
     return Integer.parseInt(props.getProperty(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP));
   }
 
+  public int getInlineCompactDeltaSecondsMax() {
+    return Integer.parseInt(props.getProperty(HoodieCompactionConfig.INLINE_COMPACT_TIME_DELTA_SECONDS_PROP));
+  }
+
   public CompactionStrategy getCompactionStrategy() {
     return ReflectionUtils.loadClass(props.getProperty(HoodieCompactionConfig.COMPACTION_STRATEGY_PROP));
   }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactionTriggerStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactionTriggerStrategy.java
new file mode 100644
index 000000000..6a4e634bc
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactionTriggerStrategy.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.compact;
+
+public enum CompactionTriggerStrategy {
+  // trigger compaction after N delta commits have accumulated
+  NUM_COMMITS,
+  // trigger compaction after more than N seconds have elapsed since the last compaction
+  TIME_ELAPSED,
+  // trigger compaction when both NUM_COMMITS and TIME_ELAPSED are satisfied
+  NUM_AND_TIME,
+  // trigger compaction when either NUM_COMMITS or TIME_ELAPSED is satisfied
+  NUM_OR_TIME
+}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/SparkScheduleCompactionActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/SparkScheduleCompactionActionExecutor.java
index 34db0a780..9c44499a8 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/SparkScheduleCompactionActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/SparkScheduleCompactionActionExecutor.java
@@ -25,7 +25,9 @@ import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.view.SyncableFileSystemView;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
@@ -37,6 +39,7 @@ import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
 
 import java.io.IOException;
+import java.text.ParseException;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -58,36 +61,92 @@ public class SparkScheduleCompactionActionExecutor<T extends HoodieRecordPayload>
-    Option<HoodieInstant> lastCompaction = table.getActiveTimeline().getCommitTimeline()
-        .filterCompletedInstants().lastInstant();
-    String lastCompactionTs = "0";
-    if (lastCompaction.isPresent()) {
-      lastCompactionTs = lastCompaction.get().getTimestamp();
+    // check whether we need to schedule a compaction, based on the number of delta commits and/or the time elapsed
+    boolean compactable = needCompact(config.getInlineCompactTriggerStrategy());
+    if (compactable) {
+      LOG.info("Generating compaction plan for merge on read table " + config.getBasePath());
+      HoodieSparkMergeOnReadTableCompactor compactor = new HoodieSparkMergeOnReadTableCompactor();
+      try {
+        SyncableFileSystemView fileSystemView = (SyncableFileSystemView) table.getSliceView();
+        Set<HoodieFileGroupId> fgInPendingCompactionAndClustering = fileSystemView.getPendingCompactionOperations()
+            .map(instantTimeOpPair -> instantTimeOpPair.getValue().getFileGroupId())
+            .collect(Collectors.toSet());
+        // exclude files in pending clustering from compaction.
+        fgInPendingCompactionAndClustering.addAll(fileSystemView.getFileGroupsInPendingClustering().map(Pair::getLeft).collect(Collectors.toSet()));
+        return compactor.generateCompactionPlan(context, table, config, instantTime, fgInPendingCompactionAndClustering);
+      } catch (IOException e) {
+        throw new HoodieCompactionException("Could not schedule compaction " + config.getBasePath(), e);
+      }
     }
-    int deltaCommitsSinceLastCompaction = table.getActiveTimeline().getDeltaCommitTimeline()
-        .findInstantsAfter(lastCompactionTs, Integer.MAX_VALUE).countInstants();
-    if (config.getInlineCompactDeltaCommitMax() > deltaCommitsSinceLastCompaction) {
-      LOG.info("Not scheduling compaction as only " + deltaCommitsSinceLastCompaction
-          + " delta commits was found since last compaction " + lastCompactionTs + ". Waiting for "
-          + config.getInlineCompactDeltaCommitMax());
-      return new HoodieCompactionPlan();
-    }
-
-    LOG.info("Generating compaction plan for merge on read table " + config.getBasePath());
-    HoodieSparkMergeOnReadTableCompactor compactor = new HoodieSparkMergeOnReadTableCompactor();
-    try {
-      SyncableFileSystemView fileSystemView = (SyncableFileSystemView) table.getSliceView();
-      Set<HoodieFileGroupId> fgInPendingCompactionAndClustering = fileSystemView.getPendingCompactionOperations()
-          .map(instantTimeOpPair -> instantTimeOpPair.getValue().getFileGroupId())
-          .collect(Collectors.toSet());
-      // exclude files in pending clustering from compaction.
-      fgInPendingCompactionAndClustering.addAll(fileSystemView.getFileGroupsInPendingClustering().map(Pair::getLeft).collect(Collectors.toSet()));
-      return compactor.generateCompactionPlan(context, table, config, instantTime, fgInPendingCompactionAndClustering);
-
-    } catch (IOException e) {
-      throw new HoodieCompactionException("Could not schedule compaction " + config.getBasePath(), e);
-    }
+    return new HoodieCompactionPlan();
   }
 
+  public Pair<Integer, String> getLatestDeltaCommitInfo(CompactionTriggerStrategy compactionTriggerStrategy) {
+    Option<HoodieInstant> lastCompaction = table.getActiveTimeline().getCommitTimeline()
+        .filterCompletedInstants().lastInstant();
+    HoodieTimeline deltaCommits = table.getActiveTimeline().getDeltaCommitTimeline();
+
+    String latestInstantTs;
+    int deltaCommitsSinceLastCompaction = 0;
+    if (lastCompaction.isPresent()) {
+      latestInstantTs = lastCompaction.get().getTimestamp();
+      deltaCommitsSinceLastCompaction = deltaCommits.findInstantsAfter(latestInstantTs, Integer.MAX_VALUE).countInstants();
+    } else {
+      latestInstantTs = deltaCommits.firstInstant().get().getTimestamp();
+      deltaCommitsSinceLastCompaction = deltaCommits.findInstantsAfterOrEquals(latestInstantTs, Integer.MAX_VALUE).countInstants();
+    }
+    return Pair.of(deltaCommitsSinceLastCompaction, latestInstantTs);
+  }
+
+  public boolean needCompact(CompactionTriggerStrategy compactionTriggerStrategy) {
+    boolean compactable;
+    // get deltaCommitsSinceLastCompaction and the timestamp of the last compaction (or of the first delta commit, if none)
+    Pair<Integer, String> latestDeltaCommitInfo = getLatestDeltaCommitInfo(compactionTriggerStrategy);
+    int inlineCompactDeltaCommitMax = config.getInlineCompactDeltaCommitMax();
+    int inlineCompactDeltaSecondsMax = config.getInlineCompactDeltaSecondsMax();
+    switch (compactionTriggerStrategy) {
+      case NUM_COMMITS:
+        compactable = inlineCompactDeltaCommitMax <= latestDeltaCommitInfo.getLeft();
+        if (compactable) {
+          LOG.info(String.format("The delta commits >= %s, trigger compaction scheduler.", inlineCompactDeltaCommitMax));
+        }
+        break;
+      case TIME_ELAPSED:
+        compactable = inlineCompactDeltaSecondsMax <= parsedToSeconds(instantTime) - parsedToSeconds(latestDeltaCommitInfo.getRight());
+        if (compactable) {
+          LOG.info(String.format("The elapsed time >= %ss, trigger compaction scheduler.", inlineCompactDeltaSecondsMax));
+        }
+        break;
+      case NUM_OR_TIME:
+        compactable = inlineCompactDeltaCommitMax <= latestDeltaCommitInfo.getLeft()
+            || inlineCompactDeltaSecondsMax <= parsedToSeconds(instantTime) - parsedToSeconds(latestDeltaCommitInfo.getRight());
+        if (compactable) {
+          LOG.info(String.format("The delta commits >= %s or the elapsed time >= %ss, trigger compaction scheduler.", inlineCompactDeltaCommitMax,
+              inlineCompactDeltaSecondsMax));
+        }
+        break;
+      case NUM_AND_TIME:
+        compactable = inlineCompactDeltaCommitMax <= latestDeltaCommitInfo.getLeft()
+            && inlineCompactDeltaSecondsMax <= parsedToSeconds(instantTime) - parsedToSeconds(latestDeltaCommitInfo.getRight());
+        if (compactable) {
+          LOG.info(String.format("The delta commits >= %s and the elapsed time >= %ss, trigger compaction scheduler.", inlineCompactDeltaCommitMax,
+              inlineCompactDeltaSecondsMax));
+        }
+        break;
+      default:
+        throw new HoodieCompactionException("Unsupported compaction trigger strategy: " + config.getInlineCompactTriggerStrategy());
+    }
+    return compactable;
+  }
+
+  public Long parsedToSeconds(String time) {
+    long timestamp;
+    try {
+      timestamp = HoodieActiveTimeline.COMMIT_FORMATTER.parse(time).getTime() / 1000;
+    } catch (ParseException e) {
+      throw new HoodieCompactionException(e.getMessage(), e);
+    }
+    return timestamp;
+  }
 }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
index 066a9656f..80542edfa 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
@@ -24,7 +24,6 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.junit.jupiter.api.Test;
@@ -39,21 +38,25 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 
 public class TestInlineCompaction extends CompactionTestBase {
 
-  private HoodieWriteConfig getConfigForInlineCompaction(int maxDeltaCommits) {
+  private HoodieWriteConfig getConfigForInlineCompaction(int maxDeltaCommits, int maxDeltaTime, CompactionTriggerStrategy inlineCompactionType) {
     return getConfigBuilder(false)
         .withCompactionConfig(HoodieCompactionConfig.newBuilder()
-            .withInlineCompaction(true).withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommits).build())
+            .withInlineCompaction(true)
+            .withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommits)
+            .withMaxDeltaSecondsBeforeCompaction(maxDeltaTime)
+            .withInlineCompactionTriggerStrategy(inlineCompactionType).build())
         .build();
   }
 
   @Test
   public void testCompactionIsNotScheduledEarly() throws Exception {
     // Given: make two commits
-    HoodieWriteConfig cfg = getConfigForInlineCompaction(3);
+    HoodieWriteConfig cfg = getConfigForInlineCompaction(3, 60, CompactionTriggerStrategy.NUM_COMMITS);
     try (SparkRDDWriteClient writeClient = getHoodieWriteClient(cfg)) {
-      List<HoodieRecord> records = dataGen.generateInserts("000", 100);
+      List<HoodieRecord> records = dataGen.generateInserts(HoodieActiveTimeline.createNewInstantTime(), 100);
       HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
-      runNextDeltaCommits(writeClient, readClient, Arrays.asList("000", "001"), records, cfg, true, new ArrayList<>());
+      List<String> instants = IntStream.range(0, 2).mapToObj(i -> HoodieActiveTimeline.createNewInstantTime()).collect(Collectors.toList());
+      runNextDeltaCommits(writeClient, readClient, instants, records, cfg, true, new ArrayList<>());
       HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
 
       // Then: ensure no compaction is executed, since there are only 2 delta commits
@@ -62,9 +65,9 @@ public class TestInlineCompaction extends CompactionTestBase {
   }
 
   @Test
-  public void testSuccessfulCompaction() throws Exception {
+  public void testSuccessfulCompactionBasedOnNumCommits() throws Exception {
     // Given: make three commits
-    HoodieWriteConfig cfg = getConfigForInlineCompaction(3);
+    HoodieWriteConfig cfg = getConfigForInlineCompaction(3, 60, CompactionTriggerStrategy.NUM_COMMITS);
     List<String> instants = IntStream.range(0, 2).mapToObj(i -> HoodieActiveTimeline.createNewInstantTime()).collect(Collectors.toList());
 
     try (SparkRDDWriteClient writeClient = getHoodieWriteClient(cfg)) {
@@ -85,32 +88,181 @@ public class TestInlineCompaction extends CompactionTestBase {
   }
 
   @Test
-  public void testCompactionRetryOnFailure() throws Exception {
+  public void testSuccessfulCompactionBasedOnTime() throws Exception {
+    // Given: make one commit
+    HoodieWriteConfig cfg = getConfigForInlineCompaction(5, 10, CompactionTriggerStrategy.TIME_ELAPSED);
+
+    try (SparkRDDWriteClient writeClient = getHoodieWriteClient(cfg)) {
+      String instantTime = HoodieActiveTimeline.createNewInstantTime();
+      List<HoodieRecord> records = dataGen.generateInserts(instantTime, 10);
+      HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
+      runNextDeltaCommits(writeClient, readClient, Arrays.asList(instantTime), records, cfg, true, new ArrayList<>());
+
+      // after 10s, the next commit will trigger compaction
+      String finalInstant = HoodieActiveTimeline.createNewInstantTime(10000);
+      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+      createNextDeltaCommit(finalInstant, dataGen.generateUpdates(finalInstant, 100), writeClient, metaClient, cfg, false);
+
+      // Then: ensure the file slices are compacted as per policy
+      metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+      assertEquals(3, metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
+      assertEquals(HoodieTimeline.COMMIT_ACTION, metaClient.getActiveTimeline().lastInstant().get().getAction());
+    }
+  }
+
+  @Test
+  public void testSuccessfulCompactionBasedOnNumOrTime() throws Exception {
+    // Given: make three commits
+    HoodieWriteConfig cfg = getConfigForInlineCompaction(3, 20, CompactionTriggerStrategy.NUM_OR_TIME);
+    try (SparkRDDWriteClient writeClient = getHoodieWriteClient(cfg)) {
+      List<HoodieRecord> records = dataGen.generateInserts(HoodieActiveTimeline.createNewInstantTime(), 10);
+      HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
+      List<String> instants = IntStream.range(0, 2).mapToObj(i -> HoodieActiveTimeline.createNewInstantTime()).collect(Collectors.toList());
+      runNextDeltaCommits(writeClient, readClient, instants, records, cfg, true, new ArrayList<>());
+      // Then: the third commit will trigger compaction, since 3 delta commits are reached
+      String finalInstant = HoodieActiveTimeline.createNewInstantTime();
+      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+      createNextDeltaCommit(finalInstant, dataGen.generateUpdates(finalInstant, 10), writeClient, metaClient, cfg, false);
+
+      metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+      assertEquals(4, metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
+      // 4th commit, which will trigger compaction again because the elapsed-time threshold is reached
+      metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+      finalInstant = HoodieActiveTimeline.createNewInstantTime(20000);
+      createNextDeltaCommit(finalInstant, dataGen.generateUpdates(finalInstant, 10), writeClient, metaClient, cfg, false);
+
+      metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+      assertEquals(6, metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
+    }
+  }
+
+  @Test
+  public void testSuccessfulCompactionBasedOnNumAndTime() throws Exception {
+    // Given: make three commits
+    HoodieWriteConfig cfg = getConfigForInlineCompaction(3, 20, CompactionTriggerStrategy.NUM_AND_TIME);
+    try (SparkRDDWriteClient writeClient = getHoodieWriteClient(cfg)) {
+      List<HoodieRecord> records = dataGen.generateInserts(HoodieActiveTimeline.createNewInstantTime(), 10);
+      HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
+      List<String> instants = IntStream.range(0, 3).mapToObj(i -> HoodieActiveTimeline.createNewInstantTime()).collect(Collectors.toList());
+      runNextDeltaCommits(writeClient, readClient, instants, records, cfg, true, new ArrayList<>());
+      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+
+      // Then: ensure no compaction is executed, since the elapsed-time threshold is not reached even though there are 3 delta commits
+      assertEquals(3, metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
+      // 4th commit, which will trigger compaction
+      metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+      String finalInstant = HoodieActiveTimeline.createNewInstantTime(20000);
+      createNextDeltaCommit(finalInstant, dataGen.generateUpdates(finalInstant, 10), writeClient, metaClient, cfg, false);
+
+      metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+      assertEquals(5, metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
+    }
+  }
+
+  @Test
+  public void testCompactionRetryOnFailureBasedOnNumCommits() throws Exception {
     // Given: two commits, schedule compaction and leave it failed/in-flight
     HoodieWriteConfig cfg = getConfigBuilder(false)
         .withCompactionConfig(HoodieCompactionConfig.newBuilder()
-            .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build())
+            .withInlineCompaction(false)
+            .withMaxNumDeltaCommitsBeforeCompaction(1).build())
         .build();
-    List<String> instants = CollectionUtils.createImmutableList("000", "001");
+    List<String> instants = IntStream.range(0, 2).mapToObj(i -> HoodieActiveTimeline.createNewInstantTime()).collect(Collectors.toList());
+    String instantTime2;
     try (SparkRDDWriteClient writeClient = getHoodieWriteClient(cfg)) {
       List<HoodieRecord> records = dataGen.generateInserts(instants.get(0), 100);
       HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
       runNextDeltaCommits(writeClient, readClient, instants, records, cfg, true, new ArrayList<>());
-      // Schedule compaction 002, make it in-flight (simulates inline compaction failing)
-      scheduleCompaction("002", writeClient, cfg);
-      moveCompactionFromRequestedToInflight("002", cfg);
+      // Schedule compaction instantTime2, make it in-flight (simulates inline compaction failing)
+      instantTime2 = HoodieActiveTimeline.createNewInstantTime();
+      scheduleCompaction(instantTime2, writeClient, cfg);
+      moveCompactionFromRequestedToInflight(instantTime2, cfg);
     }
 
     // When: a third commit happens
-    HoodieWriteConfig inlineCfg = getConfigForInlineCompaction(2);
+    HoodieWriteConfig inlineCfg = getConfigForInlineCompaction(2, 60, CompactionTriggerStrategy.NUM_COMMITS);
+    String instantTime3 = HoodieActiveTimeline.createNewInstantTime();
     try (SparkRDDWriteClient writeClient = getHoodieWriteClient(inlineCfg)) {
       HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
-      createNextDeltaCommit("003", dataGen.generateUpdates("003", 100), writeClient, metaClient, inlineCfg, false);
+      createNextDeltaCommit(instantTime3, dataGen.generateUpdates(instantTime3, 100), writeClient, metaClient, inlineCfg, false);
     }
 
     // Then: 1 delta commit is done, the failed compaction is retried
     metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
     assertEquals(4, metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
-    assertEquals("002", metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().firstInstant().get().getTimestamp());
+    assertEquals(instantTime2, metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().firstInstant().get().getTimestamp());
+  }
+
+  @Test
+  public void testCompactionRetryOnFailureBasedOnTime() throws Exception {
+    // Given: two commits, schedule compaction and leave it failed/in-flight
+    HoodieWriteConfig cfg = getConfigBuilder(false)
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .withInlineCompaction(false)
+            .withMaxDeltaSecondsBeforeCompaction(5)
+            .withInlineCompactionTriggerStrategy(CompactionTriggerStrategy.TIME_ELAPSED).build())
+        .build();
+    String instantTime;
+    List<String> instants = IntStream.range(0, 2).mapToObj(i -> HoodieActiveTimeline.createNewInstantTime()).collect(Collectors.toList());
+    try (SparkRDDWriteClient writeClient = getHoodieWriteClient(cfg)) {
+      List<HoodieRecord> records = dataGen.generateInserts(instants.get(0), 100);
+      HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
+      runNextDeltaCommits(writeClient, readClient, instants, records, cfg, true, new ArrayList<>());
+      // Schedule compaction instantTime, make it in-flight (simulates inline compaction failing)
+      instantTime = HoodieActiveTimeline.createNewInstantTime(10000);
+      scheduleCompaction(instantTime, writeClient, cfg);
+      moveCompactionFromRequestedToInflight(instantTime, cfg);
+    }
+
+    // When: a commit happens after 10s
+    HoodieWriteConfig inlineCfg = getConfigForInlineCompaction(5, 10, CompactionTriggerStrategy.TIME_ELAPSED);
+    String instantTime2;
+    try (SparkRDDWriteClient writeClient = getHoodieWriteClient(inlineCfg)) {
+      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+      instantTime2 = HoodieActiveTimeline.createNewInstantTime();
+      createNextDeltaCommit(instantTime2, dataGen.generateUpdates(instantTime2, 10), writeClient, metaClient, inlineCfg, false);
+    }
+
+    // Then: 1 delta commit is done, the failed compaction is retried
+    metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+    assertEquals(4, metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
+    assertEquals(instantTime, metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().firstInstant().get().getTimestamp());
+  }
+
+  @Test
+  public void testCompactionRetryOnFailureBasedOnNumAndTime() throws Exception {
+    // Given: two commits, schedule compaction and leave it failed/in-flight
+    HoodieWriteConfig cfg = getConfigBuilder(false)
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .withInlineCompaction(false)
+            .withMaxDeltaSecondsBeforeCompaction(1)
+            .withMaxNumDeltaCommitsBeforeCompaction(1)
+            .withInlineCompactionTriggerStrategy(CompactionTriggerStrategy.NUM_AND_TIME).build())
+        .build();
+    String instantTime;
+    List<String> instants = IntStream.range(0, 2).mapToObj(i -> HoodieActiveTimeline.createNewInstantTime()).collect(Collectors.toList());
+    try (SparkRDDWriteClient writeClient = getHoodieWriteClient(cfg)) {
+      List<HoodieRecord> records = dataGen.generateInserts(instants.get(0), 10);
+      HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
+      runNextDeltaCommits(writeClient, readClient, instants, records, cfg, true, new ArrayList<>());
+      // Schedule compaction instantTime, make it in-flight (simulates inline compaction failing)
+      instantTime = HoodieActiveTimeline.createNewInstantTime();
+      scheduleCompaction(instantTime, writeClient, cfg);
+      moveCompactionFromRequestedToInflight(instantTime, cfg);
+    }
+
+    // When: a third commit happens
+    HoodieWriteConfig inlineCfg = getConfigForInlineCompaction(3, 20, CompactionTriggerStrategy.NUM_OR_TIME);
+    String instantTime2;
+    try (SparkRDDWriteClient writeClient = getHoodieWriteClient(inlineCfg)) {
+      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+      instantTime2 = HoodieActiveTimeline.createNewInstantTime();
+      createNextDeltaCommit(instantTime2, dataGen.generateUpdates(instantTime2, 10), writeClient, metaClient, inlineCfg, false);
+    }
+
+    // Then: 1 delta commit is done, the failed compaction is retried
+    metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+    assertEquals(4, metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
+    assertEquals(instantTime, metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().firstInstant().get().getTimestamp());
   }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
index 865f0dc1e..4a03cd414 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
@@ -81,10 +81,18 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
    * Ensures each instant time is at least 1 second apart, since we create instant times at second granularity
    */
   public static String createNewInstantTime() {
+    return createNewInstantTime(0);
+  }
+
+  /**
+   * Returns the next instant time, shifted forward by the given number of milliseconds, in the {@link #COMMIT_FORMATTER} format.
+   * Ensures each instant time is at least 1 second apart, since we create instant times at second granularity
+   */
+  public static String createNewInstantTime(long milliseconds) {
     return lastInstantTime.updateAndGet((oldVal) -> {
       String newCommitTime;
       do {
-        newCommitTime = HoodieActiveTimeline.COMMIT_FORMATTER.format(new Date());
+        newCommitTime = HoodieActiveTimeline.COMMIT_FORMATTER.format(new Date(System.currentTimeMillis() + milliseconds));
       } while (HoodieTimeline.compareTimestamps(newCommitTime, LESSER_THAN_OR_EQUALS, oldVal));
       return newCommitTime;
     });
diff --git a/hudi-common/src/main/java/org/apache/hudi/exception/HoodieCompactException.java b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieCompactException.java
new file mode 100644
index 000000000..0d51706bb
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieCompactException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.exception;
+
+public class HoodieCompactException extends HoodieException {
+
+  public HoodieCompactException(String msg) {
+    super(msg);
+  }
+
+  public HoodieCompactException(String msg, Throwable e) {
+    super(msg, e);
+  }
+}
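For reference, below is a minimal sketch (not part of the patch) of how a writer could opt into the new trigger strategy through the builder APIs added above. The compaction builder calls come from this diff; HoodieWriteConfig.newBuilder().withPath(...) is assumed from the pre-existing API, and the base path and thresholds are placeholder values.

    import org.apache.hudi.config.HoodieCompactionConfig;
    import org.apache.hudi.config.HoodieWriteConfig;
    import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;

    public class InlineCompactionConfigExample {
      public static HoodieWriteConfig build() {
        return HoodieWriteConfig.newBuilder()
            .withPath("/tmp/hoodie_table") // placeholder base path
            .withCompactionConfig(HoodieCompactionConfig.newBuilder()
                .withInlineCompaction(true)
                // compact once 4 delta commits have accumulated ...
                .withMaxNumDeltaCommitsBeforeCompaction(4)
                // ... or once 30 minutes have elapsed since the last compaction,
                .withMaxDeltaSecondsBeforeCompaction(30 * 60)
                // whichever condition is satisfied first
                .withInlineCompactionTriggerStrategy(CompactionTriggerStrategy.NUM_OR_TIME)
                .build())
            .build();
      }
    }

Under TIME_ELAPSED, both instant times are parsed with COMMIT_FORMATTER (yyyyMMddHHmmss) into epoch seconds, so a last compaction at 20210401120000 and a new instant at 20210401130000 yield an elapsed time of 3600s, exactly meeting the one-hour default of hoodie.compact.inline.max.delta.seconds.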