From a7fd33162447b4c2816d1910bb166721cff1ed31 Mon Sep 17 00:00:00 2001 From: Satish Kotha Date: Mon, 8 Jun 2020 12:09:21 -0700 Subject: [PATCH] Add unit test for snapshot reads in hadoop-mr --- .../table/TestHoodieMergeOnReadTable.java | 53 ++++++++++++++----- .../hadoop/TestHoodieParquetInputFormat.java | 33 ++++++++++++ 2 files changed, 73 insertions(+), 13 deletions(-) diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java index 53de65ec3..6092fd8b6 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java @@ -90,6 +90,8 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness { + private HoodieParquetInputFormat roSnapshotInputFormat; + private JobConf roSnapshotJobConf; private HoodieParquetInputFormat roInputFormat; private JobConf roJobConf; @@ -108,6 +110,10 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness { initTestDataGenerator(); // initialize parquet input format + roSnapshotInputFormat = new HoodieParquetInputFormat(); + roSnapshotJobConf = new JobConf(jsc.hadoopConfiguration()); + roSnapshotInputFormat.setConf(roSnapshotJobConf); + roInputFormat = new HoodieParquetInputFormat(); roJobConf = new JobConf(hadoopConf); roInputFormat.setConf(roJobConf); @@ -185,13 +191,17 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness { insertAndGetFilePaths(records001, client, cfg, commitTime1); // verify only one parquet file shows up with commit time 001 + FileStatus[] snapshotROFiles = getROSnapshotFiles(partitionPath); + validateFiles(partitionPath,1, snapshotROFiles, roSnapshotInputFormat, + roSnapshotJobConf,200, commitTime1); + FileStatus[] 
incrementalROFiles = getROIncrementalFiles(partitionPath, true); - validateIncrementalFiles(partitionPath, 1, incrementalROFiles, roInputFormat, + validateFiles(partitionPath, 1, incrementalROFiles, roInputFormat, roJobConf,200, commitTime1); Path firstFilePath = incrementalROFiles[0].getPath(); FileStatus[] incrementalRTFiles = getRTIncrementalFiles(partitionPath); - validateIncrementalFiles(partitionPath, 1, incrementalRTFiles, rtInputFormat, + validateFiles(partitionPath, 1, incrementalRTFiles, rtInputFormat, rtJobConf,200, commitTime1); assertEquals(firstFilePath, incrementalRTFiles[0].getPath()); @@ -205,13 +215,13 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness { // verify RO incremental reads - only one parquet file shows up because updates to into log files incrementalROFiles = getROIncrementalFiles(partitionPath, false); - validateIncrementalFiles(partitionPath, 1, incrementalROFiles, roInputFormat, + validateFiles(partitionPath, 1, incrementalROFiles, roInputFormat, roJobConf, 200, commitTime1); assertEquals(firstFilePath, incrementalROFiles[0].getPath()); // verify RT incremental reads includes updates also incrementalRTFiles = getRTIncrementalFiles(partitionPath); - validateIncrementalFiles(partitionPath, 1, incrementalRTFiles, rtInputFormat, + validateFiles(partitionPath, 1, incrementalRTFiles, rtInputFormat, rtJobConf, 200, commitTime1, updateTime); // request compaction, but do not perform compaction @@ -220,12 +230,12 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness { // verify RO incremental reads - only one parquet file shows up because updates go into log files incrementalROFiles = getROIncrementalFiles(partitionPath, true); - validateIncrementalFiles(partitionPath,1, incrementalROFiles, roInputFormat, + validateFiles(partitionPath,1, incrementalROFiles, roInputFormat, roJobConf, 200, commitTime1); // verify RT incremental reads includes updates also incrementalRTFiles = 
getRTIncrementalFiles(partitionPath); - validateIncrementalFiles(partitionPath, 1, incrementalRTFiles, rtInputFormat, + validateFiles(partitionPath, 1, incrementalRTFiles, rtInputFormat, rtJobConf, 200, commitTime1, updateTime); // write 3 - more inserts @@ -234,29 +244,39 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness { client.startCommitWithTime(insertsTime); insertAndGetFilePaths(records006, client, cfg, insertsTime); + // verify new write shows up in snapshot mode even though there is pending compaction + snapshotROFiles = getROSnapshotFiles(partitionPath); + validateFiles(partitionPath, 2, snapshotROFiles, roSnapshotInputFormat, + roSnapshotJobConf,400, commitTime1, insertsTime); + incrementalROFiles = getROIncrementalFiles(partitionPath, true); assertEquals(firstFilePath, incrementalROFiles[0].getPath()); // verify 006 does not show up in RO mode because of pending compaction - validateIncrementalFiles(partitionPath, 1, incrementalROFiles, roInputFormat, + validateFiles(partitionPath, 1, incrementalROFiles, roInputFormat, roJobConf, 200, commitTime1); // verify that if stopAtCompaction is disabled, inserts from "insertsTime" show up incrementalROFiles = getROIncrementalFiles(partitionPath, false); - validateIncrementalFiles(partitionPath,2, incrementalROFiles, roInputFormat, + validateFiles(partitionPath,2, incrementalROFiles, roInputFormat, roJobConf, 400, commitTime1, insertsTime); // verify 006 shows up in RT views incrementalRTFiles = getRTIncrementalFiles(partitionPath); - validateIncrementalFiles(partitionPath, 2, incrementalRTFiles, rtInputFormat, + validateFiles(partitionPath, 2, incrementalRTFiles, rtInputFormat, rtJobConf, 400, commitTime1, updateTime, insertsTime); // perform the scheduled compaction client.compact(compactionCommitTime); + // verify new write shows up in snapshot mode after compaction is complete + snapshotROFiles = getROSnapshotFiles(partitionPath); + validateFiles(partitionPath,2, snapshotROFiles, 
roSnapshotInputFormat, + roSnapshotJobConf,400, commitTime1, compactionCommitTime, insertsTime); + incrementalROFiles = getROIncrementalFiles(partitionPath, "002", -1, true); assertTrue(incrementalROFiles.length == 2); // verify 006 shows up because of pending compaction - validateIncrementalFiles(partitionPath, 2, incrementalROFiles, roInputFormat, + validateFiles(partitionPath, 2, incrementalROFiles, roInputFormat, roJobConf, 400, commitTime1, compactionCommitTime, insertsTime); } } @@ -1435,6 +1455,13 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness { return HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath()); } + private FileStatus[] getROSnapshotFiles(String partitionPath) + throws Exception { + HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ); + FileInputFormat.setInputPaths(roSnapshotJobConf, basePath + "/" + partitionPath); + return roSnapshotInputFormat.listStatus(roSnapshotJobConf); + } + private FileStatus[] getROIncrementalFiles(String partitionPath, boolean stopAtCompaction) throws Exception { return getROIncrementalFiles(partitionPath, "000", -1, stopAtCompaction); @@ -1479,9 +1506,9 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness { jobConf.setBoolean(stopAtCompactionPropName, stopAtCompaction); } - private void validateIncrementalFiles(String partitionPath, int expectedNumFiles, - FileStatus[] files, HoodieParquetInputFormat inputFormat, - JobConf jobConf, int expectedRecords, String... expectedCommits) { + private void validateFiles(String partitionPath, int expectedNumFiles, + FileStatus[] files, HoodieParquetInputFormat inputFormat, + JobConf jobConf, int expectedRecords, String... 
expectedCommits) { assertEquals(expectedNumFiles, files.length); Set expectedCommitsSet = Arrays.stream(expectedCommits).collect(Collectors.toSet()); diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java index aa9d828c6..6e413cc39 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java @@ -185,6 +185,39 @@ public class TestHoodieParquetInputFormat { + "files from 200 commit", files, "100", 5); } + @Test + public void testInputFormatWithCompaction() throws IOException { + // initial commit + File partitionDir = InputFormatTestUtil.prepareTable(basePath, 10, "100"); + InputFormatTestUtil.commit(basePath, "100"); + + // Add the paths + FileInputFormat.setInputPaths(jobConf, partitionDir.getPath()); + + InputSplit[] inputSplits = inputFormat.getSplits(jobConf, 10); + assertEquals(10, inputSplits.length); + + FileStatus[] files = inputFormat.listStatus(jobConf); + assertEquals(10, files.length); + + // simulate compaction requested + createCompactionFile(basePath, "125"); + + // add inserts after compaction timestamp + InputFormatTestUtil.simulateInserts(partitionDir, "fileId2", 5, "200"); + InputFormatTestUtil.commit(basePath, "200"); + + // verify snapshot reads show all new inserts even though there is pending compaction + files = inputFormat.listStatus(jobConf); + assertEquals(15, files.length); + + // verify that incremental reads do NOT show inserts after compaction timestamp + InputFormatTestUtil.setupIncremental(jobConf, "100", 10); + files = inputFormat.listStatus(jobConf); + assertEquals(0, files.length, + "We should exclude commit 200 when there is a pending compaction at 125"); + } + @Test + public void testIncrementalSimple() throws IOException { // initial commit