[HUDI-1082] Fix minor bug in deciding the insert buckets (#1838)
This commit is contained in:
@@ -67,9 +67,9 @@ public class UpsertPartitioner<T extends HoodieRecordPayload<T>> extends Partiti
|
||||
*/
|
||||
private int totalBuckets = 0;
|
||||
/**
|
||||
* Stat for the current workload. Helps in determining total inserts, upserts etc.
|
||||
* Stat for the current workload. Helps in determining inserts, upserts etc.
|
||||
*/
|
||||
private WorkloadStat globalStat;
|
||||
private WorkloadProfile profile;
|
||||
/**
|
||||
* Helps decide which bucket an incoming update should go to.
|
||||
*/
|
||||
@@ -92,7 +92,7 @@ public class UpsertPartitioner<T extends HoodieRecordPayload<T>> extends Partiti
|
||||
updateLocationToBucket = new HashMap<>();
|
||||
partitionPathToInsertBuckets = new HashMap<>();
|
||||
bucketInfoMap = new HashMap<>();
|
||||
globalStat = profile.getGlobalStat();
|
||||
this.profile = profile;
|
||||
this.table = table;
|
||||
this.config = config;
|
||||
assignUpdates(profile);
|
||||
@@ -269,10 +269,11 @@ public class UpsertPartitioner<T extends HoodieRecordPayload<T>> extends Partiti
|
||||
HoodieRecordLocation location = keyLocation._2().get();
|
||||
return updateLocationToBucket.get(location.getFileId());
|
||||
} else {
|
||||
List<InsertBucket> targetBuckets = partitionPathToInsertBuckets.get(keyLocation._1().getPartitionPath());
|
||||
String partitionPath = keyLocation._1().getPartitionPath();
|
||||
List<InsertBucket> targetBuckets = partitionPathToInsertBuckets.get(partitionPath);
|
||||
// pick the target bucket to use based on the weights.
|
||||
double totalWeight = 0.0;
|
||||
final long totalInserts = Math.max(1, globalStat.getNumInserts());
|
||||
final long totalInserts = Math.max(1, profile.getWorkloadStat(partitionPath).getNumInserts());
|
||||
final long hashOfKey = NumericUtils.getMessageDigestHash("MD5", keyLocation._1().getRecordKey());
|
||||
final double r = 1.0 * Math.floorMod(hashOfKey, totalInserts) / totalInserts;
|
||||
for (InsertBucket insertBucket : targetBuckets) {
|
||||
|
||||
@@ -45,14 +45,17 @@ import java.io.IOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import scala.Tuple2;
|
||||
|
||||
import static org.apache.hudi.common.testutils.HoodieTestUtils.generateFakeHoodieWriteStat;
|
||||
import static org.apache.hudi.table.action.commit.UpsertPartitioner.averageBytesPerRecord;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
@@ -179,6 +182,60 @@ public class TestUpsertPartitioner extends HoodieClientTestBase {
|
||||
assertEquals(2, insertBuckets.size(), "Total of 2 insert buckets");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPartitionWeight() throws Exception {
|
||||
final String testPartitionPath = "2016/09/26";
|
||||
int totalInsertNum = 2000;
|
||||
|
||||
HoodieWriteConfig config = makeHoodieClientConfigBuilder()
|
||||
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(0)
|
||||
.insertSplitSize(totalInsertNum / 2).autoTuneInsertSplits(false).build()).build();
|
||||
|
||||
HoodieClientTestUtils.fakeCommit(basePath, "001");
|
||||
metaClient = HoodieTableMetaClient.reload(metaClient);
|
||||
HoodieCopyOnWriteTable table = (HoodieCopyOnWriteTable) HoodieTable.create(metaClient, config, hadoopConf);
|
||||
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[] {testPartitionPath});
|
||||
List<HoodieRecord> insertRecords = dataGenerator.generateInserts("001", totalInsertNum);
|
||||
|
||||
WorkloadProfile profile = new WorkloadProfile(jsc.parallelize(insertRecords));
|
||||
UpsertPartitioner partitioner = new UpsertPartitioner(profile, jsc, table, config);
|
||||
List<InsertBucket> insertBuckets = partitioner.getInsertBuckets(testPartitionPath);
|
||||
|
||||
float bucket0Weight = 0.2f;
|
||||
InsertBucket newInsertBucket0 = new InsertBucket();
|
||||
newInsertBucket0.bucketNumber = insertBuckets.get(0).bucketNumber;
|
||||
newInsertBucket0.weight = bucket0Weight;
|
||||
insertBuckets.remove(0);
|
||||
insertBuckets.add(0, newInsertBucket0);
|
||||
|
||||
InsertBucket newInsertBucket1 = new InsertBucket();
|
||||
newInsertBucket1.bucketNumber = insertBuckets.get(1).bucketNumber;
|
||||
newInsertBucket1.weight = 1 - bucket0Weight;
|
||||
insertBuckets.remove(1);
|
||||
insertBuckets.add(1, newInsertBucket1);
|
||||
|
||||
Map<Integer, Integer> partition2numRecords = new HashMap<Integer, Integer>();
|
||||
for (HoodieRecord hoodieRecord: insertRecords) {
|
||||
int partition = partitioner.getPartition(new Tuple2<>(
|
||||
hoodieRecord.getKey(), Option.ofNullable(hoodieRecord.getCurrentLocation())));
|
||||
if (!partition2numRecords.containsKey(partition)) {
|
||||
partition2numRecords.put(partition, 0);
|
||||
}
|
||||
partition2numRecords.put(partition, partition2numRecords.get(partition) + 1);
|
||||
}
|
||||
|
||||
assertTrue(partition2numRecords.get(0) < partition2numRecords.get(1),
|
||||
"The insert num of bucket1 should more than bucket0");
|
||||
assertTrue(partition2numRecords.get(0) + partition2numRecords.get(1) == totalInsertNum,
|
||||
"The total insert records should be " + totalInsertNum);
|
||||
assertEquals(String.valueOf(bucket0Weight),
|
||||
String.format("%.1f", (partition2numRecords.get(0) * 1.0f / totalInsertNum)),
|
||||
"The weight of bucket0 should be " + bucket0Weight);
|
||||
assertEquals(String.valueOf(1 - bucket0Weight),
|
||||
String.format("%.1f", (partition2numRecords.get(1) * 1.0f / totalInsertNum)),
|
||||
"The weight of bucket1 should be " + (1 - bucket0Weight));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUpsertPartitionerWithSmallInsertHandling() throws Exception {
|
||||
final String testPartitionPath = "2016/09/26";
|
||||
|
||||
Reference in New Issue
Block a user