1
0

Handling duplicate record update for single partition (duplicates in single or different parquet files)

This commit is contained in:
Omkar Joshi
2019-03-01 15:34:46 -08:00
committed by vinoth chandar
parent b514e1ab18
commit 4a8bec7ea5
3 changed files with 15 additions and 8 deletions

View File

@@ -391,15 +391,14 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
return rowKeyRecordPairRDD.leftOuterJoin(rowKeyFilenamePairRDD).values().map(v1 -> {
HoodieRecord<T> record = v1._1();
if (v1._2().isPresent()) {
// When you have a record in multiple files in the same partition, then rowKeyRecordPairRDD
// will have 2 entries with the same exact in memory copy of the HoodieRecord and the 2
// separate filenames that the record is found in. This will result in setting
// currentLocation 2 times and it will fail the second time. So creating a new in memory
// copy of the hoodie record.
record = new HoodieRecord<>(v1._1());
String filename = v1._2().get();
if (filename != null && !filename.isEmpty()) {
// When you have a record in multiple files in the same partition, then rowKeyRecordPairRDD will have 2
// entries with the same exact in memory copy of the HoodieRecord and the 2 separate filenames that the
// record is found in. This will result in setting currentLocation 2 times and it will fail the second time.
// This check will create a new in memory copy of the hoodie record.
if (record.getCurrentLocation() != null) {
record = new HoodieRecord<T>(record.getKey(), record.getData());
}
record.setCurrentLocation(new HoodieRecordLocation(FSUtils.getCommitTime(filename),
FSUtils.getFileId(filename)));
}

View File

@@ -202,9 +202,11 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
*/
public void write(GenericRecord oldRecord) {
String key = oldRecord.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
HoodieRecord<T> hoodieRecord = keyToNewRecords.get(key);
boolean copyOldRecord = true;
if (keyToNewRecords.containsKey(key)) {
// If we have duplicate records that we are updating, then the hoodie record will be deflated after
// writing the first record. So make a copy of the record to be merged
HoodieRecord<T> hoodieRecord = new HoodieRecord<>(keyToNewRecords.get(key));
try {
Optional<IndexedRecord> combinedAvroRecord = hoodieRecord.getData()
.combineAndGetUpdateValue(oldRecord, schema);

View File

@@ -68,6 +68,12 @@ public class HoodieRecord<T extends HoodieRecordPayload> implements Serializable
this.newLocation = null;
}
public HoodieRecord(HoodieRecord<T> record) {
this(record.key, record.data);
this.currentLocation = record.currentLocation;
this.newLocation = record.newLocation;
}
public HoodieKey getKey() {
return key;
}