diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java index 1186cff8a..142d8b0ec 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java @@ -306,16 +306,17 @@ public class MergeOnReadInputFormat @Override public boolean hasNext() { - if (logRecordsKeyIterator.hasNext()) { - String curAvrokey = logRecordsKeyIterator.next(); + while (logRecordsKeyIterator.hasNext()) { + String curAvroKey = logRecordsKeyIterator.next(); Option curAvroRecord = null; - final HoodieRecord hoodieRecord = logRecords.get(curAvrokey); + final HoodieRecord hoodieRecord = logRecords.get(curAvroKey); try { curAvroRecord = hoodieRecord.getData().getInsertValue(tableSchema); } catch (IOException e) { - throw new HoodieException("Get avro insert value error for key: " + curAvrokey, e); + throw new HoodieException("Get avro insert value error for key: " + curAvroKey, e); } if (!curAvroRecord.isPresent()) { + // delete record found if (emitDelete && !pkSemanticLost) { GenericRowData delete = new GenericRowData(tableState.getRequiredRowType().getFieldCount()); @@ -329,10 +330,9 @@ public class MergeOnReadInputFormat this.currentRecord = delete; return true; - } else { - // delete record found, skipping - return hasNext(); } + // skipping if the condition is unsatisfied + // continue; } else { // should improve the code when log scanner supports // seeking by log blocks with commit time which is more @@ -342,7 +342,7 @@ public class MergeOnReadInputFormat String commitTime = curAvroRecord.get().get(HOODIE_COMMIT_TIME_COL_POS).toString(); if (!split.getInstantRange().get().isInRange(commitTime)) { // filter out the records that are not in range - return hasNext(); + continue; } } GenericRecord requiredAvroRecord = buildAvroRecordBySchema( @@ -353,9 +353,8 @@ public class MergeOnReadInputFormat currentRecord = (RowData) avroToRowDataConverter.convert(requiredAvroRecord); return true; } - } else { - return false; } + return false; } @Override