[HUDI-4101] When BucketIndexPartitioner take partition path for dispersion may cause the fileID of the task to not be loaded correctly (#5763)
Co-authored-by: john.wick <john.wick@vipshop.com>
This commit is contained in:
@@ -57,11 +57,6 @@ public class BucketStreamWriteFunction<I> extends StreamWriteFunction<I> {
|
||||
|
||||
private String indexKeyFields;
|
||||
|
||||
/**
|
||||
* BucketID should be loaded in this task.
|
||||
*/
|
||||
private Set<Integer> bucketToLoad;
|
||||
|
||||
/**
|
||||
* BucketID to file group mapping in each partition.
|
||||
* Map(partition -> Map(bucketId, fileID)).
|
||||
@@ -91,7 +86,6 @@ public class BucketStreamWriteFunction<I> extends StreamWriteFunction<I> {
|
||||
this.indexKeyFields = config.getString(FlinkOptions.INDEX_KEY_FIELD);
|
||||
this.taskID = getRuntimeContext().getIndexOfThisSubtask();
|
||||
this.parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
|
||||
this.bucketToLoad = getBucketToLoad();
|
||||
this.bucketIndex = new HashMap<>();
|
||||
this.incBucketIndex = new HashSet<>();
|
||||
}
|
||||
@@ -136,19 +130,12 @@ public class BucketStreamWriteFunction<I> extends StreamWriteFunction<I> {
|
||||
}
|
||||
|
||||
/**
|
||||
* Bootstrap bucket info from existing file system,
|
||||
* bucketNum % totalParallelism == this taskID belongs to this task.
|
||||
* Determine whether the current fileID belongs to the current task.
|
||||
* (partition + curBucket) % numPartitions == this taskID belongs to this task.
|
||||
*/
|
||||
private Set<Integer> getBucketToLoad() {
|
||||
Set<Integer> bucketToLoad = new HashSet<>();
|
||||
for (int i = 0; i < bucketNum; i++) {
|
||||
int partitionOfBucket = BucketIdentifier.mod(i, parallelism);
|
||||
if (partitionOfBucket == taskID) {
|
||||
bucketToLoad.add(i);
|
||||
}
|
||||
}
|
||||
LOG.info("Bucket number that belongs to task [{}/{}]: {}", taskID, parallelism, bucketToLoad);
|
||||
return bucketToLoad;
|
||||
public boolean isBucketToLoad(int bucketNumber, String partition) {
|
||||
int globalHash = ((partition + bucketNumber).hashCode()) & Integer.MAX_VALUE;
|
||||
return BucketIdentifier.mod(globalHash, parallelism) == taskID;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -167,7 +154,7 @@ public class BucketStreamWriteFunction<I> extends StreamWriteFunction<I> {
|
||||
this.writeClient.getHoodieTable().getFileSystemView().getAllFileGroups(partition).forEach(fileGroup -> {
|
||||
String fileID = fileGroup.getFileGroupId().getFileId();
|
||||
int bucketNumber = BucketIdentifier.bucketIdFromFileId(fileID);
|
||||
if (bucketToLoad.contains(bucketNumber)) {
|
||||
if (isBucketToLoad(bucketNumber, partition)) {
|
||||
LOG.info(String.format("Should load this partition bucket %s with fileID %s", bucketNumber, fileID));
|
||||
if (bucketToFileIDMap.containsKey(bucketNumber)) {
|
||||
throw new RuntimeException(String.format("Duplicate fileID %s from bucket %s of partition %s found "
|
||||
|
||||
Reference in New Issue
Block a user