From 22cd824d993bf43d88121ea89bad3a1f23a28518 Mon Sep 17 00:00:00 2001 From: garyli1019 Date: Thu, 14 May 2020 20:20:44 -0700 Subject: [PATCH] HUDI-494 fix incorrect record size estimation --- .../hudi/config/HoodieCompactionConfig.java | 13 ++ .../apache/hudi/config/HoodieWriteConfig.java | 4 + .../hudi/table/HoodieCopyOnWriteTable.java | 31 ----- .../action/commit/UpsertPartitioner.java | 9 +- .../TestHoodieClientOnCopyOnWriteStorage.java | 8 +- .../hudi/table/TestHoodieRecordSizing.java | 116 ------------------ .../action/commit/TestUpsertPartitioner.java | 91 ++++++++++++++ .../testutils/HoodieTestDataGenerator.java | 8 +- 8 files changed, 125 insertions(+), 155 deletions(-) delete mode 100644 hudi-client/src/test/java/org/apache/hudi/table/TestHoodieRecordSizing.java diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java index 5e295acbe..f89fc0638 100644 --- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java +++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java @@ -54,6 +54,12 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig { public static final String PARQUET_SMALL_FILE_LIMIT_BYTES = "hoodie.parquet.small.file.limit"; // By default, treat any file <= 100MB as a small file. public static final String DEFAULT_PARQUET_SMALL_FILE_LIMIT_BYTES = String.valueOf(104857600); + // Hudi will use the previous commit to calculate the estimated record size by totalBytesWritten/totalRecordsWritten. 
+ // If the previous commit is too small to make an accurate estimation, Hudi will search commits in the reverse order, + // until it finds a commit whose totalBytesWritten is larger than (PARQUET_SMALL_FILE_LIMIT_BYTES * RECORD_SIZE_ESTIMATION_THRESHOLD) + public static final String RECORD_SIZE_ESTIMATION_THRESHOLD_PROP = "hoodie.record.size.estimation.threshold"; + public static final String DEFAULT_RECORD_SIZE_ESTIMATION_THRESHOLD = "1.0"; + /** * Configs related to specific table types. */ @@ -173,6 +179,11 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig { return this; } + public Builder compactionRecordSizeEstimateThreshold(double threshold) { props.setProperty(RECORD_SIZE_ESTIMATION_THRESHOLD_PROP, String.valueOf(threshold)); return this; } + public Builder insertSplitSize(int insertSplitSize) { props.setProperty(COPY_ON_WRITE_TABLE_INSERT_SPLIT_SIZE, String.valueOf(insertSplitSize)); return this; } @@ -254,6 +265,8 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig { DEFAULT_MIN_COMMITS_TO_KEEP); setDefaultOnCondition(props, !props.containsKey(PARQUET_SMALL_FILE_LIMIT_BYTES), PARQUET_SMALL_FILE_LIMIT_BYTES, DEFAULT_PARQUET_SMALL_FILE_LIMIT_BYTES); + setDefaultOnCondition(props, !props.containsKey(RECORD_SIZE_ESTIMATION_THRESHOLD_PROP), RECORD_SIZE_ESTIMATION_THRESHOLD_PROP, + DEFAULT_RECORD_SIZE_ESTIMATION_THRESHOLD); setDefaultOnCondition(props, !props.containsKey(COPY_ON_WRITE_TABLE_INSERT_SPLIT_SIZE), COPY_ON_WRITE_TABLE_INSERT_SPLIT_SIZE, DEFAULT_COPY_ON_WRITE_TABLE_INSERT_SPLIT_SIZE); setDefaultOnCondition(props, !props.containsKey(COPY_ON_WRITE_TABLE_AUTO_SPLIT_INSERTS), diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index d6527fa3a..d89925711 100644 --- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -272,6 
+272,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { return Integer.parseInt(props.getProperty(HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT_BYTES)); } + public double getRecordSizeEstimationThreshold() { + return Double.parseDouble(props.getProperty(HoodieCompactionConfig.RECORD_SIZE_ESTIMATION_THRESHOLD_PROP)); + } + public int getCopyOnWriteInsertSplitSize() { return Integer.parseInt(props.getProperty(HoodieCompactionConfig.COPY_ON_WRITE_TABLE_INSERT_SPLIT_SIZE)); } diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java index ed2918080..974d847c0 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java @@ -29,14 +29,12 @@ import org.apache.hudi.avro.model.HoodieSavepointMetadata; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.utils.ParquetReaderIterator; import org.apache.hudi.common.model.HoodieBaseFile; -import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; -import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor; import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer; @@ -299,33 +297,4 @@ public class HoodieCopyOnWriteTable extends Hoodi return sb.toString(); } } - - /** - * Obtains the average record size based on records written during previous commits. Used for estimating how many - * records pack into one file. 
- */ - protected static long averageBytesPerRecord(HoodieTimeline commitTimeline, int defaultRecordSizeEstimate) { - long avgSize = defaultRecordSizeEstimate; - try { - if (!commitTimeline.empty()) { - // Go over the reverse ordered commits to get a more recent estimate of average record size. - Iterator instants = commitTimeline.getReverseOrderedInstants().iterator(); - while (instants.hasNext()) { - HoodieInstant instant = instants.next(); - HoodieCommitMetadata commitMetadata = HoodieCommitMetadata - .fromBytes(commitTimeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class); - long totalBytesWritten = commitMetadata.fetchTotalBytesWritten(); - long totalRecordsWritten = commitMetadata.fetchTotalRecordsWritten(); - if (totalBytesWritten > 0 && totalRecordsWritten > 0) { - avgSize = (long) Math.ceil((1.0 * totalBytesWritten) / totalRecordsWritten); - break; - } - } - } - } catch (Throwable t) { - // make this fail safe. - LOG.error("Error trying to compute average bytes/record ", t); - } - return avgSize; - } } diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java index 745388c73..a59871052 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java @@ -131,7 +131,7 @@ public class UpsertPartitioner> extends Partiti Set partitionPaths = profile.getPartitionPaths(); long averageRecordSize = averageBytesPerRecord(table.getMetaClient().getActiveTimeline().getCommitTimeline().filterCompletedInstants(), - config.getCopyOnWriteRecordSizeEstimate()); + config); LOG.info("AvgRecordSize => " + averageRecordSize); Map> partitionSmallFilesMap = @@ -289,8 +289,9 @@ public class UpsertPartitioner> extends Partiti * Obtains the average record size based on records written during previous commits. 
Used for estimating how many * records pack into one file. */ - protected static long averageBytesPerRecord(HoodieTimeline commitTimeline, int defaultRecordSizeEstimate) { - long avgSize = defaultRecordSizeEstimate; + protected static long averageBytesPerRecord(HoodieTimeline commitTimeline, HoodieWriteConfig hoodieWriteConfig) { + long avgSize = hoodieWriteConfig.getCopyOnWriteRecordSizeEstimate(); + long fileSizeThreshold = (long) (hoodieWriteConfig.getRecordSizeEstimationThreshold() * hoodieWriteConfig.getParquetSmallFileLimit()); try { if (!commitTimeline.empty()) { // Go over the reverse ordered commits to get a more recent estimate of average record size. @@ -301,7 +302,7 @@ public class UpsertPartitioner> extends Partiti .fromBytes(commitTimeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class); long totalBytesWritten = commitMetadata.fetchTotalBytesWritten(); long totalRecordsWritten = commitMetadata.fetchTotalRecordsWritten(); - if (totalBytesWritten > 0 && totalRecordsWritten > 0) { + if (totalBytesWritten > fileSizeThreshold && totalRecordsWritten > 0) { avgSize = (long) Math.ceil((1.0 * totalBytesWritten) / totalRecordsWritten); break; } diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java index 8c2efed36..8107cdf62 100644 --- a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java +++ b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java @@ -1041,10 +1041,12 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { HoodieWriteConfig.Builder builder = getConfigBuilder(useNullSchema ? 
NULL_SCHEMA : TRIP_EXAMPLE_SCHEMA); return builder .withCompactionConfig( - HoodieCompactionConfig.newBuilder().compactionSmallFileSize(HoodieTestDataGenerator.SIZE_PER_RECORD * 15) - .insertSplitSize(insertSplitSize).build()) // tolerate upto 15 records + HoodieCompactionConfig.newBuilder() + .compactionSmallFileSize(dataGen.getEstimatedFileSizeInBytes(150)) + .insertSplitSize(insertSplitSize).build()) .withStorageConfig( - HoodieStorageConfig.newBuilder().limitFileSize(HoodieTestDataGenerator.SIZE_PER_RECORD * 20).build()) + HoodieStorageConfig.newBuilder() + .limitFileSize(dataGen.getEstimatedFileSizeInBytes(200)).build()) .build(); } } diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestHoodieRecordSizing.java b/hudi-client/src/test/java/org/apache/hudi/table/TestHoodieRecordSizing.java deleted file mode 100644 index 17b005091..000000000 --- a/hudi-client/src/test/java/org/apache/hudi/table/TestHoodieRecordSizing.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hudi.table; - -import org.apache.hudi.common.model.HoodieCommitMetadata; -import org.apache.hudi.common.model.HoodieWriteStat; -import org.apache.hudi.common.table.timeline.HoodieInstant; -import org.apache.hudi.common.table.timeline.HoodieTimeline; -import org.apache.hudi.common.util.Option; - -import org.junit.jupiter.api.Test; - -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Collections; -import java.util.LinkedList; -import java.util.List; - -import static org.apache.hudi.common.testutils.HoodieTestUtils.generateFakeHoodieWriteStat; -import static org.apache.hudi.table.HoodieCopyOnWriteTable.averageBytesPerRecord; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -public class TestHoodieRecordSizing { - - private static List setupHoodieInstants() { - List instants = new ArrayList<>(); - instants.add(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "ts1")); - instants.add(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "ts2")); - instants.add(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "ts3")); - instants.add(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "ts4")); - instants.add(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "ts5")); - Collections.reverse(instants); - return instants; - } - - private static List generateCommitStatWith(int totalRecordsWritten, int totalBytesWritten) { - List writeStatsList = generateFakeHoodieWriteStat(5); - // clear all record and byte stats except for last entry. 
- for (int i = 0; i < writeStatsList.size() - 1; i++) { - HoodieWriteStat writeStat = writeStatsList.get(i); - writeStat.setNumWrites(0); - writeStat.setTotalWriteBytes(0); - } - HoodieWriteStat lastWriteStat = writeStatsList.get(writeStatsList.size() - 1); - lastWriteStat.setTotalWriteBytes(totalBytesWritten); - lastWriteStat.setNumWrites(totalRecordsWritten); - return writeStatsList; - } - - private static HoodieCommitMetadata generateCommitMetadataWith(int totalRecordsWritten, int totalBytesWritten) { - List fakeHoodieWriteStats = generateCommitStatWith(totalRecordsWritten, totalBytesWritten); - HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata(); - fakeHoodieWriteStats.forEach(stat -> commitMetadata.addWriteStat(stat.getPartitionPath(), stat)); - return commitMetadata; - } - - /* - * This needs to be a stack so we test all cases when either/both recordsWritten ,bytesWritten is zero before a non - * zero averageRecordSize can be computed. - */ - private static LinkedList> generateCommitMetadataList() throws IOException { - LinkedList> commits = new LinkedList<>(); - // First commit with non zero records and bytes - commits.push(Option.of(generateCommitMetadataWith(2000, 10000).toJsonString().getBytes(StandardCharsets.UTF_8))); - // Second commit with non zero records and bytes - commits.push(Option.of(generateCommitMetadataWith(1500, 7500).toJsonString().getBytes(StandardCharsets.UTF_8))); - // Third commit with both zero records and zero bytes - commits.push(Option.of(generateCommitMetadataWith(0, 0).toJsonString().getBytes(StandardCharsets.UTF_8))); - // Fourth commit with zero records - commits.push(Option.of(generateCommitMetadataWith(0, 1500).toJsonString().getBytes(StandardCharsets.UTF_8))); - // Fifth commit with zero bytes - commits.push(Option.of(generateCommitMetadataWith(2500, 0).toJsonString().getBytes(StandardCharsets.UTF_8))); - return commits; - } - - @Test - public void testAverageBytesPerRecordForNonEmptyCommitTimeLine() throws 
Exception { - HoodieTimeline commitTimeLine = mock(HoodieTimeline.class); - when(commitTimeLine.empty()).thenReturn(false); - when(commitTimeLine.getReverseOrderedInstants()).thenReturn(setupHoodieInstants().stream()); - LinkedList> commits = generateCommitMetadataList(); - when(commitTimeLine.getInstantDetails(any(HoodieInstant.class))).thenAnswer(invocationOnMock -> commits.pop()); - long expectAvgSize = (long) Math.ceil((1.0 * 7500) / 1500); - long actualAvgSize = averageBytesPerRecord(commitTimeLine, 1234); - assertEquals(expectAvgSize, actualAvgSize); - } - - @Test - public void testAverageBytesPerRecordForEmptyCommitTimeLine() { - HoodieTimeline commitTimeLine = mock(HoodieTimeline.class); - when(commitTimeLine.empty()).thenReturn(true); - long expectAvgSize = 2345; - long actualAvgSize = averageBytesPerRecord(commitTimeLine, 2345); - assertEquals(expectAvgSize, actualAvgSize); - } -} diff --git a/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java index 0926a376c..91b3bb084 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java @@ -18,9 +18,13 @@ package org.apache.hudi.table.action.commit; +import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordLocation; +import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.FileIOUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieCompactionConfig; @@ -37,12 +41,21 @@ import org.apache.log4j.LogManager; import 
org.apache.log4j.Logger; import org.junit.jupiter.api.Test; +import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedList; import java.util.List; import scala.Tuple2; +import static org.apache.hudi.common.testutils.HoodieTestUtils.generateFakeHoodieWriteStat; +import static org.apache.hudi.table.action.commit.UpsertPartitioner.averageBytesPerRecord; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class TestUpsertPartitioner extends HoodieClientTestBase { @@ -79,6 +92,84 @@ public class TestUpsertPartitioner extends HoodieClientTestBase { return partitioner; } + private static List setupHoodieInstants() { + List instants = new ArrayList<>(); + instants.add(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "ts1")); + instants.add(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "ts2")); + instants.add(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "ts3")); + instants.add(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "ts4")); + instants.add(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "ts5")); + Collections.reverse(instants); + return instants; + } + + private static List generateCommitStatWith(int totalRecordsWritten, int totalBytesWritten) { + List writeStatsList = generateFakeHoodieWriteStat(5); + // clear all record and byte stats except for last entry. 
+ for (int i = 0; i < writeStatsList.size() - 1; i++) { + HoodieWriteStat writeStat = writeStatsList.get(i); + writeStat.setNumWrites(0); + writeStat.setTotalWriteBytes(0); + } + HoodieWriteStat lastWriteStat = writeStatsList.get(writeStatsList.size() - 1); + lastWriteStat.setTotalWriteBytes(totalBytesWritten); + lastWriteStat.setNumWrites(totalRecordsWritten); + return writeStatsList; + } + + private static HoodieCommitMetadata generateCommitMetadataWith(int totalRecordsWritten, int totalBytesWritten) { + List fakeHoodieWriteStats = generateCommitStatWith(totalRecordsWritten, totalBytesWritten); + HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata(); + fakeHoodieWriteStats.forEach(stat -> commitMetadata.addWriteStat(stat.getPartitionPath(), stat)); + return commitMetadata; + } + + /* + * This needs to be a stack so we test all cases when either/both recordsWritten ,bytesWritten is zero before a non + * zero averageRecordSize can be computed. + */ + private static LinkedList> generateCommitMetadataList() throws IOException { + LinkedList> commits = new LinkedList<>(); + // First commit with non zero records and bytes + commits.push(Option.of(generateCommitMetadataWith(2000, 10000).toJsonString().getBytes(StandardCharsets.UTF_8))); + // Second commit with non zero records and bytes + commits.push(Option.of(generateCommitMetadataWith(1500, 7500).toJsonString().getBytes(StandardCharsets.UTF_8))); + // Third commit with a small file + commits.push(Option.of(generateCommitMetadataWith(100, 500).toJsonString().getBytes(StandardCharsets.UTF_8))); + // Fourth commit with both zero records and zero bytes + commits.push(Option.of(generateCommitMetadataWith(0, 0).toJsonString().getBytes(StandardCharsets.UTF_8))); + // Fifth commit with zero records + commits.push(Option.of(generateCommitMetadataWith(0, 1500).toJsonString().getBytes(StandardCharsets.UTF_8))); + // Sixth commit with zero bytes + commits.push(Option.of(generateCommitMetadataWith(2500, 
0).toJsonString().getBytes(StandardCharsets.UTF_8))); + return commits; + } + + @Test + public void testAverageBytesPerRecordForNonEmptyCommitTimeLine() throws Exception { + HoodieTimeline commitTimeLine = mock(HoodieTimeline.class); + HoodieWriteConfig config = makeHoodieClientConfigBuilder() + .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1000).build()) + .build(); + when(commitTimeLine.empty()).thenReturn(false); + when(commitTimeLine.getReverseOrderedInstants()).thenReturn(setupHoodieInstants().stream()); + LinkedList> commits = generateCommitMetadataList(); + when(commitTimeLine.getInstantDetails(any(HoodieInstant.class))).thenAnswer(invocationOnMock -> commits.pop()); + long expectAvgSize = (long) Math.ceil((1.0 * 7500) / 1500); + long actualAvgSize = averageBytesPerRecord(commitTimeLine, config); + assertEquals(expectAvgSize, actualAvgSize); + } + + @Test + public void testAverageBytesPerRecordForEmptyCommitTimeLine() throws Exception { + HoodieTimeline commitTimeLine = mock(HoodieTimeline.class); + HoodieWriteConfig config = makeHoodieClientConfigBuilder().build(); + when(commitTimeLine.empty()).thenReturn(true); + long expectAvgSize = config.getCopyOnWriteRecordSizeEstimate(); + long actualAvgSize = averageBytesPerRecord(commitTimeLine, config); + assertEquals(expectAvgSize, actualAvgSize); + } + @Test public void testUpsertPartitioner() throws Exception { final String testPartitionPath = "2016/09/26"; diff --git a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieTestDataGenerator.java b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieTestDataGenerator.java index 9f7ed2362..a6de0f581 100644 --- a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieTestDataGenerator.java +++ b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieTestDataGenerator.java @@ -70,7 +70,9 @@ import java.util.stream.Stream; public class HoodieTestDataGenerator { // based on examination of sample file, the schema 
produces the following per record size - public static final int SIZE_PER_RECORD = 50 * 1024; + public static final int BYTES_PER_RECORD = (int) (1.2 * 1024); + // with default bloom filter with 60,000 entries and 0.000000001 FPRate + public static final int BLOOM_FILTER_BYTES = 323495; private static Logger logger = LogManager.getLogger(HoodieTestDataGenerator.class); public static final String DEFAULT_FIRST_PARTITION_PATH = "2016/03/15"; public static final String DEFAULT_SECOND_PARTITION_PATH = "2015/03/16"; @@ -144,6 +146,10 @@ public class HoodieTestDataGenerator { } } + public int getEstimatedFileSizeInBytes(int numOfRecords) { + return numOfRecords * BYTES_PER_RECORD + BLOOM_FILTER_BYTES; + } + public TestRawTripPayload generateRandomValueAsPerSchema(String schemaStr, HoodieKey key, String commitTime, boolean isFlattened) throws IOException { if (TRIP_EXAMPLE_SCHEMA.equals(schemaStr)) { return generateRandomValue(key, commitTime, isFlattened);