From 123da020e209b1537a7b247a9991082071a862c2 Mon Sep 17 00:00:00 2001 From: Nishith Agarwal Date: Tue, 13 Mar 2018 22:56:29 -0700 Subject: [PATCH] - Fixing memory leak due to HoodieLogFileReader holding on to a logblock - Removed inMemory HashMap usage in merge(..) code in LogScanner --- .../log/HoodieCompactedLogRecordScanner.java | 42 +++++-------------- .../common/table/log/HoodieLogFileReader.java | 16 +++---- .../table/log/HoodieLogFormatReader.java | 8 +++- .../common/table/log/HoodieLogFormatTest.java | 2 + 4 files changed, 27 insertions(+), 41 deletions(-) diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieCompactedLogRecordScanner.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieCompactedLogRecordScanner.java index 98dd29b78..c8fe0e9e3 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieCompactedLogRecordScanner.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieCompactedLogRecordScanner.java @@ -240,14 +240,12 @@ public class HoodieCompactedLogRecordScanner implements /** * Iterate over the GenericRecord in the block, read the hoodie key and partition path and merge - * with the application specific payload if the same key was found before Sufficient to just merge - * the log records since the base data is merged on previous compaction + * with the application specific payload if the same key was found before. Sufficient to just merge + * the log records since the base data is merged on previous compaction. 
+ * Finally, merge this log block with the accumulated records */ - private Map> loadRecordsFromBlock( + private Map> merge( HoodieAvroDataBlock dataBlock) throws IOException { - // TODO (NA) - Instead of creating a new HashMap use the spillable map - Map> recordsFromLastBlock = Maps - .newHashMap(); // TODO (NA) - Implemnt getRecordItr() in HoodieAvroDataBlock and use that here List recs = dataBlock.getRecords(); totalLogRecords.addAndGet(recs.size()); @@ -256,19 +254,19 @@ public class HoodieCompactedLogRecordScanner implements .toString(); HoodieRecord hoodieRecord = SpillableMapUtils.convertToHoodieRecordPayload((GenericRecord) rec, this.payloadClassFQN); - if (recordsFromLastBlock.containsKey(key)) { + if (records.containsKey(key)) { // Merge and store the merged record - HoodieRecordPayload combinedValue = recordsFromLastBlock.get(key).getData() + HoodieRecordPayload combinedValue = records.get(key).getData() .preCombine(hoodieRecord.getData()); - recordsFromLastBlock + records .put(key, new HoodieRecord<>(new HoodieKey(key, hoodieRecord.getPartitionPath()), combinedValue)); } else { // Put the record as is - recordsFromLastBlock.put(key, hoodieRecord); + records.put(key, hoodieRecord); } }); - return recordsFromLastBlock; + return records; } /** @@ -277,11 +275,12 @@ public class HoodieCompactedLogRecordScanner implements private void merge(Map> records, Deque lastBlocks) throws IOException { while (!lastBlocks.isEmpty()) { + log.info("Number of remaining logblocks to merge " + lastBlocks.size()); // poll the element at the bottom of the stack since that's the order it was inserted HoodieLogBlock lastBlock = lastBlocks.pollLast(); switch (lastBlock.getBlockType()) { case AVRO_DATA_BLOCK: - merge(records, loadRecordsFromBlock((HoodieAvroDataBlock) lastBlock)); + merge((HoodieAvroDataBlock) lastBlock); break; case DELETE_BLOCK: // TODO : If delete is the only block written and/or records are present in parquet file @@ -295,25 +294,6 @@ public class 
HoodieCompactedLogRecordScanner implements } } - /** - * Merge the records read from a single data block with the accumulated records - */ - private void merge(Map> records, - Map> recordsFromLastBlock) { - recordsFromLastBlock.forEach((key, hoodieRecord) -> { - if (records.containsKey(key)) { - // Merge and store the merged record - HoodieRecordPayload combinedValue = records.get(key).getData() - .preCombine(hoodieRecord.getData()); - records.put(key, new HoodieRecord<>(new HoodieKey(key, hoodieRecord.getPartitionPath()), - combinedValue)); - } else { - // Put the record as is - records.put(key, hoodieRecord); - } - }); - } - @Override public Iterator> iterator() { return records.iterator(); diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFileReader.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFileReader.java index f9a01c8c8..1d4cda061 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFileReader.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFileReader.java @@ -56,7 +56,6 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader { private static final byte[] oldMagicBuffer = new byte[4]; private static final byte[] magicBuffer = new byte[6]; private final Schema readerSchema; - private HoodieLogBlock nextBlock = null; private LogFormatVersion nextBlockVersion; private boolean readBlockLazily; private long reverseLogFilePosition; @@ -271,8 +270,8 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader { if (isEOF) { return false; } - this.nextBlock = readBlock(); - return nextBlock != null; + // If not hasNext(), we either reach EOF or throw an exception on invalid magic header + return true; } catch (IOException e) { throw new HoodieIOException("IOException when reading logfile " + logFile, e); } @@ -322,11 +321,12 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader { @Override public HoodieLogBlock next() 
{ - if (nextBlock == null) { - // may be hasNext is not called - hasNext(); + try { + // hasNext() must be called before next() + return readBlock(); + } catch(IOException io) { + throw new HoodieIOException("IOException when reading logblock from log file " + logFile, io); } - return nextBlock; } /** @@ -378,7 +378,7 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader { boolean hasNext = hasNext(); reverseLogFilePosition -= blockSize; lastReverseLogFilePosition = reverseLogFilePosition; - return this.nextBlock; + return next(); } /** diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormatReader.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormatReader.java index f62c9f0e5..e5f8f5c99 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormatReader.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormatReader.java @@ -24,6 +24,8 @@ import org.apache.hadoop.fs.FileSystem; import java.io.IOException; import java.util.List; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; public class HoodieLogFormatReader implements HoodieLogFormat.Reader { @@ -34,6 +36,8 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader { private final boolean readBlocksLazily; private final boolean reverseLogReader; + private final static Logger log = LogManager.getLogger(HoodieLogFormatReader.class); + HoodieLogFormatReader(FileSystem fs, List logFiles, Schema readerSchema, boolean readBlocksLazily, boolean reverseLogReader) throws IOException { this.logFiles = logFiles; @@ -77,6 +81,7 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader { } catch (IOException io) { throw new HoodieIOException("unable to initialize read with log file ", io); } + log.info("Moving to the next reader for logfile " + currentReader.getLogFile()); return this.currentReader.hasNext(); } return false; @@ -84,8 +89,7 @@ 
public class HoodieLogFormatReader implements HoodieLogFormat.Reader { @Override public HoodieLogBlock next() { - HoodieLogBlock block = currentReader.next(); - return block; + return currentReader.next(); } @Override diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/HoodieLogFormatTest.java b/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/HoodieLogFormatTest.java index 3ef9f23bf..209a53cb4 100644 --- a/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/HoodieLogFormatTest.java +++ b/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/HoodieLogFormatTest.java @@ -374,6 +374,7 @@ public class HoodieLogFormatTest { assertEquals("Both records lists should be the same. (ordering guaranteed)", copyOfRecords1, dataBlockRead.getRecords()); + reader.hasNext(); nextBlock = reader.next(); dataBlockRead = (HoodieAvroDataBlock) nextBlock; assertEquals("Read records size should be equal to the written records size", @@ -381,6 +382,7 @@ public class HoodieLogFormatTest { assertEquals("Both records lists should be the same. (ordering guaranteed)", copyOfRecords2, dataBlockRead.getRecords()); + reader.hasNext(); nextBlock = reader.next(); dataBlockRead = (HoodieAvroDataBlock) nextBlock; assertEquals("Read records size should be equal to the written records size",