[HUDI-1083] Optimization in determining insert bucket location for a given key (#1868)
- To determine the insert bucket location for a given key, Hudi previously walked through all insert buckets of the partition at O(N) cost; this patch precomputes each bucket's cumulative weight and binary-searches over those, reducing the lookup to O(log N).
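In sketch form, the change replaces a linear scan over bucket weights with a binary search over precomputed cumulative weights: the key hash is mapped to r in [0, 1), and the first bucket whose cumulative weight reaches r is chosen. A minimal, runnable illustration (pickBucket and the raw double[] are illustrative stand-ins for the InsertBucketCumulativeWeightPair list introduced below, not part of the patch):

import java.util.Arrays;

public class CumulativeWeightLookup {

  // cumulativeWeights must be ascending and end at ~1.0, e.g. {0.2, 0.5, 1.0}
  // for per-bucket weights 0.2, 0.3, 0.5.
  static int pickBucket(double[] cumulativeWeights, double r) {
    int index = Arrays.binarySearch(cumulativeWeights, r);
    if (index >= 0) {
      return index; // r landed exactly on a cumulative weight
    }
    int insertionPoint = -index - 1; // first bucket with cumulativeWeight > r
    return insertionPoint < cumulativeWeights.length ? insertionPoint : 0;
  }

  public static void main(String[] args) {
    double[] cumulativeWeights = {0.2, 0.5, 1.0};
    System.out.println(pickBucket(cumulativeWeights, 0.35)); // 1 (the 0.3-weight bucket)
    System.out.println(pickBucket(cumulativeWeights, 0.2));  // 0 (exact hit)
  }
}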
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.common.util.collection.Pair;
+
+/**
+ * Each InsertBucket has a weight; an InsertBucketCumulativeWeightPair additionally stores the cumulativeWeight of
+ * the InsertBucket. If there are multiple InsertBuckets in a partition, the InsertBuckets are numbered from 1, and
+ * the cumulativeWeight of an InsertBucket is the sum of the InsertBucket weights from number 1 up to its own number.
+ *
+ * Example: there are three InsertBuckets in a partition, with bucketNumber and weight:
+ * 1) bucketNumber: 1, weight: 0.2
+ * 2) bucketNumber: 2, weight: 0.3
+ * 3) bucketNumber: 3, weight: 0.5
+ *
+ * The cumulativeWeight of each bucket is then:
+ * 1) bucketNumber: 1, cumulativeWeight: 0.2
+ * 2) bucketNumber: 2, cumulativeWeight: 0.5
+ * 3) bucketNumber: 3, cumulativeWeight: 1.0
+ */
+public class InsertBucketCumulativeWeightPair extends Pair<InsertBucket, Double> {
+  InsertBucket insertBucket;
+  Double cumulativeWeight;
+
+  public InsertBucketCumulativeWeightPair(final InsertBucket insertBucket, final Double cumulativeWeight) {
+    super();
+    this.insertBucket = insertBucket;
+    this.cumulativeWeight = cumulativeWeight;
+  }
+
+  @Override
+  public InsertBucket getLeft() {
+    return insertBucket;
+  }
+
+  @Override
+  public Double getRight() {
+    return cumulativeWeight;
+  }
+
+  @Override
+  public int compareTo(final Pair<InsertBucket, Double> other) {
+    // Only the cumulativeWeight needs to be compared.
+    return cumulativeWeight.compareTo(other.getRight());
+  }
+
+  @Override
+  public Double setValue(Double value) {
+    this.cumulativeWeight = value;
+    return value;
+  }
+}
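Since compareTo above only compares the cumulativeWeight, a sorted list of these pairs can be probed with Collections.binarySearch using a throwaway pair carrying just the target weight. A small demonstration of the binarySearch contract this relies on (plain Doubles stand in for the pairs; values taken from the Javadoc example):

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class BinarySearchContractDemo {
  public static void main(String[] args) {
    List<Double> cumulative = Arrays.asList(0.2, 0.5, 1.0);

    // Exact match: the element's index is returned.
    System.out.println(Collections.binarySearch(cumulative, 0.5));  // 1

    // No match: -(insertionPoint) - 1 is returned, where insertionPoint
    // is the index of the first element greater than the key.
    int miss = Collections.binarySearch(cumulative, 0.35);          // -2
    System.out.println(-miss - 1);                                  // 1, i.e. bucketNumber 2
  }
}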
@@ -41,6 +41,7 @@ import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.PairFunction;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -77,7 +78,7 @@ public class UpsertPartitioner<T extends HoodieRecordPayload<T>> extends Partiti
   /**
    * Helps us pack inserts into 1 or more buckets depending on number of incoming records.
    */
-  private HashMap<String, List<InsertBucket>> partitionPathToInsertBuckets;
+  private HashMap<String, List<InsertBucketCumulativeWeightPair>> partitionPathToInsertBucketInfos;
   /**
    * Remembers what type each bucket is for later.
    */
@@ -90,7 +91,7 @@ public class UpsertPartitioner<T extends HoodieRecordPayload<T>> extends Partiti
   public UpsertPartitioner(WorkloadProfile profile, JavaSparkContext jsc, HoodieTable<T> table,
       HoodieWriteConfig config) {
     updateLocationToBucket = new HashMap<>();
-    partitionPathToInsertBuckets = new HashMap<>();
+    partitionPathToInsertBucketInfos = new HashMap<>();
     bucketInfoMap = new HashMap<>();
     this.profile = profile;
     this.table = table;
@@ -99,7 +100,7 @@ public class UpsertPartitioner<T extends HoodieRecordPayload<T>> extends Partiti
     assignInserts(profile, jsc);
 
     LOG.info("Total Buckets :" + totalBuckets + ", buckets info => " + bucketInfoMap + ", \n"
-        + "Partition to insert buckets => " + partitionPathToInsertBuckets + ", \n"
+        + "Partition to insert buckets => " + partitionPathToInsertBucketInfos + ", \n"
         + "UpdateLocations mapped to buckets =>" + updateLocationToBucket);
   }
 
@@ -193,15 +194,17 @@ public class UpsertPartitioner<T extends HoodieRecordPayload<T>> extends Partiti
       }
 
       // Go over all such buckets, and assign weights as per amount of incoming inserts.
-      List<InsertBucket> insertBuckets = new ArrayList<>();
+      List<InsertBucketCumulativeWeightPair> insertBuckets = new ArrayList<>();
+      double currentCumulativeWeight = 0;
       for (int i = 0; i < bucketNumbers.size(); i++) {
         InsertBucket bkt = new InsertBucket();
         bkt.bucketNumber = bucketNumbers.get(i);
         bkt.weight = (1.0 * recordsPerBucket.get(i)) / pStat.getNumInserts();
-        insertBuckets.add(bkt);
+        currentCumulativeWeight += bkt.weight;
+        insertBuckets.add(new InsertBucketCumulativeWeightPair(bkt, currentCumulativeWeight));
       }
       LOG.info("Total insert buckets for partition path " + partitionPath + " => " + insertBuckets);
-      partitionPathToInsertBuckets.put(partitionPath, insertBuckets);
+      partitionPathToInsertBucketInfos.put(partitionPath, insertBuckets);
     }
   }
 }
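Tying this to the Javadoc example above: per-bucket weights 0.2, 0.3 and 0.5 produce the pairs (bucket 1, 0.2), (bucket 2, 0.5) and (bucket 3, 1.0). Because each pair stores the running sum, the list is sorted by cumulativeWeight by construction, which is the precondition for the binary search in getPartition below.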
@@ -252,8 +255,8 @@ public class UpsertPartitioner<T extends HoodieRecordPayload<T>> extends Partiti
     return bucketInfoMap.get(bucketNumber);
   }
 
-  public List<InsertBucket> getInsertBuckets(String partitionPath) {
-    return partitionPathToInsertBuckets.get(partitionPath);
+  public List<InsertBucketCumulativeWeightPair> getInsertBuckets(String partitionPath) {
+    return partitionPathToInsertBucketInfos.get(partitionPath);
   }
 
   @Override
@@ -270,20 +273,24 @@ public class UpsertPartitioner<T extends HoodieRecordPayload<T>> extends Partiti
       return updateLocationToBucket.get(location.getFileId());
     } else {
       String partitionPath = keyLocation._1().getPartitionPath();
-      List<InsertBucket> targetBuckets = partitionPathToInsertBuckets.get(partitionPath);
+      List<InsertBucketCumulativeWeightPair> targetBuckets = partitionPathToInsertBucketInfos.get(partitionPath);
       // pick the target bucket to use based on the weights.
-      double totalWeight = 0.0;
       final long totalInserts = Math.max(1, profile.getWorkloadStat(partitionPath).getNumInserts());
       final long hashOfKey = NumericUtils.getMessageDigestHash("MD5", keyLocation._1().getRecordKey());
       final double r = 1.0 * Math.floorMod(hashOfKey, totalInserts) / totalInserts;
-      for (InsertBucket insertBucket : targetBuckets) {
-        totalWeight += insertBucket.weight;
-        if (r <= totalWeight) {
-          return insertBucket.bucketNumber;
-        }
-      }
+
+      int index = Collections.binarySearch(targetBuckets, new InsertBucketCumulativeWeightPair(new InsertBucket(), r));
+
+      if (index >= 0) {
+        return targetBuckets.get(index).getKey().bucketNumber;
+      }
+
+      if ((-1 * index - 1) < targetBuckets.size()) {
+        return targetBuckets.get((-1 * index - 1)).getKey().bucketNumber;
+      }
+
       // return first one, by default
-      return targetBuckets.get(0).bucketNumber;
+      return targetBuckets.get(0).getKey().bucketNumber;
     }
   }
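Reading the new lookup: an exact hit on a cumulative weight returns that bucket directly; otherwise Collections.binarySearch returns -(insertionPoint) - 1, and the insertion point is the first bucket whose cumulativeWeight exceeds r, which is the same bucket the old r <= totalWeight scan would have selected. Because r is always in [0, 1) and the cumulative weights should sum to 1.0, the trailing get(0) fallback is effectively a guard for floating-point rounding that leaves the last cumulativeWeight just below r.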
@@ -180,7 +180,7 @@ public class TestUpsertPartitioner extends HoodieClientTestBase {
     final String testPartitionPath = "2016/09/26";
     // Inserts + Updates... Check all updates go together & inserts subsplit
     UpsertPartitioner partitioner = getUpsertPartitioner(0, 200, 100, 1024, testPartitionPath, false);
-    List<InsertBucket> insertBuckets = partitioner.getInsertBuckets(testPartitionPath);
+    List<InsertBucketCumulativeWeightPair> insertBuckets = partitioner.getInsertBuckets(testPartitionPath);
     assertEquals(2, insertBuckets.size(), "Total of 2 insert buckets");
   }
@@ -201,20 +201,18 @@ public class TestUpsertPartitioner extends HoodieClientTestBase {
 
     WorkloadProfile profile = new WorkloadProfile(jsc.parallelize(insertRecords));
     UpsertPartitioner partitioner = new UpsertPartitioner(profile, jsc, table, config);
-    List<InsertBucket> insertBuckets = partitioner.getInsertBuckets(testPartitionPath);
+    List<InsertBucketCumulativeWeightPair> insertBuckets = partitioner.getInsertBuckets(testPartitionPath);
 
     float bucket0Weight = 0.2f;
-    InsertBucket newInsertBucket0 = new InsertBucket();
-    newInsertBucket0.bucketNumber = insertBuckets.get(0).bucketNumber;
-    newInsertBucket0.weight = bucket0Weight;
-    insertBuckets.remove(0);
-    insertBuckets.add(0, newInsertBucket0);
+    InsertBucketCumulativeWeightPair pair = insertBuckets.remove(0);
+    pair.getKey().weight = bucket0Weight;
+    pair.setValue(new Double(bucket0Weight));
+    insertBuckets.add(0, pair);
 
-    InsertBucket newInsertBucket1 = new InsertBucket();
-    newInsertBucket1.bucketNumber = insertBuckets.get(1).bucketNumber;
-    newInsertBucket1.weight = 1 - bucket0Weight;
-    insertBuckets.remove(1);
-    insertBuckets.add(1, newInsertBucket1);
+    InsertBucketCumulativeWeightPair pair1 = insertBuckets.remove(1);
+    pair1.getKey().weight = 1 - bucket0Weight;
+    pair1.setValue(new Double(1));
+    insertBuckets.add(1, pair1);
 
     Map<Integer, Integer> partition2numRecords = new HashMap<Integer, Integer>();
     for (HoodieRecord hoodieRecord: insertRecords) {
@@ -238,13 +236,26 @@ public class TestUpsertPartitioner extends HoodieClientTestBase {
         "The weight of bucket1 should be " + (1 - bucket0Weight));
   }
 
+  private void assertInsertBuckets(Double[] weights,
+                                   Double[] cumulativeWeights,
+                                   List<InsertBucketCumulativeWeightPair> insertBuckets) {
+    for (int i = 0; i < weights.length; i++) {
+      assertEquals(i, insertBuckets.get(i).getKey().bucketNumber,
+          String.format("BucketNumber of insert bucket %d must be same as %d", i, i));
+      assertEquals(weights[i], insertBuckets.get(i).getKey().weight, 0.01,
+          String.format("Insert bucket %d should have weight %.1f", i, weights[i]));
+      assertEquals(cumulativeWeights[i], insertBuckets.get(i).getValue(), 0.01,
+          String.format("Insert bucket %d should have cumulativeWeight %.1f", i, cumulativeWeights[i]));
+    }
+  }
+
   @Test
   public void testUpsertPartitionerWithSmallInsertHandling() throws Exception {
     final String testPartitionPath = "2016/09/26";
     // Inserts + Updates .. Check updates go together & inserts subsplit, after expanding
     // smallest file
     UpsertPartitioner partitioner = getUpsertPartitioner(1000 * 1024, 400, 100, 800 * 1024, testPartitionPath, false);
-    List<InsertBucket> insertBuckets = partitioner.getInsertBuckets(testPartitionPath);
+    List<InsertBucketCumulativeWeightPair> insertBuckets = partitioner.getInsertBuckets(testPartitionPath);
 
     assertEquals(3, partitioner.numPartitions(), "Should have 3 partitions");
     assertEquals(BucketType.UPDATE, partitioner.getBucketInfo(0).bucketType,
@@ -254,8 +265,10 @@ public class TestUpsertPartitioner extends HoodieClientTestBase {
     assertEquals(BucketType.INSERT, partitioner.getBucketInfo(2).bucketType,
         "Bucket 2 is INSERT");
     assertEquals(3, insertBuckets.size(), "Total of 3 insert buckets");
-    assertEquals(0, insertBuckets.get(0).bucketNumber, "First insert bucket must be same as update bucket");
-    assertEquals(0.5, insertBuckets.get(0).weight, 0.01, "First insert bucket should have weight 0.5");
+
+    Double[] weights = { 0.5, 0.25, 0.25};
+    Double[] cumulativeWeights = { 0.5, 0.75, 1.0};
+    assertInsertBuckets(weights, cumulativeWeights, insertBuckets);
 
     // Now with insert split size auto tuned
     partitioner = getUpsertPartitioner(1000 * 1024, 2400, 100, 800 * 1024, testPartitionPath, true);
@@ -271,8 +284,10 @@ public class TestUpsertPartitioner extends HoodieClientTestBase {
     assertEquals(BucketType.INSERT, partitioner.getBucketInfo(3).bucketType,
         "Bucket 3 is INSERT");
     assertEquals(4, insertBuckets.size(), "Total of 4 insert buckets");
-    assertEquals(0, insertBuckets.get(0).bucketNumber, "First insert bucket must be same as update bucket");
-    assertEquals(200.0 / 2400, insertBuckets.get(0).weight, 0.01, "First insert bucket should have weight 0.5");
+
+    weights = new Double[] { 0.08, 0.31, 0.31, 0.31};
+    cumulativeWeights = new Double[] { 0.08, 0.39, 0.69, 1.0};
+    assertInsertBuckets(weights, cumulativeWeights, insertBuckets);
   }
 
   private HoodieWriteConfig.Builder makeHoodieClientConfigBuilder() {
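A sanity check on the auto-tuned expectations (derived from the test setup, not stated in the patch): with 2400 incoming inserts and the first (update) bucket absorbing roughly 200 records, its weight is 200.0 / 2400 ≈ 0.08, and the remaining ~2200 records split evenly across three insert buckets give ≈ 0.31 each, so the cumulative weights land near 0.08, 0.39, 0.69 and 1.0, all within the asserted 0.01 delta.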