diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java index c7a910608..d00c7fadc 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java @@ -572,13 +572,13 @@ public class MergeOnReadInputFormat @Override public boolean reachedEnd() throws IOException { - if (!readLogs && !this.reader.reachedEnd()) { + while (!readLogs && !this.reader.reachedEnd()) { currentRecord = this.reader.nextRecord(); if (instantRange != null) { boolean isInRange = instantRange.isInRange(currentRecord.getString(HOODIE_COMMIT_TIME_COL_POS).toString()); if (!isInRange) { // filter base file by instant range - return reachedEnd(); + continue; } } final String curKey = currentRecord.getString(HOODIE_RECORD_KEY_COL_POS).toString(); @@ -587,7 +587,7 @@ public class MergeOnReadInputFormat Option mergedAvroRecord = mergeRowWithLog(currentRecord, curKey); if (!mergedAvroRecord.isPresent()) { // deleted - return reachedEnd(); + continue; } else { GenericRecord record = buildAvroRecordBySchema( mergedAvroRecord.get(), @@ -601,27 +601,27 @@ public class MergeOnReadInputFormat // project the full record in base with required positions currentRecord = projection.project(currentRecord); return false; - } else { - readLogs = true; - while (logKeysIterator.hasNext()) { - final String curKey = logKeysIterator.next(); - if (!keyToSkip.contains(curKey)) { - Option insertAvroRecord = - logRecords.get(curKey).getData().getInsertValue(tableSchema); - if (insertAvroRecord.isPresent()) { - // the record is a DELETE if insertAvroRecord not present, skipping - GenericRecord requiredAvroRecord = buildAvroRecordBySchema( - insertAvroRecord.get(), - requiredSchema, - requiredPos, - recordBuilder); - this.currentRecord = (RowData) avroToRowDataConverter.convert(requiredAvroRecord); - return false; - } + } + // read the logs + readLogs = true; + while (logKeysIterator.hasNext()) { + final String curKey = logKeysIterator.next(); + if (!keyToSkip.contains(curKey)) { + Option insertAvroRecord = + logRecords.get(curKey).getData().getInsertValue(tableSchema); + if (insertAvroRecord.isPresent()) { + // the record is a DELETE if insertAvroRecord not present, skipping + GenericRecord requiredAvroRecord = buildAvroRecordBySchema( + insertAvroRecord.get(), + requiredSchema, + requiredPos, + recordBuilder); + this.currentRecord = (RowData) avroToRowDataConverter.convert(requiredAvroRecord); + return false; } } - return true; } + return true; } @Override