[HUDI-2718] ExternalSpillableMap payload size re-estimation throws ArithmeticException (#3955)
- ExternalSpillableMap does the payload/value size estimation on the first put to determine when to spill over to disk map. The payload size re-estimation also happens after a minimum threshold of puts. This size re-estimation goes my the current in-memory map size for calculating average payload size and does attempts divide by zero operation when the map is size is empty. Avoiding the ArithmeticException during the payload size re-estimate by checking the map size upfront.
This commit is contained in:
committed by
GitHub
parent
4f217fe718
commit
9720820975
@@ -208,7 +208,8 @@ public class ExternalSpillableMap<T extends Serializable, R extends Serializable
|
||||
// Note, the converter may over estimate the size of a record in the JVM
|
||||
this.estimatedPayloadSize = keySizeEstimator.sizeEstimate(key) + valueSizeEstimator.sizeEstimate(value);
|
||||
LOG.info("Estimated Payload size => " + estimatedPayloadSize);
|
||||
} else if (shouldEstimatePayloadSize && inMemoryMap.size() % NUMBER_OF_RECORDS_TO_ESTIMATE_PAYLOAD_SIZE == 0) {
|
||||
} else if (shouldEstimatePayloadSize && !inMemoryMap.isEmpty()
|
||||
&& (inMemoryMap.size() % NUMBER_OF_RECORDS_TO_ESTIMATE_PAYLOAD_SIZE == 0)) {
|
||||
// Re-estimate the size of a record by calculating the size of the entire map containing
|
||||
// N entries and then dividing by the number of entries present (N). This helps to get a
|
||||
// correct estimation of the size of each record in the JVM.
|
||||
|
||||
@@ -51,6 +51,7 @@ import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
@@ -83,7 +84,7 @@ public class TestExternalSpillableMap extends HoodieCommonTestHarness {
|
||||
List<IndexedRecord> iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
|
||||
List<String> recordKeys = SpillableMapTestUtils.upsertRecords(iRecords, records);
|
||||
assert (recordKeys.size() == 100);
|
||||
|
||||
|
||||
// Test iterator
|
||||
Iterator<HoodieRecord<? extends HoodieRecordPayload>> itr = records.iterator();
|
||||
int cntSize = 0;
|
||||
@@ -93,7 +94,7 @@ public class TestExternalSpillableMap extends HoodieCommonTestHarness {
|
||||
assert recordKeys.contains(rec.getRecordKey());
|
||||
}
|
||||
assertEquals(recordKeys.size(), cntSize);
|
||||
|
||||
|
||||
// Test value stream
|
||||
List<HoodieRecord<? extends HoodieRecordPayload>> values = records.valueStream().collect(Collectors.toList());
|
||||
cntSize = 0;
|
||||
@@ -221,7 +222,9 @@ public class TestExternalSpillableMap extends HoodieCommonTestHarness {
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("testArguments")
|
||||
public void testDataCorrectnessWithUpsertsToDataInMapAndOnDisk(ExternalSpillableMap.DiskMapType diskMapType, boolean isCompressionEnabled) throws IOException, URISyntaxException {
|
||||
public void testDataCorrectnessWithUpsertsToDataInMapAndOnDisk(ExternalSpillableMap.DiskMapType diskMapType,
|
||||
boolean isCompressionEnabled) throws IOException,
|
||||
URISyntaxException {
|
||||
|
||||
Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
|
||||
|
||||
@@ -274,7 +277,9 @@ public class TestExternalSpillableMap extends HoodieCommonTestHarness {
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("testArguments")
|
||||
public void testDataCorrectnessWithoutHoodieMetadata(ExternalSpillableMap.DiskMapType diskMapType, boolean isCompressionEnabled) throws IOException, URISyntaxException {
|
||||
public void testDataCorrectnessWithoutHoodieMetadata(ExternalSpillableMap.DiskMapType diskMapType,
|
||||
boolean isCompressionEnabled) throws IOException,
|
||||
URISyntaxException {
|
||||
|
||||
Schema schema = SchemaTestUtil.getSimpleSchema();
|
||||
|
||||
@@ -337,9 +342,34 @@ public class TestExternalSpillableMap extends HoodieCommonTestHarness {
|
||||
assertEquals(gRecord.get(fieldName).toString(), newValue);
|
||||
}
|
||||
|
||||
// TODO : come up with a performance eval test for spillableMap
|
||||
@Test
|
||||
public void testLargeInsertUpsert() {}
|
||||
public void testEstimationWithEmptyMap() throws IOException, URISyntaxException {
|
||||
final ExternalSpillableMap.DiskMapType diskMapType = ExternalSpillableMap.DiskMapType.BITCASK;
|
||||
final boolean isCompressionEnabled = false;
|
||||
final Schema schema = SchemaTestUtil.getSimpleSchema();
|
||||
|
||||
ExternalSpillableMap<String, HoodieRecord<? extends HoodieRecordPayload>> records =
|
||||
new ExternalSpillableMap<>(16L, basePath, new DefaultSizeEstimator(),
|
||||
new HoodieRecordSizeEstimator(schema), diskMapType, isCompressionEnabled);
|
||||
|
||||
List<String> recordKeys = new ArrayList<>();
|
||||
|
||||
// Put a single record. Payload size estimation happens as part of this initial put.
|
||||
HoodieRecord seedRecord = SchemaTestUtil.generateHoodieTestRecordsWithoutHoodieMetadata(0, 1).get(0);
|
||||
records.put(seedRecord.getRecordKey(), seedRecord);
|
||||
|
||||
// Remove the key immediately to make the map empty again.
|
||||
records.remove(seedRecord.getRecordKey());
|
||||
|
||||
// Verify payload size re-estimation does not throw exception
|
||||
List<HoodieRecord> hoodieRecords = SchemaTestUtil.generateHoodieTestRecordsWithoutHoodieMetadata(0, 250);
|
||||
hoodieRecords.stream().forEach(hoodieRecord -> {
|
||||
assertDoesNotThrow(() -> {
|
||||
records.put(hoodieRecord.getRecordKey(), hoodieRecord);
|
||||
}, "ExternalSpillableMap put() should not throw exception!");
|
||||
recordKeys.add(hoodieRecord.getRecordKey());
|
||||
});
|
||||
}
|
||||
|
||||
private static Stream<Arguments> testArguments() {
|
||||
// Arguments : 1. Disk Map Type 2. isCompressionEnabled for BitCaskMap
|
||||
|
||||
Reference in New Issue
Block a user