
Add unit test for snapshot reads in hadoop-mr

Satish Kotha
2020-06-08 12:09:21 -07:00
committed by n3nash
parent df2e0c760e
commit a7fd331624
2 changed files with 73 additions and 13 deletions
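For context, the snapshot ("read optimized") path this commit exercises boils down to pointing a plain mapred JobConf at a partition and listing files through HoodieParquetInputFormat, exactly as the new roSnapshotInputFormat/roSnapshotJobConf pair below does. A minimal sketch of that flow outside the test harness (the wrapper class name and error handling are illustrative, not part of this commit):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hudi.hadoop.HoodieParquetInputFormat;

public class SnapshotReadSketch {

  // With no incremental ("consume mode") properties set on the JobConf,
  // HoodieParquetInputFormat serves the snapshot view: the latest base
  // parquet file per file group, filtered through the commit timeline.
  public static FileStatus[] listSnapshotFiles(Configuration hadoopConf,
      String partitionPath) throws IOException {
    HoodieParquetInputFormat inputFormat = new HoodieParquetInputFormat();
    JobConf jobConf = new JobConf(hadoopConf);
    inputFormat.setConf(jobConf);
    FileInputFormat.setInputPaths(jobConf, partitionPath);
    return inputFormat.listStatus(jobConf);
  }
}

Keeping a dedicated JobConf for snapshot reads matters here because the incremental helpers mutate their JobConf in place; a shared instance would presumably leak incremental settings into the snapshot assertions.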


@@ -90,6 +90,8 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
+  private HoodieParquetInputFormat roSnapshotInputFormat;
+  private JobConf roSnapshotJobConf;
   private HoodieParquetInputFormat roInputFormat;
   private JobConf roJobConf;
@@ -108,6 +110,10 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
     initTestDataGenerator();
     // initialize parquet input format
+    roSnapshotInputFormat = new HoodieParquetInputFormat();
+    roSnapshotJobConf = new JobConf(jsc.hadoopConfiguration());
+    roSnapshotInputFormat.setConf(roSnapshotJobConf);
     roInputFormat = new HoodieParquetInputFormat();
     roJobConf = new JobConf(hadoopConf);
     roInputFormat.setConf(roJobConf);
@@ -185,13 +191,17 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
     insertAndGetFilePaths(records001, client, cfg, commitTime1);
+    // verify only one parquet file shows up with commit time 001
+    FileStatus[] snapshotROFiles = getROSnapshotFiles(partitionPath);
+    validateFiles(partitionPath, 1, snapshotROFiles, roSnapshotInputFormat,
+        roSnapshotJobConf, 200, commitTime1);
     FileStatus[] incrementalROFiles = getROIncrementalFiles(partitionPath, true);
-    validateIncrementalFiles(partitionPath, 1, incrementalROFiles, roInputFormat,
+    validateFiles(partitionPath, 1, incrementalROFiles, roInputFormat,
         roJobConf, 200, commitTime1);
     Path firstFilePath = incrementalROFiles[0].getPath();
     FileStatus[] incrementalRTFiles = getRTIncrementalFiles(partitionPath);
-    validateIncrementalFiles(partitionPath, 1, incrementalRTFiles, rtInputFormat,
+    validateFiles(partitionPath, 1, incrementalRTFiles, rtInputFormat,
         rtJobConf, 200, commitTime1);
     assertEquals(firstFilePath, incrementalRTFiles[0].getPath());
@@ -205,13 +215,13 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
     // verify RO incremental reads - only one parquet file shows up because updates go into log files
     incrementalROFiles = getROIncrementalFiles(partitionPath, false);
-    validateIncrementalFiles(partitionPath, 1, incrementalROFiles, roInputFormat,
+    validateFiles(partitionPath, 1, incrementalROFiles, roInputFormat,
         roJobConf, 200, commitTime1);
     assertEquals(firstFilePath, incrementalROFiles[0].getPath());
     // verify RT incremental reads include updates too
     incrementalRTFiles = getRTIncrementalFiles(partitionPath);
-    validateIncrementalFiles(partitionPath, 1, incrementalRTFiles, rtInputFormat,
+    validateFiles(partitionPath, 1, incrementalRTFiles, rtInputFormat,
         rtJobConf, 200, commitTime1, updateTime);
     // request compaction, but do not perform compaction
@@ -220,12 +230,12 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
     // verify RO incremental reads - only one parquet file shows up because updates go into log files
     incrementalROFiles = getROIncrementalFiles(partitionPath, true);
-    validateIncrementalFiles(partitionPath, 1, incrementalROFiles, roInputFormat,
+    validateFiles(partitionPath, 1, incrementalROFiles, roInputFormat,
         roJobConf, 200, commitTime1);
     // verify RT incremental reads include updates too
     incrementalRTFiles = getRTIncrementalFiles(partitionPath);
-    validateIncrementalFiles(partitionPath, 1, incrementalRTFiles, rtInputFormat,
+    validateFiles(partitionPath, 1, incrementalRTFiles, rtInputFormat,
         rtJobConf, 200, commitTime1, updateTime);
     // write 3 - more inserts
@@ -234,29 +244,39 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
     client.startCommitWithTime(insertsTime);
     insertAndGetFilePaths(records006, client, cfg, insertsTime);
+    // verify new write shows up in snapshot mode even though there is a pending compaction
+    snapshotROFiles = getROSnapshotFiles(partitionPath);
+    validateFiles(partitionPath, 2, snapshotROFiles, roSnapshotInputFormat,
+        roSnapshotJobConf, 400, commitTime1, insertsTime);
     incrementalROFiles = getROIncrementalFiles(partitionPath, true);
     assertEquals(firstFilePath, incrementalROFiles[0].getPath());
     // verify 006 does not show up in RO mode because of pending compaction
-    validateIncrementalFiles(partitionPath, 1, incrementalROFiles, roInputFormat,
+    validateFiles(partitionPath, 1, incrementalROFiles, roInputFormat,
         roJobConf, 200, commitTime1);
     // verify that if stopAtCompaction is disabled, inserts from "insertsTime" show up
     incrementalROFiles = getROIncrementalFiles(partitionPath, false);
-    validateIncrementalFiles(partitionPath, 2, incrementalROFiles, roInputFormat,
+    validateFiles(partitionPath, 2, incrementalROFiles, roInputFormat,
         roJobConf, 400, commitTime1, insertsTime);
     // verify 006 shows up in RT views
     incrementalRTFiles = getRTIncrementalFiles(partitionPath);
-    validateIncrementalFiles(partitionPath, 2, incrementalRTFiles, rtInputFormat,
+    validateFiles(partitionPath, 2, incrementalRTFiles, rtInputFormat,
         rtJobConf, 400, commitTime1, updateTime, insertsTime);
     // perform the scheduled compaction
     client.compact(compactionCommitTime);
+    // verify new write shows up in snapshot mode after compaction is complete
+    snapshotROFiles = getROSnapshotFiles(partitionPath);
+    validateFiles(partitionPath, 2, snapshotROFiles, roSnapshotInputFormat,
+        roSnapshotJobConf, 400, commitTime1, compactionCommitTime, insertsTime);
     incrementalROFiles = getROIncrementalFiles(partitionPath, "002", -1, true);
     assertEquals(2, incrementalROFiles.length);
     // verify commits after the compaction instant show up once the compaction completes
-    validateIncrementalFiles(partitionPath, 2, incrementalROFiles, roInputFormat,
+    validateFiles(partitionPath, 2, incrementalROFiles, roInputFormat,
         roJobConf, 400, commitTime1, compactionCommitTime, insertsTime);
   }
 }
@@ -1435,6 +1455,13 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
     return HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
   }
+
+  private FileStatus[] getROSnapshotFiles(String partitionPath)
+      throws Exception {
+    HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ);
+    FileInputFormat.setInputPaths(roSnapshotJobConf, basePath + "/" + partitionPath);
+    return roSnapshotInputFormat.listStatus(roSnapshotJobConf);
+  }
   private FileStatus[] getROIncrementalFiles(String partitionPath, boolean stopAtCompaction)
       throws Exception {
     return getROIncrementalFiles(partitionPath, "000", -1, stopAtCompaction);
@@ -1479,9 +1506,9 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
     jobConf.setBoolean(stopAtCompactionPropName, stopAtCompaction);
   }
-  private void validateIncrementalFiles(String partitionPath, int expectedNumFiles,
-      FileStatus[] files, HoodieParquetInputFormat inputFormat,
-      JobConf jobConf, int expectedRecords, String... expectedCommits) {
+  private void validateFiles(String partitionPath, int expectedNumFiles,
+      FileStatus[] files, HoodieParquetInputFormat inputFormat,
+      JobConf jobConf, int expectedRecords, String... expectedCommits) {
     assertEquals(expectedNumFiles, files.length);
     Set<String> expectedCommitsSet = Arrays.stream(expectedCommits).collect(Collectors.toSet());
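The renamed validateFiles helper asserts the file count and then, per its signature, a total record count against the set of expected commit times. The diff shows only its first lines; the record counting presumably drives each split's record reader, along the lines of this hedged sketch (countRecords and its wrapper class are assumptions, not code from the commit):

import java.io.IOException;

import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hudi.hadoop.HoodieParquetInputFormat;

public class RecordCountSketch {

  // Count records by iterating every split's RecordReader. Hive's
  // MapredParquetInputFormat, which HoodieParquetInputFormat extends,
  // hands back NullWritable keys and ArrayWritable rows.
  public static long countRecords(HoodieParquetInputFormat inputFormat,
      JobConf jobConf) throws IOException {
    long count = 0;
    for (InputSplit split : inputFormat.getSplits(jobConf, 1)) {
      RecordReader<NullWritable, ArrayWritable> reader =
          inputFormat.getRecordReader(split, jobConf, Reporter.NULL);
      NullWritable key = reader.createKey();
      ArrayWritable value = reader.createValue();
      while (reader.next(key, value)) {
        count++;
      }
      reader.close();
    }
    return count;
  }
}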


@@ -185,6 +185,39 @@ public class TestHoodieParquetInputFormat {
         + "files from 200 commit", files, "100", 5);
   }
+
+  @Test
+  public void testInputFormatWithCompaction() throws IOException {
+    // initial commit
+    File partitionDir = InputFormatTestUtil.prepareTable(basePath, 10, "100");
+    InputFormatTestUtil.commit(basePath, "100");
+
+    // Add the paths
+    FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
+
+    InputSplit[] inputSplits = inputFormat.getSplits(jobConf, 10);
+    assertEquals(10, inputSplits.length);
+    FileStatus[] files = inputFormat.listStatus(jobConf);
+    assertEquals(10, files.length);
+
+    // simulate compaction requested at instant 125
+    createCompactionFile(basePath, "125");
+
+    // add inserts after the compaction timestamp
+    InputFormatTestUtil.simulateInserts(partitionDir, "fileId2", 5, "200");
+    InputFormatTestUtil.commit(basePath, "200");
+
+    // verify snapshot reads show all new inserts even though there is a pending compaction
+    files = inputFormat.listStatus(jobConf);
+    assertEquals(15, files.length);
+
+    // verify that incremental reads do NOT show inserts after the compaction timestamp
+    InputFormatTestUtil.setupIncremental(jobConf, "100", 10);
+    files = inputFormat.listStatus(jobConf);
+    assertEquals(0, files.length,
+        "We should exclude commit 200 because there is a pending compaction at 125");
+  }
 @Test
 public void testIncrementalSimple() throws IOException {
   // initial commit
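A closing note on the InputFormatTestUtil.setupIncremental(jobConf, "100", 10) call used above: it presumably sets Hudi's table-scoped consume-mode properties on the JobConf so that listStatus switches from snapshot to incremental scanning. A manual equivalent might look like the sketch below; the property-name patterns follow Hudi's HoodieHiveUtil constants from this era, but treat the exact keys as assumptions:

import org.apache.hadoop.mapred.JobConf;

public class IncrementalScanSetupSketch {

  // Assumed property patterns: hoodie.<table>.consume.mode,
  // hoodie.<table>.consume.start.timestamp, hoodie.<table>.consume.max.commits.
  public static void setupIncrementalScan(JobConf jobConf, String tableName,
      String startCommit, int maxCommits) {
    jobConf.set(String.format("hoodie.%s.consume.mode", tableName), "INCREMENTAL");
    jobConf.set(String.format("hoodie.%s.consume.start.timestamp", tableName), startCommit);
    jobConf.setInt(String.format("hoodie.%s.consume.max.commits", tableName), maxCommits);
  }
}

Because these keys are set on the JobConf itself, the first file's decision to keep a dedicated roSnapshotJobConf fits the same pattern: the snapshot listings can never pick up incremental settings applied by the other helpers.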