[HUDI-2966] Closing LogRecordScanner in compactor (#4478)
* Closing LogRecordScanner in compactor * Addressing comments
This commit is contained in:
committed by
GitHub
parent
37b15ff458
commit
a66212d204
@@ -25,7 +25,6 @@ import org.apache.hudi.common.model.HoodieRecord;
|
|||||||
import org.apache.hudi.common.model.HoodieRecordPayload;
|
import org.apache.hudi.common.model.HoodieRecordPayload;
|
||||||
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
|
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
|
||||||
import org.apache.hudi.common.util.Option;
|
import org.apache.hudi.common.util.Option;
|
||||||
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
|
|
||||||
import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
|
import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
|
||||||
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
|
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
|
||||||
import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
|
import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
|
||||||
@@ -54,6 +53,7 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
|
|||||||
private final Map<String, HoodieRecord<? extends HoodieRecordPayload>> deltaRecordMap;
|
private final Map<String, HoodieRecord<? extends HoodieRecordPayload>> deltaRecordMap;
|
||||||
|
|
||||||
private final Set<String> deltaRecordKeys;
|
private final Set<String> deltaRecordKeys;
|
||||||
|
private final HoodieMergedLogRecordScanner mergedLogRecordScanner;
|
||||||
private int recordKeyIndex = HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS;
|
private int recordKeyIndex = HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS;
|
||||||
private Iterator<String> deltaItr;
|
private Iterator<String> deltaItr;
|
||||||
|
|
||||||
@@ -61,7 +61,8 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
|
|||||||
RecordReader<NullWritable, ArrayWritable> realReader) throws IOException {
|
RecordReader<NullWritable, ArrayWritable> realReader) throws IOException {
|
||||||
super(split, job);
|
super(split, job);
|
||||||
this.parquetReader = realReader;
|
this.parquetReader = realReader;
|
||||||
this.deltaRecordMap = getMergedLogRecordScanner().getRecords();
|
this.mergedLogRecordScanner = getMergedLogRecordScanner();
|
||||||
|
this.deltaRecordMap = mergedLogRecordScanner.getRecords();
|
||||||
this.deltaRecordKeys = new HashSet<>(this.deltaRecordMap.keySet());
|
this.deltaRecordKeys = new HashSet<>(this.deltaRecordMap.keySet());
|
||||||
if (split.getHoodieVirtualKeyInfo().isPresent()) {
|
if (split.getHoodieVirtualKeyInfo().isPresent()) {
|
||||||
this.recordKeyIndex = split.getHoodieVirtualKeyInfo().get().getRecordKeyFieldIndex();
|
this.recordKeyIndex = split.getHoodieVirtualKeyInfo().get().getRecordKeyFieldIndex();
|
||||||
@@ -192,7 +193,7 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
|
|||||||
parquetReader.close();
|
parquetReader.close();
|
||||||
// Need to clean up the tmp file created by the logScanner.
|
// Need to clean up the tmp file created by the logScanner.
|
||||||
// Otherwise, for resident processes such as Presto, the /tmp directory will overflow
|
// Otherwise, for resident processes such as Presto, the /tmp directory will overflow
|
||||||
((ExternalSpillableMap) deltaRecordMap).close();
|
mergedLogRecordScanner.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
Reference in New Issue
Block a user