From 4c17528de021054699c60e292874cdfdbcf592af Mon Sep 17 00:00:00 2001 From: steven zhang Date: Tue, 29 Dec 2020 11:52:35 +0800 Subject: [PATCH] [HUDI-1398] Align insert file size for reducing IO (#2256) * [HUDI-1398] Align insert file size for reducing IO Co-authored-by: zhang wen --- .../action/commit/UpsertPartitioner.java | 6 +++++- .../action/commit/TestUpsertPartitioner.java | 19 +++++++++++++++++-- 2 files changed, 22 insertions(+), 3 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java index a84e9127e..ffcc1ed3c 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java @@ -218,7 +218,11 @@ public class UpsertPartitioner> extends Partiti + ", totalInsertBuckets => " + insertBuckets + ", recordsPerBucket => " + insertRecordsPerBucket); for (int b = 0; b < insertBuckets; b++) { bucketNumbers.add(totalBuckets); - recordsPerBucket.add(totalUnassignedInserts / insertBuckets); + if (b < insertBuckets - 1) { + recordsPerBucket.add(insertRecordsPerBucket); + } else { + recordsPerBucket.add(totalUnassignedInserts - (insertBuckets - 1) * insertRecordsPerBucket); + } BucketInfo bucketInfo = new BucketInfo(); bucketInfo.bucketType = BucketType.INSERT; bucketInfo.partitionPath = partitionPath; diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java index c19427c7f..f40a97c0b 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java @@ -185,6 +185,21 @@ public class TestUpsertPartitioner extends HoodieClientTestBase { assertEquals(2, insertBuckets.size(), "Total of 2 insert buckets"); } + @Test + public void testUpsertPartitionerWithRecordsPerBucket() throws Exception { + final String testPartitionPath = "2016/09/26"; + // Inserts + Updates... Check all updates go together & inserts subsplit + UpsertPartitioner partitioner = getUpsertPartitioner(0, 250, 100, 1024, testPartitionPath, false); + List insertBuckets = partitioner.getInsertBuckets(testPartitionPath); + int insertSplitSize = partitioner.config.getCopyOnWriteInsertSplitSize(); + int remainedInsertSize = 250 - 2 * insertSplitSize; + // will assigned 3 insertBuckets. 100, 100, 50 each + assertEquals(3, insertBuckets.size(), "Total of 3 insert buckets"); + assertEquals(0.4, insertBuckets.get(0).getLeft().weight, "insert " + insertSplitSize + " records"); + assertEquals(0.4, insertBuckets.get(1).getLeft().weight, "insert " + insertSplitSize + " records"); + assertEquals(0.2, insertBuckets.get(2).getLeft().weight, "insert " + remainedInsertSize + " records"); + } + @Test public void testPartitionWeight() throws Exception { final String testPartitionPath = "2016/09/26"; @@ -286,8 +301,8 @@ public class TestUpsertPartitioner extends HoodieClientTestBase { "Bucket 3 is INSERT"); assertEquals(4, insertBuckets.size(), "Total of 4 insert buckets"); - weights = new Double[] { 0.08, 0.31, 0.31, 0.31}; - cumulativeWeights = new Double[] { 0.08, 0.39, 0.69, 1.0}; + weights = new Double[] { 0.08, 0.42, 0.42, 0.08}; + cumulativeWeights = new Double[] { 0.08, 0.5, 0.92, 1.0}; assertInsertBuckets(weights, cumulativeWeights, insertBuckets); }