[HUDI-2145] Create new bucket when NewFileAssignState filled (#3258)
Co-authored-by: 喻兆靖 <yuzhaojing@bilibili.com>
This commit is contained in:
@@ -144,14 +144,16 @@ public class BucketAssigner implements AutoCloseable {
|
|||||||
NewFileAssignState newFileAssignState = newFileAssignStates.get(partitionPath);
|
NewFileAssignState newFileAssignState = newFileAssignStates.get(partitionPath);
|
||||||
if (newFileAssignState.canAssign()) {
|
if (newFileAssignState.canAssign()) {
|
||||||
newFileAssignState.assign();
|
newFileAssignState.assign();
|
||||||
}
|
|
||||||
final String key = StreamerUtil.generateBucketKey(partitionPath, newFileAssignState.fileId);
|
final String key = StreamerUtil.generateBucketKey(partitionPath, newFileAssignState.fileId);
|
||||||
return bucketInfoMap.get(key);
|
return bucketInfoMap.get(key);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
BucketInfo bucketInfo = new BucketInfo(BucketType.INSERT, FSUtils.createNewFileIdPfx(), partitionPath);
|
BucketInfo bucketInfo = new BucketInfo(BucketType.INSERT, FSUtils.createNewFileIdPfx(), partitionPath);
|
||||||
final String key = StreamerUtil.generateBucketKey(partitionPath, bucketInfo.getFileIdPrefix());
|
final String key = StreamerUtil.generateBucketKey(partitionPath, bucketInfo.getFileIdPrefix());
|
||||||
bucketInfoMap.put(key, bucketInfo);
|
bucketInfoMap.put(key, bucketInfo);
|
||||||
newFileAssignStates.put(partitionPath, new NewFileAssignState(bucketInfo.getFileIdPrefix(), writeProfile.getRecordsPerBucket()));
|
NewFileAssignState newFileAssignState = new NewFileAssignState(bucketInfo.getFileIdPrefix(), writeProfile.getRecordsPerBucket());
|
||||||
|
newFileAssignState.assign();
|
||||||
|
newFileAssignStates.put(partitionPath, newFileAssignState);
|
||||||
return bucketInfo;
|
return bucketInfo;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieRecordLocation;
|
|||||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||||
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||||
import org.apache.hudi.common.util.Option;
|
import org.apache.hudi.common.util.Option;
|
||||||
|
import org.apache.hudi.config.HoodieCompactionConfig;
|
||||||
import org.apache.hudi.config.HoodieWriteConfig;
|
import org.apache.hudi.config.HoodieWriteConfig;
|
||||||
import org.apache.hudi.sink.partitioner.profile.WriteProfile;
|
import org.apache.hudi.sink.partitioner.profile.WriteProfile;
|
||||||
import org.apache.hudi.table.action.commit.BucketInfo;
|
import org.apache.hudi.table.action.commit.BucketInfo;
|
||||||
@@ -49,6 +50,7 @@ import java.util.Map;
|
|||||||
|
|
||||||
import static org.hamcrest.CoreMatchers.is;
|
import static org.hamcrest.CoreMatchers.is;
|
||||||
import static org.hamcrest.MatcherAssert.assertThat;
|
import static org.hamcrest.MatcherAssert.assertThat;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||||
import static org.junit.jupiter.api.Assertions.assertNotEquals;
|
import static org.junit.jupiter.api.Assertions.assertNotEquals;
|
||||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||||
@@ -119,6 +121,26 @@ public class TestBucketAssigner {
|
|||||||
assertBucketEquals(bucketInfo, "par3", BucketType.INSERT);
|
assertBucketEquals(bucketInfo, "par3", BucketType.INSERT);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testInsertOverBucketAssigned() {
|
||||||
|
conf.setInteger(HoodieCompactionConfig.COPY_ON_WRITE_TABLE_INSERT_SPLIT_SIZE.key(), 2);
|
||||||
|
writeConfig = StreamerUtil.getHoodieClientConfig(conf);
|
||||||
|
|
||||||
|
MockBucketAssigner mockBucketAssigner = new MockBucketAssigner(context, writeConfig);
|
||||||
|
BucketInfo bucketInfo1 = mockBucketAssigner.addInsert("par1");
|
||||||
|
assertBucketEquals(bucketInfo1, "par1", BucketType.INSERT);
|
||||||
|
|
||||||
|
BucketInfo bucketInfo2 = mockBucketAssigner.addInsert("par1");
|
||||||
|
assertBucketEquals(bucketInfo2, "par1", BucketType.INSERT);
|
||||||
|
|
||||||
|
assertEquals(bucketInfo1, bucketInfo2);
|
||||||
|
|
||||||
|
BucketInfo bucketInfo3 = mockBucketAssigner.addInsert("par1");
|
||||||
|
assertBucketEquals(bucketInfo3, "par1", BucketType.INSERT);
|
||||||
|
|
||||||
|
assertNotEquals(bucketInfo1, bucketInfo3);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testInsertWithSmallFiles() {
|
public void testInsertWithSmallFiles() {
|
||||||
SmallFile f0 = new SmallFile();
|
SmallFile f0 = new SmallFile();
|
||||||
|
|||||||
Reference in New Issue
Block a user