HUDI-170 Updating hoodie record before inserting it into ExternalSpillableMap (#866)
This commit is contained in:
committed by
vinoth chandar
parent
40dd4dd637
commit
a6908ef44d
@@ -207,9 +207,10 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit
|
|||||||
while (newRecordsItr.hasNext()) {
|
while (newRecordsItr.hasNext()) {
|
||||||
HoodieRecord<T> record = newRecordsItr.next();
|
HoodieRecord<T> record = newRecordsItr.next();
|
||||||
partitionPath = record.getPartitionPath();
|
partitionPath = record.getPartitionPath();
|
||||||
keyToNewRecords.put(record.getRecordKey(), record);
|
|
||||||
// update the new location of the record, so we know where to find it next
|
// update the new location of the record, so we know where to find it next
|
||||||
record.setNewLocation(new HoodieRecordLocation(instantTime, fileId));
|
record.setNewLocation(new HoodieRecordLocation(instantTime, fileId));
|
||||||
|
//NOTE: Once Records are added to map (spillable-map), DO NOT change it as they won't persist
|
||||||
|
keyToNewRecords.put(record.getRecordKey(), record);
|
||||||
}
|
}
|
||||||
logger.info("Number of entries in MemoryBasedMap => "
|
logger.info("Number of entries in MemoryBasedMap => "
|
||||||
+ ((ExternalSpillableMap) keyToNewRecords).getInMemoryMapNumEntries()
|
+ ((ExternalSpillableMap) keyToNewRecords).getInMemoryMapNumEntries()
|
||||||
|
|||||||
@@ -333,7 +333,13 @@ public class TestHoodieMergeHandle {
|
|||||||
// handling
|
// handling
|
||||||
Assert.assertEquals((long) statuses.stream()
|
Assert.assertEquals((long) statuses.stream()
|
||||||
.map(status -> status.getStat().getNumInserts()).reduce((a,b) -> a + b).get(), 100);
|
.map(status -> status.getStat().getNumInserts()).reduce((a,b) -> a + b).get(), 100);
|
||||||
|
// Verify all records have location set
|
||||||
|
statuses.forEach(writeStatus -> {
|
||||||
|
writeStatus.getWrittenRecords().forEach(r -> {
|
||||||
|
// Ensure New Location is set
|
||||||
|
Assert.assertTrue(r.getNewLocation().isPresent());
|
||||||
|
});
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
private Dataset<Row> getRecords() {
|
private Dataset<Row> getRecords() {
|
||||||
@@ -366,6 +372,18 @@ public class TestHoodieMergeHandle {
|
|||||||
.withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024).build())
|
.withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024).build())
|
||||||
.forTable("test-trip-table")
|
.forTable("test-trip-table")
|
||||||
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
|
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
|
||||||
.withBulkInsertParallelism(2);
|
.withBulkInsertParallelism(2)
|
||||||
|
.withWriteStatusClass(TestWriteStatus.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Overridden so that we can capture and inspect all success records
|
||||||
|
*/
|
||||||
|
public static class TestWriteStatus extends WriteStatus {
|
||||||
|
|
||||||
|
public TestWriteStatus(Boolean trackSuccessRecords, Double failureFraction) {
|
||||||
|
// Track Success Records
|
||||||
|
super(true, failureFraction);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user