[HUDI-2028] Implement RockDbBasedMap as an alternate to DiskBasedMap in ExternalSpillableMap (#3194)
Co-authored-by: Rajesh Mahindra <rmahindra@Rajeshs-MacBook-Pro.local>
@@ -35,6 +35,7 @@ import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
 import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
 import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.keygen.constant.KeyGeneratorType;
@@ -52,6 +53,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Properties;
@@ -283,6 +285,11 @@ public class HoodieWriteConfig extends HoodieConfig {
       .defaultValue("false")
       .withDocumentation("Allow duplicates with inserts while merging with existing records");
 
+  public static final ConfigProperty<ExternalSpillableMap.DiskMapType> SPILLABLE_DISK_MAP_TYPE = ConfigProperty
+      .key("hoodie.spillable.diskmap.type")
+      .defaultValue(ExternalSpillableMap.DiskMapType.BITCASK)
+      .withDocumentation("Enable usage of either BITCASK or ROCKS_DB as disk map for External Spillable Map");
+
   public static final ConfigProperty<Integer> CLIENT_HEARTBEAT_INTERVAL_IN_MS_PROP = ConfigProperty
       .key("hoodie.client.heartbeat.interval_in_ms")
       .defaultValue(60 * 1000)
@@ -554,6 +561,10 @@ public class HoodieWriteConfig extends HoodieConfig {
     return getBoolean(MERGE_ALLOW_DUPLICATE_ON_INSERTS);
   }
 
+  public ExternalSpillableMap.DiskMapType getSpillableDiskMapType() {
+    return ExternalSpillableMap.DiskMapType.valueOf(getString(SPILLABLE_DISK_MAP_TYPE).toUpperCase(Locale.ROOT));
+  }
+
   public EngineType getEngineType() {
     return engineType;
   }
@@ -1504,6 +1515,11 @@ public class HoodieWriteConfig extends HoodieConfig {
       return this;
     }
 
+    public Builder withSpillableDiskMapType(ExternalSpillableMap.DiskMapType diskMapType) {
+      writeConfig.setValue(SPILLABLE_DISK_MAP_TYPE, diskMapType.name());
+      return this;
+    }
+
     public Builder withHeartbeatIntervalInMs(Integer heartbeatIntervalInMs) {
       writeConfig.setValue(CLIENT_HEARTBEAT_INTERVAL_IN_MS_PROP, String.valueOf(heartbeatIntervalInMs));
       return this;
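Taken together, the HoodieWriteConfig hunks above expose the new disk map choice both as a builder option and as the `hoodie.spillable.diskmap.type` property (default BITCASK). A minimal sketch of how a writer could opt in — the table path is a hypothetical placeholder, not part of the commit:

```java
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.config.HoodieWriteConfig;

public class DiskMapTypeConfigExample {
  public static void main(String[] args) {
    // Builder-based configuration; "/tmp/hudi/example_table" is hypothetical.
    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hudi/example_table")
        .withSpillableDiskMapType(ExternalSpillableMap.DiskMapType.ROCKS_DB)
        .build();

    // Equivalent raw property: hoodie.spillable.diskmap.type=ROCKS_DB
    System.out.println(config.getSpillableDiskMapType()); // ROCKS_DB
  }
}
```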
@@ -199,7 +199,8 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
       long memoryForMerge = IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config);
       LOG.info("MaxMemoryPerPartitionMerge => " + memoryForMerge);
       this.keyToNewRecords = new ExternalSpillableMap<>(memoryForMerge, config.getSpillableMapBasePath(),
-          new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(tableSchema));
+          new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(tableSchema),
+          config.getSpillableDiskMapType());
     } catch (IOException io) {
       throw new HoodieIOException("Cannot instantiate an ExternalSpillableMap", io);
     }
@@ -231,7 +232,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
       LOG.info("Number of entries in MemoryBasedMap => "
           + ((ExternalSpillableMap) keyToNewRecords).getInMemoryMapNumEntries()
           + "Total size in bytes of MemoryBasedMap => "
-          + ((ExternalSpillableMap) keyToNewRecords).getCurrentInMemoryMapSize() + "Number of entries in DiskBasedMap => "
+          + ((ExternalSpillableMap) keyToNewRecords).getCurrentInMemoryMapSize() + "Number of entries in BitCaskDiskMap => "
          + ((ExternalSpillableMap) keyToNewRecords).getDiskBasedMapNumEntries() + "Size of file spilled to disk => "
           + ((ExternalSpillableMap) keyToNewRecords).getSizeOfFileOnDiskInBytes());
     }
@@ -100,7 +100,7 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner
     LOG.info("Number of entries in MemoryBasedMap in ExternalSpillableMap => " + records.getInMemoryMapNumEntries());
     LOG.info(
         "Total size in bytes of MemoryBasedMap in ExternalSpillableMap => " + records.getCurrentInMemoryMapSize());
-    LOG.info("Number of entries in DiskBasedMap in ExternalSpillableMap => " + records.getDiskBasedMapNumEntries());
+    LOG.info("Number of entries in BitCaskDiskMap in ExternalSpillableMap => " + records.getDiskBasedMapNumEntries());
     LOG.info("Size of file spilled to disk => " + records.getSizeOfFileOnDiskInBytes());
   }
 
@@ -22,7 +22,7 @@ import org.apache.hudi.common.fs.SizeAwareDataOutputStream;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.util.collection.DiskBasedMap.FileEntry;
+import org.apache.hudi.common.util.collection.BitCaskDiskMap.FileEntry;
 import org.apache.hudi.exception.HoodieCorruptedDataException;
 
 import org.apache.avro.generic.GenericRecord;
@@ -52,11 +52,13 @@ import java.util.stream.Stream;
  * This class provides a disk spillable only map implementation. All of the data is currenly written to one file,
  * without any rollover support. It uses the following : 1) An in-memory map that tracks the key-> latest ValueMetadata.
  * 2) Current position in the file NOTE : Only String.class type supported for Key
+ *
+ * Inspired by https://github.com/basho/bitcask
  */
-public final class DiskBasedMap<T extends Serializable, R extends Serializable> implements Map<T, R>, Iterable<R> {
+public final class BitCaskDiskMap<T extends Serializable, R extends Serializable> implements DiskMap<T, R> {
 
   public static final int BUFFER_SIZE = 128 * 1024; // 128 KB
-  private static final Logger LOG = LogManager.getLogger(DiskBasedMap.class);
+  private static final Logger LOG = LogManager.getLogger(BitCaskDiskMap.class);
   // Stores the key and corresponding value's latest metadata spilled to disk
   private final Map<T, ValueMetadata> valueMetadataMap;
   // Write only file
@@ -76,7 +78,7 @@ public final class DiskBasedMap<T extends Serializable, R extends Serializable>
 
   private transient Thread shutdownThread = null;
 
-  public DiskBasedMap(String baseFilePath) throws IOException {
+  public BitCaskDiskMap(String baseFilePath) throws IOException {
     this.valueMetadataMap = new ConcurrentHashMap<>();
     this.writeOnlyFile = new File(baseFilePath, UUID.randomUUID().toString());
     this.filePath = writeOnlyFile.getPath();
@@ -136,7 +138,7 @@ public final class DiskBasedMap<T extends Serializable, R extends Serializable>
       try {
         writeOnlyFileHandle.flush();
       } catch (IOException e) {
-        throw new HoodieIOException("Failed to flush to DiskBasedMap file", e);
+        throw new HoodieIOException("Failed to flush to BitCaskDiskMap file", e);
       }
     }
 
@@ -151,6 +153,7 @@ public final class DiskBasedMap<T extends Serializable, R extends Serializable>
   /**
    * Number of bytes spilled to disk.
    */
+  @Override
   public long sizeOfFileOnDiskInBytes() {
     return filePosition.get();
   }
@@ -203,7 +206,7 @@ public final class DiskBasedMap<T extends Serializable, R extends Serializable>
     Integer valueSize = val.length;
     Long timestamp = System.currentTimeMillis();
     this.valueMetadataMap.put(key,
-        new DiskBasedMap.ValueMetadata(this.filePath, valueSize, filePosition.get(), timestamp));
+        new BitCaskDiskMap.ValueMetadata(this.filePath, valueSize, filePosition.get(), timestamp));
     byte[] serializedKey = SerializationUtils.serialize(key);
     filePosition
         .set(SpillableMapUtils.spillToDisk(writeOnlyFileHandle, new FileEntry(SpillableMapUtils.generateChecksum(val),
@@ -287,6 +290,7 @@ public final class DiskBasedMap<T extends Serializable, R extends Serializable>
     throw new HoodieException("Unsupported Operation Exception");
   }
 
+  @Override
   public Stream<R> valueStream() {
     final BufferedRandomAccessFile file = getRandomAccessFile();
     return valueMetadataMap.values().stream().sorted().sequential().map(valueMetaData -> (R) get(valueMetaData, file));
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util.collection;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.stream.Stream;
+
+/**
+ * This interface provides the map interface for storing records in disk after they
+ * spill over from memory. Used by {@link ExternalSpillableMap}.
+ *
+ * @param <T> The generic type of the keys
+ * @param <R> The generic type of the values
+ */
+public interface DiskMap<T extends Serializable, R extends Serializable> extends Map<T, R>, Iterable<R> {
+
+  /**
+   * @returns a stream of the values stored in the disk.
+   */
+  Stream<R> valueStream();
+
+  /**
+   * Number of bytes spilled to disk.
+   */
+  long sizeOfFileOnDiskInBytes();
+
+  /**
+   * Cleanup.
+   */
+  void close();
+
+}
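This new DiskMap interface is what lets ExternalSpillableMap treat the BitCask file format and RocksDB uniformly: beyond the standard Map and Iterable contracts, a spill backend only has to expose a value stream, a bytes-on-disk figure, and a cleanup hook. Purely as an illustration of the contract (this class is hypothetical and not part of the commit; the HashMap backing never actually touches disk):

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Iterator;
import java.util.stream.Stream;

// Toy DiskMap that "spills" into an ordinary HashMap; shows only the shape of the contract.
public class InMemoryFakeDiskMap<T extends Serializable, R extends Serializable>
    extends HashMap<T, R> implements DiskMap<T, R> {

  @Override
  public Iterator<R> iterator() {
    return values().iterator(); // Iterable<R> iterates over values, mirroring the real implementations
  }

  @Override
  public Stream<R> valueStream() {
    return values().stream();
  }

  @Override
  public long sizeOfFileOnDiskInBytes() {
    return 0L; // nothing is written to disk in this fake
  }

  @Override
  public void close() {
    clear();
  }
}
```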
@@ -61,8 +61,8 @@ public class ExternalSpillableMap<T extends Serializable, R extends Serializable
   private final long maxInMemorySizeInBytes;
   // Map to store key-values in memory until it hits maxInMemorySizeInBytes
   private final Map<T, R> inMemoryMap;
-  // Map to store key-valuemetadata important to find the values spilled to disk
-  private transient volatile DiskBasedMap<T, R> diskBasedMap;
+  // Map to store key-values on disk or db after it spilled over the memory
+  private transient volatile DiskMap<T, R> diskBasedMap;
   // TODO(na) : a dynamic sizing factor to ensure we have space for other objects in memory and
   // incorrect payload estimation
   private final Double sizingFactorForInMemoryMap = 0.8;
@@ -70,6 +70,8 @@ public class ExternalSpillableMap<T extends Serializable, R extends Serializable
   private final SizeEstimator<T> keySizeEstimator;
   // Size Estimator for key types
   private final SizeEstimator<R> valueSizeEstimator;
+  // Type of the disk map
+  private final DiskMapType diskMapType;
   // current space occupied by this map in-memory
   private Long currentInMemoryMapSize;
   // An estimate of the size of each payload written to this map
@@ -80,22 +82,34 @@ public class ExternalSpillableMap<T extends Serializable, R extends Serializable
   private final String baseFilePath;
 
   public ExternalSpillableMap(Long maxInMemorySizeInBytes, String baseFilePath, SizeEstimator<T> keySizeEstimator,
       SizeEstimator<R> valueSizeEstimator) throws IOException {
+    this(maxInMemorySizeInBytes, baseFilePath, keySizeEstimator, valueSizeEstimator, DiskMapType.BITCASK);
+  }
+
+  public ExternalSpillableMap(Long maxInMemorySizeInBytes, String baseFilePath, SizeEstimator<T> keySizeEstimator,
+      SizeEstimator<R> valueSizeEstimator, DiskMapType diskMapType) throws IOException {
     this.inMemoryMap = new HashMap<>();
     this.baseFilePath = baseFilePath;
-    this.diskBasedMap = new DiskBasedMap<>(baseFilePath);
     this.maxInMemorySizeInBytes = (long) Math.floor(maxInMemorySizeInBytes * sizingFactorForInMemoryMap);
     this.currentInMemoryMapSize = 0L;
     this.keySizeEstimator = keySizeEstimator;
     this.valueSizeEstimator = valueSizeEstimator;
+    this.diskMapType = diskMapType;
   }
 
-  private DiskBasedMap<T, R> getDiskBasedMap() {
+  private DiskMap<T, R> getDiskBasedMap() {
     if (null == diskBasedMap) {
       synchronized (this) {
         if (null == diskBasedMap) {
           try {
-            diskBasedMap = new DiskBasedMap<>(baseFilePath);
+            switch (diskMapType) {
+              case ROCKS_DB:
+                diskBasedMap = new RocksDbDiskMap<>(baseFilePath);
+                break;
+              case BITCASK:
+              default:
+                diskBasedMap = new BitCaskDiskMap<>(baseFilePath);
+            }
           } catch (IOException e) {
             throw new HoodieIOException(e.getMessage(), e);
           }
@@ -113,7 +127,7 @@ public class ExternalSpillableMap<T extends Serializable, R extends Serializable
   }
 
   /**
-   * Number of entries in DiskBasedMap.
+   * Number of entries in BitCaskDiskMap.
    */
   public int getDiskBasedMapNumEntries() {
     return getDiskBasedMap().size();
@@ -160,6 +174,14 @@ public class ExternalSpillableMap<T extends Serializable, R extends Serializable
     return inMemoryMap.containsValue(value) || getDiskBasedMap().containsValue(value);
   }
 
+  public boolean inMemoryContainsKey(Object key) {
+    return inMemoryMap.containsKey(key);
+  }
+
+  public boolean inDiskContainsKey(Object key) {
+    return getDiskBasedMap().containsKey(key);
+  }
+
   @Override
   public R get(Object key) {
     if (inMemoryMap.containsKey(key)) {
@@ -259,14 +281,24 @@ public class ExternalSpillableMap<T extends Serializable, R extends Serializable
     return entrySet;
   }
 
+  /**
+   * The type of map to use for storing the Key, values on disk after it spills
+   * from memory in the {@link ExternalSpillableMap}.
+   */
+  public enum DiskMapType {
+    BITCASK,
+    ROCKS_DB,
+    UNKNOWN
+  }
+
   /**
    * Iterator that wraps iterating over all the values for this map 1) inMemoryIterator - Iterates over all the data
    * in-memory map 2) diskLazyFileIterator - Iterates over all the data spilled to disk.
    */
   private class IteratorWrapper<R> implements Iterator<R> {
 
-    private Iterator<R> inMemoryIterator;
-    private Iterator<R> diskLazyFileIterator;
+    private final Iterator<R> inMemoryIterator;
+    private final Iterator<R> diskLazyFileIterator;
 
     public IteratorWrapper(Iterator<R> inMemoryIterator, Iterator<R> diskLazyFileIterator) {
       this.inMemoryIterator = inMemoryIterator;
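With the DiskMapType enum in place, callers pick the spill backend at construction time. A hedged usage sketch (the 16 KB budget, the /tmp spill directory, and the String payloads are arbitrary illustrations; note the disk map itself is only created lazily on first spill):

```java
import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;

import java.io.IOException;

public class SpillableMapExample {
  public static void main(String[] args) throws IOException {
    // Spill to RocksDB once the in-memory budget (an arbitrary 16 KB here) is exhausted;
    // "/tmp/hudi-spill" is a hypothetical scratch directory.
    ExternalSpillableMap<String, String> map = new ExternalSpillableMap<>(
        16 * 1024L, "/tmp/hudi-spill",
        new DefaultSizeEstimator<>(), new DefaultSizeEstimator<>(),
        ExternalSpillableMap.DiskMapType.ROCKS_DB);

    map.put("key-1", "value-1");
    System.out.println(map.size()); // 1, whether the entry lives in memory or on disk
  }
}
```

UNKNOWN is also a legal constant; as the switch in getDiskBasedMap() shows, it falls through to the BitCask default.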
@@ -36,11 +36,11 @@ public class LazyFileIterable<T, R> implements Iterable<R> {
   // Used to access the value written at a specific position in the file
   private final String filePath;
   // Stores the key and corresponding value's latest metadata spilled to disk
-  private final Map<T, DiskBasedMap.ValueMetadata> inMemoryMetadataOfSpilledData;
+  private final Map<T, BitCaskDiskMap.ValueMetadata> inMemoryMetadataOfSpilledData;
 
   private transient Thread shutdownThread = null;
 
-  public LazyFileIterable(String filePath, Map<T, DiskBasedMap.ValueMetadata> map) {
+  public LazyFileIterable(String filePath, Map<T, BitCaskDiskMap.ValueMetadata> map) {
     this.filePath = filePath;
     this.inMemoryMetadataOfSpilledData = map;
   }
@@ -61,16 +61,16 @@ public class LazyFileIterable<T, R> implements Iterable<R> {
 
     private final String filePath;
     private BufferedRandomAccessFile readOnlyFileHandle;
-    private final Iterator<Map.Entry<T, DiskBasedMap.ValueMetadata>> metadataIterator;
+    private final Iterator<Map.Entry<T, BitCaskDiskMap.ValueMetadata>> metadataIterator;
 
-    public LazyFileIterator(String filePath, Map<T, DiskBasedMap.ValueMetadata> map) throws IOException {
+    public LazyFileIterator(String filePath, Map<T, BitCaskDiskMap.ValueMetadata> map) throws IOException {
       this.filePath = filePath;
-      this.readOnlyFileHandle = new BufferedRandomAccessFile(filePath, "r", DiskBasedMap.BUFFER_SIZE);
+      this.readOnlyFileHandle = new BufferedRandomAccessFile(filePath, "r", BitCaskDiskMap.BUFFER_SIZE);
       readOnlyFileHandle.seek(0);
 
       // sort the map in increasing order of offset of value so disk seek is only in one(forward) direction
       this.metadataIterator = map.entrySet().stream()
-          .sorted((Map.Entry<T, DiskBasedMap.ValueMetadata> o1, Map.Entry<T, DiskBasedMap.ValueMetadata> o2) -> o1
+          .sorted((Map.Entry<T, BitCaskDiskMap.ValueMetadata> o1, Map.Entry<T, BitCaskDiskMap.ValueMetadata> o2) -> o1
              .getValue().getOffsetOfValue().compareTo(o2.getValue().getOffsetOfValue()))
           .collect(Collectors.toList()).iterator();
       this.addShutdownHook();
@@ -90,8 +90,8 @@ public class LazyFileIterable<T, R> implements Iterable<R> {
       if (!hasNext()) {
         throw new IllegalStateException("next() called on EOF'ed stream. File :" + filePath);
       }
-      Map.Entry<T, DiskBasedMap.ValueMetadata> entry = this.metadataIterator.next();
-      return DiskBasedMap.get(entry.getValue(), readOnlyFileHandle);
+      Map.Entry<T, BitCaskDiskMap.ValueMetadata> entry = this.metadataIterator.next();
+      return BitCaskDiskMap.get(entry.getValue(), readOnlyFileHandle);
     }
 
     @Override
@@ -45,6 +45,7 @@ import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.UUID;
@@ -64,11 +65,13 @@ public class RocksDBDAO {
   private transient RocksDB rocksDB;
   private boolean closed = false;
   private final String rocksDBBasePath;
+  private long totalBytesWritten;
 
   public RocksDBDAO(String basePath, String rocksDBBasePath) {
     this.rocksDBBasePath =
         String.format("%s/%s/%s", rocksDBBasePath, basePath.replace("/", "_"), UUID.randomUUID().toString());
     init();
+    totalBytesWritten = 0L;
   }
 
   /**
@@ -169,7 +172,7 @@ public class RocksDBDAO {
   */
  public <T extends Serializable> void putInBatch(WriteBatch batch, String columnFamilyName, String key, T value) {
    try {
-      byte[] payload = SerializationUtils.serialize(value);
+      byte[] payload = serializePayload(value);
      batch.put(managedHandlesMap.get(columnFamilyName), key.getBytes(), payload);
    } catch (Exception e) {
      throw new HoodieException(e);
@@ -189,7 +192,7 @@ public class RocksDBDAO {
      K key, T value) {
    try {
      byte[] keyBytes = SerializationUtils.serialize(key);
-      byte[] payload = SerializationUtils.serialize(value);
+      byte[] payload = serializePayload(value);
      batch.put(managedHandlesMap.get(columnFamilyName), keyBytes, payload);
    } catch (Exception e) {
      throw new HoodieException(e);
@@ -206,7 +209,7 @@ public class RocksDBDAO {
   */
  public <T extends Serializable> void put(String columnFamilyName, String key, T value) {
    try {
-      byte[] payload = SerializationUtils.serialize(value);
+      byte[] payload = serializePayload(value);
      getRocksDB().put(managedHandlesMap.get(columnFamilyName), key.getBytes(), payload);
    } catch (Exception e) {
      throw new HoodieException(e);
@@ -223,7 +226,7 @@ public class RocksDBDAO {
   */
  public <K extends Serializable, T extends Serializable> void put(String columnFamilyName, K key, T value) {
    try {
-      byte[] payload = SerializationUtils.serialize(value);
+      byte[] payload = serializePayload(value);
      getRocksDB().put(managedHandlesMap.get(columnFamilyName), SerializationUtils.serialize(key), payload);
    } catch (Exception e) {
      throw new HoodieException(e);
@@ -351,6 +354,16 @@ public class RocksDBDAO {
     return results.stream();
   }
 
+  /**
+   * Return Iterator of key-value pairs from RocksIterator.
+   *
+   * @param columnFamilyName Column Family Name
+   * @param <T> Type of value stored
+   */
+  public <T extends Serializable> Iterator<T> iterator(String columnFamilyName) {
+    return new IteratorWrapper<>(getRocksDB().newIterator(managedHandlesMap.get(columnFamilyName)));
+  }
+
   /**
    * Perform a prefix delete and return stream of key-value pairs retrieved.
    *
@@ -448,10 +461,48 @@ public class RocksDBDAO {
     }
   }
 
+  public long getTotalBytesWritten() {
+    return totalBytesWritten;
+  }
+
+  private <T extends Serializable> byte[] serializePayload(T value) throws IOException {
+    byte[] payload = SerializationUtils.serialize(value);
+    totalBytesWritten += payload.length;
+    return payload;
+  }
+
   String getRocksDBBasePath() {
     return rocksDBBasePath;
   }
 
+  /**
+   * {@link Iterator} wrapper for RocksDb Iterator {@link RocksIterator}.
+   */
+  private static class IteratorWrapper<R> implements Iterator<R> {
+
+    private final RocksIterator iterator;
+
+    public IteratorWrapper(final RocksIterator iterator) {
+      this.iterator = iterator;
+      iterator.seekToFirst();
+    }
+
+    @Override
+    public boolean hasNext() {
+      return iterator.isValid();
+    }
+
+    @Override
+    public R next() {
+      if (!hasNext()) {
+        throw new IllegalStateException("next() called on rocksDB with no more valid entries");
+      }
+      R val = SerializationUtils.deserialize(iterator.value());
+      iterator.next();
+      return val;
+    }
+  }
+
   /**
    * Functional interface for stacking operation to Write batch.
    */
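Because RocksDB compacts and compresses its on-disk files, a RocksDB-backed map cannot report a literal spill-file size the way the single BitCask file can. The serializePayload() wrapper above solves that by counting serialized payload bytes as they are written, and getTotalBytesWritten() later backs sizeOfFileOnDiskInBytes(). The same accounting pattern in isolation, as a hedged standalone sketch (class and names here are illustrative, not the Hudi API):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Illustrative byte-counting serializer: every value serialized for the store
// bumps a running total, which then approximates "bytes spilled to disk".
public class ByteCountingSerializer {
  private long totalBytesWritten = 0L;

  public <T extends Serializable> byte[] serializePayload(T value) throws IOException {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
      oos.writeObject(value); // stand-in for Hudi's SerializationUtils.serialize
    }
    byte[] payload = bos.toByteArray();
    totalBytesWritten += payload.length; // same accounting as RocksDBDAO
    return payload;
  }

  public long getTotalBytesWritten() {
    return totalBytesWritten;
  }
}
```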
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util.collection;
+
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieNotSupportedException;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.AbstractMap;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.Spliterators;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class provides a disk spillable only map implementation.
+ * All of the data is stored using the RocksDB implementation.
+ */
+public final class RocksDbDiskMap<T extends Serializable, R extends Serializable> implements DiskMap<T, R> {
+  // ColumnFamily allows partitioning data within RockDB, which allows
+  // independent configuration and faster deletes across partitions
+  // https://github.com/facebook/rocksdb/wiki/Column-Families
+  // For this use case, we use a single static column family/ partition
+  //
+  private static final String COLUMN_FAMILY_NAME = "spill_map";
+
+  private static final Logger LOG = LogManager.getLogger(RocksDbDiskMap.class);
+  // Stores the key and corresponding value's latest metadata spilled to disk
+  private final Set<T> keySet;
+  private final String rocksDbStoragePath;
+  private RocksDBDAO rocksDb;
+
+  public RocksDbDiskMap(String rocksDbStoragePath) throws IOException {
+    this.keySet = new HashSet<>();
+    this.rocksDbStoragePath = rocksDbStoragePath;
+  }
+
+  @Override
+  public int size() {
+    return keySet.size();
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return keySet.isEmpty();
+  }
+
+  @Override
+  public boolean containsKey(Object key) {
+    return keySet.contains((T) key);
+  }
+
+  @Override
+  public boolean containsValue(Object value) {
+    throw new HoodieNotSupportedException("unable to compare values in map");
+  }
+
+  @Override
+  public R get(Object key) {
+    if (!containsKey(key)) {
+      return null;
+    }
+    return getRocksDb().get(COLUMN_FAMILY_NAME, (T) key);
+  }
+
+  @Override
+  public R put(T key, R value) {
+    getRocksDb().put(COLUMN_FAMILY_NAME, key, value);
+    keySet.add(key);
+    return value;
+  }
+
+  @Override
+  public R remove(Object key) {
+    R value = get(key);
+    if (value != null) {
+      keySet.remove((T) key);
+      getRocksDb().delete(COLUMN_FAMILY_NAME, (T) key);
+    }
+    return value;
+  }
+
+  @Override
+  public void putAll(Map<? extends T, ? extends R> keyValues) {
+    getRocksDb().writeBatch(batch -> keyValues.forEach((key, value) -> getRocksDb().putInBatch(batch, COLUMN_FAMILY_NAME, key, value)));
+    keySet.addAll(keyValues.keySet());
+  }
+
+  @Override
+  public void clear() {
+    close();
+  }
+
+  @Override
+  public Set<T> keySet() {
+    return keySet;
+  }
+
+  @Override
+  public Collection<R> values() {
+    throw new HoodieException("Unsupported Operation Exception");
+  }
+
+  @Override
+  public Set<Entry<T, R>> entrySet() {
+    Set<Entry<T, R>> entrySet = new HashSet<>();
+    for (T key : keySet) {
+      entrySet.add(new AbstractMap.SimpleEntry<>(key, get(key)));
+    }
+    return entrySet;
+  }
+
+  /**
+   * Custom iterator to iterate over values written to disk.
+   */
+  @Override
+  public Iterator<R> iterator() {
+    return getRocksDb().iterator(COLUMN_FAMILY_NAME);
+  }
+
+  @Override
+  public Stream<R> valueStream() {
+    return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator(), 0), false);
+  }
+
+  @Override
+  public long sizeOfFileOnDiskInBytes() {
+    return getRocksDb().getTotalBytesWritten();
+  }
+
+  @Override
+  public void close() {
+    keySet.clear();
+    if (null != rocksDb) {
+      rocksDb.close();
+    }
+    rocksDb = null;
+  }
+
+  private RocksDBDAO getRocksDb() {
+    if (null == rocksDb) {
+      synchronized (this) {
+        if (null == rocksDb) {
+          rocksDb = new RocksDBDAO(COLUMN_FAMILY_NAME, rocksDbStoragePath);
+          rocksDb.addColumnFamily(COLUMN_FAMILY_NAME);
+        }
+      }
+    }
+    return rocksDb;
+  }
+
+}
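For a sense of how the new map behaves on its own, a hedged usage sketch (the /tmp path is arbitrary, and running this requires the embedded RocksDB native library on the classpath; RocksDbDiskMap opens RocksDB lazily on first access, as getRocksDb() above shows):

```java
import org.apache.hudi.common.util.collection.RocksDbDiskMap;

import java.io.IOException;

public class RocksDbDiskMapExample {
  public static void main(String[] args) throws IOException {
    // Hypothetical local scratch directory for the embedded RocksDB files.
    RocksDbDiskMap<String, String> map = new RocksDbDiskMap<>("/tmp/hudi-rocksdb-spill");

    map.put("key-1", "value-1");          // first put opens RocksDB lazily
    System.out.println(map.get("key-1")); // value-1
    System.out.println(map.sizeOfFileOnDiskInBytes()); // serialized bytes written so far

    map.close(); // releases the RocksDB handle and clears the in-memory key set
  }
}
```

Note the design trade-off visible in the class: keys are mirrored in an in-memory HashSet so size() and containsKey() never touch RocksDB, at the cost of some heap per spilled key.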
@@ -57,9 +57,9 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
- * Tests dis based map {@link DiskBasedMap}.
+ * Tests dis based map {@link BitCaskDiskMap}.
  */
-public class TestDiskBasedMap extends HoodieCommonTestHarness {
+public class TestBitCaskDiskMap extends HoodieCommonTestHarness {
 
   @BeforeEach
   public void setup() {
@@ -68,7 +68,7 @@ public class TestDiskBasedMap extends HoodieCommonTestHarness {
 
   @Test
   public void testSimpleInsert() throws IOException, URISyntaxException {
-    DiskBasedMap records = new DiskBasedMap<>(basePath);
+    BitCaskDiskMap records = new BitCaskDiskMap<>(basePath);
     List<IndexedRecord> iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
     ((GenericRecord) iRecords.get(0)).get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString();
     List<String> recordKeys = SpillableMapTestUtils.upsertRecords(iRecords, records);
@@ -86,7 +86,7 @@ public class TestDiskBasedMap extends HoodieCommonTestHarness {
 
   @Test
   public void testSimpleInsertWithoutHoodieMetadata() throws IOException, URISyntaxException {
-    DiskBasedMap records = new DiskBasedMap<>(basePath);
+    BitCaskDiskMap records = new BitCaskDiskMap<>(basePath);
     List<HoodieRecord> hoodieRecords = SchemaTestUtil.generateHoodieTestRecordsWithoutHoodieMetadata(0, 1000);
     Set<String> recordKeys = new HashSet<>();
     // insert generated records into the map
@@ -109,7 +109,7 @@ public class TestDiskBasedMap extends HoodieCommonTestHarness {
   public void testSimpleUpsert() throws IOException, URISyntaxException {
     Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
 
-    DiskBasedMap records = new DiskBasedMap<>(basePath);
+    BitCaskDiskMap records = new BitCaskDiskMap<>(basePath);
     List<IndexedRecord> iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
 
     // perform some inserts
@@ -189,7 +189,7 @@ public class TestDiskBasedMap extends HoodieCommonTestHarness {
 
   @Test
   public void testPutAll() throws IOException, URISyntaxException {
-    DiskBasedMap<String, HoodieRecord> records = new DiskBasedMap<>(basePath);
+    BitCaskDiskMap<String, HoodieRecord> records = new BitCaskDiskMap<>(basePath);
     List<IndexedRecord> iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
     Map<String, HoodieRecord> recordMap = new HashMap<>();
     iRecords.forEach(r -> {
@@ -38,6 +38,8 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.MethodOrderer.Alphanumeric;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestMethodOrder;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
@@ -45,6 +47,7 @@ import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.stream.Collectors;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -65,32 +68,48 @@ public class TestExternalSpillableMap extends HoodieCommonTestHarness {
     failureOutputPath = basePath + "/test_fail";
   }
 
-  @Test
-  public void simpleInsertTest() throws IOException, URISyntaxException {
+  @ParameterizedTest
+  @EnumSource(ExternalSpillableMap.DiskMapType.class)
+  public void simpleInsertTest(ExternalSpillableMap.DiskMapType diskMapType) throws IOException, URISyntaxException {
     Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
-    String payloadClazz = HoodieAvroPayload.class.getName();
     ExternalSpillableMap<String, HoodieRecord<? extends HoodieRecordPayload>> records =
-        new ExternalSpillableMap<>(16L, basePath, new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(schema)); // 16B
+        new ExternalSpillableMap<>(16L, basePath, new DefaultSizeEstimator(),
+            new HoodieRecordSizeEstimator(schema), diskMapType); // 16B
 
     List<IndexedRecord> iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
     List<String> recordKeys = SpillableMapTestUtils.upsertRecords(iRecords, records);
     assert (recordKeys.size() == 100);
 
+    // Test iterator
     Iterator<HoodieRecord<? extends HoodieRecordPayload>> itr = records.iterator();
-    List<HoodieRecord> oRecords = new ArrayList<>();
+    int cntSize = 0;
     while (itr.hasNext()) {
       HoodieRecord<? extends HoodieRecordPayload> rec = itr.next();
-      oRecords.add(rec);
+      cntSize++;
       assert recordKeys.contains(rec.getRecordKey());
     }
+    assertEquals(recordKeys.size(), cntSize);
+
+    // Test value stream
+    List<HoodieRecord<? extends HoodieRecordPayload>> values = records.valueStream().collect(Collectors.toList());
+    cntSize = 0;
+    for (HoodieRecord value : values) {
+      assert recordKeys.contains(value.getRecordKey());
+      cntSize++;
+    }
+    assertEquals(recordKeys.size(), cntSize);
   }
 
-  @Test
-  public void testSimpleUpsert() throws IOException, URISyntaxException {
+  @ParameterizedTest
+  @EnumSource(ExternalSpillableMap.DiskMapType.class)
+  public void testSimpleUpsert(ExternalSpillableMap.DiskMapType diskMapType) throws IOException, URISyntaxException {
+
     Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
 
     ExternalSpillableMap<String, HoodieRecord<? extends HoodieRecordPayload>> records =
-        new ExternalSpillableMap<>(16L, basePath, new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(schema)); // 16B
+        new ExternalSpillableMap<>(16L, basePath, new DefaultSizeEstimator(),
+            new HoodieRecordSizeEstimator(schema), diskMapType); // 16B
 
     List<IndexedRecord> iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
     List<String> recordKeys = SpillableMapTestUtils.upsertRecords(iRecords, records);
@@ -120,14 +139,16 @@ public class TestExternalSpillableMap extends HoodieCommonTestHarness {
     });
   }
 
-  @Test
-  public void testAllMapOperations() throws IOException, URISyntaxException {
+  @ParameterizedTest
+  @EnumSource(ExternalSpillableMap.DiskMapType.class)
+  public void testAllMapOperations(ExternalSpillableMap.DiskMapType diskMapType) throws IOException, URISyntaxException {
 
     Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
     String payloadClazz = HoodieAvroPayload.class.getName();
 
     ExternalSpillableMap<String, HoodieRecord<? extends HoodieRecordPayload>> records =
-        new ExternalSpillableMap<>(16L, basePath, new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(schema)); // 16B
+        new ExternalSpillableMap<>(16L, basePath, new DefaultSizeEstimator(),
+            new HoodieRecordSizeEstimator(schema), diskMapType); // 16B
 
     List<IndexedRecord> iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
     // insert a bunch of records so that values spill to disk too
@@ -176,12 +197,14 @@ public class TestExternalSpillableMap extends HoodieCommonTestHarness {
     assertTrue(records.size() == 0);
   }
 
-  @Test
-  public void simpleTestWithException() throws IOException, URISyntaxException {
+  @ParameterizedTest
+  @EnumSource(ExternalSpillableMap.DiskMapType.class)
+  public void simpleTestWithException(ExternalSpillableMap.DiskMapType diskMapType) throws IOException, URISyntaxException {
     Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
 
     ExternalSpillableMap<String, HoodieRecord<? extends HoodieRecordPayload>> records = new ExternalSpillableMap<>(16L,
-        failureOutputPath, new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(schema)); // 16B
+        failureOutputPath, new DefaultSizeEstimator(),
+        new HoodieRecordSizeEstimator(schema), diskMapType); // 16B
 
     List<IndexedRecord> iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
     List<String> recordKeys = SpillableMapTestUtils.upsertRecords(iRecords, records);
@@ -194,13 +217,15 @@ public class TestExternalSpillableMap extends HoodieCommonTestHarness {
     });
   }
 
-  @Test
-  public void testDataCorrectnessWithUpsertsToDataInMapAndOnDisk() throws IOException, URISyntaxException {
+  @ParameterizedTest
+  @EnumSource(ExternalSpillableMap.DiskMapType.class)
+  public void testDataCorrectnessWithUpsertsToDataInMapAndOnDisk(ExternalSpillableMap.DiskMapType diskMapType) throws IOException, URISyntaxException {
 
     Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
 
     ExternalSpillableMap<String, HoodieRecord<? extends HoodieRecordPayload>> records =
-        new ExternalSpillableMap<>(16L, basePath, new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(schema)); // 16B
+        new ExternalSpillableMap<>(16L, basePath, new DefaultSizeEstimator(),
+            new HoodieRecordSizeEstimator(schema), diskMapType); // 16B
 
     List<String> recordKeys = new ArrayList<>();
     // Ensure we spill to disk
@@ -245,13 +270,15 @@ public class TestExternalSpillableMap extends HoodieCommonTestHarness {
       assert newCommitTime.contentEquals(gRecord.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString());
     }
 
-  @Test
-  public void testDataCorrectnessWithoutHoodieMetadata() throws IOException, URISyntaxException {
+  @ParameterizedTest
+  @EnumSource(ExternalSpillableMap.DiskMapType.class)
+  public void testDataCorrectnessWithoutHoodieMetadata(ExternalSpillableMap.DiskMapType diskMapType) throws IOException, URISyntaxException {
 
     Schema schema = SchemaTestUtil.getSimpleSchema();
 
     ExternalSpillableMap<String, HoodieRecord<? extends HoodieRecordPayload>> records =
-        new ExternalSpillableMap<>(16L, basePath, new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(schema)); // 16B
+        new ExternalSpillableMap<>(16L, basePath, new DefaultSizeEstimator(),
+            new HoodieRecordSizeEstimator(schema), diskMapType); // 16B
 
     List<String> recordKeys = new ArrayList<>();
     // Ensure we spill to disk
@@ -311,4 +338,4 @@ public class TestExternalSpillableMap extends HoodieCommonTestHarness {
   // TODO : come up with a performance eval test for spillableMap
   @Test
   public void testLargeInsertUpsert() {}
 }
@@ -38,7 +38,7 @@ import java.util.List;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 /**
- * Tests RocksDB based map {@link RocksDBBasedMap}.
+ * Tests RocksDB based map {@link RocksDbDiskMap}.
  */
 public class TestRocksDbBasedMap extends HoodieCommonTestHarness {
 
@@ -49,7 +49,7 @@ public class TestRocksDbBasedMap extends HoodieCommonTestHarness {
 
   @Test
   public void testSimple() throws IOException, URISyntaxException {
-    RocksDBBasedMap records = new RocksDBBasedMap(basePath);
+    RocksDbDiskMap records = new RocksDbDiskMap(basePath);
     List<IndexedRecord> iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
     ((GenericRecord) iRecords.get(0)).get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString();
     List<String> recordKeys = SpillableMapTestUtils.upsertRecords(iRecords, records);
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util.collection;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
+import org.apache.hudi.common.testutils.SchemaTestUtil;
+import org.apache.hudi.common.testutils.SpillableMapTestUtils;
+import org.apache.hudi.common.util.Option;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.net.URISyntaxException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.testutils.SchemaTestUtil.getSimpleSchema;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test the rocksDb based Map {@link RocksDbDiskMap}
+ * that is used by {@link ExternalSpillableMap}.
+ */
+public class TestRocksDbDiskMap extends HoodieCommonTestHarness {
+
+  @BeforeEach
+  public void setUp() {
+    initPath();
+  }
+
+  @Test
+  public void testSimpleInsertSequential() throws IOException, URISyntaxException {
+    RocksDbDiskMap<String, HoodieRecord<? extends HoodieRecordPayload>> rocksDBBasedMap = new RocksDbDiskMap<>(basePath);
+    List<String> recordKeys = setupMapWithRecords(rocksDBBasedMap, 100);
+
+    Iterator<HoodieRecord<? extends HoodieRecordPayload>> itr = rocksDBBasedMap.iterator();
+    int cntSize = 0;
+    while (itr.hasNext()) {
+      HoodieRecord<? extends HoodieRecordPayload> rec = itr.next();
+      cntSize++;
+      assert recordKeys.contains(rec.getRecordKey());
+    }
+    assertEquals(recordKeys.size(), cntSize);
+
+    // Test value stream
+    long currentTimeMs = System.currentTimeMillis();
+    List<HoodieRecord<? extends HoodieRecordPayload>> values =
+        rocksDBBasedMap.valueStream().collect(Collectors.toList());
+    cntSize = 0;
+    for (HoodieRecord value : values) {
+      assert recordKeys.contains(value.getRecordKey());
+      cntSize++;
+    }
+    assertEquals(recordKeys.size(), cntSize);
+  }
+
+  @Test
+  public void testSimpleInsertRandomAccess() throws IOException, URISyntaxException {
+    RocksDbDiskMap rocksDBBasedMap = new RocksDbDiskMap<>(basePath);
+    List<String> recordKeys = setupMapWithRecords(rocksDBBasedMap, 100);
+
+    Random random = new Random();
+    for (int i = 0; i < recordKeys.size(); i++) {
+      String key = recordKeys.get(random.nextInt(recordKeys.size()));
+      assert rocksDBBasedMap.get(key) != null;
+    }
+  }
+
+  @Test
+  public void testSimpleInsertWithoutHoodieMetadata() throws IOException, URISyntaxException {
+    RocksDbDiskMap rocksDBBasedMap = new RocksDbDiskMap<>(basePath);
+    List<HoodieRecord> hoodieRecords = SchemaTestUtil.generateHoodieTestRecordsWithoutHoodieMetadata(0, 1000);
+    Set<String> recordKeys = new HashSet<>();
+    // insert generated records into the map
+    hoodieRecords.forEach(r -> {
+      rocksDBBasedMap.put(r.getRecordKey(), r);
+      recordKeys.add(r.getRecordKey());
+    });
+    // make sure records have spilled to disk
+    assertTrue(rocksDBBasedMap.sizeOfFileOnDiskInBytes() > 0);
+    Iterator<HoodieRecord<? extends HoodieRecordPayload>> itr = rocksDBBasedMap.iterator();
+    int cntSize = 0;
+    while (itr.hasNext()) {
+      HoodieRecord<? extends HoodieRecordPayload> rec = itr.next();
+      cntSize++;
+      assert recordKeys.contains(rec.getRecordKey());
+    }
+    assertEquals(recordKeys.size(), cntSize);
+  }
+
+  @Test
+  public void testSimpleUpsert() throws IOException, URISyntaxException {
+    Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
+
+    RocksDbDiskMap rocksDBBasedMap = new RocksDbDiskMap<>(basePath);
+    List<IndexedRecord> insertedRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+    List<String> recordKeys = SpillableMapTestUtils.upsertRecords(insertedRecords, rocksDBBasedMap);
+    String oldCommitTime =
+        ((GenericRecord) insertedRecords.get(0)).get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString();
+
+    // generate updates from inserts for first 50 keys / subset of keys
+    List<IndexedRecord> updatedRecords = SchemaTestUtil.updateHoodieTestRecords(recordKeys.subList(0, 50),
+        SchemaTestUtil.generateHoodieTestRecords(0, 50), HoodieActiveTimeline.createNewInstantTime());
+    String newCommitTime =
+        ((GenericRecord) updatedRecords.get(0)).get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString();
+
+    // perform upserts
+    List<String> updatedRecordKeys = SpillableMapTestUtils.upsertRecords(updatedRecords, rocksDBBasedMap);
+
+    // Upserted records (on disk) should have the latest commit time
+    Iterator<HoodieRecord<? extends HoodieRecordPayload>> itr = rocksDBBasedMap.iterator();
+    while (itr.hasNext()) {
+      HoodieRecord<? extends HoodieRecordPayload> rec = itr.next();
+      try {
+        IndexedRecord indexedRecord = (IndexedRecord) rec.getData().getInsertValue(schema).get();
+        String latestCommitTime =
+            ((GenericRecord) indexedRecord).get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString();
+        assert recordKeys.contains(rec.getRecordKey()) || updatedRecordKeys.contains(rec.getRecordKey());
+        assertEquals(latestCommitTime, updatedRecordKeys.contains(rec.getRecordKey()) ? newCommitTime : oldCommitTime);
+      } catch (IOException io) {
+        throw new UncheckedIOException(io);
+      }
+    }
+  }
+
+  @Test
+  public void testPutAll() throws IOException, URISyntaxException {
+    RocksDbDiskMap<String, HoodieRecord> rocksDBBasedMap = new RocksDbDiskMap<>(basePath);
+    List<IndexedRecord> iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+    Map<String, HoodieRecord> recordMap = new HashMap<>();
+    iRecords.forEach(r -> {
+      String key = ((GenericRecord) r).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
+      String partitionPath = ((GenericRecord) r).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString();
+      HoodieRecord value = new HoodieRecord<>(new HoodieKey(key, partitionPath), new HoodieAvroPayload(Option.of((GenericRecord) r)));
+      recordMap.put(key, value);
+    });
+
+    rocksDBBasedMap.putAll(recordMap);
+    // make sure records have spilled to disk
+    assertTrue(rocksDBBasedMap.sizeOfFileOnDiskInBytes() > 0);
+
+    // make sure all added records are present
+    for (Map.Entry<String, HoodieRecord> entry : rocksDBBasedMap.entrySet()) {
+      assertTrue(recordMap.containsKey(entry.getKey()));
+    }
+  }
+
+  @Test
+  public void testSimpleRemove() throws IOException, URISyntaxException {
+    RocksDbDiskMap rocksDBBasedMap = new RocksDbDiskMap<>(basePath);
+    List<String> recordKeys = setupMapWithRecords(rocksDBBasedMap, 100);
+
+    List<String> deleteKeys = recordKeys.subList(0, 10);
+    for (String deleteKey : deleteKeys) {
+      assert rocksDBBasedMap.remove(deleteKey) != null;
+      assert rocksDBBasedMap.get(deleteKey) == null;
+    }
+  }
+
+  private List<String> setupMapWithRecords(RocksDbDiskMap rocksDBBasedMap, int numRecords) throws IOException, URISyntaxException {
+    List<IndexedRecord> iRecords = SchemaTestUtil.generateHoodieTestRecords(0, numRecords);
+    List<String> recordKeys = SpillableMapTestUtils.upsertRecords(iRecords, rocksDBBasedMap);
+    // Ensure the number of records is correct
+    assertEquals(rocksDBBasedMap.size(), recordKeys.size());
+    // make sure records have spilled to disk
+    assertTrue(rocksDBBasedMap.sizeOfFileOnDiskInBytes() > 0);
+    return recordKeys;
+  }
+}
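Note: a hedged usage sketch, not part of this commit. As the class javadoc above says, RocksDbDiskMap is normally reached through ExternalSpillableMap rather than instantiated directly; passing DiskMapType.ROCKS_DB (one of the constants the parameterized tests iterate over) routes spilled entries into RocksDB. The basePath, schema, and record variables are assumed from a caller's context.

  ExternalSpillableMap<String, HoodieRecord<? extends HoodieRecordPayload>> spillable =
      new ExternalSpillableMap<>(1024L, basePath, new DefaultSizeEstimator(),
          new HoodieRecordSizeEstimator(schema), ExternalSpillableMap.DiskMapType.ROCKS_DB);
  spillable.put(record.getRecordKey(), record); // entries past the 1 KB budget land in RocksDbDiskMap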