[HUDI-3539] Flink bucket index bucketID bootstrap optimization. (#5093)
* [HUDI-3539] Flink bucket index bucketID bootstrap optimization. Co-authored-by: gengxiaoyu <gengxiaoyu@bytedance.com>
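At a high level, this change replaces the eager, whole-table index bootstrap with a lazy, per-partition one, and reshapes the index from a flat partition_bucket -> fileID map into a partition -> (bucketId -> fileID) map. The following standalone sketch (hypothetical names and values, not part of the commit) contrasts the two shapes:

    import java.util.HashMap;
    import java.util.Map;

    // A minimal sketch of the index reshaping this commit performs: the flat
    // "partition_bucket -> fileID" map becomes a nested
    // "partition -> (bucketId -> fileID)" map, so a single partition can be
    // bootstrapped and looked up in isolation. All names/values are hypothetical.
    public class BucketIndexShapeSketch {
      public static void main(String[] args) {
        // Old shape: one global map keyed by the "partition_bucket" string.
        Map<String, String> flat = new HashMap<>();
        flat.put("2022-03-20_5", "00000005-aaaa"); // hypothetical fileID

        // New shape: per-partition maps keyed by the integer bucket id.
        Map<String, Map<Integer, String>> nested = new HashMap<>();
        nested.computeIfAbsent("2022-03-20", p -> new HashMap<>()).put(5, "00000005-aaaa");

        // Lookup for (partition "2022-03-20", bucket 5) is equivalent in both shapes.
        System.out.println(flat.get("2022-03-20_5"));
        System.out.println(nested.get("2022-03-20").get(5));
      }
    }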
@@ -40,6 +40,8 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 import static java.util.stream.Collectors.toList;
 
@@ -67,16 +69,22 @@ public class BucketStreamWriteFunction<I> extends StreamWriteFunction<I> {
   private String indexKeyFields;
 
   /**
-   * BucketID to file group mapping.
+   * BucketIDs that should be loaded by this task.
    */
-  private HashMap<String, String> bucketIndex;
+  private Set<Integer> bucketToLoad;
+
+  /**
+   * BucketID to file group mapping in each partition.
+   * Map(partition -> Map(bucketId, fileID)).
+   */
+  private Map<String, Map<Integer, String>> bucketIndex;
 
   /**
    * Incremental bucket index of the current checkpoint interval,
    * it is needed because the bucket type ('I' or 'U') should be decided based on the committed files view,
    * all the records in one bucket should have the same bucket type.
    */
-  private HashMap<String, String> incBucketIndex;
+  private Set<String> incBucketIndex;
 
   /**
    * Constructs a BucketStreamWriteFunction.
@@ -95,9 +103,10 @@ public class BucketStreamWriteFunction<I> extends StreamWriteFunction<I> {
     this.taskID = getRuntimeContext().getIndexOfThisSubtask();
     this.parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
     this.maxParallelism = getRuntimeContext().getMaxNumberOfParallelSubtasks();
+    this.bucketToLoad = new HashSet<>();
     this.bucketIndex = new HashMap<>();
-    this.incBucketIndex = new HashMap<>();
-    bootstrapIndex();
+    this.incBucketIndex = new HashSet<>();
+    getBucketToLoad();
   }
 
   @Override
@@ -109,7 +118,6 @@ public class BucketStreamWriteFunction<I> extends StreamWriteFunction<I> {
   @Override
   public void snapshotState() {
     super.snapshotState();
-    this.bucketIndex.putAll(this.incBucketIndex);
     this.incBucketIndex.clear();
   }
 
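The snapshotState() change works because the bucket type of a record ('I' for insert, 'U' for update) must be decided against the committed files view: once a checkpoint commits, the file groups created during that interval become part of the committed view, so the per-interval set can simply be cleared. A standalone sketch of that lifecycle, with hypothetical names, not the commit's code:

    import java.util.HashSet;
    import java.util.Set;

    // Sketch of why incBucketIndex is cleared on each checkpoint: buckets
    // created inside the current interval keep the 'I' bucket type until the
    // checkpoint commits; afterwards the file group is committed and later
    // records routed to it become 'U'.
    public class CheckpointBucketTypeSketch {
      private final Set<String> incBucketIndex = new HashSet<>();

      String bucketType(String partitionBucketId, boolean existsInCommittedView) {
        if (incBucketIndex.contains(partitionBucketId)) {
          return "I"; // created in this interval; all its records stay 'I'
        }
        return existsInCommittedView ? "U" : "I";
      }

      void onCheckpoint() {
        incBucketIndex.clear(); // newly created buckets are committed now
      }

      public static void main(String[] args) {
        CheckpointBucketTypeSketch s = new CheckpointBucketTypeSketch();
        s.incBucketIndex.add("2022-03-20_5");
        System.out.println(s.bucketType("2022-03-20_5", false)); // I
        s.onCheckpoint();
        System.out.println(s.bucketType("2022-03-20_5", true));  // U after commit
      }
    }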
@@ -117,17 +125,23 @@ public class BucketStreamWriteFunction<I> extends StreamWriteFunction<I> {
   public void processElement(I i, ProcessFunction<I, Object>.Context context, Collector<Object> collector) throws Exception {
     HoodieRecord<?> record = (HoodieRecord<?>) i;
     final HoodieKey hoodieKey = record.getKey();
+    final String partition = hoodieKey.getPartitionPath();
     final HoodieRecordLocation location;
 
+    bootstrapIndexIfNeed(partition);
+    Map<Integer, String> bucketToFileIdMap = bucketIndex.get(partition);
     final int bucketNum = BucketIdentifier.getBucketId(hoodieKey, indexKeyFields, this.bucketNum);
     final String partitionBucketId = BucketIdentifier.partitionBucketIdStr(hoodieKey.getPartitionPath(), bucketNum);
 
-    if (bucketIndex.containsKey(partitionBucketId)) {
-      location = new HoodieRecordLocation("U", bucketIndex.get(partitionBucketId));
+    if (incBucketIndex.contains(partitionBucketId)) {
+      location = new HoodieRecordLocation("I", bucketToFileIdMap.get(bucketNum));
+    } else if (bucketToFileIdMap.containsKey(bucketNum)) {
+      location = new HoodieRecordLocation("U", bucketToFileIdMap.get(bucketNum));
     } else {
       String newFileId = BucketIdentifier.newBucketFileIdPrefix(bucketNum);
       location = new HoodieRecordLocation("I", newFileId);
-      incBucketIndex.put(partitionBucketId, newFileId);
+      bucketToFileIdMap.put(bucketNum, newFileId);
+      incBucketIndex.add(partitionBucketId);
     }
     record.unseal();
     record.setCurrentLocation(location);
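For context, BucketIdentifier derives the bucket from the record's index key fields and formats the partition-qualified id used above. The sketch below is only a hypothetical stand-in for that computation (the real hashing lives in Hudi's BucketIdentifier, which this does not claim to reproduce):

    // Hypothetical stand-in for the bucket routing used in processElement():
    // hash the index key, reduce it modulo the configured bucket count, and
    // build the "partition_bucket" string id.
    public class BucketRoutingSketch {
      static int bucketId(String indexKey, int bucketNum) {
        // Mask to a non-negative value before the modulo.
        return (indexKey.hashCode() & Integer.MAX_VALUE) % bucketNum;
      }

      static String partitionBucketIdStr(String partition, int bucketId) {
        // Assumed format, mirroring the "partition_bucket" naming in the diff.
        return partition + "_" + bucketId;
      }

      public static void main(String[] args) {
        int bucket = bucketId("uuid-42", 256); // hypothetical key and bucket count
        System.out.println(partitionBucketIdStr("2022-03-20", bucket));
      }
    }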
@@ -136,17 +150,10 @@ public class BucketStreamWriteFunction<I> extends StreamWriteFunction<I> {
   }
 
   /**
-   * Get partition_bucket -> fileID mapping from the existing hudi table.
-   * This is a required operation for each restart to avoid having duplicate file ids for one bucket.
+   * Bootstrap bucket info from the existing file system;
+   * a bucket with bucketNum % totalParallelism == taskID belongs to this task.
    */
-  private void bootstrapIndex() throws IOException {
-    Option<HoodieInstant> latestCommitTime = table.getFileSystemView().getTimeline().filterCompletedInstants().lastInstant();
-    if (!latestCommitTime.isPresent()) {
-      return;
-    }
-    // bootstrap bucket info from existing file system
-    // bucketNum % totalParallelism == this taskID belongs to this task
-    HashSet<Integer> bucketToLoad = new HashSet<>();
+  private void getBucketToLoad() {
     for (int i = 0; i < bucketNum; i++) {
       int partitionOfBucket = BucketIdentifier.mod(i, parallelism);
       if (partitionOfBucket == taskID) {
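getBucketToLoad() pins each bucket to exactly one subtask via the modulo rule stated in its javadoc, so no two subtasks ever load the same bucket. A standalone sketch of the assignment, using plain % in place of BucketIdentifier.mod and assumed parameter values:

    import java.util.HashSet;
    import java.util.Set;

    // Sketch of the bucket-to-subtask assignment rule: a bucket belongs to the
    // subtask whose index equals bucketId % parallelism.
    public class BucketAssignmentSketch {
      public static void main(String[] args) {
        int bucketNum = 8;    // assumed total bucket count
        int parallelism = 4;  // assumed number of subtasks
        int taskID = 1;       // assumed index of this subtask

        Set<Integer> bucketToLoad = new HashSet<>();
        for (int i = 0; i < bucketNum; i++) {
          if (i % parallelism == taskID) {
            bucketToLoad.add(i);
          }
        }
        System.out.println(bucketToLoad); // subtask 1 owns buckets 1 and 5
      }
    }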
@@ -157,31 +164,41 @@ public class BucketStreamWriteFunction<I> extends StreamWriteFunction<I> {
       }
     }
     bucketToLoad.forEach(bucket -> LOG.info(String.format("bucketToLoad contains %s", bucket)));
+  }
+
+  /**
+   * Get partition_bucket -> fileID mapping from the existing hudi table.
+   * This is a required operation for each restart to avoid having duplicate file ids for one bucket.
+   */
+  private void bootstrapIndexIfNeed(String partition) throws IOException {
+    if (bucketIndex.containsKey(partition)) {
+      return;
+    }
+    Option<HoodieInstant> latestCommitTime = table.getHoodieView().getTimeline().filterCompletedInstants().lastInstant();
+    if (!latestCommitTime.isPresent()) {
+      bucketIndex.put(partition, new HashMap<>());
+      return;
+    }
     LOG.info(String.format("Loading Hoodie Table %s, with path %s", table.getMetaClient().getTableConfig().getTableName(),
-        table.getMetaClient().getBasePath()));
+        table.getMetaClient().getBasePath() + "/" + partition));
 
-    // Iterate through all existing partitions to load existing fileID belongs to this task
-    List<String> partitions = table.getMetadata().getAllPartitionPaths();
-    for (String partitionPath : partitions) {
-      List<FileSlice> latestFileSlices = table.getSliceView()
-          .getLatestFileSlices(partitionPath)
-          .collect(toList());
-      for (FileSlice fileslice : latestFileSlices) {
-        String fileID = fileslice.getFileId();
-        int bucketNumber = BucketIdentifier.bucketIdFromFileId(fileID);
-        if (bucketToLoad.contains(bucketNumber)) {
-          String partitionBucketId = BucketIdentifier.partitionBucketIdStr(partitionPath, bucketNumber);
-          LOG.info(String.format("Should load this partition bucket %s with fileID %s", partitionBucketId, fileID));
-          if (bucketIndex.containsKey(partitionBucketId)) {
-            throw new RuntimeException(String.format("Duplicate fileID %s from partitionBucket %s found "
-                + "during the BucketStreamWriteFunction index bootstrap.", fileID, partitionBucketId));
-          } else {
-            LOG.info(String.format("Adding fileID %s to the partition bucket %s.", fileID, partitionBucketId));
-            bucketIndex.put(partitionBucketId, fileID);
-          }
-        }
-      }
-    }
-  }
+    // Load the existing fileIDs that belong to this task
+    Map<Integer, String> bucketToFileIDMap = new HashMap<>();
+    List<FileSlice> fileSlices = table.getHoodieView().getLatestFileSlices(partition).collect(toList());
+    for (FileSlice fileSlice : fileSlices) {
+      String fileID = fileSlice.getFileId();
+      int bucketNumber = BucketIdentifier.bucketIdFromFileId(fileID);
+      if (bucketToLoad.contains(bucketNumber)) {
+        LOG.info(String.format("Should load this partition bucket %s with fileID %s", bucketNumber, fileID));
+        if (bucketToFileIDMap.containsKey(bucketNumber)) {
+          throw new RuntimeException(String.format("Duplicate fileID %s from bucket %s of partition %s found "
+              + "during the BucketStreamWriteFunction index bootstrap.", fileID, bucketNumber, partition));
+        } else {
+          LOG.info(String.format("Adding fileID %s to the bucket %s of partition %s.", fileID, bucketNumber, partition));
+          bucketToFileIDMap.put(bucketNumber, fileID);
+        }
+      }
+    }
+    bucketIndex.put(partition, bucketToFileIDMap);
+  }
 }
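bootstrapIndexIfNeed() is the heart of the optimization: the file-listing cost is paid once per partition, on the first record that touches it, instead of for every partition at task start. The same caching semantics can be expressed with computeIfAbsent, as in this standalone sketch (hypothetical types and loader, not the commit's code):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.function.Function;

    // Sketch of the lazy per-partition bootstrap pattern: build the partition's
    // bucket index on first access and cache it for all later records.
    public class LazyPartitionBootstrapSketch {
      private final Map<String, Map<Integer, String>> bucketIndex = new HashMap<>();

      Map<Integer, String> indexFor(String partition, Function<String, Map<Integer, String>> loader) {
        // computeIfAbsent gives the same "bootstrap if needed" semantics as the
        // explicit containsKey/put pair in the diff.
        return bucketIndex.computeIfAbsent(partition, loader);
      }

      public static void main(String[] args) {
        LazyPartitionBootstrapSketch s = new LazyPartitionBootstrapSketch();
        Function<String, Map<Integer, String>> loader = p -> {
          System.out.println("bootstrapping " + p); // runs once per partition
          return new HashMap<>();
        };
        s.indexFor("2022-03-20", loader);
        s.indexFor("2022-03-20", loader); // cached; loader is not called again
      }
    }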