From a23aa41a1a3082eda080ec1cf3ad99e5ec37a9a0 Mon Sep 17 00:00:00 2001 From: steven zhang Date: Thu, 31 Dec 2020 11:06:41 +0800 Subject: [PATCH] [MINOR] Sync UpsertPartitioner modify of HUDI-1398 to flink/java (#2390) Co-authored-by: zhang wen --- .../apache/hudi/table/action/commit/UpsertPartitioner.java | 6 +++++- .../apache/hudi/table/action/commit/UpsertPartitioner.java | 6 +++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java index 5d37a0a25..758d5ec5c 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java @@ -181,7 +181,11 @@ public class UpsertPartitioner> implements Part + ", totalInsertBuckets => " + insertBuckets + ", recordsPerBucket => " + insertRecordsPerBucket); for (int b = 0; b < insertBuckets; b++) { bucketNumbers.add(totalBuckets); - recordsPerBucket.add(totalUnassignedInserts / insertBuckets); + if (b < insertBuckets - 1) { + recordsPerBucket.add(insertRecordsPerBucket); + } else { + recordsPerBucket.add(totalUnassignedInserts - (insertBuckets - 1) * insertRecordsPerBucket); + } BucketInfo bucketInfo = new BucketInfo(); bucketInfo.bucketType = BucketType.INSERT; bucketInfo.partitionPath = partitionPath; diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java index 4b0fcdf95..2e1c33847 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java @@ -179,7 +179,11 @@ public class UpsertPartitioner> implements Part + ", totalInsertBuckets => " + insertBuckets + ", recordsPerBucket => " + insertRecordsPerBucket); for (int b = 0; b < insertBuckets; b++) { bucketNumbers.add(totalBuckets); - recordsPerBucket.add(totalUnassignedInserts / insertBuckets); + if (b < insertBuckets - 1) { + recordsPerBucket.add(insertRecordsPerBucket); + } else { + recordsPerBucket.add(totalUnassignedInserts - (insertBuckets - 1) * insertRecordsPerBucket); + } BucketInfo bucketInfo = new BucketInfo(); bucketInfo.bucketType = BucketType.INSERT; bucketInfo.partitionPath = partitionPath;