diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java index 58898e722..f00efa5ef 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java @@ -25,7 +25,6 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner; import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.collection.ExternalSpillableMap; import org.apache.hudi.hadoop.config.HoodieRealtimeConfig; import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils; import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils; @@ -54,6 +53,7 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader private final Map> deltaRecordMap; private final Set deltaRecordKeys; + private final HoodieMergedLogRecordScanner mergedLogRecordScanner; private int recordKeyIndex = HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS; private Iterator deltaItr; @@ -61,7 +61,8 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader RecordReader realReader) throws IOException { super(split, job); this.parquetReader = realReader; - this.deltaRecordMap = getMergedLogRecordScanner().getRecords(); + this.mergedLogRecordScanner = getMergedLogRecordScanner(); + this.deltaRecordMap = mergedLogRecordScanner.getRecords(); this.deltaRecordKeys = new HashSet<>(this.deltaRecordMap.keySet()); if (split.getHoodieVirtualKeyInfo().isPresent()) { this.recordKeyIndex = split.getHoodieVirtualKeyInfo().get().getRecordKeyFieldIndex(); @@ -192,7 +193,7 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader parquetReader.close(); // need clean the tmp file which created by logScanner // Otherwise, for resident process such as presto, the /tmp directory will overflow - ((ExternalSpillableMap) deltaRecordMap).close(); + mergedLogRecordScanner.close(); } @Override