From 5cb28e7b1ff09149ca88fc761d0e19369c6c3505 Mon Sep 17 00:00:00 2001 From: Balaji Varadarajan Date: Wed, 19 Sep 2018 13:13:04 -0700 Subject: [PATCH] Explicitly release resources in LogFileReader and TestHoodieClientBase --- .../cli/commands/ArchivedCommitsCommand.java | 1 + .../cli/commands/HoodieLogFileCommand.java | 2 ++ .../com/uber/hoodie/TestAsyncCompaction.java | 5 +++ .../java/com/uber/hoodie/TestCleaner.java | 5 +++ .../com/uber/hoodie/TestClientRollback.java | 6 ++++ .../com/uber/hoodie/TestHoodieClientBase.java | 36 +++++++++++++++++-- .../TestHoodieClientOnCopyOnWriteStorage.java | 6 ++++ .../com/uber/hoodie/TestHoodieReadClient.java | 6 ++++ .../hoodie/io/TestHoodieCommitArchiveLog.java | 1 + .../log/AbstractHoodieLogRecordScanner.java | 13 ++++++- .../common/table/log/HoodieLogFileReader.java | 10 ++++-- .../table/log/HoodieLogFormatReader.java | 23 ++++++++++++ .../common/table/log/HoodieLogFormatTest.java | 10 +++++- .../AbstractRealtimeRecordReader.java | 1 + .../com/uber/hoodie/hive/util/SchemaUtil.java | 1 + 15 files changed, 119 insertions(+), 7 deletions(-) diff --git a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/ArchivedCommitsCommand.java b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/ArchivedCommitsCommand.java index 893e03fcf..51545d842 100644 --- a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/ArchivedCommitsCommand.java +++ b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/ArchivedCommitsCommand.java @@ -77,6 +77,7 @@ public class ArchivedCommitsCommand implements CommandMarker { List readCommits = readRecords.stream().map(r -> (GenericRecord) r).map(r -> readCommit(r)) .collect(Collectors.toList()); allCommits.addAll(readCommits); + reader.close(); } TableHeader header = new TableHeader().addTableHeaderField("CommitTime") diff --git a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/HoodieLogFileCommand.java b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/HoodieLogFileCommand.java index 
8533d7afb..23b815431 100644 --- a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/HoodieLogFileCommand.java +++ b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/HoodieLogFileCommand.java @@ -131,6 +131,7 @@ public class HoodieLogFileCommand implements CommandMarker { totalEntries++; } } + reader.close(); } List rows = new ArrayList<>(); int i = 0; @@ -221,6 +222,7 @@ public class HoodieLogFileCommand implements CommandMarker { } } } + reader.close(); if (allRecords.size() >= limit) { break; } diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestAsyncCompaction.java b/hoodie-client/src/test/java/com/uber/hoodie/TestAsyncCompaction.java index 26fddc17a..c13c0c43e 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/TestAsyncCompaction.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/TestAsyncCompaction.java @@ -73,6 +73,11 @@ public class TestAsyncCompaction extends TestHoodieClientBase { .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()); } + @Override + public void tearDown() throws IOException { + super.tearDown(); + } + @Test public void testRollbackInflightIngestionWithPendingCompaction() throws Exception { // Rollback inflight ingestion when there is pending compaction diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestCleaner.java b/hoodie-client/src/test/java/com/uber/hoodie/TestCleaner.java index 6b5440b3d..2d2417cca 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/TestCleaner.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/TestCleaner.java @@ -85,6 +85,11 @@ public class TestCleaner extends TestHoodieClientBase { private static final int BIG_BATCH_INSERT_SIZE = 500; private static Logger logger = LogManager.getLogger(TestHoodieClientBase.class); + @Override + public void tearDown() throws IOException { + super.tearDown(); + } + /** * Helper method to do first batch of insert for clean by versions/commits tests * diff --git 
a/hoodie-client/src/test/java/com/uber/hoodie/TestClientRollback.java b/hoodie-client/src/test/java/com/uber/hoodie/TestClientRollback.java index 2c8860219..175318653 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/TestClientRollback.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/TestClientRollback.java @@ -37,6 +37,7 @@ import com.uber.hoodie.exception.HoodieRollbackException; import com.uber.hoodie.index.HoodieIndex; import com.uber.hoodie.table.HoodieTable; import java.io.File; +import java.io.IOException; import java.util.List; import java.util.stream.Collectors; import org.apache.spark.api.java.JavaRDD; @@ -47,6 +48,11 @@ import org.junit.Test; */ public class TestClientRollback extends TestHoodieClientBase { + @Override + public void tearDown() throws IOException { + super.tearDown(); + } + /** * Test case for rollback-savepoint interaction */ diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientBase.java b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientBase.java index 5545992fa..d33697baf 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientBase.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientBase.java @@ -51,6 +51,8 @@ import java.util.Set; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.SQLContext; @@ -63,10 +65,13 @@ import org.junit.rules.TemporaryFolder; */ public class TestHoodieClientBase implements Serializable { + protected static Logger logger = LogManager.getLogger(TestHoodieClientBase.class); + protected transient JavaSparkContext jsc = null; protected transient SQLContext sqlContext; protected transient FileSystem fs; protected String basePath = null; + protected TemporaryFolder folder = null; 
protected transient HoodieTestDataGenerator dataGen = null; @Before @@ -78,10 +83,10 @@ public class TestHoodieClientBase implements Serializable { //SQLContext stuff sqlContext = new SQLContext(jsc); - // Create a temp folder as the base path - TemporaryFolder folder = new TemporaryFolder(); + folder = new TemporaryFolder(); folder.create(); basePath = folder.getRoot().getAbsolutePath(); + fs = FSUtils.getFs(basePath, jsc.hadoopConfiguration()); if (fs instanceof LocalFileSystem) { LocalFileSystem lfs = (LocalFileSystem) fs; @@ -94,6 +99,33 @@ public class TestHoodieClientBase implements Serializable { dataGen = new HoodieTestDataGenerator(); } + @After + /** + * Properly release resources at end of each test + */ + public void tearDown() throws IOException { + if (null != sqlContext) { + logger.info("Clearing sql context cache of spark-session used in previous test-case"); + sqlContext.clearCache(); + } + + if (null != jsc) { + logger.info("Closing spark context used in previous test-case"); + jsc.close(); + } + + // Delete the temp folder that was used as the base path + if (null != folder) { + logger.info("Explicitly removing workspace used in previously run test-case"); + folder.delete(); + } + + if (null != fs) { + logger.warn("Closing file-system instance used in previous test-run"); + fs.close(); + } + } + /** * Get Default HoodieWriteConfig for tests * diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java index 09b8795ab..684e9ec20 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java @@ -43,6 +43,7 @@ import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.index.HoodieIndex; import com.uber.hoodie.table.HoodieTable; import java.io.FileInputStream; +import java.io.IOException; import
java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -64,6 +65,11 @@ import scala.Option; @SuppressWarnings("unchecked") public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase { + @Override + public void tearDown() throws IOException { + super.tearDown(); + } + /** * Test Auto Commit behavior for HoodieWriteClient insert API */ diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieReadClient.java b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieReadClient.java index 9f60cc370..8070be795 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieReadClient.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieReadClient.java @@ -20,6 +20,7 @@ import static org.junit.Assert.assertTrue; import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.config.HoodieWriteConfig; +import java.io.IOException; import java.util.Arrays; import java.util.Collection; import java.util.List; @@ -35,6 +36,11 @@ import scala.Option; */ public class TestHoodieReadClient extends TestHoodieClientBase { + @Override + public void tearDown() throws IOException { + super.tearDown(); + } + /** * Test ReadFilter API after writing new records using HoodieWriteClient.insert */ diff --git a/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCommitArchiveLog.java b/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCommitArchiveLog.java index 6f0369180..76c364115 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCommitArchiveLog.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCommitArchiveLog.java @@ -271,6 +271,7 @@ public class TestHoodieCommitArchiveLog { // verify in-flight instants after archive verifyInflightInstants(metaClient, 3); + reader.close(); } @Test diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/AbstractHoodieLogRecordScanner.java 
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/AbstractHoodieLogRecordScanner.java index 427adda65..56271904d 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/AbstractHoodieLogRecordScanner.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/AbstractHoodieLogRecordScanner.java @@ -30,6 +30,7 @@ import com.uber.hoodie.common.table.log.block.HoodieDeleteBlock; import com.uber.hoodie.common.table.log.block.HoodieLogBlock; import com.uber.hoodie.common.util.SpillableMapUtils; import com.uber.hoodie.exception.HoodieIOException; +import java.io.IOException; import java.util.ArrayDeque; import java.util.Arrays; import java.util.Deque; @@ -115,9 +116,10 @@ public abstract class AbstractHoodieLogRecordScanner { * Scan Log files */ public void scan() { + HoodieLogFormatReader logFormatReaderWrapper = null; try { // iterate over the paths - HoodieLogFormatReader logFormatReaderWrapper = + logFormatReaderWrapper = new HoodieLogFormatReader(fs, logFilePaths.stream().map(logFile -> new HoodieLogFile(new Path(logFile))) .collect(Collectors.toList()), readerSchema, readBlocksLazily, reverseReader, bufferSize); @@ -239,6 +241,15 @@ public abstract class AbstractHoodieLogRecordScanner { } catch (Exception e) { log.error("Got exception when reading log file", e); throw new HoodieIOException("IOException when reading log file "); + } finally { + try { + if (null != logFormatReaderWrapper) { + logFormatReaderWrapper.close(); + } + } catch (IOException ioe) { + // Eat exception as we do not want to mask the original exception that can happen + log.error("Unable to close log format reader", ioe); + } } } diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFileReader.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFileReader.java index dd0348b90..836870e6e 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFileReader.java +++ 
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFileReader.java @@ -62,6 +62,7 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader { private long reverseLogFilePosition; private long lastReverseLogFilePosition; private boolean reverseReader; + private boolean closed = false; HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize, boolean readBlockLazily, boolean reverseReader) throws IOException { @@ -95,13 +96,13 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader { } /** - * Close the inputstream when the JVM exits + * Close the inputstream if not closed when the JVM exits */ private void addShutDownHook() { Runtime.getRuntime().addShutdownHook(new Thread() { public void run() { try { - inputStream.close(); + close(); } catch (Exception e) { log.warn("unable to close input stream for log file " + logFile, e); // fail silently for any sort of exception @@ -277,7 +278,10 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader { @Override public void close() throws IOException { - this.inputStream.close(); + if (!closed) { + this.inputStream.close(); + closed = true; + } } @Override diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormatReader.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormatReader.java index 8b49323e9..b2ddede01 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormatReader.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormatReader.java @@ -20,6 +20,7 @@ import com.uber.hoodie.common.model.HoodieLogFile; import com.uber.hoodie.common.table.log.block.HoodieLogBlock; import com.uber.hoodie.exception.HoodieIOException; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import org.apache.avro.Schema; import org.apache.hadoop.fs.FileSystem; @@ -29,6 +30,8 @@ import org.apache.log4j.Logger; public 
class HoodieLogFormatReader implements HoodieLogFormat.Reader { private final List logFiles; + // Readers for previously scanned log-files that are still open + private final List prevReadersInOpenState; private HoodieLogFileReader currentReader; private final FileSystem fs; private final Schema readerSchema; @@ -46,6 +49,7 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader { this.readBlocksLazily = readBlocksLazily; this.reverseLogReader = reverseLogReader; this.bufferSize = bufferSize; + this.prevReadersInOpenState = new ArrayList<>(); if (logFiles.size() > 0) { HoodieLogFile nextLogFile = logFiles.remove(0); this.currentReader = new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily, false); @@ -53,7 +57,20 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader { } @Override + /** + * Note: In lazy mode, clients must ensure that close() is called only after processing + * all log-blocks as the underlying inputstream will be closed. 
+ * TODO: We can introduce invalidate() API at HoodieLogBlock and this object can call invalidate on + all returned log-blocks so that we check this scenario specifically in HoodieLogBlock + */ public void close() throws IOException { + + for (HoodieLogFileReader reader : prevReadersInOpenState) { + reader.close(); + } + + prevReadersInOpenState.clear(); + if (currentReader != null) { currentReader.close(); } @@ -69,6 +86,12 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader { } else if (logFiles.size() > 0) { try { HoodieLogFile nextLogFile = logFiles.remove(0); + // Close the previous reader now unless blocks are read lazily; in lazy mode keep it open until close() + if (!readBlocksLazily) { + this.currentReader.close(); + } else { + this.prevReadersInOpenState.add(currentReader); + } this.currentReader = new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily, false); } catch (IOException io) { diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/HoodieLogFormatTest.java b/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/HoodieLogFormatTest.java index dcc2c4996..3c57b55f2 100644 --- a/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/HoodieLogFormatTest.java +++ b/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/HoodieLogFormatTest.java @@ -304,6 +304,7 @@ public class HoodieLogFormatTest { dataBlockRead.getRecords().size()); assertEquals("Both records lists should be the same. (ordering guaranteed)", copyOfRecords, dataBlockRead.getRecords()); + reader.close(); } @SuppressWarnings("unchecked") @@ -370,6 +371,7 @@ public class HoodieLogFormatTest { dataBlockRead.getRecords().size()); assertEquals("Both records lists should be the same. 
(ordering guaranteed)", copyOfRecords3, dataBlockRead.getRecords()); + reader.close(); } @SuppressWarnings("unchecked") @@ -454,6 +456,8 @@ public class HoodieLogFormatTest { //assertEquals("", "something-random", new String(corruptBlock.getCorruptedBytes())); assertFalse("There should be no more block left", reader.hasNext()); + reader.close(); + // Simulate another failure back to back outputStream = fs.append(writer.getLogFile().getPath()); // create a block with @@ -493,6 +497,7 @@ public class HoodieLogFormatTest { assertTrue("We should get the last block next", reader.hasNext()); reader.next(); assertFalse("We should have no more blocks left", reader.hasNext()); + reader.close(); } @@ -1097,7 +1102,7 @@ public class HoodieLogFormatTest { assertEquals(block.getBlockType(), HoodieLogBlockType.AVRO_DATA_BLOCK); dBlock = (HoodieAvroDataBlock) block; assertEquals(dBlock.getRecords().size(), 100); - + reader.close(); } @SuppressWarnings("unchecked") @@ -1167,6 +1172,7 @@ public class HoodieLogFormatTest { dataBlockRead.getRecords()); assertFalse(reader.hasPrev()); + reader.close(); } @Test @@ -1224,6 +1230,7 @@ public class HoodieLogFormatTest { e.printStackTrace(); // We should have corrupted block } + reader.close(); } @SuppressWarnings("unchecked") @@ -1283,5 +1290,6 @@ public class HoodieLogFormatTest { dataBlockRead.getRecords()); assertFalse(reader.hasPrev()); + reader.close(); } } diff --git a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/AbstractRealtimeRecordReader.java b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/AbstractRealtimeRecordReader.java index 22dff95ff..e2bd10b8d 100644 --- a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/AbstractRealtimeRecordReader.java +++ b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/AbstractRealtimeRecordReader.java @@ -294,6 +294,7 @@ public abstract class AbstractRealtimeRecordReader { lastBlock = (HoodieAvroDataBlock) block; } } + reader.close(); if 
(lastBlock != null) { return lastBlock.getSchema(); } diff --git a/hoodie-hive/src/main/java/com/uber/hoodie/hive/util/SchemaUtil.java b/hoodie-hive/src/main/java/com/uber/hoodie/hive/util/SchemaUtil.java index cb60ad2bd..c410273bd 100644 --- a/hoodie-hive/src/main/java/com/uber/hoodie/hive/util/SchemaUtil.java +++ b/hoodie-hive/src/main/java/com/uber/hoodie/hive/util/SchemaUtil.java @@ -447,6 +447,7 @@ public class SchemaUtil { lastBlock = (HoodieAvroDataBlock) block; } } + reader.close(); if (lastBlock != null) { return new parquet.avro.AvroSchemaConverter().convert(lastBlock.getSchema()); }