diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java index 79601f8fa..c5e43a080 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java @@ -301,6 +301,8 @@ public class SparkRDDWriteClient extends finalizeWrite(table, compactionCommitTime, writeStats); LOG.info("Committing Compaction " + compactionCommitTime + ". Finished with result " + metadata); SparkCompactHelpers.newInstance().completeInflightCompaction(table, compactionCommitTime, metadata); + WriteMarkersFactory.get(config.getMarkersType(), table, compactionCommitTime) + .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism()); if (compactionTimer != null) { long durationInMs = metrics.getDurationInMs(compactionTimer.stop()); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java index 4c9ab3dc9..c3f4395b5 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java @@ -46,6 +46,7 @@ import org.apache.hudi.config.HoodieStorageConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.marker.WriteMarkersFactory; import org.apache.hudi.testutils.HoodieClientTestBase; import org.apache.hudi.testutils.HoodieClientTestUtils; @@ -175,6 +176,7 @@ public class CompactionTestBase extends HoodieClientTestBase { HoodieWriteConfig cfg, int expectedNumRecs, boolean hasDeltaCommitAfterPendingCompaction) throws IOException { client.compact(compactionInstantTime); + assertFalse(WriteMarkersFactory.get(cfg.getMarkersType(), table, compactionInstantTime).doesMarkerDirExist()); List fileSliceList = getCurrentLatestFileSlices(table); assertTrue(fileSliceList.stream().findAny().isPresent(), "Ensure latest file-slices are not empty"); assertFalse(fileSliceList.stream() diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java index 823d651aa..ef52953a2 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java @@ -26,6 +26,8 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieSparkTable; +import org.apache.hudi.table.marker.WriteMarkersFactory; import org.junit.jupiter.api.Test; import java.util.ArrayList; @@ -35,6 +37,7 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; public class TestInlineCompaction extends CompactionTestBase { @@ -84,6 +87,8 @@ public class TestInlineCompaction extends CompactionTestBase { metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build(); assertEquals(4, metaClient.getActiveTimeline().getWriteTimeline().countInstants()); assertEquals(HoodieTimeline.COMMIT_ACTION, metaClient.getActiveTimeline().lastInstant().get().getAction()); + String compactionTime = metaClient.getActiveTimeline().lastInstant().get().getTimestamp(); + assertFalse(WriteMarkersFactory.get(cfg.getMarkersType(), HoodieSparkTable.create(cfg, context), compactionTime).doesMarkerDirExist()); } }