diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java index 95335da6c..e1926d06e 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java +++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java @@ -207,9 +207,10 @@ public class HoodieMergeHandle extends HoodieWrit while (newRecordsItr.hasNext()) { HoodieRecord record = newRecordsItr.next(); partitionPath = record.getPartitionPath(); - keyToNewRecords.put(record.getRecordKey(), record); // update the new location of the record, so we know where to find it next record.setNewLocation(new HoodieRecordLocation(instantTime, fileId)); + //NOTE: Once Records are added to map (spillable-map), DO NOT change it as they won't persist + keyToNewRecords.put(record.getRecordKey(), record); } logger.info("Number of entries in MemoryBasedMap => " + ((ExternalSpillableMap) keyToNewRecords).getInMemoryMapNumEntries() diff --git a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java index 1c2179ed5..e86d61f7c 100644 --- a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java +++ b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java @@ -333,7 +333,13 @@ public class TestHoodieMergeHandle { // handling Assert.assertEquals((long) statuses.stream() .map(status -> status.getStat().getNumInserts()).reduce((a,b) -> a + b).get(), 100); - + // Verify all records have location set + statuses.forEach(writeStatus -> { + writeStatus.getWrittenRecords().forEach(r -> { + // Ensure New Location is set + Assert.assertTrue(r.getNewLocation().isPresent()); + }); + }); } private Dataset getRecords() { @@ -366,6 +372,18 @@ public class TestHoodieMergeHandle { .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024).build()) .forTable("test-trip-table") .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()) - .withBulkInsertParallelism(2); + .withBulkInsertParallelism(2) + .withWriteStatusClass(TestWriteStatus.class); + } + + /** + * Overridden so that we can capture and inspect all success records + */ + public static class TestWriteStatus extends WriteStatus { + + public TestWriteStatus(Boolean trackSuccessRecords, Double failureFraction) { + // Track Success Records + super(true, failureFraction); + } } }