[HUDI-892] RealtimeParquetInputFormat skip adding projection columns if there are no log files (#2190)
* [HUDI-892] RealtimeParquetInputFormat skip adding projection columns if there are no log files * [HUDI-892] for test * [HUDI-892] fix bug generate array from split * [HUDI-892] revert test log
This commit is contained in:
@@ -79,9 +79,7 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
|
|||||||
return timeline;
|
return timeline;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
void addProjectionToJobConf(final RealtimeSplit realtimeSplit, final JobConf jobConf) {
|
||||||
public RecordReader<NullWritable, ArrayWritable> getRecordReader(final InputSplit split, final JobConf jobConf,
|
|
||||||
final Reporter reporter) throws IOException {
|
|
||||||
// Hive on Spark invokes multiple getRecordReaders from different threads in the same spark task (and hence the
|
// Hive on Spark invokes multiple getRecordReaders from different threads in the same spark task (and hence the
|
||||||
// same JVM) unlike Hive on MR. Due to this, accesses to JobConf, which is shared across all threads, is at the
|
// same JVM) unlike Hive on MR. Due to this, accesses to JobConf, which is shared across all threads, is at the
|
||||||
// risk of experiencing race conditions. Hence, we synchronize on the JobConf object here. There is negligible
|
// risk of experiencing race conditions. Hence, we synchronize on the JobConf object here. There is negligible
|
||||||
@@ -101,22 +99,27 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
|
|||||||
// TO fix this, hoodie columns are appended late at the time record-reader gets built instead of construction
|
// TO fix this, hoodie columns are appended late at the time record-reader gets built instead of construction
|
||||||
// time.
|
// time.
|
||||||
HoodieRealtimeInputFormatUtils.cleanProjectionColumnIds(jobConf);
|
HoodieRealtimeInputFormatUtils.cleanProjectionColumnIds(jobConf);
|
||||||
|
if (!realtimeSplit.getDeltaLogPaths().isEmpty()) {
|
||||||
HoodieRealtimeInputFormatUtils.addRequiredProjectionFields(jobConf);
|
HoodieRealtimeInputFormatUtils.addRequiredProjectionFields(jobConf);
|
||||||
|
}
|
||||||
this.conf = jobConf;
|
this.conf = jobConf;
|
||||||
this.conf.set(HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP, "true");
|
this.conf.set(HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP, "true");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
LOG.info("Creating record reader with readCols :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)
|
@Override
|
||||||
+ ", Ids :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
|
public RecordReader<NullWritable, ArrayWritable> getRecordReader(final InputSplit split, final JobConf jobConf,
|
||||||
|
final Reporter reporter) throws IOException {
|
||||||
// sanity check
|
// sanity check
|
||||||
ValidationUtils.checkArgument(split instanceof RealtimeSplit,
|
ValidationUtils.checkArgument(split instanceof RealtimeSplit,
|
||||||
"HoodieRealtimeRecordReader can only work on RealtimeSplit and not with " + split);
|
"HoodieRealtimeRecordReader can only work on RealtimeSplit and not with " + split);
|
||||||
|
RealtimeSplit realtimeSplit = (RealtimeSplit) split;
|
||||||
return new HoodieRealtimeRecordReader((RealtimeSplit) split, jobConf,
|
addProjectionToJobConf(realtimeSplit, jobConf);
|
||||||
|
LOG.info("Creating record reader with readCols :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)
|
||||||
|
+ ", Ids :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
|
||||||
|
return new HoodieRealtimeRecordReader(realtimeSplit, jobConf,
|
||||||
super.getRecordReader(split, jobConf, reporter));
|
super.getRecordReader(split, jobConf, reporter));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -84,7 +84,8 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
|
|||||||
if (!result) {
|
if (!result) {
|
||||||
// if the result is false, then there are no more records
|
// if the result is false, then there are no more records
|
||||||
return false;
|
return false;
|
||||||
} else {
|
}
|
||||||
|
if (!deltaRecordMap.isEmpty()) {
|
||||||
// TODO(VC): Right now, we assume all records in log, have a matching base record. (which
|
// TODO(VC): Right now, we assume all records in log, have a matching base record. (which
|
||||||
// would be true until we have a way to index logs too)
|
// would be true until we have a way to index logs too)
|
||||||
// return from delta records map if we have some match.
|
// return from delta records map if we have some match.
|
||||||
@@ -134,8 +135,8 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
|
|||||||
throw new RuntimeException(errMsg, re);
|
throw new RuntimeException(errMsg, re);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
@@ -45,6 +45,8 @@ import org.apache.hadoop.io.LongWritable;
|
|||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.io.Writable;
|
import org.apache.hadoop.io.Writable;
|
||||||
import org.apache.hadoop.mapred.JobConf;
|
import org.apache.hadoop.mapred.JobConf;
|
||||||
|
import org.apache.log4j.LogManager;
|
||||||
|
import org.apache.log4j.Logger;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
@@ -58,6 +60,7 @@ import java.util.TreeMap;
|
|||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
public class HoodieRealtimeRecordReaderUtils {
|
public class HoodieRealtimeRecordReaderUtils {
|
||||||
|
private static final Logger LOG = LogManager.getLogger(HoodieRealtimeRecordReaderUtils.class);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Reads the schema from the base file.
|
* Reads the schema from the base file.
|
||||||
@@ -246,10 +249,10 @@ public class HoodieRealtimeRecordReaderUtils {
|
|||||||
// /org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java#L188}
|
// /org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java#L188}
|
||||||
// Field Names -> {@link https://github.com/apache/hive/blob/f37c5de6c32b9395d1b34fa3c02ed06d1bfbf6eb/serde/src/java
|
// Field Names -> {@link https://github.com/apache/hive/blob/f37c5de6c32b9395d1b34fa3c02ed06d1bfbf6eb/serde/src/java
|
||||||
// /org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java#L229}
|
// /org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java#L229}
|
||||||
String[] fieldOrdersWithDups = fieldOrderCsv.split(",");
|
String[] fieldOrdersWithDups = fieldOrderCsv.isEmpty() ? new String[0] : fieldOrderCsv.split(",");
|
||||||
Set<String> fieldOrdersSet = new LinkedHashSet<>(Arrays.asList(fieldOrdersWithDups));
|
Set<String> fieldOrdersSet = new LinkedHashSet<>(Arrays.asList(fieldOrdersWithDups));
|
||||||
String[] fieldOrders = fieldOrdersSet.toArray(new String[0]);
|
String[] fieldOrders = fieldOrdersSet.toArray(new String[0]);
|
||||||
List<String> fieldNames = Arrays.stream(fieldNameCsv.split(","))
|
List<String> fieldNames = fieldNameCsv.isEmpty() ? new ArrayList<>() : Arrays.stream(fieldNameCsv.split(","))
|
||||||
.filter(fn -> !partitioningFields.contains(fn)).collect(Collectors.toList());
|
.filter(fn -> !partitioningFields.contains(fn)).collect(Collectors.toList());
|
||||||
Set<String> fieldNamesSet = new LinkedHashSet<>(fieldNames);
|
Set<String> fieldNamesSet = new LinkedHashSet<>(fieldNames);
|
||||||
// Hive does not provide ids for partitioning fields, so check for lengths excluding that.
|
// Hive does not provide ids for partitioning fields, so check for lengths excluding that.
|
||||||
|
|||||||
Reference in New Issue
Block a user