[HUDI-2269] Release the disk map resource for flink streaming reader (#3384)
@@ -357,7 +357,6 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
       }
     }
 
-    keyToNewRecords.clear();
+    ((ExternalSpillableMap) keyToNewRecords).close();
     writtenRecordKeys.clear();
 
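HoodieMergeHandle builds keyToNewRecords as an ExternalSpillableMap, which is why the unconditional cast above is safe; close() removes the spill files the map may have written, which a plain clear() leaves behind. A more defensive shape of the same change (illustrative only, not what this commit ships) would be:

    // Illustrative variant -- the commit itself casts unconditionally.
    if (keyToNewRecords instanceof ExternalSpillableMap) {
      ((ExternalSpillableMap) keyToNewRecords).close(); // frees heap entries and deletes spill files
    } else {
      keyToNewRecords.clear();                          // plain in-memory map: nothing on disk to release
    }
    writtenRecordKeys.clear();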
@@ -256,7 +256,9 @@ public class ExternalSpillableMap<T extends Serializable, R extends Serializable
   }
 
   public void close() {
+    inMemoryMap.clear();
     getDiskBasedMap().close();
+    currentInMemoryMapSize = 0L;
   }
 
   @Override
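After this hunk, close() releases everything the map owns: the in-memory portion, the disk-based map backing it, and the tracked in-memory size. A minimal caller-side sketch (createSpillableMap() is a hypothetical stand-in for however the owner builds the map):

    // Sketch only; createSpillableMap() is hypothetical.
    ExternalSpillableMap<String, HoodieRecord<?>> records = createSpillableMap();
    try {
      // put/get records; entries beyond the in-memory budget spill to the disk-based map
    } finally {
      records.close(); // clear heap entries, close the disk-based map, reset the size counter
    }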
@@ -37,6 +37,7 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.sink.bootstrap.aggregate.BootstrapAggFunction;
 import org.apache.hudi.table.HoodieFlinkTable;
 import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.format.FormatUtils;
 import org.apache.hudi.util.StreamerUtil;
 
 import org.apache.avro.Schema;
@@ -201,7 +202,8 @@ public class BootstrapFunction<I, O extends HoodieRecord>
             .filter(logFile -> logFile.getFileSize() > 0)
             .map(logFile -> logFile.getPath().toString())
             .collect(toList());
-        HoodieMergedLogRecordScanner scanner = scanLog(logPaths, schema, latestCommitTime.get().getTimestamp());
+        HoodieMergedLogRecordScanner scanner = FormatUtils.scanLog(logPaths, schema, latestCommitTime.get().getTimestamp(),
+            writeConfig, hadoopConf);
 
         try {
           for (String recordKey : scanner.getRecords().keySet()) {
@@ -209,6 +211,8 @@ public class BootstrapFunction<I, O extends HoodieRecord>
           }
         } catch (Exception e) {
           throw new HoodieException(String.format("Error when loading record keys from files: %s", logPaths), e);
+        } finally {
+          scanner.close();
         }
       }
     }
@@ -218,27 +222,6 @@ public class BootstrapFunction<I, O extends HoodieRecord>
         this.getClass().getSimpleName(), taskID, partitionPath, cost);
   }
 
-  private HoodieMergedLogRecordScanner scanLog(
-      List<String> logPaths,
-      Schema logSchema,
-      String latestInstantTime) {
-    String basePath = this.hoodieTable.getMetaClient().getBasePath();
-    return HoodieMergedLogRecordScanner.newBuilder()
-        .withFileSystem(FSUtils.getFs(basePath, this.hadoopConf))
-        .withBasePath(basePath)
-        .withLogFilePaths(logPaths)
-        .withReaderSchema(logSchema)
-        .withLatestInstantTime(latestInstantTime)
-        .withReadBlocksLazily(this.writeConfig.getCompactionLazyBlockReadEnabled())
-        .withReverseReader(false)
-        .withBufferSize(this.writeConfig.getMaxDFSStreamBufferSize())
-        .withMaxMemorySizeInBytes(this.writeConfig.getMaxMemoryPerPartitionMerge())
-        .withSpillableMapBasePath(this.writeConfig.getSpillableMapBasePath())
-        .withDiskMapType(this.writeConfig.getCommonConfig().getSpillableDiskMapType())
-        .withBitCaskDiskMapCompressionEnabled(this.writeConfig.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
-        .build();
-  }
-
   @SuppressWarnings("unchecked")
   public static HoodieRecord generateHoodieRecord(HoodieKey hoodieKey, FileSlice fileSlice) {
     HoodieRecord hoodieRecord = new HoodieRecord(hoodieKey, null);
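Taken together, the hunks above leave the bootstrap operator with one consolidated pattern: build the scanner through the shared FormatUtils helper and always close it, since the merged records it holds can themselves spill to disk. A sketch of the resulting shape, using the same variables as the hunks:

    HoodieMergedLogRecordScanner scanner = FormatUtils.scanLog(
        logPaths, schema, latestCommitTime.get().getTimestamp(), writeConfig, hadoopConf);
    try {
      for (String recordKey : scanner.getRecords().keySet()) {
        // feed each record key into the bootstrap index state ...
      }
    } catch (Exception e) {
      throw new HoodieException(String.format("Error when loading record keys from files: %s", logPaths), e);
    } finally {
      scanner.close(); // release the merged records, including anything spilled to disk
    }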
@@ -20,6 +20,7 @@ package org.apache.hudi.table.format;
 
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
 import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
@@ -83,6 +84,29 @@ public class FormatUtils {
         .build();
   }
 
+  public static HoodieMergedLogRecordScanner scanLog(
+      List<String> logPaths,
+      Schema logSchema,
+      String latestInstantTime,
+      HoodieWriteConfig writeConfig,
+      Configuration hadoopConf) {
+    String basePath = writeConfig.getBasePath();
+    return HoodieMergedLogRecordScanner.newBuilder()
+        .withFileSystem(FSUtils.getFs(basePath, hadoopConf))
+        .withBasePath(basePath)
+        .withLogFilePaths(logPaths)
+        .withReaderSchema(logSchema)
+        .withLatestInstantTime(latestInstantTime)
+        .withReadBlocksLazily(writeConfig.getCompactionLazyBlockReadEnabled())
+        .withReverseReader(false)
+        .withBufferSize(writeConfig.getMaxDFSStreamBufferSize())
+        .withMaxMemorySizeInBytes(writeConfig.getMaxMemoryPerPartitionMerge())
+        .withSpillableMapBasePath(writeConfig.getSpillableMapBasePath())
+        .withDiskMapType(writeConfig.getCommonConfig().getSpillableDiskMapType())
+        .withBitCaskDiskMapCompressionEnabled(writeConfig.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
+        .build();
+  }
+
   private static Boolean string2Boolean(String s) {
     return "true".equals(s.toLowerCase(Locale.ROOT));
   }
@@ -19,7 +19,7 @@
 package org.apache.hudi.table.format.mor;
 
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
 import org.apache.hudi.common.table.log.InstantRange;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.configuration.FlinkOptions;
@@ -59,7 +59,6 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.stream.IntStream;
 
@@ -293,15 +292,14 @@ public class MergeOnReadInputFormat
         Long.MAX_VALUE); // read the whole file
   }
 
-  private Iterator<RowData> getLogFileIterator(MergeOnReadInputSplit split) {
+  private ClosableIterator<RowData> getLogFileIterator(MergeOnReadInputSplit split) {
     final Schema tableSchema = new Schema.Parser().parse(tableState.getAvroSchema());
     final Schema requiredSchema = new Schema.Parser().parse(tableState.getRequiredAvroSchema());
     final GenericRecordBuilder recordBuilder = new GenericRecordBuilder(requiredSchema);
     final AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter =
         AvroToRowDataConverters.createRowConverter(tableState.getRequiredRowType());
-    final Map<String, HoodieRecord<? extends HoodieRecordPayload>> logRecords =
-        FormatUtils.scanLog(split, tableSchema, hadoopConf).getRecords();
-    final Iterator<String> logRecordsKeyIterator = logRecords.keySet().iterator();
+    final HoodieMergedLogRecordScanner scanner = FormatUtils.scanLog(split, tableSchema, hadoopConf);
+    final Iterator<String> logRecordsKeyIterator = scanner.getRecords().keySet().iterator();
     final int[] pkOffset = tableState.getPkOffsetsInRequired();
     // flag saying whether the pk semantics has been dropped by user specified
     // projections. For e.g, if the pk fields are [a, b] but user only select a,
@@ -310,7 +308,7 @@ public class MergeOnReadInputFormat
     final LogicalType[] pkTypes = pkSemanticLost ? null : tableState.getPkTypes(pkOffset);
     final StringToRowDataConverter converter = pkSemanticLost ? null : new StringToRowDataConverter(pkTypes);
 
-    return new Iterator<RowData>() {
+    return new ClosableIterator<RowData>() {
       private RowData currentRecord;
 
       @Override
@@ -318,7 +316,7 @@ public class MergeOnReadInputFormat
         while (logRecordsKeyIterator.hasNext()) {
           String curAvroKey = logRecordsKeyIterator.next();
           Option<IndexedRecord> curAvroRecord = null;
-          final HoodieRecord<?> hoodieRecord = logRecords.get(curAvroKey);
+          final HoodieRecord<?> hoodieRecord = scanner.getRecords().get(curAvroKey);
           try {
             curAvroRecord = hoodieRecord.getData().getInsertValue(tableSchema);
           } catch (IOException e) {
@@ -359,6 +357,11 @@ public class MergeOnReadInputFormat
       public RowData next() {
        return currentRecord;
       }
+
+      @Override
+      public void close() {
+        scanner.close();
+      }
     };
   }
@@ -366,6 +369,11 @@ public class MergeOnReadInputFormat
   // Inner Class
   // -------------------------------------------------------------------------
 
+  private interface ClosableIterator<E> extends Iterator<E>, AutoCloseable {
+    @Override
+    void close(); // override to not throw exception
+  }
+
   private interface RecordIterator {
     boolean reachedEnd() throws IOException;
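ClosableIterator is what lets the caller of getLogFileIterator release the scanner that feeds the iterator. A condensed sketch of the pattern the anonymous class above follows (the helper name and the conversion function are illustrative, not part of the commit):

    // Sketch: an iterator that owns its scanner and frees it in close().
    static ClosableIterator<RowData> overScanner(HoodieMergedLogRecordScanner scanner,
                                                 Function<HoodieRecord<?>, RowData> toRowData) {
      final Iterator<String> keys = scanner.getRecords().keySet().iterator();
      return new ClosableIterator<RowData>() {
        @Override
        public boolean hasNext() {
          return keys.hasNext();
        }

        @Override
        public RowData next() {
          return toRowData.apply(scanner.getRecords().get(keys.next()));
        }

        @Override
        public void close() {
          scanner.close(); // the only place the scanner's in-memory and spilled records get freed
        }
      };
    }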
@@ -453,9 +461,9 @@ public class MergeOnReadInputFormat
 
   static class LogFileOnlyIterator implements RecordIterator {
     // iterator for log files
-    private final Iterator<RowData> iterator;
+    private final ClosableIterator<RowData> iterator;
 
-    LogFileOnlyIterator(Iterator<RowData> iterator) {
+    LogFileOnlyIterator(ClosableIterator<RowData> iterator) {
       this.iterator = iterator;
     }
@@ -471,7 +479,9 @@ public class MergeOnReadInputFormat
 
     @Override
     public void close() {
-      // no operation
+      if (this.iterator != null) {
+        this.iterator.close();
+      }
     }
   }
@@ -479,7 +489,7 @@ public class MergeOnReadInputFormat
     // base file reader
     private final ParquetColumnarRowSplitReader reader;
     // iterator for log files
-    private final Iterator<RowData> iterator;
+    private final ClosableIterator<RowData> iterator;
 
     // add the flag because the flink ParquetColumnarRowSplitReader is buggy:
    // method #reachedEnd() returns false after it returns true.
@@ -488,7 +498,7 @@ public class MergeOnReadInputFormat
 
     private RowData currentRecord;
 
-    SkipMergeIterator(ParquetColumnarRowSplitReader reader, Iterator<RowData> iterator) {
+    SkipMergeIterator(ParquetColumnarRowSplitReader reader, ClosableIterator<RowData> iterator) {
       this.reader = reader;
       this.iterator = iterator;
     }
@@ -517,6 +527,9 @@ public class MergeOnReadInputFormat
       if (this.reader != null) {
         this.reader.close();
       }
+      if (this.iterator != null) {
+        this.iterator.close();
+      }
     }
   }
@@ -525,8 +538,8 @@ public class MergeOnReadInputFormat
     private final ParquetColumnarRowSplitReader reader;
     // log keys used for merging
     private final Iterator<String> logKeysIterator;
-    // log records
-    private final Map<String, HoodieRecord<? extends HoodieRecordPayload>> logRecords;
+    // scanner
+    private final HoodieMergedLogRecordScanner scanner;
 
     private final Schema tableSchema;
     private final Schema requiredSchema;
@@ -559,8 +572,8 @@ public class MergeOnReadInputFormat
         ParquetColumnarRowSplitReader reader) { // the reader should be with full schema
       this.tableSchema = tableSchema;
       this.reader = reader;
-      this.logRecords = FormatUtils.scanLog(split, tableSchema, hadoopConf).getRecords();
-      this.logKeysIterator = this.logRecords.keySet().iterator();
+      this.scanner = FormatUtils.scanLog(split, tableSchema, hadoopConf);
+      this.logKeysIterator = scanner.getRecords().keySet().iterator();
       this.requiredSchema = requiredSchema;
       this.requiredPos = requiredPos;
       this.recordBuilder = new GenericRecordBuilder(requiredSchema);
@@ -582,7 +595,7 @@ public class MergeOnReadInputFormat
           }
         }
         final String curKey = currentRecord.getString(HOODIE_RECORD_KEY_COL_POS).toString();
-        if (logRecords.containsKey(curKey)) {
+        if (scanner.getRecords().containsKey(curKey)) {
          keyToSkip.add(curKey);
           Option<IndexedRecord> mergedAvroRecord = mergeRowWithLog(currentRecord, curKey);
           if (!mergedAvroRecord.isPresent()) {
@@ -608,7 +621,7 @@ public class MergeOnReadInputFormat
         final String curKey = logKeysIterator.next();
         if (!keyToSkip.contains(curKey)) {
           Option<IndexedRecord> insertAvroRecord =
-              logRecords.get(curKey).getData().getInsertValue(tableSchema);
+              scanner.getRecords().get(curKey).getData().getInsertValue(tableSchema);
           if (insertAvroRecord.isPresent()) {
             // the record is a DELETE if insertAvroRecord not present, skipping
             GenericRecord requiredAvroRecord = buildAvroRecordBySchema(
@@ -634,13 +647,16 @@ public class MergeOnReadInputFormat
       if (this.reader != null) {
         this.reader.close();
       }
+      if (this.scanner != null) {
+        this.scanner.close();
+      }
     }
 
     private Option<IndexedRecord> mergeRowWithLog(
         RowData curRow,
         String curKey) throws IOException {
       GenericRecord historyAvroRecord = (GenericRecord) rowDataToAvroConverter.convert(tableSchema, curRow);
-      return logRecords.get(curKey).getData().combineAndGetUpdateValue(historyAvroRecord, tableSchema);
+      return scanner.getRecords().get(curKey).getData().combineAndGetUpdateValue(historyAvroRecord, tableSchema);
     }
   }
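End to end, the Flink streaming reader now has a release chain: each RecordIterator implementation closes the parquet reader and/or the merged log scanner it wraps (as the hunks above show), and the scanner in turn drops its disk map. A hedged consumption sketch; nextRecord() is an assumed accessor on RecordIterator, while reachedEnd(), close(), LogFileOnlyIterator and getLogFileIterator are from this diff:

    RecordIterator iterator = new LogFileOnlyIterator(getLogFileIterator(split));
    try {
      while (!iterator.reachedEnd()) {
        RowData row = iterator.nextRecord(); // assumed accessor for the current row
        // forward the row to the Flink collector ...
      }
    } finally {
      iterator.close(); // closes the wrapped reader/iterator, which closes the scanner
    }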