1
0

- Fixing memory leak caused by HoodieLogFileReader holding on to a log block

- Removed in-memory HashMap usage in the merge(..) code in LogScanner
This commit is contained in:
Nishith Agarwal
2018-03-13 22:56:29 -07:00
committed by vinoth chandar
parent d3df32fa03
commit 123da020e2
4 changed files with 27 additions and 41 deletions

View File

@@ -240,14 +240,12 @@ public class HoodieCompactedLogRecordScanner implements
/** /**
* Iterate over the GenericRecord in the block, read the hoodie key and partition path and merge * Iterate over the GenericRecord in the block, read the hoodie key and partition path and merge
* with the application specific payload if the same key was found before Sufficient to just merge * with the application specific payload if the same key was found before. Sufficient to just merge
* the log records since the base data is merged on previous compaction * the log records since the base data is merged on previous compaction.
* Finally, merge this log block with the accumulated records
*/ */
private Map<String, HoodieRecord<? extends HoodieRecordPayload>> loadRecordsFromBlock( private Map<String, HoodieRecord<? extends HoodieRecordPayload>> merge(
HoodieAvroDataBlock dataBlock) throws IOException { HoodieAvroDataBlock dataBlock) throws IOException {
// TODO (NA) - Instead of creating a new HashMap use the spillable map
Map<String, HoodieRecord<? extends HoodieRecordPayload>> recordsFromLastBlock = Maps
.newHashMap();
// TODO (NA) - Implemnt getRecordItr() in HoodieAvroDataBlock and use that here // TODO (NA) - Implemnt getRecordItr() in HoodieAvroDataBlock and use that here
List<IndexedRecord> recs = dataBlock.getRecords(); List<IndexedRecord> recs = dataBlock.getRecords();
totalLogRecords.addAndGet(recs.size()); totalLogRecords.addAndGet(recs.size());
@@ -256,19 +254,19 @@ public class HoodieCompactedLogRecordScanner implements
.toString(); .toString();
HoodieRecord<? extends HoodieRecordPayload> hoodieRecord = HoodieRecord<? extends HoodieRecordPayload> hoodieRecord =
SpillableMapUtils.convertToHoodieRecordPayload((GenericRecord) rec, this.payloadClassFQN); SpillableMapUtils.convertToHoodieRecordPayload((GenericRecord) rec, this.payloadClassFQN);
if (recordsFromLastBlock.containsKey(key)) { if (records.containsKey(key)) {
// Merge and store the merged record // Merge and store the merged record
HoodieRecordPayload combinedValue = recordsFromLastBlock.get(key).getData() HoodieRecordPayload combinedValue = records.get(key).getData()
.preCombine(hoodieRecord.getData()); .preCombine(hoodieRecord.getData());
recordsFromLastBlock records
.put(key, new HoodieRecord<>(new HoodieKey(key, hoodieRecord.getPartitionPath()), .put(key, new HoodieRecord<>(new HoodieKey(key, hoodieRecord.getPartitionPath()),
combinedValue)); combinedValue));
} else { } else {
// Put the record as is // Put the record as is
recordsFromLastBlock.put(key, hoodieRecord); records.put(key, hoodieRecord);
} }
}); });
return recordsFromLastBlock; return records;
} }
/** /**
@@ -277,11 +275,12 @@ public class HoodieCompactedLogRecordScanner implements
private void merge(Map<String, HoodieRecord<? extends HoodieRecordPayload>> records, private void merge(Map<String, HoodieRecord<? extends HoodieRecordPayload>> records,
Deque<HoodieLogBlock> lastBlocks) throws IOException { Deque<HoodieLogBlock> lastBlocks) throws IOException {
while (!lastBlocks.isEmpty()) { while (!lastBlocks.isEmpty()) {
log.info("Number of remaining logblocks to merge " + lastBlocks.size());
// poll the element at the bottom of the stack since that's the order it was inserted // poll the element at the bottom of the stack since that's the order it was inserted
HoodieLogBlock lastBlock = lastBlocks.pollLast(); HoodieLogBlock lastBlock = lastBlocks.pollLast();
switch (lastBlock.getBlockType()) { switch (lastBlock.getBlockType()) {
case AVRO_DATA_BLOCK: case AVRO_DATA_BLOCK:
merge(records, loadRecordsFromBlock((HoodieAvroDataBlock) lastBlock)); merge((HoodieAvroDataBlock) lastBlock);
break; break;
case DELETE_BLOCK: case DELETE_BLOCK:
// TODO : If delete is the only block written and/or records are present in parquet file // TODO : If delete is the only block written and/or records are present in parquet file
@@ -295,25 +294,6 @@ public class HoodieCompactedLogRecordScanner implements
} }
} }
/**
* Merge the records read from a single data block with the accumulated records
*/
private void merge(Map<String, HoodieRecord<? extends HoodieRecordPayload>> records,
Map<String, HoodieRecord<? extends HoodieRecordPayload>> recordsFromLastBlock) {
recordsFromLastBlock.forEach((key, hoodieRecord) -> {
if (records.containsKey(key)) {
// Merge and store the merged record
HoodieRecordPayload combinedValue = records.get(key).getData()
.preCombine(hoodieRecord.getData());
records.put(key, new HoodieRecord<>(new HoodieKey(key, hoodieRecord.getPartitionPath()),
combinedValue));
} else {
// Put the record as is
records.put(key, hoodieRecord);
}
});
}
@Override @Override
public Iterator<HoodieRecord<? extends HoodieRecordPayload>> iterator() { public Iterator<HoodieRecord<? extends HoodieRecordPayload>> iterator() {
return records.iterator(); return records.iterator();

View File

@@ -56,7 +56,6 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader {
private static final byte[] oldMagicBuffer = new byte[4]; private static final byte[] oldMagicBuffer = new byte[4];
private static final byte[] magicBuffer = new byte[6]; private static final byte[] magicBuffer = new byte[6];
private final Schema readerSchema; private final Schema readerSchema;
private HoodieLogBlock nextBlock = null;
private LogFormatVersion nextBlockVersion; private LogFormatVersion nextBlockVersion;
private boolean readBlockLazily; private boolean readBlockLazily;
private long reverseLogFilePosition; private long reverseLogFilePosition;
@@ -271,8 +270,8 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader {
if (isEOF) { if (isEOF) {
return false; return false;
} }
this.nextBlock = readBlock(); // If not hasNext(), we either we reach EOF or throw an exception on invalid magic header
return nextBlock != null; return true;
} catch (IOException e) { } catch (IOException e) {
throw new HoodieIOException("IOException when reading logfile " + logFile, e); throw new HoodieIOException("IOException when reading logfile " + logFile, e);
} }
@@ -322,11 +321,12 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader {
@Override @Override
public HoodieLogBlock next() { public HoodieLogBlock next() {
if (nextBlock == null) { try {
// may be hasNext is not called // hasNext() must be called before next()
hasNext(); return readBlock();
} catch(IOException io) {
throw new HoodieIOException("IOException when reading logblock from log file " + logFile, io);
} }
return nextBlock;
} }
/** /**
@@ -378,7 +378,7 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader {
boolean hasNext = hasNext(); boolean hasNext = hasNext();
reverseLogFilePosition -= blockSize; reverseLogFilePosition -= blockSize;
lastReverseLogFilePosition = reverseLogFilePosition; lastReverseLogFilePosition = reverseLogFilePosition;
return this.nextBlock; return next();
} }
/** /**

View File

@@ -24,6 +24,8 @@ import org.apache.hadoop.fs.FileSystem;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
public class HoodieLogFormatReader implements HoodieLogFormat.Reader { public class HoodieLogFormatReader implements HoodieLogFormat.Reader {
@@ -34,6 +36,8 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader {
private final boolean readBlocksLazily; private final boolean readBlocksLazily;
private final boolean reverseLogReader; private final boolean reverseLogReader;
private final static Logger log = LogManager.getLogger(HoodieLogFormatReader.class);
HoodieLogFormatReader(FileSystem fs, List<HoodieLogFile> logFiles, HoodieLogFormatReader(FileSystem fs, List<HoodieLogFile> logFiles,
Schema readerSchema, boolean readBlocksLazily, boolean reverseLogReader) throws IOException { Schema readerSchema, boolean readBlocksLazily, boolean reverseLogReader) throws IOException {
this.logFiles = logFiles; this.logFiles = logFiles;
@@ -77,6 +81,7 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader {
} catch (IOException io) { } catch (IOException io) {
throw new HoodieIOException("unable to initialize read with log file ", io); throw new HoodieIOException("unable to initialize read with log file ", io);
} }
log.info("Moving to the next reader for logfile " + currentReader.getLogFile());
return this.currentReader.hasNext(); return this.currentReader.hasNext();
} }
return false; return false;
@@ -84,8 +89,7 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader {
@Override @Override
public HoodieLogBlock next() { public HoodieLogBlock next() {
HoodieLogBlock block = currentReader.next(); return currentReader.next();
return block;
} }
@Override @Override

View File

@@ -374,6 +374,7 @@ public class HoodieLogFormatTest {
assertEquals("Both records lists should be the same. (ordering guaranteed)", copyOfRecords1, assertEquals("Both records lists should be the same. (ordering guaranteed)", copyOfRecords1,
dataBlockRead.getRecords()); dataBlockRead.getRecords());
reader.hasNext();
nextBlock = reader.next(); nextBlock = reader.next();
dataBlockRead = (HoodieAvroDataBlock) nextBlock; dataBlockRead = (HoodieAvroDataBlock) nextBlock;
assertEquals("Read records size should be equal to the written records size", assertEquals("Read records size should be equal to the written records size",
@@ -381,6 +382,7 @@ public class HoodieLogFormatTest {
assertEquals("Both records lists should be the same. (ordering guaranteed)", copyOfRecords2, assertEquals("Both records lists should be the same. (ordering guaranteed)", copyOfRecords2,
dataBlockRead.getRecords()); dataBlockRead.getRecords());
reader.hasNext();
nextBlock = reader.next(); nextBlock = reader.next();
dataBlockRead = (HoodieAvroDataBlock) nextBlock; dataBlockRead = (HoodieAvroDataBlock) nextBlock;
assertEquals("Read records size should be equal to the written records size", assertEquals("Read records size should be equal to the written records size",