From a6908ef44dc21898ed53785fe0bd5f1e373ae779 Mon Sep 17 00:00:00 2001 From: Balaji Varadarajan Date: Fri, 30 Aug 2019 09:03:37 -0700 Subject: [PATCH] HUDI-170 Updating hoodie record before inserting it into ExternalSpillableMap (#866) --- .../org/apache/hudi/io/HoodieMergeHandle.java | 3 ++- .../apache/hudi/io/TestHoodieMergeHandle.java | 22 +++++++++++++++++-- 2 files changed, 22 insertions(+), 3 deletions(-) diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java index 95335da6c..e1926d06e 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java +++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java @@ -207,9 +207,10 @@ public class HoodieMergeHandle extends HoodieWrit while (newRecordsItr.hasNext()) { HoodieRecord record = newRecordsItr.next(); partitionPath = record.getPartitionPath(); - keyToNewRecords.put(record.getRecordKey(), record); // update the new location of the record, so we know where to find it next record.setNewLocation(new HoodieRecordLocation(instantTime, fileId)); + //NOTE: Once Records are added to map (spillable-map), DO NOT change it as they won't persist + keyToNewRecords.put(record.getRecordKey(), record); } logger.info("Number of entries in MemoryBasedMap => " + ((ExternalSpillableMap) keyToNewRecords).getInMemoryMapNumEntries() diff --git a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java index 1c2179ed5..e86d61f7c 100644 --- a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java +++ b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java @@ -333,7 +333,13 @@ public class TestHoodieMergeHandle { // handling Assert.assertEquals((long) statuses.stream() .map(status -> status.getStat().getNumInserts()).reduce((a,b) -> a + b).get(), 100); - + // Verify all records have location set + statuses.forEach(writeStatus -> { + writeStatus.getWrittenRecords().forEach(r -> { + // Ensure New Location is set + Assert.assertTrue(r.getNewLocation().isPresent()); + }); + }); } private Dataset getRecords() { @@ -366,6 +372,18 @@ public class TestHoodieMergeHandle { .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024).build()) .forTable("test-trip-table") .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()) - .withBulkInsertParallelism(2); + .withBulkInsertParallelism(2) + .withWriteStatusClass(TestWriteStatus.class); + } + + /** + * Overridden so that we can capture and inspect all success records + */ + public static class TestWriteStatus extends WriteStatus { + + public TestWriteStatus(Boolean trackSuccessRecords, Double failureFraction) { + // Track Success Records + super(true, failureFraction); + } } }