1
0

[HUDI-2074] Use while loop instead of recursive call in MergeOnReadInputFormat#MergeIterator to avoid StackOverflow (#3159)

This commit is contained in:
Danny Chan
2021-06-28 16:03:10 +08:00
committed by GitHub
parent e99a6b031b
commit d24341d10c

View File

@@ -572,13 +572,13 @@ public class MergeOnReadInputFormat
@Override @Override
public boolean reachedEnd() throws IOException { public boolean reachedEnd() throws IOException {
if (!readLogs && !this.reader.reachedEnd()) { while (!readLogs && !this.reader.reachedEnd()) {
currentRecord = this.reader.nextRecord(); currentRecord = this.reader.nextRecord();
if (instantRange != null) { if (instantRange != null) {
boolean isInRange = instantRange.isInRange(currentRecord.getString(HOODIE_COMMIT_TIME_COL_POS).toString()); boolean isInRange = instantRange.isInRange(currentRecord.getString(HOODIE_COMMIT_TIME_COL_POS).toString());
if (!isInRange) { if (!isInRange) {
// filter base file by instant range // filter base file by instant range
return reachedEnd(); continue;
} }
} }
final String curKey = currentRecord.getString(HOODIE_RECORD_KEY_COL_POS).toString(); final String curKey = currentRecord.getString(HOODIE_RECORD_KEY_COL_POS).toString();
@@ -587,7 +587,7 @@ public class MergeOnReadInputFormat
Option<IndexedRecord> mergedAvroRecord = mergeRowWithLog(currentRecord, curKey); Option<IndexedRecord> mergedAvroRecord = mergeRowWithLog(currentRecord, curKey);
if (!mergedAvroRecord.isPresent()) { if (!mergedAvroRecord.isPresent()) {
// deleted // deleted
return reachedEnd(); continue;
} else { } else {
GenericRecord record = buildAvroRecordBySchema( GenericRecord record = buildAvroRecordBySchema(
mergedAvroRecord.get(), mergedAvroRecord.get(),
@@ -601,27 +601,27 @@ public class MergeOnReadInputFormat
// project the full record in base with required positions // project the full record in base with required positions
currentRecord = projection.project(currentRecord); currentRecord = projection.project(currentRecord);
return false; return false;
} else { }
readLogs = true; // read the logs
while (logKeysIterator.hasNext()) { readLogs = true;
final String curKey = logKeysIterator.next(); while (logKeysIterator.hasNext()) {
if (!keyToSkip.contains(curKey)) { final String curKey = logKeysIterator.next();
Option<IndexedRecord> insertAvroRecord = if (!keyToSkip.contains(curKey)) {
logRecords.get(curKey).getData().getInsertValue(tableSchema); Option<IndexedRecord> insertAvroRecord =
if (insertAvroRecord.isPresent()) { logRecords.get(curKey).getData().getInsertValue(tableSchema);
// the record is a DELETE if insertAvroRecord not present, skipping if (insertAvroRecord.isPresent()) {
GenericRecord requiredAvroRecord = buildAvroRecordBySchema( // the record is a DELETE if insertAvroRecord not present, skipping
insertAvroRecord.get(), GenericRecord requiredAvroRecord = buildAvroRecordBySchema(
requiredSchema, insertAvroRecord.get(),
requiredPos, requiredSchema,
recordBuilder); requiredPos,
this.currentRecord = (RowData) avroToRowDataConverter.convert(requiredAvroRecord); recordBuilder);
return false; this.currentRecord = (RowData) avroToRowDataConverter.convert(requiredAvroRecord);
} return false;
} }
} }
return true;
} }
return true;
} }
@Override @Override