Copy HoodieRecord instances before mutating them when a key occurs more than once.

* HoodieBloomIndex: copy the record before calling setCurrentLocation, so a record
  found in several files of one partition does not have its location set twice.
* HoodieMergeHandle: merge from a copy of the new record, because (per the in-code
  comment) the record is deflated after the first write.
* HoodieRecord: add a copy constructor that carries over key, data and both locations.

NOTE(review): generic type arguments look stripped from this patch text (for example
the raw `Optional combinedAvroRecord`), and the leading indentation inside the hunks
was reconstructed from a two-space convention; confirm both against the repository
before applying (or apply with whitespace-insensitive matching).

diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndex.java b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndex.java
index fcbd90f0b..5c187870f 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndex.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndex.java
@@ -391,15 +391,14 @@ public class HoodieBloomIndex extends HoodieIndex
     return rowKeyRecordPairRDD.leftOuterJoin(rowKeyFilenamePairRDD).values().map(v1 -> {
       HoodieRecord record = v1._1();
       if (v1._2().isPresent()) {
+        // When you have a record in multiple files in the same partition, then rowKeyRecordPairRDD
+        // will have 2 entries with the same exact in memory copy of the HoodieRecord and the 2
+        // separate filenames that the record is found in. This will result in setting
+        // currentLocation 2 times and it will fail the second time. So creating a new in memory
+        // copy of the hoodie record.
+        record = new HoodieRecord<>(v1._1());
         String filename = v1._2().get();
         if (filename != null && !filename.isEmpty()) {
-          // When you have a record in multiple files in the same partition, then rowKeyRecordPairRDD will have 2
-          // entries with the same exact in memory copy of the HoodieRecord and the 2 separate filenames that the
-          // record is found in. This will result in setting currentLocation 2 times and it will fail the second time.
-          // This check will create a new in memory copy of the hoodie record.
-          if (record.getCurrentLocation() != null) {
-            record = new HoodieRecord(record.getKey(), record.getData());
-          }
           record.setCurrentLocation(new HoodieRecordLocation(FSUtils.getCommitTime(filename),
               FSUtils.getFileId(filename)));
         }
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java
index 0700b4cbb..1b12931ee 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java
@@ -202,9 +202,11 @@ public class HoodieMergeHandle extends HoodieIOHa
    */
   public void write(GenericRecord oldRecord) {
     String key = oldRecord.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
-    HoodieRecord hoodieRecord = keyToNewRecords.get(key);
     boolean copyOldRecord = true;
     if (keyToNewRecords.containsKey(key)) {
+      // If we have duplicate records that we are updating, then the hoodie record will be deflated after
+      // writing the first record. So make a copy of the record to be merged
+      HoodieRecord hoodieRecord = new HoodieRecord<>(keyToNewRecords.get(key));
       try {
         Optional combinedAvroRecord = hoodieRecord.getData()
             .combineAndGetUpdateValue(oldRecord, schema);
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieRecord.java b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieRecord.java
index 857dcaaa9..52fa27988 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieRecord.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieRecord.java
@@ -68,6 +68,12 @@ public class HoodieRecord implements Serializable
     this.newLocation = null;
   }
 
+  public HoodieRecord(HoodieRecord record) {
+    this(record.key, record.data);
+    this.currentLocation = record.currentLocation;
+    this.newLocation = record.newLocation;
+  }
+
   public HoodieKey getKey() {
     return key;
   }