diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieCompactionConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieCompactionConfig.java index 39f076e5a..75dd49d6c 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieCompactionConfig.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieCompactionConfig.java @@ -64,7 +64,6 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig { // Turned off by default public static final String DEFAULT_PARQUET_SMALL_FILE_LIMIT_BYTES = String.valueOf(0); - /** * Configs related to specific table types **/ @@ -102,6 +101,10 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig { public static final String DEFAULT_PAYLOAD_CLASS = HoodieAvroPayload.class.getName(); public static final String PAYLOAD_CLASS = "hoodie.compaction.payload.class"; + public static final String MAX_SIZE_IN_MEMORY_PER_COMPACTION_IN_BYTES_PROP = "hoodie.compaction.spill.threshold"; + // Default memory size per compaction, excess spills to disk + public static final String DEFAULT_MAX_SIZE_IN_MEMORY_PER_COMPACTION_IN_BYTES = String.valueOf(1024*1024*1024L); //1GB + private HoodieCompactionConfig(Properties props) { super(props); } @@ -210,6 +213,18 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig { return this; } + public Builder withMaxMemorySizePerCompactionInBytes(long maxMemorySizePerCompactionInBytes) { + props.setProperty(MAX_SIZE_IN_MEMORY_PER_COMPACTION_IN_BYTES_PROP, + String.valueOf(maxMemorySizePerCompactionInBytes)); + return this; + } + + public Builder withMaxNumDeltaCommitsBeforeCompaction(int maxNumDeltaCommitsBeforeCompaction) { + props.setProperty(INLINE_COMPACT_NUM_DELTA_COMMITS_PROP, + String.valueOf(maxNumDeltaCommitsBeforeCompaction)); + return this; + } + public HoodieCompactionConfig build() { HoodieCompactionConfig config = new HoodieCompactionConfig(props); setDefaultOnCondition(props, !props.containsKey(AUTO_CLEAN_PROP), @@ 
-245,6 +260,8 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig { PAYLOAD_CLASS, DEFAULT_PAYLOAD_CLASS); setDefaultOnCondition(props, !props.containsKey(TARGET_IO_PER_COMPACTION_IN_MB_PROP), TARGET_IO_PER_COMPACTION_IN_MB_PROP, DEFAULT_TARGET_IO_PER_COMPACTION_IN_MB); + setDefaultOnCondition(props, !props.containsKey(MAX_SIZE_IN_MEMORY_PER_COMPACTION_IN_BYTES_PROP), + MAX_SIZE_IN_MEMORY_PER_COMPACTION_IN_BYTES_PROP, DEFAULT_MAX_SIZE_IN_MEMORY_PER_COMPACTION_IN_BYTES); HoodieCleaningPolicy.valueOf(props.getProperty(CLEANER_POLICY_PROP)); Preconditions.checkArgument( diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java index 7b167ebbd..9933f3c9c 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java @@ -210,6 +210,11 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { .parseLong(props.getProperty(HoodieCompactionConfig.TARGET_IO_PER_COMPACTION_IN_MB_PROP)); } + public Long getMaxMemorySizePerCompactionInBytes() { + return Long + .parseLong(props.getProperty(HoodieCompactionConfig.MAX_SIZE_IN_MEMORY_PER_COMPACTION_IN_BYTES_PROP)); + } + /** * index properties **/ diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java index 35f751899..0720b133f 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java @@ -47,6 +47,7 @@ import org.apache.spark.util.SizeEstimator; import java.io.IOException; import java.util.ArrayList; +import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -108,8 +109,8 @@ public class HoodieAppendHandle extends HoodieIOH this.writer = 
HoodieLogFormat.newWriterBuilder() .onParentPath(new Path(hoodieTable.getMetaClient().getBasePath(), partitionPath)) .withFileId(fileId).overBaseCommit(baseCommitTime).withLogVersion(fileSlice.getLogFiles() - .max(HoodieLogFile.getLogVersionComparator().reversed()::compare) - .map(logFile -> logFile.getLogVersion()).orElse(HoodieLogFile.LOGFILE_BASE_VERSION)) + .map(logFile -> logFile.getLogVersion()) + .max(Comparator.naturalOrder()).orElse(HoodieLogFile.LOGFILE_BASE_VERSION)) .withSizeThreshold(config.getLogFileMaxSize()) .withFs(fs).withFileExtension(HoodieLogFile.DELTA_EXTENSION).build(); this.currentLogFile = writer.getLogFile(); diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java index a974e8b38..5dbe39d1f 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java @@ -142,6 +142,7 @@ public class HoodieMergeHandle extends HoodieIOHa */ private String init(String fileId, Iterator> newRecordsItr) { // Load the new records in a map + // TODO (NA) instantiate a ExternalSpillableMap this.keyToNewRecords = new HashMap<>(); String partitionPath = null; while (newRecordsItr.hasNext()) { diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java index 6e89c38cc..c72406d12 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java @@ -16,8 +16,6 @@ package com.uber.hoodie.io.compact; -import static java.util.stream.Collectors.toList; - import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Sets; @@ -38,6 +36,13 @@ import 
com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.exception.HoodieCompactionException; import com.uber.hoodie.table.HoodieCopyOnWriteTable; import com.uber.hoodie.table.HoodieTable; +import org.apache.avro.Schema; +import org.apache.hadoop.fs.FileSystem; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.FlatMapFunction; + import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Collection; @@ -46,12 +51,8 @@ import java.util.List; import java.util.Optional; import java.util.stream.Collectors; import java.util.stream.StreamSupport; -import org.apache.avro.Schema; -import org.apache.hadoop.fs.FileSystem; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.FlatMapFunction; + +import static java.util.stream.Collectors.toList; /** * HoodieRealtimeTableCompactor compacts a hoodie table with merge on read storage. 
Computes all @@ -73,6 +74,7 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor { + HoodieTableType.MERGE_ON_READ + " and not " + hoodieTable.getMetaClient() .getTableType().name()); + //TODO : check if maxMemory is not greater than JVM or spark.executor memory // TODO - rollback any compactions in flight HoodieTableMetaClient metaClient = hoodieTable.getMetaClient(); log.info("Compacting " + metaClient.getBasePath() + " with commit " + compactionCommitTime); @@ -152,7 +154,7 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor { HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, metaClient.getBasePath(), - operation.getDeltaFilePaths(), readerSchema, maxInstantTime); + operation.getDeltaFilePaths(), readerSchema, maxInstantTime, config.getMaxMemorySizePerCompactionInBytes()); if (!scanner.iterator().hasNext()) { return Lists.newArrayList(); } diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieCompactedLogRecordScanner.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieCompactedLogRecordScanner.java index f01e15f8e..465b53867 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieCompactedLogRecordScanner.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieCompactedLogRecordScanner.java @@ -27,7 +27,8 @@ import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock; import com.uber.hoodie.common.table.log.block.HoodieCommandBlock; import com.uber.hoodie.common.table.log.block.HoodieDeleteBlock; import com.uber.hoodie.common.table.log.block.HoodieLogBlock; -import com.uber.hoodie.common.util.ReflectionUtils; +import com.uber.hoodie.common.util.SpillableMapUtils; +import com.uber.hoodie.common.util.collection.ExternalSpillableMap; import com.uber.hoodie.exception.HoodieIOException; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; @@ -40,10 +41,7 @@ import 
org.apache.log4j.Logger; import java.io.IOException; import java.util.ArrayDeque; import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; import java.util.Deque; -import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -64,7 +62,7 @@ public class HoodieCompactedLogRecordScanner implements private final static Logger log = LogManager.getLogger(HoodieCompactedLogRecordScanner.class); // Final map of compacted/merged records - private final Map> records; + private final ExternalSpillableMap> records; // Reader schema for the records private final Schema readerSchema; // Total log files read - for metrics @@ -82,22 +80,24 @@ public class HoodieCompactedLogRecordScanner implements Deque currentInstantLogBlocks = new ArrayDeque<>(); public HoodieCompactedLogRecordScanner(FileSystem fs, String basePath, List logFilePaths, - Schema readerSchema, String latestInstantTime) { + Schema readerSchema, String latestInstantTime, Long maxMemorySizeInBytes) { this.readerSchema = readerSchema; this.latestInstantTime = latestInstantTime; this.hoodieTableMetaClient = new HoodieTableMetaClient(fs.getConf(), basePath); // load class from the payload fully qualified class name this.payloadClassFQN = this.hoodieTableMetaClient.getTableConfig().getPayloadClass(); - // Store merged records for all versions for this log file - this.records = Maps.newHashMap(); - // iterate over the paths - Iterator logFilePathsItr = logFilePaths.iterator(); - while (logFilePathsItr.hasNext()) { - HoodieLogFile logFile = new HoodieLogFile(new Path(logFilePathsItr.next())); - log.info("Scanning log file " + logFile.getPath()); - totalLogFiles.incrementAndGet(); - try { + try { + // Store merged records for all versions for this log file, set the maxInMemoryMapSize to half, + // assign other half to the temporary map needed to read next block + records = new ExternalSpillableMap<>(maxMemorySizeInBytes, readerSchema, + payloadClassFQN, 
Optional.empty()); + // iterate over the paths + Iterator logFilePathsItr = logFilePaths.iterator(); + while (logFilePathsItr.hasNext()) { + HoodieLogFile logFile = new HoodieLogFile(new Path(logFilePathsItr.next())); + log.info("Scanning log file " + logFile.getPath()); + totalLogFiles.incrementAndGet(); // Use the HoodieLogFormatReader to iterate through the blocks in the log file HoodieLogFormatReader reader = new HoodieLogFormatReader(fs, logFile, readerSchema, true); while (reader.hasNext()) { @@ -193,17 +193,21 @@ public class HoodieCompactedLogRecordScanner implements break; } } - - } catch (IOException e) { - throw new HoodieIOException("IOException when reading log file " + logFile); - } - // merge the last read block when all the blocks are done reading - if (!currentInstantLogBlocks.isEmpty()) { - log.info("Merging the final data blocks in " + logFile.getPath()); - merge(records, currentInstantLogBlocks); + // merge the last read block when all the blocks are done reading + if (!currentInstantLogBlocks.isEmpty()) { + log.info("Merging the final blocks in " + logFile.getPath()); + merge(records, currentInstantLogBlocks); + } } + } catch (IOException e) { + throw new HoodieIOException("IOException when reading compacting log files"); } this.totalRecordsToUpdate = records.size(); + log.info("MaxMemoryInBytes allowed for compaction => " + maxMemorySizeInBytes); + log.info("Number of entries in MemoryBasedMap in ExternalSpillableMap => " + records.getInMemoryMapNumEntries()); + log.info("Total size in bytes of MemoryBasedMap in ExternalSpillableMap => " + records.getCurrentInMemoryMapSize()); + log.info("Number of entries in DiskBasedMap in ExternalSpillableMap => " + records.getDiskBasedMapNumEntries()); + log.info("Size of file spilled to disk => " + records.getSizeOfFileOnDiskInBytes()); } /** @@ -223,21 +227,15 @@ public class HoodieCompactedLogRecordScanner implements * the log records since the base data is merged on previous compaction */ private Map> 
loadRecordsFromBlock( - HoodieAvroDataBlock dataBlock) { - Map> recordsFromLastBlock = Maps - .newHashMap(); + HoodieAvroDataBlock dataBlock) throws IOException { + Map> recordsFromLastBlock = Maps.newHashMap(); List recs = dataBlock.getRecords(); totalLogRecords.addAndGet(recs.size()); recs.forEach(rec -> { String key = ((GenericRecord) rec).get(HoodieRecord.RECORD_KEY_METADATA_FIELD) .toString(); - String partitionPath = - ((GenericRecord) rec).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD) - .toString(); - HoodieRecord hoodieRecord = new HoodieRecord<>( - new HoodieKey(key, partitionPath), - ReflectionUtils - .loadPayload(this.payloadClassFQN, new Object[]{Optional.of(rec)}, Optional.class)); + HoodieRecord hoodieRecord = + SpillableMapUtils.convertToHoodieRecordPayload((GenericRecord) rec, this.payloadClassFQN); if (recordsFromLastBlock.containsKey(key)) { // Merge and store the merged record HoodieRecordPayload combinedValue = recordsFromLastBlock.get(key).getData() @@ -257,7 +255,7 @@ public class HoodieCompactedLogRecordScanner implements * Merge the last seen log blocks with the accumulated records */ private void merge(Map> records, - Deque lastBlocks) { + Deque lastBlocks) throws IOException { while (!lastBlocks.isEmpty()) { // poll the element at the bottom of the stack since that's the order it was inserted HoodieLogBlock lastBlock = lastBlocks.pollLast(); @@ -280,7 +278,7 @@ public class HoodieCompactedLogRecordScanner implements * Merge the records read from a single data block with the accumulated records */ private void merge(Map> records, - Map> recordsFromLastBlock) { + Map> recordsFromLastBlock) { recordsFromLastBlock.forEach((key, hoodieRecord) -> { if (records.containsKey(key)) { // Merge and store the merged record @@ -297,7 +295,7 @@ public class HoodieCompactedLogRecordScanner implements @Override public Iterator> iterator() { - return records.values().iterator(); + return records.iterator(); } public long getTotalLogFiles() { diff --git 
a/hoodie-common/src/main/java/com/uber/hoodie/common/util/SpillableMapUtils.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/SpillableMapUtils.java new file mode 100644 index 000000000..4f915897e --- /dev/null +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/SpillableMapUtils.java @@ -0,0 +1,155 @@ +/* + * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.uber.hoodie.common.util; + +import com.uber.hoodie.common.model.HoodieKey; +import com.uber.hoodie.common.model.HoodieRecord; +import com.uber.hoodie.common.model.HoodieRecordPayload; +import com.uber.hoodie.common.util.collection.DiskBasedMap; +import com.uber.hoodie.common.util.collection.io.storage.SizeAwareDataOutputStream; +import com.uber.hoodie.exception.HoodieCorruptedDataException; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.htrace.fasterxml.jackson.databind.ObjectMapper; + +import java.io.IOException; +import java.io.RandomAccessFile; +import java.util.Optional; +import java.util.zip.CRC32; + +public class SpillableMapUtils { + + public static ObjectMapper objectMapper = new ObjectMapper(); + /** + * Using the schema and payload class, read and convert the bytes on disk to a HoodieRecord + * @param file + * @param schema + * @param payloadClazz + * @param valuePosition + * @param valueLength + * @param + * @return + * @throws 
IOException + */ + public static R readFromDisk(RandomAccessFile file, Schema schema, String payloadClazz, + long valuePosition, int valueLength) throws IOException { + + DiskBasedMap.FileEntry fileEntry = readInternal(file, valuePosition, valueLength); + return (R) convertToHoodieRecordPayload(HoodieAvroUtils.bytesToAvro(fileEntry.getValue(), schema), + payloadClazz); + } + + /** + * |crc|timestamp|sizeOfKey|SizeOfValue|key|value| + * @param file + * @param valuePosition + * @param valueLength + * @return + * @throws IOException + */ + private static DiskBasedMap.FileEntry readInternal(RandomAccessFile file, long valuePosition, int valueLength) throws IOException { + file.seek(valuePosition); + long crc = file.readLong(); + long timestamp = file.readLong(); + int keySize = file.readInt(); + int valueSize = file.readInt(); + byte [] key = new byte[keySize]; + file.read(key, 0, keySize); + byte [] value = new byte[valueSize]; + if(!(valueSize == valueLength)) { + throw new HoodieCorruptedDataException("unequal size of payload written to external file, data may be corrupted"); + } + file.read(value, 0, valueSize); + long crcOfReadValue = generateChecksum(value); + if(!(crc == crcOfReadValue)) { + throw new HoodieCorruptedDataException("checksum of payload written to external disk does not match, " + + "data may be corrupted"); + } + return new DiskBasedMap.FileEntry(crc, keySize, valueSize, key, value, timestamp); + } + + /** + * Write Value and other metadata necessary to disk. 
Each entry has the following sequence of data + * + * |crc|timestamp|sizeOfKey|SizeOfValue|key|value| + * + * @param outputStream + * @param fileEntry + * @return + * @throws IOException + */ + public static long spillToDisk(SizeAwareDataOutputStream outputStream, DiskBasedMap.FileEntry fileEntry) throws IOException { + return spill(outputStream, fileEntry); + } + + private static long spill(SizeAwareDataOutputStream outputStream, DiskBasedMap.FileEntry fileEntry) + throws IOException { + outputStream.writeLong(fileEntry.getCrc()); + outputStream.writeLong(fileEntry.getTimestamp()); + outputStream.writeInt(fileEntry.getSizeOfKey()); + outputStream.writeInt(fileEntry.getSizeOfValue()); + outputStream.write(fileEntry.getKey()); + outputStream.write(fileEntry.getValue()); + return outputStream.getSize(); + } + + /** + * Generate a checksum for a given set of bytes + * @param data + * @return + */ + public static long generateChecksum(byte [] data) { + CRC32 crc = new CRC32(); + crc.update(data); + return crc.getValue(); + } + + /** + * Compute a bytes representation of the payload by serializing the contents + * This is used to estimate the size of the payload (either in memory or when written to disk) + * @param + * @param value + * @param schema + * @return + * @throws IOException + */ + public static int computePayloadSize(R value, Schema schema) throws IOException { + HoodieRecord payload = (HoodieRecord) value; + byte [] val = HoodieAvroUtils.avroToBytes((GenericRecord) payload.getData().getInsertValue(schema).get()); + return val.length; + } + + /** + * Utility method to convert bytes to HoodieRecord using schema and payload class + * @param rec + * @param payloadClazz + * @param + * @return + * @throws IOException + */ + public static R convertToHoodieRecordPayload(GenericRecord rec, String payloadClazz) { + String recKey = rec.get(HoodieRecord.RECORD_KEY_METADATA_FIELD) + .toString(); + String partitionPath = + 
rec.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD) + .toString(); + HoodieRecord hoodieRecord = new HoodieRecord<>( + new HoodieKey(recKey, partitionPath), + ReflectionUtils + .loadPayload(payloadClazz, new Object[]{Optional.of(rec)}, Optional.class)); + return (R) hoodieRecord; + } +} \ No newline at end of file diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/DiskBasedMap.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/DiskBasedMap.java new file mode 100644 index 000000000..2503c409f --- /dev/null +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/DiskBasedMap.java @@ -0,0 +1,326 @@ +/* + * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.uber.hoodie.common.util.collection; + +import com.uber.hoodie.common.model.HoodieRecord; +import com.uber.hoodie.common.util.HoodieAvroUtils; +import com.uber.hoodie.common.util.SpillableMapUtils; +import com.uber.hoodie.common.util.collection.io.storage.SizeAwareDataOutputStream; +import com.uber.hoodie.exception.HoodieException; +import com.uber.hoodie.exception.HoodieIOException; +import com.uber.hoodie.exception.HoodieNotSupportedException; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.util.AbstractMap; +import java.util.Collection; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicLong; + +/** + * This class provides a disk spillable only map implementation. All of the data is + * currenly written to one file, without any rollover support. It uses the following : + * 1) An in-memory map that tracks the key-> latest ValueMetadata. 
+ * 2) Current position in the file + * NOTE : Only String.class type supported for Key + * @param + * @param + */ +final public class DiskBasedMap implements Map { + + // Stores the key and corresponding value's latest metadata spilled to disk + final private Map inMemoryMetadataOfSpilledData; + // Read only file access to be able to seek to random positions to readFromDisk values + private RandomAccessFile readOnlyFileHandle; + // Write only OutputStream to be able to ONLY append to the file + private SizeAwareDataOutputStream writeOnlyFileHandle; + // FileOutputStream for the file handle to be able to force fsync + // since FileOutputStream's flush() does not force flush to disk + private FileOutputStream fileOutputStream; + // Current position in the file + private AtomicLong filePosition; + // Schema used to de-serialize payload written to disk + private Schema schema; + // Class used to de-serialize/realize payload written to disk + private String payloadClazz; + // FilePath to store the spilled data + private String filePath; + // Default file path prefix to put the spillable file + private static String DEFAULT_BASE_FILE_PATH = "/tmp/"; + + public final class ValueMetadata { + // FilePath to store the spilled data + private String filePath; + // Size (numberOfBytes) of the value written to disk + private Integer sizeOfValue; + // FilePosition of the value written to disk + private Long offsetOfValue; + // Current timestamp when the value was written to disk + private Long timestamp; + + protected ValueMetadata(String filePath, int sizeOfValue, long offsetOfValue, long timestamp) { + this.filePath = filePath; + this.sizeOfValue = sizeOfValue; + this.offsetOfValue = offsetOfValue; + this.timestamp = timestamp; + } + + public String getFilePath() { + return filePath; + } + + public int getSizeOfValue() { + return sizeOfValue; + } + + public Long getOffsetOfValue() { + return offsetOfValue; + } + + public long getTimestamp() { + return timestamp; + } + } + + 
public static final class FileEntry { + // Checksum of the value written to disk, compared during every readFromDisk to make sure no corruption + private Long crc; + // Size (numberOfBytes) of the key written to disk + private Integer sizeOfKey; + // Size (numberOfBytes) of the value written to disk + private Integer sizeOfValue; + // Actual key + private byte [] key; + // Actual value + private byte [] value; + // Current timestamp when the value was written to disk + private Long timestamp; + + public FileEntry(long crc, int sizeOfKey, int sizeOfValue, byte [] key, byte [] value, long timestamp) { + this.crc = crc; + this.sizeOfKey = sizeOfKey; + this.sizeOfValue = sizeOfValue; + this.key = key; + this.value = value; + this.timestamp = timestamp; + } + + public long getCrc() { + return crc; + } + + public int getSizeOfKey() { + return sizeOfKey; + } + + public int getSizeOfValue() { + return sizeOfValue; + } + + public byte[] getKey() { + return key; + } + + public byte[] getValue() { + return value; + } + + public long getTimestamp() { + return timestamp; + } + } + + protected DiskBasedMap(Schema schema, String payloadClazz, Optional baseFilePath) throws IOException { + this.inMemoryMetadataOfSpilledData = new HashMap<>(); + + if(!baseFilePath.isPresent()) { + baseFilePath = Optional.of(DEFAULT_BASE_FILE_PATH); + } + this.filePath = baseFilePath.get() + UUID.randomUUID().toString(); + File writeOnlyFileHandle = new File(filePath); + initFile(writeOnlyFileHandle); + + this.fileOutputStream = new FileOutputStream(writeOnlyFileHandle, true); + this.writeOnlyFileHandle = new SizeAwareDataOutputStream(fileOutputStream); + this.filePosition = new AtomicLong(0L); + this.schema = schema; + this.payloadClazz = payloadClazz; + } + + private void initFile(File writeOnlyFileHandle) throws IOException { + // delete the file if it exists + if(writeOnlyFileHandle.exists()) { + writeOnlyFileHandle.delete(); + } + writeOnlyFileHandle.createNewFile(); + // Open file in 
readFromDisk-only mode + readOnlyFileHandle = new RandomAccessFile(filePath, "r"); + readOnlyFileHandle.seek(0); + // Make sure file is deleted when JVM exits + writeOnlyFileHandle.deleteOnExit(); + addShutDownHook(); + } + + /** + * Register shutdown hook to force flush contents of the data written to FileOutputStream + * from OS page cache (typically 4 KB) to disk + */ + private void addShutDownHook() { + Runtime.getRuntime().addShutdownHook(new Thread() { + public void run() { + try { + if(writeOnlyFileHandle != null) { + writeOnlyFileHandle.flush(); + fileOutputStream.getChannel().force(false); + writeOnlyFileHandle.close(); + } + } catch(Exception e) { + // fail silently for any sort of exception + } + } + }); + } + + /** + * Custom iterator to iterate over values written to disk + * @return + */ + public Iterator iterator() { + return new LazyFileIterable(readOnlyFileHandle, + inMemoryMetadataOfSpilledData, schema, payloadClazz).iterator(); + } + + /** + * Number of bytes spilled to disk + * @return + */ + public long sizeOfFileOnDiskInBytes() { + return filePosition.get(); + } + + @Override + public int size() { + return inMemoryMetadataOfSpilledData.size(); + } + + @Override + public boolean isEmpty() { + return inMemoryMetadataOfSpilledData.isEmpty(); + } + + @Override + public boolean containsKey(Object key) { + return inMemoryMetadataOfSpilledData.containsKey(key); + } + + @Override + public boolean containsValue(Object value) { + throw new HoodieNotSupportedException("unable to compare values in map"); + } + + @Override + public R get(Object key) { + ValueMetadata entry = inMemoryMetadataOfSpilledData.get(key); + if(entry == null) { + return null; + } + try { + return SpillableMapUtils.readFromDisk(readOnlyFileHandle, schema, + payloadClazz, entry.getOffsetOfValue(), entry.getSizeOfValue()); + } catch(IOException e) { + throw new HoodieIOException("Unable to readFromDisk Hoodie Record from disk", e); + } + } + + @Override + public R put(T key, R value) 
{ + //TODO (na) : check value instanceof HoodieRecordPayload, now assume every payload is HoodieRecord + HoodieRecord payload = (HoodieRecord) value; + try { + byte [] val = HoodieAvroUtils.avroToBytes((GenericRecord) payload.getData().getInsertValue(this.schema).get()); + Integer valueSize = val.length; + Long timestamp = new Date().getTime(); + this.inMemoryMetadataOfSpilledData.put(key, new DiskBasedMap.ValueMetadata(this.filePath, valueSize, + filePosition.get(), timestamp)); + // TODO(na) : Test serializer performance for generic types + String serializedKey = SpillableMapUtils.objectMapper.writeValueAsString(key); + filePosition.set(SpillableMapUtils.spillToDisk(writeOnlyFileHandle, + new FileEntry(SpillableMapUtils.generateChecksum(val), + serializedKey.getBytes().length, valueSize, serializedKey.getBytes(), val, timestamp))); + } catch(IOException io) { + throw new HoodieIOException("Unable to store data in Disk Based map", io); + } + return value; + } + + @Override + public R remove(Object key) { + R value = get(key); + inMemoryMetadataOfSpilledData.remove(key); + return value; + } + + @Override + public void putAll(Map m) { + for(Map.Entry entry: m.entrySet()) { + put(entry.getKey(), entry.getValue()); + } + } + + @Override + public void clear() { + inMemoryMetadataOfSpilledData.clear(); + // close input/output streams + try { + writeOnlyFileHandle.flush(); + writeOnlyFileHandle.close(); + new File(filePath).delete(); + } catch(IOException e) { + throw new HoodieIOException("unable to clear map or delete file on disk", e); + } + } + + @Override + public Set keySet() { + return inMemoryMetadataOfSpilledData.keySet(); + } + + @Override + public Collection values() { + throw new HoodieException("Unsupported Operation Exception"); + } + + @Override + public Set> entrySet() { + Set> entrySet = new HashSet<>(); + for(T key: inMemoryMetadataOfSpilledData.keySet()) { + entrySet.add(new AbstractMap.SimpleEntry<>(key, get(key))); + } + return entrySet; + } +} \ No 
newline at end of file diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/ExternalSpillableMap.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/ExternalSpillableMap.java new file mode 100644 index 000000000..261ca2c31 --- /dev/null +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/ExternalSpillableMap.java @@ -0,0 +1,256 @@ +/* + * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.hoodie.common.util.collection; + +import com.uber.hoodie.common.util.SpillableMapUtils; +import com.uber.hoodie.exception.HoodieIOException; +import com.uber.hoodie.exception.HoodieNotSupportedException; +import org.apache.avro.Schema; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +/** + * An external map that spills content to disk when there is insufficient space for it + * to grow. + * + * This map holds 2 types of data structures : + * + * (1) Key-Value pairs in a in-memory map + * (2) Key-ValueMetadata pairs in an in-memory map which keeps a marker to the values spilled to disk + * + * NOTE : Values are only appended to disk. 
If a remove() is called, the entry is marked removed from the in-memory + * key-valueMetadata map but it's values will be lying around in the temp file on disk until the file is cleaned. + * + * The setting of the spill threshold faces the following trade-off: If the spill threshold is + * too high, the in-memory map may occupy more memory than is available, resulting in OOM. + * However, if the spill threshold is too low, we spill frequently and incur unnecessary disk + * writes. + * @param + * @param + */ +public class ExternalSpillableMap implements Map { + + // maximum space allowed in-memory for this map + final private long maxInMemorySizeInBytes; + // current space occupied by this map in-memory + private Long currentInMemoryMapSize; + // Map to store key-values in memory until it hits maxInMemorySizeInBytes + final private Map inMemoryMap; + // Map to store key-valuemetadata important to find the values spilled to disk + final private DiskBasedMap diskBasedMap; + // Schema used to de-serialize and readFromDisk the records written to disk + final private Schema schema; + // An estimate of the size of each payload written to this map + private volatile long estimatedPayloadSize = 0; + // TODO(na) : a dynamic sizing factor to ensure we have space for other objects in memory and incorrect payload estimation + final private Double sizingFactorForInMemoryMap = 0.8; + + private static Logger log = LogManager.getLogger(ExternalSpillableMap.class); + + + public ExternalSpillableMap(Long maxInMemorySizeInBytes, Schema schema, + String payloadClazz, Optional baseFilePath) throws IOException { + this.inMemoryMap = new HashMap<>(); + this.diskBasedMap = new DiskBasedMap<>(schema, payloadClazz, baseFilePath); + this.maxInMemorySizeInBytes = (long) Math.floor(maxInMemorySizeInBytes*sizingFactorForInMemoryMap); + this.schema = schema; + this.currentInMemoryMapSize = 0L; + } + + /** + * A custom iterator to wrap over iterating in-memory + disk spilled data + * @return + */ 
+ public Iterator iterator() { + return new IteratorWrapper<>(inMemoryMap.values().iterator(), diskBasedMap.iterator()); + } + + /** + * Number of entries in DiskBasedMap + * @return + */ + public int getDiskBasedMapNumEntries() { + return diskBasedMap.size(); + } + + /** + * Number of bytes spilled to disk + * @return + */ + public long getSizeOfFileOnDiskInBytes() { + return diskBasedMap.sizeOfFileOnDiskInBytes(); + } + + /** + * Number of entries in InMemoryMap + * @return + */ + public int getInMemoryMapNumEntries() { + return inMemoryMap.size(); + } + + /** + * Approximate memory footprint of the in-memory map + * @return + */ + public long getCurrentInMemoryMapSize() { + return currentInMemoryMapSize; + } + + @Override + public int size() { + return inMemoryMap.size() + diskBasedMap.size(); + } + + @Override + public boolean isEmpty() { + return inMemoryMap.isEmpty() && diskBasedMap.isEmpty(); + } + + @Override + public boolean containsKey(Object key) { + return inMemoryMap.containsKey(key) || diskBasedMap.containsKey(key); + } + + @Override + public boolean containsValue(Object value) { + return inMemoryMap.containsValue(value) || diskBasedMap.containsValue(value); + } + + @Override + public R get(Object key) { + if(inMemoryMap.containsKey(key)) { + return inMemoryMap.get(key); + } else if(diskBasedMap.containsKey(key)) { + return diskBasedMap.get(key); + } + return null; + } + + @Override + public R put(T key, R value) { + try { + if (this.currentInMemoryMapSize < maxInMemorySizeInBytes || inMemoryMap.containsKey(key)) { + // Naive approach for now + if (estimatedPayloadSize == 0) { + this.estimatedPayloadSize = SpillableMapUtils.computePayloadSize(value, schema); + log.info("Estimated Payload size => " + estimatedPayloadSize); + } + if(!inMemoryMap.containsKey(key)) { + currentInMemoryMapSize += this.estimatedPayloadSize; + } + inMemoryMap.put(key, value); + } else { + diskBasedMap.put(key, value); + } + return value; + } catch(IOException io) { + throw 
new HoodieIOException("Unable to estimate size of payload", io); + } + } + + @Override + public R remove(Object key) { + // NOTE : diskBasedMap.remove does not delete the data from disk + if(inMemoryMap.containsKey(key)) { + currentInMemoryMapSize -= estimatedPayloadSize; + return inMemoryMap.remove(key); + } else if(diskBasedMap.containsKey(key)) { + return diskBasedMap.remove(key); + } + return null; + } + + @Override + public void putAll(Map m) { + for(Map.Entry entry: m.entrySet()) { + put(entry.getKey(), entry.getValue()); + } + } + + @Override + public void clear() { + inMemoryMap.clear(); + diskBasedMap.clear(); + currentInMemoryMapSize = 0L; + } + + @Override + public Set keySet() { + Set keySet = new HashSet(); + keySet.addAll(inMemoryMap.keySet()); + keySet.addAll(diskBasedMap.keySet()); + return keySet; + } + + @Override + public Collection values() { + if(diskBasedMap.isEmpty()) { + return inMemoryMap.values(); + } + throw new HoodieNotSupportedException("Cannot return all values in memory"); + } + + @Override + public Set> entrySet() { + Set> entrySet = new HashSet<>(); + entrySet.addAll(inMemoryMap.entrySet()); + entrySet.addAll(diskBasedMap.entrySet()); + return entrySet; + } + + /** + * Iterator that wraps iterating over all the values for this map + * 1) inMemoryIterator - Iterates over all the data in-memory map + * 2) diskLazyFileIterator - Iterates over all the data spilled to disk + * @param + */ + private class IteratorWrapper implements Iterator { + + private Iterator inMemoryIterator; + private Iterator diskLazyFileIterator; + + public IteratorWrapper(Iterator inMemoryIterator, Iterator diskLazyFileIterator) { + this.inMemoryIterator = inMemoryIterator; + this.diskLazyFileIterator = diskLazyFileIterator; + } + @Override + public boolean hasNext() { + if(inMemoryIterator.hasNext()) { + return true; + } + return diskLazyFileIterator.hasNext(); + } + + @Override + public R next() { + if(inMemoryIterator.hasNext()) { + return 
inMemoryIterator.next(); + } + return diskLazyFileIterator.next(); + } + } +} \ No newline at end of file diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/LazyFileIterable.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/LazyFileIterable.java new file mode 100644 index 000000000..f91894ecd --- /dev/null +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/LazyFileIterable.java @@ -0,0 +1,116 @@ +/* + * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.hoodie.common.util.collection; + +import com.uber.hoodie.common.util.SpillableMapUtils; +import com.uber.hoodie.exception.HoodieException; +import com.uber.hoodie.exception.HoodieIOException; +import org.apache.avro.Schema; + +import java.io.IOException; +import java.io.RandomAccessFile; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +/** + * Iterable to lazily fetch values spilled to disk. + * This class uses RandomAccessFile to randomly access the position of + * the latest value for a key spilled to disk and returns the result. 
+ * @param + */ +public class LazyFileIterable implements Iterable { + + // Used to access the value written at a specific position in the file + private RandomAccessFile readOnlyFileHandle; + // Stores the key and corresponding value's latest metadata spilled to disk + private Map inMemoryMetadataOfSpilledData; + // Schema used to de-serialize payload written to disk + private Schema schema; + // Class used to de-serialize/realize payload written to disk + private String payloadClazz; + + public LazyFileIterable(RandomAccessFile file, Map map, + Schema schema, String payloadClazz) { + this.readOnlyFileHandle = file; + this.inMemoryMetadataOfSpilledData = map; + this.schema = schema; + this.payloadClazz = payloadClazz; + } + @Override + public Iterator iterator() { + try { + return new LazyFileIterator<>(readOnlyFileHandle, inMemoryMetadataOfSpilledData, schema, payloadClazz); + } catch(IOException io) { + throw new HoodieException("Unable to initialize iterator for file on disk", io); + } + } + + /** + * Iterator implementation for the iterable defined above. 
+ * @param + */ + public class LazyFileIterator implements Iterator { + + private RandomAccessFile readOnlyFileHandle; + private Schema schema; + private String payloadClazz; + private Iterator> metadataIterator; + + public LazyFileIterator(RandomAccessFile file, Map map, + Schema schema, String payloadClazz) throws IOException { + this.readOnlyFileHandle = file; + this.schema = schema; + this.payloadClazz = payloadClazz; + // sort the map in increasing order of offset of value so disk seek is only in one(forward) direction + this.metadataIterator = map + .entrySet() + .stream() + .sorted((Map.Entry o1, Map.Entry o2) -> + o1.getValue().getOffsetOfValue().compareTo(o2.getValue().getOffsetOfValue())) + .collect(Collectors.toList()).iterator(); + } + + @Override + public boolean hasNext() { + return this.metadataIterator.hasNext(); + } + + @Override + public T next() { + Map.Entry entry = this.metadataIterator.next(); + try { + return SpillableMapUtils.readFromDisk(readOnlyFileHandle, schema, + payloadClazz, entry.getValue().getOffsetOfValue(), entry.getValue().getSizeOfValue()); + } catch(IOException e) { + throw new HoodieIOException("Unable to read hoodie record from value spilled to disk", e); + } + } + + @Override + public void remove() { + this.metadataIterator.remove(); + } + + @Override + public void forEachRemaining(Consumer action) { + action.accept(next()); + } + } +} \ No newline at end of file diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/io/storage/SizeAwareDataOutputStream.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/io/storage/SizeAwareDataOutputStream.java new file mode 100644 index 000000000..8280dda3f --- /dev/null +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/io/storage/SizeAwareDataOutputStream.java @@ -0,0 +1,69 @@ +/* + * Copyright (c) 2016 Uber Technologies, Inc. 
/**
 * Wrapper around {@link DataOutputStream} that keeps track of the number of bytes written.
 */
public class SizeAwareDataOutputStream {

  // Underlying output stream all writes are delegated to
  private DataOutputStream outputStream;
  // Running count of bytes written through this stream
  private AtomicLong size;

  public SizeAwareDataOutputStream(FileOutputStream fileOutputStream) {
    this.outputStream = new DataOutputStream(fileOutputStream);
    this.size = new AtomicLong(0L);
  }

  public void writeLong(long v) throws IOException {
    size.addAndGet(Long.BYTES);
    outputStream.writeLong(v);
  }

  public void writeInt(int v) throws IOException {
    size.addAndGet(Integer.BYTES);
    outputStream.writeInt(v);
  }

  public void write(byte[] v) throws IOException {
    size.addAndGet(v.length);
    outputStream.write(v);
  }

  public void write(byte[] v, int offset, int len) throws IOException {
    // FIX: exactly `len` bytes are written; the previous code added `len + offset`,
    // over-counting whenever offset > 0
    size.addAndGet(len);
    outputStream.write(v, offset, len);
  }

  public void flush() throws IOException {
    outputStream.flush();
  }

  public void close() throws IOException {
    outputStream.close();
  }

  /**
   * @return total number of bytes written so far
   */
  public long getSize() {
    return size.get();
  }
}
a/hoodie-common/src/main/java/com/uber/hoodie/exception/HoodieCorruptedDataException.java b/hoodie-common/src/main/java/com/uber/hoodie/exception/HoodieCorruptedDataException.java new file mode 100644 index 000000000..3c10a1537 --- /dev/null +++ b/hoodie-common/src/main/java/com/uber/hoodie/exception/HoodieCorruptedDataException.java @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.hoodie.exception; + +/** + *

Exception thrown when any data corruption happens when reading/writing from temporary disk

+ */ +public class HoodieCorruptedDataException extends HoodieException { + + public HoodieCorruptedDataException(String msg) { + super(msg); + } + + public HoodieCorruptedDataException(String msg, Throwable e) { + super(msg, e); + } + +} diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/HoodieLogFormatTest.java b/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/HoodieLogFormatTest.java index a8be14ee4..e57c7ebe7 100644 --- a/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/HoodieLogFormatTest.java +++ b/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/HoodieLogFormatTest.java @@ -16,12 +16,6 @@ package com.uber.hoodie.common.table.log; -import static com.uber.hoodie.common.util.SchemaTestUtil.getSimpleSchema; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - import com.google.common.collect.Maps; import com.uber.hoodie.common.minicluster.MiniClusterUtil; import com.uber.hoodie.common.model.HoodieArchivedLogFile; @@ -41,15 +35,7 @@ import com.uber.hoodie.common.table.log.block.HoodieLogBlock.HoodieLogBlockType; import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.common.util.HoodieAvroUtils; import com.uber.hoodie.common.util.SchemaTestUtil; -import java.io.IOException; -import java.net.URISyntaxException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.stream.Collectors; +import com.uber.hoodie.common.util.collection.DiskBasedMap; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; @@ -64,6 +50,23 @@ import org.junit.BeforeClass; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import java.io.IOException; +import java.net.URISyntaxException; +import 
java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static com.uber.hoodie.common.util.SchemaTestUtil.getSimpleSchema; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + @SuppressWarnings("Duplicates") public class HoodieLogFormatTest { @@ -490,7 +493,7 @@ public class HoodieLogFormatTest { HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath, allLogFiles, - schema, "100"); + schema, "100", 10240L); assertEquals("", 200, scanner.getTotalLogRecords()); Set readKeys = new HashSet<>(200); scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey())); @@ -554,9 +557,8 @@ public class HoodieLogFormatTest { HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath, allLogFiles, - schema, "102"); - assertEquals("We read 200 records from 2 write batches", 200, - scanner.getTotalLogRecords()); + schema, "102", 10240L); + assertEquals("We only read 200 records, but only 200 of them are valid", 200, scanner.getTotalLogRecords()); Set readKeys = new HashSet<>(200); scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey())); assertEquals("Stream collect should return all 200 records", 200, readKeys.size()); @@ -635,9 +637,8 @@ public class HoodieLogFormatTest { HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath, allLogFiles, - schema, "103"); - assertEquals("We would read 200 records", 200, - scanner.getTotalLogRecords()); + schema, "103", 10240L); + assertEquals("We would read 200 records", 200, scanner.getTotalLogRecords()); Set readKeys = new HashSet<>(200); scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey())); assertEquals("Stream collect should return all 200 
records", 200, readKeys.size()); @@ -700,9 +701,8 @@ public class HoodieLogFormatTest { HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath, allLogFiles, - schema, "102"); - assertEquals("We still would read 200 records", 200, - scanner.getTotalLogRecords()); + schema, "102", 10240L); + assertEquals("We still would read 200 records", 200, scanner.getTotalLogRecords()); final List readKeys = new ArrayList<>(200); scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey())); assertEquals("Stream collect should return all 150 records", 150, readKeys.size()); @@ -720,7 +720,7 @@ public class HoodieLogFormatTest { writer = writer.appendBlock(commandBlock); readKeys.clear(); - scanner = new HoodieCompactedLogRecordScanner(fs, basePath, allLogFiles, schema, "101"); + scanner = new HoodieCompactedLogRecordScanner(fs, basePath, allLogFiles, schema, "101", 10240L); scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey())); assertEquals("Stream collect should return all 200 records after rollback of delete", 200, readKeys.size()); @@ -783,12 +783,10 @@ public class HoodieLogFormatTest { .map(s -> s.getPath().toString()) .collect(Collectors.toList()); - HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath, - allLogFiles, - schema, "100"); // all data must be rolled back before merge - assertEquals("We would read 0 records", 0, - scanner.getTotalLogRecords()); + HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath, + allLogFiles, schema, "100", 10240L); + assertEquals("We would have scanned 0 records because of rollback", 0, scanner.getTotalLogRecords()); final List readKeys = new ArrayList<>(); scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey())); @@ -841,7 +839,7 @@ public class HoodieLogFormatTest { .collect(Collectors.toList()); HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath, - allLogFiles, schema, 
"100"); + allLogFiles, schema, "100", 10240L); assertEquals("We would read 0 records", 0, scanner.getTotalLogRecords()); } @@ -875,7 +873,7 @@ public class HoodieLogFormatTest { .collect(Collectors.toList()); HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath, - allLogFiles, schema, "100"); + allLogFiles, schema, "100", 10240L); assertEquals("We still would read 100 records", 100, scanner.getTotalLogRecords()); final List readKeys = new ArrayList<>(100); @@ -931,7 +929,7 @@ public class HoodieLogFormatTest { .collect(Collectors.toList()); HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath, - allLogFiles, schema, "101"); + allLogFiles, schema, "101", 10240L); assertEquals("We would read 0 records", 0, scanner.getTotalLogRecords()); } @@ -1014,8 +1012,9 @@ public class HoodieLogFormatTest { .collect(Collectors.toList()); HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath, - allLogFiles, schema, "101"); + allLogFiles, schema, "101", 10240L); assertEquals("We would read 0 records", 0, scanner.getTotalLogRecords()); } + } diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/util/SchemaTestUtil.java b/hoodie-common/src/test/java/com/uber/hoodie/common/util/SchemaTestUtil.java index 9af169294..1de52e8ed 100644 --- a/hoodie-common/src/test/java/com/uber/hoodie/common/util/SchemaTestUtil.java +++ b/hoodie-common/src/test/java/com/uber/hoodie/common/util/SchemaTestUtil.java @@ -18,6 +18,7 @@ package com.uber.hoodie.common.util; import com.uber.hoodie.avro.MercifulJsonConverter; import com.uber.hoodie.common.model.HoodieRecord; +import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline; import com.uber.hoodie.exception.HoodieIOException; import java.io.IOException; import java.net.URI; @@ -35,6 +36,7 @@ import java.util.UUID; import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.avro.Schema; +import 
org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; @@ -53,7 +55,7 @@ public class SchemaTestUtil { } private static List toRecords(Schema writerSchema, Schema readerSchema, int from, - int limit) throws IOException, URISyntaxException { + int limit) throws IOException, URISyntaxException { GenericDatumReader reader = new GenericDatumReader<>(writerSchema, readerSchema); // Required to register the necessary JAR:// file system @@ -93,12 +95,29 @@ public class SchemaTestUtil { public static List generateHoodieTestRecords(int from, int limit) throws IOException, URISyntaxException { List records = generateTestRecords(from, limit); + String commitTime = HoodieActiveTimeline.createNewCommitTime(); Schema hoodieFieldsSchema = HoodieAvroUtils.addMetadataFields(getSimpleSchema()); return records.stream() .map(s -> HoodieAvroUtils.rewriteRecord((GenericRecord) s, hoodieFieldsSchema)) .map(p -> { p.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, UUID.randomUUID().toString()); p.put(HoodieRecord.PARTITION_PATH_METADATA_FIELD, "0000/00/00"); + p.put(HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitTime); + return p; + }).collect( + Collectors.toList()); + + } + + public static List updateHoodieTestRecords(List oldRecordKeys, List newRecords, + String commitTime) + throws IOException, URISyntaxException { + + return newRecords.stream() + .map(p -> { + ((GenericRecord)p).put(HoodieRecord.RECORD_KEY_METADATA_FIELD, oldRecordKeys.remove(0)); + ((GenericRecord)p).put(HoodieRecord.PARTITION_PATH_METADATA_FIELD, "0000/00/00"); + ((GenericRecord)p).put(HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitTime); return p; }).collect( Collectors.toList()); @@ -121,7 +140,7 @@ public class SchemaTestUtil { } public static GenericRecord generateAvroRecordFromJson(Schema schema, int recordNumber, - String commitTime, String fileId) throws IOException { + String commitTime, 
String fileId) throws IOException { TestRecord record = new TestRecord(commitTime, recordNumber, fileId); MercifulJsonConverter converter = new MercifulJsonConverter(schema); return converter.convert(record.toJsonString()); diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/util/SpillableMapTestUtils.java b/hoodie-common/src/test/java/com/uber/hoodie/common/util/SpillableMapTestUtils.java new file mode 100644 index 000000000..05be50144 --- /dev/null +++ b/hoodie-common/src/test/java/com/uber/hoodie/common/util/SpillableMapTestUtils.java @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.uber.hoodie.common.util; + +import com.uber.hoodie.common.model.HoodieAvroPayload; +import com.uber.hoodie.common.model.HoodieKey; +import com.uber.hoodie.common.model.HoodieRecord; +import com.uber.hoodie.common.model.HoodieRecordPayload; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +public class SpillableMapTestUtils { + + public static List upsertRecords(List iRecords, + Map> records) { + List recordKeys = new ArrayList<>(); + iRecords + .stream() + .forEach(r -> { + String key = ((GenericRecord)r).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(); + String partitionPath = ((GenericRecord)r).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString(); + recordKeys.add(key); + records.put(key, new HoodieRecord<>(new HoodieKey(key, partitionPath), + new HoodieAvroPayload(Optional.of((GenericRecord) r)))); + }); + return recordKeys; + } +} diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/util/collection/TestDiskBasedMap.java b/hoodie-common/src/test/java/com/uber/hoodie/common/util/collection/TestDiskBasedMap.java new file mode 100644 index 000000000..eb570a655 --- /dev/null +++ b/hoodie-common/src/test/java/com/uber/hoodie/common/util/collection/TestDiskBasedMap.java @@ -0,0 +1,107 @@ +/* + * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.hoodie.common.util.collection; + +import com.uber.hoodie.common.model.HoodieAvroPayload; +import com.uber.hoodie.common.model.HoodieRecord; +import com.uber.hoodie.common.model.HoodieRecordPayload; +import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline; +import com.uber.hoodie.common.util.HoodieAvroUtils; +import com.uber.hoodie.common.util.SchemaTestUtil; +import com.uber.hoodie.common.util.SpillableMapTestUtils; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.junit.Test; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertEquals; + +public class TestDiskBasedMap { + + @Test + public void testSimpleInsert() throws IOException, URISyntaxException { + Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema()); + DiskBasedMap records = new DiskBasedMap<>(schema, HoodieAvroPayload.class.getName(),Optional.empty()); + List iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100); + String commitTime = ((GenericRecord)iRecords.get(0)).get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString(); + List recordKeys = SpillableMapTestUtils.upsertRecords(iRecords, records); + + // make sure records have spilled to disk + assertTrue(records.sizeOfFileOnDiskInBytes() > 0); + Iterator> itr = records.iterator(); + List oRecords = new ArrayList<>(); + while(itr.hasNext()) { + HoodieRecord rec = itr.next(); + oRecords.add(rec); + assert recordKeys.contains(rec.getRecordKey()); + } + } + + @Test + public void testSimpleUpsert() throws 
IOException, URISyntaxException { + + Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema()); + DiskBasedMap records = new DiskBasedMap<>(schema, HoodieAvroPayload.class.getName(),Optional.empty()); + List iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100); + String commitTime = ((GenericRecord)iRecords.get(0)).get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString(); + // perform some inserts + List recordKeys = SpillableMapTestUtils.upsertRecords(iRecords, records); + + long fileSize = records.sizeOfFileOnDiskInBytes(); + // make sure records have spilled to disk + assertTrue(fileSize > 0); + + // generate updates from inserts + List updatedRecords = + SchemaTestUtil.updateHoodieTestRecords(recordKeys, SchemaTestUtil.generateHoodieTestRecords(0, 100), + HoodieActiveTimeline.createNewCommitTime()); + String newCommitTime = ((GenericRecord)updatedRecords.get(0)).get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString(); + + // new commit time should be different + assertEquals(commitTime, newCommitTime); + + // perform upserts + recordKeys = SpillableMapTestUtils.upsertRecords(updatedRecords, records); + + // upserts should be appended to the existing file, hence increasing the sizeOfFile on disk + assertTrue(records.sizeOfFileOnDiskInBytes() > fileSize); + + // Upserted records (on disk) should have the latest commit time + Iterator> itr = records.iterator(); + while(itr.hasNext()) { + HoodieRecord rec = itr.next(); + assert recordKeys.contains(rec.getRecordKey()); + try { + IndexedRecord indexedRecord = (IndexedRecord)rec.getData().getInsertValue(schema).get(); + String latestCommitTime = ((GenericRecord)indexedRecord).get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString(); + assertEquals(latestCommitTime, newCommitTime); + } catch(IOException io) { + throw new UncheckedIOException(io); + } + } + } +} diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/util/collection/TestExternalSpillableMap.java 
b/hoodie-common/src/test/java/com/uber/hoodie/common/util/collection/TestExternalSpillableMap.java new file mode 100644 index 000000000..d15caf645 --- /dev/null +++ b/hoodie-common/src/test/java/com/uber/hoodie/common/util/collection/TestExternalSpillableMap.java @@ -0,0 +1,245 @@ +/* + * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.hoodie.common.util.collection; + +import com.uber.hoodie.common.model.HoodieAvroPayload; +import com.uber.hoodie.common.model.HoodieKey; +import com.uber.hoodie.common.model.HoodieRecord; +import com.uber.hoodie.common.model.HoodieRecordPayload; +import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline; +import com.uber.hoodie.common.util.HoodieAvroUtils; +import com.uber.hoodie.common.util.SchemaTestUtil; +import com.uber.hoodie.common.util.SpillableMapTestUtils; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.junit.FixMethodOrder; +import org.junit.Test; +import org.junit.runners.MethodSorters; + +import java.io.File; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static 
org.junit.Assert.assertTrue; + +@FixMethodOrder(MethodSorters.NAME_ASCENDING) +public class TestExternalSpillableMap { + + private static final String FAILURE_OUTPUT_PATH = "/tmp/test_fail"; + + @Test + public void simpleInsertTest() throws IOException, URISyntaxException { + ExternalSpillableMap> records = + new ExternalSpillableMap<> + (16L, HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema()), + HoodieAvroPayload.class.getName(), Optional.empty()); //16B + + List iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100); + List recordKeys = SpillableMapTestUtils.upsertRecords(iRecords, records); + assert (recordKeys.size() == 100); + Iterator> itr = records.iterator(); + List oRecords = new ArrayList<>(); + while(itr.hasNext()) { + HoodieRecord rec = itr.next(); + oRecords.add(rec); + assert recordKeys.contains(rec.getRecordKey()); + } + } + + @Test + public void testSimpleUpsert() throws IOException, URISyntaxException { + + Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema()); + ExternalSpillableMap> records = + new ExternalSpillableMap<> + (16L, schema, + HoodieAvroPayload.class.getName(), Optional.of(FAILURE_OUTPUT_PATH)); //16B + + List iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100); + List recordKeys = SpillableMapTestUtils.upsertRecords(iRecords, records); + assert (recordKeys.size() == 100); + Iterator> itr = records.iterator(); + while(itr.hasNext()) { + HoodieRecord rec = itr.next(); + assert recordKeys.contains(rec.getRecordKey()); + } + List updatedRecords = + SchemaTestUtil.updateHoodieTestRecords(recordKeys, SchemaTestUtil.generateHoodieTestRecords(0, 100), + HoodieActiveTimeline.createNewCommitTime()); + + // update records already inserted + SpillableMapTestUtils.upsertRecords(updatedRecords, records); + + // make sure we have records spilled to disk + assertTrue(records.getDiskBasedMapNumEntries() > 0); + + // iterate over the updated records and compare the value from Map + 
updatedRecords.stream().forEach(record -> { + HoodieRecord rec = records.get(((GenericRecord) record).get(HoodieRecord.RECORD_KEY_METADATA_FIELD)); + try { + assertEquals(rec.getData().getInsertValue(schema).get(),record); + } catch(IOException io) { + throw new UncheckedIOException(io); + } + }); + } + + @Test + public void testAllMapOperations() throws IOException, URISyntaxException { + + ExternalSpillableMap> records = + new ExternalSpillableMap<> + (16L, HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema()), + HoodieAvroPayload.class.getName(), Optional.empty()); //16B + + List iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100); + // insert a bunch of records so that values spill to disk too + List recordKeys = SpillableMapTestUtils.upsertRecords(iRecords, records); + IndexedRecord inMemoryRecord = iRecords.get(0); + String ikey = ((GenericRecord)inMemoryRecord).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(); + String iPartitionPath = ((GenericRecord)inMemoryRecord).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString(); + HoodieRecord inMemoryHoodieRecord = new HoodieRecord<>(new HoodieKey(ikey, iPartitionPath), + new HoodieAvroPayload(Optional.of((GenericRecord)inMemoryRecord))); + + IndexedRecord onDiskRecord = iRecords.get(99); + String dkey = ((GenericRecord)onDiskRecord).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(); + String dPartitionPath = ((GenericRecord)onDiskRecord).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString(); + HoodieRecord onDiskHoodieRecord = new HoodieRecord<>(new HoodieKey(dkey, dPartitionPath), + new HoodieAvroPayload(Optional.of((GenericRecord)onDiskRecord))); + // assert size + assert records.size() == 100; + // get should return the same HoodieKey and same value + assert inMemoryHoodieRecord.getKey().equals(records.get(ikey).getKey()); + assert onDiskHoodieRecord.getKey().equals(records.get(dkey).getKey()); + //assert inMemoryHoodieRecord.equals(records.get(ikey)); + //assert 
onDiskHoodieRecord.equals(records.get(dkey)); + + // test contains + assertTrue(records.containsKey(ikey)); + assertTrue(records.containsKey(dkey)); + + // test isEmpty + assertFalse(records.isEmpty()); + + // test containsAll + assertTrue(records.keySet().containsAll(recordKeys)); + + // remove (from inMemory and onDisk) + HoodieRecord removedRecord = records.remove(ikey); + assertTrue(removedRecord != null); + assertFalse(records.containsKey(ikey)); + + removedRecord = records.remove(dkey); + assertTrue(removedRecord != null); + assertFalse(records.containsKey(dkey)); + + // test clear + records.clear(); + assertTrue(records.size() == 0); + } + + @Test(expected = IOException.class) + public void simpleTestWithException() throws IOException, URISyntaxException { + ExternalSpillableMap> records = + new ExternalSpillableMap<> + (16L, HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema()), + HoodieAvroPayload.class.getName(), Optional.of(FAILURE_OUTPUT_PATH)); //16B + + List iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100); + List recordKeys = SpillableMapTestUtils.upsertRecords(iRecords, records); + assert (recordKeys.size() == 100); + Iterator> itr = records.iterator(); + while(itr.hasNext()) { + throw new IOException("Testing failures..."); + } + } + + @Test + public void simpleTestWithExceptionValidateFileIsRemoved() throws Exception { + File file = new File(FAILURE_OUTPUT_PATH); + assertFalse(file.exists()); + } + + @Test + public void testDataCorrectnessInMapAndDisk() throws IOException, URISyntaxException { + + Schema schema = SchemaTestUtil.getSimpleSchema(); + ExternalSpillableMap> records = + new ExternalSpillableMap<> + (16L, HoodieAvroUtils.addMetadataFields(schema), + HoodieAvroPayload.class.getName(), Optional.of(FAILURE_OUTPUT_PATH)); //16B + + List recordKeys = new ArrayList<>(); + // Ensure we spill to disk + while(records.getDiskBasedMapNumEntries() < 1) { + List iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100); + 
recordKeys.addAll(SpillableMapTestUtils.upsertRecords(iRecords, records)); + } + + // Get a record from the in-Memory map + String key = recordKeys.get(0); + HoodieRecord record = records.get(key); + List recordsToUpdate = new ArrayList<>(); + schema = HoodieAvroUtils.addMetadataFields(schema); + recordsToUpdate.add((IndexedRecord) record.getData().getInsertValue(schema).get()); + + String newCommitTime = HoodieActiveTimeline.createNewCommitTime(); + List keysToBeUpdated = new ArrayList<>(); + keysToBeUpdated.add(key); + // Update the commitTime for this record + List updatedRecords = + SchemaTestUtil.updateHoodieTestRecords(keysToBeUpdated, recordsToUpdate, newCommitTime); + // Upsert this updated record + SpillableMapTestUtils.upsertRecords(updatedRecords, records); + GenericRecord gRecord = (GenericRecord) records.get(key).getData().getInsertValue(schema).get(); + // The record returned for this key should have the updated commitTime + assert newCommitTime.contentEquals(gRecord.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString()); + + + // Get a record from the disk based map + key = recordKeys.get(recordKeys.size() - 1); + record = records.get(key); + recordsToUpdate = new ArrayList<>(); + recordsToUpdate.add((IndexedRecord) record.getData().getInsertValue(schema).get()); + + newCommitTime = HoodieActiveTimeline.createNewCommitTime(); + keysToBeUpdated = new ArrayList<>(); + keysToBeUpdated.add(key); + // Update the commitTime for this record + updatedRecords = + SchemaTestUtil.updateHoodieTestRecords(keysToBeUpdated, recordsToUpdate, newCommitTime); + // Upsert this updated record + SpillableMapTestUtils.upsertRecords(updatedRecords, records); + gRecord = (GenericRecord) records.get(key).getData().getInsertValue(schema).get(); + // The record returned for this key should have the updated commitTime + assert newCommitTime.contentEquals(gRecord.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString()); + + } + + // TODO : come up with a performance eval test 
for spillableMap + @Test + public void testLargeInsertUpsert() { + } +} diff --git a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReader.java b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReader.java index 35ef39eda..9f231084a 100644 --- a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReader.java +++ b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReader.java @@ -68,6 +68,10 @@ public class HoodieRealtimeRecordReader implements RecordReader deltaRecordMap; @@ -126,7 +130,9 @@ public class HoodieRealtimeRecordReader implements RecordReader the commit we are trying to read (if using readCommit() API) for (HoodieRecord hoodieRecord : compactedLogRecordScanner) {