1
0

[HUDI-2715] The BitCaskDiskMap iterator may cause memory leak (#3951)

This commit is contained in:
Danny Chan
2021-11-09 15:40:00 +08:00
committed by GitHub
parent 6d109c6de5
commit e057a10499
5 changed files with 47 additions and 11 deletions

View File

@@ -181,6 +181,7 @@ public abstract class HoodieCompactor<T extends HoodieRecordPayload, I, K, O> im
.withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
.build();
if (!scanner.iterator().hasNext()) {
scanner.close();
return new ArrayList<>();
}
@@ -198,6 +199,7 @@ public abstract class HoodieCompactor<T extends HoodieRecordPayload, I, K, O> im
result = compactionHandler.handleInsert(instantTime, operation.getPartitionPath(), operation.getFileId(),
scanner.getRecords());
}
scanner.close();
Iterable<List<WriteStatus>> resultIterable = () -> result;
return StreamSupport.stream(resultIterable.spliterator(), false).flatMap(Collection::stream).peek(s -> {
s.getStat().setTotalUpdatedRecordsCompacted(scanner.getNumMergedRecordsInLog());
@@ -212,7 +214,6 @@ public abstract class HoodieCompactor<T extends HoodieRecordPayload, I, K, O> im
RuntimeStats runtimeStats = new RuntimeStats();
runtimeStats.setTotalScanTime(scanner.getTotalTimeTakenToReadAndMergeBlocks());
s.getStat().setRuntimeStats(runtimeStats);
scanner.close();
}).collect(toList());
}

View File

@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.common.util;
import java.util.Iterator;
/**
* An {@link Iterator} that gives its user a chance to release resources once iteration is done.
*
* <p>Because it also extends {@link AutoCloseable}, an instance can be managed with a
* try-with-resources statement.
*
* @param <R> The type of the elements returned by this iterator
*/
public interface ClosableIterator<R> extends Iterator<R>, AutoCloseable {
/**
* Releases the resources held by this iterator.
*
* <p>Redeclared here without a {@code throws} clause, so callers do not have to handle the
* checked {@code Exception} that {@link AutoCloseable#close()} declares.
*/
@Override
void close(); // override to not throw exception
}

View File

@@ -20,6 +20,7 @@ package org.apache.hudi.common.util.collection;
import org.apache.hudi.common.fs.SizeAwareDataOutputStream;
import org.apache.hudi.common.util.BufferedRandomAccessFile;
import org.apache.hudi.common.util.ClosableIterator;
import org.apache.hudi.common.util.SerializationUtils;
import org.apache.hudi.common.util.SpillableMapUtils;
import org.apache.hudi.exception.HoodieException;
@@ -38,9 +39,11 @@ import java.io.InputStream;
import java.io.RandomAccessFile;
import java.io.Serializable;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
@@ -87,6 +90,8 @@ public final class BitCaskDiskMap<T extends Serializable, R extends Serializable
private final ThreadLocal<BufferedRandomAccessFile> randomAccessFile = new ThreadLocal<>();
private final Queue<BufferedRandomAccessFile> openedAccessFiles = new ConcurrentLinkedQueue<>();
private final List<ClosableIterator<R>> iterators = new ArrayList<>();
public BitCaskDiskMap(String baseFilePath, boolean isCompressionEnabled) throws IOException {
super(baseFilePath, ExternalSpillableMap.DiskMapType.BITCASK.name());
this.valueMetadataMap = new ConcurrentHashMap<>();
@@ -150,7 +155,9 @@ public final class BitCaskDiskMap<T extends Serializable, R extends Serializable
*/
@Override
public Iterator<R> iterator() {
return new LazyFileIterable(filePath, valueMetadataMap, isCompressionEnabled).iterator();
// Remember every iterator handed out: the instance is appended to this.iterators, and
// elsewhere in this class that list is walked with ClosableIterator::close so the
// iterators are closed explicitly rather than left open.
// NOTE(review): this.iterators is declared as a plain ArrayList, while the neighbouring
// file-handle fields use ThreadLocal / ConcurrentLinkedQueue — confirm that iterator()
// is never invoked from several threads at once.
ClosableIterator<R> iterator = new LazyFileIterable(filePath, valueMetadataMap, isCompressionEnabled).iterator();
this.iterators.add(iterator);
return iterator;
}
/**
@@ -275,6 +282,7 @@ public final class BitCaskDiskMap<T extends Serializable, R extends Serializable
}
}
writeOnlyFile.delete();
this.iterators.forEach(ClosableIterator::close);
} catch (Exception e) {
// delete the file for any sort of exception
writeOnlyFile.delete();

View File

@@ -19,6 +19,7 @@
package org.apache.hudi.common.util.collection;
import org.apache.hudi.common.util.BufferedRandomAccessFile;
import org.apache.hudi.common.util.ClosableIterator;
import org.apache.hudi.exception.HoodieException;
import java.io.IOException;
@@ -53,7 +54,7 @@ public class LazyFileIterable<T, R> implements Iterable<R> {
}
@Override
public Iterator<R> iterator() {
public ClosableIterator<R> iterator() {
try {
return new LazyFileIterator<>(filePath, inMemoryMetadataOfSpilledData);
} catch (IOException io) {
@@ -64,7 +65,7 @@ public class LazyFileIterable<T, R> implements Iterable<R> {
/**
* Iterator implementation for the iterable defined above.
*/
public class LazyFileIterator<T, R> implements Iterator<R> {
public class LazyFileIterator<T, R> implements ClosableIterator<R> {
private final String filePath;
private BufferedRandomAccessFile readOnlyFileHandle;
@@ -111,7 +112,7 @@ public class LazyFileIterable<T, R> implements Iterable<R> {
action.accept(next());
}
private void close() {
public void close() {
closeHandle();
Runtime.getRuntime().removeShutdownHook(shutdownThread);
}

View File

@@ -22,6 +22,7 @@ import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.table.log.InstantRange;
import org.apache.hudi.common.util.ClosableIterator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
@@ -447,12 +448,6 @@ public class MergeOnReadInputFormat
// -------------------------------------------------------------------------
// Inner Class
// -------------------------------------------------------------------------
/**
* An {@link Iterator} that can also be closed; it combines {@link Iterator} with
* {@link AutoCloseable}.
*
* @param <E> The type of the elements returned by this iterator
*/
private interface ClosableIterator<E> extends Iterator<E>, AutoCloseable {
@Override
void close(); // redeclared without a throws clause, unlike AutoCloseable#close()
}
private interface RecordIterator {
boolean reachedEnd() throws IOException;