From 634163a990569aa4463b58830396f455dd15340c Mon Sep 17 00:00:00 2001
From: yuzhaojing <32435329+yuzhaojing@users.noreply.github.com>
Date: Tue, 20 Jul 2021 17:46:45 +0800
Subject: [PATCH] [HUDI-2145] Create new bucket when NewFileAssignState filled (#3258)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Co-authored-by: 喻兆靖
---
 .../hudi/sink/partitioner/BucketAssigner.java |  8 ++++---
 .../sink/partitioner/TestBucketAssigner.java  | 22 +++++++++++++++++++
 2 files changed, 27 insertions(+), 3 deletions(-)

diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java
index 6d805ce8d..35b3429c1 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java
@@ -144,14 +144,16 @@ public class BucketAssigner implements AutoCloseable {
       NewFileAssignState newFileAssignState = newFileAssignStates.get(partitionPath);
       if (newFileAssignState.canAssign()) {
         newFileAssignState.assign();
+        final String key = StreamerUtil.generateBucketKey(partitionPath, newFileAssignState.fileId);
+        return bucketInfoMap.get(key);
       }
-      final String key = StreamerUtil.generateBucketKey(partitionPath, newFileAssignState.fileId);
-      return bucketInfoMap.get(key);
     }
     BucketInfo bucketInfo = new BucketInfo(BucketType.INSERT, FSUtils.createNewFileIdPfx(), partitionPath);
     final String key = StreamerUtil.generateBucketKey(partitionPath, bucketInfo.getFileIdPrefix());
     bucketInfoMap.put(key, bucketInfo);
-    newFileAssignStates.put(partitionPath, new NewFileAssignState(bucketInfo.getFileIdPrefix(), writeProfile.getRecordsPerBucket()));
+    NewFileAssignState newFileAssignState = new NewFileAssignState(bucketInfo.getFileIdPrefix(), writeProfile.getRecordsPerBucket());
+    newFileAssignState.assign();
+    newFileAssignStates.put(partitionPath, newFileAssignState);
     return bucketInfo;
   }
 
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java b/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java
index 1fc6e29b7..4f18e8697 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.sink.partitioner.profile.WriteProfile;
 import org.apache.hudi.table.action.commit.BucketInfo;
@@ -49,6 +50,7 @@ import java.util.Map;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -119,6 +121,26 @@ public class TestBucketAssigner {
     assertBucketEquals(bucketInfo, "par3", BucketType.INSERT);
   }
 
+  @Test
+  public void testInsertOverBucketAssigned() {
+    conf.setInteger(HoodieCompactionConfig.COPY_ON_WRITE_TABLE_INSERT_SPLIT_SIZE.key(), 2);
+    writeConfig = StreamerUtil.getHoodieClientConfig(conf);
+
+    MockBucketAssigner mockBucketAssigner = new MockBucketAssigner(context, writeConfig);
+    BucketInfo bucketInfo1 = mockBucketAssigner.addInsert("par1");
+    assertBucketEquals(bucketInfo1, "par1", BucketType.INSERT);
+
+    BucketInfo bucketInfo2 = mockBucketAssigner.addInsert("par1");
+    assertBucketEquals(bucketInfo2, "par1", BucketType.INSERT);
+
+    assertEquals(bucketInfo1, bucketInfo2);
+
+    BucketInfo bucketInfo3 = mockBucketAssigner.addInsert("par1");
+    assertBucketEquals(bucketInfo3, "par1", BucketType.INSERT);
+
+    assertNotEquals(bucketInfo1, bucketInfo3);
+  }
+
   @Test
   public void testInsertWithSmallFiles() {
     SmallFile f0 = new SmallFile();