diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java index 621b37a76..580975f92 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java @@ -30,8 +30,6 @@ import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.common.util.HoodieRecordSizeEstimator; import com.uber.hoodie.common.util.ReflectionUtils; import com.uber.hoodie.common.util.collection.ExternalSpillableMap; -import com.uber.hoodie.common.util.collection.converter.HoodieRecordConverter; -import com.uber.hoodie.common.util.collection.converter.StringConverter; import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.exception.HoodieIOException; import com.uber.hoodie.exception.HoodieUpsertException; @@ -150,9 +148,7 @@ public class HoodieMergeHandle extends HoodieIOHa // Load the new records in a map logger.info("MaxMemoryPerPartitionMerge => " + config.getMaxMemoryPerPartitionMerge()); this.keyToNewRecords = new ExternalSpillableMap<>(config.getMaxMemoryPerPartitionMerge(), - config.getSpillableMapBasePath(), new StringConverter(), - new HoodieRecordConverter(schema, config.getPayloadClass()), - new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(schema)); + config.getSpillableMapBasePath(), new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(schema)); } catch (IOException io) { throw new HoodieIOException("Cannot instantiate an ExternalSpillableMap", io); } diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieAvroPayload.java b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieAvroPayload.java index a6c45c737..9e4be0db9 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieAvroPayload.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieAvroPayload.java @@ -17,6 +17,7 @@ package 
com.uber.hoodie.common.model; import com.uber.hoodie.common.util.HoodieAvroUtils; +import com.uber.hoodie.exception.HoodieIOException; import java.io.IOException; import java.util.Optional; import org.apache.avro.Schema; @@ -29,10 +30,17 @@ import org.apache.avro.generic.IndexedRecord; */ public class HoodieAvroPayload implements HoodieRecordPayload { - private final Optional record; + // Store the GenericRecord converted to bytes - 1) Doesn't store schema hence memory efficient 2) Makes the payload + // java serializable + private final byte [] recordBytes; public HoodieAvroPayload(Optional record) { - this.record = record; + try { + // A delete arrives as an empty Optional; keep it as zero bytes instead of failing in record.get() + this.recordBytes = record.isPresent() ? HoodieAvroUtils.avroToBytes(record.get()) : new byte[0]; + } catch (IOException io) { + throw new HoodieIOException("Cannot convert record to bytes", io); + } } @Override @@ -48,6 +56,11 @@ public class HoodieAvroPayload implements HoodieRecordPayload @Override public Optional getInsertValue(Schema schema) throws IOException { + // Zero bytes mark a deleted record, so there is no value to insert + if (recordBytes.length == 0) { + return Optional.empty(); + } + Optional record = Optional.of(HoodieAvroUtils.bytesToAvro(recordBytes, schema)); return record.map(r -> HoodieAvroUtils.rewriteRecord(r, schema)); } } diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieMergedLogRecordScanner.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieMergedLogRecordScanner.java index ed9766069..6a12d5914 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieMergedLogRecordScanner.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieMergedLogRecordScanner.java @@ -23,8 +23,6 @@ import com.uber.hoodie.common.util.DefaultSizeEstimator; import com.uber.hoodie.common.util.HoodieRecordSizeEstimator; import com.uber.hoodie.common.util.HoodieTimer; import com.uber.hoodie.common.util.collection.ExternalSpillableMap; -import com.uber.hoodie.common.util.collection.converter.HoodieRecordConverter; -import com.uber.hoodie.common.util.collection.converter.StringConverter; import 
com.uber.hoodie.exception.HoodieIOException; import java.io.IOException; import java.util.Iterator; @@ -71,7 +69,6 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner try { // Store merged records for all versions for this log file, set the in-memory footprint to maxInMemoryMapSize this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes, spillableMapBasePath, - new StringConverter(), new HoodieRecordConverter(readerSchema, getPayloadClassFQN()), new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(readerSchema)); // Do the scan and merge timer.startTimer(); diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/DiskBasedMap.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/DiskBasedMap.java index 598025471..047b99796 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/DiskBasedMap.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/DiskBasedMap.java @@ -16,8 +16,8 @@ package com.uber.hoodie.common.util.collection; +import com.uber.hoodie.common.util.SerializationUtils; import com.uber.hoodie.common.util.SpillableMapUtils; -import com.uber.hoodie.common.util.collection.converter.Converter; import com.uber.hoodie.common.util.collection.io.storage.SizeAwareDataOutputStream; import com.uber.hoodie.exception.HoodieException; import com.uber.hoodie.exception.HoodieIOException; @@ -26,6 +26,7 @@ import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.io.RandomAccessFile; +import java.io.Serializable; import java.net.InetAddress; import java.util.AbstractMap; import java.util.Collection; @@ -45,15 +46,11 @@ import org.apache.log4j.Logger; * without any rollover support. It uses the following : 1) An in-memory map that tracks the key-> latest ValueMetadata. 
* 2) Current position in the file NOTE : Only String.class type supported for Key */ -public final class DiskBasedMap implements Map { +public final class DiskBasedMap implements Map { private static final Logger log = LogManager.getLogger(DiskBasedMap.class); // Stores the key and corresponding value's latest metadata spilled to disk private final Map valueMetadataMap; - // Key converter to convert key type to bytes - private final Converter keyConverter; - // Value converter to convert value type to bytes - private final Converter valueConverter; // Read only file access to be able to seek to random positions to readFromDisk values private RandomAccessFile readOnlyFileHandle; // Write only OutputStream to be able to ONLY append to the file @@ -67,8 +64,7 @@ public final class DiskBasedMap implements Map { private String filePath; - protected DiskBasedMap(String baseFilePath, - Converter keyConverter, Converter valueConverter) throws IOException { + protected DiskBasedMap(String baseFilePath) throws IOException { this.valueMetadataMap = new HashMap<>(); File writeOnlyFileHandle = new File(baseFilePath, UUID.randomUUID().toString()); this.filePath = writeOnlyFileHandle.getPath(); @@ -76,8 +72,6 @@ public final class DiskBasedMap implements Map { this.fileOutputStream = new FileOutputStream(writeOnlyFileHandle, true); this.writeOnlyFileHandle = new SizeAwareDataOutputStream(fileOutputStream); this.filePosition = new AtomicLong(0L); - this.keyConverter = keyConverter; - this.valueConverter = valueConverter; } private void initFile(File writeOnlyFileHandle) throws IOException { @@ -125,7 +119,7 @@ public final class DiskBasedMap implements Map { */ public Iterator iterator() { return new LazyFileIterable(readOnlyFileHandle, - valueMetadataMap, valueConverter).iterator(); + valueMetadataMap).iterator(); } /** @@ -162,7 +156,7 @@ public final class DiskBasedMap implements Map { return null; } try { - return 
this.valueConverter.getData(SpillableMapUtils.readBytesFromDisk(readOnlyFileHandle, + return SerializationUtils.deserialize(SpillableMapUtils.readBytesFromDisk(readOnlyFileHandle, entry.getOffsetOfValue(), entry.getSizeOfValue())); } catch (IOException e) { throw new HoodieIOException("Unable to readFromDisk Hoodie Record from disk", e); @@ -172,12 +166,12 @@ public final class DiskBasedMap implements Map { @Override public R put(T key, R value) { try { - byte[] val = this.valueConverter.getBytes(value); + byte[] val = SerializationUtils.serialize(value); Integer valueSize = val.length; Long timestamp = new Date().getTime(); this.valueMetadataMap.put(key, new DiskBasedMap.ValueMetadata(this.filePath, valueSize, filePosition.get(), timestamp)); - byte[] serializedKey = keyConverter.getBytes(key); + byte[] serializedKey = SerializationUtils.serialize(key); filePosition.set(SpillableMapUtils.spillToDisk(writeOnlyFileHandle, new FileEntry(SpillableMapUtils.generateChecksum(val), serializedKey.length, valueSize, serializedKey, val, timestamp))); diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/ExternalSpillableMap.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/ExternalSpillableMap.java index 081a889e7..5ac94d7c7 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/ExternalSpillableMap.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/ExternalSpillableMap.java @@ -18,9 +18,9 @@ package com.uber.hoodie.common.util.collection; import com.twitter.common.objectsize.ObjectSizeCalculator; import com.uber.hoodie.common.util.SizeEstimator; -import com.uber.hoodie.common.util.collection.converter.Converter; import com.uber.hoodie.exception.HoodieNotSupportedException; import java.io.IOException; +import java.io.Serializable; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; @@ -39,7 +39,7 @@ import org.apache.log4j.Logger; * trade-off: If 
the spill threshold is too high, the in-memory map may occupy more memory than is available, resulting * in OOM. However, if the spill threshold is too low, we spill frequently and incur unnecessary disk writes. */ -public class ExternalSpillableMap implements Map { +public class ExternalSpillableMap implements Map { // Find the actual estimated payload size after inserting N records private static final int NUMBER_OF_RECORDS_TO_ESTIMATE_PAYLOAD_SIZE = 100; @@ -53,10 +53,6 @@ public class ExternalSpillableMap implements Map { // TODO(na) : a dynamic sizing factor to ensure we have space for other objects in memory and // incorrect payload estimation private final Double sizingFactorForInMemoryMap = 0.8; - // Key converter to convert key type to bytes - private final Converter keyConverter; - // Value converter to convert value type to bytes - private final Converter valueConverter; // Size Estimator for key type private final SizeEstimator keySizeEstimator; // Size Estimator for key types @@ -69,15 +65,12 @@ public class ExternalSpillableMap implements Map { private boolean shouldEstimatePayloadSize = true; public ExternalSpillableMap(Long maxInMemorySizeInBytes, String baseFilePath, - Converter keyConverter, Converter valueConverter, SizeEstimator keySizeEstimator, SizeEstimator valueSizeEstimator) throws IOException { this.inMemoryMap = new HashMap<>(); - this.diskBasedMap = new DiskBasedMap<>(baseFilePath, keyConverter, valueConverter); + this.diskBasedMap = new DiskBasedMap<>(baseFilePath); this.maxInMemorySizeInBytes = (long) Math .floor(maxInMemorySizeInBytes * sizingFactorForInMemoryMap); this.currentInMemoryMapSize = 0L; - this.keyConverter = keyConverter; - this.valueConverter = valueConverter; this.keySizeEstimator = keySizeEstimator; this.valueSizeEstimator = valueSizeEstimator; } diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/LazyFileIterable.java 
b/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/LazyFileIterable.java index 0d53e2659..c2d74e0bd 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/LazyFileIterable.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/LazyFileIterable.java @@ -16,8 +16,8 @@ package com.uber.hoodie.common.util.collection; +import com.uber.hoodie.common.util.SerializationUtils; import com.uber.hoodie.common.util.SpillableMapUtils; -import com.uber.hoodie.common.util.collection.converter.Converter; import com.uber.hoodie.exception.HoodieException; import com.uber.hoodie.exception.HoodieIOException; import java.io.IOException; @@ -37,20 +37,16 @@ public class LazyFileIterable implements Iterable { private final RandomAccessFile readOnlyFileHandle; // Stores the key and corresponding value's latest metadata spilled to disk private final Map inMemoryMetadataOfSpilledData; - private final Converter valueConverter; - public LazyFileIterable(RandomAccessFile file, Map map, - Converter valueConverter) { + public LazyFileIterable(RandomAccessFile file, Map map) { this.readOnlyFileHandle = file; this.inMemoryMetadataOfSpilledData = map; - this.valueConverter = valueConverter; } @Override public Iterator iterator() { try { - return new LazyFileIterator<>(readOnlyFileHandle, inMemoryMetadataOfSpilledData, - valueConverter); + return new LazyFileIterator<>(readOnlyFileHandle, inMemoryMetadataOfSpilledData); } catch (IOException io) { throw new HoodieException("Unable to initialize iterator for file on disk", io); } @@ -61,14 +57,11 @@ public class LazyFileIterable implements Iterable { */ public class LazyFileIterator implements Iterator { - private final Converter valueConverter; private RandomAccessFile readOnlyFileHandle; private Iterator> metadataIterator; - public LazyFileIterator(RandomAccessFile file, Map map, - Converter valueConverter) throws IOException { + public LazyFileIterator(RandomAccessFile file, Map map) 
throws IOException { this.readOnlyFileHandle = file; - this.valueConverter = valueConverter; // sort the map in increasing order of offset of value so disk seek is only in one(forward) direction this.metadataIterator = map .entrySet() @@ -88,7 +81,7 @@ public class LazyFileIterable implements Iterable { public R next() { Map.Entry entry = this.metadataIterator.next(); try { - return valueConverter.getData(SpillableMapUtils.readBytesFromDisk(readOnlyFileHandle, + return SerializationUtils.deserialize(SpillableMapUtils.readBytesFromDisk(readOnlyFileHandle, entry.getValue().getOffsetOfValue(), entry.getValue().getSizeOfValue())); } catch (IOException e) { throw new HoodieIOException("Unable to read hoodie record from value spilled to disk", e); diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/converter/Converter.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/converter/Converter.java deleted file mode 100644 index 55168baa9..000000000 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/converter/Converter.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.uber.hoodie.common.util.collection.converter; - -/** - * A converter interface to getBytes or deserialize a payload. 
This is used in {@link - * com.uber.hoodie.common.util.collection.ExternalSpillableMap} to spillToDisk - */ -public interface Converter { - - /** - * This method is used to convert a payload to bytes - */ - byte[] getBytes(T t); - - /** - * This method is used to convert the serialized payload (in bytes) to the actual payload instance - */ - T getData(byte[] bytes); -} diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/converter/HoodieRecordConverter.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/converter/HoodieRecordConverter.java deleted file mode 100644 index 1f5ad6b1f..000000000 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/converter/HoodieRecordConverter.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.uber.hoodie.common.util.collection.converter; - -import com.uber.hoodie.common.model.HoodieKey; -import com.uber.hoodie.common.model.HoodieRecord; -import com.uber.hoodie.common.model.HoodieRecordLocation; -import com.uber.hoodie.common.model.HoodieRecordPayload; -import com.uber.hoodie.common.util.HoodieAvroUtils; -import com.uber.hoodie.common.util.ReflectionUtils; -import com.uber.hoodie.common.util.SerializationUtils; -import com.uber.hoodie.common.util.collection.Pair; -import com.uber.hoodie.common.util.collection.Triple; -import com.uber.hoodie.exception.HoodieSerializationException; -import java.io.IOException; -import java.util.Optional; -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; - -/** - * A default converter implementation for HoodieRecord - */ -public class HoodieRecordConverter implements - Converter> { - - // Schema used to get GenericRecord from HoodieRecordPayload then convert to bytes and vice-versa - private final Schema schema; - // The client implementation of HoodieRecordPayload used to re-create HoodieRecord from bytes - private final String payloadClazz; - - private static Logger log = LogManager.getLogger(HoodieRecordConverter.class); - - public HoodieRecordConverter(Schema schema, String payloadClazz) { - this.schema = schema; - this.payloadClazz = payloadClazz; - } - - @Override - public byte[] getBytes(HoodieRecord hoodieRecord) { - try { - // Need to initialize this to 0 bytes since deletes are handled by putting an empty record in HoodieRecord - byte[] val = new byte[0]; - if (hoodieRecord.getData().getInsertValue(schema).isPresent()) { - val = HoodieAvroUtils - .avroToBytes((GenericRecord) hoodieRecord.getData().getInsertValue(schema).get()); - } - byte [] currentLocation = hoodieRecord.getCurrentLocation() != null ? 
SerializationUtils.serialize(hoodieRecord - .getCurrentLocation()) : new byte[0]; - byte [] newLocation = hoodieRecord.getNewLocation().isPresent() ? SerializationUtils.serialize( - (HoodieRecordLocation) hoodieRecord.getNewLocation().get()) : new byte[0]; - - // Triple, Pair, data> - Triple, Pair, byte[]> data = - Triple.of(Pair.of(hoodieRecord.getKey().getRecordKey(), - hoodieRecord.getKey().getPartitionPath()), Pair.of(currentLocation, newLocation), val); - return SerializationUtils.serialize(data); - } catch (IOException io) { - throw new HoodieSerializationException("Cannot serialize value to bytes", io); - } - } - - @Override - public HoodieRecord getData(byte[] bytes) { - try { - Triple, Pair, byte[]> data = SerializationUtils.deserialize(bytes); - Optional payload = Optional.empty(); - HoodieRecordLocation currentLocation = null; - HoodieRecordLocation newLocation = null; - if (data.getRight().length > 0) { - // This can happen if the record is deleted, the payload is optional with 0 bytes - payload = Optional.of(HoodieAvroUtils.bytesToAvro(data.getRight(), schema)); - } - // Get the currentLocation for the HoodieRecord - if (data.getMiddle().getLeft().length > 0) { - currentLocation = SerializationUtils.deserialize(data.getMiddle().getLeft()); - } - // Get the newLocation for the HoodieRecord - if (data.getMiddle().getRight().length > 0) { - newLocation = SerializationUtils.deserialize(data.getMiddle().getRight()); - } - HoodieRecord hoodieRecord = new HoodieRecord<>( - new HoodieKey(data.getLeft().getKey(), data.getLeft().getValue()), - ReflectionUtils - .loadPayload(payloadClazz, - new Object[]{payload}, Optional.class)); - hoodieRecord.setCurrentLocation(currentLocation); - hoodieRecord.setNewLocation(newLocation); - return hoodieRecord; - } catch (IOException io) { - throw new HoodieSerializationException("Cannot de-serialize value from bytes", io); - } - } -} diff --git 
a/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/converter/StringConverter.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/converter/StringConverter.java deleted file mode 100644 index 7855484db..000000000 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/converter/StringConverter.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.uber.hoodie.common.util.collection.converter; - -import java.nio.charset.StandardCharsets; - -/** - * A default converter implementation for String type of payload key - */ -public class StringConverter implements Converter { - - @Override - public byte[] getBytes(String s) { - return s.getBytes(StandardCharsets.UTF_8); - } - - @Override - public String getData(byte[] bytes) { - return new String(bytes); - } -} diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/util/collection/TestDiskBasedMap.java b/hoodie-common/src/test/java/com/uber/hoodie/common/util/collection/TestDiskBasedMap.java deleted file mode 100644 index 83d20bd2b..000000000 --- a/hoodie-common/src/test/java/com/uber/hoodie/common/util/collection/TestDiskBasedMap.java +++ /dev/null @@ -1,214 +0,0 @@ -/* - * Copyright (c) 2016 Uber Technologies, Inc. 
(hoodie-dev-group@uber.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.uber.hoodie.common.util.collection; - -import static com.uber.hoodie.common.util.SchemaTestUtil.getSimpleSchema; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import com.uber.hoodie.common.model.AvroBinaryTestPayload; -import com.uber.hoodie.common.model.HoodieAvroPayload; -import com.uber.hoodie.common.model.HoodieKey; -import com.uber.hoodie.common.model.HoodieRecord; -import com.uber.hoodie.common.model.HoodieRecordPayload; -import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline; -import com.uber.hoodie.common.util.HoodieAvroUtils; -import com.uber.hoodie.common.util.HoodieRecordSizeEstimator; -import com.uber.hoodie.common.util.SchemaTestUtil; -import com.uber.hoodie.common.util.SpillableMapTestUtils; -import com.uber.hoodie.common.util.SpillableMapUtils; -import com.uber.hoodie.common.util.collection.converter.HoodieRecordConverter; -import com.uber.hoodie.common.util.collection.converter.StringConverter; -import java.io.IOException; -import java.io.UncheckedIOException; -import java.net.URISyntaxException; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Optional; -import java.util.Set; -import java.util.UUID; -import java.util.stream.Collectors; -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; 
-import org.apache.avro.generic.IndexedRecord; -import org.junit.Ignore; -import org.junit.Test; - -public class TestDiskBasedMap { - - private static final String BASE_OUTPUT_PATH = "/tmp/"; - - @Test - public void testSimpleInsert() throws IOException, URISyntaxException { - Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema()); - String payloadClazz = HoodieAvroPayload.class.getName(); - - DiskBasedMap records = new DiskBasedMap<>(BASE_OUTPUT_PATH, - new StringConverter(), new HoodieRecordConverter(schema, payloadClazz)); - List iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100); - ((GenericRecord) iRecords.get(0)).get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString(); - List recordKeys = SpillableMapTestUtils.upsertRecords(iRecords, records); - - // make sure records have spilled to disk - assertTrue(records.sizeOfFileOnDiskInBytes() > 0); - Iterator> itr = records.iterator(); - List oRecords = new ArrayList<>(); - while (itr.hasNext()) { - HoodieRecord rec = itr.next(); - oRecords.add(rec); - assert recordKeys.contains(rec.getRecordKey()); - } - } - - @Test - public void testSimpleInsertWithoutHoodieMetadata() throws IOException, URISyntaxException { - Schema schema = getSimpleSchema(); - String payloadClazz = HoodieAvroPayload.class.getName(); - - DiskBasedMap records = new DiskBasedMap<>(BASE_OUTPUT_PATH, - new StringConverter(), new HoodieRecordConverter(schema, payloadClazz)); - List hoodieRecords = SchemaTestUtil - .generateHoodieTestRecordsWithoutHoodieMetadata(0, 1000); - Set recordKeys = new HashSet<>(); - // insert generated records into the map - hoodieRecords.stream().forEach(r -> { - records.put(r.getRecordKey(), r); - recordKeys.add(r.getRecordKey()); - }); - // make sure records have spilled to disk - assertTrue(records.sizeOfFileOnDiskInBytes() > 0); - Iterator> itr = records.iterator(); - List oRecords = new ArrayList<>(); - while (itr.hasNext()) { - HoodieRecord rec = itr.next(); - oRecords.add(rec); - assert 
recordKeys.contains(rec.getRecordKey()); - } - } - - @Test - public void testSimpleUpsert() throws IOException, URISyntaxException { - - Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema()); - String payloadClazz = HoodieAvroPayload.class.getName(); - - DiskBasedMap records = new DiskBasedMap<>(BASE_OUTPUT_PATH, - new StringConverter(), new HoodieRecordConverter(schema, payloadClazz)); - List iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100); - - // perform some inserts - List recordKeys = SpillableMapTestUtils.upsertRecords(iRecords, records); - - long fileSize = records.sizeOfFileOnDiskInBytes(); - // make sure records have spilled to disk - assertTrue(fileSize > 0); - - // generate updates from inserts - List updatedRecords = - SchemaTestUtil - .updateHoodieTestRecords(recordKeys, SchemaTestUtil.generateHoodieTestRecords(0, 100), - HoodieActiveTimeline.createNewCommitTime()); - String newCommitTime = ((GenericRecord) updatedRecords.get(0)) - .get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString(); - - // perform upserts - recordKeys = SpillableMapTestUtils.upsertRecords(updatedRecords, records); - - // upserts should be appended to the existing file, hence increasing the sizeOfFile on disk - assertTrue(records.sizeOfFileOnDiskInBytes() > fileSize); - - // Upserted records (on disk) should have the latest commit time - Iterator> itr = records.iterator(); - while (itr.hasNext()) { - HoodieRecord rec = itr.next(); - assert recordKeys.contains(rec.getRecordKey()); - try { - IndexedRecord indexedRecord = (IndexedRecord) rec.getData().getInsertValue(schema).get(); - String latestCommitTime = ((GenericRecord) indexedRecord) - .get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString(); - assertEquals(latestCommitTime, newCommitTime); - } catch (IOException io) { - throw new UncheckedIOException(io); - } - } - } - - @Test - public void testSizeEstimator() throws IOException, URISyntaxException { - Schema schema = SchemaTestUtil.getSimpleSchema(); 
- - // Test sizeEstimator without hoodie metadata fields - List hoodieRecords = SchemaTestUtil.generateHoodieTestRecords(0, 1, schema); - - long payloadSize = SpillableMapUtils.computePayloadSize(hoodieRecords.remove(0), - new HoodieRecordSizeEstimator(schema)); - assertTrue(payloadSize > 0); - - // Test sizeEstimator with hoodie metadata fields - schema = HoodieAvroUtils.addMetadataFields(schema); - hoodieRecords = SchemaTestUtil.generateHoodieTestRecords(0, 1, schema); - payloadSize = SpillableMapUtils.computePayloadSize(hoodieRecords.remove(0), - new HoodieRecordSizeEstimator(schema)); - assertTrue(payloadSize > 0); - - // Following tests payloads without an Avro Schema in the Record - - // Test sizeEstimator without hoodie metadata fields and without schema object in the payload - schema = SchemaTestUtil.getSimpleSchema(); - List indexedRecords = SchemaTestUtil.generateHoodieTestRecords(0, 1); - hoodieRecords = indexedRecords.stream() - .map(r -> new HoodieRecord(new HoodieKey(UUID.randomUUID().toString(), "0000/00/00"), - new AvroBinaryTestPayload(Optional.of((GenericRecord) r)))).collect(Collectors.toList()); - payloadSize = SpillableMapUtils.computePayloadSize(hoodieRecords.remove(0), - new HoodieRecordSizeEstimator(schema)); - assertTrue(payloadSize > 0); - - // Test sizeEstimator with hoodie metadata fields and without schema object in the payload - final Schema simpleSchemaWithMetadata = HoodieAvroUtils - .addMetadataFields(SchemaTestUtil.getSimpleSchema()); - indexedRecords = SchemaTestUtil.generateHoodieTestRecords(0, 1); - hoodieRecords = indexedRecords.stream() - .map(r -> new HoodieRecord(new HoodieKey(UUID.randomUUID().toString(), "0000/00/00"), - new AvroBinaryTestPayload(Optional - .of(HoodieAvroUtils.rewriteRecord((GenericRecord) r, simpleSchemaWithMetadata))))) - .collect(Collectors.toList()); - payloadSize = SpillableMapUtils.computePayloadSize(hoodieRecords.remove(0), - new HoodieRecordSizeEstimator(schema)); - assertTrue(payloadSize > 0); - } 
- - /** - * @na: Leaving this test here for a quick performance test - */ - @Ignore - @Test - public void testSizeEstimatorPerformance() throws IOException, URISyntaxException { - // Test sizeEstimatorPerformance with simpleSchema - Schema schema = SchemaTestUtil.getSimpleSchema(); - List hoodieRecords = SchemaTestUtil.generateHoodieTestRecords(0, 1, schema); - HoodieRecordSizeEstimator sizeEstimator = - new HoodieRecordSizeEstimator(schema); - HoodieRecord record = hoodieRecords.remove(0); - long startTime = System.currentTimeMillis(); - SpillableMapUtils.computePayloadSize(record, sizeEstimator); - long timeTaken = System.currentTimeMillis() - startTime; - System.out.println("Time taken :" + timeTaken); - assertTrue(timeTaken < 100); - } -} diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/util/collection/TestExternalSpillableMap.java b/hoodie-common/src/test/java/com/uber/hoodie/common/util/collection/TestExternalSpillableMap.java index c85e869dd..240684c1a 100644 --- a/hoodie-common/src/test/java/com/uber/hoodie/common/util/collection/TestExternalSpillableMap.java +++ b/hoodie-common/src/test/java/com/uber/hoodie/common/util/collection/TestExternalSpillableMap.java @@ -30,8 +30,6 @@ import com.uber.hoodie.common.util.HoodieAvroUtils; import com.uber.hoodie.common.util.HoodieRecordSizeEstimator; import com.uber.hoodie.common.util.SchemaTestUtil; import com.uber.hoodie.common.util.SpillableMapTestUtils; -import com.uber.hoodie.common.util.collection.converter.HoodieRecordConverter; -import com.uber.hoodie.common.util.collection.converter.StringConverter; import java.io.File; import java.io.IOException; import java.io.UncheckedIOException; @@ -67,8 +65,7 @@ public class TestExternalSpillableMap { Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema()); String payloadClazz = HoodieAvroPayload.class.getName(); ExternalSpillableMap> records = - new ExternalSpillableMap<>(16L, BASE_OUTPUT_PATH, new StringConverter(), - new 
HoodieRecordConverter(schema, payloadClazz), + new ExternalSpillableMap<>(16L, BASE_OUTPUT_PATH, new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(schema)); //16B List iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100); @@ -90,8 +87,7 @@ public class TestExternalSpillableMap { String payloadClazz = HoodieAvroPayload.class.getName(); ExternalSpillableMap> records = - new ExternalSpillableMap<>(16L, BASE_OUTPUT_PATH, new StringConverter(), - new HoodieRecordConverter(schema, payloadClazz), + new ExternalSpillableMap<>(16L, BASE_OUTPUT_PATH, new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(schema)); //16B List iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100); @@ -129,8 +125,7 @@ public class TestExternalSpillableMap { String payloadClazz = HoodieAvroPayload.class.getName(); ExternalSpillableMap> records = - new ExternalSpillableMap<>(16L, BASE_OUTPUT_PATH, new StringConverter(), - new HoodieRecordConverter(schema, payloadClazz), + new ExternalSpillableMap<>(16L, BASE_OUTPUT_PATH, new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(schema)); //16B List iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100); @@ -186,8 +181,7 @@ public class TestExternalSpillableMap { String payloadClazz = HoodieAvroPayload.class.getName(); ExternalSpillableMap> records = - new ExternalSpillableMap<>(16L, FAILURE_OUTPUT_PATH, new StringConverter(), - new HoodieRecordConverter(schema, payloadClazz), + new ExternalSpillableMap<>(16L, FAILURE_OUTPUT_PATH, new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(schema)); //16B List iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100); @@ -206,8 +200,7 @@ public class TestExternalSpillableMap { String payloadClazz = HoodieAvroPayload.class.getName(); ExternalSpillableMap> records = - new ExternalSpillableMap<>(16L, BASE_OUTPUT_PATH, new StringConverter(), - new HoodieRecordConverter(schema, payloadClazz), + new ExternalSpillableMap<>(16L, BASE_OUTPUT_PATH, new DefaultSizeEstimator(), 
new HoodieRecordSizeEstimator(schema)); //16B List recordKeys = new ArrayList<>(); @@ -260,8 +253,7 @@ public class TestExternalSpillableMap { String payloadClazz = HoodieAvroPayload.class.getName(); ExternalSpillableMap> records = - new ExternalSpillableMap<>(16L, BASE_OUTPUT_PATH, new StringConverter(), - new HoodieRecordConverter(schema, payloadClazz), + new ExternalSpillableMap<>(16L, BASE_OUTPUT_PATH, new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(schema)); //16B List recordKeys = new ArrayList<>(); diff --git a/hoodie-spark/src/main/java/com/uber/hoodie/BaseAvroPayload.java b/hoodie-spark/src/main/java/com/uber/hoodie/BaseAvroPayload.java index 191b2d236..c2a05c9a6 100644 --- a/hoodie-spark/src/main/java/com/uber/hoodie/BaseAvroPayload.java +++ b/hoodie-spark/src/main/java/com/uber/hoodie/BaseAvroPayload.java @@ -18,7 +18,10 @@ package com.uber.hoodie; +import com.uber.hoodie.common.util.HoodieAvroUtils; import com.uber.hoodie.exception.HoodieException; +import com.uber.hoodie.exception.HoodieIOException; +import java.io.IOException; import java.io.Serializable; import org.apache.avro.generic.GenericRecord; @@ -29,9 +32,14 @@ public abstract class BaseAvroPayload implements Serializable { /** - * Avro data extracted from the source + * Avro data extracted from the source converted to bytes */ - protected final GenericRecord record; + protected final byte [] recordBytes; + + /** + * The schema of the Avro data + */ + protected final String schemaStr; /** * For purposes of preCombining @@ -43,7 +51,12 @@ public abstract class BaseAvroPayload implements Serializable { * @param orderingVal */ public BaseAvroPayload(GenericRecord record, Comparable orderingVal) { - this.record = record; + try { + this.recordBytes = HoodieAvroUtils.avroToBytes(record); + this.schemaStr = record.getSchema().toString(); + } catch (IOException io) { + throw new HoodieIOException("Cannot convert GenericRecord to bytes", io); + } this.orderingVal = orderingVal; if 
(orderingVal == null) { throw new HoodieException("Ordering value is null for record: " + record); diff --git a/hoodie-spark/src/main/java/com/uber/hoodie/OverwriteWithLatestAvroPayload.java b/hoodie-spark/src/main/java/com/uber/hoodie/OverwriteWithLatestAvroPayload.java index 48396f08c..0b454f1ef 100644 --- a/hoodie-spark/src/main/java/com/uber/hoodie/OverwriteWithLatestAvroPayload.java +++ b/hoodie-spark/src/main/java/com/uber/hoodie/OverwriteWithLatestAvroPayload.java @@ -66,6 +66,7 @@ public class OverwriteWithLatestAvroPayload extends BaseAvroPayload implements @Override public Optional getInsertValue(Schema schema) throws IOException { - return Optional.of(HoodieAvroUtils.rewriteRecord(record, schema)); + return Optional.of(HoodieAvroUtils.rewriteRecord(HoodieAvroUtils.bytesToAvro(recordBytes, Schema.parse(schemaStr)), + schema)); } } diff --git a/hoodie-spark/src/test/scala/DataSourceDefaultsTest.scala b/hoodie-spark/src/test/scala/DataSourceDefaultsTest.scala index 734cc7f53..e136545da 100644 --- a/hoodie-spark/src/test/scala/DataSourceDefaultsTest.scala +++ b/hoodie-spark/src/test/scala/DataSourceDefaultsTest.scala @@ -107,11 +107,11 @@ class DataSourceDefaultsTest extends AssertionsForJUnit { // it will provide the record with greatest combine value val combinedPayload12 = overWritePayload1.preCombine(overWritePayload2) val combinedGR12 = combinedPayload12.getInsertValue(schema).get().asInstanceOf[GenericRecord] - assertEquals("field2", combinedGR12.get("field1")) + assertEquals("field2", combinedGR12.get("field1").toString) // and it will be deterministic, to order of processing. val combinedPayload21 = overWritePayload2.preCombine(overWritePayload1) val combinedGR21 = combinedPayload21.getInsertValue(schema).get().asInstanceOf[GenericRecord] - assertEquals("field2", combinedGR21.get("field1")) + assertEquals("field2", combinedGR21.get("field1").toString) } }