From 00493235f5f2b14709199dd6ef02bad9c0d3bab9 Mon Sep 17 00:00:00 2001 From: Sivabalan Narayanan Date: Tue, 18 Feb 2020 11:12:20 -0800 Subject: [PATCH] [HUDI-108] Removing 2GB spark partition limitations in HoodieBloomIndex with spark 2.4.4 (#1315) --- .../hudi/index/bloom/HoodieBloomIndex.java | 72 ++++--------------- 1 file changed, 13 insertions(+), 59 deletions(-) diff --git a/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java b/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java index d95a89129..a6d46d8e4 100644 --- a/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java +++ b/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java @@ -26,7 +26,6 @@ import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; -import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.MetadataNotFoundException; import org.apache.hudi.index.HoodieIndex; @@ -59,13 +58,7 @@ import static java.util.stream.Collectors.toList; */ public class HoodieBloomIndex extends HoodieIndex { - // we need to limit the join such that it stays within 1.5GB per Spark partition. (SPARK-1476) - private static final int SPARK_MAXIMUM_BYTES_PER_PARTITION = 1500 * 1024 * 1024; - // this is how much a triplet of (partitionPath, fileId, recordKey) costs. - private static final int BYTES_PER_PARTITION_FILE_KEY_TRIPLET = 300; private static final Logger LOG = LogManager.getLogger(HoodieBloomIndex.class); - private static int MAX_ITEMS_PER_SHUFFLE_PARTITION = - SPARK_MAXIMUM_BYTES_PER_PARTITION / BYTES_PER_PARTITION_FILE_KEY_TRIPLET; public HoodieBloomIndex(HoodieWriteConfig config) { super(config); @@ -73,7 +66,7 @@ public class HoodieBloomIndex extends HoodieIndex @Override public JavaRDD> tagLocation(JavaRDD> recordRDD, JavaSparkContext jsc, - HoodieTable hoodieTable) { + HoodieTable hoodieTable) { // Step 0: cache the input record RDD if (config.getBloomIndexUseCaching()) { @@ -112,13 +105,13 @@ public class HoodieBloomIndex extends HoodieIndex * Returns an RDD mapping each HoodieKey with a partitionPath/fileID which contains it. Option.Empty if the key is not * found. * - * @param hoodieKeys keys to lookup - * @param jsc spark context + * @param hoodieKeys keys to lookup + * @param jsc spark context * @param hoodieTable hoodie table object */ @Override public JavaPairRDD>> fetchRecordLocation(JavaRDD hoodieKeys, - JavaSparkContext jsc, HoodieTable hoodieTable) { + JavaSparkContext jsc, HoodieTable hoodieTable) { JavaPairRDD partitionRecordKeyPairRDD = hoodieKeys.mapToPair(key -> new Tuple2<>(key.getPartitionPath(), key.getRecordKey())); @@ -159,8 +152,10 @@ public class HoodieBloomIndex extends HoodieIndex // that contains it. Map comparisonsPerFileGroup = computeComparisonsPerFileGroup(recordsPerPartition, partitionToFileInfo, partitionRecordKeyPairRDD); - int safeParallelism = computeSafeParallelism(recordsPerPartition, comparisonsPerFileGroup); - int joinParallelism = determineParallelism(partitionRecordKeyPairRDD.partitions().size(), safeParallelism); + int inputParallelism = partitionRecordKeyPairRDD.partitions().size(); + int joinParallelism = Math.max(inputParallelism, config.getBloomIndexParallelism()); + LOG.info("InputParallelism: ${" + inputParallelism + "}, IndexParallelism: ${" + + config.getBloomIndexParallelism() + "}"); return findMatchingFilesForRecordKeys(partitionToFileInfo, partitionRecordKeyPairRDD, joinParallelism, hoodieTable, comparisonsPerFileGroup); } @@ -169,8 +164,8 @@ public class HoodieBloomIndex extends HoodieIndex * Compute the estimated number of bloom filter comparisons to be performed on each file group. */ private Map computeComparisonsPerFileGroup(final Map recordsPerPartition, - final Map> partitionToFileInfo, - JavaPairRDD partitionRecordKeyPairRDD) { + final Map> partitionToFileInfo, + JavaPairRDD partitionRecordKeyPairRDD) { Map fileToComparisons; if (config.getBloomIndexPruneByRanges()) { @@ -190,53 +185,12 @@ public class HoodieBloomIndex extends HoodieIndex return fileToComparisons; } - /** - * Compute the minimum parallelism needed to play well with the spark 2GB limitation.. The index lookup can be skewed - * in three dimensions : #files, #partitions, #records - *

- * To be able to smoothly handle skews, we need to compute how to split each partitions into subpartitions. We do it - * here, in a way that keeps the amount of each Spark join partition to < 2GB. - *

- * If {@link HoodieIndexConfig#BLOOM_INDEX_PARALLELISM_PROP} is specified as a NON-zero number, then that is used - * explicitly. - */ - int computeSafeParallelism(Map recordsPerPartition, Map comparisonsPerFileGroup) { - long totalComparisons = comparisonsPerFileGroup.values().stream().mapToLong(Long::longValue).sum(); - long totalFiles = comparisonsPerFileGroup.size(); - long totalRecords = recordsPerPartition.values().stream().mapToLong(Long::longValue).sum(); - int parallelism = (int) (totalComparisons / MAX_ITEMS_PER_SHUFFLE_PARTITION + 1); - LOG.info(String.format( - "TotalRecords %d, TotalFiles %d, TotalAffectedPartitions %d, TotalComparisons %d, SafeParallelism %d", - totalRecords, totalFiles, recordsPerPartition.size(), totalComparisons, parallelism)); - return parallelism; - } - - /** - * Its crucial to pick the right parallelism. - *

- * totalSubPartitions : this is deemed safe limit, to be nice with Spark. inputParallelism : typically number of input - * file splits - *

- * We pick the max such that, we are always safe, but go higher if say a there are a lot of input files. (otherwise, - * we will fallback to number of partitions in input and end up with slow performance) - */ - private int determineParallelism(int inputParallelism, int totalSubPartitions) { - // If bloom index parallelism is set, use it to to check against the input parallelism and - // take the max - int indexParallelism = Math.max(inputParallelism, config.getBloomIndexParallelism()); - int joinParallelism = Math.max(totalSubPartitions, indexParallelism); - LOG.info("InputParallelism: ${" + inputParallelism + "}, IndexParallelism: ${" - + config.getBloomIndexParallelism() + "}, TotalSubParts: ${" + totalSubPartitions + "}, " - + "Join Parallelism set to : " + joinParallelism); - return joinParallelism; - } - /** * Load all involved files as pair RDD. */ @VisibleForTesting List> loadInvolvedFiles(List partitions, final JavaSparkContext jsc, - final HoodieTable hoodieTable) { + final HoodieTable hoodieTable) { // Obtain the latest data files from all the partitions. List> partitionPathFileIDList = @@ -304,7 +258,7 @@ public class HoodieBloomIndex extends HoodieIndex * For each incoming record, produce N output records, 1 each for each file against which the record's key needs to be * checked. For tables, where the keys have a definite insert order (e.g: timestamp as prefix), the number of files * to be compared gets cut down a lot from range pruning. - * + *

* Sub-partition to ensure the records can be looked up against files & also prune file<=>record comparisons based on * recordKey ranges in the index info. */ @@ -392,7 +346,7 @@ public class HoodieBloomIndex extends HoodieIndex @Override public JavaRDD updateLocation(JavaRDD writeStatusRDD, JavaSparkContext jsc, - HoodieTable hoodieTable) { + HoodieTable hoodieTable) { return writeStatusRDD; } }