1
0

[HUDI-232] Implement sealing/unsealing for HoodieRecord class (#938)

This commit is contained in:
leesf
2019-10-08 01:56:46 +08:00
committed by vinoth chandar
parent 8a55938ca1
commit d050d98071
11 changed files with 117 additions and 0 deletions

View File

@@ -132,7 +132,9 @@ public class InMemoryHashIndex<T extends HoodieRecordPayload> extends HoodieInde
while (hoodieRecordIterator.hasNext()) {
HoodieRecord<T> rec = hoodieRecordIterator.next();
if (recordLocationMap.containsKey(rec.getKey())) {
rec.unseal();
rec.setCurrentLocation(recordLocationMap.get(rec.getKey()));
rec.seal();
}
taggedRecords.add(rec);
}

View File

@@ -371,7 +371,9 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
// currentLocation 2 times and it will fail the second time. So creating a new in memory
// copy of the hoodie record.
record = new HoodieRecord<>(inputRecord);
record.unseal();
record.setCurrentLocation(location.get());
record.seal();
}
return record;
}

View File

@@ -239,7 +239,9 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
currentRecord = new HoodieRecord(
new HoodieKey(currentRecord.getRecordKey(), partitionPath),
currentRecord.getData());
currentRecord.unseal();
currentRecord.setCurrentLocation(new HoodieRecordLocation(commitTs, fileId));
currentRecord.seal();
taggedRecords.add(currentRecord);
// the key from Result and the key being processed should be same
assert (currentRecord.getRecordKey().contentEquals(keyFromResult));

View File

@@ -293,7 +293,9 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieWri
private void writeToBuffer(HoodieRecord<T> record) {
// update the new location of the record, so we know where to find it next
record.unseal();
record.setNewLocation(new HoodieRecordLocation(instantTime, fileId));
record.seal();
Option<IndexedRecord> indexedRecord = getIndexedRecord(record);
if (indexedRecord.isPresent()) {
recordList.add(indexedRecord.get());

View File

@@ -101,7 +101,9 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieWri
IndexedRecord recordWithMetadataInSchema = rewriteRecord((GenericRecord) avroRecord.get());
storageWriter.writeAvroWithMetadata(recordWithMetadataInSchema, record);
// update the new location of record, so we know where to find it next
record.unseal();
record.setNewLocation(new HoodieRecordLocation(instantTime, writeStatus.getFileId()));
record.seal();
recordsWritten++;
insertRecordsWritten++;
} else {

View File

@@ -208,7 +208,9 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit
HoodieRecord<T> record = newRecordsItr.next();
partitionPath = record.getPartitionPath();
// update the new location of the record, so we know where to find it next
record.unseal();
record.setNewLocation(new HoodieRecordLocation(instantTime, fileId));
record.seal();
//NOTE: Once Records are added to map (spillable-map), DO NOT change it as they won't persist
keyToNewRecords.put(record.getRecordKey(), record);
}