1
0

[HUDI-1358] Fix leaks in DiskBasedMap and LazyFileIterable (#2249)

This commit is contained in:
Balaji Varadarajan
2020-11-23 10:56:26 -08:00
committed by GitHub
parent 751e4ee882
commit 0ebef1c0a0
4 changed files with 52 additions and 34 deletions

View File

@@ -273,7 +273,8 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
insertRecordsWritten++;
}
}
keyToNewRecords.clear();
((ExternalSpillableMap) keyToNewRecords).close();
writtenRecordKeys.clear();
if (fileWriter != null) {

View File

@@ -74,6 +74,8 @@ public final class DiskBasedMap<T extends Serializable, R extends Serializable>
private ThreadLocal<BufferedRandomAccessFile> randomAccessFile = new ThreadLocal<>();
private Queue<BufferedRandomAccessFile> openedAccessFiles = new ConcurrentLinkedQueue<>();
private transient Thread shutdownThread = null;
public DiskBasedMap(String baseFilePath) throws IOException {
this.valueMetadataMap = new ConcurrentHashMap<>();
this.writeOnlyFile = new File(baseFilePath, UUID.randomUUID().toString());
@@ -126,33 +128,8 @@ public final class DiskBasedMap<T extends Serializable, R extends Serializable>
* (typically 4 KB) to disk.
*/
private void addShutDownHook() {
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
try {
if (writeOnlyFileHandle != null) {
writeOnlyFileHandle.flush();
fileOutputStream.getChannel().force(false);
writeOnlyFileHandle.close();
}
while (!openedAccessFiles.isEmpty()) {
BufferedRandomAccessFile file = openedAccessFiles.poll();
if (null != file) {
try {
file.close();
} catch (IOException ioe) {
// skip exception
}
}
}
writeOnlyFile.delete();
} catch (Exception e) {
// delete the file for any sort of exception
writeOnlyFile.delete();
}
}
});
shutdownThread = new Thread(this::cleanup);
Runtime.getRuntime().addShutdownHook(shutdownThread);
}
private void flushToDisk() {
@@ -267,6 +244,39 @@ public final class DiskBasedMap<T extends Serializable, R extends Serializable>
// reducing concurrency). Instead, just clear the pointer map. The file will be removed on exit.
}
public void close() {
cleanup();
if (shutdownThread != null) {
Runtime.getRuntime().removeShutdownHook(shutdownThread);
}
}
private void cleanup() {
valueMetadataMap.clear();
try {
if (writeOnlyFileHandle != null) {
writeOnlyFileHandle.flush();
fileOutputStream.getChannel().force(false);
writeOnlyFileHandle.close();
}
while (!openedAccessFiles.isEmpty()) {
BufferedRandomAccessFile file = openedAccessFiles.poll();
if (null != file) {
try {
file.close();
} catch (IOException ioe) {
// skip exception
}
}
}
writeOnlyFile.delete();
} catch (Exception e) {
// delete the file for any sort of exception
writeOnlyFile.delete();
}
}
@Override
public Set<T> keySet() {
return valueMetadataMap.keySet();

View File

@@ -225,6 +225,10 @@ public class ExternalSpillableMap<T extends Serializable, R extends Serializable
currentInMemoryMapSize = 0L;
}
public void close() {
getDiskBasedMap().close();
}
@Override
public Set<T> keySet() {
Set<T> keySet = new HashSet<T>();

View File

@@ -38,6 +38,8 @@ public class LazyFileIterable<T, R> implements Iterable<R> {
// Stores the key and corresponding value's latest metadata spilled to disk
private final Map<T, DiskBasedMap.ValueMetadata> inMemoryMetadataOfSpilledData;
private transient Thread shutdownThread = null;
public LazyFileIterable(String filePath, Map<T, DiskBasedMap.ValueMetadata> map) {
this.filePath = filePath;
this.inMemoryMetadataOfSpilledData = map;
@@ -103,6 +105,11 @@ public class LazyFileIterable<T, R> implements Iterable<R> {
}
private void close() {
closeHandle();
Runtime.getRuntime().removeShutdownHook(shutdownThread);
}
private void closeHandle() {
if (readOnlyFileHandle != null) {
try {
readOnlyFileHandle.close();
@@ -114,12 +121,8 @@ public class LazyFileIterable<T, R> implements Iterable<R> {
}
private void addShutdownHook() {
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
close();
}
});
shutdownThread = new Thread(this::closeHandle);
Runtime.getRuntime().addShutdownHook(shutdownThread);
}
}
}