1
0

Serializing the complete payload object instead of serializing just the GenericRecord

Removing Converter hierarchy as we now depend purely on JavaSerialization and require the payload to be java serializable
This commit is contained in:
Nishith Agarwal
2018-11-04 16:03:56 -08:00
committed by vinoth chandar
parent e83dde3b95
commit 7243ce40c9
14 changed files with 53 additions and 458 deletions

View File

@@ -30,8 +30,6 @@ import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.common.util.HoodieRecordSizeEstimator;
import com.uber.hoodie.common.util.ReflectionUtils;
import com.uber.hoodie.common.util.collection.ExternalSpillableMap;
import com.uber.hoodie.common.util.collection.converter.HoodieRecordConverter;
import com.uber.hoodie.common.util.collection.converter.StringConverter;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.exception.HoodieIOException;
import com.uber.hoodie.exception.HoodieUpsertException;
@@ -150,9 +148,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
// Load the new records in a map
logger.info("MaxMemoryPerPartitionMerge => " + config.getMaxMemoryPerPartitionMerge());
this.keyToNewRecords = new ExternalSpillableMap<>(config.getMaxMemoryPerPartitionMerge(),
config.getSpillableMapBasePath(), new StringConverter(),
new HoodieRecordConverter(schema, config.getPayloadClass()),
new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(schema));
config.getSpillableMapBasePath(), new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(schema));
} catch (IOException io) {
throw new HoodieIOException("Cannot instantiate an ExternalSpillableMap", io);
}

View File

@@ -17,6 +17,7 @@
package com.uber.hoodie.common.model;
import com.uber.hoodie.common.util.HoodieAvroUtils;
import com.uber.hoodie.exception.HoodieIOException;
import java.io.IOException;
import java.util.Optional;
import org.apache.avro.Schema;
@@ -29,10 +30,16 @@ import org.apache.avro.generic.IndexedRecord;
*/
public class HoodieAvroPayload implements HoodieRecordPayload<HoodieAvroPayload> {
private final Optional<GenericRecord> record;
// Store the GenericRecord converted to bytes - 1) Doesn't store schema hence memory efficient 2) Makes the payload
// java serializable
private final byte [] recordBytes;
public HoodieAvroPayload(Optional<GenericRecord> record) {
this.record = record;
try {
this.recordBytes = HoodieAvroUtils.avroToBytes(record.get());
} catch (IOException io) {
throw new HoodieIOException("Cannot convert record to bytes", io);
}
}
@Override
@@ -48,6 +55,7 @@ public class HoodieAvroPayload implements HoodieRecordPayload<HoodieAvroPayload>
@Override
public Optional<IndexedRecord> getInsertValue(Schema schema) throws IOException {
Optional<GenericRecord> record = Optional.of(HoodieAvroUtils.bytesToAvro(recordBytes, schema));
return record.map(r -> HoodieAvroUtils.rewriteRecord(r, schema));
}
}

View File

@@ -23,8 +23,6 @@ import com.uber.hoodie.common.util.DefaultSizeEstimator;
import com.uber.hoodie.common.util.HoodieRecordSizeEstimator;
import com.uber.hoodie.common.util.HoodieTimer;
import com.uber.hoodie.common.util.collection.ExternalSpillableMap;
import com.uber.hoodie.common.util.collection.converter.HoodieRecordConverter;
import com.uber.hoodie.common.util.collection.converter.StringConverter;
import com.uber.hoodie.exception.HoodieIOException;
import java.io.IOException;
import java.util.Iterator;
@@ -71,7 +69,6 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner
try {
// Store merged records for all versions for this log file, set the in-memory footprint to maxInMemoryMapSize
this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes, spillableMapBasePath,
new StringConverter(), new HoodieRecordConverter(readerSchema, getPayloadClassFQN()),
new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(readerSchema));
// Do the scan and merge
timer.startTimer();

View File

@@ -16,8 +16,8 @@
package com.uber.hoodie.common.util.collection;
import com.uber.hoodie.common.util.SerializationUtils;
import com.uber.hoodie.common.util.SpillableMapUtils;
import com.uber.hoodie.common.util.collection.converter.Converter;
import com.uber.hoodie.common.util.collection.io.storage.SizeAwareDataOutputStream;
import com.uber.hoodie.exception.HoodieException;
import com.uber.hoodie.exception.HoodieIOException;
@@ -26,6 +26,7 @@ import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.Serializable;
import java.net.InetAddress;
import java.util.AbstractMap;
import java.util.Collection;
@@ -45,15 +46,11 @@ import org.apache.log4j.Logger;
* without any rollover support. It uses the following : 1) An in-memory map that tracks the key-> latest ValueMetadata.
* 2) Current position in the file NOTE : Only String.class type supported for Key
*/
public final class DiskBasedMap<T, R> implements Map<T, R> {
public final class DiskBasedMap<T extends Serializable, R extends Serializable> implements Map<T, R> {
private static final Logger log = LogManager.getLogger(DiskBasedMap.class);
// Stores the key and corresponding value's latest metadata spilled to disk
private final Map<T, ValueMetadata> valueMetadataMap;
// Key converter to convert key type to bytes
private final Converter<T> keyConverter;
// Value converter to convert value type to bytes
private final Converter<R> valueConverter;
// Read only file access to be able to seek to random positions to readFromDisk values
private RandomAccessFile readOnlyFileHandle;
// Write only OutputStream to be able to ONLY append to the file
@@ -67,8 +64,7 @@ public final class DiskBasedMap<T, R> implements Map<T, R> {
private String filePath;
protected DiskBasedMap(String baseFilePath,
Converter<T> keyConverter, Converter<R> valueConverter) throws IOException {
protected DiskBasedMap(String baseFilePath) throws IOException {
this.valueMetadataMap = new HashMap<>();
File writeOnlyFileHandle = new File(baseFilePath, UUID.randomUUID().toString());
this.filePath = writeOnlyFileHandle.getPath();
@@ -76,8 +72,6 @@ public final class DiskBasedMap<T, R> implements Map<T, R> {
this.fileOutputStream = new FileOutputStream(writeOnlyFileHandle, true);
this.writeOnlyFileHandle = new SizeAwareDataOutputStream(fileOutputStream);
this.filePosition = new AtomicLong(0L);
this.keyConverter = keyConverter;
this.valueConverter = valueConverter;
}
private void initFile(File writeOnlyFileHandle) throws IOException {
@@ -125,7 +119,7 @@ public final class DiskBasedMap<T, R> implements Map<T, R> {
*/
public Iterator<R> iterator() {
return new LazyFileIterable(readOnlyFileHandle,
valueMetadataMap, valueConverter).iterator();
valueMetadataMap).iterator();
}
/**
@@ -162,7 +156,7 @@ public final class DiskBasedMap<T, R> implements Map<T, R> {
return null;
}
try {
return this.valueConverter.getData(SpillableMapUtils.readBytesFromDisk(readOnlyFileHandle,
return SerializationUtils.deserialize(SpillableMapUtils.readBytesFromDisk(readOnlyFileHandle,
entry.getOffsetOfValue(), entry.getSizeOfValue()));
} catch (IOException e) {
throw new HoodieIOException("Unable to readFromDisk Hoodie Record from disk", e);
@@ -172,12 +166,12 @@ public final class DiskBasedMap<T, R> implements Map<T, R> {
@Override
public R put(T key, R value) {
try {
byte[] val = this.valueConverter.getBytes(value);
byte[] val = SerializationUtils.serialize(value);
Integer valueSize = val.length;
Long timestamp = new Date().getTime();
this.valueMetadataMap.put(key,
new DiskBasedMap.ValueMetadata(this.filePath, valueSize, filePosition.get(), timestamp));
byte[] serializedKey = keyConverter.getBytes(key);
byte[] serializedKey = SerializationUtils.serialize(key);
filePosition.set(SpillableMapUtils.spillToDisk(writeOnlyFileHandle,
new FileEntry(SpillableMapUtils.generateChecksum(val),
serializedKey.length, valueSize, serializedKey, val, timestamp)));

View File

@@ -18,9 +18,9 @@ package com.uber.hoodie.common.util.collection;
import com.twitter.common.objectsize.ObjectSizeCalculator;
import com.uber.hoodie.common.util.SizeEstimator;
import com.uber.hoodie.common.util.collection.converter.Converter;
import com.uber.hoodie.exception.HoodieNotSupportedException;
import java.io.IOException;
import java.io.Serializable;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
@@ -39,7 +39,7 @@ import org.apache.log4j.Logger;
* trade-off: If the spill threshold is too high, the in-memory map may occupy more memory than is available, resulting
* in OOM. However, if the spill threshold is too low, we spill frequently and incur unnecessary disk writes.
*/
public class ExternalSpillableMap<T, R> implements Map<T, R> {
public class ExternalSpillableMap<T extends Serializable, R extends Serializable> implements Map<T, R> {
// Find the actual estimated payload size after inserting N records
private static final int NUMBER_OF_RECORDS_TO_ESTIMATE_PAYLOAD_SIZE = 100;
@@ -53,10 +53,6 @@ public class ExternalSpillableMap<T, R> implements Map<T, R> {
// TODO(na) : a dynamic sizing factor to ensure we have space for other objects in memory and
// incorrect payload estimation
private final Double sizingFactorForInMemoryMap = 0.8;
// Key converter to convert key type to bytes
private final Converter<T> keyConverter;
// Value converter to convert value type to bytes
private final Converter<R> valueConverter;
// Size Estimator for key type
private final SizeEstimator<T> keySizeEstimator;
// Size Estimator for key types
@@ -69,15 +65,12 @@ public class ExternalSpillableMap<T, R> implements Map<T, R> {
private boolean shouldEstimatePayloadSize = true;
public ExternalSpillableMap(Long maxInMemorySizeInBytes, String baseFilePath,
Converter<T> keyConverter, Converter<R> valueConverter,
SizeEstimator<T> keySizeEstimator, SizeEstimator<R> valueSizeEstimator) throws IOException {
this.inMemoryMap = new HashMap<>();
this.diskBasedMap = new DiskBasedMap<>(baseFilePath, keyConverter, valueConverter);
this.diskBasedMap = new DiskBasedMap<>(baseFilePath);
this.maxInMemorySizeInBytes = (long) Math
.floor(maxInMemorySizeInBytes * sizingFactorForInMemoryMap);
this.currentInMemoryMapSize = 0L;
this.keyConverter = keyConverter;
this.valueConverter = valueConverter;
this.keySizeEstimator = keySizeEstimator;
this.valueSizeEstimator = valueSizeEstimator;
}

View File

@@ -16,8 +16,8 @@
package com.uber.hoodie.common.util.collection;
import com.uber.hoodie.common.util.SerializationUtils;
import com.uber.hoodie.common.util.SpillableMapUtils;
import com.uber.hoodie.common.util.collection.converter.Converter;
import com.uber.hoodie.exception.HoodieException;
import com.uber.hoodie.exception.HoodieIOException;
import java.io.IOException;
@@ -37,20 +37,16 @@ public class LazyFileIterable<T, R> implements Iterable<R> {
private final RandomAccessFile readOnlyFileHandle;
// Stores the key and corresponding value's latest metadata spilled to disk
private final Map<T, DiskBasedMap.ValueMetadata> inMemoryMetadataOfSpilledData;
private final Converter<R> valueConverter;
public LazyFileIterable(RandomAccessFile file, Map<T, DiskBasedMap.ValueMetadata> map,
Converter<R> valueConverter) {
public LazyFileIterable(RandomAccessFile file, Map<T, DiskBasedMap.ValueMetadata> map) {
this.readOnlyFileHandle = file;
this.inMemoryMetadataOfSpilledData = map;
this.valueConverter = valueConverter;
}
@Override
public Iterator<R> iterator() {
try {
return new LazyFileIterator<>(readOnlyFileHandle, inMemoryMetadataOfSpilledData,
valueConverter);
return new LazyFileIterator<>(readOnlyFileHandle, inMemoryMetadataOfSpilledData);
} catch (IOException io) {
throw new HoodieException("Unable to initialize iterator for file on disk", io);
}
@@ -61,14 +57,11 @@ public class LazyFileIterable<T, R> implements Iterable<R> {
*/
public class LazyFileIterator<T, R> implements Iterator<R> {
private final Converter<R> valueConverter;
private RandomAccessFile readOnlyFileHandle;
private Iterator<Map.Entry<T, DiskBasedMap.ValueMetadata>> metadataIterator;
public LazyFileIterator(RandomAccessFile file, Map<T, DiskBasedMap.ValueMetadata> map,
Converter<R> valueConverter) throws IOException {
public LazyFileIterator(RandomAccessFile file, Map<T, DiskBasedMap.ValueMetadata> map) throws IOException {
this.readOnlyFileHandle = file;
this.valueConverter = valueConverter;
// sort the map in increasing order of offset of value so disk seek is only in one(forward) direction
this.metadataIterator = map
.entrySet()
@@ -88,7 +81,7 @@ public class LazyFileIterable<T, R> implements Iterable<R> {
public R next() {
Map.Entry<T, DiskBasedMap.ValueMetadata> entry = this.metadataIterator.next();
try {
return valueConverter.getData(SpillableMapUtils.readBytesFromDisk(readOnlyFileHandle,
return SerializationUtils.deserialize(SpillableMapUtils.readBytesFromDisk(readOnlyFileHandle,
entry.getValue().getOffsetOfValue(), entry.getValue().getSizeOfValue()));
} catch (IOException e) {
throw new HoodieIOException("Unable to read hoodie record from value spilled to disk", e);

View File

@@ -1,34 +0,0 @@
/*
* Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.common.util.collection.converter;
/**
* A converter interface to getBytes or deserialize a payload. This is used in {@link
* com.uber.hoodie.common.util.collection.ExternalSpillableMap} to spillToDisk
*/
public interface Converter<T> {
/**
* This method is used to convert a payload to bytes
*/
byte[] getBytes(T t);
/**
* This method is used to convert the serialized payload (in bytes) to the actual payload instance
*/
T getData(byte[] bytes);
}

View File

@@ -1,109 +0,0 @@
/*
* Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.common.util.collection.converter;
import com.uber.hoodie.common.model.HoodieKey;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieRecordLocation;
import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.common.util.HoodieAvroUtils;
import com.uber.hoodie.common.util.ReflectionUtils;
import com.uber.hoodie.common.util.SerializationUtils;
import com.uber.hoodie.common.util.collection.Pair;
import com.uber.hoodie.common.util.collection.Triple;
import com.uber.hoodie.exception.HoodieSerializationException;
import java.io.IOException;
import java.util.Optional;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
/**
* A default converter implementation for HoodieRecord
*/
public class HoodieRecordConverter<V> implements
Converter<HoodieRecord<? extends HoodieRecordPayload>> {
// Schema used to get GenericRecord from HoodieRecordPayload then convert to bytes and vice-versa
private final Schema schema;
// The client implementation of HoodieRecordPayload used to re-create HoodieRecord from bytes
private final String payloadClazz;
private static Logger log = LogManager.getLogger(HoodieRecordConverter.class);
public HoodieRecordConverter(Schema schema, String payloadClazz) {
this.schema = schema;
this.payloadClazz = payloadClazz;
}
@Override
public byte[] getBytes(HoodieRecord hoodieRecord) {
try {
// Need to initialize this to 0 bytes since deletes are handled by putting an empty record in HoodieRecord
byte[] val = new byte[0];
if (hoodieRecord.getData().getInsertValue(schema).isPresent()) {
val = HoodieAvroUtils
.avroToBytes((GenericRecord) hoodieRecord.getData().getInsertValue(schema).get());
}
byte [] currentLocation = hoodieRecord.getCurrentLocation() != null ? SerializationUtils.serialize(hoodieRecord
.getCurrentLocation()) : new byte[0];
byte [] newLocation = hoodieRecord.getNewLocation().isPresent() ? SerializationUtils.serialize(
(HoodieRecordLocation) hoodieRecord.getNewLocation().get()) : new byte[0];
// Triple<Pair<RecordKey, PartitionPath>, Pair<oldLocation, newLocation>, data>
Triple<Pair<String, String>, Pair<byte [], byte []>, byte[]> data =
Triple.of(Pair.of(hoodieRecord.getKey().getRecordKey(),
hoodieRecord.getKey().getPartitionPath()), Pair.of(currentLocation, newLocation), val);
return SerializationUtils.serialize(data);
} catch (IOException io) {
throw new HoodieSerializationException("Cannot serialize value to bytes", io);
}
}
@Override
public HoodieRecord getData(byte[] bytes) {
try {
Triple<Pair<String, String>, Pair<byte [], byte []>, byte[]> data = SerializationUtils.deserialize(bytes);
Optional<GenericRecord> payload = Optional.empty();
HoodieRecordLocation currentLocation = null;
HoodieRecordLocation newLocation = null;
if (data.getRight().length > 0) {
// This can happen if the record is deleted, the payload is optional with 0 bytes
payload = Optional.of(HoodieAvroUtils.bytesToAvro(data.getRight(), schema));
}
// Get the currentLocation for the HoodieRecord
if (data.getMiddle().getLeft().length > 0) {
currentLocation = SerializationUtils.deserialize(data.getMiddle().getLeft());
}
// Get the newLocation for the HoodieRecord
if (data.getMiddle().getRight().length > 0) {
newLocation = SerializationUtils.deserialize(data.getMiddle().getRight());
}
HoodieRecord<? extends HoodieRecordPayload> hoodieRecord = new HoodieRecord<>(
new HoodieKey(data.getLeft().getKey(), data.getLeft().getValue()),
ReflectionUtils
.loadPayload(payloadClazz,
new Object[]{payload}, Optional.class));
hoodieRecord.setCurrentLocation(currentLocation);
hoodieRecord.setNewLocation(newLocation);
return hoodieRecord;
} catch (IOException io) {
throw new HoodieSerializationException("Cannot de-serialize value from bytes", io);
}
}
}

View File

@@ -1,35 +0,0 @@
/*
* Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.common.util.collection.converter;
import java.nio.charset.StandardCharsets;
/**
* A default converter implementation for String type of payload key
*/
public class StringConverter implements Converter<String> {
@Override
public byte[] getBytes(String s) {
return s.getBytes(StandardCharsets.UTF_8);
}
@Override
public String getData(byte[] bytes) {
return new String(bytes);
}
}

View File

@@ -1,214 +0,0 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.common.util.collection;
import static com.uber.hoodie.common.util.SchemaTestUtil.getSimpleSchema;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import com.uber.hoodie.common.model.AvroBinaryTestPayload;
import com.uber.hoodie.common.model.HoodieAvroPayload;
import com.uber.hoodie.common.model.HoodieKey;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline;
import com.uber.hoodie.common.util.HoodieAvroUtils;
import com.uber.hoodie.common.util.HoodieRecordSizeEstimator;
import com.uber.hoodie.common.util.SchemaTestUtil;
import com.uber.hoodie.common.util.SpillableMapTestUtils;
import com.uber.hoodie.common.util.SpillableMapUtils;
import com.uber.hoodie.common.util.collection.converter.HoodieRecordConverter;
import com.uber.hoodie.common.util.collection.converter.StringConverter;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.junit.Ignore;
import org.junit.Test;
public class TestDiskBasedMap {
private static final String BASE_OUTPUT_PATH = "/tmp/";
@Test
public void testSimpleInsert() throws IOException, URISyntaxException {
Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
String payloadClazz = HoodieAvroPayload.class.getName();
DiskBasedMap records = new DiskBasedMap<>(BASE_OUTPUT_PATH,
new StringConverter(), new HoodieRecordConverter(schema, payloadClazz));
List<IndexedRecord> iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
((GenericRecord) iRecords.get(0)).get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString();
List<String> recordKeys = SpillableMapTestUtils.upsertRecords(iRecords, records);
// make sure records have spilled to disk
assertTrue(records.sizeOfFileOnDiskInBytes() > 0);
Iterator<HoodieRecord<? extends HoodieRecordPayload>> itr = records.iterator();
List<HoodieRecord> oRecords = new ArrayList<>();
while (itr.hasNext()) {
HoodieRecord<? extends HoodieRecordPayload> rec = itr.next();
oRecords.add(rec);
assert recordKeys.contains(rec.getRecordKey());
}
}
@Test
public void testSimpleInsertWithoutHoodieMetadata() throws IOException, URISyntaxException {
Schema schema = getSimpleSchema();
String payloadClazz = HoodieAvroPayload.class.getName();
DiskBasedMap records = new DiskBasedMap<>(BASE_OUTPUT_PATH,
new StringConverter(), new HoodieRecordConverter(schema, payloadClazz));
List<HoodieRecord> hoodieRecords = SchemaTestUtil
.generateHoodieTestRecordsWithoutHoodieMetadata(0, 1000);
Set<String> recordKeys = new HashSet<>();
// insert generated records into the map
hoodieRecords.stream().forEach(r -> {
records.put(r.getRecordKey(), r);
recordKeys.add(r.getRecordKey());
});
// make sure records have spilled to disk
assertTrue(records.sizeOfFileOnDiskInBytes() > 0);
Iterator<HoodieRecord<? extends HoodieRecordPayload>> itr = records.iterator();
List<HoodieRecord> oRecords = new ArrayList<>();
while (itr.hasNext()) {
HoodieRecord<? extends HoodieRecordPayload> rec = itr.next();
oRecords.add(rec);
assert recordKeys.contains(rec.getRecordKey());
}
}
@Test
public void testSimpleUpsert() throws IOException, URISyntaxException {
Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
String payloadClazz = HoodieAvroPayload.class.getName();
DiskBasedMap records = new DiskBasedMap<>(BASE_OUTPUT_PATH,
new StringConverter(), new HoodieRecordConverter(schema, payloadClazz));
List<IndexedRecord> iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
// perform some inserts
List<String> recordKeys = SpillableMapTestUtils.upsertRecords(iRecords, records);
long fileSize = records.sizeOfFileOnDiskInBytes();
// make sure records have spilled to disk
assertTrue(fileSize > 0);
// generate updates from inserts
List<IndexedRecord> updatedRecords =
SchemaTestUtil
.updateHoodieTestRecords(recordKeys, SchemaTestUtil.generateHoodieTestRecords(0, 100),
HoodieActiveTimeline.createNewCommitTime());
String newCommitTime = ((GenericRecord) updatedRecords.get(0))
.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString();
// perform upserts
recordKeys = SpillableMapTestUtils.upsertRecords(updatedRecords, records);
// upserts should be appended to the existing file, hence increasing the sizeOfFile on disk
assertTrue(records.sizeOfFileOnDiskInBytes() > fileSize);
// Upserted records (on disk) should have the latest commit time
Iterator<HoodieRecord<? extends HoodieRecordPayload>> itr = records.iterator();
while (itr.hasNext()) {
HoodieRecord<? extends HoodieRecordPayload> rec = itr.next();
assert recordKeys.contains(rec.getRecordKey());
try {
IndexedRecord indexedRecord = (IndexedRecord) rec.getData().getInsertValue(schema).get();
String latestCommitTime = ((GenericRecord) indexedRecord)
.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString();
assertEquals(latestCommitTime, newCommitTime);
} catch (IOException io) {
throw new UncheckedIOException(io);
}
}
}
@Test
public void testSizeEstimator() throws IOException, URISyntaxException {
Schema schema = SchemaTestUtil.getSimpleSchema();
// Test sizeEstimator without hoodie metadata fields
List<HoodieRecord> hoodieRecords = SchemaTestUtil.generateHoodieTestRecords(0, 1, schema);
long payloadSize = SpillableMapUtils.computePayloadSize(hoodieRecords.remove(0),
new HoodieRecordSizeEstimator(schema));
assertTrue(payloadSize > 0);
// Test sizeEstimator with hoodie metadata fields
schema = HoodieAvroUtils.addMetadataFields(schema);
hoodieRecords = SchemaTestUtil.generateHoodieTestRecords(0, 1, schema);
payloadSize = SpillableMapUtils.computePayloadSize(hoodieRecords.remove(0),
new HoodieRecordSizeEstimator(schema));
assertTrue(payloadSize > 0);
// Following tests payloads without an Avro Schema in the Record
// Test sizeEstimator without hoodie metadata fields and without schema object in the payload
schema = SchemaTestUtil.getSimpleSchema();
List<IndexedRecord> indexedRecords = SchemaTestUtil.generateHoodieTestRecords(0, 1);
hoodieRecords = indexedRecords.stream()
.map(r -> new HoodieRecord(new HoodieKey(UUID.randomUUID().toString(), "0000/00/00"),
new AvroBinaryTestPayload(Optional.of((GenericRecord) r)))).collect(Collectors.toList());
payloadSize = SpillableMapUtils.computePayloadSize(hoodieRecords.remove(0),
new HoodieRecordSizeEstimator(schema));
assertTrue(payloadSize > 0);
// Test sizeEstimator with hoodie metadata fields and without schema object in the payload
final Schema simpleSchemaWithMetadata = HoodieAvroUtils
.addMetadataFields(SchemaTestUtil.getSimpleSchema());
indexedRecords = SchemaTestUtil.generateHoodieTestRecords(0, 1);
hoodieRecords = indexedRecords.stream()
.map(r -> new HoodieRecord(new HoodieKey(UUID.randomUUID().toString(), "0000/00/00"),
new AvroBinaryTestPayload(Optional
.of(HoodieAvroUtils.rewriteRecord((GenericRecord) r, simpleSchemaWithMetadata)))))
.collect(Collectors.toList());
payloadSize = SpillableMapUtils.computePayloadSize(hoodieRecords.remove(0),
new HoodieRecordSizeEstimator(schema));
assertTrue(payloadSize > 0);
}
/**
* @na: Leaving this test here for a quick performance test
*/
@Ignore
@Test
public void testSizeEstimatorPerformance() throws IOException, URISyntaxException {
// Test sizeEstimatorPerformance with simpleSchema
Schema schema = SchemaTestUtil.getSimpleSchema();
List<HoodieRecord> hoodieRecords = SchemaTestUtil.generateHoodieTestRecords(0, 1, schema);
HoodieRecordSizeEstimator sizeEstimator =
new HoodieRecordSizeEstimator(schema);
HoodieRecord record = hoodieRecords.remove(0);
long startTime = System.currentTimeMillis();
SpillableMapUtils.computePayloadSize(record, sizeEstimator);
long timeTaken = System.currentTimeMillis() - startTime;
System.out.println("Time taken :" + timeTaken);
assertTrue(timeTaken < 100);
}
}

View File

@@ -30,8 +30,6 @@ import com.uber.hoodie.common.util.HoodieAvroUtils;
import com.uber.hoodie.common.util.HoodieRecordSizeEstimator;
import com.uber.hoodie.common.util.SchemaTestUtil;
import com.uber.hoodie.common.util.SpillableMapTestUtils;
import com.uber.hoodie.common.util.collection.converter.HoodieRecordConverter;
import com.uber.hoodie.common.util.collection.converter.StringConverter;
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
@@ -67,8 +65,7 @@ public class TestExternalSpillableMap {
Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
String payloadClazz = HoodieAvroPayload.class.getName();
ExternalSpillableMap<String, HoodieRecord<? extends HoodieRecordPayload>> records =
new ExternalSpillableMap<>(16L, BASE_OUTPUT_PATH, new StringConverter(),
new HoodieRecordConverter(schema, payloadClazz),
new ExternalSpillableMap<>(16L, BASE_OUTPUT_PATH,
new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(schema)); //16B
List<IndexedRecord> iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
@@ -90,8 +87,7 @@ public class TestExternalSpillableMap {
String payloadClazz = HoodieAvroPayload.class.getName();
ExternalSpillableMap<String, HoodieRecord<? extends HoodieRecordPayload>> records =
new ExternalSpillableMap<>(16L, BASE_OUTPUT_PATH, new StringConverter(),
new HoodieRecordConverter(schema, payloadClazz),
new ExternalSpillableMap<>(16L, BASE_OUTPUT_PATH,
new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(schema)); //16B
List<IndexedRecord> iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
@@ -129,8 +125,7 @@ public class TestExternalSpillableMap {
String payloadClazz = HoodieAvroPayload.class.getName();
ExternalSpillableMap<String, HoodieRecord<? extends HoodieRecordPayload>> records =
new ExternalSpillableMap<>(16L, BASE_OUTPUT_PATH, new StringConverter(),
new HoodieRecordConverter(schema, payloadClazz),
new ExternalSpillableMap<>(16L, BASE_OUTPUT_PATH,
new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(schema)); //16B
List<IndexedRecord> iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
@@ -186,8 +181,7 @@ public class TestExternalSpillableMap {
String payloadClazz = HoodieAvroPayload.class.getName();
ExternalSpillableMap<String, HoodieRecord<? extends HoodieRecordPayload>> records =
new ExternalSpillableMap<>(16L, FAILURE_OUTPUT_PATH, new StringConverter(),
new HoodieRecordConverter(schema, payloadClazz),
new ExternalSpillableMap<>(16L, FAILURE_OUTPUT_PATH,
new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(schema)); //16B
List<IndexedRecord> iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
@@ -206,8 +200,7 @@ public class TestExternalSpillableMap {
String payloadClazz = HoodieAvroPayload.class.getName();
ExternalSpillableMap<String, HoodieRecord<? extends HoodieRecordPayload>> records =
new ExternalSpillableMap<>(16L, BASE_OUTPUT_PATH, new StringConverter(),
new HoodieRecordConverter(schema, payloadClazz),
new ExternalSpillableMap<>(16L, BASE_OUTPUT_PATH,
new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(schema)); //16B
List<String> recordKeys = new ArrayList<>();
@@ -260,8 +253,7 @@ public class TestExternalSpillableMap {
String payloadClazz = HoodieAvroPayload.class.getName();
ExternalSpillableMap<String, HoodieRecord<? extends HoodieRecordPayload>> records =
new ExternalSpillableMap<>(16L, BASE_OUTPUT_PATH, new StringConverter(),
new HoodieRecordConverter(schema, payloadClazz),
new ExternalSpillableMap<>(16L, BASE_OUTPUT_PATH,
new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(schema)); //16B
List<String> recordKeys = new ArrayList<>();

View File

@@ -18,7 +18,10 @@
package com.uber.hoodie;
import com.uber.hoodie.common.util.HoodieAvroUtils;
import com.uber.hoodie.exception.HoodieException;
import com.uber.hoodie.exception.HoodieIOException;
import java.io.IOException;
import java.io.Serializable;
import org.apache.avro.generic.GenericRecord;
@@ -29,9 +32,14 @@ public abstract class BaseAvroPayload implements Serializable {
/**
* Avro data extracted from the source
* Avro data extracted from the source converted to bytes
*/
protected final GenericRecord record;
protected final byte [] recordBytes;
/**
* The schema of the Avro data
*/
protected final String schemaStr;
/**
* For purposes of preCombining
@@ -43,7 +51,12 @@ public abstract class BaseAvroPayload implements Serializable {
* @param orderingVal
*/
public BaseAvroPayload(GenericRecord record, Comparable orderingVal) {
this.record = record;
try {
this.recordBytes = HoodieAvroUtils.avroToBytes(record);
this.schemaStr = record.getSchema().toString();
} catch (IOException io) {
throw new HoodieIOException("Cannot convert GenericRecord to bytes", io);
}
this.orderingVal = orderingVal;
if (orderingVal == null) {
throw new HoodieException("Ordering value is null for record: " + record);

View File

@@ -66,6 +66,7 @@ public class OverwriteWithLatestAvroPayload extends BaseAvroPayload implements
@Override
public Optional<IndexedRecord> getInsertValue(Schema schema) throws IOException {
return Optional.of(HoodieAvroUtils.rewriteRecord(record, schema));
return Optional.of(HoodieAvroUtils.rewriteRecord(HoodieAvroUtils.bytesToAvro(recordBytes, Schema.parse(schemaStr)),
schema));
}
}

View File

@@ -107,11 +107,11 @@ class DataSourceDefaultsTest extends AssertionsForJUnit {
// it will provide the record with greatest combine value
val combinedPayload12 = overWritePayload1.preCombine(overWritePayload2)
val combinedGR12 = combinedPayload12.getInsertValue(schema).get().asInstanceOf[GenericRecord]
assertEquals("field2", combinedGR12.get("field1"))
assertEquals("field2", combinedGR12.get("field1").toString)
// and it will be deterministic, to order of processing.
val combinedPayload21 = overWritePayload2.preCombine(overWritePayload1)
val combinedGR21 = combinedPayload21.getInsertValue(schema).get().asInstanceOf[GenericRecord]
assertEquals("field2", combinedGR21.get("field1"))
assertEquals("field2", combinedGR21.get("field1").toString)
}
}