1
0

[HUDI-1811] Fix TestHoodieRealtimeRecordReader (#2873)

Pass basePath with the 'file://' scheme (i.e. basePath.toUri().toString() instead of basePath.toString()) to HoodieRealtimeFileSplit.
This commit is contained in:
Raymond Xu
2021-04-30 11:16:55 -07:00
committed by GitHub
parent 929eca43fe
commit faf3785a2d

View File

@@ -57,6 +57,8 @@ import org.apache.hadoop.mapred.RecordReader;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
@@ -75,16 +77,16 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestHoodieRealtimeRecordReader { public class TestHoodieRealtimeRecordReader {
private static final String PARTITION_COLUMN = "datestr"; private static final String PARTITION_COLUMN = "datestr";
private JobConf jobConf; private JobConf baseJobConf;
private FileSystem fs; private FileSystem fs;
private Configuration hadoopConf; private Configuration hadoopConf;
@BeforeEach @BeforeEach
public void setUp() { public void setUp() {
jobConf = new JobConf();
jobConf.set(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP, String.valueOf(1024 * 1024));
hadoopConf = HoodieTestUtils.getDefaultHadoopConf(); hadoopConf = HoodieTestUtils.getDefaultHadoopConf();
fs = FSUtils.getFs(basePath.toString(), hadoopConf); baseJobConf = new JobConf(hadoopConf);
baseJobConf.set(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP, String.valueOf(1024 * 1024));
fs = FSUtils.getFs(basePath.toString(), baseJobConf);
} }
@TempDir @TempDir
@@ -97,16 +99,6 @@ public class TestHoodieRealtimeRecordReader {
0); 0);
} }
@Test
public void testReader() throws Exception {
testReader(true);
}
@Test
public void testNonPartitionedReader() throws Exception {
testReader(false);
}
private void setHiveColumnNameProps(List<Schema.Field> fields, JobConf jobConf, boolean isPartitioned) { private void setHiveColumnNameProps(List<Schema.Field> fields, JobConf jobConf, boolean isPartitioned) {
String names = fields.stream().map(Field::name).collect(Collectors.joining(",")); String names = fields.stream().map(Field::name).collect(Collectors.joining(","));
String positions = fields.stream().map(f -> String.valueOf(f.pos())).collect(Collectors.joining(",")); String positions = fields.stream().map(f -> String.valueOf(f.pos())).collect(Collectors.joining(","));
@@ -122,7 +114,9 @@ public class TestHoodieRealtimeRecordReader {
jobConf.set(hive_metastoreConstants.META_TABLE_COLUMNS, hiveOrderedColumnNames); jobConf.set(hive_metastoreConstants.META_TABLE_COLUMNS, hiveOrderedColumnNames);
} }
private void testReader(boolean partitioned) throws Exception { @ParameterizedTest
@ValueSource(booleans = {true, false})
public void testReader(boolean partitioned) throws Exception {
// initial commit // initial commit
Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema()); Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema());
HoodieTestUtils.init(hadoopConf, basePath.toString(), HoodieTableType.MERGE_ON_READ); HoodieTestUtils.init(hadoopConf, basePath.toString(), HoodieTableType.MERGE_ON_READ);
@@ -133,7 +127,7 @@ public class TestHoodieRealtimeRecordReader {
HoodieTableType.MERGE_ON_READ); HoodieTableType.MERGE_ON_READ);
FileCreateUtils.createDeltaCommit(basePath.toString(), baseInstant); FileCreateUtils.createDeltaCommit(basePath.toString(), baseInstant);
// Add the paths // Add the paths
FileInputFormat.setInputPaths(jobConf, partitionDir.getPath()); FileInputFormat.setInputPaths(baseJobConf, partitionDir.getPath());
List<Pair<String, Integer>> logVersionsWithAction = new ArrayList<>(); List<Pair<String, Integer>> logVersionsWithAction = new ArrayList<>();
logVersionsWithAction.add(Pair.of(HoodieTimeline.DELTA_COMMIT_ACTION, 1)); logVersionsWithAction.add(Pair.of(HoodieTimeline.DELTA_COMMIT_ACTION, 1));
@@ -171,15 +165,15 @@ public class TestHoodieRealtimeRecordReader {
// create a split with baseFile (parquet file written earlier) and new log file(s) // create a split with baseFile (parquet file written earlier) and new log file(s)
fileSlice.addLogFile(writer.getLogFile()); fileSlice.addLogFile(writer.getLogFile());
HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit( HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit(
new FileSplit(new Path(partitionDir + "/fileid0_1-0-1_" + baseInstant + ".parquet"), 0, 1, jobConf), new FileSplit(new Path(partitionDir + "/fileid0_1-0-1_" + baseInstant + ".parquet"), 0, 1, baseJobConf),
basePath.toString(), fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()) basePath.toUri().toString(), fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator())
.map(h -> h.getPath().toString()).collect(Collectors.toList()), .map(h -> h.getPath().toString()).collect(Collectors.toList()),
instantTime); instantTime);
// create a RecordReader to be used by HoodieRealtimeRecordReader // create a RecordReader to be used by HoodieRealtimeRecordReader
RecordReader<NullWritable, ArrayWritable> reader = new MapredParquetInputFormat().getRecordReader( RecordReader<NullWritable, ArrayWritable> reader = new MapredParquetInputFormat().getRecordReader(
new FileSplit(split.getPath(), 0, fs.getLength(split.getPath()), (String[]) null), jobConf, null); new FileSplit(split.getPath(), 0, fs.getLength(split.getPath()), (String[]) null), baseJobConf, null);
JobConf jobConf = new JobConf(); JobConf jobConf = new JobConf(baseJobConf);
List<Schema.Field> fields = schema.getFields(); List<Schema.Field> fields = schema.getFields();
setHiveColumnNameProps(fields, jobConf, partitioned); setHiveColumnNameProps(fields, jobConf, partitioned);
@@ -222,7 +216,7 @@ public class TestHoodieRealtimeRecordReader {
HoodieTableType.MERGE_ON_READ); HoodieTableType.MERGE_ON_READ);
FileCreateUtils.createDeltaCommit(basePath.toString(), instantTime); FileCreateUtils.createDeltaCommit(basePath.toString(), instantTime);
// Add the paths // Add the paths
FileInputFormat.setInputPaths(jobConf, partitionDir.getPath()); FileInputFormat.setInputPaths(baseJobConf, partitionDir.getPath());
// insert new records to log file // insert new records to log file
String newCommitTime = "101"; String newCommitTime = "101";
@@ -237,13 +231,13 @@ public class TestHoodieRealtimeRecordReader {
// create a split with baseFile (parquet file written earlier) and new log file(s) // create a split with baseFile (parquet file written earlier) and new log file(s)
String logFilePath = writer.getLogFile().getPath().toString(); String logFilePath = writer.getLogFile().getPath().toString();
HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit( HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit(
new FileSplit(new Path(partitionDir + "/fileid0_1-0-1_" + instantTime + ".parquet"), 0, 1, jobConf), new FileSplit(new Path(partitionDir + "/fileid0_1-0-1_" + instantTime + ".parquet"), 0, 1, baseJobConf),
basePath.toString(), Collections.singletonList(logFilePath), newCommitTime); basePath.toUri().toString(), Collections.singletonList(logFilePath), newCommitTime);
// create a RecordReader to be used by HoodieRealtimeRecordReader // create a RecordReader to be used by HoodieRealtimeRecordReader
RecordReader<NullWritable, ArrayWritable> reader = new MapredParquetInputFormat().getRecordReader( RecordReader<NullWritable, ArrayWritable> reader = new MapredParquetInputFormat().getRecordReader(
new FileSplit(split.getPath(), 0, fs.getLength(split.getPath()), (String[]) null), jobConf, null); new FileSplit(split.getPath(), 0, fs.getLength(split.getPath()), (String[]) null), baseJobConf, null);
JobConf jobConf = new JobConf(); JobConf jobConf = new JobConf(baseJobConf);
List<Schema.Field> fields = schema.getFields(); List<Schema.Field> fields = schema.getFields();
setHiveColumnNameProps(fields, jobConf, true); setHiveColumnNameProps(fields, jobConf, true);
// Enable merge skipping. // Enable merge skipping.
@@ -301,7 +295,7 @@ public class TestHoodieRealtimeRecordReader {
instantTime, HoodieTableType.MERGE_ON_READ); instantTime, HoodieTableType.MERGE_ON_READ);
InputFormatTestUtil.commit(basePath, instantTime); InputFormatTestUtil.commit(basePath, instantTime);
// Add the paths // Add the paths
FileInputFormat.setInputPaths(jobConf, partitionDir.getPath()); FileInputFormat.setInputPaths(baseJobConf, partitionDir.getPath());
// update files or generate new log file // update files or generate new log file
String newCommitTime = "101"; String newCommitTime = "101";
@@ -315,13 +309,13 @@ public class TestHoodieRealtimeRecordReader {
// create a split with baseFile (parquet file written earlier) and new log file(s) // create a split with baseFile (parquet file written earlier) and new log file(s)
String logFilePath = writer.getLogFile().getPath().toString(); String logFilePath = writer.getLogFile().getPath().toString();
HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit( HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit(
new FileSplit(new Path(partitionDir + "/fileid0_1-0-1_" + instantTime + ".parquet"), 0, 1, jobConf), new FileSplit(new Path(partitionDir + "/fileid0_1-0-1_" + instantTime + ".parquet"), 0, 1, baseJobConf),
basePath.toString(), Collections.singletonList(logFilePath), newCommitTime); basePath.toUri().toString(), Collections.singletonList(logFilePath), newCommitTime);
// create a RecordReader to be used by HoodieRealtimeRecordReader // create a RecordReader to be used by HoodieRealtimeRecordReader
RecordReader<NullWritable, ArrayWritable> reader = new MapredParquetInputFormat().getRecordReader( RecordReader<NullWritable, ArrayWritable> reader = new MapredParquetInputFormat().getRecordReader(
new FileSplit(split.getPath(), 0, fs.getLength(split.getPath()), (String[]) null), jobConf, null); new FileSplit(split.getPath(), 0, fs.getLength(split.getPath()), (String[]) null), baseJobConf, null);
JobConf jobConf = new JobConf(); JobConf jobConf = new JobConf(baseJobConf);
List<Schema.Field> fields = schema.getFields(); List<Schema.Field> fields = schema.getFields();
setHiveColumnNameProps(fields, jobConf, true); setHiveColumnNameProps(fields, jobConf, true);
@@ -432,7 +426,7 @@ public class TestHoodieRealtimeRecordReader {
instantTime, HoodieTableType.MERGE_ON_READ); instantTime, HoodieTableType.MERGE_ON_READ);
InputFormatTestUtil.commit(basePath, instantTime); InputFormatTestUtil.commit(basePath, instantTime);
// Add the paths // Add the paths
FileInputFormat.setInputPaths(jobConf, partitionDir.getPath()); FileInputFormat.setInputPaths(baseJobConf, partitionDir.getPath());
List<Field> firstSchemaFields = schema.getFields(); List<Field> firstSchemaFields = schema.getFields();
// update files and generate new log file but don't commit // update files and generate new log file but don't commit
@@ -456,13 +450,13 @@ public class TestHoodieRealtimeRecordReader {
// create a split with baseFile (parquet file written earlier) and new log file(s) // create a split with baseFile (parquet file written earlier) and new log file(s)
HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit( HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit(
new FileSplit(new Path(partitionDir + "/fileid0_1_" + instantTime + ".parquet"), 0, 1, jobConf), new FileSplit(new Path(partitionDir + "/fileid0_1_" + instantTime + ".parquet"), 0, 1, baseJobConf),
basePath.toString(), logFilePaths, newCommitTime); basePath.toUri().toString(), logFilePaths, newCommitTime);
// create a RecordReader to be used by HoodieRealtimeRecordReader // create a RecordReader to be used by HoodieRealtimeRecordReader
RecordReader<NullWritable, ArrayWritable> reader = new MapredParquetInputFormat().getRecordReader( RecordReader<NullWritable, ArrayWritable> reader = new MapredParquetInputFormat().getRecordReader(
new FileSplit(split.getPath(), 0, fs.getLength(split.getPath()), (String[]) null), jobConf, null); new FileSplit(split.getPath(), 0, fs.getLength(split.getPath()), (String[]) null), baseJobConf, null);
JobConf jobConf = new JobConf(); JobConf jobConf = new JobConf(baseJobConf);
List<Schema.Field> fields = schema.getFields(); List<Schema.Field> fields = schema.getFields();
assertFalse(firstSchemaFields.containsAll(fields)); assertFalse(firstSchemaFields.containsAll(fields));