diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java index b5571dc37..001b1ac57 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java @@ -57,6 +57,8 @@ import org.apache.hadoop.mapred.RecordReader; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.io.File; import java.io.IOException; @@ -75,16 +77,16 @@ import static org.junit.jupiter.api.Assertions.assertTrue; public class TestHoodieRealtimeRecordReader { private static final String PARTITION_COLUMN = "datestr"; - private JobConf jobConf; + private JobConf baseJobConf; private FileSystem fs; private Configuration hadoopConf; @BeforeEach public void setUp() { - jobConf = new JobConf(); - jobConf.set(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP, String.valueOf(1024 * 1024)); hadoopConf = HoodieTestUtils.getDefaultHadoopConf(); - fs = FSUtils.getFs(basePath.toString(), hadoopConf); + baseJobConf = new JobConf(hadoopConf); + baseJobConf.set(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP, String.valueOf(1024 * 1024)); + fs = FSUtils.getFs(basePath.toString(), baseJobConf); } @TempDir @@ -97,16 +99,6 @@ public class TestHoodieRealtimeRecordReader { 0); } - @Test - public void testReader() throws Exception { - testReader(true); - } - - @Test - public void testNonPartitionedReader() throws Exception { - testReader(false); - } - private void setHiveColumnNameProps(List fields, JobConf jobConf, boolean isPartitioned) { String names = fields.stream().map(Field::name).collect(Collectors.joining(",")); String positions = fields.stream().map(f -> String.valueOf(f.pos())).collect(Collectors.joining(",")); @@ -122,7 +114,9 @@ public class TestHoodieRealtimeRecordReader { jobConf.set(hive_metastoreConstants.META_TABLE_COLUMNS, hiveOrderedColumnNames); } - private void testReader(boolean partitioned) throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testReader(boolean partitioned) throws Exception { // initial commit Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema()); HoodieTestUtils.init(hadoopConf, basePath.toString(), HoodieTableType.MERGE_ON_READ); @@ -133,7 +127,7 @@ public class TestHoodieRealtimeRecordReader { HoodieTableType.MERGE_ON_READ); FileCreateUtils.createDeltaCommit(basePath.toString(), baseInstant); // Add the paths - FileInputFormat.setInputPaths(jobConf, partitionDir.getPath()); + FileInputFormat.setInputPaths(baseJobConf, partitionDir.getPath()); List> logVersionsWithAction = new ArrayList<>(); logVersionsWithAction.add(Pair.of(HoodieTimeline.DELTA_COMMIT_ACTION, 1)); @@ -171,15 +165,15 @@ public class TestHoodieRealtimeRecordReader { // create a split with baseFile (parquet file written earlier) and new log file(s) fileSlice.addLogFile(writer.getLogFile()); HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit( - new FileSplit(new Path(partitionDir + "/fileid0_1-0-1_" + baseInstant + ".parquet"), 0, 1, jobConf), - basePath.toString(), fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()) + new FileSplit(new Path(partitionDir + "/fileid0_1-0-1_" + baseInstant + ".parquet"), 0, 1, baseJobConf), + basePath.toUri().toString(), fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()) .map(h -> h.getPath().toString()).collect(Collectors.toList()), instantTime); // create a RecordReader to be used by HoodieRealtimeRecordReader RecordReader reader = new MapredParquetInputFormat().getRecordReader( - new FileSplit(split.getPath(), 0, fs.getLength(split.getPath()), (String[]) null), jobConf, null); - JobConf jobConf = new JobConf(); + new FileSplit(split.getPath(), 0, fs.getLength(split.getPath()), (String[]) null), baseJobConf, null); + JobConf jobConf = new JobConf(baseJobConf); List fields = schema.getFields(); setHiveColumnNameProps(fields, jobConf, partitioned); @@ -222,7 +216,7 @@ public class TestHoodieRealtimeRecordReader { HoodieTableType.MERGE_ON_READ); FileCreateUtils.createDeltaCommit(basePath.toString(), instantTime); // Add the paths - FileInputFormat.setInputPaths(jobConf, partitionDir.getPath()); + FileInputFormat.setInputPaths(baseJobConf, partitionDir.getPath()); // insert new records to log file String newCommitTime = "101"; @@ -237,13 +231,13 @@ public class TestHoodieRealtimeRecordReader { // create a split with baseFile (parquet file written earlier) and new log file(s) String logFilePath = writer.getLogFile().getPath().toString(); HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit( - new FileSplit(new Path(partitionDir + "/fileid0_1-0-1_" + instantTime + ".parquet"), 0, 1, jobConf), - basePath.toString(), Collections.singletonList(logFilePath), newCommitTime); + new FileSplit(new Path(partitionDir + "/fileid0_1-0-1_" + instantTime + ".parquet"), 0, 1, baseJobConf), + basePath.toUri().toString(), Collections.singletonList(logFilePath), newCommitTime); // create a RecordReader to be used by HoodieRealtimeRecordReader RecordReader reader = new MapredParquetInputFormat().getRecordReader( - new FileSplit(split.getPath(), 0, fs.getLength(split.getPath()), (String[]) null), jobConf, null); - JobConf jobConf = new JobConf(); + new FileSplit(split.getPath(), 0, fs.getLength(split.getPath()), (String[]) null), baseJobConf, null); + JobConf jobConf = new JobConf(baseJobConf); List fields = schema.getFields(); setHiveColumnNameProps(fields, jobConf, true); // Enable merge skipping. @@ -301,7 +295,7 @@ public class TestHoodieRealtimeRecordReader { instantTime, HoodieTableType.MERGE_ON_READ); InputFormatTestUtil.commit(basePath, instantTime); // Add the paths - FileInputFormat.setInputPaths(jobConf, partitionDir.getPath()); + FileInputFormat.setInputPaths(baseJobConf, partitionDir.getPath()); // update files or generate new log file String newCommitTime = "101"; @@ -315,13 +309,13 @@ public class TestHoodieRealtimeRecordReader { // create a split with baseFile (parquet file written earlier) and new log file(s) String logFilePath = writer.getLogFile().getPath().toString(); HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit( - new FileSplit(new Path(partitionDir + "/fileid0_1-0-1_" + instantTime + ".parquet"), 0, 1, jobConf), - basePath.toString(), Collections.singletonList(logFilePath), newCommitTime); + new FileSplit(new Path(partitionDir + "/fileid0_1-0-1_" + instantTime + ".parquet"), 0, 1, baseJobConf), + basePath.toUri().toString(), Collections.singletonList(logFilePath), newCommitTime); // create a RecordReader to be used by HoodieRealtimeRecordReader RecordReader reader = new MapredParquetInputFormat().getRecordReader( - new FileSplit(split.getPath(), 0, fs.getLength(split.getPath()), (String[]) null), jobConf, null); - JobConf jobConf = new JobConf(); + new FileSplit(split.getPath(), 0, fs.getLength(split.getPath()), (String[]) null), baseJobConf, null); + JobConf jobConf = new JobConf(baseJobConf); List fields = schema.getFields(); setHiveColumnNameProps(fields, jobConf, true); @@ -432,7 +426,7 @@ public class TestHoodieRealtimeRecordReader { instantTime, HoodieTableType.MERGE_ON_READ); InputFormatTestUtil.commit(basePath, instantTime); // Add the paths - FileInputFormat.setInputPaths(jobConf, partitionDir.getPath()); + FileInputFormat.setInputPaths(baseJobConf, partitionDir.getPath()); List firstSchemaFields = schema.getFields(); // update files and generate new log file but don't commit @@ -456,13 +450,13 @@ public class TestHoodieRealtimeRecordReader { // create a split with baseFile (parquet file written earlier) and new log file(s) HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit( - new FileSplit(new Path(partitionDir + "/fileid0_1_" + instantTime + ".parquet"), 0, 1, jobConf), - basePath.toString(), logFilePaths, newCommitTime); + new FileSplit(new Path(partitionDir + "/fileid0_1_" + instantTime + ".parquet"), 0, 1, baseJobConf), + basePath.toUri().toString(), logFilePaths, newCommitTime); // create a RecordReader to be used by HoodieRealtimeRecordReader RecordReader reader = new MapredParquetInputFormat().getRecordReader( - new FileSplit(split.getPath(), 0, fs.getLength(split.getPath()), (String[]) null), jobConf, null); - JobConf jobConf = new JobConf(); + new FileSplit(split.getPath(), 0, fs.getLength(split.getPath()), (String[]) null), baseJobConf, null); + JobConf jobConf = new JobConf(baseJobConf); List fields = schema.getFields(); assertFalse(firstSchemaFields.containsAll(fields));