From 44caf0d40ca6b47fbf86701eff48fa9e8715b308 Mon Sep 17 00:00:00 2001 From: Nishith Agarwal Date: Thu, 12 Jul 2018 17:45:10 -0700 Subject: [PATCH] Fixing missing hoodie record location in HoodieRecord when record is read from disk after being spilled --- .../com/uber/hoodie/TestHoodieReadClient.java | 4 +++ .../hoodie/common/model/HoodieRecord.java | 2 +- .../converter/HoodieRecordConverter.java | 34 +++++++++++++++---- .../common/util/SpillableMapTestUtils.java | 10 ++++-- .../collection/TestExternalSpillableMap.java | 7 ++-- 5 files changed, 44 insertions(+), 13 deletions(-) diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieReadClient.java b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieReadClient.java index 45cc4d381..9f60cc370 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieReadClient.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieReadClient.java @@ -165,9 +165,12 @@ public class TestHoodieReadClient extends TestHoodieClientBase { JavaRDD result = insertFirstBatch(hoodieWriteConfig, client, newCommitTime, initCommitTime, numRecords, insertFn, isPrepped, true, numRecords); + // Construct HoodieRecord from the WriteStatus but set HoodieKey, Data and HoodieRecordLocation accordingly + // since they have been modified in the DAG JavaRDD recordRDD = jsc.parallelize( result.collect().stream().map(WriteStatus::getWrittenRecords).flatMap(Collection::stream) + .map(record -> new HoodieRecord(record.getKey(), null)) .collect(Collectors.toList())); // Should have 100 records in table (check using Index), all in locations marked at commit HoodieReadClient readClient = new HoodieReadClient(jsc, hoodieWriteConfig.getBasePath()); @@ -186,6 +189,7 @@ public class TestHoodieReadClient extends TestHoodieClientBase { recordRDD = jsc.parallelize( result.collect().stream().map(WriteStatus::getWrittenRecords).flatMap(Collection::stream) + .map(record -> new HoodieRecord(record.getKey(), null)) .collect(Collectors.toList())); // Index should be able to locate all updates in correct locations. readClient = new HoodieReadClient(jsc, hoodieWriteConfig.getBasePath()); diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieRecord.java b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieRecord.java index 16d75491a..cef6e60a2 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieRecord.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieRecord.java @@ -102,7 +102,7 @@ public class HoodieRecord implements Serializable } public Optional getNewLocation() { - return Optional.of(this.newLocation); + return Optional.ofNullable(this.newLocation); } public boolean isCurrentLocationKnown() { diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/converter/HoodieRecordConverter.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/converter/HoodieRecordConverter.java index ee6c90d9c..6aac7b365 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/converter/HoodieRecordConverter.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/converter/HoodieRecordConverter.java @@ -18,6 +18,7 @@ package com.uber.hoodie.common.util.collection.converter; import com.uber.hoodie.common.model.HoodieKey; import com.uber.hoodie.common.model.HoodieRecord; +import com.uber.hoodie.common.model.HoodieRecordLocation; import com.uber.hoodie.common.model.HoodieRecordPayload; import com.uber.hoodie.common.util.HoodieAvroUtils; import com.uber.hoodie.common.util.ReflectionUtils; @@ -28,6 +29,7 @@ import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.commons.lang3.SerializationUtils; import org.apache.commons.lang3.tuple.Pair; +import org.apache.commons.lang3.tuple.Triple; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -58,9 +60,15 @@ public class HoodieRecordConverter implements val = HoodieAvroUtils .avroToBytes((GenericRecord) hoodieRecord.getData().getInsertValue(schema).get()); } - Pair, byte[]> data = - Pair.of(Pair.of(hoodieRecord.getKey().getRecordKey(), - hoodieRecord.getKey().getPartitionPath()), val); + byte [] currentLocation = hoodieRecord.getCurrentLocation() != null ? SerializationUtils.serialize(hoodieRecord + .getCurrentLocation()) : new byte[0]; + byte [] newLocation = hoodieRecord.getNewLocation().isPresent() ? SerializationUtils.serialize( + (HoodieRecordLocation) hoodieRecord.getNewLocation().get()) : new byte[0]; + + // Triple, Pair, data> + Triple, Pair, byte[]> data = + Triple.of(Pair.of(hoodieRecord.getKey().getRecordKey(), + hoodieRecord.getKey().getPartitionPath()), Pair.of(currentLocation, newLocation), val); return SerializationUtils.serialize(data); } catch (IOException io) { throw new HoodieNotSerializableException("Cannot serialize value to bytes", io); @@ -70,17 +78,29 @@ public class HoodieRecordConverter implements @Override public HoodieRecord getData(byte[] bytes) { try { - Pair, byte[]> data = SerializationUtils.deserialize(bytes); + Triple, Pair, byte[]> data = SerializationUtils.deserialize(bytes); Optional payload = Optional.empty(); - if (data.getValue().length > 0) { + HoodieRecordLocation currentLocation = null; + HoodieRecordLocation newLocation = null; + if (data.getRight().length > 0) { // This can happen if the record is deleted, the payload is optional with 0 bytes - payload = Optional.of(HoodieAvroUtils.bytesToAvro(data.getValue(), schema)); + payload = Optional.of(HoodieAvroUtils.bytesToAvro(data.getRight(), schema)); + } + // Get the currentLocation for the HoodieRecord + if (data.getMiddle().getLeft().length > 0) { + currentLocation = SerializationUtils.deserialize(data.getMiddle().getLeft()); + } + // Get the newLocation for the HoodieRecord + if (data.getMiddle().getRight().length > 0) { + newLocation = SerializationUtils.deserialize(data.getMiddle().getRight()); } HoodieRecord hoodieRecord = new HoodieRecord<>( - new HoodieKey(data.getKey().getKey(), data.getKey().getValue()), + new HoodieKey(data.getLeft().getKey(), data.getLeft().getValue()), ReflectionUtils .loadPayload(payloadClazz, new Object[]{payload}, Optional.class)); + hoodieRecord.setCurrentLocation(currentLocation); + hoodieRecord.setNewLocation(newLocation); return hoodieRecord; } catch (IOException io) { throw new HoodieNotSerializableException("Cannot de-serialize value from bytes", io); diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/util/SpillableMapTestUtils.java b/hoodie-common/src/test/java/com/uber/hoodie/common/util/SpillableMapTestUtils.java index 42549afcf..3ffb23dff 100644 --- a/hoodie-common/src/test/java/com/uber/hoodie/common/util/SpillableMapTestUtils.java +++ b/hoodie-common/src/test/java/com/uber/hoodie/common/util/SpillableMapTestUtils.java @@ -19,6 +19,7 @@ package com.uber.hoodie.common.util; import com.uber.hoodie.common.model.HoodieAvroPayload; import com.uber.hoodie.common.model.HoodieKey; import com.uber.hoodie.common.model.HoodieRecord; +import com.uber.hoodie.common.model.HoodieRecordLocation; import com.uber.hoodie.common.model.HoodieRecordPayload; import java.util.ArrayList; import java.util.List; @@ -29,6 +30,9 @@ import org.apache.avro.generic.IndexedRecord; public class SpillableMapTestUtils { + public static final String DUMMY_COMMIT_TIME = "DUMMY_COMMIT_TIME"; + public static final String DUMMY_FILE_ID = "DUMMY_FILE_ID"; + public static List upsertRecords(List iRecords, Map> records) { List recordKeys = new ArrayList<>(); @@ -38,8 +42,10 @@ public class SpillableMapTestUtils { String key = ((GenericRecord) r).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(); String partitionPath = ((GenericRecord) r).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString(); recordKeys.add(key); - records.put(key, new HoodieRecord<>(new HoodieKey(key, partitionPath), - new HoodieAvroPayload(Optional.of((GenericRecord) r)))); + HoodieRecord record = new HoodieRecord<>(new HoodieKey(key, partitionPath), + new HoodieAvroPayload(Optional.of((GenericRecord) r))); + record.setCurrentLocation(new HoodieRecordLocation("DUMMY_COMMIT_TIME", "DUMMY_FILE_ID")); + records.put(key, record); }); return recordKeys; } diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/util/collection/TestExternalSpillableMap.java b/hoodie-common/src/test/java/com/uber/hoodie/common/util/collection/TestExternalSpillableMap.java index 7a5239f9d..c85e869dd 100644 --- a/hoodie-common/src/test/java/com/uber/hoodie/common/util/collection/TestExternalSpillableMap.java +++ b/hoodie-common/src/test/java/com/uber/hoodie/common/util/collection/TestExternalSpillableMap.java @@ -149,11 +149,12 @@ public class TestExternalSpillableMap { new HoodieAvroPayload(Optional.of((GenericRecord) onDiskRecord))); // assert size assert records.size() == 100; - // get should return the same HoodieKey and same value + // get should return the same HoodieKey, same location and same value assert inMemoryHoodieRecord.getKey().equals(records.get(ikey).getKey()); assert onDiskHoodieRecord.getKey().equals(records.get(dkey).getKey()); - //assert inMemoryHoodieRecord.equals(records.get(ikey)); - //assert onDiskHoodieRecord.equals(records.get(dkey)); + // compare the member variables of HoodieRecord not set by the constructor + assert records.get(ikey).getCurrentLocation().getFileId().equals(SpillableMapTestUtils.DUMMY_FILE_ID); + assert records.get(ikey).getCurrentLocation().getCommitTime().equals(SpillableMapTestUtils.DUMMY_COMMIT_TIME); // test contains assertTrue(records.containsKey(ikey));