1
0

Added support for Disk Spillable Compaction to prevent OOM issues

This commit is contained in:
Nishith Agarwal
2017-12-06 13:11:27 -08:00
committed by vinoth chandar
parent d495484399
commit 6fec9655a8
18 changed files with 1487 additions and 86 deletions

View File

@@ -27,7 +27,8 @@ import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock;
import com.uber.hoodie.common.table.log.block.HoodieCommandBlock;
import com.uber.hoodie.common.table.log.block.HoodieDeleteBlock;
import com.uber.hoodie.common.table.log.block.HoodieLogBlock;
import com.uber.hoodie.common.util.ReflectionUtils;
import com.uber.hoodie.common.util.SpillableMapUtils;
import com.uber.hoodie.common.util.collection.ExternalSpillableMap;
import com.uber.hoodie.exception.HoodieIOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
@@ -40,10 +41,7 @@ import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -64,7 +62,7 @@ public class HoodieCompactedLogRecordScanner implements
private final static Logger log = LogManager.getLogger(HoodieCompactedLogRecordScanner.class);
// Final map of compacted/merged records
private final Map<String, HoodieRecord<? extends HoodieRecordPayload>> records;
private final ExternalSpillableMap<String, HoodieRecord<? extends HoodieRecordPayload>> records;
// Reader schema for the records
private final Schema readerSchema;
// Total log files read - for metrics
@@ -82,22 +80,24 @@ public class HoodieCompactedLogRecordScanner implements
Deque<HoodieLogBlock> currentInstantLogBlocks = new ArrayDeque<>();
public HoodieCompactedLogRecordScanner(FileSystem fs, String basePath, List<String> logFilePaths,
Schema readerSchema, String latestInstantTime) {
Schema readerSchema, String latestInstantTime, Long maxMemorySizeInBytes) {
this.readerSchema = readerSchema;
this.latestInstantTime = latestInstantTime;
this.hoodieTableMetaClient = new HoodieTableMetaClient(fs.getConf(), basePath);
// load class from the payload fully qualified class name
this.payloadClassFQN = this.hoodieTableMetaClient.getTableConfig().getPayloadClass();
// Store merged records for all versions for this log file
this.records = Maps.newHashMap();
// iterate over the paths
Iterator<String> logFilePathsItr = logFilePaths.iterator();
while (logFilePathsItr.hasNext()) {
HoodieLogFile logFile = new HoodieLogFile(new Path(logFilePathsItr.next()));
log.info("Scanning log file " + logFile.getPath());
totalLogFiles.incrementAndGet();
try {
try {
// Store merged records for all versions for this log file, set the maxInMemoryMapSize to half,
// assign other half to the temporary map needed to read next block
records = new ExternalSpillableMap<>(maxMemorySizeInBytes, readerSchema,
payloadClassFQN, Optional.empty());
// iterate over the paths
Iterator<String> logFilePathsItr = logFilePaths.iterator();
while (logFilePathsItr.hasNext()) {
HoodieLogFile logFile = new HoodieLogFile(new Path(logFilePathsItr.next()));
log.info("Scanning log file " + logFile.getPath());
totalLogFiles.incrementAndGet();
// Use the HoodieLogFormatReader to iterate through the blocks in the log file
HoodieLogFormatReader reader = new HoodieLogFormatReader(fs, logFile, readerSchema, true);
while (reader.hasNext()) {
@@ -193,17 +193,21 @@ public class HoodieCompactedLogRecordScanner implements
break;
}
}
} catch (IOException e) {
throw new HoodieIOException("IOException when reading log file " + logFile);
}
// merge the last read block when all the blocks are done reading
if (!currentInstantLogBlocks.isEmpty()) {
log.info("Merging the final data blocks in " + logFile.getPath());
merge(records, currentInstantLogBlocks);
// merge the last read block when all the blocks are done reading
if (!currentInstantLogBlocks.isEmpty()) {
log.info("Merging the final blocks in " + logFile.getPath());
merge(records, currentInstantLogBlocks);
}
}
} catch (IOException e) {
throw new HoodieIOException("IOException when reading compacting log files");
}
this.totalRecordsToUpdate = records.size();
log.info("MaxMemoryInBytes allowed for compaction => " + maxMemorySizeInBytes);
log.info("Number of entries in MemoryBasedMap in ExternalSpillableMap => " + records.getInMemoryMapNumEntries());
log.info("Total size in bytes of MemoryBasedMap in ExternalSpillableMap => " + records.getCurrentInMemoryMapSize());
log.info("Number of entries in DiskBasedMap in ExternalSpillableMap => " + records.getDiskBasedMapNumEntries());
log.info("Size of file spilled to disk => " + records.getSizeOfFileOnDiskInBytes());
}
/**
@@ -223,21 +227,15 @@ public class HoodieCompactedLogRecordScanner implements
* the log records since the base data is merged on previous compaction
*/
private Map<String, HoodieRecord<? extends HoodieRecordPayload>> loadRecordsFromBlock(
HoodieAvroDataBlock dataBlock) {
Map<String, HoodieRecord<? extends HoodieRecordPayload>> recordsFromLastBlock = Maps
.newHashMap();
HoodieAvroDataBlock dataBlock) throws IOException {
Map<String, HoodieRecord<? extends HoodieRecordPayload>> recordsFromLastBlock = Maps.newHashMap();
List<IndexedRecord> recs = dataBlock.getRecords();
totalLogRecords.addAndGet(recs.size());
recs.forEach(rec -> {
String key = ((GenericRecord) rec).get(HoodieRecord.RECORD_KEY_METADATA_FIELD)
.toString();
String partitionPath =
((GenericRecord) rec).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD)
.toString();
HoodieRecord<? extends HoodieRecordPayload> hoodieRecord = new HoodieRecord<>(
new HoodieKey(key, partitionPath),
ReflectionUtils
.loadPayload(this.payloadClassFQN, new Object[]{Optional.of(rec)}, Optional.class));
HoodieRecord<? extends HoodieRecordPayload> hoodieRecord =
SpillableMapUtils.convertToHoodieRecordPayload((GenericRecord) rec, this.payloadClassFQN);
if (recordsFromLastBlock.containsKey(key)) {
// Merge and store the merged record
HoodieRecordPayload combinedValue = recordsFromLastBlock.get(key).getData()
@@ -257,7 +255,7 @@ public class HoodieCompactedLogRecordScanner implements
* Merge the last seen log blocks with the accumulated records
*/
private void merge(Map<String, HoodieRecord<? extends HoodieRecordPayload>> records,
Deque<HoodieLogBlock> lastBlocks) {
Deque<HoodieLogBlock> lastBlocks) throws IOException {
while (!lastBlocks.isEmpty()) {
// poll the element at the bottom of the stack since that's the order it was inserted
HoodieLogBlock lastBlock = lastBlocks.pollLast();
@@ -280,7 +278,7 @@ public class HoodieCompactedLogRecordScanner implements
* Merge the records read from a single data block with the accumulated records
*/
private void merge(Map<String, HoodieRecord<? extends HoodieRecordPayload>> records,
Map<String, HoodieRecord<? extends HoodieRecordPayload>> recordsFromLastBlock) {
Map<String, HoodieRecord<? extends HoodieRecordPayload>> recordsFromLastBlock) {
recordsFromLastBlock.forEach((key, hoodieRecord) -> {
if (records.containsKey(key)) {
// Merge and store the merged record
@@ -297,7 +295,7 @@ public class HoodieCompactedLogRecordScanner implements
@Override
public Iterator<HoodieRecord<? extends HoodieRecordPayload>> iterator() {
return records.values().iterator();
return records.iterator();
}
public long getTotalLogFiles() {

View File

@@ -0,0 +1,155 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.common.util;
import com.uber.hoodie.common.model.HoodieKey;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.common.util.collection.DiskBasedMap;
import com.uber.hoodie.common.util.collection.io.storage.SizeAwareDataOutputStream;
import com.uber.hoodie.exception.HoodieCorruptedDataException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.htrace.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.Optional;
import java.util.zip.CRC32;
public class SpillableMapUtils {

  // Shared mapper used to serialize map keys before they are spilled to disk.
  // ObjectMapper is thread-safe once configured; declared final so it cannot be replaced.
  public static final ObjectMapper objectMapper = new ObjectMapper();

  /**
   * Using the schema and payload class, read and convert the bytes on disk to a HoodieRecord.
   *
   * @param file open read handle to the spill file
   * @param schema Avro schema used to de-serialize the value bytes
   * @param payloadClazz fully-qualified payload class name used to realize the record
   * @param valuePosition byte offset in the file where the entry starts
   * @param valueLength expected length of the value portion, used as a corruption check
   * @param <R> record type returned to the caller
   * @return the deserialized record
   * @throws IOException on read failure
   */
  public static <R> R readFromDisk(RandomAccessFile file, Schema schema, String payloadClazz,
      long valuePosition, int valueLength) throws IOException {
    DiskBasedMap.FileEntry fileEntry = readInternal(file, valuePosition, valueLength);
    return (R) convertToHoodieRecordPayload(HoodieAvroUtils.bytesToAvro(fileEntry.getValue(), schema),
        payloadClazz);
  }

  /**
   * Reads one entry laid out as |crc|timestamp|sizeOfKey|sizeOfValue|key|value| starting
   * at the given position, validating both the declared value length and the CRC checksum.
   *
   * @throws HoodieCorruptedDataException if the declared size or checksum does not match
   */
  private static DiskBasedMap.FileEntry readInternal(RandomAccessFile file, long valuePosition,
      int valueLength) throws IOException {
    file.seek(valuePosition);
    long crc = file.readLong();
    long timestamp = file.readLong();
    int keySize = file.readInt();
    int valueSize = file.readInt();
    // Validate the declared size before allocating/reading the value bytes
    if (valueSize != valueLength) {
      throw new HoodieCorruptedDataException("unequal size of payload written to external file, data may be corrupted");
    }
    byte[] key = new byte[keySize];
    // Fix: use readFully() instead of read(); read() may return fewer bytes than
    // requested, which would silently corrupt the deserialized entry.
    file.readFully(key, 0, keySize);
    byte[] value = new byte[valueSize];
    file.readFully(value, 0, valueSize);
    long crcOfReadValue = generateChecksum(value);
    if (crc != crcOfReadValue) {
      throw new HoodieCorruptedDataException("checksum of payload written to external disk does not match, " +
          "data may be corrupted");
    }
    return new DiskBasedMap.FileEntry(crc, keySize, valueSize, key, value, timestamp);
  }

  /**
   * Write Value and other metadata necessary to disk. Each entry has the following sequence of data
   *
   * |crc|timestamp|sizeOfKey|SizeOfValue|key|value|
   *
   * @param outputStream append-only stream over the spill file
   * @param fileEntry entry to serialize
   * @return total number of bytes written to the stream so far (new end-of-file position)
   * @throws IOException on write failure
   */
  public static long spillToDisk(SizeAwareDataOutputStream outputStream, DiskBasedMap.FileEntry fileEntry)
      throws IOException {
    return spill(outputStream, fileEntry);
  }

  // Serializes the entry fields in the fixed on-disk order documented on spillToDisk().
  private static long spill(SizeAwareDataOutputStream outputStream, DiskBasedMap.FileEntry fileEntry)
      throws IOException {
    outputStream.writeLong(fileEntry.getCrc());
    outputStream.writeLong(fileEntry.getTimestamp());
    outputStream.writeInt(fileEntry.getSizeOfKey());
    outputStream.writeInt(fileEntry.getSizeOfValue());
    outputStream.write(fileEntry.getKey());
    outputStream.write(fileEntry.getValue());
    return outputStream.getSize();
  }

  /**
   * Generate a CRC32 checksum for a given set of bytes.
   *
   * @param data bytes to checksum
   * @return the CRC32 value
   */
  public static long generateChecksum(byte[] data) {
    CRC32 crc = new CRC32();
    crc.update(data);
    return crc.getValue();
  }

  /**
   * Compute a bytes representation of the payload by serializing the contents.
   * This is used to estimate the size of the payload (either in memory or when written to disk).
   *
   * @param value assumed to be a HoodieRecord — TODO confirm for non-record callers
   * @param schema Avro schema used to serialize the payload
   * @return serialized size of the payload in bytes
   * @throws IOException on serialization failure
   */
  public static <R> int computePayloadSize(R value, Schema schema) throws IOException {
    HoodieRecord payload = (HoodieRecord) value;
    byte[] val = HoodieAvroUtils.avroToBytes((GenericRecord) payload.getData().getInsertValue(schema).get());
    return val.length;
  }

  /**
   * Utility method to convert an Avro GenericRecord into a HoodieRecord using the record-key
   * and partition-path metadata fields plus the given payload class.
   *
   * @param rec Avro record carrying the Hoodie metadata fields
   * @param payloadClazz fully-qualified payload class name
   * @return the realized HoodieRecord
   */
  public static <R> R convertToHoodieRecordPayload(GenericRecord rec, String payloadClazz) {
    String recKey = rec.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
    String partitionPath = rec.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString();
    HoodieRecord<? extends HoodieRecordPayload> hoodieRecord = new HoodieRecord<>(
        new HoodieKey(recKey, partitionPath),
        ReflectionUtils.loadPayload(payloadClazz, new Object[]{Optional.of(rec)}, Optional.class));
    return (R) hoodieRecord;
  }
}

View File

@@ -0,0 +1,326 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.common.util.collection;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.util.HoodieAvroUtils;
import com.uber.hoodie.common.util.SpillableMapUtils;
import com.uber.hoodie.common.util.collection.io.storage.SizeAwareDataOutputStream;
import com.uber.hoodie.exception.HoodieException;
import com.uber.hoodie.exception.HoodieIOException;
import com.uber.hoodie.exception.HoodieNotSupportedException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.AbstractMap;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
/**
 * This class provides a disk spillable only map implementation. All of the data is
 * currently written to one file, without any rollover support. It uses the following:
 * 1) An in-memory map that tracks the key -> latest ValueMetadata.
 * 2) Current position in the file
 * NOTE : Only String.class type supported for Key
 *
 * NOTE(review): a single RandomAccessFile read handle is shared by get() and iterator(),
 * so this map looks single-thread-only — confirm callers never read concurrently.
 * @param <T> key type (only String keys are supported per the note above)
 * @param <R> value type (put() assumes every value is a HoodieRecord)
 */
final public class DiskBasedMap<T,R> implements Map<T,R> {
// Stores the key and corresponding value's latest metadata spilled to disk
final private Map<T, ValueMetadata> inMemoryMetadataOfSpilledData;
// Read only file access to be able to seek to random positions to readFromDisk values
private RandomAccessFile readOnlyFileHandle;
// Write only OutputStream to be able to ONLY append to the file
private SizeAwareDataOutputStream writeOnlyFileHandle;
// FileOutputStream for the file handle to be able to force fsync
// since FileOutputStream's flush() does not force flush to disk
private FileOutputStream fileOutputStream;
// Current position in the file
private AtomicLong filePosition;
// Schema used to de-serialize payload written to disk
private Schema schema;
// Class used to de-serialize/realize payload written to disk
private String payloadClazz;
// FilePath to store the spilled data
private String filePath;
// Default file path prefix to put the spillable file
private static String DEFAULT_BASE_FILE_PATH = "/tmp/";
/**
 * Marker for a single spilled value: which file it lives in, where it starts, how many
 * bytes the value spans, and when it was written. A newer put() for the same key simply
 * replaces the key's ValueMetadata; the old bytes stay orphaned in the file.
 * NOTE(review): non-static inner class — every instance carries an implicit reference to
 * the enclosing DiskBasedMap; could be declared static.
 */
public final class ValueMetadata {
// FilePath to store the spilled data
private String filePath;
// Size (numberOfBytes) of the value written to disk
private Integer sizeOfValue;
// FilePosition of the value written to disk
private Long offsetOfValue;
// Current timestamp when the value was written to disk
private Long timestamp;
protected ValueMetadata(String filePath, int sizeOfValue, long offsetOfValue, long timestamp) {
this.filePath = filePath;
this.sizeOfValue = sizeOfValue;
this.offsetOfValue = offsetOfValue;
this.timestamp = timestamp;
}
public String getFilePath() {
return filePath;
}
public int getSizeOfValue() {
return sizeOfValue;
}
public Long getOffsetOfValue() {
return offsetOfValue;
}
public long getTimestamp() {
return timestamp;
}
}
/**
 * One fully-materialized on-disk entry, matching the serialized layout
 * |crc|timestamp|sizeOfKey|sizeOfValue|key|value| used by SpillableMapUtils.
 */
public static final class FileEntry {
// Checksum of the value written to disk, compared during every readFromDisk to make sure no corruption
private Long crc;
// Size (numberOfBytes) of the key written to disk
private Integer sizeOfKey;
// Size (numberOfBytes) of the value written to disk
private Integer sizeOfValue;
// Actual key
private byte [] key;
// Actual value
private byte [] value;
// Current timestamp when the value was written to disk
private Long timestamp;
public FileEntry(long crc, int sizeOfKey, int sizeOfValue, byte [] key, byte [] value, long timestamp) {
this.crc = crc;
this.sizeOfKey = sizeOfKey;
this.sizeOfValue = sizeOfValue;
this.key = key;
this.value = value;
this.timestamp = timestamp;
}
public long getCrc() {
return crc;
}
public int getSizeOfKey() {
return sizeOfKey;
}
public int getSizeOfValue() {
return sizeOfValue;
}
public byte[] getKey() {
return key;
}
public byte[] getValue() {
return value;
}
public long getTimestamp() {
return timestamp;
}
}
/**
 * Creates the spill file (named by a random UUID under baseFilePath, defaulting to
 * "/tmp/"), opens an append-only write stream plus a read-only random-access handle,
 * and starts the file position at zero.
 *
 * @param schema Avro schema used to (de)serialize spilled payloads
 * @param payloadClazz fully-qualified payload class used to realize spilled records
 * @param baseFilePath optional directory for the spill file
 * @throws IOException if the file cannot be created or opened
 */
protected DiskBasedMap(Schema schema, String payloadClazz, Optional<String> baseFilePath) throws IOException {
this.inMemoryMetadataOfSpilledData = new HashMap<>();
if(!baseFilePath.isPresent()) {
baseFilePath = Optional.of(DEFAULT_BASE_FILE_PATH);
}
this.filePath = baseFilePath.get() + UUID.randomUUID().toString();
File writeOnlyFileHandle = new File(filePath);
initFile(writeOnlyFileHandle);
this.fileOutputStream = new FileOutputStream(writeOnlyFileHandle, true);
this.writeOnlyFileHandle = new SizeAwareDataOutputStream(fileOutputStream);
this.filePosition = new AtomicLong(0L);
this.schema = schema;
this.payloadClazz = payloadClazz;
}
/**
 * (Re)creates the spill file: any pre-existing file at the path is deleted, a fresh empty
 * file is created, and the read handle is opened at offset 0.
 * NOTE(review): the boolean results of delete()/createNewFile() are ignored — a failure
 * here only surfaces later as an IOException on open/write.
 */
private void initFile(File writeOnlyFileHandle) throws IOException {
// delete the file if it exists
if(writeOnlyFileHandle.exists()) {
writeOnlyFileHandle.delete();
}
writeOnlyFileHandle.createNewFile();
// Open file in readFromDisk-only mode
readOnlyFileHandle = new RandomAccessFile(filePath, "r");
readOnlyFileHandle.seek(0);
// Make sure file is deleted when JVM exits
writeOnlyFileHandle.deleteOnExit();
addShutDownHook();
}
/**
 * Register shutdown hook to force flush contents of the data written to FileOutputStream
 * from OS page cache (typically 4 KB) to disk
 */
private void addShutDownHook() {
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
try {
if(writeOnlyFileHandle != null) {
writeOnlyFileHandle.flush();
fileOutputStream.getChannel().force(false);
writeOnlyFileHandle.close();
}
} catch(Exception e) {
// fail silently for any sort of exception
}
}
});
}
/**
 * Custom iterator to iterate over values written to disk.
 * Delegates to LazyFileIterable, which sorts entries by file offset so the disk
 * seek only moves forward.
 * NOTE(review): raw LazyFileIterable construction — unchecked; also shares the single
 * read handle, so do not interleave with get() calls.
 * @return lazy iterator over all spilled values
 */
public Iterator<R> iterator() {
return new LazyFileIterable(readOnlyFileHandle,
inMemoryMetadataOfSpilledData, schema, payloadClazz).iterator();
}
/**
 * Number of bytes spilled to disk (total appended, including bytes of superseded values).
 * @return current end-of-file position
 */
public long sizeOfFileOnDiskInBytes() {
return filePosition.get();
}
// Size is the number of live keys, not the number of entries appended to the file.
@Override
public int size() {
return inMemoryMetadataOfSpilledData.size();
}
@Override
public boolean isEmpty() {
return inMemoryMetadataOfSpilledData.isEmpty();
}
@Override
public boolean containsKey(Object key) {
return inMemoryMetadataOfSpilledData.containsKey(key);
}
// Value comparison would require scanning the file; deliberately unsupported.
@Override
public boolean containsValue(Object value) {
throw new HoodieNotSupportedException("unable to compare values in map");
}
/**
 * Looks up the key's metadata and lazily reads the value bytes back from disk,
 * verifying length and checksum. Returns null when the key is absent.
 */
@Override
public R get(Object key) {
ValueMetadata entry = inMemoryMetadataOfSpilledData.get(key);
if(entry == null) {
return null;
}
try {
return SpillableMapUtils.readFromDisk(readOnlyFileHandle, schema,
payloadClazz, entry.getOffsetOfValue(), entry.getSizeOfValue());
} catch(IOException e) {
throw new HoodieIOException("Unable to readFromDisk Hoodie Record from disk", e);
}
}
/**
 * Appends the serialized value to the spill file and records its offset/size in the
 * in-memory metadata map. Values are append-only: an updated key gets a fresh entry
 * whose metadata supersedes the old one.
 * NOTE(review): metadata is inserted BEFORE the bytes are written — an IOException
 * mid-write leaves a dangling pointer to unwritten data.
 * NOTE(review): serializedKey.getBytes() uses the platform default charset — consider UTF-8.
 */
@Override
public R put(T key, R value) {
//TODO (na) : check value instanceof HoodieRecordPayload, now assume every payload is HoodieRecord
HoodieRecord payload = (HoodieRecord) value;
try {
byte [] val = HoodieAvroUtils.avroToBytes((GenericRecord) payload.getData().getInsertValue(this.schema).get());
Integer valueSize = val.length;
Long timestamp = new Date().getTime();
this.inMemoryMetadataOfSpilledData.put(key, new DiskBasedMap.ValueMetadata(this.filePath, valueSize,
filePosition.get(), timestamp));
// TODO(na) : Test serializer performance for generic types
String serializedKey = SpillableMapUtils.objectMapper.writeValueAsString(key);
filePosition.set(SpillableMapUtils.spillToDisk(writeOnlyFileHandle,
new FileEntry(SpillableMapUtils.generateChecksum(val),
serializedKey.getBytes().length, valueSize, serializedKey.getBytes(), val, timestamp)));
} catch(IOException io) {
throw new HoodieIOException("Unable to store data in Disk Based map", io);
}
return value;
}
/**
 * Removes the key's metadata only; the value bytes remain orphaned in the spill file
 * until the file itself is deleted (see class note). Returns the last value, read
 * from disk, or null if absent.
 */
@Override
public R remove(Object key) {
R value = get(key);
inMemoryMetadataOfSpilledData.remove(key);
return value;
}
@Override
public void putAll(Map<? extends T, ? extends R> m) {
for(Map.Entry<? extends T, ? extends R> entry: m.entrySet()) {
put(entry.getKey(), entry.getValue());
}
}
/**
 * Clears the metadata, closes the write stream and deletes the spill file.
 * NOTE(review): the write handle is closed here, so the map cannot accept further
 * put() calls after clear() — confirm callers treat clear() as terminal.
 */
@Override
public void clear() {
inMemoryMetadataOfSpilledData.clear();
// close input/output streams
try {
writeOnlyFileHandle.flush();
writeOnlyFileHandle.close();
new File(filePath).delete();
} catch(IOException e) {
throw new HoodieIOException("unable to clear map or delete file on disk", e);
}
}
@Override
public Set<T> keySet() {
return inMemoryMetadataOfSpilledData.keySet();
}
// Returning all values would require eagerly reading the whole file; unsupported.
@Override
public Collection<R> values() {
throw new HoodieException("Unsupported Operation Exception");
}
/**
 * Materializes every entry by reading each value back from disk — O(n) disk reads;
 * prefer iterator() for sequential (offset-ordered) access.
 */
@Override
public Set<Entry<T, R>> entrySet() {
Set<Entry<T, R>> entrySet = new HashSet<>();
for(T key: inMemoryMetadataOfSpilledData.keySet()) {
entrySet.add(new AbstractMap.SimpleEntry<>(key, get(key)));
}
return entrySet;
}
}

View File

@@ -0,0 +1,256 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.common.util.collection;
import com.uber.hoodie.common.util.SpillableMapUtils;
import com.uber.hoodie.exception.HoodieIOException;
import com.uber.hoodie.exception.HoodieNotSupportedException;
import org.apache.avro.Schema;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
/**
 * An external map that spills content to disk when there is insufficient space for it
 * to grow.
 *
 * This map holds 2 types of data structures :
 *
 * (1) Key-Value pairs in a in-memory map
 * (2) Key-ValueMetadata pairs in an in-memory map which keeps a marker to the values spilled to disk
 *
 * NOTE : Values are only appended to disk. If a remove() is called, the entry is marked removed from the in-memory
 * key-valueMetadata map but it's values will be lying around in the temp file on disk until the file is cleaned.
 *
 * The setting of the spill threshold faces the following trade-off: If the spill threshold is
 * too high, the in-memory map may occupy more memory than is available, resulting in OOM.
 * However, if the spill threshold is too low, we spill frequently and incur unnecessary disk
 * writes.
 *
 * @param <T> key type
 * @param <R> value type
 */
public class ExternalSpillableMap<T,R> implements Map<T,R> {

  // maximum space allowed in-memory for this map (after applying the sizing factor)
  private final long maxInMemorySizeInBytes;
  // current estimated space occupied by this map in-memory
  private long currentInMemoryMapSize;
  // Map to store key-values in memory until it hits maxInMemorySizeInBytes
  private final Map<T,R> inMemoryMap;
  // Map to store key-valuemetadata important to find the values spilled to disk
  private final DiskBasedMap<T,R> diskBasedMap;
  // Schema used to de-serialize and readFromDisk the records written to disk
  private final Schema schema;
  // An estimate of the size of each payload written to this map; computed lazily
  // from the first payload seen (naive, see put())
  private volatile long estimatedPayloadSize = 0;
  // TODO(na) : a dynamic sizing factor to ensure we have space for other objects in memory and incorrect payload estimation
  private final double sizingFactorForInMemoryMap = 0.8;

  private static final Logger log = LogManager.getLogger(ExternalSpillableMap.class);

  /**
   * @param maxInMemorySizeInBytes total memory budget; only 80% of it is used for the in-memory map
   * @param schema Avro schema used to (de)serialize spilled payloads
   * @param payloadClazz fully-qualified payload class name used to realize spilled records
   * @param baseFilePath optional directory for the spill file
   * @throws IOException if the spill file cannot be created
   */
  public ExternalSpillableMap(Long maxInMemorySizeInBytes, Schema schema,
      String payloadClazz, Optional<String> baseFilePath) throws IOException {
    this.inMemoryMap = new HashMap<>();
    this.diskBasedMap = new DiskBasedMap<>(schema, payloadClazz, baseFilePath);
    this.maxInMemorySizeInBytes = (long) Math.floor(maxInMemorySizeInBytes * sizingFactorForInMemoryMap);
    this.schema = schema;
    this.currentInMemoryMapSize = 0L;
  }

  /**
   * A custom iterator to wrap over iterating in-memory + disk spilled data.
   * @return iterator over all values, in-memory values first
   */
  public Iterator<R> iterator() {
    return new IteratorWrapper<>(inMemoryMap.values().iterator(), diskBasedMap.iterator());
  }

  /**
   * Number of entries in DiskBasedMap.
   */
  public int getDiskBasedMapNumEntries() {
    return diskBasedMap.size();
  }

  /**
   * Number of bytes spilled to disk.
   */
  public long getSizeOfFileOnDiskInBytes() {
    return diskBasedMap.sizeOfFileOnDiskInBytes();
  }

  /**
   * Number of entries in the in-memory map.
   */
  public int getInMemoryMapNumEntries() {
    return inMemoryMap.size();
  }

  /**
   * Approximate memory footprint of the in-memory map (entries * estimated payload size).
   */
  public long getCurrentInMemoryMapSize() {
    return currentInMemoryMapSize;
  }

  @Override
  public int size() {
    return inMemoryMap.size() + diskBasedMap.size();
  }

  @Override
  public boolean isEmpty() {
    return inMemoryMap.isEmpty() && diskBasedMap.isEmpty();
  }

  @Override
  public boolean containsKey(Object key) {
    return inMemoryMap.containsKey(key) || diskBasedMap.containsKey(key);
  }

  // NOTE: DiskBasedMap.containsValue throws HoodieNotSupportedException, so this
  // only succeeds when the value is found in memory.
  @Override
  public boolean containsValue(Object value) {
    return inMemoryMap.containsValue(value) || diskBasedMap.containsValue(value);
  }

  @Override
  public R get(Object key) {
    if (inMemoryMap.containsKey(key)) {
      return inMemoryMap.get(key);
    } else if (diskBasedMap.containsKey(key)) {
      return diskBasedMap.get(key);
    }
    return null;
  }

  /**
   * Inserts/updates a key. New keys stay in the in-memory map while the estimated
   * footprint is below budget and spill to disk afterwards.
   *
   * Fix: a key that has already been spilled is now updated on disk rather than
   * re-inserted in memory. Previously such an update left a stale copy in the disk
   * map, double-counting the key in size() and yielding duplicate (stale + fresh)
   * values from iterator().
   */
  @Override
  public R put(T key, R value) {
    try {
      if (diskBasedMap.containsKey(key)) {
        // Key already spilled; keep its single authoritative copy on disk
        diskBasedMap.put(key, value);
      } else if (this.currentInMemoryMapSize < maxInMemorySizeInBytes || inMemoryMap.containsKey(key)) {
        // Naive approach for now: estimate every payload with the size of the first one seen
        if (estimatedPayloadSize == 0) {
          this.estimatedPayloadSize = SpillableMapUtils.computePayloadSize(value, schema);
          log.info("Estimated Payload size => " + estimatedPayloadSize);
        }
        if (!inMemoryMap.containsKey(key)) {
          currentInMemoryMapSize += this.estimatedPayloadSize;
        }
        inMemoryMap.put(key, value);
      } else {
        diskBasedMap.put(key, value);
      }
      return value;
    } catch (IOException io) {
      throw new HoodieIOException("Unable to estimate size of payload", io);
    }
  }

  @Override
  public R remove(Object key) {
    // NOTE : diskBasedMap.remove does not delete the data from disk
    if (inMemoryMap.containsKey(key)) {
      currentInMemoryMapSize -= estimatedPayloadSize;
      return inMemoryMap.remove(key);
    } else if (diskBasedMap.containsKey(key)) {
      return diskBasedMap.remove(key);
    }
    return null;
  }

  @Override
  public void putAll(Map<? extends T, ? extends R> m) {
    for (Map.Entry<? extends T, ? extends R> entry : m.entrySet()) {
      put(entry.getKey(), entry.getValue());
    }
  }

  // NOTE: DiskBasedMap.clear() closes its write handle and deletes the spill file,
  // so the map should not be written to after clear().
  @Override
  public void clear() {
    inMemoryMap.clear();
    diskBasedMap.clear();
    currentInMemoryMapSize = 0L;
  }

  @Override
  public Set<T> keySet() {
    Set<T> keySet = new HashSet<T>();
    keySet.addAll(inMemoryMap.keySet());
    keySet.addAll(diskBasedMap.keySet());
    return keySet;
  }

  // Only supported while nothing has spilled; materializing disk values eagerly
  // would defeat the purpose of spilling.
  @Override
  public Collection<R> values() {
    if (diskBasedMap.isEmpty()) {
      return inMemoryMap.values();
    }
    throw new HoodieNotSupportedException("Cannot return all values in memory");
  }

  // NOTE: materializes spilled entries via DiskBasedMap.entrySet() — O(n) disk reads.
  @Override
  public Set<Entry<T, R>> entrySet() {
    Set<Entry<T, R>> entrySet = new HashSet<>();
    entrySet.addAll(inMemoryMap.entrySet());
    entrySet.addAll(diskBasedMap.entrySet());
    return entrySet;
  }

  /**
   * Iterator that wraps iterating over all the values for this map
   * 1) inMemoryIterator - Iterates over all the data in-memory map
   * 2) diskLazyFileIterator - Iterates over all the data spilled to disk
   * @param <R> value type
   */
  private class IteratorWrapper<R> implements Iterator<R> {

    private Iterator<R> inMemoryIterator;
    private Iterator<R> diskLazyFileIterator;

    public IteratorWrapper(Iterator<R> inMemoryIterator, Iterator<R> diskLazyFileIterator) {
      this.inMemoryIterator = inMemoryIterator;
      this.diskLazyFileIterator = diskLazyFileIterator;
    }

    @Override
    public boolean hasNext() {
      if (inMemoryIterator.hasNext()) {
        return true;
      }
      return diskLazyFileIterator.hasNext();
    }

    @Override
    public R next() {
      // Drain the in-memory values first, then fall through to the disk iterator
      if (inMemoryIterator.hasNext()) {
        return inMemoryIterator.next();
      }
      return diskLazyFileIterator.next();
    }
  }
}

View File

@@ -0,0 +1,116 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.common.util.collection;
import com.uber.hoodie.common.util.SpillableMapUtils;
import com.uber.hoodie.exception.HoodieException;
import com.uber.hoodie.exception.HoodieIOException;
import org.apache.avro.Schema;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;
import java.util.stream.Collectors;
/**
 * Iterable to lazily fetch values spilled to disk.
 * This class uses RandomAccessFile to randomly access the position of
 * the latest value for a key spilled to disk and returns the result.
 *
 * @param <T> type of the elements produced by iteration
 */
public class LazyFileIterable<T> implements Iterable<T> {

  // Used to access the value written at a specific position in the file
  private RandomAccessFile readOnlyFileHandle;
  // Stores the key and corresponding value's latest metadata spilled to disk
  private Map<T, DiskBasedMap.ValueMetadata> inMemoryMetadataOfSpilledData;
  // Schema used to de-serialize payload written to disk
  private Schema schema;
  // Class used to de-serialize/realize payload written to disk
  private String payloadClazz;

  public LazyFileIterable(RandomAccessFile file, Map<T, DiskBasedMap.ValueMetadata> map,
      Schema schema, String payloadClazz) {
    this.readOnlyFileHandle = file;
    this.inMemoryMetadataOfSpilledData = map;
    this.schema = schema;
    this.payloadClazz = payloadClazz;
  }

  @Override
  public Iterator<T> iterator() {
    try {
      return new LazyFileIterator<>(readOnlyFileHandle, inMemoryMetadataOfSpilledData, schema, payloadClazz);
    } catch (IOException io) {
      throw new HoodieException("Unable to initialize iterator for file on disk", io);
    }
  }

  /**
   * Iterator implementation for the iterable defined above.
   * Entries are visited in increasing file-offset order so the disk seek only
   * ever moves forward.
   * @param <T> element type
   */
  public class LazyFileIterator<T> implements Iterator<T> {

    private RandomAccessFile readOnlyFileHandle;
    private Schema schema;
    private String payloadClazz;
    // Iterates an offset-sorted snapshot of the metadata entries taken at construction time
    private Iterator<Map.Entry<T, DiskBasedMap.ValueMetadata>> metadataIterator;

    public LazyFileIterator(RandomAccessFile file, Map<T, DiskBasedMap.ValueMetadata> map,
        Schema schema, String payloadClazz) throws IOException {
      this.readOnlyFileHandle = file;
      this.schema = schema;
      this.payloadClazz = payloadClazz;
      // sort the map in increasing order of offset of value so disk seek is only in one(forward) direction
      this.metadataIterator = map
          .entrySet()
          .stream()
          .sorted((Map.Entry<T, DiskBasedMap.ValueMetadata> o1, Map.Entry<T, DiskBasedMap.ValueMetadata> o2) ->
              o1.getValue().getOffsetOfValue().compareTo(o2.getValue().getOffsetOfValue()))
          .collect(Collectors.toList()).iterator();
    }

    @Override
    public boolean hasNext() {
      return this.metadataIterator.hasNext();
    }

    /**
     * Reads the next value lazily from disk at its recorded offset/size.
     */
    @Override
    public T next() {
      Map.Entry<T, DiskBasedMap.ValueMetadata> entry = this.metadataIterator.next();
      try {
        return SpillableMapUtils.readFromDisk(readOnlyFileHandle, schema,
            payloadClazz, entry.getValue().getOffsetOfValue(), entry.getValue().getSizeOfValue());
      } catch (IOException e) {
        throw new HoodieIOException("Unable to read hoodie record from value spilled to disk", e);
      }
    }

    @Override
    public void remove() {
      // NOTE(review): removes the entry only from the sorted snapshot built in the
      // constructor, NOT from the backing DiskBasedMap metadata — confirm callers expect this.
      this.metadataIterator.remove();
    }

    /**
     * Fix: the previous implementation invoked the action on a single element only;
     * the Iterator contract requires draining every remaining element.
     */
    @Override
    public void forEachRemaining(Consumer<? super T> action) {
      while (hasNext()) {
        action.accept(next());
      }
    }
  }
}

View File

@@ -0,0 +1,69 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.common.util.collection.io.storage;
import java.io.DataOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
/**
* Wrapper for DataOutpuStream to keep track of number of bytes written
*/
public class SizeAwareDataOutputStream {
// Actual outpuStream
private DataOutputStream outputStream;
// Counter to keep track of number of bytes written
private AtomicLong size;
public SizeAwareDataOutputStream(FileOutputStream fileOutputStream) {
this.outputStream = new DataOutputStream(fileOutputStream);
this.size = new AtomicLong(0L);
}
public void writeLong(long v) throws IOException {
size.addAndGet(Long.BYTES);
outputStream.writeLong(v);
}
public void writeInt(int v) throws IOException {
size.addAndGet(Integer.BYTES);
outputStream.writeInt(v);
}
public void write(byte [] v) throws IOException {
size.addAndGet(v.length);
outputStream.write(v);
}
public void write(byte [] v, int offset, int len) throws IOException {
size.addAndGet(len + offset);
outputStream.write(v, offset, len);
}
public void flush() throws IOException {
outputStream.flush();
}
public void close() throws IOException {
outputStream.close();
}
public long getSize() {
return size.get();
}
}

View File

@@ -0,0 +1,32 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.exception;
/**
 * <p> Exception thrown when any data corruption happens when reading/writing from temporary disk </p>
 */
public class HoodieCorruptedDataException extends HoodieException {

  /**
   * @param message description of the corruption detected
   */
  public HoodieCorruptedDataException(String message) {
    super(message);
  }

  /**
   * @param message description of the corruption detected
   * @param cause   underlying exception that signalled the corruption
   */
  public HoodieCorruptedDataException(String message, Throwable cause) {
    super(message, cause);
  }
}