From b4c14eaa29ed4f3b0f8f44dbfdd75ee68702645b Mon Sep 17 00:00:00 2001 From: rmahindra123 <76502047+rmahindra123@users.noreply.github.com> Date: Tue, 3 Aug 2021 17:51:25 -0700 Subject: [PATCH] [HUDI-2090] Ensure Disk Maps create a subfolder with appropriate prefixes and cleans them up on close (#3329) * Add UUID to the folder name for External Spillable File System * Fix to ensure that Disk maps folders do not interefere across users * Fix test * Fix test * Rebase with latest mater and address comments * Add Shutdown Hooks for the Disk Map Co-authored-by: Rajesh Mahindra --- .../view/FileSystemViewStorageConfig.java | 2 +- .../util/collection/BitCaskDiskMap.java | 28 ++------ .../hudi/common/util/collection/DiskMap.java | 69 +++++++++++++++++-- .../util/collection/RocksDbDiskMap.java | 22 +++--- .../util/collection/TestBitCaskDiskMap.java | 14 ++++ .../collection/TestExternalSpillableMap.java | 1 + 6 files changed, 98 insertions(+), 38 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewStorageConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewStorageConfig.java index 80d0d5788..603f6bc0c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewStorageConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewStorageConfig.java @@ -72,7 +72,7 @@ public class FileSystemViewStorageConfig extends HoodieConfig { public static final ConfigProperty FILESYSTEM_VIEW_SPILLABLE_DIR = ConfigProperty .key("hoodie.filesystem.view.spillable.dir") - .defaultValue("/tmp/view_map/") + .defaultValue("/tmp/") .withDocumentation("Path on local storage to use, when file system view is held in a spillable map."); public static final ConfigProperty FILESYSTEM_VIEW_SPILLABLE_MEM = ConfigProperty diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java index d9e9701f4..7590e9ace 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java @@ -61,13 +61,14 @@ import java.util.zip.InflaterInputStream; * * Inspired by https://github.com/basho/bitcask */ -public final class BitCaskDiskMap implements DiskMap { +public final class BitCaskDiskMap extends DiskMap { public static final int BUFFER_SIZE = 128 * 1024; // 128 KB private static final Logger LOG = LogManager.getLogger(BitCaskDiskMap.class); // Caching byte compression/decompression to avoid creating instances for every operation private static final ThreadLocal DISK_COMPRESSION_REF = ThreadLocal.withInitial(CompressionHandler::new); + // Stores the key and corresponding value's latest metadata spilled to disk private final Map valueMetadataMap; // Enables compression for all values stored in the disk map @@ -87,12 +88,11 @@ public final class BitCaskDiskMap randomAccessFile = new ThreadLocal<>(); private final Queue openedAccessFiles = new ConcurrentLinkedQueue<>(); - private transient Thread shutdownThread = null; - public BitCaskDiskMap(String baseFilePath, boolean isCompressionEnabled) throws IOException { + super(baseFilePath, ExternalSpillableMap.DiskMapType.BITCASK.name()); this.valueMetadataMap = new ConcurrentHashMap<>(); this.isCompressionEnabled = isCompressionEnabled; - this.writeOnlyFile = new File(baseFilePath, UUID.randomUUID().toString()); + this.writeOnlyFile = new File(diskMapPath, UUID.randomUUID().toString()); this.filePath = writeOnlyFile.getPath(); initFile(writeOnlyFile); this.fileOutputStream = new FileOutputStream(writeOnlyFile, true); @@ -138,16 +138,6 @@ public final class BitCaskDiskMap The generic type of the keys * @param The generic type of the values */ -public interface DiskMap extends Map, Iterable { +public abstract class DiskMap implements Map, Iterable { + + private static final Logger LOG = LogManager.getLogger(DiskMap.class); + private static final String SUBFOLDER_PREFIX = "hudi"; + private final File diskMapPathFile; + private transient Thread shutdownThread = null; + + // Base path for the write file + protected final String diskMapPath; + + public DiskMap(String basePath, String prefix) throws IOException { + this.diskMapPath = + String.format("%s/%s-%s-%s", basePath, SUBFOLDER_PREFIX, prefix, UUID.randomUUID().toString()); + diskMapPathFile = new File(diskMapPath); + FileIOUtils.deleteDirectory(diskMapPathFile); + FileIOUtils.mkdir(diskMapPathFile); + // Make sure the folder is deleted when JVM exits + diskMapPathFile.deleteOnExit(); + addShutDownHook(); + } + + /** + * Register shutdown hook to force flush contents of the data written to FileOutputStream from OS page cache + * (typically 4 KB) to disk. + */ + private void addShutDownHook() { + shutdownThread = new Thread(this::cleanup); + Runtime.getRuntime().addShutdownHook(shutdownThread); + } /** * @returns a stream of the values stored in the disk. */ - Stream valueStream(); + abstract Stream valueStream(); /** * Number of bytes spilled to disk. */ - long sizeOfFileOnDiskInBytes(); + abstract long sizeOfFileOnDiskInBytes(); /** - * Cleanup. + * Close and cleanup the Map. */ - void close(); + public void close() { + cleanup(false); + } + /** + * Cleanup all resources, files and folders + * triggered by shutdownhook. + */ + private void cleanup() { + cleanup(true); + } + + /** + * Cleanup all resources, files and folders. + */ + private void cleanup(boolean isTriggeredFromShutdownHook) { + try { + FileIOUtils.deleteDirectory(diskMapPathFile); + } catch (IOException exception) { + LOG.warn("Error while deleting the disk map directory=" + diskMapPath, exception); + } + if (!isTriggeredFromShutdownHook && shutdownThread != null) { + Runtime.getRuntime().removeShutdownHook(shutdownThread); + } + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDbDiskMap.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDbDiskMap.java index c500876f0..21211a570 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDbDiskMap.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDbDiskMap.java @@ -40,23 +40,22 @@ import java.util.stream.StreamSupport; * This class provides a disk spillable only map implementation. * All of the data is stored using the RocksDB implementation. */ -public final class RocksDbDiskMap implements DiskMap { +public final class RocksDbDiskMap extends DiskMap { // ColumnFamily allows partitioning data within RockDB, which allows // independent configuration and faster deletes across partitions // https://github.com/facebook/rocksdb/wiki/Column-Families // For this use case, we use a single static column family/ partition // - private static final String COLUMN_FAMILY_NAME = "spill_map"; + private static final String ROCKSDB_COL_FAMILY = "rocksdb-diskmap"; private static final Logger LOG = LogManager.getLogger(RocksDbDiskMap.class); // Stores the key and corresponding value's latest metadata spilled to disk private final Set keySet; - private final String rocksDbStoragePath; private RocksDBDAO rocksDb; public RocksDbDiskMap(String rocksDbStoragePath) throws IOException { + super(rocksDbStoragePath, ExternalSpillableMap.DiskMapType.ROCKS_DB.name()); this.keySet = new HashSet<>(); - this.rocksDbStoragePath = rocksDbStoragePath; } @Override @@ -84,12 +83,12 @@ public final class RocksDbDiskMap keyValues) { - getRocksDb().writeBatch(batch -> keyValues.forEach((key, value) -> getRocksDb().putInBatch(batch, COLUMN_FAMILY_NAME, key, value))); + getRocksDb().writeBatch(batch -> keyValues.forEach((key, value) -> getRocksDb().putInBatch(batch, ROCKSDB_COL_FAMILY, key, value))); keySet.addAll(keyValues.keySet()); } @@ -139,7 +138,7 @@ public final class RocksDbDiskMap iterator() { - return getRocksDb().iterator(COLUMN_FAMILY_NAME); + return getRocksDb().iterator(ROCKSDB_COL_FAMILY); } @Override @@ -159,14 +158,15 @@ public final class RocksDbDiskMap value = payload.getInsertValue(HoodieAvroUtils.addMetadataFields(getSimpleSchema())); assertEquals(originalRecord, value.get()); } + + verifyCleanup(records); } @ParameterizedTest @@ -111,6 +115,8 @@ public class TestBitCaskDiskMap extends HoodieCommonTestHarness { oRecords.add(rec); assert recordKeys.contains(rec.getRecordKey()); } + + verifyCleanup(records); } @ParameterizedTest @@ -154,6 +160,7 @@ public class TestBitCaskDiskMap extends HoodieCommonTestHarness { throw new UncheckedIOException(io); } } + verifyCleanup(records); } @Test @@ -236,4 +243,11 @@ public class TestBitCaskDiskMap extends HoodieCommonTestHarness { System.out.println("Time taken :" + timeTaken); assertTrue(timeTaken < 100); } + + private void verifyCleanup(BitCaskDiskMap records) { + File basePathDir = new File(basePath); + assert Objects.requireNonNull(basePathDir.list()).length > 0; + records.close(); + assertEquals(Objects.requireNonNull(basePathDir.list()).length, 0); + } } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestExternalSpillableMap.java b/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestExternalSpillableMap.java index 05131516a..4fed5a80e 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestExternalSpillableMap.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestExternalSpillableMap.java @@ -342,6 +342,7 @@ public class TestExternalSpillableMap extends HoodieCommonTestHarness { public void testLargeInsertUpsert() {} private static Stream testArguments() { + // Arguments : 1. Disk Map Type 2. isCompressionEnabled for BitCaskMap return Stream.of( arguments(ExternalSpillableMap.DiskMapType.BITCASK, false), arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, false),