From a5789c40673b36d40adab696706acc1446a286f8 Mon Sep 17 00:00:00 2001 From: Danny Chan Date: Fri, 23 Apr 2021 09:59:36 +0800 Subject: [PATCH] [HUDI-1829] Use while loop instead of recursive call in MergeOnReadInputFormat to avoid StackOverflow (#2862) Recursive all is risky for StackOverflow when there are too many. --- .../format/mor/MergeOnReadInputFormat.java | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java index 1186cff8a..142d8b0ec 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java @@ -306,16 +306,17 @@ public class MergeOnReadInputFormat @Override public boolean hasNext() { - if (logRecordsKeyIterator.hasNext()) { - String curAvrokey = logRecordsKeyIterator.next(); + while (logRecordsKeyIterator.hasNext()) { + String curAvroKey = logRecordsKeyIterator.next(); Option curAvroRecord = null; - final HoodieRecord hoodieRecord = logRecords.get(curAvrokey); + final HoodieRecord hoodieRecord = logRecords.get(curAvroKey); try { curAvroRecord = hoodieRecord.getData().getInsertValue(tableSchema); } catch (IOException e) { - throw new HoodieException("Get avro insert value error for key: " + curAvrokey, e); + throw new HoodieException("Get avro insert value error for key: " + curAvroKey, e); } if (!curAvroRecord.isPresent()) { + // delete record found if (emitDelete && !pkSemanticLost) { GenericRowData delete = new GenericRowData(tableState.getRequiredRowType().getFieldCount()); @@ -329,10 +330,9 @@ public class MergeOnReadInputFormat this.currentRecord = delete; return true; - } else { - // delete record found, skipping - return hasNext(); } + // skipping if the condition is unsatisfied + // continue; } else { // should improve the code when log scanner supports // seeking by log blocks with commit time which is more @@ -342,7 +342,7 @@ public class MergeOnReadInputFormat String commitTime = curAvroRecord.get().get(HOODIE_COMMIT_TIME_COL_POS).toString(); if (!split.getInstantRange().get().isInRange(commitTime)) { // filter out the records that are not in range - return hasNext(); + continue; } } GenericRecord requiredAvroRecord = buildAvroRecordBySchema( @@ -353,9 +353,8 @@ public class MergeOnReadInputFormat currentRecord = (RowData) avroToRowDataConverter.convert(requiredAvroRecord); return true; } - } else { - return false; } + return false; } @Override