From 91c221341293e80c28cce19f1642199495a96f66 Mon Sep 17 00:00:00 2001
From: Danny Chan
Date: Wed, 28 Jul 2021 19:26:37 +0800
Subject: [PATCH] [HUDI-2245] BucketAssigner generates the fileId evenly to avoid data skew (#3362)

---
 .../partitioner/BucketAssignFunction.java     |  1 +
 .../hudi/sink/partitioner/BucketAssigner.java | 38 +++++++++++---
 .../sink/partitioner/BucketAssigners.java     | 19 ++++---
 .../sink/partitioner/TestBucketAssigner.java  | 51 +++++++++++++++----
 4 files changed, 82 insertions(+), 27 deletions(-)

diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
index ff859eef1..688cf1baf 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
@@ -123,6 +123,7 @@ public class BucketAssignFunction>
         new FlinkTaskContextSupplier(getRuntimeContext()));
     this.bucketAssigner = BucketAssigners.create(
         getRuntimeContext().getIndexOfThisSubtask(),
+        getRuntimeContext().getMaxNumberOfParallelSubtasks(),
         getRuntimeContext().getNumberOfParallelSubtasks(),
         WriteOperationType.isOverwrite(WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION))),
         HoodieTableType.valueOf(conf.getString(FlinkOptions.TABLE_TYPE)),
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java
index 35b3429c1..43168aef0 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java
@@ -28,14 +28,16 @@ import org.apache.hudi.table.action.commit.BucketType;
 import org.apache.hudi.table.action.commit.SmallFile;
 import org.apache.hudi.util.StreamerUtil;
 
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.util.Preconditions;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 /**
  * Bucket assigner that assigns the data buffer of one checkpoint into buckets.
@@ -57,6 +59,11 @@ public class BucketAssigner implements AutoCloseable {
    */
   private final int taskID;
 
+  /**
+   * The max parallelism.
+   */
+  private final int maxParallelism;
+
   /**
    * Number of tasks.
    */
@@ -89,10 +96,12 @@ public class BucketAssigner implements AutoCloseable {
 
   public BucketAssigner(
       int taskID,
+      int maxParallelism,
       int numTasks,
       WriteProfile profile,
       HoodieWriteConfig config) {
     this.taskID = taskID;
+    this.maxParallelism = maxParallelism;
     this.numTasks = numTasks;
     this.config = config;
     this.writeProfile = profile;
@@ -148,7 +157,7 @@ public class BucketAssigner implements AutoCloseable {
         return bucketInfoMap.get(key);
       }
     }
-    BucketInfo bucketInfo = new BucketInfo(BucketType.INSERT, FSUtils.createNewFileIdPfx(), partitionPath);
+    BucketInfo bucketInfo = new BucketInfo(BucketType.INSERT, createFileIdOfThisTask(), partitionPath);
     final String key = StreamerUtil.generateBucketKey(partitionPath, bucketInfo.getFileIdPrefix());
     bucketInfoMap.put(key, bucketInfo);
     NewFileAssignState newFileAssignState = new NewFileAssignState(bucketInfo.getFileIdPrefix(), writeProfile.getRecordsPerBucket());
@@ -186,13 +195,26 @@ public class BucketAssigner implements AutoCloseable {
     return this.writeProfile.getTable();
   }
 
-  private List<SmallFile> smallFilesOfThisTask(List<SmallFile> smallFiles) {
-    // computes the small files to write inserts for this task.
-    List<SmallFile> smallFilesOfThisTask = new ArrayList<>();
-    for (int i = taskID; i < smallFiles.size(); i += numTasks) {
-      smallFilesOfThisTask.add(smallFiles.get(i));
+  private boolean fileIdOfThisTask(String fileId) {
+    // the file id can shuffle to this task
+    return KeyGroupRangeAssignment.assignKeyToParallelOperator(fileId, maxParallelism, numTasks) == taskID;
+  }
+
+  @VisibleForTesting
+  public String createFileIdOfThisTask() {
+    String newFileIdPfx = FSUtils.createNewFileIdPfx();
+    while (!fileIdOfThisTask(newFileIdPfx)) {
+      newFileIdPfx = FSUtils.createNewFileIdPfx();
     }
-    return smallFilesOfThisTask;
+    return newFileIdPfx;
+  }
+
+  @VisibleForTesting
+  public List<SmallFile> smallFilesOfThisTask(List<SmallFile> smallFiles) {
+    // computes the small files to write inserts for this task.
+    return smallFiles.stream()
+        .filter(smallFile -> fileIdOfThisTask(smallFile.location.getFileId()))
+        .collect(Collectors.toList());
   }
 
   public void close() {
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigners.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigners.java
index 354879059..8d304db3e 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigners.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigners.java
@@ -29,21 +29,24 @@ import org.apache.hudi.sink.partitioner.profile.WriteProfiles;
  */
 public abstract class BucketAssigners {
 
-  private BucketAssigners() {}
+  private BucketAssigners() {
+  }
 
   /**
    * Creates a {@code BucketAssigner}.
    *
-   * @param taskID    The task ID
-   * @param numTasks  The number of tasks
-   * @param overwrite Whether the write operation is OVERWRITE
-   * @param tableType The table type
-   * @param context   The engine context
-   * @param config    The configuration
+   * @param taskID         The task ID
+   * @param maxParallelism The max parallelism
+   * @param numTasks       The number of tasks
+   * @param overwrite      Whether the write operation is OVERWRITE
+   * @param tableType      The table type
+   * @param context        The engine context
+   * @param config         The configuration
    * @return the bucket assigner instance
    */
   public static BucketAssigner create(
       int taskID,
+      int maxParallelism,
       int numTasks,
       boolean overwrite,
       HoodieTableType tableType,
@@ -51,6 +54,6 @@ public abstract class BucketAssigners {
       HoodieWriteConfig config) {
     boolean delta = tableType.equals(HoodieTableType.MERGE_ON_READ);
     WriteProfile writeProfile = WriteProfiles.singleton(overwrite, delta, config, context);
-    return new BucketAssigner(taskID, numTasks, writeProfile, config);
+    return new BucketAssigner(taskID, maxParallelism, numTasks, writeProfile, config);
   }
 }
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java b/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java
index 4f18e8697..bea86001e 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java
@@ -79,6 +79,35 @@ public class TestBucketAssigner {
     StreamerUtil.initTableIfNotExists(conf);
   }
 
+  /**
+   * Test that the file ids generated by the task can finally be shuffled back to the task itself.
+   */
+  @Test
+  void testSmallFilesOfThisTask() {
+    MockBucketAssigner mockBucketAssigner1 = new MockBucketAssigner(context, writeConfig);
+    String fileId1 = mockBucketAssigner1.createFileIdOfThisTask();
+    SmallFile smallFile1 = new SmallFile();
+    smallFile1.location = new HoodieRecordLocation("t0", fileId1);
+    smallFile1.sizeBytes = 123;
+    List<SmallFile> smallFiles1 = mockBucketAssigner1.smallFilesOfThisTask(Collections.singletonList(smallFile1));
+    assertThat(smallFiles1.size(), is(1));
+
+    // modify the parallelism and test again
+    MockBucketAssigner mockBucketAssigner2 = new MockBucketAssigner(123, 200, context, writeConfig, Collections.emptyMap());
+    String fileId2 = mockBucketAssigner2.createFileIdOfThisTask();
+    SmallFile smallFile2 = new SmallFile();
+    smallFile2.location = new HoodieRecordLocation("t0", fileId2);
+    smallFile2.sizeBytes = 123;
+
+    String fileId3 = mockBucketAssigner2.createFileIdOfThisTask();
+    SmallFile smallFile3 = new SmallFile();
+    smallFile3.location = new HoodieRecordLocation("t0", fileId3);
+    smallFile3.sizeBytes = 456;
+
+    List<SmallFile> smallFiles2 = mockBucketAssigner1.smallFilesOfThisTask(Arrays.asList(smallFile2, smallFile3));
+    assertThat(smallFiles2.size(), is(2));
+  }
+
   @Test
   public void testAddUpdate() {
     MockBucketAssigner mockBucketAssigner = new MockBucketAssigner(context, writeConfig);
@@ -200,11 +229,11 @@
     MockBucketAssigner mockBucketAssigner = new MockBucketAssigner(0, 2, context, writeConfig, smallFilesMap);
 
     BucketInfo bucketInfo = mockBucketAssigner.addInsert("par1");
-    assertBucketEquals(bucketInfo, "par1", BucketType.UPDATE, "f0");
+    assertBucketEquals(bucketInfo, "par1", BucketType.UPDATE, "f2");
 
     mockBucketAssigner.addInsert("par1");
     bucketInfo = mockBucketAssigner.addInsert("par1");
-    assertBucketEquals(bucketInfo, "par1", BucketType.UPDATE, "f0");
+    assertBucketEquals(bucketInfo, "par1", BucketType.UPDATE, "f2");
"par1", BucketType.UPDATE, "f2"); bucketInfo = mockBucketAssigner.addInsert("par3"); assertBucketEquals(bucketInfo, "par3", BucketType.INSERT); @@ -214,11 +243,11 @@ public class TestBucketAssigner { MockBucketAssigner mockBucketAssigner2 = new MockBucketAssigner(1, 2, context, writeConfig, smallFilesMap); BucketInfo bucketInfo2 = mockBucketAssigner2.addInsert("par1"); - assertBucketEquals(bucketInfo2, "par1", BucketType.UPDATE, "f1"); + assertBucketEquals(bucketInfo2, "par1", BucketType.UPDATE, "f0"); mockBucketAssigner2.addInsert("par1"); bucketInfo2 = mockBucketAssigner2.addInsert("par1"); - assertBucketEquals(bucketInfo2, "par1", BucketType.UPDATE, "f1"); + assertBucketEquals(bucketInfo2, "par1", BucketType.UPDATE, "f0"); bucketInfo2 = mockBucketAssigner2.addInsert("par3"); assertBucketEquals(bucketInfo2, "par3", BucketType.INSERT); @@ -292,34 +321,34 @@ public class TestBucketAssigner { mockBucketAssigner.addUpdate("par1", "f0"); BucketInfo bucketInfo = mockBucketAssigner.addInsert("par1"); - assertBucketEquals(bucketInfo, "par1", BucketType.UPDATE, "f0"); + assertBucketEquals(bucketInfo, "par1", BucketType.UPDATE, "f2"); mockBucketAssigner.addInsert("par1"); bucketInfo = mockBucketAssigner.addInsert("par1"); - assertBucketEquals(bucketInfo, "par1", BucketType.UPDATE, "f0"); + assertBucketEquals(bucketInfo, "par1", BucketType.UPDATE, "f2"); mockBucketAssigner.addUpdate("par1", "f2"); mockBucketAssigner.addInsert("par1"); bucketInfo = mockBucketAssigner.addInsert("par1"); - assertBucketEquals(bucketInfo, "par1", BucketType.UPDATE, "f0"); + assertBucketEquals(bucketInfo, "par1", BucketType.UPDATE, "f2"); MockBucketAssigner mockBucketAssigner2 = new MockBucketAssigner(1, 2, context, writeConfig, smallFilesMap); mockBucketAssigner2.addUpdate("par1", "f0"); BucketInfo bucketInfo2 = mockBucketAssigner2.addInsert("par1"); - assertBucketEquals(bucketInfo2, "par1", BucketType.UPDATE, "f1"); + assertBucketEquals(bucketInfo2, "par1", BucketType.UPDATE, "f0"); mockBucketAssigner2.addInsert("par1"); bucketInfo2 = mockBucketAssigner2.addInsert("par1"); - assertBucketEquals(bucketInfo2, "par1", BucketType.UPDATE, "f1"); + assertBucketEquals(bucketInfo2, "par1", BucketType.UPDATE, "f0"); mockBucketAssigner2.addUpdate("par1", "f2"); mockBucketAssigner2.addInsert("par1"); bucketInfo2 = mockBucketAssigner2.addInsert("par1"); - assertBucketEquals(bucketInfo2, "par1", BucketType.UPDATE, "f1"); + assertBucketEquals(bucketInfo2, "par1", BucketType.UPDATE, "f0"); } @Test @@ -425,7 +454,7 @@ public class TestBucketAssigner { HoodieFlinkEngineContext context, HoodieWriteConfig config, Map> smallFilesMap) { - super(taskID, numTasks, new MockWriteProfile(config, context, smallFilesMap), config); + super(taskID, 1024, numTasks, new MockWriteProfile(config, context, smallFilesMap), config); } }