From 9720820975be61c9a211a24785a28cf0e8b0d7b3 Mon Sep 17 00:00:00 2001 From: Manoj Govindassamy Date: Fri, 12 Nov 2021 05:18:40 -0800 Subject: [PATCH] [HUDI-2718] ExternalSpillableMap payload size re-estimation throws ArithmeticException (#3955) - ExternalSpillableMap does the payload/value size estimation on the first put to determine when to spill over to the disk map. The payload size re-estimation also happens after a minimum threshold of puts. This size re-estimation goes by the current in-memory map size for calculating the average payload size, and attempts a divide-by-zero operation when the in-memory map is empty. Avoid the ArithmeticException during payload size re-estimation by checking that the map is non-empty upfront. --- .../util/collection/ExternalSpillableMap.java | 3 +- .../collection/TestExternalSpillableMap.java | 42 ++++++++++++++++--- 2 files changed, 38 insertions(+), 7 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java index d31b0aaa6..a6e8d5cfb 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java @@ -208,7 +208,8 @@ public class ExternalSpillableMap " + estimatedPayloadSize); - } else if (shouldEstimatePayloadSize && inMemoryMap.size() % NUMBER_OF_RECORDS_TO_ESTIMATE_PAYLOAD_SIZE == 0) { + } else if (shouldEstimatePayloadSize && !inMemoryMap.isEmpty() + && (inMemoryMap.size() % NUMBER_OF_RECORDS_TO_ESTIMATE_PAYLOAD_SIZE == 0)) { // Re-estimate the size of a record by calculating the size of the entire map containing // N entries and then dividing by the number of entries present (N). This helps to get a // correct estimation of the size of each record in the JVM. 
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestExternalSpillableMap.java b/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestExternalSpillableMap.java index 4fed5a80e..f7b45e9d8 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestExternalSpillableMap.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestExternalSpillableMap.java @@ -51,6 +51,7 @@ import java.util.List; import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -83,7 +84,7 @@ public class TestExternalSpillableMap extends HoodieCommonTestHarness { List iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100); List recordKeys = SpillableMapTestUtils.upsertRecords(iRecords, records); assert (recordKeys.size() == 100); - + // Test iterator Iterator> itr = records.iterator(); int cntSize = 0; @@ -93,7 +94,7 @@ public class TestExternalSpillableMap extends HoodieCommonTestHarness { assert recordKeys.contains(rec.getRecordKey()); } assertEquals(recordKeys.size(), cntSize); - + // Test value stream List> values = records.valueStream().collect(Collectors.toList()); cntSize = 0; @@ -221,7 +222,9 @@ public class TestExternalSpillableMap extends HoodieCommonTestHarness { @ParameterizedTest @MethodSource("testArguments") - public void testDataCorrectnessWithUpsertsToDataInMapAndOnDisk(ExternalSpillableMap.DiskMapType diskMapType, boolean isCompressionEnabled) throws IOException, URISyntaxException { + public void testDataCorrectnessWithUpsertsToDataInMapAndOnDisk(ExternalSpillableMap.DiskMapType diskMapType, + boolean isCompressionEnabled) throws IOException, + URISyntaxException { Schema schema = 
HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema()); @@ -274,7 +277,9 @@ public class TestExternalSpillableMap extends HoodieCommonTestHarness { @ParameterizedTest @MethodSource("testArguments") - public void testDataCorrectnessWithoutHoodieMetadata(ExternalSpillableMap.DiskMapType diskMapType, boolean isCompressionEnabled) throws IOException, URISyntaxException { + public void testDataCorrectnessWithoutHoodieMetadata(ExternalSpillableMap.DiskMapType diskMapType, + boolean isCompressionEnabled) throws IOException, + URISyntaxException { Schema schema = SchemaTestUtil.getSimpleSchema(); @@ -337,9 +342,34 @@ public class TestExternalSpillableMap extends HoodieCommonTestHarness { assertEquals(gRecord.get(fieldName).toString(), newValue); } - // TODO : come up with a performance eval test for spillableMap @Test - public void testLargeInsertUpsert() {} + public void testEstimationWithEmptyMap() throws IOException, URISyntaxException { + final ExternalSpillableMap.DiskMapType diskMapType = ExternalSpillableMap.DiskMapType.BITCASK; + final boolean isCompressionEnabled = false; + final Schema schema = SchemaTestUtil.getSimpleSchema(); + + ExternalSpillableMap> records = + new ExternalSpillableMap<>(16L, basePath, new DefaultSizeEstimator(), + new HoodieRecordSizeEstimator(schema), diskMapType, isCompressionEnabled); + + List recordKeys = new ArrayList<>(); + + // Put a single record. Payload size estimation happens as part of this initial put. + HoodieRecord seedRecord = SchemaTestUtil.generateHoodieTestRecordsWithoutHoodieMetadata(0, 1).get(0); + records.put(seedRecord.getRecordKey(), seedRecord); + + // Remove the key immediately to make the map empty again. 
+ records.remove(seedRecord.getRecordKey()); + + // Verify payload size re-estimation does not throw exception + List hoodieRecords = SchemaTestUtil.generateHoodieTestRecordsWithoutHoodieMetadata(0, 250); + hoodieRecords.stream().forEach(hoodieRecord -> { + assertDoesNotThrow(() -> { + records.put(hoodieRecord.getRecordKey(), hoodieRecord); + }, "ExternalSpillableMap put() should not throw exception!"); + recordKeys.add(hoodieRecord.getRecordKey()); + }); + } private static Stream testArguments() { // Arguments : 1. Disk Map Type 2. isCompressionEnabled for BitCaskMap