1
0

[HUDI-2090] Ensure Disk Maps create a subfolder with appropriate prefixes and clean them up on close (#3329)

* Add UUID to the folder name for External Spillable File System

* Fix to ensure that Disk maps folders do not interfere across users

* Fix test

* Fix test

* Rebase with latest master and address comments

* Add Shutdown Hooks for the Disk Map

Co-authored-by: Rajesh Mahindra <rmahindra@Rajeshs-MacBook-Pro.local>
This commit is contained in:
rmahindra123
2021-08-03 17:51:25 -07:00
committed by GitHub
parent 91bb0d1318
commit b4c14eaa29
6 changed files with 98 additions and 38 deletions

View File

@@ -72,7 +72,7 @@ public class FileSystemViewStorageConfig extends HoodieConfig {
public static final ConfigProperty<String> FILESYSTEM_VIEW_SPILLABLE_DIR = ConfigProperty
.key("hoodie.filesystem.view.spillable.dir")
.defaultValue("/tmp/view_map/")
.defaultValue("/tmp/")
.withDocumentation("Path on local storage to use, when file system view is held in a spillable map.");
public static final ConfigProperty<Long> FILESYSTEM_VIEW_SPILLABLE_MEM = ConfigProperty

View File

@@ -61,13 +61,14 @@ import java.util.zip.InflaterInputStream;
*
* Inspired by https://github.com/basho/bitcask
*/
public final class BitCaskDiskMap<T extends Serializable, R extends Serializable> implements DiskMap<T, R> {
public final class BitCaskDiskMap<T extends Serializable, R extends Serializable> extends DiskMap<T, R> {
public static final int BUFFER_SIZE = 128 * 1024; // 128 KB
private static final Logger LOG = LogManager.getLogger(BitCaskDiskMap.class);
// Caching byte compression/decompression to avoid creating instances for every operation
private static final ThreadLocal<CompressionHandler> DISK_COMPRESSION_REF =
ThreadLocal.withInitial(CompressionHandler::new);
// Stores the key and corresponding value's latest metadata spilled to disk
private final Map<T, ValueMetadata> valueMetadataMap;
// Enables compression for all values stored in the disk map
@@ -87,12 +88,11 @@ public final class BitCaskDiskMap<T extends Serializable, R extends Serializable
private final ThreadLocal<BufferedRandomAccessFile> randomAccessFile = new ThreadLocal<>();
private final Queue<BufferedRandomAccessFile> openedAccessFiles = new ConcurrentLinkedQueue<>();
private transient Thread shutdownThread = null;
public BitCaskDiskMap(String baseFilePath, boolean isCompressionEnabled) throws IOException {
super(baseFilePath, ExternalSpillableMap.DiskMapType.BITCASK.name());
this.valueMetadataMap = new ConcurrentHashMap<>();
this.isCompressionEnabled = isCompressionEnabled;
this.writeOnlyFile = new File(baseFilePath, UUID.randomUUID().toString());
this.writeOnlyFile = new File(diskMapPath, UUID.randomUUID().toString());
this.filePath = writeOnlyFile.getPath();
initFile(writeOnlyFile);
this.fileOutputStream = new FileOutputStream(writeOnlyFile, true);
@@ -138,16 +138,6 @@ public final class BitCaskDiskMap<T extends Serializable, R extends Serializable
+ ")");
// Make sure file is deleted when JVM exits
writeOnlyFile.deleteOnExit();
addShutDownHook();
}
/**
* Registers a JVM shutdown hook that runs {@code cleanup()} on this map, and keeps the hook thread in
* {@code shutdownThread} so that it can be deregistered later.
*
* NOTE(review): the previous comment stated the purpose is to force flush contents of the data written to
* FileOutputStream from OS page cache (typically 4 KB) to disk - confirm against the body of cleanup().
*/
private void addShutDownHook() {
shutdownThread = new Thread(this::cleanup);
Runtime.getRuntime().addShutdownHook(shutdownThread);
}
private void flushToDisk() {
@@ -267,14 +257,8 @@ public final class BitCaskDiskMap<T extends Serializable, R extends Serializable
// reducing concurrency). Instead, just clear the pointer map. The file will be removed on exit.
}
/**
* Runs cleanup() and then deregisters the shutdown hook, if one was registered.
*/
@Override
public void close() {
cleanup();
// The hook is only needed until the map has been cleaned up explicitly.
if (shutdownThread != null) {
Runtime.getRuntime().removeShutdownHook(shutdownThread);
}
}
private void cleanup() {
valueMetadataMap.clear();
try {
if (writeOnlyFileHandle != null) {
@@ -297,6 +281,8 @@ public final class BitCaskDiskMap<T extends Serializable, R extends Serializable
} catch (Exception e) {
// delete the file for any sort of exception
writeOnlyFile.delete();
} finally {
super.close();
}
}

View File

@@ -18,8 +18,16 @@
package org.apache.hudi.common.util.collection;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Stream;
/**
@@ -29,21 +37,72 @@ import java.util.stream.Stream;
* @param <T> The generic type of the keys
* @param <R> The generic type of the values
*/
public interface DiskMap<T extends Serializable, R extends Serializable> extends Map<T, R>, Iterable<R> {
public abstract class DiskMap<T extends Serializable, R extends Serializable> implements Map<T, R>, Iterable<R> {
private static final Logger LOG = LogManager.getLogger(DiskMap.class);
private static final String SUBFOLDER_PREFIX = "hudi";
private final File diskMapPathFile;
private transient Thread shutdownThread = null;
// Base path for the write file
protected final String diskMapPath;
public DiskMap(String basePath, String prefix) throws IOException {
this.diskMapPath =
String.format("%s/%s-%s-%s", basePath, SUBFOLDER_PREFIX, prefix, UUID.randomUUID().toString());
diskMapPathFile = new File(diskMapPath);
FileIOUtils.deleteDirectory(diskMapPathFile);
FileIOUtils.mkdir(diskMapPathFile);
// Make sure the folder is deleted when JVM exits
diskMapPathFile.deleteOnExit();
addShutDownHook();
}
/**
 * Registers a JVM shutdown hook that runs {@link #cleanup()}, and remembers the hook thread in
 * {@code shutdownThread} so that it can be deregistered later.
 */
private void addShutDownHook() {
  final Thread cleanupHook = new Thread(this::cleanup);
  this.shutdownThread = cleanupHook;
  Runtime.getRuntime().addShutdownHook(cleanupHook);
}
/**
* @returns a stream of the values stored in the disk.
*/
Stream<R> valueStream();
abstract Stream<R> valueStream();
/**
* Number of bytes spilled to disk.
*/
long sizeOfFileOnDiskInBytes();
abstract long sizeOfFileOnDiskInBytes();
/**
* Cleanup.
* Close and cleanup the Map.
*/
void close();
public void close() {
// false: this is an explicit close rather than a shutdown-hook run, so cleanup also
// deregisters the shutdown hook.
cleanup(false);
}
/**
* Cleanup all resources, files and folders.
* This no-arg variant is the entry point used by the shutdown hook; it delegates to
* {@code cleanup(true)}, which leaves the shutdown hook registered.
*/
private void cleanup() {
cleanup(true);
}
/**
 * Deletes the disk map directory and, for an explicit close, deregisters the shutdown hook.
 *
 * <p>A failure to delete the directory is logged and not rethrown.
 *
 * @param isTriggeredFromShutdownHook {@code true} when invoked by the shutdown hook itself, in which
 *                                    case the hook is left registered; {@code false} for an explicit close
 */
private void cleanup(boolean isTriggeredFromShutdownHook) {
  try {
    FileIOUtils.deleteDirectory(diskMapPathFile);
  } catch (IOException exception) {
    LOG.warn("Error while deleting the disk map directory=" + diskMapPath, exception);
  }

  if (isTriggeredFromShutdownHook || shutdownThread == null) {
    return;
  }

  try {
    Runtime.getRuntime().removeShutdownHook(shutdownThread);
  } catch (IllegalStateException exception) {
    // Runtime.removeShutdownHook throws IllegalStateException once JVM shutdown is in progress.
    // An explicit close() can race with shutdown, so log instead of failing the close.
    LOG.warn("Unable to remove the shutdown hook for the disk map directory=" + diskMapPath, exception);
  }
}
}

View File

@@ -40,23 +40,22 @@ import java.util.stream.StreamSupport;
* This class provides a disk spillable only map implementation.
* All of the data is stored using the RocksDB implementation.
*/
public final class RocksDbDiskMap<T extends Serializable, R extends Serializable> implements DiskMap<T, R> {
public final class RocksDbDiskMap<T extends Serializable, R extends Serializable> extends DiskMap<T, R> {
// ColumnFamily allows partitioning data within RockDB, which allows
// independent configuration and faster deletes across partitions
// https://github.com/facebook/rocksdb/wiki/Column-Families
// For this use case, we use a single static column family/ partition
//
private static final String COLUMN_FAMILY_NAME = "spill_map";
private static final String ROCKSDB_COL_FAMILY = "rocksdb-diskmap";
private static final Logger LOG = LogManager.getLogger(RocksDbDiskMap.class);
// Stores the key and corresponding value's latest metadata spilled to disk
private final Set<T> keySet;
private final String rocksDbStoragePath;
private RocksDBDAO rocksDb;
public RocksDbDiskMap(String rocksDbStoragePath) throws IOException {
super(rocksDbStoragePath, ExternalSpillableMap.DiskMapType.ROCKS_DB.name());
this.keySet = new HashSet<>();
this.rocksDbStoragePath = rocksDbStoragePath;
}
@Override
@@ -84,12 +83,12 @@ public final class RocksDbDiskMap<T extends Serializable, R extends Serializable
if (!containsKey(key)) {
return null;
}
return getRocksDb().get(COLUMN_FAMILY_NAME, (T) key);
return getRocksDb().get(ROCKSDB_COL_FAMILY, (T) key);
}
@Override
public R put(T key, R value) {
getRocksDb().put(COLUMN_FAMILY_NAME, key, value);
getRocksDb().put(ROCKSDB_COL_FAMILY, key, value);
keySet.add(key);
return value;
}
@@ -99,14 +98,14 @@ public final class RocksDbDiskMap<T extends Serializable, R extends Serializable
R value = get(key);
if (value != null) {
keySet.remove((T) key);
getRocksDb().delete(COLUMN_FAMILY_NAME, (T) key);
getRocksDb().delete(ROCKSDB_COL_FAMILY, (T) key);
}
return value;
}
@Override
public void putAll(Map<? extends T, ? extends R> keyValues) {
getRocksDb().writeBatch(batch -> keyValues.forEach((key, value) -> getRocksDb().putInBatch(batch, COLUMN_FAMILY_NAME, key, value)));
getRocksDb().writeBatch(batch -> keyValues.forEach((key, value) -> getRocksDb().putInBatch(batch, ROCKSDB_COL_FAMILY, key, value)));
keySet.addAll(keyValues.keySet());
}
@@ -139,7 +138,7 @@ public final class RocksDbDiskMap<T extends Serializable, R extends Serializable
*/
@Override
public Iterator<R> iterator() {
return getRocksDb().iterator(COLUMN_FAMILY_NAME);
return getRocksDb().iterator(ROCKSDB_COL_FAMILY);
}
@Override
@@ -159,14 +158,15 @@ public final class RocksDbDiskMap<T extends Serializable, R extends Serializable
rocksDb.close();
}
rocksDb = null;
super.close();
}
private RocksDBDAO getRocksDb() {
if (null == rocksDb) {
synchronized (this) {
if (null == rocksDb) {
rocksDb = new RocksDBDAO(COLUMN_FAMILY_NAME, rocksDbStoragePath);
rocksDb.addColumnFamily(COLUMN_FAMILY_NAME);
rocksDb = new RocksDBDAO(ROCKSDB_COL_FAMILY, diskMapPath);
rocksDb.addColumnFamily(ROCKSDB_COL_FAMILY);
}
}
}

View File

@@ -41,6 +41,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URISyntaxException;
@@ -50,6 +51,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
@@ -89,6 +91,8 @@ public class TestBitCaskDiskMap extends HoodieCommonTestHarness {
Option<IndexedRecord> value = payload.getInsertValue(HoodieAvroUtils.addMetadataFields(getSimpleSchema()));
assertEquals(originalRecord, value.get());
}
verifyCleanup(records);
}
@ParameterizedTest
@@ -111,6 +115,8 @@ public class TestBitCaskDiskMap extends HoodieCommonTestHarness {
oRecords.add(rec);
assert recordKeys.contains(rec.getRecordKey());
}
verifyCleanup(records);
}
@ParameterizedTest
@@ -154,6 +160,7 @@ public class TestBitCaskDiskMap extends HoodieCommonTestHarness {
throw new UncheckedIOException(io);
}
}
verifyCleanup(records);
}
@Test
@@ -236,4 +243,11 @@ public class TestBitCaskDiskMap extends HoodieCommonTestHarness {
System.out.println("Time taken :" + timeTaken);
assertTrue(timeTaken < 100);
}
/**
 * Checks that the base path has at least one entry before the disk map is closed and none afterwards.
 *
 * <p>Uses JUnit assertions rather than the {@code assert} keyword so the checks run even when JVM
 * assertions are disabled.
 *
 * @param records the disk map to close
 */
private void verifyCleanup(BitCaskDiskMap<String, HoodieRecord> records) {
  File basePathDir = new File(basePath);
  assertTrue(Objects.requireNonNull(basePathDir.list()).length > 0);
  records.close();
  // JUnit argument order is (expected, actual).
  assertEquals(0, Objects.requireNonNull(basePathDir.list()).length);
}
}

View File

@@ -342,6 +342,7 @@ public class TestExternalSpillableMap extends HoodieCommonTestHarness {
public void testLargeInsertUpsert() {}
private static Stream<Arguments> testArguments() {
// Arguments : 1. Disk Map Type 2. isCompressionEnabled for BitCaskMap
return Stream.of(
arguments(ExternalSpillableMap.DiskMapType.BITCASK, false),
arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, false),