1
0

Fix Hoodie Record Reader to work with non-partitioned dataset

Commit defcf6a0b9 (parent 3a0044216c)
Authored by Balaji Varadarajan on 2019-02-11 11:39:29 -08:00
Committed by Vinoth Chandar
3 changed files with 36 additions and 8 deletions

View File

@@ -333,8 +333,11 @@ public abstract class AbstractRealtimeRecordReader {
}
// Add partitioning fields to writer schema for resulting row to contain null values for these fields
List<String> partitioningFields = Arrays.stream(
jobConf.get("partition_columns", "").split(",")).collect(Collectors.toList());
String partitionFields = jobConf.get("partition_columns", "");
List<String> partitioningFields =
partitionFields.length() > 0 ? Arrays.stream(partitionFields.split(",")).collect(Collectors.toList())
: new ArrayList<>();
writerSchema = addPartitionFields(writerSchema, partitioningFields);
List<String> projectionFields = orderFields(

View File

@@ -99,11 +99,26 @@ public class InputFormatTestUtil {
basePath.create();
HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.getRoot().toString());
File partitionPath = basePath.newFolder("2016", "05", "01");
createData(schema, partitionPath, numberOfFiles, numberOfRecords, commitNumber);
return partitionPath;
}
public static File prepareNonPartitionedParquetDataset(TemporaryFolder baseDir, Schema schema,
int numberOfFiles, int numberOfRecords, String commitNumber) throws IOException {
baseDir.create();
HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), baseDir.getRoot().toString());
File basePath = baseDir.getRoot();
createData(schema, basePath, numberOfFiles, numberOfRecords, commitNumber);
return basePath;
}
private static void createData(Schema schema,
File partitionPath, int numberOfFiles, int numberOfRecords, String commitNumber)
throws IOException {
AvroParquetWriter parquetWriter;
for (int i = 0; i < numberOfFiles; i++) {
String fileId = FSUtils.makeDataFileName(commitNumber, 1, "fileid" + i);
File dataFile = new File(partitionPath, fileId);
// dataFile.createNewFile();
parquetWriter = new AvroParquetWriter(new Path(dataFile.getAbsolutePath()), schema);
try {
for (GenericRecord record : generateAvroRecords(schema, numberOfRecords, commitNumber,
@@ -114,8 +129,6 @@ public class InputFormatTestUtil {
parquetWriter.close();
}
}
return partitionPath;
}
private static Iterable<? extends GenericRecord> generateAvroRecords(Schema schema,

View File

@@ -109,13 +109,23 @@ public class HoodieRealtimeRecordReaderTest {
@Test
public void testReader() throws Exception {
testReader(true);
}
@Test
public void testNonPartitionedReader() throws Exception {
testReader(false);
}
public void testReader(boolean partitioned) throws Exception {
// initial commit
Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema());
HoodieTestUtils.initTableType(hadoopConf, basePath.getRoot().getAbsolutePath(),
HoodieTableType.MERGE_ON_READ);
String commitTime = "100";
File partitionDir = InputFormatTestUtil
.prepareParquetDataset(basePath, schema, 1, 100, commitTime);
File partitionDir =
partitioned ? InputFormatTestUtil.prepareParquetDataset(basePath, schema, 1, 100, commitTime)
: InputFormatTestUtil.prepareNonPartitionedParquetDataset(basePath, schema, 1, 100, commitTime);
InputFormatTestUtil.commit(basePath, commitTime);
// Add the paths
FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
@@ -146,7 +156,9 @@ public class HoodieRealtimeRecordReaderTest {
.collect(Collectors.joining(","));
jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, names);
jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, postions);
jobConf.set("partition_columns", "datestr");
if (partitioned) {
jobConf.set("partition_columns", "datestr");
}
//validate record reader compaction
HoodieRealtimeRecordReader recordReader = new HoodieRealtimeRecordReader(split, jobConf, reader);