1
0

Bucketized Bloom Filter checking

- Tackles the skew seen in sort based partitioning/checking
 - Parameterized the HoodieBloomIndex test
 - Config to turn on/off (on by default)
 - Unit tests & also tested at scale
This commit is contained in:
Vinoth Chandar
2019-05-08 20:20:58 -07:00
committed by vinoth chandar
parent 4b27cc72bb
commit a0e62b7919
7 changed files with 466 additions and 159 deletions

View File

@@ -47,6 +47,15 @@ public class HoodieIndexConfig extends DefaultHoodieConfig {
public static final String DEFAULT_BLOOM_INDEX_USE_CACHING = "true";
public static final String BLOOM_INDEX_TREE_BASED_FILTER_PROP = "hoodie.bloom.index.use.treebased.filter";
public static final String DEFAULT_BLOOM_INDEX_TREE_BASED_FILTER = "true";
// TODO: On by default. Once stable, we will remove the other mode.
public static final String BLOOM_INDEX_BUCKETIZED_CHECKING_PROP = "hoodie.bloom.index.bucketized.checking";
public static final String DEFAULT_BLOOM_INDEX_BUCKETIZED_CHECKING = "true";
// 1B bloom filter checks happen in 250 seconds. 500ms to read a bloom filter.
// 10M checks in 2500ms, thus amortizing the cost of reading bloom filter across partitions.
public static final String BLOOM_INDEX_KEYS_PER_BUCKET_PROP = "hoodie.bloom.index.keys.per.bucket";
public static final String DEFAULT_BLOOM_INDEX_KEYS_PER_BUCKET = "10000000";
public static final String BLOOM_INDEX_INPUT_STORAGE_LEVEL =
"hoodie.bloom.index.input.storage" + ".level";
public static final String DEFAULT_BLOOM_INDEX_INPUT_STORAGE_LEVEL = "MEMORY_AND_DISK_SER";
@@ -118,6 +127,16 @@ public class HoodieIndexConfig extends DefaultHoodieConfig {
return this;
}
public Builder bloomIndexBucketizedChecking(boolean bucketizedChecking) {
props.setProperty(BLOOM_INDEX_BUCKETIZED_CHECKING_PROP, String.valueOf(bucketizedChecking));
return this;
}
public Builder bloomIndexKeysPerBucket(int keysPerBucket) {
props.setProperty(BLOOM_INDEX_KEYS_PER_BUCKET_PROP, String.valueOf(keysPerBucket));
return this;
}
public Builder withBloomIndexInputStorageLevel(String level) {
props.setProperty(BLOOM_INDEX_INPUT_STORAGE_LEVEL, level);
return this;
@@ -141,6 +160,10 @@ public class HoodieIndexConfig extends DefaultHoodieConfig {
BLOOM_INDEX_INPUT_STORAGE_LEVEL, DEFAULT_BLOOM_INDEX_INPUT_STORAGE_LEVEL);
setDefaultOnCondition(props, !props.containsKey(BLOOM_INDEX_TREE_BASED_FILTER_PROP),
BLOOM_INDEX_TREE_BASED_FILTER_PROP, DEFAULT_BLOOM_INDEX_TREE_BASED_FILTER);
setDefaultOnCondition(props, !props.containsKey(BLOOM_INDEX_BUCKETIZED_CHECKING_PROP),
BLOOM_INDEX_BUCKETIZED_CHECKING_PROP, DEFAULT_BLOOM_INDEX_BUCKETIZED_CHECKING);
setDefaultOnCondition(props, !props.containsKey(BLOOM_INDEX_KEYS_PER_BUCKET_PROP),
BLOOM_INDEX_KEYS_PER_BUCKET_PROP, DEFAULT_BLOOM_INDEX_KEYS_PER_BUCKET);
// Throws IllegalArgumentException if the value set is not a known Hoodie Index Type
HoodieIndex.IndexType.valueOf(props.getProperty(INDEX_TYPE_PROP));
return config;

View File

@@ -328,6 +328,14 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return Boolean.parseBoolean(props.getProperty(HoodieIndexConfig.BLOOM_INDEX_TREE_BASED_FILTER_PROP));
}
public boolean useBloomIndexBucketizedChecking() {
return Boolean.parseBoolean(props.getProperty(HoodieIndexConfig.BLOOM_INDEX_BUCKETIZED_CHECKING_PROP));
}
public int getBloomIndexKeysPerBucket() {
return Integer.parseInt(props.getProperty(HoodieIndexConfig.BLOOM_INDEX_KEYS_PER_BUCKET_PROP));
}
public StorageLevel getBloomIndexInputStorageLevel() {
return StorageLevel
.fromString(props.getProperty(HoodieIndexConfig.BLOOM_INDEX_INPUT_STORAGE_LEVEL));

View File

@@ -0,0 +1,155 @@
/*
* Copyright (c) 2019 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.uber.hoodie.index.bloom;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.hash.Hashing;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.Partitioner;
/**
* Partitions bloom filter checks by spreading out comparisons across buckets of work.
*
* Each bucket incurs the following cost
* <pre>
* 1) Read bloom filter from file footer
* 2) Check keys against bloom filter
* 3) [Conditional] If any key had a hit, open file and check
* </pre>
*
* The partitioner performs a two phase bin packing algorithm, to pack enough work into each bucket such that cost of
* (1) & (3) is amortized. Also, avoids any skews in the sort based approach, by directly partitioning by the file to be
* checked against and ensuring each partition has similar number of buckets. Performance tests show that this approach
* could bound the amount of skew to std_dev(numberOfBucketsPerPartition) * cost of (3), lower than sort partitioning.
*
* Approach has two goals :
* <pre>
* 1) Pack as many buckets from same file group into same partition, to amortize cost of (1) and (2) further
* 2) Spread buckets across partitions evenly to achieve skew reduction
* </pre>
*/
public class BucketizedBloomCheckPartitioner extends Partitioner {
private static Logger logger = LogManager.getLogger(BucketizedBloomCheckPartitioner.class);
private int partitions;
/**
* Stores the final mapping of a file group to a list of partitions for its keys.
*/
private Map<String, List<Integer>> fileGroupToPartitions;
/**
* Create a partitioner that computes a plan based on provided workload characteristics.
*
* @param targetPartitions maximum number of partitions to target
* @param fileGroupToComparisons number of expected comparisons per file group
* @param keysPerBucket maximum number of keys to pack in a single bucket
*/
public BucketizedBloomCheckPartitioner(int targetPartitions, Map<String, Long> fileGroupToComparisons,
int keysPerBucket) {
this.fileGroupToPartitions = new HashMap<>();
Map<String, Integer> bucketsPerFileGroup = new HashMap<>();
// Compute the buckets needed per file group, using simple uniform distribution
fileGroupToComparisons.forEach((f, c) ->
bucketsPerFileGroup.put(f, (int) Math.ceil((c * 1.0) / keysPerBucket)));
int totalBuckets = bucketsPerFileGroup.values().stream().mapToInt(i -> i).sum();
// If totalBuckets > targetPartitions, no need to have extra partitions
this.partitions = Math.min(targetPartitions, totalBuckets);
// PHASE 1 : start filling upto minimum number of buckets into partitions, taking all but one bucket from each file
// This tries to first optimize for goal 1 above, with knowledge that each partition needs a certain minimum number
// of buckets and assigns buckets in the same order as file groups. If we were to simply round robin, then buckets
// for a file group is more or less guaranteed to be placed on different partitions all the time.
int minBucketsPerPartition = Math.max((int) Math.floor((1.0 * totalBuckets) / partitions), 1);
logger.info(String.format("TotalBuckets %d, min_buckets/partition %d", totalBuckets, minBucketsPerPartition));
int[] bucketsFilled = new int[partitions];
Map<String, AtomicInteger> bucketsFilledPerFileGroup = new HashMap<>();
int partitionIndex = 0;
for (Map.Entry<String, Integer> e : bucketsPerFileGroup.entrySet()) {
for (int b = 0; b < Math.max(1, e.getValue() - 1); b++) {
// keep filled counts upto date
bucketsFilled[partitionIndex]++;
AtomicInteger cnt = bucketsFilledPerFileGroup.getOrDefault(e.getKey(), new AtomicInteger(0));
cnt.incrementAndGet();
bucketsFilledPerFileGroup.put(e.getKey(), cnt);
// mark this partition against the file group
List<Integer> partitionList = this.fileGroupToPartitions.getOrDefault(e.getKey(), new ArrayList<>());
partitionList.add(partitionIndex);
this.fileGroupToPartitions.put(e.getKey(), partitionList);
// switch to new partition if needed
if (bucketsFilled[partitionIndex] >= minBucketsPerPartition) {
partitionIndex = (partitionIndex + 1) % partitions;
}
}
}
// PHASE 2 : for remaining unassigned buckets, round robin over partitions once. Since we withheld 1 bucket from
// each file group uniformly, this remaining is also an uniform mix across file groups. We just round robin to
// optimize for goal 2.
for (Map.Entry<String, Integer> e : bucketsPerFileGroup.entrySet()) {
int remaining = e.getValue() - bucketsFilledPerFileGroup.get(e.getKey()).intValue();
for (int r = 0; r < remaining; r++) {
// mark this partition against the file group
this.fileGroupToPartitions.get(e.getKey()).add(partitionIndex);
bucketsFilled[partitionIndex]++;
partitionIndex = (partitionIndex + 1) % partitions;
}
}
if (logger.isDebugEnabled()) {
logger.debug("Partitions assigned per file groups :" + fileGroupToPartitions);
StringBuilder str = new StringBuilder();
for (int i = 0; i < bucketsFilled.length; i++) {
str.append("p" + i + " : " + bucketsFilled[i] + ",");
}
logger.debug("Num buckets assigned per file group :" + str);
}
}
@Override
public int numPartitions() {
return partitions;
}
@Override
public int getPartition(Object key) {
String[] parts = ((String) key).split("#");
String fileName = parts[0];
final long hashOfKey = Hashing.md5().hashString(parts[1], StandardCharsets.UTF_8).asLong();
List<Integer> candidatePartitions = fileGroupToPartitions.get(fileName);
int idx = (int) Math.floorMod(hashOfKey, candidatePartitions.size());
assert idx >= 0;
return candidatePartitions.get(idx);
}
@VisibleForTesting
Map<String, List<Integer>> getFileGroupToPartitions() {
return fileGroupToPartitions;
}
}

View File

@@ -39,9 +39,9 @@ import com.uber.hoodie.exception.MetadataNotFoundException;
import com.uber.hoodie.index.HoodieIndex;
import com.uber.hoodie.table.HoodieTable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -151,53 +151,53 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
// Step 3: Obtain a RDD, for each incoming record, that already exists, with the file id,
// that contains it.
int parallelism = autoComputeParallelism(recordsPerPartition, partitionToFileInfo,
Map<String, Long> comparisonsPerFileGroup = computeComparisonsPerFileGroup(recordsPerPartition, partitionToFileInfo,
partitionRecordKeyPairRDD);
return findMatchingFilesForRecordKeys(partitionToFileInfo,
partitionRecordKeyPairRDD, parallelism, hoodieTable.getMetaClient());
int safeParallelism = computeSafeParallelism(recordsPerPartition, comparisonsPerFileGroup);
int joinParallelism = determineParallelism(partitionRecordKeyPairRDD.partitions().size(), safeParallelism);
return findMatchingFilesForRecordKeys(partitionToFileInfo, partitionRecordKeyPairRDD, joinParallelism,
hoodieTable.getMetaClient(), comparisonsPerFileGroup);
}
/**
* The index lookup can be skewed in three dimensions : #files, #partitions, #records <p> To be able to smoothly
* handle skews, we need to compute how to split each partitions into subpartitions. We do it here, in a way that
* keeps the amount of each Spark join partition to < 2GB. <p> If
* {@link com.uber.hoodie.config.HoodieIndexConfig#BLOOM_INDEX_PARALLELISM_PROP}
* is specified as a NON-zero number, then that is used explicitly.
* Compute the estimated number of bloom filter comparisons to be performed on each file group
*/
private int autoComputeParallelism(final Map<String, Long> recordsPerPartition,
private Map<String, Long> computeComparisonsPerFileGroup(final Map<String, Long> recordsPerPartition,
final Map<String, List<BloomIndexFileInfo>> partitionToFileInfo,
JavaPairRDD<String, String> partitionRecordKeyPairRDD) {
long totalComparisons = 0;
Map<String, Long> fileToComparisons;
if (config.getBloomIndexPruneByRanges()) {
// we will just try exploding the input and then count to determine comparisons
totalComparisons = explodeRecordRDDWithFileComparisons(partitionToFileInfo,
partitionRecordKeyPairRDD).count();
// FIX(vc): Only do sampling here and extrapolate?
fileToComparisons = explodeRecordRDDWithFileComparisons(partitionToFileInfo,
partitionRecordKeyPairRDD).mapToPair(t -> t._2()).countByKey();
} else {
// if not pruning by ranges, then each file in a partition needs to compared against all
// records for a partition.
Map<String, Long> filesPerPartition = partitionToFileInfo.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> Long.valueOf(e.getValue().size())));
long totalFiles = 0;
long totalRecords = 0;
for (String partitionPath : recordsPerPartition.keySet()) {
long numRecords = recordsPerPartition.get(partitionPath);
long numFiles =
filesPerPartition.getOrDefault(partitionPath, 1L);
totalComparisons += numFiles * numRecords;
totalFiles +=
filesPerPartition.getOrDefault(partitionPath, 0L);
totalRecords += numRecords;
}
logger.info("TotalRecords: " + totalRecords + ", TotalFiles: " + totalFiles
+ ", TotalAffectedPartitions:" + recordsPerPartition.size());
fileToComparisons = new HashMap<>();
partitionToFileInfo.entrySet().stream().forEach(e -> {
for (BloomIndexFileInfo fileInfo : e.getValue()) {
//each file needs to be compared against all the records coming into the partition
fileToComparisons.put(fileInfo.getFileName(), recordsPerPartition.get(e.getKey()));
}
});
}
return fileToComparisons;
}
// each partition will have an item per comparison.
/**
* Compute the minimum parallelism needed to play well with the spark 2GB limitation.. The index lookup can be skewed
* in three dimensions : #files, #partitions, #records <p> To be able to smoothly handle skews, we need to compute how
* to split each partitions into subpartitions. We do it here, in a way that keeps the amount of each Spark join
* partition to < 2GB. <p> If {@link com.uber.hoodie.config.HoodieIndexConfig#BLOOM_INDEX_PARALLELISM_PROP} is
* specified as a NON-zero number, then that is used explicitly.
*/
int computeSafeParallelism(Map<String, Long> recordsPerPartition, Map<String, Long> comparisonsPerFileGroup) {
long totalComparisons = comparisonsPerFileGroup.values().stream().mapToLong(Long::longValue).sum();
long totalFiles = comparisonsPerFileGroup.size();
long totalRecords = recordsPerPartition.values().stream().mapToLong(Long::longValue).sum();
int parallelism = (int) (totalComparisons / MAX_ITEMS_PER_SHUFFLE_PARTITION + 1);
logger.info(
"Auto computed parallelism :" + parallelism + ", totalComparisons: " + totalComparisons);
logger.info(String.format("TotalRecords %d, TotalFiles %d, TotalAffectedPartitions %d, TotalComparisons %d, "
+ "SafeParallelism %d", totalRecords, totalFiles, recordsPerPartition.size(), totalComparisons, parallelism));
return parallelism;
}
@@ -329,18 +329,19 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
@VisibleForTesting
JavaPairRDD<String, String> findMatchingFilesForRecordKeys(
final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
JavaPairRDD<String, String> partitionRecordKeyPairRDD, int totalSubpartitions, HoodieTableMetaClient metaClient) {
int joinParallelism = determineParallelism(partitionRecordKeyPairRDD.partitions().size(),
totalSubpartitions);
JavaPairRDD<String, String> partitionRecordKeyPairRDD, int shuffleParallelism, HoodieTableMetaClient metaClient,
Map<String, Long> fileGroupToComparisons) {
JavaPairRDD<String, Tuple2<String, HoodieKey>> fileSortedTripletRDD =
explodeRecordRDDWithFileComparisons(
partitionToFileIndexInfo, partitionRecordKeyPairRDD)
// sort further based on filename, such that all checking for the file can happen within
// a single partition, on-the-fly
.sortByKey(true, joinParallelism);
explodeRecordRDDWithFileComparisons(partitionToFileIndexInfo, partitionRecordKeyPairRDD);
if (config.useBloomIndexBucketizedChecking()) {
BucketizedBloomCheckPartitioner partitioner = new BucketizedBloomCheckPartitioner(shuffleParallelism,
fileGroupToComparisons, config.getBloomIndexKeysPerBucket());
fileSortedTripletRDD = fileSortedTripletRDD.repartitionAndSortWithinPartitions(partitioner);
} else {
// sort further based on filename, such that all checking for the file can happen within
// a single partition, on-the-fly
fileSortedTripletRDD = fileSortedTripletRDD.sortByKey(true, shuffleParallelism);
}
return fileSortedTripletRDD.mapPartitionsWithIndex(
new HoodieBloomIndexCheckFunction(metaClient, config.getBasePath()), true)
.flatMap(List::iterator)