1
0

[HUDI-2876] For Hive/Presto, Hudi should remove the temp files which were created by HoodieMergedLogRecordScanner when the query finishes. (#4139)

This commit is contained in:
xiarixiaoyao
2021-12-06 21:33:10 +08:00
committed by GitHub
parent 84b531ae75
commit 57c4bf8152
2 changed files with 17 additions and 0 deletions

View File

@@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
@@ -189,6 +190,9 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
@Override
public void close() throws IOException {
  parquetReader.close();
  // Clean up the temp spill files created by the log scanner.
  // Otherwise, for a resident process such as Presto, the /tmp directory will overflow.
  // Guard the cast: only ExternalSpillableMap owns on-disk state that needs closing;
  // an unconditional cast would throw ClassCastException for other Map implementations.
  if (deltaRecordMap instanceof ExternalSpillableMap) {
    ((ExternalSpillableMap) deltaRecordMap).close();
  }
}
@Override

View File

@@ -82,6 +82,7 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
@@ -235,6 +236,7 @@ public class TestHoodieRealtimeRecordReader {
jobConf.setBoolean(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), isCompressionEnabled);
// validate record reader compaction
long logTmpFileStartTime = System.currentTimeMillis();
HoodieRealtimeRecordReader recordReader = new HoodieRealtimeRecordReader(split, jobConf, reader);
// use reader to read base Parquet File and log file, merge in flight and return latest commit
@@ -255,6 +257,8 @@ public class TestHoodieRealtimeRecordReader {
assertEquals(1.0, recordReader.getProgress(), 0.05);
assertEquals(120, recordCnt);
recordReader.close();
// the temp file produced by logScanner should be deleted
assertTrue(!getLogTempFile(logTmpFileStartTime, System.currentTimeMillis(), diskMapType.toString()).exists());
} catch (Exception ioe) {
throw new HoodieException(ioe.getMessage(), ioe);
}
@@ -264,6 +268,13 @@ public class TestHoodieRealtimeRecordReader {
}
/**
 * Finds the spill directory created by the log scanner during the test window.
 *
 * @param startTime inclusive lower bound (epoch millis) on the directory's last-modified time
 * @param endTime   exclusive upper bound (epoch millis) on the directory's last-modified time
 * @param diskType  disk map type suffix used in the "hudi-&lt;diskType&gt;" directory-name prefix
 * @return the first matching directory under /tmp, or {@code new File("")} (which never
 *         {@code exists()}) when none is found
 */
private File getLogTempFile(long startTime, long endTime, String diskType) {
  // listFiles() returns null if /tmp is missing or unreadable; avoid the NPE in Arrays.stream.
  File[] tmpFiles = new File("/tmp").listFiles();
  if (tmpFiles == null) {
    return new File("");
  }
  return Arrays.stream(tmpFiles)
      .filter(f -> f.isDirectory() && f.getName().startsWith("hudi-" + diskType)
          && f.lastModified() > startTime && f.lastModified() < endTime)
      .findFirst()
      .orElse(new File(""));
}
@Test
public void testUnMergedReader() throws Exception {
// initial commit
@@ -473,6 +484,7 @@ public class TestHoodieRealtimeRecordReader {
assertEquals("stringArray" + i + recordCommitTimeSuffix, arrayValues[i].toString(),
"test value for field: stringArray");
}
reader.close();
}
}
@@ -552,6 +564,7 @@ public class TestHoodieRealtimeRecordReader {
while (recordReader.next(key, value)) {
// keep reading
}
reader.close();
}
private static Stream<Arguments> testArguments() {