diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java index 5d37a0a25..758d5ec5c 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java @@ -181,7 +181,11 @@ public class UpsertPartitioner> implements Part + ", totalInsertBuckets => " + insertBuckets + ", recordsPerBucket => " + insertRecordsPerBucket); for (int b = 0; b < insertBuckets; b++) { bucketNumbers.add(totalBuckets); - recordsPerBucket.add(totalUnassignedInserts / insertBuckets); + if (b < insertBuckets - 1) { + recordsPerBucket.add(insertRecordsPerBucket); + } else { + recordsPerBucket.add(totalUnassignedInserts - (insertBuckets - 1) * insertRecordsPerBucket); + } BucketInfo bucketInfo = new BucketInfo(); bucketInfo.bucketType = BucketType.INSERT; bucketInfo.partitionPath = partitionPath; diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java index 4b0fcdf95..2e1c33847 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java @@ -179,7 +179,11 @@ public class UpsertPartitioner> implements Part + ", totalInsertBuckets => " + insertBuckets + ", recordsPerBucket => " + insertRecordsPerBucket); for (int b = 0; b < insertBuckets; b++) { bucketNumbers.add(totalBuckets); - recordsPerBucket.add(totalUnassignedInserts / insertBuckets); + if (b < insertBuckets - 1) { + recordsPerBucket.add(insertRecordsPerBucket); + } else { + recordsPerBucket.add(totalUnassignedInserts - (insertBuckets - 1) * insertRecordsPerBucket); + } BucketInfo bucketInfo = new BucketInfo(); bucketInfo.bucketType = BucketType.INSERT; bucketInfo.partitionPath = partitionPath;