1
0

[HUDI-108] Removing 2GB spark partition limitations in HoodieBloomIndex with spark 2.4.4 (#1315)

This commit is contained in:
Sivabalan Narayanan
2020-02-18 11:12:20 -08:00
committed by GitHub
parent b8f9d0ec45
commit 00493235f5

View File

@@ -26,7 +26,6 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.MetadataNotFoundException; import org.apache.hudi.exception.MetadataNotFoundException;
import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.index.HoodieIndex;
@@ -59,13 +58,7 @@ import static java.util.stream.Collectors.toList;
*/ */
public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> { public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
// we need to limit the join such that it stays within 1.5GB per Spark partition. (SPARK-1476)
private static final int SPARK_MAXIMUM_BYTES_PER_PARTITION = 1500 * 1024 * 1024;
// this is how much a triplet of (partitionPath, fileId, recordKey) costs.
private static final int BYTES_PER_PARTITION_FILE_KEY_TRIPLET = 300;
private static final Logger LOG = LogManager.getLogger(HoodieBloomIndex.class); private static final Logger LOG = LogManager.getLogger(HoodieBloomIndex.class);
private static int MAX_ITEMS_PER_SHUFFLE_PARTITION =
SPARK_MAXIMUM_BYTES_PER_PARTITION / BYTES_PER_PARTITION_FILE_KEY_TRIPLET;
public HoodieBloomIndex(HoodieWriteConfig config) { public HoodieBloomIndex(HoodieWriteConfig config) {
super(config); super(config);
@@ -73,7 +66,7 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
@Override @Override
public JavaRDD<HoodieRecord<T>> tagLocation(JavaRDD<HoodieRecord<T>> recordRDD, JavaSparkContext jsc, public JavaRDD<HoodieRecord<T>> tagLocation(JavaRDD<HoodieRecord<T>> recordRDD, JavaSparkContext jsc,
HoodieTable<T> hoodieTable) { HoodieTable<T> hoodieTable) {
// Step 0: cache the input record RDD // Step 0: cache the input record RDD
if (config.getBloomIndexUseCaching()) { if (config.getBloomIndexUseCaching()) {
@@ -112,13 +105,13 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
* Returns an RDD mapping each HoodieKey with a partitionPath/fileID which contains it. Option.Empty if the key is not * Returns an RDD mapping each HoodieKey with a partitionPath/fileID which contains it. Option.Empty if the key is not
* found. * found.
* *
* @param hoodieKeys keys to lookup * @param hoodieKeys keys to lookup
* @param jsc spark context * @param jsc spark context
* @param hoodieTable hoodie table object * @param hoodieTable hoodie table object
*/ */
@Override @Override
public JavaPairRDD<HoodieKey, Option<Pair<String, String>>> fetchRecordLocation(JavaRDD<HoodieKey> hoodieKeys, public JavaPairRDD<HoodieKey, Option<Pair<String, String>>> fetchRecordLocation(JavaRDD<HoodieKey> hoodieKeys,
JavaSparkContext jsc, HoodieTable<T> hoodieTable) { JavaSparkContext jsc, HoodieTable<T> hoodieTable) {
JavaPairRDD<String, String> partitionRecordKeyPairRDD = JavaPairRDD<String, String> partitionRecordKeyPairRDD =
hoodieKeys.mapToPair(key -> new Tuple2<>(key.getPartitionPath(), key.getRecordKey())); hoodieKeys.mapToPair(key -> new Tuple2<>(key.getPartitionPath(), key.getRecordKey()));
@@ -159,8 +152,10 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
// that contains it. // that contains it.
Map<String, Long> comparisonsPerFileGroup = Map<String, Long> comparisonsPerFileGroup =
computeComparisonsPerFileGroup(recordsPerPartition, partitionToFileInfo, partitionRecordKeyPairRDD); computeComparisonsPerFileGroup(recordsPerPartition, partitionToFileInfo, partitionRecordKeyPairRDD);
int safeParallelism = computeSafeParallelism(recordsPerPartition, comparisonsPerFileGroup); int inputParallelism = partitionRecordKeyPairRDD.partitions().size();
int joinParallelism = determineParallelism(partitionRecordKeyPairRDD.partitions().size(), safeParallelism); int joinParallelism = Math.max(inputParallelism, config.getBloomIndexParallelism());
LOG.info("InputParallelism: ${" + inputParallelism + "}, IndexParallelism: ${"
+ config.getBloomIndexParallelism() + "}");
return findMatchingFilesForRecordKeys(partitionToFileInfo, partitionRecordKeyPairRDD, joinParallelism, hoodieTable, return findMatchingFilesForRecordKeys(partitionToFileInfo, partitionRecordKeyPairRDD, joinParallelism, hoodieTable,
comparisonsPerFileGroup); comparisonsPerFileGroup);
} }
@@ -169,8 +164,8 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
* Compute the estimated number of bloom filter comparisons to be performed on each file group. * Compute the estimated number of bloom filter comparisons to be performed on each file group.
*/ */
private Map<String, Long> computeComparisonsPerFileGroup(final Map<String, Long> recordsPerPartition, private Map<String, Long> computeComparisonsPerFileGroup(final Map<String, Long> recordsPerPartition,
final Map<String, List<BloomIndexFileInfo>> partitionToFileInfo, final Map<String, List<BloomIndexFileInfo>> partitionToFileInfo,
JavaPairRDD<String, String> partitionRecordKeyPairRDD) { JavaPairRDD<String, String> partitionRecordKeyPairRDD) {
Map<String, Long> fileToComparisons; Map<String, Long> fileToComparisons;
if (config.getBloomIndexPruneByRanges()) { if (config.getBloomIndexPruneByRanges()) {
@@ -190,53 +185,12 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
return fileToComparisons; return fileToComparisons;
} }
/**
* Compute the minimum parallelism needed to play well with the Spark 2GB limitation. The index lookup can be skewed
* in three dimensions : #files, #partitions, #records
* <p>
* To be able to smoothly handle skews, we need to compute how to split each partitions into subpartitions. We do it
* here, in a way that keeps the amount of each Spark join partition to < 2GB.
* <p>
* If {@link HoodieIndexConfig#BLOOM_INDEX_PARALLELISM_PROP} is specified as a NON-zero number, then that is used
* explicitly.
*/
int computeSafeParallelism(Map<String, Long> recordsPerPartition, Map<String, Long> comparisonsPerFileGroup) {
  // Add up the estimated comparison counts over every file group.
  long comparisonCount = 0L;
  for (Long perFileGroup : comparisonsPerFileGroup.values()) {
    comparisonCount += perFileGroup;
  }

  // Add up the record counts over every affected partition (only used for logging).
  long recordCount = 0L;
  for (Long perPartition : recordsPerPartition.values()) {
    recordCount += perPartition;
  }

  long fileCount = comparisonsPerFileGroup.size();
  int affectedPartitions = recordsPerPartition.size();

  // One shuffle partition per MAX_ITEMS_PER_SHUFFLE_PARTITION comparisons; the +1 is applied after the
  // integer division.
  int safeParallelism = (int) (comparisonCount / MAX_ITEMS_PER_SHUFFLE_PARTITION + 1);

  LOG.info(String.format(
      "TotalRecords %d, TotalFiles %d, TotalAffectedPartitions %d, TotalComparisons %d, SafeParallelism %d",
      recordCount, fileCount, affectedPartitions, comparisonCount, safeParallelism));
  return safeParallelism;
}
/**
* It's crucial to pick the right parallelism.
* <p>
* totalSubPartitions : this is deemed safe limit, to be nice with Spark. inputParallelism : typically number of input
* file splits
* <p>
* We pick the max such that we are always safe, but go higher if, say, there are a lot of input files (otherwise,
* we will fall back to the number of partitions in the input and end up with slow performance).
*/
private int determineParallelism(int inputParallelism, int totalSubPartitions) {
  // The configured bloom index parallelism competes with the input parallelism and the
  // computed sub-partition count; the largest of the three is used for the join.
  final int configuredParallelism = config.getBloomIndexParallelism();
  final int joinParallelism =
      Math.max(totalSubPartitions, Math.max(inputParallelism, configuredParallelism));

  // Assemble the log line piece by piece; the emitted text is the same as before.
  StringBuilder message = new StringBuilder();
  message.append("InputParallelism: ${").append(inputParallelism).append("}, ");
  message.append("IndexParallelism: ${").append(configuredParallelism).append("}, ");
  message.append("TotalSubParts: ${").append(totalSubPartitions).append("}, ");
  message.append("Join Parallelism set to : ").append(joinParallelism);
  LOG.info(message.toString());

  return joinParallelism;
}
/** /**
* Load all involved files as <Partition, filename> pair RDD. * Load all involved files as <Partition, filename> pair RDD.
*/ */
@VisibleForTesting @VisibleForTesting
List<Tuple2<String, BloomIndexFileInfo>> loadInvolvedFiles(List<String> partitions, final JavaSparkContext jsc, List<Tuple2<String, BloomIndexFileInfo>> loadInvolvedFiles(List<String> partitions, final JavaSparkContext jsc,
final HoodieTable hoodieTable) { final HoodieTable hoodieTable) {
// Obtain the latest data files from all the partitions. // Obtain the latest data files from all the partitions.
List<Pair<String, String>> partitionPathFileIDList = List<Pair<String, String>> partitionPathFileIDList =
@@ -304,7 +258,7 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
* For each incoming record, produce N output records, 1 each for each file against which the record's key needs to be * For each incoming record, produce N output records, 1 each for each file against which the record's key needs to be
* checked. For tables, where the keys have a definite insert order (e.g: timestamp as prefix), the number of files * checked. For tables, where the keys have a definite insert order (e.g: timestamp as prefix), the number of files
* to be compared gets cut down a lot from range pruning. * to be compared gets cut down a lot from range pruning.
* * <p>
* Sub-partition to ensure the records can be looked up against files & also prune file<=>record comparisons based on * Sub-partition to ensure the records can be looked up against files & also prune file<=>record comparisons based on
* recordKey ranges in the index info. * recordKey ranges in the index info.
*/ */
@@ -392,7 +346,7 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
@Override @Override
public JavaRDD<WriteStatus> updateLocation(JavaRDD<WriteStatus> writeStatusRDD, JavaSparkContext jsc, public JavaRDD<WriteStatus> updateLocation(JavaRDD<WriteStatus> writeStatusRDD, JavaSparkContext jsc,
HoodieTable<T> hoodieTable) { HoodieTable<T> hoodieTable) {
return writeStatusRDD; return writeStatusRDD;
} }
} }