From c3279cd5989805946267b046007ea23ba4b615c2 Mon Sep 17 00:00:00 2001 From: Shen Hong Date: Thu, 23 Jul 2020 20:31:49 +0800 Subject: [PATCH] [HUDI-1082] Fix minor bug in deciding the insert buckets (#1838) --- .../action/commit/UpsertPartitioner.java | 11 ++-- .../action/commit/TestUpsertPartitioner.java | 57 +++++++++++++++++++ 2 files changed, 63 insertions(+), 5 deletions(-) diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java index 8857fc38e..755854b09 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java @@ -67,9 +67,9 @@ public class UpsertPartitioner<T extends HoodieRecordPayload<T>> extends Partiti */ private int totalBuckets = 0; /** - * Stat for the current workload. Helps in determining total inserts, upserts etc. + * Stat for the current workload. Helps in determining inserts, upserts etc. */ - private WorkloadStat globalStat; + private WorkloadProfile profile; /** * Helps decide which bucket an incoming update should go to. */ @@ -92,7 +92,7 @@ public class UpsertPartitioner<T extends HoodieRecordPayload<T>> extends Partiti updateLocationToBucket = new HashMap<>(); partitionPathToInsertBuckets = new HashMap<>(); bucketInfoMap = new HashMap<>(); - globalStat = profile.getGlobalStat(); + this.profile = profile; this.table = table; this.config = config; assignUpdates(profile); @@ -269,10 +269,11 @@ public class UpsertPartitioner<T extends HoodieRecordPayload<T>> extends Partiti HoodieRecordLocation location = keyLocation._2().get(); return updateLocationToBucket.get(location.getFileId()); } else { - List<InsertBucket> targetBuckets = partitionPathToInsertBuckets.get(keyLocation._1().getPartitionPath()); + String partitionPath = keyLocation._1().getPartitionPath(); + List<InsertBucket> targetBuckets = partitionPathToInsertBuckets.get(partitionPath); // pick the target bucket to use based on the weights.
double totalWeight = 0.0; - final long totalInserts = Math.max(1, globalStat.getNumInserts()); + final long totalInserts = Math.max(1, profile.getWorkloadStat(partitionPath).getNumInserts()); final long hashOfKey = NumericUtils.getMessageDigestHash("MD5", keyLocation._1().getRecordKey()); final double r = 1.0 * Math.floorMod(hashOfKey, totalInserts) / totalInserts; for (InsertBucket insertBucket : targetBuckets) { diff --git a/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java index 09be8f184..c526ad1c0 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java @@ -45,14 +45,17 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.LinkedList; import java.util.List; +import java.util.Map; import scala.Tuple2; import static org.apache.hudi.common.testutils.HoodieTestUtils.generateFakeHoodieWriteStat; import static org.apache.hudi.table.action.commit.UpsertPartitioner.averageBytesPerRecord; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -179,6 +182,60 @@ public class TestUpsertPartitioner extends HoodieClientTestBase { assertEquals(2, insertBuckets.size(), "Total of 2 insert buckets"); } + @Test + public void testPartitionWeight() throws Exception { + final String testPartitionPath = "2016/09/26"; + int totalInsertNum = 2000; + + HoodieWriteConfig config = makeHoodieClientConfigBuilder() + .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(0) + 
.insertSplitSize(totalInsertNum / 2).autoTuneInsertSplits(false).build()).build(); + + HoodieClientTestUtils.fakeCommit(basePath, "001"); + metaClient = HoodieTableMetaClient.reload(metaClient); + HoodieCopyOnWriteTable table = (HoodieCopyOnWriteTable) HoodieTable.create(metaClient, config, hadoopConf); + HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[] {testPartitionPath}); + List<HoodieRecord> insertRecords = dataGenerator.generateInserts("001", totalInsertNum); + + WorkloadProfile profile = new WorkloadProfile(jsc.parallelize(insertRecords)); + UpsertPartitioner partitioner = new UpsertPartitioner(profile, jsc, table, config); + List<InsertBucket> insertBuckets = partitioner.getInsertBuckets(testPartitionPath); + + float bucket0Weight = 0.2f; + InsertBucket newInsertBucket0 = new InsertBucket(); + newInsertBucket0.bucketNumber = insertBuckets.get(0).bucketNumber; + newInsertBucket0.weight = bucket0Weight; + insertBuckets.remove(0); + insertBuckets.add(0, newInsertBucket0); + + InsertBucket newInsertBucket1 = new InsertBucket(); + newInsertBucket1.bucketNumber = insertBuckets.get(1).bucketNumber; + newInsertBucket1.weight = 1 - bucket0Weight; + insertBuckets.remove(1); + insertBuckets.add(1, newInsertBucket1); + + Map<Integer, Integer> partition2numRecords = new HashMap<Integer, Integer>(); + for (HoodieRecord hoodieRecord: insertRecords) { + int partition = partitioner.getPartition(new Tuple2<>( + hoodieRecord.getKey(), Option.ofNullable(hoodieRecord.getCurrentLocation()))); + if (!partition2numRecords.containsKey(partition)) { + partition2numRecords.put(partition, 0); + } + partition2numRecords.put(partition, partition2numRecords.get(partition) + 1); + } + + assertTrue(partition2numRecords.get(0) < partition2numRecords.get(1), + "The insert num of bucket1 should more than bucket0"); + assertTrue(partition2numRecords.get(0) + partition2numRecords.get(1) == totalInsertNum, + "The total insert records should be " + totalInsertNum); + assertEquals(String.valueOf(bucket0Weight), +
String.format("%.1f", (partition2numRecords.get(0) * 1.0f / totalInsertNum)), + "The weight of bucket0 should be " + bucket0Weight); + assertEquals(String.valueOf(1 - bucket0Weight), + String.format("%.1f", (partition2numRecords.get(1) * 1.0f / totalInsertNum)), + "The weight of bucket1 should be " + (1 - bucket0Weight)); + } + @Test public void testUpsertPartitionerWithSmallInsertHandling() throws Exception { final String testPartitionPath = "2016/09/26";