diff --git a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/AbstractRealtimeRecordReader.java b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/AbstractRealtimeRecordReader.java index b48f89146..0dfef07bc 100644 --- a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/AbstractRealtimeRecordReader.java +++ b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/AbstractRealtimeRecordReader.java @@ -333,8 +333,11 @@ public abstract class AbstractRealtimeRecordReader { } // Add partitioning fields to writer schema for resulting row to contain null values for these fields - List partitioningFields = Arrays.stream( - jobConf.get("partition_columns", "").split(",")).collect(Collectors.toList()); + + String partitionFields = jobConf.get("partition_columns", ""); + List partitioningFields = + partitionFields.length() > 0 ? Arrays.stream(partitionFields.split(",")).collect(Collectors.toList()) + : new ArrayList<>(); writerSchema = addPartitionFields(writerSchema, partitioningFields); List projectionFields = orderFields( diff --git a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/InputFormatTestUtil.java b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/InputFormatTestUtil.java index 4088afdd0..351a306fe 100644 --- a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/InputFormatTestUtil.java +++ b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/InputFormatTestUtil.java @@ -99,11 +99,26 @@ public class InputFormatTestUtil { basePath.create(); HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.getRoot().toString()); File partitionPath = basePath.newFolder("2016", "05", "01"); + createData(schema, partitionPath, numberOfFiles, numberOfRecords, commitNumber); + return partitionPath; + } + + public static File prepareNonPartitionedParquetDataset(TemporaryFolder baseDir, Schema schema, + int numberOfFiles, int numberOfRecords, String commitNumber) throws IOException { + baseDir.create(); + HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), baseDir.getRoot().toString()); + File basePath = baseDir.getRoot(); + createData(schema, basePath, numberOfFiles, numberOfRecords, commitNumber); + return basePath; + } + + private static void createData(Schema schema, + File partitionPath, int numberOfFiles, int numberOfRecords, String commitNumber) + throws IOException { AvroParquetWriter parquetWriter; for (int i = 0; i < numberOfFiles; i++) { String fileId = FSUtils.makeDataFileName(commitNumber, 1, "fileid" + i); File dataFile = new File(partitionPath, fileId); - // dataFile.createNewFile(); parquetWriter = new AvroParquetWriter(new Path(dataFile.getAbsolutePath()), schema); try { for (GenericRecord record : generateAvroRecords(schema, numberOfRecords, commitNumber, @@ -114,8 +129,6 @@ public class InputFormatTestUtil { parquetWriter.close(); } } - return partitionPath; - } private static Iterable generateAvroRecords(Schema schema, diff --git a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReaderTest.java b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReaderTest.java index b3b095592..fbd635e31 100644 --- a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReaderTest.java +++ b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReaderTest.java @@ -109,13 +109,23 @@ public class HoodieRealtimeRecordReaderTest { @Test public void testReader() throws Exception { + testReader(true); + } + + @Test + public void testNonPartitionedReader() throws Exception { + testReader(false); + } + + public void testReader(boolean partitioned) throws Exception { // initial commit Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema()); HoodieTestUtils.initTableType(hadoopConf, basePath.getRoot().getAbsolutePath(), HoodieTableType.MERGE_ON_READ); String commitTime = "100"; - File partitionDir = InputFormatTestUtil - .prepareParquetDataset(basePath, schema, 1, 100, commitTime); + File partitionDir = + partitioned ? InputFormatTestUtil.prepareParquetDataset(basePath, schema, 1, 100, commitTime) + : InputFormatTestUtil.prepareNonPartitionedParquetDataset(basePath, schema, 1, 100, commitTime); InputFormatTestUtil.commit(basePath, commitTime); // Add the paths FileInputFormat.setInputPaths(jobConf, partitionDir.getPath()); @@ -146,7 +156,9 @@ public class HoodieRealtimeRecordReaderTest { .collect(Collectors.joining(",")); jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, names); jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, postions); - jobConf.set("partition_columns", "datestr"); + if (partitioned) { + jobConf.set("partition_columns", "datestr"); + } //validate record reader compaction HoodieRealtimeRecordReader recordReader = new HoodieRealtimeRecordReader(split, jobConf, reader);