1
0

[HUDI-2245] BucketAssigner generates the fileId evenly to avoid data skew (#3362)

This commit is contained in:
Danny Chan
2021-07-28 19:26:37 +08:00
committed by GitHub
parent 8105cf588e
commit 91c2213412
4 changed files with 82 additions and 27 deletions

View File

@@ -123,6 +123,7 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
new FlinkTaskContextSupplier(getRuntimeContext())); new FlinkTaskContextSupplier(getRuntimeContext()));
this.bucketAssigner = BucketAssigners.create( this.bucketAssigner = BucketAssigners.create(
getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getIndexOfThisSubtask(),
getRuntimeContext().getMaxNumberOfParallelSubtasks(),
getRuntimeContext().getNumberOfParallelSubtasks(), getRuntimeContext().getNumberOfParallelSubtasks(),
WriteOperationType.isOverwrite(WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION))), WriteOperationType.isOverwrite(WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION))),
HoodieTableType.valueOf(conf.getString(FlinkOptions.TABLE_TYPE)), HoodieTableType.valueOf(conf.getString(FlinkOptions.TABLE_TYPE)),

View File

@@ -28,14 +28,16 @@ import org.apache.hudi.table.action.commit.BucketType;
import org.apache.hudi.table.action.commit.SmallFile; import org.apache.hudi.table.action.commit.SmallFile;
import org.apache.hudi.util.StreamerUtil; import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.util.Preconditions; import org.apache.flink.util.Preconditions;
import org.apache.log4j.LogManager; import org.apache.log4j.LogManager;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.stream.Collectors;
/** /**
* Bucket assigner that assigns the data buffer of one checkpoint into buckets. * Bucket assigner that assigns the data buffer of one checkpoint into buckets.
@@ -57,6 +59,11 @@ public class BucketAssigner implements AutoCloseable {
*/ */
private final int taskID; private final int taskID;
/**
* The max parallelism.
*/
private final int maxParallelism;
/** /**
* Number of tasks. * Number of tasks.
*/ */
@@ -89,10 +96,12 @@ public class BucketAssigner implements AutoCloseable {
public BucketAssigner( public BucketAssigner(
int taskID, int taskID,
int maxParallelism,
int numTasks, int numTasks,
WriteProfile profile, WriteProfile profile,
HoodieWriteConfig config) { HoodieWriteConfig config) {
this.taskID = taskID; this.taskID = taskID;
this.maxParallelism = maxParallelism;
this.numTasks = numTasks; this.numTasks = numTasks;
this.config = config; this.config = config;
this.writeProfile = profile; this.writeProfile = profile;
@@ -148,7 +157,7 @@ public class BucketAssigner implements AutoCloseable {
return bucketInfoMap.get(key); return bucketInfoMap.get(key);
} }
} }
BucketInfo bucketInfo = new BucketInfo(BucketType.INSERT, FSUtils.createNewFileIdPfx(), partitionPath); BucketInfo bucketInfo = new BucketInfo(BucketType.INSERT, createFileIdOfThisTask(), partitionPath);
final String key = StreamerUtil.generateBucketKey(partitionPath, bucketInfo.getFileIdPrefix()); final String key = StreamerUtil.generateBucketKey(partitionPath, bucketInfo.getFileIdPrefix());
bucketInfoMap.put(key, bucketInfo); bucketInfoMap.put(key, bucketInfo);
NewFileAssignState newFileAssignState = new NewFileAssignState(bucketInfo.getFileIdPrefix(), writeProfile.getRecordsPerBucket()); NewFileAssignState newFileAssignState = new NewFileAssignState(bucketInfo.getFileIdPrefix(), writeProfile.getRecordsPerBucket());
@@ -186,13 +195,26 @@ public class BucketAssigner implements AutoCloseable {
return this.writeProfile.getTable(); return this.writeProfile.getTable();
} }
private List<SmallFile> smallFilesOfThisTask(List<SmallFile> smallFiles) { private boolean fileIdOfThisTask(String fileId) {
// computes the small files to write inserts for this task. // the file id can shuffle to this task
List<SmallFile> smallFilesOfThisTask = new ArrayList<>(); return KeyGroupRangeAssignment.assignKeyToParallelOperator(fileId, maxParallelism, numTasks) == taskID;
for (int i = taskID; i < smallFiles.size(); i += numTasks) { }
smallFilesOfThisTask.add(smallFiles.get(i));
@VisibleForTesting
public String createFileIdOfThisTask() {
String newFileIdPfx = FSUtils.createNewFileIdPfx();
while (!fileIdOfThisTask(newFileIdPfx)) {
newFileIdPfx = FSUtils.createNewFileIdPfx();
} }
return smallFilesOfThisTask; return newFileIdPfx;
}
@VisibleForTesting
public List<SmallFile> smallFilesOfThisTask(List<SmallFile> smallFiles) {
// computes the small files to write inserts for this task.
return smallFiles.stream()
.filter(smallFile -> fileIdOfThisTask(smallFile.location.getFileId()))
.collect(Collectors.toList());
} }
public void close() { public void close() {

View File

@@ -29,21 +29,24 @@ import org.apache.hudi.sink.partitioner.profile.WriteProfiles;
*/ */
public abstract class BucketAssigners { public abstract class BucketAssigners {
private BucketAssigners() {} private BucketAssigners() {
}
/** /**
* Creates a {@code BucketAssigner}. * Creates a {@code BucketAssigner}.
* *
* @param taskID The task ID * @param taskID The task ID
* @param numTasks The number of tasks * @param maxParallelism The max parallelism
* @param overwrite Whether the write operation is OVERWRITE * @param numTasks The number of tasks
* @param tableType The table type * @param overwrite Whether the write operation is OVERWRITE
* @param context The engine context * @param tableType The table type
* @param config The configuration * @param context The engine context
* @param config The configuration
* @return the bucket assigner instance * @return the bucket assigner instance
*/ */
public static BucketAssigner create( public static BucketAssigner create(
int taskID, int taskID,
int maxParallelism,
int numTasks, int numTasks,
boolean overwrite, boolean overwrite,
HoodieTableType tableType, HoodieTableType tableType,
@@ -51,6 +54,6 @@ public abstract class BucketAssigners {
HoodieWriteConfig config) { HoodieWriteConfig config) {
boolean delta = tableType.equals(HoodieTableType.MERGE_ON_READ); boolean delta = tableType.equals(HoodieTableType.MERGE_ON_READ);
WriteProfile writeProfile = WriteProfiles.singleton(overwrite, delta, config, context); WriteProfile writeProfile = WriteProfiles.singleton(overwrite, delta, config, context);
return new BucketAssigner(taskID, numTasks, writeProfile, config); return new BucketAssigner(taskID, maxParallelism, numTasks, writeProfile, config);
} }
} }

View File

@@ -79,6 +79,35 @@ public class TestBucketAssigner {
StreamerUtil.initTableIfNotExists(conf); StreamerUtil.initTableIfNotExists(conf);
} }
/**
 * Tests that the file ids generated by a task's {@code createFileIdOfThisTask} always
 * hash (via the key-group assignment) back to that same task, so that
 * {@code smallFilesOfThisTask} retains every small file the task itself created.
 */
@Test
void testSmallFilesOfThisTask() {
  // default assigner: single file id created by this task must survive the filter
  MockBucketAssigner mockBucketAssigner1 = new MockBucketAssigner(context, writeConfig);
  String fileId1 = mockBucketAssigner1.createFileIdOfThisTask();
  SmallFile smallFile1 = new SmallFile();
  smallFile1.location = new HoodieRecordLocation("t0", fileId1);
  smallFile1.sizeBytes = 123;
  List<SmallFile> smallFiles1 = mockBucketAssigner1.smallFilesOfThisTask(Collections.singletonList(smallFile1));
  assertThat(smallFiles1.size(), is(1));

  // modify the parallelism (task 123 of 200) and test again with two file ids
  MockBucketAssigner mockBucketAssigner2 = new MockBucketAssigner(123, 200, context, writeConfig, Collections.emptyMap());
  String fileId2 = mockBucketAssigner2.createFileIdOfThisTask();
  SmallFile smallFile2 = new SmallFile();
  smallFile2.location = new HoodieRecordLocation("t0", fileId2);
  smallFile2.sizeBytes = 123;

  String fileId3 = mockBucketAssigner2.createFileIdOfThisTask();
  SmallFile smallFile3 = new SmallFile();
  smallFile3.location = new HoodieRecordLocation("t0", fileId3);
  smallFile3.sizeBytes = 456;

  // Fix: the file ids were created by assigner2, so they must be filtered through
  // assigner2 as well — the original used mockBucketAssigner1 here, which only
  // passed by accident under assigner1's trivial parallelism and verified nothing.
  List<SmallFile> smallFiles2 = mockBucketAssigner2.smallFilesOfThisTask(Arrays.asList(smallFile2, smallFile3));
  assertThat(smallFiles2.size(), is(2));
}
@Test @Test
public void testAddUpdate() { public void testAddUpdate() {
MockBucketAssigner mockBucketAssigner = new MockBucketAssigner(context, writeConfig); MockBucketAssigner mockBucketAssigner = new MockBucketAssigner(context, writeConfig);
@@ -200,11 +229,11 @@ public class TestBucketAssigner {
MockBucketAssigner mockBucketAssigner = new MockBucketAssigner(0, 2, context, writeConfig, smallFilesMap); MockBucketAssigner mockBucketAssigner = new MockBucketAssigner(0, 2, context, writeConfig, smallFilesMap);
BucketInfo bucketInfo = mockBucketAssigner.addInsert("par1"); BucketInfo bucketInfo = mockBucketAssigner.addInsert("par1");
assertBucketEquals(bucketInfo, "par1", BucketType.UPDATE, "f0"); assertBucketEquals(bucketInfo, "par1", BucketType.UPDATE, "f2");
mockBucketAssigner.addInsert("par1"); mockBucketAssigner.addInsert("par1");
bucketInfo = mockBucketAssigner.addInsert("par1"); bucketInfo = mockBucketAssigner.addInsert("par1");
assertBucketEquals(bucketInfo, "par1", BucketType.UPDATE, "f0"); assertBucketEquals(bucketInfo, "par1", BucketType.UPDATE, "f2");
bucketInfo = mockBucketAssigner.addInsert("par3"); bucketInfo = mockBucketAssigner.addInsert("par3");
assertBucketEquals(bucketInfo, "par3", BucketType.INSERT); assertBucketEquals(bucketInfo, "par3", BucketType.INSERT);
@@ -214,11 +243,11 @@ public class TestBucketAssigner {
MockBucketAssigner mockBucketAssigner2 = new MockBucketAssigner(1, 2, context, writeConfig, smallFilesMap); MockBucketAssigner mockBucketAssigner2 = new MockBucketAssigner(1, 2, context, writeConfig, smallFilesMap);
BucketInfo bucketInfo2 = mockBucketAssigner2.addInsert("par1"); BucketInfo bucketInfo2 = mockBucketAssigner2.addInsert("par1");
assertBucketEquals(bucketInfo2, "par1", BucketType.UPDATE, "f1"); assertBucketEquals(bucketInfo2, "par1", BucketType.UPDATE, "f0");
mockBucketAssigner2.addInsert("par1"); mockBucketAssigner2.addInsert("par1");
bucketInfo2 = mockBucketAssigner2.addInsert("par1"); bucketInfo2 = mockBucketAssigner2.addInsert("par1");
assertBucketEquals(bucketInfo2, "par1", BucketType.UPDATE, "f1"); assertBucketEquals(bucketInfo2, "par1", BucketType.UPDATE, "f0");
bucketInfo2 = mockBucketAssigner2.addInsert("par3"); bucketInfo2 = mockBucketAssigner2.addInsert("par3");
assertBucketEquals(bucketInfo2, "par3", BucketType.INSERT); assertBucketEquals(bucketInfo2, "par3", BucketType.INSERT);
@@ -292,34 +321,34 @@ public class TestBucketAssigner {
mockBucketAssigner.addUpdate("par1", "f0"); mockBucketAssigner.addUpdate("par1", "f0");
BucketInfo bucketInfo = mockBucketAssigner.addInsert("par1"); BucketInfo bucketInfo = mockBucketAssigner.addInsert("par1");
assertBucketEquals(bucketInfo, "par1", BucketType.UPDATE, "f0"); assertBucketEquals(bucketInfo, "par1", BucketType.UPDATE, "f2");
mockBucketAssigner.addInsert("par1"); mockBucketAssigner.addInsert("par1");
bucketInfo = mockBucketAssigner.addInsert("par1"); bucketInfo = mockBucketAssigner.addInsert("par1");
assertBucketEquals(bucketInfo, "par1", BucketType.UPDATE, "f0"); assertBucketEquals(bucketInfo, "par1", BucketType.UPDATE, "f2");
mockBucketAssigner.addUpdate("par1", "f2"); mockBucketAssigner.addUpdate("par1", "f2");
mockBucketAssigner.addInsert("par1"); mockBucketAssigner.addInsert("par1");
bucketInfo = mockBucketAssigner.addInsert("par1"); bucketInfo = mockBucketAssigner.addInsert("par1");
assertBucketEquals(bucketInfo, "par1", BucketType.UPDATE, "f0"); assertBucketEquals(bucketInfo, "par1", BucketType.UPDATE, "f2");
MockBucketAssigner mockBucketAssigner2 = new MockBucketAssigner(1, 2, context, writeConfig, smallFilesMap); MockBucketAssigner mockBucketAssigner2 = new MockBucketAssigner(1, 2, context, writeConfig, smallFilesMap);
mockBucketAssigner2.addUpdate("par1", "f0"); mockBucketAssigner2.addUpdate("par1", "f0");
BucketInfo bucketInfo2 = mockBucketAssigner2.addInsert("par1"); BucketInfo bucketInfo2 = mockBucketAssigner2.addInsert("par1");
assertBucketEquals(bucketInfo2, "par1", BucketType.UPDATE, "f1"); assertBucketEquals(bucketInfo2, "par1", BucketType.UPDATE, "f0");
mockBucketAssigner2.addInsert("par1"); mockBucketAssigner2.addInsert("par1");
bucketInfo2 = mockBucketAssigner2.addInsert("par1"); bucketInfo2 = mockBucketAssigner2.addInsert("par1");
assertBucketEquals(bucketInfo2, "par1", BucketType.UPDATE, "f1"); assertBucketEquals(bucketInfo2, "par1", BucketType.UPDATE, "f0");
mockBucketAssigner2.addUpdate("par1", "f2"); mockBucketAssigner2.addUpdate("par1", "f2");
mockBucketAssigner2.addInsert("par1"); mockBucketAssigner2.addInsert("par1");
bucketInfo2 = mockBucketAssigner2.addInsert("par1"); bucketInfo2 = mockBucketAssigner2.addInsert("par1");
assertBucketEquals(bucketInfo2, "par1", BucketType.UPDATE, "f1"); assertBucketEquals(bucketInfo2, "par1", BucketType.UPDATE, "f0");
} }
@Test @Test
@@ -425,7 +454,7 @@ public class TestBucketAssigner {
HoodieFlinkEngineContext context, HoodieFlinkEngineContext context,
HoodieWriteConfig config, HoodieWriteConfig config,
Map<String, List<SmallFile>> smallFilesMap) { Map<String, List<SmallFile>> smallFilesMap) {
super(taskID, numTasks, new MockWriteProfile(config, context, smallFilesMap), config); super(taskID, 1024, numTasks, new MockWriteProfile(config, context, smallFilesMap), config);
} }
} }