[MINOR] Sync UpsertPartitioner modify of HUDI-1398 to flink/java (#2390)
Co-authored-by: zhang wen <wen.zhang@dmall.com>
This commit is contained in:
@@ -181,7 +181,11 @@ public class UpsertPartitioner<T extends HoodieRecordPayload<T>> implements Part
|
|||||||
+ ", totalInsertBuckets => " + insertBuckets + ", recordsPerBucket => " + insertRecordsPerBucket);
|
+ ", totalInsertBuckets => " + insertBuckets + ", recordsPerBucket => " + insertRecordsPerBucket);
|
||||||
for (int b = 0; b < insertBuckets; b++) {
|
for (int b = 0; b < insertBuckets; b++) {
|
||||||
bucketNumbers.add(totalBuckets);
|
bucketNumbers.add(totalBuckets);
|
||||||
recordsPerBucket.add(totalUnassignedInserts / insertBuckets);
|
if (b < insertBuckets - 1) {
|
||||||
|
recordsPerBucket.add(insertRecordsPerBucket);
|
||||||
|
} else {
|
||||||
|
recordsPerBucket.add(totalUnassignedInserts - (insertBuckets - 1) * insertRecordsPerBucket);
|
||||||
|
}
|
||||||
BucketInfo bucketInfo = new BucketInfo();
|
BucketInfo bucketInfo = new BucketInfo();
|
||||||
bucketInfo.bucketType = BucketType.INSERT;
|
bucketInfo.bucketType = BucketType.INSERT;
|
||||||
bucketInfo.partitionPath = partitionPath;
|
bucketInfo.partitionPath = partitionPath;
|
||||||
|
|||||||
@@ -179,7 +179,11 @@ public class UpsertPartitioner<T extends HoodieRecordPayload<T>> implements Part
|
|||||||
+ ", totalInsertBuckets => " + insertBuckets + ", recordsPerBucket => " + insertRecordsPerBucket);
|
+ ", totalInsertBuckets => " + insertBuckets + ", recordsPerBucket => " + insertRecordsPerBucket);
|
||||||
for (int b = 0; b < insertBuckets; b++) {
|
for (int b = 0; b < insertBuckets; b++) {
|
||||||
bucketNumbers.add(totalBuckets);
|
bucketNumbers.add(totalBuckets);
|
||||||
recordsPerBucket.add(totalUnassignedInserts / insertBuckets);
|
if (b < insertBuckets - 1) {
|
||||||
|
recordsPerBucket.add(insertRecordsPerBucket);
|
||||||
|
} else {
|
||||||
|
recordsPerBucket.add(totalUnassignedInserts - (insertBuckets - 1) * insertRecordsPerBucket);
|
||||||
|
}
|
||||||
BucketInfo bucketInfo = new BucketInfo();
|
BucketInfo bucketInfo = new BucketInfo();
|
||||||
bucketInfo.bucketType = BucketType.INSERT;
|
bucketInfo.bucketType = BucketType.INSERT;
|
||||||
bucketInfo.partitionPath = partitionPath;
|
bucketInfo.partitionPath = partitionPath;
|
||||||
|
|||||||
Reference in New Issue
Block a user