[HUDI-1829] Use while loop instead of recursive call in MergeOnReadInputFormat to avoid StackOverflow (#2862)
A recursive call risks a StackOverflowError when too many records are skipped in a row.
This commit is contained in:
@@ -306,16 +306,17 @@ public class MergeOnReadInputFormat
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean hasNext() {
|
public boolean hasNext() {
|
||||||
if (logRecordsKeyIterator.hasNext()) {
|
while (logRecordsKeyIterator.hasNext()) {
|
||||||
String curAvrokey = logRecordsKeyIterator.next();
|
String curAvroKey = logRecordsKeyIterator.next();
|
||||||
Option<IndexedRecord> curAvroRecord = null;
|
Option<IndexedRecord> curAvroRecord = null;
|
||||||
final HoodieRecord<?> hoodieRecord = logRecords.get(curAvrokey);
|
final HoodieRecord<?> hoodieRecord = logRecords.get(curAvroKey);
|
||||||
try {
|
try {
|
||||||
curAvroRecord = hoodieRecord.getData().getInsertValue(tableSchema);
|
curAvroRecord = hoodieRecord.getData().getInsertValue(tableSchema);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new HoodieException("Get avro insert value error for key: " + curAvrokey, e);
|
throw new HoodieException("Get avro insert value error for key: " + curAvroKey, e);
|
||||||
}
|
}
|
||||||
if (!curAvroRecord.isPresent()) {
|
if (!curAvroRecord.isPresent()) {
|
||||||
|
// delete record found
|
||||||
if (emitDelete && !pkSemanticLost) {
|
if (emitDelete && !pkSemanticLost) {
|
||||||
GenericRowData delete = new GenericRowData(tableState.getRequiredRowType().getFieldCount());
|
GenericRowData delete = new GenericRowData(tableState.getRequiredRowType().getFieldCount());
|
||||||
|
|
||||||
@@ -329,10 +330,9 @@ public class MergeOnReadInputFormat
|
|||||||
|
|
||||||
this.currentRecord = delete;
|
this.currentRecord = delete;
|
||||||
return true;
|
return true;
|
||||||
} else {
|
|
||||||
// delete record found, skipping
|
|
||||||
return hasNext();
|
|
||||||
}
|
}
|
||||||
|
// skipping if the condition is unsatisfied
|
||||||
|
// continue;
|
||||||
} else {
|
} else {
|
||||||
// should improve the code when log scanner supports
|
// should improve the code when log scanner supports
|
||||||
// seeking by log blocks with commit time which is more
|
// seeking by log blocks with commit time which is more
|
||||||
@@ -342,7 +342,7 @@ public class MergeOnReadInputFormat
|
|||||||
String commitTime = curAvroRecord.get().get(HOODIE_COMMIT_TIME_COL_POS).toString();
|
String commitTime = curAvroRecord.get().get(HOODIE_COMMIT_TIME_COL_POS).toString();
|
||||||
if (!split.getInstantRange().get().isInRange(commitTime)) {
|
if (!split.getInstantRange().get().isInRange(commitTime)) {
|
||||||
// filter out the records that are not in range
|
// filter out the records that are not in range
|
||||||
return hasNext();
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
GenericRecord requiredAvroRecord = buildAvroRecordBySchema(
|
GenericRecord requiredAvroRecord = buildAvroRecordBySchema(
|
||||||
@@ -353,9 +353,8 @@ public class MergeOnReadInputFormat
|
|||||||
currentRecord = (RowData) avroToRowDataConverter.convert(requiredAvroRecord);
|
currentRecord = (RowData) avroToRowDataConverter.convert(requiredAvroRecord);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
return false;
|
|
||||||
}
|
}
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
Reference in New Issue
Block a user