[HUDI-1720] Fix RealtimeCompactedRecordReader StackOverflowError (#2721)
This commit is contained in:
@@ -77,6 +77,14 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
|
|||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private Option<GenericRecord> buildGenericRecordwithCustomPayload(HoodieRecord record) throws IOException {
|
||||||
|
if (usesCustomPayload) {
|
||||||
|
return record.getData().getInsertValue(getWriterSchema());
|
||||||
|
} else {
|
||||||
|
return record.getData().getInsertValue(getReaderSchema());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean next(NullWritable aVoid, ArrayWritable arrayWritable) throws IOException {
|
public boolean next(NullWritable aVoid, ArrayWritable arrayWritable) throws IOException {
|
||||||
// Call the underlying parquetReader.next - which may replace the passed in ArrayWritable
|
// Call the underlying parquetReader.next - which may replace the passed in ArrayWritable
|
||||||
@@ -95,15 +103,24 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
|
|||||||
// TODO(NA): Invoke preCombine here by converting arrayWritable to Avro. This is required since the
|
// TODO(NA): Invoke preCombine here by converting arrayWritable to Avro. This is required since the
|
||||||
// deltaRecord may not be a full record and needs values of columns from the parquet
|
// deltaRecord may not be a full record and needs values of columns from the parquet
|
||||||
Option<GenericRecord> rec;
|
Option<GenericRecord> rec;
|
||||||
if (usesCustomPayload) {
|
rec = buildGenericRecordwithCustomPayload(deltaRecordMap.get(key));
|
||||||
rec = deltaRecordMap.get(key).getData().getInsertValue(getWriterSchema());
|
// If the record is not present, this is a delete record using an empty payload so skip this base record
|
||||||
} else {
|
// and move to the next record
|
||||||
rec = deltaRecordMap.get(key).getData().getInsertValue(getReaderSchema());
|
while (!rec.isPresent()) {
|
||||||
|
// if current parquet reader has no record, return false
|
||||||
|
if (!this.parquetReader.next(aVoid, arrayWritable)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
String tempKey = arrayWritable.get()[HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS].toString();
|
||||||
|
if (deltaRecordMap.containsKey(tempKey)) {
|
||||||
|
rec = buildGenericRecordwithCustomPayload(deltaRecordMap.get(tempKey));
|
||||||
|
} else {
|
||||||
|
// need to return true, since now log file does not contain tempKey, but parquet file contains tempKey
|
||||||
|
return true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if (!rec.isPresent()) {
|
if (!rec.isPresent()) {
|
||||||
// If the record is not present, this is a delete record using an empty payload so skip this base record
|
return false;
|
||||||
// and move to the next record
|
|
||||||
return next(aVoid, arrayWritable);
|
|
||||||
}
|
}
|
||||||
GenericRecord recordToReturn = rec.get();
|
GenericRecord recordToReturn = rec.get();
|
||||||
if (usesCustomPayload) {
|
if (usesCustomPayload) {
|
||||||
|
|||||||
Reference in New Issue
Block a user