[HUDI-1398] Align insert file size for reducing IO (#2256)
* [HUDI-1398] Align insert file size for reducing IO

Co-authored-by: zhang wen <wen.zhang@dmall.com>
This commit is contained in:
@@ -218,7 +218,11 @@ public class UpsertPartitioner<T extends HoodieRecordPayload<T>> extends Partiti
|
|||||||
+ ", totalInsertBuckets => " + insertBuckets + ", recordsPerBucket => " + insertRecordsPerBucket);
|
+ ", totalInsertBuckets => " + insertBuckets + ", recordsPerBucket => " + insertRecordsPerBucket);
|
||||||
for (int b = 0; b < insertBuckets; b++) {
|
for (int b = 0; b < insertBuckets; b++) {
|
||||||
bucketNumbers.add(totalBuckets);
|
bucketNumbers.add(totalBuckets);
|
||||||
recordsPerBucket.add(totalUnassignedInserts / insertBuckets);
|
if (b < insertBuckets - 1) {
|
||||||
|
recordsPerBucket.add(insertRecordsPerBucket);
|
||||||
|
} else {
|
||||||
|
recordsPerBucket.add(totalUnassignedInserts - (insertBuckets - 1) * insertRecordsPerBucket);
|
||||||
|
}
|
||||||
BucketInfo bucketInfo = new BucketInfo();
|
BucketInfo bucketInfo = new BucketInfo();
|
||||||
bucketInfo.bucketType = BucketType.INSERT;
|
bucketInfo.bucketType = BucketType.INSERT;
|
||||||
bucketInfo.partitionPath = partitionPath;
|
bucketInfo.partitionPath = partitionPath;
|
||||||
|
|||||||
@@ -185,6 +185,21 @@ public class TestUpsertPartitioner extends HoodieClientTestBase {
|
|||||||
assertEquals(2, insertBuckets.size(), "Total of 2 insert buckets");
|
assertEquals(2, insertBuckets.size(), "Total of 2 insert buckets");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testUpsertPartitionerWithRecordsPerBucket() throws Exception {
|
||||||
|
final String testPartitionPath = "2016/09/26";
|
||||||
|
// Inserts + Updates... Check all updates go together & inserts subsplit
|
||||||
|
UpsertPartitioner partitioner = getUpsertPartitioner(0, 250, 100, 1024, testPartitionPath, false);
|
||||||
|
List<InsertBucketCumulativeWeightPair> insertBuckets = partitioner.getInsertBuckets(testPartitionPath);
|
||||||
|
int insertSplitSize = partitioner.config.getCopyOnWriteInsertSplitSize();
|
||||||
|
int remainedInsertSize = 250 - 2 * insertSplitSize;
|
||||||
|
// will be assigned 3 insertBuckets: 100, 100 and 50 records respectively
|
||||||
|
assertEquals(3, insertBuckets.size(), "Total of 3 insert buckets");
|
||||||
|
assertEquals(0.4, insertBuckets.get(0).getLeft().weight, "insert " + insertSplitSize + " records");
|
||||||
|
assertEquals(0.4, insertBuckets.get(1).getLeft().weight, "insert " + insertSplitSize + " records");
|
||||||
|
assertEquals(0.2, insertBuckets.get(2).getLeft().weight, "insert " + remainedInsertSize + " records");
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testPartitionWeight() throws Exception {
|
public void testPartitionWeight() throws Exception {
|
||||||
final String testPartitionPath = "2016/09/26";
|
final String testPartitionPath = "2016/09/26";
|
||||||
@@ -286,8 +301,8 @@ public class TestUpsertPartitioner extends HoodieClientTestBase {
|
|||||||
"Bucket 3 is INSERT");
|
"Bucket 3 is INSERT");
|
||||||
assertEquals(4, insertBuckets.size(), "Total of 4 insert buckets");
|
assertEquals(4, insertBuckets.size(), "Total of 4 insert buckets");
|
||||||
|
|
||||||
weights = new Double[] { 0.08, 0.31, 0.31, 0.31};
|
weights = new Double[] { 0.08, 0.42, 0.42, 0.08};
|
||||||
cumulativeWeights = new Double[] { 0.08, 0.39, 0.69, 1.0};
|
cumulativeWeights = new Double[] { 0.08, 0.5, 0.92, 1.0};
|
||||||
assertInsertBuckets(weights, cumulativeWeights, insertBuckets);
|
assertInsertBuckets(weights, cumulativeWeights, insertBuckets);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user