1
0

Introduce config to control interval tree pruning

- turned on by default
- Minor code refactoring/restructuring
This commit is contained in:
Vinoth Chandar
2019-04-18 15:21:52 -07:00
committed by vinoth chandar
parent 7129dc5bb7
commit ea20d47248
9 changed files with 74 additions and 82 deletions

View File

@@ -45,13 +45,12 @@ public class HoodieIndexConfig extends DefaultHoodieConfig {
public static final String DEFAULT_BLOOM_INDEX_PRUNE_BY_RANGES = "true";
public static final String BLOOM_INDEX_USE_CACHING_PROP = "hoodie.bloom.index.use.caching";
public static final String DEFAULT_BLOOM_INDEX_USE_CACHING = "true";
public static final String BLOOM_INDEX_TREE_BASED_FILTER_PROP = "hoodie.bloom.index.use.treebased.filter";
public static final String DEFAULT_BLOOM_INDEX_TREE_BASED_FILTER = "true";
public static final String BLOOM_INDEX_INPUT_STORAGE_LEVEL =
"hoodie.bloom.index.input.storage" + ".level";
public static final String DEFAULT_BLOOM_INDEX_INPUT_STORAGE_LEVEL = "MEMORY_AND_DISK_SER";
// ***** Bucketed Index Configs *****
public static final String BUCKETED_INDEX_NUM_BUCKETS_PROP = "hoodie.index.bucketed.numbuckets";
// Private: instances are created via the Builder, which applies defaults before building.
private HoodieIndexConfig(Properties props) {
super(props);
}
@@ -114,12 +113,11 @@ public class HoodieIndexConfig extends DefaultHoodieConfig {
return this;
}
public Builder numBucketsPerPartition(int numBuckets) {
props.setProperty(BUCKETED_INDEX_NUM_BUCKETS_PROP, String.valueOf(numBuckets));
/**
 * Sets {@code hoodie.bloom.index.use.treebased.filter}: when true, the bloom index uses the
 * interval-tree based file filter instead of the list based one (default is "true" per
 * DEFAULT_BLOOM_INDEX_TREE_BASED_FILTER).
 *
 * @param useTreeFilter whether to enable tree-based index file filtering
 * @return this builder, for chaining
 */
public Builder bloomIndexTreebasedFilter(boolean useTreeFilter) {
props.setProperty(BLOOM_INDEX_TREE_BASED_FILTER_PROP, String.valueOf(useTreeFilter));
return this;
}
public Builder withBloomIndexInputStorageLevel(String level) {
props.setProperty(BLOOM_INDEX_INPUT_STORAGE_LEVEL, level);
return this;
@@ -141,6 +139,8 @@ public class HoodieIndexConfig extends DefaultHoodieConfig {
BLOOM_INDEX_USE_CACHING_PROP, DEFAULT_BLOOM_INDEX_USE_CACHING);
setDefaultOnCondition(props, !props.containsKey(BLOOM_INDEX_INPUT_STORAGE_LEVEL),
BLOOM_INDEX_INPUT_STORAGE_LEVEL, DEFAULT_BLOOM_INDEX_INPUT_STORAGE_LEVEL);
setDefaultOnCondition(props, !props.containsKey(BLOOM_INDEX_TREE_BASED_FILTER_PROP),
BLOOM_INDEX_TREE_BASED_FILTER_PROP, DEFAULT_BLOOM_INDEX_TREE_BASED_FILTER);
// Throws IllegalArgumentException if the value set is not a known Hoodie Index Type
HoodieIndex.IndexType.valueOf(props.getProperty(INDEX_TYPE_PROP));
return config;

View File

@@ -324,8 +324,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return Boolean.parseBoolean(props.getProperty(HoodieIndexConfig.BLOOM_INDEX_USE_CACHING_PROP));
}
public int getNumBucketsPerPartition() {
return Integer.parseInt(props.getProperty(HoodieIndexConfig.BUCKETED_INDEX_NUM_BUCKETS_PROP));
/**
 * Whether the bloom index should use the interval-tree based file filter
 * ({@code hoodie.bloom.index.use.treebased.filter}).
 * NOTE(review): assumes the property was defaulted by HoodieIndexConfig.Builder; if the property
 * is absent, parseBoolean(null) silently yields false — confirm defaults are always applied.
 */
public boolean useBloomIndexTreebasedFilter() {
return Boolean.parseBoolean(props.getProperty(HoodieIndexConfig.BLOOM_INDEX_TREE_BASED_FILTER_PROP));
}
public StorageLevel getBloomIndexInputStorageLevel() {

View File

@@ -38,7 +38,6 @@ import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.exception.MetadataNotFoundException;
import com.uber.hoodie.index.HoodieIndex;
import com.uber.hoodie.table.HoodieTable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -304,9 +303,9 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
JavaPairRDD<String, Tuple2<String, HoodieKey>> explodeRecordRDDWithFileComparisons(
final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
JavaPairRDD<String, String> partitionRecordKeyPairRDD) {
IndexFileFilter indexFileFilter = config.getBloomIndexPruneByRanges()
IndexFileFilter indexFileFilter = config.useBloomIndexTreebasedFilter()
? new IntervalTreeBasedIndexFileFilter(partitionToFileIndexInfo)
: new SimpleIndexFileFilter(partitionToFileIndexInfo);
: new ListBasedIndexFileFilter(partitionToFileIndexInfo);
return partitionRecordKeyPairRDD.map(partitionRecordKeyPair -> {
String recordKey = partitionRecordKeyPair._2();
String partitionPath = partitionRecordKeyPair._1();

View File

@@ -21,6 +21,7 @@ package com.uber.hoodie.index.bloom;
import com.uber.hoodie.common.BloomFilter;
import com.uber.hoodie.common.model.HoodieKey;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.util.HoodieTimer;
import com.uber.hoodie.common.util.ParquetUtils;
import com.uber.hoodie.exception.HoodieException;
import com.uber.hoodie.exception.HoodieIndexException;
@@ -65,11 +66,12 @@ public class HoodieBloomIndexCheckFunction implements
try {
// Load all rowKeys from the file, to double-confirm
if (!candidateRecordKeys.isEmpty()) {
HoodieTimer timer = new HoodieTimer().startTimer();
Set<String> fileRowKeys = ParquetUtils.filterParquetRowKeys(configuration, filePath,
new HashSet<>(candidateRecordKeys));
foundRecordKeys.addAll(fileRowKeys);
logger.info("After checking with row keys, we have " + foundRecordKeys.size()
+ " results, for file " + filePath + " => " + foundRecordKeys);
logger.info(String.format("Checked keys against file %s, in %d ms. #candidates (%d) #found (%d)", filePath,
timer.endTimer(), candidateRecordKeys.size(), foundRecordKeys.size()));
if (logger.isDebugEnabled()) {
logger.debug("Keys matching for file " + filePath + " => " + foundRecordKeys);
}
@@ -98,6 +100,8 @@ public class HoodieBloomIndexCheckFunction implements
private String currentPartitionPath;
private long totalKeysChecked;
LazyKeyCheckIterator(
Iterator<Tuple2<String, Tuple2<String, HoodieKey>>> filePartitionRecordKeyTripletItr) {
super(filePartitionRecordKeyTripletItr);
@@ -105,6 +109,7 @@ public class HoodieBloomIndexCheckFunction implements
candidateRecordKeys = new ArrayList<>();
bloomFilter = null;
currentPartitionPath = null;
totalKeysChecked = 0;
}
@Override
@@ -114,16 +119,42 @@ public class HoodieBloomIndexCheckFunction implements
private void initState(String fileName, String partitionPath) throws HoodieIndexException {
try {
Path filePath = new Path(basePath + "/" + partitionPath + "/" + fileName);
bloomFilter = ParquetUtils
.readBloomFilterFromParquetMetadata(metaClient.getHadoopConf(), filePath);
HoodieTimer timer = new HoodieTimer().startTimer();
bloomFilter = ParquetUtils.readBloomFilterFromParquetMetadata(metaClient.getHadoopConf(), filePath);
logger.info(String.format("Read bloom filter from %s/%s in %d ms", partitionPath, fileName, timer.endTimer()));
candidateRecordKeys = new ArrayList<>();
currentFile = fileName;
currentPartitionPath = partitionPath;
totalKeysChecked = 0;
} catch (Exception e) {
throw new HoodieIndexException("Error checking candidate keys against file.", e);
}
}
// check record key against bloom filter of current file & add to possible keys if needed
// Checks the record key against the current file's bloom filter; keys that might be present are
// collected as candidates for the exact (parquet row-key) check done later in
// checkAgainstCurrentFile(). Every call counts toward totalKeysChecked, hit or miss.
private void checkAndAddCandidates(String recordKey) {
if (bloomFilter.mightContain(recordKey)) {
if (logger.isDebugEnabled()) {
logger.debug("Record key " + recordKey + " matches bloom filter in file " + currentPartitionPath
+ "/" + currentFile);
}
// Bloom filters can return false positives, so this is only a candidate, not a match.
candidateRecordKeys.add(recordKey);
}
totalKeysChecked++;
}
// Performs the exact membership check: reads the current file's actual row keys and filters the
// bloom-filter candidates down to keys truly present in the file. The logged "fp" count is the
// number of bloom-filter false positives (candidates minus actual matches).
private List<String> checkAgainstCurrentFile() {
Path filePath = new Path(basePath + "/" + currentPartitionPath + "/" + currentFile);
if (logger.isDebugEnabled()) {
logger.debug("#The candidate row keys for " + filePath + " => " + candidateRecordKeys);
}
List<String> matchingKeys = checkCandidatesAgainstFile(metaClient.getHadoopConf(), candidateRecordKeys, filePath);
logger.info(String.format("Total records (%d), bloom filter candidates (%d)/fp(%d), actual matches (%d)",
totalKeysChecked, candidateRecordKeys.size(), candidateRecordKeys.size() - matchingKeys.size(),
matchingKeys.size()));
return matchingKeys;
}
@Override
protected List<KeyLookupResult> computeNext() {
@@ -131,7 +162,6 @@ public class HoodieBloomIndexCheckFunction implements
try {
// process one file in each go.
while (inputItr.hasNext()) {
Tuple2<String, Tuple2<String, HoodieKey>> currentTuple = inputItr.next();
String fileName = currentTuple._2._1;
String partitionPath = currentTuple._2._2.getPartitionPath();
@@ -142,53 +172,22 @@ public class HoodieBloomIndexCheckFunction implements
initState(fileName, partitionPath);
}
// if continue on current file)
// if continue on current file
if (fileName.equals(currentFile)) {
// check record key against bloom filter of current file & add to possible keys if
// needed
if (bloomFilter.mightContain(recordKey)) {
if (logger.isDebugEnabled()) {
logger.debug("#1 Adding " + recordKey + " as candidate for file " + fileName);
}
candidateRecordKeys.add(recordKey);
}
checkAndAddCandidates(recordKey);
} else {
// do the actual checking of file & break out
Path filePath = new Path(basePath + "/" + currentPartitionPath + "/" + currentFile);
logger.info(
"#1 After bloom filter, the candidate row keys is reduced to " + candidateRecordKeys
.size() + " for " + filePath);
if (logger.isDebugEnabled()) {
logger
.debug("#The candidate row keys for " + filePath + " => " + candidateRecordKeys);
}
ret.add(new KeyLookupResult(currentFile,
checkCandidatesAgainstFile(metaClient.getHadoopConf(), candidateRecordKeys, filePath)));
ret.add(new KeyLookupResult(currentFile, checkAgainstCurrentFile()));
initState(fileName, partitionPath);
if (bloomFilter.mightContain(recordKey)) {
if (logger.isDebugEnabled()) {
logger.debug("#2 Adding " + recordKey + " as candidate for file " + fileName);
}
candidateRecordKeys.add(recordKey);
}
checkAndAddCandidates(recordKey);
break;
}
}
// handle case, where we ran out of input, close pending work, update return val
if (!inputItr.hasNext()) {
Path filePath = new Path(basePath + "/" + currentPartitionPath + "/" + currentFile);
logger.info(
"#2 After bloom filter, the candidate row keys is reduced to " + candidateRecordKeys
.size() + " for " + filePath);
if (logger.isDebugEnabled()) {
logger.debug("#The candidate row keys for " + filePath + " => " + candidateRecordKeys);
}
ret.add(new KeyLookupResult(currentFile,
checkCandidatesAgainstFile(metaClient.getHadoopConf(), candidateRecordKeys, filePath)));
ret.add(new KeyLookupResult(currentFile, checkAgainstCurrentFile()));
}
} catch (Throwable e) {
if (e instanceof HoodieException) {
throw e;

View File

@@ -26,11 +26,12 @@ import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.exception.HoodieIOException;
import com.uber.hoodie.table.HoodieTable;
import java.io.IOException;
import java.util.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
@@ -85,7 +86,7 @@ public class HoodieGlobalBloomIndex<T extends HoodieRecordPayload> extends Hoodi
IndexFileFilter indexFileFilter = config.getBloomIndexPruneByRanges()
? new IntervalTreeBasedGlobalIndexFileFilter(partitionToFileIndexInfo)
: new SimpleGlobalIndexFileFilter(partitionToFileIndexInfo);
: new ListBasedGlobalIndexFileFilter(partitionToFileIndexInfo);
return partitionRecordKeyPairRDD.map(partitionRecordKeyPair -> {
String recordKey = partitionRecordKeyPair._2();
String partitionPath = partitionRecordKeyPair._1();

View File

@@ -18,6 +18,7 @@
package com.uber.hoodie.index.bloom;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
@@ -40,8 +41,8 @@ class IntervalTreeBasedGlobalIndexFileFilter implements IndexFileFilter {
* @param partitionToFileIndexInfo Map of partition to List of {@link BloomIndexFileInfo}s
*/
IntervalTreeBasedGlobalIndexFileFilter(final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo) {
List<BloomIndexFileInfo> allIndexFiles = partitionToFileIndexInfo.values().stream().flatMap(
indexFiles -> indexFiles.stream()).collect(Collectors.toList());
List<BloomIndexFileInfo> allIndexFiles = partitionToFileIndexInfo.values().stream().flatMap(Collection::stream)
.collect(Collectors.toList());
// Note that the interval tree implementation doesn't have auto-balancing to ensure logN search time.
// So, we are shuffling the input here hoping the tree will not have any skewness. If not, the tree could be skewed
// which could result in N search time instead of NlogN.
@@ -60,9 +61,7 @@ class IntervalTreeBasedGlobalIndexFileFilter implements IndexFileFilter {
public Set<String> getMatchingFiles(String partitionPath, String recordKey) {
Set<String> toReturn = new HashSet<>();
toReturn.addAll(indexLookUpTree.getMatchingIndexFiles(recordKey));
filesWithNoRanges.forEach(indexFile -> {
toReturn.add(indexFile);
});
toReturn.addAll(filesWithNoRanges);
return toReturn;
}
}

View File

@@ -32,7 +32,7 @@ import java.util.Set;
class IntervalTreeBasedIndexFileFilter implements IndexFileFilter {
private final Map<String, KeyRangeLookupTree> partitionToFileIndexLookUpTree = new HashMap<>();
private final Map<String, Set<String>> filesWithNoRanges = new HashMap<>();
private final Map<String, Set<String>> partitionToFilesWithNoRanges = new HashMap<>();
/**
* Instantiates {@link IntervalTreeBasedIndexFileFilter}
@@ -51,10 +51,10 @@ class IntervalTreeBasedIndexFileFilter implements IndexFileFilter {
lookUpTree.insert(new KeyRangeNode(indexFileInfo.getMinRecordKey(),
indexFileInfo.getMaxRecordKey(), indexFileInfo.getFileName()));
} else {
if (!filesWithNoRanges.containsKey(partition)) {
filesWithNoRanges.put(partition, new HashSet<>());
if (!partitionToFilesWithNoRanges.containsKey(partition)) {
partitionToFilesWithNoRanges.put(partition, new HashSet<>());
}
filesWithNoRanges.get(partition).add(indexFileInfo.getFileName());
partitionToFilesWithNoRanges.get(partition).add(indexFileInfo.getFileName());
}
});
partitionToFileIndexLookUpTree.put(partition, lookUpTree);
@@ -64,18 +64,12 @@ class IntervalTreeBasedIndexFileFilter implements IndexFileFilter {
@Override
public Set<String> getMatchingFiles(String partitionPath, String recordKey) {
Set<String> toReturn = new HashSet<>();
if (partitionToFileIndexLookUpTree
.containsKey(partitionPath)) { // could be null, if there are no files in a given partition yet or if all
// index files has no ranges
partitionToFileIndexLookUpTree.get(partitionPath).getMatchingIndexFiles(recordKey)
.forEach(matchingFileName -> {
toReturn.add(matchingFileName);
});
// could be null, if there are no files in a given partition yet or if all index files have no ranges
if (partitionToFileIndexLookUpTree.containsKey(partitionPath)) {
toReturn.addAll(partitionToFileIndexLookUpTree.get(partitionPath).getMatchingIndexFiles(recordKey));
}
if (filesWithNoRanges.containsKey(partitionPath)) {
filesWithNoRanges.get(partitionPath).forEach(fileName -> {
toReturn.add(fileName);
});
if (partitionToFilesWithNoRanges.containsKey(partitionPath)) {
toReturn.addAll(partitionToFilesWithNoRanges.get(partitionPath));
}
return toReturn;
}

View File

@@ -23,14 +23,14 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
class SimpleGlobalIndexFileFilter extends SimpleIndexFileFilter {
class ListBasedGlobalIndexFileFilter extends ListBasedIndexFileFilter {
/**
* Instantiates {@link SimpleGlobalIndexFileFilter}
* Instantiates {@link ListBasedGlobalIndexFileFilter}
*
* @param partitionToFileIndexInfo Map of partition to List of {@link BloomIndexFileInfo}
*/
SimpleGlobalIndexFileFilter(
ListBasedGlobalIndexFileFilter(
Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo) {
super(partitionToFileIndexInfo);
}

View File

@@ -27,16 +27,16 @@ import java.util.Set;
* Simple implementation of {@link IndexFileFilter}. Sequentially goes through every index file in a given partition to
* search for potential index files to be searched for a given record key.
*/
class SimpleIndexFileFilter implements IndexFileFilter {
class ListBasedIndexFileFilter implements IndexFileFilter {
final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo;
/**
* Instantiates {@link SimpleIndexFileFilter}
* Instantiates {@link ListBasedIndexFileFilter}
*
* @param partitionToFileIndexInfo Map of partition to List of {@link BloomIndexFileInfo}
*/
SimpleIndexFileFilter(final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo) {
ListBasedIndexFileFilter(final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo) {
this.partitionToFileIndexInfo = partitionToFileIndexInfo;
}