diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
index bad5e1982..58898e722 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
 import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
 import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
 import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
@@ -189,6 +190,14 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
   @Override
   public void close() throws IOException {
-    parquetReader.close();
+    try {
+      parquetReader.close();
+    } finally {
+      // Clean up the temp files created by the log scanner, even if closing the parquet reader fails.
+      // Otherwise, for long-running processes such as Presto, the /tmp directory will fill up.
+      if (deltaRecordMap instanceof ExternalSpillableMap) {
+        ((ExternalSpillableMap) deltaRecordMap).close();
+      }
+    }
   }
 
   @Override
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
index 9f6e77bd1..ede76dc34 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
@@ -82,6 +82,7 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -235,6 +236,7 @@ public class TestHoodieRealtimeRecordReader {
         jobConf.setBoolean(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), isCompressionEnabled);
 
         // validate record reader compaction
+        long logTmpFileStartTime = System.currentTimeMillis();
         HoodieRealtimeRecordReader recordReader = new HoodieRealtimeRecordReader(split, jobConf, reader);
 
         // use reader to read base Parquet File and log file, merge in flight and return latest commit
@@ -255,6 +257,8 @@ public class TestHoodieRealtimeRecordReader {
         assertEquals(1.0, recordReader.getProgress(), 0.05);
         assertEquals(120, recordCnt);
         recordReader.close();
+        // the temp file produced by logScanner should be deleted
+        assertTrue(!getLogTempFile(logTmpFileStartTime, System.currentTimeMillis(), diskMapType.toString()).exists());
       } catch (Exception ioe) {
         throw new HoodieException(ioe.getMessage(), ioe);
       }
@@ -264,6 +268,22 @@ public class TestHoodieRealtimeRecordReader {
 
   }
 
+  /**
+   * Looks under /tmp for a directory whose name starts with "hudi-" + diskType and whose last-modified
+   * time lies strictly between startTime and endTime; returns a non-existent placeholder file if none is found.
+   */
+  private File getLogTempFile(long startTime, long endTime, String diskType) {
+    // listFiles() returns null when /tmp is missing or unreadable; treat that as "no temp file left"
+    File[] tmpEntries = new File("/tmp").listFiles();
+    if (tmpEntries == null) {
+      return new File("");
+    }
+    return Arrays.stream(tmpEntries)
+        .filter(f -> f.isDirectory() && f.getName().startsWith("hudi-" + diskType) && f.lastModified() > startTime && f.lastModified() < endTime)
+        .findFirst()
+        .orElse(new File(""));
+  }
+
   @Test
   public void testUnMergedReader() throws Exception {
     // initial commit
@@ -473,6 +493,7 @@ public class TestHoodieRealtimeRecordReader {
         assertEquals("stringArray" + i + recordCommitTimeSuffix, arrayValues[i].toString(),
             "test value for field: stringArray");
       }
+      reader.close();
     }
   }
 
@@ -552,6 +573,7 @@ public class TestHoodieRealtimeRecordReader {
     while (recordReader.next(key, value)) {
       // keep reading
     }
+    recordReader.close();
   }
 
   private static Stream testArguments() {