1
0

Explicitly release resources in LogFileReader and TestHoodieClientBase

This commit is contained in:
Balaji Varadarajan
2018-09-19 13:13:04 -07:00
committed by vinoth chandar
parent 2728f96505
commit 5cb28e7b1f
15 changed files with 119 additions and 7 deletions

View File

@@ -77,6 +77,7 @@ public class ArchivedCommitsCommand implements CommandMarker {
List<Comparable[]> readCommits = readRecords.stream().map(r -> (GenericRecord) r).map(r -> readCommit(r)) List<Comparable[]> readCommits = readRecords.stream().map(r -> (GenericRecord) r).map(r -> readCommit(r))
.collect(Collectors.toList()); .collect(Collectors.toList());
allCommits.addAll(readCommits); allCommits.addAll(readCommits);
reader.close();
} }
TableHeader header = new TableHeader().addTableHeaderField("CommitTime") TableHeader header = new TableHeader().addTableHeaderField("CommitTime")

View File

@@ -131,6 +131,7 @@ public class HoodieLogFileCommand implements CommandMarker {
totalEntries++; totalEntries++;
} }
} }
reader.close();
} }
List<Comparable[]> rows = new ArrayList<>(); List<Comparable[]> rows = new ArrayList<>();
int i = 0; int i = 0;
@@ -221,6 +222,7 @@ public class HoodieLogFileCommand implements CommandMarker {
} }
} }
} }
reader.close();
if (allRecords.size() >= limit) { if (allRecords.size() >= limit) {
break; break;
} }

View File

@@ -73,6 +73,11 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()); .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build());
} }
@Override
public void tearDown() throws IOException {
super.tearDown();
}
@Test @Test
public void testRollbackInflightIngestionWithPendingCompaction() throws Exception { public void testRollbackInflightIngestionWithPendingCompaction() throws Exception {
// Rollback inflight ingestion when there is pending compaction // Rollback inflight ingestion when there is pending compaction

View File

@@ -85,6 +85,11 @@ public class TestCleaner extends TestHoodieClientBase {
private static final int BIG_BATCH_INSERT_SIZE = 500; private static final int BIG_BATCH_INSERT_SIZE = 500;
private static Logger logger = LogManager.getLogger(TestHoodieClientBase.class); private static Logger logger = LogManager.getLogger(TestHoodieClientBase.class);
@Override
public void tearDown() throws IOException {
super.tearDown();
}
/** /**
* Helper method to do first batch of insert for clean by versions/commits tests * Helper method to do first batch of insert for clean by versions/commits tests
* *

View File

@@ -37,6 +37,7 @@ import com.uber.hoodie.exception.HoodieRollbackException;
import com.uber.hoodie.index.HoodieIndex; import com.uber.hoodie.index.HoodieIndex;
import com.uber.hoodie.table.HoodieTable; import com.uber.hoodie.table.HoodieTable;
import java.io.File; import java.io.File;
import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
@@ -47,6 +48,11 @@ import org.junit.Test;
*/ */
public class TestClientRollback extends TestHoodieClientBase { public class TestClientRollback extends TestHoodieClientBase {
@Override
public void tearDown() throws IOException {
super.tearDown();
}
/** /**
* Test case for rollback-savepoint interaction * Test case for rollback-savepoint interaction
*/ */

View File

@@ -51,6 +51,8 @@ import java.util.Set;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.SQLContext;
@@ -63,10 +65,13 @@ import org.junit.rules.TemporaryFolder;
*/ */
public class TestHoodieClientBase implements Serializable { public class TestHoodieClientBase implements Serializable {
protected static Logger logger = LogManager.getLogger(TestHoodieClientBase.class);
protected transient JavaSparkContext jsc = null; protected transient JavaSparkContext jsc = null;
protected transient SQLContext sqlContext; protected transient SQLContext sqlContext;
protected transient FileSystem fs; protected transient FileSystem fs;
protected String basePath = null; protected String basePath = null;
protected TemporaryFolder folder = null;
protected transient HoodieTestDataGenerator dataGen = null; protected transient HoodieTestDataGenerator dataGen = null;
@Before @Before
@@ -78,10 +83,10 @@ public class TestHoodieClientBase implements Serializable {
//SQLContext stuff //SQLContext stuff
sqlContext = new SQLContext(jsc); sqlContext = new SQLContext(jsc);
// Create a temp folder as the base path folder = new TemporaryFolder();
TemporaryFolder folder = new TemporaryFolder();
folder.create(); folder.create();
basePath = folder.getRoot().getAbsolutePath(); basePath = folder.getRoot().getAbsolutePath();
fs = FSUtils.getFs(basePath, jsc.hadoopConfiguration()); fs = FSUtils.getFs(basePath, jsc.hadoopConfiguration());
if (fs instanceof LocalFileSystem) { if (fs instanceof LocalFileSystem) {
LocalFileSystem lfs = (LocalFileSystem) fs; LocalFileSystem lfs = (LocalFileSystem) fs;
@@ -94,6 +99,33 @@ public class TestHoodieClientBase implements Serializable {
dataGen = new HoodieTestDataGenerator(); dataGen = new HoodieTestDataGenerator();
} }
@After
/**
* Properly release resources at end of each test
*/
public void tearDown() throws IOException {
if (null != sqlContext) {
logger.info("Clearing sql context cache of spark-session used in previous test-case");
sqlContext.clearCache();
}
if (null != jsc) {
logger.info("Closing spark context used in previous test-case");
jsc.close();
}
// Create a temp folder as the base path
if (null != folder) {
logger.info("Explicitly removing workspace used in previously run test-case");
folder.delete();
}
if (null != fs) {
logger.warn("Closing file-system instance used in previous test-run");
fs.close();
}
}
/** /**
* Get Default HoodieWriteConfig for tests * Get Default HoodieWriteConfig for tests
* *

View File

@@ -43,6 +43,7 @@ import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.index.HoodieIndex; import com.uber.hoodie.index.HoodieIndex;
import com.uber.hoodie.table.HoodieTable; import com.uber.hoodie.table.HoodieTable;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
@@ -64,6 +65,11 @@ import scala.Option;
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase { public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase {
@Override
public void tearDown() throws IOException {
super.tearDown();
}
/** /**
* Test Auto Commit behavior for HoodieWriteClient insert API * Test Auto Commit behavior for HoodieWriteClient insert API
*/ */

View File

@@ -20,6 +20,7 @@ import static org.junit.Assert.assertTrue;
import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.config.HoodieWriteConfig;
import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
@@ -35,6 +36,11 @@ import scala.Option;
*/ */
public class TestHoodieReadClient extends TestHoodieClientBase { public class TestHoodieReadClient extends TestHoodieClientBase {
@Override
public void tearDown() throws IOException {
super.tearDown();
}
/** /**
* Test ReadFilter API after writing new records using HoodieWriteClient.insert * Test ReadFilter API after writing new records using HoodieWriteClient.insert
*/ */

View File

@@ -271,6 +271,7 @@ public class TestHoodieCommitArchiveLog {
// verify in-flight instants after archive // verify in-flight instants after archive
verifyInflightInstants(metaClient, 3); verifyInflightInstants(metaClient, 3);
reader.close();
} }
@Test @Test

View File

@@ -30,6 +30,7 @@ import com.uber.hoodie.common.table.log.block.HoodieDeleteBlock;
import com.uber.hoodie.common.table.log.block.HoodieLogBlock; import com.uber.hoodie.common.table.log.block.HoodieLogBlock;
import com.uber.hoodie.common.util.SpillableMapUtils; import com.uber.hoodie.common.util.SpillableMapUtils;
import com.uber.hoodie.exception.HoodieIOException; import com.uber.hoodie.exception.HoodieIOException;
import java.io.IOException;
import java.util.ArrayDeque; import java.util.ArrayDeque;
import java.util.Arrays; import java.util.Arrays;
import java.util.Deque; import java.util.Deque;
@@ -115,9 +116,10 @@ public abstract class AbstractHoodieLogRecordScanner {
* Scan Log files * Scan Log files
*/ */
public void scan() { public void scan() {
HoodieLogFormatReader logFormatReaderWrapper = null;
try { try {
// iterate over the paths // iterate over the paths
HoodieLogFormatReader logFormatReaderWrapper = logFormatReaderWrapper =
new HoodieLogFormatReader(fs, new HoodieLogFormatReader(fs,
logFilePaths.stream().map(logFile -> new HoodieLogFile(new Path(logFile))) logFilePaths.stream().map(logFile -> new HoodieLogFile(new Path(logFile)))
.collect(Collectors.toList()), readerSchema, readBlocksLazily, reverseReader, bufferSize); .collect(Collectors.toList()), readerSchema, readBlocksLazily, reverseReader, bufferSize);
@@ -239,6 +241,15 @@ public abstract class AbstractHoodieLogRecordScanner {
} catch (Exception e) { } catch (Exception e) {
log.error("Got exception when reading log file", e); log.error("Got exception when reading log file", e);
throw new HoodieIOException("IOException when reading log file "); throw new HoodieIOException("IOException when reading log file ");
} finally {
try {
if (null != logFormatReaderWrapper) {
logFormatReaderWrapper.close();
}
} catch (IOException ioe) {
// Eat exception as we do not want to mask the original exception that can happen
log.error("Unable to close log format reader", ioe);
}
} }
} }

View File

@@ -62,6 +62,7 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader {
private long reverseLogFilePosition; private long reverseLogFilePosition;
private long lastReverseLogFilePosition; private long lastReverseLogFilePosition;
private boolean reverseReader; private boolean reverseReader;
private boolean closed = false;
HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize, HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize,
boolean readBlockLazily, boolean reverseReader) throws IOException { boolean readBlockLazily, boolean reverseReader) throws IOException {
@@ -95,13 +96,13 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader {
} }
/** /**
* Close the inputstream when the JVM exits * Close the inputstream if not closed when the JVM exits
*/ */
private void addShutDownHook() { private void addShutDownHook() {
Runtime.getRuntime().addShutdownHook(new Thread() { Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() { public void run() {
try { try {
inputStream.close(); close();
} catch (Exception e) { } catch (Exception e) {
log.warn("unable to close input stream for log file " + logFile, e); log.warn("unable to close input stream for log file " + logFile, e);
// fail silently for any sort of exception // fail silently for any sort of exception
@@ -277,7 +278,10 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader {
@Override @Override
public void close() throws IOException { public void close() throws IOException {
this.inputStream.close(); if (!closed) {
this.inputStream.close();
closed = true;
}
} }
@Override @Override

View File

@@ -20,6 +20,7 @@ import com.uber.hoodie.common.model.HoodieLogFile;
import com.uber.hoodie.common.table.log.block.HoodieLogBlock; import com.uber.hoodie.common.table.log.block.HoodieLogBlock;
import com.uber.hoodie.exception.HoodieIOException; import com.uber.hoodie.exception.HoodieIOException;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import org.apache.avro.Schema; import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
@@ -29,6 +30,8 @@ import org.apache.log4j.Logger;
public class HoodieLogFormatReader implements HoodieLogFormat.Reader { public class HoodieLogFormatReader implements HoodieLogFormat.Reader {
private final List<HoodieLogFile> logFiles; private final List<HoodieLogFile> logFiles;
// Readers for previously scanned log-files that are still open
private final List<HoodieLogFileReader> prevReadersInOpenState;
private HoodieLogFileReader currentReader; private HoodieLogFileReader currentReader;
private final FileSystem fs; private final FileSystem fs;
private final Schema readerSchema; private final Schema readerSchema;
@@ -46,6 +49,7 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader {
this.readBlocksLazily = readBlocksLazily; this.readBlocksLazily = readBlocksLazily;
this.reverseLogReader = reverseLogReader; this.reverseLogReader = reverseLogReader;
this.bufferSize = bufferSize; this.bufferSize = bufferSize;
this.prevReadersInOpenState = new ArrayList<>();
if (logFiles.size() > 0) { if (logFiles.size() > 0) {
HoodieLogFile nextLogFile = logFiles.remove(0); HoodieLogFile nextLogFile = logFiles.remove(0);
this.currentReader = new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily, false); this.currentReader = new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily, false);
@@ -53,7 +57,20 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader {
} }
@Override @Override
/**
* Note : In lazy mode, clients must ensure close() should be called only after processing
* all log-blocks as the underlying inputstream will be closed.
* TODO: We can introduce invalidate() API at HoodieLogBlock and this object can call invalidate on
* all returned log-blocks so that we check this scenario specifically in HoodieLogBlock
*/
public void close() throws IOException { public void close() throws IOException {
for (HoodieLogFileReader reader : prevReadersInOpenState) {
reader.close();
}
prevReadersInOpenState.clear();
if (currentReader != null) { if (currentReader != null) {
currentReader.close(); currentReader.close();
} }
@@ -69,6 +86,12 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader {
} else if (logFiles.size() > 0) { } else if (logFiles.size() > 0) {
try { try {
HoodieLogFile nextLogFile = logFiles.remove(0); HoodieLogFile nextLogFile = logFiles.remove(0);
// First close previous reader only if readBlockLazily is true
if (!readBlocksLazily) {
this.currentReader.close();
} else {
this.prevReadersInOpenState.add(currentReader);
}
this.currentReader = new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily, this.currentReader = new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily,
false); false);
} catch (IOException io) { } catch (IOException io) {

View File

@@ -304,6 +304,7 @@ public class HoodieLogFormatTest {
dataBlockRead.getRecords().size()); dataBlockRead.getRecords().size());
assertEquals("Both records lists should be the same. (ordering guaranteed)", copyOfRecords, assertEquals("Both records lists should be the same. (ordering guaranteed)", copyOfRecords,
dataBlockRead.getRecords()); dataBlockRead.getRecords());
reader.close();
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@@ -370,6 +371,7 @@ public class HoodieLogFormatTest {
dataBlockRead.getRecords().size()); dataBlockRead.getRecords().size());
assertEquals("Both records lists should be the same. (ordering guaranteed)", copyOfRecords3, assertEquals("Both records lists should be the same. (ordering guaranteed)", copyOfRecords3,
dataBlockRead.getRecords()); dataBlockRead.getRecords());
reader.close();
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@@ -454,6 +456,8 @@ public class HoodieLogFormatTest {
//assertEquals("", "something-random", new String(corruptBlock.getCorruptedBytes())); //assertEquals("", "something-random", new String(corruptBlock.getCorruptedBytes()));
assertFalse("There should be no more block left", reader.hasNext()); assertFalse("There should be no more block left", reader.hasNext());
reader.close();
// Simulate another failure back to back // Simulate another failure back to back
outputStream = fs.append(writer.getLogFile().getPath()); outputStream = fs.append(writer.getLogFile().getPath());
// create a block with // create a block with
@@ -493,6 +497,7 @@ public class HoodieLogFormatTest {
assertTrue("We should get the last block next", reader.hasNext()); assertTrue("We should get the last block next", reader.hasNext());
reader.next(); reader.next();
assertFalse("We should have no more blocks left", reader.hasNext()); assertFalse("We should have no more blocks left", reader.hasNext());
reader.close();
} }
@@ -1097,7 +1102,7 @@ public class HoodieLogFormatTest {
assertEquals(block.getBlockType(), HoodieLogBlockType.AVRO_DATA_BLOCK); assertEquals(block.getBlockType(), HoodieLogBlockType.AVRO_DATA_BLOCK);
dBlock = (HoodieAvroDataBlock) block; dBlock = (HoodieAvroDataBlock) block;
assertEquals(dBlock.getRecords().size(), 100); assertEquals(dBlock.getRecords().size(), 100);
reader.close();
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@@ -1167,6 +1172,7 @@ public class HoodieLogFormatTest {
dataBlockRead.getRecords()); dataBlockRead.getRecords());
assertFalse(reader.hasPrev()); assertFalse(reader.hasPrev());
reader.close();
} }
@Test @Test
@@ -1224,6 +1230,7 @@ public class HoodieLogFormatTest {
e.printStackTrace(); e.printStackTrace();
// We should have corrupted block // We should have corrupted block
} }
reader.close();
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@@ -1283,5 +1290,6 @@ public class HoodieLogFormatTest {
dataBlockRead.getRecords()); dataBlockRead.getRecords());
assertFalse(reader.hasPrev()); assertFalse(reader.hasPrev());
reader.close();
} }
} }

View File

@@ -294,6 +294,7 @@ public abstract class AbstractRealtimeRecordReader {
lastBlock = (HoodieAvroDataBlock) block; lastBlock = (HoodieAvroDataBlock) block;
} }
} }
reader.close();
if (lastBlock != null) { if (lastBlock != null) {
return lastBlock.getSchema(); return lastBlock.getSchema();
} }

View File

@@ -447,6 +447,7 @@ public class SchemaUtil {
lastBlock = (HoodieAvroDataBlock) block; lastBlock = (HoodieAvroDataBlock) block;
} }
} }
reader.close();
if (lastBlock != null) { if (lastBlock != null) {
return new parquet.avro.AvroSchemaConverter().convert(lastBlock.getSchema()); return new parquet.avro.AvroSchemaConverter().convert(lastBlock.getSchema());
} }