Fixing missing hoodie record location in HoodieRecord when record is read from disk after being spilled
This commit is contained in:
committed by
vinoth chandar
parent
f62890ca1f
commit
44caf0d40c
@@ -165,9 +165,12 @@ public class TestHoodieReadClient extends TestHoodieClientBase {
|
|||||||
JavaRDD<WriteStatus> result =
|
JavaRDD<WriteStatus> result =
|
||||||
insertFirstBatch(hoodieWriteConfig, client, newCommitTime, initCommitTime, numRecords, insertFn, isPrepped,
|
insertFirstBatch(hoodieWriteConfig, client, newCommitTime, initCommitTime, numRecords, insertFn, isPrepped,
|
||||||
true, numRecords);
|
true, numRecords);
|
||||||
|
// Construct HoodieRecord from the WriteStatus but set HoodieKey, Data and HoodieRecordLocation accordingly
|
||||||
|
// since they have been modified in the DAG
|
||||||
JavaRDD<HoodieRecord> recordRDD =
|
JavaRDD<HoodieRecord> recordRDD =
|
||||||
jsc.parallelize(
|
jsc.parallelize(
|
||||||
result.collect().stream().map(WriteStatus::getWrittenRecords).flatMap(Collection::stream)
|
result.collect().stream().map(WriteStatus::getWrittenRecords).flatMap(Collection::stream)
|
||||||
|
.map(record -> new HoodieRecord(record.getKey(), null))
|
||||||
.collect(Collectors.toList()));
|
.collect(Collectors.toList()));
|
||||||
// Should have 100 records in table (check using Index), all in locations marked at commit
|
// Should have 100 records in table (check using Index), all in locations marked at commit
|
||||||
HoodieReadClient readClient = new HoodieReadClient(jsc, hoodieWriteConfig.getBasePath());
|
HoodieReadClient readClient = new HoodieReadClient(jsc, hoodieWriteConfig.getBasePath());
|
||||||
@@ -186,6 +189,7 @@ public class TestHoodieReadClient extends TestHoodieClientBase {
|
|||||||
recordRDD =
|
recordRDD =
|
||||||
jsc.parallelize(
|
jsc.parallelize(
|
||||||
result.collect().stream().map(WriteStatus::getWrittenRecords).flatMap(Collection::stream)
|
result.collect().stream().map(WriteStatus::getWrittenRecords).flatMap(Collection::stream)
|
||||||
|
.map(record -> new HoodieRecord(record.getKey(), null))
|
||||||
.collect(Collectors.toList()));
|
.collect(Collectors.toList()));
|
||||||
// Index should be able to locate all updates in correct locations.
|
// Index should be able to locate all updates in correct locations.
|
||||||
readClient = new HoodieReadClient(jsc, hoodieWriteConfig.getBasePath());
|
readClient = new HoodieReadClient(jsc, hoodieWriteConfig.getBasePath());
|
||||||
|
|||||||
@@ -102,7 +102,7 @@ public class HoodieRecord<T extends HoodieRecordPayload> implements Serializable
|
|||||||
}
|
}
|
||||||
|
|
||||||
public Optional<HoodieRecordLocation> getNewLocation() {
|
public Optional<HoodieRecordLocation> getNewLocation() {
|
||||||
return Optional.of(this.newLocation);
|
return Optional.ofNullable(this.newLocation);
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isCurrentLocationKnown() {
|
public boolean isCurrentLocationKnown() {
|
||||||
|
|||||||
@@ -18,6 +18,7 @@ package com.uber.hoodie.common.util.collection.converter;
|
|||||||
|
|
||||||
import com.uber.hoodie.common.model.HoodieKey;
|
import com.uber.hoodie.common.model.HoodieKey;
|
||||||
import com.uber.hoodie.common.model.HoodieRecord;
|
import com.uber.hoodie.common.model.HoodieRecord;
|
||||||
|
import com.uber.hoodie.common.model.HoodieRecordLocation;
|
||||||
import com.uber.hoodie.common.model.HoodieRecordPayload;
|
import com.uber.hoodie.common.model.HoodieRecordPayload;
|
||||||
import com.uber.hoodie.common.util.HoodieAvroUtils;
|
import com.uber.hoodie.common.util.HoodieAvroUtils;
|
||||||
import com.uber.hoodie.common.util.ReflectionUtils;
|
import com.uber.hoodie.common.util.ReflectionUtils;
|
||||||
@@ -28,6 +29,7 @@ import org.apache.avro.Schema;
|
|||||||
import org.apache.avro.generic.GenericRecord;
|
import org.apache.avro.generic.GenericRecord;
|
||||||
import org.apache.commons.lang3.SerializationUtils;
|
import org.apache.commons.lang3.SerializationUtils;
|
||||||
import org.apache.commons.lang3.tuple.Pair;
|
import org.apache.commons.lang3.tuple.Pair;
|
||||||
|
import org.apache.commons.lang3.tuple.Triple;
|
||||||
import org.apache.log4j.LogManager;
|
import org.apache.log4j.LogManager;
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
|
|
||||||
@@ -58,9 +60,15 @@ public class HoodieRecordConverter<V> implements
|
|||||||
val = HoodieAvroUtils
|
val = HoodieAvroUtils
|
||||||
.avroToBytes((GenericRecord) hoodieRecord.getData().getInsertValue(schema).get());
|
.avroToBytes((GenericRecord) hoodieRecord.getData().getInsertValue(schema).get());
|
||||||
}
|
}
|
||||||
Pair<Pair<String, String>, byte[]> data =
|
byte [] currentLocation = hoodieRecord.getCurrentLocation() != null ? SerializationUtils.serialize(hoodieRecord
|
||||||
Pair.of(Pair.of(hoodieRecord.getKey().getRecordKey(),
|
.getCurrentLocation()) : new byte[0];
|
||||||
hoodieRecord.getKey().getPartitionPath()), val);
|
byte [] newLocation = hoodieRecord.getNewLocation().isPresent() ? SerializationUtils.serialize(
|
||||||
|
(HoodieRecordLocation) hoodieRecord.getNewLocation().get()) : new byte[0];
|
||||||
|
|
||||||
|
// Triple<Pair<RecordKey, PartitionPath>, Pair<oldLocation, newLocation>, data>
|
||||||
|
Triple<Pair<String, String>, Pair<byte [], byte []>, byte[]> data =
|
||||||
|
Triple.of(Pair.of(hoodieRecord.getKey().getRecordKey(),
|
||||||
|
hoodieRecord.getKey().getPartitionPath()), Pair.of(currentLocation, newLocation), val);
|
||||||
return SerializationUtils.serialize(data);
|
return SerializationUtils.serialize(data);
|
||||||
} catch (IOException io) {
|
} catch (IOException io) {
|
||||||
throw new HoodieNotSerializableException("Cannot serialize value to bytes", io);
|
throw new HoodieNotSerializableException("Cannot serialize value to bytes", io);
|
||||||
@@ -70,17 +78,29 @@ public class HoodieRecordConverter<V> implements
|
|||||||
@Override
|
@Override
|
||||||
public HoodieRecord getData(byte[] bytes) {
|
public HoodieRecord getData(byte[] bytes) {
|
||||||
try {
|
try {
|
||||||
Pair<Pair<String, String>, byte[]> data = SerializationUtils.deserialize(bytes);
|
Triple<Pair<String, String>, Pair<byte [], byte []>, byte[]> data = SerializationUtils.deserialize(bytes);
|
||||||
Optional<GenericRecord> payload = Optional.empty();
|
Optional<GenericRecord> payload = Optional.empty();
|
||||||
if (data.getValue().length > 0) {
|
HoodieRecordLocation currentLocation = null;
|
||||||
|
HoodieRecordLocation newLocation = null;
|
||||||
|
if (data.getRight().length > 0) {
|
||||||
// This can happen if the record is deleted, the payload is optional with 0 bytes
|
// This can happen if the record is deleted, the payload is optional with 0 bytes
|
||||||
payload = Optional.of(HoodieAvroUtils.bytesToAvro(data.getValue(), schema));
|
payload = Optional.of(HoodieAvroUtils.bytesToAvro(data.getRight(), schema));
|
||||||
|
}
|
||||||
|
// Get the currentLocation for the HoodieRecord
|
||||||
|
if (data.getMiddle().getLeft().length > 0) {
|
||||||
|
currentLocation = SerializationUtils.deserialize(data.getMiddle().getLeft());
|
||||||
|
}
|
||||||
|
// Get the newLocation for the HoodieRecord
|
||||||
|
if (data.getMiddle().getRight().length > 0) {
|
||||||
|
newLocation = SerializationUtils.deserialize(data.getMiddle().getRight());
|
||||||
}
|
}
|
||||||
HoodieRecord<? extends HoodieRecordPayload> hoodieRecord = new HoodieRecord<>(
|
HoodieRecord<? extends HoodieRecordPayload> hoodieRecord = new HoodieRecord<>(
|
||||||
new HoodieKey(data.getKey().getKey(), data.getKey().getValue()),
|
new HoodieKey(data.getLeft().getKey(), data.getLeft().getValue()),
|
||||||
ReflectionUtils
|
ReflectionUtils
|
||||||
.loadPayload(payloadClazz,
|
.loadPayload(payloadClazz,
|
||||||
new Object[]{payload}, Optional.class));
|
new Object[]{payload}, Optional.class));
|
||||||
|
hoodieRecord.setCurrentLocation(currentLocation);
|
||||||
|
hoodieRecord.setNewLocation(newLocation);
|
||||||
return hoodieRecord;
|
return hoodieRecord;
|
||||||
} catch (IOException io) {
|
} catch (IOException io) {
|
||||||
throw new HoodieNotSerializableException("Cannot de-serialize value from bytes", io);
|
throw new HoodieNotSerializableException("Cannot de-serialize value from bytes", io);
|
||||||
|
|||||||
@@ -19,6 +19,7 @@ package com.uber.hoodie.common.util;
|
|||||||
import com.uber.hoodie.common.model.HoodieAvroPayload;
|
import com.uber.hoodie.common.model.HoodieAvroPayload;
|
||||||
import com.uber.hoodie.common.model.HoodieKey;
|
import com.uber.hoodie.common.model.HoodieKey;
|
||||||
import com.uber.hoodie.common.model.HoodieRecord;
|
import com.uber.hoodie.common.model.HoodieRecord;
|
||||||
|
import com.uber.hoodie.common.model.HoodieRecordLocation;
|
||||||
import com.uber.hoodie.common.model.HoodieRecordPayload;
|
import com.uber.hoodie.common.model.HoodieRecordPayload;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
@@ -29,6 +30,9 @@ import org.apache.avro.generic.IndexedRecord;
|
|||||||
|
|
||||||
public class SpillableMapTestUtils {
|
public class SpillableMapTestUtils {
|
||||||
|
|
||||||
|
public static final String DUMMY_COMMIT_TIME = "DUMMY_COMMIT_TIME";
|
||||||
|
public static final String DUMMY_FILE_ID = "DUMMY_FILE_ID";
|
||||||
|
|
||||||
public static List<String> upsertRecords(List<IndexedRecord> iRecords,
|
public static List<String> upsertRecords(List<IndexedRecord> iRecords,
|
||||||
Map<String, HoodieRecord<? extends HoodieRecordPayload>> records) {
|
Map<String, HoodieRecord<? extends HoodieRecordPayload>> records) {
|
||||||
List<String> recordKeys = new ArrayList<>();
|
List<String> recordKeys = new ArrayList<>();
|
||||||
@@ -38,8 +42,10 @@ public class SpillableMapTestUtils {
|
|||||||
String key = ((GenericRecord) r).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
|
String key = ((GenericRecord) r).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
|
||||||
String partitionPath = ((GenericRecord) r).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString();
|
String partitionPath = ((GenericRecord) r).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString();
|
||||||
recordKeys.add(key);
|
recordKeys.add(key);
|
||||||
records.put(key, new HoodieRecord<>(new HoodieKey(key, partitionPath),
|
HoodieRecord record = new HoodieRecord<>(new HoodieKey(key, partitionPath),
|
||||||
new HoodieAvroPayload(Optional.of((GenericRecord) r))));
|
new HoodieAvroPayload(Optional.of((GenericRecord) r)));
|
||||||
|
record.setCurrentLocation(new HoodieRecordLocation("DUMMY_COMMIT_TIME", "DUMMY_FILE_ID"));
|
||||||
|
records.put(key, record);
|
||||||
});
|
});
|
||||||
return recordKeys;
|
return recordKeys;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -149,11 +149,12 @@ public class TestExternalSpillableMap {
|
|||||||
new HoodieAvroPayload(Optional.of((GenericRecord) onDiskRecord)));
|
new HoodieAvroPayload(Optional.of((GenericRecord) onDiskRecord)));
|
||||||
// assert size
|
// assert size
|
||||||
assert records.size() == 100;
|
assert records.size() == 100;
|
||||||
// get should return the same HoodieKey and same value
|
// get should return the same HoodieKey, same location and same value
|
||||||
assert inMemoryHoodieRecord.getKey().equals(records.get(ikey).getKey());
|
assert inMemoryHoodieRecord.getKey().equals(records.get(ikey).getKey());
|
||||||
assert onDiskHoodieRecord.getKey().equals(records.get(dkey).getKey());
|
assert onDiskHoodieRecord.getKey().equals(records.get(dkey).getKey());
|
||||||
//assert inMemoryHoodieRecord.equals(records.get(ikey));
|
// compare the member variables of HoodieRecord not set by the constructor
|
||||||
//assert onDiskHoodieRecord.equals(records.get(dkey));
|
assert records.get(ikey).getCurrentLocation().getFileId().equals(SpillableMapTestUtils.DUMMY_FILE_ID);
|
||||||
|
assert records.get(ikey).getCurrentLocation().getCommitTime().equals(SpillableMapTestUtils.DUMMY_COMMIT_TIME);
|
||||||
|
|
||||||
// test contains
|
// test contains
|
||||||
assertTrue(records.containsKey(ikey));
|
assertTrue(records.containsKey(ikey));
|
||||||
|
|||||||
Reference in New Issue
Block a user