From a4dcbb5c5a2a94e4f69524194d8777d082af31ab Mon Sep 17 00:00:00 2001 From: rmahindra123 <76502047+rmahindra123@users.noreply.github.com> Date: Mon, 5 Jul 2021 23:03:41 -0700 Subject: [PATCH] [HUDI-2028] Implement RockDbBasedMap as an alternate to DiskBasedMap in ExternalSpillableMap (#3194) Co-authored-by: Rajesh Mahindra --- .../apache/hudi/config/HoodieWriteConfig.java | 16 ++ .../org/apache/hudi/io/HoodieMergeHandle.java | 5 +- .../log/HoodieMergedLogRecordScanner.java | 2 +- .../hudi/common/util/SpillableMapUtils.java | 2 +- ...{DiskBasedMap.java => BitCaskDiskMap.java} | 14 +- .../hudi/common/util/collection/DiskMap.java | 49 +++++ .../util/collection/ExternalSpillableMap.java | 50 ++++- .../util/collection/LazyFileIterable.java | 16 +- .../common/util/collection/RocksDBDAO.java | 59 ++++- .../util/collection/RocksDbDiskMap.java | 176 +++++++++++++++ ...kBasedMap.java => TestBitCaskDiskMap.java} | 12 +- .../collection/TestExternalSpillableMap.java | 71 ++++-- .../util/collection/TestRocksDbBasedMap.java | 4 +- .../util/collection/TestRocksDbDiskMap.java | 204 ++++++++++++++++++ 14 files changed, 620 insertions(+), 60 deletions(-) rename hudi-common/src/main/java/org/apache/hudi/common/util/collection/{DiskBasedMap.java => BitCaskDiskMap.java} (95%) create mode 100644 hudi-common/src/main/java/org/apache/hudi/common/util/collection/DiskMap.java create mode 100644 hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDbDiskMap.java rename hudi-common/src/test/java/org/apache/hudi/common/util/collection/{TestDiskBasedMap.java => TestBitCaskDiskMap.java} (96%) create mode 100644 hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestRocksDbDiskMap.java diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index f41f2ac23..4a2f2c2fb 100644 --- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -35,6 +35,7 @@ import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.common.util.collection.ExternalSpillableMap; import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.keygen.constant.KeyGeneratorType; @@ -52,6 +53,7 @@ import java.io.IOException; import java.io.InputStream; import java.util.Arrays; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Objects; import java.util.Properties; @@ -283,6 +285,11 @@ public class HoodieWriteConfig extends HoodieConfig { .defaultValue("false") .withDocumentation("Allow duplicates with inserts while merging with existing records"); + public static final ConfigProperty SPILLABLE_DISK_MAP_TYPE = ConfigProperty + .key("hoodie.spillable.diskmap.type") + .defaultValue(ExternalSpillableMap.DiskMapType.BITCASK) + .withDocumentation("Enable usage of either BITCASK or ROCKS_DB as disk map for External Spillable Map"); + public static final ConfigProperty CLIENT_HEARTBEAT_INTERVAL_IN_MS_PROP = ConfigProperty .key("hoodie.client.heartbeat.interval_in_ms") .defaultValue(60 * 1000) @@ -554,6 +561,10 @@ public class HoodieWriteConfig extends HoodieConfig { return getBoolean(MERGE_ALLOW_DUPLICATE_ON_INSERTS); } + public ExternalSpillableMap.DiskMapType getSpillableDiskMapType() { + return ExternalSpillableMap.DiskMapType.valueOf(getString(SPILLABLE_DISK_MAP_TYPE).toUpperCase(Locale.ROOT)); + } + public EngineType getEngineType() { return engineType; } @@ -1504,6 +1515,11 @@ public class HoodieWriteConfig extends 
HoodieConfig { return this; } + public Builder withSpillableDiskMapType(ExternalSpillableMap.DiskMapType diskMapType) { + writeConfig.setValue(SPILLABLE_DISK_MAP_TYPE, diskMapType.name()); + return this; + } + public Builder withHeartbeatIntervalInMs(Integer heartbeatIntervalInMs) { writeConfig.setValue(CLIENT_HEARTBEAT_INTERVAL_IN_MS_PROP, String.valueOf(heartbeatIntervalInMs)); return this; diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java index cb9dcbdf2..a4dc8d44e 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java @@ -199,7 +199,8 @@ public class HoodieMergeHandle extends H long memoryForMerge = IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config); LOG.info("MaxMemoryPerPartitionMerge => " + memoryForMerge); this.keyToNewRecords = new ExternalSpillableMap<>(memoryForMerge, config.getSpillableMapBasePath(), - new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(tableSchema)); + new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(tableSchema), + config.getSpillableDiskMapType()); } catch (IOException io) { throw new HoodieIOException("Cannot instantiate an ExternalSpillableMap", io); } @@ -231,7 +232,7 @@ public class HoodieMergeHandle extends H LOG.info("Number of entries in MemoryBasedMap => " + ((ExternalSpillableMap) keyToNewRecords).getInMemoryMapNumEntries() + "Total size in bytes of MemoryBasedMap => " - + ((ExternalSpillableMap) keyToNewRecords).getCurrentInMemoryMapSize() + "Number of entries in DiskBasedMap => " + + ((ExternalSpillableMap) keyToNewRecords).getCurrentInMemoryMapSize() + "Number of entries in BitCaskDiskMap => " + ((ExternalSpillableMap) keyToNewRecords).getDiskBasedMapNumEntries() + "Size of file spilled to disk => " + 
((ExternalSpillableMap) keyToNewRecords).getSizeOfFileOnDiskInBytes()); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java index 5f5bf1602..12e27bad0 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java @@ -100,7 +100,7 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner LOG.info("Number of entries in MemoryBasedMap in ExternalSpillableMap => " + records.getInMemoryMapNumEntries()); LOG.info( "Total size in bytes of MemoryBasedMap in ExternalSpillableMap => " + records.getCurrentInMemoryMapSize()); - LOG.info("Number of entries in DiskBasedMap in ExternalSpillableMap => " + records.getDiskBasedMapNumEntries()); + LOG.info("Number of entries in BitCaskDiskMap in ExternalSpillableMap => " + records.getDiskBasedMapNumEntries()); LOG.info("Size of file spilled to disk => " + records.getSizeOfFileOnDiskInBytes()); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java index b6eab3cfb..2dbc9123f 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java @@ -22,7 +22,7 @@ import org.apache.hudi.common.fs.SizeAwareDataOutputStream; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.common.util.collection.DiskBasedMap.FileEntry; +import org.apache.hudi.common.util.collection.BitCaskDiskMap.FileEntry; import org.apache.hudi.exception.HoodieCorruptedDataException; import 
org.apache.avro.generic.GenericRecord; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/DiskBasedMap.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java similarity index 95% rename from hudi-common/src/main/java/org/apache/hudi/common/util/collection/DiskBasedMap.java rename to hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java index fe4666305..b525289c9 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/DiskBasedMap.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java @@ -52,11 +52,13 @@ import java.util.stream.Stream; * This class provides a disk spillable only map implementation. All of the data is currenly written to one file, * without any rollover support. It uses the following : 1) An in-memory map that tracks the key-> latest ValueMetadata. * 2) Current position in the file NOTE : Only String.class type supported for Key + * + * Inspired by https://github.com/basho/bitcask */ -public final class DiskBasedMap implements Map, Iterable { +public final class BitCaskDiskMap implements DiskMap { public static final int BUFFER_SIZE = 128 * 1024; // 128 KB - private static final Logger LOG = LogManager.getLogger(DiskBasedMap.class); + private static final Logger LOG = LogManager.getLogger(BitCaskDiskMap.class); // Stores the key and corresponding value's latest metadata spilled to disk private final Map valueMetadataMap; // Write only file @@ -76,7 +78,7 @@ public final class DiskBasedMap private transient Thread shutdownThread = null; - public DiskBasedMap(String baseFilePath) throws IOException { + public BitCaskDiskMap(String baseFilePath) throws IOException { this.valueMetadataMap = new ConcurrentHashMap<>(); this.writeOnlyFile = new File(baseFilePath, UUID.randomUUID().toString()); this.filePath = writeOnlyFile.getPath(); @@ -136,7 +138,7 @@ public final class DiskBasedMap try { 
writeOnlyFileHandle.flush(); } catch (IOException e) { - throw new HoodieIOException("Failed to flush to DiskBasedMap file", e); + throw new HoodieIOException("Failed to flush to BitCaskDiskMap file", e); } } @@ -151,6 +153,7 @@ public final class DiskBasedMap /** * Number of bytes spilled to disk. */ + @Override public long sizeOfFileOnDiskInBytes() { return filePosition.get(); } @@ -203,7 +206,7 @@ public final class DiskBasedMap Integer valueSize = val.length; Long timestamp = System.currentTimeMillis(); this.valueMetadataMap.put(key, - new DiskBasedMap.ValueMetadata(this.filePath, valueSize, filePosition.get(), timestamp)); + new BitCaskDiskMap.ValueMetadata(this.filePath, valueSize, filePosition.get(), timestamp)); byte[] serializedKey = SerializationUtils.serialize(key); filePosition .set(SpillableMapUtils.spillToDisk(writeOnlyFileHandle, new FileEntry(SpillableMapUtils.generateChecksum(val), @@ -287,6 +290,7 @@ public final class DiskBasedMap throw new HoodieException("Unsupported Operation Exception"); } + @Override public Stream valueStream() { final BufferedRandomAccessFile file = getRandomAccessFile(); return valueMetadataMap.values().stream().sorted().sequential().map(valueMetaData -> (R) get(valueMetaData, file)); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/DiskMap.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/DiskMap.java new file mode 100644 index 000000000..5f3b441fe --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/DiskMap.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.util.collection; + +import java.io.Serializable; +import java.util.Map; +import java.util.stream.Stream; + +/** + * This interface provides the map interface for storing records on disk after they + * spill over from memory. Used by {@link ExternalSpillableMap}. + * + * @param <T> The generic type of the keys + * @param <R> The generic type of the values + */ +public interface DiskMap<T extends Serializable, R extends Serializable> extends Map<T, R>, Iterable<R> { + + /** + * @return a stream of the values stored on the disk. + */ + Stream<R> valueStream(); + + /** + * Number of bytes spilled to disk. + */ + long sizeOfFileOnDiskInBytes(); + + /** + * Cleanup. 
+ */ + void close(); + +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java index 003d525b6..dd5b9c240 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java @@ -61,8 +61,8 @@ public class ExternalSpillableMap inMemoryMap; - // Map to store key-valuemetadata important to find the values spilled to disk - private transient volatile DiskBasedMap diskBasedMap; + // Map to store key-values on disk or db after it spilled over the memory + private transient volatile DiskMap diskBasedMap; // TODO(na) : a dynamic sizing factor to ensure we have space for other objects in memory and // incorrect payload estimation private final Double sizingFactorForInMemoryMap = 0.8; @@ -70,6 +70,8 @@ public class ExternalSpillableMap keySizeEstimator; // Size Estimator for key types private final SizeEstimator valueSizeEstimator; + // Type of the disk map + private final DiskMapType diskMapType; // current space occupied by this map in-memory private Long currentInMemoryMapSize; // An estimate of the size of each payload written to this map @@ -80,22 +82,34 @@ public class ExternalSpillableMap keySizeEstimator, - SizeEstimator valueSizeEstimator) throws IOException { + SizeEstimator valueSizeEstimator) throws IOException { + this(maxInMemorySizeInBytes, baseFilePath, keySizeEstimator, valueSizeEstimator, DiskMapType.BITCASK); + } + + public ExternalSpillableMap(Long maxInMemorySizeInBytes, String baseFilePath, SizeEstimator keySizeEstimator, + SizeEstimator valueSizeEstimator, DiskMapType diskMapType) throws IOException { this.inMemoryMap = new HashMap<>(); this.baseFilePath = baseFilePath; - this.diskBasedMap = new DiskBasedMap<>(baseFilePath); this.maxInMemorySizeInBytes = (long) 
Math.floor(maxInMemorySizeInBytes * sizingFactorForInMemoryMap); this.currentInMemoryMapSize = 0L; this.keySizeEstimator = keySizeEstimator; this.valueSizeEstimator = valueSizeEstimator; + this.diskMapType = diskMapType; } - private DiskBasedMap getDiskBasedMap() { + private DiskMap getDiskBasedMap() { if (null == diskBasedMap) { synchronized (this) { if (null == diskBasedMap) { try { - diskBasedMap = new DiskBasedMap<>(baseFilePath); + switch (diskMapType) { + case ROCKS_DB: + diskBasedMap = new RocksDbDiskMap<>(baseFilePath); + break; + case BITCASK: + default: + diskBasedMap = new BitCaskDiskMap<>(baseFilePath); + } } catch (IOException e) { throw new HoodieIOException(e.getMessage(), e); } @@ -113,7 +127,7 @@ public class ExternalSpillableMap implements Iterator { - private Iterator inMemoryIterator; - private Iterator diskLazyFileIterator; + private final Iterator inMemoryIterator; + private final Iterator diskLazyFileIterator; public IteratorWrapper(Iterator inMemoryIterator, Iterator diskLazyFileIterator) { this.inMemoryIterator = inMemoryIterator; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/LazyFileIterable.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/LazyFileIterable.java index 95b1ac2b3..024f55542 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/LazyFileIterable.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/LazyFileIterable.java @@ -36,11 +36,11 @@ public class LazyFileIterable implements Iterable { // Used to access the value written at a specific position in the file private final String filePath; // Stores the key and corresponding value's latest metadata spilled to disk - private final Map inMemoryMetadataOfSpilledData; + private final Map inMemoryMetadataOfSpilledData; private transient Thread shutdownThread = null; - public LazyFileIterable(String filePath, Map map) { + public LazyFileIterable(String filePath, Map map) { 
this.filePath = filePath; this.inMemoryMetadataOfSpilledData = map; } @@ -61,16 +61,16 @@ public class LazyFileIterable implements Iterable { private final String filePath; private BufferedRandomAccessFile readOnlyFileHandle; - private final Iterator> metadataIterator; + private final Iterator> metadataIterator; - public LazyFileIterator(String filePath, Map map) throws IOException { + public LazyFileIterator(String filePath, Map map) throws IOException { this.filePath = filePath; - this.readOnlyFileHandle = new BufferedRandomAccessFile(filePath, "r", DiskBasedMap.BUFFER_SIZE); + this.readOnlyFileHandle = new BufferedRandomAccessFile(filePath, "r", BitCaskDiskMap.BUFFER_SIZE); readOnlyFileHandle.seek(0); // sort the map in increasing order of offset of value so disk seek is only in one(forward) direction this.metadataIterator = map.entrySet().stream() - .sorted((Map.Entry o1, Map.Entry o2) -> o1 + .sorted((Map.Entry o1, Map.Entry o2) -> o1 .getValue().getOffsetOfValue().compareTo(o2.getValue().getOffsetOfValue())) .collect(Collectors.toList()).iterator(); this.addShutdownHook(); @@ -90,8 +90,8 @@ public class LazyFileIterable implements Iterable { if (!hasNext()) { throw new IllegalStateException("next() called on EOF'ed stream. 
File :" + filePath); } - Map.Entry entry = this.metadataIterator.next(); - return DiskBasedMap.get(entry.getValue(), readOnlyFileHandle); + Map.Entry entry = this.metadataIterator.next(); + return BitCaskDiskMap.get(entry.getValue(), readOnlyFileHandle); } @Override diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDBDAO.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDBDAO.java index 3c08460f2..fe40d9859 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDBDAO.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDBDAO.java @@ -45,6 +45,7 @@ import java.io.File; import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.UUID; @@ -64,11 +65,13 @@ public class RocksDBDAO { private transient RocksDB rocksDB; private boolean closed = false; private final String rocksDBBasePath; + private long totalBytesWritten; public RocksDBDAO(String basePath, String rocksDBBasePath) { this.rocksDBBasePath = String.format("%s/%s/%s", rocksDBBasePath, basePath.replace("/", "_"), UUID.randomUUID().toString()); init(); + totalBytesWritten = 0L; } /** @@ -169,7 +172,7 @@ public class RocksDBDAO { */ public void putInBatch(WriteBatch batch, String columnFamilyName, String key, T value) { try { - byte[] payload = SerializationUtils.serialize(value); + byte[] payload = serializePayload(value); batch.put(managedHandlesMap.get(columnFamilyName), key.getBytes(), payload); } catch (Exception e) { throw new HoodieException(e); @@ -189,7 +192,7 @@ public class RocksDBDAO { K key, T value) { try { byte[] keyBytes = SerializationUtils.serialize(key); - byte[] payload = SerializationUtils.serialize(value); + byte[] payload = serializePayload(value); batch.put(managedHandlesMap.get(columnFamilyName), keyBytes, payload); } catch (Exception e) { throw new 
HoodieException(e); @@ -206,7 +209,7 @@ public class RocksDBDAO { */ public void put(String columnFamilyName, String key, T value) { try { - byte[] payload = SerializationUtils.serialize(value); + byte[] payload = serializePayload(value); getRocksDB().put(managedHandlesMap.get(columnFamilyName), key.getBytes(), payload); } catch (Exception e) { throw new HoodieException(e); @@ -223,7 +226,7 @@ public class RocksDBDAO { */ public void put(String columnFamilyName, K key, T value) { try { - byte[] payload = SerializationUtils.serialize(value); + byte[] payload = serializePayload(value); getRocksDB().put(managedHandlesMap.get(columnFamilyName), SerializationUtils.serialize(key), payload); } catch (Exception e) { throw new HoodieException(e); @@ -351,6 +354,16 @@ public class RocksDBDAO { return results.stream(); } + /** + * Return Iterator of key-value pairs from RocksIterator. + * + * @param columnFamilyName Column Family Name + * @param Type of value stored + */ + public Iterator iterator(String columnFamilyName) { + return new IteratorWrapper<>(getRocksDB().newIterator(managedHandlesMap.get(columnFamilyName))); + } + /** * Perform a prefix delete and return stream of key-value pairs retrieved. * @@ -448,10 +461,48 @@ public class RocksDBDAO { } } + public long getTotalBytesWritten() { + return totalBytesWritten; + } + + private byte[] serializePayload(T value) throws IOException { + byte[] payload = SerializationUtils.serialize(value); + totalBytesWritten += payload.length; + return payload; + } + String getRocksDBBasePath() { return rocksDBBasePath; } + /** + * {@link Iterator} wrapper for RocksDb Iterator {@link RocksIterator}. 
+ */ + private static class IteratorWrapper implements Iterator { + + private final RocksIterator iterator; + + public IteratorWrapper(final RocksIterator iterator) { + this.iterator = iterator; + iterator.seekToFirst(); + } + + @Override + public boolean hasNext() { + return iterator.isValid(); + } + + @Override + public R next() { + if (!hasNext()) { + throw new IllegalStateException("next() called on rocksDB with no more valid entries"); + } + R val = SerializationUtils.deserialize(iterator.value()); + iterator.next(); + return val; + } + } + /** * Functional interface for stacking operation to Write batch. */ diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDbDiskMap.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDbDiskMap.java new file mode 100644 index 000000000..c500876f0 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDbDiskMap.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.common.util.collection; + +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieNotSupportedException; + +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.io.Serializable; +import java.util.AbstractMap; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.Spliterators; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +/** + * This class provides a disk spillable only map implementation. + * All of the data is stored using the RocksDB implementation. + */ +public final class RocksDbDiskMap implements DiskMap { + // ColumnFamily allows partitioning data within RockDB, which allows + // independent configuration and faster deletes across partitions + // https://github.com/facebook/rocksdb/wiki/Column-Families + // For this use case, we use a single static column family/ partition + // + private static final String COLUMN_FAMILY_NAME = "spill_map"; + + private static final Logger LOG = LogManager.getLogger(RocksDbDiskMap.class); + // Stores the key and corresponding value's latest metadata spilled to disk + private final Set keySet; + private final String rocksDbStoragePath; + private RocksDBDAO rocksDb; + + public RocksDbDiskMap(String rocksDbStoragePath) throws IOException { + this.keySet = new HashSet<>(); + this.rocksDbStoragePath = rocksDbStoragePath; + } + + @Override + public int size() { + return keySet.size(); + } + + @Override + public boolean isEmpty() { + return keySet.isEmpty(); + } + + @Override + public boolean containsKey(Object key) { + return keySet.contains((T) key); + } + + @Override + public boolean containsValue(Object value) { + throw new HoodieNotSupportedException("unable to compare values in map"); + } + + @Override + public R get(Object key) { + if (!containsKey(key)) { + return null; + } + 
return getRocksDb().get(COLUMN_FAMILY_NAME, (T) key); + } + + @Override + public R put(T key, R value) { + getRocksDb().put(COLUMN_FAMILY_NAME, key, value); + keySet.add(key); + return value; + } + + @Override + public R remove(Object key) { + R value = get(key); + if (value != null) { + keySet.remove((T) key); + getRocksDb().delete(COLUMN_FAMILY_NAME, (T) key); + } + return value; + } + + @Override + public void putAll(Map keyValues) { + getRocksDb().writeBatch(batch -> keyValues.forEach((key, value) -> getRocksDb().putInBatch(batch, COLUMN_FAMILY_NAME, key, value))); + keySet.addAll(keyValues.keySet()); + } + + @Override + public void clear() { + close(); + } + + @Override + public Set keySet() { + return keySet; + } + + @Override + public Collection values() { + throw new HoodieException("Unsupported Operation Exception"); + } + + @Override + public Set> entrySet() { + Set> entrySet = new HashSet<>(); + for (T key : keySet) { + entrySet.add(new AbstractMap.SimpleEntry<>(key, get(key))); + } + return entrySet; + } + + /** + * Custom iterator to iterate over values written to disk. 
+ */ + @Override + public Iterator<R> iterator() { + return getRocksDb().iterator(COLUMN_FAMILY_NAME); + } + + @Override + public Stream<R> valueStream() { + return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator(), 0), false); + } + + @Override + public long sizeOfFileOnDiskInBytes() { + return getRocksDb().getTotalBytesWritten(); + } + + @Override + public void close() { + keySet.clear(); + if (null != rocksDb) { + rocksDb.close(); + } + rocksDb = null; + } + + private RocksDBDAO getRocksDb() { + if (null == rocksDb) { + synchronized (this) { + if (null == rocksDb) { + rocksDb = new RocksDBDAO(COLUMN_FAMILY_NAME, rocksDbStoragePath); + rocksDb.addColumnFamily(COLUMN_FAMILY_NAME); + } + } + } + return rocksDb; + } + +} diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestDiskBasedMap.java b/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestBitCaskDiskMap.java similarity index 96% rename from hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestDiskBasedMap.java rename to hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestBitCaskDiskMap.java index e3cc88656..45aaff377 100755 --- a/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestDiskBasedMap.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestBitCaskDiskMap.java @@ -57,9 +57,9 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; /** - * Tests dis based map {@link DiskBasedMap}. + * Tests disk based map {@link BitCaskDiskMap}. 
*/ -public class TestDiskBasedMap extends HoodieCommonTestHarness { +public class TestBitCaskDiskMap extends HoodieCommonTestHarness { @BeforeEach public void setup() { @@ -68,7 +68,7 @@ public class TestDiskBasedMap extends HoodieCommonTestHarness { @Test public void testSimpleInsert() throws IOException, URISyntaxException { - DiskBasedMap records = new DiskBasedMap<>(basePath); + BitCaskDiskMap records = new BitCaskDiskMap<>(basePath); List iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100); ((GenericRecord) iRecords.get(0)).get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString(); List recordKeys = SpillableMapTestUtils.upsertRecords(iRecords, records); @@ -86,7 +86,7 @@ public class TestDiskBasedMap extends HoodieCommonTestHarness { @Test public void testSimpleInsertWithoutHoodieMetadata() throws IOException, URISyntaxException { - DiskBasedMap records = new DiskBasedMap<>(basePath); + BitCaskDiskMap records = new BitCaskDiskMap<>(basePath); List hoodieRecords = SchemaTestUtil.generateHoodieTestRecordsWithoutHoodieMetadata(0, 1000); Set recordKeys = new HashSet<>(); // insert generated records into the map @@ -109,7 +109,7 @@ public class TestDiskBasedMap extends HoodieCommonTestHarness { public void testSimpleUpsert() throws IOException, URISyntaxException { Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema()); - DiskBasedMap records = new DiskBasedMap<>(basePath); + BitCaskDiskMap records = new BitCaskDiskMap<>(basePath); List iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100); // perform some inserts @@ -189,7 +189,7 @@ public class TestDiskBasedMap extends HoodieCommonTestHarness { @Test public void testPutAll() throws IOException, URISyntaxException { - DiskBasedMap records = new DiskBasedMap<>(basePath); + BitCaskDiskMap records = new BitCaskDiskMap<>(basePath); List iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100); Map recordMap = new HashMap<>(); iRecords.forEach(r -> { diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestExternalSpillableMap.java b/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestExternalSpillableMap.java index b240c8dd1..c82d99932 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestExternalSpillableMap.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestExternalSpillableMap.java @@ -38,6 +38,8 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.MethodOrderer.Alphanumeric; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestMethodOrder; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; import java.io.IOException; import java.io.UncheckedIOException; @@ -45,6 +47,7 @@ import java.net.URISyntaxException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.stream.Collectors; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -65,32 +68,48 @@ public class TestExternalSpillableMap extends HoodieCommonTestHarness { failureOutputPath = basePath + "/test_fail"; } - @Test - public void simpleInsertTest() throws IOException, URISyntaxException { + @ParameterizedTest + @EnumSource(ExternalSpillableMap.DiskMapType.class) + public void simpleInsertTest(ExternalSpillableMap.DiskMapType diskMapType) throws IOException, URISyntaxException { Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema()); - String payloadClazz = HoodieAvroPayload.class.getName(); + ExternalSpillableMap> records = - new ExternalSpillableMap<>(16L, basePath, new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(schema)); // 16B + new ExternalSpillableMap<>(16L, basePath, new DefaultSizeEstimator(), + new HoodieRecordSizeEstimator(schema), diskMapType); // 16B List iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100); 
List recordKeys = SpillableMapTestUtils.upsertRecords(iRecords, records); assert (recordKeys.size() == 100); + + // Test iterator Iterator> itr = records.iterator(); - List oRecords = new ArrayList<>(); + int cntSize = 0; while (itr.hasNext()) { HoodieRecord rec = itr.next(); - oRecords.add(rec); + cntSize++; assert recordKeys.contains(rec.getRecordKey()); } + assertEquals(recordKeys.size(), cntSize); + + // Test value stream + List> values = records.valueStream().collect(Collectors.toList()); + cntSize = 0; + for (HoodieRecord value : values) { + assert recordKeys.contains(value.getRecordKey()); + cntSize++; + } + assertEquals(recordKeys.size(), cntSize); } - @Test - public void testSimpleUpsert() throws IOException, URISyntaxException { + @ParameterizedTest + @EnumSource(ExternalSpillableMap.DiskMapType.class) + public void testSimpleUpsert(ExternalSpillableMap.DiskMapType diskMapType) throws IOException, URISyntaxException { Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema()); ExternalSpillableMap> records = - new ExternalSpillableMap<>(16L, basePath, new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(schema)); // 16B + new ExternalSpillableMap<>(16L, basePath, new DefaultSizeEstimator(), + new HoodieRecordSizeEstimator(schema), diskMapType); // 16B List iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100); List recordKeys = SpillableMapTestUtils.upsertRecords(iRecords, records); @@ -120,14 +139,16 @@ public class TestExternalSpillableMap extends HoodieCommonTestHarness { }); } - @Test - public void testAllMapOperations() throws IOException, URISyntaxException { + @ParameterizedTest + @EnumSource(ExternalSpillableMap.DiskMapType.class) + public void testAllMapOperations(ExternalSpillableMap.DiskMapType diskMapType) throws IOException, URISyntaxException { Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema()); String payloadClazz = HoodieAvroPayload.class.getName(); ExternalSpillableMap> 
records = - new ExternalSpillableMap<>(16L, basePath, new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(schema)); // 16B + new ExternalSpillableMap<>(16L, basePath, new DefaultSizeEstimator(), + new HoodieRecordSizeEstimator(schema), diskMapType); // 16B List iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100); // insert a bunch of records so that values spill to disk too @@ -176,12 +197,14 @@ public class TestExternalSpillableMap extends HoodieCommonTestHarness { assertTrue(records.size() == 0); } - @Test - public void simpleTestWithException() throws IOException, URISyntaxException { + @ParameterizedTest + @EnumSource(ExternalSpillableMap.DiskMapType.class) + public void simpleTestWithException(ExternalSpillableMap.DiskMapType diskMapType) throws IOException, URISyntaxException { Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema()); ExternalSpillableMap> records = new ExternalSpillableMap<>(16L, - failureOutputPath, new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(schema)); // 16B + failureOutputPath, new DefaultSizeEstimator(), + new HoodieRecordSizeEstimator(schema), diskMapType); // 16B List iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100); List recordKeys = SpillableMapTestUtils.upsertRecords(iRecords, records); @@ -194,13 +217,15 @@ public class TestExternalSpillableMap extends HoodieCommonTestHarness { }); } - @Test - public void testDataCorrectnessWithUpsertsToDataInMapAndOnDisk() throws IOException, URISyntaxException { + @ParameterizedTest + @EnumSource(ExternalSpillableMap.DiskMapType.class) + public void testDataCorrectnessWithUpsertsToDataInMapAndOnDisk(ExternalSpillableMap.DiskMapType diskMapType) throws IOException, URISyntaxException { Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema()); ExternalSpillableMap> records = - new ExternalSpillableMap<>(16L, basePath, new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(schema)); // 16B + new 
ExternalSpillableMap<>(16L, basePath, new DefaultSizeEstimator(), + new HoodieRecordSizeEstimator(schema), diskMapType); // 16B List recordKeys = new ArrayList<>(); // Ensure we spill to disk @@ -245,13 +270,15 @@ public class TestExternalSpillableMap extends HoodieCommonTestHarness { assert newCommitTime.contentEquals(gRecord.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString()); } - @Test - public void testDataCorrectnessWithoutHoodieMetadata() throws IOException, URISyntaxException { + @ParameterizedTest + @EnumSource(ExternalSpillableMap.DiskMapType.class) + public void testDataCorrectnessWithoutHoodieMetadata(ExternalSpillableMap.DiskMapType diskMapType) throws IOException, URISyntaxException { Schema schema = SchemaTestUtil.getSimpleSchema(); ExternalSpillableMap> records = - new ExternalSpillableMap<>(16L, basePath, new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(schema)); // 16B + new ExternalSpillableMap<>(16L, basePath, new DefaultSizeEstimator(), + new HoodieRecordSizeEstimator(schema), diskMapType); // 16B List recordKeys = new ArrayList<>(); // Ensure we spill to disk @@ -311,4 +338,4 @@ public class TestExternalSpillableMap extends HoodieCommonTestHarness { // TODO : come up with a performance eval test for spillableMap @Test public void testLargeInsertUpsert() {} -} +} \ No newline at end of file diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestRocksDbBasedMap.java b/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestRocksDbBasedMap.java index 1111d10e4..5b71b5ec2 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestRocksDbBasedMap.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestRocksDbBasedMap.java @@ -38,7 +38,7 @@ import java.util.List; import static org.junit.jupiter.api.Assertions.assertEquals; /** - * Tests RocksDB based map {@link RocksDBBasedMap}. + * Tests RocksDB based map {@link RocksDbDiskMap}. 
*/ public class TestRocksDbBasedMap extends HoodieCommonTestHarness { @@ -49,7 +49,7 @@ public class TestRocksDbBasedMap extends HoodieCommonTestHarness { @Test public void testSimple() throws IOException, URISyntaxException { - RocksDBBasedMap records = new RocksDBBasedMap(basePath); + RocksDbDiskMap records = new RocksDbDiskMap(basePath); List iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100); ((GenericRecord) iRecords.get(0)).get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString(); List recordKeys = SpillableMapTestUtils.upsertRecords(iRecords, records); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestRocksDbDiskMap.java b/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestRocksDbDiskMap.java new file mode 100644 index 000000000..2ae521fc8 --- /dev/null +++ b/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestRocksDbDiskMap.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.common.util.collection; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; + +import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.common.model.HoodieAvroPayload; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.testutils.HoodieCommonTestHarness; +import org.apache.hudi.common.testutils.SchemaTestUtil; +import org.apache.hudi.common.testutils.SpillableMapTestUtils; +import org.apache.hudi.common.util.Option; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.net.URISyntaxException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.hudi.common.testutils.SchemaTestUtil.getSimpleSchema; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Test the rocksDb based Map {@link RocksDbDiskMap} + * that is used by {@link ExternalSpillableMap}. 
+ */ +public class TestRocksDbDiskMap extends HoodieCommonTestHarness { + + @BeforeEach + public void setUp() { + initPath(); + } + + @Test + public void testSimpleInsertSequential() throws IOException, URISyntaxException { + RocksDbDiskMap> rocksDBBasedMap = new RocksDbDiskMap<>(basePath); + List recordKeys = setupMapWithRecords(rocksDBBasedMap, 100); + + Iterator> itr = rocksDBBasedMap.iterator(); + int cntSize = 0; + while (itr.hasNext()) { + HoodieRecord rec = itr.next(); + cntSize++; + assert recordKeys.contains(rec.getRecordKey()); + } + assertEquals(recordKeys.size(), cntSize); + + // Test value stream + long currentTimeMs = System.currentTimeMillis(); + List> values = + rocksDBBasedMap.valueStream().collect(Collectors.toList()); + cntSize = 0; + for (HoodieRecord value : values) { + assert recordKeys.contains(value.getRecordKey()); + cntSize++; + } + assertEquals(recordKeys.size(), cntSize); + } + + @Test + public void testSimpleInsertRandomAccess() throws IOException, URISyntaxException { + RocksDbDiskMap rocksDBBasedMap = new RocksDbDiskMap<>(basePath); + List recordKeys = setupMapWithRecords(rocksDBBasedMap, 100); + + Random random = new Random(); + for (int i = 0; i < recordKeys.size(); i++) { + String key = recordKeys.get(random.nextInt(recordKeys.size())); + assert rocksDBBasedMap.get(key) != null; + } + } + + @Test + public void testSimpleInsertWithoutHoodieMetadata() throws IOException, URISyntaxException { + RocksDbDiskMap rocksDBBasedMap = new RocksDbDiskMap<>(basePath); + List hoodieRecords = SchemaTestUtil.generateHoodieTestRecordsWithoutHoodieMetadata(0, 1000); + Set recordKeys = new HashSet<>(); + // insert generated records into the map + hoodieRecords.forEach(r -> { + rocksDBBasedMap.put(r.getRecordKey(), r); + recordKeys.add(r.getRecordKey()); + }); + // make sure records have spilled to disk + assertTrue(rocksDBBasedMap.sizeOfFileOnDiskInBytes() > 0); + Iterator> itr = rocksDBBasedMap.iterator(); + int cntSize = 0; + while (itr.hasNext()) 
{ + HoodieRecord rec = itr.next(); + cntSize++; + assert recordKeys.contains(rec.getRecordKey()); + } + assertEquals(recordKeys.size(), cntSize); + } + + @Test + public void testSimpleUpsert() throws IOException, URISyntaxException { + Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema()); + + RocksDbDiskMap rocksDBBasedMap = new RocksDbDiskMap<>(basePath); + List insertedRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100); + List recordKeys = SpillableMapTestUtils.upsertRecords(insertedRecords, rocksDBBasedMap); + String oldCommitTime = + ((GenericRecord) insertedRecords.get(0)).get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString(); + + // generate updates from inserts for first 50 keys / subset of keys + List updatedRecords = SchemaTestUtil.updateHoodieTestRecords(recordKeys.subList(0, 50), + SchemaTestUtil.generateHoodieTestRecords(0, 50), HoodieActiveTimeline.createNewInstantTime()); + String newCommitTime = + ((GenericRecord) updatedRecords.get(0)).get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString(); + + // perform upserts + List updatedRecordKeys = SpillableMapTestUtils.upsertRecords(updatedRecords, rocksDBBasedMap); + + // Upserted records (on disk) should have the latest commit time + Iterator> itr = rocksDBBasedMap.iterator(); + while (itr.hasNext()) { + HoodieRecord rec = itr.next(); + try { + IndexedRecord indexedRecord = (IndexedRecord) rec.getData().getInsertValue(schema).get(); + String latestCommitTime = + ((GenericRecord) indexedRecord).get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString(); + assert recordKeys.contains(rec.getRecordKey()) || updatedRecordKeys.contains(rec.getRecordKey()); + assertEquals(latestCommitTime, updatedRecordKeys.contains(rec.getRecordKey()) ? 
newCommitTime : oldCommitTime); + } catch (IOException io) { + throw new UncheckedIOException(io); + } + } + } + + @Test + public void testPutAll() throws IOException, URISyntaxException { + RocksDbDiskMap rocksDBBasedMap = new RocksDbDiskMap<>(basePath); + List iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100); + Map recordMap = new HashMap<>(); + iRecords.forEach(r -> { + String key = ((GenericRecord) r).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(); + String partitionPath = ((GenericRecord) r).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString(); + HoodieRecord value = new HoodieRecord<>(new HoodieKey(key, partitionPath), new HoodieAvroPayload(Option.of((GenericRecord) r))); + recordMap.put(key, value); + }); + + rocksDBBasedMap.putAll(recordMap); + // make sure records have spilled to disk + assertTrue(rocksDBBasedMap.sizeOfFileOnDiskInBytes() > 0); + + // make sure all added records are present + for (Map.Entry entry : rocksDBBasedMap.entrySet()) { + assertTrue(recordMap.containsKey(entry.getKey())); + } + } + + @Test + public void testSimpleRemove() throws IOException, URISyntaxException { + RocksDbDiskMap rocksDBBasedMap = new RocksDbDiskMap<>(basePath); + List recordKeys = setupMapWithRecords(rocksDBBasedMap, 100); + + List deleteKeys = recordKeys.subList(0, 10); + for (String deleteKey : deleteKeys) { + assert rocksDBBasedMap.remove(deleteKey) != null; + assert rocksDBBasedMap.get(deleteKey) == null; + } + } + + private List setupMapWithRecords(RocksDbDiskMap rocksDBBasedMap, int numRecords) throws IOException, URISyntaxException { + List iRecords = SchemaTestUtil.generateHoodieTestRecords(0, numRecords); + List recordKeys = SpillableMapTestUtils.upsertRecords(iRecords, rocksDBBasedMap); + // Ensure the number of records is correct + assertEquals(rocksDBBasedMap.size(), recordKeys.size()); + // make sure records have spilled to disk + assertTrue(rocksDBBasedMap.sizeOfFileOnDiskInBytes() > 0); + return recordKeys; + } +} \ No 
newline at end of file