diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java
index 6992a824d..094c0b390
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java
@@ -55,6 +55,7 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
@@ -200,6 +201,31 @@ public class CompactionTestBase extends HoodieClientTestBase {
   }
 
+  protected void executeCompactionWithReplacedFiles(String compactionInstantTime, SparkRDDWriteClient client, HoodieTable table,
+                                                    HoodieWriteConfig cfg, String[] partitions, Set<HoodieFileGroupId> replacedFileIds) throws IOException {
+
+    client.compact(compactionInstantTime);
+    List<FileSlice> fileSliceList = getCurrentLatestFileSlices(table);
+    assertTrue(fileSliceList.stream().findAny().isPresent(), "Ensure latest file-slices are not empty");
+    assertFalse(fileSliceList.stream()
+            .anyMatch(fs -> replacedFileIds.contains(fs.getFileGroupId())),
+        "Compacted files should not show up in latest slices");
+
+    // verify that there is a commit
+    table = getHoodieTable(new HoodieTableMetaClient(hadoopConf, cfg.getBasePath(), true), cfg);
+    HoodieTimeline timeline = table.getMetaClient().getCommitTimeline().filterCompletedInstants();
+    // verify compaction commit is visible in timeline
+    assertTrue(timeline.filterCompletedInstants().getInstants()
+        .filter(instant -> compactionInstantTime.equals(instant.getTimestamp())).findFirst().isPresent());
+    for (String partition : partitions) {
+      table.getSliceView().getLatestFileSlicesBeforeOrOn(partition, compactionInstantTime, true).forEach(fs -> {
+        // verify that all log files are merged
+        assertEquals(0, fs.getLogFiles().count());
+        assertTrue(fs.getBaseFile().isPresent());
+      });
+    }
+  }
+
   protected List<WriteStatus> createNextDeltaCommit(String instantTime, List<HoodieRecord> records, SparkRDDWriteClient client,
                                                     HoodieTableMetaClient metaClient, HoodieWriteConfig cfg, boolean skipCommit) {
     JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
 
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
index 8da1f3ddc..fd6bd839c 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hudi.client.HoodieReadClient;
 import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -30,11 +31,14 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.table.HoodieTable;
+import org.apache.spark.api.java.JavaRDD;
 import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -332,4 +336,52 @@ public class TestAsyncCompaction extends CompactionTestBase {
       executeCompaction(compactionInstantTime, client, hoodieTable, cfg, numRecs, true);
     }
   }
+
+  @Test
+  public void testCompactionOnReplacedFiles() throws Exception {
+    // Schedule a compaction. Replace those file groups and ensure compaction completes successfully.
+    HoodieWriteConfig cfg = getConfig(true);
+    try (SparkRDDWriteClient client = getHoodieWriteClient(cfg, true)) {
+      HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
+      String firstInstantTime = "001";
+      String secondInstantTime = "004";
+      String compactionInstantTime = "005";
+      String replaceInstantTime = "006";
+      String fourthInstantTime = "007";
+
+      int numRecs = 2000;
+
+      List<HoodieRecord> records = dataGen.generateInserts(firstInstantTime, numRecs);
+      runNextDeltaCommits(client, readClient, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true,
+          new ArrayList<>());
+
+      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+      HoodieTable hoodieTable = getHoodieTable(metaClient, cfg);
+      scheduleCompaction(compactionInstantTime, client, cfg);
+      metaClient.reloadActiveTimeline();
+      HoodieInstant pendingCompactionInstant =
+          metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant().get();
+      assertEquals(compactionInstantTime, pendingCompactionInstant.getTimestamp(), "Pending Compaction instant has expected instant time");
+
+      Set<HoodieFileGroupId> fileGroupsBeforeReplace = getAllFileGroups(hoodieTable, dataGen.getPartitionPaths());
+      // replace the existing file groups by using insertOverwrite
+      JavaRDD<HoodieRecord> replaceRecords = jsc.parallelize(dataGen.generateInserts(replaceInstantTime, numRecs), 1);
+      client.startCommitWithTime(replaceInstantTime, HoodieTimeline.REPLACE_COMMIT_ACTION);
+      client.insertOverwrite(replaceRecords, replaceInstantTime);
+
+      metaClient.reloadActiveTimeline();
+      hoodieTable = getHoodieTable(metaClient, cfg);
+      Set<HoodieFileGroupId> newFileGroups = getAllFileGroups(hoodieTable, dataGen.getPartitionPaths());
+      // make sure the earlier file groups are no longer visible
+      assertEquals(0, newFileGroups.stream().filter(fg -> fileGroupsBeforeReplace.contains(fg)).count());
+
+      // compaction should still run even though its associated file groups have been replaced
+      executeCompactionWithReplacedFiles(compactionInstantTime, client, hoodieTable, cfg, dataGen.getPartitionPaths(), fileGroupsBeforeReplace);
+    }
+  }
+
+  private Set<HoodieFileGroupId> getAllFileGroups(HoodieTable table, String[] partitions) {
+    return Arrays.stream(partitions).flatMap(partition -> table.getSliceView().getLatestFileSlices(partition)
+        .map(fg -> fg.getFileGroupId())).collect(Collectors.toSet());
+  }
 }