From 1d09c02f1cb9a3929995fb6df1bb7d58cdcb3eb3 Mon Sep 17 00:00:00 2001
From: Shen Hong
Date: Sat, 22 Aug 2020 19:41:39 +0800
Subject: [PATCH] [HUDI-1083] Optimization in determining insert bucket location for a given key (#1868)

- To determine the insert bucket location for a given key, Hudi walks through all
  insert buckets at O(N) cost; this patch adds an optimization to make it O(logN).
---
 .../InsertBucketCumulativeWeightPair.java     | 69 +++++++++++++++++++
 .../action/commit/UpsertPartitioner.java      | 39 ++++++-----
 .../action/commit/TestUpsertPartitioner.java  | 49 ++++++++-----
 3 files changed, 124 insertions(+), 33 deletions(-)
 create mode 100644 hudi-client/src/main/java/org/apache/hudi/table/action/commit/InsertBucketCumulativeWeightPair.java

diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/InsertBucketCumulativeWeightPair.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/InsertBucketCumulativeWeightPair.java
new file mode 100644
index 000000000..11db69a95
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/InsertBucketCumulativeWeightPair.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.common.util.collection.Pair;
+
+/**
+ * Each InsertBucket has a weight; an InsertBucketCumulativeWeightPair additionally stores the cumulativeWeight of
+ * the InsertBucket. If there are multiple InsertBuckets in a partition, the InsertBuckets are numbered from 1, and
+ * the cumulativeWeight of an InsertBucket is the sum of the InsertBucket weights from number 1 up to its own number.
+ *
+ * For example, if there are three InsertBuckets in a partition with the following bucketNumber and weight:
+ * 1) bucketNumber: 1, weight: 0.2
+ * 2) bucketNumber: 2, weight: 0.3
+ * 3) bucketNumber: 3, weight: 0.5
+ *
+ * then the cumulativeWeight of each bucket is:
+ * 1) bucketNumber: 1, cumulativeWeight: 0.2
+ * 2) bucketNumber: 2, cumulativeWeight: 0.5
+ * 3) bucketNumber: 3, cumulativeWeight: 1.0
+ */
+public class InsertBucketCumulativeWeightPair extends Pair<InsertBucket, Double> {
+  InsertBucket insertBucket;
+  Double cumulativeWeight;
+
+  public InsertBucketCumulativeWeightPair(final InsertBucket insertBucket, final Double cumulativeWeight) {
+    super();
+    this.insertBucket = insertBucket;
+    this.cumulativeWeight = cumulativeWeight;
+  }
+
+  @Override
+  public InsertBucket getLeft() {
+    return insertBucket;
+  }
+
+  @Override
+  public Double getRight() {
+    return cumulativeWeight;
+  }
+
+  @Override
+  public int compareTo(final Pair<InsertBucket, Double> other) {
+    // Only need to compare the cumulativeWeight.
+    return cumulativeWeight.compareTo(other.getRight());
+  }
+
+  @Override
+  public Double setValue(Double value) {
+    this.cumulativeWeight = value;
+    return value;
+  }
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
index 755854b09..86fa1bfd7 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
@@ -41,6 +41,7 @@ import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.PairFunction;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -77,7 +78,7 @@ public class UpsertPartitioner<T extends HoodieRecordPayload<T>> extends Partiti
   /**
    * Helps us pack inserts into 1 or more buckets depending on number of incoming records.
    */
-  private HashMap<String, List<InsertBucket>> partitionPathToInsertBuckets;
+  private HashMap<String, List<InsertBucketCumulativeWeightPair>> partitionPathToInsertBucketInfos;
   /**
    * Remembers what type each bucket is for later.
    */
@@ -90,7 +91,7 @@ public class UpsertPartitioner<T extends HoodieRecordPayload<T>> extends Partiti
 
   public UpsertPartitioner(WorkloadProfile profile, JavaSparkContext jsc, HoodieTable<T> table, HoodieWriteConfig config) {
     updateLocationToBucket = new HashMap<>();
-    partitionPathToInsertBuckets = new HashMap<>();
+    partitionPathToInsertBucketInfos = new HashMap<>();
     bucketInfoMap = new HashMap<>();
     this.profile = profile;
     this.table = table;
@@ -99,7 +100,7 @@ public class UpsertPartitioner<T extends HoodieRecordPayload<T>> extends Partiti
     assignInserts(profile, jsc);
 
     LOG.info("Total Buckets :" + totalBuckets + ", buckets info => " + bucketInfoMap + ", \n"
-        + "Partition to insert buckets => " + partitionPathToInsertBuckets + ", \n"
+        + "Partition to insert buckets => " + partitionPathToInsertBucketInfos + ", \n"
        + "UpdateLocations mapped to buckets =>" + updateLocationToBucket);
   }
 
@@ -193,15 +194,17 @@ public class UpsertPartitioner<T extends HoodieRecordPayload<T>> extends Partiti
         }
 
        // Go over all such buckets, and assign weights as per amount of incoming inserts.
-        List<InsertBucket> insertBuckets = new ArrayList<>();
+        List<InsertBucketCumulativeWeightPair> insertBuckets = new ArrayList<>();
+        double currentCumulativeWeight = 0;
         for (int i = 0; i < bucketNumbers.size(); i++) {
           InsertBucket bkt = new InsertBucket();
           bkt.bucketNumber = bucketNumbers.get(i);
           bkt.weight = (1.0 * recordsPerBucket.get(i)) / pStat.getNumInserts();
-          insertBuckets.add(bkt);
+          currentCumulativeWeight += bkt.weight;
+          insertBuckets.add(new InsertBucketCumulativeWeightPair(bkt, currentCumulativeWeight));
         }
         LOG.info("Total insert buckets for partition path " + partitionPath + " => " + insertBuckets);
-        partitionPathToInsertBuckets.put(partitionPath, insertBuckets);
+        partitionPathToInsertBucketInfos.put(partitionPath, insertBuckets);
       }
     }
   }
@@ -252,8 +255,8 @@ public class UpsertPartitioner<T extends HoodieRecordPayload<T>> extends Partiti
     return bucketInfoMap.get(bucketNumber);
   }
 
-  public List<InsertBucket> getInsertBuckets(String partitionPath) {
-    return partitionPathToInsertBuckets.get(partitionPath);
+  public List<InsertBucketCumulativeWeightPair> getInsertBuckets(String partitionPath) {
+    return partitionPathToInsertBucketInfos.get(partitionPath);
   }
 
   @Override
@@ -270,20 +273,24 @@ public class UpsertPartitioner<T extends HoodieRecordPayload<T>> extends Partiti
       return updateLocationToBucket.get(location.getFileId());
     } else {
       String partitionPath = keyLocation._1().getPartitionPath();
-      List<InsertBucket> targetBuckets = partitionPathToInsertBuckets.get(partitionPath);
+      List<InsertBucketCumulativeWeightPair> targetBuckets = partitionPathToInsertBucketInfos.get(partitionPath);
       // pick the target bucket to use based on the weights.
-      double totalWeight = 0.0;
       final long totalInserts = Math.max(1, profile.getWorkloadStat(partitionPath).getNumInserts());
       final long hashOfKey = NumericUtils.getMessageDigestHash("MD5", keyLocation._1().getRecordKey());
       final double r = 1.0 * Math.floorMod(hashOfKey, totalInserts) / totalInserts;
-      for (InsertBucket insertBucket : targetBuckets) {
-        totalWeight += insertBucket.weight;
-        if (r <= totalWeight) {
-          return insertBucket.bucketNumber;
-        }
+
+      int index = Collections.binarySearch(targetBuckets, new InsertBucketCumulativeWeightPair(new InsertBucket(), r));
+
+      if (index >= 0) {
+        return targetBuckets.get(index).getKey().bucketNumber;
       }
+
+      if ((-1 * index - 1) < targetBuckets.size()) {
+        return targetBuckets.get((-1 * index - 1)).getKey().bucketNumber;
+      }
+
       // return first one, by default
-      return targetBuckets.get(0).bucketNumber;
+      return targetBuckets.get(0).getKey().bucketNumber;
     }
   }
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
index f49d6d5c4..b8df5ef75 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
@@ -180,7 +180,7 @@ public class TestUpsertPartitioner extends HoodieClientTestBase {
     final String testPartitionPath = "2016/09/26";
     // Inserts + Updates... Check all updates go together & inserts subsplit
     UpsertPartitioner partitioner = getUpsertPartitioner(0, 200, 100, 1024, testPartitionPath, false);
-    List<InsertBucket> insertBuckets = partitioner.getInsertBuckets(testPartitionPath);
+    List<InsertBucketCumulativeWeightPair> insertBuckets = partitioner.getInsertBuckets(testPartitionPath);
 
     assertEquals(2, insertBuckets.size(), "Total of 2 insert buckets");
   }
 
@@ -201,20 +201,18 @@ public class TestUpsertPartitioner extends HoodieClientTestBase {
     WorkloadProfile profile = new WorkloadProfile(jsc.parallelize(insertRecords));
     UpsertPartitioner partitioner = new UpsertPartitioner(profile, jsc, table, config);
 
-    List<InsertBucket> insertBuckets = partitioner.getInsertBuckets(testPartitionPath);
+    List<InsertBucketCumulativeWeightPair> insertBuckets = partitioner.getInsertBuckets(testPartitionPath);
 
     float bucket0Weight = 0.2f;
-    InsertBucket newInsertBucket0 = new InsertBucket();
-    newInsertBucket0.bucketNumber = insertBuckets.get(0).bucketNumber;
-    newInsertBucket0.weight = bucket0Weight;
-    insertBuckets.remove(0);
-    insertBuckets.add(0, newInsertBucket0);
+    InsertBucketCumulativeWeightPair pair = insertBuckets.remove(0);
+    pair.getKey().weight = bucket0Weight;
+    pair.setValue(new Double(bucket0Weight));
+    insertBuckets.add(0, pair);
 
-    InsertBucket newInsertBucket1 = new InsertBucket();
-    newInsertBucket1.bucketNumber = insertBuckets.get(1).bucketNumber;
-    newInsertBucket1.weight = 1 - bucket0Weight;
-    insertBuckets.remove(1);
-    insertBuckets.add(1, newInsertBucket1);
+    InsertBucketCumulativeWeightPair pair1 = insertBuckets.remove(1);
+    pair1.getKey().weight = 1 - bucket0Weight;
+    pair1.setValue(new Double(1));
+    insertBuckets.add(1, pair1);
 
     Map partition2numRecords = new HashMap();
     for (HoodieRecord hoodieRecord: insertRecords) {
@@ -238,13 +236,26 @@ public class TestUpsertPartitioner extends HoodieClientTestBase {
         "The weight of bucket1 should be " + (1 - bucket0Weight));
   }
 
+  private void assertInsertBuckets(Double[] weights,
+                                   Double[] cumulativeWeights,
+                                   List<InsertBucketCumulativeWeightPair> insertBuckets) {
+    for (int i = 0; i < weights.length; i++) {
+      assertEquals(i, insertBuckets.get(i).getKey().bucketNumber,
+          String.format("BucketNumber of insert bucket %d must be same as %d", i, i));
+      assertEquals(weights[i], insertBuckets.get(i).getKey().weight, 0.01,
+          String.format("Insert bucket %d should have weight %.1f", i, weights[i]));
+      assertEquals(cumulativeWeights[i], insertBuckets.get(i).getValue(), 0.01,
+          String.format("Insert bucket %d should have cumulativeWeight %.1f", i, cumulativeWeights[i]));
+    }
+  }
+
   @Test
   public void testUpsertPartitionerWithSmallInsertHandling() throws Exception {
     final String testPartitionPath = "2016/09/26";
     // Inserts + Updates .. Check updates go together & inserts subsplit, after expanding
     // smallest file
     UpsertPartitioner partitioner = getUpsertPartitioner(1000 * 1024, 400, 100, 800 * 1024, testPartitionPath, false);
-    List<InsertBucket> insertBuckets = partitioner.getInsertBuckets(testPartitionPath);
+    List<InsertBucketCumulativeWeightPair> insertBuckets = partitioner.getInsertBuckets(testPartitionPath);
 
     assertEquals(3, partitioner.numPartitions(), "Should have 3 partitions");
     assertEquals(BucketType.UPDATE, partitioner.getBucketInfo(0).bucketType,
@@ -254,8 +265,10 @@ public class TestUpsertPartitioner extends HoodieClientTestBase {
     assertEquals(BucketType.INSERT, partitioner.getBucketInfo(2).bucketType,
         "Bucket 2 is INSERT");
     assertEquals(3, insertBuckets.size(), "Total of 3 insert buckets");
-    assertEquals(0, insertBuckets.get(0).bucketNumber, "First insert bucket must be same as update bucket");
-    assertEquals(0.5, insertBuckets.get(0).weight, 0.01, "First insert bucket should have weight 0.5");
+
+    Double[] weights = { 0.5, 0.25, 0.25};
+    Double[] cumulativeWeights = { 0.5, 0.75, 1.0};
+    assertInsertBuckets(weights, cumulativeWeights, insertBuckets);
 
     // Now with insert split size auto tuned
     partitioner = getUpsertPartitioner(1000 * 1024, 2400, 100, 800 * 1024, testPartitionPath, true);
@@ -271,8 +284,10 @@ public class TestUpsertPartitioner extends HoodieClientTestBase {
     assertEquals(BucketType.INSERT, partitioner.getBucketInfo(3).bucketType,
         "Bucket 3 is INSERT");
     assertEquals(4, insertBuckets.size(), "Total of 4 insert buckets");
-    assertEquals(0, insertBuckets.get(0).bucketNumber, "First insert bucket must be same as update bucket");
-    assertEquals(200.0 / 2400, insertBuckets.get(0).weight, 0.01, "First insert bucket should have weight 0.5");
+
+    weights = new Double[] { 0.08, 0.31, 0.31, 0.31};
+    cumulativeWeights = new Double[] { 0.08, 0.39, 0.69, 1.0};
+    assertInsertBuckets(weights, cumulativeWeights, insertBuckets);
   }
 
   private HoodieWriteConfig.Builder makeHoodieClientConfigBuilder() {
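
Note for reviewers: the sketch below is a small self-contained illustration of the bucket-selection scheme this patch introduces, not code from the patch itself. Buckets carry cumulative weights, a record key is hashed to a fraction r in [0, 1), and Collections.binarySearch locates the first bucket whose cumulativeWeight is >= r, turning the previous O(N) scan into an O(logN) lookup. The Bucket class, names, and values here are simplified stand-ins for InsertBucket / InsertBucketCumulativeWeightPair.

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Simplified stand-in for InsertBucket + InsertBucketCumulativeWeightPair (illustration only).
class Bucket implements Comparable<Bucket> {
  final int bucketNumber;
  final double weight;
  double cumulativeWeight; // sum of weights of all buckets up to and including this one

  Bucket(int bucketNumber, double weight) {
    this.bucketNumber = bucketNumber;
    this.weight = weight;
  }

  @Override
  public int compareTo(Bucket other) {
    // Order only by cumulativeWeight, mirroring InsertBucketCumulativeWeightPair.compareTo.
    return Double.compare(cumulativeWeight, other.cumulativeWeight);
  }
}

public class BucketSelectionSketch {
  public static void main(String[] args) {
    // Weights as in the javadoc example: 0.2, 0.3, 0.5 -> cumulative 0.2, 0.5, 1.0.
    List<Bucket> buckets = new ArrayList<>();
    double cumulative = 0;
    for (double w : new double[] {0.2, 0.3, 0.5}) {
      Bucket b = new Bucket(buckets.size(), w);
      cumulative += w;
      b.cumulativeWeight = cumulative;
      buckets.add(b);
    }

    // r plays the role of hash(recordKey) mapped into [0, 1).
    double r = 0.6;
    Bucket probe = new Bucket(-1, 0);
    probe.cumulativeWeight = r;

    int index = Collections.binarySearch(buckets, probe);
    // On a miss, binarySearch returns (-(insertionPoint) - 1); the insertion point is the
    // first bucket whose cumulativeWeight exceeds r, which is exactly the bucket we want.
    if (index < 0) {
      index = -index - 1;
    }
    if (index >= buckets.size()) {
      index = 0; // fall back to the first bucket, as the patch does
    }
    System.out.println("key with r=" + r + " goes to bucket " + buckets.get(index).bucketNumber);
  }
}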