From 2e2d08cb7260e3316fdc9116c4061aed0ceb5214 Mon Sep 17 00:00:00 2001 From: Shawy Geng Date: Mon, 28 Mar 2022 19:50:36 +0800 Subject: [PATCH] [HUDI-3539] Flink bucket index bucketID bootstrap optimization. (#5093) * [HUDI-3539] Flink bucket index bucketID bootstrap optimization. Co-authored-by: gengxiaoyu --- .../bucket/BucketStreamWriteFunction.java | 95 +++++++++++-------- 1 file changed, 56 insertions(+), 39 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java index e53d2b240..7e4cf686c 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java @@ -40,6 +40,8 @@ import java.io.IOException; import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; +import java.util.Set; import static java.util.stream.Collectors.toList; @@ -67,16 +69,22 @@ public class BucketStreamWriteFunction extends StreamWriteFunction { private String indexKeyFields; /** - * BucketID to file group mapping. + * BucketIDs that should be loaded by this task. */ - private HashMap bucketIndex; + private Set bucketToLoad; + + /** + * BucketID to file group mapping in each partition. + * Map(partition -> Map(bucketId, fileID)). + */ + private Map> bucketIndex; /** * Incremental bucket index of the current checkpoint interval, * it is needed because the bucket type('I' or 'U') should be decided based on the committed files view, * all the records in one bucket should have the same bucket type. */ - private HashMap incBucketIndex; + private Set incBucketIndex; /** * Constructs a BucketStreamWriteFunction. 
@@ -95,9 +103,10 @@ public class BucketStreamWriteFunction extends StreamWriteFunction { this.taskID = getRuntimeContext().getIndexOfThisSubtask(); this.parallelism = getRuntimeContext().getNumberOfParallelSubtasks(); this.maxParallelism = getRuntimeContext().getMaxNumberOfParallelSubtasks(); + this.bucketToLoad = new HashSet<>(); this.bucketIndex = new HashMap<>(); - this.incBucketIndex = new HashMap<>(); - bootstrapIndex(); + this.incBucketIndex = new HashSet<>(); + getBucketToLoad(); } @Override @@ -109,7 +118,6 @@ public class BucketStreamWriteFunction extends StreamWriteFunction { @Override public void snapshotState() { super.snapshotState(); - this.bucketIndex.putAll(this.incBucketIndex); this.incBucketIndex.clear(); } @@ -117,17 +125,23 @@ public class BucketStreamWriteFunction extends StreamWriteFunction { public void processElement(I i, ProcessFunction.Context context, Collector collector) throws Exception { HoodieRecord record = (HoodieRecord) i; final HoodieKey hoodieKey = record.getKey(); + final String partition = hoodieKey.getPartitionPath(); final HoodieRecordLocation location; + bootstrapIndexIfNeed(partition); + Map bucketToFileIdMap = bucketIndex.get(partition); final int bucketNum = BucketIdentifier.getBucketId(hoodieKey, indexKeyFields, this.bucketNum); final String partitionBucketId = BucketIdentifier.partitionBucketIdStr(hoodieKey.getPartitionPath(), bucketNum); - if (bucketIndex.containsKey(partitionBucketId)) { - location = new HoodieRecordLocation("U", bucketIndex.get(partitionBucketId)); + if (incBucketIndex.contains(partitionBucketId)) { + location = new HoodieRecordLocation("I", bucketToFileIdMap.get(bucketNum)); + } else if (bucketToFileIdMap.containsKey(bucketNum)) { + location = new HoodieRecordLocation("U", bucketToFileIdMap.get(bucketNum)); } else { String newFileId = BucketIdentifier.newBucketFileIdPrefix(bucketNum); location = new HoodieRecordLocation("I", newFileId); - incBucketIndex.put(partitionBucketId, newFileId); + 
bucketToFileIdMap.put(bucketNum,newFileId); + incBucketIndex.add(partitionBucketId); } record.unseal(); record.setCurrentLocation(location); @@ -136,17 +150,10 @@ public class BucketStreamWriteFunction extends StreamWriteFunction { } /** - * Get partition_bucket -> fileID mapping from the existing hudi table. - * This is a required operation for each restart to avoid having duplicate file ids for one bucket. + * Bootstrap bucket info from the existing file system; + * a bucket belongs to this task when bucketNum % parallelism == taskID. */ - private void bootstrapIndex() throws IOException { - Option latestCommitTime = table.getFileSystemView().getTimeline().filterCompletedInstants().lastInstant(); - if (!latestCommitTime.isPresent()) { - return; - } - // bootstrap bucket info from existing file system - // bucketNum % totalParallelism == this taskID belongs to this task - HashSet bucketToLoad = new HashSet<>(); + private void getBucketToLoad() { for (int i = 0; i < bucketNum; i++) { int partitionOfBucket = BucketIdentifier.mod(i, parallelism); if (partitionOfBucket == taskID) { @@ -157,31 +164,41 @@ public class BucketStreamWriteFunction extends StreamWriteFunction { } } bucketToLoad.forEach(bucket -> LOG.info(String.format("bucketToLoad contains %s", bucket))); + } + /** + * Get partition_bucket -> fileID mapping from the existing hudi table. + * This is a required operation for each restart to avoid having duplicate file ids for one bucket. 
+ */ + private void bootstrapIndexIfNeed(String partition) throws IOException { + if (bucketIndex.containsKey(partition)) { + return; + } + Option latestCommitTime = table.getHoodieView().getTimeline().filterCompletedInstants().lastInstant(); + if (!latestCommitTime.isPresent()) { + bucketIndex.put(partition, new HashMap<>()); + return; + } LOG.info(String.format("Loading Hoodie Table %s, with path %s", table.getMetaClient().getTableConfig().getTableName(), - table.getMetaClient().getBasePath())); + table.getMetaClient().getBasePath() + "/" + partition)); - // Iterate through all existing partitions to load existing fileID belongs to this task - List partitions = table.getMetadata().getAllPartitionPaths(); - for (String partitionPath : partitions) { - List latestFileSlices = table.getSliceView() - .getLatestFileSlices(partitionPath) - .collect(toList()); - for (FileSlice fileslice : latestFileSlices) { - String fileID = fileslice.getFileId(); - int bucketNumber = BucketIdentifier.bucketIdFromFileId(fileID); - if (bucketToLoad.contains(bucketNumber)) { - String partitionBucketId = BucketIdentifier.partitionBucketIdStr(partitionPath, bucketNumber); - LOG.info(String.format("Should load this partition bucket %s with fileID %s", partitionBucketId, fileID)); - if (bucketIndex.containsKey(partitionBucketId)) { - throw new RuntimeException(String.format("Duplicate fileID %s from partitionBucket %s found " - + "during the BucketStreamWriteFunction index bootstrap.", fileID, partitionBucketId)); - } else { - LOG.info(String.format("Adding fileID %s to the partition bucket %s.", fileID, partitionBucketId)); - bucketIndex.put(partitionBucketId, fileID); - } + // Load existing fileID belongs to this task + Map bucketToFileIDMap = new HashMap<>(); + List fileSlices = table.getHoodieView().getLatestFileSlices(partition).collect(toList()); + for (FileSlice fileSlice : fileSlices) { + String fileID = fileSlice.getFileId(); + int bucketNumber = 
BucketIdentifier.bucketIdFromFileId(fileID); + if (bucketToLoad.contains(bucketNumber)) { + LOG.info(String.format("Should load this partition bucket %s with fileID %s", bucketNumber, fileID)); + if (bucketToFileIDMap.containsKey(bucketNumber)) { + throw new RuntimeException(String.format("Duplicate fileID %s from bucket %s of partition %s found " + + "during the BucketStreamWriteFunction index bootstrap.", fileID, bucketNumber, partition)); + } else { + LOG.info(String.format("Adding fileID %s to the bucket %s of partition %s.", fileID, bucketNumber, partition)); + bucketToFileIDMap.put(bucketNumber, fileID); } } } + bucketIndex.put(partition, bucketToFileIDMap); } }