diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
index 5bcfbe94b..d8f0a01a9 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
@@ -79,9 +79,7 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat implements Configurable {
     return timeline;
   }
 
-  @Override
-  public RecordReader<NullWritable, ArrayWritable> getRecordReader(final InputSplit split, final JobConf jobConf,
-      final Reporter reporter) throws IOException {
+  void addProjectionToJobConf(final RealtimeSplit realtimeSplit, final JobConf jobConf) {
     // Hive on Spark invokes multiple getRecordReaders from different threads in the same spark task (and hence the
     // same JVM) unlike Hive on MR. Due to this, accesses to JobConf, which is shared across all threads, is at the
     // risk of experiencing race conditions. Hence, we synchronize on the JobConf object here. There is negligible
@@ -101,22 +99,27 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat implements Configurable {
           // TO fix this, hoodie columns are appended late at the time record-reader gets built instead of construction
           // time.
           HoodieRealtimeInputFormatUtils.cleanProjectionColumnIds(jobConf);
-          HoodieRealtimeInputFormatUtils.addRequiredProjectionFields(jobConf);
-
+          if (!realtimeSplit.getDeltaLogPaths().isEmpty()) {
+            HoodieRealtimeInputFormatUtils.addRequiredProjectionFields(jobConf);
+          }
           this.conf = jobConf;
           this.conf.set(HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP, "true");
         }
       }
     }
+  }
 
-    LOG.info("Creating record reader with readCols :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)
-        + ", Ids :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
-
+  @Override
+  public RecordReader<NullWritable, ArrayWritable> getRecordReader(final InputSplit split, final JobConf jobConf,
+      final Reporter reporter) throws IOException {
     // sanity check
     ValidationUtils.checkArgument(split instanceof RealtimeSplit,
         "HoodieRealtimeRecordReader can only work on RealtimeSplit and not with " + split);
-
-    return new HoodieRealtimeRecordReader((RealtimeSplit) split, jobConf,
+    RealtimeSplit realtimeSplit = (RealtimeSplit) split;
+    addProjectionToJobConf(realtimeSplit, jobConf);
+    LOG.info("Creating record reader with readCols :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)
+        + ", Ids :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
+    return new HoodieRealtimeRecordReader(realtimeSplit, jobConf,
         super.getRecordReader(split, jobConf, reporter));
   }
 
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
index 042199fd5..b710b599b 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
@@ -84,7 +84,8 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
     if (!result) {
       // if the result is false, then there are no more records
       return false;
-    } else {
+    }
+    if (!deltaRecordMap.isEmpty()) {
       // TODO(VC): Right now, we assume all records in log, have a matching base record. (which
       // would be true until we have a way to index logs too)
       // return from delta records map if we have some match.
@@ -134,8 +135,8 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
         throw new RuntimeException(errMsg, re);
       }
     }
-      return true;
     }
+    return true;
   }
 
   @Override
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
index 871f7224d..b10e778f3 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
@@ -45,6 +45,8 @@ import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -58,6 +60,7 @@ import java.util.TreeMap;
 import java.util.stream.Collectors;
 
 public class HoodieRealtimeRecordReaderUtils {
+  private static final Logger LOG = LogManager.getLogger(HoodieRealtimeRecordReaderUtils.class);
 
   /**
    * Reads the schema from the base file.
@@ -246,10 +249,10 @@ public class HoodieRealtimeRecordReaderUtils {
     //  /org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java#L188}
     // Field Names -> {@link https://github.com/apache/hive/blob/f37c5de6c32b9395d1b34fa3c02ed06d1bfbf6eb/serde/src/java
     //  /org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java#L229}
-    String[] fieldOrdersWithDups = fieldOrderCsv.split(",");
+    String[] fieldOrdersWithDups = fieldOrderCsv.isEmpty() ? new String[0] : fieldOrderCsv.split(",");
     Set<String> fieldOrdersSet = new LinkedHashSet<>(Arrays.asList(fieldOrdersWithDups));
     String[] fieldOrders = fieldOrdersSet.toArray(new String[0]);
-    List<String> fieldNames = Arrays.stream(fieldNameCsv.split(","))
+    List<String> fieldNames = fieldNameCsv.isEmpty() ? new ArrayList<>() : Arrays.stream(fieldNameCsv.split(","))
        .filter(fn -> !partitioningFields.contains(fn)).collect(Collectors.toList());
     Set<String> fieldNamesSet = new LinkedHashSet<>(fieldNames);
     // Hive does not provide ids for partitioning fields, so check for lengths excluding that.