diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/BaseScheduleCompactionActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/BaseScheduleCompactionActionExecutor.java index b744a7e4b..25c2fec86 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/BaseScheduleCompactionActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/BaseScheduleCompactionActionExecutor.java @@ -63,7 +63,7 @@ public abstract class BaseScheduleCompactionActionExecutor conflictingInstants = table.getActiveTimeline() - .getWriteTimeline().getInstants() + .getWriteTimeline().filterCompletedAndCompactionInstants().getInstants() .filter(instant -> HoodieTimeline.compareTimestamps( instant.getTimestamp(), HoodieTimeline.GREATER_THAN_OR_EQUALS, instantTime)) .collect(Collectors.toList()); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java index 734fcc296..2706dce7d 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java @@ -132,6 +132,30 @@ public class TestHoodieCompactor extends HoodieClientTestHarness { } } + @Test + public void testScheduleCompactionWithInflightInstant() { + HoodieWriteConfig config = getConfig(); + try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config)) { + // insert 100 records. + String newCommitTime = "100"; + writeClient.startCommitWithTime(newCommitTime); + + List records = dataGen.generateInserts(newCommitTime, 100); + JavaRDD recordsRDD = jsc.parallelize(records, 1); + writeClient.insert(recordsRDD, newCommitTime).collect(); + + // create one inflight instance. + newCommitTime = "102"; + writeClient.startCommitWithTime(newCommitTime); + metaClient.getActiveTimeline().transitionRequestedToInflight(new HoodieInstant(State.REQUESTED, + HoodieTimeline.DELTA_COMMIT_ACTION, newCommitTime), Option.empty()); + + // create one compaction instance before exist inflight instance. + String compactionTime = "101"; + writeClient.scheduleCompactionAtInstant(compactionTime, Option.empty()); + } + } + @Test public void testWriteStatusContentsAfterCompaction() throws Exception { // insert 100 records