From e057a10499729301ebe96d7cd54902b113af1811 Mon Sep 17 00:00:00 2001 From: Danny Chan Date: Tue, 9 Nov 2021 15:40:00 +0800 Subject: [PATCH] [HUDI-2715] The BitCaskDiskMap iterator may cause memory leak (#3951) --- .../table/action/compact/HoodieCompactor.java | 3 +- .../hudi/common/util/ClosableIterator.java | 31 +++++++++++++++++++ .../util/collection/BitCaskDiskMap.java | 10 +++++- .../util/collection/LazyFileIterable.java | 7 +++-- .../format/mor/MergeOnReadInputFormat.java | 7 +---- 5 files changed, 47 insertions(+), 11 deletions(-) create mode 100644 hudi-common/src/main/java/org/apache/hudi/common/util/ClosableIterator.java diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java index ad05876d7..419f88eef 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java @@ -181,6 +181,7 @@ public abstract class HoodieCompactor im .withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled()) .build(); if (!scanner.iterator().hasNext()) { + scanner.close(); return new ArrayList<>(); } @@ -198,6 +199,7 @@ public abstract class HoodieCompactor im result = compactionHandler.handleInsert(instantTime, operation.getPartitionPath(), operation.getFileId(), scanner.getRecords()); } + scanner.close(); Iterable> resultIterable = () -> result; return StreamSupport.stream(resultIterable.spliterator(), false).flatMap(Collection::stream).peek(s -> { s.getStat().setTotalUpdatedRecordsCompacted(scanner.getNumMergedRecordsInLog()); @@ -212,7 +214,6 @@ public abstract class HoodieCompactor im RuntimeStats runtimeStats = new RuntimeStats(); runtimeStats.setTotalScanTime(scanner.getTotalTimeTakenToReadAndMergeBlocks()); s.getStat().setRuntimeStats(runtimeStats); - scanner.close(); }).collect(toList()); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ClosableIterator.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ClosableIterator.java new file mode 100644 index 000000000..9e1d0c2b2 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ClosableIterator.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.util; + +import java.util.Iterator; + +/** + * An iterator that give a chance to release resources. + * + * @param The return type + */ +public interface ClosableIterator extends Iterator, AutoCloseable { + @Override + void close(); // override to not throw exception +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java index 5f78fa3a8..289901df8 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java @@ -20,6 +20,7 @@ package org.apache.hudi.common.util.collection; import org.apache.hudi.common.fs.SizeAwareDataOutputStream; import org.apache.hudi.common.util.BufferedRandomAccessFile; +import org.apache.hudi.common.util.ClosableIterator; import org.apache.hudi.common.util.SerializationUtils; import org.apache.hudi.common.util.SpillableMapUtils; import org.apache.hudi.exception.HoodieException; @@ -38,9 +39,11 @@ import java.io.InputStream; import java.io.RandomAccessFile; import java.io.Serializable; import java.util.AbstractMap; +import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Queue; import java.util.Set; @@ -87,6 +90,8 @@ public final class BitCaskDiskMap randomAccessFile = new ThreadLocal<>(); private final Queue openedAccessFiles = new ConcurrentLinkedQueue<>(); + private final List> iterators = new ArrayList<>(); + public BitCaskDiskMap(String baseFilePath, boolean isCompressionEnabled) throws IOException { super(baseFilePath, ExternalSpillableMap.DiskMapType.BITCASK.name()); this.valueMetadataMap = new ConcurrentHashMap<>(); @@ -150,7 +155,9 @@ public final class BitCaskDiskMap iterator() { - return new LazyFileIterable(filePath, valueMetadataMap, isCompressionEnabled).iterator(); + ClosableIterator iterator = new LazyFileIterable(filePath, valueMetadataMap, isCompressionEnabled).iterator(); + this.iterators.add(iterator); + return iterator; } /** @@ -275,6 +282,7 @@ public final class BitCaskDiskMap implements Iterable { } @Override - public Iterator iterator() { + public ClosableIterator iterator() { try { return new LazyFileIterator<>(filePath, inMemoryMetadataOfSpilledData); } catch (IOException io) { @@ -64,7 +65,7 @@ public class LazyFileIterable implements Iterable { /** * Iterator implementation for the iterable defined above. */ - public class LazyFileIterator implements Iterator { + public class LazyFileIterator implements ClosableIterator { private final String filePath; private BufferedRandomAccessFile readOnlyFileHandle; @@ -111,7 +112,7 @@ public class LazyFileIterable implements Iterable { action.accept(next()); } - private void close() { + public void close() { closeHandle(); Runtime.getRuntime().removeShutdownHook(shutdownThread); } diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java index 566d4d318..2bf5bd58e 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java @@ -22,6 +22,7 @@ import org.apache.hudi.common.model.HoodieOperation; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner; import org.apache.hudi.common.table.log.InstantRange; +import org.apache.hudi.common.util.ClosableIterator; import org.apache.hudi.common.util.Option; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.exception.HoodieException; @@ -447,12 +448,6 @@ public class MergeOnReadInputFormat // ------------------------------------------------------------------------- // Inner Class // ------------------------------------------------------------------------- - - private interface ClosableIterator extends Iterator, AutoCloseable { - @Override - void close(); // override to not throw exception - } - private interface RecordIterator { boolean reachedEnd() throws IOException;