R readFromDisk(RandomAccessFile file, Schema schema, String payloadClazz,
- long valuePosition, int valueLength) throws IOException {
-
+ public static byte[] readBytesFromDisk(RandomAccessFile file, long valuePosition, int valueLength) throws IOException {
DiskBasedMap.FileEntry fileEntry = readInternal(file, valuePosition, valueLength);
- return (R) convertToHoodieRecordPayload(HoodieAvroUtils.bytesToAvro(fileEntry.getValue(), schema),
- payloadClazz);
+ return fileEntry.getValue();
}
/**
* |crc|timestamp|sizeOfKey|SizeOfValue|key|value|
+ *
* @param file
* @param valuePosition
* @param valueLength
@@ -66,15 +60,15 @@ public class SpillableMapUtils {
    long timestamp = file.readLong();
    int keySize = file.readInt();
    int valueSize = file.readInt();
-    byte [] key = new byte[keySize];
+    byte[] key = new byte[keySize];
-    file.read(key, 0, keySize);
+    // read() may return fewer bytes than requested; readFully guarantees the whole key is read
+    file.readFully(key, 0, keySize);
-    byte [] value = new byte[valueSize];
-    if(!(valueSize == valueLength)) {
+    byte[] value = new byte[valueSize];
+    if (valueSize != valueLength) {
      throw new HoodieCorruptedDataException("unequal size of payload written to external file, data may be corrupted");
    }
-    file.read(value, 0, valueSize);
+    // readFully avoids a short read silently producing a zero-padded (and checksum-failing) value
+    file.readFully(value, 0, valueSize);
    long crcOfReadValue = generateChecksum(value);
-    if(!(crc == crcOfReadValue)) {
+    if (!(crc == crcOfReadValue)) {
      throw new HoodieCorruptedDataException("checksum of payload written to external disk does not match, " +
          "data may be corrupted");
    }
@@ -83,7 +77,7 @@ public class SpillableMapUtils {
/**
* Write Value and other metadata necessary to disk. Each entry has the following sequence of data
- *
+ *
* |crc|timestamp|sizeOfKey|SizeOfValue|key|value|
*
* @param outputStream
@@ -108,10 +102,11 @@ public class SpillableMapUtils {
/**
* Generate a checksum for a given set of bytes
+ *
* @param data
* @return
*/
- public static long generateChecksum(byte [] data) {
+ public static long generateChecksum(byte[] data) {
CRC32 crc = new CRC32();
crc.update(data);
return crc.getValue();
@@ -120,20 +115,19 @@ public class SpillableMapUtils {
/**
* Compute a bytes representation of the payload by serializing the contents
* This is used to estimate the size of the payload (either in memory or when written to disk)
+ *
* @param
* @param value
- * @param schema
* @return
* @throws IOException
*/
- public static int computePayloadSize(R value, Schema schema) throws IOException {
- HoodieRecord payload = (HoodieRecord) value;
- byte [] val = HoodieAvroUtils.avroToBytes((GenericRecord) payload.getData().getInsertValue(schema).get());
- return val.length;
+ public static long computePayloadSize(R value, Converter valueConverter) throws IOException {
+ return valueConverter.sizeEstimate(value);
}
/**
* Utility method to convert bytes to HoodieRecord using schema and payload class
+ *
* @param rec
* @param payloadClazz
* @param
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/DiskBasedMap.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/DiskBasedMap.java
index 2503c409f..27ee0fba2 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/DiskBasedMap.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/DiskBasedMap.java
@@ -16,15 +16,14 @@
package com.uber.hoodie.common.util.collection;
-import com.uber.hoodie.common.model.HoodieRecord;
-import com.uber.hoodie.common.util.HoodieAvroUtils;
import com.uber.hoodie.common.util.SpillableMapUtils;
+import com.uber.hoodie.common.util.collection.converter.Converter;
import com.uber.hoodie.common.util.collection.io.storage.SizeAwareDataOutputStream;
import com.uber.hoodie.exception.HoodieException;
import com.uber.hoodie.exception.HoodieIOException;
import com.uber.hoodie.exception.HoodieNotSupportedException;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
import java.io.File;
import java.io.FileOutputStream;
@@ -48,13 +47,11 @@ import java.util.concurrent.atomic.AtomicLong;
* 1) An in-memory map that tracks the key-> latest ValueMetadata.
* 2) Current position in the file
* NOTE : Only String.class type supported for Key
- * @param
- * @param
*/
-final public class DiskBasedMap implements Map {
+final public class DiskBasedMap implements Map {
// Stores the key and corresponding value's latest metadata spilled to disk
- final private Map inMemoryMetadataOfSpilledData;
+ final private Map valueMetadataMap;
// Read only file access to be able to seek to random positions to readFromDisk values
private RandomAccessFile readOnlyFileHandle;
// Write only OutputStream to be able to ONLY append to the file
@@ -64,16 +61,20 @@ final public class DiskBasedMap implements Map {
private FileOutputStream fileOutputStream;
// Current position in the file
private AtomicLong filePosition;
- // Schema used to de-serialize payload written to disk
- private Schema schema;
- // Class used to de-serialize/realize payload written to disk
- private String payloadClazz;
// FilePath to store the spilled data
private String filePath;
// Default file path prefix to put the spillable file
private static String DEFAULT_BASE_FILE_PATH = "/tmp/";
+ // Key converter to convert key type to bytes
+ final private Converter keyConverter;
+ // Value converter to convert value type to bytes
+ final private Converter valueConverter;
+
+ private static Logger log = LogManager.getLogger(DiskBasedMap.class);
+
public final class ValueMetadata {
+
// FilePath to store the spilled data
private String filePath;
// Size (numberOfBytes) of the value written to disk
@@ -108,6 +109,7 @@ final public class DiskBasedMap implements Map {
}
public static final class FileEntry {
+
// Checksum of the value written to disk, compared during every readFromDisk to make sure no corruption
private Long crc;
// Size (numberOfBytes) of the key written to disk
@@ -115,13 +117,14 @@ final public class DiskBasedMap implements Map {
// Size (numberOfBytes) of the value written to disk
private Integer sizeOfValue;
// Actual key
- private byte [] key;
+ private byte[] key;
// Actual value
- private byte [] value;
+ private byte[] value;
// Current timestamp when the value was written to disk
private Long timestamp;
- public FileEntry(long crc, int sizeOfKey, int sizeOfValue, byte [] key, byte [] value, long timestamp) {
+ public FileEntry(long crc, int sizeOfKey, int sizeOfValue, byte[] key, byte[] value,
+ long timestamp) {
this.crc = crc;
this.sizeOfKey = sizeOfKey;
this.sizeOfValue = sizeOfValue;
@@ -155,10 +158,11 @@ final public class DiskBasedMap implements Map {
}
}
- protected DiskBasedMap(Schema schema, String payloadClazz, Optional baseFilePath) throws IOException {
- this.inMemoryMetadataOfSpilledData = new HashMap<>();
+ protected DiskBasedMap(Optional baseFilePath,
+ Converter keyConverter, Converter valueConverter) throws IOException {
+ this.valueMetadataMap = new HashMap<>();
- if(!baseFilePath.isPresent()) {
+ if (!baseFilePath.isPresent()) {
baseFilePath = Optional.of(DEFAULT_BASE_FILE_PATH);
}
this.filePath = baseFilePath.get() + UUID.randomUUID().toString();
@@ -168,16 +172,18 @@ final public class DiskBasedMap implements Map {
this.fileOutputStream = new FileOutputStream(writeOnlyFileHandle, true);
this.writeOnlyFileHandle = new SizeAwareDataOutputStream(fileOutputStream);
this.filePosition = new AtomicLong(0L);
- this.schema = schema;
- this.payloadClazz = payloadClazz;
+ this.keyConverter = keyConverter;
+ this.valueConverter = valueConverter;
}
private void initFile(File writeOnlyFileHandle) throws IOException {
// delete the file if it exists
- if(writeOnlyFileHandle.exists()) {
+ if (writeOnlyFileHandle.exists()) {
writeOnlyFileHandle.delete();
}
writeOnlyFileHandle.createNewFile();
+
+ log.info("Spilling to file location " + writeOnlyFileHandle.getAbsolutePath());
// Open file in readFromDisk-only mode
readOnlyFileHandle = new RandomAccessFile(filePath, "r");
readOnlyFileHandle.seek(0);
@@ -194,12 +200,12 @@ final public class DiskBasedMap implements Map {
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
try {
- if(writeOnlyFileHandle != null) {
+ if (writeOnlyFileHandle != null) {
writeOnlyFileHandle.flush();
fileOutputStream.getChannel().force(false);
writeOnlyFileHandle.close();
}
- } catch(Exception e) {
+ } catch (Exception e) {
// fail silently for any sort of exception
}
}
@@ -208,16 +214,14 @@ final public class DiskBasedMap implements Map {
/**
* Custom iterator to iterate over values written to disk
- * @return
*/
public Iterator iterator() {
return new LazyFileIterable(readOnlyFileHandle,
- inMemoryMetadataOfSpilledData, schema, payloadClazz).iterator();
+ valueMetadataMap, valueConverter).iterator();
}
/**
* Number of bytes spilled to disk
- * @return
*/
public long sizeOfFileOnDiskInBytes() {
return filePosition.get();
@@ -225,17 +229,17 @@ final public class DiskBasedMap implements Map {
@Override
public int size() {
- return inMemoryMetadataOfSpilledData.size();
+ return valueMetadataMap.size();
}
@Override
public boolean isEmpty() {
- return inMemoryMetadataOfSpilledData.isEmpty();
+ return valueMetadataMap.isEmpty();
}
@Override
public boolean containsKey(Object key) {
- return inMemoryMetadataOfSpilledData.containsKey(key);
+ return valueMetadataMap.containsKey(key);
}
@Override
@@ -245,34 +249,31 @@ final public class DiskBasedMap implements Map {
@Override
public R get(Object key) {
- ValueMetadata entry = inMemoryMetadataOfSpilledData.get(key);
- if(entry == null) {
+ ValueMetadata entry = valueMetadataMap.get(key);
+ if (entry == null) {
return null;
}
try {
- return SpillableMapUtils.readFromDisk(readOnlyFileHandle, schema,
- payloadClazz, entry.getOffsetOfValue(), entry.getSizeOfValue());
- } catch(IOException e) {
+ return this.valueConverter.getData(SpillableMapUtils.readBytesFromDisk(readOnlyFileHandle,
+ entry.getOffsetOfValue(), entry.getSizeOfValue()));
+ } catch (IOException e) {
throw new HoodieIOException("Unable to readFromDisk Hoodie Record from disk", e);
}
}
@Override
public R put(T key, R value) {
- //TODO (na) : check value instanceof HoodieRecordPayload, now assume every payload is HoodieRecord
- HoodieRecord payload = (HoodieRecord) value;
try {
- byte [] val = HoodieAvroUtils.avroToBytes((GenericRecord) payload.getData().getInsertValue(this.schema).get());
+ byte[] val = this.valueConverter.getBytes(value);
Integer valueSize = val.length;
Long timestamp = new Date().getTime();
- this.inMemoryMetadataOfSpilledData.put(key, new DiskBasedMap.ValueMetadata(this.filePath, valueSize,
- filePosition.get(), timestamp));
- // TODO(na) : Test serializer performance for generic types
- String serializedKey = SpillableMapUtils.objectMapper.writeValueAsString(key);
+ this.valueMetadataMap.put(key,
+ new DiskBasedMap.ValueMetadata(this.filePath, valueSize, filePosition.get(), timestamp));
+ byte[] serializedKey = keyConverter.getBytes(key);
filePosition.set(SpillableMapUtils.spillToDisk(writeOnlyFileHandle,
new FileEntry(SpillableMapUtils.generateChecksum(val),
- serializedKey.getBytes().length, valueSize, serializedKey.getBytes(), val, timestamp)));
- } catch(IOException io) {
+ serializedKey.length, valueSize, serializedKey, val, timestamp)));
+ } catch (IOException io) {
throw new HoodieIOException("Unable to store data in Disk Based map", io);
}
return value;
@@ -281,33 +282,33 @@ final public class DiskBasedMap implements Map {
@Override
public R remove(Object key) {
R value = get(key);
- inMemoryMetadataOfSpilledData.remove(key);
+ valueMetadataMap.remove(key);
return value;
}
@Override
public void putAll(Map extends T, ? extends R> m) {
- for(Map.Entry extends T, ? extends R> entry: m.entrySet()) {
+ for (Map.Entry extends T, ? extends R> entry : m.entrySet()) {
put(entry.getKey(), entry.getValue());
}
}
@Override
public void clear() {
- inMemoryMetadataOfSpilledData.clear();
+ valueMetadataMap.clear();
// close input/output streams
try {
writeOnlyFileHandle.flush();
writeOnlyFileHandle.close();
new File(filePath).delete();
- } catch(IOException e) {
+ } catch (IOException e) {
throw new HoodieIOException("unable to clear map or delete file on disk", e);
}
}
@Override
public Set keySet() {
- return inMemoryMetadataOfSpilledData.keySet();
+ return valueMetadataMap.keySet();
}
@Override
@@ -318,7 +319,7 @@ final public class DiskBasedMap implements Map {
@Override
public Set> entrySet() {
Set> entrySet = new HashSet<>();
- for(T key: inMemoryMetadataOfSpilledData.keySet()) {
+ for (T key : valueMetadataMap.keySet()) {
entrySet.add(new AbstractMap.SimpleEntry<>(key, get(key)));
}
return entrySet;
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/ExternalSpillableMap.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/ExternalSpillableMap.java
index 261ca2c31..3d390690e 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/ExternalSpillableMap.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/ExternalSpillableMap.java
@@ -16,10 +16,10 @@
package com.uber.hoodie.common.util.collection;
-import com.uber.hoodie.common.util.SpillableMapUtils;
-import com.uber.hoodie.exception.HoodieIOException;
+import com.twitter.common.objectsize.ObjectSizeCalculator;
+import com.uber.hoodie.common.util.collection.converter.Converter;
+import com.uber.hoodie.exception.HoodieException;
import com.uber.hoodie.exception.HoodieNotSupportedException;
-import org.apache.avro.Schema;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -33,56 +33,54 @@ import java.util.Optional;
import java.util.Set;
/**
- * An external map that spills content to disk when there is insufficient space for it
- * to grow.
- *
- * This map holds 2 types of data structures :
- *
- * (1) Key-Value pairs in a in-memory map
- * (2) Key-ValueMetadata pairs in an in-memory map which keeps a marker to the values spilled to disk
- *
- * NOTE : Values are only appended to disk. If a remove() is called, the entry is marked removed from the in-memory
- * key-valueMetadata map but it's values will be lying around in the temp file on disk until the file is cleaned.
- *
- * The setting of the spill threshold faces the following trade-off: If the spill threshold is
- * too high, the in-memory map may occupy more memory than is available, resulting in OOM.
- * However, if the spill threshold is too low, we spill frequently and incur unnecessary disk
- * writes.
- * @param
- * @param
+ * An external map that spills content to disk when there is insufficient space for it to grow.
+ * This map holds 2 types of data structures :
+ * (1) Key-Value pairs in a in-memory map
+ * (2) Key-ValueMetadata pairs in an in-memory map which keeps a marker to the values spilled to disk
+ *
+ * NOTE : Values are only appended to disk. If a remove() is called, the entry is marked removed
+ * from the in-memory key-valueMetadata map but its values will be lying around in the temp file on
+ * disk until the file is cleaned.
+ *
+ * The setting of the spill threshold faces the following
+ * trade-off: If the spill threshold is too high, the in-memory map may occupy more memory than is
+ * available, resulting in OOM. However, if the spill threshold is too low, we spill frequently and
+ * incur unnecessary disk writes.
*/
-public class ExternalSpillableMap implements Map {
+public class ExternalSpillableMap implements Map {
+ // Find the actual estimated payload size after inserting N records
+ final private static int NUMBER_OF_RECORDS_TO_ESTIMATE_PAYLOAD_SIZE = 100;
// maximum space allowed in-memory for this map
final private long maxInMemorySizeInBytes;
// current space occupied by this map in-memory
private Long currentInMemoryMapSize;
// Map to store key-values in memory until it hits maxInMemorySizeInBytes
- final private Map inMemoryMap;
+ final private Map inMemoryMap;
// Map to store key-valuemetadata important to find the values spilled to disk
- final private DiskBasedMap diskBasedMap;
- // Schema used to de-serialize and readFromDisk the records written to disk
- final private Schema schema;
+ final private DiskBasedMap diskBasedMap;
// An estimate of the size of each payload written to this map
private volatile long estimatedPayloadSize = 0;
// TODO(na) : a dynamic sizing factor to ensure we have space for other objects in memory and incorrect payload estimation
final private Double sizingFactorForInMemoryMap = 0.8;
+ // Key converter to convert key type to bytes
+ final private Converter keyConverter;
+ // Value converter to convert value type to bytes
+ final private Converter valueConverter;
+ // Flag to determine whether to stop re-estimating payload size
+ private boolean shouldEstimatePayloadSize = true;
private static Logger log = LogManager.getLogger(ExternalSpillableMap.class);
-
- public ExternalSpillableMap(Long maxInMemorySizeInBytes, Schema schema,
- String payloadClazz, Optional baseFilePath) throws IOException {
+ public ExternalSpillableMap(Long maxInMemorySizeInBytes, Optional baseFilePath,
+ Converter keyConverter, Converter valueConverter) throws IOException {
this.inMemoryMap = new HashMap<>();
- this.diskBasedMap = new DiskBasedMap<>(schema, payloadClazz, baseFilePath);
- this.maxInMemorySizeInBytes = (long) Math.floor(maxInMemorySizeInBytes*sizingFactorForInMemoryMap);
- this.schema = schema;
+ this.diskBasedMap = new DiskBasedMap<>(baseFilePath, keyConverter, valueConverter);
+ this.maxInMemorySizeInBytes = (long) Math
+ .floor(maxInMemorySizeInBytes * sizingFactorForInMemoryMap);
this.currentInMemoryMapSize = 0L;
+ this.keyConverter = keyConverter;
+ this.valueConverter = valueConverter;
}
/**
* A custom iterator to wrap over iterating in-memory + disk spilled data
- * @return
*/
public Iterator iterator() {
return new IteratorWrapper<>(inMemoryMap.values().iterator(), diskBasedMap.iterator());
@@ -90,7 +88,6 @@ public class ExternalSpillableMap implements Map {
/**
* Number of entries in DiskBasedMap
- * @return
*/
public int getDiskBasedMapNumEntries() {
return diskBasedMap.size();
@@ -98,7 +95,6 @@ public class ExternalSpillableMap implements Map {
/**
* Number of bytes spilled to disk
- * @return
*/
public long getSizeOfFileOnDiskInBytes() {
return diskBasedMap.sizeOfFileOnDiskInBytes();
@@ -106,7 +102,6 @@ public class ExternalSpillableMap implements Map {
/**
* Number of entries in InMemoryMap
- * @return
*/
public int getInMemoryMapNumEntries() {
return inMemoryMap.size();
@@ -114,7 +109,6 @@ public class ExternalSpillableMap implements Map {
/**
* Approximate memory footprint of the in-memory map
- * @return
*/
public long getCurrentInMemoryMapSize() {
return currentInMemoryMapSize;
@@ -142,9 +136,9 @@ public class ExternalSpillableMap implements Map {
@Override
public R get(Object key) {
- if(inMemoryMap.containsKey(key)) {
+ if (inMemoryMap.containsKey(key)) {
return inMemoryMap.get(key);
- } else if(diskBasedMap.containsKey(key)) {
+ } else if (diskBasedMap.containsKey(key)) {
return diskBasedMap.get(key);
}
return null;
@@ -152,33 +146,43 @@ public class ExternalSpillableMap implements Map {
@Override
public R put(T key, R value) {
- try {
- if (this.currentInMemoryMapSize < maxInMemorySizeInBytes || inMemoryMap.containsKey(key)) {
- // Naive approach for now
- if (estimatedPayloadSize == 0) {
- this.estimatedPayloadSize = SpillableMapUtils.computePayloadSize(value, schema);
- log.info("Estimated Payload size => " + estimatedPayloadSize);
- }
- if(!inMemoryMap.containsKey(key)) {
- currentInMemoryMapSize += this.estimatedPayloadSize;
- }
- inMemoryMap.put(key, value);
- } else {
- diskBasedMap.put(key, value);
+ if (this.currentInMemoryMapSize < maxInMemorySizeInBytes || inMemoryMap.containsKey(key)) {
+ if (shouldEstimatePayloadSize && estimatedPayloadSize == 0) {
+ // At first, use the sizeEstimate of a record being inserted into the spillable map.
+ // Note, the converter may over estimate the size of a record in the JVM
+ this.estimatedPayloadSize =
+ keyConverter.sizeEstimate(key) + valueConverter.sizeEstimate(value);
+ log.info("Estimated Payload size => " + estimatedPayloadSize);
}
- return value;
- } catch(IOException io) {
- throw new HoodieIOException("Unable to estimate size of payload", io);
+ else if(shouldEstimatePayloadSize &&
+ inMemoryMap.size() % NUMBER_OF_RECORDS_TO_ESTIMATE_PAYLOAD_SIZE == 0) {
+ // Re-estimate the size of a record by calculating the size of the entire map containing
+ // N entries and then dividing by the number of entries present (N). This helps to get a
+ // correct estimation of the size of each record in the JVM.
+ long totalMapSize = ObjectSizeCalculator.getObjectSize(inMemoryMap);
+ this.currentInMemoryMapSize = totalMapSize;
+ this.estimatedPayloadSize = totalMapSize/inMemoryMap.size();
+ shouldEstimatePayloadSize = false;
+ log.info("New Estimated Payload size => " + this.estimatedPayloadSize);
+ }
+ if (!inMemoryMap.containsKey(key)) {
+ // TODO : Add support for adjusting payloadSize for updates to the same key
+ currentInMemoryMapSize += this.estimatedPayloadSize;
+ }
+ inMemoryMap.put(key, value);
+ } else {
+ diskBasedMap.put(key, value);
}
+ return value;
}
@Override
public R remove(Object key) {
// NOTE : diskBasedMap.remove does not delete the data from disk
- if(inMemoryMap.containsKey(key)) {
+ if (inMemoryMap.containsKey(key)) {
currentInMemoryMapSize -= estimatedPayloadSize;
return inMemoryMap.remove(key);
- } else if(diskBasedMap.containsKey(key)) {
+ } else if (diskBasedMap.containsKey(key)) {
return diskBasedMap.remove(key);
}
return null;
@@ -186,7 +190,7 @@ public class ExternalSpillableMap implements Map {
@Override
public void putAll(Map extends T, ? extends R> m) {
- for(Map.Entry extends T, ? extends R> entry: m.entrySet()) {
+ for (Map.Entry extends T, ? extends R> entry : m.entrySet()) {
put(entry.getKey(), entry.getValue());
}
}
@@ -208,7 +212,7 @@ public class ExternalSpillableMap implements Map {
@Override
public Collection values() {
- if(diskBasedMap.isEmpty()) {
+ if (diskBasedMap.isEmpty()) {
return inMemoryMap.values();
}
throw new HoodieNotSupportedException("Cannot return all values in memory");
@@ -226,7 +230,6 @@ public class ExternalSpillableMap implements Map {
* Iterator that wraps iterating over all the values for this map
* 1) inMemoryIterator - Iterates over all the data in-memory map
* 2) diskLazyFileIterator - Iterates over all the data spilled to disk
- * @param
*/
private class IteratorWrapper implements Iterator {
@@ -237,9 +240,10 @@ public class ExternalSpillableMap implements Map {
this.inMemoryIterator = inMemoryIterator;
this.diskLazyFileIterator = diskLazyFileIterator;
}
+
@Override
public boolean hasNext() {
- if(inMemoryIterator.hasNext()) {
+ if (inMemoryIterator.hasNext()) {
return true;
}
return diskLazyFileIterator.hasNext();
@@ -247,7 +251,7 @@ public class ExternalSpillableMap implements Map {
@Override
public R next() {
- if(inMemoryIterator.hasNext()) {
+ if (inMemoryIterator.hasNext()) {
return inMemoryIterator.next();
}
return diskLazyFileIterator.next();
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/LazyFileIterable.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/LazyFileIterable.java
index f91894ecd..78eaa3f34 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/LazyFileIterable.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/LazyFileIterable.java
@@ -17,14 +17,13 @@
package com.uber.hoodie.common.util.collection;
import com.uber.hoodie.common.util.SpillableMapUtils;
+import com.uber.hoodie.common.util.collection.converter.Converter;
import com.uber.hoodie.exception.HoodieException;
import com.uber.hoodie.exception.HoodieIOException;
-import org.apache.avro.Schema;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.Iterator;
-import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;
import java.util.stream.Collectors;
@@ -33,57 +32,52 @@ import java.util.stream.Collectors;
* Iterable to lazily fetch values spilled to disk.
* This class uses RandomAccessFile to randomly access the position of
* the latest value for a key spilled to disk and returns the result.
- * @param
*/
-public class LazyFileIterable implements Iterable {
+public class LazyFileIterable implements Iterable {
// Used to access the value written at a specific position in the file
- private RandomAccessFile readOnlyFileHandle;
+ private final RandomAccessFile readOnlyFileHandle;
// Stores the key and corresponding value's latest metadata spilled to disk
- private Map inMemoryMetadataOfSpilledData;
- // Schema used to de-serialize payload written to disk
- private Schema schema;
- // Class used to de-serialize/realize payload written to disk
- private String payloadClazz;
+ private final Map inMemoryMetadataOfSpilledData;
+ private final Converter valueConverter;
public LazyFileIterable(RandomAccessFile file, Map map,
- Schema schema, String payloadClazz) {
+ Converter valueConverter) {
this.readOnlyFileHandle = file;
this.inMemoryMetadataOfSpilledData = map;
- this.schema = schema;
- this.payloadClazz = payloadClazz;
+ this.valueConverter = valueConverter;
}
+
@Override
- public Iterator iterator() {
+ public Iterator iterator() {
try {
- return new LazyFileIterator<>(readOnlyFileHandle, inMemoryMetadataOfSpilledData, schema, payloadClazz);
- } catch(IOException io) {
+ return new LazyFileIterator<>(readOnlyFileHandle, inMemoryMetadataOfSpilledData,
+ valueConverter);
+ } catch (IOException io) {
throw new HoodieException("Unable to initialize iterator for file on disk", io);
}
}
/**
* Iterator implementation for the iterable defined above.
- * @param
*/
- public class LazyFileIterator implements Iterator {
+ public class LazyFileIterator implements Iterator {
private RandomAccessFile readOnlyFileHandle;
- private Schema schema;
- private String payloadClazz;
private Iterator> metadataIterator;
+ private final Converter valueConverter;
public LazyFileIterator(RandomAccessFile file, Map map,
- Schema schema, String payloadClazz) throws IOException {
+ Converter valueConverter) throws IOException {
this.readOnlyFileHandle = file;
- this.schema = schema;
- this.payloadClazz = payloadClazz;
+ this.valueConverter = valueConverter;
// sort the map in increasing order of offset of value so disk seek is only in one(forward) direction
this.metadataIterator = map
.entrySet()
.stream()
- .sorted((Map.Entry o1, Map.Entry o2) ->
- o1.getValue().getOffsetOfValue().compareTo(o2.getValue().getOffsetOfValue()))
+ .sorted(
+ (Map.Entry o1, Map.Entry o2) ->
+ o1.getValue().getOffsetOfValue().compareTo(o2.getValue().getOffsetOfValue()))
.collect(Collectors.toList()).iterator();
}
@@ -93,12 +87,12 @@ public class LazyFileIterable implements Iterable {
}
@Override
- public T next() {
+ public R next() {
Map.Entry entry = this.metadataIterator.next();
try {
- return SpillableMapUtils.readFromDisk(readOnlyFileHandle, schema,
- payloadClazz, entry.getValue().getOffsetOfValue(), entry.getValue().getSizeOfValue());
- } catch(IOException e) {
+ return valueConverter.getData(SpillableMapUtils.readBytesFromDisk(readOnlyFileHandle,
+ entry.getValue().getOffsetOfValue(), entry.getValue().getSizeOfValue()));
+ } catch (IOException e) {
throw new HoodieIOException("Unable to read hoodie record from value spilled to disk", e);
}
}
@@ -109,7 +103,7 @@ public class LazyFileIterable implements Iterable {
}
@Override
- public void forEachRemaining(Consumer super T> action) {
+ public void forEachRemaining(Consumer super R> action) {
action.accept(next());
}
}
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/converter/Converter.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/converter/Converter.java
new file mode 100644
index 000000000..b06651973
--- /dev/null
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/converter/Converter.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.common.util.collection.converter;
+
+/**
+ * A converter interface to serialize a payload to bytes and deserialize it back. This is used in
+ * {@link com.uber.hoodie.common.util.collection.ExternalSpillableMap} to spill records to disk
+ */
+public interface Converter {
+
+ /**
+ * This method is used to convert a payload to bytes
+ */
+ byte[] getBytes(T t);
+
+ /**
+ * This method is used to convert the serialized payload (in bytes) to the actual payload instance
+ */
+ T getData(byte[] bytes);
+
+ /**
+ * This method is used to estimate the size of a payload in memory
+ */
+ long sizeEstimate(T t);
+}
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/converter/HoodieRecordConverter.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/converter/HoodieRecordConverter.java
new file mode 100644
index 000000000..73dc0541a
--- /dev/null
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/converter/HoodieRecordConverter.java
@@ -0,0 +1,107 @@
+/*
+ * Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.common.util.collection.converter;
+
+import com.twitter.common.objectsize.ObjectSizeCalculator;
+import com.uber.hoodie.common.model.HoodieKey;
+import com.uber.hoodie.common.model.HoodieRecord;
+import com.uber.hoodie.common.model.HoodieRecordPayload;
+import com.uber.hoodie.common.util.HoodieAvroUtils;
+import com.uber.hoodie.common.util.ReflectionUtils;
+import com.uber.hoodie.exception.HoodieException;
+import com.uber.hoodie.exception.HoodieNotSerializableException;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.stream.Stream;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+/**
+ * A default converter implementation for HoodieRecord
+ */
+public class HoodieRecordConverter implements
+ Converter> {
+
+ // Schema used to get GenericRecord from HoodieRecordPayload then convert to bytes and vice-versa
+ private final Schema schema;
+ // The client implementation of HoodieRecordPayload used to re-create HoodieRecord from bytes
+ private final String payloadClazz;
+
+ private static Logger log = LogManager.getLogger(HoodieRecordConverter.class);
+
+ public HoodieRecordConverter(Schema schema, String payloadClazz) {
+ this.schema = schema;
+ this.payloadClazz = payloadClazz;
+ }
+
+  @Override
+  public byte[] getBytes(HoodieRecord hoodieRecord) {
+    try {
+      // Compute the insert value ONCE (it may be expensive and need not be pure);
+      // deletes are represented by an absent insert value, serialized as 0 bytes.
+      Optional insertValue = hoodieRecord.getData().getInsertValue(schema);
+      byte[] val = new byte[0];
+      if (insertValue.isPresent()) {
+        val = HoodieAvroUtils.avroToBytes((GenericRecord) insertValue.get());
+      }
+      Pair data =
+          Pair.of(Pair.of(hoodieRecord.getKey().getRecordKey(),
+              hoodieRecord.getKey().getPartitionPath()), val);
+      return SerializationUtils.serialize(data);
+    } catch (IOException io) {
+      throw new HoodieNotSerializableException("Cannot serialize value to bytes", io);
+    }
+  }
+
+ @Override
+ public HoodieRecord getData(byte[] bytes) {
+ try {
+ Pair, byte[]> data = SerializationUtils.deserialize(bytes);
+ Optional payload = Optional.empty();
+ if (data.getValue().length > 0) {
+ // This can happen if the record is deleted, the payload is optional with 0 bytes
+ payload = Optional.of(HoodieAvroUtils.bytesToAvro(data.getValue(), schema));
+ }
+ HoodieRecord extends HoodieRecordPayload> hoodieRecord = new HoodieRecord<>(
+ new HoodieKey(data.getKey().getKey(), data.getKey().getValue()),
+ ReflectionUtils
+ .loadPayload(payloadClazz,
+ new Object[]{payload}, Optional.class));
+ return hoodieRecord;
+ } catch (IOException io) {
+ throw new HoodieNotSerializableException("Cannot de-serialize value from bytes", io);
+ }
+ }
+
+ @Override
+ public long sizeEstimate(HoodieRecord extends HoodieRecordPayload> hoodieRecord) {
+ // Most HoodieRecords are bound to have data + schema. Although, the same schema object is shared amongst
+ // all records in the JVM. Calculate and print the size of the Schema and of the Record to
+ // note the sizes and differences. A correct estimation in such cases is handled in
+ /** {@link com.uber.hoodie.common.util.collection.ExternalSpillableMap} **/
+ long sizeOfRecord = ObjectSizeCalculator.getObjectSize(hoodieRecord);
+ long sizeOfSchema = ObjectSizeCalculator.getObjectSize(schema);
+ log.info("SizeOfRecord => " + sizeOfRecord + " SizeOfSchema => " + sizeOfSchema);
+ return sizeOfRecord;
+ }
+}
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/converter/StringConverter.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/converter/StringConverter.java
new file mode 100644
index 000000000..ea84a7dd9
--- /dev/null
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/converter/StringConverter.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.common.util.collection.converter;
+
+import com.twitter.common.objectsize.ObjectSizeCalculator;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * A default converter implementation for String type of payload key
+ */
+public class StringConverter implements Converter {
+
+ @Override
+ public byte[] getBytes(String s) {
+ return s.getBytes(StandardCharsets.UTF_8);
+ }
+
+ @Override
+ public String getData(byte[] bytes) {
+ return new String(bytes);
+ }
+
+ @Override
+ public long sizeEstimate(String s) {
+ return ObjectSizeCalculator.getObjectSize(s);
+ }
+}
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/exception/HoodieNotSerializableException.java b/hoodie-common/src/main/java/com/uber/hoodie/exception/HoodieNotSerializableException.java
new file mode 100644
index 000000000..17650c79d
--- /dev/null
+++ b/hoodie-common/src/main/java/com/uber/hoodie/exception/HoodieNotSerializableException.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.exception;
+
+import java.io.IOException;
+
+public class HoodieNotSerializableException extends HoodieException {
+
+ private IOException ioException;
+
+ public HoodieNotSerializableException(String msg, IOException t) {
+ super(msg, t);
+ this.ioException = t;
+ }
+
+ public HoodieNotSerializableException(String msg) {
+ super(msg);
+ }
+
+ public IOException getIOException() {
+ return ioException;
+ }
+}
diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/model/AvroBinaryTestPayload.java b/hoodie-common/src/test/java/com/uber/hoodie/common/model/AvroBinaryTestPayload.java
new file mode 100644
index 000000000..984fb3a41
--- /dev/null
+++ b/hoodie-common/src/test/java/com/uber/hoodie/common/model/AvroBinaryTestPayload.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.common.model;
+
+import com.uber.hoodie.common.util.HoodieAvroUtils;
+import com.uber.hoodie.exception.HoodieIOException;
+import java.io.IOException;
+import java.util.Optional;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+
+public class AvroBinaryTestPayload implements HoodieRecordPayload {
+
+ private final byte[] recordBytes;
+
+ public AvroBinaryTestPayload(Optional record) {
+
+ try {
+ if (record.isPresent()) {
+ recordBytes = HoodieAvroUtils.avroToBytes(record.get());
+ } else {
+ recordBytes = new byte[0];
+ }
+ } catch (IOException io) {
+ throw new HoodieIOException("unable to convert payload to bytes");
+ }
+ }
+
+ @Override
+ public HoodieRecordPayload preCombine(HoodieRecordPayload another) {
+ return this;
+ }
+
+ @Override
+ public Optional combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) throws IOException {
+ return getInsertValue(schema);
+ }
+
+ @Override
+ public Optional getInsertValue(Schema schema) throws IOException {
+ return Optional.of(HoodieAvroUtils.bytesToAvro(recordBytes, schema));
+ }
+}
\ No newline at end of file
diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/util/SchemaTestUtil.java b/hoodie-common/src/test/java/com/uber/hoodie/common/util/SchemaTestUtil.java
index 1de52e8ed..a736da8e5 100644
--- a/hoodie-common/src/test/java/com/uber/hoodie/common/util/SchemaTestUtil.java
+++ b/hoodie-common/src/test/java/com/uber/hoodie/common/util/SchemaTestUtil.java
@@ -17,9 +17,13 @@
package com.uber.hoodie.common.util;
import com.uber.hoodie.avro.MercifulJsonConverter;
+import com.uber.hoodie.common.model.HoodieAvroPayload;
+import com.uber.hoodie.common.model.HoodieKey;
import com.uber.hoodie.common.model.HoodieRecord;
+import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline;
import com.uber.hoodie.exception.HoodieIOException;
+
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
@@ -32,9 +36,11 @@ import java.nio.file.Paths;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
@@ -109,21 +115,63 @@ public class SchemaTestUtil {
}
+ public static List generateHoodieTestRecords(int from, int limit, Schema schema)
+ throws IOException, URISyntaxException {
+ List records = generateTestRecords(from, limit);
+ return records.stream()
+ .map(s -> HoodieAvroUtils.rewriteRecord((GenericRecord) s, schema))
+ .map(p -> convertToHoodieRecords(p,
+ UUID.randomUUID().toString(), "000/00/00")).collect(
+ Collectors.toList());
+ }
+
+ private static HoodieRecord convertToHoodieRecords(IndexedRecord iRecord, String key, String partitionPath) {
+ return new HoodieRecord<>(new HoodieKey(key, partitionPath),
+ new HoodieAvroPayload(Optional.of((GenericRecord) iRecord)));
+ }
+
public static List updateHoodieTestRecords(List oldRecordKeys, List newRecords,
String commitTime)
throws IOException, URISyntaxException {
return newRecords.stream()
.map(p -> {
- ((GenericRecord)p).put(HoodieRecord.RECORD_KEY_METADATA_FIELD, oldRecordKeys.remove(0));
- ((GenericRecord)p).put(HoodieRecord.PARTITION_PATH_METADATA_FIELD, "0000/00/00");
- ((GenericRecord)p).put(HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitTime);
+ ((GenericRecord) p).put(HoodieRecord.RECORD_KEY_METADATA_FIELD, oldRecordKeys.remove(0));
+ ((GenericRecord) p).put(HoodieRecord.PARTITION_PATH_METADATA_FIELD, "0000/00/00");
+ ((GenericRecord) p).put(HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitTime);
return p;
}).collect(
Collectors.toList());
}
+ public static List generateHoodieTestRecordsWithoutHoodieMetadata(int from, int limit)
+ throws IOException, URISyntaxException {
+
+ List iRecords = generateTestRecords(from, limit);
+ return iRecords
+ .stream()
+ .map(r -> new HoodieRecord<>(new HoodieKey(UUID.randomUUID().toString(), "0000/00/00"),
+ new HoodieAvroPayload(Optional.of((GenericRecord) r)))).collect(Collectors.toList());
+ }
+
+ public static List updateHoodieTestRecordsWithoutHoodieMetadata(List oldRecords, Schema schema,
+ String fieldNameToUpdate, String newValue)
+ throws IOException, URISyntaxException {
+ return oldRecords
+ .stream()
+ .map(r -> {
+ try {
+ GenericRecord rec = (GenericRecord) r.getData().getInsertValue(schema).get();
+ rec.put(fieldNameToUpdate, newValue);
+ return new HoodieRecord<>(r.getKey(),
+ new HoodieAvroPayload(Optional.of(rec)));
+ } catch (IOException io) {
+ throw new HoodieIOException("unable to get data from hoodie record", io);
+ }
+ }).collect(Collectors.toList());
+ }
+
public static Schema getEvolvedSchema() throws IOException {
return new Schema.Parser()
.parse(SchemaTestUtil.class.getResourceAsStream("/simple-test-evolved.avro"));
diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/util/SpillableMapTestUtils.java b/hoodie-common/src/test/java/com/uber/hoodie/common/util/SpillableMapTestUtils.java
index 05be50144..f8360376e 100644
--- a/hoodie-common/src/test/java/com/uber/hoodie/common/util/SpillableMapTestUtils.java
+++ b/hoodie-common/src/test/java/com/uber/hoodie/common/util/SpillableMapTestUtils.java
@@ -36,8 +36,8 @@ public class SpillableMapTestUtils {
iRecords
.stream()
.forEach(r -> {
- String key = ((GenericRecord)r).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
- String partitionPath = ((GenericRecord)r).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString();
+ String key = ((GenericRecord) r).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
+ String partitionPath = ((GenericRecord) r).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString();
recordKeys.add(key);
records.put(key, new HoodieRecord<>(new HoodieKey(key, partitionPath),
new HoodieAvroPayload(Optional.of((GenericRecord) r))));
diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/util/collection/TestDiskBasedMap.java b/hoodie-common/src/test/java/com/uber/hoodie/common/util/collection/TestDiskBasedMap.java
index eb570a655..5eee98618 100644
--- a/hoodie-common/src/test/java/com/uber/hoodie/common/util/collection/TestDiskBasedMap.java
+++ b/hoodie-common/src/test/java/com/uber/hoodie/common/util/collection/TestDiskBasedMap.java
@@ -16,45 +16,83 @@
package com.uber.hoodie.common.util.collection;
+import static com.uber.hoodie.common.util.SchemaTestUtil.getSimpleSchema;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
import com.uber.hoodie.common.model.HoodieAvroPayload;
+import com.uber.hoodie.common.model.HoodieKey;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieRecordPayload;
+import com.uber.hoodie.common.model.AvroBinaryTestPayload;
import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline;
import com.uber.hoodie.common.util.HoodieAvroUtils;
import com.uber.hoodie.common.util.SchemaTestUtil;
import com.uber.hoodie.common.util.SpillableMapTestUtils;
+import com.uber.hoodie.common.util.SpillableMapUtils;
+import com.uber.hoodie.common.util.collection.converter.StringConverter;
+import com.uber.hoodie.common.util.collection.converter.HoodieRecordConverter;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.junit.Test;
-import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Optional;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertEquals;
-
public class TestDiskBasedMap {
@Test
public void testSimpleInsert() throws IOException, URISyntaxException {
- Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
- DiskBasedMap records = new DiskBasedMap<>(schema, HoodieAvroPayload.class.getName(),Optional.empty());
+ Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
+ String payloadClazz = HoodieAvroPayload.class.getName();
+
+ DiskBasedMap records = new DiskBasedMap<>(Optional.empty(),
+ new StringConverter(), new HoodieRecordConverter(schema, payloadClazz));
List iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
- String commitTime = ((GenericRecord)iRecords.get(0)).get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString();
+ String commitTime = ((GenericRecord) iRecords.get(0))
+ .get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString();
List recordKeys = SpillableMapTestUtils.upsertRecords(iRecords, records);
// make sure records have spilled to disk
assertTrue(records.sizeOfFileOnDiskInBytes() > 0);
Iterator> itr = records.iterator();
List oRecords = new ArrayList<>();
- while(itr.hasNext()) {
+ while (itr.hasNext()) {
+ HoodieRecord extends HoodieRecordPayload> rec = itr.next();
+ oRecords.add(rec);
+ assert recordKeys.contains(rec.getRecordKey());
+ }
+ }
+
+ @Test
+ public void testSimpleInsertWithoutHoodieMetadata() throws IOException, URISyntaxException {
+ Schema schema = getSimpleSchema();
+ String payloadClazz = HoodieAvroPayload.class.getName();
+
+ DiskBasedMap records = new DiskBasedMap<>(Optional.empty(),
+ new StringConverter(), new HoodieRecordConverter(schema, payloadClazz));
+ List hoodieRecords = SchemaTestUtil
+ .generateHoodieTestRecordsWithoutHoodieMetadata(0, 1000);
+ Set recordKeys = new HashSet<>();
+ // insert generated records into the map
+ hoodieRecords.stream().forEach(r -> {
+ records.put(r.getRecordKey(), r);
+ recordKeys.add(r.getRecordKey());
+ });
+ // make sure records have spilled to disk
+ assertTrue(records.sizeOfFileOnDiskInBytes() > 0);
+ Iterator> itr = records.iterator();
+ List oRecords = new ArrayList<>();
+ while (itr.hasNext()) {
HoodieRecord extends HoodieRecordPayload> rec = itr.next();
oRecords.add(rec);
assert recordKeys.contains(rec.getRecordKey());
@@ -64,10 +102,14 @@ public class TestDiskBasedMap {
@Test
public void testSimpleUpsert() throws IOException, URISyntaxException {
- Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
- DiskBasedMap records = new DiskBasedMap<>(schema, HoodieAvroPayload.class.getName(),Optional.empty());
+ Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
+ String payloadClazz = HoodieAvroPayload.class.getName();
+
+ DiskBasedMap records = new DiskBasedMap<>(Optional.empty(),
+ new StringConverter(), new HoodieRecordConverter(schema, payloadClazz));
List iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
- String commitTime = ((GenericRecord)iRecords.get(0)).get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString();
+ String commitTime = ((GenericRecord) iRecords.get(0))
+ .get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString();
// perform some inserts
List recordKeys = SpillableMapTestUtils.upsertRecords(iRecords, records);
@@ -77,9 +119,11 @@ public class TestDiskBasedMap {
// generate updates from inserts
List updatedRecords =
- SchemaTestUtil.updateHoodieTestRecords(recordKeys, SchemaTestUtil.generateHoodieTestRecords(0, 100),
- HoodieActiveTimeline.createNewCommitTime());
- String newCommitTime = ((GenericRecord)updatedRecords.get(0)).get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString();
+ SchemaTestUtil
+ .updateHoodieTestRecords(recordKeys, SchemaTestUtil.generateHoodieTestRecords(0, 100),
+ HoodieActiveTimeline.createNewCommitTime());
+ String newCommitTime = ((GenericRecord) updatedRecords.get(0))
+ .get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString();
// new commit time should be different
assertEquals(commitTime, newCommitTime);
@@ -92,16 +136,75 @@ public class TestDiskBasedMap {
// Upserted records (on disk) should have the latest commit time
Iterator> itr = records.iterator();
- while(itr.hasNext()) {
+ while (itr.hasNext()) {
HoodieRecord extends HoodieRecordPayload> rec = itr.next();
assert recordKeys.contains(rec.getRecordKey());
try {
- IndexedRecord indexedRecord = (IndexedRecord)rec.getData().getInsertValue(schema).get();
- String latestCommitTime = ((GenericRecord)indexedRecord).get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString();
+ IndexedRecord indexedRecord = (IndexedRecord) rec.getData().getInsertValue(schema).get();
+ String latestCommitTime = ((GenericRecord) indexedRecord)
+ .get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString();
assertEquals(latestCommitTime, newCommitTime);
- } catch(IOException io) {
+ } catch (IOException io) {
throw new UncheckedIOException(io);
}
}
}
+
+ @Test
+ public void testSizeEstimator() throws IOException, URISyntaxException {
+ Schema schema = SchemaTestUtil.getSimpleSchema();
+
+ // Test sizeEstimator without hoodie metadata fields
+ List hoodieRecords = SchemaTestUtil.generateHoodieTestRecords(0, 1, schema);
+
+ long payloadSize = SpillableMapUtils.computePayloadSize(hoodieRecords.remove(0),
+ new HoodieRecordConverter(schema, HoodieAvroPayload.class.getName()));
+ assertTrue(payloadSize > 0);
+
+ // Test sizeEstimator with hoodie metadata fields
+ schema = HoodieAvroUtils.addMetadataFields(schema);
+ hoodieRecords = SchemaTestUtil.generateHoodieTestRecords(0, 1, schema);
+ payloadSize = SpillableMapUtils.computePayloadSize(hoodieRecords.remove(0),
+ new HoodieRecordConverter(schema, HoodieAvroPayload.class.getName()));
+ assertTrue(payloadSize > 0);
+
+ // Following tests payloads without an Avro Schema in the Record
+
+ // Test sizeEstimator without hoodie metadata fields and without schema object in the payload
+ schema = SchemaTestUtil.getSimpleSchema();
+ List indexedRecords = SchemaTestUtil.generateHoodieTestRecords(0, 1);
+ hoodieRecords = indexedRecords.stream()
+ .map(r -> new HoodieRecord(new HoodieKey(UUID.randomUUID().toString(), "0000/00/00"),
+ new AvroBinaryTestPayload(Optional.of((GenericRecord) r)))).collect(Collectors.toList());
+ payloadSize = SpillableMapUtils.computePayloadSize(hoodieRecords.remove(0),
+ new HoodieRecordConverter(schema, AvroBinaryTestPayload.class.getName()));
+ assertTrue(payloadSize > 0);
+
+ // Test sizeEstimator with hoodie metadata fields and without schema object in the payload
+ final Schema simpleSchemaWithMetadata = HoodieAvroUtils
+ .addMetadataFields(SchemaTestUtil.getSimpleSchema());
+ indexedRecords = SchemaTestUtil.generateHoodieTestRecords(0, 1);
+ hoodieRecords = indexedRecords.stream()
+ .map(r -> new HoodieRecord(new HoodieKey(UUID.randomUUID().toString(), "0000/00/00"),
+ new AvroBinaryTestPayload(Optional
+ .of(HoodieAvroUtils.rewriteRecord((GenericRecord) r, simpleSchemaWithMetadata)))))
+ .collect(Collectors.toList());
+ payloadSize = SpillableMapUtils.computePayloadSize(hoodieRecords.remove(0),
+ new HoodieRecordConverter(schema, AvroBinaryTestPayload.class.getName()));
+ assertTrue(payloadSize > 0);
+ }
+
+ @Test
+ public void testSizeEstimatorPerformance() throws IOException, URISyntaxException {
+ // Test sizeEstimatorPerformance with simpleSchema
+ Schema schema = SchemaTestUtil.getSimpleSchema();
+ List hoodieRecords = SchemaTestUtil.generateHoodieTestRecords(0, 1, schema);
+ HoodieRecordConverter converter =
+ new HoodieRecordConverter(schema, HoodieAvroPayload.class.getName());
+ HoodieRecord record = hoodieRecords.remove(0);
+ long startTime = System.currentTimeMillis();
+ SpillableMapUtils.computePayloadSize(record, converter);
+ long timeTaken = System.currentTimeMillis() - startTime;
+ assertTrue(timeTaken < 100);
+ }
}
diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/util/collection/TestExternalSpillableMap.java b/hoodie-common/src/test/java/com/uber/hoodie/common/util/collection/TestExternalSpillableMap.java
index d15caf645..1c56b6cb8 100644
--- a/hoodie-common/src/test/java/com/uber/hoodie/common/util/collection/TestExternalSpillableMap.java
+++ b/hoodie-common/src/test/java/com/uber/hoodie/common/util/collection/TestExternalSpillableMap.java
@@ -16,6 +16,10 @@
package com.uber.hoodie.common.util.collection;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
import com.uber.hoodie.common.model.HoodieAvroPayload;
import com.uber.hoodie.common.model.HoodieKey;
import com.uber.hoodie.common.model.HoodieRecord;
@@ -24,13 +28,8 @@ import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline;
import com.uber.hoodie.common.util.HoodieAvroUtils;
import com.uber.hoodie.common.util.SchemaTestUtil;
import com.uber.hoodie.common.util.SpillableMapTestUtils;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
-import org.junit.FixMethodOrder;
-import org.junit.Test;
-import org.junit.runners.MethodSorters;
-
+import com.uber.hoodie.common.util.collection.converter.StringConverter;
+import com.uber.hoodie.common.util.collection.converter.HoodieRecordConverter;
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
@@ -39,10 +38,12 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.runners.MethodSorters;
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class TestExternalSpillableMap {
@@ -51,17 +52,19 @@ public class TestExternalSpillableMap {
@Test
public void simpleInsertTest() throws IOException, URISyntaxException {
+ Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
+ String payloadClazz = HoodieAvroPayload.class.getName();
ExternalSpillableMap> records =
new ExternalSpillableMap<>
- (16L, HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema()),
- HoodieAvroPayload.class.getName(), Optional.empty()); //16B
+ (16L, Optional.empty(), new StringConverter(),
+ new HoodieRecordConverter(schema, payloadClazz)); //16B
List iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
List recordKeys = SpillableMapTestUtils.upsertRecords(iRecords, records);
assert (recordKeys.size() == 100);
Iterator> itr = records.iterator();
List oRecords = new ArrayList<>();
- while(itr.hasNext()) {
+ while (itr.hasNext()) {
HoodieRecord extends HoodieRecordPayload> rec = itr.next();
oRecords.add(rec);
assert recordKeys.contains(rec.getRecordKey());
@@ -72,16 +75,18 @@ public class TestExternalSpillableMap {
public void testSimpleUpsert() throws IOException, URISyntaxException {
Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
+ String payloadClazz = HoodieAvroPayload.class.getName();
+
ExternalSpillableMap> records =
new ExternalSpillableMap<>
- (16L, schema,
- HoodieAvroPayload.class.getName(), Optional.of(FAILURE_OUTPUT_PATH)); //16B
+ (16L, Optional.of(FAILURE_OUTPUT_PATH), new StringConverter(),
+ new HoodieRecordConverter(schema, payloadClazz)); //16B
List iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
List recordKeys = SpillableMapTestUtils.upsertRecords(iRecords, records);
assert (recordKeys.size() == 100);
Iterator> itr = records.iterator();
- while(itr.hasNext()) {
+ while (itr.hasNext()) {
HoodieRecord extends HoodieRecordPayload> rec = itr.next();
assert recordKeys.contains(rec.getRecordKey());
}
@@ -99,8 +104,8 @@ public class TestExternalSpillableMap {
updatedRecords.stream().forEach(record -> {
HoodieRecord rec = records.get(((GenericRecord) record).get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
try {
- assertEquals(rec.getData().getInsertValue(schema).get(),record);
- } catch(IOException io) {
+ assertEquals(rec.getData().getInsertValue(schema).get(), record);
+ } catch (IOException io) {
throw new UncheckedIOException(io);
}
});
@@ -109,25 +114,28 @@ public class TestExternalSpillableMap {
@Test
public void testAllMapOperations() throws IOException, URISyntaxException {
+ Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
+ String payloadClazz = HoodieAvroPayload.class.getName();
+
ExternalSpillableMap> records =
new ExternalSpillableMap<>
- (16L, HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema()),
- HoodieAvroPayload.class.getName(), Optional.empty()); //16B
+ (16L, Optional.empty(), new StringConverter(),
+ new HoodieRecordConverter(schema, payloadClazz)); //16B
List iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
// insert a bunch of records so that values spill to disk too
List recordKeys = SpillableMapTestUtils.upsertRecords(iRecords, records);
IndexedRecord inMemoryRecord = iRecords.get(0);
- String ikey = ((GenericRecord)inMemoryRecord).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
- String iPartitionPath = ((GenericRecord)inMemoryRecord).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString();
+ String ikey = ((GenericRecord) inMemoryRecord).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
+ String iPartitionPath = ((GenericRecord) inMemoryRecord).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString();
HoodieRecord inMemoryHoodieRecord = new HoodieRecord<>(new HoodieKey(ikey, iPartitionPath),
- new HoodieAvroPayload(Optional.of((GenericRecord)inMemoryRecord)));
+ new HoodieAvroPayload(Optional.of((GenericRecord) inMemoryRecord)));
IndexedRecord onDiskRecord = iRecords.get(99);
- String dkey = ((GenericRecord)onDiskRecord).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
- String dPartitionPath = ((GenericRecord)onDiskRecord).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString();
+ String dkey = ((GenericRecord) onDiskRecord).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
+ String dPartitionPath = ((GenericRecord) onDiskRecord).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString();
HoodieRecord onDiskHoodieRecord = new HoodieRecord<>(new HoodieKey(dkey, dPartitionPath),
- new HoodieAvroPayload(Optional.of((GenericRecord)onDiskRecord)));
+ new HoodieAvroPayload(Optional.of((GenericRecord) onDiskRecord)));
// assert size
assert records.size() == 100;
// get should return the same HoodieKey and same value
@@ -162,16 +170,19 @@ public class TestExternalSpillableMap {
@Test(expected = IOException.class)
public void simpleTestWithException() throws IOException, URISyntaxException {
+ Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
+ String payloadClazz = HoodieAvroPayload.class.getName();
+
ExternalSpillableMap> records =
new ExternalSpillableMap<>
- (16L, HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema()),
- HoodieAvroPayload.class.getName(), Optional.of(FAILURE_OUTPUT_PATH)); //16B
+ (16L, Optional.of(FAILURE_OUTPUT_PATH), new StringConverter(),
+ new HoodieRecordConverter(schema, payloadClazz)); //16B
List iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
List recordKeys = SpillableMapTestUtils.upsertRecords(iRecords, records);
assert (recordKeys.size() == 100);
Iterator> itr = records.iterator();
- while(itr.hasNext()) {
+ while (itr.hasNext()) {
throw new IOException("Testing failures...");
}
}
@@ -183,17 +194,19 @@ public class TestExternalSpillableMap {
}
@Test
- public void testDataCorrectnessInMapAndDisk() throws IOException, URISyntaxException {
+ public void testDataCorrectnessWithUpsertsToDataInMapAndOnDisk() throws IOException, URISyntaxException {
+
+ Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
+ String payloadClazz = HoodieAvroPayload.class.getName();
- Schema schema = SchemaTestUtil.getSimpleSchema();
ExternalSpillableMap> records =
new ExternalSpillableMap<>
- (16L, HoodieAvroUtils.addMetadataFields(schema),
- HoodieAvroPayload.class.getName(), Optional.of(FAILURE_OUTPUT_PATH)); //16B
+ (16L, Optional.of(FAILURE_OUTPUT_PATH), new StringConverter(),
+ new HoodieRecordConverter(schema, payloadClazz)); //16B
List recordKeys = new ArrayList<>();
// Ensure we spill to disk
- while(records.getDiskBasedMapNumEntries() < 1) {
+ while (records.getDiskBasedMapNumEntries() < 1) {
List iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
recordKeys.addAll(SpillableMapTestUtils.upsertRecords(iRecords, records));
}
@@ -202,7 +215,6 @@ public class TestExternalSpillableMap {
String key = recordKeys.get(0);
HoodieRecord record = records.get(key);
List recordsToUpdate = new ArrayList<>();
- schema = HoodieAvroUtils.addMetadataFields(schema);
recordsToUpdate.add((IndexedRecord) record.getData().getInsertValue(schema).get());
String newCommitTime = HoodieActiveTimeline.createNewCommitTime();
@@ -235,7 +247,72 @@ public class TestExternalSpillableMap {
gRecord = (GenericRecord) records.get(key).getData().getInsertValue(schema).get();
// The record returned for this key should have the updated commitTime
assert newCommitTime.contentEquals(gRecord.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString());
+ }
+ @Test
+ public void testDataCorrectnessWithoutHoodieMetadata() throws IOException, URISyntaxException {
+
+ Schema schema = SchemaTestUtil.getSimpleSchema();
+ String payloadClazz = HoodieAvroPayload.class.getName();
+
+ ExternalSpillableMap> records =
+ new ExternalSpillableMap<>
+ (16L, Optional.of(FAILURE_OUTPUT_PATH), new StringConverter(),
+ new HoodieRecordConverter(schema, payloadClazz)); //16B
+
+ List recordKeys = new ArrayList<>();
+ // Ensure we spill to disk
+ while (records.getDiskBasedMapNumEntries() < 1) {
+ List hoodieRecords = SchemaTestUtil.generateHoodieTestRecordsWithoutHoodieMetadata(0, 100);
+ hoodieRecords.stream().forEach(r -> {
+ records.put(r.getRecordKey(), r);
+ recordKeys.add(r.getRecordKey());
+ });
+ }
+
+ // Get a record from the in-Memory map
+ String key = recordKeys.get(0);
+ HoodieRecord record = records.get(key);
+ // Get the field we want to update
+ String fieldName = schema.getFields().stream().filter(field -> field.schema().getType() == Schema.Type.STRING)
+ .findAny().get().name();
+ // Use a new value to update this field
+ String newValue = "update1";
+ List recordsToUpdate = new ArrayList<>();
+ recordsToUpdate.add(record);
+
+ List updatedRecords =
+ SchemaTestUtil.updateHoodieTestRecordsWithoutHoodieMetadata(recordsToUpdate, schema, fieldName, newValue);
+
+ // Upsert this updated record
+ updatedRecords.stream().forEach(r -> {
+ records.put(r.getRecordKey(), r);
+ });
+ GenericRecord gRecord = (GenericRecord) records.get(key).getData().getInsertValue(schema).get();
+ // The record returned for this key should have the updated value for the field name
+ assertEquals(gRecord.get(fieldName).toString(), newValue);
+
+ // Get a record from the disk based map
+ key = recordKeys.get(recordKeys.size() - 1);
+ record = records.get(key);
+ // Get the field we want to update
+ fieldName = schema.getFields().stream().filter(field -> field.schema().getType() == Schema.Type.STRING)
+ .findAny().get().name();
+ // Use a new value to update this field
+ newValue = "update2";
+ recordsToUpdate = new ArrayList<>();
+ recordsToUpdate.add(record);
+
+ updatedRecords =
+ SchemaTestUtil.updateHoodieTestRecordsWithoutHoodieMetadata(recordsToUpdate, schema, fieldName, newValue);
+
+ // Upsert this updated record
+ updatedRecords.stream().forEach(r -> {
+ records.put(r.getRecordKey(), r);
+ });
+ gRecord = (GenericRecord) records.get(key).getData().getInsertValue(schema).get();
+ // The record returned for this key should have the updated value for the field name
+ assertEquals(gRecord.get(fieldName).toString(), newValue);
}
// TODO : come up with a performance eval test for spillableMap
diff --git a/hoodie-spark/src/main/java/com/uber/hoodie/OverwriteWithLatestAvroPayload.java b/hoodie-spark/src/main/java/com/uber/hoodie/OverwriteWithLatestAvroPayload.java
index 74424ac36..6b12b1f97 100644
--- a/hoodie-spark/src/main/java/com/uber/hoodie/OverwriteWithLatestAvroPayload.java
+++ b/hoodie-spark/src/main/java/com/uber/hoodie/OverwriteWithLatestAvroPayload.java
@@ -44,6 +44,10 @@ public class OverwriteWithLatestAvroPayload extends BaseAvroPayload implements
super(record, orderingVal);
}
+ public OverwriteWithLatestAvroPayload(Optional record) {
+ this(record.get(), (record1) -> 0); // natural order
+ }
+
@Override
public OverwriteWithLatestAvroPayload preCombine(OverwriteWithLatestAvroPayload another) {
// pick the payload with greatest ordering value