[HUDI-1381] Schedule compaction based on time elapsed (#2260)
- introduce configs to control how compaction is triggered - Compaction can be triggered using time, number of delta commits and/or combinations - Default behaviour remains the same.
This commit is contained in:
@@ -24,7 +24,6 @@ import org.apache.hudi.common.model.HoodieRecord;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
|
||||
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||
import org.apache.hudi.common.util.CollectionUtils;
|
||||
import org.apache.hudi.config.HoodieCompactionConfig;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.junit.jupiter.api.Test;
|
||||
@@ -39,21 +38,25 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
|
||||
public class TestInlineCompaction extends CompactionTestBase {
|
||||
|
||||
private HoodieWriteConfig getConfigForInlineCompaction(int maxDeltaCommits) {
|
||||
private HoodieWriteConfig getConfigForInlineCompaction(int maxDeltaCommits, int maxDeltaTime, CompactionTriggerStrategy inlineCompactionType) {
|
||||
return getConfigBuilder(false)
|
||||
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
|
||||
.withInlineCompaction(true).withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommits).build())
|
||||
.withInlineCompaction(true)
|
||||
.withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommits)
|
||||
.withMaxDeltaSecondsBeforeCompaction(maxDeltaTime)
|
||||
.withInlineCompactionTriggerStrategy(inlineCompactionType).build())
|
||||
.build();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCompactionIsNotScheduledEarly() throws Exception {
|
||||
// Given: make two commits
|
||||
HoodieWriteConfig cfg = getConfigForInlineCompaction(3);
|
||||
HoodieWriteConfig cfg = getConfigForInlineCompaction(3, 60, CompactionTriggerStrategy.NUM_COMMITS);
|
||||
try (SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(cfg)) {
|
||||
List<HoodieRecord> records = dataGen.generateInserts("000", 100);
|
||||
List<HoodieRecord> records = dataGen.generateInserts(HoodieActiveTimeline.createNewInstantTime(), 100);
|
||||
HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
|
||||
runNextDeltaCommits(writeClient, readClient, Arrays.asList("000", "001"), records, cfg, true, new ArrayList<>());
|
||||
List<String> instants = IntStream.range(0, 2).mapToObj(i -> HoodieActiveTimeline.createNewInstantTime()).collect(Collectors.toList());
|
||||
runNextDeltaCommits(writeClient, readClient, instants, records, cfg, true, new ArrayList<>());
|
||||
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
|
||||
|
||||
// Then: ensure no compaction is executedm since there are only 2 delta commits
|
||||
@@ -62,9 +65,9 @@ public class TestInlineCompaction extends CompactionTestBase {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSuccessfulCompaction() throws Exception {
|
||||
public void testSuccessfulCompactionBasedOnNumCommits() throws Exception {
|
||||
// Given: make three commits
|
||||
HoodieWriteConfig cfg = getConfigForInlineCompaction(3);
|
||||
HoodieWriteConfig cfg = getConfigForInlineCompaction(3, 60, CompactionTriggerStrategy.NUM_COMMITS);
|
||||
List<String> instants = IntStream.range(0, 2).mapToObj(i -> HoodieActiveTimeline.createNewInstantTime()).collect(Collectors.toList());
|
||||
|
||||
try (SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(cfg)) {
|
||||
@@ -85,32 +88,181 @@ public class TestInlineCompaction extends CompactionTestBase {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCompactionRetryOnFailure() throws Exception {
|
||||
public void testSuccessfulCompactionBasedOnTime() throws Exception {
|
||||
// Given: make one commit
|
||||
HoodieWriteConfig cfg = getConfigForInlineCompaction(5, 10, CompactionTriggerStrategy.TIME_ELAPSED);
|
||||
|
||||
try (SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(cfg)) {
|
||||
String instantTime = HoodieActiveTimeline.createNewInstantTime();
|
||||
List<HoodieRecord> records = dataGen.generateInserts(instantTime, 10);
|
||||
HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
|
||||
runNextDeltaCommits(writeClient, readClient, Arrays.asList(instantTime), records, cfg, true, new ArrayList<>());
|
||||
|
||||
// after 10s, that will trigger compaction
|
||||
String finalInstant = HoodieActiveTimeline.createNewInstantTime(10000);
|
||||
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
|
||||
createNextDeltaCommit(finalInstant, dataGen.generateUpdates(finalInstant, 100), writeClient, metaClient, cfg, false);
|
||||
|
||||
// Then: ensure the file slices are compacted as per policy
|
||||
metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
|
||||
assertEquals(3, metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
|
||||
assertEquals(HoodieTimeline.COMMIT_ACTION, metaClient.getActiveTimeline().lastInstant().get().getAction());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSuccessfulCompactionBasedOnNumOrTime() throws Exception {
|
||||
// Given: make three commits
|
||||
HoodieWriteConfig cfg = getConfigForInlineCompaction(3, 20, CompactionTriggerStrategy.NUM_OR_TIME);
|
||||
try (SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(cfg)) {
|
||||
List<HoodieRecord> records = dataGen.generateInserts(HoodieActiveTimeline.createNewInstantTime(), 10);
|
||||
HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
|
||||
List<String> instants = IntStream.range(0, 2).mapToObj(i -> HoodieActiveTimeline.createNewInstantTime()).collect(Collectors.toList());
|
||||
runNextDeltaCommits(writeClient, readClient, instants, records, cfg, true, new ArrayList<>());
|
||||
// Then: trigger the compaction because reach 3 commits.
|
||||
String finalInstant = HoodieActiveTimeline.createNewInstantTime();
|
||||
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
|
||||
createNextDeltaCommit(finalInstant, dataGen.generateUpdates(finalInstant, 10), writeClient, metaClient, cfg, false);
|
||||
|
||||
metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
|
||||
assertEquals(4, metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
|
||||
// 4th commit, that will trigger compaction because reach the time elapsed
|
||||
metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
|
||||
finalInstant = HoodieActiveTimeline.createNewInstantTime(20000);
|
||||
createNextDeltaCommit(finalInstant, dataGen.generateUpdates(finalInstant, 10), writeClient, metaClient, cfg, false);
|
||||
|
||||
metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
|
||||
assertEquals(6, metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSuccessfulCompactionBasedOnNumAndTime() throws Exception {
|
||||
// Given: make three commits
|
||||
HoodieWriteConfig cfg = getConfigForInlineCompaction(3, 20, CompactionTriggerStrategy.NUM_AND_TIME);
|
||||
try (SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(cfg)) {
|
||||
List<HoodieRecord> records = dataGen.generateInserts(HoodieActiveTimeline.createNewInstantTime(), 10);
|
||||
HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
|
||||
List<String> instants = IntStream.range(0, 3).mapToObj(i -> HoodieActiveTimeline.createNewInstantTime()).collect(Collectors.toList());
|
||||
runNextDeltaCommits(writeClient, readClient, instants, records, cfg, true, new ArrayList<>());
|
||||
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
|
||||
|
||||
// Then: ensure no compaction is executedm since there are only 3 delta commits
|
||||
assertEquals(3, metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
|
||||
// 4th commit, that will trigger compaction
|
||||
metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
|
||||
String finalInstant = HoodieActiveTimeline.createNewInstantTime(20000);
|
||||
createNextDeltaCommit(finalInstant, dataGen.generateUpdates(finalInstant, 10), writeClient, metaClient, cfg, false);
|
||||
|
||||
metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
|
||||
assertEquals(5, metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCompactionRetryOnFailureBasedOnNumCommits() throws Exception {
|
||||
// Given: two commits, schedule compaction and its failed/in-flight
|
||||
HoodieWriteConfig cfg = getConfigBuilder(false)
|
||||
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
|
||||
.withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build())
|
||||
.withInlineCompaction(false)
|
||||
.withMaxNumDeltaCommitsBeforeCompaction(1).build())
|
||||
.build();
|
||||
List<String> instants = CollectionUtils.createImmutableList("000", "001");
|
||||
List<String> instants = IntStream.range(0, 2).mapToObj(i -> HoodieActiveTimeline.createNewInstantTime()).collect(Collectors.toList());
|
||||
String instantTime2;
|
||||
try (SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(cfg)) {
|
||||
List<HoodieRecord> records = dataGen.generateInserts(instants.get(0), 100);
|
||||
HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
|
||||
runNextDeltaCommits(writeClient, readClient, instants, records, cfg, true, new ArrayList<>());
|
||||
// Schedule compaction 002, make it in-flight (simulates inline compaction failing)
|
||||
scheduleCompaction("002", writeClient, cfg);
|
||||
moveCompactionFromRequestedToInflight("002", cfg);
|
||||
// Schedule compaction instant2, make it in-flight (simulates inline compaction failing)
|
||||
instantTime2 = HoodieActiveTimeline.createNewInstantTime();
|
||||
scheduleCompaction(instantTime2, writeClient, cfg);
|
||||
moveCompactionFromRequestedToInflight(instantTime2, cfg);
|
||||
}
|
||||
|
||||
// When: a third commit happens
|
||||
HoodieWriteConfig inlineCfg = getConfigForInlineCompaction(2);
|
||||
HoodieWriteConfig inlineCfg = getConfigForInlineCompaction(2, 60, CompactionTriggerStrategy.NUM_COMMITS);
|
||||
String instantTime3 = HoodieActiveTimeline.createNewInstantTime();
|
||||
try (SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(inlineCfg)) {
|
||||
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
|
||||
createNextDeltaCommit("003", dataGen.generateUpdates("003", 100), writeClient, metaClient, inlineCfg, false);
|
||||
createNextDeltaCommit(instantTime3, dataGen.generateUpdates(instantTime3, 100), writeClient, metaClient, inlineCfg, false);
|
||||
}
|
||||
|
||||
// Then: 1 delta commit is done, the failed compaction is retried
|
||||
metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
|
||||
assertEquals(4, metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
|
||||
assertEquals("002", metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().firstInstant().get().getTimestamp());
|
||||
assertEquals(instantTime2, metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().firstInstant().get().getTimestamp());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCompactionRetryOnFailureBasedOnTime() throws Exception {
|
||||
// Given: two commits, schedule compaction and its failed/in-flight
|
||||
HoodieWriteConfig cfg = getConfigBuilder(false)
|
||||
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
|
||||
.withInlineCompaction(false)
|
||||
.withMaxDeltaSecondsBeforeCompaction(5)
|
||||
.withInlineCompactionTriggerStrategy(CompactionTriggerStrategy.TIME_ELAPSED).build())
|
||||
.build();
|
||||
String instantTime;
|
||||
List<String> instants = IntStream.range(0, 2).mapToObj(i -> HoodieActiveTimeline.createNewInstantTime()).collect(Collectors.toList());
|
||||
try (SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(cfg)) {
|
||||
List<HoodieRecord> records = dataGen.generateInserts(instants.get(0), 100);
|
||||
HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
|
||||
runNextDeltaCommits(writeClient, readClient, instants, records, cfg, true, new ArrayList<>());
|
||||
// Schedule compaction instantTime, make it in-flight (simulates inline compaction failing)
|
||||
instantTime = HoodieActiveTimeline.createNewInstantTime(10000);
|
||||
scheduleCompaction(instantTime, writeClient, cfg);
|
||||
moveCompactionFromRequestedToInflight(instantTime, cfg);
|
||||
}
|
||||
|
||||
// When: commit happens after 10s
|
||||
HoodieWriteConfig inlineCfg = getConfigForInlineCompaction(5, 10, CompactionTriggerStrategy.TIME_ELAPSED);
|
||||
String instantTime2;
|
||||
try (SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(inlineCfg)) {
|
||||
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
|
||||
instantTime2 = HoodieActiveTimeline.createNewInstantTime();
|
||||
createNextDeltaCommit(instantTime2, dataGen.generateUpdates(instantTime2, 10), writeClient, metaClient, inlineCfg, false);
|
||||
}
|
||||
|
||||
// Then: 1 delta commit is done, the failed compaction is retried
|
||||
metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
|
||||
assertEquals(4, metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
|
||||
assertEquals(instantTime, metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().firstInstant().get().getTimestamp());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCompactionRetryOnFailureBasedOnNumAndTime() throws Exception {
|
||||
// Given: two commits, schedule compaction and its failed/in-flight
|
||||
HoodieWriteConfig cfg = getConfigBuilder(false)
|
||||
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
|
||||
.withInlineCompaction(false)
|
||||
.withMaxDeltaSecondsBeforeCompaction(1)
|
||||
.withMaxNumDeltaCommitsBeforeCompaction(1)
|
||||
.withInlineCompactionTriggerStrategy(CompactionTriggerStrategy.NUM_AND_TIME).build())
|
||||
.build();
|
||||
String instantTime;
|
||||
List<String> instants = IntStream.range(0, 2).mapToObj(i -> HoodieActiveTimeline.createNewInstantTime()).collect(Collectors.toList());
|
||||
try (SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(cfg)) {
|
||||
List<HoodieRecord> records = dataGen.generateInserts(instants.get(0), 10);
|
||||
HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
|
||||
runNextDeltaCommits(writeClient, readClient, instants, records, cfg, true, new ArrayList<>());
|
||||
// Schedule compaction instantTime, make it in-flight (simulates inline compaction failing)
|
||||
instantTime = HoodieActiveTimeline.createNewInstantTime();
|
||||
scheduleCompaction(instantTime, writeClient, cfg);
|
||||
moveCompactionFromRequestedToInflight(instantTime, cfg);
|
||||
}
|
||||
|
||||
// When: a third commit happens
|
||||
HoodieWriteConfig inlineCfg = getConfigForInlineCompaction(3, 20, CompactionTriggerStrategy.NUM_OR_TIME);
|
||||
String instantTime2;
|
||||
try (SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(inlineCfg)) {
|
||||
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
|
||||
instantTime2 = HoodieActiveTimeline.createNewInstantTime();
|
||||
createNextDeltaCommit(instantTime2, dataGen.generateUpdates(instantTime2, 10), writeClient, metaClient, inlineCfg, false);
|
||||
}
|
||||
|
||||
// Then: 1 delta commit is done, the failed compaction is retried
|
||||
metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
|
||||
assertEquals(4, metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
|
||||
assertEquals(instantTime, metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().firstInstant().get().getTimestamp());
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user