[HUDI-1304] Add unit test for testing compaction on replaced file groups (#2150)

Author: satishkotha
Date: 2020-10-12 16:48:29 -07:00
Committed by: GitHub
Parent: c5e10d668f
Commit: 0d407342ef

2 changed files with 78 additions and 0 deletions
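
For orientation before the diff: the new test interleaves instants so that a compaction is scheduled at "005" and the file groups it targets are then replaced by an insert_overwrite at "006", before the compaction finally executes. Hudi orders instants by comparing their timestamp strings, so the pending compaction stays earlier on the timeline than the replacecommit. A tiny self-contained sketch of that ordering (instant values copied from the test below):

import java.util.Arrays;
import java.util.List;

public class InstantTimelineSketch {
  public static void main(String[] args) {
    // Instant times used by the new test: two delta commits, the scheduled
    // compaction, the replacecommit, and one spare instant.
    List<String> instants = Arrays.asList("001", "004", "005", "006", "007");
    // Hudi compares instant times as strings, so sorting them reproduces
    // the timeline order the test depends on.
    instants.stream().sorted().forEach(System.out::println);
    // The scheduled compaction ("005") sorts before the replacecommit ("006").
    System.out.println("005".compareTo("006") < 0); // true
  }
}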

CompactionTestBase.java

@@ -55,6 +55,7 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
@@ -200,6 +201,31 @@ public class CompactionTestBase extends HoodieClientTestBase {
   }
 
+  protected void executeCompactionWithReplacedFiles(String compactionInstantTime, SparkRDDWriteClient client, HoodieTable table,
+                                                    HoodieWriteConfig cfg, String[] partitions, Set<HoodieFileGroupId> replacedFileIds) throws IOException {
+    client.compact(compactionInstantTime);
+    List<FileSlice> fileSliceList = getCurrentLatestFileSlices(table);
+    assertTrue(fileSliceList.stream().findAny().isPresent(), "Ensure latest file-slices are not empty");
+    assertFalse(fileSliceList.stream()
+            .anyMatch(fs -> replacedFileIds.contains(fs.getFileGroupId())),
+        "Compacted files should not show up in latest slices");
+
+    // verify that there is a commit
+    table = getHoodieTable(new HoodieTableMetaClient(hadoopConf, cfg.getBasePath(), true), cfg);
+    HoodieTimeline timeline = table.getMetaClient().getCommitTimeline().filterCompletedInstants();
+    // verify compaction commit is visible in timeline
+    assertTrue(timeline.filterCompletedInstants().getInstants()
+        .filter(instant -> compactionInstantTime.equals(instant.getTimestamp())).findFirst().isPresent());
+
+    for (String partition : partitions) {
+      table.getSliceView().getLatestFileSlicesBeforeOrOn(partition, compactionInstantTime, true).forEach(fs -> {
+        // verify that all log files are merged
+        assertEquals(0, fs.getLogFiles().count());
+        assertTrue(fs.getBaseFile().isPresent());
+      });
+    }
+  }
+
   protected List<WriteStatus> createNextDeltaCommit(String instantTime, List<HoodieRecord> records, SparkRDDWriteClient client,
                                                     HoodieTableMetaClient metaClient, HoodieWriteConfig cfg, boolean skipCommit) {
     JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
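
The helper's central assertion is the replacedFileIds.contains(fs.getFileGroupId()) membership check, which relies on HoodieFileGroupId behaving as a value type inside a Set. A minimal standalone illustration of that check (to my understanding the two-argument (partitionPath, fileId) constructor matches Hudi's API; the sample values are made up):

import java.util.HashSet;
import java.util.Set;

import org.apache.hudi.common.model.HoodieFileGroupId;

public class ReplacedFileGroupCheck {
  public static void main(String[] args) {
    // File groups captured before the replacecommit (sample values).
    Set<HoodieFileGroupId> replaced = new HashSet<>();
    replaced.add(new HoodieFileGroupId("2020/10/12", "fg-1"));
    replaced.add(new HoodieFileGroupId("2020/10/12", "fg-2"));

    // A file group visible in the latest view after insert_overwrite.
    HoodieFileGroupId latest = new HoodieFileGroupId("2020/10/12", "fg-3");

    // The invariant the helper asserts: no replaced group is still visible.
    System.out.println(replaced.contains(latest)); // expect: false
  }
}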

TestAsyncCompaction.java

@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hudi.client.HoodieReadClient;
 import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -30,11 +31,14 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.table.HoodieTable;
+import org.apache.spark.api.java.JavaRDD;
 import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -332,4 +336,52 @@ public class TestAsyncCompaction extends CompactionTestBase {
       executeCompaction(compactionInstantTime, client, hoodieTable, cfg, numRecs, true);
     }
   }
+
+  @Test
+  public void testCompactionOnReplacedFiles() throws Exception {
+    // Schedule a compaction, replace the file groups it targets, and ensure the compaction still completes successfully.
+    HoodieWriteConfig cfg = getConfig(true);
+    try (SparkRDDWriteClient client = getHoodieWriteClient(cfg, true)) {
+      HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
+      String firstInstantTime = "001";
+      String secondInstantTime = "004";
+      String compactionInstantTime = "005";
+      String replaceInstantTime = "006";
+      String fourthInstantTime = "007";
+      int numRecs = 2000;
+
+      List<HoodieRecord> records = dataGen.generateInserts(firstInstantTime, numRecs);
+      runNextDeltaCommits(client, readClient, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true,
+          new ArrayList<>());
+
+      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+      HoodieTable hoodieTable = getHoodieTable(metaClient, cfg);
+      scheduleCompaction(compactionInstantTime, client, cfg);
+      metaClient.reloadActiveTimeline();
+      HoodieInstant pendingCompactionInstant =
+          metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant().get();
+      assertEquals(compactionInstantTime, pendingCompactionInstant.getTimestamp(), "Pending Compaction instant has expected instant time");
+
+      Set<HoodieFileGroupId> fileGroupsBeforeReplace = getAllFileGroups(hoodieTable, dataGen.getPartitionPaths());
+      // replace the existing file groups by using insertOverwrite
+      JavaRDD<HoodieRecord> replaceRecords = jsc.parallelize(dataGen.generateInserts(replaceInstantTime, numRecs), 1);
+      client.startCommitWithTime(replaceInstantTime, HoodieTimeline.REPLACE_COMMIT_ACTION);
+      client.insertOverwrite(replaceRecords, replaceInstantTime);
+
+      metaClient.reloadActiveTimeline();
+      hoodieTable = getHoodieTable(metaClient, cfg);
+      Set<HoodieFileGroupId> newFileGroups = getAllFileGroups(hoodieTable, dataGen.getPartitionPaths());
+      // make sure the earlier file groups are no longer visible in the latest view
+      assertEquals(0, newFileGroups.stream().filter(fg -> fileGroupsBeforeReplace.contains(fg)).count());
+
+      // compaction should still complete even though its associated file groups have been replaced
+      executeCompactionWithReplacedFiles(compactionInstantTime, client, hoodieTable, cfg, dataGen.getPartitionPaths(), fileGroupsBeforeReplace);
+    }
+  }
+
+  private Set<HoodieFileGroupId> getAllFileGroups(HoodieTable table, String[] partitions) {
+    return Arrays.stream(partitions).flatMap(partition -> table.getSliceView().getLatestFileSlices(partition)
+        .map(fs -> fs.getFileGroupId())).collect(Collectors.toSet());
+  }
 }
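
Finally, the per-partition loop in executeCompactionWithReplacedFiles asserts that every surviving latest file slice is fully compacted: a base file exists and no log files remain. A Hudi-independent toy model of that invariant (class and field names here are illustrative, not Hudi's):

import java.util.Collections;
import java.util.List;

public class CompactedSliceInvariant {
  // Toy stand-in for a Hudi FileSlice: a base file plus delta log files.
  static class Slice {
    final String baseFile; // null if no base file has been written yet
    final List<String> logFiles;

    Slice(String baseFile, List<String> logFiles) {
      this.baseFile = baseFile;
      this.logFiles = logFiles;
    }
  }

  // Mirrors the diff's assertions: base file present and zero log files.
  static boolean fullyCompacted(Slice s) {
    return s.baseFile != null && s.logFiles.isEmpty();
  }

  public static void main(String[] args) {
    Slice afterCompaction = new Slice("base.parquet", Collections.emptyList());
    Slice withPendingDeltas = new Slice("base.parquet", Collections.singletonList("delta.log.1"));
    System.out.println(fullyCompacted(afterCompaction));   // true
    System.out.println(fullyCompacted(withPendingDeltas)); // false
  }
}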